Skip to content

Commit

Permalink
fix: enabled another test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Sep 11, 2024
1 parent 750b0f0 commit 895070b
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.hedera.block.server.events.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediator;
import com.hedera.block.server.mediator.LiveStreamMediatorBuilder;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.read.BlockReader;
import com.hedera.block.server.persistence.storage.write.BlockAsDirWriterBuilder;
import com.hedera.block.server.persistence.storage.write.BlockWriter;
Expand Down Expand Up @@ -68,6 +67,7 @@
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -135,7 +135,6 @@ public class BlockStreamServiceIntegrationTest {

private Path testPath;
private BlockNodeContext blockNodeContext;
private PersistenceStorageConfig persistenceStorageConfig;

private static final int testTimeout = 1000;

Expand All @@ -148,11 +147,8 @@ public void setUp() throws IOException {
properties.put("persistence.storage.rootPath", testPath.toString());
properties.put("persistence.storage.blockItemSize", "1024");
properties.put("producer.blockItemSize", "1024");
properties.put("consumer.timeoutThresholdMillis", "40000");

blockNodeContext = TestConfigUtil.getTestBlockNodeContext(properties);
persistenceStorageConfig =
blockNodeContext.configuration().getConfigData(PersistenceStorageConfig.class);
}

@AfterEach
Expand Down Expand Up @@ -422,100 +418,102 @@ public void testFullWithSubscribersAddedDynamically() {
streamObserver.onCompleted();
}

// @Test
// public void testSubAndUnsubWhileStreaming() throws IOException {
//
// int numberOfBlocks = 100;
//
// final LinkedHashMap<
// BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>>,
// BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
// subscribers = new LinkedHashMap<>();
// final var streamMediator = buildStreamMediator(subscribers);
// final var blockStreamService =
// buildBlockStreamService(streamMediator, blockReader, serviceStatus);
//
// // Enable the serviceStatus
// when(serviceStatus.isRunning()).thenReturn(true);
//
// // Pass a StreamObserver to the producer as Helidon does
// final StreamObserver<com.hedera.hapi.block.protoc.PublishStreamRequest> streamObserver
// =
// blockStreamService.protocPublishBlockStream(publishStreamResponseObserver);
//
// final List<BlockItem> blockItems = generateBlockItems(numberOfBlocks);
//
// blockStreamService.protocSubscribeBlockStream(
// subscribeStreamRequest, subscribeStreamObserver1);
// blockStreamService.protocSubscribeBlockStream(
// subscribeStreamRequest, subscribeStreamObserver2);
// blockStreamService.protocSubscribeBlockStream(
// subscribeStreamRequest, subscribeStreamObserver3);
//
// for (int i = 0; i < blockItems.size(); i++) {
// final PublishStreamRequest publishStreamRequest =
// PublishStreamRequest.newBuilder().blockItem(blockItems.get(i)).build();
//
// // Remove a subscriber
// if (i == 10) {
// final var k = subscribers.firstEntry().getKey();
// streamMediator.unsubscribe(k);
// }
//
// if (i == 60) {
// final var k = subscribers.firstEntry().getKey();
// streamMediator.unsubscribe(k);
// }
//
// // Add a new subscriber
// if (i == 51) {
// blockStreamService.protocSubscribeBlockStream(
// subscribeStreamRequest, subscribeStreamObserver4);
// }
//
// // Transmit the BlockItem
// streamObserver.onNext(fromPbj(publishStreamRequest));
//
// if (i == 70) {
// final var k = subscribers.firstEntry().getKey();
// streamMediator.unsubscribe(k);
// }
//
// // Add a new subscriber
// if (i == 76) {
// blockStreamService.protocSubscribeBlockStream(
// subscribeStreamRequest, subscribeStreamObserver5);
// }
//
// // Add a new subscriber
// if (i == 88) {
// blockStreamService.protocSubscribeBlockStream(
// subscribeStreamRequest, subscribeStreamObserver6);
// }
// }
//
// // Verify subscribers who were listening before the stream started
// verifySubscribeStreamResponse(numberOfBlocks, 0, 10, subscribeStreamObserver1,
// blockItems);
// verifySubscribeStreamResponse(numberOfBlocks, 0, 60, subscribeStreamObserver2,
// blockItems);
// verifySubscribeStreamResponse(numberOfBlocks, 0, 70, subscribeStreamObserver3,
// blockItems);
//
// // Verify subscribers added while the stream was in progress.
// // The Helidon-provided StreamObserver onNext() method will only
// // be called once a Header BlockItem is reached. So, pass in
// // the number of BlockItems to wait to verify that the method
// // was called.
// verifySubscribeStreamResponse(
// numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems);
// verifySubscribeStreamResponse(
// numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems);
// verifySubscribeStreamResponse(
// numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems);
//
// streamObserver.onCompleted();
// }
@Test
public void testSubAndUnsubWhileStreaming() throws IOException {

int numberOfBlocks = 100;

final LinkedHashMap<
BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponse>>,
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
consumers = new LinkedHashMap<>();

final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
final var streamMediator = buildStreamMediator(consumers, serviceStatus);
final var streamValidatorBuilder =
StreamValidatorBuilder.newBuilder(blockWriter, blockNodeContext, serviceStatus);
final var blockStreamService =
new BlockStreamService(
streamMediator,
blockReader,
serviceStatus,
streamValidatorBuilder,
blockNodeContext);

// Pass a StreamObserver to the producer as Helidon does
final StreamObserver<com.hedera.hapi.block.protoc.PublishStreamRequest> streamObserver =
blockStreamService.protocPublishBlockStream(publishStreamResponseObserver1);

final List<BlockItem> blockItems = generateBlockItems(numberOfBlocks);

blockStreamService.protocSubscribeBlockStream(
subscribeStreamRequest, subscribeStreamObserver1);
blockStreamService.protocSubscribeBlockStream(
subscribeStreamRequest, subscribeStreamObserver2);
blockStreamService.protocSubscribeBlockStream(
subscribeStreamRequest, subscribeStreamObserver3);

for (int i = 0; i < blockItems.size(); i++) {
final PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().blockItem(blockItems.get(i)).build();

// Remove a subscriber
if (i == 10) {
final var k = consumers.firstEntry().getKey();
streamMediator.unsubscribe(k);
}

if (i == 60) {
final var k = consumers.firstEntry().getKey();
streamMediator.unsubscribe(k);
}

// Add a new subscriber
if (i == 51) {
blockStreamService.protocSubscribeBlockStream(
subscribeStreamRequest, subscribeStreamObserver4);
}

// Transmit the BlockItem
streamObserver.onNext(fromPbj(publishStreamRequest));

if (i == 70) {
final var k = consumers.firstEntry().getKey();
streamMediator.unsubscribe(k);
}

// Add a new subscriber
if (i == 76) {
blockStreamService.protocSubscribeBlockStream(
subscribeStreamRequest, subscribeStreamObserver5);
}

// Add a new subscriber
if (i == 88) {
blockStreamService.protocSubscribeBlockStream(
subscribeStreamRequest, subscribeStreamObserver6);
}
}

// Verify subscribers who were listening before the stream started
verifySubscribeStreamResponse(numberOfBlocks, 0, 10, subscribeStreamObserver1, blockItems);
verifySubscribeStreamResponse(numberOfBlocks, 0, 60, subscribeStreamObserver2, blockItems);
verifySubscribeStreamResponse(numberOfBlocks, 0, 70, subscribeStreamObserver3, blockItems);

// Verify subscribers added while the stream was in progress.
// The Helidon-provided StreamObserver onNext() method will only
// be called once a Header BlockItem is reached. So, pass in
// the number of BlockItems to wait to verify that the method
// was called.
verifySubscribeStreamResponse(
numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems);
verifySubscribeStreamResponse(
numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems);
verifySubscribeStreamResponse(
numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems);

streamObserver.onCompleted();
}

@Test
public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.producer.ProducerBlockItemObserver;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.block.server.service.ServiceStatusImpl;
import com.hedera.block.server.util.TestConfigUtil;
import com.hedera.hapi.block.Acknowledgement;
import com.hedera.hapi.block.PublishStreamResponse;
Expand Down Expand Up @@ -60,10 +61,6 @@ public class NotifierImplTest {
@Mock private ServiceStatus serviceStatus;
@Mock private SubscriptionHandler<PublishStreamResponse> subscriptionHandler;

// @Mock private EventHandler<ObjectEvent<PublishStreamResponse>> observer1;
// @Mock private EventHandler<ObjectEvent<PublishStreamResponse>> observer2;
// @Mock private EventHandler<ObjectEvent<PublishStreamResponse>> observer3;

@Mock
private StreamObserver<com.hedera.hapi.block.protoc.PublishStreamResponse> streamObserver1;

Expand Down Expand Up @@ -91,8 +88,7 @@ public NotifierImplTest() throws IOException {
@Test
public void testRegistration() throws NoSuchAlgorithmException {

when(serviceStatus.isRunning()).thenReturn(true);

final ServiceStatus serviceStatus = new ServiceStatusImpl(testContext);
final var notifier =
NotifierBuilder.newBuilder(mediator, testContext, serviceStatus)
.blockStreamService(blockStreamService)
Expand Down

0 comments on commit 895070b

Please sign in to comment.