Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Fix Simulator unbounded memory growth #406

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class SimulatorMappedConfigSourceInitializer {

// Block stream configuration
new ConfigMapping("blockStream.simulatorMode", "BLOCK_STREAM_SIMULATOR_MODE"),
new ConfigMapping("blockStream.lastKnownStatusesCapacity", "BLOCK_STREAM_LAST_KNOWN_STATUSES_CAPACITY"),
new ConfigMapping("blockStream.delayBetweenBlockItems", "BLOCK_STREAM_DELAY_BETWEEN_BLOCK_ITEMS"),
new ConfigMapping("blockStream.maxBlockItemsToStream", "BLOCK_STREAM_MAX_BLOCK_ITEMS_TO_STREAM"),
new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Defines the configuration data for the block stream in the Hedera Block Simulator.
*
* @param simulatorMode the mode of the simulator, in terms of publishing, consuming or both
* @param lastKnownStatusesCapacity the capacity of the last known statuses
* @param delayBetweenBlockItems the delay in microseconds between streaming each block item
* @param maxBlockItemsToStream the maximum number of block items to stream before stopping
* @param streamingMode the mode of streaming for the block stream (e.g., time-based, count-based)
Expand All @@ -34,6 +35,7 @@
@ConfigData("blockStream")
public record BlockStreamConfig(
@ConfigProperty(defaultValue = "PUBLISHER") SimulatorMode simulatorMode,
@ConfigProperty(defaultValue = "10") int lastKnownStatusesCapacity,
georgi-l95 marked this conversation as resolved.
Show resolved Hide resolved
@ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems,
@ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
@ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode,
Expand All @@ -54,6 +56,7 @@ public static Builder builder() {
*/
public static class Builder {
private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER;
private int lastKnownStatusesCapacity = 10;
private int delayBetweenBlockItems = 1_500_000;
private int maxBlockItemsToStream = 10_000;
private StreamingMode streamingMode = StreamingMode.MILLIS_PER_BLOCK;
Expand All @@ -78,6 +81,17 @@ public Builder simulatorMode(SimulatorMode simulatorMode) {
return this;
}

/**
* Sets the capacity of the last known statuses.
*
* @param lastKnownStatusesCapacity the capacity
* @return this {@code Builder} instance
*/
public Builder lastKnownStatusesCapacity(int lastKnownStatusesCapacity) {
this.lastKnownStatusesCapacity = lastKnownStatusesCapacity;
return this;
}

/**
* Sets the delay between streaming each block item.
*
Expand Down Expand Up @@ -141,6 +155,7 @@ public Builder blockItemsBatchSize(int blockItemsBatchSize) {
public BlockStreamConfig build() {
return new BlockStreamConfig(
simulatorMode,
lastKnownStatusesCapacity,
delayBetweenBlockItems,
maxBlockItemsToStream,
streamingMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import static com.hedera.block.common.utils.Preconditions.requireWhole;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/**
Expand All @@ -33,8 +34,8 @@
public record StreamStatus(
long publishedBlocks,
long consumedBlocks,
List<String> lastKnownPublisherStatuses,
List<String> lastKnownConsumersStatuses) {
Deque<String> lastKnownPublisherStatuses,
Deque<String> lastKnownConsumersStatuses) {

/**
* Creates a new {@link Builder} instance for constructing a {@code StreamStatus}.
Expand All @@ -51,8 +52,8 @@ public static Builder builder() {
public static class Builder {
private long publishedBlocks = 0;
private long consumedBlocks = 0;
private List<String> lastKnownPublisherStatuses = new ArrayList<>();
private List<String> lastKnownConsumersStatuses = new ArrayList<>();
private Deque<String> lastKnownPublisherStatuses = new ArrayDeque<>();
private Deque<String> lastKnownConsumersStatuses = new ArrayDeque<>();

/**
* Creates a new instance of the {@code Builder} class with default configuration values.
Expand Down Expand Up @@ -93,7 +94,7 @@ public Builder consumedBlocks(long consumedBlocks) {
*/
public Builder lastKnownPublisherStatuses(List<String> lastKnownPublisherStatuses) {
requireNonNull(lastKnownPublisherStatuses);
this.lastKnownPublisherStatuses = new ArrayList<>(lastKnownPublisherStatuses);
this.lastKnownPublisherStatuses = new ArrayDeque<>(lastKnownPublisherStatuses);
return this;
}

Expand All @@ -105,7 +106,7 @@ public Builder lastKnownPublisherStatuses(List<String> lastKnownPublisherStatuse
*/
public Builder lastKnownConsumersStatuses(List<String> lastKnownConsumersStatuses) {
requireNonNull(lastKnownConsumersStatuses);
this.lastKnownConsumersStatuses = new ArrayList<>(lastKnownConsumersStatuses);
this.lastKnownConsumersStatuses = new ArrayDeque<>(lastKnownConsumersStatuses);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.Objects.requireNonNull;

import com.hedera.block.common.utils.Preconditions;
import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.data.GrpcConfig;
import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
import com.hedera.block.simulator.metrics.MetricsService;
Expand All @@ -30,7 +31,8 @@
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.inject.Inject;
Expand All @@ -43,6 +45,7 @@
public class ConsumerStreamGrpcClientImpl implements ConsumerStreamGrpcClient {
// Configuration
private final GrpcConfig grpcConfig;
private final BlockStreamConfig blockStreamConfig;

// Service dependencies
private final MetricsService metricsService;
Expand All @@ -53,22 +56,28 @@ public class ConsumerStreamGrpcClientImpl implements ConsumerStreamGrpcClient {
private StreamObserver<SubscribeStreamResponse> consumerStreamObserver;

// State
private final List<String> lastKnownStatuses;
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;
private CountDownLatch streamLatch;

/**
* Constructs a new ConsumerStreamGrpcClientImpl with the specified configuration and metrics service.
*
* @param grpcConfig The configuration for gRPC connection settings
* @param blockStreamConfig The configuration for the block stream
* @param metricsService The service for recording consumption metrics
* @throws NullPointerException if any parameter is null
*/
@Inject
public ConsumerStreamGrpcClientImpl(
@NonNull final GrpcConfig grpcConfig, @NonNull final MetricsService metricsService) {
@NonNull final GrpcConfig grpcConfig,
@NonNull final BlockStreamConfig blockStreamConfig,
@NonNull final MetricsService metricsService) {
this.grpcConfig = requireNonNull(grpcConfig);
this.metricsService = requireNonNull(metricsService);
this.lastKnownStatuses = new ArrayList<>();
this.blockStreamConfig = requireNonNull(blockStreamConfig);
this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity();
this.lastKnownStatuses = new ArrayDeque<>(lastKnownStatusesCapacity);
}

@Override
Expand All @@ -87,7 +96,8 @@ public void requestBlocks(long startBlock, long endBlock) throws InterruptedExce
Preconditions.requireWhole(endBlock);
Preconditions.requireGreaterOrEqual(endBlock, startBlock);

consumerStreamObserver = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses);
consumerStreamObserver =
new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity);

SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder()
.setStartBlockNumber(startBlock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;

Expand All @@ -42,23 +43,27 @@

// State
private final CountDownLatch streamLatch;
private final List<String> lastKnownStatuses;
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;

/**
* Constructs a new ConsumerStreamObserver.
*
* @param metricsService The service for recording consumption metrics
* @param streamLatch A latch used to coordinate stream completion
* @param lastKnownStatuses List to store the most recent status messages
* @param lastKnownStatusesCapacity the capacity of the last known statuses
* @throws NullPointerException if any parameter is null
*/
public ConsumerStreamObserver(
@NonNull final MetricsService metricsService,
@NonNull final CountDownLatch streamLatch,
@NonNull final List<String> lastKnownStatuses) {
@NonNull final Deque<String> lastKnownStatuses,
final int lastKnownStatusesCapacity) {
this.metricsService = requireNonNull(metricsService);
this.streamLatch = requireNonNull(streamLatch);
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
this.lastKnownStatusesCapacity = lastKnownStatusesCapacity;
}

/**
Expand All @@ -70,6 +75,9 @@
@Override
public void onNext(SubscribeStreamResponse subscribeStreamResponse) {
final SubscribeStreamResponse.ResponseCase responseType = subscribeStreamResponse.getResponseCase();
if (lastKnownStatuses.size() >= lastKnownStatusesCapacity) {
georgi-l95 marked this conversation as resolved.
Show resolved Hide resolved
lastKnownStatuses.pollFirst();

Check warning on line 79 in simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java

View check run for this annotation

Codecov / codecov/patch

simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java#L79

Added line #L79 was not covered by tests
}
lastKnownStatuses.add(subscribeStreamResponse.toString());

switch (responseType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
Expand All @@ -63,7 +64,8 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {

// State
private final AtomicBoolean streamEnabled;
private final List<String> lastKnownStatuses = new ArrayList<>();
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;

/**
* Creates a new PublishStreamGrpcClientImpl with the specified dependencies.
Expand All @@ -84,6 +86,8 @@ public PublishStreamGrpcClientImpl(
this.blockStreamConfig = requireNonNull(blockStreamConfig);
this.metricsService = requireNonNull(metricsService);
this.streamEnabled = requireNonNull(streamEnabled);
this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity();
lastKnownStatuses = new ArrayDeque<>(this.lastKnownStatusesCapacity);
}

/**
Expand All @@ -95,7 +99,8 @@ public void init() {
.usePlaintext()
.build();
BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel);
PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses);
PublishStreamObserver publishStreamObserver =
new PublishStreamObserver(streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity);
requestStreamObserver = stub.publishBlockStream(publishStreamObserver);
lastKnownStatuses.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -36,19 +36,24 @@

// State
private final AtomicBoolean streamEnabled;
private final List<String> lastKnownStatuses;
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;

/**
* Creates a new PublishStreamObserver instance.
*
* @param streamEnabled Controls whether streaming should continue
* @param lastKnownStatuses List to store the most recent status messages
* @param lastKnownStatusesCapacity the capacity of the last known statuses
* @throws NullPointerException if any parameter is null
*/
public PublishStreamObserver(
@NonNull final AtomicBoolean streamEnabled, @NonNull final List<String> lastKnownStatuses) {
@NonNull final AtomicBoolean streamEnabled,
@NonNull final Deque<String> lastKnownStatuses,
final int lastKnownStatusesCapacity) {
this.streamEnabled = requireNonNull(streamEnabled);
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
this.lastKnownStatusesCapacity = lastKnownStatusesCapacity;
}

/**
Expand All @@ -58,6 +63,9 @@
*/
@Override
public void onNext(PublishStreamResponse publishStreamResponse) {
if (lastKnownStatuses.size() >= lastKnownStatusesCapacity) {
lastKnownStatuses.pollFirst();

Check warning on line 67 in simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java

View check run for this annotation

Codecov / codecov/patch

simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java#L67

Added line #L67 was not covered by tests
}
lastKnownStatuses.add(publishStreamResponse.toString());
LOGGER.log(INFO, "Received Response: " + publishStreamResponse.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -300,7 +301,7 @@ void testGetStreamStatus() {

assertNotNull(streamStatus, "StreamStatus should not be null");
assertEquals(expectedPublishedBlocks, streamStatus.publishedBlocks(), "Published blocks should match");
assertEquals(
assertIterableEquals(
expectedLastKnownStatuses,
streamStatus.lastKnownPublisherStatuses(),
"Last known statuses should match");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class SimulatorMappedConfigSourceInitializerTest {

// Block stream configuration
new ConfigMapping("blockStream.simulatorMode", "BLOCK_STREAM_SIMULATOR_MODE"),
new ConfigMapping("blockStream.lastKnownStatusesCapacity", "BLOCK_STREAM_LAST_KNOWN_STATUSES_CAPACITY"),
new ConfigMapping("blockStream.delayBetweenBlockItems", "BLOCK_STREAM_DELAY_BETWEEN_BLOCK_ITEMS"),
new ConfigMapping("blockStream.maxBlockItemsToStream", "BLOCK_STREAM_MAX_BLOCK_ITEMS_TO_STREAM"),
new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ void testSimulatorMode() {
assertEquals(SimulatorMode.PUBLISHER, config.simulatorMode());
}

@Test
void testLastKnownStatusesCapacity() {
final int capacity = 20;
BlockStreamConfig config = getBlockStreamConfigBuilder()
.lastKnownStatusesCapacity(capacity)
.build();

assertEquals(capacity, config.lastKnownStatusesCapacity());
}

@Test
void testValidAbsolutePath() {
// Setup valid folder path and generation mode
Expand Down
Loading
Loading