
Enhancing OutputPort Backpressure Handling via RecvError::Lagged Management and Buffer Configurability #225

Open
dcadenas opened this issue Mar 29, 2024 · 6 comments

Comments

@dcadenas
Contributor

I've noticed some silent drops on the receiver side under heavy load, mainly because there's no handling for RecvError::Lagged in this section. Adjusting the broadcast buffer size could help temporarily, but directly addressing backpressure could offer a more durable solution.

Proposals:

Dealing with RecvError::Lagged:

Implementing explicit handling for RecvError::Lagged when backpressure interrupts the recv loop could greatly improve our insight into and control over backpressure issues. Logging or aborting cleanly in these cases would make such events more transparent.
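For illustration, the subscription's forwarding loop could look roughly like this. This is only a sketch against tokio's broadcast API, not the crate's actual internals; forward_to_subscriber, convert, and target are stand-ins I made up:

use ractor::{ActorRef, Message};
use tokio::sync::broadcast::{self, error::RecvError};

// Sketch only: a forwarding loop that survives lag instead of exiting.
async fn forward_to_subscriber<TIn, TOut>(
    mut rx: broadcast::Receiver<TIn>,
    target: ActorRef<TOut>,
    convert: impl Fn(TIn) -> Option<TOut>,
) where
    TIn: Clone + Send + 'static,
    TOut: Message,
{
    loop {
        match rx.recv().await {
            Ok(msg) => {
                if let Some(converted) = convert(msg) {
                    if target.send_message(converted).is_err() {
                        // The subscriber actor is gone; stop forwarding.
                        break;
                    }
                }
            }
            Err(RecvError::Lagged(skipped)) => {
                // Don't end the subscription: make the drop visible and keep receiving.
                tracing::warn!("output port subscriber lagged; {} message(s) dropped", skipped);
            }
            Err(RecvError::Closed) => break,
        }
    }
}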

Using Sender::len for Proactive Management:

Checking the channel's capacity with Sender::len before sending allows for preemptive action to avoid overloading. This approach could enable smarter decision-making on sending operations, based on the current state of the channel.
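For example, a sketch assuming direct access to the port's underlying tokio broadcast Sender; send_with_pressure_check and buffer_size are just stand-ins, where buffer_size would be whatever capacity the channel was created with:

use tokio::sync::broadcast;

// Sketch: inspect queue depth before publishing and surface pressure early.
fn send_with_pressure_check<T: Clone>(tx: &broadcast::Sender<T>, msg: T, buffer_size: usize) {
    if tx.len() >= buffer_size {
        // A policy could go here instead of just logging: delay, drop, or notify the caller.
        tracing::warn!(
            "broadcast queue is full ({} of {} slots used); slow subscribers will lag",
            tx.len(),
            buffer_size
        );
    }
    // send() only errors when there are no receivers; that case is ignored here.
    let _ = tx.send(msg);
}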

Configurable Broadcast Buffer Size:

Allowing the buffer size to be adjusted externally could provide the flexibility needed to optimize for varying workloads, helping to manage backpressure more effectively.
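For instance, something along these lines. To be clear, with_capacity is purely hypothetical and does not exist on OutputPort today; as far as I can tell the port is built with a fixed 10-message broadcast buffer via OutputPort::default():

use ractor::OutputPort;

// Purely hypothetical sketch: `with_capacity` is not a real OutputPort
// constructor, it just shows the shape of the configurability being proposed.
fn build_port(buffer_size: usize) -> OutputPort<i64> {
    OutputPort::with_capacity(buffer_size)
}

// Callers tuning for bursty workloads could then do e.g.:
// let output_port = build_port(1024);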

Integrating these suggestions could enhance how backpressure is managed, leading to a system that’s more robust and easier to fine-tune.

@slawlor
Owner

slawlor commented Mar 30, 2024

I just wanted to state that I'm seeing these posts, and am thrilled with the progress. However, a sick 5-month-old has prevented me from doing proper reviews here. Rest assured I will get to it; sorry for the delay.

@dcadenas
Contributor Author

No worries at all! Family always comes first, and I hope your little one gets well soon. I'm just grateful for the work you've put into the library and am looking forward to your insights whenever you have the time. Take care and thank you for the update!

@slawlor
Owner

slawlor commented Apr 1, 2024

OK now I've actually had 20 minutes to read this!

I'm actually surprised this is an issue. In the OutputPort, the spawned task is just doing a recv() -> map() -> send() to the target actor, which has an unbounded buffer. Do you have a repro you can share of where this is missing messages under heavy load? (I understand if it's proprietary or something of course).

However, if this is indeed a problem, it means the tokio scheduler is so busy that it can't do a simple dequeue of a port, then two blocking operations.

My only concern with pushing logic into the sender space is that we affect QoS for all downstream targets because of a single slow target, which kind of breaks the point of pub-sub models. Probably the correct approach (imho) is dealing with the LAGGED signal as you suggest.

@dcadenas
Contributor Author

dcadenas commented Apr 1, 2024

My only concern with pushing logic into the sender space is that we affect QoS for all downstream targets because of a single slow target, which kind of breaks the point of pub-sub models. Probably the correct approach (imho) is dealing with the LAGGED signal as you suggest.

I hadn't considered that, which makes a lot of sense. Here's some code to reproduce the issue:

use std::time::Duration;

use tracing_subscriber::{fmt, prelude::*, EnvFilter};

use ractor::{Actor, ActorProcessingErr, ActorRef, OutputPort};

struct Counter;

struct CounterState {
    count: i64,
    output_port: OutputPort<i64>,
}

enum CounterMessage {
    Increment(i64),
    Subscribe(ActorRef<CounterMessage>),
    DoSomethingSlow(i64),
}

#[ractor::async_trait]
impl Actor for Counter {
    type Msg = CounterMessage;

    type State = CounterState;
    type Arguments = ();

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        _: (),
    ) -> Result<Self::State, ActorProcessingErr> {
        tracing::info!("Starting the counter actor");
        // create the initial state
        Ok(CounterState {
            count: 0,
            output_port: OutputPort::default(),
        })
    }

    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            CounterMessage::Increment(how_much) => {
                if state.count < 5 {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                }

                if state.count == 5 {
                    tracing::info!("Producer speed up, to ensure that we fill the 10 items broadcast buffer, receiver loop will be broken from lagging messages and silently drop");
                }

                state.count += how_much;
                tracing::info!("Producer current count: {}", state.count);
                state.output_port.send(how_much);
            }
            CounterMessage::Subscribe(subscriber) => {
                state
                    .output_port
                    .subscribe(subscriber, |value| Some(CounterMessage::DoSomethingSlow(value)));
            }
            CounterMessage::DoSomethingSlow(value) => {
                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                state.count += value;
                tracing::info!("Consumer current count: {}", state.count);
            }
        }
        Ok(())
    }
}

fn init_logging() {
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .init();
}

#[tokio::main]
async fn main() {
    init_logging();

    let (actor, handle) = Actor::spawn(Some("test_name".to_string()), Counter, ())
        .await
        .expect("Failed to start actor!");

    let (actor_subscriber, handle_subscriber) =
        Actor::spawn(Some("test_subscriber".to_string()), Counter, ())
            .await
            .expect("Failed to start actor!");

    actor
        .send_message(CounterMessage::Subscribe(actor_subscriber.clone()))
        .expect("Failed to send message");

    // send 25 increments in a quick burst so the producer outpaces the slow subscriber
    for _i in 0..25 {
        actor
            .send_message(CounterMessage::Increment(1))
            .expect("Failed to send message");
    }

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(25)).await;
        actor.stop(None);
        actor_subscriber.stop(None);
    });

    handle.await.expect("Actor failed to exit cleanly");
    handle_subscriber
        .await
        .expect("Actor failed to exit cleanly");
}

I'm happy to share the real use case because it's open source. It's a server that consumes direct messages from the Nostr network.

The server processes messages that can arrive very quickly from a websocket port connected to the Nostr network relays. Initially, I encountered no issues because the test messages from my local test server were fewer than 10. However, as soon as I exceeded that limit, the initial query returned too much data too quickly, so my message handler here stopped being called because the subscription loop had broken. I was puzzled for a while because my subscriber simply went silent. Eventually, I "solved" it by adding a limit to the initial query here. This workaround mostly prevents such large initial bursts, which should be rare for now, but it's not a real long-term solution.

I'm pretty new to Rust and the actor model, so any feedback or suggestion on how to use the crate or anything else will always be welcome.

@slawlor
Owner

slawlor commented May 1, 2024

I'm trying to understand what's going on here: we're creating a broadcast channel with a "buffer" of 10 messages. We were exiting the subscription if we got a Lagged(num_dropped) error, which I don't think is right. In the event we're lagged, we should log it and probably try to continue on.

However I'm honestly surprised this is happening, because we spawn a dedicated task to take the output port's received messages and forward them to an actor's (unbounded) channel. The only thing I could think of is if the converter is really slow converting messages from type A -> B.

Like, as long as the subscription is running, we shouldn't ever fill the buffer of 10 unless we're under super heavy CPU pressure, I would hope.
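Roughly the shape of that change, similar to the loop sketched earlier in the thread (this is only a sketch, not the actual commit; recv_next is a made-up helper name):

use tokio::sync::broadcast::{error::RecvError, Receiver};

// Sketch of the behavior change: keep receiving on Lagged instead of ending
// the subscription; only a closed channel ends it.
async fn recv_next<T: Clone>(rx: &mut Receiver<T>) -> Option<T> {
    loop {
        match rx.recv().await {
            Ok(msg) => return Some(msg),
            Err(RecvError::Lagged(n)) => {
                // Previously this error ended the subscription; now we log and keep receiving.
                tracing::warn!("output port subscription lagged; {} message(s) skipped", n);
            }
            Err(RecvError::Closed) => return None,
        }
    }
}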

slawlor added a commit that referenced this issue May 1, 2024
@slawlor
Owner

slawlor commented May 1, 2024

I added a branch to handle the lagged error, but I'm not in love with the idea of dropping messages. It makes me wonder if I should write my own implementation of the broadcast channel...

d4767b2

dcadenas added a commit to planetary-social/reportinator_server that referenced this issue Jun 14, 2024
There's a branch that solves the issue described in slawlor/ractor#225; here we depend on a fork owned by us to solve this.