Skip to content

Commit

Permalink
fix: #29 refactored live stream implementation and singleBlock
Browse files Browse the repository at this point in the history
- Integration with LMAX Disruptor RingBuffer
- Changed toy types to align with names and relationships like
  hedera-protobufs
- Boosted unit test coverage to 100% for most packages
- Adjusted Docker container to run as the hedera user rather than root
- Changed the consumer bidirection stream to server streaming
- Added exception handling when reading and writing to storage
- Refactored getBlock to singleBlock to align with hedera-protobufs rpc service definition
  and types
- Separated ReadBlock and WriteBlock
- Added RemoveBlock to handle removal of partially written Blocks when an exception is thrown
- Adjusted producer and consumer scripts to work with new types
- Added metrics to report: live block items counted, blocks persisted, single blocks retrieved
  and current subscribers

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Aug 12, 2024
1 parent 1a98d20 commit fdd0f5c
Show file tree
Hide file tree
Showing 54 changed files with 4,889 additions and 1,431 deletions.
2 changes: 2 additions & 0 deletions gradle/modules.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ com.swirlds.config.processor=com.swirlds:swirlds-config-processor
com.google.auto.service=com.google.auto.service:auto-service-annotations
com.google.auto.service.processor=com.google.auto.service:auto-service
com.google.auto.common=com.google.auto:auto-common
com.github.spotbugs.annotations=com.github.spotbugs:spotbugs-annotations
com.lmax.disruptor=com.lmax:disruptor
io.helidon.webserver=io.helidon.webserver:helidon-webserver
io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc
io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5
Expand Down
190 changes: 130 additions & 60 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,80 +17,150 @@ syntax = "proto3";
*/

option java_package = "com.hedera.block.protos";
option java_outer_classname = "BlockStreamServiceGrpcProto";
option java_outer_classname = "BlockStreamService";

/**
* The BlockStreamGrpc service definition provides 2 bidirectional streaming methods for
* exchanging blocks with the Block Node server.
*
* A producer (e.g. Consensus Node) can use the StreamSink method to stream blocks to the
* Block Node server. The Block Node server will respond with a BlockResponse message for
* each block received.
*
* A consumer (e.g. Mirror Node) can use the StreamSource method to request a stream of
* blocks from the server. The consumer is expected to respond with a BlockResponse message
* with the id of each block received.
*/
service BlockStreamGrpc {
service BlockStreamGrpcService {

/**
* StreamSink is a bidirectional streaming method that allows a producer to stream blocks
* to the Block Node server. The server will respond with a BlockResponse message for each
* block received.
*/
rpc StreamSink(stream Block) returns (stream BlockResponse) {}
rpc publishBlockStream (stream PublishStreamRequest) returns (stream PublishStreamResponse) {}

/**
* StreamSource is a bidirectional streaming method that allows a consumer to request a
* stream of blocks from the server. The consumer is expected to respond with a BlockResponse
* message with the id of each block received.
*/
rpc StreamSource(stream BlockResponse) returns (stream Block) {}
rpc subscribeBlockStream (SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {}

rpc GetBlock(Block) returns (Block) {}
rpc singleBlock(SingleBlockRequest) returns (SingleBlockResponse) {}
}

/**
* A block is a simple message that contains an id and a value.
* This specification is a simple example meant to expedite development.
* It will be replaced with a PBJ implementation in the future.
*/
message Block {
/**
* The id of the block. Each block id should be unique.
*/
int64 id = 1;
message PublishStreamRequest {
BlockItem block_item = 1;
}

message PublishStreamResponse {
oneof response {
/**
* A response sent for each item and for each block.
*/
ItemAcknowledgement acknowledgement = 1;

/**
* A response sent when a stream ends.
*/
EndOfStream status = 2;
}

message ItemAcknowledgement {
bytes item_ack = 1;
}

message EndOfStream {
PublishStreamResponseCode status = 1;
}

/**
* The value of the block. The value can be any string.
*/
string value = 2;
* An enumeration indicating the status of this request.
*
* This enumeration describes the reason a block stream
* (sent via `writeBlockStream`) ended.
*/
enum PublishStreamResponseCode {
/**
* An "unset value" flag, this value SHALL NOT be used.<br/>
* This status indicates the server software failed to set a
* status, and SHALL be considered a software defect.
*/
STREAM_ITEMS_UNKNOWN = 0;

/**
* The request succeeded.<br/>
* No errors occurred and the source node orderly ended the stream.
*/
STREAM_ITEMS_SUCCESS = 1;

/**
* The delay between items was too long.<br/>
* The source MUST start a new stream before the failed block.
*/
STREAM_ITEMS_TIMEOUT = 2;

/**
* An item was received out-of-order.<br/>
* The source MUST start a new stream before the failed block.
*/
STREAM_ITEMS_OUT_OF_ORDER = 3;

/**
* A block state proof item could not be validated.<br/>
* The source MUST start a new stream before the failed block.
*/
STREAM_ITEMS_BAD_STATE_PROOF = 4;
}
}

/**
* A block response is a simple message that contains an id.
* The block response message is simply meant to disambiguate it
* from the original request. This specification is a simple
* example meant to expedite development. It will be replaced with
* a PBJ implementation in the future.
*/
message BlockResponse {
/**
* The id of the block which was received. Each block id should
* correlate with the id of a Block message id.
*/
int64 id = 1;
message SubscribeStreamRequest {
uint64 start_block_number = 1;
}

message SubscribeStreamResponse {
oneof response {
SubscribeStreamResponseCode status = 1;
BlockItem block_item = 2;
}

enum SubscribeStreamResponseCode {
READ_STREAM_UNKNOWN = 0;
READ_STREAM_INSUFFICIENT_BALANCE = 1;
READ_STREAM_SUCCESS = 2;
READ_STREAM_INVALID_START_BLOCK_NUMBER = 3;
READ_STREAM_INVALID_END_BLOCK_NUMBER = 4;
}
}



message Block {
repeated BlockItem block_items = 1;
}

/**
* A block request is a simple message that contains an id.
* A BlockItem is a simple message that contains an id and a value.
* This specification is a simple example meant to expedite development.
* It will be replaced with a PBJ implementation in the future.
*/
message BlockRequest {
/**
* The id of the block which was requested. Each block id should
* correlate with the id of a Block message id.
*/
int64 id = 1;
message BlockItem {

oneof items {
BlockHeader header = 1;
EventMetadata start_event = 2;
BlockProof state_proof = 3;
}

string value = 4;
}

message BlockHeader {
uint64 block_number = 1;
}

message EventMetadata {
uint64 creator_id = 1;
}

message BlockProof {
uint64 block = 1;
}

message SingleBlockRequest {
uint64 block_number = 1;
}

message SingleBlockResponse {
oneof response {
SingleBlockResponseCode status = 1;
Block block = 2;
}

enum SingleBlockResponseCode {
READ_BLOCK_UNKNOWN = 0;
READ_BLOCK_INSUFFICIENT_BALANCE = 1;
READ_BLOCK_SUCCESS = 2;
READ_BLOCK_NOT_FOUND = 3;
READ_BLOCK_NOT_AVAILABLE = 4;
}
}
16 changes: 12 additions & 4 deletions server/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
# Use Eclipse Temurin with Java 21 as the base image
FROM eclipse-temurin:21

# Expose the port that the application will run on
EXPOSE 8080

# Define version
ARG VERSION

# Create a non-root user and group
ARG UNAME=hedera
ARG UID=2000
ARG GID=2000
RUN groupadd -g $GID -o $UNAME
RUN useradd -m -u $UID -g $GID -o -s /bin/bash $UNAME
USER $UNAME

# Set the working directory inside the container
WORKDIR /app

Expand All @@ -13,8 +24,5 @@ COPY --from=distributions server-${VERSION}.tar .
# Extract the TAR file
RUN tar -xvf server-${VERSION}.tar

# Expose the port that the application will run on
EXPOSE 8080

# RUN the bin script for starting the server
ENTRYPOINT ["sh", "-c", "/app/server-${VERSION}/bin/server"]
ENTRYPOINT ["/bin/bash", "-c", "/app/server-${VERSION}/bin/server"]
1 change: 1 addition & 0 deletions server/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ services:
env_file:
- .env
ports:
- "9999:9999"
- "8080:8080"
- "5005:5005"
56 changes: 56 additions & 0 deletions server/docs/design/block-persistence.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Block Persistence

## Purpose

The main objective of the `hedera-block-node` project is to replace the storage of Consensus Node artifacts (e.g.
Blocks) on cloud storage buckets (e.g. GCS and S3) with a solution managed by the Block Node server. This document aims
to describe the high-level design of how the Block Node persists and retrieves Blocks and how it handles exception cases
when they arise.

---

### Goals

1) BlockItems streamed from a producer (e.g. Consensus Node) must be collated and persisted as a Block. Per the
specification, a Block is an ordered list of BlockItems. How the Block is persisted is an implementation detail.
2) A Block must be efficiently retrieved by block number.

---

### Terms

**BlockItem** - A BlockItem is the primary data structure passed between the producer, the `hedera-block-node`
and consumers. The BlockItem description and protobuf definition are maintained in the `hedera-protobuf`
[project](https://github.com/hashgraph/hedera-protobufs/blob/continue-block-node/documents/api/block/stream/block_item.md).

**Block** - A Block is the base element of the block stream at rest. At present, it consists of an ordered collection of
BlockItems. The Block description and protobuf definition are maintained in the `hedera-protobuf`
[project](https://github.com/hashgraph/hedera-protobufs/blob/continue-block-node/documents/api/block/stream/block.md).

---

### Entities

**BlockReader** - An interface defining methods used to read a Block from storage. It represents a lower-level
component whose implementation is directly responsible for reading a Block from storage.

**BlockWriter** - An interface defining methods used to write BlockItems to storage. It represents a lower-level
component whose implementation is directly responsible for writing a BlockItem to storage as a Block.

**BlockRemover** - An interface defining the methods used to remove a Block from storage. It represents a lower-level
component whose implementation is directly responsible for removing a Block from storage.

---

### Design

The design for `Block` persistence is fairly straightforward. Block server objects should use the persistence entity
interfaces to read, write and remove `Block`s from storage. `BlockItem`s streamed from a producer are read off the wire
one by one and passed to an implementation of `BlockWriter`. The `BlockWriter` is responsible for collecting related
`BlockItem`s into a `Block` and persisting the `Block` to storage in a way that is efficient for retrieval at a later
time. The `BlockWriter` is also responsible for removing a partially written `Block` if an exception occurs while
writing it. For example, if half the `BlockItem`s of a `Block` are written when an IOException occurs, the `BlockWriter`
should remove all the `BlockItem`s of the partially written `Block` and pass the exception up to the caller. Services
requiring one or more `Block`s should leverage a `BlockReader` implementation. The `BlockReader` should be able to
efficiently retrieve a `Block` by block number. The `BlockReader` should pass unrecoverable exceptions when reading
a `Block` up to the caller.
Loading

0 comments on commit fdd0f5c

Please sign in to comment.