Skip to content

Commit

Permalink
wip: debugging client streaming issue
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Nov 21, 2024
1 parent dc28411 commit b527915
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 18 deletions.
5 changes: 4 additions & 1 deletion server/docker/logging.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
.level=INFO

# Helidon loggers
io.helidon.webserver.level=INFO
io.helidon.webserver.level=FINE
io.helidon.config.level=SEVERE
io.helidon.security.level=INFO
io.helidon.common.level=INFO
Expand All @@ -27,6 +27,9 @@ io.helidon.common.level=INFO
#com.hedera.block.server.persistence.storage.write.BlockAsDirWriter.level=FINE
#com.hedera.block.server.consumer.ConsumerStreamResponseObserver.level=FINE

#com.hedera.pbj.grpc.helidon.PbjProtocolHandler.level=FINE
com.hedera.pbj.grpc.helidon.level=FINE

# Console handler configuration
handlers = java.util.logging.ConsoleHandler
java.util.logging.ConsoleHandler.level = FINE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public ConsumerStreamResponseObserver(
public void onEvent(
@NonNull final ObjectEvent<SubscribeStreamResponseUnparsed> event, final long l, final boolean b) {

LOGGER.log(
DEBUG,
"Received SubscribeStreamResponse event: "
+ event.get().blockItems().blockItems().size());

// Only send the response if the consumer has not cancelled
// or closed the stream.
if (isResponsePermitted.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,14 @@ public void publish(@NonNull final List<BlockItemUnparsed> blockItems) {
if (serviceStatus.isRunning()) {

// Publish the block for all subscribers to receive
LOGGER.log(DEBUG, "Publishing BlockItem");
final BlockItemSetUnparsed blockItemsSet =
BlockItemSetUnparsed.newBuilder().blockItems(blockItems).build();

final var subscribeStreamResponse = SubscribeStreamResponseUnparsed.newBuilder()
.blockItems(blockItemsSet)
.build();

LOGGER.log(DEBUG, "Publishing BlockItems: " + blockItems.size());
ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse));

long remainingCapacity = ringBuffer.remainingCapacity();
Expand All @@ -117,6 +118,9 @@ public void publish(@NonNull final List<BlockItemUnparsed> blockItems) {
// Increment the block item counter by all block items published
metricsService.get(LiveBlockItems).add(blockItems.size());

// LOGGER.log(DEBUG, "Subscriber count: " + subscriberCount());
// metricsService.get(Consumers).set(subscriberCount());

} else {
LOGGER.log(ERROR, "StreamMediator is not accepting BlockItems");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ public void unsubscribeAllExpired() {}
*/
@Override
public void notifyUnrecoverableError() {}

@Override
public int subscriberCount() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ public interface SubscriptionHandler<V> {

/** Unsubscribes all the expired handlers from the stream of events. */
void unsubscribeAllExpired();

int subscriberCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@
*/
public abstract class SubscriptionHandlerBase<V> implements SubscriptionHandler<V> {

private final Map<BlockNodeEventHandler<ObjectEvent<V>>, BatchEventProcessor<ObjectEvent<V>>>
subscribers;
private final Map<BlockNodeEventHandler<ObjectEvent<V>>, BatchEventProcessor<ObjectEvent<V>>> subscribers;

/** The ring buffer to publish events to the subscribers. */
protected final RingBuffer<ObjectEvent<V>> ringBuffer;
Expand All @@ -60,11 +59,7 @@ public abstract class SubscriptionHandlerBase<V> implements SubscriptionHandler<
* @param ringBufferSize the size of the ring buffer
*/
protected SubscriptionHandlerBase(
@NonNull
final Map<
BlockNodeEventHandler<ObjectEvent<V>>,
BatchEventProcessor<ObjectEvent<V>>>
subscribers,
@NonNull final Map<BlockNodeEventHandler<ObjectEvent<V>>, BatchEventProcessor<ObjectEvent<V>>> subscribers,
@NonNull final LongGauge subscriptionGauge,
final int ringBufferSize) {

Expand All @@ -89,8 +84,7 @@ public void subscribe(@NonNull final BlockNodeEventHandler<ObjectEvent<V>> handl
if (!subscribers.containsKey(handler)) {
// Initialize the batch event processor and set it on the ring buffer
final var batchEventProcessor =
new BatchEventProcessorBuilder()
.build(ringBuffer, ringBuffer.newBarrier(), handler);
new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), handler);

ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
executor.execute(batchEventProcessor);
Expand Down Expand Up @@ -143,4 +137,9 @@ public void unsubscribeAllExpired() {
.filter(BlockNodeEventHandler::isTimeoutExpired)
.forEach(this::unsubscribe);
}

@Override
public int subscriberCount() {
return subscribers.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ void subscribeBlockStream(
@NonNull
private SubscribeStreamRequest parseSubscribeStreamRequest(
@NonNull final Bytes message, @NonNull final RequestOptions options) throws ParseException {
// TODO: Copying bytes to avoid using references passed from Helidon. Investigate if this is necessary.
return SubscribeStreamRequest.PROTOBUF.parse(Bytes.wrap(message.toByteArray()));
return SubscribeStreamRequest.PROTOBUF.parse(message);
}

@NonNull
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/resources/app.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ prometheus.endpointPortNumber=9999
# Timeout for consumers to wait for block item before timing out.
# Default is 1500 milliseconds.
#consumer.timeoutThresholdMillis=1500
consumer.timeoutThresholdMillis=20000
consumer.timeoutThresholdMillis=100000

#producer.type=NOOP
#mediator.type=NOOP
Expand Down
4 changes: 3 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ plugins {
id("com.gradle.enterprise").version("3.15.1")
}

includeBuild("/Users/mattpeterson/repos/pbj/pbj-core")

// Include the subprojects
include(":common")
include(":suites")
Expand Down Expand Up @@ -84,7 +86,7 @@ dependencyResolutionManagement {
version("com.google.protobuf", protobufVersion)
version("com.google.protobuf.util", protobufVersion)

var pbjVersion = "0.9.10"
var pbjVersion = "0.9.10-SNAPSHOT"

// PBJ dependencies
plugin("pbj", "com.hedera.pbj.pbj-compiler").version(pbjVersion)
Expand Down
6 changes: 3 additions & 3 deletions simulator/src/main/resources/app.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ generator.folderRootPath=/Users/mattpeterson/Downloads/BN_Test_Data/block-0.0.3
generator.managerImplementation=BlockAsFileLargeDataSets

# Optional range configuration
#generator.startBlockNumber=10
#generator.endBlockNumber=10
generator.startBlockNumber=200
generator.endBlockNumber=3000

#blockStream.maxBlockItemsToStream=100_000_000
blockStream.streamingMode=MILLIS_PER_BLOCK
blockStream.millisecondsPerBlock=20
blockStream.millisecondsPerBlock=200
#blockStream.blockItemsBatchSize=1_000
#blockStream.delayBetweenBlockItems=3_000_000

Expand Down

0 comments on commit b527915

Please sign in to comment.