From b6c93e6ea292b2e68de6ba45165ee644aaabb34f Mon Sep 17 00:00:00 2001 From: Georgi Lazarov Date: Tue, 29 Oct 2024 17:43:33 +0200 Subject: [PATCH] feat: Move out concrete working implementation and add working modes (#307) Signed-off-by: georgi-l95 --- .../block/simulator/BlockStreamSimulator.java | 2 +- .../simulator/BlockStreamSimulatorApp.java | 117 +++------- .../com/hedera/block/simulator/Constants.java | 5 + .../config/data/BlockStreamConfig.java | 16 ++ .../simulator/config/types/SimulatorMode.java | 33 +++ .../grpc/PublishStreamGrpcClient.java | 10 + .../grpc/PublishStreamGrpcClientImpl.java | 46 ++-- .../simulator/mode/CombinedModeHandler.java | 59 +++++ .../simulator/mode/ConsumerModeHandler.java | 56 +++++ .../simulator/mode/PublisherModeHandler.java | 136 +++++++++++ .../simulator/mode/SimulatorModeHandler.java | 51 ++++ .../simulator/BlockStreamSimulatorTest.java | 220 +++++++++--------- .../config/data/BlockStreamConfigTest.java | 54 ++--- .../grpc/PublishStreamGrpcClientImplTest.java | 35 ++- .../mode/CombinedModeHandlerTest.java | 40 ++++ .../mode/ConsumerModeHandlerTest.java | 40 ++++ .../mode/PublisherModeHandlerTest.java | 157 +++++++++++++ .../com/hedera/block/suites/BaseSuite.java | 54 ++--- .../NegativeServerAvailabilityTests.java | 4 +- .../PositiveDataPersistenceTests.java | 86 ++++++- 20 files changed, 943 insertions(+), 278 deletions(-) create mode 100644 simulator/src/main/java/com/hedera/block/simulator/config/types/SimulatorMode.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java index 03b4571fd..a4a63c9dd 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java @@ -47,7 +47,7 @@ private BlockStreamSimulator() {} public static void main(final String[] args) throws IOException, InterruptedException, BlockSimulatorParsingException { - LOGGER.log(INFO, "Starting Block Stream Simulator"); + LOGGER.log(INFO, "Starting Block Stream Simulator!"); final ConfigurationBuilder configurationBuilder = ConfigurationBuilder.create() .withSource(SystemEnvironmentConfigSource.getInstance()) diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java index e7b04ef0b..ebff3b538 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java @@ -16,12 +16,17 @@ package com.hedera.block.simulator; +import static java.util.Objects.requireNonNull; + import com.hedera.block.simulator.config.data.BlockStreamConfig; -import com.hedera.block.simulator.config.types.StreamingMode; +import com.hedera.block.simulator.config.types.SimulatorMode; import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; -import com.hedera.hapi.block.stream.Block; +import com.hedera.block.simulator.mode.CombinedModeHandler; +import com.hedera.block.simulator.mode.ConsumerModeHandler; +import com.hedera.block.simulator.mode.PublisherModeHandler; +import com.hedera.block.simulator.mode.SimulatorModeHandler; import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; @@ -31,16 +36,9 @@ /** BlockStream Simulator App */ public class BlockStreamSimulatorApp { - private static final System.Logger LOGGER = - System.getLogger(BlockStreamSimulatorApp.class.getName()); - - private final BlockStreamManager blockStreamManager; + private final System.Logger LOGGER = System.getLogger(getClass().getName()); private final PublishStreamGrpcClient publishStreamGrpcClient; - private final BlockStreamConfig blockStreamConfig; - private final StreamingMode streamingMode; - - private final int delayBetweenBlockItems; - private final int millisecondsPerBlock; + private final SimulatorModeHandler simulatorModeHandler; private final AtomicBoolean isRunning = new AtomicBoolean(false); /** @@ -55,14 +53,20 @@ public BlockStreamSimulatorApp( @NonNull Configuration configuration, @NonNull BlockStreamManager blockStreamManager, @NonNull PublishStreamGrpcClient publishStreamGrpcClient) { - this.blockStreamManager = blockStreamManager; - this.publishStreamGrpcClient = publishStreamGrpcClient; - - blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class); - - streamingMode = blockStreamConfig.streamingMode(); - millisecondsPerBlock = blockStreamConfig.millisecondsPerBlock(); - delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems(); + requireNonNull(blockStreamManager); + + this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient); + final BlockStreamConfig blockStreamConfig = + requireNonNull(configuration.getConfigData(BlockStreamConfig.class)); + + final SimulatorMode simulatorMode = blockStreamConfig.simulatorMode(); + switch (simulatorMode) { + case PUBLISHER -> simulatorModeHandler = + new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager); + case CONSUMER -> simulatorModeHandler = new ConsumerModeHandler(blockStreamConfig); + case BOTH -> simulatorModeHandler = new CombinedModeHandler(blockStreamConfig); + default -> throw new IllegalArgumentException("Unknown SimulatorMode: " + simulatorMode); + } } /** @@ -73,77 +77,11 @@ public BlockStreamSimulatorApp( * @throws IOException if an I/O error occurs */ public void start() throws InterruptedException, BlockSimulatorParsingException, IOException { - + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator started initializing components..."); + publishStreamGrpcClient.init(); isRunning.set(true); - LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started"); - - if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) { - millisPerBlockStreaming(); - } else { - constantRateStreaming(); - } - - LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); - } - private void millisPerBlockStreaming() - throws IOException, InterruptedException, BlockSimulatorParsingException { - - final long secondsPerBlockNanos = millisecondsPerBlock * 1_000_000L; - - Block nextBlock = blockStreamManager.getNextBlock(); - while (nextBlock != null) { - long startTime = System.nanoTime(); - publishStreamGrpcClient.streamBlock(nextBlock); - long elapsedTime = System.nanoTime() - startTime; - long timeToDelay = secondsPerBlockNanos - elapsedTime; - if (timeToDelay > 0) { - Thread.sleep(timeToDelay / 1_000_000, (int) (timeToDelay % 1_000_000)); - } else { - LOGGER.log( - System.Logger.Level.WARNING, - "Block Server is running behind. Streaming took: " - + (elapsedTime / 1_000_000) - + "ms - Longer than max expected of: " - + millisecondsPerBlock - + " milliseconds"); - } - nextBlock = blockStreamManager.getNextBlock(); - } - LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); - } - - private void constantRateStreaming() - throws InterruptedException, IOException, BlockSimulatorParsingException { - int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000; - int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000; - boolean streamBlockItem = true; - int blockItemsStreamed = 0; - - while (streamBlockItem) { - // get block - Block block = blockStreamManager.getNextBlock(); - - if (block == null) { - LOGGER.log( - System.Logger.Level.INFO, - "Block Stream Simulator has reached the end of the block items"); - break; - } - - publishStreamGrpcClient.streamBlock(block); - blockItemsStreamed += block.items().size(); - - Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems); - - if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) { - LOGGER.log( - System.Logger.Level.INFO, - "Block Stream Simulator has reached the maximum number of block items to" - + " stream"); - streamBlockItem = false; - } - } + simulatorModeHandler.start(); } /** @@ -155,8 +93,9 @@ public boolean isRunning() { return isRunning.get(); } - /** Stops the block stream simulator. */ + /** Stops the Block Stream Simulator and closes off all grpc channels. */ public void stop() { + publishStreamGrpcClient.shutdown(); isRunning.set(false); LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); } diff --git a/simulator/src/main/java/com/hedera/block/simulator/Constants.java b/simulator/src/main/java/com/hedera/block/simulator/Constants.java index 0ab92a2f2..e26666033 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/Constants.java +++ b/simulator/src/main/java/com/hedera/block/simulator/Constants.java @@ -24,6 +24,11 @@ public final class Constants { /** postfix for gzip files */ public static final String GZ_EXTENSION = ".gz"; + /** + * Used for converting nanoseconds to milliseconds and vice versa + */ + public static final int NANOS_PER_MILLI = 1_000_000; + /** Constructor to prevent instantiation. this is only a utility class */ private Constants() {} } diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java index d2e6a9860..c0dea3bc1 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java @@ -16,6 +16,7 @@ package com.hedera.block.simulator.config.data; +import com.hedera.block.simulator.config.types.SimulatorMode; import com.hedera.block.simulator.config.types.StreamingMode; import com.swirlds.config.api.ConfigData; import com.swirlds.config.api.ConfigProperty; @@ -23,6 +24,7 @@ /** * Defines the configuration data for the block stream in the Hedera Block Simulator. * + * @param simulatorMode the mode of the simulator, in terms of publishing, consuming or both * @param delayBetweenBlockItems the delay in microseconds between streaming each block item * @param maxBlockItemsToStream the maximum number of block items to stream before stopping * @param streamingMode the mode of streaming for the block stream (e.g., time-based, count-based) @@ -31,6 +33,7 @@ */ @ConfigData("blockStream") public record BlockStreamConfig( + @ConfigProperty(defaultValue = "PUBLISHER") SimulatorMode simulatorMode, @ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems, @ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream, @ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode, @@ -50,6 +53,7 @@ public static Builder builder() { * A builder for creating instances of {@link BlockStreamConfig}. */ public static class Builder { + private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER; private int delayBetweenBlockItems = 1_500_000; private int maxBlockItemsToStream = 10_000; private StreamingMode streamingMode = StreamingMode.MILLIS_PER_BLOCK; @@ -63,6 +67,17 @@ public Builder() { // Default constructor } + /** + * Sets the simulator mode for the block stream. + * + * @param simulatorMode the {@link SimulatorMode} to use + * @return this {@code Builder} instance + */ + public Builder simulatorMode(SimulatorMode simulatorMode) { + this.simulatorMode = simulatorMode; + return this; + } + /** * Sets the delay between streaming each block item. * @@ -125,6 +140,7 @@ public Builder blockItemsBatchSize(int blockItemsBatchSize) { */ public BlockStreamConfig build() { return new BlockStreamConfig( + simulatorMode, delayBetweenBlockItems, maxBlockItemsToStream, streamingMode, diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/types/SimulatorMode.java b/simulator/src/main/java/com/hedera/block/simulator/config/types/SimulatorMode.java new file mode 100644 index 000000000..4033bc2e7 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/config/types/SimulatorMode.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.config.types; + +/** The SimulatorMode enum defines the work modes of the block stream simulator. */ +public enum SimulatorMode { + /** + * Indicates a work mode in which the simulator is working as both consumer and publisher. + */ + BOTH, + /** + * Indicates a work mode in which the simulator is working in consumer mode. + */ + CONSUMER, + /** + * Indicates a work mode in which the simulator is working in publisher mode. + */ + PUBLISHER +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java index 4795acc7f..6b42f180d 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClient.java @@ -24,6 +24,11 @@ * The PublishStreamGrpcClient interface provides the methods to stream the block and block item. */ public interface PublishStreamGrpcClient { + /** + * Initialize, opens a gRPC channel and creates the needed stubs with the passed configuration. + */ + void init(); + /** * Streams the block item. * @@ -39,4 +44,9 @@ public interface PublishStreamGrpcClient { * @return true if the block is streamed successfully, false otherwise */ boolean streamBlock(Block block); + + /** + * Shutdowns the channel. + */ + void shutdown(); } diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java index 1d192076e..485d478a2 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImpl.java @@ -16,6 +16,8 @@ package com.hedera.block.simulator.grpc; +import static java.util.Objects.requireNonNull; + import com.hedera.block.common.utils.ChunkUtils; import com.hedera.block.simulator.Translator; import com.hedera.block.simulator.config.data.BlockStreamConfig; @@ -37,9 +39,10 @@ */ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { - private final BlockStreamServiceGrpc.BlockStreamServiceStub stub; - private final StreamObserver requestStreamObserver; + private StreamObserver requestStreamObserver; private final BlockStreamConfig blockStreamConfig; + private final GrpcConfig grpcConfig; + private ManagedChannel channel; /** * Creates a new PublishStreamGrpcClientImpl instance. @@ -49,15 +52,22 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { */ @Inject public PublishStreamGrpcClientImpl( - @NonNull GrpcConfig grpcConfig, @NonNull BlockStreamConfig blockStreamConfig) { - ManagedChannel channel = - ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port()) - .usePlaintext() - .build(); - stub = BlockStreamServiceGrpc.newStub(channel); + @NonNull final GrpcConfig grpcConfig, @NonNull final BlockStreamConfig blockStreamConfig) { + this.grpcConfig = requireNonNull(grpcConfig); + this.blockStreamConfig = requireNonNull(blockStreamConfig); + } + + /** + * Initialize the channel and stub for publishBlockStream with the desired configuration. + */ + @Override + public void init() { + channel = ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port()) + .usePlaintext() + .build(); + BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel); PublishStreamObserver publishStreamObserver = new PublishStreamObserver(); requestStreamObserver = stub.publishBlockStream(publishStreamObserver); - this.blockStreamConfig = blockStreamConfig; } /** @@ -72,8 +82,9 @@ public boolean streamBlockItem(List blockItems) { blockItemsProtoc.add(Translator.fromPbj(blockItem)); } - requestStreamObserver.onNext( - PublishStreamRequest.newBuilder().addAllBlockItems(blockItemsProtoc).build()); + requestStreamObserver.onNext(PublishStreamRequest.newBuilder() + .addAllBlockItems(blockItemsProtoc) + .build()); return true; } @@ -91,12 +102,17 @@ public boolean streamBlock(Block block) { List> streamingBatches = ChunkUtils.chunkify(blockItemsProtoc, blockStreamConfig.blockItemsBatchSize()); - for (List streamingBatch : - streamingBatches) { - requestStreamObserver.onNext( - PublishStreamRequest.newBuilder().addAllBlockItems(streamingBatch).build()); + for (List streamingBatch : streamingBatches) { + requestStreamObserver.onNext(PublishStreamRequest.newBuilder() + .addAllBlockItems(streamingBatch) + .build()); } return true; } + + @Override + public void shutdown() { + channel.shutdown(); + } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java new file mode 100644 index 000000000..4a4353e1c --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.mode; + +import static java.util.Objects.requireNonNull; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * The {@code CombinedModeHandler} class implements the {@link SimulatorModeHandler} interface + * and provides the behavior for a mode where both consuming and publishing of block data + * occur simultaneously. + * + *

This mode handles dual operations in the block streaming process, utilizing the + * {@link BlockStreamConfig} for configuration settings. It is designed for scenarios where + * the simulator needs to handle both the consumption and publication of blocks in parallel. + * + *

For now, the actual start behavior is not implemented, as indicated by the + * {@link UnsupportedOperationException}. + */ +public class CombinedModeHandler implements SimulatorModeHandler { + private final BlockStreamConfig blockStreamConfig; + + /** + * Constructs a new {@code CombinedModeHandler} with the specified block stream configuration. + * + * @param blockStreamConfig the configuration data for managing block streams + */ + public CombinedModeHandler(@NonNull final BlockStreamConfig blockStreamConfig) { + this.blockStreamConfig = requireNonNull(blockStreamConfig); + } + + /** + * Starts the simulator in combined mode, handling both consumption and publication + * of block stream. However, this method is currently not implemented, and will throw + * an {@link UnsupportedOperationException}. + * + * @throws UnsupportedOperationException as the method is not yet implemented + */ + @Override + public void start() { + throw new UnsupportedOperationException(); + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java new file mode 100644 index 000000000..50a66ad27 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.mode; + +import static java.util.Objects.requireNonNull; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * The {@code ConsumerModeHandler} class implements the {@link SimulatorModeHandler} interface + * and provides the behavior for a mode where only consumption of block data + * occurs. + * + *

This mode handles single operation in the block streaming process, utilizing the + * {@link BlockStreamConfig} for configuration settings. It is designed for scenarios where + * the simulator needs to handle the consumption of blocks. + * + *

For now, the actual start behavior is not implemented, as indicated by the + * {@link UnsupportedOperationException}. + */ +public class ConsumerModeHandler implements SimulatorModeHandler { + + private final BlockStreamConfig blockStreamConfig; + + /** + * Constructs a new {@code ConsumerModeHandler} with the specified block stream configuration. + * + * @param blockStreamConfig the configuration data for managing block streams + */ + public ConsumerModeHandler(@NonNull final BlockStreamConfig blockStreamConfig) { + this.blockStreamConfig = requireNonNull(blockStreamConfig); + } + + /** + * Starts the simulator and initiate streaming, depending on the working mode. + */ + @Override + public void start() { + throw new UnsupportedOperationException(); + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java new file mode 100644 index 000000000..0ec4fc05e --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java @@ -0,0 +1,136 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.mode; + +import static com.hedera.block.simulator.Constants.NANOS_PER_MILLI; +import static java.util.Objects.requireNonNull; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.types.StreamingMode; +import com.hedera.block.simulator.exception.BlockSimulatorParsingException; +import com.hedera.block.simulator.generator.BlockStreamManager; +import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.hapi.block.stream.Block; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.IOException; + +/** + * The {@code PublisherModeHandler} class implements the {@link SimulatorModeHandler} interface + * and provides the behavior for a mode where only publishing of block data + * occurs. + * + *

This mode handles single operation in the block streaming process, utilizing the + * {@link BlockStreamConfig} for configuration settings. It is designed for scenarios where + * the simulator needs to handle publication of blocks. + */ +public class PublisherModeHandler implements SimulatorModeHandler { + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + private final BlockStreamManager blockStreamManager; + private final BlockStreamConfig blockStreamConfig; + private final PublishStreamGrpcClient publishStreamGrpcClient; + private final StreamingMode streamingMode; + private final int delayBetweenBlockItems; + private final int millisecondsPerBlock; + + /** + * Constructs a new {@code PublisherModeHandler} with the specified block stream configuration and publisher client. + * + * @param blockStreamConfig the configuration data for managing block streams + * @param publishStreamGrpcClient the grpc client used for streaming blocks + * @param blockStreamManager the block stream manager, responsible for generating blocks + */ + public PublisherModeHandler( + @NonNull final BlockStreamConfig blockStreamConfig, + @NonNull final PublishStreamGrpcClient publishStreamGrpcClient, + @NonNull final BlockStreamManager blockStreamManager) { + this.blockStreamConfig = requireNonNull(blockStreamConfig); + this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient); + this.blockStreamManager = requireNonNull(blockStreamManager); + + streamingMode = blockStreamConfig.streamingMode(); + delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems(); + millisecondsPerBlock = blockStreamConfig.millisecondsPerBlock(); + } + + /** + * Starts the simulator and initiate streaming, depending on the working mode. + */ + @Override + public void start() throws BlockSimulatorParsingException, IOException, InterruptedException { + if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) { + millisPerBlockStreaming(); + } else { + constantRateStreaming(); + } + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped streaming."); + } + + private void millisPerBlockStreaming() throws IOException, InterruptedException, BlockSimulatorParsingException { + + final long secondsPerBlockNanos = (long) millisecondsPerBlock * NANOS_PER_MILLI; + + Block nextBlock = blockStreamManager.getNextBlock(); + while (nextBlock != null) { + long startTime = System.nanoTime(); + publishStreamGrpcClient.streamBlock(nextBlock); + long elapsedTime = System.nanoTime() - startTime; + long timeToDelay = secondsPerBlockNanos - elapsedTime; + if (timeToDelay > 0) { + Thread.sleep(timeToDelay / NANOS_PER_MILLI, (int) (timeToDelay % NANOS_PER_MILLI)); + } else { + LOGGER.log( + System.Logger.Level.WARNING, + "Block Server is running behind. Streaming took: " + + (elapsedTime / 1_000_000) + + "ms - Longer than max expected of: " + + millisecondsPerBlock + + " milliseconds"); + } + nextBlock = blockStreamManager.getNextBlock(); + } + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); + } + + private void constantRateStreaming() throws InterruptedException, IOException, BlockSimulatorParsingException { + int delayMSBetweenBlockItems = delayBetweenBlockItems / NANOS_PER_MILLI; + int delayNSBetweenBlockItems = delayBetweenBlockItems % NANOS_PER_MILLI; + boolean streamBlockItem = true; + int blockItemsStreamed = 0; + + while (streamBlockItem) { + // get block + Block block = blockStreamManager.getNextBlock(); + + if (block == null) { + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has reached the end of the block items"); + break; + } + + publishStreamGrpcClient.streamBlock(block); + blockItemsStreamed += block.items().size(); + + Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems); + + if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) { + LOGGER.log( + System.Logger.Level.INFO, + "Block Stream Simulator has reached the maximum number of block items to" + " stream"); + streamBlockItem = false; + } + } + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java new file mode 100644 index 000000000..c80685600 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.mode; + +import com.hedera.block.simulator.exception.BlockSimulatorParsingException; +import java.io.IOException; + +/** + * The {@code SimulatorModeHandler} interface defines the contract for implementing different + * working modes of the Block Stream Simulator. Implementations of this interface handle + * specific behaviors for starting the simulator and managing the streaming process, + * depending on the selected mode. + * + *

Examples of working modes include: + *

    + *
  • Consumer mode: The simulator consumes data from the block stream.
  • + *
  • Publisher mode: The simulator publishes data to the block stream.
  • + *
  • Combined mode: The simulator handles both consuming and publishing.
  • + *
+ * + *

The {@code SimulatorModeHandler} is responsible for managing the simulator lifecycle, + * starting and stopping the streaming of blocks and handling any exceptions that may arise + * during the process. + */ +public interface SimulatorModeHandler { + + /** + * Starts the simulator and initiates the streaming process, based on the + * configuration. The behavior + * of this method depends on the specific working mode (e.g., consumer, publisher, both). + * + * @throws BlockSimulatorParsingException if an error occurs while parsing blocks + * @throws IOException if an I/O error occurs during block streaming + * @throws InterruptedException if the thread running the simulator is interrupted + */ + void start() throws BlockSimulatorParsingException, IOException, InterruptedException; +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java index 0c0c54436..72b8a40a6 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java @@ -17,13 +17,18 @@ package com.hedera.block.simulator; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.block.simulator.mode.PublisherModeHandler; import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; import com.hedera.hapi.block.stream.output.BlockHeader; @@ -41,32 +46,26 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) class BlockStreamSimulatorTest { - @Mock private BlockStreamManager blockStreamManager; + @Mock + private BlockStreamManager blockStreamManager; - @Mock private PublishStreamGrpcClient publishStreamGrpcClient; + @Mock + private PublishStreamGrpcClient publishStreamGrpcClient; private BlockStreamSimulatorApp blockStreamSimulator; @BeforeEach void setUp() throws IOException { - Configuration configuration = - TestUtils.getTestConfiguration( - Map.of( - "blockStream.maxBlockItemsToStream", - "100", - "blockStream.streamingMode", - "CONSTANT_RATE")); - - blockStreamSimulator = - new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient); + Configuration configuration = TestUtils.getTestConfiguration( + Map.of("blockStream.maxBlockItemsToStream", "100", "blockStream.streamingMode", "CONSTANT_RATE")); + + blockStreamSimulator = new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient); } @AfterEach @@ -75,44 +74,38 @@ void tearDown() { } @Test - void start_logsStartedMessage() - throws InterruptedException, BlockSimulatorParsingException, IOException { + void start_logsStartedMessage() throws InterruptedException, BlockSimulatorParsingException, IOException { blockStreamSimulator.start(); assertTrue(blockStreamSimulator.isRunning()); } @Test - void start_constantRateStreaming() - throws InterruptedException, BlockSimulatorParsingException, IOException { + void start_constantRateStreaming() throws InterruptedException, BlockSimulatorParsingException, IOException { - BlockItem blockItem = - BlockItem.newBuilder() - .blockHeader(BlockHeader.newBuilder().number(1L).build()) - .build(); + BlockItem blockItem = BlockItem.newBuilder() + .blockHeader(BlockHeader.newBuilder().number(1L).build()) + .build(); Block block1 = Block.newBuilder().items(blockItem).build(); Block block2 = Block.newBuilder().items(blockItem, blockItem, blockItem).build(); - BlockStreamManager blockStreamManager = Mockito.mock(BlockStreamManager.class); + BlockStreamManager blockStreamManager = mock(BlockStreamManager.class); when(blockStreamManager.getNextBlock()).thenReturn(block1, block2, null); - Configuration configuration = - TestUtils.getTestConfiguration( - Map.of( - "blockStream.maxBlockItemsToStream", - "2", - "generator.managerImplementation", - "BlockAsFileLargeDataSets", - "generator.rootPath", - getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), - "blockStream.streamingMode", - "CONSTANT_RATE", - "blockStream.blockItemsBatchSize", - "2")); + Configuration configuration = TestUtils.getTestConfiguration(Map.of( + "blockStream.maxBlockItemsToStream", + "2", + "generator.managerImplementation", + "BlockAsFileLargeDataSets", + "generator.rootPath", + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), + "blockStream.streamingMode", + "CONSTANT_RATE", + "blockStream.blockItemsBatchSize", + "2")); BlockStreamSimulatorApp blockStreamSimulator = - new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient); + new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient); blockStreamSimulator.start(); assertTrue(blockStreamSimulator.isRunning()); @@ -125,34 +118,30 @@ private String getAbsoluteFolder(String relativePath) { @Test void stop_doesNotThrowException() { assertDoesNotThrow(() -> blockStreamSimulator.stop()); + assertFalse(blockStreamSimulator.isRunning()); } @Test - void start_millisPerBlockStreaming() - throws InterruptedException, IOException, BlockSimulatorParsingException { - BlockStreamManager blockStreamManager = Mockito.mock(BlockStreamManager.class); - BlockItem blockItem = - BlockItem.newBuilder() - .blockHeader(BlockHeader.newBuilder().number(1L).build()) - .build(); + void start_millisPerBlockStreaming() throws InterruptedException, IOException, BlockSimulatorParsingException { + BlockStreamManager blockStreamManager = mock(BlockStreamManager.class); + BlockItem blockItem = BlockItem.newBuilder() + .blockHeader(BlockHeader.newBuilder().number(1L).build()) + .build(); Block block = Block.newBuilder().items(blockItem).build(); when(blockStreamManager.getNextBlock()).thenReturn(block, block, null); - Configuration configuration = - TestUtils.getTestConfiguration( - Map.of( - "blockStream.maxBlockItemsToStream", - "2", - "generator.managerImplementation", - "BlockAsFileLargeDataSets", - "generator.rootPath", - getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), - "blockStream.streamingMode", - "MILLIS_PER_BLOCK")); + Configuration configuration = TestUtils.getTestConfiguration(Map.of( + "blockStream.maxBlockItemsToStream", + "2", + "generator.managerImplementation", + "BlockAsFileLargeDataSets", + "generator.rootPath", + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), + "blockStream.streamingMode", + "MILLIS_PER_BLOCK")); BlockStreamSimulatorApp blockStreamSimulator = - new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient); + new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient); blockStreamSimulator.start(); assertTrue(blockStreamSimulator.isRunning()); @@ -163,79 +152,94 @@ void start_millisPerSecond_streamingLagVerifyWarnLog() throws InterruptedException, IOException, BlockSimulatorParsingException { List logRecords = captureLogs(); - BlockStreamManager blockStreamManager = Mockito.mock(BlockStreamManager.class); - BlockItem blockItem = - BlockItem.newBuilder() - .blockHeader(BlockHeader.newBuilder().number(1L).build()) - .build(); + BlockStreamManager blockStreamManager = mock(BlockStreamManager.class); + BlockItem blockItem = BlockItem.newBuilder() + .blockHeader(BlockHeader.newBuilder().number(1L).build()) + .build(); Block block = Block.newBuilder().items(blockItem).build(); when(blockStreamManager.getNextBlock()).thenReturn(block, block, null); - PublishStreamGrpcClient publishStreamGrpcClient = - Mockito.mock(PublishStreamGrpcClient.class); + PublishStreamGrpcClient publishStreamGrpcClient = mock(PublishStreamGrpcClient.class); // simulate that the first block takes 15ms to stream, when the limit is 10, to force to go // over WARN Path. when(publishStreamGrpcClient.streamBlock(any())) - .thenAnswer( - invocation -> { - Thread.sleep(15); - return true; - }) + .thenAnswer(invocation -> { + Thread.sleep(15); + return true; + }) .thenReturn(true); - Configuration configuration = - TestUtils.getTestConfiguration( - Map.of( - "generator.managerImplementation", - "BlockAsFileBlockStreamManager", - "generator.rootPath", - getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), - "blockStream.maxBlockItemsToStream", - "2", - "blockStream.streamingMode", - "MILLIS_PER_BLOCK", - "blockStream.millisecondsPerBlock", - "10", - "blockStream.blockItemsBatchSize", - "1")); + Configuration configuration = TestUtils.getTestConfiguration(Map.of( + "generator.managerImplementation", + "BlockAsFileBlockStreamManager", + "generator.rootPath", + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), + "blockStream.maxBlockItemsToStream", + "2", + "blockStream.streamingMode", + "MILLIS_PER_BLOCK", + "blockStream.millisecondsPerBlock", + "10", + "blockStream.blockItemsBatchSize", + "1")); BlockStreamSimulatorApp blockStreamSimulator = - new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient); + new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient); blockStreamSimulator.start(); assertTrue(blockStreamSimulator.isRunning()); // Assert log exists - boolean found_log = - logRecords.stream() - .anyMatch( - logRecord -> - logRecord - .getMessage() - .contains("Block Server is running behind")); + boolean found_log = logRecords.stream() + .anyMatch(logRecord -> logRecord.getMessage().contains("Block Server is running behind")); assertTrue(found_log); } + @Test + void start_withBothMode_throwsUnsupportedOperationException() throws Exception { + Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "BOTH")); + blockStreamSimulator = new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient); + assertThrows(UnsupportedOperationException.class, () -> blockStreamSimulator.start()); + } + + @Test + void start_withConsumerMode_throwsUnsupportedOperationException() throws Exception { + Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "CONSUMER")); + blockStreamSimulator = new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient); + assertThrows(UnsupportedOperationException.class, () -> blockStreamSimulator.start()); + } + + @Test + void constructor_throwsExceptionForNullSimulatorMode() { + Configuration configuration = mock(Configuration.class); + BlockStreamConfig blockStreamConfig = mock(BlockStreamConfig.class); + + when(configuration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); + when(blockStreamConfig.simulatorMode()).thenReturn(null); + + assertThrows(NullPointerException.class, () -> { + new BlockStreamSimulatorApp(configuration, blockStreamManager, publishStreamGrpcClient); + }); + } + private List captureLogs() { // Capture logs - Logger logger = Logger.getLogger(BlockStreamSimulatorApp.class.getName()); + Logger logger = Logger.getLogger(PublisherModeHandler.class.getName()); final List logRecords = new ArrayList<>(); // Custom handler to capture logs - Handler handler = - new Handler() { - @Override - public void publish(LogRecord record) { - logRecords.add(record); - } - - @Override - public void flush() {} - - @Override - public void close() throws SecurityException {} - }; + Handler handler = new Handler() { + @Override + public void publish(LogRecord record) { + logRecords.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() throws SecurityException {} + }; // Add handler to logger logger.addHandler(handler); diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java index 821735e04..194266cc7 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.*; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.SimulatorMode; import com.hedera.block.simulator.config.types.StreamingMode; import java.nio.file.Files; import java.nio.file.Path; @@ -68,6 +69,15 @@ void testStreamConfigBuilder() { assertEquals(StreamingMode.CONSTANT_RATE, config.streamingMode()); } + @Test + void testSimulatorMode() { + BlockStreamConfig config = getBlockStreamConfigBuilder() + .simulatorMode(SimulatorMode.PUBLISHER) + .build(); + + assertEquals(SimulatorMode.PUBLISHER, config.simulatorMode()); + } + @Test void testValidAbsolutePath() { // Setup valid folder path and generation mode @@ -80,11 +90,10 @@ void testValidAbsolutePath() { assertTrue(Files.exists(path), "The folder must exist for this test."); // No exception should be thrown - BlockGeneratorConfig config = - getBlockGeneratorConfigBuilder() - .folderRootPath(folderRootPath) - .generationMode(generationMode) - .build(); + BlockGeneratorConfig config = getBlockGeneratorConfigBuilder() + .folderRootPath(folderRootPath) + .generationMode(generationMode) + .build(); assertEquals(folderRootPath, config.folderRootPath()); assertEquals(GenerationMode.DIR, config.generationMode()); @@ -96,9 +105,7 @@ void testEmptyFolderRootPath() { String folderRootPath = ""; GenerationMode generationMode = GenerationMode.DIR; BlockGeneratorConfig.Builder builder = - getBlockGeneratorConfigBuilder() - .folderRootPath(folderRootPath) - .generationMode(generationMode); + getBlockGeneratorConfigBuilder().folderRootPath(folderRootPath).generationMode(generationMode); BlockGeneratorConfig config = builder.build(); @@ -116,13 +123,10 @@ void testRelativeFolderPathThrowsException() { // An exception should be thrown because the path is not absolute IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> - getBlockGeneratorConfigBuilder() - .folderRootPath(relativeFolderPath) - .generationMode(generationMode) - .build()); + assertThrows(IllegalArgumentException.class, () -> getBlockGeneratorConfigBuilder() + .folderRootPath(relativeFolderPath) + .generationMode(generationMode) + .build()); // Verify the exception message assertEquals(relativeFolderPath + " Root path must be absolute", exception.getMessage()); @@ -140,13 +144,10 @@ void testNonExistentFolderThrowsException() { // An exception should be thrown because the folder does not exist IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> - getBlockGeneratorConfigBuilder() - .folderRootPath(folderRootPath) - .generationMode(generationMode) - .build()); + assertThrows(IllegalArgumentException.class, () -> getBlockGeneratorConfigBuilder() + .folderRootPath(folderRootPath) + .generationMode(generationMode) + .build()); // Verify the exception message assertEquals("Folder does not exist: " + path, exception.getMessage()); @@ -159,11 +160,10 @@ void testGenerationModeNonDirDoesNotCheckFolderExistence() { GenerationMode generationMode = GenerationMode.ADHOC; // No exception should be thrown because generation mode is not DIR - BlockGeneratorConfig config = - getBlockGeneratorConfigBuilder() - .folderRootPath(folderRootPath) - .generationMode(generationMode) - .build(); + BlockGeneratorConfig config = getBlockGeneratorConfigBuilder() + .folderRootPath(folderRootPath) + .generationMode(generationMode) + .build(); // Verify that the configuration was created successfully assertEquals(folderRootPath, config.folderRootPath()); diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java index a60aed355..486bf8782 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/PublishStreamGrpcClientImplTest.java @@ -16,14 +16,18 @@ package com.hedera.block.simulator.grpc; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import com.hedera.block.simulator.TestUtils; import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.data.GrpcConfig; import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; +import io.grpc.ManagedChannel; import java.io.IOException; +import java.lang.reflect.Field; import java.util.List; import java.util.Map; import org.junit.jupiter.api.AfterEach; @@ -39,9 +43,8 @@ class PublishStreamGrpcClientImplTest { void setUp() throws IOException { grpcConfig = TestUtils.getTestConfiguration().getConfigData(GrpcConfig.class); - blockStreamConfig = - TestUtils.getTestConfiguration(Map.of("blockStream.blockItemsBatchSize", "2")) - .getConfigData(BlockStreamConfig.class); + blockStreamConfig = TestUtils.getTestConfiguration(Map.of("blockStream.blockItemsBatchSize", "2")) + .getConfigData(BlockStreamConfig.class); } @AfterEach @@ -52,6 +55,7 @@ void streamBlockItem() { BlockItem blockItem = BlockItem.newBuilder().build(); PublishStreamGrpcClientImpl publishStreamGrpcClient = new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig); + publishStreamGrpcClient.init(); boolean result = publishStreamGrpcClient.streamBlockItem(List.of(blockItem)); assertTrue(result); } @@ -65,11 +69,32 @@ void streamBlock() { PublishStreamGrpcClientImpl publishStreamGrpcClient = new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig); - + publishStreamGrpcClient.init(); boolean result = publishStreamGrpcClient.streamBlock(block); assertTrue(result); boolean result1 = publishStreamGrpcClient.streamBlock(block1); assertTrue(result1); } + + @Test + void testShutdown() throws Exception { + PublishStreamGrpcClientImpl publishStreamGrpcClient = + new PublishStreamGrpcClientImpl(grpcConfig, blockStreamConfig); + publishStreamGrpcClient.init(); + + Field channelField = PublishStreamGrpcClientImpl.class.getDeclaredField("channel"); + ManagedChannel mockChannel = mock(ManagedChannel.class); + + try { + channelField.setAccessible(true); + channelField.set(publishStreamGrpcClient, mockChannel); + } finally { + channelField.setAccessible(false); + } + publishStreamGrpcClient.shutdown(); + + // Verify that channel.shutdown() was called + verify(mockChannel).shutdown(); + } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java new file mode 100644 index 000000000..0a522cf89 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.mode; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class CombinedModeHandlerTest { + + @Mock + private BlockStreamConfig blockStreamConfig; + + private CombinedModeHandler combinedModeHandler; + + @Test + void testStartThrowsUnsupportedOperationException() { + MockitoAnnotations.openMocks(this); + combinedModeHandler = new CombinedModeHandler(blockStreamConfig); + + assertThrows(UnsupportedOperationException.class, () -> combinedModeHandler.start()); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java new file mode 100644 index 000000000..df387f219 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.mode; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class ConsumerModeHandlerTest { + + @Mock + private BlockStreamConfig blockStreamConfig; + + private ConsumerModeHandler consumerModeHandler; + + @Test + void testStartThrowsUnsupportedOperationException() { + MockitoAnnotations.openMocks(this); + consumerModeHandler = new ConsumerModeHandler(blockStreamConfig); + + assertThrows(UnsupportedOperationException.class, () -> consumerModeHandler.start()); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java new file mode 100644 index 000000000..9a50f76a9 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.mode; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.types.StreamingMode; +import com.hedera.block.simulator.generator.BlockStreamManager; +import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.hapi.block.stream.Block; +import com.hedera.hapi.block.stream.BlockItem; +import java.io.IOException; +import java.util.Arrays; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class PublisherModeHandlerTest { + + @Mock + private BlockStreamConfig blockStreamConfig; + + @Mock + private PublishStreamGrpcClient publishStreamGrpcClient; + + @Mock + private BlockStreamManager blockStreamManager; + + private PublisherModeHandler publisherModeHandler; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + } + + @Test + void testStartWithMillisPerBlockStreaming_WithBlocks() throws Exception { + // Configure blockStreamConfig + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); + when(blockStreamConfig.millisecondsPerBlock()).thenReturn(0); // No delay for testing + + publisherModeHandler = new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager); + + Block block1 = mock(Block.class); + Block block2 = mock(Block.class); + + when(blockStreamManager.getNextBlock()) + .thenReturn(block1) + .thenReturn(block2) + .thenReturn(null); + + publisherModeHandler.start(); + + verify(publishStreamGrpcClient).streamBlock(block1); + verify(publishStreamGrpcClient).streamBlock(block2); + verifyNoMoreInteractions(publishStreamGrpcClient); + verify(blockStreamManager, times(3)).getNextBlock(); + } + + @Test + void testStartWithMillisPerBlockStreaming_NoBlocks() throws Exception { + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); + + publisherModeHandler = new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager); + + when(blockStreamManager.getNextBlock()).thenReturn(null); + + publisherModeHandler.start(); + + verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); + verify(blockStreamManager).getNextBlock(); + } + + @Test + void testStartWithConstantRateStreaming_WithinMaxItems() throws Exception { + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.CONSTANT_RATE); + when(blockStreamConfig.delayBetweenBlockItems()).thenReturn(0); + when(blockStreamConfig.maxBlockItemsToStream()).thenReturn(5); + + publisherModeHandler = new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager); + + Block block1 = mock(Block.class); + Block block2 = mock(Block.class); + + BlockItem blockItem1 = mock(BlockItem.class); + BlockItem blockItem2 = mock(BlockItem.class); + BlockItem blockItem3 = mock(BlockItem.class); + BlockItem blockItem4 = mock(BlockItem.class); + + when(block1.items()).thenReturn(Arrays.asList(blockItem1, blockItem2)); + when(block2.items()).thenReturn(Arrays.asList(blockItem3, blockItem4)); + + when(blockStreamManager.getNextBlock()) + .thenReturn(block1) + .thenReturn(block2) + .thenReturn(null); + + publisherModeHandler.start(); + + verify(publishStreamGrpcClient).streamBlock(block1); + verify(publishStreamGrpcClient).streamBlock(block2); + verifyNoMoreInteractions(publishStreamGrpcClient); + verify(blockStreamManager, times(3)).getNextBlock(); + } + + @Test + void testStartWithConstantRateStreaming_NoBlocks() throws Exception { + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.CONSTANT_RATE); + publisherModeHandler = new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager); + + when(blockStreamManager.getNextBlock()).thenReturn(null); + + publisherModeHandler.start(); + + verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); + verify(blockStreamManager).getNextBlock(); + } + + @Test + void testStartWithExceptionDuringStreaming() throws Exception { + when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); + + publisherModeHandler = new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager); + + when(blockStreamManager.getNextBlock()).thenThrow(new IOException("Test exception")); + + assertThrows(IOException.class, () -> publisherModeHandler.start()); + + verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); + verify(blockStreamManager).getNextBlock(); + verifyNoMoreInteractions(publishStreamGrpcClient); + verifyNoMoreInteractions(blockStreamManager); + } +} diff --git a/suites/src/main/java/com/hedera/block/suites/BaseSuite.java b/suites/src/main/java/com/hedera/block/suites/BaseSuite.java index cbcaed78e..96a31d1ea 100644 --- a/suites/src/main/java/com/hedera/block/suites/BaseSuite.java +++ b/suites/src/main/java/com/hedera/block/suites/BaseSuite.java @@ -17,8 +17,6 @@ package com.hedera.block.suites; import com.hedera.block.simulator.BlockStreamSimulatorApp; -import com.hedera.block.simulator.BlockStreamSimulatorInjectionComponent; -import com.hedera.block.simulator.DaggerBlockStreamSimulatorInjectionComponent; import com.swirlds.config.api.Configuration; import com.swirlds.config.api.ConfigurationBuilder; import com.swirlds.config.extensions.sources.ClasspathFileConfigSource; @@ -27,6 +25,8 @@ import io.github.cdimascio.dotenv.Dotenv; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.testcontainers.containers.GenericContainer; @@ -71,20 +71,11 @@ public BaseSuite() { * Setup method to be executed before all tests. * *

This method initializes the Block Node server container using Testcontainers. - * - * @throws IOException if an I/O error occurs */ @BeforeAll - public static void setup() throws IOException { - blockNodeContainer = getConfiguration(); + public static void setup() { + blockNodeContainer = createContainer(); blockNodeContainer.start(); - - // TODO remove in the next PR which adds tests - BlockStreamSimulatorInjectionComponent DIComponent = - DaggerBlockStreamSimulatorInjectionComponent.factory() - .create(loadDefaultConfiguration()); - - BlockStreamSimulatorApp blockStreamSimulatorApp = DIComponent.getBlockStreamSimulatorApp(); } /** @@ -101,7 +92,7 @@ public static void teardown() { } /** - * Retrieves the configuration for the Block Node server container. + * Initialize container with the default configuration and returns it. * *

This method initializes the Block Node container with the version retrieved from the .env * file. It configures the container and returns it. @@ -116,16 +107,17 @@ public static void teardown() { * * @return a configured {@link GenericContainer} instance for the Block Node server */ - public static GenericContainer getConfiguration() { + protected static GenericContainer createContainer() { String blockNodeVersion = BaseSuite.getBlockNodeVersion(); blockNodePort = 8080; - blockNodeContainer = - new GenericContainer<>( - DockerImageName.parse("block-node-server:" + blockNodeVersion)) - .withExposedPorts(blockNodePort) - .withEnv("VERSION", blockNodeVersion) - .waitingFor(Wait.forListeningPort()) - .waitingFor(Wait.forHealthcheck()); + List portBindings = new ArrayList<>(); + portBindings.add(String.format("%d:%2d", blockNodePort, blockNodePort)); + blockNodeContainer = new GenericContainer<>(DockerImageName.parse("block-node-server:" + blockNodeVersion)) + .withExposedPorts(blockNodePort) + .withEnv("VERSION", blockNodeVersion) + .waitingFor(Wait.forListeningPort()) + .waitingFor(Wait.forHealthcheck()); + blockNodeContainer.setPortBindings(portBindings); return blockNodeContainer; } @@ -135,13 +127,12 @@ public static GenericContainer getConfiguration() { * @return default block simulator configuration * @throws IOException if an I/O error occurs */ - protected static Configuration loadDefaultConfiguration() throws IOException { - ConfigurationBuilder configurationBuilder = - ConfigurationBuilder.create() - .withSource(SystemEnvironmentConfigSource.getInstance()) - .withSource(SystemPropertiesConfigSource.getInstance()) - .withSource(new ClasspathFileConfigSource(Path.of("app.properties"))) - .autoDiscoverExtensions(); + protected static Configuration loadSimulatorDefaultConfiguration() throws IOException { + ConfigurationBuilder configurationBuilder = ConfigurationBuilder.create() + .withSource(SystemEnvironmentConfigSource.getInstance()) + .withSource(SystemPropertiesConfigSource.getInstance()) + .withSource(new ClasspathFileConfigSource(Path.of("app.properties"))) + .autoDiscoverExtensions(); return configurationBuilder.build(); } @@ -156,7 +147,10 @@ protected static Configuration loadDefaultConfiguration() throws IOException { * @return the version of the Block Node server as a string */ private static String getBlockNodeVersion() { - Dotenv dotenv = Dotenv.configure().directory("../server/docker").filename(".env").load(); + Dotenv dotenv = Dotenv.configure() + .directory("../server/docker") + .filename(".env") + .load(); return dotenv.get("VERSION"); } diff --git a/suites/src/main/java/com/hedera/block/suites/grpc/negative/NegativeServerAvailabilityTests.java b/suites/src/main/java/com/hedera/block/suites/grpc/negative/NegativeServerAvailabilityTests.java index 86e953eec..cc93fc5bc 100644 --- a/suites/src/main/java/com/hedera/block/suites/grpc/negative/NegativeServerAvailabilityTests.java +++ b/suites/src/main/java/com/hedera/block/suites/grpc/negative/NegativeServerAvailabilityTests.java @@ -45,13 +45,13 @@ public NegativeServerAvailabilityTests() {} * Clean up method executed after each test. * *

This method stops the running container, resets the container configuration by retrieving - * a new one through {@link BaseSuite#getConfiguration()}, and then starts the Block Node + * a new one through {@link BaseSuite#createContainer()}, and then starts the Block Node * container again. */ @AfterEach public void cleanUp() { blockNodeContainer.stop(); - blockNodeContainer = getConfiguration(); + blockNodeContainer = createContainer(); blockNodeContainer.start(); } diff --git a/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java b/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java index 0e9670e0a..43cc984b6 100644 --- a/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java +++ b/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java @@ -16,12 +16,96 @@ package com.hedera.block.suites.persistence.positive; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.hedera.block.simulator.BlockStreamSimulatorApp; +import com.hedera.block.simulator.BlockStreamSimulatorInjectionComponent; +import com.hedera.block.simulator.DaggerBlockStreamSimulatorInjectionComponent; import com.hedera.block.suites.BaseSuite; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; -/** Positive Data Persistence Tests */ +/** + * Test class for verifying the positive scenarios for data persistence. + * + *

Inherits from {@link BaseSuite} to reuse the container setup and teardown logic for the Block + * Node. + */ @DisplayName("Positive Data Persistence Tests") public class PositiveDataPersistenceTests extends BaseSuite { + private final String[] GET_BLOCKS_COMMAND = new String[] {"ls", "data", "-1"}; + private ExecutorService executorService; + /** Default constructor for the {@link PositiveDataPersistenceTests} class. */ public PositiveDataPersistenceTests() {} + + @BeforeEach + void setupEnvironment() { + executorService = Executors.newFixedThreadPool(2); + } + + @AfterEach + void teardownEnvironment() { + executorService.shutdownNow(); + } + + /** + * Verifies that block data is saved in the correct directory by comparing the count of saved + * blocks before and after running the simulator. The test asserts that the number of saved + * blocks increases after the simulator runs. + * + * @throws IOException if an I/O error occurs during execution in the container + * @throws InterruptedException if the thread is interrupted while sleeping or executing + * commands + */ + @Test + public void verifyBlockDataSavedInCorrectDirectory() throws InterruptedException, IOException { + String savedBlocksFolderBefore = getContainerCommandResult(GET_BLOCKS_COMMAND); + int savedBlocksCountBefore = getSavedBlocksCount(savedBlocksFolderBefore); + + BlockStreamSimulatorApp blockStreamSimulatorApp = createBlockSimulator(); + startSimulatorThread(blockStreamSimulatorApp); + Thread.sleep(5000); + blockStreamSimulatorApp.stop(); + + String savedBlocksFolderAfter = getContainerCommandResult(GET_BLOCKS_COMMAND); + int savedBlocksCountAfter = getSavedBlocksCount(savedBlocksFolderAfter); + + assertTrue(savedBlocksFolderBefore.isEmpty()); + assertFalse(savedBlocksFolderAfter.isEmpty()); + assertTrue(savedBlocksCountAfter > savedBlocksCountBefore); + } + + private BlockStreamSimulatorApp createBlockSimulator() throws IOException { + BlockStreamSimulatorInjectionComponent DIComponent = + DaggerBlockStreamSimulatorInjectionComponent.factory().create(loadSimulatorDefaultConfiguration()); + return DIComponent.getBlockStreamSimulatorApp(); + } + + private void startSimulatorThread(BlockStreamSimulatorApp blockStreamSimulatorAppInstance) { + executorService.submit(() -> { + try { + blockStreamSimulatorAppInstance.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + private int getSavedBlocksCount(String blocksFolders) { + String[] blocksArray = blocksFolders.split("\\n"); + return blocksArray.length; + } + + private String getContainerCommandResult(String[] command) throws IOException, InterruptedException { + Container.ExecResult result = blockNodeContainer.execInContainer(command); + return result.getStdout(); + } }