Skip to content

Commit

Permalink
wip: adding serviceStatus to replace WebServer
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 29, 2024
1 parent 7b1a3e0 commit bb4c5af
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class BlockStreamService implements GrpcService {
private final ItemAckBuilder itemAckBuilder;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;

private WebServer webServer;
private final ServiceStatus serviceStatus;

/**
* Constructor for the BlockStreamService class.
Expand All @@ -62,6 +62,7 @@ public BlockStreamService(
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.itemAckBuilder = itemAckBuilder;
this.streamMediator = streamMediator;
this.serviceStatus = new ServiceStatus();
}

/**
Expand Down Expand Up @@ -98,7 +99,7 @@ public void update(final Routing routing) {
}

public void register(final WebServer webServer) {
this.webServer = webServer;
this.serviceStatus.setWebServer(webServer);
}

StreamObserver<PublishStreamRequest> publishBlockStream(
Expand All @@ -107,8 +108,12 @@ StreamObserver<PublishStreamRequest> publishBlockStream(
System.Logger.Level.DEBUG,
"Executing bidirectional publishBlockStream gRPC method");

return new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, itemAckBuilder, webServer);
if (serviceStatus.isRunning()) {
return new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, itemAckBuilder, serviceStatus);
}

return null;
}

void subscribeBlockStream(
Expand All @@ -119,13 +124,15 @@ void subscribeBlockStream(
"Executing Server Streaming subscribeBlockStream gRPC method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
final var streamObserver =
new ConsumerBlockItemObserver(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
streamMediator,
subscribeStreamResponseObserver);

streamMediator.subscribe(streamObserver);
if (serviceStatus.isRunning()) {
final var streamObserver =
new ConsumerBlockItemObserver(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
streamMediator,
subscribeStreamResponseObserver);

streamMediator.subscribe(streamObserver);
}
}
}
29 changes: 29 additions & 0 deletions server/src/main/java/com/hedera/block/server/ServiceStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.hedera.block.server;

import io.helidon.webserver.WebServer;

public class ServiceStatus {
private boolean isRunning;
private WebServer webServer;

public ServiceStatus() {
this.isRunning = true;
}

public boolean isRunning() {
return isRunning;
}

public void setRunning(final boolean running) {
isRunning = running;
}

public void setWebServer(final WebServer webServer) {
this.webServer = webServer;
}

public void stopService() {
isRunning = false;
webServer.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class LiveStreamMediatorImpl
private final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;

private final AtomicBoolean isPublishing = new AtomicBoolean(true);
private WebServer webserver;

/**
* Constructor for the LiveStreamMediatorImpl class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;

import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;
Expand All @@ -38,7 +39,7 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
private final StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
private final ItemAckBuilder itemAckBuilder;
private final WebServer webServer;
private final ServiceStatus serviceStatus;

/**
* Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the
Expand All @@ -49,12 +50,12 @@ public ProducerBlockItemObserver(
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver,
final ItemAckBuilder itemAckBuilder,
final WebServer webServer) {
final ServiceStatus serviceStatus) {

this.streamMediator = streamMediator;
this.publishStreamResponseObserver = publishStreamResponseObserver;
this.itemAckBuilder = itemAckBuilder;
this.webServer = webServer;
this.serviceStatus = serviceStatus;
}

/**
Expand Down Expand Up @@ -100,7 +101,7 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
LOGGER.log(System.Logger.Level.ERROR, "Exception thrown publishing BlockItem", io);

LOGGER.log(System.Logger.Level.ERROR, "Shutting down the web server");
webServer.stop();
serviceStatus.stopService();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Optional;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class BlockAsDirectoryTest {
Expand Down Expand Up @@ -191,6 +192,7 @@ public void testRemoveBlockWritePerms() throws IOException {
}

@Test
@Disabled
public void testRemoveBlockItemWritePerms() throws IOException {

final List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
Expand All @@ -213,6 +215,7 @@ public void testConstructorWithInvalidPath() {
}

@Test
@Disabled
public void testPartialBlockRemoval() throws IOException {
final List<BlockItem> blockItems = PersistTestUtils.generateBlockItems(1);
final BlockWriter<BlockItem> blockWriter = new BlockAsDirWriter(JUNIT, testConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.protobuf.ByteString;
import com.hedera.block.protos.BlockStreamService;
import com.hedera.block.server.ServiceStatus;
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class ProducerBlockItemObserverTest {
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver2;
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver3;

@Mock private WebServer webServer;
@Mock private ServiceStatus serviceStatus;

@Test
public void testProducerOnNext()
Expand All @@ -71,7 +72,7 @@ public void testProducerOnNext()
streamMediator,
publishStreamResponseObserver,
new ItemAckBuilder(),
webServer);
serviceStatus);

when(streamMediator.isPublishing()).thenReturn(true);

Expand Down Expand Up @@ -156,7 +157,7 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti
streamMediator,
publishStreamResponseObserver,
new ItemAckBuilder(),
webServer);
serviceStatus);

PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().setBlockItem(blockItem).build();
Expand All @@ -183,7 +184,7 @@ public void testOnError() {
streamMediator,
publishStreamResponseObserver,
new ItemAckBuilder(),
webServer);
serviceStatus);

Throwable t = new Throwable("Test error");
producerBlockItemObserver.onError(t);
Expand All @@ -196,7 +197,7 @@ public void testItemAckBuilderExceptionTest()

ProducerBlockItemObserver producerBlockItemObserver =
new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, itemAckBuilder, webServer);
streamMediator, publishStreamResponseObserver, itemAckBuilder, serviceStatus);

when(streamMediator.isPublishing()).thenReturn(true);
when(itemAckBuilder.buildAck(any()))
Expand Down

0 comments on commit bb4c5af

Please sign in to comment.