Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tracing Framework] Redefine telemetry context restoration and propagation #9617

Merged
Prev Previous commit
Next Next commit
Address review comment
Signed-off-by: Gagan Juneja <[email protected]>
Gagan Juneja committed Aug 31, 2023
commit 8805a1da809844723f49094f0e74201601c01aa9
Original file line number Diff line number Diff line change
@@ -17,34 +17,51 @@ public class DefaultSpanScope implements SpanScope {
private final Span span;
private final SpanScope previousSpanScope;
private static final ThreadLocal<SpanScope> spanScopeThreadLocal = new ThreadLocal<>();
private final TracerContextStorage<String, Span> tracerContextStorage;

/**
* Constructor
* @param span span
* @param previousSpanScope before attached span scope.
*/
private DefaultSpanScope(Span span, SpanScope previousSpanScope) {
private DefaultSpanScope(Span span, SpanScope previousSpanScope, TracerContextStorage<String, Span> tracerContextStorage) {
this.span = Objects.requireNonNull(span);
this.previousSpanScope = previousSpanScope;
this.tracerContextStorage = tracerContextStorage;
}

/**
* Creates the SpanScope object.
* @param span span.
* @return SpanScope spanScope
*/
public static SpanScope create(Span span) {
public static SpanScope create(Span span, TracerContextStorage<String, Span> tracerContextStorage) {
final SpanScope beforeSpanScope = spanScopeThreadLocal.get();
SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope);
SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope, tracerContextStorage);
spanScopeThreadLocal.set(newSpanScope);
return newSpanScope;
}

@Override
public void close() {
detach();
spanScopeThreadLocal.set(previousSpanScope);
}

@Override
public SpanScope attach() {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span);
return this;
}

private void detach() {
if (previousSpanScope != null) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, previousSpanScope.getSpan());
} else {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, null);
}
}

@Override
public Span getSpan() {
return span;
Original file line number Diff line number Diff line change
@@ -96,11 +96,21 @@ public ScopedSpan startScopedSpan(SpanCreationContext spanCreationContext, SpanC

@Override
public SpanScope withSpanInScope(Span span) {
return DefaultSpanScope.create(span);
return DefaultSpanScope.create(span, tracerContextStorage).attach();
}

private Span createSpan(String spanName, Span parentSpan, Attributes attributes) {
return tracingTelemetry.createSpan(spanName, parentSpan, attributes, span -> onSpanEnd(span, parentSpan));
return tracingTelemetry.createSpan(spanName, parentSpan, attributes, new SpanLifecycleListener() {
@Override
public void onStart(Span span) {
setCurrentSpanInContext(span);
}

@Override
public void onEnd(Span span) {
onSpanEnd(span, parentSpan);
}
});
}

private void onSpanEnd(Span span, Span parentSpan) {
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
reta marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing;

/**
* A listener for Span Lifecycle
*/
public interface SpanLifecycleListener {
/**
* On Span start
* @param span span
*/
void onStart(Span span);

/**
* On span end
* @param span span
*/
void onEnd(Span span);
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,12 @@ public interface SpanScope extends AutoCloseable {
@Override
void close();

/**
* Attaches span to the {@link SpanScope}
* @return spanScope
*/
SpanScope attach();

/**
* Returns span attached with the {@link SpanScope}
* @return span.
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@
import org.opensearch.telemetry.tracing.attributes.Attributes;

import java.io.Closeable;
import java.util.function.Consumer;

/**
* Interface for tracing telemetry providers
@@ -22,13 +21,14 @@ public interface TracingTelemetry extends Closeable {

/**
* Creates span with provided arguments
* @param spanName name of the span
* @param parentSpan span's parent span
* @param attributes attributes to be added.
* @param onSpanEndConsumer consumer to be invoked on span end.
*
* @param spanName name of the span
* @param parentSpan span's parent span
* @param attributes attributes to be added.
* @param spanLifecycleListener consumer to be invoked on span end.
* @return span instance
*/
Span createSpan(String spanName, Span parentSpan, Attributes attributes, Consumer<Span> onSpanEndConsumer);
Span createSpan(String spanName, Span parentSpan, Attributes attributes, SpanLifecycleListener spanLifecycleListener);

/**
* provides tracing context propagator
Original file line number Diff line number Diff line change
@@ -27,6 +27,11 @@ public void close() {

}

@Override
public SpanScope attach() {
return this;
}

@Override
public Span getSpan() {
return NoopSpan.INSTANCE;
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.node.Node;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.test.OpenSearchTestCase;
@@ -20,7 +21,6 @@
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -67,22 +67,20 @@ public void testCreateSpan() {
public void testCreateSpanWithAttributesWithMock() {
DefaultTracer defaultTracer = new DefaultTracer(mockTracingTelemetry, mockTracerContextStorage);
Attributes attributes = Attributes.create().addAttribute("name", "value");
when(mockTracingTelemetry.createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(Consumer.class))).thenReturn(
mockSpan
);
when(mockTracingTelemetry.createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(SpanLifecycleListener.class)))
.thenReturn(mockSpan);
defaultTracer.startSpan("span_name", attributes);
verify(mockTracingTelemetry).createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(Consumer.class));
verify(mockTracingTelemetry).createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(SpanLifecycleListener.class));
}

@SuppressWarnings("unchecked")
public void testCreateSpanWithAttributesWithParentMock() {
DefaultTracer defaultTracer = new DefaultTracer(mockTracingTelemetry, mockTracerContextStorage);
Attributes attributes = Attributes.create().addAttribute("name", "value");
when(mockTracingTelemetry.createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(Consumer.class))).thenReturn(
mockSpan
);
when(mockTracingTelemetry.createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(SpanLifecycleListener.class)))
.thenReturn(mockSpan);
defaultTracer.startSpan("span_name", new SpanContext(mockParentSpan), attributes);
verify(mockTracingTelemetry).createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(Consumer.class));
verify(mockTracingTelemetry).createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(SpanLifecycleListener.class));
verify(mockTracerContextStorage, never()).get(TracerContextStorage.CURRENT_SPAN);
}

@@ -94,7 +92,7 @@ public void testCreateSpanWithAttributes() {
new ThreadContextBasedTracerContextStorage(threadContext, tracingTelemetry)
);

defaultTracer.startSpan(
Span span = defaultTracer.startSpan(
"span_name",
Attributes.create().addAttribute("key1", 1.0).addAttribute("key2", 2l).addAttribute("key3", true).addAttribute("key4", "key4")
);
@@ -104,6 +102,7 @@ public void testCreateSpanWithAttributes() {
assertEquals(2l, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key2"));
assertEquals(true, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key3"));
assertEquals("key4", ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key4"));
span.endSpan();
}

public void testCreateSpanWithParent() {
@@ -113,25 +112,26 @@ public void testCreateSpanWithParent() {
new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry)
);

defaultTracer.startSpan("span_name", null);
Span span = defaultTracer.startSpan("span_name", null);

SpanContext parentSpan = defaultTracer.getCurrentSpan();

defaultTracer.startSpan("span_name_1", parentSpan, Attributes.EMPTY);
Span span1 = defaultTracer.startSpan("span_name_1", parentSpan, Attributes.EMPTY);

assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
span1.endSpan();
span.endSpan();
}

@SuppressWarnings("unchecked")
public void testCreateSpanWithContext() {
DefaultTracer defaultTracer = new DefaultTracer(mockTracingTelemetry, mockTracerContextStorage);
Attributes attributes = Attributes.create().addAttribute("name", "value");
when(mockTracingTelemetry.createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(Consumer.class))).thenReturn(
mockSpan
);
when(mockTracingTelemetry.createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(SpanLifecycleListener.class)))
.thenReturn(mockSpan);
defaultTracer.startSpan(new SpanCreationContext("span_name", attributes));
verify(mockTracingTelemetry).createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(Consumer.class));
verify(mockTracingTelemetry).createSpan(eq("span_name"), eq(mockParentSpan), eq(attributes), any(SpanLifecycleListener.class));
}

public void testCreateSpanWithNullParent() {
@@ -142,10 +142,11 @@ public void testCreateSpanWithNullParent() {
new ThreadContextBasedTracerContextStorage(threadContext, tracingTelemetry)
);

defaultTracer.startSpan("span_name", (SpanContext) null, Attributes.EMPTY);
Span span = defaultTracer.startSpan("span_name", (SpanContext) null, Attributes.EMPTY);

assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan());
span.endSpan();
}

public void testEndSpanByClosingScopedSpan() {
@@ -282,16 +283,51 @@ public void testSpanAcrossThreads() {

spanT2.endSpan();
spanScopeT2.close();
currentSpanRefAfterEndThread2.set(defaultTracer.getCurrentSpan() != null ? defaultTracer.getCurrentSpan().getSpan() : null);
currentSpanRefAfterEndThread2.set(getCurrentSpanFromContext(defaultTracer));
});
spanScope.close();
currentSpanRefThread1.set(defaultTracer.getCurrentSpan().getSpan());
currentSpanRefThread1.set(getCurrentSpanFromContext(defaultTracer));
});
assertEquals(spanT2Ref.get(), currentSpanRefThread2.get());
assertEquals(spanRef.get(), currentSpanRefAfterEndThread2.get());
assertEquals(null, currentSpanRefThread1.get());
}

public void testSpanCloseOnThread2() {
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
ThreadContextBasedTracerContextStorage spanTracerStorage = new ThreadContextBasedTracerContextStorage(
threadContext,
tracingTelemetry
);
AtomicReference<SpanContext> currentSpanRefThread1 = new AtomicReference<>();
AtomicReference<SpanContext> currentSpanRefThread2 = new AtomicReference<>();
DefaultTracer defaultTracer = new DefaultTracer(tracingTelemetry, spanTracerStorage);
final Span span = defaultTracer.startSpan(new SpanCreationContext("span_name_t1", Attributes.EMPTY));
try (SpanScope spanScope = defaultTracer.withSpanInScope(span)) {
executorService.execute(() -> async(new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean response) {
span.endSpan();
currentSpanRefThread2.set(defaultTracer.getCurrentSpan());
}

@Override
public void onFailure(Exception e) {

}
}));
currentSpanRefThread1.set(defaultTracer.getCurrentSpan());
}
assertEquals(null, currentSpanRefThread2.get());
assertEquals(span, currentSpanRefThread1.get().getSpan());
assertEquals(null, defaultTracer.getCurrentSpan());
}

private void async(ActionListener<Boolean> actionListener) {
actionListener.onResponse(true);
}

/**
* 1. CreateSpan in ThreadA (NotScopedSpan)
* 2. create Async task and pass the span
@@ -331,29 +367,32 @@ public void testSpanAcrossThreadsMultipleSpans() {
executorService.execute(() -> {
Span spanT2 = defaultTracer.startSpan(new SpanCreationContext("span_name_t_2", Attributes.EMPTY));
SpanScope spanScopeT2 = defaultTracer.withSpanInScope(spanT2);
spanT2Ref.set(spanT2);

Span spanT21 = defaultTracer.startSpan(new SpanCreationContext("span_name_t_2", Attributes.EMPTY));
SpanScope spanScopeT21 = defaultTracer.withSpanInScope(spanT2);

spanT2Ref.set(spanT21);
currentSpanRefThread2.set(defaultTracer.getCurrentSpan().getSpan());

spanT21.endSpan();
spanScopeT21.close();

spanT2.endSpan();
spanScopeT2.close();
currentSpanRefAfterEndThread2.set(defaultTracer.getCurrentSpan() != null ? defaultTracer.getCurrentSpan().getSpan() : null);
currentSpanRefAfterEndThread2.set(getCurrentSpanFromContext(defaultTracer));
});
spanScope.close();
parentSpanScope.close();
currentSpanRefThread1.set(defaultTracer.getCurrentSpan().getSpan());
currentSpanRefThread1.set(getCurrentSpanFromContext(defaultTracer));
});
assertEquals(spanT2Ref.get(), currentSpanRefThread2.get());
assertEquals(spanRef.get(), currentSpanRefAfterEndThread2.get());
assertEquals(null, currentSpanRefThread1.get());
}

Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
private static Span getCurrentSpanFromContext(DefaultTracer defaultTracer) {
return defaultTracer.getCurrentSpan() != null ? defaultTracer.getCurrentSpan().getSpan() : null;
}

public void testClose() throws IOException {
Tracer defaultTracer = new DefaultTracer(mockTracingTelemetry, mockTracerContextStorage);

@@ -376,8 +415,7 @@ private void setupMocks() {
when(mockParentSpan.getSpanId()).thenReturn("parent_span_id");
when(mockParentSpan.getTraceId()).thenReturn("trace_id");
when(mockTracerContextStorage.get(TracerContextStorage.CURRENT_SPAN)).thenReturn(mockParentSpan, mockSpan);
when(mockTracingTelemetry.createSpan(eq("span_name"), eq(mockParentSpan), any(Attributes.class), any(Consumer.class))).thenReturn(
mockSpan
);
when(mockTracingTelemetry.createSpan(eq("span_name"), eq(mockParentSpan), any(Attributes.class), any(SpanLifecycleListener.class)))
.thenReturn(mockSpan);
}
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,16 @@ public class OTelPropagatedSpan extends OTelSpan {
* @param span otel propagated span
*/
public OTelPropagatedSpan(io.opentelemetry.api.trace.Span span) {
super(null, span, null, a -> {});
super(null, span, null, new SpanLifecycleListener() {
@Override
public void onStart(Span span) {

}

@Override
public void onEnd(Span span) {

}
});
}
}
Loading