From ec73bef3121e64f8d3f5b598c1267cf301b17438 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Mon, 19 Feb 2024 14:52:58 -0800 Subject: [PATCH] Refactor search phase tracing instrumentation to detach span creation from the SearchRequestOperationsListener Signed-off-by: Andriy Redko --- .../telemetry/tracing/SpanContext.java | 15 ++++++++++ .../tracing/SpanCreationContext.java | 4 +-- .../search/AbstractSearchAsyncAction.java | 12 ++++++-- .../search/CanMatchPreFilterSearchPhase.java | 7 +++-- .../SearchDfsQueryThenFetchAsyncAction.java | 7 +++-- .../SearchQueryThenFetchAsyncAction.java | 7 +++-- .../action/search/TransportSearchAction.java | 12 +++++--- ...ceableSearchRequestOperationsListener.java | 28 ++++++++----------- .../AbstractSearchAsyncActionTests.java | 12 +++++--- .../CanMatchPreFilterSearchPhaseTests.java | 27 ++++++++++++------ .../action/search/SearchAsyncActionTests.java | 16 +++++++---- .../SearchQueryThenFetchAsyncActionTests.java | 4 ++- 12 files changed, 101 insertions(+), 50 deletions(-) diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanContext.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanContext.java index f9af611553aff..e5e62c795e5d0 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanContext.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanContext.java @@ -31,4 +31,19 @@ public SpanContext(Span span) { Span getSpan() { return span; } + + /** + * Sets the error for the current span behind this context + * @param cause error + */ + public void setError(final Exception cause) { + span.setError(cause); + } + + /** + * Ends current span + */ + public void endSpan() { + span.endSpan(); + } } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanCreationContext.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanCreationContext.java index cbbcfe7a85d57..6af7c440f8de9 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanCreationContext.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanCreationContext.java @@ -79,8 +79,8 @@ public SpanCreationContext attributes(Attributes attributes) { } /** - * Sets the parent for spann - * @param parent parent + * Sets the parent for span + * @param parent parent span context * @return spanCreationContext */ public SpanCreationContext parent(SpanContext parent) { diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index dfe85f73b43c2..635c001b121b6 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -58,6 +58,10 @@ import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.pipeline.PipelinedRequest; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanCreationContext; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.Transport; import java.util.ArrayDeque; @@ -116,6 +120,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode = new ConcurrentHashMap<>(); private final boolean throttleConcurrentRequests; private final SearchRequestContext searchRequestContext; + private final Tracer tracer; private SearchPhase currentPhase; private boolean currentPhaseHasLifecycle; @@ -140,7 +145,8 @@ abstract class AbstractSearchAsyncAction exten SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, SearchResponse.Clusters clusters, - SearchRequestContext searchRequestContext + SearchRequestContext searchRequestContext, + Tracer tracer ) { super(name); final List toSkipIterators = new ArrayList<>(); @@ -177,6 +183,7 @@ abstract class AbstractSearchAsyncAction exten this.results = resultConsumer; this.clusters = clusters; this.searchRequestContext = searchRequestContext; + this.tracer = tracer; } @Override @@ -455,7 +462,8 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) { } private void executePhase(SearchPhase phase) { - try { + final Span phaseSpan = tracer.startSpan(SpanCreationContext.server().name("[phase/" + phase.getName() + "]")); + try (final SpanScope scope = tracer.withSpanInScope(phaseSpan)) { onPhaseStart(phase); phase.recordAndRun(); } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java index c693eea4a2c33..952d83b9e4539 100644 --- a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java @@ -44,6 +44,7 @@ import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.MinAndMax; import org.opensearch.search.sort.SortOrder; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.Transport; import java.util.Comparator; @@ -91,7 +92,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction, SearchPhase> phaseFactory, SearchResponse.Clusters clusters, - SearchRequestContext searchRequestContext + SearchRequestContext searchRequestContext, + Tracer tracer ) { // We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests super( @@ -112,7 +114,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters, - searchRequestContext + searchRequestContext, + tracer ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; this.searchPhaseController = searchPhaseController; diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java index c26bd5eef8c15..c8ab5fdaf61a1 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -43,6 +43,7 @@ import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.Transport; import java.util.Map; @@ -82,7 +83,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction asyncSearchAction( new ArraySearchPhaseResults<>(shardsIts.size()), searchRequest.getMaxConcurrentShardRequests(), clusters, - searchRequestContext + searchRequestContext, + tracer ) { @Override protected void executePhaseOnShard( @@ -1258,7 +1259,8 @@ private AbstractSearchAsyncAction searchAsyncAction ) ), clusters, - searchRequestContext + searchRequestContext, + tracer ); } else { final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults( @@ -1289,7 +1291,8 @@ private AbstractSearchAsyncAction searchAsyncAction clusterState, task, clusters, - searchRequestContext + searchRequestContext, + tracer ); break; case QUERY_THEN_FETCH: @@ -1310,7 +1313,8 @@ private AbstractSearchAsyncAction searchAsyncAction clusterState, task, clusters, - searchRequestContext + searchRequestContext, + tracer ); break; default: diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java index 8c39dfa68682c..71fb59194c447 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java @@ -13,13 +13,9 @@ import org.opensearch.action.search.SearchRequestOperationsListener; import org.opensearch.telemetry.tracing.AttributeNames; import org.opensearch.telemetry.tracing.Span; -import org.opensearch.telemetry.tracing.SpanBuilder; import org.opensearch.telemetry.tracing.SpanContext; -import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import static org.opensearch.core.common.Strings.capitalize; - /** * SearchRequestOperationsListener subscriber for search request tracing * @@ -28,14 +24,12 @@ public final class TraceableSearchRequestOperationsListener extends SearchRequestOperationsListener { private final Tracer tracer; private final Span requestSpan; - private Span phaseSpan; - private SpanScope phaseSpanScope; + private SpanContext phaseSpanContext; public TraceableSearchRequestOperationsListener(final Tracer tracer, final Span requestSpan) { this.tracer = tracer; this.requestSpan = requestSpan; - this.phaseSpan = null; - this.phaseSpanScope = null; + this.phaseSpanContext = null; } public static SearchRequestOperationsListener create(final Tracer tracer, final Span requestSpan) { @@ -48,23 +42,23 @@ public static SearchRequestOperationsListener create(final Tracer tracer, final @Override protected void onPhaseStart(SearchPhaseContext context) { - assert phaseSpan == null : "There should be only one search phase active at a time"; - phaseSpan = tracer.startSpan(SpanBuilder.from(capitalize(context.getCurrentPhase().getName()), new SpanContext(requestSpan))); + assert phaseSpanContext == null : "There should be only one search phase active at a time"; + phaseSpanContext = tracer.getCurrentSpan(); } @Override protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { - assert phaseSpan != null : "There should be a search phase active at that time"; - phaseSpan.endSpan(); - phaseSpan = null; + assert phaseSpanContext != null : "There should be a search phase active at that time"; + phaseSpanContext.endSpan(); + phaseSpanContext = null; } @Override protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) { - assert phaseSpan != null : "There should be a search phase active at that time"; - phaseSpan.setError(new Exception(cause)); - phaseSpan.endSpan(); - phaseSpan = null; + assert phaseSpanContext != null : "There should be a search phase active at that time"; + phaseSpanContext.setError((Exception) cause); + phaseSpanContext.endSpan(); + phaseSpanContext = null; } @Override diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 601aa9dc1856e..15dddc6640e77 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -58,6 +58,7 @@ import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -122,7 +123,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc } @Override - protected void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) { assertThat(phase, is(context.getCurrentPhase())); phase = null; } @@ -205,7 +206,8 @@ private AbstractSearchAsyncAction createAction( new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), request - ) + ), + NoopTracer.INSTANCE ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { @@ -746,7 +748,8 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger), searchRequest - ) + ), + NoopTracer.INSTANCE ); } @@ -799,7 +802,8 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger), searchRequest - ) + ), + NoopTracer.INSTANCE ) { @Override ShardSearchFailure[] buildShardFailures() { diff --git a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 30fc50f91dabd..7d16ee335ea93 100644 --- a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -55,6 +55,7 @@ import org.opensearch.search.sort.MinAndMax; import org.opensearch.search.sort.SortBuilders; import org.opensearch.search.sort.SortOrder; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; @@ -108,7 +109,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc } @Override - protected void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) { assertThat(phases.contains(context.getCurrentPhase()), is(true)); phases.remove(context.getCurrentPhase()); } @@ -190,7 +191,8 @@ public void run() throws IOException { new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), searchRequest - ) + ), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -286,7 +288,8 @@ public void run() throws IOException { new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), searchRequest - ) + ), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -380,7 +383,8 @@ public void sendCanMatch( new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), searchRequest - ) + ), + NoopTracer.INSTANCE ) { @Override @@ -411,7 +415,8 @@ protected void executePhaseOnShard( new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), searchRequest - ) + ), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -501,7 +506,8 @@ public void run() { new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), searchRequest - ) + ), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -606,7 +612,8 @@ public void run() { new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), searchRequest - ) + ), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -730,7 +737,8 @@ public void run() { }; }, SearchResponse.Clusters.EMPTY, - searchRequestContext + searchRequestContext, + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -779,7 +787,8 @@ private static final class SearchDfsQueryAsyncAction extends AbstractSearchAsync new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters, - searchRequestContext + searchRequestContext, + NoopTracer.INSTANCE ); this.listener = searchRequestContext.getSearchRequestOperationsListener(); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java index af7adc4e58fb8..029ee4f29e76b 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java @@ -51,6 +51,7 @@ import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportException; @@ -138,7 +139,8 @@ public void testSkipSearchShards() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request), + NoopTracer.INSTANCE ) { @Override @@ -257,7 +259,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request), + NoopTracer.INSTANCE ) { @Override @@ -375,7 +378,8 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request), + NoopTracer.INSTANCE ) { TestSearchResponse response = new TestSearchResponse(); @@ -498,7 +502,8 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request), + NoopTracer.INSTANCE ) { TestSearchResponse response = new TestSearchResponse(); @@ -612,7 +617,8 @@ public void testAllowPartialResults() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request), + NoopTracer.INSTANCE ) { @Override protected void executePhaseOnShard( diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index faf6f86c69c27..bbad2eb935c13 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -59,6 +59,7 @@ import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.search.sort.SortBuilders; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; @@ -220,7 +221,8 @@ public void sendExecuteQuery( new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), searchRequest - ) + ), + NoopTracer.INSTANCE ) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) {