From ff0f3f49a50f15e9129f99c390452a2c9cbef64e Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Wed, 20 Nov 2024 13:15:23 -0700 Subject: [PATCH] fix: fixing and adding tests Signed-off-by: Matt Peterson --- .../producer/ProducerBlockItemObserver.java | 11 +- .../server/grpc/BlockStreamServiceTest.java | 82 ------- .../ProducerBlockItemObserverTest.java | 213 ++++++++++++------ .../block/server/util/PersistTestUtils.java | 42 ---- 4 files changed, 146 insertions(+), 202 deletions(-) delete mode 100644 server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceTest.java diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index e2433df9..191d485c 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -116,6 +116,10 @@ public void onSubscribe(Flow.Subscription subscription) { public void onNext(@NonNull final List blockItems) { LOGGER.log(DEBUG, "Received PublishStreamRequest from producer with " + blockItems.size() + " BlockItems."); + if (blockItems.isEmpty()) { + return; + } + metricsService.get(LiveBlockItemsReceived).add(blockItems.size()); // Publish the block to all the subscribers unless @@ -125,9 +129,7 @@ public void onNext(@NonNull final List blockItems) { livenessCalculator.refresh(); // Publish the block to the mediator - if (!blockItems.isEmpty()) { - publisher.publish(blockItems); - } + publisher.publish(blockItems); } else { LOGGER.log(ERROR, getClass().getName() + " is not accepting BlockItems"); @@ -147,6 +149,7 @@ public void onEvent(ObjectEvent event, long sequence, boo if (isResponsePermitted.get()) { if (isTimeoutExpired()) { + isResponsePermitted.set(false); subscriptionHandler.unsubscribe(this); LOGGER.log(DEBUG, "Producer liveness timeout. Unsubscribed ProducerBlockItemObserver."); } else { @@ -190,8 +193,6 @@ public void onComplete() { isResponsePermitted.set(false); subscriptionHandler.unsubscribe(this); LOGGER.log(DEBUG, "Producer completed the stream. Observer unsubscribed."); - - publishStreamResponseObserver.onComplete(); } @Override diff --git a/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceTest.java deleted file mode 100644 index a40bf40c..00000000 --- a/server/src/test/java/com/hedera/block/server/grpc/BlockStreamServiceTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (C) 2024 Hedera Hashgraph, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.hedera.block.server.grpc; - -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -public class BlockStreamServiceTest { - - // @Mock - // private Notifier notifier; - // - // @Mock - // private Flow.Subscriber responseObserver; - // - // @Mock - // private LiveStreamMediator streamMediator; - // - // @Mock - // private BlockReader blockReader; - // - // @Mock - // private BlockWriter> blockWriter; - // - // @Mock - // private ServiceStatus serviceStatus; - // - // private final Logger LOGGER = System.getLogger(getClass().getName()); - // - // private static final int testTimeout = 1000; - // - // private PbjBlockStreamService blockStreamService; - // - // @TempDir - // private Path testPath; - // - // private BlockNodeContext blockNodeContext; - // private PersistenceStorageConfig config; - // - // @BeforeEach - // public void setUp() throws IOException { - // blockNodeContext = - // TestConfigUtil.getTestBlockNodeContext(Map.of("persistence.storage.rootPath", - // testPath.toString())); - // config = blockNodeContext.configuration().getConfigData(PersistenceStorageConfig.class); - // - // final var blockNodeEventHandler = new StreamPersistenceHandlerImpl( - // streamMediator, notifier, blockWriter, blockNodeContext, serviceStatus); - // blockStreamService = new PbjBlockStreamServiceProxy( - // streamMediator, serviceStatus, blockNodeEventHandler, notifier, blockNodeContext); - // } - // - // @Test - // public void testServiceName() { - // assertEquals(SERVICE_NAME_BLOCK_STREAM, blockStreamService.serviceName()); - // } - // - // @Test - // public void testFullName() { - // assertEquals(FULL_SERVICE_NAME_BLOCK_STREAM, blockStreamService.fullName()); - // } - // - // @Test - // public void testMethods() { - // assertEquals(2, blockStreamService.methods().size()); - // } -} diff --git a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java index d2ca981c..d27889e8 100644 --- a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java @@ -16,83 +16,150 @@ package com.hedera.block.server.producer; +import static com.hedera.block.server.util.PersistTestUtils.generateBlockItemsUnparsed; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.hedera.block.server.config.BlockNodeContext; +import com.hedera.block.server.events.ObjectEvent; +import com.hedera.block.server.mediator.Publisher; +import com.hedera.block.server.mediator.SubscriptionHandler; +import com.hedera.block.server.service.ServiceStatus; +import com.hedera.block.server.service.ServiceStatusImpl; +import com.hedera.block.server.util.TestConfigUtil; +import com.hedera.hapi.block.BlockItemUnparsed; +import com.hedera.hapi.block.PublishStreamResponse; +import com.hedera.pbj.runtime.grpc.Pipeline; +import java.io.IOException; +import java.time.InstantSource; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) public class ProducerBlockItemObserverTest { - // @Mock - // private InstantSource testClock; - // - // @Mock - // private Publisher> publisher; - // - // @Mock - // private SubscriptionHandler subscriptionHandler; - // - // @Mock - // private Pipeline publishStreamResponseObserver; - // - // @Mock - // private ServiceStatus serviceStatus; - // - // @Mock - // private ObjectEvent objectEvent; - // - // private final long TIMEOUT_THRESHOLD_MILLIS = 50L; - // private static final int testTimeout = 1000; - // - // BlockNodeContext testContext; - // - // @BeforeEach - // public void setUp() throws IOException { - // this.testContext = TestConfigUtil.getTestBlockNodeContext( - // Map.of(TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, String.valueOf(TIMEOUT_THRESHOLD_MILLIS))); - // } - // - // @Test - // @Disabled - // public void testOnError() throws IOException { - // - // final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); - // final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( - // testClock, - // publisher, - // subscriptionHandler, - // publishStreamResponseObserver, - // blockNodeContext, - // serviceStatus); - // - // final Throwable t = new Throwable("Test error"); - // producerBlockItemObserver.onError(t); - // verify(publishStreamResponseObserver).onError(t); - // } - // - // @Test - // public void testOnlyErrorStreamResponseAllowedAfterStatusChange() { - // - // final ServiceStatus serviceStatus = new ServiceStatusImpl(testContext); - // - // final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( - // testClock, publisher, subscriptionHandler, publishStreamResponseObserver, testContext, - // serviceStatus); - // - // final List blockItems = generateBlockItems(1); - // final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder() - // .blockItems(new BlockItemSet(blockItems)) - // .build(); - // - // // Confirm that the observer is called with the first BlockItem - // producerBlockItemObserver.onNext(publishStreamRequest); - // - // // Change the status of the service - // serviceStatus.stopRunning(getClass().getName()); - // - // // Confirm that the observer is called with the first BlockItem - // producerBlockItemObserver.onNext(publishStreamRequest); - // - // // Confirm that closing the observer allowed only 1 response to be sent. - // verify(publishStreamResponseObserver, timeout(testTimeout).times(1)).onNext(any()); - // } + @Mock + private InstantSource testClock; + + @Mock + private Publisher> publisher; + + @Mock + private SubscriptionHandler subscriptionHandler; + + @Mock + private Pipeline helidonPublishPipeline; + + @Mock + private ServiceStatus serviceStatus; + + @Mock + private ObjectEvent objectEvent; + + private final long TEST_TIME = 1_719_427_664_950L; + private final long TIMEOUT_THRESHOLD_MILLIS = 50L; + private static final int testTimeout = 1000; + + BlockNodeContext testContext; + + @BeforeEach + public void setUp() throws IOException { + this.testContext = TestConfigUtil.getTestBlockNodeContext( + Map.of(TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, String.valueOf(TIMEOUT_THRESHOLD_MILLIS))); + } + + @Test + public void testConfirmOnErrorNotCalled() { + + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus); + + // Confirm that onError will call the handler + // to unsubscribe but make sure onError is never + // called on the helidonPublishPipeline. + // Calling onError() on the helidonPublishPipeline + // passed by the Helidon PBJ plugin may cause + // a loop of calls. + final Throwable t = new Throwable("Test error"); + producerBlockItemObserver.onError(t); + verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(any()); + verify(helidonPublishPipeline, never()).onError(t); + } + + @Test + public void testOnEventCallsUnsubscribeOnExpiration() { + + when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1); + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus); + + producerBlockItemObserver.onEvent(objectEvent, 0, true); + producerBlockItemObserver.onEvent(objectEvent, 0, true); + + verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(producerBlockItemObserver); + } + + @Test + public void testOnSubscribe() { + + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus); + + // Currently, our implementation of onSubscribe() is a + // no-op. + producerBlockItemObserver.onSubscribe(null); + } + + @Test + public void testEmptyBlockItems() { + + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus); + + producerBlockItemObserver.onNext(List.of()); + verify(publisher, never()).publish(any()); + } + + @Test + public void testOnlyErrorStreamResponseAllowedAfterStatusChange() { + + final ServiceStatus serviceStatus = new ServiceStatusImpl(testContext); + + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus); + + final List blockItems = generateBlockItemsUnparsed(1); + + // Send a request + producerBlockItemObserver.onNext(blockItems); + + // Change the status of the service + serviceStatus.stopRunning(getClass().getName()); + + // Send another request + producerBlockItemObserver.onNext(blockItems); + + // Confirm that closing the observer allowed only 1 response to be sent. + verify(helidonPublishPipeline, timeout(testTimeout).times(1)).onNext(any()); + } + + @Test + public void testClientEndStreamReceived() { + + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus); + + producerBlockItemObserver.clientEndStreamReceived(); + + // Confirm that the observer was unsubscribed + verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(producerBlockItemObserver); + } } diff --git a/server/src/test/java/com/hedera/block/server/util/PersistTestUtils.java b/server/src/test/java/com/hedera/block/server/util/PersistTestUtils.java index a5d51a25..eac41ae0 100644 --- a/server/src/test/java/com/hedera/block/server/util/PersistTestUtils.java +++ b/server/src/test/java/com/hedera/block/server/util/PersistTestUtils.java @@ -20,7 +20,6 @@ import static java.lang.System.Logger.Level.INFO; import com.hedera.hapi.block.BlockItemUnparsed; -import com.hedera.hapi.block.stream.BlockItem; import com.hedera.hapi.block.stream.BlockProof; import com.hedera.hapi.block.stream.input.EventHeader; import com.hedera.hapi.block.stream.output.BlockHeader; @@ -94,47 +93,6 @@ public static List generateBlockItemsUnparsed(int numOfBlocks return blockItems; } - public static List generateBlockItems(int numOfBlocks) { - - List blockItems = new ArrayList<>(); - for (int i = 1; i <= numOfBlocks; i++) { - for (int j = 1; j <= 10; j++) { - switch (j) { - case 1: - // First block is always the header - blockItems.add(BlockItem.newBuilder() - .blockHeader(BlockHeader.newBuilder() - .number(i) - .softwareVersion(SemanticVersion.newBuilder() - .major(1) - .minor(0) - .build()) - .build()) - .build()); - break; - case 10: - // Last block is always the state proof - blockItems.add(BlockItem.newBuilder() - .blockProof(BlockProof.newBuilder().block(i).build()) - .build()); - break; - default: - // Middle blocks are events - blockItems.add(BlockItem.newBuilder() - .eventHeader(EventHeader.newBuilder() - .eventCore(EventCore.newBuilder() - .creatorNodeId(i) - .build()) - .build()) - .build()); - break; - } - } - } - - return blockItems; - } - public static byte[] reverseByteArray(byte[] input) { if (input == null || input.length == 0) { return input;