From c9871471f7e6922edab5be8de4d4228c6c848aa7 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Tue, 17 Sep 2024 11:04:00 -0600 Subject: [PATCH] fix: added subscription test Signed-off-by: Matt Peterson --- .../mediator/LiveStreamMediatorImplTest.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index 86c0bbc36..eb7477299 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -33,6 +33,7 @@ import com.hedera.block.server.consumer.ConsumerStreamResponseObserver; import com.hedera.block.server.events.BlockNodeEventHandler; import com.hedera.block.server.events.ObjectEvent; +import com.hedera.block.server.metrics.BlockNodeMetricTypes; import com.hedera.block.server.notifier.Notifier; import com.hedera.block.server.notifier.NotifierBuilder; import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl; @@ -44,6 +45,7 @@ import com.hedera.hapi.block.SubscribeStreamResponseCode; import com.hedera.hapi.block.stream.BlockItem; import com.hedera.hapi.block.stream.output.BlockHeader; +import com.swirlds.metrics.api.LongGauge; import edu.umd.cs.findbugs.annotations.NonNull; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; @@ -273,6 +275,36 @@ public void testSubAndUnsubHandling() throws IOException { assertEquals(0, blockNodeContext.metricsService().get(LiveBlockItems).get()); } + @Test + public void testSubscribeWhenHandlerAlreadySubscribed() throws IOException { + final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); + final LongGauge consumersGauge = + blockNodeContext.metricsService().get(BlockNodeMetricTypes.Gauge.Consumers); + final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); + final var streamMediator = + LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + + final var concreteObserver1 = + new ConsumerStreamResponseObserver( + testClock, streamMediator, streamObserver1, testContext); + + streamMediator.subscribe(concreteObserver1); + assertTrue(streamMediator.isSubscribed(concreteObserver1)); + assertEquals(1L, consumersGauge.get()); + + // Attempt to "re-subscribe" the observer + // Should not increment the counter or change the implementation + streamMediator.subscribe(concreteObserver1); + assertTrue(streamMediator.isSubscribed(concreteObserver1)); + assertEquals(1L, consumersGauge.get()); + + streamMediator.unsubscribe(concreteObserver1); + assertFalse(streamMediator.isSubscribed(concreteObserver1)); + + // Confirm the counter was decremented + assertEquals(0L, consumersGauge.get()); + } + @Test public void testOnCancelSubscriptionHandling() throws IOException {