diff --git a/pip/pip-374.md b/pip/pip-374.md index 4264617647433..49fe337159628 100644 --- a/pip/pip-374.md +++ b/pip/pip-374.md @@ -67,5 +67,5 @@ Since we added a default method onArrival() in interface, one who has provided t -* Mailing List discussion thread: -* Mailing List voting thread: +* Mailing List discussion thread: https://lists.apache.org/thread/hcfpm4j6hpwxb2olfrro8g4dls35q8rx +* Mailing List voting thread: https://lists.apache.org/thread/wrr02s4cdzqmo1vonp92w6229qo0rv0z diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index afb17a186477c..8115f34121d3c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -33,6 +33,7 @@ import lombok.Cleanup; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.common.api.proto.KeyValue; @@ -870,6 +871,101 @@ public void onPartitionsChange(String topicName, int partitions) { Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS)); } + @Test(dataProvider = "topicPartition") + public void testConsumerInterceptorForOnArrive(int topicPartition) throws PulsarClientException, + InterruptedException, PulsarAdminException { + String topicName = "persistent://my-property/my-ns/on-arrive"; + if (topicPartition > 0) { + admin.topics().createPartitionedTopic(topicName, topicPartition); + } + + final int receiveQueueSize = 100; + final int totalNumOfMessages = receiveQueueSize * 2; + + // The onArrival method is called for half of the receiveQueueSize messages before beforeConsume is called for all messages. + CountDownLatch latch = new CountDownLatch(receiveQueueSize / 2); + final AtomicInteger onArrivalCount = new AtomicInteger(0); + ConsumerInterceptor interceptor = new ConsumerInterceptor() { + @Override + public void close() {} + + @Override + public Message onArrival(Consumer consumer, Message message) { + MessageImpl msg = (MessageImpl) message; + msg.getMessageBuilder().addProperty().setKey("onArrival").setValue("1"); + latch.countDown(); + onArrivalCount.incrementAndGet(); + return msg; + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + return message; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable cause) { + + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable cause) { + + } + + @Override + public void onNegativeAcksSend(Consumer consumer, Set messageIds) { + } + + @Override + public void onAckTimeoutSend(Consumer consumer, Set messageIds) { + + } + }; + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .create(); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("test-arrive") + .intercept(interceptor) + .receiverQueueSize(receiveQueueSize) + .subscribe(); + + for (int i = 0; i < totalNumOfMessages; i++) { + producer.send("Mock message"); + } + + // Not call receive message, just wait for onArrival interceptor. + latch.await(); + Assert.assertEquals(latch.getCount(), 0); + + for (int i = 0; i < totalNumOfMessages; i++) { + Message message = consumer.receive(); + MessageImpl msgImpl; + if (message instanceof MessageImpl) { + msgImpl = (MessageImpl) message; + } else if (message instanceof TopicMessageImpl) { + msgImpl = (MessageImpl) ((TopicMessageImpl) message).getMessage(); + } else { + throw new ClassCastException("Message type is not expected"); + } + boolean haveKey = false; + for (KeyValue keyValue : msgImpl.getMessageBuilder().getPropertiesList()) { + if ("onArrival".equals(keyValue.getKey())) { + haveKey = true; + } + } + Assert.assertTrue(haveKey); + } + Assert.assertEquals(totalNumOfMessages, onArrivalCount.get()); + + producer.close(); + consumer.close(); + } + private void produceAndConsume(int msgCount, Producer producer, Reader reader) throws PulsarClientException { for (int i = 0; i < msgCount; i++) { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java index be2f9b0f10826..1beea3adba239 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java @@ -41,6 +41,44 @@ public interface ConsumerInterceptor extends AutoCloseable { */ void close(); + /** + * This method is called when a message arrives in the consumer. + * + *

This method provides visibility into the messages that have been received + * by the consumer but have not yet been processed. This can be useful for + * monitoring the state of the consumer's receiver queue and understanding + * the consumer's processing rate. + * + *

The method is allowed to modify the message, in which case the modified + * message will be returned. + * + *

Any exception thrown by this method will be caught by the caller, logged, + * but not propagated to the client. + * + *

Since the consumer may run multiple interceptors, a particular + * interceptor's onArrival callback will be called in the order + * specified by {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The + * first interceptor in the list gets the consumed message, the following + * interceptor will be passed the message returned by the previous interceptor, + * and so on. Since interceptors are allowed to modify the message, interceptors + * may potentially get the messages already modified by other interceptors. + * However, building a pipeline of mutable interceptors that depend on the output + * of the previous interceptor is discouraged, because of potential side-effects + * caused by interceptors potentially failing to modify the message and throwing + * an exception. If one of the interceptors in the list throws an exception from + * onArrival, the exception is caught, logged, and the next interceptor + * is called with the message returned by the last successful interceptor in the + * list, or otherwise the original consumed message. + * + * @param consumer the consumer which contains the interceptor + * @param message the message that has arrived in the receiver queue + * @return the message that is either modified by the interceptor or the same + * message passed into the method + */ + default Message onArrival(Consumer consumer, Message message) { + return message; + } + /** * This is called just before the message is returned by * {@link Consumer#receive()}, {@link MessageListener#received(Consumer, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 9748a42f0cb2b..03256a3e139b6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -852,6 +852,14 @@ public String toString() { + '}'; } + protected Message onArrival(Message message) { + if (interceptors != null) { + return interceptors.onArrival(this, message); + } else { + return message; + } + } + protected Message beforeConsume(Message message) { if (interceptors != null) { return interceptors.beforeConsume(this, message); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 996569704d712..60b9d145c4897 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1301,9 +1301,10 @@ private void executeNotifyCallback(final MessageImpl message) { increaseAvailablePermits(cnx()); return; } + Message interceptMsg = onArrival(message); if (hasNextPendingReceive()) { - notifyPendingReceivedCallback(message, null); - } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { + notifyPendingReceivedCallback(interceptMsg, null); + } else if (enqueueMessageAndCheckBatchReceive(interceptMsg) && hasPendingBatchReceive()) { notifyPendingBatchReceivedCallBack(); } }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java index 832dc0bacaee9..dd1e2cec3b3ef 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java @@ -44,6 +44,38 @@ public ConsumerInterceptors(List> interceptors) { this.interceptors = interceptors; } + + /** + * This method is called when a message arrives in the consumer. + *

+ * This method calls {@link ConsumerInterceptor#onArrival(Consumer, Message) method for each + * interceptor. + *

+ * This method does not throw exceptions. If any of the interceptors in the chain throws an exception, it gets + * caught and logged, and next interceptor in int the chain is called with 'messages' returned by the previous + * successful interceptor beforeConsume call. + * + * @param consumer the consumer which contains the interceptors + * @param message message to be consume by the client. + * @return messages that are either modified by interceptors or same as messages passed to this method. + */ + public Message onArrival(Consumer consumer, Message message) { + Message interceptorMessage = message; + for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) { + try { + interceptorMessage = interceptors.get(i).onArrival(consumer, interceptorMessage); + } catch (Throwable e) { + if (consumer != null) { + log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}", + consumer.getTopic(), consumer.getConsumerName(), e); + } else { + log.warn("Error executing interceptor beforeConsume callback", e); + } + } + } + return interceptorMessage; + } + /** * This is called just before the message is returned by {@link Consumer#receive()}, * {@link MessageListener#received(Consumer, Message)} or the {@link java.util.concurrent.CompletableFuture} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index bf8bd6cc95117..513c0101ac6ac 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1608,6 +1608,11 @@ private CompletableFuture> getExistsPartitions(String topic) { private ConsumerInterceptors getInternalConsumerInterceptors(ConsumerInterceptors multiTopicInterceptors) { return new ConsumerInterceptors(new ArrayList<>()) { + @Override + public Message onArrival(Consumer consumer, Message message) { + return multiTopicInterceptors.onArrival(consumer, message); + } + @Override public Message beforeConsume(Consumer consumer, Message message) { return message;