Skip to content

Commit

Permalink
Tracing for deep search path (opensearch-project#12103)
Browse files Browse the repository at this point in the history
* Tracing for deep search path

Signed-off-by: David Zane <[email protected]>

* Refactor search phase tracing instrumentation to detach span creation from the SearchRequestOperationsListener

Signed-off-by: Andriy Redko <[email protected]>

* Close WrappedPhase span

Signed-off-by: David Zane <[email protected]>

* Add asserting listener class

Signed-off-by: David Zane <[email protected]>

* Cleanup AbstractSearchAsyncActionTests tests

Signed-off-by: Andriy Redko <[email protected]>

* Cleanup SearchAsyncActionTests tests

Signed-off-by: Andriy Redko <[email protected]>

* Cleanup CanMatchPreFilterSearchPhaseTests tests

Signed-off-by: Andriy Redko <[email protected]>

---------

Signed-off-by: David Zane <[email protected]>
Signed-off-by: Andriy Redko <[email protected]>
Co-authored-by: Andriy Redko <[email protected]>
  • Loading branch information
2 people authored and rayshrey committed Mar 18, 2024
1 parent fa4d913 commit ee27b4b
Show file tree
Hide file tree
Showing 26 changed files with 506 additions and 201 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,19 @@ public SpanContext(Span span) {
Span getSpan() {
return span;
}

/**
* Records an error on the span held by this context.
* Delegates directly to {@code span.setError(cause)}; no additional handling is done here.
* @param cause the exception to attach to the span
*/
public void setError(final Exception cause) {
span.setError(cause);
}

/**
* Ends the span held by this context.
* Delegates directly to {@code span.endSpan()}.
* NOTE(review): whether ending an already-ended span is safe depends on the Span implementation - confirm.
*/
public void endSpan() {
span.endSpan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public SpanCreationContext attributes(Attributes attributes) {
}

/**
* Sets the parent for spann
* @param parent parent
* Sets the parent for span
* @param parent parent span context
* @return spanCreationContext
*/
public SpanCreationContext parent(SpanContext parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void onPhaseStart(SearchPhaseContext context) {}
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
public void onPhaseFailure(SearchPhaseContext context) {}
public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

@Override
public void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanCreationContext;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -116,6 +120,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
private final SearchRequestContext searchRequestContext;
private final Tracer tracer;

private SearchPhase currentPhase;
private boolean currentPhaseHasLifecycle;
Expand All @@ -140,7 +145,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
Tracer tracer
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -177,6 +183,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestContext = searchRequestContext;
this.tracer = tracer;
}

@Override
Expand Down Expand Up @@ -221,6 +228,7 @@ public final void start() {
null
)
);
onRequestEnd(searchRequestContext);
return;
}
executePhase(this);
Expand Down Expand Up @@ -460,15 +468,24 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) {
}

private void executePhase(SearchPhase phase) {
try {
Span phaseSpan = tracer.startSpan(SpanCreationContext.server().name("[phase/" + phase.getName() + "]"));
try (final SpanScope scope = tracer.withSpanInScope(phaseSpan)) {
onPhaseStart(phase);
phase.recordAndRun();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
}

if (currentPhaseHasLifecycle == false) {
phaseSpan.setError(e);
}

onPhaseFailure(phase, "", e);
} finally {
if (currentPhaseHasLifecycle == false) {
phaseSpan.endSpan();
}
}
}

Expand Down Expand Up @@ -733,7 +750,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (currentPhaseHasLifecycle) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, cause);
}
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.MinAndMax;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.Comparator;
Expand Down Expand Up @@ -91,7 +92,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
Tracer tracer
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -112,7 +114,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.List;
Expand Down Expand Up @@ -77,7 +78,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
final Tracer tracer
) {
super(
SearchPhaseName.DFS_PRE_QUERY.getName(),
Expand All @@ -97,7 +99,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.Transport;

import java.util.Map;
Expand Down Expand Up @@ -82,7 +83,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestContext searchRequestContext
SearchRequestContext searchRequestContext,
final Tracer tracer
) {
super(
SearchPhaseName.QUERY.getName(),
Expand All @@ -102,7 +104,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestContext
searchRequestContext,
tracer
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void setTotalHits(TotalHits totalHits) {
this.totalHits = totalHits;
}

TotalHits totalHits() {
public TotalHits totalHits() {
return totalHits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@
@InternalApi
public abstract class SearchRequestOperationsListener {
private volatile boolean enabled;
/**
* Shared listener instance whose phase callbacks ({@code onPhaseStart}, {@code onPhaseEnd},
* {@code onPhaseFailure}) are all empty. It is constructed through the boolean constructor
* with {@code enabled = false}.
* NOTE(review): presumably intended as a default where no real listener is configured - confirm against callers.
*/
public static final SearchRequestOperationsListener NOOP = new SearchRequestOperationsListener(false) {
@Override
protected void onPhaseStart(SearchPhaseContext context) {}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}
};

protected SearchRequestOperationsListener() {
this.enabled = true;
Expand All @@ -35,7 +45,7 @@ protected SearchRequestOperationsListener(final boolean enabled) {

protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);

protected abstract void onPhaseFailure(SearchPhaseContext context);
protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);

protected void onRequestStart(SearchRequestContext searchRequestContext) {}

Expand Down Expand Up @@ -91,10 +101,10 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseFailure(context);
listener.onPhaseFailure(context, cause);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected void onPhaseStart(SearchPhaseContext context) {}
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {}
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

@Override
protected void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
}

Expand Down
Loading

0 comments on commit ee27b4b

Please sign in to comment.