From 25c5ce6e305294283365a400029e4252bf353401 Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Date: Thu, 10 Oct 2024 15:39:49 -0600 Subject: [PATCH] Improvements to how the simulator is able to stream to the BlockNode, this commit creates a StreamingMode enum, with 2 values, CONSTANT_RATE and MILLIS_PER_BLOCK. CONSTANT_RATE is the streaming mode that we have been using, that streams a block item, then waits X amount of NS, then streams another blockItem. MILLIS_PER_BLOCK, does a lot less thread interruptions, and has a more realistic behaviour, since is going to stream the whole block, as fast as it can, and then going to sleep for X-(time it took to stream) being X the target of block production, by default 1 block is produced each second, so it will attempt to stream 1 block per every 1000 ms, however if the block takes more than 1000 ms to stream, it wont sleep at all, but WARN of a potential issue. Added documentation for explaining the changes Signed-off-by: Alfredo Gutierrez --- simulator/README.md | 20 +-- .../simulator/BlockStreamSimulatorApp.java | 79 +++++++--- .../com/hedera/block/simulator/Constants.java | 9 +- .../config/data/BlockStreamConfig.java | 7 +- .../simulator/config/types/StreamingMode.java | 41 ++++++ .../simulator/BlockStreamSimulatorTest.java | 138 +++++++++++++++++- .../config/data/BlockStreamConfigTest.java | 23 ++- .../config/types/StreamingModeTest.java | 29 ++++ .../BlockAsDirBlockStreamManagerTest.java | 5 +- .../BlockAsFileBlockStreamManagerTest.java | 5 +- .../BlockAsFileLargeDataSetsTest.java | 5 +- 11 files changed, 315 insertions(+), 46 deletions(-) create mode 100644 simulator/src/main/java/com/hedera/block/simulator/config/types/StreamingMode.java create mode 100644 simulator/src/test/java/com/hedera/block/simulator/config/types/StreamingModeTest.java diff --git a/simulator/README.md b/simulator/README.md index 0851d64c6..99084be72 100644 --- a/simulator/README.md +++ b/simulator/README.md @@ -38,15 +38,17 @@ There are 2 configuration sets: ### BlockStreamConfig Uses the prefix `blockStream` so all properties should start with `blockStream.` -| Key | Description | Default Value | -|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------| -| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` | -| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` | -| `delayBetweenBlockItems` | The delay between each block item in nanoseconds | `1_500_000` | -| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` | -| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` | -| `paddedLength` | on the `BlockAsFileLargeDataSets` implementation, the length of the padded left zeroes `000001.blk.gz` | 36 | -| `fileExtension` | on the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` | +| Key | Description | Default Value | +|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------| +| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` | +| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` | +| `delayBetweenBlockItems` | The delay between each block item in nanoseconds, only applicable when streamingMode=CONSTANT_RATE | `1_500_000` | +| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` | +| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` | +| `paddedLength` | on the `BlockAsFileLargeDataSets` implementation, the length of the padded left zeroes `000001.blk.gz` | 36 | +| `fileExtension` | on the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` | +| `streamingMode` | can either be `CONSTANT_RATE` or `MILLIS_PER_BLOCK`, if `CONSTANT_RATE` | `CONSTANT_RATE` | +| `millisecondsPerBlock` | if streamingMode is `MILLIS_PER_BLOCK` this will be the time to wait between blocks in milliseconds | `1_000` | ### GrpcConfig Uses the prefix `grpc` so all properties should start with `grpc.` diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java index 391380fc3..59f7532e7 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java @@ -17,8 +17,10 @@ package com.hedera.block.simulator; import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.types.StreamingMode; import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; import com.hedera.pbj.runtime.ParseException; import com.swirlds.config.api.Configuration; @@ -32,13 +34,15 @@ public class BlockStreamSimulatorApp { private static final System.Logger LOGGER = System.getLogger(BlockStreamSimulatorApp.class.getName()); - Configuration configuration; - BlockStreamManager blockStreamManager; - PublishStreamGrpcClient publishStreamGrpcClient; - BlockStreamConfig blockStreamConfig; + private final BlockStreamManager blockStreamManager; + private final PublishStreamGrpcClient publishStreamGrpcClient; + private final BlockStreamConfig blockStreamConfig; + private final StreamingMode streamingMode; private final int delayBetweenBlockItems; + private final int millisecondsPerBlock; + boolean isRunning = false; /** @@ -53,12 +57,13 @@ public BlockStreamSimulatorApp( @NonNull Configuration configuration, @NonNull BlockStreamManager blockStreamManager, @NonNull PublishStreamGrpcClient publishStreamGrpcClient) { - this.configuration = configuration; this.blockStreamManager = blockStreamManager; this.publishStreamGrpcClient = publishStreamGrpcClient; blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class); + streamingMode = blockStreamConfig.streamingMode(); + millisecondsPerBlock = blockStreamConfig.millisecondsPerBlock(); delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems(); } @@ -70,12 +75,37 @@ public BlockStreamSimulatorApp( * @throws IOException if an I/O error occurs */ public void start() throws InterruptedException, ParseException, IOException { - int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000; - int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000; isRunning = true; LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started"); + if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) { + millisPerBlockStreaming(); + } else { + constantRateStreaming(); + } + + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); + } + + /** + * Returns whether the block stream simulator is running. + * + * @return true if the block stream simulator is running, false otherwise + */ + public boolean isRunning() { + return isRunning; + } + + /** Stops the block stream simulator. */ + public void stop() { + isRunning = false; + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); + } + + private void constantRateStreaming() throws InterruptedException, IOException, ParseException { + int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000; + int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000; boolean streamBlockItem = true; int blockItemsStreamed = 0; @@ -103,22 +133,29 @@ public void start() throws InterruptedException, ParseException, IOException { streamBlockItem = false; } } - - LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); } - /** - * Returns whether the block stream simulator is running. - * - * @return true if the block stream simulator is running, false otherwise - */ - public boolean isRunning() { - return isRunning; - } + private void millisPerBlockStreaming() + throws IOException, ParseException, InterruptedException { - /** Stops the block stream simulator. */ - public void stop() { - isRunning = false; - LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); + final long secondsPerBlockNanos = millisecondsPerBlock * 1_000_000L; + + Block nextBlock = blockStreamManager.getNextBlock(); + while (nextBlock != null) { + long startTime = System.nanoTime(); + publishStreamGrpcClient.streamBlock(nextBlock); + long elapsedTime = System.nanoTime() - startTime; + long timeToDelay = secondsPerBlockNanos - elapsedTime; + if (timeToDelay > 0) { + Thread.sleep(timeToDelay / 1_000_000, (int) (timeToDelay % 1_000_000)); + } else { + LOGGER.log( + System.Logger.Level.WARNING, + "Block Server is running behind, Streaming took longer than max expected: " + + millisecondsPerBlock + + " milliseconds"); + } + nextBlock = blockStreamManager.getNextBlock(); + } } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/Constants.java b/simulator/src/main/java/com/hedera/block/simulator/Constants.java index 9b4bda17c..28e4da33f 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/Constants.java +++ b/simulator/src/main/java/com/hedera/block/simulator/Constants.java @@ -16,10 +16,15 @@ package com.hedera.block.simulator; +/** The Constants class defines the constants for the block simulator. */ public class Constants { - // The file extension for block files. + /** Constructor to prevent instantiation. this is only a utility class */ + private Constants() {} + + /** The file extension for block files. */ public static final String RECORD_EXTENSION = "blk"; - // postfix for gzipped files + + /** postfix for gzip files */ public static final String GZ_EXTENSION = ".gz"; } diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java index 7c97934aa..ecc6d50aa 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java @@ -17,6 +17,7 @@ package com.hedera.block.simulator.config.data; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import com.swirlds.config.api.ConfigData; import com.swirlds.config.api.ConfigProperty; import java.nio.file.Files; @@ -33,6 +34,8 @@ * @param maxBlockItemsToStream the maximum number of block items to stream * @param paddedLength the padded length of 0 the block file format * @param fileExtension the file extension of the block file format + * @param streamingMode the mode of streaming for the block stream + * @param millisecondsPerBlock the milliseconds per block */ @ConfigData("blockStream") public record BlockStreamConfig( @@ -43,7 +46,9 @@ public record BlockStreamConfig( String managerImplementation, @ConfigProperty(defaultValue = "10_000") int maxBlockItemsToStream, @ConfigProperty(defaultValue = "36") int paddedLength, - @ConfigProperty(defaultValue = ".blk.gz") String fileExtension) { + @ConfigProperty(defaultValue = ".blk.gz") String fileExtension, + @ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode, + @ConfigProperty(defaultValue = "1000") int millisecondsPerBlock) { /** * Constructor to set the default root path if not provided, it will be set to the data diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/types/StreamingMode.java b/simulator/src/main/java/com/hedera/block/simulator/config/types/StreamingMode.java new file mode 100644 index 000000000..39ccd46d3 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/config/types/StreamingMode.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.config.types; + +/** The StreamingMode enum defines the different modes for streaming blocks. */ +public enum StreamingMode { + + /** It will wait X Nanos between each block. */ + CONSTANT_RATE, + + /** It will attempt to send a block each X Millis. */ + MILLIS_PER_BLOCK; + + /** + * Converts a string to a StreamingMode. + * + * @param mode the string to convert + * @return the StreamingMode + */ + public static StreamingMode fromString(String mode) { + return switch (mode) { + case "CONSTANT_RATE" -> CONSTANT_RATE; + case "MILLIS_PER_BLOCK" -> MILLIS_PER_BLOCK; + default -> throw new IllegalArgumentException("Invalid mode: " + mode); + }; + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java index 37de86ff8..13314b78b 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java @@ -21,12 +21,19 @@ import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.hapi.block.stream.output.BlockHeader; import com.hedera.pbj.runtime.ParseException; import com.swirlds.config.api.Configuration; import java.io.IOException; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.logging.Handler; +import java.util.logging.LogRecord; +import java.util.logging.Logger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,8 +45,6 @@ @ExtendWith(MockitoExtension.class) class BlockStreamSimulatorTest { - private Configuration configuration; - @Mock private BlockStreamManager blockStreamManager; @Mock private PublishStreamGrpcClient publishStreamGrpcClient; @@ -49,8 +54,13 @@ class BlockStreamSimulatorTest { @BeforeEach void setUp() throws IOException { - configuration = - TestUtils.getTestConfiguration(Map.of("blockStream.maxBlockItemsToStream", "100")); + Configuration configuration = + TestUtils.getTestConfiguration( + Map.of( + "blockStream.maxBlockItemsToStream", + "100", + "blockStream.streamingMode", + "CONSTANT_RATE")); blockStreamSimulator = new BlockStreamSimulatorApp( @@ -82,7 +92,9 @@ void start_exitByBlockNull() throws InterruptedException, ParseException, IOExce "blockStream.BlockAsFileBlockStreamManager", "BlockAsFileLargeDataSets", "blockStream.rootPath", - getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"))); + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), + "blockStream.streamingMode", + "CONSTANT_RATE")); BlockStreamSimulatorApp blockStreamSimulator = new BlockStreamSimulatorApp( @@ -100,4 +112,120 @@ private String getAbsoluteFolder(String relativePath) { void stop_doesNotThrowException() { assertDoesNotThrow(() -> blockStreamSimulator.stop()); } + + @Test + void start_millisPerSecond() throws InterruptedException, ParseException, IOException { + BlockStreamManager blockStreamManager = Mockito.mock(BlockStreamManager.class); + BlockItem blockItem = + BlockItem.newBuilder() + .blockHeader(BlockHeader.newBuilder().number(1L).build()) + .build(); + Block block = Block.newBuilder().items(blockItem).build(); + when(blockStreamManager.getNextBlock()).thenReturn(block, block, null); + + Configuration configuration = + TestUtils.getTestConfiguration( + Map.of( + "blockStream.maxBlockItemsToStream", + "2", + "blockStream.BlockAsFileBlockStreamManager", + "BlockAsFileLargeDataSets", + "blockStream.rootPath", + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), + "blockStream.streamingMode", + "MILLIS_PER_BLOCK")); + + BlockStreamSimulatorApp blockStreamSimulator = + new BlockStreamSimulatorApp( + configuration, blockStreamManager, publishStreamGrpcClient); + + blockStreamSimulator.start(); + assertTrue(blockStreamSimulator.isRunning()); + } + + @Test + void start_millisPerSecond_streamingLagVerifyWarnLog() + throws InterruptedException, ParseException, IOException { + List logRecords = captureLogs(); + + BlockStreamManager blockStreamManager = Mockito.mock(BlockStreamManager.class); + BlockItem blockItem = + BlockItem.newBuilder() + .blockHeader(BlockHeader.newBuilder().number(1L).build()) + .build(); + Block block = Block.newBuilder().items(blockItem).build(); + when(blockStreamManager.getNextBlock()).thenReturn(block, block, null); + PublishStreamGrpcClient publishStreamGrpcClient = + Mockito.mock(PublishStreamGrpcClient.class); + + // simulate that the first block takes 15ms to stream, when the limit is 10, to force to go + // over WARN Path. + when(publishStreamGrpcClient.streamBlock(block)) + .thenAnswer( + invocation -> { + Thread.sleep(15); + return true; + }) + .thenReturn(true); + + Configuration configuration = + TestUtils.getTestConfiguration( + Map.of( + "blockStream.maxBlockItemsToStream", + "2", + "blockStream.BlockAsFileBlockStreamManager", + "BlockAsFileLargeDataSets", + "blockStream.rootPath", + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), + "blockStream.streamingMode", + "MILLIS_PER_BLOCK", + "blockStream.millisecondsPerBlock", + "10")); + + BlockStreamSimulatorApp blockStreamSimulator = + new BlockStreamSimulatorApp( + configuration, blockStreamManager, publishStreamGrpcClient); + + blockStreamSimulator.start(); + assertTrue(blockStreamSimulator.isRunning()); + + // Assert log exists + boolean found_log = + logRecords.stream() + .anyMatch( + logRecord -> + logRecord + .getMessage() + .contains( + "Block Server is running behind, Streaming" + + " took longer than max expected: 10" + + " milliseconds")); + assertTrue(found_log); + } + + private List captureLogs() { + // Capture logs + Logger logger = Logger.getLogger(BlockStreamSimulatorApp.class.getName()); + final List logRecords = new ArrayList<>(); + + // Custom handler to capture logs + Handler handler = + new Handler() { + @Override + public void publish(LogRecord record) { + logRecords.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() throws SecurityException {} + }; + + // Add handler to logger + logger.addHandler(handler); + + return logRecords; + } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java index 36c7f4128..7f7bf49fc 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.*; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -31,6 +32,8 @@ class BlockStreamConfigTest { private final int maxBlockItemsToStream = 10_000; private final int paddedLength = 36; private final String fileExtension = ".blk"; + private final StreamingMode streamingMode = StreamingMode.CONSTANT_RATE; + private final int millisPerBlock = 1000; private String getAbsoluteFolder(String relativePath) { return Paths.get(relativePath).toAbsolutePath().toString(); @@ -56,7 +59,9 @@ void testValidAbsolutePath() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension); + fileExtension, + streamingMode, + millisPerBlock); assertEquals(folderRootPath, config.folderRootPath()); assertEquals(GenerationMode.DIR, config.generationMode()); @@ -77,7 +82,9 @@ void testEmptyFolderRootPath() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension); + fileExtension, + streamingMode, + millisPerBlock); // Verify that the path is set to the default Path expectedPath = Paths.get("src/main/resources/block-0.0.3/").toAbsolutePath(); @@ -103,7 +110,9 @@ void testRelativeFolderPathThrowsException() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension)); + fileExtension, + streamingMode, + millisPerBlock)); // Verify the exception message assertEquals(relativeFolderPath + " Root path must be absolute", exception.getMessage()); @@ -131,7 +140,9 @@ void testNonExistentFolderThrowsException() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension)); + fileExtension, + streamingMode, + millisPerBlock)); // Verify the exception message assertEquals("Folder does not exist: " + path, exception.getMessage()); @@ -152,7 +163,9 @@ void testGenerationModeNonDirDoesNotCheckFolderExistence() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension); + fileExtension, + streamingMode, + millisPerBlock); // Verify that the configuration was created successfully assertEquals(folderRootPath, config.folderRootPath()); diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/types/StreamingModeTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/types/StreamingModeTest.java new file mode 100644 index 000000000..ca723e515 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/config/types/StreamingModeTest.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.config.types; + +import static org.junit.jupiter.api.Assertions.*; + +class StreamingModeTest { + + @org.junit.jupiter.api.Test + void fromString() { + assertEquals(StreamingMode.CONSTANT_RATE, StreamingMode.fromString("CONSTANT_RATE")); + assertEquals(StreamingMode.MILLIS_PER_BLOCK, StreamingMode.fromString("MILLIS_PER_BLOCK")); + assertThrows(IllegalArgumentException.class, () -> StreamingMode.fromString("INVALID")); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java index 1c391fea7..cebbe160e 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java @@ -22,6 +22,7 @@ import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import com.hedera.pbj.runtime.ParseException; import java.io.IOException; import java.nio.file.Paths; @@ -82,7 +83,9 @@ private BlockStreamManager getBlockAsDirBlockStreamManager(String rootFolder) { "BlockAsDirBlockStreamManager", 10_000, 36, - ".blk"); + ".blk", + StreamingMode.CONSTANT_RATE, + 1000); return new BlockAsDirBlockStreamManager(blockStreamConfig); } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java index 9c207f117..c9e6172c8 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java @@ -20,6 +20,7 @@ import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import com.hedera.pbj.runtime.ParseException; import java.io.IOException; import java.nio.file.Paths; @@ -84,7 +85,9 @@ private BlockAsFileBlockStreamManager getBlockAsFileBlockStreamManager(String ro "BlockAsFileBlockStreamManager", 10_000, 36, - ".blk"); + ".blk", + StreamingMode.CONSTANT_RATE, + 1000); return new BlockAsFileBlockStreamManager(blockStreamConfig); } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java index 87639a0e4..ef80d9635 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java @@ -20,6 +20,7 @@ import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import com.hedera.hapi.block.stream.BlockItem; import com.hedera.pbj.runtime.ParseException; import java.io.File; @@ -85,7 +86,9 @@ private BlockAsFileLargeDataSets getBlockAsFileLargeDatasetsBlockStreamManager( "BlockAsFileBlockStreamManager", 10_000, 36, - ".blk"); + ".blk", + StreamingMode.CONSTANT_RATE, + 1000); return new BlockAsFileLargeDataSets(blockStreamConfig); }