Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions tower/src/buffer/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Error types
//! Error types for the `Buffer` middleware.

use std::{fmt, sync::Arc};

Expand All @@ -8,8 +8,7 @@ pub struct ServiceError {
inner: Arc<Error>,
}

/// An error when the buffer's worker closes unexpectedly.
#[derive(Debug)]
/// An error produced when the a buffer's worker closes unexpectedly.
pub struct Closed {
_p: (),
}
Expand All @@ -25,7 +24,7 @@ impl ServiceError {
ServiceError { inner }
}

/// Private to avoid exposing `Clone` trait as part of the public API
// Private to avoid exposing `Clone` trait as part of the public API
pub(crate) fn clone(&self) -> ServiceError {
ServiceError {
inner: self.inner.clone(),
Expand Down Expand Up @@ -53,6 +52,12 @@ impl Closed {
}
}

impl fmt::Debug for Closed {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("Closed").finish()
}
}

impl fmt::Display for Closed {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.write_str("buffer's worker closed unexpectedly")
Expand Down
4 changes: 2 additions & 2 deletions tower/src/buffer/future.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Future types
//! Future types for the `Buffer` middleware.

use super::{
error::{Closed, Error},
Expand All @@ -12,7 +12,7 @@ use std::{
task::{Context, Poll},
};

/// Future eventually completed with the response to the original request.
/// Future that completes when the buffered service eventually services the submitted request.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
Expand Down
22 changes: 20 additions & 2 deletions tower/src/buffer/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,32 @@ use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
use tower_service::Service;

/// Buffer requests with a bounded buffer
/// Adds an mpsc buffer in front of an inner service.
///
/// The default Tokio executor is used to run the given service,
/// which means that this layer can only be used on the Tokio runtime.
///
/// See the module documentation for more details.
pub struct BufferLayer<Request> {
bound: usize,
_p: PhantomData<fn(Request)>,
}

impl<Request> BufferLayer<Request> {
/// Create a new `BufferLayer` with the provided `bound`.
/// Creates a new `BufferLayer` with the provided `bound`.
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
///
/// # A note on choosing a `bound`
///
/// When `Buffer`'s implementation of `poll_ready` returns `Poll::Ready`, it reserves a
/// slot in the channel for the forthcoming `call()`. However, if this call doesn't arrive,
/// this reserved slot may be held up for a long time. As a result, it's advisable to set
/// `bound` to be at least the maximum number of concurrent requests the `Buffer` will see.
/// If you do not, all the slots in the buffer may be held up by futures that have just called
/// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new
/// requests.
pub fn new(bound: usize) -> Self {
BufferLayer {
bound,
Expand Down
39 changes: 34 additions & 5 deletions tower/src/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,38 @@
//! Buffer requests when the inner service is out of capacity.
//! Middleware that provides a buffered mpsc channel to a service.
//!
//! Buffering works by spawning a new task that is dedicated to pulling requests
//! out of the buffer and dispatching them to the inner service. By adding a
//! buffer and a dedicated task, the `Buffer` layer in front of the service can
//! be `Clone` even if the inner service is not.
//! Sometimes you want to give out multiple handles to a single service, and allow each handle to
//! enqueue requests. That is, you want a `Service` to be `Clone`. This module allows you to do
//! that by placing the service behind a multi-producer, single-consumer buffering channel. Clients
//! enqueue requests by sending on the channel from any of the handles ([`Buffer`]), and the single
//! service running elsewhere (usually spawned) receives and services the requests one by one. Each
//! request is enqueued alongside a response channel that allows the service to report the result
//! of the request back to the caller.
//!
//! # Examples
//!
//! ```rust
//! # #[cfg(feature = "util")]
//! use tower::buffer::Buffer;
//! # #[cfg(feature = "util")]
//! use tower::{Service, ServiceExt};
//! # #[cfg(feature = "util")]
//! async fn mass_produce<S: Service<usize>>(svc: S)
//! where
//! S: 'static + Send,
//! S::Error: Send + Sync + std::error::Error,
//! S::Future: Send
//! {
//! let svc = Buffer::new(svc, 10 /* buffer length */);
//! for _ in 0..10 {
//! let mut svc = svc.clone();
//! tokio::spawn(async move {
//! for i in 0usize.. {
//! svc.ready_and().await.expect("service crashed").call(i).await;
//! }
//! });
//! }
//! }
//! ```

pub mod error;
pub mod future;
Expand Down
12 changes: 8 additions & 4 deletions tower/src/buffer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot};
use tower_service::Service;

/// Adds a buffer in front of an inner service.
/// Adds an mpsc buffer in front of an inner service.
///
/// See crate level documentation for more details.
/// See the module documentation for more details.
#[derive(Debug)]
pub struct Buffer<T, Request>
where
Expand All @@ -35,11 +35,15 @@ where
/// The default Tokio executor is used to run the given service, which means that this method
/// must be called while on the Tokio runtime.
///
/// # Note on setting `bound`
/// # A note on choosing a `bound`
///
/// When `Buffer`'s implementation of `poll_ready` returns `Poll::Ready`, it reserves a
/// slot in the channel for the forthcoming `call()`. However, if this call doesn't arrive,
/// this reserved slot may be held up for a long time. As a result, it's advisable to set
/// `bound` to be at least the maximum number of concurrent requests the `Buffer` will see.
/// If you do not, all the slots in the buffer may be held up by futures that have just called
/// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new
/// requests.
pub fn new(service: T, bound: usize) -> Self
where
T: Send + 'static,
Expand All @@ -53,7 +57,7 @@ where
Buffer { tx, handle }
}

/// Creates a new `Buffer` wrapping `service` but returns the background worker.
/// Creates a new `Buffer` wrapping `service`, but returns the background worker.
///
/// This is useful if you do not want to spawn directly onto the `tokio` runtime
/// but instead want to use your own executor. This will return the `Buffer` and
Expand Down
2 changes: 1 addition & 1 deletion tower/src/discover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use std::{
/// [`Change`]s are returned which provide unique identifiers
/// for the services.
///
/// See crate documentation for more details.
/// See the module documentation for more details.
pub trait Discover: Sealed<Change<(), ()>> {
/// A unique identifier for each active service.
///
Expand Down