Missing batchingMaxPublishDelay option #306

Open
jiangpengcheng opened this issue Feb 20, 2024 · 0 comments

@jiangpengcheng
Member

In the Java client, there is a batchingMaxPublishDelay option, which defaults to 1 millisecond:

https://github.com/apache/pulsar/blob/02147454c425b92f0cd1caefa73b9339db6a0269/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java#L245

    public ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, @NonNull TimeUnit timeUnit) {
        conf.setBatchingMaxPublishDelayMicros(batchDelay, timeUnit);
        return this;
    }

And the default value, set in ProducerConfigurationData:

    @ApiModelProperty(
            name = "batchingMaxPublishDelayMicros",
            value = "Batching time period of sending messages."
    )
    private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
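
For comparison, an equivalent setting might be modeled on the Rust side roughly as follows. This is a minimal sketch: BatchingConfig and its fields are hypothetical names, not part of the pulsar-rs API. The 1 ms default mirrors TimeUnit.MILLISECONDS.toMicros(1) above, and the 1000-message limit matches the batch_size used in the pulsar-rs example below.

```rust
use std::time::Duration;

/// Hypothetical batching settings (NOT part of pulsar-rs); the fields only
/// mirror the Java client's batching options.
#[derive(Debug, Clone)]
struct BatchingConfig {
    /// Flush once this many messages are buffered.
    max_messages: usize,
    /// Flush once this much time has passed, even if the batch is not full.
    max_publish_delay: Duration,
}

impl Default for BatchingConfig {
    fn default() -> Self {
        BatchingConfig {
            // Matches the batch_size used in the pulsar-rs example.
            max_messages: 1000,
            // Same value as TimeUnit.MILLISECONDS.toMicros(1) in the Java client.
            max_publish_delay: Duration::from_millis(1),
        }
    }
}

fn main() {
    let conf = BatchingConfig::default();
    println!(
        "max_messages = {}, max_publish_delay = {} us",
        conf.max_messages,
        conf.max_publish_delay.as_micros()
    );
}
```

Keeping the delay as a Duration rather than a raw microsecond count avoids the unit conversion that the Java setter performs.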

Without such an option, if we set batch_size to 1000 in pulsar-rs and send only 500 messages, none of them are delivered to the Pulsar broker:

    use pulsar::{ProducerOptions, Pulsar, TokioExecutor};

    #[tokio::main]
    async fn main() -> Result<(), pulsar::Error> {
        env_logger::init();

        let addr = "pulsar://localhost:6650".to_string();
        let builder = Pulsar::builder(addr, TokioExecutor);

        let pulsar: Pulsar<_> = builder.build().await?;
        let mut producer = pulsar
            .producer()
            .with_topic("persistent://public/default/test-batch")
            .with_options(ProducerOptions {
                encrypted: None,
                metadata: Default::default(),
                schema: None,
                // Batching limits: up to 1000 messages or 128 KiB per batch.
                batch_size: Some(1000),
                batch_byte_size: Some(128 * 1024),
                compression: None,
                access_mode: None,
            })
            .build()
            .await?;

        // Send only 500 messages, i.e. fewer than batch_size.
        let mut counter = 0usize;
        loop {
            producer.send(format!("Hello-{counter}")).await?;

            counter += 1;
            println!("{counter} messages");

            if counter >= 500 {
                // In the observed behavior, closing here does not deliver
                // the partially filled batch.
                producer.close().await?;
                break;
            }
        }

        Ok(())
    }

It looks like pulsar-rs keeps an internal buffer and only does the actual sending once the buffer is full (size == batch_size). Is that correct?
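
The suspected behavior can be reproduced with a simplified model of size-only batching. This sketch assumes the producer flushes only when the buffer reaches batch_size; SizeOnlyBatcher is a hypothetical type, not the actual pulsar-rs internals, and its sent vector stands in for the broker.

```rust
/// Hypothetical model of size-only batching (not the real pulsar-rs code):
/// messages are buffered and only "sent" once `batch_size` are queued.
struct SizeOnlyBatcher {
    batch_size: usize,
    buffer: Vec<String>,
    /// Stand-in for the broker: every flushed batch is recorded here.
    sent: Vec<Vec<String>>,
}

impl SizeOnlyBatcher {
    fn new(batch_size: usize) -> Self {
        SizeOnlyBatcher {
            batch_size,
            buffer: Vec::with_capacity(batch_size),
            sent: Vec::new(),
        }
    }

    fn send(&mut self, msg: String) {
        self.buffer.push(msg);
        // The only flush trigger: the buffer is full.
        if self.buffer.len() == self.batch_size {
            let batch = std::mem::take(&mut self.buffer);
            self.sent.push(batch);
        }
    }
}

fn main() {
    let mut batcher = SizeOnlyBatcher::new(1000);
    for i in 0..500 {
        batcher.send(format!("Hello-{i}"));
    }
    // All 500 messages are still buffered; nothing reached the "broker".
    println!(
        "buffered: {}, batches sent: {}",
        batcher.buffer.len(),
        batcher.sent.len()
    );
}
```

Running it prints `buffered: 500, batches sent: 0`: with no other trigger, a partial batch stays in the buffer indefinitely.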

I think a batch send should be triggered as soon as either of the following conditions is met:

  1. the buffer is full

  2. a given timeout, such as 1 millisecond, has elapsed
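
The two conditions above can be sketched with only the standard library: a background thread receives messages over a channel and uses Receiver::recv_timeout to wait no longer than the current batch's deadline. run_batcher and its flush callback are hypothetical names for illustration, not the pulsar-rs API; a real async producer would more likely use its runtime's timer instead of a blocking thread.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical batcher: flushes when the batch is full OR when `max_delay`
/// has elapsed since the first message of the current batch.
/// `flush` stands in for the actual send to the broker.
fn run_batcher<F: FnMut(Vec<String>)>(
    rx: Receiver<String>,
    batch_size: usize,
    max_delay: Duration,
    mut flush: F,
) {
    let mut batch: Vec<String> = Vec::new();
    // Deadline of the current batch; None while the batch is empty.
    let mut deadline: Option<Instant> = None;

    loop {
        let msg = match deadline {
            // Wait for the next message, but not past the deadline.
            Some(d) => rx.recv_timeout(d.saturating_duration_since(Instant::now())),
            // Nothing buffered: block until a message arrives or senders are gone.
            None => rx.recv().map_err(|_| RecvTimeoutError::Disconnected),
        };

        match msg {
            Ok(m) => {
                if batch.is_empty() {
                    deadline = Some(Instant::now() + max_delay);
                }
                batch.push(m);
                // Condition 1: the buffer is full.
                if batch.len() >= batch_size {
                    flush(std::mem::take(&mut batch));
                    deadline = None;
                }
            }
            // Condition 2: the timeout elapsed with a partial batch.
            Err(RecvTimeoutError::Timeout) => {
                flush(std::mem::take(&mut batch));
                deadline = None;
            }
            // All senders dropped: flush whatever is left and stop.
            Err(RecvTimeoutError::Disconnected) => {
                if !batch.is_empty() {
                    flush(std::mem::take(&mut batch));
                }
                return;
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    let (done_tx, done_rx) = mpsc::channel::<usize>();

    let worker = thread::spawn(move || {
        run_batcher(rx, 1000, Duration::from_millis(1), |batch| {
            done_tx.send(batch.len()).unwrap();
        });
    });

    for i in 0..500 {
        tx.send(format!("Hello-{i}")).unwrap();
    }

    // The 500 messages are flushed by the timeout, without reaching 1000.
    let mut total = 0;
    while total < 500 {
        total += done_rx.recv().unwrap();
    }
    println!("flushed {total} messages");

    drop(tx);
    worker.join().unwrap();
}
```

Here the deadline starts when the first message enters an empty batch, so a partial batch waits at most about max_delay. An alternative design is a fixed-interval timer that flushes whatever is buffered on each tick; both bound the latency of a partially filled batch.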
