diff --git a/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java b/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java index 11a204416..ba7902fce 100644 --- a/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java +++ b/server/src/main/java/com/hedera/block/server/consumer/ConsumerStreamResponseObserver.java @@ -110,7 +110,11 @@ public void onEvent( // or closed the stream. if (isResponsePermitted.get()) { if (isTimeoutExpired()) { - subscriptionHandler.unsubscribe(this); + stopProcessing(); + + // Notify the Helidon observer that we've + // stopped processing the stream + subscribeStreamResponseObserver.onComplete(); LOGGER.log(DEBUG, "Producer liveness timeout. Unsubscribed ConsumerBlockItemObserver."); } else { // Refresh the producer liveness and pass the BlockItem to the downstream observer. @@ -121,10 +125,17 @@ public void onEvent( try { responseSender.send(subscribeStreamResponse); } catch (IllegalArgumentException e) { + // Bubble protocol exceptions up to a higher + // layer where we can broadcast a uniform + // message to terminate all connections + // and shut the server down. throw e; } catch (RuntimeException e) { - isResponsePermitted.set(false); - subscriptionHandler.unsubscribe(this); + // RuntimeExceptions at this layer will almost + // always be wrapped SocketExceptions from individual + // clients disconnecting from the server streaming + // service. This should be happening all the time. + stopProcessing(); LOGGER.log( DEBUG, "RuntimeException caught from Pipeline instance. Unsubscribed ConsumerBlockItemObserver instance"); @@ -145,11 +156,16 @@ private ResponseSender getResponseSender(@NonNull final SubscribeStreamResponseU subscribeStreamResponse.response(); return switch (responseType.kind()) { case STATUS -> { - isResponsePermitted.set(false); - subscriptionHandler.unsubscribe(this); + // Per the spec, status messages signal + // the end of processing. Unsubscribe this + // observer and send a message back to the + // client + stopProcessing(); yield statusResponseSender; } case BLOCK_ITEMS -> blockItemsResponseSender; + // An unknown response type here is a protocol violation + // and should shut down the server. default -> throw new IllegalArgumentException("Unknown response type: " + responseType.kind()); }; } @@ -198,4 +214,9 @@ public void send(@NonNull final SubscribeStreamResponseUnparsed subscribeStreamR subscribeStreamResponseObserver.onComplete(); } } + + private void stopProcessing() { + isResponsePermitted.set(false); + subscriptionHandler.unsubscribe(this); + } } diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index ae7032cb5..21e1bd2af 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -133,13 +133,10 @@ public void onNext(@NonNull final List blockItems) { } else { LOGGER.log(ERROR, getClass().getName() + " is not accepting BlockItems"); + stopProcessing(); // Close the upstream connection to the producer(s) - final var errorResponse = buildErrorStreamResponse(); - - isResponsePermitted.set(false); - subscriptionHandler.unsubscribe(this); - publishStreamResponseObserver.onNext(errorResponse); + publishStreamResponseObserver.onNext(buildErrorStreamResponse()); LOGGER.log(ERROR, "Error PublishStreamResponse sent to upstream producer"); } } @@ -149,8 +146,7 @@ public void onEvent(ObjectEvent event, long sequence, boo if (isResponsePermitted.get()) { if (isTimeoutExpired()) { - isResponsePermitted.set(false); - subscriptionHandler.unsubscribe(this); + stopProcessing(); LOGGER.log(DEBUG, "Producer liveness timeout. Unsubscribed ProducerBlockItemObserver."); } else { LOGGER.log(DEBUG, "Publishing response to upstream producer: " + publishStreamResponseObserver); @@ -177,10 +173,8 @@ private static PublishStreamResponse buildErrorStreamResponse() { */ @Override public void onError(@NonNull final Throwable t) { + stopProcessing(); LOGGER.log(ERROR, "onError method invoked with an exception: ", t); - - isResponsePermitted.set(false); - subscriptionHandler.unsubscribe(this); LOGGER.log(ERROR, "Producer cancelled the stream. Observer unsubscribed."); } @@ -190,8 +184,7 @@ public void onError(@NonNull final Throwable t) { */ @Override public void onComplete() { - isResponsePermitted.set(false); - subscriptionHandler.unsubscribe(this); + stopProcessing(); LOGGER.log(DEBUG, "Producer completed the stream. Observer unsubscribed."); } @@ -202,8 +195,12 @@ public boolean isTimeoutExpired() { @Override public void clientEndStreamReceived() { + stopProcessing(); + LOGGER.log(DEBUG, "Producer cancelled the stream. Observer unsubscribed."); + } + + private void stopProcessing() { isResponsePermitted.set(false); subscriptionHandler.unsubscribe(this); - LOGGER.log(DEBUG, "Producer cancelled the stream. Observer unsubscribed."); } }