diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java index 3cbeef3d..1d6b97bc 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java @@ -79,12 +79,13 @@ public void init() { lastKnownStatuses.clear(); } + @Override public void requestBlocks(long startBlock, long endBlock) throws InterruptedException { Preconditions.requireWhole(startBlock); Preconditions.requireWhole(endBlock); CountDownLatch streamLatch = new CountDownLatch(1); - consumerStreamObserver = new ConsumerStreamObserver(metricsService, streamLatch); + consumerStreamObserver = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses); SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder() .setStartBlockNumber(startBlock) @@ -96,6 +97,7 @@ public void requestBlocks(long startBlock, long endBlock) throws InterruptedExce streamLatch.await(); } + @Override public void completeStreaming() throws InterruptedException { consumerStreamObserver.onCompleted(); // todo(352) Find a suitable solution for removing the sleep @@ -112,6 +114,7 @@ public List getLastKnownStatuses() { return List.copyOf(lastKnownStatuses); } + @Override public void shutdown() { channel.shutdown(); } diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java index 7e327e7c..2c2daff8 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java @@ -44,6 +44,7 @@ public class ConsumerStreamObserver implements StreamObserver lastKnownStatuses; /** * Constructs a new ConsumerStreamObserver. @@ -53,9 +54,12 @@ public class ConsumerStreamObserver implements StreamObserver lastKnownStatuses) { this.metricsService = requireNonNull(metricsService); this.streamLatch = requireNonNull(streamLatch); + this.lastKnownStatuses = requireNonNull(lastKnownStatuses); } /** @@ -67,6 +71,7 @@ public ConsumerStreamObserver( @Override public void onNext(SubscribeStreamResponse subscribeStreamResponse) { final SubscribeStreamResponse.ResponseCase responseType = subscribeStreamResponse.getResponseCase(); + lastKnownStatuses.add(subscribeStreamResponse.toString()); switch (responseType) { case STATUS -> LOGGER.log(INFO, "Received Response: " + subscribeStreamResponse); @@ -84,6 +89,7 @@ public void onNext(SubscribeStreamResponse subscribeStreamResponse) { @Override public void onError(Throwable streamError) { Status status = Status.fromThrowable(streamError); + lastKnownStatuses.add(status.toString()); LOGGER.log(ERROR, "Error %s with status %s.".formatted(streamError, status), streamError); streamLatch.countDown(); } diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java new file mode 100644 index 00000000..7a30896b --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.grpc.impl; + +import static com.hedera.block.simulator.TestUtils.getTestMetrics; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import com.hedera.block.simulator.TestUtils; +import com.hedera.block.simulator.config.data.GrpcConfig; +import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient; +import com.hedera.block.simulator.metrics.MetricsService; +import com.hedera.block.simulator.metrics.MetricsServiceImpl; +import com.hedera.hapi.block.protoc.*; +import com.hedera.hapi.block.stream.output.protoc.BlockHeader; +import com.hedera.hapi.block.stream.protoc.BlockItem; +import com.hedera.hapi.block.stream.protoc.BlockProof; +import com.swirlds.config.api.Configuration; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.net.ServerSocket; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class ConsumerStreamGrpcClientImplTest { + @Mock + private GrpcConfig grpcConfig; + + private MetricsService metricsService; + private ConsumerStreamGrpcClient consumerStreamGrpcClientImpl; + private Server server; + private int serverPort; + + @BeforeEach + void setUp() throws IOException { + MockitoAnnotations.openMocks(this); + serverPort = findFreePort(); + server = ServerBuilder.forPort(serverPort) + .addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() { + @Override + public void subscribeBlockStream( + SubscribeStreamRequest request, StreamObserver responseObserver) { + + // Simulate streaming blocks + long startBlock = request.getStartBlockNumber(); + long endBlock = request.getEndBlockNumber(); + + for (long i = startBlock; i < endBlock; i++) { + // Simulate block items + BlockItem blockItemHeader = BlockItem.newBuilder() + .setBlockHeader(BlockHeader.newBuilder() + .setNumber(i) + .build()) + .build(); + BlockItem blockItemProof = BlockItem.newBuilder() + .setBlockProof( + BlockProof.newBuilder().setBlock(i).build()) + .build(); + + BlockItemSet blockItems = BlockItemSet.newBuilder() + .addBlockItems(blockItemHeader) + .addBlockItems(blockItemProof) + .build(); + + responseObserver.onNext(SubscribeStreamResponse.newBuilder() + .setBlockItems(blockItems) + .build()); + } + + // Send success status code at the end + responseObserver.onNext(SubscribeStreamResponse.newBuilder() + .setStatus(SubscribeStreamResponseCode.READ_STREAM_SUCCESS) + .build()); + responseObserver.onCompleted(); + } + }) + .build() + .start(); + + when(grpcConfig.serverAddress()).thenReturn("localhost"); + when(grpcConfig.port()).thenReturn(serverPort); + + Configuration config = TestUtils.getTestConfiguration(); + metricsService = new MetricsServiceImpl(getTestMetrics(config)); + consumerStreamGrpcClientImpl = new ConsumerStreamGrpcClientImpl(grpcConfig, metricsService); + consumerStreamGrpcClientImpl.init(); + } + + @AfterEach + public void tearDown() { + consumerStreamGrpcClientImpl.shutdown(); + server.shutdownNow(); + } + + @Test + public void testInit() { + assertTrue(consumerStreamGrpcClientImpl.getLastKnownStatuses().isEmpty()); + } + + @Test + void requestBlocks_Success() throws InterruptedException { + final long startBlock = 0; + final long endBlock = 5; + + assertEquals(startBlock, consumerStreamGrpcClientImpl.getConsumedBlocks()); + assertTrue(consumerStreamGrpcClientImpl.getLastKnownStatuses().isEmpty()); + + consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock); + + // We check if the final status matches what we have send from the server. + final String lastStatus = + consumerStreamGrpcClientImpl.getLastKnownStatuses().getLast(); + assertTrue(lastStatus.contains("status: %s".formatted(SubscribeStreamResponseCode.READ_STREAM_SUCCESS.name()))); + + assertEquals(endBlock, consumerStreamGrpcClientImpl.getConsumedBlocks()); + } + + @Test + void requestBlocks_InvalidStartBlock() { + final long startBlock = -1; + final long endBlock = 5; + + assertThrows( + IllegalArgumentException.class, () -> consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock)); + } + + @Test + void requestBlocks_InvalidEndBlock() { + final long startBlock = 0; + final long endBlock = -1; + + assertThrows( + IllegalArgumentException.class, () -> consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock)); + } + + @Test + void completeStreaming_Success() throws InterruptedException { + final long startBlock = 0; + final long endBlock = 5; + + consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock); + consumerStreamGrpcClientImpl.completeStreaming(); + } + + @Test + void shutdown() throws InterruptedException { + final long startBlock = 0; + final long endBlock = 5; + + consumerStreamGrpcClientImpl.shutdown(); + consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock); + + // We check if the first status is UNAVAILABLE, because should fail immediately + final String firstStatus = + consumerStreamGrpcClientImpl.getLastKnownStatuses().getFirst(); + assertEquals( + Status.UNAVAILABLE + .augmentDescription("Channel shutdown invoked") + .toString(), + firstStatus); + } + + private int findFreePort() throws IOException { + // Find a free port + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java index 64d7627e..5e15600a 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java @@ -30,6 +30,7 @@ import com.hedera.hapi.block.stream.protoc.BlockItem; import com.swirlds.config.api.Configuration; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -40,6 +41,7 @@ class ConsumerStreamObserverTest { private MetricsService metricsService; private CountDownLatch streamLatch; + private List lastKnownStatuses; private ConsumerStreamObserver observer; @BeforeEach @@ -48,14 +50,18 @@ void setUp() throws IOException { metricsService = spy(new MetricsServiceImpl(getTestMetrics(config))); streamLatch = mock(CountDownLatch.class); + List lastKnownStatuses = new ArrayList<>(); - observer = new ConsumerStreamObserver(metricsService, streamLatch); + observer = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses); } @Test void testConstructorWithNullArguments() { - assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(null, streamLatch)); - assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, null)); + assertThrows( + NullPointerException.class, () -> new ConsumerStreamObserver(null, streamLatch, lastKnownStatuses)); + assertThrows( + NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, null, lastKnownStatuses)); + assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, streamLatch, null)); } @Test