Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OkHTTP3 - fix response charset and adding support for gzip compression #405

Merged
merged 11 commits into from
Sep 10, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPInputStream;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
Expand Down Expand Up @@ -127,6 +127,51 @@ public void postJsonNonRepeatableEntity()
postJsonEntity(entity);
}

@Disabled("This is flaky !!")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this flaky? It should run properly always.

Copy link
Contributor Author

@thugrock7 thugrock7 Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test always passes when it alone is executed. When running in batch with other tests, the receiver is getting some unwanted spans and assertions being failed. I tried excluding them in TEST_WRITER.waitForSpans(count, exclude_predicate), but couldn't make it work with batch

@Test
public void getGzipResponse()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
HttpGet getRequest =
new HttpGet(String.format("http://localhost:%s/gzip", testHttpServer.port()));
getRequest.addHeader("foo", "bar");
Future<HttpResponse> futureResponse = client.execute(getRequest, new NoopFutureCallback());

HttpResponse response = futureResponse.get();
Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
try (InputStream gzipStream = new GZIPInputStream(response.getEntity().getContent())) {
String responseBody = readInputStream(gzipStream);
Assertions.assertEquals(TestHttpServer.GzipHandler.RESPONSE_BODY, responseBody);
}

TEST_WRITER.waitForTraces(1);
// exclude server spans
List<List<Span>> traces =
TEST_WRITER.waitForSpans(2, span -> span.getKind().equals(Span.SpanKind.SPAN_KIND_SERVER));
Assertions.assertEquals(1, traces.size());
Assertions.assertEquals(2, traces.get(0).size());
Span clientSpan = traces.get(0).get(1);
Span responseBodySpan = traces.get(0).get(0);
if (traces.get(0).get(0).getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)) {
clientSpan = traces.get(0).get(0);
responseBodySpan = traces.get(0).get(1);
}

Assertions.assertEquals(
"test-value",
TEST_WRITER
.getAttributesMap(clientSpan)
.get("http.response.header.test-response-header")
.getStringValue());
Assertions.assertEquals(
"bar",
TEST_WRITER.getAttributesMap(clientSpan).get("http.request.header.foo").getStringValue());
Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body"));

Assertions.assertEquals(
TestHttpServer.GzipHandler.RESPONSE_BODY,
TEST_WRITER.getAttributesMap(responseBodySpan).get("http.response.body").getStringValue());
}

public void postJsonEntity(HttpEntity entity)
throws TimeoutException, InterruptedException, IOException, ExecutionException {
HttpPost postRequest = new HttpPost();
Expand Down Expand Up @@ -165,8 +210,7 @@ private static String readInputStream(InputStream inputStream) throws IOExceptio
StringBuilder textBuilder = new StringBuilder();

try (BufferedReader reader =
new BufferedReader(
new InputStreamReader(inputStream, Charset.forName(StandardCharsets.UTF_8.name())))) {
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
int c;
while ((c = reader.read()) != -1) {
textBuilder.append((char) c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.javaagent.instrumentation.hypertrace.apachehttpclient.v4_0.ApacheHttpClientObjectRegistry.SpanAndAttributeKey;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;
import org.apache.http.Header;
import org.apache.http.HeaderIterator;
import org.apache.http.HttpEntity;
Expand Down Expand Up @@ -100,26 +103,32 @@ public static void traceEntity(
if (contentType == null || !ContentTypeUtils.shouldCapture(contentType.getValue())) {
return;
}

String charsetStr = ContentTypeUtils.parseCharset(contentType.getValue());
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);

// Get the content encoding header and check if it's gzip
Header contentEncoding = entity.getContentEncoding();
boolean isGzipEncoded =
contentEncoding != null
&& contentEncoding.getValue() != null
&& contentEncoding.getValue().toLowerCase().contains("gzip");
if (entity.isRepeatable()) {
try {
BoundedByteArrayOutputStream byteArrayOutputStream =
BoundedBuffersFactory.createStream(charset);
entity.writeTo(byteArrayOutputStream);

try {
String body = byteArrayOutputStream.toStringWithSuppliedCharset();
span.setAttribute(bodyAttributeKey, body);
} catch (UnsupportedEncodingException e) {
log.error("Could not parse charset from encoding {}", charsetStr, e);
InputStream contentStream = entity.getContent();
if (isGzipEncoded) {
try {
contentStream = new GZIPInputStream(contentStream);
} catch (IOException e) {
log.error("Failed to create GZIPInputStream", e);
return;
}
}

String body = readInputStream(contentStream, charset);
span.setAttribute(bodyAttributeKey, body);

} catch (IOException e) {
log.error("Could not read request input stream from repeatable request entity/body", e);
throw new RuntimeException(e);
}

return;
}

Expand All @@ -133,4 +142,18 @@ public static void traceEntity(
ApacheHttpClientObjectRegistry.entityToSpan.put(
entity, new SpanAndAttributeKey(span, bodyAttributeKey));
}

public static String readInputStream(InputStream inputStream, Charset charset)
throws IOException {
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
try (InputStreamReader reader = new InputStreamReader(inputStream, charset);
OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
int c;
while ((c = reader.read()) != -1) {
writer.write(c);
}
writer.flush();
}
return outputStream.toStringWithSuppliedCharset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,18 @@ public static void exit(@Advice.This HttpEntity thizz, @Advice.Return InputStrea
}
Charset charset = ContentTypeCharsetUtils.toCharset(charsetStr);

String contentEncoding = null;
Header contentEncodingHeader = thizz.getContentEncoding();
if (contentEncodingHeader != null) {
contentEncoding = contentEncodingHeader.getValue();
}
SpanAndBuffer spanAndBuffer =
new SpanAndBuffer(
clientSpan.span,
BoundedBuffersFactory.createStream((int) contentSize, charset),
clientSpan.attributeKey,
charset);
charset,
contentEncoding);
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, spanAndBuffer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ public static void exit(@Advice.This InputStream thizz, @Advice.Return int avail
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(thizz, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.zip.GZIPInputStream;
import org.hypertrace.agent.core.instrumentation.HypertraceCallDepthThreadLocalMap;
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
import org.hypertrace.agent.core.instrumentation.SpanAndBuffer;
import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory;
import org.hypertrace.agent.core.instrumentation.buffer.BoundedByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,6 +91,13 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
spanBuilder.setAttribute(
"http.response.header.content-type", (String) resContentType);
}
Object resContentEncoding =
getAttribute.invoke(
span, HypertraceSemanticAttributes.HTTP_RESPONSE_HEADER_CONTENT_ENCODING);
if (resContentEncoding != null) {
spanBuilder.setAttribute(
"http.response.header.content-encoding", (String) resContentEncoding);
}
}
} catch (IllegalAccessException | InvocationTargetException e) {
// ignore and continue
Expand All @@ -100,12 +113,31 @@ public static void addAttribute(Span span, AttributeKey<String> attributeKey, St
}

public static void addBody(
Span span, AttributeKey<String> attributeKey, ByteArrayOutputStream buffer, Charset charset) {
Span span,
AttributeKey<String> attributeKey,
ByteArrayOutputStream buffer,
Charset charset,
String contentEncoding) {
try {
String body = buffer.toString(charset.name());
InputStreamUtils.addAttribute(span, attributeKey, body);
byte[] data = buffer.toByteArray();

// if content-encoding is gzip,
if (contentEncoding != null && contentEncoding.toLowerCase().contains("gzip")) {
try (GZIPInputStream gzipInputStream =
new GZIPInputStream(new ByteArrayInputStream(data))) {
InputStreamReader reader = new InputStreamReader(gzipInputStream, charset);
String body = readInputStream(reader, charset);
InputStreamUtils.addAttribute(span, attributeKey, body);
}
} else {
// No decompression needed, convert directly to string
String body = new String(data, charset);
InputStreamUtils.addAttribute(span, attributeKey, body);
}
} catch (UnsupportedEncodingException e) {
log.error("Failed to parse encofing from charset {}", charset, e);
log.error("Failed to parse encoding from charset {}", charset, e);
} catch (IOException e) {
log.error("Failed to read or decompress data", e);
}
}

Expand All @@ -132,7 +164,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
}
}
Expand All @@ -146,7 +179,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
VirtualField.find(InputStream.class, SpanAndBuffer.class).set(inputStream, null);
}
}
Expand All @@ -166,7 +200,8 @@ public static void read(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
}
}
Expand Down Expand Up @@ -194,10 +229,24 @@ public static void readNBytes(
spanAndBuffer.span,
spanAndBuffer.attributeKey,
spanAndBuffer.byteArrayBuffer,
spanAndBuffer.charset);
spanAndBuffer.charset,
spanAndBuffer.contentEncoding);
contextStore.set(inputStream, null);
} else {
spanAndBuffer.byteArrayBuffer.write(b, off, read);
}
}

public static String readInputStream(InputStreamReader inputReader, Charset charset)
throws IOException {
BoundedByteArrayOutputStream outputStream = BoundedBuffersFactory.createStream(charset);
try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, charset)) {
int c;
while ((c = inputReader.read()) != -1) {
writer.write(c);
}
writer.flush();
}
return outputStream.toStringWithSuppliedCharset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.micronaut.http.client.annotation.Client;
import io.micronaut.test.annotation.MicronautTest;
import io.opentelemetry.proto.trace.v1.Span;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
Expand Down Expand Up @@ -135,4 +136,41 @@ public void post() throws InterruptedException, TimeoutException {
TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body").getStringValue());
Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body"));
}

@Test
public void getGzipResponse() throws TimeoutException, InterruptedException, IOException {
String retrieve =
client
.toBlocking()
.retrieve(
HttpRequest.GET(String.format("http://localhost:%d/gzip", testHttpServer.port()))
.header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE));
Assertions.assertEquals("{\"message\": \"hello\"}", retrieve);

TEST_WRITER.waitForTraces(1);
List<List<Span>> traces =
TEST_WRITER.waitForSpans(
1,
span ->
!span.getKind().equals(Span.SpanKind.SPAN_KIND_CLIENT)
|| span.getAttributesList().stream()
.noneMatch(
keyValue ->
keyValue.getKey().equals("http.url")
&& keyValue.getValue().getStringValue().contains("/gzip")));
Assertions.assertEquals(1, traces.size());
Assertions.assertEquals(1, traces.get(0).size());
Span clientSpan = traces.get(0).get(0);
Assertions.assertEquals(
REQUEST_HEADER_VALUE,
TEST_WRITER
.getAttributesMap(clientSpan)
.get("http.request.header." + REQUEST_HEADER_NAME)
.getStringValue());
Assertions.assertNull(TEST_WRITER.getAttributesMap(clientSpan).get("http.request.body"));

String respBodyCapturedInSpan =
TEST_WRITER.getAttributesMap(clientSpan).get("http.response.body").getStringValue();
Assertions.assertEquals("{\"message\": \"hello\"}", respBodyCapturedInSpan);
}
}
Loading
Loading