From e16b79e32f6cc09559745530ec919a680f88d09c Mon Sep 17 00:00:00 2001 From: Polapragada Yashwant <155515534+thugrock7@users.noreply.github.com> Date: Tue, 10 Sep 2024 12:29:00 +0000 Subject: [PATCH] OkHTTP3 - fix response charset and adding support for gzip compression (#405) * fix response charset and adding support for gzip compression * adding tests * adding support for other clients * fixing apacheasync http client test * disabling flaky tests --- .../ApacheAsyncClientGzipHandlingTest.java | 124 +++++++++++++ .../v4_0/ApacheHttpClientUtils.java | 51 ++++-- .../v4_0/HttpEntityInstrumentation.java | 8 +- .../InputStreamInstrumentationModule.java | 3 +- .../java/inputstream/InputStreamUtils.java | 65 ++++++- .../MicronautClientInstrumentationTest.java | 38 ++++ .../MicronautInstrumentationTest.java | 34 ++++ .../hypertrace/micronaut/TestController.java | 18 ++ .../MicronautClientInstrumentationTest.java | 38 ++++ .../v3/MicronautInstrumentationTest.java | 34 ++++ .../micronaut/v3/TestController.java | 18 ++ .../hypertrace/netty/v4_0/AttributeKeys.java | 10 + .../netty/v4_0/DataCaptureUtils.java | 51 +++++- .../HttpClientRequestTracingHandler.java | 9 +- .../HttpClientResponseTracingHandler.java | 18 +- .../HttpServerRequestTracingHandler.java | 9 +- .../HttpServerResponseTracingHandler.java | 19 +- ...tractNetty40ServerInstrumentationTest.java | 34 ++++ .../netty/v4_0/server/NettyTestServer.java | 29 +++ .../hypertrace/netty/v4_1/AttributeKeys.java | 8 + .../netty/v4_1/DataCaptureUtils.java | 50 ++++- .../HttpClientRequestTracingHandler.java | 9 +- .../HttpClientResponseTracingHandler.java | 18 +- .../HttpServerRequestTracingHandler.java | 9 +- .../HttpServerResponseTracingHandler.java | 20 +- ...tractNetty41ServerInstrumentationTest.java | 34 ++++ .../netty/v4_1/server/NettyTestServer.java | 29 +++ .../okhttp/v3_0/OkHttpTracingInterceptor.java | 44 ++++- .../v3_0/OkHttpTracingInterceptorTest.java | 10 +- .../Servlet30InstrumentationTest.java | 44 +++++ .../servlet/v3_0/nowrapping/TestServlets.java | 18 ++ .../v5_0/Servlet50InstrumentationTest.java | 45 +++++ .../hypertrace/servlet/v5_0/TestServlets.java | 18 ++ .../v3_0/rw/ServletRWInstrumentationTest.java | 46 +++++ .../servlet/v3_0/rw/TestServlets.java | 17 ++ .../SparkJavaInstrumentationTest.java | 63 +++++++ .../hypertrace/undertow/v1_4/utils/Utils.java | 5 +- .../v1_4/UndertowInstrumentationTest.java | 90 ++++++++- .../vertx/HttpResponseInstrumentation.java | 6 +- .../vertx/ResponseBodyWrappingHandler.java | 43 ++++- .../VertxClientInstrumentationPostTests.java | 171 ++++++++++++++++++ .../vertx/VertxClientInstrumentationTest.java | 110 ----------- .../vertx/VertxServerInstrumentationTest.java | 37 ++++ .../hypertrace/vertx/VertxWebServer.java | 33 ++++ .../HypertraceSemanticAttributes.java | 3 + .../core/instrumentation/SpanAndBuffer.java | 5 +- .../utils/ContentTypeCharsetUtils.java | 4 + .../agent/testing/AbstractHttpClientTest.java | 52 +++++- .../agent/testing/TestHttpServer.java | 36 ++++ 49 files changed, 1515 insertions(+), 172 deletions(-) create mode 100644 instrumentation/apache-httpasyncclient-4.1/src/test/java/io/opentelemetry/instrumentation/hypertrace/apachehttpasyncclient/ApacheAsyncClientGzipHandlingTest.java create mode 100644 instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxClientInstrumentationPostTests.java diff --git a/instrumentation/apache-httpasyncclient-4.1/src/test/java/io/opentelemetry/instrumentation/hypertrace/apachehttpasyncclient/ApacheAsyncClientGzipHandlingTest.java b/instrumentation/apache-httpasyncclient-4.1/src/test/java/io/opentelemetry/instrumentation/hypertrace/apachehttpasyncclient/ApacheAsyncClientGzipHandlingTest.java new file mode 100644 index 000000000..58fb7a87f --- /dev/null +++ b/instrumentation/apache-httpasyncclient-4.1/src/test/java/io/opentelemetry/instrumentation/hypertrace/apachehttpasyncclient/ApacheAsyncClientGzipHandlingTest.java @@ -0,0 +1,124 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.instrumentation.hypertrace.apachehttpasyncclient; + +import io.opentelemetry.proto.trace.v1.Span; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPInputStream; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.hypertrace.agent.testing.AbstractInstrumenterTest; +import org.hypertrace.agent.testing.TestHttpServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class ApacheAsyncClientGzipHandlingTest extends AbstractInstrumenterTest { + + private static final TestHttpServer testHttpServer = new TestHttpServer(); + + private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault(); + + @BeforeAll + public static void startServer() throws Exception { + testHttpServer.start(); + client.start(); + } + + @AfterAll + public static void closeServer() throws Exception { + testHttpServer.close(); + } + + @Test + public void getGzipResponse() + throws ExecutionException, InterruptedException, TimeoutException, IOException { + HttpGet getRequest = + new HttpGet(String.format("http://localhost:%s/gzip", testHttpServer.port())); + getRequest.addHeader("foo", "bar"); + Future futureResponse = + client.execute( + getRequest, new ApacheAsyncClientInstrumentationModuleTest.NoopFutureCallback()); + + HttpResponse response = futureResponse.get(); + Assertions.assertEquals(200, response.getStatusLine().getStatusCode()); + try (InputStream gzipStream = new GZIPInputStream(response.getEntity().getContent())) { + String responseBody = readInputStream(gzipStream); + Assertions.assertEquals(TestHttpServer.GzipHandler.RESPONSE_BODY, responseBody); + } + + TEST_WRITER.waitForTraces(1); + // exclude server spans + List> traces = + TEST_WRITER.waitForSpans( + 2, + span -> + span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.response.header.content-encoding") + && keyValue.getValue().getStringValue().contains("gzip"))); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(2, traces.get(0).size()); + Span clientSpan = traces.get(0).get(1); + Span responseBodySpan = traces.get(0).get(0); + if (traces.get(0).get(0).getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)) { + clientSpan = traces.get(0).get(0); + responseBodySpan = traces.get(0).get(1); + } + + Assertions.assertEquals( + "test-value", + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.response.header.test-response-header") + .getStringValue()); + Assertions.assertEquals( + "bar", + TEST_WRITER.getAttributesMap(clientSpan).get("http.request.header.foo").getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + Assertions.assertEquals( + TestHttpServer.GzipHandler.RESPONSE_BODY, + TEST_WRITER.getAttributesMap(responseBodySpan).get("http.response.body").getStringValue()); + } + + private String readInputStream(InputStream inputStream) throws IOException { + StringBuilder textBuilder = new StringBuilder(); + + try (BufferedReader reader = + new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + int c; + while ((c = reader.read()) != -1) { + textBuilder.append((char) c); + } + } + return textBuilder.toString(); + } +} diff --git a/instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/ApacheHttpClientUtils.java b/instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/ApacheHttpClientUtils.java index 83c02d2ac..8e49bc0b5 100644 --- a/instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/ApacheHttpClientUtils.java +++ b/instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/ApacheHttpClientUtils.java @@ -20,9 +20,12 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.javaagent.instrumentation.hypertrace.apachehttpclient.v4_0.ApacheHttpClientObjectRegistry.SpanAndAttributeKey; import java.io.IOException; -import java.io.UnsupportedEncodingException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.nio.charset.Charset; import java.util.function.Function; +import java.util.zip.GZIPInputStream; import org.apache.http.Header; import org.apache.http.HeaderIterator; import org.apache.http.HttpEntity; @@ -100,26 +103,32 @@ public static void traceEntity( if (contentType == null || !ContentTypeUtils.shouldCapture(contentType.getValue())) { return; } - String charsetStr = ContentTypeUtils.parseCharset(contentType.getValue()); Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr); - + // Get the content encoding header and check if it's gzip + Header contentEncoding = entity.getContentEncoding(); + boolean isGzipEncoded = + contentEncoding != null + && contentEncoding.getValue() != null + && contentEncoding.getValue().toLowerCase().contains("gzip"); if (entity.isRepeatable()) { try { - BoundedByteArrayOutputStream byteArrayOutputStream = - BoundedBuffersFactory.createStream(charset); - entity.writeTo(byteArrayOutputStream); - - try { - String body = byteArrayOutputStream.toStringWithSuppliedCharset(); - span.setAttribute(bodyAttributeKey, body); - } catch (UnsupportedEncodingException e) { - log.error("Could not parse charset from encoding {}", charsetStr, e); + InputStream contentStream = entity.getContent(); + if (isGzipEncoded) { + try { + contentStream = new GZIPInputStream(contentStream); + } catch (IOException e) { + log.error("Failed to create GZIPInputStream", e); + return; + } } + + String body = readInputStream(contentStream, charset); + span.setAttribute(bodyAttributeKey, body); + } catch (IOException e) { - log.error("Could not read request input stream from repeatable request entity/body", e); + throw new RuntimeException(e); } - return; } @@ -133,4 +142,18 @@ public static void traceEntity( ApacheHttpClientObjectRegistry.entityToSpan.put( entity, new SpanAndAttributeKey(span, bodyAttributeKey)); } + + public static String readInputStream(InputStream inputStream, Charset charset) + throws IOException { + BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset); + try (InputStreamReader reader = new InputStreamReader(inputStream, charset); + OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) { + int c; + while ((c = reader.read()) != -1) { + writer.write(c); + } + writer.flush(); + } + return outputStream.toStringWithSuppliedCharset(); + } } diff --git a/instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/HttpEntityInstrumentation.java b/instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/HttpEntityInstrumentation.java index 5ea2bf410..ce9a0abfb 100644 --- a/instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/HttpEntityInstrumentation.java +++ b/instrumentation/apache-httpclient-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/apachehttpclient/v4_0/HttpEntityInstrumentation.java @@ -92,12 +92,18 @@ public static void exit(@Advice.This HttpEntity thizz, @Advice.Return InputStrea } Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr); + String contentEncoding = null; + Header contentEncodingHeader = thizz.getContentEncoding(); + if (contentEncodingHeader != null) { + contentEncoding = contentEncodingHeader.getValue(); + } SpanAndBuffer spanAndBuffer = new SpanAndBuffer( clientSpan.span, BoundedBuffersFactory.createStream((int) contentSize, charset), clientSpan.attributeKey, - charset); + charset, + contentEncoding); VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, spanAndBuffer); } } diff --git a/instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamInstrumentationModule.java b/instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamInstrumentationModule.java index 6e995e8cb..6bc79ccd2 100644 --- a/instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamInstrumentationModule.java +++ b/instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamInstrumentationModule.java @@ -268,7 +268,8 @@ public static void exit(@Advice.This InputStream thizz, @Advice.Return int avail spanAndBuffer.span, spanAndBuffer.attributeKey, spanAndBuffer.byteArrayBuffer, - spanAndBuffer.charset); + spanAndBuffer.charset, + spanAndBuffer.contentEncoding); contextStore.set(thizz, null); } } diff --git a/instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamUtils.java b/instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamUtils.java index d34360676..61ef80013 100644 --- a/instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamUtils.java +++ b/instrumentation/java-streams/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/java/inputstream/InputStreamUtils.java @@ -23,16 +23,22 @@ import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.charset.Charset; +import java.util.zip.GZIPInputStream; import org.hypertrace.agent.core.instrumentation.HypertraceCallDepthThreadLocalMap; import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; import org.hypertrace.agent.core.instrumentation.SpanAndBuffer; +import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory; +import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +91,13 @@ public static void addAttribute(Span span, AttributeKey attributeKey, St spanBuilder.setAttribute( "http.response.header.content-type", (String) resContentType); } + Object resContentEncoding = + getAttribute.invoke( + span, HypertraceSemanticAttributes.HTTP_RESPONSE_HEADER_CONTENT_ENCODING); + if (resContentEncoding != null) { + spanBuilder.setAttribute( + "http.response.header.content-encoding", (String) resContentEncoding); + } } } catch (IllegalAccessException | InvocationTargetException e) { // ignore and continue @@ -100,12 +113,31 @@ public static void addAttribute(Span span, AttributeKey attributeKey, St } public static void addBody( - Span span, AttributeKey attributeKey, ByteArrayOutputStream buffer, Charset charset) { + Span span, + AttributeKey attributeKey, + ByteArrayOutputStream buffer, + Charset charset, + String contentEncoding) { try { - String body = buffer.toString(charset.name()); - InputStreamUtils.addAttribute(span, attributeKey, body); + byte[] data = buffer.toByteArray(); + + // if content-encoding is gzip, + if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) { + try (GZIPInputStream gzipInputStream = + new GZIPInputStream(new ByteArrayInputStream(data))) { + InputStreamReader reader = new InputStreamReader(gzipInputStream, charset); + String body = readInputStream(reader, charset); + InputStreamUtils.addAttribute(span, attributeKey, body); + } + } else { + // No decompression needed, convert directly to string + String body = new String(data, charset); + InputStreamUtils.addAttribute(span, attributeKey, body); + } } catch (UnsupportedEncodingException e) { - log.error("Failed to parse encofing from charset {}", charset, e); + log.error("Failed to parse encoding from charset {}", charset, e); + } catch (IOException e) { + log.error("Failed to read or decompress data", e); } } @@ -132,7 +164,8 @@ public static void read( spanAndBuffer.span, spanAndBuffer.attributeKey, spanAndBuffer.byteArrayBuffer, - spanAndBuffer.charset); + spanAndBuffer.charset, + spanAndBuffer.contentEncoding); contextStore.set(inputStream, null); } } @@ -146,7 +179,8 @@ public static void read( spanAndBuffer.span, spanAndBuffer.attributeKey, spanAndBuffer.byteArrayBuffer, - spanAndBuffer.charset); + spanAndBuffer.charset, + spanAndBuffer.contentEncoding); VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, null); } } @@ -166,7 +200,8 @@ public static void read( spanAndBuffer.span, spanAndBuffer.attributeKey, spanAndBuffer.byteArrayBuffer, - spanAndBuffer.charset); + spanAndBuffer.charset, + spanAndBuffer.contentEncoding); contextStore.set(inputStream, null); } } @@ -194,10 +229,24 @@ public static void readNBytes( spanAndBuffer.span, spanAndBuffer.attributeKey, spanAndBuffer.byteArrayBuffer, - spanAndBuffer.charset); + spanAndBuffer.charset, + spanAndBuffer.contentEncoding); contextStore.set(inputStream, null); } else { spanAndBuffer.byteArrayBuffer.write(b, off, read); } } + + public static String readInputStream(InputStreamReader inputReader, Charset charset) + throws IOException { + BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset); + try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) { + int c; + while ((c = inputReader.read()) != -1) { + writer.write(c); + } + writer.flush(); + } + return outputStream.toStringWithSuppliedCharset(); + } } diff --git a/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/MicronautClientInstrumentationTest.java b/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/MicronautClientInstrumentationTest.java index a5c698734..8f236f5cd 100644 --- a/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/MicronautClientInstrumentationTest.java +++ b/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/MicronautClientInstrumentationTest.java @@ -22,6 +22,7 @@ import io.micronaut.http.client.annotation.Client; import io.micronaut.test.annotation.MicronautTest; import io.opentelemetry.proto.trace.v1.Span; +import java.io.IOException; import java.util.List; import java.util.concurrent.TimeoutException; import javax.inject.Inject; @@ -135,4 +136,41 @@ public void post() throws InterruptedException, TimeoutException { TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue()); Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body")); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + String retrieve = + client + .toBlocking() + .retrieve( + HttpRequest.GET(String.format("http://localhost:%d/gzip", testHttpServer.port())) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE)); + Assertions.assertEquals("{\"message\": \"hello\"}", retrieve); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.url") + && keyValue.getValue().getStringValue().contains("/gzip"))); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER_NAME) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan); + } } diff --git a/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/MicronautInstrumentationTest.java b/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/MicronautInstrumentationTest.java index 3288765a3..3bbeb51ea 100644 --- a/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/MicronautInstrumentationTest.java +++ b/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/MicronautInstrumentationTest.java @@ -198,4 +198,38 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio .get("http.response.header." + TestController.RESPONSE_HEADER_NAME)); Assertions.assertNull(TEST_WRITER.getAttributesMap(span).get("http.response.body")); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get_gzip", server.getPort())) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .get() + .build(); + + Response response = httpClient.newCall(request).execute(); + Assertions.assertEquals(200, response.code()); + + String responseBody = response.body().string(); + Assertions.assertEquals("{\"message\": \"hello\"}", responseBody); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans(1, span -> span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER_NAME) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan); + } } diff --git a/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/TestController.java b/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/TestController.java index 42943dc30..96c513a7d 100644 --- a/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/TestController.java +++ b/instrumentation/micronaut-1.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/TestController.java @@ -24,9 +24,12 @@ import io.micronaut.http.annotation.Get; import io.micronaut.http.annotation.Post; import io.reactivex.Flowable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.zip.GZIPOutputStream; @Controller("/") public class TestController { @@ -56,6 +59,21 @@ public Flowable stream() { return Flowable.fromIterable(streamBody()); } + @Get(uri = "/get_gzip") + public HttpResponse getGzip() throws IOException { + String jsonResponse = "{\"message\": \"hello\"}"; + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(byteArrayOutputStream)) { + gzipOut.write(jsonResponse.getBytes(StandardCharsets.UTF_8)); + gzipOut.finish(); + } + byte[] gzipData = byteArrayOutputStream.toByteArray(); + return HttpResponse.ok(gzipData) + .contentType("application/json") + .header("Content-Encoding", "gzip"); + } + static List streamBody() { List entities = new ArrayList<>(); for (int i = 0; i < 1000; i++) { diff --git a/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/MicronautClientInstrumentationTest.java b/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/MicronautClientInstrumentationTest.java index 3c29dd98f..a507cf78d 100644 --- a/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/MicronautClientInstrumentationTest.java +++ b/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/MicronautClientInstrumentationTest.java @@ -23,6 +23,7 @@ import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import io.opentelemetry.proto.trace.v1.Span; import jakarta.inject.Inject; +import java.io.IOException; import java.util.List; import java.util.concurrent.TimeoutException; import org.hypertrace.agent.testing.AbstractInstrumenterTest; @@ -127,4 +128,41 @@ public void post() throws InterruptedException, TimeoutException { TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue()); Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body")); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + String retrieve = + client + .toBlocking() + .retrieve( + HttpRequest.GET(String.format("http://localhost:%d/gzip", testHttpServer.port())) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE)); + Assertions.assertEquals("{\"message\": \"hello\"}", retrieve); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.url") + && keyValue.getValue().getStringValue().contains("/gzip"))); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER_NAME) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan); + } } diff --git a/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/MicronautInstrumentationTest.java b/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/MicronautInstrumentationTest.java index b04843f5e..f3eda7103 100644 --- a/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/MicronautInstrumentationTest.java +++ b/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/MicronautInstrumentationTest.java @@ -198,4 +198,38 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio .get("http.response.header." + TestController.RESPONSE_HEADER_NAME)); Assertions.assertNull(TEST_WRITER.getAttributesMap(span).get("http.response.body")); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get_gzip", server.getPort())) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .get() + .build(); + + Response response = httpClient.newCall(request).execute(); + Assertions.assertEquals(200, response.code()); + + String responseBody = response.body().string(); + Assertions.assertEquals("{\"message\": \"hello\"}", responseBody); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans(1, span -> span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER_NAME) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan); + } } diff --git a/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/TestController.java b/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/TestController.java index 4924fa37e..476746d34 100644 --- a/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/TestController.java +++ b/instrumentation/micronaut-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/micronaut/v3/TestController.java @@ -24,9 +24,12 @@ import io.micronaut.http.annotation.Get; import io.micronaut.http.annotation.Post; import io.reactivex.Flowable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.zip.GZIPOutputStream; @Controller("/") public class TestController { @@ -51,6 +54,21 @@ public HttpResponse post(@Body String body) { .body(RESPONSE_BODY); } + @Get(uri = "/get_gzip") + public HttpResponse getGzip() throws IOException { + String jsonResponse = "{\"message\": \"hello\"}"; + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(byteArrayOutputStream)) { + gzipOut.write(jsonResponse.getBytes(StandardCharsets.UTF_8)); + gzipOut.finish(); + } + byte[] gzipData = byteArrayOutputStream.toByteArray(); + return HttpResponse.ok(gzipData) + .contentType("application/json") + .header("Content-Encoding", "gzip"); + } + @Get(value = "/stream", produces = MediaType.APPLICATION_JSON_STREAM) public Flowable stream() { return Flowable.fromIterable(streamBody()); diff --git a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/AttributeKeys.java b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/AttributeKeys.java index fc7685ebb..5d1cdb09c 100644 --- a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/AttributeKeys.java +++ b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/AttributeKeys.java @@ -16,7 +16,9 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.util.AttributeKey; +import java.nio.charset.Charset; import java.util.Map; import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream; @@ -34,6 +36,14 @@ public class AttributeKeys { io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys.attributeKey( AttributeKeys.class.getName() + ".request-headers"); + public static final AttributeKey RESPONSE_HEADER_CONTENT_ENCODING = + io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys.attributeKey( + HypertraceSemanticAttributes.HTTP_RESPONSE_HEADER_CONTENT_ENCODING.getKey()); + + public static final AttributeKey CHARSET = + io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys.attributeKey( + HttpHeaders.Values.CHARSET); + public static final AttributeKey REQUEST = io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys.attributeKey( "io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys.http-server-request"); diff --git a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/DataCaptureUtils.java b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/DataCaptureUtils.java index af6ed1ed2..7b4726928 100644 --- a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/DataCaptureUtils.java +++ b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/DataCaptureUtils.java @@ -24,9 +24,16 @@ import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.opentelemetry.api.trace.Span; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.UnsupportedEncodingException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.util.zip.GZIPInputStream; +import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory; import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream; +import org.hypertrace.agent.core.instrumentation.utils.ContentTypeCharsetUtils; public class DataCaptureUtils { @@ -36,7 +43,9 @@ public static void captureBody( Span span, Channel channel, AttributeKey attributeKey, - Object httpContentOrBuffer) { + Object httpContentOrBuffer, + String contentEncoding, + Charset charset) { Attribute bufferAttr = channel.attr(attributeKey); BoundedByteArrayOutputStream buffer = bufferAttr.get(); @@ -58,9 +67,24 @@ public static void captureBody( if (httpContentOrBuffer instanceof LastHttpContent) { bufferAttr.remove(); try { - span.setAttribute(attributeKey.name(), buffer.toStringWithSuppliedCharset()); - } catch (UnsupportedEncodingException e) { - // ignore charset was parsed before + byte[] data = buffer.toByteArray(); + + String body; + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + // Decode based on content encoding + if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) { + try (GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(data))) { + InputStreamReader reader = new InputStreamReader(gzipStream, charset); + body = readInputStream(reader, charset); + } + } else { + body = new String(data, charset); + } + span.setAttribute(attributeKey.name(), body); + } catch (IOException e) { + // eg: unsupported charset } } } @@ -83,4 +107,21 @@ public static CharSequence getContentType(HttpMessage message) { public static CharSequence getContentLength(HttpMessage message) { return message.headers().get("content-length"); } + + public static CharSequence getContentEncoding(HttpMessage message) { + return message.headers().get("content-encoding"); + } + + public static String readInputStream(InputStreamReader inputReader, Charset charset) + throws IOException { + BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset); + try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) { + int c; + while ((c = inputReader.read()) != -1) { + writer.write(c); + } + writer.flush(); + } + return outputStream.toStringWithSuppliedCharset(); + } } diff --git a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/client/HttpClientRequestTracingHandler.java b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/client/HttpClientRequestTracingHandler.java index 613376925..4c4af06bd 100644 --- a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/client/HttpClientRequestTracingHandler.java +++ b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/client/HttpClientRequestTracingHandler.java @@ -84,12 +84,19 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { Attribute bufferAttr = ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER); bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset)); + + channel.attr(AttributeKeys.CHARSET).set(charset); } } if ((msg instanceof HttpContent || msg instanceof ByteBuf) && instrumentationConfig.httpBody().request()) { - DataCaptureUtils.captureBody(span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg); + Charset charset = channel.attr(AttributeKeys.CHARSET).get(); + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + DataCaptureUtils.captureBody( + span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset); } ctx.write(msg, prm); diff --git a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/client/HttpClientResponseTracingHandler.java b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/client/HttpClientResponseTracingHandler.java index 438c1a46a..5b5609467 100644 --- a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/client/HttpClientResponseTracingHandler.java +++ b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/client/HttpClientResponseTracingHandler.java @@ -84,12 +84,28 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { Attribute bufferAttr = ctx.channel().attr(AttributeKeys.RESPONSE_BODY_BUFFER); bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset)); + + channel.attr(AttributeKeys.CHARSET).set(charset); + // Store content encoding in a channel attribute + CharSequence contentEncodingSeq = DataCaptureUtils.getContentEncoding(httpResponse); + String contentEncoding = null; + if (contentEncodingSeq != null) { + contentEncoding = contentEncodingSeq.toString(); + } + channel.attr(AttributeKeys.RESPONSE_HEADER_CONTENT_ENCODING).set(contentEncoding); } } if ((msg instanceof HttpContent || msg instanceof ByteBuf) && instrumentationConfig.httpBody().response()) { - DataCaptureUtils.captureBody(span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, msg); + // Retrieve content encoding from the channel attribute + Charset charset = channel.attr(AttributeKeys.CHARSET).get(); + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + String contentEncoding = channel.attr(AttributeKeys.RESPONSE_HEADER_CONTENT_ENCODING).get(); + DataCaptureUtils.captureBody( + span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, msg, contentEncoding, charset); } try (Scope ignored = context.makeCurrent()) { diff --git a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerRequestTracingHandler.java b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerRequestTracingHandler.java index ed6135b02..815a9cf64 100644 --- a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerRequestTracingHandler.java +++ b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerRequestTracingHandler.java @@ -85,12 +85,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { Attribute bufferAttr = ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER); bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset)); + + channel.attr(AttributeKeys.CHARSET).set(charset); } } if ((msg instanceof HttpContent || msg instanceof ByteBuf) && instrumentationConfig.httpBody().request()) { - DataCaptureUtils.captureBody(span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg); + Charset charset = channel.attr(AttributeKeys.CHARSET).get(); + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + DataCaptureUtils.captureBody( + span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset); } ctx.fireChannelRead(msg); diff --git a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerResponseTracingHandler.java b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerResponseTracingHandler.java index 0a83e153d..9b123d5da 100644 --- a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerResponseTracingHandler.java +++ b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerResponseTracingHandler.java @@ -17,6 +17,7 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; @@ -50,6 +51,7 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { + Channel channel = ctx.channel(); Context context = ctx.channel() .attr( @@ -83,12 +85,27 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { Attribute bufferAttr = ctx.channel().attr(AttributeKeys.RESPONSE_BODY_BUFFER); bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset)); + + channel.attr(AttributeKeys.CHARSET).set(charset); + // Store content encoding in a channel attribute + CharSequence contentEncodingSeq = DataCaptureUtils.getContentEncoding(httpResponse); + String contentEncoding = null; + if (contentEncodingSeq != null) { + contentEncoding = contentEncodingSeq.toString(); + } + channel.attr(AttributeKeys.RESPONSE_HEADER_CONTENT_ENCODING).set(contentEncoding); } } if ((msg instanceof HttpContent || msg instanceof ByteBuf) && instrumentationConfig.httpBody().response()) { - DataCaptureUtils.captureBody(span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, msg); + Charset charset = channel.attr(AttributeKeys.CHARSET).get(); + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + String contentEncoding = channel.attr(AttributeKeys.RESPONSE_HEADER_CONTENT_ENCODING).get(); + DataCaptureUtils.captureBody( + span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, msg, contentEncoding, charset); } try (Scope ignored = context.makeCurrent()) { diff --git a/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/AbstractNetty40ServerInstrumentationTest.java b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/AbstractNetty40ServerInstrumentationTest.java index edfb04e59..1c13b3cb5 100644 --- a/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/AbstractNetty40ServerInstrumentationTest.java +++ b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/AbstractNetty40ServerInstrumentationTest.java @@ -295,4 +295,38 @@ public void connectionKeepAlive() throws IOException, TimeoutException, Interrup TEST_WRITER.getAttributesMap(span2).get("http.response.header." + RESPONSE_HEADER_NAME)); Assertions.assertNull(TEST_WRITER.getAttributesMap(span2).get("http.response.body")); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get_gzip", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .get() + .build(); + + Response response = httpClient.newCall(request).execute(); + Assertions.assertEquals(200, response.code()); + + String responseBody = response.body().string(); + Assertions.assertEquals(RESPONSE_BODY, responseBody); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans(1, span -> span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER_NAME) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals(RESPONSE_BODY, respBodyCapturedInSpan); + } } diff --git a/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/NettyTestServer.java b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/NettyTestServer.java index 97dd9124e..8ecd1a40e 100644 --- a/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/NettyTestServer.java +++ b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/NettyTestServer.java @@ -32,16 +32,20 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.util.CharsetUtil; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.ServerSocket; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.zip.GZIPOutputStream; public class NettyTestServer { @@ -110,6 +114,31 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) { response.headers().add("Content-Type", "application-json"); response.headers().set(CONTENT_LENGTH, responseBody.readableBytes()); ctx.write(response); + } else if (httpRequest.getUri().contains("get_gzip")) { + // Prepare GZIP-compressed response + ByteArrayOutputStream byteArrayOutputStream = + new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOut = + new GZIPOutputStream(byteArrayOutputStream)) { + gzipOut.write(RESPONSE_BODY.getBytes(CharsetUtil.UTF_8)); + } catch (IOException e) { + throw new RuntimeException(e); + } + ByteBuf responseBody = + Unpooled.wrappedBuffer(byteArrayOutputStream.toByteArray()); + HttpResponse response = + new DefaultFullHttpResponse( + HTTP_1_1, HttpResponseStatus.OK, responseBody); + response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); + response + .headers() + .set(HttpHeaders.Names.CONTENT_TYPE, "application/json"); + response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "gzip"); + response + .headers() + .set( + HttpHeaders.Names.CONTENT_LENGTH, responseBody.readableBytes()); + ctx.write(response); } } } diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/AttributeKeys.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/AttributeKeys.java index d500054bf..5286f3ae3 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/AttributeKeys.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/AttributeKeys.java @@ -17,6 +17,7 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1; import io.netty.util.AttributeKey; +import java.nio.charset.Charset; import java.util.Map; import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream; @@ -32,6 +33,13 @@ public class AttributeKeys { public static final AttributeKey> REQUEST_HEADERS = AttributeKey.valueOf(AttributeKeys.class, "request-headers"); + public static final AttributeKey RESPONSE_HEADER_CONTENT_ENCODING = + AttributeKey.valueOf( + HypertraceSemanticAttributes.HTTP_RESPONSE_HEADER_CONTENT_ENCODING.getKey()); + + public static final AttributeKey PROVIDED_CHARSET = + AttributeKey.valueOf("provided-charset"); + public static final AttributeKey REQUEST = AttributeKey.valueOf( "io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerRequestTracingHandler#http-server-request"); diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/DataCaptureUtils.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/DataCaptureUtils.java index 3b1825483..049896dce 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/DataCaptureUtils.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/DataCaptureUtils.java @@ -25,9 +25,16 @@ import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.opentelemetry.api.trace.Span; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.UnsupportedEncodingException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.util.zip.GZIPInputStream; +import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory; import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream; +import org.hypertrace.agent.core.instrumentation.utils.ContentTypeCharsetUtils; public class DataCaptureUtils { @@ -37,7 +44,9 @@ public static void captureBody( Span span, Channel channel, AttributeKey attributeKey, - Object httpContentOrBuffer) { + Object httpContentOrBuffer, + String contentEncoding, + Charset charset) { Attribute bufferAttr = channel.attr(attributeKey); BoundedByteArrayOutputStream buffer = bufferAttr.get(); @@ -59,9 +68,23 @@ public static void captureBody( if (httpContentOrBuffer instanceof LastHttpContent) { bufferAttr.remove(); try { - span.setAttribute(attributeKey.name(), buffer.toStringWithSuppliedCharset()); - } catch (UnsupportedEncodingException e) { - // ignore charset was parsed before + byte[] data = buffer.toByteArray(); + + String body; + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) { + try (GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(data))) { + InputStreamReader reader = new InputStreamReader(gzipStream, charset); + body = readInputStream(reader, charset); + } + } else { + body = new String(data, charset); + } + span.setAttribute(attributeKey.name(), body); + } catch (IOException e) { + // eg: unsupported charset } } } @@ -84,4 +107,21 @@ public static CharSequence getContentType(HttpMessage message) { public static CharSequence getContentLength(HttpMessage message) { return message.headers().get(HttpHeaderNames.CONTENT_LENGTH); } + + public static CharSequence getContentEncoding(HttpMessage message) { + return message.headers().get(HttpHeaderNames.CONTENT_ENCODING); + } + + public static String readInputStream(InputStreamReader inputReader, Charset charset) + throws IOException { + BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset); + try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) { + int c; + while ((c = inputReader.read()) != -1) { + writer.write(c); + } + writer.flush(); + } + return outputStream.toStringWithSuppliedCharset(); + } } diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/client/HttpClientRequestTracingHandler.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/client/HttpClientRequestTracingHandler.java index 12e29864d..b6775666d 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/client/HttpClientRequestTracingHandler.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/client/HttpClientRequestTracingHandler.java @@ -83,12 +83,19 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { Attribute bufferAttr = ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER); bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset)); + + channel.attr(AttributeKeys.PROVIDED_CHARSET).set(charset); } } if ((msg instanceof HttpContent || msg instanceof ByteBuf) && instrumentationConfig.httpBody().request()) { - DataCaptureUtils.captureBody(span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg); + Charset charset = channel.attr(AttributeKeys.PROVIDED_CHARSET).get(); + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + DataCaptureUtils.captureBody( + span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset); } ctx.write(msg, prm); diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/client/HttpClientResponseTracingHandler.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/client/HttpClientResponseTracingHandler.java index 6f31be4ca..f1b418e1e 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/client/HttpClientResponseTracingHandler.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/client/HttpClientResponseTracingHandler.java @@ -83,12 +83,28 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { Attribute bufferAttr = ctx.channel().attr(AttributeKeys.RESPONSE_BODY_BUFFER); bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset)); + + channel.attr(AttributeKeys.PROVIDED_CHARSET).set(charset); + // Store content encoding in a channel attribute + CharSequence contentEncodingSeq = DataCaptureUtils.getContentEncoding(httpResponse); + String contentEncoding = null; + if (contentEncodingSeq != null) { + contentEncoding = contentEncodingSeq.toString(); + } + channel.attr(AttributeKeys.RESPONSE_HEADER_CONTENT_ENCODING).set(contentEncoding); } } if ((msg instanceof HttpContent || msg instanceof ByteBuf) && instrumentationConfig.httpBody().response()) { - DataCaptureUtils.captureBody(span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, msg); + // Retrieve content encoding from the channel attribute + Charset charset = channel.attr(AttributeKeys.PROVIDED_CHARSET).get(); + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + String contentEncoding = channel.attr(AttributeKeys.RESPONSE_HEADER_CONTENT_ENCODING).get(); + DataCaptureUtils.captureBody( + span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, msg, contentEncoding, charset); } try (Scope ignored = context.makeCurrent()) { diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java index 9cc9c7c1d..6fce19a2b 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java @@ -85,12 +85,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { Attribute bufferAttr = ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER); bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset)); + + channel.attr(AttributeKeys.PROVIDED_CHARSET).set(charset); } } if ((msg instanceof HttpContent || msg instanceof ByteBuf) && instrumentationConfig.httpBody().request()) { - DataCaptureUtils.captureBody(span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg); + Charset charset = channel.attr(AttributeKeys.PROVIDED_CHARSET).get(); + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + DataCaptureUtils.captureBody( + span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset); } ctx.fireChannelRead(msg); diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerResponseTracingHandler.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerResponseTracingHandler.java index c68dacf89..7fb6eb145 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerResponseTracingHandler.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerResponseTracingHandler.java @@ -17,6 +17,7 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; @@ -51,6 +52,7 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { + Channel channel = ctx.channel(); Deque serverContexts = ctx.channel() .attr(io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys.SERVER_CONTEXT) @@ -83,12 +85,28 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { Attribute bufferAttr = ctx.channel().attr(AttributeKeys.RESPONSE_BODY_BUFFER); bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset)); + + channel.attr(AttributeKeys.PROVIDED_CHARSET).set(charset); + // Store content encoding in a channel attribute + CharSequence contentEncodingSeq = DataCaptureUtils.getContentEncoding(httpResponse); + String contentEncoding = null; + if (contentEncodingSeq != null) { + contentEncoding = contentEncodingSeq.toString(); + } + channel.attr(AttributeKeys.RESPONSE_HEADER_CONTENT_ENCODING).set(contentEncoding); } } if ((msg instanceof HttpContent || msg instanceof ByteBuf) && instrumentationConfig.httpBody().response()) { - DataCaptureUtils.captureBody(span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, msg); + // Retrieve content encoding from the channel attribute + Charset charset = channel.attr(AttributeKeys.PROVIDED_CHARSET).get(); + if (charset == null) { + charset = ContentTypeCharsetUtils.getDefaultCharset(); + } + String contentEncoding = channel.attr(AttributeKeys.RESPONSE_HEADER_CONTENT_ENCODING).get(); + DataCaptureUtils.captureBody( + span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, msg, contentEncoding, charset); } try (Scope ignored = serverContexts.element().context().makeCurrent()) { diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/AbstractNetty41ServerInstrumentationTest.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/AbstractNetty41ServerInstrumentationTest.java index 4a312157d..f1d81fc55 100644 --- a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/AbstractNetty41ServerInstrumentationTest.java +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/AbstractNetty41ServerInstrumentationTest.java @@ -295,4 +295,38 @@ public void connectionKeepAlive() throws IOException, TimeoutException, Interrup TEST_WRITER.getAttributesMap(span2).get("http.response.header." + RESPONSE_HEADER_NAME)); Assertions.assertNull(TEST_WRITER.getAttributesMap(span2).get("http.response.body")); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get_gzip", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .get() + .build(); + + Response response = httpClient.newCall(request).execute(); + Assertions.assertEquals(200, response.code()); + + String responseBody = response.body().string(); + Assertions.assertEquals(RESPONSE_BODY, responseBody); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans(1, span -> span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER_NAME) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals(RESPONSE_BODY, respBodyCapturedInSpan); + } } diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/NettyTestServer.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/NettyTestServer.java index fe6b11c15..154ab57e5 100644 --- a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/NettyTestServer.java +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/NettyTestServer.java @@ -32,16 +32,21 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.util.CharsetUtil; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.ServerSocket; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.zip.GZIPOutputStream; public class NettyTestServer { @@ -110,6 +115,30 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) { response.headers().add("Content-Type", "application-json"); response.headers().set(CONTENT_LENGTH, responseBody.readableBytes()); ctx.write(response); + } else if (httpRequest.uri().contains("get_gzip")) { + // Prepare GZIP-compressed response + ByteArrayOutputStream byteArrayOutputStream = + new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOut = + new GZIPOutputStream(byteArrayOutputStream)) { + gzipOut.write(RESPONSE_BODY.getBytes(CharsetUtil.UTF_8)); + } catch (IOException e) { + throw new RuntimeException(e); + } + ByteBuf responseBody = + Unpooled.wrappedBuffer(byteArrayOutputStream.toByteArray()); + HttpResponse response = + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, responseBody); + response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); + response + .headers() + .set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + response.headers().set(HttpHeaderNames.CONTENT_ENCODING, "gzip"); + response + .headers() + .set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes()); + ctx.write(response); } } } diff --git a/instrumentation/okhttp/okhttp-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/okhttp/v3_0/OkHttpTracingInterceptor.java b/instrumentation/okhttp/okhttp-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/okhttp/v3_0/OkHttpTracingInterceptor.java index d2546a7da..9ba992a6a 100644 --- a/instrumentation/okhttp/okhttp-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/okhttp/v3_0/OkHttpTracingInterceptor.java +++ b/instrumentation/okhttp/okhttp-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/okhttp/v3_0/OkHttpTracingInterceptor.java @@ -19,6 +19,8 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.function.Function; import okhttp3.Headers; import okhttp3.Interceptor; @@ -28,6 +30,9 @@ import okhttp3.Response; import okhttp3.ResponseBody; import okio.Buffer; +import okio.BufferedSource; +import okio.GzipSource; +import okio.Okio; import org.hypertrace.agent.core.config.InstrumentationConfig; import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; import org.hypertrace.agent.core.instrumentation.utils.ContentTypeUtils; @@ -88,6 +93,7 @@ private static Response captureResponseBody(Span span, final Response response) if (response.body() == null) { return response; } + ResponseBody responseBody = response.body(); MediaType mediaType = responseBody.contentType(); if (mediaType == null || !ContentTypeUtils.shouldCapture(mediaType.toString())) { @@ -95,18 +101,46 @@ private static Response captureResponseBody(Span span, final Response response) } try { - String body = responseBody.string(); + // Read the entire response body one-shot into a byte-array + // responseBody.string(), this looks for the charset if available in content-type header + // else defaults to utf-8. So read bytes itself as done here and use for building new response + // ref: https://square.github.io/okhttp/3.x/okhttp/okhttp3/ResponseBody.html + byte[] byteArray = responseBody.source().readByteArray(); + String body; + + // Determine the content encoding + String contentEncoding = response.header("Content-Encoding"); + if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) { + // Decompress the response body if it is GZIP encoded using GzipSource + GzipSource gzipSource = new GzipSource(new Buffer().write(byteArray)); + BufferedSource bufferedGzipSource = Okio.buffer(gzipSource); + + // capture the decompressed content from gzip source to set as response body in span + body = bufferedGzipSource.readString(getCharset(mediaType)); + } else { + // capture the response body for other cases + body = new String(byteArray, getCharset(mediaType)); + } + span.setAttribute(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY, body); - return response - .newBuilder() - .body(ResponseBody.create(responseBody.contentType(), body)) - .build(); + + // Return the response with its body and encoding exactly the same as the original response + return response.newBuilder().body(ResponseBody.create(mediaType, byteArray)).build(); } catch (IOException e) { log.error("Could not read response body", e); } + return response; } + // Helper method to determine charset from MediaType if available else default to UTF-8 + private static Charset getCharset(MediaType mediaType) { + if (mediaType != null && mediaType.charset() != null) { + return mediaType.charset(); + } + return StandardCharsets.UTF_8; // Default charset + } + private static void captureHeaders( Span span, Headers headers, Function> headerNameProvider) { for (String name : headers.names()) { diff --git a/instrumentation/okhttp/okhttp-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/okhttp/v3_0/OkHttpTracingInterceptorTest.java b/instrumentation/okhttp/okhttp-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/okhttp/v3_0/OkHttpTracingInterceptorTest.java index 7e0f13fd8..6b895c565 100644 --- a/instrumentation/okhttp/okhttp-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/okhttp/v3_0/OkHttpTracingInterceptorTest.java +++ b/instrumentation/okhttp/okhttp-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/okhttp/v3_0/OkHttpTracingInterceptorTest.java @@ -67,10 +67,10 @@ public Response doGetRequest(String uri, Map headersMap) throws okhttp3.Response response = client.newCall(request).execute(); - String responseBody = - (response.body() != null && response.body().contentLength() > 0) - ? response.body().string() - : null; - return new Response(responseBody, response.code()); + if (response.body() != null) { + String responseBody = response.body().string(); + return new Response(!responseBody.isEmpty() ? responseBody : null, response.code()); + } + return new Response(null, response.code()); } } diff --git a/instrumentation/servlet/servlet-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet30InstrumentationTest.java b/instrumentation/servlet/servlet-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet30InstrumentationTest.java index 0a50a4956..a596f517b 100644 --- a/instrumentation/servlet/servlet-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet30InstrumentationTest.java +++ b/instrumentation/servlet/servlet-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/Servlet30InstrumentationTest.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.TimeoutException; import javax.servlet.DispatcherType; import javax.servlet.Filter; import javax.servlet.FilterChain; @@ -111,6 +112,7 @@ public static void startServer() throws Exception { handler.addServlet(EchoAsyncResponse_writer.class, "/echo_async_response_writer"); handler.addServlet(EchoStream_read_large_array.class, "/echo_stream_read_large_array"); handler.addServlet(EchoReader_read_large_array.class, "/echo_reader_read_large_array"); + handler.addServlet(TestServlets.GetGzip.class, "/get_gzip"); server.setHandler(handler); server.start(); serverPort = server.getConnectors()[0].getLocalPort(); @@ -401,4 +403,46 @@ public void postJson(String url) throws Exception { TestServlets.RESPONSE_BODY, TEST_WRITER.getAttributesMap(span).get("http.response.body").getStringValue()); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get_gzip", serverPort)) + .header(REQUEST_HEADER, REQUEST_HEADER_VALUE) + .get() + .build(); + + Response response = httpClient.newCall(request).execute(); + Assertions.assertEquals(200, response.code()); + + String responseBody = response.body().string(); + Assertions.assertEquals("{\"message\": \"hello\"}", responseBody); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.url") + && keyValue.getValue().getStringValue().contains("/get_gzip"))); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan); + } } diff --git a/instrumentation/servlet/servlet-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/TestServlets.java b/instrumentation/servlet/servlet-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/TestServlets.java index b79f1b742..a2da966ae 100644 --- a/instrumentation/servlet/servlet-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/TestServlets.java +++ b/instrumentation/servlet/servlet-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/nowrapping/TestServlets.java @@ -17,7 +17,9 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.nowrapping; import java.io.IOException; +import java.io.OutputStream; import java.util.stream.Stream; +import java.util.zip.GZIPOutputStream; import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -291,4 +293,20 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws resp.getWriter().print(RESPONSE_BODY.toCharArray()); } } + + public static class GetGzip extends HttpServlet { + @Override + protected void service(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + while (req.getInputStream().read() != -1) {} + resp.setStatus(HttpServletResponse.SC_OK); + resp.setHeader("Content-Encoding", "gzip"); + resp.setHeader("Content-Type", "application/json"); + try (OutputStream out = resp.getOutputStream(); + GZIPOutputStream gzipOut = new GZIPOutputStream(out)) { + String jsonResponse = "{\"message\": \"hello\"}"; + gzipOut.write(jsonResponse.getBytes()); + } + } + } } diff --git a/instrumentation/servlet/servlet-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v5_0/Servlet50InstrumentationTest.java b/instrumentation/servlet/servlet-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v5_0/Servlet50InstrumentationTest.java index c80cf1003..b5787c7c1 100644 --- a/instrumentation/servlet/servlet-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v5_0/Servlet50InstrumentationTest.java +++ b/instrumentation/servlet/servlet-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v5_0/Servlet50InstrumentationTest.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.TimeoutException; import okhttp3.FormBody; import okhttp3.MediaType; import okhttp3.Request; @@ -112,6 +113,8 @@ public static void startServer() throws Exception { handler.addServlet(EchoAsyncResponse_writer.class, "/echo_async_response_writer"); handler.addServlet(EchoStream_read_large_array.class, "/echo_stream_read_large_array"); handler.addServlet(EchoReader_read_large_array.class, "/echo_reader_read_large_array"); + handler.addServlet(TestServlets.GetGzip.class, "/get_gzip"); + server.setHandler(handler); server.start(); serverPort = ((ServerConnector) server.getConnectors()[0]).getLocalPort(); @@ -402,4 +405,46 @@ public void postJson(String url) throws Exception { TestServlets.RESPONSE_BODY, TEST_WRITER.getAttributesMap(span).get("http.response.body").getStringValue()); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get_gzip", serverPort)) + .header(REQUEST_HEADER, REQUEST_HEADER_VALUE) + .get() + .build(); + + Response response = httpClient.newCall(request).execute(); + Assertions.assertEquals(200, response.code()); + + String responseBody = response.body().string(); + Assertions.assertEquals("{\"message\": \"hello\"}", responseBody); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.url") + && keyValue.getValue().getStringValue().contains("/get_gzip"))); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan); + } } diff --git a/instrumentation/servlet/servlet-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v5_0/TestServlets.java b/instrumentation/servlet/servlet-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v5_0/TestServlets.java index e383284e7..da44ac6ad 100644 --- a/instrumentation/servlet/servlet-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v5_0/TestServlets.java +++ b/instrumentation/servlet/servlet-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v5_0/TestServlets.java @@ -22,7 +22,9 @@ import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import java.io.IOException; +import java.io.OutputStream; import java.util.stream.Stream; +import java.util.zip.GZIPOutputStream; public class TestServlets { @@ -291,4 +293,20 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws resp.getWriter().print(RESPONSE_BODY.toCharArray()); } } + + public static class GetGzip extends HttpServlet { + @Override + protected void service(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + while (req.getInputStream().read() != -1) {} + resp.setStatus(javax.servlet.http.HttpServletResponse.SC_OK); + resp.setHeader("Content-Encoding", "gzip"); + resp.setHeader("Content-Type", "application/json"); + try (OutputStream out = resp.getOutputStream(); + GZIPOutputStream gzipOut = new GZIPOutputStream(out)) { + String jsonResponse = "{\"message\": \"hello\"}"; + gzipOut.write(jsonResponse.getBytes()); + } + } + } } diff --git a/instrumentation/servlet/servlet-rw/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/rw/ServletRWInstrumentationTest.java b/instrumentation/servlet/servlet-rw/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/rw/ServletRWInstrumentationTest.java index 8adf756bc..f705e7d62 100644 --- a/instrumentation/servlet/servlet-rw/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/rw/ServletRWInstrumentationTest.java +++ b/instrumentation/servlet/servlet-rw/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/rw/ServletRWInstrumentationTest.java @@ -17,8 +17,10 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.rw; import io.opentelemetry.proto.trace.v1.Span; +import java.io.IOException; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.TimeoutException; import javax.servlet.DispatcherType; import okhttp3.FormBody; import okhttp3.MediaType; @@ -63,6 +65,8 @@ public static void startServer() throws Exception { TestServlets.EchoStream_read_large_array.class, "/echo_stream_read_large_array"); handler.addServlet( TestServlets.EchoReader_read_large_array.class, "/echo_reader_read_large_array"); + handler.addServlet(TestServlets.GetGzip.class, "/get_gzip"); + server.setHandler(handler); server.start(); serverPort = server.getConnectors()[0].getLocalPort(); @@ -323,4 +327,46 @@ public void postJson(String url) throws Exception { TestServlets.RESPONSE_BODY, TEST_WRITER.getAttributesMap(span).get("http.response.body").getStringValue()); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get_gzip", serverPort)) + .header(REQUEST_HEADER, REQUEST_HEADER_VALUE) + .get() + .build(); + + Response response = httpClient.newCall(request).execute(); + Assertions.assertEquals(200, response.code()); + + String responseBody = response.body().string(); + Assertions.assertEquals("{\"message\": \"hello\"}", responseBody); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.url") + && keyValue.getValue().getStringValue().contains("/get_gzip"))); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan); + } } diff --git a/instrumentation/servlet/servlet-rw/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/rw/TestServlets.java b/instrumentation/servlet/servlet-rw/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/rw/TestServlets.java index e7344df9f..c33fde2f0 100644 --- a/instrumentation/servlet/servlet-rw/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/rw/TestServlets.java +++ b/instrumentation/servlet/servlet-rw/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/servlet/v3_0/rw/TestServlets.java @@ -17,7 +17,9 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.servlet.v3_0.rw; import java.io.IOException; +import java.io.OutputStream; import java.util.stream.Stream; +import java.util.zip.GZIPOutputStream; import javax.servlet.AsyncContext; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; @@ -195,4 +197,19 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) throws resp.getWriter().print(RESPONSE_BODY.toCharArray()); } } + + public static class GetGzip extends HttpServlet { + @Override + protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException { + while (req.getInputStream().read() != -1) {} + resp.setStatus(javax.servlet.http.HttpServletResponse.SC_OK); + resp.setHeader("Content-Encoding", "gzip"); + resp.setHeader("Content-Type", "application/json"); + try (OutputStream out = resp.getOutputStream(); + GZIPOutputStream gzipOut = new GZIPOutputStream(out)) { + String jsonResponse = "{\"message\": \"hello\"}"; + gzipOut.write(jsonResponse.getBytes()); + } + } + } } diff --git a/instrumentation/spark-2.3/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/sparkjava/SparkJavaInstrumentationTest.java b/instrumentation/spark-2.3/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/sparkjava/SparkJavaInstrumentationTest.java index 562707cdd..4fd5786c0 100644 --- a/instrumentation/spark-2.3/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/sparkjava/SparkJavaInstrumentationTest.java +++ b/instrumentation/spark-2.3/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/sparkjava/SparkJavaInstrumentationTest.java @@ -17,14 +17,18 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.sparkjava; import io.opentelemetry.proto.trace.v1.Span; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPOutputStream; import okhttp3.MediaType; import okhttp3.Request; import okhttp3.Request.Builder; import okhttp3.RequestBody; import okhttp3.Response; +import okhttp3.ResponseBody; import org.hypertrace.agent.testing.AbstractInstrumenterTest; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -63,6 +67,22 @@ public static void postJson() throws Exception { res.body(RESPONSE_BODY); throw new RuntimeException(); }); + Spark.get( + "/gzip", + (req, res) -> { + res.header("Content-Encoding", "gzip"); + res.type("application/json"); + String jsonResponse = "{\"message\": \"hello\"}"; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(byteArrayOutputStream)) { + gzipOut.write(jsonResponse.getBytes(StandardCharsets.UTF_8)); + } + byte[] gzipData = byteArrayOutputStream.toByteArray(); + res.raw().getOutputStream().write(gzipData); + res.raw().getOutputStream().flush(); + return null; + }); + Spark.awaitInitialization(); } @@ -144,4 +164,47 @@ public void exceptionInHandler() throws IOException, InterruptedException, Timeo .get("http.response.header." + RESPONSE_HEADER) .getStringValue()); } + + @Test + public void getGzipResponse() throws TimeoutException, InterruptedException, IOException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/gzip", PORT)) + .header(REQUEST_HEADER, REQUEST_HEADER_VALUE) + .get() + .build(); + + Response response = httpClient.newCall(request).execute(); + Assertions.assertEquals(200, response.code()); + + ResponseBody responseBody = response.body(); + String body = responseBody.string(); + Assertions.assertEquals("{\"message\": \"hello\"}", body); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.url") + && keyValue.getValue().getStringValue().contains("/gzip"))); + Assertions.assertEquals(1, traces.size()); + Assertions.assertEquals(1, traces.get(0).size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(clientSpan) + .get("http.request.header." + REQUEST_HEADER) + .getStringValue()); + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan); + } } diff --git a/instrumentation/undertow/undertow-1.4/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/undertow/v1_4/utils/Utils.java b/instrumentation/undertow/undertow-1.4/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/undertow/v1_4/utils/Utils.java index f42f23f14..2db70d107 100644 --- a/instrumentation/undertow/undertow-1.4/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/undertow/v1_4/utils/Utils.java +++ b/instrumentation/undertow/undertow-1.4/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/undertow/v1_4/utils/Utils.java @@ -59,12 +59,15 @@ public static void createAndStoreBufferForSpan( final BoundedByteArrayOutputStream boundedByteArrayOutputStream = BoundedBuffersFactory.createStream( (int) httpServerExchange.getRequestContentLength(), charset); + final String contentEncoding = + httpServerExchange.getRequestHeaders().getFirst(Headers.CONTENT_ENCODING); final SpanAndBuffer spanAndBuffer = new SpanAndBuffer( span, boundedByteArrayOutputStream, HypertraceSemanticAttributes.HTTP_REQUEST_BODY, - charset); + charset, + contentEncoding); contextStore.set(returnedChannel, spanAndBuffer); httpServerExchange.addExchangeCompleteListener( new BodyCapturingExchangeCompletionListener(spanAndBuffer)); diff --git a/instrumentation/undertow/undertow-1.4/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/undertow/v1_4/UndertowInstrumentationTest.java b/instrumentation/undertow/undertow-1.4/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/undertow/v1_4/UndertowInstrumentationTest.java index bcaeb3af2..b96b970bc 100644 --- a/instrumentation/undertow/undertow-1.4/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/undertow/v1_4/UndertowInstrumentationTest.java +++ b/instrumentation/undertow/undertow-1.4/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/undertow/v1_4/UndertowInstrumentationTest.java @@ -27,13 +27,22 @@ import io.undertow.servlet.Servlets; import io.undertow.servlet.api.DeploymentInfo; import io.undertow.servlet.api.DeploymentManager; +import io.undertow.util.Headers; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.StringWriter; import java.io.UncheckedIOException; import java.net.ServerSocket; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import javax.servlet.ServletException; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; @@ -54,6 +63,7 @@ final class UndertowInstrumentationTest extends AbstractInstrumenterTest { private static Undertow server; private static int availablePort; + private static final String RESPONSE_BODY = "{\"message\": \"Hello World\"}"; @BeforeAll static void startServer() throws ServletException { @@ -211,6 +221,58 @@ void postUrlEncoded() throws InterruptedException, TimeoutException, IOException assertNull(TEST_WRITER.getAttributesMap(getHtmlSpan).get("http.request.body")); } + @Test + void getGzipResponse() throws IOException { + // This Builder is from okhttp client, which by default adds accept-encoding as gzip + // For testing, setting this as gzipw which test server recognises to send gzip response. + try (Response response = + httpClient + .newCall( + new Builder() + .url("http://localhost:" + availablePort + "/myapp/?format=json") + .addHeader(Headers.ACCEPT_ENCODING_STRING, "gzipw") + .get() + .build()) + .execute()) { + assertEquals(200, response.code()); + assertEquals("gzip", response.header(Headers.CONTENT_ENCODING_STRING)); + + String responseBody = decompressGzip(response.body().bytes(), StandardCharsets.UTF_8); + assertEquals(RESPONSE_BODY, responseBody); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER) + || span.getKind().equals(Span.SpanKind.SPAN_KIND_INTERNAL)); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + Span span = trace.get(0); + assertEquals( + RESPONSE_BODY, + TEST_WRITER.getAttributesMap(span).get("http.response.body").getStringValue()); + } catch (InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + private String decompressGzip(byte[] compressed, Charset charset) throws IOException { + try (InputStream byteStream = new ByteArrayInputStream(compressed); + GZIPInputStream gzipStream = new GZIPInputStream(byteStream); + InputStreamReader reader = new InputStreamReader(gzipStream, charset); + StringWriter writer = new StringWriter()) { + + int character; + while ((character = reader.read()) != -1) { + writer.write(character); + } + return writer.toString(); + } + } + public static final class TestServlet extends HttpServlet { @Override @@ -225,10 +287,30 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IO responseBody = "

Hello World

".getBytes(StandardCharsets.UTF_8); resp.setContentType("text/html; charset=UTF-8"); } - resp.setStatus(200); - resp.setContentLength(responseBody.length); - try (ServletOutputStream servletOutputStream = resp.getOutputStream()) { - servletOutputStream.write(responseBody); + // Check if the request supports GZIP encoding + // For Building request we are using OkHttp Builder, which by defaults adds Accept-Encoding as + // gzip. + // So for testing purposes, the test case server sees for gzipw instead to send gzip + // responses. + if (req.getHeader("Accept-Encoding") != null + && req.getHeader("Accept-Encoding").contains("gzipw")) { + resp.setHeader("Content-Encoding", "gzip"); + resp.setStatus(HttpServletResponse.SC_OK); + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) { + gzipOutputStream.write(responseBody); + gzipOutputStream.finish(); + resp.setContentLength(byteArrayOutputStream.size()); + try (ServletOutputStream servletOutputStream = resp.getOutputStream()) { + servletOutputStream.write(byteArrayOutputStream.toByteArray()); + } + } + } else { + resp.setStatus(HttpServletResponse.SC_OK); + resp.setContentLength(responseBody.length); + try (ServletOutputStream servletOutputStream = resp.getOutputStream()) { + servletOutputStream.write(responseBody); + } } } diff --git a/instrumentation/vertx/vertx-web-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/HttpResponseInstrumentation.java b/instrumentation/vertx/vertx-web-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/HttpResponseInstrumentation.java index e70389c4e..174792d84 100644 --- a/instrumentation/vertx/vertx-web-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/HttpResponseInstrumentation.java +++ b/instrumentation/vertx/vertx-web-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/HttpResponseInstrumentation.java @@ -65,7 +65,11 @@ public static void handleResponseEnter( } VirtualField.find(HttpClientResponse.class, Span.class).set(response, null); - handler = new ResponseBodyWrappingHandler(handler, span); + // Extract content encoding from the response headers + String contentEncoding = response.getHeader("Content-Encoding"); + String contentType = response.getHeader("Content-Type"); + + handler = new ResponseBodyWrappingHandler(handler, span, contentEncoding, contentType); } } } diff --git a/instrumentation/vertx/vertx-web-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/ResponseBodyWrappingHandler.java b/instrumentation/vertx/vertx-web-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/ResponseBodyWrappingHandler.java index 2ee406c01..2f66b0791 100644 --- a/instrumentation/vertx/vertx-web-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/ResponseBodyWrappingHandler.java +++ b/instrumentation/vertx/vertx-web-3.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/ResponseBodyWrappingHandler.java @@ -24,9 +24,17 @@ import io.opentelemetry.context.Context; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.zip.GZIPInputStream; import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; +import org.hypertrace.agent.core.instrumentation.utils.ContentTypeCharsetUtils; +import org.hypertrace.agent.core.instrumentation.utils.ContentTypeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,15 +50,30 @@ public class ResponseBodyWrappingHandler implements Handler { private final Handler wrapped; private final Span span; + private final String encoding; + private final String contentType; - public ResponseBodyWrappingHandler(Handler wrapped, Span span) { + public ResponseBodyWrappingHandler( + Handler wrapped, Span span, String encoding, String contentType) { this.wrapped = wrapped; this.span = span; + this.encoding = encoding; + this.contentType = contentType; } @Override public void handle(Buffer event) { - String responseBody = event.getString(0, event.length()); + String responseBody; + try { + if (encoding != null && encoding.toLowerCase().contains("gzip")) { + responseBody = decompressGzip(event.getBytes()); + } else { + responseBody = event.getString(0, event.length()); + } + } catch (IOException e) { + responseBody = event.getString(0, event.length()); + } + if (span.isRecording()) { span.setAttribute(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY, responseBody); } else { @@ -94,4 +117,20 @@ public void handle(Buffer event) { wrapped.handle(event); } + + private String decompressGzip(byte[] compressed) throws IOException { + String charset = ContentTypeUtils.parseCharset(contentType); + charset = ContentTypeCharsetUtils.toCharset(charset).name(); + try (InputStream byteStream = new ByteArrayInputStream(compressed); + GZIPInputStream gzipStream = new GZIPInputStream(byteStream); + InputStreamReader reader = new InputStreamReader(gzipStream, charset); + StringWriter writer = new StringWriter()) { + + int character; + while ((character = reader.read()) != -1) { + writer.write(character); + } + return writer.toString(); + } + } } diff --git a/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxClientInstrumentationPostTests.java b/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxClientInstrumentationPostTests.java new file mode 100644 index 000000000..63e26cdc3 --- /dev/null +++ b/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxClientInstrumentationPostTests.java @@ -0,0 +1,171 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.vertx; + +import io.opentelemetry.proto.trace.v1.Span; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.http.HttpMethod; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; +import org.hypertrace.agent.testing.AbstractInstrumenterTest; +import org.hypertrace.agent.testing.TestHttpServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +public class VertxClientInstrumentationPostTests extends AbstractInstrumenterTest { + + private static final TestHttpServer testHttpServer = new TestHttpServer(); + private final Vertx vertx = Vertx.vertx(new VertxOptions()); + private final HttpClientOptions clientOptions = new HttpClientOptions(); + private final HttpClient httpClient = vertx.createHttpClient(clientOptions); + + @BeforeAll + public static void startServer() throws Exception { + testHttpServer.start(); + } + + @AfterAll + public static void closeServer() throws Exception { + testHttpServer.close(); + } + + @Test + public void postJson_write_end() throws TimeoutException, InterruptedException { + String uri = String.format("http://localhost:%d/echo", testHttpServer.port()); + + CountDownLatch countDownLatch = new CountDownLatch(1); + HttpClientRequest request = httpClient.requestAbs(HttpMethod.POST, uri); + request = request.putHeader("Content-Type", "application/json"); + request.setChunked(true); + VertxClientInstrumentationTest.BufferHandler bufferHandler = + new VertxClientInstrumentationTest.BufferHandler(countDownLatch); + VertxClientInstrumentationTest.ResponseHandler responseHandler = + new VertxClientInstrumentationTest.ResponseHandler(bufferHandler); + + request + .handler(responseHandler) + .write("write") + .write(Buffer.buffer().appendString(" buffer")) + .write(" str_encoding ", "utf-8") + .end(); + countDownLatch.await(); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER) + || span.getKind().equals(Span.SpanKind.SPAN_KIND_INTERNAL)); + Assertions.assertEquals(1, traces.size()); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + "write buffer str_encoding ", + TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue()); + } + + // Reason: The test OTLP receiver is getting spans from previous tests when tests are run in batch + // TODO: Need to make the otlp receiver to be independent for each test + @Disabled("This is flaky on GHA") + @Test + public void postJson_write_end_string() throws TimeoutException, InterruptedException { + String uri = String.format("http://localhost:%d/echo", testHttpServer.port()); + + CountDownLatch countDownLatch = new CountDownLatch(1); + HttpClientRequest request = httpClient.requestAbs(HttpMethod.POST, uri); + request = request.putHeader("Content-Type", "application/json"); + request.setChunked(true); + VertxClientInstrumentationTest.BufferHandler bufferHandler = + new VertxClientInstrumentationTest.BufferHandler(countDownLatch); + VertxClientInstrumentationTest.ResponseHandler responseHandler = + new VertxClientInstrumentationTest.ResponseHandler(bufferHandler); + + request + .handler(responseHandler) + .write("write") + .write(Buffer.buffer().appendString(" buffer")) + .write(" str_encoding ", "utf-8") + .end("end"); + countDownLatch.await(); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.url") + && keyValue.getValue().getStringValue().contains("/echo"))); + Assertions.assertEquals(1, traces.size(), String.format("was: %d", traces.size())); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + "write buffer str_encoding end", + TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue()); + } + + @Disabled("This is flaky on GHA") + @Test + public void postJson_write_end_buffer() throws TimeoutException, InterruptedException { + String uri = String.format("http://localhost:%d/echo", testHttpServer.port()); + + CountDownLatch countDownLatch = new CountDownLatch(1); + HttpClientRequest request = httpClient.requestAbs(HttpMethod.POST, uri); + request = request.putHeader("Content-Type", "application/json"); + request.setChunked(true); + VertxClientInstrumentationTest.BufferHandler bufferHandler = + new VertxClientInstrumentationTest.BufferHandler(countDownLatch); + VertxClientInstrumentationTest.ResponseHandler responseHandler = + new VertxClientInstrumentationTest.ResponseHandler(bufferHandler); + + request + .handler(responseHandler) + .write("write") + .write(Buffer.buffer().appendString(" buffer")) + .write(" str_encoding ", "utf-8") + .end(Buffer.buffer("end")); + countDownLatch.await(); + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans( + 1, + span -> + !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.url") + && keyValue.getValue().getStringValue().contains("/echo"))); + Assertions.assertEquals(1, traces.size(), String.format("was: %d", traces.size())); + Span clientSpan = traces.get(0).get(0); + Assertions.assertEquals( + "write buffer str_encoding end", + TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue()); + } +} diff --git a/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxClientInstrumentationTest.java b/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxClientInstrumentationTest.java index 96e8ca9b3..abf93f1fa 100644 --- a/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxClientInstrumentationTest.java +++ b/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxClientInstrumentationTest.java @@ -16,7 +16,6 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.vertx; -import io.opentelemetry.proto.trace.v1.Span; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; @@ -26,14 +25,9 @@ import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpClientResponse; import io.vertx.core.http.HttpMethod; -import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeoutException; import org.hypertrace.agent.testing.AbstractHttpClientTest; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; public class VertxClientInstrumentationTest extends AbstractHttpClientTest { @@ -120,108 +114,4 @@ public void handle(Buffer responseBodyBuffer) { countDownLatch.countDown(); } } - - @Test - public void postJson_write_end() throws TimeoutException, InterruptedException { - String uri = String.format("http://localhost:%d/echo", testHttpServer.port()); - - CountDownLatch countDownLatch = new CountDownLatch(1); - HttpClientRequest request = httpClient.requestAbs(HttpMethod.POST, uri); - request = request.putHeader("Content-Type", "application/json"); - request.setChunked(true); - BufferHandler bufferHandler = new BufferHandler(countDownLatch); - ResponseHandler responseHandler = new ResponseHandler(bufferHandler); - - request - .handler(responseHandler) - .write("write") - .write(Buffer.buffer().appendString(" buffer")) - .write(" str_encoding ", "utf-8") - .end(); - countDownLatch.await(); - - TEST_WRITER.waitForTraces(1); - List> traces = - TEST_WRITER.waitForSpans( - 1, - span -> - span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER) - || span.getKind().equals(Span.SpanKind.SPAN_KIND_INTERNAL)); - Assertions.assertEquals(1, traces.size()); - Span clientSpan = traces.get(0).get(0); - Assertions.assertEquals( - "write buffer str_encoding ", - TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue()); - } - - @Test - public void postJson_write_end_string() throws TimeoutException, InterruptedException { - String uri = String.format("http://localhost:%d/echo", testHttpServer.port()); - - CountDownLatch countDownLatch = new CountDownLatch(1); - HttpClientRequest request = httpClient.requestAbs(HttpMethod.POST, uri); - request = request.putHeader("Content-Type", "application/json"); - request.setChunked(true); - BufferHandler bufferHandler = new BufferHandler(countDownLatch); - ResponseHandler responseHandler = new ResponseHandler(bufferHandler); - - request - .handler(responseHandler) - .write("write") - .write(Buffer.buffer().appendString(" buffer")) - .write(" str_encoding ", "utf-8") - .end("end"); - countDownLatch.await(); - - TEST_WRITER.waitForTraces(1); - List> traces = - TEST_WRITER.waitForSpans( - 1, - span -> - span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER) - || span.getKind().equals(Span.SpanKind.SPAN_KIND_INTERNAL)); - Assertions.assertEquals(1, traces.size(), String.format("was: %d", traces.size())); - Span clientSpan = traces.get(0).get(0); - Assertions.assertEquals( - "write buffer str_encoding end", - TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue()); - } - - @Test - @Disabled("This is flaky on github actions!!") - public void postJson_write_end_buffer() throws TimeoutException, InterruptedException { - String uri = String.format("http://localhost:%d/echo", testHttpServer.port()); - - CountDownLatch countDownLatch = new CountDownLatch(1); - HttpClientRequest request = httpClient.requestAbs(HttpMethod.POST, uri); - request = request.putHeader("Content-Type", "application/json"); - request.setChunked(true); - BufferHandler bufferHandler = new BufferHandler(countDownLatch); - ResponseHandler responseHandler = new ResponseHandler(bufferHandler); - - request - .handler(responseHandler) - .write("write") - .write(Buffer.buffer().appendString(" buffer")) - .write(" str_encoding ", "utf-8") - .end(Buffer.buffer("end")); - countDownLatch.await(); - - TEST_WRITER.waitForTraces(1); - List> traces = - TEST_WRITER.waitForSpans( - 1, - span -> - !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) - || span.getAttributesList().stream() - .noneMatch( - keyValue -> - keyValue.getKey().equals("http.url") - && keyValue.getValue().getStringValue().contains("/echo"))); - Assertions.assertEquals(1, traces.size(), String.format("was: %d", traces.size())); - Span clientSpan = traces.get(0).get(0); - Assertions.assertEquals( - "write buffer str_encoding end", - TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue()); - } } diff --git a/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxServerInstrumentationTest.java b/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxServerInstrumentationTest.java index 7f33390bd..55bccbaa8 100644 --- a/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxServerInstrumentationTest.java +++ b/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxServerInstrumentationTest.java @@ -203,4 +203,41 @@ public void postJson(String url) throws IOException, TimeoutException, Interrupt .get("http.response.header." + VertxWebServer.RESPONSE_HEADER_NAME) .getStringValue()); } + + @Test + public void getGzipResponse() throws IOException, TimeoutException, InterruptedException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/gzip", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .get() + .build(); + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(200, response.code()); + } + + TEST_WRITER.waitForTraces(1); + List> traces = + TEST_WRITER.waitForSpans(1, span -> span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + Span span = trace.get(0); + Assertions.assertNull(TEST_WRITER.getAttributesMap(span).get("http.request.body")); + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(span) + .get("http.request.header." + REQUEST_HEADER_NAME) + .getStringValue()); + Assertions.assertEquals( + "{\"message\":\"Hello\",\"status\":\"success\"}", + TEST_WRITER.getAttributesMap(span).get("http.response.body").getStringValue()); + Assertions.assertEquals( + VertxWebServer.RESPONSE_HEADER_VALUE, + TEST_WRITER + .getAttributesMap(span) + .get("http.response.header." + VertxWebServer.RESPONSE_HEADER_NAME) + .getStringValue()); + } } diff --git a/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxWebServer.java b/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxWebServer.java index 6d93a00d6..bd8034800 100644 --- a/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxWebServer.java +++ b/instrumentation/vertx/vertx-web-3.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/vertx/VertxWebServer.java @@ -18,7 +18,13 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Router; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPOutputStream; import org.junit.jupiter.api.Assertions; public class VertxWebServer extends AbstractVerticle { @@ -79,6 +85,33 @@ public void start(Future startFuture) { ctx.response().putHeader(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); ctx.response().end(); }); + router + .route("/gzip") + .handler( + ctx -> { + JsonObject jsonResponse = + new JsonObject().put("message", "Hello").put("status", "success"); + + byte[] jsonBytes = jsonResponse.encode().getBytes(StandardCharsets.UTF_8); + + // Compress the bytes using GZIP + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOutputStream = + new GZIPOutputStream(byteArrayOutputStream)) { + gzipOutputStream.write(jsonBytes); + } catch (IOException e) { + ctx.fail(500); + return; + } + + // Convert the compressed bytes to a Vert.x Buffer + Buffer gzipBuffer = Buffer.buffer(byteArrayOutputStream.toByteArray()); + ctx.response().setStatusCode(200); + ctx.response().putHeader("Content-Encoding", "gzip"); + ctx.response().putHeader("Content-Type", "application/json"); + ctx.response().putHeader(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); + ctx.response().end(gzipBuffer); + }); vertx .createHttpServer() diff --git a/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/HypertraceSemanticAttributes.java b/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/HypertraceSemanticAttributes.java index 20b5fd171..2bc005349 100644 --- a/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/HypertraceSemanticAttributes.java +++ b/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/HypertraceSemanticAttributes.java @@ -51,6 +51,9 @@ public static AttributeKey httpResponseHeader(String header) { public static final AttributeKey HTTP_RESPONSE_HEADER_CONTENT_TYPE = AttributeKey.stringKey("http.response.header.content-type"); + public static final AttributeKey HTTP_RESPONSE_HEADER_CONTENT_ENCODING = + AttributeKey.stringKey("http.response.header.content-encoding"); + public static final AttributeKey RPC_REQUEST_BODY = AttributeKey.stringKey("rpc.request.body"); public static final AttributeKey RPC_RESPONSE_BODY = diff --git a/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/SpanAndBuffer.java b/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/SpanAndBuffer.java index 35732dc79..8e129e4ed 100644 --- a/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/SpanAndBuffer.java +++ b/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/SpanAndBuffer.java @@ -26,15 +26,18 @@ public class SpanAndBuffer { public final BoundedByteArrayOutputStream byteArrayBuffer; public final AttributeKey attributeKey; public final Charset charset; + public final String contentEncoding; public SpanAndBuffer( Span span, BoundedByteArrayOutputStream byteArrayBuffer, AttributeKey attributeKey, - Charset charset) { + Charset charset, + String contentEncoding) { this.span = span; this.byteArrayBuffer = byteArrayBuffer; this.attributeKey = attributeKey; this.charset = charset; + this.contentEncoding = contentEncoding; } } diff --git a/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/utils/ContentTypeCharsetUtils.java b/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/utils/ContentTypeCharsetUtils.java index cfece2e49..8e397bfb5 100644 --- a/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/utils/ContentTypeCharsetUtils.java +++ b/javaagent-core/src/main/java/org/hypertrace/agent/core/instrumentation/utils/ContentTypeCharsetUtils.java @@ -41,4 +41,8 @@ public static Charset toCharset(String charsetName) { } return DEFAULT_CHARSET; } + + public static Charset getDefaultCharset() { + return DEFAULT_CHARSET; + } } diff --git a/testing-common/src/main/java/org/hypertrace/agent/testing/AbstractHttpClientTest.java b/testing-common/src/main/java/org/hypertrace/agent/testing/AbstractHttpClientTest.java index d71cdd21f..d6b7fad75 100644 --- a/testing-common/src/main/java/org/hypertrace/agent/testing/AbstractHttpClientTest.java +++ b/testing-common/src/main/java/org/hypertrace/agent/testing/AbstractHttpClientTest.java @@ -39,7 +39,7 @@ public abstract class AbstractHttpClientTest extends AbstractInstrumenterTest { private static final String ECHO_PATH_FORMAT = "http://localhost:%d/echo"; private static final String GET_NO_CONTENT_PATH_FORMAT = "http://localhost:%d/get_no_content"; private static final String GET_JSON_PATH_FORMAT = "http://localhost:%d/get_json"; - + private static final String GET_GZIP_FORMAT = "http://localhost:%d/gzip"; private static final String HEADER_NAME = "headername"; private static final String HEADER_VALUE = "headerValue"; private static final Map headers; @@ -308,6 +308,56 @@ public void getJson() } } + @Test + public void getGzipResponse() + throws IOException, TimeoutException, InterruptedException, ExecutionException { + String uri = String.format(GET_GZIP_FORMAT, testHttpServer.port()); + Response response = doGetRequest(uri, headers); + + Assertions.assertEquals(200, response.statusCode); + + TEST_WRITER.waitForTraces(1); + List> traces; + if (hasResponseBodySpan) { + traces = + TEST_WRITER.waitForSpans( + 2, span -> span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER)); + } else { + traces = + TEST_WRITER.waitForSpans( + 1, + span -> + !span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT) + || span.getAttributesList().stream() + .noneMatch( + keyValue -> + keyValue.getKey().equals("http.url") + && keyValue.getValue().getStringValue().contains("/gzip"))); + } + + Assertions.assertEquals(1, traces.size()); + Span clientSpan = traces.get(0).get(0); + if (hasResponseBodySpan) { + Assertions.assertEquals(2, traces.get(0).size()); + Span responseBodySpan = traces.get(0).get(1); + if (traces.get(0).get(1).getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)) { + responseBodySpan = traces.get(0).get(0); + } + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(responseBodySpan).get("http.response.body").getStringValue(); + Assertions.assertEquals(TestHttpServer.GzipHandler.RESPONSE_BODY, respBodyCapturedInSpan); + } else { + Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body")); + + Assertions.assertEquals(1, traces.get(0).size()); + String respBodyCapturedInSpan = + TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue(); + Assertions.assertEquals(TestHttpServer.GzipHandler.RESPONSE_BODY, respBodyCapturedInSpan); + } + } + private void assertHeaders(Span span) { Assertions.assertEquals( TestHttpServer.RESPONSE_HEADER_VALUE, diff --git a/testing-common/src/main/java/org/hypertrace/agent/testing/TestHttpServer.java b/testing-common/src/main/java/org/hypertrace/agent/testing/TestHttpServer.java index 2ebb0f70b..7e49ab1c4 100644 --- a/testing-common/src/main/java/org/hypertrace/agent/testing/TestHttpServer.java +++ b/testing-common/src/main/java/org/hypertrace/agent/testing/TestHttpServer.java @@ -18,6 +18,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPOutputStream; import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -42,6 +44,7 @@ public void start() throws Exception { handlerList.addHandler(new PostHandler()); handlerList.addHandler(new PostRedirect()); handlerList.addHandler(new EchoHandler()); + handlerList.addHandler(new GzipHandler()); server.setHandler(handlerList); server.start(); } @@ -179,4 +182,37 @@ public void handle( } } } + + public static class GzipHandler extends ResponseTestHeadersHandler { + public static final String RESPONSE_BODY = "{\"message\": \"hello\"}"; + + @Override + public void handle( + String target, + Request baseRequest, + HttpServletRequest request, + HttpServletResponse response) + throws IOException { + super.handle(target, baseRequest, request, response); + + if (target.equals("/gzip") && "get".equalsIgnoreCase(request.getMethod())) { + response.setStatus(HttpServletResponse.SC_OK); + response.setHeader("Content-Encoding", "gzip"); + response.setContentType("application/json"); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(byteArrayOutputStream)) { + gzipOut.write(RESPONSE_BODY.getBytes(StandardCharsets.UTF_8)); + gzipOut.finish(); + } + + byte[] gzipData = byteArrayOutputStream.toByteArray(); + response.setContentLength(gzipData.length); + response.getOutputStream().write(gzipData); + response.getOutputStream().flush(); + + baseRequest.setHandled(true); + } + } + } }