From 691a4c14ca62b8f4f80d1c350673b8a82a8b1a23 Mon Sep 17 00:00:00 2001 From: a-saksena <168143982+a-saksena@users.noreply.github.com> Date: Mon, 29 Jul 2024 15:47:28 -0700 Subject: [PATCH] feat: Added Get Block (#26) Signed-off-by: a-saksena --- .gitignore | 2 + protos/src/main/protobuf/blockstream.proto | 15 +++ .../block/server/BlockStreamService.java | 112 ++++++++++++------ .../com/hedera/block/server/Constants.java | 8 +- .../java/com/hedera/block/server/Server.java | 73 ++++++++---- .../block/server/BlockStreamServiceTest.java | 74 ++++++++++++ 6 files changed, 220 insertions(+), 64 deletions(-) create mode 100644 server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java diff --git a/.gitignore b/.gitignore index f0f492d91..594c0f46a 100644 --- a/.gitignore +++ b/.gitignore @@ -47,5 +47,7 @@ gradle-app.setting # JDT-specific (Eclipse Java Development Tools) .classpath +.idea +.DS_Store # .env files server/docker/.env diff --git a/protos/src/main/protobuf/blockstream.proto b/protos/src/main/protobuf/blockstream.proto index 87ff477f7..c1d7a425d 100644 --- a/protos/src/main/protobuf/blockstream.proto +++ b/protos/src/main/protobuf/blockstream.proto @@ -46,6 +46,8 @@ service BlockStreamGrpc { * message with the id of each block received. */ rpc StreamSource(stream BlockResponse) returns (stream Block) {} + + rpc GetBlock(Block) returns (Block) {} } /** @@ -79,3 +81,16 @@ message BlockResponse { */ int64 id = 1; } + +/** + * A block request is a simple message that contains an id. + * This specification is a simple example meant to expedite development. + * It will be replaced with a PBJ implementation in the future. + */ +message BlockRequest { + /** + * The id of the block which was requested. Each block id should + * correlate with the id of a Block message id. + */ + int64 id = 1; +} diff --git a/server/src/main/java/com/hedera/block/server/BlockStreamService.java b/server/src/main/java/com/hedera/block/server/BlockStreamService.java index f301272df..7b40f4034 100644 --- a/server/src/main/java/com/hedera/block/server/BlockStreamService.java +++ b/server/src/main/java/com/hedera/block/server/BlockStreamService.java @@ -16,33 +16,41 @@ package com.hedera.block.server; +import static com.hedera.block.server.Constants.*; +import static io.helidon.webserver.grpc.ResponseHelper.complete; + import com.google.protobuf.Descriptors; import com.hedera.block.protos.BlockStreamServiceGrpcProto; import com.hedera.block.server.consumer.LiveStreamObserver; import com.hedera.block.server.consumer.LiveStreamObserverImpl; import com.hedera.block.server.mediator.StreamMediator; +import com.hedera.block.server.persistence.BlockPersistenceHandler; import com.hedera.block.server.producer.ProducerBlockStreamObserver; import io.grpc.stub.StreamObserver; import io.helidon.webserver.grpc.GrpcService; - import java.time.Clock; - -import static com.hedera.block.server.Constants.*; +import java.util.Optional; /** - * This class implements the GrpcService interface and provides the functionality for the BlockStreamService. - * It sets up the bidirectional streaming methods for the service and handles the routing for these methods. - * It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation. + * This class implements the GrpcService interface and provides the functionality for the + * BlockStreamService. It sets up the bidirectional streaming methods for the service and handles + * the routing for these methods. It also initializes the StreamMediator, BlockStorage, and + * BlockCache upon creation. * - *

The class provides two main methods, streamSink and streamSource, which handle the client and server streaming - * respectively. These methods return custom StreamObservers which are used to observe and respond to the streams. + *

The class provides two main methods, streamSink and streamSource, which handle the client and + * server streaming respectively. These methods return custom StreamObservers which are used to + * observe and respond to the streams. */ public class BlockStreamService implements GrpcService { private final System.Logger LOGGER = System.getLogger(getClass().getName()); private final long timeoutThresholdMillis; - private final StreamMediator streamMediator; + private final StreamMediator< + BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> + streamMediator; + private final BlockPersistenceHandler + blockPersistenceHandler; /** * Constructor for the BlockStreamService class. @@ -50,11 +58,18 @@ public class BlockStreamService implements GrpcService { * @param timeoutThresholdMillis the timeout threshold in milliseconds * @param streamMediator the stream mediator */ - public BlockStreamService(final long timeoutThresholdMillis, - final StreamMediator streamMediator) { + public BlockStreamService( + final long timeoutThresholdMillis, + final StreamMediator< + BlockStreamServiceGrpcProto.Block, + BlockStreamServiceGrpcProto.BlockResponse> + streamMediator, + final BlockPersistenceHandler + blockPersistenceHandler) { this.timeoutThresholdMillis = timeoutThresholdMillis; this.streamMediator = streamMediator; + this.blockPersistenceHandler = blockPersistenceHandler; } /** @@ -68,8 +83,8 @@ public Descriptors.FileDescriptor proto() { } /** - * Returns the service name for the BlockStreamService. This service name corresponds to the service name in - * the proto file. + * Returns the service name for the BlockStreamService. This service name corresponds to the + * service name in the proto file. * * @return the service name corresponding to the service name in the proto file */ @@ -79,7 +94,8 @@ public String serviceName() { } /** - * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service. + * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming + * methods for the service. * * @param routing the routing for the BlockStreamService */ @@ -87,48 +103,74 @@ public String serviceName() { public void update(final Routing routing) { routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink); routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource); + routing.unary(GET_BLOCK_METHOD_NAME, this::getBlock); } /** - * The streamSink method is called by Helidon each time a producer initiates a bidirectional stream. - * - * @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer. + * The streamSink method is called by Helidon each time a producer initiates a bidirectional + * stream. * - * @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer - * via the streamMediator as well as sending responses back to the producer. + * @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to + * the producer. + * @return a custom StreamObserver to handle streaming blocks from the producer to all + * subscribed consumer via the streamMediator as well as sending responses back to the + * producer. */ private StreamObserver streamSink( - final StreamObserver responseStreamObserver) { + final StreamObserver + responseStreamObserver) { LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method"); return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver); } /** - * The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream. + * The streamSource method is called by Helidon each time a consumer initiates a bidirectional + * stream. * - * @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer - * back to the server. - * - * @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well - * as handling responses from the consumer. + * @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the + * consumer back to the server. + * @return a custom StreamObserver to handle streaming blocks from the producer to the consumer + * as well as handling responses from the consumer. */ - private StreamObserver streamSource(final StreamObserver responseStreamObserver) { + private StreamObserver streamSource( + final StreamObserver responseStreamObserver) { LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method"); // Return a custom StreamObserver to handle streaming blocks from the producer. - final LiveStreamObserver streamObserver = new LiveStreamObserverImpl( - timeoutThresholdMillis, - Clock.systemDefaultZone(), - Clock.systemDefaultZone(), - streamMediator, - responseStreamObserver); + final LiveStreamObserver< + BlockStreamServiceGrpcProto.Block, + BlockStreamServiceGrpcProto.BlockResponse> + streamObserver = + new LiveStreamObserverImpl( + timeoutThresholdMillis, + Clock.systemDefaultZone(), + Clock.systemDefaultZone(), + streamMediator, + responseStreamObserver); // Subscribe the observer to the mediator streamMediator.subscribe(streamObserver); return streamObserver; } -} - + void getBlock( + BlockStreamServiceGrpcProto.Block block, + StreamObserver responseObserver) { + LOGGER.log(System.Logger.Level.INFO, "GetBlock request received"); + final Optional responseBlock = + blockPersistenceHandler.read(block.getId()); + if (responseBlock.isPresent()) { + LOGGER.log(System.Logger.Level.INFO, "Returning block with id: {0}", block.getId()); + complete(responseObserver, responseBlock.get()); + } else { + LOGGER.log( + System.Logger.Level.INFO, + "Did not find your block with id: {0}", + block.getId()); + responseObserver.onNext( + BlockStreamServiceGrpcProto.Block.newBuilder().setId(0).build()); + } + } +} diff --git a/server/src/main/java/com/hedera/block/server/Constants.java b/server/src/main/java/com/hedera/block/server/Constants.java index 2651397b9..a48d3a4b7 100644 --- a/server/src/main/java/com/hedera/block/server/Constants.java +++ b/server/src/main/java/com/hedera/block/server/Constants.java @@ -16,18 +16,18 @@ package com.hedera.block.server; -/** - * Constants used in the BlockNode service. - */ +/** Constants used in the BlockNode service. */ public final class Constants { private Constants() {} // Config Constants public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path"; - public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY = "blocknode.server.consumer.timeout.threshold"; + public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY = + "blocknode.server.consumer.timeout.threshold"; // Constants specified in the service definition of the .proto file public static final String SERVICE_NAME = "BlockStreamGrpc"; public static final String CLIENT_STREAMING_METHOD_NAME = "StreamSink"; public static final String SERVER_STREAMING_METHOD_NAME = "StreamSource"; + public static final String GET_BLOCK_METHOD_NAME = "GetBlock"; } diff --git a/server/src/main/java/com/hedera/block/server/Server.java b/server/src/main/java/com/hedera/block/server/Server.java index 08aebf39d..a14d2cb5f 100644 --- a/server/src/main/java/com/hedera/block/server/Server.java +++ b/server/src/main/java/com/hedera/block/server/Server.java @@ -16,6 +16,8 @@ package com.hedera.block.server; +import static com.hedera.block.server.Constants.*; + import com.hedera.block.protos.BlockStreamServiceGrpcProto; import com.hedera.block.server.mediator.LiveStreamMediatorImpl; import com.hedera.block.server.persistence.WriteThroughCacheHandler; @@ -26,21 +28,22 @@ import io.helidon.config.Config; import io.helidon.webserver.WebServer; import io.helidon.webserver.grpc.GrpcRouting; -import io.helidon.webserver.http.HttpRouting; - import java.io.IOException; import java.util.stream.Stream; -import static com.hedera.block.server.Constants.*; - -/** - * Main class for the block node server - */ +/** Main class for the block node server */ public class Server { - // Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class. - private static ServerCalls.BidiStreamingMethod, StreamObserver> clientBidiStreamingMethod; - private static ServerCalls.BidiStreamingMethod, StreamObserver> serverBidiStreamingMethod; + // Function stubs to satisfy the bidi routing param signatures. The implementations are in the + // service class. + private static ServerCalls.BidiStreamingMethod< + Stream, + StreamObserver> + clientBidiStreamingMethod; + private static ServerCalls.BidiStreamingMethod< + Stream, + StreamObserver> + serverBidiStreamingMethod; private static final System.Logger LOGGER = System.getLogger(Server.class.getName()); @@ -60,32 +63,52 @@ public static void main(final String[] args) { Config.global(config); // Get Timeout threshold from configuration - final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L); + final long consumerTimeoutThreshold = + config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY) + .asLong() + .orElse(1500L); // Initialize the block storage, cache, and service - final BlockStorage blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); - final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold, - new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage))); + final BlockStorage blockStorage = + new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config); + + // Initialize blockStreamService with Live Stream and Cache + final BlockStreamService blockStreamService = + new BlockStreamService( + consumerTimeoutThreshold, + new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)), + new WriteThroughCacheHandler(blockStorage)); // Start the web server WebServer.builder() .port(8080) - .addRouting(GrpcRouting.builder() - .service(blockStreamService) - .bidi(BlockStreamServiceGrpcProto.getDescriptor(), - SERVICE_NAME, - CLIENT_STREAMING_METHOD_NAME, - clientBidiStreamingMethod) - .bidi(BlockStreamServiceGrpcProto.getDescriptor(), - SERVICE_NAME, - SERVER_STREAMING_METHOD_NAME, - serverBidiStreamingMethod)) + .addRouting( + GrpcRouting.builder() + .service(blockStreamService) + .bidi( + BlockStreamServiceGrpcProto.getDescriptor(), + SERVICE_NAME, + CLIENT_STREAMING_METHOD_NAME, + clientBidiStreamingMethod) + .bidi( + BlockStreamServiceGrpcProto.getDescriptor(), + SERVICE_NAME, + SERVER_STREAMING_METHOD_NAME, + serverBidiStreamingMethod) + .unary( + BlockStreamServiceGrpcProto.getDescriptor(), + SERVICE_NAME, + GET_BLOCK_METHOD_NAME, + Server::grpcGetBlock)) .build() .start(); - } catch (IOException e) { LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e); throw new RuntimeException(e); } } + + static void grpcGetBlock( + BlockStreamServiceGrpcProto.BlockRequest request, + StreamObserver responseObserver) {} } diff --git a/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java new file mode 100644 index 000000000..fffb58847 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/BlockStreamServiceTest.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.server; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import com.hedera.block.protos.BlockStreamServiceGrpcProto; +import com.hedera.block.server.mediator.StreamMediator; +import com.hedera.block.server.persistence.BlockPersistenceHandler; +import io.grpc.stub.StreamObserver; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class BlockStreamServiceTest { + + private final long TIMEOUT_THRESHOLD_MILLIS = 52L; + + @Mock private StreamObserver responseObserver; + + @Mock + private BlockPersistenceHandler blockPersistenceHandler; + + @Mock + private StreamMediator< + BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> + streamMediator; + + @Test + void getBlockHappyPath() { + BlockStreamServiceGrpcProto.Block block = + BlockStreamServiceGrpcProto.Block.newBuilder().setId(1).build(); + BlockStreamService blockStreamService = + new BlockStreamService( + TIMEOUT_THRESHOLD_MILLIS, streamMediator, blockPersistenceHandler); + when(blockPersistenceHandler.read(1)) + .thenReturn( + Optional.of( + BlockStreamServiceGrpcProto.Block.newBuilder().setId(1).build())); + blockStreamService.getBlock(block, responseObserver); + verify(responseObserver, times(1)).onNext(block); + } + + @Test + void getBlockErrorPath() { + BlockStreamServiceGrpcProto.Block block = + BlockStreamServiceGrpcProto.Block.newBuilder().setId(1).build(); + BlockStreamService blockStreamService = + new BlockStreamService( + TIMEOUT_THRESHOLD_MILLIS, streamMediator, blockPersistenceHandler); + when(blockPersistenceHandler.read(1)).thenReturn(Optional.empty()); + blockStreamService.getBlock(block, responseObserver); + verify(responseObserver, times(1)) + .onNext(BlockStreamServiceGrpcProto.Block.newBuilder().setId(0).build()); + } +}