Skip to content

Commit

Permalink
test: add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: georgi-l95 <[email protected]>
  • Loading branch information
georgi-l95 committed Nov 25, 2024
1 parent 583aedd commit 4d37a65
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ConsumerStreamObserver implements StreamObserver<SubscribeStreamRes
*
* @param metricsService The service for recording consumption metrics
* @param streamLatch A latch used to coordinate stream completion
* @param lastKnownStatuses List to store the most recent status messages
* @throws NullPointerException if any parameter is null
*/
public ConsumerStreamObserver(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
import com.hedera.hapi.block.protoc.BlockItemSet;
import com.hedera.hapi.block.protoc.SubscribeStreamResponse;
import com.hedera.hapi.block.protoc.SubscribeStreamResponseCode;
import com.hedera.hapi.block.stream.output.protoc.BlockHeader;
import com.hedera.hapi.block.stream.protoc.BlockItem;
import com.hedera.hapi.block.stream.protoc.BlockProof;
import com.swirlds.config.api.Configuration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -78,23 +79,30 @@ void testOnNextWithStatusResponse() {

@Test
void testOnNextWithBlockItemsResponse() {
BlockItem blockItemWithProof = mock(BlockItem.class);
when(blockItemWithProof.hasBlockProof()).thenReturn(true);

BlockItem blockItemWithoutProof = mock(BlockItem.class);
when(blockItemWithoutProof.hasBlockProof()).thenReturn(false);
BlockItem blockItemHeader = BlockItem.newBuilder()
.setBlockHeader(BlockHeader.newBuilder().setNumber(0).build())
.build();
BlockItem blockItemProof = BlockItem.newBuilder()
.setBlockProof(BlockProof.newBuilder().setBlock(0).build())
.build();
BlockItem blockItemProof1 = BlockItem.newBuilder()
.setBlockProof(BlockProof.newBuilder().setBlock(1).build())
.build();

List<BlockItem> blockItems = Arrays.asList(blockItemWithProof, blockItemWithoutProof, blockItemWithProof);
BlockItemSet blockItemSet =
BlockItemSet.newBuilder().addAllBlockItems(blockItems).build();
BlockItemSet blockItemsSet = BlockItemSet.newBuilder()
.addBlockItems(blockItemHeader)
.addBlockItems(blockItemProof)
.addBlockItems(blockItemProof1)
.build();

SubscribeStreamResponse response =
SubscribeStreamResponse.newBuilder().setBlockItems(blockItemSet).build();
assertEquals(metricsService.get(Counter.LiveBlocksConsumed).get(), 0);
SubscribeStreamResponse response = SubscribeStreamResponse.newBuilder()
.setBlockItems(blockItemsSet)
.build();
assertEquals(0, metricsService.get(Counter.LiveBlocksConsumed).get());

observer.onNext(response);

assertEquals(metricsService.get(Counter.LiveBlocksConsumed).get(), 2);
assertEquals(2, metricsService.get(Counter.LiveBlocksConsumed).get());
verifyNoInteractions(streamLatch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.hedera.block.simulator.TestUtils;
import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.data.GrpcConfig;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.block.simulator.metrics.MetricsServiceImpl;
import com.hedera.hapi.block.stream.protoc.Block;
Expand All @@ -38,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -48,10 +48,10 @@
class PublishStreamGrpcClientImplTest {

private MetricsService metricsService;

GrpcConfig grpcConfig;
BlockStreamConfig blockStreamConfig;
AtomicBoolean streamEnabled;
private PublishStreamGrpcClient publishStreamGrpcClient;
private GrpcConfig grpcConfig;
private BlockStreamConfig blockStreamConfig;
private AtomicBoolean streamEnabled;

@BeforeEach
void setUp() throws IOException {
Expand All @@ -63,23 +63,23 @@ void setUp() throws IOException {
Configuration config = TestUtils.getTestConfiguration();
metricsService = new MetricsServiceImpl(getTestMetrics(config));
streamEnabled = new AtomicBoolean(true);
publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
}

@AfterEach
void tearDown() {}

@Test
void streamBlockItem() {
void streamBlockItem() throws InterruptedException {
BlockItem blockItem = BlockItem.newBuilder().build();
PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
publishStreamGrpcClient.init();
boolean result = publishStreamGrpcClient.streamBlockItem(List.of(blockItem));
assertTrue(result);

publishStreamGrpcClient.completeStreaming();
publishStreamGrpcClient.shutdown();
}

@Test
void streamBlock() {
void streamBlock() throws InterruptedException {
BlockItem blockItem = BlockItem.newBuilder().build();
Block block = Block.newBuilder().addItems(blockItem).build();

Expand All @@ -89,9 +89,6 @@ void streamBlock() {
.addItems(blockItem)
.build();

PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);

publishStreamGrpcClient.init();
assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty());

Expand All @@ -102,35 +99,34 @@ void streamBlock() {
assertTrue(result1);

assertEquals(2, publishStreamGrpcClient.getPublishedBlocks());
publishStreamGrpcClient.completeStreaming();
publishStreamGrpcClient.shutdown();
}

@Test
void streamBlockFailsBecauseOfCompletedStreaming() throws InterruptedException {
BlockItem blockItem = BlockItem.newBuilder().build();
Block block = Block.newBuilder().addItems(blockItem).build();

PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);

publishStreamGrpcClient.init();
assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty());

publishStreamGrpcClient.completeStreaming();

assertThrows(IllegalStateException.class, () -> publishStreamGrpcClient.streamBlock(block));
publishStreamGrpcClient.shutdown();
}

@Test
void streamBlockReturnsFalse() {
void streamBlockReturnsFalse() throws InterruptedException {
BlockItem blockItem = BlockItem.newBuilder().build();
Block block = Block.newBuilder().addItems(blockItem).build();
streamEnabled.set(false);
PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
publishStreamGrpcClient.init();

boolean result = publishStreamGrpcClient.streamBlock(block);
assertFalse(result);
publishStreamGrpcClient.completeStreaming();
publishStreamGrpcClient.shutdown();
}

@Test
Expand Down

0 comments on commit 4d37a65

Please sign in to comment.