Skip to content

Commit

Permalink
Added hedera user to the container. Added rethrow of IOException in t…
Browse files Browse the repository at this point in the history
…he reader. Now the server will stop when IOExceptions are thrown upstream

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 25, 2024
1 parent a82c2e6 commit b1ea596
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 76 deletions.
13 changes: 10 additions & 3 deletions server/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
# Use Eclipse Temurin with Java 21 as the base image
FROM eclipse-temurin:21

# Expose the port that the application will run on
EXPOSE 8080

# Define version
ARG VERSION

ARG UNAME=hedera
ARG UID=2000
ARG GID=2000
RUN groupadd -g $GID -o $UNAME
RUN useradd -m -u $UID -g $GID -o -s /bin/bash $UNAME
USER $UNAME

# Set the working directory inside the container
WORKDIR /app

Expand All @@ -13,8 +23,5 @@ COPY --from=distributions server-${VERSION}.tar .
# Extract the TAR file
RUN tar -xvf server-${VERSION}.tar

# Expose the port that the application will run on
EXPOSE 8080

# RUN the bin script for starting the server
ENTRYPOINT ["sh", "-c", "/app/server-${VERSION}/bin/server"]
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.hedera.block.server.producer.ItemAckBuilder;
import com.hedera.block.server.producer.ProducerBlockItemObserver;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcService;
import java.time.Clock;

Expand All @@ -47,6 +48,8 @@ public class BlockStreamService implements GrpcService {
private final ItemAckBuilder itemAckBuilder;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;

private WebServer webServer;

/**
* Constructor for the BlockStreamService class.
*
Expand Down Expand Up @@ -94,14 +97,18 @@ public void update(final Routing routing) {
routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
}

public void register(final WebServer webServer) {
this.webServer = webServer;
}

StreamObserver<PublishStreamRequest> publishBlockStream(
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
LOGGER.log(
System.Logger.Level.DEBUG,
"Executing bidirectional publishBlockStream gRPC method");

return new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, itemAckBuilder);
streamMediator, publishStreamResponseObserver, itemAckBuilder, webServer);
}

void subscribeBlockStream(
Expand Down
89 changes: 47 additions & 42 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.server.Constants.*;

import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.FileSystemPersistenceHandler;
import com.hedera.block.server.persistence.storage.*;
import com.hedera.block.server.producer.ItemAckBuilder;
Expand Down Expand Up @@ -59,57 +61,60 @@ public static void main(final String[] args) {
final Config config = Config.create();
Config.global(config);

// Build the gRPC service
final GrpcRouting.Builder grpcRouting = buildGrpcRouting(config);

// Start the web server
WebServer webServer = WebServer.builder().port(8080).addRouting(grpcRouting).build();

webServer.start();
// .start();

try {
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator =
buildStreamMediator(config);
final BlockStreamService blockStreamService =
buildBlockStreamService(config, streamMediator);
final GrpcRouting.Builder grpcRouting = buildGrpcRouting(blockStreamService);

// Build the web server
final WebServer webServer =
WebServer.builder().port(8080).addRouting(grpcRouting).build();

blockStreamService.register(webServer);

// Start the web server
webServer.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static GrpcRouting.Builder buildGrpcRouting(final Config config) {
private static StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
buildStreamMediator(final Config config) throws IOException {
return new LiveStreamMediatorImpl(
new ConcurrentHashMap<>(32),
new FileSystemPersistenceHandler(
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config),
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config)));
}

try {
final BlockStreamService blockStreamService = buildBlockStreamService(config);
return GrpcRouting.builder()
.service(blockStreamService)
.bidi(
com.hedera.block.protos.BlockStreamService.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.serverStream(
com.hedera.block.protos.BlockStreamService.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverStreamingMethod);
} catch (IOException io) {
LOGGER.log(
System.Logger.Level.ERROR, "An exception was thrown starting the server", io);
throw new RuntimeException(io);
}
private static GrpcRouting.Builder buildGrpcRouting(
final BlockStreamService blockStreamService) {

return GrpcRouting.builder()
.service(blockStreamService)
.bidi(
com.hedera.block.protos.BlockStreamService.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.serverStream(
com.hedera.block.protos.BlockStreamService.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverStreamingMethod);
}

private static BlockStreamService buildBlockStreamService(final Config config)
throws IOException {
private static BlockStreamService buildBlockStreamService(
final Config config,
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator) {
// Get Timeout threshold from configuration
final long consumerTimeoutThreshold =
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);

// Initialize the reader and writer for the block storage
final BlockWriter<BlockItem> blockWriter =
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockReader<Block> blockReader =
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);

return new BlockStreamService(
consumerTimeoutThreshold,
new ItemAckBuilder(),
new LiveStreamMediatorImpl(
new ConcurrentHashMap<>(32),
new FileSystemPersistenceHandler(blockReader, blockWriter)));
consumerTimeoutThreshold, new ItemAckBuilder(), streamMediator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public void publishEvent(final BlockItem blockItem) throws IOException {
final var subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse));

// Persist the BlockItem
blockPersistenceHandler.persist(blockItem);

} else {
Expand Down Expand Up @@ -172,11 +174,6 @@ public boolean isSubscribed(
return subscribers.containsKey(handler);
}

@Override
public void register(final WebServer webServer) {
this.webserver = webServer;
}

private static SubscribeStreamResponse buildEndStreamResponse() {
// The current spec does not contain a generic error code for
// SubscribeStreamResponseCode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.hedera.block.server.mediator;

import com.hedera.block.server.consumer.BlockItemEventHandler;
import io.helidon.webserver.WebServer;
import java.io.IOException;

/**
Expand All @@ -42,6 +41,4 @@ public interface StreamMediator<U, V> {
void unsubscribe(final BlockItemEventHandler<V> handler);

boolean isSubscribed(final BlockItemEventHandler<V> handler);

void register(final WebServer webServer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void write(final BlockItem blockItem) throws IOException {
} catch (IOException e) {
LOGGER.log(
System.Logger.Level.ERROR, "Error writing the BlockItem protobuf to a file", e);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.WebServer;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;

Expand All @@ -37,6 +38,7 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
private final StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
private final ItemAckBuilder itemAckBuilder;
private final WebServer webServer;

/**
* Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the
Expand All @@ -46,11 +48,13 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
public ProducerBlockItemObserver(
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver,
final ItemAckBuilder itemAckBuilder) {
final ItemAckBuilder itemAckBuilder,
final WebServer webServer) {

this.streamMediator = streamMediator;
this.publishStreamResponseObserver = publishStreamResponseObserver;
this.itemAckBuilder = itemAckBuilder;
this.webServer = webServer;
}

/**
Expand Down Expand Up @@ -94,6 +98,9 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
final var errorResponse = buildErrorStreamResponse();
publishStreamResponseObserver.onNext(errorResponse);
LOGGER.log(System.Logger.Level.ERROR, "Exception thrown publishing BlockItem", io);

LOGGER.log(System.Logger.Level.ERROR, "Shutting down the web server");
webServer.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.helidon.config.Config;
import io.helidon.config.MapConfigSource;
import io.helidon.config.spi.ConfigSource;
import io.helidon.webserver.WebServer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -75,6 +76,8 @@ public class BlockStreamServiceIT {
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver5;
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver6;

@Mock private WebServer webServer;

@Mock private BlockReader<Block> blockReader;
@Mock private BlockWriter<BlockItem> blockWriter;

Expand Down Expand Up @@ -389,6 +392,10 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
final var streamMediator = buildStreamMediator(subscribers);
final var blockStreamService = buildBlockStreamService(streamMediator);

// Register the web server to confirm
// the server is stopped when an exception occurs
blockStreamService.register(webServer);

// Subscribe the consumers
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1);
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2);
Expand Down Expand Up @@ -441,6 +448,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
final var endOfStreamResponse =
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
verify(publishStreamResponseObserver, times(1)).onNext(endOfStreamResponse);

verify(webServer, times(1)).stop();
}

private static final String NO_WRITE = "r-xr-xr-x";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,23 +166,7 @@ public void testRemoveBlockItemWritePerms() throws IOException {

// Change the permissions on the block node root directory
removeBlockWritePerms(1, testConfig);

// Here, BlockItem writes won't throw an exception.
// We will rely on a different process to detect the invalid
// block and replace it.
for (int i = 2; i < blockItems.size(); i++) {
blockWriter.write(blockItems.get(1));
}

// Verify only the header block is on the file system
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
Optional<Block> blockOpt = blockReader.read(1);
assertFalse(blockOpt.isEmpty());

for (int i = 2; i < blockItems.size(); i++) {
blockOpt = blockReader.read(i);
assertTrue(blockOpt.isEmpty());
}
assertThrows(IOException.class, () -> blockWriter.write(blockItems.get(1)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.hedera.block.server.persistence.storage.BlockReader;
import com.hedera.block.server.persistence.storage.BlockWriter;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.WebServer;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
Expand All @@ -58,14 +59,19 @@ public class ProducerBlockItemObserverTest {
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver2;
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver3;

@Mock private WebServer webServer;

@Test
public void testProducerOnNext()
throws InterruptedException, IOException, NoSuchAlgorithmException {

List<BlockItem> blockItems = generateBlockItems(1);
ProducerBlockItemObserver producerBlockItemObserver =
new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, new ItemAckBuilder());
streamMediator,
publishStreamResponseObserver,
new ItemAckBuilder(),
webServer);

when(streamMediator.isPublishing()).thenReturn(true);

Expand Down Expand Up @@ -147,7 +153,10 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti

final ProducerBlockItemObserver producerBlockItemObserver =
new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, new ItemAckBuilder());
streamMediator,
publishStreamResponseObserver,
new ItemAckBuilder(),
webServer);

PublishStreamRequest publishStreamRequest =
PublishStreamRequest.newBuilder().setBlockItem(blockItem).build();
Expand All @@ -171,7 +180,10 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti
public void testOnError() {
ProducerBlockItemObserver producerBlockItemObserver =
new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, new ItemAckBuilder());
streamMediator,
publishStreamResponseObserver,
new ItemAckBuilder(),
webServer);

Throwable t = new Throwable("Test error");
producerBlockItemObserver.onError(t);
Expand All @@ -184,7 +196,7 @@ public void testItemAckBuilderExceptionTest()

ProducerBlockItemObserver producerBlockItemObserver =
new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, itemAckBuilder);
streamMediator, publishStreamResponseObserver, itemAckBuilder, webServer);

when(streamMediator.isPublishing()).thenReturn(true);
when(itemAckBuilder.buildAck(any()))
Expand Down

0 comments on commit b1ea596

Please sign in to comment.