diff --git a/.gitignore b/.gitignore index f0f492d91..594c0f46a 100644 --- a/.gitignore +++ b/.gitignore @@ -47,5 +47,7 @@ gradle-app.setting # JDT-specific (Eclipse Java Development Tools) .classpath +.idea +.DS_Store # .env files server/docker/.env diff --git a/protos/src/main/protobuf/blockstream.proto b/protos/src/main/protobuf/blockstream.proto index 87ff477f7..c148efda0 100644 --- a/protos/src/main/protobuf/blockstream.proto +++ b/protos/src/main/protobuf/blockstream.proto @@ -46,6 +46,8 @@ service BlockStreamGrpc { * message with the id of each block received. */ rpc StreamSource(stream BlockResponse) returns (stream Block) {} + + rpc GetBlock(Block) returns (Block) {} } /** diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index f301272df..c84a24cf1 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -21,11 +21,16 @@ import com.hedera.block.server.consumer.LiveStreamObserver; import com.hedera.block.server.consumer.LiveStreamObserverImpl; import com.hedera.block.server.mediator.StreamMediator; +import com.hedera.block.server.persistence.BlockPersistenceHandler; import com.hedera.block.server.producer.ProducerBlockStreamObserver; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import io.helidon.webserver.grpc.GrpcService; import java.time.Clock; +import java.util.Optional; + +import static io.helidon.webserver.grpc.ResponseHelper.complete; import static com.hedera.block.server.Constants.*; @@ -43,6 +48,7 @@ public class BlockStreamService implements GrpcService { private final long timeoutThresholdMillis; private final StreamMediator streamMediator; + private final BlockPersistenceHandler blockPersistenceHandler; /** * Constructor for the BlockStreamService class. @@ -51,10 +57,12 @@ public class BlockStreamService implements GrpcService { * @param streamMediator the stream mediator */ public BlockStreamService(final long timeoutThresholdMillis, - final StreamMediator streamMediator) { + final StreamMediator streamMediator, + final BlockPersistenceHandler blockPersistenceHandler) { this.timeoutThresholdMillis = timeoutThresholdMillis; this.streamMediator = streamMediator; + this.blockPersistenceHandler = blockPersistenceHandler; } /** @@ -87,6 +95,7 @@ public String serviceName() { public void update(final Routing routing) { routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink); routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource); + routing.unary("GetBlock", this::getBlock); } /** @@ -129,6 +138,24 @@ private StreamObserver streamSource(f return streamObserver; } + + private void getBlock(BlockStreamServiceGrpcProto.Block block, StreamObserver responseObserver) { + String message = "GET BLOCK RESPONSE! "; + LOGGER.log(System.Logger.Level.INFO, "GetBlock request received"); + Optional responseBlock = blockPersistenceHandler.read(block.getId()); + if(responseBlock.isPresent()) { + LOGGER.log(System.Logger.Level.INFO, "SENDING BLOCK # " + block.getId()); + complete(responseObserver, responseBlock.get()); // TODO: Should return int and not quoted string + } else { + LOGGER.log(System.Logger.Level.INFO, "DID NOT FIND YOUR BLOCK"); + // TODO: Fix below. It could return gRPC equivalent of 404 NOT FOUND + responseObserver.onError(Status.NOT_FOUND + .withDescription("DID NOT FIND YOUR BLOCK") + .asRuntimeException() + ); + // complete(responseObserver, BlockStreamServiceGrpcProto.Block.getDefaultInstance()); + } + } } diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 08aebf39d..c8b46ce4f 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -26,7 +26,6 @@ import io.helidon.config.Config; import io.helidon.webserver.WebServer; import io.helidon.webserver.grpc.GrpcRouting; -import io.helidon.webserver.http.HttpRouting; import java.io.IOException; import java.util.stream.Stream; @@ -64,8 +63,11 @@ public static void main(final String[] args) { // Initialize the block storage, cache, and service final BlockStorage blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + + // TODO: Make timeoutThresholdMillis configurable final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold, - new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage))); + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)), + new WriteThroughCacheHandler(blockStorage)); // Start the web server WebServer.builder() @@ -79,13 +81,18 @@ public static void main(final String[] args) { .bidi(BlockStreamServiceGrpcProto.getDescriptor(), SERVICE_NAME, SERVER_STREAMING_METHOD_NAME, - serverBidiStreamingMethod)) + serverBidiStreamingMethod) + .unary(BlockStreamServiceGrpcProto.getDescriptor(), + "BlockStreamGrpc", + "GetBlock", + Server::grpcGetBlock)) .build() .start(); - } catch (IOException e) { LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e); throw new RuntimeException(e); } } + + static void grpcGetBlock(BlockStreamServiceGrpcProto.BlockRequest request, StreamObserver responseObserver) {} }