Skip to content

Commit

Permalink
fix: consolidated logic to stop processing for by producer and consum…
Browse files Browse the repository at this point in the history
…er observers. Added comments

Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Nov 26, 2024
1 parent 0b8301c commit e3fdae4
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ public void onEvent(
// or closed the stream.
if (isResponsePermitted.get()) {
if (isTimeoutExpired()) {
subscriptionHandler.unsubscribe(this);
stopProcessing();

// Notify the Helidon observer that we've
// stopped processing the stream
subscribeStreamResponseObserver.onComplete();
LOGGER.log(DEBUG, "Producer liveness timeout. Unsubscribed ConsumerBlockItemObserver.");
} else {
// Refresh the producer liveness and pass the BlockItem to the downstream observer.
Expand All @@ -121,10 +125,17 @@ public void onEvent(
try {
responseSender.send(subscribeStreamResponse);
} catch (IllegalArgumentException e) {
// Bubble protocol exceptions up to a higher
// layer where we can broadcast a uniform
// message to terminate all connections
// and shut the server down.
throw e;
} catch (RuntimeException e) {
isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
// RuntimeExceptions at this layer will almost
// always be wrapped SocketExceptions from individual
// clients disconnecting from the server streaming
// service. This should be happening all the time.
stopProcessing();
LOGGER.log(
DEBUG,
"RuntimeException caught from Pipeline instance. Unsubscribed ConsumerBlockItemObserver instance");
Expand All @@ -145,11 +156,16 @@ private ResponseSender getResponseSender(@NonNull final SubscribeStreamResponseU
subscribeStreamResponse.response();
return switch (responseType.kind()) {
case STATUS -> {
isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
// Per the spec, status messages signal
// the end of processing. Unsubscribe this
// observer and send a message back to the
// client
stopProcessing();
yield statusResponseSender;
}
case BLOCK_ITEMS -> blockItemsResponseSender;
// An unknown response type here is a protocol violation
// and should shut down the server.
default -> throw new IllegalArgumentException("Unknown response type: " + responseType.kind());
};
}
Expand Down Expand Up @@ -198,4 +214,9 @@ public void send(@NonNull final SubscribeStreamResponseUnparsed subscribeStreamR
subscribeStreamResponseObserver.onComplete();
}
}

private void stopProcessing() {
isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,10 @@ public void onNext(@NonNull final List<BlockItemUnparsed> blockItems) {

} else {
LOGGER.log(ERROR, getClass().getName() + " is not accepting BlockItems");
stopProcessing();

// Close the upstream connection to the producer(s)
final var errorResponse = buildErrorStreamResponse();

isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
publishStreamResponseObserver.onNext(errorResponse);
publishStreamResponseObserver.onNext(buildErrorStreamResponse());
LOGGER.log(ERROR, "Error PublishStreamResponse sent to upstream producer");
}
}
Expand All @@ -149,8 +146,7 @@ public void onEvent(ObjectEvent<PublishStreamResponse> event, long sequence, boo

if (isResponsePermitted.get()) {
if (isTimeoutExpired()) {
isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
stopProcessing();
LOGGER.log(DEBUG, "Producer liveness timeout. Unsubscribed ProducerBlockItemObserver.");
} else {
LOGGER.log(DEBUG, "Publishing response to upstream producer: " + publishStreamResponseObserver);
Expand All @@ -177,10 +173,8 @@ private static PublishStreamResponse buildErrorStreamResponse() {
*/
@Override
public void onError(@NonNull final Throwable t) {
stopProcessing();
LOGGER.log(ERROR, "onError method invoked with an exception: ", t);

isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
LOGGER.log(ERROR, "Producer cancelled the stream. Observer unsubscribed.");
}

Expand All @@ -190,8 +184,7 @@ public void onError(@NonNull final Throwable t) {
*/
@Override
public void onComplete() {
isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
stopProcessing();
LOGGER.log(DEBUG, "Producer completed the stream. Observer unsubscribed.");
}

Expand All @@ -202,8 +195,12 @@ public boolean isTimeoutExpired() {

@Override
public void clientEndStreamReceived() {
stopProcessing();
LOGGER.log(DEBUG, "Producer cancelled the stream. Observer unsubscribed.");
}

private void stopProcessing() {
isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
LOGGER.log(DEBUG, "Producer cancelled the stream. Observer unsubscribed.");
}
}

0 comments on commit e3fdae4

Please sign in to comment.