Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement drain functionality, initial untested commit, DRAFT ONLY #1326

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,22 @@ impl Client {
Ok(())
}

/// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
/// messages, then closes the connection
///
/// # Examples
/// TODO
pub async fn drain(&self) -> Result<(), DrainError> {
// Drain all subscriptions
self.sender.send(Command::Drain { sid: None }).await?;

// Ensure any outgoing messages are flushed
self.flush().await?;

// Remaining process is handled on the handler-side
Ok(())
}

/// Returns the current state of the connection.
///
/// # Examples
Expand Down Expand Up @@ -793,6 +809,22 @@ impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
}
}

#[derive(Error, Debug)]
#[error("failed to send drain: {0}")]
pub struct DrainError(#[source] crate::Error);

impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
DrainError(Box::new(err))
}
}

impl From<FlushError> for DrainError {
fn from(err: FlushError) -> Self {
DrainError(Box::new(err))
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
/// There are services listening on requested subject, but they didn't respond
Expand Down
81 changes: 81 additions & 0 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ pub(crate) enum Command {
Flush {
observer: oneshot::Sender<()>,
},
Drain {
sid: Option<u64>,
},
Reconnect,
}

Expand Down Expand Up @@ -411,6 +414,7 @@ struct Subscription {
queue_group: Option<String>,
delivered: u64,
max: Option<u64>,
is_draining: bool,
}

#[derive(Debug)]
Expand All @@ -431,6 +435,7 @@ pub(crate) struct ConnectionHandler {
ping_interval: Interval,
should_reconnect: bool,
flush_observers: Vec<oneshot::Sender<()>>,
is_draining: bool,
}

impl ConnectionHandler {
Expand All @@ -453,6 +458,7 @@ impl ConnectionHandler {
ping_interval,
should_reconnect: false,
flush_observers: Vec::new(),
is_draining: false,
}
}

Expand Down Expand Up @@ -532,6 +538,19 @@ impl ConnectionHandler {
}
}

// Before handling any commands, drop any subscriptions which are draining
// Note: safe to assume drain has completed, as we would have flushed all outgoing
// UNSUB messages in the previous call to this fn, and we would have processed and delivered
// any remaining messages to the subscription in the loop above.
self.handler.subscriptions.retain(|_, s| !s.is_draining);

if self.handler.is_draining {
// The entire connection is draining. This means we flushed outgoing messages and all subs
// were drained by the above retain and we should exit instead of processing any further
// messages
return Poll::Ready(ExitReason::Closed);
}

// WARNING: after the following loop `handle_command`,
// or other functions which call `enqueue_write_op`,
// cannot be called anymore. Runtime wakeups won't
Expand Down Expand Up @@ -777,6 +796,25 @@ impl ConnectionHandler {
Command::Flush { observer } => {
self.flush_observers.push(observer);
}
Command::Drain { sid } => {
let mut drain_sub = |sid: &u64, sub: &mut Subscription| {
sub.is_draining = true;
self.connection.enqueue_write_op(&ClientOp::Unsubscribe {
sid: *sid,
max: None,
});
};

if let Some(sid) = sid {
if let Some(sub) = self.subscriptions.get_mut(&sid) {
drain_sub(&sid, sub);
}
} else {
for (sid, sub) in self.subscriptions.iter_mut() {
drain_sub(sid, sub);
}
}
}
Command::Subscribe {
sid,
subject,
Expand All @@ -789,6 +827,7 @@ impl ConnectionHandler {
max: None,
subject: subject.to_owned(),
queue_group: queue_group.to_owned(),
is_draining: false,
};

self.subscriptions.insert(sid, subscription);
Expand Down Expand Up @@ -1264,6 +1303,48 @@ impl Subscriber {
.await?;
Ok(())
}

/// Unsubscribes from subscription immediately leaves the stream open for the configured drain period
/// to allow any in-flight messages on the subscription to be delivered. The stream will be closed
/// at the end of the drain period
///
/// # Examples
/// ```
/// # use futures::StreamExt;
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
///
/// let mut subscriber = client.subscribe("test").await?;
///
/// tokio::spawn({
/// let task_client = client.clone();
/// async move {
/// loop {
/// _ = task_client.publish("test", "data".into()).await;
/// }
/// }
/// });
///
/// client.flush().await?;
/// subscriber.drain().await?;
///
/// while let Some(message) = subscriber.next().await {
/// println!("message received: {:?}", message);
/// }
/// println!("no more messages, unsubscribed");
/// # Ok(())
/// # }
/// ```
pub async fn drain(&mut self) -> Result<(), UnsubscribeError> {
self.sender
.send(Command::Drain {
sid: Some(self.sid),
})
.await?;

Ok(())
}
}

#[derive(Error, Debug, PartialEq)]
Expand Down