Skip to content

Commit

Permalink
fix: added subscription test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Sep 17, 2024
1 parent ebe9856 commit c987147
Showing 1 changed file with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.hedera.block.server.consumer.ConsumerStreamResponseObserver;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.metrics.BlockNodeMetricTypes;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.notifier.NotifierBuilder;
import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl;
Expand All @@ -44,6 +45,7 @@
import com.hedera.hapi.block.SubscribeStreamResponseCode;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.hapi.block.stream.output.BlockHeader;
import com.swirlds.metrics.api.LongGauge;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -273,6 +275,36 @@ public void testSubAndUnsubHandling() throws IOException {
assertEquals(0, blockNodeContext.metricsService().get(LiveBlockItems).get());
}

@Test
public void testSubscribeWhenHandlerAlreadySubscribed() throws IOException {
final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();
final LongGauge consumersGauge =
blockNodeContext.metricsService().get(BlockNodeMetricTypes.Gauge.Consumers);
final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
final var streamMediator =
LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build();

final var concreteObserver1 =
new ConsumerStreamResponseObserver(
testClock, streamMediator, streamObserver1, testContext);

streamMediator.subscribe(concreteObserver1);
assertTrue(streamMediator.isSubscribed(concreteObserver1));
assertEquals(1L, consumersGauge.get());

// Attempt to "re-subscribe" the observer
// Should not increment the counter or change the implementation
streamMediator.subscribe(concreteObserver1);
assertTrue(streamMediator.isSubscribed(concreteObserver1));
assertEquals(1L, consumersGauge.get());

streamMediator.unsubscribe(concreteObserver1);
assertFalse(streamMediator.isSubscribed(concreteObserver1));

// Confirm the counter was decremented
assertEquals(0L, consumersGauge.get());
}

@Test
public void testOnCancelSubscriptionHandling() throws IOException {

Expand Down

0 comments on commit c987147

Please sign in to comment.