Skip to content

Commit

Permalink
Addressing Spotless styling issues.
Browse files Browse the repository at this point in the history
Signed-off-by: a-saksena <[email protected]>
  • Loading branch information
a-saksena committed Jul 24, 2024
1 parent 59b1f6e commit 0cafd92
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 80 deletions.
114 changes: 67 additions & 47 deletions server/src/main/java/com/hedera/block/server/BlockStreamService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.hedera.block.server;

import static com.hedera.block.server.Constants.*;
import static io.helidon.webserver.grpc.ResponseHelper.complete;

import com.google.protobuf.Descriptors;
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.consumer.LiveStreamObserver;
Expand All @@ -26,39 +29,44 @@
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;

import java.time.Clock;
import java.util.Optional;

import static io.helidon.webserver.grpc.ResponseHelper.complete;

import static com.hedera.block.server.Constants.*;

/**
* This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
* It sets up the bidirectional streaming methods for the service and handles the routing for these methods.
* It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation.
* This class implements the GrpcService interface and provides the functionality for the
* BlockStreamService. It sets up the bidirectional streaming methods for the service and handles
* the routing for these methods. It also initializes the StreamMediator, BlockStorage, and
* BlockCache upon creation.
*
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and server streaming
* respectively. These methods return custom StreamObservers which are used to observe and respond to the streams.
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and
* server streaming respectively. These methods return custom StreamObservers which are used to
* observe and respond to the streams.
*/
public class BlockStreamService implements GrpcService {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
private final StreamMediator<
BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse>
streamMediator;
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block>
blockPersistenceHandler;

/**
* Constructor for the BlockStreamService class.
*
* @param timeoutThresholdMillis the timeout threshold in milliseconds
* @param streamMediator the stream mediator
*/
public BlockStreamService(final long timeoutThresholdMillis,
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator,
final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {
public BlockStreamService(
final long timeoutThresholdMillis,
final StreamMediator<
BlockStreamServiceGrpcProto.Block,
BlockStreamServiceGrpcProto.BlockResponse>
streamMediator,
final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block>
blockPersistenceHandler) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
Expand All @@ -76,8 +84,8 @@ public Descriptors.FileDescriptor proto() {
}

/**
* Returns the service name for the BlockStreamService. This service name corresponds to the service name in
* the proto file.
* Returns the service name for the BlockStreamService. This service name corresponds to the
* service name in the proto file.
*
* @return the service name corresponding to the service name in the proto file
*/
Expand All @@ -87,7 +95,8 @@ public String serviceName() {
}

/**
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service.
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming
* methods for the service.
*
* @param routing the routing for the BlockStreamService
*/
Expand All @@ -99,64 +108,75 @@ public void update(final Routing routing) {
}

/**
* The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
* The streamSink method is called by Helidon each time a producer initiates a bidirectional
* stream.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer
* via the streamMediator as well as sending responses back to the producer.
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to
* the producer.
* @return a custom StreamObserver to handle streaming blocks from the producer to all
* subscribed consumer via the streamMediator as well as sending responses back to the
* producer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse>
responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");

return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver);
}

/**
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional
* stream.
*
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer
* back to the server.
*
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
* as handling responses from the consumer.
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the
* consumer back to the server.
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer
* as well as handling responses from the consumer.
*/
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(
final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");

// Return a custom StreamObserver to handle streaming blocks from the producer.
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamObserver = new LiveStreamObserverImpl(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);
final LiveStreamObserver<
BlockStreamServiceGrpcProto.Block,
BlockStreamServiceGrpcProto.BlockResponse>
streamObserver =
new LiveStreamObserverImpl(
timeoutThresholdMillis,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
streamMediator,
responseStreamObserver);

// Subscribe the observer to the mediator
streamMediator.subscribe(streamObserver);

return streamObserver;
}

private void getBlock(BlockStreamServiceGrpcProto.Block block, StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {
private void getBlock(
BlockStreamServiceGrpcProto.Block block,
StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {
String message = "GET BLOCK RESPONSE! ";
LOGGER.log(System.Logger.Level.INFO, "GetBlock request received");
Optional<BlockStreamServiceGrpcProto.Block> responseBlock = blockPersistenceHandler.read(block.getId());
if(responseBlock.isPresent()) {
Optional<BlockStreamServiceGrpcProto.Block> responseBlock =
blockPersistenceHandler.read(block.getId());
if (responseBlock.isPresent()) {
LOGGER.log(System.Logger.Level.INFO, "SENDING BLOCK # " + block.getId());
complete(responseObserver, responseBlock.get()); // TODO: Should return int and not quoted string
complete(
responseObserver,
responseBlock.get()); // TODO: Should return int and not quoted string
} else {
LOGGER.log(System.Logger.Level.INFO, "DID NOT FIND YOUR BLOCK");
// TODO: Fix below. It could return gRPC equivalent of 404 NOT FOUND
responseObserver.onError(Status.NOT_FOUND
.withDescription("DID NOT FIND YOUR BLOCK")
.asRuntimeException()
);
responseObserver.onError(
Status.NOT_FOUND
.withDescription("DID NOT FIND YOUR BLOCK")
.asRuntimeException());
// Keeping below comments for the fix needed above.
// complete(responseObserver, BlockStreamServiceGrpcProto.Block.getDefaultInstance());
}
}
}


7 changes: 3 additions & 4 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package com.hedera.block.server;

/**
* Constants used in the BlockNode service.
*/
/** Constants used in the BlockNode service. */
public final class Constants {
private Constants() {}

// Config Constants
public static final String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY = "blocknode.server.consumer.timeout.threshold";
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY =
"blocknode.server.consumer.timeout.threshold";

// Constants specified in the service definition of the .proto file
public static final String SERVICE_NAME = "BlockStreamGrpc";
Expand Down
74 changes: 45 additions & 29 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.hedera.block.server;

import static com.hedera.block.server.Constants.*;

import com.hedera.block.protos.BlockStreamServiceGrpcProto;
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
Expand All @@ -26,20 +28,22 @@
import io.helidon.config.Config;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcRouting;

import java.io.IOException;
import java.util.stream.Stream;

import static com.hedera.block.server.Constants.*;

/**
* Main class for the block node server
*/
/** Main class for the block node server */
public class Server {

// Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Block>> clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockResponse>, StreamObserver<BlockStreamServiceGrpcProto.Block>> serverBidiStreamingMethod;
// Function stubs to satisfy the bidi routing param signatures. The implementations are in the
// service class.
private static ServerCalls.BidiStreamingMethod<
Stream<BlockStreamServiceGrpcProto.Block>,
StreamObserver<BlockStreamServiceGrpcProto.Block>>
clientBidiStreamingMethod;
private static ServerCalls.BidiStreamingMethod<
Stream<BlockStreamServiceGrpcProto.BlockResponse>,
StreamObserver<BlockStreamServiceGrpcProto.Block>>
serverBidiStreamingMethod;

private static final System.Logger LOGGER = System.getLogger(Server.class.getName());

Expand All @@ -59,33 +63,43 @@ public static void main(final String[] args) {
Config.global(config);

// Get Timeout threshold from configuration
final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);
final long consumerTimeoutThreshold =
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY)
.asLong()
.orElse(1500L);

// Initialize the block storage, cache, and service
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage =
new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);

// Initialize blockStreamService with Live Stream and Cache
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)),
new WriteThroughCacheHandler(blockStorage));
final BlockStreamService blockStreamService =
new BlockStreamService(
consumerTimeoutThreshold,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)),
new WriteThroughCacheHandler(blockStorage));

// Start the web server
WebServer.builder()
.port(8080)
.addRouting(GrpcRouting.builder()
.service(blockStreamService)
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverBidiStreamingMethod)
.unary(BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
GET_BLOCK_METHOD_NAME,
Server::grpcGetBlock))
.addRouting(
GrpcRouting.builder()
.service(blockStreamService)
.bidi(
BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
CLIENT_STREAMING_METHOD_NAME,
clientBidiStreamingMethod)
.bidi(
BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverBidiStreamingMethod)
.unary(
BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
GET_BLOCK_METHOD_NAME,
Server::grpcGetBlock))
.build()
.start();
} catch (IOException e) {
Expand All @@ -94,5 +108,7 @@ public static void main(final String[] args) {
}
}

static void grpcGetBlock(BlockStreamServiceGrpcProto.BlockRequest request, StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {}
static void grpcGetBlock(
BlockStreamServiceGrpcProto.BlockRequest request,
StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {}
}

0 comments on commit 0cafd92

Please sign in to comment.