Reactive Streams adapter for Apache Pulsar Java Client. This uses Project Reactor as the Reactive Streams implementation.
Update: This project is deprecated and has been replaced by the official Reactive client for Apache Pulsar and by Spring Pulsar.
Please migrate to the Reactive client for Apache Pulsar and Spring Pulsar. Spring Pulsar includes reactive support.
The API is evolving, and the documentation and examples might not match the released version available in Maven Central. Please keep this in mind when using the library and applying the examples.
- SpringOne 2021: Reactive Applications with Apache Pulsar and Spring Boot
- ApacheCon 2021: Building resilient and scalable API backends with Apache Pulsar and Spring Reactive
This library requires Java 8 or later to run.
With Gradle:
repositories {
    mavenCentral()
}

dependencies {
    implementation "com.github.lhotari:reactive-pulsar-adapter:0.2.1"
}
With Maven:
<dependencies>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-adapter</artifactId>
        <version>0.2.1</version>
    </dependency>
</dependencies>
There's a Spring Boot example at https://github.com/lhotari/reactive-pulsar-showcase . Another Spring Boot example is available at https://github.com/lhotari/reactive-iot-backend-ApacheCon2021 .
Getting it with Gradle:
repositories {
    mavenCentral()
}

dependencies {
    implementation "com.github.lhotari:reactive-pulsar-spring-boot-starter:0.2.1"
    testImplementation "com.github.lhotari:reactive-pulsar-spring-test-support:0.2.1"
}
Getting it with Maven:
<dependencies>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-spring-boot-starter</artifactId>
        <version>0.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-spring-test-support</artifactId>
        <version>0.2.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Using an existing PulsarClient instance:
ReactivePulsarClient reactivePulsarClient = ReactivePulsarClient.create(pulsarClient);
Configure the pulsar.client.serviceUrl property in the application properties. Any additional properties under the pulsar.client. prefix will be used to configure the Pulsar client.
The Spring Boot starter will configure a ReactivePulsarClient bean which will be available for autowiring.
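To illustrate how properties under a common prefix could be collected into client settings, here is a minimal sketch that uses only the JDK. The class `PrefixedProperties` and its methods are hypothetical and are not part of the adapter library; the starter's actual binding mechanism is not described here and may differ. The service URL `pulsar://localhost:6650` assumes a local broker, and `numIoThreads` is used only as an example of an additional client setting.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of the adapter library.
class PrefixedProperties {

    // Collects every property whose name starts with the prefix
    // and strips the prefix from the resulting key.
    static Map<String, String> withPrefix(Properties properties, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String name : properties.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), properties.getProperty(name));
            }
        }
        return result;
    }

    // Parses text written in the java.util.Properties format.
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        // Assumed example content of an application properties file.
        String applicationProperties = String.join("\n",
            "pulsar.client.serviceUrl=pulsar://localhost:6650",
            "pulsar.client.numIoThreads=4",
            "server.port=8080");
        // Only the two pulsar.client.* entries are kept, with the prefix removed.
        System.out.println(withPrefix(parse(applicationProperties), "pulsar.client."));
    }
}
```

In this sketch, unrelated properties such as `server.port` are ignored, which mirrors the idea that only the `pulsar.client.` namespace is meant for client configuration.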
ReactiveMessageSender<String> messageSender = reactivePulsarClient
    .messageSender(Schema.STRING)
    .topic(topicName)
    .maxInflight(100)
    .build();
Mono<MessageId> messageId = messageSender
    .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
// for demonstration
messageId.subscribe(System.out::println);
Add the required dependency for the cache implementation. This step isn't required when using reactive-pulsar-spring-boot-starter. A ReactiveProducerCache instance will be made available as a Spring bean in that case. However, it is still necessary to set the cache on the ReactiveMessageSenderFactory.
With Gradle:
dependencies {
    implementation "com.github.lhotari:reactive-pulsar-adapter:0.2.1"
    implementation "com.github.lhotari:reactive-pulsar-caffeine-producer-cache:0.2.1"
}
With Maven:
<dependencies>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-adapter</artifactId>
        <version>0.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-caffeine-producer-cache</artifactId>
        <version>0.2.1</version>
    </dependency>
</dependencies>
CaffeineReactiveProducerCache producerCache = new CaffeineReactiveProducerCache();
ReactiveMessageSender<String> messageSender = reactivePulsarClient
    .messageSender(Schema.STRING)
    .cache(producerCache)
    .topic(topicName)
    .maxInflight(100)
    .build();
Mono<MessageId> messageId = messageSender
    .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
// for demonstration
messageId.subscribe(System.out::println);
It is recommended to use a cached producer in most cases. The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls. This improves performance since a producer won't have to be created and closed before and after sending a message.
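The benefit of reusing a producer can be illustrated with a small JDK-only sketch. The classes `ProducerCacheSketch` and `FakeProducer` are hypothetical stand-ins and are not the library's `CaffeineReactiveProducerCache`; the sketch also leaves out eviction and closing of idle producers, which a real cache would need to handle.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of per-topic producer reuse; not the library's cache.
class ProducerCacheSketch {

    // Stand-in for a producer that is expensive to create and close.
    static final class FakeProducer {
        final String topic;
        final AtomicInteger sent = new AtomicInteger();

        FakeProducer(String topic) {
            this.topic = topic;
        }

        void send(String message) {
            sent.incrementAndGet();
        }
    }

    private final Map<String, FakeProducer> producers = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // Returns the cached producer for the topic, creating it only on first use.
    FakeProducer producerFor(String topic) {
        return producers.computeIfAbsent(topic, t -> {
            created.incrementAndGet();
            return new FakeProducer(t);
        });
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        ProducerCacheSketch cache = new ProducerCacheSketch();
        for (int i = 0; i < 3; i++) {
            cache.producerFor("my-topic").send("Hello world! " + i);
        }
        // Three sends share one producer instance.
        System.out.println("producers created: " + cache.createdCount()); // prints "producers created: 1"
    }
}
```

Without the cache, each of the three sends in this sketch would pay the cost of creating and closing its own producer.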
The adapter library, together with the cache implementation, also enables reactive backpressure for sending messages. The maxInflight setting limits the number of messages that are pending from the client to the broker. The library limits Reactive Streams subscription requests to keep the number of pending messages under the defined limit. This limit is per topic and applies to the local JVM only.
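The effect of an in-flight limit can be sketched with plain JDK classes. Note that this is a different technique from the one described above: the sketch blocks the calling thread with a `java.util.concurrent.Semaphore`, whereas the library limits demand in a non-blocking way through Reactive Streams requests. The class `InflightLimiterSketch` and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical illustration of capping pending sends; not the library's implementation.
class InflightLimiterSketch {
    private final Semaphore slots;

    InflightLimiterSketch(int maxInflight) {
        this.slots = new Semaphore(maxInflight);
    }

    // Waits for a free slot, then starts the asynchronous send.
    // The slot is released when the send completes, successfully or not.
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> sendOperation) {
        slots.acquireUninterruptibly();
        CompletableFuture<T> future;
        try {
            future = sendOperation.get();
        } catch (RuntimeException e) {
            slots.release();
            throw e;
        }
        future.whenComplete((value, error) -> slots.release());
        return future;
    }

    int availableSlots() {
        return slots.availablePermits();
    }

    public static void main(String[] args) {
        InflightLimiterSketch limiter = new InflightLimiterSketch(2);
        // Futures that stand in for broker acknowledgements.
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        limiter.submit(() -> first);
        limiter.submit(() -> second);
        System.out.println("free slots while both are pending: " + limiter.availableSlots()); // 0
        first.complete("message-id-1");
        System.out.println("free slots after one acknowledgement: " + limiter.availableSlots()); // 1
    }
}
```

A third `submit` call made while both sends are pending would wait until one of them completes, which is the behavior the limit is meant to enforce.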
Reading all messages for a topic:
ReactiveMessageReader<String> messageReader =
    reactivePulsarClient.messageReader(Schema.STRING)
        .topic(topicName)
        .build();
messageReader.readMessages()
    .map(Message::getValue)
    // for demonstration
    .subscribe(System.out::println);
By default, the stream completes when the end of the topic is reached. The end of the topic is detected with the Pulsar Reader's hasMessageAvailableAsync method.
The ReactiveMessageReader doesn't support partitioned topics. It's possible to read the content of individual partitions. Topic names for individual partitions can be discovered using the PulsarClient's getPartitionsForTopic method. The adapter library doesn't currently wrap that method.
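For orientation, the following JDK-only sketch shows what the topic names of individual partitions look like, assuming Pulsar's `-partition-<index>` naming convention and a partition count that is already known. The class `PartitionTopicNames` is hypothetical, and the topic `persistent://public/default/my-topic` is an assumed example. In a real application, the names would come from getPartitionsForTopic as described above, and each name could then be passed to a separate reader.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the adapter library.
class PartitionTopicNames {

    // Builds the topic names of the individual partitions,
    // assuming the "<topic>-partition-<index>" naming convention.
    static List<String> partitionNames(String topicName, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        List<String> names = new ArrayList<>(partitionCount);
        for (int index = 0; index < partitionCount; index++) {
            names.add(topicName + "-partition-" + index);
        }
        return names;
    }

    public static void main(String[] args) {
        // Assumed example topic with three partitions.
        for (String name : partitionNames("persistent://public/default/my-topic", 3)) {
            System.out.println(name);
        }
    }
}
```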
With .endOfStreamAction(EndOfStreamAction.POLL), the reader polls for new messages when it reaches the end of the topic.
ReactiveMessageReader<String> messageReader =
    reactivePulsarClient.messageReader(Schema.STRING)
        .topic(topicName)
        .startAtSpec(StartAtSpec.LATEST)
        .endOfStreamAction(EndOfStreamAction.POLL)
        .build();
messageReader.readMessages()
    .take(Duration.ofSeconds(5))
    .take(5)
    // for demonstration
    .subscribe(System.out::println);
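The difference between completing at the end of the topic and polling for new messages can be illustrated with a simplified, synchronous sketch. Everything in it is hypothetical: `FakeReader` is an in-memory stand-in for a Pulsar reader, the `Action` enum only mimics the two behaviors, and the `maxEmptyPolls` bound exists so that the example terminates. It is not how the library implements its reader.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of the two end-of-stream behaviors.
class EndOfStreamSketch {
    enum Action { COMPLETE, POLL }

    // In-memory stand-in for a topic reader.
    static final class FakeReader {
        private final Deque<String> messages = new ArrayDeque<>();

        void publish(String message) {
            messages.addLast(message);
        }

        boolean hasMessageAvailable() {
            return !messages.isEmpty();
        }

        String readNext() {
            return messages.removeFirst();
        }
    }

    // Reads until `limit` messages are collected.
    // COMPLETE: stops as soon as no message is available.
    // POLL: runs `whileWaiting` (a stand-in for a delay) and checks again,
    // giving up after `maxEmptyPolls` checks that found nothing.
    static List<String> read(FakeReader reader, Action action, int limit,
                             int maxEmptyPolls, Runnable whileWaiting) {
        List<String> received = new ArrayList<>();
        int emptyPolls = 0;
        while (received.size() < limit) {
            if (reader.hasMessageAvailable()) {
                received.add(reader.readNext());
            } else if (action == Action.POLL && emptyPolls < maxEmptyPolls) {
                emptyPolls++;
                whileWaiting.run();
            } else {
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        FakeReader completing = new FakeReader();
        completing.publish("first");
        // Stops after "first" because nothing else is available.
        System.out.println(read(completing, Action.COMPLETE, 5, 3, () -> { }));

        FakeReader polling = new FakeReader();
        polling.publish("first");
        boolean[] published = {false};
        // A message arrives while the reader is waiting, so it is picked up as well.
        System.out.println(read(polling, Action.POLL, 5, 3, () -> {
            if (!published[0]) {
                published[0] = true;
                polling.publish("late arrival");
            }
        }));
    }
}
```

In the reader example above, the two `take` operators play the role that `limit` and `maxEmptyPolls` play in this sketch: they bound a stream that would otherwise keep polling.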
ReactiveMessageConsumer<String> messageConsumer =
    reactivePulsarClient.messageConsumer(Schema.STRING)
        .topic(topicName)
        .consumerConfigurer(consumerBuilder -> consumerBuilder.subscriptionName("sub"))
        .build();
messageConsumer.consumeMessages(messageFlux ->
        messageFlux.map(message ->
            MessageResult.acknowledge(message.getMessageId(), message.getValue())))
    .take(Duration.ofSeconds(2))
    // for demonstration
    .subscribe(System.out::println);
ReactiveMessageHandler reactiveMessageHandler =
    ReactiveMessageHandlerBuilder
        .builder(reactivePulsarClient
            .messageConsumer(Schema.STRING)
            .consumerConfigurer(consumerBuilder ->
                consumerBuilder.subscriptionName("sub")
                    .topic(topicName))
            .build())
        .messageHandler(message -> Mono.fromRunnable(() -> {
            System.out.println(message.getValue());
        }))
        .build()
        .start();
// for demonstration
// the reactive message handler is running in the background, delay for 10 seconds
Thread.sleep(10000L);
// now stop the message handler component
reactiveMessageHandler.stop();
Reactive Pulsar adapter library is Open Source Software released under the Apache Software License 2.0.
The library is Apache 2.0 licensed.
Contributions are welcome. Contributors will be asked to sign a CLA before their contributions are merged, since there's a desire to be able to move the Reactive Pulsar project under Apache in the future. Without CLAs, that process becomes complicated.
If you detect a bug or have a feature request or a good idea for the Reactive Pulsar adapter, please open a GitHub issue or ping one of the contributors on Twitter or on Pulsar Slack.
Please use the [reactive-pulsar] tag on Stack Overflow. Ask a question now.