Skip to content

Commit

Permalink
test: add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: georgi-l95 <[email protected]>
  • Loading branch information
georgi-l95 committed Nov 25, 2024
1 parent a8b4589 commit 583aedd
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ public void init() {
lastKnownStatuses.clear();
}

@Override
public void requestBlocks(long startBlock, long endBlock) throws InterruptedException {
Preconditions.requireWhole(startBlock);
Preconditions.requireWhole(endBlock);

CountDownLatch streamLatch = new CountDownLatch(1);
consumerStreamObserver = new ConsumerStreamObserver(metricsService, streamLatch);
consumerStreamObserver = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses);

SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder()
.setStartBlockNumber(startBlock)
Expand All @@ -96,6 +97,7 @@ public void requestBlocks(long startBlock, long endBlock) throws InterruptedExce
streamLatch.await();
}

@Override
public void completeStreaming() throws InterruptedException {
consumerStreamObserver.onCompleted();
// todo(352) Find a suitable solution for removing the sleep
Expand All @@ -112,6 +114,7 @@ public List<String> getLastKnownStatuses() {
return List.copyOf(lastKnownStatuses);
}

@Override
public void shutdown() {
channel.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ConsumerStreamObserver implements StreamObserver<SubscribeStreamRes

// State
private final CountDownLatch streamLatch;
private final List<String> lastKnownStatuses;

/**
* Constructs a new ConsumerStreamObserver.
Expand All @@ -53,9 +54,12 @@ public class ConsumerStreamObserver implements StreamObserver<SubscribeStreamRes
* @throws NullPointerException if any parameter is null
*/
public ConsumerStreamObserver(
@NonNull final MetricsService metricsService, @NonNull final CountDownLatch streamLatch) {
@NonNull final MetricsService metricsService,
@NonNull final CountDownLatch streamLatch,
@NonNull final List<String> lastKnownStatuses) {
this.metricsService = requireNonNull(metricsService);
this.streamLatch = requireNonNull(streamLatch);
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
}

/**
Expand All @@ -67,6 +71,7 @@ public ConsumerStreamObserver(
@Override
public void onNext(SubscribeStreamResponse subscribeStreamResponse) {
final SubscribeStreamResponse.ResponseCase responseType = subscribeStreamResponse.getResponseCase();
lastKnownStatuses.add(subscribeStreamResponse.toString());

switch (responseType) {
case STATUS -> LOGGER.log(INFO, "Received Response: " + subscribeStreamResponse);
Expand All @@ -84,6 +89,7 @@ public void onNext(SubscribeStreamResponse subscribeStreamResponse) {
@Override
public void onError(Throwable streamError) {
Status status = Status.fromThrowable(streamError);
lastKnownStatuses.add(status.toString());
LOGGER.log(ERROR, "Error %s with status %s.".formatted(streamError, status), streamError);
streamLatch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.simulator.grpc.impl;

import static com.hedera.block.simulator.TestUtils.getTestMetrics;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

import com.hedera.block.simulator.TestUtils;
import com.hedera.block.simulator.config.data.GrpcConfig;
import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.block.simulator.metrics.MetricsServiceImpl;
import com.hedera.hapi.block.protoc.*;
import com.hedera.hapi.block.stream.output.protoc.BlockHeader;
import com.hedera.hapi.block.stream.protoc.BlockItem;
import com.hedera.hapi.block.stream.protoc.BlockProof;
import com.swirlds.config.api.Configuration;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.ServerSocket;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

public class ConsumerStreamGrpcClientImplTest {
@Mock
private GrpcConfig grpcConfig;

private MetricsService metricsService;
private ConsumerStreamGrpcClient consumerStreamGrpcClientImpl;
private Server server;
private int serverPort;

@BeforeEach
void setUp() throws IOException {
MockitoAnnotations.openMocks(this);
serverPort = findFreePort();
server = ServerBuilder.forPort(serverPort)
.addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() {
@Override
public void subscribeBlockStream(
SubscribeStreamRequest request, StreamObserver<SubscribeStreamResponse> responseObserver) {

// Simulate streaming blocks
long startBlock = request.getStartBlockNumber();
long endBlock = request.getEndBlockNumber();

for (long i = startBlock; i < endBlock; i++) {
// Simulate block items
BlockItem blockItemHeader = BlockItem.newBuilder()
.setBlockHeader(BlockHeader.newBuilder()
.setNumber(i)
.build())
.build();
BlockItem blockItemProof = BlockItem.newBuilder()
.setBlockProof(
BlockProof.newBuilder().setBlock(i).build())
.build();

BlockItemSet blockItems = BlockItemSet.newBuilder()
.addBlockItems(blockItemHeader)
.addBlockItems(blockItemProof)
.build();

responseObserver.onNext(SubscribeStreamResponse.newBuilder()
.setBlockItems(blockItems)
.build());
}

// Send success status code at the end
responseObserver.onNext(SubscribeStreamResponse.newBuilder()
.setStatus(SubscribeStreamResponseCode.READ_STREAM_SUCCESS)
.build());
responseObserver.onCompleted();
}
})
.build()
.start();

when(grpcConfig.serverAddress()).thenReturn("localhost");
when(grpcConfig.port()).thenReturn(serverPort);

Configuration config = TestUtils.getTestConfiguration();
metricsService = new MetricsServiceImpl(getTestMetrics(config));
consumerStreamGrpcClientImpl = new ConsumerStreamGrpcClientImpl(grpcConfig, metricsService);
consumerStreamGrpcClientImpl.init();
}

@AfterEach
public void tearDown() {
consumerStreamGrpcClientImpl.shutdown();
server.shutdownNow();
}

@Test
public void testInit() {
assertTrue(consumerStreamGrpcClientImpl.getLastKnownStatuses().isEmpty());
}

@Test
void requestBlocks_Success() throws InterruptedException {
final long startBlock = 0;
final long endBlock = 5;

assertEquals(startBlock, consumerStreamGrpcClientImpl.getConsumedBlocks());
assertTrue(consumerStreamGrpcClientImpl.getLastKnownStatuses().isEmpty());

consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock);

// We check if the final status matches what we have send from the server.
final String lastStatus =
consumerStreamGrpcClientImpl.getLastKnownStatuses().getLast();
assertTrue(lastStatus.contains("status: %s".formatted(SubscribeStreamResponseCode.READ_STREAM_SUCCESS.name())));

assertEquals(endBlock, consumerStreamGrpcClientImpl.getConsumedBlocks());
}

@Test
void requestBlocks_InvalidStartBlock() {
final long startBlock = -1;
final long endBlock = 5;

assertThrows(
IllegalArgumentException.class, () -> consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock));
}

@Test
void requestBlocks_InvalidEndBlock() {
final long startBlock = 0;
final long endBlock = -1;

assertThrows(
IllegalArgumentException.class, () -> consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock));
}

@Test
void completeStreaming_Success() throws InterruptedException {
final long startBlock = 0;
final long endBlock = 5;

consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock);
consumerStreamGrpcClientImpl.completeStreaming();
}

@Test
void shutdown() throws InterruptedException {
final long startBlock = 0;
final long endBlock = 5;

consumerStreamGrpcClientImpl.shutdown();
consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock);

// We check if the first status is UNAVAILABLE, because should fail immediately
final String firstStatus =
consumerStreamGrpcClientImpl.getLastKnownStatuses().getFirst();
assertEquals(
Status.UNAVAILABLE
.augmentDescription("Channel shutdown invoked")
.toString(),
firstStatus);
}

private int findFreePort() throws IOException {
// Find a free port
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hedera.hapi.block.stream.protoc.BlockItem;
import com.swirlds.config.api.Configuration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand All @@ -40,6 +41,7 @@ class ConsumerStreamObserverTest {

private MetricsService metricsService;
private CountDownLatch streamLatch;
private List<String> lastKnownStatuses;
private ConsumerStreamObserver observer;

@BeforeEach
Expand All @@ -48,14 +50,18 @@ void setUp() throws IOException {

metricsService = spy(new MetricsServiceImpl(getTestMetrics(config)));
streamLatch = mock(CountDownLatch.class);
List<String> lastKnownStatuses = new ArrayList<>();

observer = new ConsumerStreamObserver(metricsService, streamLatch);
observer = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses);
}

@Test
void testConstructorWithNullArguments() {
assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(null, streamLatch));
assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, null));
assertThrows(
NullPointerException.class, () -> new ConsumerStreamObserver(null, streamLatch, lastKnownStatuses));
assertThrows(
NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, null, lastKnownStatuses));
assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, streamLatch, null));
}

@Test
Expand Down

0 comments on commit 583aedd

Please sign in to comment.