Skip to content

Commit

Permalink
fix unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: georgi-l95 <[email protected]>
  • Loading branch information
georgi-l95 committed Nov 27, 2024
1 parent d6d5deb commit 87b6d89
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {

// State
private final AtomicBoolean streamEnabled;
private final List<String> lastKnownStatuses;
private final List<String> lastKnownStatuses = new ArrayList<>();

/**
* Creates a new PublishStreamGrpcClientImpl with the specified dependencies.
Expand All @@ -85,7 +85,6 @@ public PublishStreamGrpcClientImpl(
this.blockStreamConfig = requireNonNull(blockStreamConfig);
this.metricsService = requireNonNull(metricsService);
this.streamEnabled = requireNonNull(streamEnabled);
this.lastKnownStatuses = new ArrayList<>();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.ServerSocket;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import static com.hedera.block.simulator.TestUtils.findFreePort;
import static com.hedera.block.simulator.TestUtils.getTestMetrics;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;

Expand All @@ -31,123 +29,100 @@
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.block.simulator.metrics.MetricsServiceImpl;
import com.hedera.hapi.block.protoc.*;
import com.hedera.hapi.block.stream.output.protoc.BlockHeader;
import com.hedera.hapi.block.stream.protoc.Block;
import com.hedera.hapi.block.stream.protoc.BlockItem;
import com.hedera.hapi.block.stream.protoc.BlockProof;
import com.swirlds.config.api.Configuration;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class PublishStreamGrpcClientImplTest {

private MetricsService metricsService;
private PublishStreamGrpcClient publishStreamGrpcClient;

@Mock
private GrpcConfig grpcConfig;

private BlockStreamConfig blockStreamConfig;
private AtomicBoolean streamEnabled;
private Server server;

@BeforeEach
void setUp() throws IOException {
// MockitoAnnotations.openMocks(this);
//
// // Initialize the AtomicBoolean
// streamEnabled = new AtomicBoolean(true);
//
// // Start the server on a random port
final int serverPort = findFreePort();
// server = ServerBuilder.forPort(serverPort)
// .addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() {
// @Override
// public StreamObserver<PublishStreamRequest> publishBlockStream(
// StreamObserver<PublishStreamResponse> responseObserver) {
//
// return new StreamObserver<PublishStreamRequest>() {
//
// private boolean firstBlockReceived = false;
// private long lastBlockNumber = 0;
//
// @Override
// public void onNext(PublishStreamRequest request) {
// if (!streamEnabled.get()) {
// // Send an error response
// PublishStreamResponse.EndOfStream endOfStream = PublishStreamResponse.EndOfStream.newBuilder()
// .setStatus(PublishStreamResponseCode.STREA)
// .setBlockNumber(lastBlockNumber)
// .build();
// responseObserver.onNext(PublishStreamResponse.newBuilder()
// .setStatus(endOfStream)
// .build());
// responseObserver.onCompleted();
// return;
// }
//
// BlockItemSet blockItems = request.getBlockItems();
// List<BlockItem> items = blockItems.getBlockItemsList();
//
// // Simulate processing of block items
// for (BlockItem item : items) {
// // Assume that the first BlockItem is a BlockHeader
// if (item.hasBlockHeader()) {
// firstBlockReceived = true;
// lastBlockNumber = item.getBlockHeader().getBlockNumber();
// }
// // Assume that the last BlockItem is a BlockProof
// if (item.hasBlockProof()) {
// // Send BlockAcknowledgement
// PublishStreamResponse.Acknowledgement acknowledgement =
// PublishStreamResponse.Acknowledgement.newBuilder()
// .setBlockAck(PublishStreamResponse.BlockAcknowledgement.newBuilder()
// .setBlockNumber(lastBlockNumber)
// .setBlockRootHash(item.getBlockProof().getMerkleRoot())
// .build())
// .build();
// responseObserver.onNext(PublishStreamResponse.newBuilder()
// .setAcknowledgement(acknowledgement)
// .build());
// }
// }
// }
//
// @Override
// public void onError(Throwable t) {
// // Handle error
// }
//
// @Override
// public void onCompleted() {
// // Send EndOfStream response
// PublishStreamResponse.EndOfStream endOfStream = PublishStreamResponse.EndOfStream.newBuilder()
// .setStatus(PublishStreamResponseCode.STREAM_ITEMS_SUCCESS)
// .setBlockNumber(lastBlockNumber)
// .build();
// responseObserver.onNext(PublishStreamResponse.newBuilder()
// .setStatus(endOfStream)
// .build());
// responseObserver.onCompleted();
// }
// };
// }
// })
// .build()
// .start();
MockitoAnnotations.openMocks(this);

streamEnabled = new AtomicBoolean(true);

final int serverPort = findFreePort();
server = ServerBuilder.forPort(serverPort)
.addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() {
@Override
public StreamObserver<PublishStreamRequest> publishBlockStream(
StreamObserver<PublishStreamResponse> responseObserver) {
return new StreamObserver<>() {
private long lastBlockNumber = 0;

@Override
public void onNext(PublishStreamRequest request) {
BlockItemSet blockItems = request.getBlockItems();
List<BlockItem> items = blockItems.getBlockItemsList();
// Simulate processing of block items
for (BlockItem item : items) {
// Assume that the first BlockItem is a BlockHeader
if (item.hasBlockHeader()) {
lastBlockNumber = item.getBlockHeader().getNumber();
}
// Assume that the last BlockItem is a BlockProof
if (item.hasBlockProof()) {
// Send BlockAcknowledgement
PublishStreamResponse.Acknowledgement acknowledgement =
PublishStreamResponse.Acknowledgement.newBuilder()
.setBlockAck(
PublishStreamResponse.BlockAcknowledgement.newBuilder()
.setBlockNumber(lastBlockNumber)
.build())
.build();
responseObserver.onNext(PublishStreamResponse.newBuilder()
.setAcknowledgement(acknowledgement)
.build());
}
}
}

@Override
public void onError(Throwable t) {
// handle onError
}

@Override
public void onCompleted() {
PublishStreamResponse.EndOfStream endOfStream =
PublishStreamResponse.EndOfStream.newBuilder()
.setStatus(PublishStreamResponseCode.STREAM_ITEMS_SUCCESS)
.setBlockNumber(lastBlockNumber)
.build();
responseObserver.onNext(PublishStreamResponse.newBuilder()
.setStatus(endOfStream)
.build());
responseObserver.onCompleted();
}
};
}
})
.build()
.start();
blockStreamConfig = TestUtils.getTestConfiguration(Map.of("blockStream.blockItemsBatchSize", "2"))
.getConfigData(BlockStreamConfig.class);

Expand All @@ -163,90 +138,71 @@ void setUp() throws IOException {
}

@AfterEach
void teardown() {
// server.shutdownNow();
}

@Test
void streamBlockItem() throws InterruptedException {
BlockItem blockItem = BlockItem.newBuilder().build();
publishStreamGrpcClient.init();
boolean result = publishStreamGrpcClient.streamBlockItem(List.of(blockItem));
assertTrue(result);

void teardown() throws InterruptedException {
publishStreamGrpcClient.completeStreaming();
publishStreamGrpcClient.shutdown();

if (server != null) {
server.shutdown();
}
}

@Test
void streamBlock() throws InterruptedException {
BlockItem blockItem = BlockItem.newBuilder().build();
Block block = Block.newBuilder().addItems(blockItem).build();

Block block1 = Block.newBuilder()
.addItems(blockItem)
.addItems(blockItem)
.addItems(blockItem)
.build();

public void testInit() {
publishStreamGrpcClient.init();
// Verify that lastKnownStatuses is cleared
assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty());

boolean result = publishStreamGrpcClient.streamBlock(block);
assertTrue(result);

boolean result1 = publishStreamGrpcClient.streamBlock(block1);
assertTrue(result1);

assertEquals(2, publishStreamGrpcClient.getPublishedBlocks());
publishStreamGrpcClient.completeStreaming();
publishStreamGrpcClient.shutdown();
}

@Test
void streamBlockFailsBecauseOfCompletedStreaming() throws InterruptedException {
BlockItem blockItem = BlockItem.newBuilder().build();
Block block = Block.newBuilder().addItems(blockItem).build();

void testStreamBlockItem_Success() {
publishStreamGrpcClient.init();
assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty());

publishStreamGrpcClient.completeStreaming();
assertThrows(IllegalStateException.class, () -> publishStreamGrpcClient.streamBlock(block));
publishStreamGrpcClient.shutdown();
}
BlockItem blockItem = BlockItem.newBuilder()
.setBlockHeader(BlockHeader.newBuilder().setNumber(0).build())
.build();

@Test
void streamBlockReturnsFalse() throws InterruptedException {
BlockItem blockItem = BlockItem.newBuilder().build();
Block block = Block.newBuilder().addItems(blockItem).build();
streamEnabled.set(false);
publishStreamGrpcClient.init();
List<BlockItem> blockItems = List.of(blockItem);

boolean result = publishStreamGrpcClient.streamBlock(block);
assertFalse(result);
publishStreamGrpcClient.completeStreaming();
publishStreamGrpcClient.shutdown();
final boolean result = publishStreamGrpcClient.streamBlockItem(blockItems);
assertTrue(result);
}

@Test
void testShutdown() throws Exception {
PublishStreamGrpcClientImpl publishStreamGrpcClient =
new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
void testStreamBlock_Success() throws InterruptedException {
publishStreamGrpcClient.init();
final int streamedBlocks = 3;

for (int i = 0; i < streamedBlocks; i++) {
BlockItem blockItemHeader = BlockItem.newBuilder()
.setBlockHeader(BlockHeader.newBuilder().setNumber(i).build())
.build();
BlockItem blockItemProof = BlockItem.newBuilder()
.setBlockProof(BlockProof.newBuilder().setBlock(i).build())
.build();
Block block = Block.newBuilder()
.addItems(blockItemHeader)
.addItems(blockItemProof)
.build();

final boolean result = publishStreamGrpcClient.streamBlock(block);
assertTrue(result);
}

Field channelField = PublishStreamGrpcClientImpl.class.getDeclaredField("channel");
ManagedChannel mockChannel = mock(ManagedChannel.class);
// we use simple retry mechanism here, because sometimes server takes some time to receive the stream
long retryNumber = 1;
long waitTime = 500;

try {
channelField.setAccessible(true);
channelField.set(publishStreamGrpcClient, mockChannel);
} finally {
channelField.setAccessible(false);
while (retryNumber < 3) {
if (!publishStreamGrpcClient.getLastKnownStatuses().isEmpty()) {
break;
}
Thread.sleep(retryNumber * waitTime);
retryNumber++;
}
publishStreamGrpcClient.shutdown();

// Verify that channel.shutdown() was called
verify(mockChannel).shutdown();
assertEquals(streamedBlocks, publishStreamGrpcClient.getPublishedBlocks());
assertEquals(
streamedBlocks, publishStreamGrpcClient.getLastKnownStatuses().size());
}
}

0 comments on commit 87b6d89

Please sign in to comment.