Skip to content

Commit

Permalink
chore: Fix Simulator unbounded memory growth (#406)
Browse files Browse the repository at this point in the history
Signed-off-by: georgi-l95 <[email protected]>
  • Loading branch information
georgi-l95 authored Dec 19, 2024
1 parent 9bae709 commit 3ad4efc
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class SimulatorMappedConfigSourceInitializer {

// Block stream configuration
new ConfigMapping("blockStream.simulatorMode", "BLOCK_STREAM_SIMULATOR_MODE"),
new ConfigMapping("blockStream.lastKnownStatusesCapacity", "BLOCK_STREAM_LAST_KNOWN_STATUSES_CAPACITY"),
new ConfigMapping("blockStream.delayBetweenBlockItems", "BLOCK_STREAM_DELAY_BETWEEN_BLOCK_ITEMS"),
new ConfigMapping("blockStream.maxBlockItemsToStream", "BLOCK_STREAM_MAX_BLOCK_ITEMS_TO_STREAM"),
new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Defines the configuration data for the block stream in the Hedera Block Simulator.
*
* @param simulatorMode the mode of the simulator, in terms of publishing, consuming or both
* @param lastKnownStatusesCapacity the capacity of the last known statuses
* @param delayBetweenBlockItems the delay in microseconds between streaming each block item
* @param maxBlockItemsToStream the maximum number of block items to stream before stopping
* @param streamingMode the mode of streaming for the block stream (e.g., time-based, count-based)
Expand All @@ -34,6 +35,7 @@
@ConfigData("blockStream")
public record BlockStreamConfig(
@ConfigProperty(defaultValue = "PUBLISHER") SimulatorMode simulatorMode,
@ConfigProperty(defaultValue = "10") int lastKnownStatusesCapacity,
@ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems,
@ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
@ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode,
Expand All @@ -54,6 +56,7 @@ public static Builder builder() {
*/
public static class Builder {
private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER;
private int lastKnownStatusesCapacity = 10;
private int delayBetweenBlockItems = 1_500_000;
private int maxBlockItemsToStream = 10_000;
private StreamingMode streamingMode = StreamingMode.MILLIS_PER_BLOCK;
Expand All @@ -78,6 +81,17 @@ public Builder simulatorMode(SimulatorMode simulatorMode) {
return this;
}

/**
* Sets the capacity of the last known statuses.
*
* @param lastKnownStatusesCapacity the capacity
* @return this {@code Builder} instance
*/
public Builder lastKnownStatusesCapacity(int lastKnownStatusesCapacity) {
this.lastKnownStatusesCapacity = lastKnownStatusesCapacity;
return this;
}

/**
* Sets the delay between streaming each block item.
*
Expand Down Expand Up @@ -141,6 +155,7 @@ public Builder blockItemsBatchSize(int blockItemsBatchSize) {
public BlockStreamConfig build() {
return new BlockStreamConfig(
simulatorMode,
lastKnownStatusesCapacity,
delayBetweenBlockItems,
maxBlockItemsToStream,
streamingMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import static com.hedera.block.common.utils.Preconditions.requireWhole;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/**
Expand All @@ -33,8 +34,8 @@
public record StreamStatus(
long publishedBlocks,
long consumedBlocks,
List<String> lastKnownPublisherStatuses,
List<String> lastKnownConsumersStatuses) {
Deque<String> lastKnownPublisherStatuses,
Deque<String> lastKnownConsumersStatuses) {

/**
* Creates a new {@link Builder} instance for constructing a {@code StreamStatus}.
Expand All @@ -51,8 +52,8 @@ public static Builder builder() {
public static class Builder {
private long publishedBlocks = 0;
private long consumedBlocks = 0;
private List<String> lastKnownPublisherStatuses = new ArrayList<>();
private List<String> lastKnownConsumersStatuses = new ArrayList<>();
private Deque<String> lastKnownPublisherStatuses = new ArrayDeque<>();
private Deque<String> lastKnownConsumersStatuses = new ArrayDeque<>();

/**
* Creates a new instance of the {@code Builder} class with default configuration values.
Expand Down Expand Up @@ -93,7 +94,7 @@ public Builder consumedBlocks(long consumedBlocks) {
*/
public Builder lastKnownPublisherStatuses(List<String> lastKnownPublisherStatuses) {
requireNonNull(lastKnownPublisherStatuses);
this.lastKnownPublisherStatuses = new ArrayList<>(lastKnownPublisherStatuses);
this.lastKnownPublisherStatuses = new ArrayDeque<>(lastKnownPublisherStatuses);
return this;
}

Expand All @@ -105,7 +106,7 @@ public Builder lastKnownPublisherStatuses(List<String> lastKnownPublisherStatuse
*/
public Builder lastKnownConsumersStatuses(List<String> lastKnownConsumersStatuses) {
requireNonNull(lastKnownConsumersStatuses);
this.lastKnownConsumersStatuses = new ArrayList<>(lastKnownConsumersStatuses);
this.lastKnownConsumersStatuses = new ArrayDeque<>(lastKnownConsumersStatuses);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.Objects.requireNonNull;

import com.hedera.block.common.utils.Preconditions;
import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.data.GrpcConfig;
import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
import com.hedera.block.simulator.metrics.MetricsService;
Expand All @@ -30,7 +31,8 @@
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.inject.Inject;
Expand All @@ -43,6 +45,7 @@
public class ConsumerStreamGrpcClientImpl implements ConsumerStreamGrpcClient {
// Configuration
private final GrpcConfig grpcConfig;
private final BlockStreamConfig blockStreamConfig;

// Service dependencies
private final MetricsService metricsService;
Expand All @@ -53,22 +56,28 @@ public class ConsumerStreamGrpcClientImpl implements ConsumerStreamGrpcClient {
private StreamObserver<SubscribeStreamResponse> consumerStreamObserver;

// State
private final List<String> lastKnownStatuses;
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;
private CountDownLatch streamLatch;

/**
* Constructs a new ConsumerStreamGrpcClientImpl with the specified configuration and metrics service.
*
* @param grpcConfig The configuration for gRPC connection settings
* @param blockStreamConfig The configuration for the block stream
* @param metricsService The service for recording consumption metrics
* @throws NullPointerException if any parameter is null
*/
@Inject
public ConsumerStreamGrpcClientImpl(
@NonNull final GrpcConfig grpcConfig, @NonNull final MetricsService metricsService) {
@NonNull final GrpcConfig grpcConfig,
@NonNull final BlockStreamConfig blockStreamConfig,
@NonNull final MetricsService metricsService) {
this.grpcConfig = requireNonNull(grpcConfig);
this.metricsService = requireNonNull(metricsService);
this.lastKnownStatuses = new ArrayList<>();
this.blockStreamConfig = requireNonNull(blockStreamConfig);
this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity();
this.lastKnownStatuses = new ArrayDeque<>(lastKnownStatusesCapacity);
}

@Override
Expand All @@ -87,7 +96,8 @@ public void requestBlocks(long startBlock, long endBlock) throws InterruptedExce
Preconditions.requireWhole(endBlock);
Preconditions.requireGreaterOrEqual(endBlock, startBlock);

consumerStreamObserver = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses);
consumerStreamObserver =
new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity);

SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder()
.setStartBlockNumber(startBlock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;

Expand All @@ -42,23 +43,27 @@ public class ConsumerStreamObserver implements StreamObserver<SubscribeStreamRes

// State
private final CountDownLatch streamLatch;
private final List<String> lastKnownStatuses;
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;

/**
* Constructs a new ConsumerStreamObserver.
*
* @param metricsService The service for recording consumption metrics
* @param streamLatch A latch used to coordinate stream completion
* @param lastKnownStatuses List to store the most recent status messages
* @param lastKnownStatusesCapacity the capacity of the last known statuses
* @throws NullPointerException if any parameter is null
*/
public ConsumerStreamObserver(
@NonNull final MetricsService metricsService,
@NonNull final CountDownLatch streamLatch,
@NonNull final List<String> lastKnownStatuses) {
@NonNull final Deque<String> lastKnownStatuses,
final int lastKnownStatusesCapacity) {
this.metricsService = requireNonNull(metricsService);
this.streamLatch = requireNonNull(streamLatch);
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
this.lastKnownStatusesCapacity = lastKnownStatusesCapacity;
}

/**
Expand All @@ -70,6 +75,9 @@ public ConsumerStreamObserver(
@Override
public void onNext(SubscribeStreamResponse subscribeStreamResponse) {
final SubscribeStreamResponse.ResponseCase responseType = subscribeStreamResponse.getResponseCase();
if (lastKnownStatuses.size() >= lastKnownStatusesCapacity) {
lastKnownStatuses.pollFirst();
}
lastKnownStatuses.add(subscribeStreamResponse.toString());

switch (responseType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
Expand All @@ -63,7 +64,8 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {

// State
private final AtomicBoolean streamEnabled;
private final List<String> lastKnownStatuses = new ArrayList<>();
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;

/**
* Creates a new PublishStreamGrpcClientImpl with the specified dependencies.
Expand All @@ -84,6 +86,8 @@ public PublishStreamGrpcClientImpl(
this.blockStreamConfig = requireNonNull(blockStreamConfig);
this.metricsService = requireNonNull(metricsService);
this.streamEnabled = requireNonNull(streamEnabled);
this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity();
lastKnownStatuses = new ArrayDeque<>(this.lastKnownStatusesCapacity);
}

/**
Expand All @@ -95,7 +99,8 @@ public void init() {
.usePlaintext()
.build();
BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel);
PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses);
PublishStreamObserver publishStreamObserver =
new PublishStreamObserver(streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity);
requestStreamObserver = stub.publishBlockStream(publishStreamObserver);
lastKnownStatuses.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -36,19 +36,24 @@ public class PublishStreamObserver implements StreamObserver<PublishStreamRespon

// State
private final AtomicBoolean streamEnabled;
private final List<String> lastKnownStatuses;
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;

/**
* Creates a new PublishStreamObserver instance.
*
* @param streamEnabled Controls whether streaming should continue
* @param lastKnownStatuses List to store the most recent status messages
* @param lastKnownStatusesCapacity the capacity of the last known statuses
* @throws NullPointerException if any parameter is null
*/
public PublishStreamObserver(
@NonNull final AtomicBoolean streamEnabled, @NonNull final List<String> lastKnownStatuses) {
@NonNull final AtomicBoolean streamEnabled,
@NonNull final Deque<String> lastKnownStatuses,
final int lastKnownStatusesCapacity) {
this.streamEnabled = requireNonNull(streamEnabled);
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
this.lastKnownStatusesCapacity = lastKnownStatusesCapacity;
}

/**
Expand All @@ -58,6 +63,9 @@ public PublishStreamObserver(
*/
@Override
public void onNext(PublishStreamResponse publishStreamResponse) {
if (lastKnownStatuses.size() >= lastKnownStatusesCapacity) {
lastKnownStatuses.pollFirst();
}
lastKnownStatuses.add(publishStreamResponse.toString());
LOGGER.log(INFO, "Received Response: " + publishStreamResponse.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -300,7 +301,7 @@ void testGetStreamStatus() {

assertNotNull(streamStatus, "StreamStatus should not be null");
assertEquals(expectedPublishedBlocks, streamStatus.publishedBlocks(), "Published blocks should match");
assertEquals(
assertIterableEquals(
expectedLastKnownStatuses,
streamStatus.lastKnownPublisherStatuses(),
"Last known statuses should match");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class SimulatorMappedConfigSourceInitializerTest {

// Block stream configuration
new ConfigMapping("blockStream.simulatorMode", "BLOCK_STREAM_SIMULATOR_MODE"),
new ConfigMapping("blockStream.lastKnownStatusesCapacity", "BLOCK_STREAM_LAST_KNOWN_STATUSES_CAPACITY"),
new ConfigMapping("blockStream.delayBetweenBlockItems", "BLOCK_STREAM_DELAY_BETWEEN_BLOCK_ITEMS"),
new ConfigMapping("blockStream.maxBlockItemsToStream", "BLOCK_STREAM_MAX_BLOCK_ITEMS_TO_STREAM"),
new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ void testSimulatorMode() {
assertEquals(SimulatorMode.PUBLISHER, config.simulatorMode());
}

@Test
void testLastKnownStatusesCapacity() {
final int capacity = 20;
BlockStreamConfig config = getBlockStreamConfigBuilder()
.lastKnownStatusesCapacity(capacity)
.build();

assertEquals(capacity, config.lastKnownStatusesCapacity());
}

@Test
void testValidAbsolutePath() {
// Setup valid folder path and generation mode
Expand Down
Loading

0 comments on commit 3ad4efc

Please sign in to comment.