Skip to content

Commit

Permalink
Add X-Opaque-Id to search request metadata for query insights (opense…
Browse files Browse the repository at this point in the history
…arch-project#13374)

---------

Signed-off-by: Chenyang Ji <[email protected]>
(cherry picked from commit 9d3cf43)
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jun 7, 2024
1 parent d2da9bb commit dcb30c3
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.tasks.Task;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -138,6 +139,15 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());

Map<String, Object> labels = new HashMap<>();
// Retrieve user provided label if exists
String userProvidedLabel = context.getTask().getHeader(Task.X_OPAQUE_ID);
if (userProvidedLabel != null) {
labels.put(Task.X_OPAQUE_ID, userProvidedLabel);
}
attributes.put(Attribute.LABELS, labels);
// construct SearchQueryRecord from attributes and measurements
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
queryInsightsService.addRecord(record);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public enum Attribute {
/**
* The node id for this request
*/
NODE_ID;
NODE_ID,
/**
* Custom search request labels
*/
LABELS;

/**
* Read an Attribute from a StreamInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,38 @@
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -47,6 +58,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase {
private final SearchRequest searchRequest = mock(SearchRequest.class);
private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class);
private final TopQueriesService topQueriesService = mock(TopQueriesService.class);
private final ThreadPool threadPool = mock(ThreadPool.class);
private ClusterService clusterService;

@Before
Expand All @@ -60,15 +72,21 @@ public void setup() {
clusterService = new ClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);

ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
threadContext.setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>()));
when(threadPool.getThreadContext()).thenReturn(threadContext);
}

@SuppressWarnings("unchecked")
public void testOnRequestEnd() throws InterruptedException {
Long timestamp = System.currentTimeMillis() - 100L;
SearchType searchType = SearchType.QUERY_THEN_FETCH;

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));

String[] indices = new String[] { "index-1", "index-2" };

Expand All @@ -88,10 +106,19 @@ public void testOnRequestEnd() throws InterruptedException {
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
when(searchPhaseContext.getTask()).thenReturn(task);
ArgumentCaptor<SearchQueryRecord> captor = ArgumentCaptor.forClass(SearchQueryRecord.class);

queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext);

verify(queryInsightsService, times(1)).addRecord(any());
verify(queryInsightsService, times(1)).addRecord(captor.capture());
SearchQueryRecord generatedRecord = captor.getValue();
assertEquals(timestamp.longValue(), generatedRecord.getTimestamp());
assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS));
assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE));
assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE));
Map<String, String> labels = (Map<String, String>) generatedRecord.getAttributes().get(Attribute.LABELS);
assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID));
}

public void testConcurrentOnRequestEnd() throws InterruptedException {
Expand All @@ -101,6 +128,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));

String[] indices = new String[] { "index-1", "index-2" };

Expand All @@ -120,6 +148,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
when(searchPhaseContext.getTask()).thenReturn(task);

int numRequests = 50;
Thread[] threads = new Thread[numRequests];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ String formattedShardStats() {
);
}
}

public SearchRequest getRequest() {
return searchRequest;
}
}

enum ShardStatsFieldNames {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}

protected abstract void onPhaseStart(SearchPhaseContext context);
protected void onPhaseStart(SearchPhaseContext context) {};

protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {};

protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {};

protected void onRequestStart(SearchRequestContext searchRequestContext) {}

Expand Down

0 comments on commit dcb30c3

Please sign in to comment.