Skip to content

Commit

Permalink
Adding AwsUnsampledOnlySpanProcessor to export batches of unsampled s…
Browse files Browse the repository at this point in the history
…pans (#948)
  • Loading branch information
srprash authored Nov 12, 2024
1 parent ffc6afe commit 2f6490b
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ private AwsAttributeKeys() {}
static final AttributeKey<String> AWS_LAMBDA_RESOURCE_ID =
AttributeKey.stringKey("aws.lambda.resource_mapping.id");

static final AttributeKey<Boolean> AWS_TRACE_FLAG_SAMPLED =
AttributeKey.booleanKey("aws.trace.flag.sampled");

// use the same AWS Resource attribute name defined by OTel java auto-instr for aws_sdk_v_1_1
// TODO: all AWS specific attributes should be defined in semconv package and reused cross all
// otel packages. Related sim -
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;

/**
* {@link SpanProcessor} that only exports unsampled spans in a batch via a delegated @{link
* BatchSpanProcessor}. The processor also adds an attribute to each processed span to indicate that
* it was sampled or not.
*/
final class AwsUnsampledOnlySpanProcessor implements SpanProcessor {

private final SpanProcessor delegate;

AwsUnsampledOnlySpanProcessor(SpanProcessor delegate) {
this.delegate = delegate;
}

public static AwsUnsampledOnlySpanProcessorBuilder builder() {
return new AwsUnsampledOnlySpanProcessorBuilder();
}

@Override
public void onStart(Context parentContext, ReadWriteSpan span) {
if (!span.getSpanContext().isSampled()) {
span.setAttribute(AwsAttributeKeys.AWS_TRACE_FLAG_SAMPLED, false);
}
delegate.onStart(parentContext, span);
}

@Override
public void onEnd(ReadableSpan span) {
if (!span.getSpanContext().isSampled()) {
delegate.onEnd(span);
}
}

@Override
public boolean isStartRequired() {
return true;
}

@Override
public boolean isEndRequired() {
return true;
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}

@Override
public CompletableResultCode forceFlush() {
return delegate.forceFlush();
}

@Override
public void close() {
delegate.close();
}

// Visible for testing
SpanProcessor getDelegate() {
return delegate;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import static java.util.Objects.requireNonNull;

import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;

final class AwsUnsampledOnlySpanProcessorBuilder {

// Default exporter is OtlpUdpSpanExporter with unsampled payload prefix
private SpanExporter exporter =
new OtlpUdpSpanExporterBuilder()
.setPayloadSampleDecision(TracePayloadSampleDecision.UNSAMPLED)
.build();

public AwsUnsampledOnlySpanProcessorBuilder setSpanExporter(SpanExporter exporter) {
requireNonNull(exporter, "exporter cannot be null");
this.exporter = exporter;
return this;
}

public AwsUnsampledOnlySpanProcessor build() {
BatchSpanProcessor bsp =
BatchSpanProcessor.builder(exporter).setExportUnsampledSpans(true).build();
return new AwsUnsampledOnlySpanProcessor(bsp);
}

SpanExporter getSpanExporter() {
return exporter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.jupiter.api.Test;

public class AwsUnsampledOnlySpanProcessorTest {

@Test
public void testIsStartRequired() {
SpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
assertThat(processor.isStartRequired()).isTrue();
}

@Test
public void testIsEndRequired() {
SpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
assertThat(processor.isEndRequired()).isTrue();
}

@Test
public void testDefaultSpanProcessor() {
AwsUnsampledOnlySpanProcessorBuilder builder = AwsUnsampledOnlySpanProcessor.builder();
AwsUnsampledOnlySpanProcessor unsampledSP = builder.build();

assertThat(builder.getSpanExporter()).isInstanceOf(OtlpUdpSpanExporter.class);
SpanProcessor delegate = unsampledSP.getDelegate();
assertThat(delegate).isInstanceOf(BatchSpanProcessor.class);
BatchSpanProcessor delegateBsp = (BatchSpanProcessor) delegate;
String delegateBspString = delegateBsp.toString();
assertThat(delegateBspString)
.contains(
"spanExporter=software.amazon.opentelemetry.javaagent.providers.OtlpUdpSpanExporter");
assertThat(delegateBspString).contains("exportUnsampledSpans=true");
}

@Test
public void testSpanProcessorWithExporter() {
AwsUnsampledOnlySpanProcessorBuilder builder =
AwsUnsampledOnlySpanProcessor.builder().setSpanExporter(InMemorySpanExporter.create());
AwsUnsampledOnlySpanProcessor unsampledSP = builder.build();

assertThat(builder.getSpanExporter()).isInstanceOf(InMemorySpanExporter.class);
SpanProcessor delegate = unsampledSP.getDelegate();
assertThat(delegate).isInstanceOf(BatchSpanProcessor.class);
BatchSpanProcessor delegateBsp = (BatchSpanProcessor) delegate;
String delegateBspString = delegateBsp.toString();
assertThat(delegateBspString)
.contains("spanExporter=io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter");
assertThat(delegateBspString).contains("exportUnsampledSpans=true");
}

@Test
public void testStartAddsAttributeToSampledSpan() {
SpanContext mockSpanContext = mock(SpanContext.class);
when(mockSpanContext.isSampled()).thenReturn(true);
Context parentContextMock = mock(Context.class);
ReadWriteSpan spanMock = mock(ReadWriteSpan.class);
when(spanMock.getSpanContext()).thenReturn(mockSpanContext);

AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
processor.onStart(parentContextMock, spanMock);

// verify setAttribute was never called
verify(spanMock, never()).setAttribute(any(), anyBoolean());
}

@Test
public void testStartAddsAttributeToUnsampledSpan() {
SpanContext mockSpanContext = mock(SpanContext.class);
when(mockSpanContext.isSampled()).thenReturn(false);
Context parentContextMock = mock(Context.class);
ReadWriteSpan spanMock = mock(ReadWriteSpan.class);
when(spanMock.getSpanContext()).thenReturn(mockSpanContext);

AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
processor.onStart(parentContextMock, spanMock);

// verify setAttribute was called with the correct arguments
verify(spanMock, times(1)).setAttribute(AwsAttributeKeys.AWS_TRACE_FLAG_SAMPLED, false);
}

@Test
public void testExportsOnlyUnsampledSpans() {
SpanExporter mockExporter = mock(SpanExporter.class);
when(mockExporter.export(anyCollection())).thenReturn(CompletableResultCode.ofSuccess());

TestDelegateProcessor delegate = new TestDelegateProcessor();
AwsUnsampledOnlySpanProcessor processor = new AwsUnsampledOnlySpanProcessor(delegate);

// unsampled span
SpanContext mockSpanContextUnsampled = mock(SpanContext.class);
when(mockSpanContextUnsampled.isSampled()).thenReturn(false);
ReadableSpan mockSpanUnsampled = mock(ReadableSpan.class);
when(mockSpanUnsampled.getSpanContext()).thenReturn(mockSpanContextUnsampled);

// sampled span
SpanContext mockSpanContextSampled = mock(SpanContext.class);
when(mockSpanContextSampled.isSampled()).thenReturn(true);
ReadableSpan mockSpanSampled = mock(ReadableSpan.class);
when(mockSpanSampled.getSpanContext()).thenReturn(mockSpanContextSampled);

processor.onEnd(mockSpanSampled);
processor.onEnd(mockSpanUnsampled);

// validate that only the unsampled span was delegated
assertThat(delegate.getEndedSpans()).containsExactly(mockSpanUnsampled);
}

private static class TestDelegateProcessor implements SpanProcessor {
// keep a queue of Readable spans added when onEnd is called
Collection<ReadableSpan> endedSpans = new ArrayList<>();

@Override
public void onStart(Context parentContext, ReadWriteSpan span) {}

@Override
public boolean isStartRequired() {
return false;
}

@Override
public void onEnd(ReadableSpan span) {
endedSpans.add(span);
}

@Override
public boolean isEndRequired() {
return false;
}

public Collection<ReadableSpan> getEndedSpans() {
return endedSpans;
}
}
}

0 comments on commit 2f6490b

Please sign in to comment.