Skip to content

Commit

Permalink
OkHTTP3 - fix response charset and adding support for gzip compression (
Browse files Browse the repository at this point in the history
#405)

* fix response charset and adding support for gzip compression

* adding tests

* adding support for other clients

* fixing apacheasync http client test

* disabling flaky tests
  • Loading branch information
thugrock7 authored Sep 10, 2024
1 parent 5813589 commit e16b79e
Show file tree
Hide file tree
Showing 49 changed files with 1,515 additions and 172 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright The Hypertrace Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.instrumentation.hypertrace.apachehttpasyncclient;

import io.opentelemetry.proto.trace.v1.Span;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPInputStream;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.hypertrace.agent.testing.AbstractInstrumenterTest;
import org.hypertrace.agent.testing.TestHttpServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class ApacheAsyncClientGzipHandlingTest extends AbstractInstrumenterTest {

private static final TestHttpServer testHttpServer = new TestHttpServer();

private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();

@BeforeAll
public static void startServer() throws Exception {
testHttpServer.start();
client.start();
}

@AfterAll
public static void closeServer() throws Exception {
testHttpServer.close();
}

@Test
public void getGzipResponse()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
HttpGet getRequest =
new HttpGet(String.format("http://localhost:%s/gzip", testHttpServer.port()));
getRequest.addHeader("foo", "bar");
Future<HttpResponse> futureResponse =
client.execute(
getRequest, new ApacheAsyncClientInstrumentationModuleTest.NoopFutureCallback());

HttpResponse response = futureResponse.get();
Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
try (InputStream gzipStream = new GZIPInputStream(response.getEntity().getContent())) {
String responseBody = readInputStream(gzipStream);
Assertions.assertEquals(TestHttpServer.GzipHandler.RESPONSE_BODY, responseBody);
}

TEST_WRITER.waitForTraces(1);
// exclude server spans
List<List<Span>> traces =
TEST_WRITER.waitForSpans(
2,
span ->
span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER)
|| span.getAttributesList().stream()
.noneMatch(
keyValue ->
keyValue.getKey().equals("http.response.header.content-encoding")
&& keyValue.getValue().getStringValue().contains("gzip")));
Assertions.assertEquals(1, traces.size());
Assertions.assertEquals(2, traces.get(0).size());
Span clientSpan = traces.get(0).get(1);
Span responseBodySpan = traces.get(0).get(0);
if (traces.get(0).get(0).getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)) {
clientSpan = traces.get(0).get(0);
responseBodySpan = traces.get(0).get(1);
}

Assertions.assertEquals(
"test-value",
TEST_WRITER
.getAttributesMap(clientSpan)
.get("http.response.header.test-response-header")
.getStringValue());
Assertions.assertEquals(
"bar",
TEST_WRITER.getAttributesMap(clientSpan).get("http.request.header.foo").getStringValue());
Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body"));

Assertions.assertEquals(
TestHttpServer.GzipHandler.RESPONSE_BODY,
TEST_WRITER.getAttributesMap(responseBodySpan).get("http.response.body").getStringValue());
}

private String readInputStream(InputStream inputStream) throws IOException {
StringBuilder textBuilder = new StringBuilder();

try (BufferedReader reader =
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
int c;
while ((c = reader.read()) != -1) {
textBuilder.append((char) c);
}
}
return textBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.javaagent.instrumentation.hypertrace.apachehttpclient.v4_0.ApacheHttpClientObjectRegistry.SpanAndAttributeKey;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;
import org.apache.http.Header;
import org.apache.http.HeaderIterator;
import org.apache.http.HttpEntity;
Expand Down Expand Up @@ -100,26 +103,32 @@ public static void traceEntity(
if (contentType == null || !ContentTypeUtils.shouldCapture(contentType.getValue())) {
return;
}

String charsetStr = ContentTypeUtils.parseCharset(contentType.getValue());
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);

// Get the content encoding header and check if it's gzip
Header contentEncoding = entity.getContentEncoding();
boolean isGzipEncoded =
contentEncoding != null
&& contentEncoding.getValue() != null
&& contentEncoding.getValue().toLowerCase().contains("gzip");
if (entity.isRepeatable()) {
try {
BoundedByteArrayOutputStream byteArrayOutputStream =
BoundedBuffersFactory.createStream(charset);
entity.writeTo(byteArrayOutputStream);

try {
String body = byteArrayOutputStream.toStringWithSuppliedCharset();
span.setAttribute(bodyAttributeKey, body);
} catch (UnsupportedEncodingException e) {
log.error("Could not parse charset from encoding {}", charsetStr, e);
InputStream contentStream = entity.getContent();
if (isGzipEncoded) {
try {
contentStream = new GZIPInputStream(contentStream);
} catch (IOException e) {
log.error("Failed to create GZIPInputStream", e);
return;
}
}

String body = readInputStream(contentStream, charset);
span.setAttribute(bodyAttributeKey, body);

} catch (IOException e) {
log.error("Could not read request input stream from repeatable request entity/body", e);
throw new RuntimeException(e);
}

return;
}

Expand All @@ -133,4 +142,18 @@ public static void traceEntity(
ApacheHttpClientObjectRegistry.entityToSpan.put(
entity, new SpanAndAttributeKey(span, bodyAttributeKey));
}

public static String readInputStream(InputStream inputStream, Charset charset)
throws IOException {
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
try (InputStreamReader reader = new InputStreamReader(inputStream, charset);
OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
int c;
while ((c = reader.read()) != -1) {
writer.write(c);
}
writer.flush();
}
return outputStream.toStringWithSuppliedCharset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,18 @@ public static void exit(@Advice.This HttpEntity thizz, @Advice.Return InputStrea
}
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);

String contentEncoding = null;
Header contentEncodingHeader = thizz.getContentEncoding();
if (contentEncodingHeader != null) {
contentEncoding = contentEncodingHeader.getValue();
}
SpanAndBuffer spanAndBuffer =
new SpanAndBuffer(
clientSpan.span,
BoundedBuffersFactory.createStream((int) contentSize, charset),
clientSpan.attributeKey,
charset);
charset,
contentEncoding);
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, spanAndBuffer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ public static void exit(@Advice.This InputStream thizz, @Advice.Return int avail
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(thizz, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.zip.GZIPInputStream;
import org.hypertrace.agent.core.instrumentation.HypertraceCallDepthThreadLocalMap;
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
import org.hypertrace.agent.core.instrumentation.SpanAndBuffer;
import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory;
import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,6 +91,13 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
spanBuilder.setAttribute(
"http.response.header.content-type", (String) resContentType);
}
Object resContentEncoding =
getAttribute.invoke(
span, HypertraceSemanticAttributes.HTTP_RESPONSE_HEADER_CONTENT_ENCODING);
if (resContentEncoding != null) {
spanBuilder.setAttribute(
"http.response.header.content-encoding", (String) resContentEncoding);
}
}
} catch (IllegalAccessException | InvocationTargetException e) {
// ignore and continue
Expand All @@ -100,12 +113,31 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
}

public static void addBody(
Span span, AttributeKey<String> attributeKey, ByteArrayOutputStream buffer, Charset charset) {
Span span,
AttributeKey<String> attributeKey,
ByteArrayOutputStream buffer,
Charset charset,
String contentEncoding) {
try {
String body = buffer.toString(charset.name());
InputStreamUtils.addAttribute(span, attributeKey, body);
byte[] data = buffer.toByteArray();

// if content-encoding is gzip,
if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) {
try (GZIPInputStream gzipInputStream =
new GZIPInputStream(new ByteArrayInputStream(data))) {
InputStreamReader reader = new InputStreamReader(gzipInputStream, charset);
String body = readInputStream(reader, charset);
InputStreamUtils.addAttribute(span, attributeKey, body);
}
} else {
// No decompression needed, convert directly to string
String body = new String(data, charset);
InputStreamUtils.addAttribute(span, attributeKey, body);
}
} catch (UnsupportedEncodingException e) {
log.error("Failed to parse encofing from charset {}", charset, e);
log.error("Failed to parse encoding from charset {}", charset, e);
} catch (IOException e) {
log.error("Failed to read or decompress data", e);
}
}

Expand All @@ -132,7 +164,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
}
}
Expand All @@ -146,7 +179,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, null);
}
}
Expand All @@ -166,7 +200,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
}
}
Expand Down Expand Up @@ -194,10 +229,24 @@ public static void readNBytes(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
} else {
spanAndBuffer.byteArrayBuffer.write(b, off, read);
}
}

public static String readInputStream(InputStreamReader inputReader, Charset charset)
throws IOException {
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
int c;
while ((c = inputReader.read()) != -1) {
writer.write(c);
}
writer.flush();
}
return outputStream.toStringWithSuppliedCharset();
}
}
Loading

0 comments on commit e16b79e

Please sign in to comment.