Skip to content

Commit

Permalink
Copy trace.id in threadcontext stash (elastic#83218)
Browse files Browse the repository at this point in the history
because of incorrect change in elastic#81381 trace.id value (set in RestController basted on
traceparent) was not copied when stashing a context.
This commit do not allow to copy traceparent but allows to copy trace.id when stashing
ThreadContext
  • Loading branch information
pgomulka authored Feb 1, 2022
1 parent 086c6e8 commit 4d9ad06
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 6 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/83218.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83218
summary: Copy `trace.id` in threadcontext stash
area: Infra/Core
type: bug
issues: []
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ protected Node(
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID_HTTP_HEADER, Task.TRACE_ID, Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER)
Task.HEADERS_TO_COPY.stream()
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(
settings,
Expand Down
7 changes: 2 additions & 5 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,14 @@ public class Task {
*/
public static final String X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER = "X-elastic-product-origin";

public static final Set<String> HEADERS_TO_COPY = Set.of(
X_OPAQUE_ID_HTTP_HEADER,
TRACE_PARENT_HTTP_HEADER,
X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER
);
/**
* Parsed part of traceparent. It is stored in thread context and emitted in logs.
* Has to be declared as a header copied over for tasks.
*/
public static final String TRACE_ID = "trace.id";

public static final Set<String> HEADERS_TO_COPY = Set.of(X_OPAQUE_ID_HTTP_HEADER, TRACE_ID, X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);

private final long id;

private final String type;
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -730,4 +730,8 @@ public void cancelTaskAndDescendants(CancellableTask task, String reason, boolea
throw new IllegalStateException("TaskCancellationService is not initialized");
}
}

public List<String> getTaskHeaders() {
return taskHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -690,6 +691,26 @@ public void testPutHeaders() {
assertEquals("value for key [foo] already present", e.getMessage());
}

public void testHeadersCopiedOnStash() {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);

try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
threadContext.putHeader(Task.X_OPAQUE_ID_HTTP_HEADER, "x-opaque-id");
threadContext.putHeader(Task.TRACE_ID, "0af7651916cd43dd8448eb211c80319c");
threadContext.putHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, "kibana");

try (ThreadContext.StoredContext ignored2 = threadContext.stashContext()) {
assertEquals("x-opaque-id", threadContext.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER));
assertEquals("0af7651916cd43dd8448eb211c80319c", threadContext.getHeader(Task.TRACE_ID));
assertEquals("kibana", threadContext.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER));
}

assertEquals("x-opaque-id", threadContext.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER));
assertEquals("0af7651916cd43dd8448eb211c80319c", threadContext.getHeader(Task.TRACE_ID));
assertEquals("kibana", threadContext.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER));
}
}

/**
* Sometimes wraps a Runnable in an AbstractRunnable.
*/
Expand Down
12 changes: 12 additions & 0 deletions server/src/test/java/org/elasticsearch/node/NodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RecoveryPlannerPlugin;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ContextParser;
import org.elasticsearch.xcontent.MediaType;
import org.elasticsearch.xcontent.NamedObjectNotFoundException;
Expand All @@ -60,6 +62,7 @@
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.NodeRoles.dataNode;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -347,6 +350,15 @@ public void testNodeFailsToStartWhenThereAreMultipleRecoveryPlannerPluginsLoaded
assertThat(exception.getMessage(), containsString("A single RecoveryPlannerPlugin was expected but got:"));
}

public void testHeadersToCopyInTaskManagerAreTheSameAsDeclaredInTask() throws IOException {
Settings.Builder settings = baseSettings();
try (Node node = new MockNode(settings.build(), basePlugins())) {
final TransportService transportService = node.injector().getInstance(TransportService.class);
final List<String> taskHeaders = transportService.getTaskManager().getTaskHeaders();
assertThat(taskHeaders, containsInAnyOrder(Task.HEADERS_TO_COPY.toArray(new String[] {})));
}
}

public static class MockRecoveryPlannerPlugin extends Plugin implements RecoveryPlannerPlugin {
public MockRecoveryPlannerPlugin() {}

Expand Down

0 comments on commit 4d9ad06

Please sign in to comment.