From d2328521edcbadc9092b8b86b2fa743932a70b15 Mon Sep 17 00:00:00 2001 From: georgi-l95 Date: Mon, 16 Dec 2024 14:35:10 +0200 Subject: [PATCH] address feedback Signed-off-by: georgi-l95 --- .../simulator/config/data/BlockStreamConfig.java | 3 ++- .../grpc/impl/ConsumerStreamGrpcClientImpl.java | 4 +++- .../simulator/grpc/impl/ConsumerStreamObserver.java | 11 ++++++----- .../grpc/impl/PublishStreamGrpcClientImpl.java | 3 ++- .../simulator/grpc/impl/PublishStreamObserver.java | 11 ++++++----- .../grpc/positive/PositiveEndpointBehaviourTests.java | 9 ++++----- 6 files changed, 23 insertions(+), 18 deletions(-) diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java index 9671549b2..34aba7c8d 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java @@ -25,6 +25,7 @@ * Defines the configuration data for the block stream in the Hedera Block Simulator. * * @param simulatorMode the mode of the simulator, in terms of publishing, consuming or both + * @param lastKnownStatusesCapacity the capacity of the last known statuses * @param delayBetweenBlockItems the delay in microseconds between streaming each block item * @param maxBlockItemsToStream the maximum number of block items to stream before stopping * @param streamingMode the mode of streaming for the block stream (e.g., time-based, count-based) @@ -38,7 +39,7 @@ public record BlockStreamConfig( @ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems, @ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream, @ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode, - @ConfigProperty(defaultValue = "10") int millisecondsPerBlock, + @ConfigProperty(defaultValue = "1000") int millisecondsPerBlock, @ConfigProperty(defaultValue = "1000") int blockItemsBatchSize) { /** diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java index 1159060a6..ce0ad580a 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java @@ -32,6 +32,7 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import java.util.ArrayDeque; +import java.util.Deque; import java.util.List; import java.util.concurrent.CountDownLatch; import javax.inject.Inject; @@ -56,13 +57,14 @@ public class ConsumerStreamGrpcClientImpl implements ConsumerStreamGrpcClient { // State private final int lastKnownStatusesCapacity; - private final ArrayDeque lastKnownStatuses; + private final Deque lastKnownStatuses; private CountDownLatch streamLatch; /** * Constructs a new ConsumerStreamGrpcClientImpl with the specified configuration and metrics service. * * @param grpcConfig The configuration for gRPC connection settings + * @param blockStreamConfig The configuration for the block stream * @param metricsService The service for recording consumption metrics * @throws NullPointerException if any parameter is null */ diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java index c7630e6be..c5868d5d9 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java @@ -27,7 +27,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import io.grpc.Status; import io.grpc.stub.StreamObserver; -import java.util.ArrayDeque; +import java.util.Deque; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -44,7 +44,7 @@ public class ConsumerStreamObserver implements StreamObserver lastKnownStatuses; + private final Deque lastKnownStatuses; /** * Constructs a new ConsumerStreamObserver. @@ -52,13 +52,14 @@ public class ConsumerStreamObserver implements StreamObserver lastKnownStatuses, - @NonNull final int lastKnownStatusesCapacity) { + @NonNull final Deque lastKnownStatuses, + final int lastKnownStatusesCapacity) { this.metricsService = requireNonNull(metricsService); this.streamLatch = requireNonNull(streamLatch); this.lastKnownStatuses = requireNonNull(lastKnownStatuses); @@ -75,7 +76,7 @@ public ConsumerStreamObserver( public void onNext(SubscribeStreamResponse subscribeStreamResponse) { final SubscribeStreamResponse.ResponseCase responseType = subscribeStreamResponse.getResponseCase(); if (lastKnownStatuses.size() == lastKnownStatusesCapacity) { - lastKnownStatuses.removeFirst(); + lastKnownStatuses.pollFirst(); } lastKnownStatuses.add(subscribeStreamResponse.toString()); diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java index 7e4a22621..c2b4a58a2 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java @@ -38,6 +38,7 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import java.util.ArrayDeque; +import java.util.Deque; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; @@ -64,7 +65,7 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { // State private final AtomicBoolean streamEnabled; private final int lastKnownStatusesCapacity; - private final ArrayDeque lastKnownStatuses; + private final Deque lastKnownStatuses; /** * Creates a new PublishStreamGrpcClientImpl with the specified dependencies. diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java index c7fde69c6..6628228b2 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java @@ -24,7 +24,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import io.grpc.Status; import io.grpc.stub.StreamObserver; -import java.util.ArrayDeque; +import java.util.Deque; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -37,19 +37,20 @@ public class PublishStreamObserver implements StreamObserver lastKnownStatuses; + private final Deque lastKnownStatuses; /** * Creates a new PublishStreamObserver instance. * * @param streamEnabled Controls whether streaming should continue * @param lastKnownStatuses List to store the most recent status messages + * @param lastKnownStatusesCapacity the capacity of the last known statuses * @throws NullPointerException if any parameter is null */ public PublishStreamObserver( @NonNull final AtomicBoolean streamEnabled, - @NonNull final ArrayDeque lastKnownStatuses, - @NonNull final int lastKnownStatusesCapacity) { + @NonNull final Deque lastKnownStatuses, + final int lastKnownStatusesCapacity) { this.streamEnabled = requireNonNull(streamEnabled); this.lastKnownStatuses = requireNonNull(lastKnownStatuses); this.lastKnownStatusesCapacity = lastKnownStatusesCapacity; @@ -63,7 +64,7 @@ public PublishStreamObserver( @Override public void onNext(PublishStreamResponse publishStreamResponse) { if (lastKnownStatuses.size() == lastKnownStatusesCapacity) { - lastKnownStatuses.removeFirst(); + lastKnownStatuses.pollFirst(); } lastKnownStatuses.add(publishStreamResponse.toString()); LOGGER.log(INFO, "Received Response: " + publishStreamResponse.toString()); diff --git a/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java index 2c4bd041b..de8e3194f 100644 --- a/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java +++ b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java @@ -16,7 +16,6 @@ package com.hedera.block.suites.grpc.positive; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import com.hedera.block.simulator.BlockStreamSimulatorApp; @@ -74,10 +73,10 @@ void verifyPublishBlockStreamEndpoint() throws IOException, InterruptedException blockStreamSimulatorApp.stop(); StreamStatus streamStatus = blockStreamSimulatorApp.getStreamStatus(); assertTrue(streamStatus.publishedBlocks() > 0); - // We just need to make sure that number of published blocks is equal or greater than the statuses. Statuses are tracked in a queue to avoid unnecessary memory usage, therefore will always be less or equal to published. - assertTrue( - streamStatus.publishedBlocks() >= - streamStatus.lastKnownPublisherStatuses().size()); + // We just need to make sure that number of published blocks is equal or greater than the statuses. Statuses are + // tracked in a queue to avoid unnecessary memory usage, therefore will always be less or equal to published. + assertTrue(streamStatus.publishedBlocks() + >= streamStatus.lastKnownPublisherStatuses().size()); // Verify each status contains the word "acknowledgement" streamStatus