Skip to content

Commit

Permalink
Adding UdpExporter for Otlp spans (#944)
Browse files Browse the repository at this point in the history
  • Loading branch information
srprash authored Nov 7, 2024
1 parent 61164e8 commit ffc6afe
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 0 deletions.
3 changes: 3 additions & 0 deletions awsagentprovider/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ dependencies {
implementation("com.amazonaws:aws-java-sdk-core:1.12.773")
// Export configuration
compileOnly("io.opentelemetry:opentelemetry-exporter-otlp")
// For Udp emitter
compileOnly("io.opentelemetry:opentelemetry-exporter-otlp-common")

testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("io.opentelemetry:opentelemetry-extension-aws")
testImplementation("io.opentelemetry:opentelemetry-extension-trace-propagators")
testImplementation("com.google.guava:guava")
testRuntimeOnly("io.opentelemetry:opentelemetry-exporter-otlp-common")

compileOnly("com.google.code.findbugs:jsr305:3.0.2")
testImplementation("org.mockito:mockito-core:5.3.1")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.Immutable;

/**
* Exports spans via UDP, using OpenTelemetry's protobuf model. The protobuf modelled spans are
* Base64 encoded and prefixed with AWS X-Ray specific information before being sent over to {@link
* UdpSender}.
*
* <p>This exporter is NOT meant for generic use since the payload is prefixed with AWS X-Ray
* specific information.
*/
@Immutable
class OtlpUdpSpanExporter implements SpanExporter {

private static final Logger logger = Logger.getLogger(OtlpUdpSpanExporter.class.getName());

private final AtomicBoolean isShutdown = new AtomicBoolean();

private final UdpSender sender;
private final String payloadPrefix;

OtlpUdpSpanExporter(UdpSender sender, String payloadPrefix) {
this.sender = sender;
this.payloadPrefix = payloadPrefix;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
exportRequest.writeBinaryTo(baos);
String payload = payloadPrefix + Base64.getEncoder().encodeToString(baos.toByteArray());
sender.send(payload.getBytes(StandardCharsets.UTF_8));
return CompletableResultCode.ofSuccess();
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to export spans. Error: " + e.getMessage(), e);
return CompletableResultCode.ofFailure();
}
}

@Override
public CompletableResultCode flush() {
// TODO: implement
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return sender.shutdown();
}

// Visible for testing
UdpSender getSender() {
return sender;
}

// Visible for testing
String getPayloadPrefix() {
return payloadPrefix;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import static java.util.Objects.requireNonNull;

final class OtlpUdpSpanExporterBuilder {

private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 2000;

// The protocol header and delimiter is required for sending data to X-Ray Daemon or when running
// in Lambda.
// https://docs.aws.amazon.com/xray/latest/devguide/xray-api-sendingdata.html#xray-api-daemon
private static final String PROTOCOL_HEADER = "{\"format\": \"json\", \"version\": 1}";
private static final char PROTOCOL_DELIMITER = '\n';

// These prefixes help the backend identify if the spans payload is sampled or not.
private static final String FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S";
private static final String FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U";

private UdpSender sender;
private String tracePayloadPrefix = FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX;

public OtlpUdpSpanExporterBuilder setEndpoint(String endpoint) {
requireNonNull(endpoint, "endpoint must not be null");
try {
String[] parts = endpoint.split(":");
String host = parts[0];
int port = Integer.parseInt(parts[1]);
this.sender = new UdpSender(host, port);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid endpoint, must be a valid URL: " + endpoint, e);
}
return this;
}

public OtlpUdpSpanExporterBuilder setPayloadSampleDecision(TracePayloadSampleDecision decision) {
this.tracePayloadPrefix =
decision == TracePayloadSampleDecision.SAMPLED
? FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
: FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX;
return this;
}

public OtlpUdpSpanExporter build() {
if (sender == null) {
this.sender = new UdpSender(DEFAULT_HOST, DEFAULT_PORT);
}
return new OtlpUdpSpanExporter(
this.sender, PROTOCOL_HEADER + PROTOCOL_DELIMITER + tracePayloadPrefix);
}

// Only for testing
OtlpUdpSpanExporterBuilder setSender(UdpSender sender) {
this.sender = sender;
return this;
}
}

enum TracePayloadSampleDecision {
SAMPLED,
UNSAMPLED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* This class represents a UDP sender that sends data to a specified endpoint. It is used to send
* data to a remote host and port using UDP protocol.
*/
class UdpSender {
private static final Logger logger = Logger.getLogger(UdpSender.class.getName());

private DatagramSocket socket;
private final InetSocketAddress endpoint;

public UdpSender(String host, int port) {
this.endpoint = new InetSocketAddress(host, port);
try {
this.socket = new DatagramSocket();
} catch (SocketException e) {
logger.log(Level.SEVERE, "Exception while instantiating UdpSender socket.", e);
}
}

public CompletableResultCode shutdown() {
try {
if (socket == null) {
return CompletableResultCode.ofSuccess();
}
socket.close();
return CompletableResultCode.ofSuccess();
} catch (Exception e) {
logger.log(Level.SEVERE, "Exception while closing UdpSender socket.", e);
return CompletableResultCode.ofFailure();
}
}

public void send(byte[] data) {
if (socket == null) {
logger.log(Level.WARNING, "UdpSender socket is null. Cannot send data.");
return;
}
DatagramPacket packet = new DatagramPacket(data, data.length, endpoint);
try {
socket.send(packet);
} catch (IOException e) {
logger.log(Level.SEVERE, "Exception while sending data.", e);
}
}

// Visible for testing
InetSocketAddress getEndpoint() {
return endpoint;
}
}
Loading

0 comments on commit ffc6afe

Please sign in to comment.