Skip to content

Commit

Permalink
fix: if a subscriber gets backed up, hang up on them
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Sep 24, 2024
1 parent 845e857 commit f9a8a1f
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ use serde::{Deserialize, Deserializer, Serialize};

use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle};

#[derive(Debug)]
pub enum SendError {
ChannelClosed(tokio::sync::mpsc::error::TrySendError<Frame>),
LimitReached,
LockError,
}

#[derive(Debug)]
struct LimitedSender {
tx: Option<tokio::sync::mpsc::Sender<Frame>>,
Expand All @@ -26,7 +33,7 @@ impl LimitedSender {

fn send(&mut self, frame: Frame) -> Result<(), SendError> {
if let Some(tx) = &self.tx {
match tx.blocking_send(frame) {
match tx.try_send(frame) {
Ok(()) => {
if let Some(remaining) = &mut self.remaining {
*remaining -= 1;
Expand All @@ -44,7 +51,7 @@ impl LimitedSender {
}
} else {
Err(SendError::ChannelClosed(
tokio::sync::mpsc::error::SendError(frame),
tokio::sync::mpsc::error::SendError(frame).into(),
))
}
}
Expand All @@ -67,13 +74,6 @@ impl SharedLimitedSender {
}
}

#[derive(Debug)]
pub enum SendError {
ChannelClosed(tokio::sync::mpsc::error::SendError<Frame>),
LimitReached,
LockError,
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Default, bon::Builder)]
#[builder(start_fn = with_topic)]
pub struct Frame {
Expand Down

0 comments on commit f9a8a1f

Please sign in to comment.