Skip to content

Commit

Permalink
fix:added negative tests for a producer
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jul 22, 2024
1 parent 1c96bfe commit be21434
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 65 deletions.
65 changes: 60 additions & 5 deletions protos/src/main/protobuf/blockstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,64 @@ message PublishStreamRequest {
}

message PublishStreamResponse {
ItemAcknowledgement acknowledgement = 1;
}
oneof response {
/**
* A response sent for each item and for each block.
*/
ItemAcknowledgement acknowledgement = 1;

/**
* A response sent when a stream ends.
*/
EndOfStream status = 2;
}

message ItemAcknowledgement {
bytes item_ack = 1;
}

message EndOfStream {
PublishStreamResponseCode status = 1;
}

message ItemAcknowledgement {
bytes item_ack = 1;
/**
* An enumeration indicating the status of this request.
*
* This enumeration describes the reason a block stream
* (sent via `writeBlockStream`) ended.
*/
enum PublishStreamResponseCode {
/**
* An "unset value" flag, this value SHALL NOT be used.<br/>
* This status indicates the server software failed to set a
* status, and SHALL be considered a software defect.
*/
STREAM_ITEMS_UNKNOWN = 0;

/**
* The request succeeded.<br/>
* No errors occurred and the source node orderly ended the stream.
*/
STREAM_ITEMS_SUCCESS = 1;

/**
* The delay between items was too long.<br/>
* The source MUST start a new stream before the failed block.
*/
STREAM_ITEMS_TIMEOUT = 2;

/**
* An item was received out-of-order.<br/>
* The source MUST start a new stream before the failed block.
*/
STREAM_ITEMS_OUT_OF_ORDER = 3;

/**
* A block state proof item could not be validated.<br/>
* The source MUST start a new stream before the failed block.
*/
STREAM_ITEMS_BAD_STATE_PROOF = 4;
}
}

message SubscribeStreamRequest {
Expand All @@ -50,6 +103,7 @@ message SubscribeStreamResponse {
message Block {
repeated BlockItem block_items = 1;
}

/**
* A BlockItem is a simple message that contains an id and a value.
* This specification is a simple example meant to expedite development.
Expand All @@ -76,4 +130,5 @@ message EventMetadata {

message BlockProof {
uint64 block = 1;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.block.server.producer.ItemAckBuilder;
import com.hedera.block.server.producer.ProducerBlockItemObserver;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;
Expand All @@ -43,6 +44,7 @@ public class BlockStreamService implements GrpcService {
private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final long timeoutThresholdMillis;
private final ItemAckBuilder itemAckBuilder;
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;

/**
Expand All @@ -52,8 +54,10 @@ public class BlockStreamService implements GrpcService {
*/
public BlockStreamService(
final long timeoutThresholdMillis,
final ItemAckBuilder itemAckBuilder,
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator) {
this.timeoutThresholdMillis = timeoutThresholdMillis;
this.itemAckBuilder = itemAckBuilder;
this.streamMediator = streamMediator;
}

Expand Down Expand Up @@ -96,7 +100,8 @@ private StreamObserver<PublishStreamRequest> publishBlockStream(
System.Logger.Level.DEBUG,
"Executing bidirectional publishBlockStream gRPC method");

return new ProducerBlockItemObserver(streamMediator, publishStreamResponseObserver);
return new ProducerBlockItemObserver(
streamMediator, publishStreamResponseObserver, itemAckBuilder);
}

private void subscribeBlockStream(
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/com/hedera/block/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
import com.hedera.block.server.persistence.storage.*;
import com.hedera.block.server.producer.ItemAckBuilder;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.helidon.config.Config;
Expand Down Expand Up @@ -74,6 +75,7 @@ public static void main(final String[] args) {
final BlockStreamService blockStreamService =
new BlockStreamService(
consumerTimeoutThreshold,
new ItemAckBuilder(),
new LiveStreamMediatorImpl(
new WriteThroughCacheHandler(blockReader, blockWriter),
(streamMediator) -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.producer;

import static com.hedera.block.protos.BlockStreamService.BlockItem;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.ItemAcknowledgement;
import static com.hedera.block.server.producer.Util.getFakeHash;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;

public class ItemAckBuilder {
public ItemAcknowledgement buildAck(final BlockItem blockItem)
throws IOException, NoSuchAlgorithmException {
// TODO: Use real hash
return ItemAcknowledgement.newBuilder()
.setItemAck(ByteString.copyFrom(getFakeHash(blockItem)))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
package com.hedera.block.server.producer;

import static com.hedera.block.protos.BlockStreamService.*;
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;

import com.google.protobuf.ByteString;
import com.hedera.block.server.data.ObjectEvent;
import com.hedera.block.server.mediator.StreamMediator;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
Expand All @@ -39,6 +36,7 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe

private final StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
private final ItemAckBuilder itemAckBuilder;

/**
* Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the
Expand All @@ -47,9 +45,12 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
*/
public ProducerBlockItemObserver(
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator,
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver,
final ItemAckBuilder itemAckBuilder) {

this.streamMediator = streamMediator;
this.publishStreamResponseObserver = publishStreamResponseObserver;
this.itemAckBuilder = itemAckBuilder;
}

/**
Expand All @@ -64,16 +65,20 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
streamMediator.publishEvent(blockItem);

try {
// Send a response back to the upstream producer
// TODO: Use real hash
final ItemAcknowledgement itemAck =
ItemAcknowledgement.newBuilder()
.setItemAck(ByteString.copyFrom(getFakeHash(blockItem)))
.build();
final ItemAcknowledgement itemAck = itemAckBuilder.buildAck(blockItem);
final PublishStreamResponse publishStreamResponse =
PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();
publishStreamResponseObserver.onNext(publishStreamResponse);

} catch (IOException | NoSuchAlgorithmException e) {

final EndOfStream endOfStream =
EndOfStream.newBuilder()
.setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN)
.build();
final PublishStreamResponse errorResponse =
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
publishStreamResponseObserver.onNext(errorResponse);
LOGGER.log(System.Logger.Level.ERROR, "Error calculating hash", e);
}
}
Expand All @@ -99,20 +104,4 @@ public void onCompleted() {
LOGGER.log(System.Logger.Level.DEBUG, "ProducerBlockStreamObserver completed");
publishStreamResponseObserver.onCompleted();
}

private static byte[] getFakeHash(BlockItem blockItem)
throws IOException, NoSuchAlgorithmException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (ObjectOutputStream objectOutputStream =
new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(blockItem);
}

// Get the serialized bytes
byte[] serializedObject = byteArrayOutputStream.toByteArray();

// Calculate the SHA-256 hash
MessageDigest digest = MessageDigest.getInstance("SHA-384");
return digest.digest(serializedObject);
}
}
45 changes: 45 additions & 0 deletions server/src/main/java/com/hedera/block/server/producer/Util.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.server.producer;

import static com.hedera.block.protos.BlockStreamService.BlockItem;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class Util {
private Util() {}

public static byte[] getFakeHash(BlockItem blockItem)
throws IOException, NoSuchAlgorithmException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (ObjectOutputStream objectOutputStream =
new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(blockItem);
}

// Get the serialized bytes
byte[] serializedObject = byteArrayOutputStream.toByteArray();

// Calculate the SHA-256 hash
MessageDigest digest = MessageDigest.getInstance("SHA-384");
return digest.digest(serializedObject);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public class ConsumerBlockItemObserverTest {
private final long TIMEOUT_THRESHOLD_MILLIS = 50L;
private final long TEST_TIME = 1_719_427_664_950L;

@Mock private StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
private Object lock = new Object();

@Mock private StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
@Mock private StreamObserver<SubscribeStreamResponse> responseStreamObserver;

@Mock private ObjectEvent<BlockItem> objectEvent;

@Test
Expand Down Expand Up @@ -115,8 +115,8 @@ public void testConsumerNotToSendBeforeBlockHeader() throws InterruptedException
final SubscribeStreamResponse subscribeStreamResponse =
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();

synchronized (responseStreamObserver) {
responseStreamObserver.wait(2000);
synchronized (lock) {
lock.wait(50);
}

// Confirm that the observer was called with the next BlockItem
Expand Down
Loading

0 comments on commit be21434

Please sign in to comment.