Skip to content

Commit

Permalink
feat: add error handling in publisher (#327)
Browse files Browse the repository at this point in the history
Signed-off-by: georgi-l95 <[email protected]>
  • Loading branch information
georgi-l95 authored Nov 1, 2024
1 parent ccff828 commit b143a45
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import dagger.Binds;
import dagger.Module;
import dagger.Provides;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Singleton;

/** The module used to inject the gRPC client. */
Expand All @@ -32,6 +34,16 @@ public interface GrpcInjectionModule {
*/
@Singleton
@Binds
PublishStreamGrpcClient bindPublishStreamGrpcClient(
PublishStreamGrpcClientImpl publishStreamGrpcClient);
PublishStreamGrpcClient bindPublishStreamGrpcClient(PublishStreamGrpcClientImpl publishStreamGrpcClient);

/**
* Provides the stream enabled flag
*
* @return the stream enabled flag
*/
@Singleton
@Provides
static AtomicBoolean provideStreamEnabledFlag() {
return new AtomicBoolean(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;

/**
Expand All @@ -47,6 +48,7 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {
private StreamObserver<PublishStreamRequest> requestStreamObserver;
private final BlockStreamConfig blockStreamConfig;
private final GrpcConfig grpcConfig;
private final AtomicBoolean streamEnabled;
private ManagedChannel channel;
private final MetricsService metricsService;

Expand All @@ -56,15 +58,18 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {
* @param grpcConfig the gRPC configuration
* @param blockStreamConfig the block stream configuration
* @param metricsService the metrics service
* @param streamEnabled the flag responsible for enabling and disabling of the streaming
*/
@Inject
public PublishStreamGrpcClientImpl(
@NonNull final GrpcConfig grpcConfig,
@NonNull final BlockStreamConfig blockStreamConfig,
@NonNull final MetricsService metricsService) {
@NonNull final MetricsService metricsService,
@NonNull final AtomicBoolean streamEnabled) {
this.grpcConfig = requireNonNull(grpcConfig);
this.blockStreamConfig = requireNonNull(blockStreamConfig);
this.metricsService = requireNonNull(metricsService);
this.streamEnabled = requireNonNull(streamEnabled);
}

/**
Expand All @@ -76,7 +81,7 @@ public void init() {
.usePlaintext()
.build();
BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel);
PublishStreamObserver publishStreamObserver = new PublishStreamObserver();
PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled);
requestStreamObserver = stub.publishBlockStream(publishStreamObserver);
}

Expand Down Expand Up @@ -110,6 +115,9 @@ public boolean streamBlockItem(List<BlockItem> blockItems) {
*/
@Override
public boolean streamBlock(Block block) {
if (!streamEnabled.get()) {
return false;
}
List<com.hedera.hapi.block.stream.protoc.BlockItem> blockItemsProtoc = new ArrayList<>();
for (BlockItem blockItem : block.items()) {
blockItemsProtoc.add(Translator.fromPbj(blockItem));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@

package com.hedera.block.simulator.grpc;

import static java.util.Objects.requireNonNull;

import com.hedera.hapi.block.protoc.PublishStreamResponse;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.lang.System.Logger;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The PublishStreamObserver class provides the methods to observe the stream of the published
Expand All @@ -27,21 +32,28 @@
public class PublishStreamObserver implements StreamObserver<PublishStreamResponse> {

private final Logger logger = System.getLogger(getClass().getName());
private final AtomicBoolean streamEnabled;

/** Creates a new PublishStreamObserver instance. */
public PublishStreamObserver() {}
/** Creates a new PublishStreamObserver instance.
*
* @param streamEnabled is responsible for signaling, whether streaming should continue
*/
public PublishStreamObserver(@NonNull final AtomicBoolean streamEnabled) {
this.streamEnabled = requireNonNull(streamEnabled);
}

/** what will the stream observer do with the response from the server */
@Override
public void onNext(PublishStreamResponse publishStreamResponse) {
logger.log(Logger.Level.INFO, "Received Response: " + publishStreamResponse.toString());
}

/** what will the stream observer do when an error occurs */
/** Responsible for stream observer behaviour, in case of error. For now, we will stop the stream for every error. In the future we'd want to have a retry mechanism depending on the error. */
@Override
public void onError(Throwable throwable) {
logger.log(Logger.Level.ERROR, "Error: " + throwable.toString());
// @todo(286) - handle the error
public void onError(@NonNull final Throwable streamError) {
streamEnabled.set(false);
Status status = Status.fromThrowable(streamError);
logger.log(Logger.Level.ERROR, "Error %s with status %s.".formatted(streamError, status), streamError);
}

/** what will the stream observer do when the stream is completed */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@ public void start() throws BlockSimulatorParsingException, IOException, Interrup
}

private void millisPerBlockStreaming() throws IOException, InterruptedException, BlockSimulatorParsingException {

final long secondsPerBlockNanos = (long) millisecondsPerBlock * NANOS_PER_MILLI;

Block nextBlock = blockStreamManager.getNextBlock();
while (nextBlock != null) {
long startTime = System.nanoTime();
publishStreamGrpcClient.streamBlock(nextBlock);
if (!publishStreamGrpcClient.streamBlock(nextBlock)) {
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator stopped streaming due to errors.");
break;
}

long elapsedTime = System.nanoTime() - startTime;
long timeToDelay = secondsPerBlockNanos - elapsedTime;
if (timeToDelay > 0) {
Expand All @@ -102,7 +105,6 @@ private void millisPerBlockStreaming() throws IOException, InterruptedException,
}
nextBlock = blockStreamManager.getNextBlock();
}
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

private void constantRateStreaming() throws InterruptedException, IOException, BlockSimulatorParsingException {
Expand All @@ -119,8 +121,11 @@ private void constantRateStreaming() throws InterruptedException, IOException, B
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has reached the end of the block items");
break;
}
if (!publishStreamGrpcClient.streamBlock(block)) {
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator stopped streaming due to errors.");
break;
}

publishStreamGrpcClient.streamBlock(block);
blockItemsStreamed += block.items().size();

Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hedera.block.simulator.grpc;

import static com.hedera.block.simulator.TestUtils.getTestMetrics;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -34,6 +35,7 @@
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -46,6 +48,7 @@ class PublishStreamGrpcClientImplTest {
GrpcConfig grpcConfig;
BlockStreamConfig blockStreamConfig;
MetricsService metricsService;
AtomicBoolean streamEnabled;

@BeforeEach
void setUp() throws IOException {
Expand All @@ -56,6 +59,7 @@ void setUp() throws IOException {

Configuration config = TestUtils.getTestConfiguration();
metricsService = new MetricsServiceImpl(getTestMetrics(config));
streamEnabled = new AtomicBoolean(true);
}

@AfterEach
Expand All @@ -65,7 +69,7 @@ void tearDown() {}
void streamBlockItem() {
BlockItem blockItem = BlockItem.newBuilder().build();
PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService);
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
publishStreamGrpcClient.init();
boolean result = publishStreamGrpcClient.streamBlockItem(List.of(blockItem));
assertTrue(result);
Expand All @@ -79,7 +83,8 @@ void streamBlock() {
Block block1 = Block.newBuilder().items(blockItem, blockItem, blockItem).build();

PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService);
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);

publishStreamGrpcClient.init();
boolean result = publishStreamGrpcClient.streamBlock(block);
assertTrue(result);
Expand All @@ -88,10 +93,23 @@ void streamBlock() {
assertTrue(result1);
}

@Test
void streamBlockReturnsFalse() {
BlockItem blockItem = BlockItem.newBuilder().build();
Block block = Block.newBuilder().items(blockItem).build();
streamEnabled.set(false);
PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
publishStreamGrpcClient.init();

boolean result = publishStreamGrpcClient.streamBlock(block);
assertFalse(result);
}

@Test
void testShutdown() throws Exception {
PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService);
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
publishStreamGrpcClient.init();

Field channelField = PublishStreamGrpcClientImpl.class.getDeclaredField("channel");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,40 @@

package com.hedera.block.simulator.grpc;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.hedera.hapi.block.protoc.PublishStreamResponse;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;

class PublishStreamObserverTest {

@Test
void onNext() {
PublishStreamResponse response = PublishStreamResponse.newBuilder().build();
PublishStreamObserver publishStreamObserver = new PublishStreamObserver();
AtomicBoolean streamEnabled = new AtomicBoolean(true);

PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled);
publishStreamObserver.onNext(response);
assertTrue(streamEnabled.get(), "allowNext should remain true after onCompleted");
}

@Test
void onError() {
PublishStreamObserver publishStreamObserver = new PublishStreamObserver();
AtomicBoolean streamEnabled = new AtomicBoolean(true);
PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled);

publishStreamObserver.onError(new Throwable());
assertFalse(streamEnabled.get(), "allowNext should be set to false after onError");
}

@Test
void onCompleted() {
PublishStreamObserver publishStreamObserver = new PublishStreamObserver();
AtomicBoolean streamEnabled = new AtomicBoolean(true);
PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled);

publishStreamObserver.onCompleted();
assertTrue(streamEnabled.get(), "allowNext should remain true after onCompleted");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ void testStartWithMillisPerBlockStreaming_WithBlocks() throws Exception {
.thenReturn(block2)
.thenReturn(null);

when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true);
when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(true);

publisherModeHandler.start();

verify(publishStreamGrpcClient).streamBlock(block1);
Expand Down Expand Up @@ -118,6 +121,9 @@ void testStartWithConstantRateStreaming_WithinMaxItems() throws Exception {
.thenReturn(block2)
.thenReturn(null);

when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true);
when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(true);

publisherModeHandler.start();

verify(publishStreamGrpcClient).streamBlock(block1);
Expand Down Expand Up @@ -154,4 +160,63 @@ void testStartWithExceptionDuringStreaming() throws Exception {
verifyNoMoreInteractions(publishStreamGrpcClient);
verifyNoMoreInteractions(blockStreamManager);
}

@Test
void testMillisPerBlockStreaming_streamSuccessBecomesFalse() throws Exception {
when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK);
when(blockStreamConfig.millisecondsPerBlock()).thenReturn(1000);

publisherModeHandler = new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager);

Block block1 = mock(Block.class);
Block block2 = mock(Block.class);

when(blockStreamManager.getNextBlock())
.thenReturn(block1)
.thenReturn(block2)
.thenReturn(null);

when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true);
when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(false);

publisherModeHandler.start();

verify(publishStreamGrpcClient).streamBlock(block1);
verify(publishStreamGrpcClient).streamBlock(block2);
verifyNoMoreInteractions(publishStreamGrpcClient);
verify(blockStreamManager, times(2)).getNextBlock();
}

@Test
void testConstantRateStreaming_streamSuccessBecomesFalse() throws Exception {
when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.CONSTANT_RATE);
when(blockStreamConfig.delayBetweenBlockItems()).thenReturn(0);
when(blockStreamConfig.maxBlockItemsToStream()).thenReturn(100);

publisherModeHandler = new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager);

Block block1 = mock(Block.class);
Block block2 = mock(Block.class);

BlockItem blockItem1 = mock(BlockItem.class);
BlockItem blockItem2 = mock(BlockItem.class);

when(block1.items()).thenReturn(Arrays.asList(blockItem1));
when(block2.items()).thenReturn(Arrays.asList(blockItem2));

when(blockStreamManager.getNextBlock())
.thenReturn(block1)
.thenReturn(block2)
.thenReturn(null);

when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true);
when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(false);

publisherModeHandler.start();

verify(publishStreamGrpcClient).streamBlock(block1);
verify(publishStreamGrpcClient).streamBlock(block2);
verifyNoMoreInteractions(publishStreamGrpcClient);
verify(blockStreamManager, times(2)).getNextBlock();
}
}

0 comments on commit b143a45

Please sign in to comment.