Skip to content

Commit

Permalink
Add "cancelled" event to the gRPC observation instrumentation (#5137)
Browse files Browse the repository at this point in the history
Closes #5109
  • Loading branch information
ttddyy authored Jun 11, 2024
1 parent 6e7c863 commit f4be539
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ public String getName() {
return "sent";
}

},
CANCELLED {
@Override
public String getName() {
return "cancelled";
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void onHalfClose() {

@Override
public void onCancel() {
this.observation.event(GrpcServerEvents.CANCELLED);
try (Scope scope = this.observation.openScope()) {
super.onCancel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,55 @@ void observationShouldBeCapturedByInterceptor() {

}

@Nested
class ClientInterruption {

private ClientInterruptionAwareService service = new ClientInterruptionAwareService();

@BeforeEach
void setUpService() throws Exception {
server = InProcessServerBuilder.forName("sample").addService(service).intercept(serverInterceptor).build();
server.start();

channel = InProcessChannelBuilder.forName("sample").intercept(clientInterceptor).build();
}

@Test
void cancel() {
SimpleServiceFutureStub stub = SimpleServiceGrpc.newFutureStub(channel);
SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage("Hello").build();
ListenableFuture<SimpleResponse> future = stub.unaryRpc(request);

await().untilTrue(this.service.requestReceived);
future.cancel(true);
this.service.requestInterrupted.set(true);
await().until(future::isDone);
assertThat(future.isCancelled()).isTrue();
TestObservationRegistryAssert.assertThat(observationRegistry)
.hasAnObservation(observationContextAssert -> observationContextAssert.hasNameEqualTo("grpc.client")
.hasLowCardinalityKeyValue("grpc.status_code", "CANCELLED"));
assertThat(serverHandler.getEvents()).contains(GrpcServerEvents.CANCELLED);
}

@Test
void shutdown() {
SimpleServiceFutureStub stub = SimpleServiceGrpc.newFutureStub(channel);
SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage("Hello").build();
ListenableFuture<SimpleResponse> future = stub.unaryRpc(request);

await().untilTrue(this.service.requestReceived);
channel.shutdownNow(); // shutdown client while server is processing
this.service.requestInterrupted.set(true);
await().until(channel::isTerminated);
await().until(future::isDone);
TestObservationRegistryAssert.assertThat(observationRegistry)
.hasAnObservation(observationContextAssert -> observationContextAssert.hasNameEqualTo("grpc.client")
.hasLowCardinalityKeyValue("grpc.status_code", "UNAVAILABLE"));
assertThat(serverHandler.getEvents()).contains(GrpcServerEvents.CANCELLED);
}

}

// perform server context verification on basic information
void verifyServerContext(String serviceName, String methodName, String contextualName, MethodType methodType) {
assertThat(serverHandler.getContext()).isNotNull().satisfies((serverContext) -> {
Expand Down Expand Up @@ -666,6 +715,25 @@ public StreamObserver<SimpleRequest> bidiStreamingRpc(StreamObserver<SimpleRespo

}

static class ClientInterruptionAwareService extends SimpleServiceImplBase {

AtomicBoolean requestReceived = new AtomicBoolean();

AtomicBoolean requestInterrupted = new AtomicBoolean();

@Override
public void unaryRpc(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
this.requestReceived.set(true);
SimpleResponse response = SimpleResponse.newBuilder()
.setResponseMessage(request.getRequestMessage())
.build();
await().untilTrue(this.requestInterrupted);
responseObserver.onNext(response);
responseObserver.onCompleted();
}

}

// Hold reference to the Context and Events happened in ObservationHandler
static class ContextAndEventHoldingObservationHandler<T extends Observation.Context>
implements ObservationHandler<T> {
Expand Down

0 comments on commit f4be539

Please sign in to comment.