Skip to content

Commit

Permalink
fix: fixing and adding tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Nov 20, 2024
1 parent b16e985 commit ff0f3f4
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public void onSubscribe(Flow.Subscription subscription) {
public void onNext(@NonNull final List<BlockItemUnparsed> blockItems) {

LOGGER.log(DEBUG, "Received PublishStreamRequest from producer with " + blockItems.size() + " BlockItems.");
if (blockItems.isEmpty()) {
return;
}

metricsService.get(LiveBlockItemsReceived).add(blockItems.size());

// Publish the block to all the subscribers unless
Expand All @@ -125,9 +129,7 @@ public void onNext(@NonNull final List<BlockItemUnparsed> blockItems) {
livenessCalculator.refresh();

// Publish the block to the mediator
if (!blockItems.isEmpty()) {
publisher.publish(blockItems);
}
publisher.publish(blockItems);

} else {
LOGGER.log(ERROR, getClass().getName() + " is not accepting BlockItems");
Expand All @@ -147,6 +149,7 @@ public void onEvent(ObjectEvent<PublishStreamResponse> event, long sequence, boo

if (isResponsePermitted.get()) {
if (isTimeoutExpired()) {
isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
LOGGER.log(DEBUG, "Producer liveness timeout. Unsubscribed ProducerBlockItemObserver.");
} else {
Expand Down Expand Up @@ -190,8 +193,6 @@ public void onComplete() {
isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
LOGGER.log(DEBUG, "Producer completed the stream. Observer unsubscribed.");

publishStreamResponseObserver.onComplete();
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,83 +16,150 @@

package com.hedera.block.server.producer;

import static com.hedera.block.server.util.PersistTestUtils.generateBlockItemsUnparsed;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.mediator.Publisher;
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.block.server.service.ServiceStatusImpl;
import com.hedera.block.server.util.TestConfigUtil;
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.PublishStreamResponse;
import com.hedera.pbj.runtime.grpc.Pipeline;
import java.io.IOException;
import java.time.InstantSource;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class ProducerBlockItemObserverTest {

// @Mock
// private InstantSource testClock;
//
// @Mock
// private Publisher<List<BlockItem>> publisher;
//
// @Mock
// private SubscriptionHandler<PublishStreamResponse> subscriptionHandler;
//
// @Mock
// private Pipeline<PublishStreamResponse> publishStreamResponseObserver;
//
// @Mock
// private ServiceStatus serviceStatus;
//
// @Mock
// private ObjectEvent<PublishStreamResponse> objectEvent;
//
// private final long TIMEOUT_THRESHOLD_MILLIS = 50L;
// private static final int testTimeout = 1000;
//
// BlockNodeContext testContext;
//
// @BeforeEach
// public void setUp() throws IOException {
// this.testContext = TestConfigUtil.getTestBlockNodeContext(
// Map.of(TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, String.valueOf(TIMEOUT_THRESHOLD_MILLIS)));
// }
//
// @Test
// @Disabled
// public void testOnError() throws IOException {
//
// final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();
// final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver(
// testClock,
// publisher,
// subscriptionHandler,
// publishStreamResponseObserver,
// blockNodeContext,
// serviceStatus);
//
// final Throwable t = new Throwable("Test error");
// producerBlockItemObserver.onError(t);
// verify(publishStreamResponseObserver).onError(t);
// }
//
// @Test
// public void testOnlyErrorStreamResponseAllowedAfterStatusChange() {
//
// final ServiceStatus serviceStatus = new ServiceStatusImpl(testContext);
//
// final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver(
// testClock, publisher, subscriptionHandler, publishStreamResponseObserver, testContext,
// serviceStatus);
//
// final List<BlockItem> blockItems = generateBlockItems(1);
// final PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder()
// .blockItems(new BlockItemSet(blockItems))
// .build();
//
// // Confirm that the observer is called with the first BlockItem
// producerBlockItemObserver.onNext(publishStreamRequest);
//
// // Change the status of the service
// serviceStatus.stopRunning(getClass().getName());
//
// // Confirm that the observer is called with the first BlockItem
// producerBlockItemObserver.onNext(publishStreamRequest);
//
// // Confirm that closing the observer allowed only 1 response to be sent.
// verify(publishStreamResponseObserver, timeout(testTimeout).times(1)).onNext(any());
// }
@Mock
private InstantSource testClock;

@Mock
private Publisher<List<BlockItemUnparsed>> publisher;

@Mock
private SubscriptionHandler<PublishStreamResponse> subscriptionHandler;

@Mock
private Pipeline<PublishStreamResponse> helidonPublishPipeline;

@Mock
private ServiceStatus serviceStatus;

@Mock
private ObjectEvent<PublishStreamResponse> objectEvent;

private final long TEST_TIME = 1_719_427_664_950L;
private final long TIMEOUT_THRESHOLD_MILLIS = 50L;
private static final int testTimeout = 1000;

BlockNodeContext testContext;

@BeforeEach
public void setUp() throws IOException {
this.testContext = TestConfigUtil.getTestBlockNodeContext(
Map.of(TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, String.valueOf(TIMEOUT_THRESHOLD_MILLIS)));
}

@Test
public void testConfirmOnErrorNotCalled() {

final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver(
testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus);

// Confirm that onError will call the handler
// to unsubscribe but make sure onError is never
// called on the helidonPublishPipeline.
// Calling onError() on the helidonPublishPipeline
// passed by the Helidon PBJ plugin may cause
// a loop of calls.
final Throwable t = new Throwable("Test error");
producerBlockItemObserver.onError(t);
verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(any());
verify(helidonPublishPipeline, never()).onError(t);
}

@Test
public void testOnEventCallsUnsubscribeOnExpiration() {

when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1);
final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver(
testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus);

producerBlockItemObserver.onEvent(objectEvent, 0, true);
producerBlockItemObserver.onEvent(objectEvent, 0, true);

verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(producerBlockItemObserver);
}

@Test
public void testOnSubscribe() {

final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver(
testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus);

// Currently, our implementation of onSubscribe() is a
// no-op.
producerBlockItemObserver.onSubscribe(null);
}

@Test
public void testEmptyBlockItems() {

final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver(
testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus);

producerBlockItemObserver.onNext(List.of());
verify(publisher, never()).publish(any());
}

@Test
public void testOnlyErrorStreamResponseAllowedAfterStatusChange() {

final ServiceStatus serviceStatus = new ServiceStatusImpl(testContext);

final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver(
testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus);

final List<BlockItemUnparsed> blockItems = generateBlockItemsUnparsed(1);

// Send a request
producerBlockItemObserver.onNext(blockItems);

// Change the status of the service
serviceStatus.stopRunning(getClass().getName());

// Send another request
producerBlockItemObserver.onNext(blockItems);

// Confirm that closing the observer allowed only 1 response to be sent.
verify(helidonPublishPipeline, timeout(testTimeout).times(1)).onNext(any());
}

@Test
public void testClientEndStreamReceived() {

final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver(
testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus);

producerBlockItemObserver.clientEndStreamReceived();

// Confirm that the observer was unsubscribed
verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(producerBlockItemObserver);
}
}
Loading

0 comments on commit ff0f3f4

Please sign in to comment.