Skip to content

Commit

Permalink
Refactor search phase tracing instrumentation to detach span creation…
Browse files Browse the repository at this point in the history
… from the SearchRequestOperationsListener

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored and dzane17 committed Feb 19, 2024
1 parent fcde9cb commit ec73bef
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,19 @@ public SpanContext(Span span) {
Span getSpan() {
return span;
}

/**
* Sets the error for the current span behind this context
* @param cause error
*/
public void setError(final Exception cause) {
span.setError(cause);
}

/**
* Ends current span
*/
public void endSpan() {
span.endSpan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public SpanCreationContext attributes(Attributes attributes) {
}

/**
* Sets the parent for spann
* @param parent parent
* Sets the parent for span
* @param parent parent span context
* @return spanCreationContext
*/
public SpanCreationContext parent(SpanContext parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanCreationContext;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -116,6 +120,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
private final SearchRequestContext searchRequestContext;
private final Tracer tracer;

private SearchPhase currentPhase;
private boolean currentPhaseHasLifecycle;
Expand All @@ -140,7 +145,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
Tracer tracer
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -177,6 +183,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestContext = searchRequestContext;
this.tracer = tracer;
}

@Override
Expand Down Expand Up @@ -455,7 +462,8 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) {
}

private void executePhase(SearchPhase phase) {
try {
final Span phaseSpan = tracer.startSpan(SpanCreationContext.server().name("[phase/" + phase.getName() + "]"));
try (final SpanScope scope = tracer.withSpanInScope(phaseSpan)) {
onPhaseStart(phase);
phase.recordAndRun();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.MinAndMax;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.Comparator;
Expand Down Expand Up @@ -91,7 +92,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
Tracer tracer
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -112,7 +114,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.List;
Expand Down Expand Up @@ -77,7 +78,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
final Tracer tracer
) {
super(
SearchPhaseName.DFS_PRE_QUERY.getName(),
Expand All @@ -97,7 +99,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.Map;
Expand Down Expand Up @@ -82,7 +83,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
final Tracer tracer
) {
super(
SearchPhaseName.QUERY.getName(),
Expand All @@ -102,7 +104,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ public AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
new ArraySearchPhaseResults<>(shardsIts.size()),
searchRequest.getMaxConcurrentShardRequests(),
clusters,
searchRequestContext
searchRequestContext,
tracer
) {
@Override
protected void executePhaseOnShard(
Expand Down Expand Up @@ -1258,7 +1259,8 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
)
),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
} else {
final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults(
Expand Down Expand Up @@ -1289,7 +1291,8 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
clusterState,
task,
clusters,
searchRequestContext
searchRequestContext,
tracer
);
break;
case QUERY_THEN_FETCH:
Expand All @@ -1310,7 +1313,8 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
clusterState,
task,
clusters,
searchRequestContext
searchRequestContext,
tracer
);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,9 @@
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.telemetry.tracing.AttributeNames;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanContext;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;

import static org.opensearch.core.common.Strings.capitalize;

/**
* SearchRequestOperationsListener subscriber for search request tracing
*
Expand All @@ -28,14 +24,12 @@
public final class TraceableSearchRequestOperationsListener extends SearchRequestOperationsListener {
private final Tracer tracer;
private final Span requestSpan;
private Span phaseSpan;
private SpanScope phaseSpanScope;
private SpanContext phaseSpanContext;

public TraceableSearchRequestOperationsListener(final Tracer tracer, final Span requestSpan) {
this.tracer = tracer;
this.requestSpan = requestSpan;
this.phaseSpan = null;
this.phaseSpanScope = null;
this.phaseSpanContext = null;
}

public static SearchRequestOperationsListener create(final Tracer tracer, final Span requestSpan) {
Expand All @@ -48,23 +42,23 @@ public static SearchRequestOperationsListener create(final Tracer tracer, final

@Override
protected void onPhaseStart(SearchPhaseContext context) {
assert phaseSpan == null : "There should be only one search phase active at a time";
phaseSpan = tracer.startSpan(SpanBuilder.from(capitalize(context.getCurrentPhase().getName()), new SpanContext(requestSpan)));
assert phaseSpanContext == null : "There should be only one search phase active at a time";
phaseSpanContext = tracer.getCurrentSpan();
}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
assert phaseSpan != null : "There should be a search phase active at that time";
phaseSpan.endSpan();
phaseSpan = null;
assert phaseSpanContext != null : "There should be a search phase active at that time";
phaseSpanContext.endSpan();
phaseSpanContext = null;
}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
assert phaseSpan != null : "There should be a search phase active at that time";
phaseSpan.setError(new Exception(cause));
phaseSpan.endSpan();
phaseSpan = null;
assert phaseSpanContext != null : "There should be a search phase active at that time";
phaseSpanContext.setError((Exception) cause);
phaseSpanContext.endSpan();
phaseSpanContext = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.InternalAggregationTestCase;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -122,7 +123,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
assertThat(phase, is(context.getCurrentPhase()));
phase = null;
}
Expand Down Expand Up @@ -205,7 +206,8 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()),
request
)
),
NoopTracer.INSTANCE
) {
@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
Expand Down Expand Up @@ -746,7 +748,8 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger),
searchRequest
)
),
NoopTracer.INSTANCE
);
}

Expand Down Expand Up @@ -799,7 +802,8 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger),
searchRequest
)
),
NoopTracer.INSTANCE
) {
@Override
ShardSearchFailure[] buildShardFailures() {
Expand Down
Loading

0 comments on commit ec73bef

Please sign in to comment.