Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 25 additions & 28 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,19 +282,21 @@ impl<State: Send + Sync + 'static> Server<State> {
}

#[cfg(feature = "h1-server")]
async fn handle_tcp(&self, stream: async_std::net::TcpStream) -> crate::Result<()> {
let this = self.clone();
fn handle_tcp(self, stream: async_std::net::TcpStream) {
let local_addr = stream.local_addr().ok();
let peer_addr = stream.peer_addr().ok();
task::spawn(async move {
async_h1::accept(stream, |mut req| async {
let result = async_h1::accept(stream, |mut req| async {
req.set_local_addr(local_addr);
req.set_peer_addr(peer_addr);
this.respond(req).await
self.respond(req).await
})
.await
})
.await
.await;

if let Err(error) = result {
log::error!("async-h1 error", { error: error.to_string() });
}
});
}

/// Asynchronously serve the app at the given address.
Expand All @@ -313,20 +315,16 @@ impl<State: Send + Sync + 'static> Server<State> {

let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = match stream {
match stream {
Err(ref e) if is_transient_error(e) => continue,
Err(error) => {
let delay = std::time::Duration::from_millis(500);
crate::log::error!("Error: {}. Pausing for {:?}.", error, delay);
task::sleep(delay).await;
continue;
}
Ok(s) => s,
Ok(stream) => self.clone().handle_tcp(stream),
};

if let Err(error) = self.handle_tcp(stream).await {
log::error!("async-h1 error", { error: error.to_string() });
}
}

Ok(())
Expand All @@ -352,38 +350,37 @@ impl<State: Send + Sync + 'static> Server<State> {

let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = match stream {
match stream {
Err(ref e) if is_transient_error(e) => continue,
Err(error) => {
let delay = std::time::Duration::from_millis(500);
crate::log::error!("Error: {}. Pausing for {:?}.", error, delay);
task::sleep(delay).await;
continue;
}
Ok(s) => s,
Ok(stream) => self.clone().handle_unix(stream),
};

if let Err(error) = self.handle_unix(stream).await {
log::error!("async-h1 error", { error: error.to_string() });
}
}
Ok(())
}

#[cfg(all(feature = "h1-server", unix))]
async fn handle_unix(&self, stream: async_std::os::unix::net::UnixStream) -> crate::Result<()> {
let this = self.clone();
let local_addr = stream.local_addr().ok().map(|addr| format!("{:?}", addr));
let peer_addr = stream.peer_addr().ok().map(|addr| format!("{:?}", addr));
fn handle_unix(self, stream: async_std::os::unix::net::UnixStream) {
task::spawn(async move {
async_h1::accept(stream, |mut req| async {
let local_addr = stream.local_addr().ok().map(|addr| format!("{:?}", addr));
let peer_addr = stream.peer_addr().ok().map(|addr| format!("{:?}", addr));

let result = async_h1::accept(stream, |mut req| async {
req.set_local_addr(local_addr.as_ref());
req.set_peer_addr(peer_addr.as_ref());
this.respond(req).await
self.respond(req).await
})
.await
})
.await
.await;

if let Err(error) = result {
log::error!("async-h1 error", { error: error.to_string() });
}
});
}

/// Respond to a `Request` with a `Response`.
Expand Down