Skip to content

Commit

Permalink
track resource usage for failed requests
Browse files: browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed May 30, 2024
1 parent d972ef1 commit 8c9109a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) {
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext);
}

/**
 * Forwards a request-level failure notification to the search request operations listener.
 *
 * @param searchRequestContext context passed through to the listener's {@code onRequestFailure} callback
 */
private void onRequestFailure(SearchRequestContext searchRequestContext) {
    // The listener is looked up on this action's own context field; the argument is only forwarded.
    this.searchRequestContext
        .getSearchRequestOperationsListener()
        .onRequestFailure(this, searchRequestContext);
}

private void executePhase(SearchPhase phase) {
Span phaseSpan = tracer.startSpan(SpanCreationContext.server().name("[phase/" + phase.getName() + "]"));
try (final SpanScope scope = tracer.withSpanInScope(phaseSpan)) {
Expand Down Expand Up @@ -507,6 +511,7 @@ ShardSearchFailure[] buildShardFailures() {
private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
setPhaseResourceUsages();
onShardFailure(shardIndex, shard, e);
SearchShardTarget nextShard = FailAwareWeightedRouting.getInstance()
.findNext(shardIt, clusterState, e, () -> totalOps.incrementAndGet());
Expand Down Expand Up @@ -757,6 +762,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
setPhaseResourceUsages();
if (currentPhaseHasLifecycle) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, cause);
}
Expand Down Expand Up @@ -786,6 +792,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
});
}
Releasables.close(releasables);
onRequestFailure(searchRequestContext);
listener.onFailure(exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ public void onFailure(Exception e) {
e
);
progressListener.notifyFetchFailure(shardIndex, shardTarget, e);
context.setPhaseResourceUsages();
counter.onFailure(shardIndex, shardTarget, e);
} finally {
// the search context might not be cleared on the node where the fetch was executed for example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ protected void onRequestStart(SearchRequestContext searchRequestContext) {}

/**
 * Hook for the end of a search request. This default implementation does nothing;
 * subclasses override it to react.
 *
 * @param context              the search phase context supplied by the caller
 * @param searchRequestContext the request-level context supplied by the caller
 */
protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

/**
 * Hook for a failed search request. This default implementation does nothing;
 * subclasses override it to react (for example, to record usage for requests that did not succeed).
 *
 * @param context              the search phase context supplied by the caller
 * @param searchRequestContext the request-level context supplied by the caller
 */
protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

/**
 * Request-aware enablement check. This default ignores the given request and
 * defers to the no-argument {@code isEnabled()}.
 *
 * @param searchRequest the search request being processed (unused by this default)
 * @return whatever the no-argument {@code isEnabled()} reports
 */
protected boolean isEnabled(SearchRequest searchRequest) {
    final boolean enabled = isEnabled();
    return enabled;
}
Expand Down Expand Up @@ -133,6 +135,17 @@ public void onRequestEnd(SearchPhaseContext context, SearchRequestContext search
}
}

/**
 * Relays a request-failure notification to every registered listener, in iteration order.
 *
 * @param context              the search phase context forwarded to each listener
 * @param searchRequestContext the request-level context forwarded to each listener
 */
@Override
public void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
    for (final SearchRequestOperationsListener delegate : listeners) {
        try {
            delegate.onRequestFailure(context, searchRequestContext);
        } catch (Exception e) {
            // Best effort: an exception from one listener is logged and the remaining listeners still run.
            logger.warn(() -> new ParameterizedMessage("onRequestFailure listener [{}] failed", delegate), e);
        }
    }
}

/**
 * Returns the listeners this instance dispatches to.
 * The list reference is handed out as-is; no copy is made.
 *
 * @return the registered listeners
 */
public List<SearchRequestOperationsListener> getListeners() {
    return this.listeners;
}
Expand Down
37 changes: 17 additions & 20 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,13 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardT
SearchContext context = createContext(readerContext, request, task, true)
) {
dfsPhase.execute(context);
writeTaskResourceUsage(task);
return context.dfsResult();
} catch (Exception e) {
logger.trace("Dfs phase failed", e);
processFailure(readerContext, e);
throw e;
} finally {
writeTaskResourceUsage(task);
}
}

Expand Down Expand Up @@ -646,7 +647,6 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
final RescoreDocIds rescoreDocIds = context.rescoreDocIds();
context.queryResult().setRescoreDocIds(rescoreDocIds);
readerContext.setRescoreDocIds(rescoreDocIds);
writeTaskResourceUsage(task);
return context.queryResult();
}
} catch (Exception e) {
Expand All @@ -659,6 +659,8 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
logger.trace("Query phase failed", e);
processFailure(readerContext, e);
throw e;
} finally {
writeTaskResourceUsage(task);
}
}

Expand All @@ -671,9 +673,7 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon
}
executor.success();
}
QueryFetchSearchResult result = new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
writeTaskResourceUsage(context.getTask());
return result;
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}

public void executeQueryPhase(
Expand Down Expand Up @@ -701,16 +701,13 @@ public void executeQueryPhase(
queryPhase.execute(searchContext);
executor.success();
readerContext.setRescoreDocIds(searchContext.rescoreDocIds());
ScrollQuerySearchResult scrollQuerySearchResult = new ScrollQuerySearchResult(
searchContext.queryResult(),
searchContext.shardTarget()
);
writeTaskResourceUsage(task);
return scrollQuerySearchResult;
return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget());
} catch (Exception e) {
logger.trace("Query phase failed", e);
// we handle the failure in the failure listener below
throw e;
} finally {
writeTaskResourceUsage(task);
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}
Expand All @@ -737,13 +734,14 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
final RescoreDocIds rescoreDocIds = searchContext.rescoreDocIds();
searchContext.queryResult().setRescoreDocIds(rescoreDocIds);
readerContext.setRescoreDocIds(rescoreDocIds);
writeTaskResourceUsage(task);
return searchContext.queryResult();
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
logger.trace("Query phase failed", e);
// we handle the failure in the failure listener below
throw e;
} finally {
writeTaskResourceUsage(task);
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}
Expand Down Expand Up @@ -787,17 +785,14 @@ public void executeFetchPhase(
queryPhase.execute(searchContext);
final long afterQueryTime = executor.success();
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime);
ScrollQueryFetchSearchResult scrollQueryFetchSearchResult = new ScrollQueryFetchSearchResult(
fetchSearchResult,
searchContext.shardTarget()
);
writeTaskResourceUsage(task);
return scrollQueryFetchSearchResult;
return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget());
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
logger.trace("Fetch phase failed", e);
// we handle the failure in the failure listener below
throw e;
} finally {
writeTaskResourceUsage(task);
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}
Expand All @@ -823,12 +818,13 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
}
executor.success();
}
writeTaskResourceUsage(task);
return searchContext.fetchResult();
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
// we handle the failure in the failure listener below
throw e;
} finally {
writeTaskResourceUsage(task);
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}
Expand Down Expand Up @@ -1045,6 +1041,7 @@ final SearchContext createContext(
context.size(DEFAULT_SIZE);
}
context.setTask(task);

// pre process
queryPhase.preProcess(context);
} catch (Exception e) {
Expand Down Expand Up @@ -1143,7 +1140,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear

private void writeTaskResourceUsage(SearchShardTask task) {
try {
// Get resource usages when task starts
// Get resource usages from when the task started
ThreadResourceInfo threadResourceInfo = task.getActiveThreadResourceInfo(
Thread.currentThread().getId(),
ResourceStatsType.WORKER_STATS
Expand Down

0 comments on commit 8c9109a

Please sign in to comment.