asserts =
+ actual -> assertThat(actual).isGreaterThanOrEqualTo(base).isEqualTo(toTest);
+
+ final long actual = Preconditions.requireGreaterOrEqual(toTest, base);
+ assertThat(actual).satisfies(asserts);
+
+ final long actualOverload = Preconditions.requireGreaterOrEqual(toTest, base, "test error message");
+ assertThat(actualOverload).satisfies(asserts);
+ }
+
+ /**
+ * This test aims to verify that the
+ * {@link Preconditions#requireGreaterOrEqual(long, long)} will throw an
+ * {@link IllegalArgumentException} if the check fails.
+ *
+ * @param toTest parameterized, the number to test
+ */
+ @ParameterizedTest
+ @MethodSource("com.hedera.block.common.CommonsTestUtility#invalidGreaterOrEqualValues")
+ void testRequireGreaterOrEqualFail(final long toTest, final long base) {
+ assertThatIllegalArgumentException().isThrownBy(() -> Preconditions.requireGreaterOrEqual(toTest, base));
+
+ final String testErrorMessage = "test error message";
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> Preconditions.requireGreaterOrEqual(toTest, base, testErrorMessage))
+ .withMessage(testErrorMessage);
+ }
+
/**
* This test aims to verify that the
* {@link Preconditions#requirePositive(int)} will return the input 'toTest'
diff --git a/simulator/docs/configuration.md b/simulator/docs/configuration.md
index dfa78fbb..939ea521 100644
--- a/simulator/docs/configuration.md
+++ b/simulator/docs/configuration.md
@@ -11,6 +11,7 @@ Uses the prefix `blockStream` so all properties should start with `blockStream.`
| Key | Description | Default Value |
|:---|:---|---:|
+| `simulatorMode` | The desired simulator mode to use, it can be either `PUBLISHER` or `CONSUMER`. | `PUBLISHER` |
| `delayBetweenBlockItems` | The delay between each block item in nanoseconds, only applicable when streamingMode=CONSTANT_RATE | `1_500_000` |
| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` |
| `streamingMode` | can either be `CONSTANT_RATE` or `MILLIS_PER_BLOCK` | `CONSTANT_RATE` |
diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java
index 7cdb2205..1bfc3d82 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java
@@ -26,6 +26,7 @@
import com.hedera.block.simulator.config.types.SimulatorMode;
import com.hedera.block.simulator.exception.BlockSimulatorParsingException;
import com.hedera.block.simulator.generator.BlockStreamManager;
+import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.block.simulator.mode.CombinedModeHandler;
@@ -43,98 +44,128 @@
import java.util.logging.Logger;
import javax.inject.Inject;
-/** BlockStream Simulator App */
+/**
+ * The BlockStream Simulator Application manages the lifecycle and coordination
+ * of block streaming
+ * operations. It supports different modes of operation including publishing,
+ * consuming, or both
+ * simultaneously.
+ *
+ *
+ * This class serves as the main entry point for the simulator, handling
+ * initialization,
+ * execution, and shutdown of streaming operations based on the configured mode.
+ */
public class BlockStreamSimulatorApp {
-
private final System.Logger LOGGER = System.getLogger(getClass().getName());
+ // Service dependencies
private final PublishStreamGrpcClient publishStreamGrpcClient;
+ private final ConsumerStreamGrpcClient consumerStreamGrpcClient;
private final SimulatorModeHandler simulatorModeHandler;
+
+ // State
private final AtomicBoolean isRunning = new AtomicBoolean(false);
- private final MetricsService metricsService;
/**
- * Creates a new BlockStreamSimulatorApp instance.
+ * Creates a new BlockStreamSimulatorApp instance with the specified
+ * dependencies.
*
- * @param configuration the configuration to be used by the block stream simulator
- * @param blockStreamManager the block stream manager to be used by the block stream simulator
- * @param publishStreamGrpcClient the gRPC client to be used by the block stream simulator
- * @param metricsService the metrics service to be used by the block stream simulator
+ * @param configuration The configuration to be used by the block
+ * stream simulator
+ * @param blockStreamManager The manager responsible for block stream
+ * generation
+ * @param publishStreamGrpcClient The gRPC client for publishing blocks
+ * @param consumerStreamGrpcClient The gRPC client for consuming blocks
+ * @param metricsService The service for recording metrics
+ * @throws NullPointerException if any parameter is null
+ * @throws IllegalArgumentException if an unknown simulator mode is configured
*/
@Inject
public BlockStreamSimulatorApp(
@NonNull Configuration configuration,
@NonNull BlockStreamManager blockStreamManager,
@NonNull PublishStreamGrpcClient publishStreamGrpcClient,
+ @NonNull ConsumerStreamGrpcClient consumerStreamGrpcClient,
@NonNull MetricsService metricsService) {
requireNonNull(configuration);
requireNonNull(blockStreamManager);
- this.metricsService = requireNonNull(metricsService);
- this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient);
loadLoggingProperties();
+ this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient);
+ this.consumerStreamGrpcClient = requireNonNull(consumerStreamGrpcClient);
+
+ // Initialize the appropriate mode handler based on configuration
final BlockStreamConfig blockStreamConfig =
requireNonNull(configuration.getConfigData(BlockStreamConfig.class));
-
+ // @todo(386) Load simulator mode using dagger
final SimulatorMode simulatorMode = blockStreamConfig.simulatorMode();
- switch (simulatorMode) {
- case PUBLISHER -> simulatorModeHandler = new PublisherModeHandler(
+ this.simulatorModeHandler = switch (simulatorMode) {
+ case PUBLISHER -> new PublisherModeHandler(
blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService);
- case CONSUMER -> simulatorModeHandler = new ConsumerModeHandler(blockStreamConfig);
- case BOTH -> simulatorModeHandler = new CombinedModeHandler(blockStreamConfig);
- default -> throw new IllegalArgumentException("Unknown SimulatorMode: " + simulatorMode);
- }
+ case CONSUMER -> new ConsumerModeHandler(consumerStreamGrpcClient);
+ case BOTH -> new CombinedModeHandler();
+ };
}
/**
- * Starts the block stream simulator.
+ * Initializes and starts the block stream simulator in the configured mode.
+ * This method initializes all components and begins the streaming process.
*
- * @throws InterruptedException if the thread is interrupted
- * @throws BlockSimulatorParsingException if a parse error occurs
- * @throws IOException if an I/O error occurs
+ * @throws InterruptedException if the streaming process is
+ * interrupted
+ * @throws BlockSimulatorParsingException if a block parsing error occurs
+ * @throws IOException if an I/O error occurs during
+ * streaming
*/
public void start() throws InterruptedException, BlockSimulatorParsingException, IOException {
- LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator started initializing components...");
- publishStreamGrpcClient.init();
+ LOGGER.log(INFO, "Block Stream Simulator started initializing components...");
+ simulatorModeHandler.init();
+
isRunning.set(true);
simulatorModeHandler.start();
}
/**
- * Returns whether the block stream simulator is running.
+ * Checks if the simulator is currently running.
*
- * @return true if the block stream simulator is running, false otherwise
+ * @return true if the simulator is running, false otherwise
*/
public boolean isRunning() {
return isRunning.get();
}
/**
- * Stops the Block Stream Simulator and closes off all grpc channels.
+ * Gracefully stops the simulator and closes all gRPC channels.
+ * This method ensures proper cleanup of resources and termination of streaming
+ * operations.
*
- * @throws InterruptedException if the thread is interrupted
+ * @throws InterruptedException if the shutdown process is interrupted
*/
public void stop() throws InterruptedException {
+ // @todo(322) Add real lifecycle to the simulator
simulatorModeHandler.stop();
- publishStreamGrpcClient.completeStreaming();
-
- publishStreamGrpcClient.shutdown();
isRunning.set(false);
LOGGER.log(INFO, "Block Stream Simulator has stopped");
}
/**
- * Gets the stream status from both the publisher and the consumer.
+ * Retrieves the current status of both publishing and consuming streams.
+ * This method provides information about the number of blocks processed and the
+ * last known status of both publisher and consumer operations.
*
- * @return the stream status
+ * @return A StreamStatus object containing current metrics and status
+ * information
*/
public StreamStatus getStreamStatus() {
return StreamStatus.builder()
.publishedBlocks(publishStreamGrpcClient.getPublishedBlocks())
+ .consumedBlocks(consumerStreamGrpcClient.getConsumedBlocks())
.lastKnownPublisherStatuses(publishStreamGrpcClient.getLastKnownStatuses())
+ .lastKnownConsumersStatuses(consumerStreamGrpcClient.getLastKnownStatuses())
.build();
}
diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java
index 42f4aa62..c5a5ab4a 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java
@@ -63,6 +63,13 @@ public class BlockAsDirBlockStreamManager implements BlockStreamManager {
@Inject
public BlockAsDirBlockStreamManager(@NonNull BlockGeneratorConfig blockGeneratorConfig) {
this.rootFolder = blockGeneratorConfig.folderRootPath();
+ }
+
+ /**
+ * Initialize the block stream manager and load blocks into memory.
+ */
+ @Override
+ public void init() {
try {
this.loadBlocks();
} catch (IOException | ParseException | IllegalArgumentException e) {
diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java
index a9339c16..cdb9cfd4 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java
@@ -59,6 +59,13 @@ public class BlockAsFileBlockStreamManager implements BlockStreamManager {
@Inject
public BlockAsFileBlockStreamManager(@NonNull BlockGeneratorConfig blockStreamConfig) {
this.rootFolder = blockStreamConfig.folderRootPath();
+ }
+
+ /**
+ * Initialize the block stream manager and load blocks into memory.
+ */
+ @Override
+ public void init() {
try {
this.loadBlocks();
} catch (IOException | ParseException | IllegalArgumentException e) {
diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockStreamManager.java b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockStreamManager.java
index b6012f06..2f951d91 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockStreamManager.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockStreamManager.java
@@ -25,6 +25,11 @@
/** The block stream manager interface. */
public interface BlockStreamManager {
+ /**
+ * Initialize the block stream manager and load blocks into memory.
+ */
+ default void init() {}
+
/**
* Get the generation mode.
*
diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/ConsumerStreamGrpcClient.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/ConsumerStreamGrpcClient.java
new file mode 100644
index 00000000..c403b871
--- /dev/null
+++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/ConsumerStreamGrpcClient.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.simulator.grpc;
+
+import java.util.List;
+
+/**
+ * Interface defining the contract for a gRPC client that consumes blocks from a stream.
+ * This interface provides methods for initializing, managing, and monitoring block consumption.
+ */
+public interface ConsumerStreamGrpcClient {
+
+ /**
+ * Initializes the gRPC channel and creates the necessary stubs based on configuration.
+ * This method must be called before any streaming operations can begin.
+ */
+ void init();
+
+ /**
+ * Requests a stream of blocks from the server within the specified range.
+ *
+ * @param startBlock The block number to start streaming from (inclusive)
+ * @param endBlock The block number to end streaming at (inclusive). Use 0 for infinite streaming
+ * @throws InterruptedException if the streaming process is interrupted
+ */
+ void requestBlocks(long startBlock, long endBlock) throws InterruptedException;
+
+ /**
+ * Shutdown the channel and signals completion of the streaming process to the server.
+ * This method should be called to gracefully terminate the stream.
+ *
+ * @throws InterruptedException if the completion process is interrupted
+ */
+ void completeStreaming() throws InterruptedException;
+
+ /**
+ * Retrieves the total number of blocks that have been consumed from the stream.
+ *
+ * @return the count of consumed blocks
+ */
+ long getConsumedBlocks();
+
+ /**
+ * Retrieves the most recent status messages received from the server.
+ *
+ * @return a list of status messages in chronological order
+ */
+ List getLastKnownStatuses();
+}
diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java
index 846dcca0..6ae3ec66 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java
@@ -16,6 +16,8 @@
package com.hedera.block.simulator.grpc;
+import com.hedera.block.simulator.grpc.impl.ConsumerStreamGrpcClientImpl;
+import com.hedera.block.simulator.grpc.impl.PublishStreamGrpcClientImpl;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
@@ -36,6 +38,16 @@ public interface GrpcInjectionModule {
@Binds
PublishStreamGrpcClient bindPublishStreamGrpcClient(PublishStreamGrpcClientImpl publishStreamGrpcClient);
+ /**
+ * Binds the ConsumerStreamGrpcClient to the ConsumerStreamGrpcClientImpl.
+ *
+ * @param consumerStreamGrpcClient the ConsumerStreamGrpcClientImpl
+ * @return the ConsumerStreamGrpcClient
+ */
+ @Singleton
+ @Binds
+ ConsumerStreamGrpcClient bindConsumerStreamGrpcClient(ConsumerStreamGrpcClientImpl consumerStreamGrpcClient);
+
/**
* Provides the stream enabled flag
*
diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java
index 0f07cca6..0488a5a6 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java
@@ -68,6 +68,8 @@ public interface PublishStreamGrpcClient {
/**
* Shutdowns the channel.
+ *
+ * @throws InterruptedException if the thread is interrupted
*/
- void shutdown();
+ void shutdown() throws InterruptedException;
}
diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java
new file mode 100644
index 00000000..70e75ec8
--- /dev/null
+++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.simulator.grpc.impl;
+
+import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlocksConsumed;
+import static java.util.Objects.requireNonNull;
+
+import com.hedera.block.common.utils.Preconditions;
+import com.hedera.block.simulator.config.data.GrpcConfig;
+import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
+import com.hedera.block.simulator.metrics.MetricsService;
+import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc;
+import com.hedera.hapi.block.protoc.SubscribeStreamRequest;
+import com.hedera.hapi.block.protoc.SubscribeStreamResponse;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import javax.inject.Inject;
+
+/**
+ * Implementation of {@link ConsumerStreamGrpcClient} that handles the consumption of blocks
+ * via gRPC streaming. This implementation manages the connection to the server and tracks
+ * metrics related to block consumption.
+ */
+public class ConsumerStreamGrpcClientImpl implements ConsumerStreamGrpcClient {
+ // Configuration
+ private final GrpcConfig grpcConfig;
+
+ // Service dependencies
+ private final MetricsService metricsService;
+
+ // gRPC components
+ private ManagedChannel channel;
+ private BlockStreamServiceGrpc.BlockStreamServiceStub stub;
+ private StreamObserver consumerStreamObserver;
+
+ // State
+ private final List lastKnownStatuses;
+ private CountDownLatch streamLatch;
+
+ /**
+ * Constructs a new ConsumerStreamGrpcClientImpl with the specified configuration and metrics service.
+ *
+ * @param grpcConfig The configuration for gRPC connection settings
+ * @param metricsService The service for recording consumption metrics
+ * @throws NullPointerException if any parameter is null
+ */
+ @Inject
+ public ConsumerStreamGrpcClientImpl(
+ @NonNull final GrpcConfig grpcConfig, @NonNull final MetricsService metricsService) {
+ this.grpcConfig = requireNonNull(grpcConfig);
+ this.metricsService = requireNonNull(metricsService);
+ this.lastKnownStatuses = new ArrayList<>();
+ }
+
+ @Override
+ public void init() {
+ channel = ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port())
+ .usePlaintext()
+ .build();
+ stub = BlockStreamServiceGrpc.newStub(channel);
+ lastKnownStatuses.clear();
+ streamLatch = new CountDownLatch(1);
+ }
+
+ @Override
+ public void requestBlocks(long startBlock, long endBlock) throws InterruptedException {
+ Preconditions.requireWhole(startBlock);
+ Preconditions.requireWhole(endBlock);
+ Preconditions.requireGreaterOrEqual(endBlock, startBlock);
+
+ consumerStreamObserver = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses);
+
+ SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder()
+ .setStartBlockNumber(startBlock)
+ .setEndBlockNumber(endBlock)
+ .setAllowUnverified(true)
+ .build();
+ stub.subscribeBlockStream(request, consumerStreamObserver);
+
+ streamLatch.await();
+ }
+
+ @Override
+ public void completeStreaming() {
+ streamLatch.countDown();
+ channel.shutdown();
+ }
+
+ @Override
+ public long getConsumedBlocks() {
+ return metricsService.get(LiveBlocksConsumed).get();
+ }
+
+ @Override
+ public List getLastKnownStatuses() {
+ return List.copyOf(lastKnownStatuses);
+ }
+}
diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java
new file mode 100644
index 00000000..baeb5c92
--- /dev/null
+++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.simulator.grpc.impl;
+
+import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlocksConsumed;
+import static java.lang.System.Logger.Level.ERROR;
+import static java.lang.System.Logger.Level.INFO;
+import static java.util.Objects.requireNonNull;
+
+import com.hedera.block.simulator.metrics.MetricsService;
+import com.hedera.hapi.block.protoc.SubscribeStreamResponse;
+import com.hedera.hapi.block.stream.protoc.BlockItem;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Implementation of StreamObserver that handles responses from the block stream subscription.
+ * This class processes incoming blocks and status messages, updating metrics accordingly.
+ */
+public class ConsumerStreamObserver implements StreamObserver {
+ private final System.Logger LOGGER = System.getLogger(getClass().getName());
+
+ // Service dependencies
+ private final MetricsService metricsService;
+
+ // State
+ private final CountDownLatch streamLatch;
+ private final List lastKnownStatuses;
+
+ /**
+ * Constructs a new ConsumerStreamObserver.
+ *
+ * @param metricsService The service for recording consumption metrics
+ * @param streamLatch A latch used to coordinate stream completion
+ * @param lastKnownStatuses List to store the most recent status messages
+ * @throws NullPointerException if any parameter is null
+ */
+ public ConsumerStreamObserver(
+ @NonNull final MetricsService metricsService,
+ @NonNull final CountDownLatch streamLatch,
+ @NonNull final List lastKnownStatuses) {
+ this.metricsService = requireNonNull(metricsService);
+ this.streamLatch = requireNonNull(streamLatch);
+ this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
+ }
+
+ /**
+ * Processes incoming stream responses, handling both status messages and block items.
+ *
+ * @param subscribeStreamResponse The response received from the server
+ * @throws IllegalArgumentException if an unknown response type is received
+ */
+ @Override
+ public void onNext(SubscribeStreamResponse subscribeStreamResponse) {
+ final SubscribeStreamResponse.ResponseCase responseType = subscribeStreamResponse.getResponseCase();
+ lastKnownStatuses.add(subscribeStreamResponse.toString());
+
+ switch (responseType) {
+ case STATUS -> LOGGER.log(INFO, "Received Response: " + subscribeStreamResponse);
+ case BLOCK_ITEMS -> processBlockItems(
+ subscribeStreamResponse.getBlockItems().getBlockItemsList());
+ default -> throw new IllegalArgumentException("Unknown response type: " + responseType);
+ }
+ }
+
+ /**
+ * Handles stream errors by logging the error and releasing the stream latch.
+ *
+ * @param streamError The error that occurred during streaming
+ */
+ @Override
+ public void onError(Throwable streamError) {
+ Status status = Status.fromThrowable(streamError);
+ lastKnownStatuses.add(status.toString());
+ LOGGER.log(ERROR, "Error %s with status %s.".formatted(streamError, status), streamError);
+ streamLatch.countDown();
+ }
+
+ /**
+ * Handles stream completion by logging the event and releasing the stream latch.
+ */
+ @Override
+ public void onCompleted() {
+ LOGGER.log(INFO, "Subscribe request completed.");
+ streamLatch.countDown();
+ }
+
+ private void processBlockItems(List blockItems) {
+ blockItems.stream().filter(BlockItem::hasBlockProof).forEach(blockItem -> {
+ metricsService.get(LiveBlocksConsumed).increment();
+ LOGGER.log(
+ INFO, "Received block number: " + blockItem.getBlockProof().getBlock());
+ });
+ }
+}
diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java
similarity index 80%
rename from simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java
rename to simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java
index e11b76b5..4954f6b0 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.hedera.block.simulator.grpc;
+package com.hedera.block.simulator.grpc.impl;
import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlockItemsSent;
import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlocksSent;
@@ -26,6 +26,7 @@
import com.hedera.block.common.utils.ChunkUtils;
import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.data.GrpcConfig;
+import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.hapi.block.protoc.BlockItemSet;
import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc;
@@ -42,29 +43,36 @@
import javax.inject.Inject;
/**
- * The PublishStreamGrpcClientImpl class provides the methods to stream the
- * block and block item.
+ * Implementation of {@link PublishStreamGrpcClient} that handles the publication of blocks
+ * via gRPC streaming. This implementation manages the connection to the server, handles
+ * block chunking, and tracks metrics related to block publication.
*/
public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {
-
private final System.Logger LOGGER = System.getLogger(getClass().getName());
- private StreamObserver requestStreamObserver;
+ // Configuration
private final BlockStreamConfig blockStreamConfig;
private final GrpcConfig grpcConfig;
- private final AtomicBoolean streamEnabled;
- private ManagedChannel channel;
+
+ // Service dependencies
private final MetricsService metricsService;
+
+ // gRPC components
+ private ManagedChannel channel;
+ private StreamObserver requestStreamObserver;
+
+ // State
+ private final AtomicBoolean streamEnabled;
private final List lastKnownStatuses = new ArrayList<>();
/**
- * Creates a new PublishStreamGrpcClientImpl instance.
+ * Creates a new PublishStreamGrpcClientImpl with the specified dependencies.
*
- * @param grpcConfig the gRPC configuration
- * @param blockStreamConfig the block stream configuration
- * @param metricsService the metrics service
- * @param streamEnabled the flag responsible for enabling and disabling of
- * the streaming
+ * @param grpcConfig The configuration for gRPC connection settings
+ * @param blockStreamConfig The configuration for block streaming parameters
+ * @param metricsService The service for recording publication metrics
+ * @param streamEnabled Flag controlling stream state
+ * @throws NullPointerException if any parameter is null
*/
@Inject
public PublishStreamGrpcClientImpl(
@@ -79,8 +87,7 @@ public PublishStreamGrpcClientImpl(
}
/**
- * Initialize the channel and stub for publishBlockStream with the desired
- * configuration.
+ * Initializes the gRPC channel and creates the publishing stream.
*/
@Override
public void init() {
@@ -94,13 +101,13 @@ public void init() {
}
/**
- * The PublishStreamObserver class implements the StreamObserver interface to
- * observe the
- * stream.
+ * Streams a list of block items to the server.
+ *
+ * @param blockItems The list of block items to stream
+ * @return true if streaming should continue, false if streaming should stop
*/
@Override
public boolean streamBlockItem(List blockItems) {
-
if (streamEnabled.get()) {
requestStreamObserver.onNext(PublishStreamRequest.newBuilder()
.setBlockItems(BlockItemSet.newBuilder()
@@ -121,13 +128,13 @@ public boolean streamBlockItem(List blockItems) {
}
/**
- * The PublishStreamObserver class implements the StreamObserver interface to
- * observe the
- * stream.
+ * Streams a complete block to the server, chunking it if necessary based on configuration.
+ *
+ * @param block The block to stream
+ * @return true if streaming should continue, false if streaming should stop
*/
@Override
public boolean streamBlock(Block block) {
-
List> streamingBatches =
ChunkUtils.chunkify(block.getItemsList(), blockStreamConfig.blockItemsBatchSize());
for (List streamingBatch : streamingBatches) {
@@ -186,9 +193,12 @@ public List getLastKnownStatuses() {
/**
* Shutdowns the channel.
+ *
+ * @throws InterruptedException if the thread is interrupted
*/
@Override
- public void shutdown() {
+ public void shutdown() throws InterruptedException {
+ completeStreaming();
channel.shutdown();
}
}
diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java
similarity index 60%
rename from simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java
rename to simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java
index a843a16c..94d53c93 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamObserver.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java
@@ -14,8 +14,9 @@
* limitations under the License.
*/
-package com.hedera.block.simulator.grpc;
+package com.hedera.block.simulator.grpc.impl;
+import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;
import static java.util.Objects.requireNonNull;
@@ -23,27 +24,26 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
-import java.lang.System.Logger;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
- * The PublishStreamObserver class provides the methods to observe the stream of
- * the published
- * stream.
+ * Implementation of StreamObserver that handles responses from the block publishing stream.
+ * This class processes server responses and manages the stream state based on server feedback.
*/
public class PublishStreamObserver implements StreamObserver {
+ private final System.Logger LOGGER = System.getLogger(getClass().getName());
- private final Logger logger = System.getLogger(getClass().getName());
+ // State
private final AtomicBoolean streamEnabled;
private final List lastKnownStatuses;
/**
* Creates a new PublishStreamObserver instance.
*
- * @param streamEnabled is responsible for signaling, whether streaming
- * should continue
- * @param lastKnownStatuses the last known statuses
+ * @param streamEnabled Controls whether streaming should continue
+ * @param lastKnownStatuses List to store the most recent status messages
+ * @throws NullPointerException if any parameter is null
*/
public PublishStreamObserver(
@NonNull final AtomicBoolean streamEnabled, @NonNull final List lastKnownStatuses) {
@@ -51,29 +51,37 @@ public PublishStreamObserver(
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
}
- /** what will the stream observer do with the response from the server */
+ /**
+ * Processes responses from the server, storing status information.
+ *
+ * @param publishStreamResponse The response received from the server
+ */
@Override
public void onNext(PublishStreamResponse publishStreamResponse) {
lastKnownStatuses.add(publishStreamResponse.toString());
- logger.log(INFO, "Received Response: " + publishStreamResponse.toString());
+ LOGGER.log(INFO, "Received Response: " + publishStreamResponse.toString());
}
/**
- * Responsible for stream observer behaviour, in case of error. For now, we will
- * stop the stream for every error. In the future we'd want to have a retry
- * mechanism depending on the error.
+ * Handles stream errors by disabling the stream and logging the error.
+ * Currently stops the stream for all errors, but could be enhanced with
+ * retry logic in the future.
+ *
+ * @param streamError The error that occurred during streaming
*/
@Override
public void onError(@NonNull final Throwable streamError) {
streamEnabled.set(false);
Status status = Status.fromThrowable(streamError);
lastKnownStatuses.add(status.toString());
- logger.log(Logger.Level.ERROR, "Error %s with status %s.".formatted(streamError, status), streamError);
+ LOGGER.log(ERROR, "Error %s with status %s.".formatted(streamError, status), streamError);
}
- /** what will the stream observer do when the stream is completed */
+ /**
+ * Handles stream completion by logging the event.
+ */
@Override
public void onCompleted() {
- logger.log(INFO, "Completed");
+ LOGGER.log(INFO, "Completed");
}
}
diff --git a/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java b/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java
index 3f6f0c16..821acacb 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java
@@ -38,7 +38,9 @@ public enum Counter implements SimulatorMetricMetadata {
/** The number of live block items sent by the simulator . */
LiveBlockItemsSent("live_block_items_sent", "Live Block Items Sent"),
/** The number of live blocks sent by the simulator */
- LiveBlocksSent("live_blocks_sent", "Live Blocks Sent");
+ LiveBlocksSent("live_blocks_sent", "Live Blocks Sent"),
+ /** The number of live blocks consumed by the simulator */
+ LiveBlocksConsumed("live_blocks_consumed", "Live Blocks Consumed");
private final String grafanaLabel;
private final String description;
diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java
index a4b092e0..3b918718 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java
@@ -16,10 +16,7 @@
package com.hedera.block.simulator.mode;
-import static java.util.Objects.requireNonNull;
-
import com.hedera.block.simulator.config.data.BlockStreamConfig;
-import edu.umd.cs.findbugs.annotations.NonNull;
/**
* The {@code CombinedModeHandler} class implements the {@link SimulatorModeHandler} interface
@@ -34,23 +31,26 @@
* {@link UnsupportedOperationException}.
*/
public class CombinedModeHandler implements SimulatorModeHandler {
- private final BlockStreamConfig blockStreamConfig;
/**
- * Constructs a new {@code CombinedModeHandler} with the specified block stream configuration.
+ * Constructs a new {@code CombinedModeHandler} with the specified configuration.
+ */
+ public CombinedModeHandler() {}
+
+ /**
+ * Initializes resources for both consuming and publishing blocks.
*
- * @param blockStreamConfig the configuration data for managing block streams
+ * @throws UnsupportedOperationException as this functionality is not yet implemented
*/
- public CombinedModeHandler(@NonNull final BlockStreamConfig blockStreamConfig) {
- this.blockStreamConfig = requireNonNull(blockStreamConfig);
+ @Override
+ public void init() {
+ throw new UnsupportedOperationException();
}
/**
- * Starts the simulator in combined mode, handling both consumption and publication
- * of block stream. However, this method is currently not implemented, and will throw
- * an {@link UnsupportedOperationException}.
+ * Starts both consuming and publishing blocks simultaneously.
*
- * @throws UnsupportedOperationException as the method is not yet implemented
+ * @throws UnsupportedOperationException as this functionality is not yet implemented
*/
@Override
public void start() {
@@ -58,7 +58,9 @@ public void start() {
}
/**
- * Stops the handler and manager from streaming.
+ * Gracefully stops both consumption and publishing of blocks.
+ *
+ * @throws UnsupportedOperationException as this functionality is not yet implemented
*/
@Override
public void stop() {
diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java
index 2b593867..feac24e8 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java
@@ -16,9 +16,11 @@
package com.hedera.block.simulator.mode;
+import static java.lang.System.Logger.Level.INFO;
import static java.util.Objects.requireNonNull;
import com.hedera.block.simulator.config.data.BlockStreamConfig;
+import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
@@ -34,31 +36,49 @@
* {@link UnsupportedOperationException}.
*/
public class ConsumerModeHandler implements SimulatorModeHandler {
+ private final System.Logger LOGGER = System.getLogger(getClass().getName());
- private final BlockStreamConfig blockStreamConfig;
+ // Service dependencies
+ private final ConsumerStreamGrpcClient consumerStreamGrpcClient;
/**
- * Constructs a new {@code ConsumerModeHandler} with the specified block stream configuration.
+ * Constructs a new {@code ConsumerModeHandler} with the specified dependencies.
*
- * @param blockStreamConfig the configuration data for managing block streams
+ * @param consumerStreamGrpcClient The client for consuming blocks via gRPC
+ * @throws NullPointerException if any parameter is null
*/
- public ConsumerModeHandler(@NonNull final BlockStreamConfig blockStreamConfig) {
- this.blockStreamConfig = requireNonNull(blockStreamConfig);
+ public ConsumerModeHandler(@NonNull final ConsumerStreamGrpcClient consumerStreamGrpcClient) {
+ this.consumerStreamGrpcClient = requireNonNull(consumerStreamGrpcClient);
}
/**
- * Starts the simulator and initiate streaming, depending on the working mode.
+ * Initializes the gRPC channel for block consumption.
*/
@Override
- public void start() {
- throw new UnsupportedOperationException();
+ public void init() {
+ consumerStreamGrpcClient.init();
+ LOGGER.log(INFO, "gRPC Channel initialized for consuming blocks.");
}
/**
- * Stops the handler and manager from streaming.
+ * Starts consuming blocks from the stream beginning at genesis (block 0).
+ * Currently, requests an infinite stream of blocks starting from genesis.
+ *
+ * @throws InterruptedException if the consumption process is interrupted
+ */
+ @Override
+ public void start() throws InterruptedException {
+ LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator is starting in consumer mode.");
+ consumerStreamGrpcClient.requestBlocks(0, 0);
+ }
+
+ /**
+ * Gracefully stops block consumption and shuts down the gRPC client.
+ *
+ * @throws InterruptedException if the shutdown process is interrupted
*/
@Override
- public void stop() {
- throw new UnsupportedOperationException();
+ public void stop() throws InterruptedException {
+ consumerStreamGrpcClient.completeStreaming();
}
}
diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java
index ff89b1a3..537e32b0 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java
@@ -47,26 +47,29 @@
*/
public class PublisherModeHandler implements SimulatorModeHandler {
private final System.Logger LOGGER = System.getLogger(getClass().getName());
- private final BlockStreamManager blockStreamManager;
+
+ // Configuration fields
private final BlockStreamConfig blockStreamConfig;
- private final PublishStreamGrpcClient publishStreamGrpcClient;
private final StreamingMode streamingMode;
private final int delayBetweenBlockItems;
private final int millisecondsPerBlock;
+
+ // Service dependencies
+ private final BlockStreamManager blockStreamManager;
+ private final PublishStreamGrpcClient publishStreamGrpcClient;
private final MetricsService metricsService;
- private final AtomicBoolean shouldPublish = new AtomicBoolean(true);
+
+ // State fields
+ private final AtomicBoolean shouldPublish;
/**
- * Constructs a new {@code PublisherModeHandler} with the specified block stream
- * configuration and publisher client.
+ * Constructs a new {@code PublisherModeHandler} with the specified dependencies.
*
- * @param blockStreamConfig the configuration data for managing block
- * streams
- * @param publishStreamGrpcClient the grpc client used for streaming blocks
- * @param blockStreamManager the block stream manager, responsible for
- * generating blocks
- * @param metricsService the metrics service to record and report usage
- * statistics
+ * @param blockStreamConfig The configuration for block streaming parameters
+ * @param publishStreamGrpcClient The client for publishing blocks via gRPC
+ * @param blockStreamManager The manager responsible for block generation
+ * @param metricsService The service for recording metrics
+ * @throws NullPointerException if any parameter is null
*/
public PublisherModeHandler(
@NonNull final BlockStreamConfig blockStreamConfig,
@@ -81,6 +84,13 @@ public PublisherModeHandler(
streamingMode = blockStreamConfig.streamingMode();
delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems();
millisecondsPerBlock = blockStreamConfig.millisecondsPerBlock();
+ shouldPublish = new AtomicBoolean(true);
+ }
+
+ public void init() {
+ blockStreamManager.init();
+ publishStreamGrpcClient.init();
+ LOGGER.log(INFO, "gRPC Channel initialized for publishing blocks.");
}
/**
@@ -95,7 +105,7 @@ public PublisherModeHandler(
*/
@Override
public void start() throws BlockSimulatorParsingException, IOException, InterruptedException {
- LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started streaming.");
+ LOGGER.log(INFO, "Block Stream Simulator is starting in publisher mode.");
if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) {
millisPerBlockStreaming();
} else {
@@ -170,7 +180,8 @@ private void constantRateStreaming() throws InterruptedException, IOException, B
* Stops the handler and manager from streaming.
*/
@Override
- public void stop() {
+ public void stop() throws InterruptedException {
shouldPublish.set(false);
+ publishStreamGrpcClient.shutdown();
}
}
diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java
index 273025e9..4be5c99b 100644
--- a/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java
+++ b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java
@@ -31,17 +31,18 @@
* Publisher mode: The simulator publishes data to the block stream.
* Combined mode: The simulator handles both consuming and publishing.
*
- *
- * The {@code SimulatorModeHandler} is responsible for managing the simulator lifecycle,
- * starting and stopping the streaming of blocks and handling any exceptions that may arise
- * during the process.
*/
public interface SimulatorModeHandler {
/**
- * Starts the simulator and initiates the streaming process, based on the
- * configuration. The behavior
- * of this method depends on the specific working mode (e.g., consumer, publisher, both).
+ * Initializes the handler by setting up required resources and connections.
+ * This method should be called before {@link #start()}.
+ */
+ void init();
+
+ /**
+ * Starts the simulator and initiates the streaming process according to the configured mode.
+ * The behavior of this method depends on the specific working mode (consumer, publisher, or combined).
*
* @throws BlockSimulatorParsingException if an error occurs while parsing blocks
* @throws IOException if an I/O error occurs during block streaming
@@ -50,7 +51,9 @@ public interface SimulatorModeHandler {
void start() throws BlockSimulatorParsingException, IOException, InterruptedException;
/**
- * Stops the handler and manager from streaming.
+ * Gracefully stops the handler, cleaning up resources and terminating any active streams.
+ *
+ * @throws InterruptedException if the shutdown process is interrupted
*/
- void stop();
+ void stop() throws InterruptedException;
}
diff --git a/simulator/src/main/java/module-info.java b/simulator/src/main/java/module-info.java
index 8d7b5299..2da924ff 100644
--- a/simulator/src/main/java/module-info.java
+++ b/simulator/src/main/java/module-info.java
@@ -10,6 +10,7 @@
exports com.hedera.block.simulator.grpc;
exports com.hedera.block.simulator.generator;
exports com.hedera.block.simulator.metrics;
+ exports com.hedera.block.simulator.grpc.impl;
requires static com.github.spotbugs.annotations;
requires static com.google.auto.service;
diff --git a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java
index 86161461..f52fab22 100644
--- a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java
+++ b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java
@@ -33,6 +33,7 @@
import com.hedera.block.simulator.config.data.StreamStatus;
import com.hedera.block.simulator.exception.BlockSimulatorParsingException;
import com.hedera.block.simulator.generator.BlockStreamManager;
+import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.block.simulator.metrics.MetricsServiceImpl;
@@ -65,6 +66,9 @@ class BlockStreamSimulatorTest {
@Mock
private PublishStreamGrpcClient publishStreamGrpcClient;
+ @Mock
+ private ConsumerStreamGrpcClient consumerStreamGrpcClient;
+
private BlockStreamSimulatorApp blockStreamSimulator;
private MetricsService metricsService;
@@ -75,8 +79,8 @@ void setUp() throws IOException {
Map.of("blockStream.maxBlockItemsToStream", "100", "blockStream.streamingMode", "CONSTANT_RATE"));
metricsService = new MetricsServiceImpl(getTestMetrics(configuration));
- blockStreamSimulator =
- new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient, metricsService);
+ blockStreamSimulator = new BlockStreamSimulatorApp(
+ configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
}
@AfterEach
@@ -94,6 +98,26 @@ void start_logsStartedMessage() throws InterruptedException, BlockSimulatorParsi
assertTrue(blockStreamSimulator.isRunning());
}
+ @Test
+ void startPublishing_logsStartedMessage() throws InterruptedException, BlockSimulatorParsingException, IOException {
+ blockStreamSimulator.start();
+ assertTrue(blockStreamSimulator.isRunning());
+ }
+
+ @Test
+ void startConsuming() throws IOException, BlockSimulatorParsingException, InterruptedException {
+ Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "CONSUMER"));
+
+ metricsService = new MetricsServiceImpl(getTestMetrics(configuration));
+ blockStreamSimulator = new BlockStreamSimulatorApp(
+ configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
+ blockStreamSimulator.start();
+
+ verify(consumerStreamGrpcClient).init();
+ verify(consumerStreamGrpcClient).requestBlocks(0, 0);
+ assertTrue(blockStreamSimulator.isRunning());
+ }
+
@Test
void start_constantRateStreaming() throws InterruptedException, BlockSimulatorParsingException, IOException {
@@ -110,8 +134,9 @@ void start_constantRateStreaming() throws InterruptedException, BlockSimulatorPa
BlockStreamManager blockStreamManager = mock(BlockStreamManager.class);
when(blockStreamManager.getNextBlock()).thenReturn(block1, block2, null);
-
Configuration configuration = TestUtils.getTestConfiguration(Map.of(
+ "blockStream.simulatorMode",
+ "PUBLISHER",
"blockStream.maxBlockItemsToStream",
"2",
"generator.managerImplementation",
@@ -123,8 +148,8 @@ void start_constantRateStreaming() throws InterruptedException, BlockSimulatorPa
"blockStream.blockItemsBatchSize",
"2"));
- BlockStreamSimulatorApp blockStreamSimulator =
- new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient, metricsService);
+ BlockStreamSimulatorApp blockStreamSimulator = new BlockStreamSimulatorApp(
+ configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
blockStreamSimulator.start();
assertTrue(blockStreamSimulator.isRunning());
@@ -135,10 +160,28 @@ private String getAbsoluteFolder(String relativePath) {
}
@Test
- void stop_doesNotThrowException() throws InterruptedException {
+ void stopPublishing_doesNotThrowException() throws InterruptedException, IOException {
+ Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "PUBLISHER"));
+
+ metricsService = new MetricsServiceImpl(getTestMetrics(configuration));
+ blockStreamSimulator = new BlockStreamSimulatorApp(
+ configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
+
+ assertDoesNotThrow(() -> blockStreamSimulator.stop());
+ assertFalse(blockStreamSimulator.isRunning());
+ verify(publishStreamGrpcClient, atLeast(1)).shutdown();
+ }
+
+ @Test
+ void stopConsuming_doesNotThrowException() throws InterruptedException, IOException {
+ Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "CONSUMER"));
+
+ metricsService = new MetricsServiceImpl(getTestMetrics(configuration));
+ blockStreamSimulator = new BlockStreamSimulatorApp(
+ configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
assertDoesNotThrow(() -> blockStreamSimulator.stop());
assertFalse(blockStreamSimulator.isRunning());
- verify(publishStreamGrpcClient, atLeast(1)).completeStreaming();
+ verify(consumerStreamGrpcClient, atLeast(1)).completeStreaming();
}
@Test
@@ -151,6 +194,8 @@ void start_millisPerBlockStreaming() throws InterruptedException, IOException, B
when(blockStreamManager.getNextBlock()).thenReturn(block, block, null);
Configuration configuration = TestUtils.getTestConfiguration(Map.of(
+ "blockStream.simulatorMode",
+ "PUBLISHER",
"blockStream.maxBlockItemsToStream",
"2",
"generator.managerImplementation",
@@ -160,8 +205,8 @@ void start_millisPerBlockStreaming() throws InterruptedException, IOException, B
"blockStream.streamingMode",
"MILLIS_PER_BLOCK"));
- BlockStreamSimulatorApp blockStreamSimulator =
- new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient, metricsService);
+ BlockStreamSimulatorApp blockStreamSimulator = new BlockStreamSimulatorApp(
+ configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
blockStreamSimulator.start();
assertTrue(blockStreamSimulator.isRunning());
@@ -178,7 +223,8 @@ void start_millisPerSecond_streamingLagVerifyWarnLog()
when(blockStreamManager.getNextBlock()).thenReturn(block, block, null);
PublishStreamGrpcClient publishStreamGrpcClient = mock(PublishStreamGrpcClient.class);
- // simulate that the first block takes 15ms to stream, when the limit is 10, to force to go
+ // simulate that the first block takes 15ms to stream, when the limit is 10, to
+ // force to go
// over WARN Path.
when(publishStreamGrpcClient.streamBlock(any()))
.thenAnswer(invocation -> {
@@ -188,6 +234,8 @@ void start_millisPerSecond_streamingLagVerifyWarnLog()
.thenReturn(true);
Configuration configuration = TestUtils.getTestConfiguration(Map.of(
+ "blockStream.simulatorMode",
+ "PUBLISHER",
"generator.managerImplementation",
"BlockAsFileBlockStreamManager",
"generator.rootPath",
@@ -201,8 +249,8 @@ void start_millisPerSecond_streamingLagVerifyWarnLog()
"blockStream.blockItemsBatchSize",
"1"));
- BlockStreamSimulatorApp blockStreamSimulator =
- new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient, metricsService);
+ BlockStreamSimulatorApp blockStreamSimulator = new BlockStreamSimulatorApp(
+ configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
List logRecords = captureLogs();
blockStreamSimulator.start();
@@ -217,16 +265,8 @@ void start_millisPerSecond_streamingLagVerifyWarnLog()
@Test
void start_withBothMode_throwsUnsupportedOperationException() throws Exception {
Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "BOTH"));
- blockStreamSimulator =
- new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient, metricsService);
- assertThrows(UnsupportedOperationException.class, () -> blockStreamSimulator.start());
- }
-
- @Test
- void start_withConsumerMode_throwsUnsupportedOperationException() throws Exception {
- Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "CONSUMER"));
- blockStreamSimulator =
- new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient, metricsService);
+ blockStreamSimulator = new BlockStreamSimulatorApp(
+ configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService);
assertThrows(UnsupportedOperationException.class, () -> blockStreamSimulator.start());
}
@@ -239,7 +279,12 @@ void constructor_throwsExceptionForNullSimulatorMode() {
when(blockStreamConfig.simulatorMode()).thenReturn(null);
assertThrows(NullPointerException.class, () -> {
- new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient, metricsService);
+ new BlockStreamSimulatorApp(
+ configuration,
+ blockStreamManager,
+ publishStreamGrpcClient,
+ consumerStreamGrpcClient,
+ metricsService);
});
}
diff --git a/simulator/src/test/java/com/hedera/block/simulator/TestUtils.java b/simulator/src/test/java/com/hedera/block/simulator/TestUtils.java
index 2b5865ec..60a16391 100644
--- a/simulator/src/test/java/com/hedera/block/simulator/TestUtils.java
+++ b/simulator/src/test/java/com/hedera/block/simulator/TestUtils.java
@@ -24,6 +24,7 @@
import com.swirlds.metrics.api.Metrics;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
+import java.net.ServerSocket;
import java.nio.file.Path;
import java.util.Map;
@@ -55,4 +56,10 @@ public static Metrics getTestMetrics(@NonNull Configuration configuration) {
metricsProvider.start();
return metrics;
}
+
+ public static int findFreePort() throws IOException {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ }
+ }
}
diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java
index 54236fd7..430a2795 100644
--- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java
+++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java
@@ -46,6 +46,7 @@ void getGenerationMode() {
@Test
void getNextBlockItem() throws IOException, BlockSimulatorParsingException {
BlockStreamManager blockStreamManager = getBlockAsDirBlockStreamManager(getAbsoluteFolder(rootFolder));
+ blockStreamManager.init();
for (int i = 0; i < 1000; i++) {
assertNotNull(blockStreamManager.getNextBlockItem());
@@ -55,6 +56,7 @@ void getNextBlockItem() throws IOException, BlockSimulatorParsingException {
@Test
void getNextBlock() throws IOException, BlockSimulatorParsingException {
BlockStreamManager blockStreamManager = getBlockAsDirBlockStreamManager(getAbsoluteFolder(rootFolder));
+ blockStreamManager.init();
for (int i = 0; i < 3000; i++) {
assertNotNull(blockStreamManager.getNextBlock());
@@ -71,7 +73,8 @@ void BlockAsFileBlockStreamManagerInvalidRootPath() {
private BlockStreamManager getBlockAsDirBlockStreamManager(String rootFolder) {
final BlockGeneratorConfig blockGeneratorConfig = new BlockGeneratorConfig(
GenerationMode.DIR, rootFolder, "BlockAsDirBlockStreamManager", 36, ".blk", 0, 0);
-
- return new BlockAsDirBlockStreamManager(blockGeneratorConfig);
+ final BlockStreamManager blockStreamManager = new BlockAsDirBlockStreamManager(blockGeneratorConfig);
+ blockStreamManager.init();
+ return blockStreamManager;
}
}
diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java
index 882db4c0..685e2b23 100644
--- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java
+++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java
@@ -23,27 +23,31 @@
import com.hedera.block.simulator.exception.BlockSimulatorParsingException;
import java.io.IOException;
import java.nio.file.Paths;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class BlockAsFileBlockStreamManagerTest {
private final String gzRootFolder = "src/main/resources/block-0.0.3/";
+ private BlockStreamManager blockStreamManager;
private String getAbsoluteFolder(String relativePath) {
return Paths.get(relativePath).toAbsolutePath().toString();
}
+ @BeforeEach
+ void setUp() {
+ blockStreamManager = getBlockAsFileBlockStreamManager(getAbsoluteFolder(gzRootFolder));
+ blockStreamManager.init();
+ }
+
@Test
void getGenerationMode() {
- BlockStreamManager blockStreamManager =
- getBlockAsFileBlockStreamManager(getAbsoluteFolder(gzRootFolder));
assertEquals(GenerationMode.DIR, blockStreamManager.getGenerationMode());
}
@Test
void getNextBlock() throws IOException, BlockSimulatorParsingException {
- BlockStreamManager blockStreamManager =
- getBlockAsFileBlockStreamManager(getAbsoluteFolder(gzRootFolder));
for (int i = 0; i < 3000; i++) {
assertNotNull(blockStreamManager.getNextBlock());
}
@@ -51,8 +55,6 @@ void getNextBlock() throws IOException, BlockSimulatorParsingException {
@Test
void getNextBlockItem() throws IOException, BlockSimulatorParsingException {
- BlockStreamManager blockStreamManager =
- getBlockAsFileBlockStreamManager(getAbsoluteFolder(gzRootFolder));
for (int i = 0; i < 35000; i++) {
assertNotNull(blockStreamManager.getNextBlockItem());
}
@@ -61,8 +63,9 @@ void getNextBlockItem() throws IOException, BlockSimulatorParsingException {
@Test
void loadBlockBlk() throws IOException, BlockSimulatorParsingException {
String blkRootFolder = "src/test/resources/block-0.0.3-blk/";
- BlockStreamManager blockStreamManager =
- getBlockAsFileBlockStreamManager(getAbsoluteFolder(blkRootFolder));
+ BlockStreamManager blockStreamManager = getBlockAsFileBlockStreamManager(getAbsoluteFolder(blkRootFolder));
+ blockStreamManager.init();
+
assertNotNull(blockStreamManager.getNextBlock());
}
@@ -70,22 +73,20 @@ void loadBlockBlk() throws IOException, BlockSimulatorParsingException {
void BlockAsFileBlockStreamManagerInvalidRootPath() {
assertThrows(
RuntimeException.class,
- () ->
- getBlockAsFileBlockStreamManager(
- getAbsoluteFolder("src/test/resources/BlockAsDirException/1/")));
+ () -> getBlockAsFileBlockStreamManager(getAbsoluteFolder("src/test/resources/BlockAsDirException/1/")));
}
- private BlockAsFileBlockStreamManager getBlockAsFileBlockStreamManager(String rootFolder) {
-
- BlockGeneratorConfig blockGeneratorConfig =
- BlockGeneratorConfig.builder()
- .generationMode(GenerationMode.DIR)
- .folderRootPath(rootFolder)
- .managerImplementation("BlockAsFileBlockStreamManager")
- .paddedLength(36)
- .fileExtension(".blk")
- .build();
+ private BlockStreamManager getBlockAsFileBlockStreamManager(String rootFolder) {
+ BlockGeneratorConfig blockGeneratorConfig = BlockGeneratorConfig.builder()
+ .generationMode(GenerationMode.DIR)
+ .folderRootPath(rootFolder)
+ .managerImplementation("BlockAsFileBlockStreamManager")
+ .paddedLength(36)
+ .fileExtension(".blk")
+ .build();
- return new BlockAsFileBlockStreamManager(blockGeneratorConfig);
+ BlockStreamManager blockStreamManager = new BlockAsFileBlockStreamManager(blockGeneratorConfig);
+ blockStreamManager.init();
+ return blockStreamManager;
}
}
diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java
deleted file mode 100644
index 64ec0f7d..00000000
--- a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Copyright (C) 2024 Hedera Hashgraph, LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.hedera.block.simulator.grpc;
-
-import static com.hedera.block.simulator.TestUtils.getTestMetrics;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import com.hedera.block.simulator.TestUtils;
-import com.hedera.block.simulator.config.data.BlockStreamConfig;
-import com.hedera.block.simulator.config.data.GrpcConfig;
-import com.hedera.block.simulator.metrics.MetricsService;
-import com.hedera.block.simulator.metrics.MetricsServiceImpl;
-import com.hedera.hapi.block.stream.protoc.Block;
-import com.hedera.hapi.block.stream.protoc.BlockItem;
-import com.swirlds.config.api.Configuration;
-import io.grpc.ManagedChannel;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-class PublishStreamGrpcClientImplTest {
-
- private MetricsService metricsService;
-
- GrpcConfig grpcConfig;
- BlockStreamConfig blockStreamConfig;
- AtomicBoolean streamEnabled;
-
- @BeforeEach
- void setUp() throws IOException {
-
- grpcConfig = TestUtils.getTestConfiguration().getConfigData(GrpcConfig.class);
- blockStreamConfig = TestUtils.getTestConfiguration(Map.of("blockStream.blockItemsBatchSize", "2"))
- .getConfigData(BlockStreamConfig.class);
-
- Configuration config = TestUtils.getTestConfiguration();
- metricsService = new MetricsServiceImpl(getTestMetrics(config));
- streamEnabled = new AtomicBoolean(true);
- }
-
- @AfterEach
- void tearDown() {}
-
- @Test
- void streamBlockItem() {
- BlockItem blockItem = BlockItem.newBuilder().build();
- PublishStreamGrpcClientImpl publishStreamGrpcClient =
- new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
- publishStreamGrpcClient.init();
- boolean result = publishStreamGrpcClient.streamBlockItem(List.of(blockItem));
- assertTrue(result);
- }
-
- @Test
- void streamBlock() {
- BlockItem blockItem = BlockItem.newBuilder().build();
- Block block = Block.newBuilder().addItems(blockItem).build();
-
- Block block1 = Block.newBuilder()
- .addItems(blockItem)
- .addItems(blockItem)
- .addItems(blockItem)
- .build();
-
- PublishStreamGrpcClientImpl publishStreamGrpcClient =
- new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
-
- publishStreamGrpcClient.init();
- assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty());
-
- boolean result = publishStreamGrpcClient.streamBlock(block);
- assertTrue(result);
-
- boolean result1 = publishStreamGrpcClient.streamBlock(block1);
- assertTrue(result1);
-
- assertEquals(2, publishStreamGrpcClient.getPublishedBlocks());
- }
-
- @Test
- void streamBlockFailsBecauseOfCompletedStreaming() throws InterruptedException {
- BlockItem blockItem = BlockItem.newBuilder().build();
- Block block = Block.newBuilder().addItems(blockItem).build();
-
- PublishStreamGrpcClientImpl publishStreamGrpcClient =
- new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
-
- publishStreamGrpcClient.init();
- assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty());
-
- publishStreamGrpcClient.completeStreaming();
-
- assertThrows(IllegalStateException.class, () -> publishStreamGrpcClient.streamBlock(block));
- }
-
- @Test
- void streamBlockReturnsFalse() {
- BlockItem blockItem = BlockItem.newBuilder().build();
- Block block = Block.newBuilder().addItems(blockItem).build();
- streamEnabled.set(false);
- PublishStreamGrpcClientImpl publishStreamGrpcClient =
- new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
- publishStreamGrpcClient.init();
-
- boolean result = publishStreamGrpcClient.streamBlock(block);
- assertFalse(result);
- }
-
- @Test
- void testShutdown() throws Exception {
- PublishStreamGrpcClientImpl publishStreamGrpcClient =
- new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
- publishStreamGrpcClient.init();
-
- Field channelField = PublishStreamGrpcClientImpl.class.getDeclaredField("channel");
- ManagedChannel mockChannel = mock(ManagedChannel.class);
-
- try {
- channelField.setAccessible(true);
- channelField.set(publishStreamGrpcClient, mockChannel);
- } finally {
- channelField.setAccessible(false);
- }
- publishStreamGrpcClient.shutdown();
-
- // Verify that channel.shutdown() was called
- verify(mockChannel).shutdown();
- }
-}
diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java
new file mode 100644
index 00000000..089f073c
--- /dev/null
+++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.simulator.grpc.impl;
+
+import static com.hedera.block.simulator.TestUtils.findFreePort;
+import static com.hedera.block.simulator.TestUtils.getTestMetrics;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import com.hedera.block.simulator.TestUtils;
+import com.hedera.block.simulator.config.data.GrpcConfig;
+import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
+import com.hedera.block.simulator.metrics.MetricsService;
+import com.hedera.block.simulator.metrics.MetricsServiceImpl;
+import com.hedera.hapi.block.protoc.BlockItemSet;
+import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc;
+import com.hedera.hapi.block.protoc.SubscribeStreamRequest;
+import com.hedera.hapi.block.protoc.SubscribeStreamResponse;
+import com.hedera.hapi.block.protoc.SubscribeStreamResponseCode;
+import com.hedera.hapi.block.stream.output.protoc.BlockHeader;
+import com.hedera.hapi.block.stream.protoc.BlockItem;
+import com.hedera.hapi.block.stream.protoc.BlockProof;
+import com.swirlds.config.api.Configuration;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class ConsumerStreamGrpcClientImplTest {
+ @Mock
+ private GrpcConfig grpcConfig;
+
+ private ConsumerStreamGrpcClient consumerStreamGrpcClientImpl;
+ private Server server;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ MockitoAnnotations.openMocks(this);
+ final int serverPort = findFreePort();
+ server = ServerBuilder.forPort(serverPort)
+ .addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() {
+ @Override
+ public void subscribeBlockStream(
+ SubscribeStreamRequest request, StreamObserver responseObserver) {
+
+ // Simulate streaming blocks
+ long startBlock = request.getStartBlockNumber();
+ long endBlock = request.getEndBlockNumber();
+
+ for (long i = startBlock; i < endBlock; i++) {
+ // Simulate block items
+ BlockItem blockItemHeader = BlockItem.newBuilder()
+ .setBlockHeader(BlockHeader.newBuilder()
+ .setNumber(i)
+ .build())
+ .build();
+ BlockItem blockItemProof = BlockItem.newBuilder()
+ .setBlockProof(
+ BlockProof.newBuilder().setBlock(i).build())
+ .build();
+
+ BlockItemSet blockItems = BlockItemSet.newBuilder()
+ .addBlockItems(blockItemHeader)
+ .addBlockItems(blockItemProof)
+ .build();
+
+ responseObserver.onNext(SubscribeStreamResponse.newBuilder()
+ .setBlockItems(blockItems)
+ .build());
+ }
+
+ // Send success status code at the end
+ responseObserver.onNext(SubscribeStreamResponse.newBuilder()
+ .setStatus(SubscribeStreamResponseCode.READ_STREAM_SUCCESS)
+ .build());
+ responseObserver.onCompleted();
+ }
+ })
+ .build()
+ .start();
+
+ when(grpcConfig.serverAddress()).thenReturn("localhost");
+ when(grpcConfig.port()).thenReturn(serverPort);
+
+ final Configuration config = TestUtils.getTestConfiguration();
+ final MetricsService metricsService = new MetricsServiceImpl(getTestMetrics(config));
+ consumerStreamGrpcClientImpl = new ConsumerStreamGrpcClientImpl(grpcConfig, metricsService);
+ consumerStreamGrpcClientImpl.init();
+ }
+
+ @AfterEach
+ public void tearDown() throws InterruptedException {
+ consumerStreamGrpcClientImpl.completeStreaming();
+ server.shutdownNow();
+ }
+
+ @Test
+ public void testInit() {
+ assertTrue(consumerStreamGrpcClientImpl.getLastKnownStatuses().isEmpty());
+ }
+
+ @Test
+ void requestBlocks_Success() throws InterruptedException {
+ final long startBlock = 0;
+ final long endBlock = 5;
+
+ assertEquals(startBlock, consumerStreamGrpcClientImpl.getConsumedBlocks());
+ assertTrue(consumerStreamGrpcClientImpl.getLastKnownStatuses().isEmpty());
+
+ consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock);
+
+ // We check if the final status matches what we have send from the server.
+ final String lastStatus =
+ consumerStreamGrpcClientImpl.getLastKnownStatuses().getLast();
+ assertTrue(lastStatus.contains("status: %s".formatted(SubscribeStreamResponseCode.READ_STREAM_SUCCESS.name())));
+
+ assertEquals(endBlock, consumerStreamGrpcClientImpl.getConsumedBlocks());
+ }
+
+ @Test
+ void requestBlocks_InvalidStartBlock() {
+ final long startBlock = -1;
+ final long endBlock = 5;
+
+ assertThrows(
+ IllegalArgumentException.class, () -> consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock));
+ }
+
+ @Test
+ void requestBlocks_InvalidEndBlock() {
+ final long startBlock = 0;
+ final long endBlock = -1;
+
+ assertThrows(
+ IllegalArgumentException.class, () -> consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock));
+ }
+
+ @Test
+ void completeStreaming_Success() throws InterruptedException {
+ final long startBlock = 0;
+ final long endBlock = 5;
+
+ consumerStreamGrpcClientImpl.requestBlocks(startBlock, endBlock);
+ consumerStreamGrpcClientImpl.completeStreaming();
+ }
+}
diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java
new file mode 100644
index 00000000..dd665d1e
--- /dev/null
+++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.simulator.grpc.impl;
+
+import static com.hedera.block.simulator.TestUtils.getTestMetrics;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import com.hedera.block.simulator.TestUtils;
+import com.hedera.block.simulator.metrics.MetricsService;
+import com.hedera.block.simulator.metrics.MetricsServiceImpl;
+import com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter;
+import com.hedera.hapi.block.protoc.BlockItemSet;
+import com.hedera.hapi.block.protoc.SubscribeStreamResponse;
+import com.hedera.hapi.block.protoc.SubscribeStreamResponseCode;
+import com.hedera.hapi.block.stream.output.protoc.BlockHeader;
+import com.hedera.hapi.block.stream.protoc.BlockItem;
+import com.hedera.hapi.block.stream.protoc.BlockProof;
+import com.swirlds.config.api.Configuration;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ConsumerStreamObserverTest {
+
+ private MetricsService metricsService;
+ private CountDownLatch streamLatch;
+ private List lastKnownStatuses;
+ private ConsumerStreamObserver observer;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ Configuration config = TestUtils.getTestConfiguration();
+
+ metricsService = spy(new MetricsServiceImpl(getTestMetrics(config)));
+ streamLatch = mock(CountDownLatch.class);
+ List lastKnownStatuses = new ArrayList<>();
+
+ observer = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses);
+ }
+
+ @Test
+ void testConstructorWithNullArguments() {
+ assertThrows(
+ NullPointerException.class, () -> new ConsumerStreamObserver(null, streamLatch, lastKnownStatuses));
+ assertThrows(
+ NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, null, lastKnownStatuses));
+ assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, streamLatch, null));
+ }
+
+ @Test
+ void testOnNextWithStatusResponse() {
+ SubscribeStreamResponse response = SubscribeStreamResponse.newBuilder()
+ .setStatus(SubscribeStreamResponseCode.READ_STREAM_SUCCESS)
+ .build();
+
+ observer.onNext(response);
+
+ verifyNoInteractions(metricsService);
+ verifyNoInteractions(streamLatch);
+ }
+
+ @Test
+ void testOnNextWithBlockItemsResponse() {
+ BlockItem blockItemHeader = BlockItem.newBuilder()
+ .setBlockHeader(BlockHeader.newBuilder().setNumber(0).build())
+ .build();
+ BlockItem blockItemProof = BlockItem.newBuilder()
+ .setBlockProof(BlockProof.newBuilder().setBlock(0).build())
+ .build();
+ BlockItem blockItemProof1 = BlockItem.newBuilder()
+ .setBlockProof(BlockProof.newBuilder().setBlock(1).build())
+ .build();
+
+ BlockItemSet blockItemsSet = BlockItemSet.newBuilder()
+ .addBlockItems(blockItemHeader)
+ .addBlockItems(blockItemProof)
+ .addBlockItems(blockItemProof1)
+ .build();
+
+ SubscribeStreamResponse response = SubscribeStreamResponse.newBuilder()
+ .setBlockItems(blockItemsSet)
+ .build();
+ assertEquals(0, metricsService.get(Counter.LiveBlocksConsumed).get());
+
+ observer.onNext(response);
+
+ assertEquals(2, metricsService.get(Counter.LiveBlocksConsumed).get());
+ verifyNoInteractions(streamLatch);
+ }
+
+ @Test
+ void testOnNextWithUnknownResponseType() {
+ SubscribeStreamResponse response = SubscribeStreamResponse.newBuilder().build();
+
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () -> observer.onNext(response));
+
+ assertEquals("Unknown response type: RESPONSE_NOT_SET", exception.getMessage());
+ verifyNoInteractions(metricsService);
+ verifyNoInteractions(streamLatch);
+ }
+
+ @Test
+ void testOnError() {
+ Throwable testException = new RuntimeException("Test exception");
+
+ observer.onError(testException);
+
+ verify(streamLatch).countDown();
+ verifyNoInteractions(metricsService);
+ }
+
+ @Test
+ void testOnCompleted() {
+ observer.onCompleted();
+
+ verify(streamLatch).countDown();
+ verifyNoInteractions(metricsService);
+ }
+}
diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java
new file mode 100644
index 00000000..fa98846c
--- /dev/null
+++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright (C) 2024 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hedera.block.simulator.grpc.impl;
+
+import static com.hedera.block.simulator.TestUtils.findFreePort;
+import static com.hedera.block.simulator.TestUtils.getTestMetrics;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import com.hedera.block.simulator.TestUtils;
+import com.hedera.block.simulator.config.data.BlockStreamConfig;
+import com.hedera.block.simulator.config.data.GrpcConfig;
+import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
+import com.hedera.block.simulator.metrics.MetricsService;
+import com.hedera.block.simulator.metrics.MetricsServiceImpl;
+import com.hedera.hapi.block.protoc.BlockItemSet;
+import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc;
+import com.hedera.hapi.block.protoc.PublishStreamRequest;
+import com.hedera.hapi.block.protoc.PublishStreamResponse;
+import com.hedera.hapi.block.protoc.PublishStreamResponseCode;
+import com.hedera.hapi.block.stream.output.protoc.BlockHeader;
+import com.hedera.hapi.block.stream.protoc.Block;
+import com.hedera.hapi.block.stream.protoc.BlockItem;
+import com.hedera.hapi.block.stream.protoc.BlockProof;
+import com.swirlds.config.api.Configuration;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+class PublishStreamGrpcClientImplTest {
+
+ private MetricsService metricsService;
+ private PublishStreamGrpcClient publishStreamGrpcClient;
+
+ @Mock
+ private GrpcConfig grpcConfig;
+
+ private BlockStreamConfig blockStreamConfig;
+ private AtomicBoolean streamEnabled;
+ private Server server;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ MockitoAnnotations.openMocks(this);
+
+ streamEnabled = new AtomicBoolean(true);
+
+ final int serverPort = findFreePort();
+ server = ServerBuilder.forPort(serverPort)
+ .addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() {
+ @Override
+ public StreamObserver publishBlockStream(
+ StreamObserver responseObserver) {
+ return new StreamObserver<>() {
+ private long lastBlockNumber = 0;
+
+ @Override
+ public void onNext(PublishStreamRequest request) {
+ BlockItemSet blockItems = request.getBlockItems();
+ List items = blockItems.getBlockItemsList();
+ // Simulate processing of block items
+ for (BlockItem item : items) {
+ // Assume that the first BlockItem is a BlockHeader
+ if (item.hasBlockHeader()) {
+ lastBlockNumber = item.getBlockHeader().getNumber();
+ }
+ // Assume that the last BlockItem is a BlockProof
+ if (item.hasBlockProof()) {
+ // Send BlockAcknowledgement
+ PublishStreamResponse.Acknowledgement acknowledgement =
+ PublishStreamResponse.Acknowledgement.newBuilder()
+ .setBlockAck(
+ PublishStreamResponse.BlockAcknowledgement.newBuilder()
+ .setBlockNumber(lastBlockNumber)
+ .build())
+ .build();
+ responseObserver.onNext(PublishStreamResponse.newBuilder()
+ .setAcknowledgement(acknowledgement)
+ .build());
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // handle onError
+ }
+
+ @Override
+ public void onCompleted() {
+ PublishStreamResponse.EndOfStream endOfStream =
+ PublishStreamResponse.EndOfStream.newBuilder()
+ .setStatus(PublishStreamResponseCode.STREAM_ITEMS_SUCCESS)
+ .setBlockNumber(lastBlockNumber)
+ .build();
+ responseObserver.onNext(PublishStreamResponse.newBuilder()
+ .setStatus(endOfStream)
+ .build());
+ responseObserver.onCompleted();
+ }
+ };
+ }
+ })
+ .build()
+ .start();
+ blockStreamConfig = TestUtils.getTestConfiguration(Map.of("blockStream.blockItemsBatchSize", "2"))
+ .getConfigData(BlockStreamConfig.class);
+
+ Configuration config = TestUtils.getTestConfiguration();
+ metricsService = new MetricsServiceImpl(getTestMetrics(config));
+ streamEnabled = new AtomicBoolean(true);
+
+ when(grpcConfig.serverAddress()).thenReturn("localhost");
+ when(grpcConfig.port()).thenReturn(serverPort);
+
+ publishStreamGrpcClient =
+ new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService, streamEnabled);
+ }
+
+ @AfterEach
+ void teardown() throws InterruptedException {
+ publishStreamGrpcClient.shutdown();
+
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+
+ @Test
+ public void testInit() {
+ publishStreamGrpcClient.init();
+ // Verify that lastKnownStatuses is cleared
+ assertTrue(publishStreamGrpcClient.getLastKnownStatuses().isEmpty());
+ }
+
+ @Test
+ void testStreamBlockItem_Success() {
+ publishStreamGrpcClient.init();
+
+ BlockItem blockItem = BlockItem.newBuilder()
+ .setBlockHeader(BlockHeader.newBuilder().setNumber(0).build())
+ .build();
+
+ List blockItems = List.of(blockItem);
+
+ final boolean result = publishStreamGrpcClient.streamBlockItem(blockItems);
+ assertTrue(result);
+ }
+
+ @Test
+ void testStreamBlock_Success() throws InterruptedException {
+ publishStreamGrpcClient.init();
+ final int streamedBlocks = 3;
+
+ for (int i = 0; i < streamedBlocks; i++) {
+ BlockItem blockItemHeader = BlockItem.newBuilder()
+ .setBlockHeader(BlockHeader.newBuilder().setNumber(i).build())
+ .build();
+ BlockItem blockItemProof = BlockItem.newBuilder()
+ .setBlockProof(BlockProof.newBuilder().setBlock(i).build())
+ .build();
+ Block block = Block.newBuilder()
+ .addItems(blockItemHeader)
+ .addItems(blockItemProof)
+ .build();
+
+ final boolean result = publishStreamGrpcClient.streamBlock(block);
+ assertTrue(result);
+ }
+
+ // we use simple retry mechanism here, because sometimes server takes some time to receive the stream
+ long retryNumber = 1;
+ long waitTime = 500;
+
+ while (retryNumber < 3) {
+ if (!publishStreamGrpcClient.getLastKnownStatuses().isEmpty()) {
+ break;
+ }
+ Thread.sleep(retryNumber * waitTime);
+ retryNumber++;
+ }
+
+ assertEquals(streamedBlocks, publishStreamGrpcClient.getPublishedBlocks());
+ assertEquals(
+ streamedBlocks, publishStreamGrpcClient.getLastKnownStatuses().size());
+ }
+}
diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserverTest.java
similarity index 98%
rename from simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java
rename to simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserverTest.java
index 967c3c3e..a41e9c59 100644
--- a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamObserverTest.java
+++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserverTest.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.hedera.block.simulator.grpc;
+package com.hedera.block.simulator.grpc.impl;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java
index 0a522cf8..b4ed067a 100644
--- a/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java
+++ b/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java
@@ -18,22 +18,15 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
-import com.hedera.block.simulator.config.data.BlockStreamConfig;
import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
public class CombinedModeHandlerTest {
- @Mock
- private BlockStreamConfig blockStreamConfig;
-
private CombinedModeHandler combinedModeHandler;
@Test
void testStartThrowsUnsupportedOperationException() {
- MockitoAnnotations.openMocks(this);
- combinedModeHandler = new CombinedModeHandler(blockStreamConfig);
+ combinedModeHandler = new CombinedModeHandler();
assertThrows(UnsupportedOperationException.class, () -> combinedModeHandler.start());
}
diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java
index df387f21..017e5cd1 100644
--- a/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java
+++ b/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java
@@ -17,24 +17,68 @@
package com.hedera.block.simulator.mode;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
-import com.hedera.block.simulator.config.data.BlockStreamConfig;
+import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-public class ConsumerModeHandlerTest {
-
- @Mock
- private BlockStreamConfig blockStreamConfig;
+class ConsumerModeHandlerTest {
+ private ConsumerStreamGrpcClient consumerStreamGrpcClient;
private ConsumerModeHandler consumerModeHandler;
+ @BeforeEach
+ void setUp() {
+ consumerStreamGrpcClient = mock(ConsumerStreamGrpcClient.class);
+
+ consumerModeHandler = new ConsumerModeHandler(consumerStreamGrpcClient);
+ }
+
+ @Test
+ void testConstructorWithNullArguments() {
+ assertThrows(NullPointerException.class, () -> new ConsumerModeHandler(null));
+ }
+
+ @Test
+ void testInit() {
+ consumerModeHandler.init();
+
+ verify(consumerStreamGrpcClient).init();
+ }
+
+ @Test
+ void testStart() throws InterruptedException {
+ consumerModeHandler.start();
+ verify(consumerStreamGrpcClient).requestBlocks(0, 0);
+ }
+
+ @Test
+ void testStart_throwsExceptionDuringConsuming() throws InterruptedException {
+ consumerModeHandler.start();
+
+ doThrow(new InterruptedException("Test exception"))
+ .when(consumerStreamGrpcClient)
+ .requestBlocks(0, 0);
+ assertThrows(InterruptedException.class, () -> consumerModeHandler.start());
+ }
+
+ @Test
+ void testStop() throws InterruptedException {
+ consumerModeHandler.stop();
+
+ verify(consumerStreamGrpcClient).completeStreaming();
+ }
+
@Test
- void testStartThrowsUnsupportedOperationException() {
- MockitoAnnotations.openMocks(this);
- consumerModeHandler = new ConsumerModeHandler(blockStreamConfig);
+ void testStop_throwsExceptionDuringCompleteStreaming() throws InterruptedException {
+ consumerModeHandler.stop();
+ doThrow(new InterruptedException("Test exception"))
+ .when(consumerStreamGrpcClient)
+ .completeStreaming();
- assertThrows(UnsupportedOperationException.class, () -> consumerModeHandler.start());
+ assertThrows(InterruptedException.class, () -> consumerModeHandler.stop());
}
}
diff --git a/stream/src/main/java/module-info.java b/stream/src/main/java/module-info.java
index f0e605cf..403a6aa4 100644
--- a/stream/src/main/java/module-info.java
+++ b/stream/src/main/java/module-info.java
@@ -67,6 +67,8 @@
exports com.hedera.hapi.platform.state;
exports com.hedera.hapi.node.state.roster;
exports com.hedera.hapi.block.stream.schema;
+ exports com.hedera.hapi.platform.state.legacy to
+ com.google.protobuf;
requires transitive com.google.common;
requires transitive com.google.protobuf;