Skip to content

Commit

Permalink
feat: Implement gRPC publishBlockStream streaming of blocks (#15541)
Browse files Browse the repository at this point in the history
Signed-off-by: Petar Tonev <[email protected]>
  • Loading branch information
petreze authored Oct 31, 2024
1 parent 0b2f935 commit 196d78a
Show file tree
Hide file tree
Showing 11 changed files with 384 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ extraJavaModuleInfo {
module("io.grpc:grpc-util", "io.grpc.util")
module("io.grpc:grpc-protobuf", "io.grpc.protobuf")
module("io.grpc:grpc-protobuf-lite", "io.grpc.protobuf.lite")
module("io.helidon.common:helidon-common", "io.helidon.common") {
exportAllPackages()
patchRealModule()
}
module("io.helidon.webclient:helidon-webclient", "io.helidon.webclient") {
requireAllDefinedDependencies()
patchRealModule()
}
module("io.helidon.webclient:helidon-webclient-grpc", "io.helidon.webclient.grpc") {
exportAllPackages()
requireAllDefinedDependencies()
patchRealModule()
}
module("com.github.spotbugs:spotbugs-annotations", "com.github.spotbugs.annotations")
module("com.google.code.findbugs:jsr305", "java.annotation")
module("com.google.protobuf:protobuf-java", "com.google.protobuf") {
Expand Down
3 changes: 3 additions & 0 deletions hapi/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
exports com.hedera.hapi.block.stream.schema;
exports com.hedera.hapi.node.state.tss;
exports com.hedera.hapi.services.auxiliary.tss;
exports com.hedera.hapi.block.protoc;
exports com.hedera.hapi.block.stream.protoc;
exports com.hedera.hapi.block;

requires transitive com.google.common;
requires transitive com.google.protobuf;
Expand Down
9 changes: 9 additions & 0 deletions hedera-dependency-versions/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ dependencies {
}

dependencies.constraints {
api("io.helidon.common:helidon-common:4.1.1") {
because("io.helidon.common")
}
api("io.helidon.webclient:helidon-webclient:4.1.1") {
because("io.helidon.webclient")
}
api("io.helidon.webclient:helidon-webclient-grpc:4.1.1") {
because("io.helidon.webclient.grpc")
}
api("org.awaitility:awaitility:4.2.0") {
because("awaitility")
}
Expand Down
3 changes: 3 additions & 0 deletions hedera-node/hedera-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ mainModuleInfo {
// This is needed to pick up and include the native libraries for the netty epoll transport
runtimeOnly("io.netty.transport.epoll.linux.x86_64")
runtimeOnly("io.netty.transport.epoll.linux.aarch_64")
runtimeOnly("io.helidon.grpc.core")
runtimeOnly("io.helidon.webclient")
runtimeOnly("io.helidon.webclient.grpc")
}

testModuleInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public interface BlockItemWriter {
* Writes a serialized item to the destination stream.
*
* @param bytes the serialized item to write
* @return the block item writer
*/
default BlockItemWriter writePbjItem(@NonNull final Bytes bytes) {
requireNonNull(bytes);
Expand All @@ -47,13 +48,15 @@ default BlockItemWriter writePbjItem(@NonNull final Bytes bytes) {
* Writes a serialized item to the destination stream.
*
* @param bytes the serialized item to write
* @return the block item writer
*/
BlockItemWriter writeItem(@NonNull byte[] bytes);

/**
* Writes a pre-serialized sequence of items to the destination stream.
*
* @param data the serialized item to write
* @return the block item writer
*/
BlockItemWriter writeItems(@NonNull BufferedData data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.hedera.node.app.blocks.impl.BlockStreamManagerImpl;
import com.hedera.node.app.blocks.impl.FileBlockItemWriter;
import com.hedera.node.app.blocks.impl.GrpcBlockItemWriter;
import com.hedera.node.config.ConfigProvider;
import com.hedera.node.config.data.BlockStreamConfig;
import com.swirlds.state.spi.info.NodeInfo;
Expand Down Expand Up @@ -45,7 +46,7 @@ static Supplier<BlockItemWriter> bindBlockItemWriterSupplier(
final var blockStreamConfig = config.getConfigData(BlockStreamConfig.class);
return switch (blockStreamConfig.writerMode()) {
case FILE -> () -> new FileBlockItemWriter(configProvider, selfNodeInfo, fileSystem);
case GRPC -> throw new IllegalArgumentException("gRPC block writer not yet implemented");
case GRPC -> () -> new GrpcBlockItemWriter(blockStreamConfig);
};
}
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.hedera.node.config.data.BlockRecordStreamConfig;
import com.hedera.node.config.data.BlockStreamConfig;
import com.hedera.node.config.data.VersionConfig;
import com.hedera.node.config.types.BlockStreamWriterMode;
import com.hedera.pbj.runtime.io.buffer.BufferedData;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.config.api.Configuration;
Expand Down Expand Up @@ -95,6 +96,7 @@ public class BlockStreamManagerImpl implements BlockStreamManager {
private static final Logger log = LogManager.getLogger(BlockStreamManagerImpl.class);

private final int roundsPerBlock;
private final BlockStreamWriterMode streamWriterType;
private final int hashCombineBatchSize;
private final int serializationBatchSize;
private final TssBaseService tssBaseService;
Expand Down Expand Up @@ -174,6 +176,7 @@ public BlockStreamManagerImpl(
this.hapiVersion = hapiVersionFrom(config);
final var blockStreamConfig = config.getConfigData(BlockStreamConfig.class);
this.roundsPerBlock = blockStreamConfig.roundsPerBlock();
this.streamWriterType = blockStreamConfig.writerMode();
this.hashCombineBatchSize = blockStreamConfig.hashCombineBatchSize();
this.serializationBatchSize = blockStreamConfig.serializationBatchSize();
this.blockHashManager = new BlockHashManager(config);
Expand Down Expand Up @@ -394,7 +397,10 @@ public synchronized void accept(@NonNull final byte[] message, @NonNull final by
.blockSignature(blockSignature)
.siblingHashes(siblingHashes.stream().flatMap(List::stream).toList());
final var proofItem = BlockItem.newBuilder().blockProof(proof).build();
block.writer().writePbjItem(BlockItem.PROTOBUF.toBytes(proofItem)).closeBlock();
block.writer().writePbjItem(BlockItem.PROTOBUF.toBytes(proofItem));
if (streamWriterType == BlockStreamWriterMode.FILE) {
block.writer().closeBlock();
}
if (block.number() != blockNumber) {
siblingHashes.removeFirst();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.node.app.blocks.impl;

import static com.hedera.hapi.block.protoc.PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN;
import static io.grpc.Status.fromThrowable;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc;
import com.hedera.hapi.block.protoc.PublishStreamRequest;
import com.hedera.hapi.block.protoc.PublishStreamResponse;
import com.hedera.hapi.block.protoc.PublishStreamResponse.Acknowledgement;
import com.hedera.hapi.block.protoc.PublishStreamResponse.EndOfStream;
import com.hedera.hapi.block.protoc.PublishStreamResponseCode;
import com.hedera.hapi.block.stream.protoc.BlockItem;
import com.hedera.node.app.blocks.BlockItemWriter;
import com.hedera.node.config.data.BlockStreamConfig;
import com.hedera.pbj.runtime.io.buffer.BufferedData;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.helidon.common.tls.Tls;
import io.helidon.webclient.grpc.GrpcClient;
import io.helidon.webclient.grpc.GrpcClientMethodDescriptor;
import io.helidon.webclient.grpc.GrpcClientProtocolConfig;
import io.helidon.webclient.grpc.GrpcServiceClient;
import io.helidon.webclient.grpc.GrpcServiceDescriptor;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Implements the bidirectional streaming RPC for the publishBlockStream rpc in BlockStreamService
* See <a href="https://grpc.io/docs/languages/java/basics/">gRPC Basics</a>
*/
public class GrpcBlockItemWriter implements BlockItemWriter {

private static final Logger logger = LogManager.getLogger(GrpcBlockItemWriter.class);
private static final String INVALID_MESSAGE = "Invalid protocol buffer converting %s from PBJ to protoc for %s";
private static final String GRPC_END_POINT =
BlockStreamServiceGrpc.getPublishBlockStreamMethod().getBareMethodName();

private StreamObserver<PublishStreamRequest> requestObserver;
private final GrpcServiceClient grpcServiceClient;
private long blockNumber;

/** The state of this writer */
private State state = State.UNINITIALIZED;

/**
* The current state of the gRPC writer.
*/
public enum State {
/**
* The gRPC client is not initialized.
*/
UNINITIALIZED,
/**
* The gRPC client is currently open and blocks can be streamed.
*/
OPEN,
/**
* The gRPC client is already closed and cannot be used to stream blocks.
*/
CLOSED
}

/**
* @param blockStreamConfig the block stream configuration
*/
public GrpcBlockItemWriter(@NonNull final BlockStreamConfig blockStreamConfig) {
requireNonNull(blockStreamConfig, "The supplied argument 'blockStreamConfig' cannot be null!");
GrpcClient client;
try {
client = GrpcClient.builder()
.tls(Tls.builder().enabled(false).build())
.baseUri(new URI(
null,
null,
blockStreamConfig.grpcAddress(),
blockStreamConfig.grpcPort(),
null,
null,
null))
.protocolConfig(GrpcClientProtocolConfig.builder()
.abortPollTimeExpired(false)
.build())
.build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
grpcServiceClient = client.serviceClient(GrpcServiceDescriptor.builder()
.serviceName(BlockStreamServiceGrpc.SERVICE_NAME)
.putMethod(
GRPC_END_POINT,
GrpcClientMethodDescriptor.bidirectional(BlockStreamServiceGrpc.SERVICE_NAME, GRPC_END_POINT)
.requestType(PublishStreamRequest.class)
.responseType(PublishStreamResponse.class)
.build())
.build());
}

@Override
public void openBlock(long blockNumber) {
if (state != State.UNINITIALIZED) throw new IllegalStateException("GrpcBlockItemWriter initialized twice");

if (blockNumber < 0) throw new IllegalArgumentException("Block number must be non-negative");
this.blockNumber = blockNumber;
requestObserver = grpcServiceClient.bidi(GRPC_END_POINT, new StreamObserver<PublishStreamResponse>() {
@Override
public void onNext(PublishStreamResponse streamResponse) {
if (streamResponse.hasAcknowledgement()) {
final Acknowledgement acknowledgement = streamResponse.getAcknowledgement();
if (acknowledgement.hasBlockAck()) {
logger.info("PublishStreamResponse: a full block received: {}", acknowledgement.getBlockAck());
} else if (acknowledgement.hasItemAck()) {
logger.info(
"PublishStreamResponse: a single block item is received: {}",
acknowledgement.getItemAck());
}
} else if (streamResponse.hasStatus()) {
final EndOfStream endOfStream = streamResponse.getStatus();
if (endOfStream.getStatus().equals(STREAM_ITEMS_UNKNOWN)) {
logger.info(
"Error returned from block node at block number {}: {}",
endOfStream.getBlockNumber(),
endOfStream);
onNext(buildErrorResponse(STREAM_ITEMS_UNKNOWN));
}
}
}

@Override
public void onError(Throwable t) {
// Maybe this should be considered in this case:
// https://github.com/hashgraph/hedera-services/issues/15530
final Status status = fromThrowable(t);
logger.error("error occurred with an exception: ", status.toString());
requestObserver.onError(t);
}

@Override
public void onCompleted() {
logger.info("PublishStreamResponse completed");
requestObserver.onCompleted();
}
});
this.state = State.OPEN;
}

@Override
public BlockItemWriter writeItem(@NonNull final byte[] bytes) {
requireNonNull(bytes);
if (state != State.OPEN) {
throw new IllegalStateException(
"Cannot write to a GrpcBlockItemWriter that is not open for block: " + this.blockNumber);
}

PublishStreamRequest request = PublishStreamRequest.newBuilder().build();
try {
request = PublishStreamRequest.newBuilder()
.setBlockItem(BlockItem.parseFrom(bytes))
.build();
requestObserver.onNext(request);
} catch (IOException e) {
final String message = INVALID_MESSAGE.formatted("PublishStreamResponse", request);
throw new RuntimeException(message, e);
}
return this;
}

@Override
public BlockItemWriter writeItems(@NonNull BufferedData data) {
requireNonNull(data);
if (state != State.OPEN) {
throw new IllegalStateException(
"Cannot write to a GrpcBlockItemWriter that is not open for block: " + this.blockNumber);
}

PublishStreamRequest request = PublishStreamRequest.newBuilder().build();
try {
request = PublishStreamRequest.newBuilder()
.setBlockItem(BlockItem.parseFrom(data.asInputStream()))
.build();
requestObserver.onNext(request);
} catch (IOException e) {
final String message = INVALID_MESSAGE.formatted("PublishStreamResponse", request);
throw new RuntimeException(message, e);
}
return this;
}

@Override
public void closeBlock() {
if (state.ordinal() < State.OPEN.ordinal()) {
throw new IllegalStateException("Cannot close a GrpcBlockItemWriter that is not open");
} else if (state.ordinal() == State.CLOSED.ordinal()) {
throw new IllegalStateException("Cannot close a GrpcBlockItemWriter that is already closed");
}

requestObserver.onCompleted();
this.state = State.CLOSED;
}

/**
* @return the current state of the gRPC writer
*/
@VisibleForTesting
public long getBlockNumber() {
return blockNumber;
}

/**
* @return the current state of the gRPC writer
*/
@VisibleForTesting
public State getState() {
return state;
}

/**
* @param errorCode the error code for the stream response
* @return the error stream response
*/
private PublishStreamResponse buildErrorResponse(PublishStreamResponseCode errorCode) {
final EndOfStream endOfStream =
EndOfStream.newBuilder().setStatus(errorCode).build();
return PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
}
}
3 changes: 3 additions & 0 deletions hedera-node/hedera-app/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
requires com.google.protobuf;
requires io.grpc.netty;
requires io.grpc;
requires io.helidon.common.tls;
requires io.helidon.webclient.api;
requires io.helidon.webclient.grpc;
requires io.netty.handler;
requires io.netty.transport.classes.epoll;
requires io.netty.transport;
Expand Down
Loading

0 comments on commit 196d78a

Please sign in to comment.