Skip to content

Commit

Permalink
test: add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: georgi-l95 <[email protected]>
  • Loading branch information
georgi-l95 committed Nov 22, 2024
1 parent 98ad021 commit a8b4589
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void init() {

/**
* Starts consuming blocks from the stream beginning at genesis (block 0).
* Currently requests an infinite stream of blocks starting from genesis.
* Currently, requests an infinite stream of blocks starting from genesis.
*
* @throws InterruptedException if the consumption process is interrupted
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,25 @@ void tearDown() throws InterruptedException {
}

@Test
void start_logsStartedMessage() throws InterruptedException, BlockSimulatorParsingException, IOException {
void startPublishing_logsStartedMessage() throws InterruptedException, BlockSimulatorParsingException, IOException {
blockStreamSimulator.start();
assertTrue(blockStreamSimulator.isRunning());
}

@Test
void startConsuming() throws IOException, BlockSimulatorParsingException, InterruptedException {
Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "CONSUMER"));

metricsService = new MetricsServiceImpl(getTestMetrics(configuration));
blockStreamSimulator = new BlockStreamSimulatorApp(
configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
blockStreamSimulator.start();

verify(consumerStreamGrpcClient).init();
verify(consumerStreamGrpcClient).requestBlocks(0, 0);
assertTrue(blockStreamSimulator.isRunning());
}

@Test
void start_constantRateStreaming() throws InterruptedException, BlockSimulatorParsingException, IOException {

Expand All @@ -116,6 +130,8 @@ void start_constantRateStreaming() throws InterruptedException, BlockSimulatorPa
when(blockStreamManager.getNextBlock()).thenReturn(block1, block2, null);

Configuration configuration = TestUtils.getTestConfiguration(Map.of(
"blockStream.simulatorMode",
"PUBLISHER",
"blockStream.maxBlockItemsToStream",
"2",
"generator.managerImplementation",
Expand All @@ -139,12 +155,30 @@ private String getAbsoluteFolder(String relativePath) {
}

@Test
void stop_doesNotThrowException() throws InterruptedException {
void stopPublishing_doesNotThrowException() throws InterruptedException, IOException {
Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "PUBLISHER"));

metricsService = new MetricsServiceImpl(getTestMetrics(configuration));
blockStreamSimulator = new BlockStreamSimulatorApp(
configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);

assertDoesNotThrow(() -> blockStreamSimulator.stop());
assertFalse(blockStreamSimulator.isRunning());
verify(publishStreamGrpcClient, atLeast(1)).completeStreaming();
}

@Test
void stopConsuming_doesNotThrowException() throws InterruptedException, IOException {
Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "CONSUMER"));

metricsService = new MetricsServiceImpl(getTestMetrics(configuration));
blockStreamSimulator = new BlockStreamSimulatorApp(
configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
assertDoesNotThrow(() -> blockStreamSimulator.stop());
assertFalse(blockStreamSimulator.isRunning());
verify(consumerStreamGrpcClient, atLeast(1)).completeStreaming();
}

@Test
void start_millisPerBlockStreaming() throws InterruptedException, IOException, BlockSimulatorParsingException {
BlockStreamManager blockStreamManager = mock(BlockStreamManager.class);
Expand All @@ -155,6 +189,8 @@ void start_millisPerBlockStreaming() throws InterruptedException, IOException, B
when(blockStreamManager.getNextBlock()).thenReturn(block, block, null);

Configuration configuration = TestUtils.getTestConfiguration(Map.of(
"blockStream.simulatorMode",
"PUBLISHER",
"blockStream.maxBlockItemsToStream",
"2",
"generator.managerImplementation",
Expand Down Expand Up @@ -194,6 +230,8 @@ void start_millisPerSecond_streamingLagVerifyWarnLog()
.thenReturn(true);

Configuration configuration = TestUtils.getTestConfiguration(Map.of(
"blockStream.simulatorMode",
"PUBLISHER",
"generator.managerImplementation",
"BlockAsFileBlockStreamManager",
"generator.rootPath",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.simulator.grpc.impl;

import static com.hedera.block.simulator.TestUtils.getTestMetrics;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

import com.hedera.block.simulator.TestUtils;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.block.simulator.metrics.MetricsServiceImpl;
import com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter;
import com.hedera.hapi.block.protoc.BlockItemSet;
import com.hedera.hapi.block.protoc.SubscribeStreamResponse;
import com.hedera.hapi.block.protoc.SubscribeStreamResponseCode;
import com.hedera.hapi.block.stream.protoc.BlockItem;
import com.swirlds.config.api.Configuration;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ConsumerStreamObserverTest {

private MetricsService metricsService;
private CountDownLatch streamLatch;
private ConsumerStreamObserver observer;

@BeforeEach
void setUp() throws IOException {
Configuration config = TestUtils.getTestConfiguration();

metricsService = spy(new MetricsServiceImpl(getTestMetrics(config)));
streamLatch = mock(CountDownLatch.class);

observer = new ConsumerStreamObserver(metricsService, streamLatch);
}

@Test
void testConstructorWithNullArguments() {
assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(null, streamLatch));
assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, null));
}

@Test
void testOnNextWithStatusResponse() {
SubscribeStreamResponse response = SubscribeStreamResponse.newBuilder()
.setStatus(SubscribeStreamResponseCode.READ_STREAM_SUCCESS)
.build();

observer.onNext(response);

verifyNoInteractions(metricsService);
verifyNoInteractions(streamLatch);
}

@Test
void testOnNextWithBlockItemsResponse() {
BlockItem blockItemWithProof = mock(BlockItem.class);
when(blockItemWithProof.hasBlockProof()).thenReturn(true);

BlockItem blockItemWithoutProof = mock(BlockItem.class);
when(blockItemWithoutProof.hasBlockProof()).thenReturn(false);

List<BlockItem> blockItems = Arrays.asList(blockItemWithProof, blockItemWithoutProof, blockItemWithProof);
BlockItemSet blockItemSet =
BlockItemSet.newBuilder().addAllBlockItems(blockItems).build();

SubscribeStreamResponse response =
SubscribeStreamResponse.newBuilder().setBlockItems(blockItemSet).build();
assertEquals(metricsService.get(Counter.LiveBlocksConsumed).get(), 0);

observer.onNext(response);

assertEquals(metricsService.get(Counter.LiveBlocksConsumed).get(), 2);
verifyNoInteractions(streamLatch);
}

@Test
void testOnNextWithUnknownResponseType() {
SubscribeStreamResponse response = SubscribeStreamResponse.newBuilder().build();

IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () -> observer.onNext(response));

assertEquals("Unknown response type: RESPONSE_NOT_SET", exception.getMessage());
verifyNoInteractions(metricsService);
verifyNoInteractions(streamLatch);
}

@Test
void testOnError() {
Throwable testException = new RuntimeException("Test exception");

observer.onError(testException);

verify(streamLatch).countDown();
verifyNoInteractions(metricsService);
}

@Test
void testOnCompleted() {
observer.onCompleted();

verify(streamLatch).countDown();
verifyNoInteractions(metricsService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.hedera.block.simulator.grpc;
package com.hedera.block.simulator.grpc.impl;

import static com.hedera.block.simulator.TestUtils.getTestMetrics;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -27,7 +27,6 @@
import com.hedera.block.simulator.TestUtils;
import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.data.GrpcConfig;
import com.hedera.block.simulator.grpc.impl.PublishStreamGrpcClientImpl;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.block.simulator.metrics.MetricsServiceImpl;
import com.hedera.hapi.block.stream.protoc.Block;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
* limitations under the License.
*/

package com.hedera.block.simulator.grpc;
package com.hedera.block.simulator.grpc.impl;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.hedera.block.simulator.grpc.impl.PublishStreamObserver;
import com.hedera.hapi.block.protoc.PublishStreamResponse;
import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,72 @@

package com.hedera.block.simulator.mode;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.*;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.grpc.impl.ConsumerStreamGrpcClientImpl;
import org.mockito.Mock;
import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ConsumerModeHandlerTest {
class ConsumerModeHandlerTest {

@Mock
private BlockStreamConfig blockStreamConfig;
private ConsumerStreamGrpcClient consumerStreamGrpcClient;
private ConsumerModeHandler consumerModeHandler;

@Mock
private ConsumerStreamGrpcClientImpl consumerStreamGrpcClient;
@BeforeEach
void setUp() {
blockStreamConfig = mock(BlockStreamConfig.class);
consumerStreamGrpcClient = mock(ConsumerStreamGrpcClient.class);

private ConsumerModeHandler consumerModeHandler;
consumerModeHandler = new ConsumerModeHandler(blockStreamConfig, consumerStreamGrpcClient);
}

@Test
void testConstructorWithNullArguments() {
assertThrows(NullPointerException.class, () -> new ConsumerModeHandler(null, consumerStreamGrpcClient));
assertThrows(NullPointerException.class, () -> new ConsumerModeHandler(blockStreamConfig, null));
}

@Test
void testInit() {
consumerModeHandler.init();

verify(consumerStreamGrpcClient).init();
}

@Test
void testStart() throws InterruptedException {
consumerModeHandler.start();
verify(consumerStreamGrpcClient).requestBlocks(0, 0);
}

@Test
void testStart_throwsExceptionDuringConsuming() throws InterruptedException {
consumerModeHandler.start();

doThrow(new InterruptedException("Test exception"))
.when(consumerStreamGrpcClient)
.requestBlocks(0, 0);
assertThrows(InterruptedException.class, () -> consumerModeHandler.start());
}

@Test
void testStop() throws InterruptedException {
consumerModeHandler.stop();

verify(consumerStreamGrpcClient).completeStreaming();
verify(consumerStreamGrpcClient).shutdown();
}

@Test
void testStop_throwsExceptionDuringCompleteStreaming() throws InterruptedException {
consumerModeHandler.stop();
doThrow(new InterruptedException("Test exception"))
.when(consumerStreamGrpcClient)
.completeStreaming();

// @Test
// void testStartThrowsUnsupportedOperationException() {
// MockitoAnnotations.openMocks(this);
// consumerModeHandler = new ConsumerModeHandler(consumerStreamGrpcClient, blockStreamConfig);
//
// assertThrows(UnsupportedOperationException.class, () -> consumerModeHandler.start());
// }
assertThrows(InterruptedException.class, () -> consumerModeHandler.stop());
}
}

0 comments on commit a8b4589

Please sign in to comment.