Skip to content

Commit

Permalink
Attempting to send a gRPC Status.NOT_FOUND on when the block does not…
Browse files Browse the repository at this point in the history
… exist.

Signed-off-by: a-saksena <[email protected]>
  • Loading branch information
a-saksena committed Jul 24, 2024
1 parent e39f64d commit eaec07d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,7 @@ gradle-app.setting
# JDT-specific (Eclipse Java Development Tools)
.classpath

.idea
.DS_Store
# .env files
server/docker/.env
2 changes: 2 additions & 0 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ service BlockStreamGrpc {
* message with the id of each block received.
*/
rpc StreamSource(stream BlockResponse) returns (stream Block) {}

rpc GetBlock(Block) returns (Block) {}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@
import com.hedera.block.server.consumer.LiveStreamObserver;
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.persistence.BlockPersistenceHandler;
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;

import java.time.Clock;
import java.util.Optional;

import static io.helidon.webserver.grpc.ResponseHelper.complete;

import static com.hedera.block.server.Constants.*;

Expand All @@ -43,6 +48,7 @@ public class BlockStreamService implements GrpcService {

private final long timeoutThresholdMillis;
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;

/**
* Constructor for the BlockStreamService class.
Expand All @@ -51,10 +57,12 @@ public class BlockStreamService implements GrpcService {
* @param streamMediator the stream mediator
*/
public BlockStreamService(final long timeoutThresholdMillis,
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator,
final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {

this.timeoutThresholdMillis = timeoutThresholdMillis;
this.streamMediator = streamMediator;
this.blockPersistenceHandler = blockPersistenceHandler;
}

/**
Expand Down Expand Up @@ -87,6 +95,7 @@ public String serviceName() {
public void update(final Routing routing) {
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
routing.unary("GetBlock", this::getBlock);
}

/**
Expand Down Expand Up @@ -129,6 +138,24 @@ private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(f

return streamObserver;
}

private void getBlock(BlockStreamServiceGrpcProto.Block block, StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {
String message = "GET BLOCK RESPONSE! ";
LOGGER.log(System.Logger.Level.INFO, "GetBlock request received");
Optional<BlockStreamServiceGrpcProto.Block> responseBlock = blockPersistenceHandler.read(block.getId());
if(responseBlock.isPresent()) {
LOGGER.log(System.Logger.Level.INFO, "SENDING BLOCK # " + block.getId());
complete(responseObserver, responseBlock.get()); // TODO: Should return int and not quoted string
} else {
LOGGER.log(System.Logger.Level.INFO, "DID NOT FIND YOUR BLOCK");
// TODO: Fix below. It could return gRPC equivalent of 404 NOT FOUND
responseObserver.onError(Status.NOT_FOUND
.withDescription("DID NOT FIND YOUR BLOCK")
.asRuntimeException()
);
// complete(responseObserver, BlockStreamServiceGrpcProto.Block.getDefaultInstance());
}
}
}


15 changes: 11 additions & 4 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.helidon.config.Config;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.grpc.GrpcRouting;
import io.helidon.webserver.http.HttpRouting;

import java.io.IOException;
import java.util.stream.Stream;
Expand Down Expand Up @@ -64,8 +63,11 @@ public static void main(final String[] args) {

// Initialize the block storage, cache, and service
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);

// TODO: Make timeoutThresholdMillis configurable
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)),
new WriteThroughCacheHandler(blockStorage));

// Start the web server
WebServer.builder()
Expand All @@ -79,13 +81,18 @@ public static void main(final String[] args) {
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
SERVICE_NAME,
SERVER_STREAMING_METHOD_NAME,
serverBidiStreamingMethod))
serverBidiStreamingMethod)
.unary(BlockStreamServiceGrpcProto.getDescriptor(),
"BlockStreamGrpc",
"GetBlock",
Server::grpcGetBlock))
.build()
.start();

} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e);
throw new RuntimeException(e);
}
}

static void grpcGetBlock(BlockStreamServiceGrpcProto.BlockRequest request, StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {}
}

0 comments on commit eaec07d

Please sign in to comment.