Skip to content

Commit

Permalink
feat: Move out concrete working implementation and add working modes (#…
Browse files Browse the repository at this point in the history
…307)

Signed-off-by: georgi-l95 <[email protected]>
  • Loading branch information
georgi-l95 authored Oct 29, 2024
1 parent 88ef1f8 commit b6c93e6
Show file tree
Hide file tree
Showing 20 changed files with 943 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private BlockStreamSimulator() {}
public static void main(final String[] args)
throws IOException, InterruptedException, BlockSimulatorParsingException {

LOGGER.log(INFO, "Starting Block Stream Simulator");
LOGGER.log(INFO, "Starting Block Stream Simulator!");

final ConfigurationBuilder configurationBuilder = ConfigurationBuilder.create()
.withSource(SystemEnvironmentConfigSource.getInstance())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

package com.hedera.block.simulator;

import static java.util.Objects.requireNonNull;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.types.StreamingMode;
import com.hedera.block.simulator.config.types.SimulatorMode;
import com.hedera.block.simulator.exception.BlockSimulatorParsingException;
import com.hedera.block.simulator.generator.BlockStreamManager;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.hapi.block.stream.Block;
import com.hedera.block.simulator.mode.CombinedModeHandler;
import com.hedera.block.simulator.mode.ConsumerModeHandler;
import com.hedera.block.simulator.mode.PublisherModeHandler;
import com.hedera.block.simulator.mode.SimulatorModeHandler;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
Expand All @@ -31,16 +36,9 @@
/** BlockStream Simulator App */
public class BlockStreamSimulatorApp {

private static final System.Logger LOGGER =
System.getLogger(BlockStreamSimulatorApp.class.getName());

private final BlockStreamManager blockStreamManager;
private final System.Logger LOGGER = System.getLogger(getClass().getName());
private final PublishStreamGrpcClient publishStreamGrpcClient;
private final BlockStreamConfig blockStreamConfig;
private final StreamingMode streamingMode;

private final int delayBetweenBlockItems;
private final int millisecondsPerBlock;
private final SimulatorModeHandler simulatorModeHandler;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

/**
Expand All @@ -55,14 +53,20 @@ public BlockStreamSimulatorApp(
@NonNull Configuration configuration,
@NonNull BlockStreamManager blockStreamManager,
@NonNull PublishStreamGrpcClient publishStreamGrpcClient) {
this.blockStreamManager = blockStreamManager;
this.publishStreamGrpcClient = publishStreamGrpcClient;

blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class);

streamingMode = blockStreamConfig.streamingMode();
millisecondsPerBlock = blockStreamConfig.millisecondsPerBlock();
delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems();
requireNonNull(blockStreamManager);

this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient);
final BlockStreamConfig blockStreamConfig =
requireNonNull(configuration.getConfigData(BlockStreamConfig.class));

final SimulatorMode simulatorMode = blockStreamConfig.simulatorMode();
switch (simulatorMode) {
case PUBLISHER -> simulatorModeHandler =
new PublisherModeHandler(blockStreamConfig, publishStreamGrpcClient, blockStreamManager);
case CONSUMER -> simulatorModeHandler = new ConsumerModeHandler(blockStreamConfig);
case BOTH -> simulatorModeHandler = new CombinedModeHandler(blockStreamConfig);
default -> throw new IllegalArgumentException("Unknown SimulatorMode: " + simulatorMode);
}
}

/**
Expand All @@ -73,77 +77,11 @@ public BlockStreamSimulatorApp(
* @throws IOException if an I/O error occurs
*/
public void start() throws InterruptedException, BlockSimulatorParsingException, IOException {

LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator started initializing components...");
publishStreamGrpcClient.init();
isRunning.set(true);
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started");

if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) {
millisPerBlockStreaming();
} else {
constantRateStreaming();
}

LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

private void millisPerBlockStreaming()
throws IOException, InterruptedException, BlockSimulatorParsingException {

final long secondsPerBlockNanos = millisecondsPerBlock * 1_000_000L;

Block nextBlock = blockStreamManager.getNextBlock();
while (nextBlock != null) {
long startTime = System.nanoTime();
publishStreamGrpcClient.streamBlock(nextBlock);
long elapsedTime = System.nanoTime() - startTime;
long timeToDelay = secondsPerBlockNanos - elapsedTime;
if (timeToDelay > 0) {
Thread.sleep(timeToDelay / 1_000_000, (int) (timeToDelay % 1_000_000));
} else {
LOGGER.log(
System.Logger.Level.WARNING,
"Block Server is running behind. Streaming took: "
+ (elapsedTime / 1_000_000)
+ "ms - Longer than max expected of: "
+ millisecondsPerBlock
+ " milliseconds");
}
nextBlock = blockStreamManager.getNextBlock();
}
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

private void constantRateStreaming()
throws InterruptedException, IOException, BlockSimulatorParsingException {
int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000;
int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000;
boolean streamBlockItem = true;
int blockItemsStreamed = 0;

while (streamBlockItem) {
// get block
Block block = blockStreamManager.getNextBlock();

if (block == null) {
LOGGER.log(
System.Logger.Level.INFO,
"Block Stream Simulator has reached the end of the block items");
break;
}

publishStreamGrpcClient.streamBlock(block);
blockItemsStreamed += block.items().size();

Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems);

if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) {
LOGGER.log(
System.Logger.Level.INFO,
"Block Stream Simulator has reached the maximum number of block items to"
+ " stream");
streamBlockItem = false;
}
}
simulatorModeHandler.start();
}

/**
Expand All @@ -155,8 +93,9 @@ public boolean isRunning() {
return isRunning.get();
}

/** Stops the block stream simulator. */
/** Stops the Block Stream Simulator and closes off all grpc channels. */
public void stop() {
publishStreamGrpcClient.shutdown();
isRunning.set(false);
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public final class Constants {
/** postfix for gzip files */
public static final String GZ_EXTENSION = ".gz";

/**
* Used for converting nanoseconds to milliseconds and vice versa
*/
public static final int NANOS_PER_MILLI = 1_000_000;

/** Constructor to prevent instantiation. this is only a utility class */
private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package com.hedera.block.simulator.config.data;

import com.hedera.block.simulator.config.types.SimulatorMode;
import com.hedera.block.simulator.config.types.StreamingMode;
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;

/**
* Defines the configuration data for the block stream in the Hedera Block Simulator.
*
* @param simulatorMode the mode of the simulator, in terms of publishing, consuming or both
* @param delayBetweenBlockItems the delay in microseconds between streaming each block item
* @param maxBlockItemsToStream the maximum number of block items to stream before stopping
* @param streamingMode the mode of streaming for the block stream (e.g., time-based, count-based)
Expand All @@ -31,6 +33,7 @@
*/
@ConfigData("blockStream")
public record BlockStreamConfig(
@ConfigProperty(defaultValue = "PUBLISHER") SimulatorMode simulatorMode,
@ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems,
@ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
@ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode,
Expand All @@ -50,6 +53,7 @@ public static Builder builder() {
* A builder for creating instances of {@link BlockStreamConfig}.
*/
public static class Builder {
private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER;
private int delayBetweenBlockItems = 1_500_000;
private int maxBlockItemsToStream = 10_000;
private StreamingMode streamingMode = StreamingMode.MILLIS_PER_BLOCK;
Expand All @@ -63,6 +67,17 @@ public Builder() {
// Default constructor
}

/**
* Sets the simulator mode for the block stream.
*
* @param simulatorMode the {@link SimulatorMode} to use
* @return this {@code Builder} instance
*/
public Builder simulatorMode(SimulatorMode simulatorMode) {
this.simulatorMode = simulatorMode;
return this;
}

/**
* Sets the delay between streaming each block item.
*
Expand Down Expand Up @@ -125,6 +140,7 @@ public Builder blockItemsBatchSize(int blockItemsBatchSize) {
*/
public BlockStreamConfig build() {
return new BlockStreamConfig(
simulatorMode,
delayBetweenBlockItems,
maxBlockItemsToStream,
streamingMode,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.simulator.config.types;

/** The SimulatorMode enum defines the work modes of the block stream simulator. */
public enum SimulatorMode {
/**
* Indicates a work mode in which the simulator is working as both consumer and publisher.
*/
BOTH,
/**
* Indicates a work mode in which the simulator is working in consumer mode.
*/
CONSUMER,
/**
* Indicates a work mode in which the simulator is working in publisher mode.
*/
PUBLISHER
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
* The PublishStreamGrpcClient interface provides the methods to stream the block and block item.
*/
public interface PublishStreamGrpcClient {
/**
* Initialize, opens a gRPC channel and creates the needed stubs with the passed configuration.
*/
void init();

/**
* Streams the block item.
*
Expand All @@ -39,4 +44,9 @@ public interface PublishStreamGrpcClient {
* @return true if the block is streamed successfully, false otherwise
*/
boolean streamBlock(Block block);

/**
* Shutdowns the channel.
*/
void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.hedera.block.simulator.grpc;

import static java.util.Objects.requireNonNull;

import com.hedera.block.common.utils.ChunkUtils;
import com.hedera.block.simulator.Translator;
import com.hedera.block.simulator.config.data.BlockStreamConfig;
Expand All @@ -37,9 +39,10 @@
*/
public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {

private final BlockStreamServiceGrpc.BlockStreamServiceStub stub;
private final StreamObserver<PublishStreamRequest> requestStreamObserver;
private StreamObserver<PublishStreamRequest> requestStreamObserver;
private final BlockStreamConfig blockStreamConfig;
private final GrpcConfig grpcConfig;
private ManagedChannel channel;

/**
* Creates a new PublishStreamGrpcClientImpl instance.
Expand All @@ -49,15 +52,22 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {
*/
@Inject
public PublishStreamGrpcClientImpl(
@NonNull GrpcConfig grpcConfig, @NonNull BlockStreamConfig blockStreamConfig) {
ManagedChannel channel =
ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port())
.usePlaintext()
.build();
stub = BlockStreamServiceGrpc.newStub(channel);
@NonNull final GrpcConfig grpcConfig, @NonNull final BlockStreamConfig blockStreamConfig) {
this.grpcConfig = requireNonNull(grpcConfig);
this.blockStreamConfig = requireNonNull(blockStreamConfig);
}

/**
* Initialize the channel and stub for publishBlockStream with the desired configuration.
*/
@Override
public void init() {
channel = ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port())
.usePlaintext()
.build();
BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel);
PublishStreamObserver publishStreamObserver = new PublishStreamObserver();
requestStreamObserver = stub.publishBlockStream(publishStreamObserver);
this.blockStreamConfig = blockStreamConfig;
}

/**
Expand All @@ -72,8 +82,9 @@ public boolean streamBlockItem(List<BlockItem> blockItems) {
blockItemsProtoc.add(Translator.fromPbj(blockItem));
}

requestStreamObserver.onNext(
PublishStreamRequest.newBuilder().addAllBlockItems(blockItemsProtoc).build());
requestStreamObserver.onNext(PublishStreamRequest.newBuilder()
.addAllBlockItems(blockItemsProtoc)
.build());

return true;
}
Expand All @@ -91,12 +102,17 @@ public boolean streamBlock(Block block) {

List<List<com.hedera.hapi.block.stream.protoc.BlockItem>> streamingBatches =
ChunkUtils.chunkify(blockItemsProtoc, blockStreamConfig.blockItemsBatchSize());
for (List<com.hedera.hapi.block.stream.protoc.BlockItem> streamingBatch :
streamingBatches) {
requestStreamObserver.onNext(
PublishStreamRequest.newBuilder().addAllBlockItems(streamingBatch).build());
for (List<com.hedera.hapi.block.stream.protoc.BlockItem> streamingBatch : streamingBatches) {
requestStreamObserver.onNext(PublishStreamRequest.newBuilder()
.addAllBlockItems(streamingBatch)
.build());
}

return true;
}

@Override
public void shutdown() {
channel.shutdown();
}
}
Loading

0 comments on commit b6c93e6

Please sign in to comment.