Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OkHTTP3 - fix response charset and adding support for gzip compression #405

Merged
merged 11 commits into from
Sep 10, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright The Hypertrace Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.instrumentation.hypertrace.apachehttpasyncclient;

import io.opentelemetry.proto.trace.v1.Span;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPInputStream;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.hypertrace.agent.testing.AbstractInstrumenterTest;
import org.hypertrace.agent.testing.TestHttpServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class ApacheAsyncClientGzipHandlingTest extends AbstractInstrumenterTest {

private static final TestHttpServer testHttpServer = new TestHttpServer();

private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();

@BeforeAll
public static void startServer() throws Exception {
testHttpServer.start();
client.start();
}

@AfterAll
public static void closeServer() throws Exception {
testHttpServer.close();
}

@Test
public void getGzipResponse()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
HttpGet getRequest =
new HttpGet(String.format("http://localhost:%s/gzip", testHttpServer.port()));
getRequest.addHeader("foo", "bar");
Future<HttpResponse> futureResponse =
client.execute(
getRequest, new ApacheAsyncClientInstrumentationModuleTest.NoopFutureCallback());

HttpResponse response = futureResponse.get();
Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
try (InputStream gzipStream = new GZIPInputStream(response.getEntity().getContent())) {
String responseBody = readInputStream(gzipStream);
Assertions.assertEquals(TestHttpServer.GzipHandler.RESPONSE_BODY, responseBody);
}

TEST_WRITER.waitForTraces(1);
// exclude server spans
List<List<Span>> traces =
TEST_WRITER.waitForSpans(
2,
span ->
span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER)
|| span.getAttributesList().stream()
.noneMatch(
keyValue ->
keyValue.getKey().equals("http.response.header.content-encoding")
&& keyValue.getValue().getStringValue().contains("gzip")));
Assertions.assertEquals(1, traces.size());
Assertions.assertEquals(2, traces.get(0).size());
Span clientSpan = traces.get(0).get(1);
Span responseBodySpan = traces.get(0).get(0);
if (traces.get(0).get(0).getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)) {
clientSpan = traces.get(0).get(0);
responseBodySpan = traces.get(0).get(1);
}

Assertions.assertEquals(
"test-value",
TEST_WRITER
.getAttributesMap(clientSpan)
.get("http.response.header.test-response-header")
.getStringValue());
Assertions.assertEquals(
"bar",
TEST_WRITER.getAttributesMap(clientSpan).get("http.request.header.foo").getStringValue());
Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body"));

Assertions.assertEquals(
TestHttpServer.GzipHandler.RESPONSE_BODY,
TEST_WRITER.getAttributesMap(responseBodySpan).get("http.response.body").getStringValue());
}

private String readInputStream(InputStream inputStream) throws IOException {
StringBuilder textBuilder = new StringBuilder();

try (BufferedReader reader =
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
int c;
while ((c = reader.read()) != -1) {
textBuilder.append((char) c);
}
}
return textBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.javaagent.instrumentation.hypertrace.apachehttpclient.v4_0.ApacheHttpClientObjectRegistry.SpanAndAttributeKey;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;
import org.apache.http.Header;
import org.apache.http.HeaderIterator;
import org.apache.http.HttpEntity;
Expand Down Expand Up @@ -100,26 +103,32 @@ public static void traceEntity(
if (contentType == null || !ContentTypeUtils.shouldCapture(contentType.getValue())) {
return;
}

String charsetStr = ContentTypeUtils.parseCharset(contentType.getValue());
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);

// Get the content encoding header and check if it's gzip
Header contentEncoding = entity.getContentEncoding();
boolean isGzipEncoded =
contentEncoding != null
&& contentEncoding.getValue() != null
&& contentEncoding.getValue().toLowerCase().contains("gzip");
if (entity.isRepeatable()) {
try {
BoundedByteArrayOutputStream byteArrayOutputStream =
BoundedBuffersFactory.createStream(charset);
entity.writeTo(byteArrayOutputStream);

try {
String body = byteArrayOutputStream.toStringWithSuppliedCharset();
span.setAttribute(bodyAttributeKey, body);
} catch (UnsupportedEncodingException e) {
log.error("Could not parse charset from encoding {}", charsetStr, e);
InputStream contentStream = entity.getContent();
if (isGzipEncoded) {
try {
contentStream = new GZIPInputStream(contentStream);
} catch (IOException e) {
log.error("Failed to create GZIPInputStream", e);
return;
}
}

String body = readInputStream(contentStream, charset);
span.setAttribute(bodyAttributeKey, body);

} catch (IOException e) {
log.error("Could not read request input stream from repeatable request entity/body", e);
throw new RuntimeException(e);
}

return;
}

Expand All @@ -133,4 +142,18 @@ public static void traceEntity(
ApacheHttpClientObjectRegistry.entityToSpan.put(
entity, new SpanAndAttributeKey(span, bodyAttributeKey));
}

public static String readInputStream(InputStream inputStream, Charset charset)
throws IOException {
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
try (InputStreamReader reader = new InputStreamReader(inputStream, charset);
OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
int c;
while ((c = reader.read()) != -1) {
writer.write(c);
}
writer.flush();
}
return outputStream.toStringWithSuppliedCharset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,18 @@ public static void exit(@Advice.This HttpEntity thizz, @Advice.Return InputStrea
}
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);

String contentEncoding = null;
Header contentEncodingHeader = thizz.getContentEncoding();
if (contentEncodingHeader != null) {
contentEncoding = contentEncodingHeader.getValue();
}
SpanAndBuffer spanAndBuffer =
new SpanAndBuffer(
clientSpan.span,
BoundedBuffersFactory.createStream((int) contentSize, charset),
clientSpan.attributeKey,
charset);
charset,
contentEncoding);
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, spanAndBuffer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ public static void exit(@Advice.This InputStream thizz, @Advice.Return int avail
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(thizz, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.zip.GZIPInputStream;
import org.hypertrace.agent.core.instrumentation.HypertraceCallDepthThreadLocalMap;
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
import org.hypertrace.agent.core.instrumentation.SpanAndBuffer;
import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory;
import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,6 +91,13 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
spanBuilder.setAttribute(
"http.response.header.content-type", (String) resContentType);
}
Object resContentEncoding =
getAttribute.invoke(
span, HypertraceSemanticAttributes.HTTP_RESPONSE_HEADER_CONTENT_ENCODING);
if (resContentEncoding != null) {
spanBuilder.setAttribute(
"http.response.header.content-encoding", (String) resContentEncoding);
}
}
} catch (IllegalAccessException | InvocationTargetException e) {
// ignore and continue
Expand All @@ -100,12 +113,31 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
}

public static void addBody(
Span span, AttributeKey<String> attributeKey, ByteArrayOutputStream buffer, Charset charset) {
Span span,
AttributeKey<String> attributeKey,
ByteArrayOutputStream buffer,
Charset charset,
String contentEncoding) {
try {
String body = buffer.toString(charset.name());
InputStreamUtils.addAttribute(span, attributeKey, body);
byte[] data = buffer.toByteArray();

// if content-encoding is gzip,
if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) {
try (GZIPInputStream gzipInputStream =
new GZIPInputStream(new ByteArrayInputStream(data))) {
InputStreamReader reader = new InputStreamReader(gzipInputStream, charset);
String body = readInputStream(reader, charset);
InputStreamUtils.addAttribute(span, attributeKey, body);
}
} else {
// No decompression needed, convert directly to string
String body = new String(data, charset);
InputStreamUtils.addAttribute(span, attributeKey, body);
}
} catch (UnsupportedEncodingException e) {
log.error("Failed to parse encofing from charset {}", charset, e);
log.error("Failed to parse encoding from charset {}", charset, e);
} catch (IOException e) {
log.error("Failed to read or decompress data", e);
}
}

Expand All @@ -132,7 +164,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
}
}
Expand All @@ -146,7 +179,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, null);
}
}
Expand All @@ -166,7 +200,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
}
}
Expand Down Expand Up @@ -194,10 +229,24 @@ public static void readNBytes(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
} else {
spanAndBuffer.byteArrayBuffer.write(b, off, read);
}
}

public static String readInputStream(InputStreamReader inputReader, Charset charset)
throws IOException {
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
int c;
while ((c = inputReader.read()) != -1) {
writer.write(c);
}
writer.flush();
}
return outputStream.toStringWithSuppliedCharset();
}
}
Loading
Loading