diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index 83f506132..94fa89dee 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -17,9 +17,14 @@ package com.hedera.block.server.mediator; import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockItems; +import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockStreamMediatorError; +import static com.hedera.block.server.util.PersistTestUtils.generateBlockItemsUnparsed; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.hedera.block.server.config.BlockNodeContext; @@ -28,12 +33,17 @@ import com.hedera.block.server.events.ObjectEvent; import com.hedera.block.server.metrics.BlockNodeMetricTypes; import com.hedera.block.server.notifier.Notifier; +import com.hedera.block.server.notifier.NotifierImpl; +import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl; import com.hedera.block.server.persistence.storage.write.BlockWriter; import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.service.ServiceStatusImpl; import com.hedera.block.server.util.TestConfigUtil; +import com.hedera.hapi.block.BlockItemSetUnparsed; +import com.hedera.hapi.block.BlockItemUnparsed; +import com.hedera.hapi.block.SubscribeStreamResponseCode; import com.hedera.hapi.block.SubscribeStreamResponseUnparsed; -import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.hapi.block.stream.output.BlockHeader; import com.hedera.pbj.runtime.grpc.Pipeline; import com.swirlds.metrics.api.LongGauge; import java.io.IOException; @@ -41,6 +51,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -59,19 +70,19 @@ public class LiveStreamMediatorImplTest { private BlockNodeEventHandler> observer3; @Mock - private BlockWriter> blockWriter; + private BlockWriter> blockWriter; @Mock private Notifier notifier; @Mock - private Pipeline streamObserver1; + private Pipeline helidonSubscribeStreamObserver1; @Mock - private Pipeline streamObserver2; + private Pipeline helidonSubscribeStreamObserver2; @Mock - private Pipeline streamObserver3; + private Pipeline helidonSubscribeStreamObserver3; @Mock private InstantSource testClock; @@ -123,93 +134,92 @@ public void testUnsubscribeEach() throws InterruptedException, IOException { assertEquals(0, blockNodeContext.metricsService().get(LiveBlockItems).get()); } - // @Test - // public void testMediatorPersistenceWithoutSubscribers() throws IOException { - // - // final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - // final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - // final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) - // .build(); - // final BlockItem blockItem = BlockItem.newBuilder().build(); - // - // // register the stream validator - // when(blockWriter.write(List.of(blockItem))).thenReturn(Optional.empty()); - // final var streamValidator = new StreamPersistenceHandlerImpl( - // streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - // streamMediator.subscribe(streamValidator); - // - // // Acting as a producer, notify the mediator of a new block - // streamMediator.publish(List.of(blockItem)); - // - // // Verify the counter was incremented - // assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); - // - // // Confirm the BlockStorage write method was - // // called despite the absence of subscribers - // verify(blockWriter, timeout(testTimeout).times(1)).write(List.of(blockItem)); - // } + @Test + public void testMediatorPersistenceWithoutSubscribers() throws IOException { - // @Test - // public void testMediatorPublishEventToSubscribers() throws IOException { - // - // final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - // final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - // final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) - // .build(); - // - // when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); - // - // final var concreteObserver1 = - // new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver1, testContext); - // - // final var concreteObserver2 = - // new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver2, testContext); - // - // final var concreteObserver3 = - // new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver3, testContext); - // - // // Set up the subscribers - // streamMediator.subscribe(concreteObserver1); - // streamMediator.subscribe(concreteObserver2); - // streamMediator.subscribe(concreteObserver3); - // - // assertTrue( - // streamMediator.isSubscribed(concreteObserver1), "Expected the mediator to have observer1 - // subscribed"); - // assertTrue( - // streamMediator.isSubscribed(concreteObserver2), "Expected the mediator to have observer2 - // subscribed"); - // assertTrue( - // streamMediator.isSubscribed(concreteObserver3), "Expected the mediator to have observer3 - // subscribed"); - // - // final BlockHeader blockHeader = BlockHeader.newBuilder().number(1).build(); - // final BlockItem blockItem = - // BlockItem.newBuilder().blockHeader(blockHeader).build(); - // final BlockItemSet blockItemSet = - // BlockItemSet.newBuilder().blockItems(blockItem).build(); - // final SubscribeStreamResponse subscribeStreamResponse = - // SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); - // - // // register the stream validator - // when(blockWriter.write(List.of(blockItem))).thenReturn(Optional.empty()); - // final var streamValidator = new StreamPersistenceHandlerImpl( - // streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - // streamMediator.subscribe(streamValidator); - // - // // Acting as a producer, notify the mediator of a new block - // streamMediator.publish(List.of(blockItem)); - // - // assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); - // - // // Confirm each subscriber was notified of the new block - // verify(streamObserver1, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); - // verify(streamObserver2, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); - // verify(streamObserver3, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); - // - // // Confirm the BlockStorage write method was called - // verify(blockWriter, timeout(testTimeout).times(1)).write(List.of(blockItem)); - // } + final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); + final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); + + final BlockItemUnparsed blockItem = BlockItemUnparsed.newBuilder().build(); + + // register the stream validator + when(blockWriter.write(List.of(blockItem))).thenReturn(Optional.empty()); + final var handler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + streamMediator.subscribe(handler); + + // Acting as a producer, notify the mediator of a new block + streamMediator.publish(List.of(blockItem)); + + // Verify the counter was incremented + assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); + + // Confirm the BlockStorage write method was + // called despite the absence of subscribers + verify(blockWriter, timeout(testTimeout).times(1)).write(List.of(blockItem)); + } + + @Test + public void testMediatorPublishEventToSubscribers() throws IOException { + + final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); + final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); + + when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); + + final var concreteObserver1 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); + + final var concreteObserver2 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver2, testContext); + + final var concreteObserver3 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver3, testContext); + + // Set up the subscribers + streamMediator.subscribe(concreteObserver1); + streamMediator.subscribe(concreteObserver2); + streamMediator.subscribe(concreteObserver3); + + assertTrue( + streamMediator.isSubscribed(concreteObserver1), "Expected the mediator to have observer1 subscribed"); + assertTrue( + streamMediator.isSubscribed(concreteObserver2), "Expected the mediator to have observer2 subscribed"); + assertTrue( + streamMediator.isSubscribed(concreteObserver3), "Expected the mediator to have observer3 subscribed"); + + final BlockItemUnparsed blockItem = BlockItemUnparsed.newBuilder() + .blockHeader(BlockHeader.PROTOBUF.toBytes( + BlockHeader.newBuilder().number(1).build())) + .build(); + final SubscribeStreamResponseUnparsed subscribeStreamResponse = SubscribeStreamResponseUnparsed.newBuilder() + .blockItems( + BlockItemSetUnparsed.newBuilder().blockItems(blockItem).build()) + .build(); + + // register the stream validator + when(blockWriter.write(List.of(blockItem))).thenReturn(Optional.empty()); + final var handler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + streamMediator.subscribe(handler); + + // Acting as a producer, notify the mediator of a new block + streamMediator.publish(List.of(blockItem)); + + assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); + + // Confirm each subscriber was notified of the new block + verify(helidonSubscribeStreamObserver1, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); + verify(helidonSubscribeStreamObserver2, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); + verify(helidonSubscribeStreamObserver3, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); + + // Confirm the BlockStorage write method was called + verify(blockWriter, timeout(testTimeout).times(1)).write(List.of(blockItem)); + } @Test public void testSubAndUnsubHandling() throws IOException { @@ -221,14 +231,14 @@ public void testSubAndUnsubHandling() throws IOException { when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); - final var concreteObserver1 = - new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver1, testContext); + final var concreteObserver1 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); - final var concreteObserver2 = - new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver2, testContext); + final var concreteObserver2 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver2, testContext); - final var concreteObserver3 = - new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver3, testContext); + final var concreteObserver3 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver3, testContext); // Set up the subscribers streamMediator.subscribe(concreteObserver1); @@ -251,8 +261,8 @@ public void testSubscribeWhenHandlerAlreadySubscribed() throws IOException { final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) .build(); - final var concreteObserver1 = - new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver1, testContext); + final var concreteObserver1 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); streamMediator.subscribe(concreteObserver1); assertTrue(streamMediator.isSubscribed(concreteObserver1)); @@ -271,57 +281,56 @@ public void testSubscribeWhenHandlerAlreadySubscribed() throws IOException { assertEquals(0L, consumersGauge.get()); } - // @Test - // public void testOnCancelSubscriptionHandling() throws IOException { + // @Test + // public void testOnCancelSubscriptionHandling() throws IOException { // - // final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - // final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - // final var streamMediator = - // LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); + // final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); + // final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); + // final var streamMediator = + // LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus).build(); // - // when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); + // when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS); // - // final List blockItems = generateBlockItems(1); + // final List blockItems = generateBlockItemsUnparsed(1); // - // // register the stream validator - // when(blockWriter.write(blockItems.getFirst())).thenReturn(Optional.empty()); - // final var streamValidator = - // new StreamPersistenceHandlerImpl( - // streamMediator, notifier, blockWriter, blockNodeContext, - // serviceStatus); - // streamMediator.subscribe(streamValidator); + // // register the stream validator + // when(blockWriter.write(List.of(blockItems.getFirst()))).thenReturn(Optional.empty()); + // final var streamValidator = + // new StreamPersistenceHandlerImpl( + // streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + // streamMediator.subscribe(streamValidator); // - // // register the test observer - // final var testConsumerBlockItemObserver = - // new TestConsumerStreamResponseObserver( - // testClock, streamMediator, serverCallStreamObserver, testContext); + // // register the test observer + // final var testConsumerBlockItemObserver = + // new TestConsumerStreamResponseObserver( + // testClock, streamMediator, serverCallStreamObserver, testContext); // - // streamMediator.subscribe(testConsumerBlockItemObserver); - // assertTrue(streamMediator.isSubscribed(testConsumerBlockItemObserver)); + // streamMediator.subscribe(testConsumerBlockItemObserver); + // assertTrue(streamMediator.isSubscribed(testConsumerBlockItemObserver)); // - // // Simulate the producer notifying the mediator of a new block - // streamMediator.publish(blockItems.getFirst()); + // // Simulate the producer notifying the mediator of a new block + // streamMediator.publish(blockItems.getFirst()); // - // // Simulate the consumer cancelling the stream - // testConsumerBlockItemObserver.getOnCancel().run(); + // // Simulate the consumer cancelling the stream + // testConsumerBlockItemObserver.getOnCancel().run(); // - // // Verify the block item incremented the counter - // assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); + // // Verify the block item incremented the counter + // assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); // - // // Verify the event made it to the consumer - // verify(serverCallStreamObserver, - // timeout(testTimeout).times(1)).setOnCancelHandler(any()); + // // Verify the event made it to the consumer + // verify(serverCallStreamObserver, + // timeout(testTimeout).times(1)).setOnCancelHandler(any()); // - // // Confirm the mediator unsubscribed the consumer - // assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver)); + // // Confirm the mediator unsubscribed the consumer + // assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver)); // - // // Confirm the BlockStorage write method was called - // verify(blockWriter, timeout(testTimeout).times(1)).write(blockItems.getFirst()); - // - // // Confirm the stream validator is still subscribed - // assertTrue(streamMediator.isSubscribed(streamValidator)); - // } + // // Confirm the BlockStorage write method was called + // verify(blockWriter, timeout(testTimeout).times(1)).write(blockItems.getFirst()); // + // // Confirm the stream validator is still subscribed + // assertTrue(streamMediator.isSubscribed(streamValidator)); + // } + // @Test // public void testOnCloseSubscriptionHandling() throws IOException { // @@ -373,129 +382,121 @@ public void testSubscribeWhenHandlerAlreadySubscribed() throws IOException { // // Confirm the stream validator is still subscribed // assertTrue(streamMediator.isSubscribed(streamValidator)); // } - // - // @Test - // public void testMediatorBlocksPublishAfterException() throws IOException, InterruptedException { - // - // final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - // final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - // final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) - // .build(); - // - // final var concreteObserver1 = - // new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver1, testContext); - // - // final var concreteObserver2 = - // new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver2, testContext); - // - // final var concreteObserver3 = - // new ConsumerStreamResponseObserver(testClock, streamMediator, streamObserver3, testContext); - // - // // Set up the subscribers - // streamMediator.subscribe(concreteObserver1); - // streamMediator.subscribe(concreteObserver2); - // streamMediator.subscribe(concreteObserver3); - // - // final Notifier notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus); - // final var streamValidator = new StreamPersistenceHandlerImpl( - // streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - // - // // Set up the stream verifier - // streamMediator.subscribe(streamValidator); - // - // final List blockItems = generateBlockItems(1); - // final BlockItem firstBlockItem = blockItems.getFirst(); - // - // // Right now, only a single producer calls publishEvent. In - // // that case, they will get an IOException bubbled up to them. - // // However, we will need to support multiple producers in the - // // future. In that case, we need to make sure a second producer - // // is not able to publish a block after the first producer fails. - // doThrow(new IOException()).when(blockWriter).write(List.of(firstBlockItem)); - // - // streamMediator.publish(List.of(firstBlockItem)); - // - // Thread.sleep(testTimeout); - // - // // Confirm the counter was incremented only once - // assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); - // - // // Confirm the error counter was incremented - // assertEquals( - // 1, - // blockNodeContext - // .metricsService() - // .get(LiveBlockStreamMediatorError) - // .get()); - // - // // Send another block item after the exception - // streamMediator.publish(List.of(blockItems.get(1))); - // final BlockItemSet blockItemSet = - // BlockItemSet.newBuilder().blockItems(firstBlockItem).build(); - // final var subscribeStreamResponse = - // SubscribeStreamResponse.newBuilder().blockItems(blockItemSet).build(); - // verify(streamObserver1, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); - // verify(streamObserver2, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); - // verify(streamObserver3, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); - // - // // TODO: Replace READ_STREAM_SUCCESS (2) with a generic error code? - // final SubscribeStreamResponse endOfStreamResponse = SubscribeStreamResponse.newBuilder() - // .status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS) - // .build(); - // verify(streamObserver1, timeout(testTimeout).times(1)).onNext(endOfStreamResponse); - // verify(streamObserver2, timeout(testTimeout).times(1)).onNext(endOfStreamResponse); - // verify(streamObserver3, timeout(testTimeout).times(1)).onNext(endOfStreamResponse); - // - // // verify write method only called once despite the second block being published. - // verify(blockWriter, timeout(testTimeout).times(1)).write(List.of(firstBlockItem)); - // } - // @Test - // public void testUnsubscribeWhenNotSubscribed() throws IOException { - // - // final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - // final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); - // final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) - // .build(); - // - // // register the stream validator - // final var streamValidator = new StreamPersistenceHandlerImpl( - // streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - // streamMediator.subscribe(streamValidator); - // - // final var testConsumerBlockItemObserver = - // new TestConsumerStreamResponseObserver(testClock, streamMediator, streamObserver1, testContext); - // - // // Confirm the observer is not subscribed - // assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver)); - // - // // Attempt to unsubscribe the observer - // streamMediator.unsubscribe(testConsumerBlockItemObserver); - // - // // Confirm the observer is still not subscribed - // assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver)); - // - // // Confirm the stream validator is still subscribed - // assertTrue(streamMediator.isSubscribed(streamValidator)); - // } + @Test + public void testMediatorBlocksPublishAfterException() throws IOException, InterruptedException { - // private static class TestConsumerStreamResponseObserver extends ConsumerStreamResponseObserver { - // public TestConsumerStreamResponseObserver( - // @NonNull final InstantSource producerLivenessClock, - // @NonNull final StreamMediator, SubscribeStreamResponse> streamMediator, - // @NonNull final Pipeline responseStreamObserver, - // @NonNull final BlockNodeContext blockNodeContext) { - // super(producerLivenessClock, streamMediator, responseStreamObserver, blockNodeContext); - // } - // - // @NonNull - // public Runnable getOnCancel() { - // return onCancel; - // } - // - // @NonNull - // public Runnable getOnClose() { - // return onClose; + final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); + final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); + + final var concreteObserver1 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); + + final var concreteObserver2 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver2, testContext); + + final var concreteObserver3 = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver3, testContext); + + // Set up the subscribers + streamMediator.subscribe(concreteObserver1); + streamMediator.subscribe(concreteObserver2); + streamMediator.subscribe(concreteObserver3); + + final Notifier notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus); + final var handler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + + // Set up the stream verifier + streamMediator.subscribe(handler); + + final List blockItems = generateBlockItemsUnparsed(1); + final BlockItemUnparsed firstBlockItem = blockItems.getFirst(); + + // Right now, only a single producer calls publishEvent. In + // that case, they will get an IOException bubbled up to them. + // However, we will need to support multiple producers in the + // future. In that case, we need to make sure a second producer + // is not able to publish a block after the first producer fails. + doThrow(new IOException()).when(blockWriter).write(List.of(firstBlockItem)); + + streamMediator.publish(List.of(firstBlockItem)); + + Thread.sleep(testTimeout); + + // Confirm the counter was incremented only once + assertEquals(1, blockNodeContext.metricsService().get(LiveBlockItems).get()); + + // Confirm the error counter was incremented + assertEquals( + 1, + blockNodeContext + .metricsService() + .get(LiveBlockStreamMediatorError) + .get()); + + // Send another block item after the exception + streamMediator.publish(List.of(blockItems.get(1))); + final BlockItemSetUnparsed blockItemSet = + BlockItemSetUnparsed.newBuilder().blockItems(firstBlockItem).build(); + final var subscribeStreamResponse = SubscribeStreamResponseUnparsed.newBuilder() + .blockItems(blockItemSet) + .build(); + verify(helidonSubscribeStreamObserver1, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); + verify(helidonSubscribeStreamObserver2, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); + verify(helidonSubscribeStreamObserver3, timeout(testTimeout).times(1)).onNext(subscribeStreamResponse); + + // TODO: Replace READ_STREAM_SUCCESS (2) with a generic error code? + final var endOfStreamResponse = SubscribeStreamResponseUnparsed.newBuilder() + .status(SubscribeStreamResponseCode.READ_STREAM_SUCCESS) + .build(); + verify(helidonSubscribeStreamObserver1, timeout(testTimeout).times(1)).onNext(endOfStreamResponse); + verify(helidonSubscribeStreamObserver2, timeout(testTimeout).times(1)).onNext(endOfStreamResponse); + verify(helidonSubscribeStreamObserver3, timeout(testTimeout).times(1)).onNext(endOfStreamResponse); + + // verify write method only called once despite the second block being published. + verify(blockWriter, timeout(testTimeout).times(1)).write(List.of(firstBlockItem)); + } + + @Test + public void testUnsubscribeWhenNotSubscribed() throws IOException { + + final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); + final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); + final var streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) + .build(); + + // register the stream validator + final var handler = new StreamPersistenceHandlerImpl( + streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); + streamMediator.subscribe(handler); + + final var testConsumerBlockItemObserver = new ConsumerStreamResponseObserver( + testClock, streamMediator, helidonSubscribeStreamObserver1, testContext); + + // Confirm the observer is not subscribed + assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver)); + + // Attempt to unsubscribe the observer + streamMediator.unsubscribe(testConsumerBlockItemObserver); + + // Confirm the observer is still not subscribed + assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver)); + + // Confirm the stream validator is still subscribed + assertTrue(streamMediator.isSubscribed(handler)); + } + + // private static class TestConsumerStreamResponseObserver extends ConsumerStreamResponseObserver { + // public TestConsumerStreamResponseObserver( + // @NonNull final InstantSource producerLivenessClock, + // @NonNull final StreamMediator, SubscribeStreamResponseUnparsed> + // streamMediator, + // @NonNull final Pipeline responseStreamObserver, + // @NonNull final BlockNodeContext blockNodeContext) { + // super(producerLivenessClock, streamMediator, responseStreamObserver, blockNodeContext); + // } // } - // } }