From a3a97ccb4dd5b6ad2d0cb5dad355d4575681fed7 Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Date: Fri, 4 Oct 2024 15:17:28 -0600 Subject: [PATCH] feat: Simulator: BlockStreamManager impl for very large DataSets (#225) Signed-off-by: Alfredo Gutierrez --- .github/workflows/smoke-test.yaml | 2 +- simulator/README.md | 83 ++++++++++++++ .../block/simulator/BlockStreamSimulator.java | 5 +- .../simulator/BlockStreamSimulatorApp.java | 18 ++- .../com/hedera/block/simulator/Constants.java | 25 +++++ .../config/data/BlockStreamConfig.java | 6 +- .../BlockAsDirBlockStreamManager.java | 12 +- .../BlockAsFileBlockStreamManager.java | 12 +- .../generator/BlockAsFileLargeDataSets.java | 95 ++++++++++++++++ .../generator/BlockStreamManager.java | 10 +- .../generator/GeneratorInjectionModule.java | 2 + .../block/simulator/generator/Utils.java | 20 +++- simulator/src/main/resources/app.properties | 7 +- .../simulator/BlockStreamSimulatorTest.java | 35 +++++- .../config/data/BlockStreamConfigTest.java | 22 +++- .../BlockAsDirBlockStreamManagerTest.java | 10 +- .../BlockAsFileBlockStreamManagerTest.java | 18 ++- .../BlockAsFileLargeDataSetsTest.java | 106 ++++++++++++++++++ .../GeneratorInjectionModuleTest.java | 17 +++ .../000000000000000000000000000000000008.blk | 0 .../resources/block-0.0.3-blk/notABlock.txt | 1 + 21 files changed, 465 insertions(+), 41 deletions(-) create mode 100644 simulator/README.md create mode 100644 simulator/src/main/java/com/hedera/block/simulator/Constants.java create mode 100644 simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSets.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java create mode 100644 simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000008.blk create mode 100644 simulator/src/test/resources/block-0.0.3-blk/notABlock.txt diff --git a/.github/workflows/smoke-test.yaml b/.github/workflows/smoke-test.yaml index a8c1f0de4..e39cce04e 100644 --- a/.github/workflows/smoke-test.yaml +++ b/.github/workflows/smoke-test.yaml @@ -81,7 +81,7 @@ jobs: - name: Run application in background, capture logs in app.log run: | - ${{ env.GRADLE_EXEC }} run -x :suites:run 2> server/src/test/resources/app.log < /dev/null & + ${{ env.GRADLE_EXEC }} :server:run 2> server/src/test/resources/app.log < /dev/null & echo "Application started with PID $APP_PID" sleep 10 diff --git a/simulator/README.md b/simulator/README.md new file mode 100644 index 000000000..0851d64c6 --- /dev/null +++ b/simulator/README.md @@ -0,0 +1,83 @@ +# Block Stream Simulator + +## Overview + +The Block Stream Simulator is designed to simulate block streaming for Hedera Hashgraph. +It uses various configuration sources and dependency injection to manage its components. + +## Prerequisites + +- Java 21 +- Gradle +- IntelliJ IDEA (recommended for development) + +## Project Design Structure + +Uses Dagger2 for dependency injection, the project has a modular structure and divides the Dagger dependencies into modules, but all modules used can be found at the root Injection Module: +```plaintext +src/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionModule.java +``` +Entry point for the project is `BlockStreamSimulator.java`, in wich the main method is located and has 2 functions: +1. Create/Load the Application Configuration, it does this using Hedera Platform Configuration API. +2. Create a DaggerComponent and instantiate the BlockStreamSimulatorApp class using the DaggerComponent and it registered dependencies. +3. Start the BlockStreamSimulatorApp, contains the orchestration of the different parts of the simulation using generic interfaces and handles the rate of streaming and the exit conditions. + +The BlockStreamSimulatorApp consumes other services that are injected using DaggerComponent, these are: +1. **generator:** responsible for generating blocks, exposes a single interface `BlockStreamManager` and several implementations + 1. BlockAsDirBlockStreamManager: generates blocks from a directory, each folder is a block, and block-items are single 'blk' or 'blk.gz' files. + 2. BlockAsFileBlockStreamManager: generates blocks from a single file, each file is a block, used to the format of the CN recordings. (since it loads blocks on memory it can stream really fast, really useful for simple streaming tests) + 3. BlockAsFileLargeDataSets: similar to BlockAsFileBLockStreamManager, but designed to work with GB folders with thousands of big blocks (since it has a high size block and volume of blocks, is useful for performace, load and stress testing) +2. **grpc:** responsible for the communication with the Block-Node, currently only has 1 interface `PublishStreamGrpcClient` and 1 Implementation, however also exposes a `PublishStreamObserver' + +## Configuration + +There are 2 configuration sets: +1. BlockStreamConfig: contains the configuration for the Block Stream Simulator logic and the generation module. +2. GrpcConfig: contains the configuration for the gRPC communication with the Block-Node. + +### BlockStreamConfig +Uses the prefix `blockStream` so all properties should start with `blockStream.` + +| Key | Description | Default Value | +|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------| +| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` | +| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` | +| `delayBetweenBlockItems` | The delay between each block item in nanoseconds | `1_500_000` | +| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` | +| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` | +| `paddedLength` | on the `BlockAsFileLargeDataSets` implementation, the length of the padded left zeroes `000001.blk.gz` | 36 | +| `fileExtension` | on the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` | + +### GrpcConfig +Uses the prefix `grpc` so all properties should start with `grpc.` + +| Key | Description | Default Value | +|-----------------|----------------------------|---------------| +| `serverAddress` | The host of the Block-Node | `localhost` | +| `port` | The port of the Block-Node | `8080` | + +## Building the Project + +To build the project, run the following command: + +```sh +./gradlew :simulator:build +``` + +## Running the Project + +Usually you will want to run a Block-Node server before the simulator, for that you can use the following commnad: + +```sh + ./gradlew :server:run +``` +However we recommend running the block-node server as a docker container: +```sh +./gradlew :server:build :server:createDockerImage :server:startDockerContainer +``` + +Once the project is built, you can run it using the following command: + +```sh +./gradlew :simulator:run +``` diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java index 16c5fb9c9..f9b21a4c2 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java @@ -18,6 +18,7 @@ import static java.lang.System.Logger.Level.INFO; +import com.hedera.pbj.runtime.ParseException; import com.swirlds.config.api.Configuration; import com.swirlds.config.api.ConfigurationBuilder; import com.swirlds.config.extensions.sources.ClasspathFileConfigSource; @@ -40,8 +41,10 @@ private BlockStreamSimulator() {} * @param args the arguments to be passed to the block stream simulator * @throws IOException if an I/O error occurs * @throws InterruptedException if the thread is interrupted + * @throws ParseException if a parse error occurs */ - public static void main(String[] args) throws IOException, InterruptedException { + public static void main(String[] args) + throws IOException, InterruptedException, ParseException { LOGGER.log(INFO, "Starting Block Stream Simulator"); diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java index 958d1bd77..391380fc3 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java @@ -20,8 +20,10 @@ import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.ParseException; import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.IOException; import javax.inject.Inject; /** BlockStream Simulator App */ @@ -64,8 +66,10 @@ public BlockStreamSimulatorApp( * Starts the block stream simulator. * * @throws InterruptedException if the thread is interrupted + * @throws ParseException if a parse error occurs + * @throws IOException if an I/O error occurs */ - public void start() throws InterruptedException { + public void start() throws InterruptedException, ParseException, IOException { int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000; int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000; @@ -78,12 +82,24 @@ public void start() throws InterruptedException { while (streamBlockItem) { // get block item BlockItem blockItem = blockStreamManager.getNextBlockItem(); + + if (blockItem == null) { + LOGGER.log( + System.Logger.Level.INFO, + "Block Stream Simulator has reached the end of the block items"); + break; + } + publishStreamGrpcClient.streamBlockItem(blockItem); blockItemsStreamed++; Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems); if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) { + LOGGER.log( + System.Logger.Level.INFO, + "Block Stream Simulator has reached the maximum number of block items to" + + " stream"); streamBlockItem = false; } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/Constants.java b/simulator/src/main/java/com/hedera/block/simulator/Constants.java new file mode 100644 index 000000000..9b4bda17c --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/Constants.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator; + +public class Constants { + + // The file extension for block files. + public static final String RECORD_EXTENSION = "blk"; + // postfix for gzipped files + public static final String GZ_EXTENSION = ".gz"; +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java index 299ec0b5c..7c97934aa 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java @@ -31,6 +31,8 @@ * @param delayBetweenBlockItems the delay between block items * @param managerImplementation the implementation of the block stream manager * @param maxBlockItemsToStream the maximum number of block items to stream + * @param paddedLength the padded length of 0 the block file format + * @param fileExtension the file extension of the block file format */ @ConfigData("blockStream") public record BlockStreamConfig( @@ -39,7 +41,9 @@ public record BlockStreamConfig( @ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems, @ConfigProperty(defaultValue = "BlockAsFileBlockStreamManager") String managerImplementation, - @ConfigProperty(defaultValue = "10_000") int maxBlockItemsToStream) { + @ConfigProperty(defaultValue = "10_000") int maxBlockItemsToStream, + @ConfigProperty(defaultValue = "36") int paddedLength, + @ConfigProperty(defaultValue = ".blk.gz") String fileExtension) { /** * Constructor to set the default root path if not provided, it will be set to the data diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java index fcf7c9349..ec1d9e9c5 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java +++ b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManager.java @@ -16,6 +16,7 @@ package com.hedera.block.simulator.generator; +import static com.hedera.block.simulator.generator.Utils.readFileBytes; import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; import static java.lang.System.Logger.Level.INFO; @@ -125,7 +126,7 @@ private void loadBlocks() throws IOException, ParseException { .toList(); for (Path pathBlockItem : sortedBlockItems) { - byte[] blockItemBytes = readBlockItemBytes(pathBlockItem); + byte[] blockItemBytes = readFileBytes(pathBlockItem); // if null means the file is not a block item and we can skip the file. if (blockItemBytes == null) { continue; @@ -141,15 +142,6 @@ private void loadBlocks() throws IOException, ParseException { } } - private byte[] readBlockItemBytes(Path pathBlockItem) throws IOException { - if (pathBlockItem.toString().endsWith(".gz")) { - return Utils.readGzFile(pathBlockItem); - } else if (pathBlockItem.toString().endsWith(".blk")) { - return Files.readAllBytes(pathBlockItem); - } - return null; - } - // Method to extract the numeric part of the filename from a Path object // Returns -1 if the filename is not a valid number private static int extractNumberFromPath(Path path) { diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java index d973b71ba..56dba5765 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java +++ b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManager.java @@ -16,6 +16,7 @@ package com.hedera.block.simulator.generator; +import static com.hedera.block.simulator.generator.Utils.readFileBytes; import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; import static java.lang.System.Logger.Level.INFO; @@ -108,13 +109,10 @@ private void loadBlocks() throws IOException, ParseException { for (Path blockPath : sortedBlockFiles) { - byte[] blockBytes; - if (blockPath.toString().endsWith(".gz")) { - blockBytes = Utils.readGzFile(blockPath); - } else if (blockPath.toString().endsWith(".blk")) { - blockBytes = Files.readAllBytes(blockPath); - } else { - throw new IllegalArgumentException("Invalid file format: " + blockPath); + byte[] blockBytes = readFileBytes(blockPath); + // skip if block is null, usually due to SO files like .DS_STORE + if (blockBytes == null) { + continue; } Block block = Block.PROTOBUF.parse(Bytes.wrap(blockBytes)); diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSets.java b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSets.java new file mode 100644 index 000000000..c0c02a6b2 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSets.java @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.generator; + +import static com.hedera.block.simulator.generator.Utils.readFileBytes; +import static java.lang.System.Logger.Level.INFO; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.hapi.block.stream.Block; +import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.ParseException; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.File; +import java.io.IOException; +import javax.inject.Inject; + +/** A block stream manager that reads blocks from files in a directory. */ +public class BlockAsFileLargeDataSets implements BlockStreamManager { + + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + private final String blockstreamPath; + private int currentBlockIndex = 0; + private int currentBlockItemIndex = 0; + + private Block currentBlock = null; + private final String formatString; + + /** + * Constructs a new BlockAsFileLargeDataSets instance. + * + * @param config the block stream configuration + */ + @Inject + public BlockAsFileLargeDataSets(@NonNull BlockStreamConfig config) { + this.blockstreamPath = config.folderRootPath(); + this.formatString = "%0" + config.paddedLength() + "d" + config.fileExtension(); + } + + @Override + public GenerationMode getGenerationMode() { + return GenerationMode.DIR; + } + + @Override + public BlockItem getNextBlockItem() throws IOException, ParseException { + if (currentBlock != null && currentBlock.items().size() > currentBlockItemIndex) { + return currentBlock.items().get(currentBlockItemIndex++); + } else { + currentBlock = getNextBlock(); + if (currentBlock != null) { + currentBlockItemIndex = 0; // Reset for new block + return getNextBlockItem(); + } + } + + return null; // No more blocks/items + } + + @Override + public Block getNextBlock() throws IOException, ParseException { + currentBlockIndex++; + + String nextBlockFileName = String.format(formatString, currentBlockIndex); + File blockFile = new File(blockstreamPath, nextBlockFileName); + + if (blockFile.exists()) { + byte[] blockBytes = readFileBytes(blockFile.toPath()); + + LOGGER.log(INFO, "Loading block: " + blockFile.getName()); + + Block block = Block.PROTOBUF.parse(Bytes.wrap(blockBytes)); + LOGGER.log(INFO, "block loaded with items size= " + block.items().size()); + return block; + } + + return null; // No more blocks found + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockStreamManager.java b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockStreamManager.java index ea8b75f5b..b9d7cdba2 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/generator/BlockStreamManager.java +++ b/simulator/src/main/java/com/hedera/block/simulator/generator/BlockStreamManager.java @@ -19,6 +19,8 @@ import com.hedera.block.simulator.config.types.GenerationMode; import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.ParseException; +import java.io.IOException; /** The block stream manager interface. */ public interface BlockStreamManager { @@ -34,13 +36,17 @@ public interface BlockStreamManager { * Get the next block item. * * @return the next block item + * @throws IOException if an I/O error occurs + * @throws ParseException if a parse error occurs */ - BlockItem getNextBlockItem(); + BlockItem getNextBlockItem() throws IOException, ParseException; /** * Get the next block. * * @return the next block + * @throws IOException if an I/O error occurs + * @throws ParseException if a parse error occurs */ - Block getNextBlock(); + Block getNextBlock() throws IOException, ParseException; } diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/GeneratorInjectionModule.java b/simulator/src/main/java/com/hedera/block/simulator/generator/GeneratorInjectionModule.java index 62b7451e4..f77f816ae 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/generator/GeneratorInjectionModule.java +++ b/simulator/src/main/java/com/hedera/block/simulator/generator/GeneratorInjectionModule.java @@ -39,6 +39,8 @@ static BlockStreamManager providesBlockStreamManager(BlockStreamConfig config) { if ("BlockAsDirBlockStreamManager".equalsIgnoreCase(config.managerImplementation())) { return new BlockAsDirBlockStreamManager(config); + } else if ("BlockAsFileLargeDataSets".equalsIgnoreCase(config.managerImplementation())) { + return new BlockAsFileLargeDataSets(config); } return new BlockAsFileBlockStreamManager(config); diff --git a/simulator/src/main/java/com/hedera/block/simulator/generator/Utils.java b/simulator/src/main/java/com/hedera/block/simulator/generator/Utils.java index f55da9c54..281fcfc5d 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/generator/Utils.java +++ b/simulator/src/main/java/com/hedera/block/simulator/generator/Utils.java @@ -16,6 +16,8 @@ package com.hedera.block.simulator.generator; +import com.hedera.block.simulator.Constants; +import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; @@ -34,10 +36,26 @@ private Utils() {} * @return byte array of the content of the GZIP file * @throws IOException if an I/O error occurs */ - public static byte[] readGzFile(Path filePath) throws IOException { + public static byte[] readGzFile(@NonNull Path filePath) throws IOException { try (InputStream fileInputStream = Files.newInputStream(filePath); GZIPInputStream gzipInputStream = new GZIPInputStream(fileInputStream)) { return gzipInputStream.readAllBytes(); } } + + /** + * Read a file and return the content as a byte array. + * + * @param filePath Path to the file + * @return byte array of the content of the file or null if the file extension is not supported + * @throws IOException if an I/O error occurs + */ + public static byte[] readFileBytes(@NonNull Path filePath) throws IOException { + if (filePath.toString().endsWith(Constants.GZ_EXTENSION)) { + return Utils.readGzFile(filePath); + } else if (filePath.toString().endsWith(Constants.RECORD_EXTENSION)) { + return Files.readAllBytes(filePath); + } + return null; + } } diff --git a/simulator/src/main/resources/app.properties b/simulator/src/main/resources/app.properties index 4cff5ac6d..b030a898a 100644 --- a/simulator/src/main/resources/app.properties +++ b/simulator/src/main/resources/app.properties @@ -1,3 +1,4 @@ -#blockStream.delayBetweenBlockItems=2000000 -#blockStream.folderRootPath=/Users/user/Projects/hedera-block-node/server/FakeProducedBlock -#blockStream.managerImplementation=BlockAsDirBlockStreamManager +#blockStream.delayBetweenBlockItems=3_000_000 +#blockStream.folderRootPath=/Users/user/Downloads/block-0.0.3-perf1 +#blockStream.managerImplementation=BlockAsFileLargeDataSets +#blockStream.maxBlockItemsToStream=100_000_000 diff --git a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java index bd646cf53..37de86ff8 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java @@ -17,17 +17,22 @@ package com.hedera.block.simulator; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.when; import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.ParseException; import com.swirlds.config.api.Configuration; import java.io.IOException; +import java.nio.file.Paths; import java.util.Map; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) @@ -58,11 +63,39 @@ void tearDown() { } @Test - void start_logsStartedMessage() throws InterruptedException { + void start_logsStartedMessage() throws InterruptedException, ParseException, IOException { blockStreamSimulator.start(); assertTrue(blockStreamSimulator.isRunning()); } + @Test + void start_exitByBlockNull() throws InterruptedException, ParseException, IOException { + + BlockStreamManager blockStreamManager = Mockito.mock(BlockStreamManager.class); + when(blockStreamManager.getNextBlockItem()).thenReturn(BlockItem.newBuilder().build()); + + Configuration configuration = + TestUtils.getTestConfiguration( + Map.of( + "blockStream.maxBlockItemsToStream", + "2", + "blockStream.BlockAsFileBlockStreamManager", + "BlockAsFileLargeDataSets", + "blockStream.rootPath", + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"))); + + BlockStreamSimulatorApp blockStreamSimulator = + new BlockStreamSimulatorApp( + configuration, blockStreamManager, publishStreamGrpcClient); + + blockStreamSimulator.start(); + assertTrue(blockStreamSimulator.isRunning()); + } + + private String getAbsoluteFolder(String relativePath) { + return Paths.get(relativePath).toAbsolutePath().toString(); + } + @Test void stop_doesNotThrowException() { assertDoesNotThrow(() -> blockStreamSimulator.stop()); diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java index 940c4edb5..36c7f4128 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java @@ -29,6 +29,8 @@ class BlockStreamConfigTest { private final int delayBetweenBlockItems = 1_500_000; private final String blockStreamManagerImplementation = "BlockAsFileBlockStreamManager"; private final int maxBlockItemsToStream = 10_000; + private final int paddedLength = 36; + private final String fileExtension = ".blk"; private String getAbsoluteFolder(String relativePath) { return Paths.get(relativePath).toAbsolutePath().toString(); @@ -52,7 +54,9 @@ void testValidAbsolutePath() { folderRootPath, delayBetweenBlockItems, blockStreamManagerImplementation, - maxBlockItemsToStream); + maxBlockItemsToStream, + paddedLength, + fileExtension); assertEquals(folderRootPath, config.folderRootPath()); assertEquals(GenerationMode.DIR, config.generationMode()); @@ -71,7 +75,9 @@ void testEmptyFolderRootPath() { folderRootPath, delayBetweenBlockItems, blockStreamManagerImplementation, - maxBlockItemsToStream); + maxBlockItemsToStream, + paddedLength, + fileExtension); // Verify that the path is set to the default Path expectedPath = Paths.get("src/main/resources/block-0.0.3/").toAbsolutePath(); @@ -95,7 +101,9 @@ void testRelativeFolderPathThrowsException() { relativeFolderPath, delayBetweenBlockItems, blockStreamManagerImplementation, - maxBlockItemsToStream)); + maxBlockItemsToStream, + paddedLength, + fileExtension)); // Verify the exception message assertEquals(relativeFolderPath + " Root path must be absolute", exception.getMessage()); @@ -121,7 +129,9 @@ void testNonExistentFolderThrowsException() { folderRootPath, delayBetweenBlockItems, blockStreamManagerImplementation, - maxBlockItemsToStream)); + maxBlockItemsToStream, + paddedLength, + fileExtension)); // Verify the exception message assertEquals("Folder does not exist: " + path, exception.getMessage()); @@ -140,7 +150,9 @@ void testGenerationModeNonDirDoesNotCheckFolderExistence() { folderRootPath, delayBetweenBlockItems, blockStreamManagerImplementation, - maxBlockItemsToStream); + maxBlockItemsToStream, + paddedLength, + fileExtension); // Verify that the configuration was created successfully assertEquals(folderRootPath, config.folderRootPath()); diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java index 1fb6d3b7a..1c391fea7 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java @@ -22,6 +22,8 @@ import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.pbj.runtime.ParseException; +import java.io.IOException; import java.nio.file.Paths; import org.junit.jupiter.api.Test; @@ -43,7 +45,7 @@ void getGenerationMode() { } @Test - void getNextBlockItem() { + void getNextBlockItem() throws IOException, ParseException { BlockStreamManager blockStreamManager = getBlockAsDirBlockStreamManager(getAbsoluteFolder(rootFolder)); @@ -53,7 +55,7 @@ void getNextBlockItem() { } @Test - void getNextBlock() { + void getNextBlock() throws IOException, ParseException { BlockStreamManager blockStreamManager = getBlockAsDirBlockStreamManager(getAbsoluteFolder(rootFolder)); @@ -78,7 +80,9 @@ private BlockStreamManager getBlockAsDirBlockStreamManager(String rootFolder) { rootFolder, 1_500_000, "BlockAsDirBlockStreamManager", - 10_000); + 10_000, + 36, + ".blk"); return new BlockAsDirBlockStreamManager(blockStreamConfig); } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java index d22ca15c4..9c207f117 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java @@ -20,6 +20,8 @@ import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.pbj.runtime.ParseException; +import java.io.IOException; import java.nio.file.Paths; import org.junit.jupiter.api.Test; @@ -39,7 +41,7 @@ void getGenerationMode() { } @Test - void getNextBlock() { + void getNextBlock() throws IOException, ParseException { BlockStreamManager blockStreamManager = getBlockAsFileBlockStreamManager(getAbsoluteFolder(gzRootFolder)); for (int i = 0; i < 3000; i++) { @@ -48,7 +50,7 @@ void getNextBlock() { } @Test - void getNextBlockItem() { + void getNextBlockItem() throws IOException, ParseException { BlockStreamManager blockStreamManager = getBlockAsFileBlockStreamManager(getAbsoluteFolder(gzRootFolder)); for (int i = 0; i < 35000; i++) { @@ -57,7 +59,7 @@ void getNextBlockItem() { } @Test - void loadBlockBlk() { + void loadBlockBlk() throws IOException, ParseException { String blkRootFolder = "src/test/resources/block-0.0.3-blk/"; BlockStreamManager blockStreamManager = getBlockAsFileBlockStreamManager(getAbsoluteFolder(blkRootFolder)); @@ -66,7 +68,11 @@ void loadBlockBlk() { @Test void BlockAsFileBlockStreamManagerInvalidRootPath() { - assertThrows(RuntimeException.class, () -> getBlockAsFileBlockStreamManager("/etc")); + assertThrows( + RuntimeException.class, + () -> + getBlockAsFileBlockStreamManager( + getAbsoluteFolder("src/test/resources/BlockAsDirException/1/"))); } private BlockAsFileBlockStreamManager getBlockAsFileBlockStreamManager(String rootFolder) { @@ -76,7 +82,9 @@ private BlockAsFileBlockStreamManager getBlockAsFileBlockStreamManager(String ro rootFolder, 1_500_000, "BlockAsFileBlockStreamManager", - 10_000); + 10_000, + 36, + ".blk"); return new BlockAsFileBlockStreamManager(blockStreamConfig); } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java new file mode 100644 index 000000000..87639a0e4 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.generator; + +import static org.junit.jupiter.api.Assertions.*; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.pbj.runtime.ParseException; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class BlockAsFileLargeDataSetsTest { + + private static final String rootFolder = "src/test/resources/block-0.0.3-blk/"; + private static int filesInFolder; + + @BeforeAll + static void setUp() { + filesInFolder = getFilesInFolder(getAbsoluteFolder(rootFolder)); + } + + @AfterEach + void tearDown() {} + + @Test + void getGenerationMode() { + BlockStreamManager blockStreamManager = + getBlockAsFileLargeDatasetsBlockStreamManager(getAbsoluteFolder(rootFolder)); + + assertEquals(GenerationMode.DIR, blockStreamManager.getGenerationMode()); + } + + @Test + void getNextBlock() throws IOException, ParseException { + BlockStreamManager blockStreamManager = + getBlockAsFileLargeDatasetsBlockStreamManager(getAbsoluteFolder(rootFolder)); + for (int i = 0; i < filesInFolder; i++) { + assertNotNull(blockStreamManager.getNextBlock()); + } + + assertNull(blockStreamManager.getNextBlock()); + } + + @Test + void getNextBlockItem() throws IOException, ParseException { + BlockStreamManager blockStreamManager = + getBlockAsFileLargeDatasetsBlockStreamManager(getAbsoluteFolder(rootFolder)); + + while (true) { + BlockItem blockItem = blockStreamManager.getNextBlockItem(); + if (blockItem == null) { + break; + } + assertNotNull(blockItem); + } + } + + private BlockAsFileLargeDataSets getBlockAsFileLargeDatasetsBlockStreamManager( + String rootFolder) { + BlockStreamConfig blockStreamConfig = + new BlockStreamConfig( + GenerationMode.DIR, + rootFolder, + 1_500_000, + "BlockAsFileBlockStreamManager", + 10_000, + 36, + ".blk"); + return new BlockAsFileLargeDataSets(blockStreamConfig); + } + + private static String getAbsoluteFolder(String relativePath) { + return Paths.get(relativePath).toAbsolutePath().toString(); + } + + private static int getFilesInFolder(String absolutePath) { + File folder = new File(absolutePath); + File[] blkFiles = + folder.listFiles( + file -> + file.isFile() + && (file.getName().endsWith(".blk") + || file.getName().endsWith(".blk.gz"))); + return blkFiles.length; + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/GeneratorInjectionModuleTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/GeneratorInjectionModuleTest.java index f47296900..cf6a069f5 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/GeneratorInjectionModuleTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/GeneratorInjectionModuleTest.java @@ -26,6 +26,23 @@ class GeneratorInjectionModuleTest { + @Test + void providesBlockStreamManager_AsFileLargeDataSets() throws IOException { + + BlockStreamConfig blockStreamConfig = + TestUtils.getTestConfiguration( + Map.of( + "blockStream.managerImplementation", + "BlockAsFileLargeDataSets")) + .getConfigData(BlockStreamConfig.class); + + BlockStreamManager blockStreamManager = + GeneratorInjectionModule.providesBlockStreamManager(blockStreamConfig); + + assertEquals( + blockStreamManager.getClass().getName(), BlockAsFileLargeDataSets.class.getName()); + } + @Test void providesBlockStreamManager_AsFile() throws IOException { BlockStreamConfig blockStreamConfig = diff --git a/simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000008.blk b/simulator/src/test/resources/block-0.0.3-blk/000000000000000000000000000000000008.blk new file mode 100644 index 000000000..e69de29bb diff --git a/simulator/src/test/resources/block-0.0.3-blk/notABlock.txt b/simulator/src/test/resources/block-0.0.3-blk/notABlock.txt new file mode 100644 index 000000000..3c873ebe4 --- /dev/null +++ b/simulator/src/test/resources/block-0.0.3-blk/notABlock.txt @@ -0,0 +1 @@ +not a block, just ignore this file.