From 8bae1da61de3ea1d4aa3b7e338789a52427fab77 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Thu, 8 Aug 2024 15:01:33 -0600 Subject: [PATCH] feat: added subscriber metric Signed-off-by: Matt Peterson --- .../block/server/mediator/LiveStreamMediatorImpl.java | 11 +++++++++++ .../hedera/block/server/metrics/MetricsService.java | 8 ++++++++ .../com/hedera/block/server/BlockStreamServiceIT.java | 1 - 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 51704c3dc..24885815c 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -30,6 +30,7 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; +import com.swirlds.metrics.api.LongGauge; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; import java.util.Map; @@ -164,6 +165,8 @@ public void subscribe( // Keep track of the subscriber subscribers.put(handler, batchEventProcessor); + + updateSubscriberMetrics(); } @Override @@ -183,6 +186,8 @@ public void unsubscribe( // Remove the gating sequence from the ring buffer ringBuffer.removeGatingSequence(batchEventProcessor.getSequence()); } + + updateSubscriberMetrics(); } @Override @@ -200,4 +205,10 @@ private static SubscribeStreamResponse buildEndStreamResponse() { .setStatus(SubscribeStreamResponse.SubscribeStreamResponseCode.READ_STREAM_SUCCESS) .build(); } + + private void updateSubscriberMetrics() { + @NonNull final MetricsService metricsService = blockNodeContext.metricsService(); + @NonNull final LongGauge longGauge = metricsService.subscriberGauge; + longGauge.set(subscribers.size()); + } } diff --git a/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java b/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java index 67a716040..1aa9b9acf 100644 --- a/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java +++ b/server/src/main/java/com/hedera/block/server/metrics/MetricsService.java @@ -37,6 +37,12 @@ public class MetricsService { new Counter.Config(LIVE_CATEGORY, "liveBlockItemCounter") .withDescription("Live BlockItem Counter"); + // Subscriber Gauge + private static final String SUBSCRIBER_CATEGORY = "subscriber"; + private static final LongGauge.Config SUBSCRIBER_GAUGE = + new LongGauge.Config(SUBSCRIBER_CATEGORY, "subscriberGauge") + .withDescription("Subscriber Gauge"); + /** An example gauge. */ public final LongGauge exampleGauge; @@ -44,6 +50,7 @@ public class MetricsService { public final Counter exampleCounter; public final Counter liveBlockItemCounter; + public final LongGauge subscriberGauge; /** * Creates a new instance of {@link MetricsService}. @@ -55,5 +62,6 @@ public MetricsService(@NonNull final Metrics metrics) { this.exampleCounter = metrics.getOrCreate(EXAMPLE_COUNTER); this.liveBlockItemCounter = metrics.getOrCreate(LIVE_BLOCK_ITEM_COUNTER); + this.subscriberGauge = metrics.getOrCreate(SUBSCRIBER_GAUGE); } } diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index 61641e7c0..0136279b8 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -175,7 +175,6 @@ public void testSubscribeBlockStream() throws IOException { // Calling onNext() with a BlockItem streamObserver.onNext(publishStreamRequest); - // Verify the counter was incremented assertEquals(1, blockNodeContext.metricsService().liveBlockItemCounter.get());