diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java index dd058323e..846dcca05 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java @@ -18,6 +18,8 @@ import dagger.Binds; import dagger.Module; +import dagger.Provides; +import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Singleton; /** The module used to inject the gRPC client. */ @@ -32,6 +34,16 @@ public interface GrpcInjectionModule { */ @Singleton @Binds - PublishStreamGrpcClient bindPublishStreamGrpcClient( - PublishStreamGrpcClientImpl publishStreamGrpcClient); + PublishStreamGrpcClient bindPublishStreamGrpcClient(PublishStreamGrpcClientImpl publishStreamGrpcClient); + + /** + * Provides the stream enabled flag + * + * @return the stream enabled flag + */ + @Singleton + @Provides + static AtomicBoolean provideStreamEnabledFlag() { + return new AtomicBoolean(true); + } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java index 548db03c2..63b11dfcb 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java @@ -35,6 +35,7 @@ import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; /** @@ -47,6 +48,7 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { private StreamObserver requestStreamObserver; private final BlockStreamConfig blockStreamConfig; private final GrpcConfig grpcConfig; + private final AtomicBoolean streamEnabled; private ManagedChannel channel; private final MetricsService metricsService; @@ -56,15 +58,18 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { * @param grpcConfig the gRPC configuration * @param blockStreamConfig the block stream configuration * @param metricsService the metrics service + * @param streamEnabled the flag responsible for enabling and disabling of the streaming */ @Inject public PublishStreamGrpcClientImpl( @NonNull final GrpcConfig grpcConfig, @NonNull final BlockStreamConfig blockStreamConfig, - @NonNull final MetricsService metricsService) { + @NonNull final MetricsService metricsService, + @NonNull final AtomicBoolean streamEnabled) { this.grpcConfig = requireNonNull(grpcConfig); this.blockStreamConfig = requireNonNull(blockStreamConfig); this.metricsService = requireNonNull(metricsService); + this.streamEnabled = requireNonNull(streamEnabled); } /** @@ -76,7 +81,7 @@ public void init() { .usePlaintext() .build(); BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled); requestStreamObserver = stub.publishBlockStream(publishStreamObserver); } @@ -110,6 +115,9 @@ public boolean streamBlockItem(List blockItems) { */ @Override public boolean streamBlock(Block block) { + if (!streamEnabled.get()) { + return false; + } List blockItemsProtoc = new ArrayList<>(); for (BlockItem blockItem : block.items()) { blockItemsProtoc.add(Translator.fromPbj(blockItem)); diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java index 8880c65b1..75bd210c2 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java @@ -16,9 +16,14 @@ package com.hedera.block.simulator.grpc; +import static java.util.Objects.requireNonNull; + import com.hedera.hapi.block.protoc.PublishStreamResponse; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.lang.System.Logger; +import java.util.concurrent.atomic.AtomicBoolean; /** * The PublishStreamObserver class provides the methods to observe the stream of the published @@ -27,9 +32,15 @@ public class PublishStreamObserver implements StreamObserver { private final Logger logger = System.getLogger(getClass().getName()); + private final AtomicBoolean streamEnabled; - /** Creates a new PublishStreamObserver instance. */ - public PublishStreamObserver() {} + /** Creates a new PublishStreamObserver instance. + * + * @param streamEnabled is responsible for signaling, whether streaming should continue + */ + public PublishStreamObserver(@NonNull final AtomicBoolean streamEnabled) { + this.streamEnabled = requireNonNull(streamEnabled); + } /** what will the stream observer do with the response from the server */ @Override @@ -37,11 +48,12 @@ public void onNext(PublishStreamResponse publishStreamResponse) { logger.log(Logger.Level.INFO, "Received Response: " + publishStreamResponse.toString()); } - /** what will the stream observer do when an error occurs */ + /** Responsible for stream observer behaviour, in case of error. For now, we will stop the stream for every error. In the future we'd want to have a retry mechanism depending on the error. */ @Override - public void onError(Throwable throwable) { - logger.log(Logger.Level.ERROR, "Error: " + throwable.toString()); - // @todo(286) - handle the error + public void onError(@NonNull final Throwable streamError) { + streamEnabled.set(false); + Status status = Status.fromThrowable(streamError); + logger.log(Logger.Level.ERROR, "Error %s with status %s.".formatted(streamError, status), streamError); } /** what will the stream observer do when the stream is completed */ diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java index 0ec4fc05e..3a1295361 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java @@ -80,13 +80,16 @@ public void start() throws BlockSimulatorParsingException, IOException, Interrup } private void millisPerBlockStreaming() throws IOException, InterruptedException, BlockSimulatorParsingException { - final long secondsPerBlockNanos = (long) millisecondsPerBlock * NANOS_PER_MILLI; Block nextBlock = blockStreamManager.getNextBlock(); while (nextBlock != null) { long startTime = System.nanoTime(); - publishStreamGrpcClient.streamBlock(nextBlock); + if (!publishStreamGrpcClient.streamBlock(nextBlock)) { + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator stopped streaming due to errors."); + break; + } + long elapsedTime = System.nanoTime() - startTime; long timeToDelay = secondsPerBlockNanos - elapsedTime; if (timeToDelay > 0) { @@ -102,7 +105,6 @@ private void millisPerBlockStreaming() throws IOException, InterruptedException, } nextBlock = blockStreamManager.getNextBlock(); } - LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); } private void constantRateStreaming() throws InterruptedException, IOException, BlockSimulatorParsingException { @@ -119,8 +121,11 @@ private void constantRateStreaming() throws InterruptedException, IOException, B LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has reached the end of the block items"); break; } + if (!publishStreamGrpcClient.streamBlock(block)) { + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator stopped streaming due to errors."); + break; + } - publishStreamGrpcClient.streamBlock(block); blockItemsStreamed += block.items().size(); Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems); diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java index b768f63b6..743553ea1 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java @@ -17,6 +17,7 @@ package com.hedera.block.simulator.grpc; import static com.hedera.block.simulator.TestUtils.getTestMetrics; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -34,6 +35,7 @@ import java.lang.reflect.Field; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -46,6 +48,7 @@ class PublishStreamGrpcClientImplTest { GrpcConfig grpcConfig; BlockStreamConfig blockStreamConfig; MetricsService metricsService; + AtomicBoolean streamEnabled; @BeforeEach void setUp() throws IOException { @@ -56,6 +59,7 @@ void setUp() throws IOException { Configuration config = TestUtils.getTestConfiguration(); metricsService = new MetricsServiceImpl(getTestMetrics(config)); + streamEnabled = new AtomicBoolean(true); } @AfterEach @@ -65,7 +69,7 @@ void tearDown() {} void streamBlockItem() { BlockItem blockItem = BlockItem.newBuilder().build(); PublishStreamGrpcClientImpl publishStreamGrpcClient = - new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService); + new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled); publishStreamGrpcClient.init(); boolean result = publishStreamGrpcClient.streamBlockItem(List.of(blockItem)); assertTrue(result); @@ -79,7 +83,8 @@ void streamBlock() { Block block1 = Block.newBuilder().items(blockItem, blockItem, blockItem).build(); PublishStreamGrpcClientImpl publishStreamGrpcClient = - new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService); + new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled); + publishStreamGrpcClient.init(); boolean result = publishStreamGrpcClient.streamBlock(block); assertTrue(result); @@ -88,10 +93,23 @@ void streamBlock() { assertTrue(result1); } + @Test + void streamBlockReturnsFalse() { + BlockItem blockItem = BlockItem.newBuilder().build(); + Block block = Block.newBuilder().items(blockItem).build(); + streamEnabled.set(false); + PublishStreamGrpcClientImpl publishStreamGrpcClient = + new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled); + publishStreamGrpcClient.init(); + + boolean result = publishStreamGrpcClient.streamBlock(block); + assertFalse(result); + } + @Test void testShutdown() throws Exception { PublishStreamGrpcClientImpl publishStreamGrpcClient = - new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService); + new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled); publishStreamGrpcClient.init(); Field channelField = PublishStreamGrpcClientImpl.class.getDeclaredField("channel"); diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java index 73c2c8222..52c42be03 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java @@ -16,7 +16,11 @@ package com.hedera.block.simulator.grpc; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + import com.hedera.hapi.block.protoc.PublishStreamResponse; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; class PublishStreamObserverTest { @@ -24,19 +28,28 @@ class PublishStreamObserverTest { @Test void onNext() { PublishStreamResponse response = PublishStreamResponse.newBuilder().build(); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(); + AtomicBoolean streamEnabled = new AtomicBoolean(true); + + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled); publishStreamObserver.onNext(response); + assertTrue(streamEnabled.get(), "allowNext should remain true after onCompleted"); } @Test void onError() { - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(); + AtomicBoolean streamEnabled = new AtomicBoolean(true); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled); + publishStreamObserver.onError(new Throwable()); + assertFalse(streamEnabled.get(), "allowNext should be set to false after onError"); } @Test void onCompleted() { - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(); + AtomicBoolean streamEnabled = new AtomicBoolean(true); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled); + publishStreamObserver.onCompleted(); + assertTrue(streamEnabled.get(), "allowNext should remain true after onCompleted"); } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java index 9a50f76a9..687c842c9 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java @@ -72,6 +72,9 @@ void testStartWithMillisPerBlockStreaming_WithBlocks() throws Exception { .thenReturn(block2) .thenReturn(null); + when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); + when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(true); + publisherModeHandler.start(); verify(publishStreamGrpcClient).streamBlock(block1); @@ -118,6 +121,9 @@ void testStartWithConstantRateStreaming_WithinMaxItems() throws Exception { .thenReturn(block2) .thenReturn(null); + when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); + when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(true); + publisherModeHandler.start(); verify(publishStreamGrpcClient).streamBlock(block1); @@ -154,4 +160,63 @@ void testStartWithExceptionDuringStreaming() throws Exception { verifyNoMoreInteractions(publishStreamGrpcClient); verifyNoMoreInteractions(blockStreamManager); } + + @Test + void testMillisPerBlockStreaming_streamSuccessBecomesFalse() throws Exception { + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); + when(blockStreamConfig.millisecondsPerBlock()).thenReturn(1000); + + publisherModeHandler = new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager); + + Block block1 = mock(Block.class); + Block block2 = mock(Block.class); + + when(blockStreamManager.getNextBlock()) + .thenReturn(block1) + .thenReturn(block2) + .thenReturn(null); + + when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); + when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(false); + + publisherModeHandler.start(); + + verify(publishStreamGrpcClient).streamBlock(block1); + verify(publishStreamGrpcClient).streamBlock(block2); + verifyNoMoreInteractions(publishStreamGrpcClient); + verify(blockStreamManager, times(2)).getNextBlock(); + } + + @Test + void testConstantRateStreaming_streamSuccessBecomesFalse() throws Exception { + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.CONSTANT_RATE); + when(blockStreamConfig.delayBetweenBlockItems()).thenReturn(0); + when(blockStreamConfig.maxBlockItemsToStream()).thenReturn(100); + + publisherModeHandler = new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager); + + Block block1 = mock(Block.class); + Block block2 = mock(Block.class); + + BlockItem blockItem1 = mock(BlockItem.class); + BlockItem blockItem2 = mock(BlockItem.class); + + when(block1.items()).thenReturn(Arrays.asList(blockItem1)); + when(block2.items()).thenReturn(Arrays.asList(blockItem2)); + + when(blockStreamManager.getNextBlock()) + .thenReturn(block1) + .thenReturn(block2) + .thenReturn(null); + + when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); + when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(false); + + publisherModeHandler.start(); + + verify(publishStreamGrpcClient).streamBlock(block1); + verify(publishStreamGrpcClient).streamBlock(block2); + verifyNoMoreInteractions(publishStreamGrpcClient); + verify(blockStreamManager, times(2)).getNextBlock(); + } }