Skip to content

Commit

Permalink
Merge pull request #9 from sgup432/CoordinatorStats
Browse files Browse the repository at this point in the history
Using generic enumMap for holding phase stats
  • Loading branch information
sgup432 authored Sep 19, 2023
2 parents 4d66914 + 706d4f5 commit b0a2ad3
Show file tree
Hide file tree
Showing 21 changed files with 229 additions and 375 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,21 +189,24 @@ public void testSearchWithWRRShardRouting() throws IOException {

for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (searchStats.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.QUERY.getName()) > 0) {
if (searchStats.getRequestStatsLongHolder()
.getRequestStatsHolder()
.get(SearchPhaseName.QUERY.getName())
.getTimeInMillis() > 0) {
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.QUERY.getName()).longValue(),
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.FETCH.getName()).longValue(),
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.FETCH.getName()).longValue(),
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.EXPAND.getName()).longValue(),
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal(),
greaterThan(0L)
);
coordNumber += 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,22 +199,22 @@ public void testSimpleStats() throws Exception {

for (NodeStats stat : nodeStats.getNodes()) {
Stats total = stat.getIndices().getSearch().getTotal();
if (total.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.QUERY.getName()) > 0) {
assertEquals(
iters,
total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.QUERY.getName()).intValue()
);
if (total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTimeInMillis() > 0) {
assertThat(
total.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.FETCH.getName()),
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.FETCH.getName()).intValue()
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.EXPAND.getName()).intValue()
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
numOfCoordinators += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
package org.opensearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.RequestStats;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -285,14 +284,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(recoveryStats);
}

// We are adding request stats with a separate setter method since SearchStats was tightly coupled with Shard Search Stats, and all
// nodes won't share the same response in requestStats
public void addRequestStats(RequestStats requestStats) {
if (requestStats.getSearchRequestStats() != null && this.search != null) {
search.setSearchRequestStats(requestStats.getSearchRequestStats());
}
}

public void add(CommonStats stats) {
if (docs == null) {
if (stats.getDocs() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ShardOperationFailedException;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
Expand All @@ -66,6 +65,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -120,8 +120,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten

private final List<Releasable> releasables = new ArrayList<>();

private SearchRequestOperationsListener searchRequestOperationsListener;
private List<SearchRequestOperationsListener> searchListenersList;
private Optional<SearchRequestOperationsListener> searchRequestOperationsListener;

AbstractSearchAsyncAction(
String name,
Expand All @@ -141,7 +140,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
List<SearchRequestOperationsListener> searchListenersList
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -177,10 +176,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
if (!CollectionUtils.isEmpty(searchListenersList)) {
this.searchListenersList = searchListenersList;
this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger);
}
this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener);
}

@Override
Expand Down Expand Up @@ -436,16 +432,12 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
}

private void onPhaseEnd() {
if (!CollectionUtils.isEmpty(searchListenersList)) {
searchRequestOperationsListener.onPhaseEnd(this);
}
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); });
}

private void onPhaseStart(SearchPhase phase) {
setCurrentPhase(phase);
if (!CollectionUtils.isEmpty(searchListenersList)) {
searchRequestOperationsListener.onPhaseStart(this);
}
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); });
}

private void executePhase(SearchPhase phase) {
Expand Down Expand Up @@ -710,9 +702,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (!CollectionUtils.isEmpty(searchListenersList)) {
searchRequestOperationsListener.onPhaseFailure(this);
}
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this));
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
List<SearchRequestOperationsListener> searchListenersList
SearchRequestOperationsListener searchRequestOperationsListener
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -112,7 +112,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters,
searchListenersList
searchRequestOperationsListener
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters,
List<SearchRequestOperationsListener> searchListenersList
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
SearchPhaseName.DFS_PRE_QUERY.getName(),
Expand All @@ -97,7 +97,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchListenersList
searchRequestOperationsListener
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*
* @opensearch.internal
*/
abstract class SearchPhase implements CheckedRunnable<IOException> {
public abstract class SearchPhase implements CheckedRunnable<IOException> {
private final String name;
private long startTimeInNanos;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,18 @@

package org.opensearch.action.search;

import java.util.HashMap;
import java.util.Map;

/**
* Enum for different Search Phases in OpenSearch
* @opensearch.internal
*/
public enum SearchPhaseName {
DFS_PRE_QUERY("dfs"),
DFS_PRE_QUERY("dfs_pre_query"),
QUERY("query"),
FETCH("fetch"),
DFS_QUERY("dfs_query"),
EXPAND("expand"),
CAN_MATCH("can_match");

private static final Map<String, SearchPhaseName> STRING_TO_ENUM = new HashMap<>();

static {
for (SearchPhaseName searchPhaseName : values()) {
STRING_TO_ENUM.put(searchPhaseName.getName(), searchPhaseName);
}
}

private final String name;

SearchPhaseName(final String name) {
Expand All @@ -40,8 +29,4 @@ public enum SearchPhaseName {
public String getName() {
return name;
}

public static SearchPhaseName getSearchPhaseName(String value) {
return STRING_TO_ENUM.get(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.transport.Transport;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -83,7 +82,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
List<SearchRequestOperationsListener> searchListenersList
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
SearchPhaseName.QUERY.getName(),
Expand All @@ -103,7 +102,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchListenersList
searchRequestOperationsListener
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Loading

0 comments on commit b0a2ad3

Please sign in to comment.