Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using generic enumMap for holding phase stats #9

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,21 +189,24 @@ public void testSearchWithWRRShardRouting() throws IOException {

for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (searchStats.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.QUERY.getName()) > 0) {
if (searchStats.getRequestStatsLongHolder()
.getRequestStatsHolder()
.get(SearchPhaseName.QUERY.getName())
.getTimeInMillis() > 0) {
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.QUERY.getName()).longValue(),
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.FETCH.getName()).longValue(),
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.FETCH.getName()).longValue(),
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.EXPAND.getName()).longValue(),
searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal(),
greaterThan(0L)
);
coordNumber += 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,22 +199,22 @@ public void testSimpleStats() throws Exception {

for (NodeStats stat : nodeStats.getNodes()) {
Stats total = stat.getIndices().getSearch().getTotal();
if (total.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.QUERY.getName()) > 0) {
assertEquals(
iters,
total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.QUERY.getName()).intValue()
);
if (total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTimeInMillis() > 0) {
assertThat(
total.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.FETCH.getName()),
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(),
greaterThan(0L)
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.FETCH.getName()).intValue()
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.EXPAND.getName()).intValue()
total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal()
);
numOfCoordinators += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
package org.opensearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.RequestStats;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -285,14 +284,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(recoveryStats);
}

// We are adding request stats with a separate setter method since SearchStats was tightly coupled with Shard Search Stats, and all
// nodes won't share the same response in requestStats
public void addRequestStats(RequestStats requestStats) {
if (requestStats.getSearchRequestStats() != null && this.search != null) {
search.setSearchRequestStats(requestStats.getSearchRequestStats());
}
}

public void add(CommonStats stats) {
if (docs == null) {
if (stats.getDocs() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ShardOperationFailedException;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
Expand All @@ -66,6 +65,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -120,8 +120,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten

private final List<Releasable> releasables = new ArrayList<>();

private SearchRequestOperationsListener searchRequestOperationsListener;
private List<SearchRequestOperationsListener> searchListenersList;
private Optional<SearchRequestOperationsListener> searchRequestOperationsListener;

AbstractSearchAsyncAction(
String name,
Expand All @@ -141,7 +140,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
List<SearchRequestOperationsListener> searchListenersList
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(name);
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
Expand Down Expand Up @@ -177,10 +176,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
if (!CollectionUtils.isEmpty(searchListenersList)) {
this.searchListenersList = searchListenersList;
this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger);
}
this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener);
}

@Override
Expand Down Expand Up @@ -436,16 +432,12 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
}

private void onPhaseEnd() {
if (!CollectionUtils.isEmpty(searchListenersList)) {
searchRequestOperationsListener.onPhaseEnd(this);
}
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); });
}

private void onPhaseStart(SearchPhase phase) {
setCurrentPhase(phase);
if (!CollectionUtils.isEmpty(searchListenersList)) {
searchRequestOperationsListener.onPhaseStart(this);
}
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); });
}

private void executePhase(SearchPhase phase) {
Expand Down Expand Up @@ -710,9 +702,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (!CollectionUtils.isEmpty(searchListenersList)) {
searchRequestOperationsListener.onPhaseFailure(this);
}
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this));
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
List<SearchRequestOperationsListener> searchListenersList
SearchRequestOperationsListener searchRequestOperationsListener
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
Expand All @@ -112,7 +112,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters,
searchListenersList
searchRequestOperationsListener
);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters,
List<SearchRequestOperationsListener> searchListenersList
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
SearchPhaseName.DFS_PRE_QUERY.getName(),
Expand All @@ -97,7 +97,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchListenersList
searchRequestOperationsListener
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*
* @opensearch.internal
*/
abstract class SearchPhase implements CheckedRunnable<IOException> {
public abstract class SearchPhase implements CheckedRunnable<IOException> {
private final String name;
private long startTimeInNanos;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,18 @@

package org.opensearch.action.search;

import java.util.HashMap;
import java.util.Map;

/**
* Enum for different Search Phases in OpenSearch
* @opensearch.internal
*/
public enum SearchPhaseName {
DFS_PRE_QUERY("dfs"),
DFS_PRE_QUERY("dfs_pre_query"),
QUERY("query"),
FETCH("fetch"),
DFS_QUERY("dfs_query"),
EXPAND("expand"),
CAN_MATCH("can_match");

private static final Map<String, SearchPhaseName> STRING_TO_ENUM = new HashMap<>();

static {
for (SearchPhaseName searchPhaseName : values()) {
STRING_TO_ENUM.put(searchPhaseName.getName(), searchPhaseName);
}
}

private final String name;

SearchPhaseName(final String name) {
Expand All @@ -40,8 +29,4 @@ public enum SearchPhaseName {
public String getName() {
return name;
}

public static SearchPhaseName getSearchPhaseName(String value) {
return STRING_TO_ENUM.get(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.transport.Transport;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -83,7 +82,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
List<SearchRequestOperationsListener> searchListenersList
SearchRequestOperationsListener searchRequestOperationsListener
) {
super(
SearchPhaseName.QUERY.getName(),
Expand All @@ -103,7 +102,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchListenersList
searchRequestOperationsListener
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
Expand Down
Loading