Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] The thread context is not properly cleared and messes up the traces #10873

Merged
merged 3 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
- Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
- Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993))
- [BUG] Fix the thread context that is not properly cleared and messes up the traces ([#10873](https://github.com/opensearch-project/OpenSearch/pull/10873))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,23 @@ private DefaultSpanScope(Span span, SpanScope previousSpanScope, TracerContextSt
public static SpanScope create(Span span, TracerContextStorage<String, Span> tracerContextStorage) {
final SpanScope beforeSpanScope = spanScopeThreadLocal.get();
SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope, tracerContextStorage);
spanScopeThreadLocal.set(newSpanScope);
reta marked this conversation as resolved.
Show resolved Hide resolved
return newSpanScope;
}

@Override
public void close() {
detach();
spanScopeThreadLocal.set(previousSpanScope);
}

@Override
public SpanScope attach() {
spanScopeThreadLocal.set(this);
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span);
return this;
}

private void detach() {
spanScopeThreadLocal.set(previousSpanScope);
if (previousSpanScope != null) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, previousSpanScope.getSpan());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -53,7 +53,6 @@ public Span startSpan(SpanCreationContext context) {
parentSpan = getCurrentSpanInternal();
}
Span span = createSpan(context, parentSpan);
setCurrentSpanInContext(span);
addDefaultAttributes(span);
return span;
}
Expand Down Expand Up @@ -94,10 +93,6 @@ private Span createSpan(SpanCreationContext spanCreationContext, Span parentSpan
return tracingTelemetry.createSpan(spanCreationContext, parentSpan);
}

private void setCurrentSpanInContext(Span span) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, span);
}

/**
* Adds default attributes in the span
* @param span the current active span
Expand All @@ -107,7 +102,7 @@ protected void addDefaultAttributes(Span span) {
}

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> headers) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.http.HttpTracer;
import org.opensearch.telemetry.tracing.transport.TransportTracer;

import java.io.Closeable;

Expand All @@ -22,7 +22,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
public interface Tracer extends HttpTracer, Closeable {
public interface Tracer extends TransportTracer, Closeable {
/**
* Starts the {@link Span} with given {@link SpanCreationContext}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand All @@ -36,7 +36,7 @@ public interface TracingContextPropagator {
* @param headers request headers to extract the context from
* @return current span
*/
Optional<Span> extractFromHeaders(Map<String, List<String>> headers);
Optional<Span> extractFromHeaders(Map<String, Collection<String>> headers);

/**
* Injects tracing context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;

import java.util.List;
import java.util.Collection;
import java.util.Map;

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ public void close() {
}

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> header) {
return NoopSpan.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,31 @@
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.http;
package org.opensearch.telemetry.tracing.transport;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanCreationContext;

import java.util.List;
import java.util.Collection;
import java.util.Map;

/**
* HttpTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HttpRequest header and propagate the span accordingly.
* TransportTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HTTP or TCP transport headers and propagate the span accordingly.
* <p>
* All methods on the Tracer object are multi-thread safe.
*
* @opensearch.experimental
*/
@ExperimentalApi
public interface HttpTracer {
public interface TransportTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanCreationContext span name.
* @param header http request header.
* @return span.
* @param headers transport headers
* @return the span instance
*/
Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header);
Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
*/

/**
* Contains No-op implementations
* Contains HTTP or TCP transport related tracer capabilities
*/
package org.opensearch.telemetry.tracing.http;
package org.opensearch.telemetry.tracing.transport;
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -35,7 +37,6 @@ public class DefaultTracerTests extends OpenSearchTestCase {
private Span mockSpan;
private Span mockParentSpan;

private SpanScope mockSpanScope;
private ThreadPool threadPool;
private ExecutorService executorService;
private SpanCreationContext spanCreationContext;
Expand Down Expand Up @@ -102,11 +103,11 @@ public void testCreateSpanWithAttributes() {

Span span = defaultTracer.startSpan(spanCreationContext);

assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(1.0, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key1"));
assertEquals(2l, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key2"));
assertEquals(true, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key3"));
assertEquals("key4", ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key4"));
assertThat(defaultTracer.getCurrentSpan(), is(nullValue()));
assertEquals(1.0, ((MockSpan) span).getAttribute("key1"));
assertEquals(2l, ((MockSpan) span).getAttribute("key2"));
assertEquals(true, ((MockSpan) span).getAttribute("key3"));
assertEquals("key4", ((MockSpan) span).getAttribute("key4"));
span.endSpan();
}

Expand All @@ -121,16 +122,18 @@ public void testCreateSpanWithParent() {

Span span = defaultTracer.startSpan(spanCreationContext, null);

SpanContext parentSpan = defaultTracer.getCurrentSpan();

SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan());
try (final SpanScope scope = defaultTracer.withSpanInScope(span)) {
SpanContext parentSpan = defaultTracer.getCurrentSpan();

Span span1 = defaultTracer.startSpan(spanCreationContext1);
SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan());

assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
span1.endSpan();
span.endSpan();
try (final ScopedSpan span1 = defaultTracer.startScopedSpan(spanCreationContext1)) {
assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
}
} finally {
span.endSpan();
}
}

@SuppressWarnings("unchecked")
Expand All @@ -155,8 +158,7 @@ public void testCreateSpanWithNullParent() {

Span span = defaultTracer.startSpan(spanCreationContext);

assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan());
assertThat(defaultTracer.getCurrentSpan(), is(nullValue()));
span.endSpan();
}

Expand Down Expand Up @@ -403,7 +405,6 @@ private void setupMocks() {
mockTracingTelemetry = mock(TracingTelemetry.class);
mockSpan = mock(Span.class);
mockParentSpan = mock(Span.class);
mockSpanScope = mock(SpanScope.class);
mockTracerContextStorage = mock(TracerContextStorage.class);
when(mockSpan.getSpanName()).thenReturn("span_name");
when(mockSpan.getSpanId()).thenReturn("span_id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,22 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {

// Create Index and ingest data
String indexName = "test-index-11";
Settings basicSettings = Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0).build();
Settings basicSettings = Settings.builder()
.put("number_of_shards", 2)
.put("number_of_replicas", 0)
.put("index.routing.allocation.total_shards_per_node", 1)
.build();
createIndex(indexName, basicSettings);
indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well"));
indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field2", "another fox did the same."));

indexRandom(false, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well"));
indexRandom(false, client.prepareIndex(indexName).setId("2").setSource("field2", "another fox did the same."));

ensureGreen();
refresh();

// Make the search calls; adding the searchType and PreFilterShardSize to make the query path predictable across all the runs.
client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("fox")).get();
client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("jumps")).get();

ensureGreen();
refresh();
client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("fox")).get();
client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("jumps")).get();

// Sleep for about 3s to wait for traces are published, delay is (the delay is 1s).
Thread.sleep(3000);
Expand All @@ -88,8 +90,10 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {
)
);

// See please https://github.com/opensearch-project/OpenSearch/issues/10291 till local transport is not instrumented,
// capturing only the inter-nodes transport actions.
InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE;
validators.validate(exporter.getFinishedSpanItems(), 6);
validators.validate(exporter.getFinishedSpanItems(), 4);
}

private static void updateTelemetrySetting(Client client, boolean value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

import org.opensearch.core.common.Strings;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -51,7 +51,7 @@ private static OTelPropagatedSpan getPropagatedSpan(Context context) {
}

@Override
public Optional<Span> extractFromHeaders(Map<String, List<String>> headers) {
public Optional<Span> extractFromHeaders(Map<String, Collection<String>> headers) {
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER);
return Optional.ofNullable(getPropagatedSpan(context));
}
Expand Down Expand Up @@ -87,9 +87,9 @@ public String get(Map<String, String> headers, String key) {
}
};

private static final TextMapGetter<Map<String, List<String>>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() {
private static final TextMapGetter<Map<String, Collection<String>>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() {
@Override
public Iterable<String> keys(Map<String, List<String>> headers) {
public Iterable<String> keys(Map<String, Collection<String>> headers) {
if (headers != null) {
return headers.keySet();
} else {
Expand All @@ -98,7 +98,7 @@ public Iterable<String> keys(Map<String, List<String>> headers) {
}

@Override
public String get(Map<String, List<String>> headers, String key) {
public String get(Map<String, Collection<String>> headers, String key) {
if (headers != null && headers.containsKey(key)) {
return Strings.collectionToCommaDelimitedString(headers.get(key));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.test.OpenSearchTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.opentelemetry.api.OpenTelemetry;
Expand Down Expand Up @@ -57,7 +57,7 @@ public void testExtractTracerContextFromHeader() {
}

public void testExtractTracerContextFromHttpHeader() {
Map<String, List<String>> requestHeaders = new HashMap<>();
Map<String, Collection<String>> requestHeaders = new HashMap<>();
requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00"));
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@
import java.nio.channels.CancelledKeyException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -362,7 +364,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) {
* @param httpChannel that received the http request
*/
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), httpRequest.getHeaders());
final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), extractHeaders(httpRequest.getHeaders()));
try (final SpanScope httpRequestSpanScope = tracer.withSpanInScope(span)) {
HttpChannel traceableHttpChannel = TraceableHttpChannel.create(httpChannel, span, tracer);
handleIncomingRequest(httpRequest, traceableHttpChannel, httpRequest.getInboundException());
Expand Down Expand Up @@ -483,4 +485,9 @@ private static ActionListener<Void> earlyResponseListener(HttpRequest request, H
return NO_OP;
}
}

@SuppressWarnings("unchecked")
private static <Values extends Collection<String>> Map<String, Collection<String>> extractHeaders(Map<String, Values> headers) {
return (Map<String, Collection<String>>) headers;
}
}
Loading
Loading