diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java index e9ab8068..6356141d 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java @@ -64,7 +64,7 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { // State private final AtomicBoolean streamEnabled; - private final List lastKnownStatuses; + private final List lastKnownStatuses = new ArrayList<>(); /** * Creates a new PublishStreamGrpcClientImpl with the specified dependencies. @@ -85,7 +85,6 @@ public PublishStreamGrpcClientImpl( this.blockStreamConfig = requireNonNull(blockStreamConfig); this.metricsService = requireNonNull(metricsService); this.streamEnabled = requireNonNull(streamEnabled); - this.lastKnownStatuses = new ArrayList<>(); } /** diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java index ba2e0a8f..c6a8e12d 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java @@ -36,7 +36,6 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.net.ServerSocket; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java index d9441e73..dc467d80 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java @@ -19,8 +19,6 @@ import static com.hedera.block.simulator.TestUtils.findFreePort; import static com.hedera.block.simulator.TestUtils.getTestMetrics; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; @@ -31,123 +29,100 @@ import com.hedera.block.simulator.metrics.MetricsService; import com.hedera.block.simulator.metrics.MetricsServiceImpl; import com.hedera.hapi.block.protoc.*; +import com.hedera.hapi.block.stream.output.protoc.BlockHeader; import com.hedera.hapi.block.stream.protoc.Block; import com.hedera.hapi.block.stream.protoc.BlockItem; +import com.hedera.hapi.block.stream.protoc.BlockProof; import com.swirlds.config.api.Configuration; -import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.lang.reflect.Field; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; - -import io.grpc.Server; -import io.grpc.ServerBuilder; -import io.grpc.stub.StreamObserver; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.junit.jupiter.MockitoExtension; -@ExtendWith(MockitoExtension.class) class PublishStreamGrpcClientImplTest { private MetricsService metricsService; private PublishStreamGrpcClient publishStreamGrpcClient; + @Mock private GrpcConfig grpcConfig; + private BlockStreamConfig blockStreamConfig; private AtomicBoolean streamEnabled; private Server server; @BeforeEach void setUp() throws IOException { -// MockitoAnnotations.openMocks(this); -// -// // Initialize the AtomicBoolean -// streamEnabled = new AtomicBoolean(true); -// -// // Start the server on a random port - final int serverPort = findFreePort(); -// server = ServerBuilder.forPort(serverPort) -// .addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() { -// @Override -// public StreamObserver publishBlockStream( -// StreamObserver responseObserver) { -// -// return new StreamObserver() { -// -// private boolean firstBlockReceived = false; -// private long lastBlockNumber = 0; -// -// @Override -// public void onNext(PublishStreamRequest request) { -// if (!streamEnabled.get()) { -// // Send an error response -// PublishStreamResponse.EndOfStream endOfStream = PublishStreamResponse.EndOfStream.newBuilder() -// .setStatus(PublishStreamResponseCode.STREA) -// .setBlockNumber(lastBlockNumber) -// .build(); -// responseObserver.onNext(PublishStreamResponse.newBuilder() -// .setStatus(endOfStream) -// .build()); -// responseObserver.onCompleted(); -// return; -// } -// -// BlockItemSet blockItems = request.getBlockItems(); -// List items = blockItems.getBlockItemsList(); -// -// // Simulate processing of block items -// for (BlockItem item : items) { -// // Assume that the first BlockItem is a BlockHeader -// if (item.hasBlockHeader()) { -// firstBlockReceived = true; -// lastBlockNumber = item.getBlockHeader().getBlockNumber(); -// } -// // Assume that the last BlockItem is a BlockProof -// if (item.hasBlockProof()) { -// // Send BlockAcknowledgement -// PublishStreamResponse.Acknowledgement acknowledgement = -// PublishStreamResponse.Acknowledgement.newBuilder() -// .setBlockAck(PublishStreamResponse.BlockAcknowledgement.newBuilder() -// .setBlockNumber(lastBlockNumber) -// .setBlockRootHash(item.getBlockProof().getMerkleRoot()) -// .build()) -// .build(); -// responseObserver.onNext(PublishStreamResponse.newBuilder() -// .setAcknowledgement(acknowledgement) -// .build()); -// } -// } -// } -// -// @Override -// public void onError(Throwable t) { -// // Handle error -// } -// -// @Override -// public void onCompleted() { -// // Send EndOfStream response -// PublishStreamResponse.EndOfStream endOfStream = PublishStreamResponse.EndOfStream.newBuilder() -// .setStatus(PublishStreamResponseCode.STREAM_ITEMS_SUCCESS) -// .setBlockNumber(lastBlockNumber) -// .build(); -// responseObserver.onNext(PublishStreamResponse.newBuilder() -// .setStatus(endOfStream) -// .build()); -// responseObserver.onCompleted(); -// } -// }; -// } -// }) -// .build() -// .start(); + MockitoAnnotations.openMocks(this); + + streamEnabled = new AtomicBoolean(true); + final int serverPort = findFreePort(); + server = ServerBuilder.forPort(serverPort) + .addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() { + @Override + public StreamObserver publishBlockStream( + StreamObserver responseObserver) { + return new StreamObserver<>() { + private long lastBlockNumber = 0; + + @Override + public void onNext(PublishStreamRequest request) { + BlockItemSet blockItems = request.getBlockItems(); + List items = blockItems.getBlockItemsList(); + // Simulate processing of block items + for (BlockItem item : items) { + // Assume that the first BlockItem is a BlockHeader + if (item.hasBlockHeader()) { + lastBlockNumber = item.getBlockHeader().getNumber(); + } + // Assume that the last BlockItem is a BlockProof + if (item.hasBlockProof()) { + // Send BlockAcknowledgement + PublishStreamResponse.Acknowledgement acknowledgement = + PublishStreamResponse.Acknowledgement.newBuilder() + .setBlockAck( + PublishStreamResponse.BlockAcknowledgement.newBuilder() + .setBlockNumber(lastBlockNumber) + .build()) + .build(); + responseObserver.onNext(PublishStreamResponse.newBuilder() + .setAcknowledgement(acknowledgement) + .build()); + } + } + } + + @Override + public void onError(Throwable t) { + // handle onError + } + + @Override + public void onCompleted() { + PublishStreamResponse.EndOfStream endOfStream = + PublishStreamResponse.EndOfStream.newBuilder() + .setStatus(PublishStreamResponseCode.STREAM_ITEMS_SUCCESS) + .setBlockNumber(lastBlockNumber) + .build(); + responseObserver.onNext(PublishStreamResponse.newBuilder() + .setStatus(endOfStream) + .build()); + responseObserver.onCompleted(); + } + }; + } + }) + .build() + .start(); blockStreamConfig = TestUtils.getTestConfiguration(Map.of("blockStream.blockItemsBatchSize", "2")) .getConfigData(BlockStreamConfig.class); @@ -163,90 +138,71 @@ void setUp() throws IOException { } @AfterEach - void teardown() { -// server.shutdownNow(); - } - - @Test - void streamBlockItem() throws InterruptedException { - BlockItem blockItem = BlockItem.newBuilder().build(); - publishStreamGrpcClient.init(); - boolean result = publishStreamGrpcClient.streamBlockItem(List.of(blockItem)); - assertTrue(result); - + void teardown() throws InterruptedException { publishStreamGrpcClient.completeStreaming(); publishStreamGrpcClient.shutdown(); + + if (server != null) { + server.shutdown(); + } } @Test - void streamBlock() throws InterruptedException { - BlockItem blockItem = BlockItem.newBuilder().build(); - Block block = Block.newBuilder().addItems(blockItem).build(); - - Block block1 = Block.newBuilder() - .addItems(blockItem) - .addItems(blockItem) - .addItems(blockItem) - .build(); - + public void testInit() { publishStreamGrpcClient.init(); + // Verify that lastKnownStatuses is cleared assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty()); - - boolean result = publishStreamGrpcClient.streamBlock(block); - assertTrue(result); - - boolean result1 = publishStreamGrpcClient.streamBlock(block1); - assertTrue(result1); - - assertEquals(2, publishStreamGrpcClient.getPublishedBlocks()); - publishStreamGrpcClient.completeStreaming(); - publishStreamGrpcClient.shutdown(); } @Test - void streamBlockFailsBecauseOfCompletedStreaming() throws InterruptedException { - BlockItem blockItem = BlockItem.newBuilder().build(); - Block block = Block.newBuilder().addItems(blockItem).build(); - + void testStreamBlockItem_Success() { publishStreamGrpcClient.init(); - assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty()); - publishStreamGrpcClient.completeStreaming(); - assertThrows(IllegalStateException.class, () -> publishStreamGrpcClient.streamBlock(block)); - publishStreamGrpcClient.shutdown(); - } + BlockItem blockItem = BlockItem.newBuilder() + .setBlockHeader(BlockHeader.newBuilder().setNumber(0).build()) + .build(); - @Test - void streamBlockReturnsFalse() throws InterruptedException { - BlockItem blockItem = BlockItem.newBuilder().build(); - Block block = Block.newBuilder().addItems(blockItem).build(); - streamEnabled.set(false); - publishStreamGrpcClient.init(); + List blockItems = List.of(blockItem); - boolean result = publishStreamGrpcClient.streamBlock(block); - assertFalse(result); - publishStreamGrpcClient.completeStreaming(); - publishStreamGrpcClient.shutdown(); + final boolean result = publishStreamGrpcClient.streamBlockItem(blockItems); + assertTrue(result); } @Test - void testShutdown() throws Exception { - PublishStreamGrpcClientImpl publishStreamGrpcClient = - new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled); + void testStreamBlock_Success() throws InterruptedException { publishStreamGrpcClient.init(); + final int streamedBlocks = 3; + + for (int i = 0; i < streamedBlocks; i++) { + BlockItem blockItemHeader = BlockItem.newBuilder() + .setBlockHeader(BlockHeader.newBuilder().setNumber(i).build()) + .build(); + BlockItem blockItemProof = BlockItem.newBuilder() + .setBlockProof(BlockProof.newBuilder().setBlock(i).build()) + .build(); + Block block = Block.newBuilder() + .addItems(blockItemHeader) + .addItems(blockItemProof) + .build(); + + final boolean result = publishStreamGrpcClient.streamBlock(block); + assertTrue(result); + } - Field channelField = PublishStreamGrpcClientImpl.class.getDeclaredField("channel"); - ManagedChannel mockChannel = mock(ManagedChannel.class); + // we use simple retry mechanism here, because sometimes server takes some time to receive the stream + long retryNumber = 1; + long waitTime = 500; - try { - channelField.setAccessible(true); - channelField.set(publishStreamGrpcClient, mockChannel); - } finally { - channelField.setAccessible(false); + while (retryNumber < 3) { + if (!publishStreamGrpcClient.getLastKnownStatuses().isEmpty()) { + break; + } + Thread.sleep(retryNumber * waitTime); + retryNumber++; } - publishStreamGrpcClient.shutdown(); - // Verify that channel.shutdown() was called - verify(mockChannel).shutdown(); + assertEquals(streamedBlocks, publishStreamGrpcClient.getPublishedBlocks()); + assertEquals( + streamedBlocks, publishStreamGrpcClient.getLastKnownStatuses().size()); } }