From b1ea596fc7948a9a1519e4998ad8d7cfb5e77898 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Thu, 25 Jul 2024 15:09:01 -0600 Subject: [PATCH] Added hedera user to the container. Added rethrow of IOException in the reader. Now the server will stop when IOExceptions are thrown upstream Signed-off-by: Matt Peterson --- server/docker/Dockerfile | 13 ++- .../block/server/BlockStreamService.java | 9 +- .../java/com/hedera/block/server/Server.java | 89 ++++++++++--------- .../mediator/LiveStreamMediatorImpl.java | 7 +- .../block/server/mediator/StreamMediator.java | 3 - .../persistence/storage/BlockAsDirWriter.java | 1 + .../producer/ProducerBlockItemObserver.java | 9 +- .../block/server/BlockStreamServiceIT.java | 9 ++ .../storage/BlockAsDirectoryTest.java | 18 +--- .../ProducerBlockItemObserverTest.java | 20 ++++- 10 files changed, 102 insertions(+), 76 deletions(-) diff --git a/server/docker/Dockerfile b/server/docker/Dockerfile index 58b93ceca..4c57fb8b4 100644 --- a/server/docker/Dockerfile +++ b/server/docker/Dockerfile @@ -1,9 +1,19 @@ # Use Eclipse Temurin with Java 21 as the base image FROM eclipse-temurin:21 +# Expose the port that the application will run on +EXPOSE 8080 + # Define version ARG VERSION +ARG UNAME=hedera +ARG UID=2000 +ARG GID=2000 +RUN groupadd -g $GID -o $UNAME +RUN useradd -m -u $UID -g $GID -o -s /bin/bash $UNAME +USER $UNAME + # Set the working directory inside the container WORKDIR /app @@ -13,8 +23,5 @@ COPY --from=distributions server-${VERSION}.tar . # Extract the TAR file RUN tar -xvf server-${VERSION}.tar -# Expose the port that the application will run on -EXPOSE 8080 - # RUN the bin script for starting the server ENTRYPOINT ["sh", "-c", "/app/server-${VERSION}/bin/server"] diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index 50510cb74..b0b488aec 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -26,6 +26,7 @@ import com.hedera.block.server.producer.ItemAckBuilder; import com.hedera.block.server.producer.ProducerBlockItemObserver; import io.grpc.stub.StreamObserver; +import io.helidon.webserver.WebServer; import io.helidon.webserver.grpc.GrpcService; import java.time.Clock; @@ -47,6 +48,8 @@ public class BlockStreamService implements GrpcService { private final ItemAckBuilder itemAckBuilder; private final StreamMediator> streamMediator; + private WebServer webServer; + /** * Constructor for the BlockStreamService class. * @@ -94,6 +97,10 @@ public void update(final Routing routing) { routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream); } + public void register(final WebServer webServer) { + this.webServer = webServer; + } + StreamObserver publishBlockStream( final StreamObserver publishStreamResponseObserver) { LOGGER.log( @@ -101,7 +108,7 @@ StreamObserver publishBlockStream( "Executing bidirectional publishBlockStream gRPC method"); return new ProducerBlockItemObserver( - streamMediator, publishStreamResponseObserver, itemAckBuilder); + streamMediator, publishStreamResponseObserver, itemAckBuilder, webServer); } void subscribeBlockStream( diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index bfb3f7958..fae460c2b 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -19,7 +19,9 @@ import static com.hedera.block.protos.BlockStreamService.*; import static com.hedera.block.server.Constants.*; +import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.LiveStreamMediatorImpl; +import com.hedera.block.server.mediator.StreamMediator; import com.hedera.block.server.persistence.FileSystemPersistenceHandler; import com.hedera.block.server.persistence.storage.*; import com.hedera.block.server.producer.ItemAckBuilder; @@ -59,57 +61,60 @@ public static void main(final String[] args) { final Config config = Config.create(); Config.global(config); - // Build the gRPC service - final GrpcRouting.Builder grpcRouting = buildGrpcRouting(config); - - // Start the web server - WebServer webServer = WebServer.builder().port(8080).addRouting(grpcRouting).build(); - - webServer.start(); - // .start(); - + try { + final StreamMediator> streamMediator = + buildStreamMediator(config); + final BlockStreamService blockStreamService = + buildBlockStreamService(config, streamMediator); + final GrpcRouting.Builder grpcRouting = buildGrpcRouting(blockStreamService); + + // Build the web server + final WebServer webServer = + WebServer.builder().port(8080).addRouting(grpcRouting).build(); + + blockStreamService.register(webServer); + + // Start the web server + webServer.start(); + } catch (IOException e) { + throw new RuntimeException(e); + } } - private static GrpcRouting.Builder buildGrpcRouting(final Config config) { + private static StreamMediator> + buildStreamMediator(final Config config) throws IOException { + return new LiveStreamMediatorImpl( + new ConcurrentHashMap<>(32), + new FileSystemPersistenceHandler( + new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config), + new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config))); + } - try { - final BlockStreamService blockStreamService = buildBlockStreamService(config); - return GrpcRouting.builder() - .service(blockStreamService) - .bidi( - com.hedera.block.protos.BlockStreamService.getDescriptor(), - SERVICE_NAME, - CLIENT_STREAMING_METHOD_NAME, - clientBidiStreamingMethod) - .serverStream( - com.hedera.block.protos.BlockStreamService.getDescriptor(), - SERVICE_NAME, - SERVER_STREAMING_METHOD_NAME, - serverStreamingMethod); - } catch (IOException io) { - LOGGER.log( - System.Logger.Level.ERROR, "An exception was thrown starting the server", io); - throw new RuntimeException(io); - } + private static GrpcRouting.Builder buildGrpcRouting( + final BlockStreamService blockStreamService) { + + return GrpcRouting.builder() + .service(blockStreamService) + .bidi( + com.hedera.block.protos.BlockStreamService.getDescriptor(), + SERVICE_NAME, + CLIENT_STREAMING_METHOD_NAME, + clientBidiStreamingMethod) + .serverStream( + com.hedera.block.protos.BlockStreamService.getDescriptor(), + SERVICE_NAME, + SERVER_STREAMING_METHOD_NAME, + serverStreamingMethod); } - private static BlockStreamService buildBlockStreamService(final Config config) - throws IOException { + private static BlockStreamService buildBlockStreamService( + final Config config, + final StreamMediator> streamMediator) { // Get Timeout threshold from configuration final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L); - // Initialize the reader and writer for the block storage - final BlockWriter blockWriter = - new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); - final BlockReader blockReader = - new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); - return new BlockStreamService( - consumerTimeoutThreshold, - new ItemAckBuilder(), - new LiveStreamMediatorImpl( - new ConcurrentHashMap<>(32), - new FileSystemPersistenceHandler(blockReader, blockWriter))); + consumerTimeoutThreshold, new ItemAckBuilder(), streamMediator); } } diff --git a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java index 1e742bdd0..991d80c0d 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java +++ b/server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java @@ -96,6 +96,8 @@ public void publishEvent(final BlockItem blockItem) throws IOException { final var subscribeStreamResponse = SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build(); ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse)); + + // Persist the BlockItem blockPersistenceHandler.persist(blockItem); } else { @@ -172,11 +174,6 @@ public boolean isSubscribed( return subscribers.containsKey(handler); } - @Override - public void register(final WebServer webServer) { - this.webserver = webServer; - } - private static SubscribeStreamResponse buildEndStreamResponse() { // The current spec does not contain a generic error code for // SubscribeStreamResponseCode. diff --git a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java index ec4647e75..65c84a360 100644 --- a/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java +++ b/server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java @@ -17,7 +17,6 @@ package com.hedera.block.server.mediator; import com.hedera.block.server.consumer.BlockItemEventHandler; -import io.helidon.webserver.WebServer; import java.io.IOException; /** @@ -42,6 +41,4 @@ public interface StreamMediator { void unsubscribe(final BlockItemEventHandler handler); boolean isSubscribed(final BlockItemEventHandler handler); - - void register(final WebServer webServer); } diff --git a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java index 578322974..2ad6d1af6 100644 --- a/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java +++ b/server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java @@ -70,6 +70,7 @@ public void write(final BlockItem blockItem) throws IOException { } catch (IOException e) { LOGGER.log( System.Logger.Level.ERROR, "Error writing the BlockItem protobuf to a file", e); + throw e; } } diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index ada3c1a00..fc8cf9957 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -22,6 +22,7 @@ import com.hedera.block.server.data.ObjectEvent; import com.hedera.block.server.mediator.StreamMediator; import io.grpc.stub.StreamObserver; +import io.helidon.webserver.WebServer; import java.io.IOException; import java.security.NoSuchAlgorithmException; @@ -37,6 +38,7 @@ public class ProducerBlockItemObserver implements StreamObserver publishStreamResponseObserver; private final StreamMediator> streamMediator; private final ItemAckBuilder itemAckBuilder; + private final WebServer webServer; /** * Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the @@ -46,11 +48,13 @@ public class ProducerBlockItemObserver implements StreamObserver> streamMediator, final StreamObserver publishStreamResponseObserver, - final ItemAckBuilder itemAckBuilder) { + final ItemAckBuilder itemAckBuilder, + final WebServer webServer) { this.streamMediator = streamMediator; this.publishStreamResponseObserver = publishStreamResponseObserver; this.itemAckBuilder = itemAckBuilder; + this.webServer = webServer; } /** @@ -94,6 +98,9 @@ public void onNext(final PublishStreamRequest publishStreamRequest) { final var errorResponse = buildErrorStreamResponse(); publishStreamResponseObserver.onNext(errorResponse); LOGGER.log(System.Logger.Level.ERROR, "Exception thrown publishing BlockItem", io); + + LOGGER.log(System.Logger.Level.ERROR, "Shutting down the web server"); + webServer.stop(); } } diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java index 558a22083..30dcfc0a0 100644 --- a/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java @@ -40,6 +40,7 @@ import io.helidon.config.Config; import io.helidon.config.MapConfigSource; import io.helidon.config.spi.ConfigSource; +import io.helidon.webserver.WebServer; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -75,6 +76,8 @@ public class BlockStreamServiceIT { @Mock private StreamObserver subscribeStreamObserver5; @Mock private StreamObserver subscribeStreamObserver6; + @Mock private WebServer webServer; + @Mock private BlockReader blockReader; @Mock private BlockWriter blockWriter; @@ -389,6 +392,10 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() final var streamMediator = buildStreamMediator(subscribers); final var blockStreamService = buildBlockStreamService(streamMediator); + // Register the web server to confirm + // the server is stopped when an exception occurs + blockStreamService.register(webServer); + // Subscribe the consumers blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1); blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2); @@ -441,6 +448,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure() final var endOfStreamResponse = PublishStreamResponse.newBuilder().setStatus(endOfStream).build(); verify(publishStreamResponseObserver, times(1)).onNext(endOfStreamResponse); + + verify(webServer, times(1)).stop(); } private static final String NO_WRITE = "r-xr-xr-x"; diff --git a/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java b/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java index 9d7ec6e9a..b0cd4f154 100644 --- a/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java +++ b/server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java @@ -166,23 +166,7 @@ public void testRemoveBlockItemWritePerms() throws IOException { // Change the permissions on the block node root directory removeBlockWritePerms(1, testConfig); - - // Here, BlockItem writes won't throw an exception. - // We will rely on a different process to detect the invalid - // block and replace it. - for (int i = 2; i < blockItems.size(); i++) { - blockWriter.write(blockItems.get(1)); - } - - // Verify only the header block is on the file system - final BlockReader blockReader = new BlockAsDirReader(JUNIT, testConfig); - Optional blockOpt = blockReader.read(1); - assertFalse(blockOpt.isEmpty()); - - for (int i = 2; i < blockItems.size(); i++) { - blockOpt = blockReader.read(i); - assertTrue(blockOpt.isEmpty()); - } + assertThrows(IOException.class, () -> blockWriter.write(blockItems.get(1))); } @Test diff --git a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java index 6eece47a5..0faddbe58 100644 --- a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java @@ -34,6 +34,7 @@ import com.hedera.block.server.persistence.storage.BlockReader; import com.hedera.block.server.persistence.storage.BlockWriter; import io.grpc.stub.StreamObserver; +import io.helidon.webserver.WebServer; import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.util.List; @@ -58,6 +59,8 @@ public class ProducerBlockItemObserverTest { @Mock private StreamObserver streamObserver2; @Mock private StreamObserver streamObserver3; + @Mock private WebServer webServer; + @Test public void testProducerOnNext() throws InterruptedException, IOException, NoSuchAlgorithmException { @@ -65,7 +68,10 @@ public void testProducerOnNext() List blockItems = generateBlockItems(1); ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( - streamMediator, publishStreamResponseObserver, new ItemAckBuilder()); + streamMediator, + publishStreamResponseObserver, + new ItemAckBuilder(), + webServer); when(streamMediator.isPublishing()).thenReturn(true); @@ -147,7 +153,10 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( - streamMediator, publishStreamResponseObserver, new ItemAckBuilder()); + streamMediator, + publishStreamResponseObserver, + new ItemAckBuilder(), + webServer); PublishStreamRequest publishStreamRequest = PublishStreamRequest.newBuilder().setBlockItem(blockItem).build(); @@ -171,7 +180,10 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti public void testOnError() { ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( - streamMediator, publishStreamResponseObserver, new ItemAckBuilder()); + streamMediator, + publishStreamResponseObserver, + new ItemAckBuilder(), + webServer); Throwable t = new Throwable("Test error"); producerBlockItemObserver.onError(t); @@ -184,7 +196,7 @@ public void testItemAckBuilderExceptionTest() ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( - streamMediator, publishStreamResponseObserver, itemAckBuilder); + streamMediator, publishStreamResponseObserver, itemAckBuilder, webServer); when(streamMediator.isPublishing()).thenReturn(true); when(itemAckBuilder.buildAck(any()))