Skip to content

Commit

Permalink
[8.x] Add Search Phase APM metrics (elastic#113194) (elastic#116751)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosdelest authored Nov 14, 2024
1 parent 2811a76 commit 161b7ef
Show file tree
Hide file tree
Showing 17 changed files with 371 additions and 311 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/113194.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 113194
summary: Add Search Phase APM metrics
area: Search
type: enhancement
issues: []

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,6 @@
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

import static org.elasticsearch.action.search.SearchTransportAPMMetrics.ACTION_ATTRIBUTE_NAME;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.CLEAR_SCROLL_CONTEXTS_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.DFS_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FETCH_ID_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FETCH_ID_SCROLL_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FREE_CONTEXT_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FREE_CONTEXT_SCROLL_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_CAN_MATCH_NODE_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_FETCH_SCROLL_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_ID_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_SCROLL_ACTION_METRIC;
import static org.elasticsearch.action.search.SearchTransportAPMMetrics.RANK_SHARD_FEATURE_ACTION_METRIC;

/**
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
* transport.
Expand Down Expand Up @@ -450,11 +436,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public static void registerRequestHandler(
TransportService transportService,
SearchService searchService,
SearchTransportAPMMetrics searchTransportMetrics
) {
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
final TransportRequestHandler<ScrollFreeContextRequest> freeContextHandler = (request, channel, task) -> {
logger.trace("releasing search context [{}]", request.id());
boolean freed = searchService.freeReaderContext(request.id());
Expand All @@ -465,7 +447,7 @@ public static void registerRequestHandler(
FREE_CONTEXT_SCROLL_ACTION_NAME,
freeContextExecutor,
ScrollFreeContextRequest::new,
instrumentedHandler(FREE_CONTEXT_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler)
freeContextHandler
);
TransportActionProxy.registerProxyAction(
transportService,
Expand All @@ -478,18 +460,18 @@ public static void registerRequestHandler(
FREE_CONTEXT_ACTION_NAME,
freeContextExecutor,
SearchFreeContextRequest::new,
instrumentedHandler(FREE_CONTEXT_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler)
freeContextHandler
);
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom);

transportService.registerRequestHandler(
CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
freeContextExecutor,
ClearScrollContextsRequest::new,
instrumentedHandler(CLEAR_SCROLL_CONTEXTS_ACTION_METRIC, transportService, searchTransportMetrics, (request, channel, task) -> {
(request, channel, task) -> {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
})
}
);
TransportActionProxy.registerProxyAction(
transportService,
Expand All @@ -502,32 +484,18 @@ public static void registerRequestHandler(
DFS_ACTION_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardSearchRequest::new,
instrumentedHandler(
DFS_ACTION_METRIC,
transportService,
searchTransportMetrics,
(request, channel, task) -> searchService.executeDfsPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
)
(request, channel, task) -> searchService.executeDfsPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel))
);
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new);

transportService.registerRequestHandler(
QUERY_ACTION_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardSearchRequest::new,
instrumentedHandler(
QUERY_ACTION_METRIC,
transportService,
searchTransportMetrics,
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
);
TransportActionProxy.registerProxyActionWithDynamicResponseType(
Expand All @@ -541,15 +509,10 @@ public static void registerRequestHandler(
QUERY_ID_ACTION_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
QuerySearchRequest::new,
instrumentedHandler(
QUERY_ID_ACTION_METRIC,
transportService,
searchTransportMetrics,
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
);
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, true, QuerySearchResult::new);
Expand All @@ -558,15 +521,10 @@ public static void registerRequestHandler(
QUERY_SCROLL_ACTION_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
InternalScrollSearchRequest::new,
instrumentedHandler(
QUERY_SCROLL_ACTION_METRIC,
transportService,
searchTransportMetrics,
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
);
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, true, ScrollQuerySearchResult::new);
Expand All @@ -575,15 +533,10 @@ public static void registerRequestHandler(
QUERY_FETCH_SCROLL_ACTION_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
InternalScrollSearchRequest::new,
instrumentedHandler(
QUERY_FETCH_SCROLL_ACTION_METRIC,
transportService,
searchTransportMetrics,
(request, channel, task) -> searchService.executeFetchPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
(request, channel, task) -> searchService.executeFetchPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
)
);
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, true, ScrollQueryFetchSearchResult::new);
Expand All @@ -594,7 +547,7 @@ public static void registerRequestHandler(
RANK_FEATURE_SHARD_ACTION_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
RankFeatureShardRequest::new,
instrumentedHandler(RANK_SHARD_FEATURE_ACTION_METRIC, transportService, searchTransportMetrics, rankShardFeatureRequest)
rankShardFeatureRequest
);
TransportActionProxy.registerProxyAction(transportService, RANK_FEATURE_SHARD_ACTION_NAME, true, RankFeatureResult::new);

Expand All @@ -604,7 +557,7 @@ public static void registerRequestHandler(
FETCH_ID_SCROLL_ACTION_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardFetchRequest::new,
instrumentedHandler(FETCH_ID_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, shardFetchRequestHandler)
shardFetchRequestHandler
);
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, true, FetchSearchResult::new);

Expand All @@ -614,20 +567,15 @@ public static void registerRequestHandler(
true,
true,
ShardFetchSearchRequest::new,
instrumentedHandler(FETCH_ID_ACTION_METRIC, transportService, searchTransportMetrics, shardFetchRequestHandler)
shardFetchRequestHandler
);
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, true, FetchSearchResult::new);

transportService.registerRequestHandler(
QUERY_CAN_MATCH_NODE_NAME,
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION),
CanMatchNodeRequest::new,
instrumentedHandler(
QUERY_CAN_MATCH_NODE_METRIC,
transportService,
searchTransportMetrics,
(request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel))
)
(request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel))
);
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new);
}
Expand Down Expand Up @@ -658,26 +606,6 @@ public void onFailure(Exception e) {
});
}

/**
 * Decorates a transport request handler so that every invocation records a latency sample.
 * <p>
 * The sample is the difference between two {@code relativeTimeInMillis()} readings taken from the
 * transport service's thread pool, one just before and one just after the delegate's
 * {@code messageReceived} call. It is recorded in a {@code finally} clause, so a sample is
 * emitted even when the delegate throws. Only the time spent inside that call is measured; any
 * work the delegate completes after returning is not part of the sample.
 *
 * @param actionQualifier         value stored under {@code ACTION_ATTRIBUTE_NAME} in the attributes of every sample
 * @param transportService        source of the thread pool used as the clock
 * @param searchTransportMetrics  provider of the latency metric that samples are recorded into
 * @param transportRequestHandler the handler to delegate to
 * @return a handler that delegates to {@code transportRequestHandler} and records its latency
 */
private static <Request extends TransportRequest> TransportRequestHandler<Request> instrumentedHandler(
    String actionQualifier,
    TransportService transportService,
    SearchTransportAPMMetrics searchTransportMetrics,
    TransportRequestHandler<Request> transportRequestHandler
) {
    final var clock = transportService.getThreadPool();
    final var latencyMetric = searchTransportMetrics.getActionLatencies();
    // Built once per registered handler and shared by every sample it records.
    final Map<String, Object> metricAttributes = Map.of(ACTION_ATTRIBUTE_NAME, actionQualifier);
    return (request, channel, task) -> {
        final var startedAtMillis = clock.relativeTimeInMillis();
        try {
            transportRequestHandler.messageReceived(request, channel, task);
        } finally {
            latencyMetric.record(clock.relativeTimeInMillis() - startedAtMillis, metricAttributes);
        }
    };
}

/**
* Returns a connection to the given node on the provided cluster. If the cluster alias is <code>null</code> the node will be resolved
* against the local cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ public TransportSearchAction(
IndexNameExpressionResolver indexNameExpressionResolver,
NamedWriteableRegistry namedWriteableRegistry,
ExecutorSelector executorSelector,
SearchTransportAPMMetrics searchTransportMetrics,
SearchResponseMetrics searchResponseMetrics,
Client client,
UsageService usageService
Expand All @@ -186,7 +185,7 @@ public TransportSearchAction(
this.searchPhaseController = searchPhaseController;
this.searchTransportService = searchTransportService;
this.remoteClusterService = searchTransportService.getRemoteClusterService();
SearchTransportService.registerRequestHandler(transportService, searchService, searchTransportMetrics);
SearchTransportService.registerRequestHandler(transportService, searchService);
this.clusterService = clusterService;
this.transportService = transportService;
this.searchService = searchService;
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public interface DirectoryWrapper {
private final Map<String, TriFunction<Settings, IndexVersion, ScriptService, Similarity>> similarities = new HashMap<>();
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
private final List<SearchOperationListener> searchOperationListeners;
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
private final IndexNameExpressionResolver expressionResolver;
private final AtomicBoolean frozen = new AtomicBoolean(false);
Expand All @@ -199,11 +199,14 @@ public IndexModule(
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final SlowLogFieldProvider slowLogFieldProvider,
final MapperMetrics mapperMetrics
final MapperMetrics mapperMetrics,
final List<SearchOperationListener> searchOperationListeners
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
// Need to have a mutable arraylist for plugins to add listeners to it
this.searchOperationListeners = new ArrayList<>(searchOperationListeners);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings, slowLogFieldProvider));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings, slowLogFieldProvider));
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.search.stats;

import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * A {@link SearchOperationListener} that records how long the query and fetch phases take as
 * histogram metrics registered with a {@link MeterRegistry}.
 * <p>
 * Each sample is converted from nanoseconds to milliseconds and carries a single boolean
 * attribute, {@link #SYSTEM_THREAD_ATTRIBUTE_NAME}, describing the thread that reported it.
 */
public final class ShardSearchPhaseAPMMetrics implements SearchOperationListener {

    /** Name of the histogram that receives query phase durations. */
    public static final String QUERY_SEARCH_PHASE_METRIC = "es.search.shards.phases.query.duration.histogram";
    /** Name of the histogram that receives fetch phase durations. */
    public static final String FETCH_SEARCH_PHASE_METRIC = "es.search.shards.phases.fetch.duration.histogram";

    /** Attribute key whose value tells whether the recording thread reported itself as a system thread. */
    public static final String SYSTEM_THREAD_ATTRIBUTE_NAME = "system_thread";

    private final LongHistogram queryPhaseMetric;
    private final LongHistogram fetchPhaseMetric;

    // One reusable attribute map per thread: avoids allocating a map for every recorded sample
    // and keeps threads from writing into each other's map.
    private static final ThreadLocal<Map<String, Object>> THREAD_LOCAL_ATTRS = ThreadLocal.withInitial(() -> new HashMap<>(1));

    /**
     * Registers the query and fetch phase histograms.
     *
     * @param meterRegistry registry in which both histograms are registered
     */
    public ShardSearchPhaseAPMMetrics(MeterRegistry meterRegistry) {
        this.queryPhaseMetric = meterRegistry.registerLongHistogram(
            QUERY_SEARCH_PHASE_METRIC,
            "Query search phase execution times at the shard level, expressed as a histogram",
            "ms"
        );
        this.fetchPhaseMetric = meterRegistry.registerLongHistogram(
            FETCH_SEARCH_PHASE_METRIC,
            "Fetch search phase execution times at the shard level, expressed as a histogram",
            "ms"
        );
    }

    /**
     * Records a query phase duration.
     *
     * @param searchContext context of the search (not used by this listener)
     * @param tookInNanos   duration of the phase in nanoseconds
     */
    @Override
    public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
        recordPhaseLatency(queryPhaseMetric, tookInNanos);
    }

    /**
     * Records a fetch phase duration.
     *
     * @param searchContext context of the search (not used by this listener)
     * @param tookInNanos   duration of the phase in nanoseconds
     */
    @Override
    public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
        recordPhaseLatency(fetchPhaseMetric, tookInNanos);
    }

    /**
     * Records {@code tookInNanos}, converted to milliseconds, into {@code histogramMetric},
     * tagged with whether the calling thread is a system thread.
     */
    private static void recordPhaseLatency(LongHistogram histogramMetric, long tookInNanos) {
        Map<String, Object> attrs = THREAD_LOCAL_ATTRS.get();
        attrs.put(SYSTEM_THREAD_ATTRIBUTE_NAME, isSystemThread(Thread.currentThread()));
        histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos), attrs);
    }

    /**
     * Tells whether {@code thread} reports itself as a system thread. A thread that is not an
     * {@link EsExecutors.EsThread} is reported as non-system rather than failing the cast, so
     * that recording a metric can never throw a {@link ClassCastException} into the caller.
     */
    private static boolean isSystemThread(Thread thread) {
        return thread instanceof EsExecutors.EsThread && ((EsExecutors.EsThread) thread).isSystem();
    }
}
Loading

0 comments on commit 161b7ef

Please sign in to comment.