From 5deb34f143325a847a767dfb5fc689f8228e962a Mon Sep 17 00:00:00 2001 From: David Sloan Date: Fri, 17 May 2024 16:48:00 +0100 Subject: [PATCH] Reducing verbose logging --- .../kafka-connect-common/src/main/resources/logback.xml | 1 - .../connect/gcp/pubsub/source/admin/PubSubService.java | 3 +-- .../gcp/pubsub/source/subscriber/PubSubSubscriber.java | 7 ++----- .../pubsub/source/subscriber/PubSubSubscriberManager.java | 8 ++++---- .../gcp/pubsub/source/subscriber/SourceOffset.java | 2 ++ .../gcp/pubsub/source/subscriber/SourcePartition.java | 2 ++ 6 files changed, 11 insertions(+), 12 deletions(-) diff --git a/java-connectors/kafka-connect-common/src/main/resources/logback.xml b/java-connectors/kafka-connect-common/src/main/resources/logback.xml index de7e1d1ca..a7902682c 100644 --- a/java-connectors/kafka-connect-common/src/main/resources/logback.xml +++ b/java-connectors/kafka-connect-common/src/main/resources/logback.xml @@ -24,5 +24,4 @@ - \ No newline at end of file diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/admin/PubSubService.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/admin/PubSubService.java index 50a2ffed6..8d8e40bf9 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/admin/PubSubService.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/admin/PubSubService.java @@ -70,9 +70,8 @@ private SubscriptionAdminClient createSubscriptionAdminClient(final AuthMode aut } public String topicNameFor(final String subscriptionId) { - log.info("Requesting subscription details for projectId: {}, subscriptionId: {}", projectId, subscriptionId); val subscription = subscriptionAdminClient.getSubscription(createSubscriptionName(subscriptionId)); - log.info("Found subscription details for subscriptionId: {}", subscription.getTopic()); + log.info("Found topic details {} for subscriptionId {}", subscription.getTopic(), subscriptionId); return subscription.getTopic(); } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriber.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriber.java index 3b3eb0b90..ff5f9ef9b 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriber.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriber.java @@ -58,7 +58,7 @@ public PubSubSubscriber( String projectId, PubSubSubscription subscription ) { - log.info("Starting PubSubSubscriber"); + log.info("Starting PubSubSubscriber for subscription {}", subscription.getSubscriptionId()); targetTopicName = subscription.getTargetKafkaTopic(); batchSize = subscription.getBatchSize(); messageQueue = new ConcurrentLinkedQueue<>(); @@ -81,7 +81,6 @@ public PubSubSubscriber( } public void startAsync() { - log.info("Start Subscriber"); gcpSubscriber.startAsync(); } @@ -93,7 +92,6 @@ private MessageReceiver createMessageReceiver() { } public List getMessages() { - log.info("GetMessages"); return IntStream.range(0, batchSize) .mapToObj(i -> messageQueue.poll()) .filter(Objects::nonNull) @@ -107,7 +105,7 @@ public List getMessages() { } public void acknowledge(String messageId) { - log.info("Sending acknowledgement for {}}", messageId); + log.trace("Sending acknowledgement for {}}", messageId); Optional .ofNullable(ackCache.getIfPresent(messageId)) .ifPresent(e -> { @@ -117,7 +115,6 @@ public void acknowledge(String messageId) { } public void stopAsync() { - log.info("Stopping Subscriber"); gcpSubscriber.stopAsync(); } diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManager.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManager.java index 4adf321a1..b385d3bc2 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManager.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/PubSubSubscriberManager.java @@ -41,7 +41,7 @@ public PubSubSubscriberManager( List subscriptionConfigs, SubscriberCreator subscriberCreator ) { - log.info("Starting PubSubSubscriberManager"); + log.info("Starting PubSubSubscriberManager for {} subscriptions", subscriptionConfigs.size()); subscribers = subscriptionConfigs .parallelStream() @@ -51,14 +51,14 @@ public PubSubSubscriberManager( } public List poll() { - log.info("Polling messages from PubSub"); + log.trace("Polling messages from all partitions"); val subs = subscribers .values() .parallelStream() .flatMap(pubSubSubscriber -> pubSubSubscriber.getMessages().stream()) .collect(Collectors.toList()); - log.info("Polled {} messages from PubSub", subs.size()); + log.debug("Polled {} messages from all partitions", subs.size()); return subs; } @@ -66,7 +66,7 @@ public void commitRecord( SourcePartition sourcePartition, SourceOffset sourceOffset ) { - log.info("Committing record for partition {} with offset {}", sourcePartition, sourceOffset); + log.trace("Committing record for partition {} with offset {}", sourcePartition, sourceOffset); subscribers .get(sourcePartition.getSubscriptionId()) .acknowledge(sourceOffset.getMessageId()); diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffset.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffset.java index 20a2315ec..d28662cca 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffset.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourceOffset.java @@ -19,6 +19,7 @@ import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.ToString; /** * SourceOffset holds the message id from the PubSub message, to allow kafka connect to track the offset of the message, @@ -26,6 +27,7 @@ */ @AllArgsConstructor @Getter +@ToString public class SourceOffset { private static final String KEY_MESSAGE_ID = "message.id"; diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartition.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartition.java index 46854d3be..ef3e1337f 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartition.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/subscriber/SourcePartition.java @@ -19,6 +19,7 @@ import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.ToString; /** * SourcePartition holds the location from which the message was sourced within GCP PubSub, for use reporting back @@ -26,6 +27,7 @@ */ @Getter @AllArgsConstructor +@ToString public class SourcePartition { private static final String KEY_PROJECT_ID = "project.id";