Skip to content

Commit

Permalink
feat: Added Get Block (#26)
Browse files Browse the repository at this point in the history
Signed-off-by: a-saksena <[email protected]>
  • Loading branch information
a-saksena authored Jul 29, 2024
1 parent b5bebc5 commit 691a4c1
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 64 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,7 @@ gradle-app.setting
# JDT-specific (Eclipse Java Development Tools)
.classpath

.idea
.DS_Store
# .env files
server/docker/.env
15 changes: 15 additions & 0 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ service BlockStreamGrpc {
* message with the id of each block received.
*/
rpc StreamSource(stream BlockResponse) returns (stream Block) {}

rpc GetBlock(Block) returns (Block) {}
}

/**
Expand Down Expand Up @@ -79,3 +81,16 @@ message BlockResponse {
*/
int64 id = 1;
}

/**
* A block request is a simple message that contains an id.
* This specification is a simple example meant to expedite development.
* It will be replaced with a PBJ implementation in the future.
*/
message BlockRequest {
/**
* The id of the block which was requested. Each block id should
* correlate with the id of a Block message id.
*/
int64 id = 1;
}
112 changes: 77 additions & 35 deletions server/src/main/java/com/hedera/block/server/BlockStreamService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,60 @@

package com.hedera.block.server;

import static com.hedera.block.server.Constants.*;
import static io.helidon.webserver.grpc.ResponseHelper.complete;

import com.google.protobuf.Descriptors;
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.consumer.LiveStreamObserver;
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.BlockPersistenceHandler;
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;

import java.time.Clock;

import static com.hedera.block.server.Constants.*;
import java.util.Optional;

/**
* This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
* It sets up the bidirectional streaming methods for the service and handles the routing for these methods.
* It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation.
* This class implements the GrpcService interface and provides the functionality for the
* BlockStreamService. It sets up the bidirectional streaming methods for the service and handles
* the routing for these methods. It also initializes the StreamMediator, BlockStorage, and
* BlockCache upon creation.
*
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and server streaming
* respectively. These methods return custom StreamObservers which are used to observe and respond to the streams.
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and
* server streaming respectively. These methods return custom StreamObservers which are used to
* observe and respond to the streams.
*/
public class BlockStreamService implements GrpcService {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
private final StreamMediator<
BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse>
streamMediator;
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block>
blockPersistenceHandler;

/**
* Constructor for the BlockStreamService class.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds
* @param streamMediator the stream mediator
*/
public BlockStreamService(final long timeoutThresholdMillis,
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {
public BlockStreamService(
final long timeoutThresholdMillis,
final StreamMediator<
BlockStreamServiceGrpcProto.Block,
BlockStreamServiceGrpcProto.BlockResponse>
streamMediator,
final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block>
blockPersistenceHandler) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
this.blockPersistenceHandler = blockPersistenceHandler;
}

/**
Expand All @@ -68,8 +83,8 @@ public Descriptors.FileDescriptor proto() {
}

/**
* Returns the service name for the BlockStreamService. This service name corresponds to the service name in
* the proto file.
* Returns the service name for the BlockStreamService. This service name corresponds to the
* service name in the proto file.
*
* @return the service name corresponding to the service name in the proto file
*/
Expand All @@ -79,56 +94,83 @@ public String serviceName() {
}

/**
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service.
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming
* methods for the service.
*
* @param routing the routing for the BlockStreamService
*/
@Override
public void update(final Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
routing.unary(GET_BLOCK_METHOD_NAME, this::getBlock);
}

/**
* The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
* The streamSink method is called by Helidon each time a producer initiates a bidirectional
* stream.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer
* via the streamMediator as well as sending responses back to the producer.
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to
* the producer.
* @return a custom StreamObserver to handle streaming blocks from the producer to all
* subscribed consumer via the streamMediator as well as sending responses back to the
* producer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse>
responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");

return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver);
}

/**
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional
* stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer
* back to the server.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
* as handling responses from the consumer.
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the
* consumer back to the server.
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer
* as well as handling responses from the consumer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(
final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamObserver = new LiveStreamObserverImpl(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);
final LiveStreamObserver<
BlockStreamServiceGrpcProto.Block,
BlockStreamServiceGrpcProto.BlockResponse>
streamObserver =
new LiveStreamObserverImpl(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);

// Subscribe the observer to the mediator
streamMediator.subscribe(streamObserver);

return streamObserver;
}
}


void getBlock(
BlockStreamServiceGrpcProto.Block block,
StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {
LOGGER.log(System.Logger.Level.INFO, "GetBlock request received");
final Optional<BlockStreamServiceGrpcProto.Block> responseBlock =
blockPersistenceHandler.read(block.getId());
if (responseBlock.isPresent()) {
LOGGER.log(System.Logger.Level.INFO, "Returning block with id: {0}", block.getId());
complete(responseObserver, responseBlock.get());
} else {
LOGGER.log(
System.Logger.Level.INFO,
"Did not find your block with id: {0}",
block.getId());
responseObserver.onNext(
BlockStreamServiceGrpcProto.Block.newBuilder().setId(0).build());
}
}
}
8 changes: 4 additions & 4 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@

package com.hedera.block.server;

/**
* Constants used in the BlockNode service.
*/
/** Constants used in the BlockNode service. */
public final class Constants {
private Constants() {}

// Config Constants
public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY = "blocknode.server.consumer.timeout.threshold";
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY =
"blocknode.server.consumer.timeout.threshold";

// Constants specified in the service definition of the .proto file
public static final String SERVICE_NAME = "BlockStreamGrpc";
public static final String CLIENT_STREAMING_METHOD_NAME = "StreamSink";
public static final String SERVER_STREAMING_METHOD_NAME = "StreamSource";
public static final String GET_BLOCK_METHOD_NAME = "GetBlock";
}
73 changes: 48 additions & 25 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.hedera.block.server;

import static com.hedera.block.server.Constants.*;

import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
Expand All @@ -26,21 +28,22 @@
import io.helidon.config.Config;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcRouting;
import io.helidon.webserver.http.HttpRouting;

import java.io.IOException;
import java.util.stream.Stream;

import static com.hedera.block.server.Constants.*;

/**
* Main class for the block node server
*/
/** Main class for the block node server */
public class Server {

// Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Block>> clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockResponse>, StreamObserver<BlockStreamServiceGrpcProto.Block>> serverBidiStreamingMethod;
// Function stubs to satisfy the bidi routing param signatures. The implementations are in the
// service class.
private static ServerCalls.BidiStreamingMethod<
Stream<BlockStreamServiceGrpcProto.Block>,
StreamObserver<BlockStreamServiceGrpcProto.Block>>
clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<
Stream<BlockStreamServiceGrpcProto.BlockResponse>,
StreamObserver<BlockStreamServiceGrpcProto.Block>>
serverBidiStreamingMethod;

private static final System.Logger LOGGER = System.getLogger(Server.class.getName());

Expand All @@ -60,32 +63,52 @@ public static void main(final String[] args) {
Config.global(config);

// Get Timeout threshold from configuration
final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);
final long consumerTimeoutThreshold =
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY)
.asLong()
.orElse(1500L);

// Initialize the block storage, cache, and service
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage =
new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);

// Initialize blockStreamService with Live Stream and Cache
final BlockStreamService blockStreamService =
new BlockStreamService(
consumerTimeoutThreshold,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)),
new WriteThroughCacheHandler(blockStorage));

// Start the web server
WebServer.builder()
.port(8080)
.addRouting(GrpcRouting.builder()
.service(blockStreamService)
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverBidiStreamingMethod))
.addRouting(
GrpcRouting.builder()
.service(blockStreamService)
.bidi(
BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.bidi(
BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverBidiStreamingMethod)
.unary(
BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
GET_BLOCK_METHOD_NAME,
Server::grpcGetBlock))
.build()
.start();

} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e);
throw new RuntimeException(e);
}
}

static void grpcGetBlock(
BlockStreamServiceGrpcProto.BlockRequest request,
StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {}
}
Loading

0 comments on commit 691a4c1

Please sign in to comment.