feat: Simulator: BlockStreamManager impl for very large DataSets (#225)
Signed-off-by: Alfredo Gutierrez <[email protected]>
AlfredoG87 authored Oct 4, 2024
1 parent 90e556a commit a3a97cc
Showing 21 changed files with 465 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/smoke-test.yaml
@@ -81,7 +81,7 @@ jobs:

- name: Run application in background, capture logs in app.log
run: |
${{ env.GRADLE_EXEC }} run -x :suites:run 2> server/src/test/resources/app.log < /dev/null &
${{ env.GRADLE_EXEC }} :server:run 2> server/src/test/resources/app.log < /dev/null &
echo "Application started with PID $APP_PID"
sleep 10
83 changes: 83 additions & 0 deletions simulator/README.md
@@ -0,0 +1,83 @@
# Block Stream Simulator

## Overview

The Block Stream Simulator is designed to simulate block streaming for Hedera Hashgraph.
It uses various configuration sources and dependency injection to manage its components.

## Prerequisites

- Java 21
- Gradle
- IntelliJ IDEA (recommended for development)

## Project Design Structure

The project uses Dagger2 for dependency injection. It has a modular structure that divides the Dagger dependencies into modules, and every module in use is listed in the root injection module:
```plaintext
src/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionModule.java
```
The entry point for the project is `BlockStreamSimulator.java`, which contains the main method. It does three things:
1. Creates/loads the application configuration using the Hedera Platform Configuration API.
2. Creates a DaggerComponent and instantiates the BlockStreamSimulatorApp class using the DaggerComponent and its registered dependencies.
3. Starts the BlockStreamSimulatorApp, which orchestrates the different parts of the simulation through generic interfaces and handles the streaming rate and the exit conditions.

The BlockStreamSimulatorApp consumes other services that are injected through the DaggerComponent:
1. **generator:** responsible for generating blocks; exposes a single interface, `BlockStreamManager`, with several implementations:
   1. BlockAsDirBlockStreamManager: generates blocks from a directory, where each folder is a block and each block item is a single 'blk' or 'blk.gz' file.
   2. BlockAsFileBlockStreamManager: generates blocks from files, where each file is a block; this is the format of the CN recordings. Because it loads the blocks into memory, it can stream very fast, which makes it useful for simple streaming tests.
   3. BlockAsFileLargeDataSets: similar to BlockAsFileBlockStreamManager, but designed to work with folders that are gigabytes in size and hold thousands of large blocks. Because it handles large blocks at high volume, it is useful for performance, load and stress testing.
2. **grpc:** responsible for communication with the Block-Node; currently has one interface, `PublishStreamGrpcClient`, and one implementation, and also exposes a `PublishStreamObserver`.

## Configuration

There are 2 configuration sets:
1. BlockStreamConfig: contains the configuration for the Block Stream Simulator logic and the generation module.
2. GrpcConfig: contains the configuration for the gRPC communication with the Block-Node.

### BlockStreamConfig
Uses the prefix `blockStream` so all properties should start with `blockStream.`

| Key | Description | Default Value |
|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------|
| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` |
| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` |
| `delayBetweenBlockItems` | The delay between each block item in nanoseconds | `1_500_000` |
| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` |
| `maxBlockItemsToStream` | Exit condition for the simulator and the circular implementations such as the `BlockAsDir` or `BlockAsFile` implementations | `10_000` |
| `paddedLength` | For the `BlockAsFileLargeDataSets` implementation, the number of digits in the block file name, left-padded with zeroes (for example, a length of 6 gives `000001.blk.gz`) | `36` |
| `fileExtension` | For the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` |
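
As an illustration of how `paddedLength` and `fileExtension` interact, the sketch below builds a block file name the same way the `BlockAsFileLargeDataSets` constructor in this commit builds its format string. The class and method names here are hypothetical and exist only for this example.

```java
// Hypothetical helper, not part of the simulator: shows how the two
// properties combine into the file name that is looked up on disk.
final class BlockFileNameSketch {

    /** Builds the format string as "%0" + paddedLength + "d" + fileExtension. */
    static String formatString(int paddedLength, String fileExtension) {
        return "%0" + paddedLength + "d" + fileExtension;
    }

    /** Returns the file name expected for a 1-based block index. */
    static String fileNameFor(int blockIndex, int paddedLength, String fileExtension) {
        return String.format(formatString(paddedLength, fileExtension), blockIndex);
    }

    public static void main(String[] args) {
        // With a padded length of 6 the first block is 000001.blk.gz.
        System.out.println(fileNameFor(1, 6, ".blk.gz"));
        // With the default of 36 the numeric part is 36 digits wide.
        System.out.println(fileNameFor(1, 36, ".blk.gz"));
    }
}
```

Files whose names do not match the configured width and extension exactly would not be found, so both properties need to agree with the data set being streamed.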

### GrpcConfig
Uses the prefix `grpc` so all properties should start with `grpc.`

| Key | Description | Default Value |
|-----------------|----------------------------|---------------|
| `serverAddress` | The host of the Block-Node | `localhost` |
| `port` | The port of the Block-Node | `8080` |

## Building the Project

To build the project, run the following command:

```sh
./gradlew :simulator:build
```

## Running the Project

Usually you will want to run a Block-Node server before the simulator. To do that, you can use the following command:

```sh
./gradlew :server:run
```
However, we recommend running the Block-Node server as a Docker container:
```sh
./gradlew :server:build :server:createDockerImage :server:startDockerContainer
```

Once the project is built, you can run it using the following command:

```sh
./gradlew :simulator:run
```
@@ -18,6 +18,7 @@

import static java.lang.System.Logger.Level.INFO;

import com.hedera.pbj.runtime.ParseException;
import com.swirlds.config.api.Configuration;
import com.swirlds.config.api.ConfigurationBuilder;
import com.swirlds.config.extensions.sources.ClasspathFileConfigSource;
@@ -40,8 +41,10 @@ private BlockStreamSimulator() {}
* @param args the arguments to be passed to the block stream simulator
* @throws IOException if an I/O error occurs
* @throws InterruptedException if the thread is interrupted
* @throws ParseException if a parse error occurs
*/
public static void main(String[] args) throws IOException, InterruptedException {
public static void main(String[] args)
throws IOException, InterruptedException, ParseException {

LOGGER.log(INFO, "Starting Block Stream Simulator");

@@ -20,8 +20,10 @@
import com.hedera.block.simulator.generator.BlockStreamManager;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.ParseException;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import javax.inject.Inject;

/** BlockStream Simulator App */
@@ -64,8 +66,10 @@ public BlockStreamSimulatorApp(
* Starts the block stream simulator.
*
* @throws InterruptedException if the thread is interrupted
* @throws ParseException if a parse error occurs
* @throws IOException if an I/O error occurs
*/
public void start() throws InterruptedException {
public void start() throws InterruptedException, ParseException, IOException {
int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000;
int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000;

@@ -78,12 +82,24 @@ public void start() throws InterruptedException {
while (streamBlockItem) {
// get block item
BlockItem blockItem = blockStreamManager.getNextBlockItem();

if (blockItem == null) {
LOGGER.log(
System.Logger.Level.INFO,
"Block Stream Simulator has reached the end of the block items");
break;
}

publishStreamGrpcClient.streamBlockItem(blockItem);
blockItemsStreamed++;

Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems);

if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) {
LOGGER.log(
System.Logger.Level.INFO,
"Block Stream Simulator has reached the maximum number of block items to"
+ " stream");
streamBlockItem = false;
}
}
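
The loop in this hunk has two exit conditions: the manager returning `null` (no more block items) and the streamed count reaching `maxBlockItemsToStream`. A minimal, self-contained sketch of that control flow follows. It uses `String` items and plain functional interfaces as stand-ins for `BlockItem`, `BlockStreamManager` and `PublishStreamGrpcClient`; all names are hypothetical, and unlike the original it handles interruption inside the loop rather than propagating it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the simulator loop; not the project's code.
final class StreamLoopSketch {

    /** Millisecond part of a delay given in nanoseconds. */
    static long millisPart(int delayNanos) {
        return delayNanos / 1_000_000;
    }

    /** Remaining nanoseconds, always within the 0..999_999 range Thread.sleep accepts. */
    static int nanosPart(int delayNanos) {
        return delayNanos % 1_000_000;
    }

    /** Streams items until the source returns null or maxItems is reached; returns the count. */
    static int stream(Supplier<String> nextItem, Consumer<String> publish, int maxItems, int delayNanos) {
        int streamed = 0;
        boolean keepStreaming = true;
        while (keepStreaming) {
            String item = nextItem.get();
            if (item == null) {
                break; // source exhausted
            }
            publish.accept(item);
            streamed++;
            try {
                Thread.sleep(millisPart(delayNanos), nanosPart(delayNanos));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // assumption: stop quietly when interrupted
                break;
            }
            if (streamed >= maxItems) {
                keepStreaming = false; // maximum reached
            }
        }
        return streamed;
    }

    public static void main(String[] args) {
        Iterator<String> items = List.of("a", "b", "c").iterator();
        List<String> sent = new ArrayList<>();
        int count = stream(() -> items.hasNext() ? items.next() : null, sent::add, 2, 1_500_000);
        System.out.println(count + " items streamed: " + sent);
    }
}
```

Because the maximum is checked only after an item has been published, at least one item is streamed whenever the source is not empty, which matches the order of checks in the hunk above.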
25 changes: 25 additions & 0 deletions simulator/src/main/java/com/hedera/block/simulator/Constants.java
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.simulator;

public class Constants {

// The file extension for block files.
public static final String RECORD_EXTENSION = "blk";
// postfix for gzipped files
public static final String GZ_EXTENSION = ".gz";
}
@@ -31,6 +31,8 @@
* @param delayBetweenBlockItems the delay between block items
* @param managerImplementation the implementation of the block stream manager
* @param maxBlockItemsToStream the maximum number of block items to stream
* @param paddedLength the zero-padded length of the block file name
* @param fileExtension the file extension of the block file format
*/
@ConfigData("blockStream")
public record BlockStreamConfig(
@@ -39,7 +41,9 @@ public record BlockStreamConfig(
@ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems,
@ConfigProperty(defaultValue = "BlockAsFileBlockStreamManager")
String managerImplementation,
@ConfigProperty(defaultValue = "10_000") int maxBlockItemsToStream) {
@ConfigProperty(defaultValue = "10_000") int maxBlockItemsToStream,
@ConfigProperty(defaultValue = "36") int paddedLength,
@ConfigProperty(defaultValue = ".blk.gz") String fileExtension) {

/**
* Constructor to set the default root path if not provided, it will be set to the data
@@ -16,6 +16,7 @@

package com.hedera.block.simulator.generator;

import static com.hedera.block.simulator.generator.Utils.readFileBytes;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;
@@ -125,7 +126,7 @@ private void loadBlocks() throws IOException, ParseException {
.toList();

for (Path pathBlockItem : sortedBlockItems) {
byte[] blockItemBytes = readBlockItemBytes(pathBlockItem);
byte[] blockItemBytes = readFileBytes(pathBlockItem);
// if null means the file is not a block item and we can skip the file.
if (blockItemBytes == null) {
continue;
@@ -141,15 +142,6 @@ private void loadBlocks() throws IOException, ParseException {
}
}

private byte[] readBlockItemBytes(Path pathBlockItem) throws IOException {
if (pathBlockItem.toString().endsWith(".gz")) {
return Utils.readGzFile(pathBlockItem);
} else if (pathBlockItem.toString().endsWith(".blk")) {
return Files.readAllBytes(pathBlockItem);
}
return null;
}

// Method to extract the numeric part of the filename from a Path object
// Returns -1 if the filename is not a valid number
private static int extractNumberFromPath(Path path) {
@@ -16,6 +16,7 @@

package com.hedera.block.simulator.generator;

import static com.hedera.block.simulator.generator.Utils.readFileBytes;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;
@@ -108,13 +109,10 @@ private void loadBlocks() throws IOException, ParseException {

for (Path blockPath : sortedBlockFiles) {

byte[] blockBytes;
if (blockPath.toString().endsWith(".gz")) {
blockBytes = Utils.readGzFile(blockPath);
} else if (blockPath.toString().endsWith(".blk")) {
blockBytes = Files.readAllBytes(blockPath);
} else {
throw new IllegalArgumentException("Invalid file format: " + blockPath);
byte[] blockBytes = readFileBytes(blockPath);
// skip if block is null, usually due to OS files like .DS_Store
if (blockBytes == null) {
continue;
}

Block block = Block.PROTOBUF.parse(Bytes.wrap(blockBytes));
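
Both managers now delegate to `Utils.readFileBytes`, whose body is not visible in this part of the diff. The sketch below is an assumption about its behavior, based only on the removed `readBlockItemBytes` method and on the null checks at the call sites: gzip files are decompressed, `.blk` files are read as-is, and anything else yields `null`. The class name is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;

// Assumed behavior only; the real Utils.readFileBytes may differ.
final class ReadFileBytesSketch {

    /**
     * Returns the decompressed bytes of a ".gz" file, the raw bytes of a
     * ".blk" file, or null for any other file so callers can skip it.
     */
    static byte[] readFileBytes(Path path) throws IOException {
        String name = path.toString();
        if (name.endsWith(".gz")) {
            try (InputStream in = new GZIPInputStream(Files.newInputStream(path))) {
                return in.readAllBytes();
            }
        } else if (name.endsWith(".blk")) {
            return Files.readAllBytes(path);
        }
        return null; // not a block file, e.g. .DS_Store
    }
}
```

Returning `null` instead of throwing lets a caller skip stray files with a single check, which is what the updated file-based manager does in place of the previous `IllegalArgumentException`.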
@@ -0,0 +1,95 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.simulator.generator;

import static com.hedera.block.simulator.generator.Utils.readFileBytes;
import static java.lang.System.Logger.Level.INFO;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.types.GenerationMode;
import com.hedera.hapi.block.stream.Block;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.ParseException;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.File;
import java.io.IOException;
import javax.inject.Inject;

/** A block stream manager that reads blocks from files in a directory. */
public class BlockAsFileLargeDataSets implements BlockStreamManager {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final String blockstreamPath;
private int currentBlockIndex = 0;
private int currentBlockItemIndex = 0;

private Block currentBlock = null;
private final String formatString;

/**
* Constructs a new BlockAsFileLargeDataSets instance.
*
* @param config the block stream configuration
*/
@Inject
public BlockAsFileLargeDataSets(@NonNull BlockStreamConfig config) {
this.blockstreamPath = config.folderRootPath();
this.formatString = "%0" + config.paddedLength() + "d" + config.fileExtension();
}

@Override
public GenerationMode getGenerationMode() {
return GenerationMode.DIR;
}

@Override
public BlockItem getNextBlockItem() throws IOException, ParseException {
if (currentBlock != null && currentBlock.items().size() > currentBlockItemIndex) {
return currentBlock.items().get(currentBlockItemIndex++);
} else {
currentBlock = getNextBlock();
if (currentBlock != null) {
currentBlockItemIndex = 0; // Reset for new block
return getNextBlockItem();
}
}

return null; // No more blocks/items
}

@Override
public Block getNextBlock() throws IOException, ParseException {
currentBlockIndex++;

String nextBlockFileName = String.format(formatString, currentBlockIndex);
File blockFile = new File(blockstreamPath, nextBlockFileName);

if (blockFile.exists()) {
byte[] blockBytes = readFileBytes(blockFile.toPath());

LOGGER.log(INFO, "Loading block: " + blockFile.getName());

Block block = Block.PROTOBUF.parse(Bytes.wrap(blockBytes));
LOGGER.log(INFO, "block loaded with items size= " + block.items().size());
return block;
}

return null; // No more blocks found
}
}
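
The class above keeps only one block in memory and loads the next one when the current block's items run out. A minimal sketch of that cursor pattern follows, with `List<String>` standing in for `Block` and a hypothetical `loader` function standing in for the file lookup. It deliberately swaps the recursive call in `getNextBlockItem` for a loop, so it is a variation on the committed code rather than a copy of it.

```java
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical stand-in for the lazy, one-block-at-a-time iteration.
final class LazyBlockCursorSketch {

    private final IntFunction<List<String>> loader; // returns null when block N does not exist
    private int currentBlockIndex = 0;
    private int currentItemIndex = 0;
    private List<String> currentBlock = null;

    LazyBlockCursorSketch(IntFunction<List<String>> loader) {
        this.loader = loader;
    }

    /** Returns the next item, loading blocks on demand; null once no further block exists. */
    String nextItem() {
        while (true) {
            if (currentBlock != null && currentItemIndex < currentBlock.size()) {
                return currentBlock.get(currentItemIndex++);
            }
            currentBlock = loader.apply(++currentBlockIndex); // block indices start at 1
            currentItemIndex = 0;
            if (currentBlock == null) {
                return null; // no more blocks
            }
            // An empty block simply falls through to the next loop iteration.
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> blocks = Map.of(1, List.of("a", "b"), 2, List.of("c"));
        LazyBlockCursorSketch cursor = new LazyBlockCursorSketch(i -> blocks.get(i));
        for (String item = cursor.nextItem(); item != null; item = cursor.nextItem()) {
            System.out.println(item);
        }
    }
}
```

The loop keeps stack depth constant even if a data set contains a long run of empty blocks, at the cost of departing slightly from the structure of the original method.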
@@ -19,6 +19,8 @@
import com.hedera.block.simulator.config.types.GenerationMode;
import com.hedera.hapi.block.stream.Block;
import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.ParseException;
import java.io.IOException;

/** The block stream manager interface. */
public interface BlockStreamManager {
@@ -34,13 +36,17 @@ public interface BlockStreamManager {
* Get the next block item.
*
* @return the next block item
* @throws IOException if an I/O error occurs
* @throws ParseException if a parse error occurs
*/
BlockItem getNextBlockItem();
BlockItem getNextBlockItem() throws IOException, ParseException;

/**
* Get the next block.
*
* @return the next block
* @throws IOException if an I/O error occurs
* @throws ParseException if a parse error occurs
*/
Block getNextBlock();
Block getNextBlock() throws IOException, ParseException;
}
@@ -39,6 +39,8 @@ static BlockStreamManager providesBlockStreamManager(BlockStreamConfig config) {

if ("BlockAsDirBlockStreamManager".equalsIgnoreCase(config.managerImplementation())) {
return new BlockAsDirBlockStreamManager(config);
} else if ("BlockAsFileLargeDataSets".equalsIgnoreCase(config.managerImplementation())) {
return new BlockAsFileLargeDataSets(config);
}

return new BlockAsFileBlockStreamManager(config);
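
One consequence of this selection logic is that the name comparison is case-insensitive and any unrecognized value falls back to `BlockAsFileBlockStreamManager`. The sketch below isolates that decision using a hypothetical enum in place of the real manager classes, so the fallback can be seen without Dagger or the configuration API.

```java
// Hypothetical reduction of the provider method to its decision logic.
final class ManagerSelectionSketch {

    enum Kind { AS_DIR, AS_FILE, AS_FILE_LARGE_DATA_SETS }

    /** Maps the managerImplementation property to a manager kind; unknown names use the default. */
    static Kind select(String managerImplementation) {
        if ("BlockAsDirBlockStreamManager".equalsIgnoreCase(managerImplementation)) {
            return Kind.AS_DIR;
        } else if ("BlockAsFileLargeDataSets".equalsIgnoreCase(managerImplementation)) {
            return Kind.AS_FILE_LARGE_DATA_SETS;
        }
        return Kind.AS_FILE; // default, also reached for misspelled names
    }

    public static void main(String[] args) {
        System.out.println(select("blockasfilelargedatasets"));
        System.out.println(select("NotARealManager"));
    }
}
```

A misspelled `blockStream.managerImplementation` value therefore does not fail fast; it quietly selects the in-memory file manager.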