From 81b7a077c734472b66829f8523c2aa3231bdb00b Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Thu, 29 Aug 2024 17:12:44 -0700 Subject: [PATCH 1/7] add workload managementRequestFailureListener Signed-off-by: Kaushal Kumar --- .../org/opensearch/wlm/QueryGroupService.java | 50 +++++++++++++++++++ ...kloadManagementRequestFailureListener.java | 32 ++++++++++++ .../wlm/listeners/package-info.java | 12 +++++ .../opensearch/wlm/stats/QueryGroupState.java | 2 +- ...ManagementRequestFailureListenerTests.java | 36 +++++++++++++ 5 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/wlm/QueryGroupService.java create mode 100644 server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java create mode 100644 server/src/main/java/org/opensearch/wlm/listeners/package-info.java create mode 100644 server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java new file mode 100644 index 0000000000000..8a4b6f0f3b061 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.opensearch.wlm.stats.QueryGroupState; +import org.opensearch.wlm.stats.QueryGroupStats; +import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder; + +import java.util.HashMap; +import java.util.Map; + +/** + * As of now this is a stub and main implementation PR will be raised soon.Coming PR will collate these changes with core QueryGroupService changes + */ +public class QueryGroupService { + // This map does not need to be concurrent since we will process the cluster state change serially and update + // this map with new additions and deletions of entries. QueryGroupState is thread safe + private final Map queryGroupStateMap = new HashMap<>(); + + + /** + * updates the failure stats for the query group + * @param queryGroupId query group identifier + */ + public void incrementFailuresFor(final String queryGroupId) { + QueryGroupState queryGroupState = queryGroupStateMap.get(queryGroupId); + // This can happen if the request failed for a deleted query group + // or new queryGroup is being created and has not been acknowledged yet + if (queryGroupId == null) { + return ; + } + queryGroupState.failures.inc(); + } + + public QueryGroupStats nodeStats() { + Map statsHolderMap = new HashMap<>(); + for (Map.Entry queryGroupsState: queryGroupStateMap.entrySet()) { + final String queryGroupId = queryGroupsState.getKey(); +// final + } + QueryGroupStats queryGroupStats = new QueryGroupStats(); + return queryGroupStats; + } +} diff --git a/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java b/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java new file mode 100644 index 0000000000000..fc3b2c4bca604 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.listeners; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.QueryGroupService; +import org.opensearch.wlm.QueryGroupTask; + +public class WorkloadManagementRequestFailureListener extends SearchRequestOperationsListener { + private final QueryGroupService queryGroupService; + private final ThreadPool threadPool; + + public WorkloadManagementRequestFailureListener(QueryGroupService queryGroupService, ThreadPool threadPool) { + this.queryGroupService = queryGroupService; + this.threadPool = threadPool; + } + + @Override + protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER); + queryGroupService.incrementFailuresFor(queryGroupId); + } +} diff --git a/server/src/main/java/org/opensearch/wlm/listeners/package-info.java b/server/src/main/java/org/opensearch/wlm/listeners/package-info.java new file mode 100644 index 0000000000000..e900acf657085 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/listeners/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * WLM related listener constructs + */ +package org.opensearch.wlm.listeners; diff --git a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java index 93cfcea697c43..82737b1663776 100644 --- a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java +++ b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java @@ -31,7 +31,7 @@ public class QueryGroupState { /** * this will track the cumulative failures in a query group */ - final CounterMetric failures = new CounterMetric(); + public final CounterMetric failures = new CounterMetric(); /** * This will track total number of cancellations in the query group due to all resource type breaches diff --git a/server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java new file mode 100644 index 0000000000000..c4b591adb1176 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.listeners; + +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.QueryGroupService; + +import static org.mockito.Mockito.mock; + +public class WorkloadManagementRequestFailureListenerTests extends OpenSearchTestCase { + ThreadPool testThreadPool; + QueryGroupService queryGroupService; + WorkloadManagementRequestFailureListener sut; + + public void setUp() throws Exception { + super.setUp(); + testThreadPool = new TestThreadPool("RejectionTestThreadPool"); + queryGroupService = mock(QueryGroupService.class); + sut = new WorkloadManagementRequestFailureListener(queryGroupService, testThreadPool); + } + + public void tearDown() throws Exception { + super.tearDown(); + testThreadPool.shutdown(); + } + + +} From 3574e3c7cfe39384dfa54d4656a810e3b98bc092 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Fri, 30 Aug 2024 09:00:54 -0700 Subject: [PATCH 2/7] add unit tests Signed-off-by: Kaushal Kumar --- .../org/opensearch/wlm/QueryGroupService.java | 27 +++- .../opensearch/wlm/stats/QueryGroupState.java | 9 ++ .../opensearch/wlm/stats/QueryGroupStats.java | 64 ++++++++- ...ManagementRequestFailureListenerTests.java | 135 +++++++++++++++++- 4 files changed, 218 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index 1d191f76ddcb6..6545598dd9951 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -22,8 +22,15 @@ public class QueryGroupService { // This map does not need to be concurrent since we will process the cluster state change serially and update // this map with new additions and deletions of entries. QueryGroupState is thread safe - private final Map queryGroupStateMap = new HashMap<>(); + private final Map queryGroupStateMap; + public QueryGroupService() { + this(new HashMap<>()); + } + + public QueryGroupService(Map queryGroupStateMap) { + this.queryGroupStateMap = queryGroupStateMap; + } /** * updates the failure stats for the query group @@ -33,22 +40,28 @@ public void incrementFailuresFor(final String queryGroupId) { QueryGroupState queryGroupState = queryGroupStateMap.get(queryGroupId); // This can happen if the request failed for a deleted query group // or new queryGroup is being created and has not been acknowledged yet - if (queryGroupId == null) { - return ; + if (queryGroupState == null) { + return; } queryGroupState.failures.inc(); } + /** + * + * @return node level query group stats + */ public QueryGroupStats nodeStats() { - Map statsHolderMap = new HashMap<>(); + final Map statsHolderMap = new HashMap<>(); for (Map.Entry queryGroupsState : queryGroupStateMap.entrySet()) { final String queryGroupId = queryGroupsState.getKey(); -// final + final QueryGroupState currentState = queryGroupsState.getValue(); + + statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState)); } - QueryGroupStats queryGroupStats = new QueryGroupStats(); - return queryGroupStats; + return new QueryGroupStats(statsHolderMap); } + /** * * @param queryGroupId query group identifier diff --git a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java index 82737b1663776..376d34dd7c8ca 100644 --- a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java +++ b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java @@ -95,9 +95,18 @@ public static class ResourceTypeState { final ResourceType resourceType; final CounterMetric cancellations = new CounterMetric(); final CounterMetric rejections = new CounterMetric(); + private double lastRecordedUsage = 0; public ResourceTypeState(ResourceType resourceType) { this.resourceType = resourceType; } + + public void setLastRecordedUsage(double recordedUsage) { + lastRecordedUsage = recordedUsage; + } + + public double getLastRecordedUsage() { + return lastRecordedUsage; + } } } diff --git a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java index d39bf104332da..99cbc9348821a 100644 --- a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java +++ b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java @@ -14,8 +14,12 @@ import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.wlm.ResourceType; +import org.opensearch.wlm.stats.QueryGroupState.ResourceTypeState; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -52,7 +56,11 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("query_groups"); - for (Map.Entry queryGroupStats : stats.entrySet()) { + // to keep the toXContent consistent + List> entryList = new ArrayList<>(stats.entrySet()); + entryList.sort((k1, k2) -> k1.getKey().compareTo(k2.getKey())); + + for (Map.Entry queryGroupStats : entryList) { builder.startObject(queryGroupStats.getKey()); queryGroupStats.getValue().toXContent(builder, params); builder.endObject(); @@ -83,11 +91,14 @@ public static class QueryGroupStatsHolder implements ToXContentObject, Writeable public static final String REJECTIONS = "rejections"; public static final String TOTAL_CANCELLATIONS = "total_cancellations"; public static final String FAILURES = "failures"; - private final long completions; - private final long rejections; - private final long failures; - private final long totalCancellations; - private final Map resourceStats; + private long completions; + private long rejections; + private long failures; + private long totalCancellations; + private Map resourceStats; + + // this is needed to support the factory method + public QueryGroupStatsHolder() {} public QueryGroupStatsHolder( long completions, @@ -111,6 +122,28 @@ public QueryGroupStatsHolder(StreamInput in) throws IOException { this.resourceStats = in.readMap((i) -> ResourceType.fromName(i.readString()), ResourceStats::new); } + /** + * static factory method to convert {@link QueryGroupState} into {@link QueryGroupStatsHolder} + * @param queryGroupState which needs to be converted + * @return QueryGroupStatsHolder object + */ + public static QueryGroupStatsHolder from(QueryGroupState queryGroupState) { + final QueryGroupStatsHolder statsHolder = new QueryGroupStatsHolder(); + + Map resourceStatsMap = new HashMap<>(); + + for (Map.Entry resourceTypeStateEntry : queryGroupState.getResourceState().entrySet()) { + resourceStatsMap.put(resourceTypeStateEntry.getKey(), ResourceStats.from(resourceTypeStateEntry.getValue())); + } + + statsHolder.completions = queryGroupState.getCompletions(); + statsHolder.rejections = queryGroupState.getTotalRejections(); + statsHolder.failures = queryGroupState.getFailures(); + statsHolder.totalCancellations = queryGroupState.getTotalCancellations(); + statsHolder.resourceStats = resourceStatsMap; + return statsHolder; + } + /** * Writes the {@param statsHolder} to {@param out} * @param out StreamOutput @@ -136,7 +169,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(REJECTIONS, rejections); builder.field(FAILURES, failures); builder.field(TOTAL_CANCELLATIONS, totalCancellations); - for (Map.Entry resourceStat : resourceStats.entrySet()) { + + List> entryList = new ArrayList<>(resourceStats.entrySet()); + entryList.sort((k1, k2) -> k1.getKey().compareTo(k2.getKey())); + + for (Map.Entry resourceStat : entryList) { ResourceType resourceType = resourceStat.getKey(); ResourceStats resourceStats1 = resourceStat.getValue(); builder.startObject(resourceType.getName()); @@ -187,6 +224,19 @@ public ResourceStats(StreamInput in) throws IOException { this.rejections = in.readVLong(); } + /** + * static factory method to convert {@link ResourceTypeState} into {@link ResourceStats} + * @param resourceTypeState which needs to be converted + * @return QueryGroupStatsHolder object + */ + public static ResourceStats from(ResourceTypeState resourceTypeState) { + return new ResourceStats( + resourceTypeState.getLastRecordedUsage(), + resourceTypeState.cancellations.count(), + resourceTypeState.rejections.count() + ); + } + /** * Writes the {@param stats} to {@param out} * @param out StreamOutput diff --git a/server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java index c4b591adb1176..0afb37ab3f69c 100644 --- a/server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java +++ b/server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java @@ -8,23 +8,36 @@ package org.opensearch.wlm.listeners; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.wlm.QueryGroupService; +import org.opensearch.wlm.QueryGroupTask; +import org.opensearch.wlm.ResourceType; +import org.opensearch.wlm.stats.QueryGroupState; +import org.opensearch.wlm.stats.QueryGroupStats; -import static org.mockito.Mockito.mock; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class WorkloadManagementRequestFailureListenerTests extends OpenSearchTestCase { + public static final int ITERATIONS = 20; ThreadPool testThreadPool; QueryGroupService queryGroupService; + + Map queryGroupStateMap; + String testQueryGroupId; WorkloadManagementRequestFailureListener sut; public void setUp() throws Exception { super.setUp(); + queryGroupStateMap = new HashMap<>(); + testQueryGroupId = "safjgagnakg-3r3fads"; testThreadPool = new TestThreadPool("RejectionTestThreadPool"); - queryGroupService = mock(QueryGroupService.class); - sut = new WorkloadManagementRequestFailureListener(queryGroupService, testThreadPool); } public void tearDown() throws Exception { @@ -32,5 +45,121 @@ public void tearDown() throws Exception { testThreadPool.shutdown(); } + public void testValidQueryGroupRequestFailure() throws IOException { + + QueryGroupStats expectedStats = new QueryGroupStats( + Map.of( + testQueryGroupId, + new QueryGroupStats.QueryGroupStatsHolder( + 0, + 0, + 1, + 0, + Map.of( + ResourceType.CPU, + new QueryGroupStats.ResourceStats(0, 0, 0), + ResourceType.MEMORY, + new QueryGroupStats.ResourceStats(0, 0, 0) + ) + ) + ) + ); + + assertSuccess(testQueryGroupId, queryGroupStateMap, expectedStats, testQueryGroupId); + } + + public void testMultiThreadedValidQueryGroupRequestFailures() { + + queryGroupStateMap.put(testQueryGroupId, new QueryGroupState()); + + queryGroupService = new QueryGroupService(queryGroupStateMap); + + sut = new WorkloadManagementRequestFailureListener(queryGroupService, testThreadPool); + + List threads = new ArrayList<>(); + for (int i = 0; i < ITERATIONS; i++) { + threads.add(new Thread(() -> { + try (ThreadContext.StoredContext currentContext = testThreadPool.getThreadContext().stashContext()) { + testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); + sut.onRequestFailure(null, null); + } + })); + } + + threads.forEach(Thread::start); + threads.forEach(th -> { + try { + th.join(); + } catch (InterruptedException ignored) { + + } + }); + + QueryGroupStats actualStats = queryGroupService.nodeStats(); + QueryGroupStats expectedStats = new QueryGroupStats( + Map.of( + testQueryGroupId, + new QueryGroupStats.QueryGroupStatsHolder( + 0, + 0, + ITERATIONS, + 0, + Map.of( + ResourceType.CPU, + new QueryGroupStats.ResourceStats(0, 0, 0), + ResourceType.MEMORY, + new QueryGroupStats.ResourceStats(0, 0, 0) + ) + ) + ) + ); + + assertEquals(expectedStats, actualStats); + } + + public void testInvalidQueryGroupFailure() throws IOException { + QueryGroupStats expectedStats = new QueryGroupStats( + Map.of( + testQueryGroupId, + new QueryGroupStats.QueryGroupStatsHolder( + 0, + 0, + 0, + 0, + Map.of( + ResourceType.CPU, + new QueryGroupStats.ResourceStats(0, 0, 0), + ResourceType.MEMORY, + new QueryGroupStats.ResourceStats(0, 0, 0) + ) + ) + ) + ); + + assertSuccess(testQueryGroupId, queryGroupStateMap, expectedStats, "dummy-invalid-qg-id"); + + } + + private void assertSuccess( + String testQueryGroupId, + Map queryGroupStateMap, + QueryGroupStats expectedStats, + String threadContextQG_Id + ) { + + try (ThreadContext.StoredContext currentContext = testThreadPool.getThreadContext().stashContext()) { + testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, threadContextQG_Id); + queryGroupStateMap.put(testQueryGroupId, new QueryGroupState()); + + queryGroupService = new QueryGroupService(queryGroupStateMap); + + sut = new WorkloadManagementRequestFailureListener(queryGroupService, testThreadPool); + sut.onRequestFailure(null, null); + + QueryGroupStats actualStats = queryGroupService.nodeStats(); + assertEquals(expectedStats, actualStats); + } + + } } From 3cbe8bbba18ee7b7392a1577338fa96e43f66ece Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Fri, 30 Aug 2024 09:16:02 -0700 Subject: [PATCH 3/7] add CHANGELOG Signed-off-by: Kaushal Kumar --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe1cee57279d2..5797b1dc60cae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) - Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454)) - [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))) +- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) From 2e4be6f2ea4642169e27f44ee511966514eca669 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Fri, 30 Aug 2024 09:33:33 -0700 Subject: [PATCH 4/7] add missing javadoc Signed-off-by: Kaushal Kumar --- server/src/main/java/org/opensearch/node/Node.java | 13 ++++++++----- .../WorkloadManagementRequestFailureListener.java | 3 +++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ea656af6110e5..aec71b9f7ddff 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -270,6 +270,7 @@ import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.WorkloadManagementTransportInterceptor; import org.opensearch.wlm.listeners.QueryGroupRequestRejectionOperationListener; +import org.opensearch.wlm.listeners.WorkloadManagementRequestFailureListener; import javax.net.ssl.SNIHostName; @@ -1019,11 +1020,12 @@ protected Node( List identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class); identityService.initializeIdentityAwarePlugins(identityAwarePlugins); + final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the + // queryGroupService final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener = - new QueryGroupRequestRejectionOperationListener( - new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService - threadPool - ); + new QueryGroupRequestRejectionOperationListener(queryGroupService, threadPool); + final WorkloadManagementRequestFailureListener workloadManagementRequestFailureListener = + new WorkloadManagementRequestFailureListener(queryGroupService, threadPool); // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = @@ -1033,7 +1035,8 @@ protected Node( searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener, - queryGroupRequestRejectionListener + queryGroupRequestRejectionListener, + workloadManagementRequestFailureListener ), pluginComponents.stream() .filter(p -> p instanceof SearchRequestOperationsListener) diff --git a/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java b/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java index fc3b2c4bca604..9d65392ccfdfb 100644 --- a/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java +++ b/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java @@ -15,6 +15,9 @@ import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.QueryGroupTask; +/** + * Request listener to listen for failures and update query group failure stats + */ public class WorkloadManagementRequestFailureListener extends SearchRequestOperationsListener { private final QueryGroupService queryGroupService; private final ThreadPool threadPool; From fd8f3f5bd04ac888e6b30eae41934cfabdf3ed95 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Fri, 30 Aug 2024 11:11:45 -0700 Subject: [PATCH 5/7] refactor Signed-off-by: Kaushal Kumar --- .../main/java/org/opensearch/node/Node.java | 12 ++--- ...> QueryGroupRequestOperationListener.java} | 13 +++-- ...kloadManagementRequestFailureListener.java | 35 ------------ ...ryGroupRequestOperationListenerTests.java} | 30 +++++++++-- ...equestRejectionOperationListenerTests.java | 53 ------------------- 5 files changed, 40 insertions(+), 103 deletions(-) rename server/src/main/java/org/opensearch/wlm/listeners/{QueryGroupRequestRejectionOperationListener.java => QueryGroupRequestOperationListener.java} (64%) delete mode 100644 server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java rename server/src/test/java/org/opensearch/wlm/listeners/{WorkloadManagementRequestFailureListenerTests.java => QueryGroupRequestOperationListenerTests.java} (78%) delete mode 100644 server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index aec71b9f7ddff..e3fb3718e63a2 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -269,8 +269,7 @@ import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.WorkloadManagementTransportInterceptor; -import org.opensearch.wlm.listeners.QueryGroupRequestRejectionOperationListener; -import org.opensearch.wlm.listeners.WorkloadManagementRequestFailureListener; +import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener; import javax.net.ssl.SNIHostName; @@ -1022,10 +1021,8 @@ protected Node( final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the // queryGroupService - final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener = - new QueryGroupRequestRejectionOperationListener(queryGroupService, threadPool); - final WorkloadManagementRequestFailureListener workloadManagementRequestFailureListener = - new WorkloadManagementRequestFailureListener(queryGroupService, threadPool); + final QueryGroupRequestOperationListener queryGroupRequestRejectionListener = + new QueryGroupRequestOperationListener(queryGroupService, threadPool); // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = @@ -1035,8 +1032,7 @@ protected Node( searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener, - queryGroupRequestRejectionListener, - workloadManagementRequestFailureListener + queryGroupRequestRejectionListener ), pluginComponents.stream() .filter(p -> p instanceof SearchRequestOperationsListener) diff --git a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListener.java similarity index 64% rename from server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java rename to server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListener.java index 89f6fe709667f..a2ce2b57bfe0f 100644 --- a/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListener.java +++ b/server/src/main/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListener.java @@ -8,6 +8,7 @@ package org.opensearch.wlm.listeners; +import org.opensearch.action.search.SearchPhaseContext; import org.opensearch.action.search.SearchRequestContext; import org.opensearch.action.search.SearchRequestOperationsListener; import org.opensearch.threadpool.ThreadPool; @@ -15,14 +16,14 @@ import org.opensearch.wlm.QueryGroupTask; /** - * This listener is used to perform the rejections for incoming requests into a queryGroup + * This listener is used to listen for request lifecycle events for a queryGroup */ -public class QueryGroupRequestRejectionOperationListener extends SearchRequestOperationsListener { +public class QueryGroupRequestOperationListener extends SearchRequestOperationsListener { private final QueryGroupService queryGroupService; private final ThreadPool threadPool; - public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) { + public QueryGroupRequestOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) { this.queryGroupService = queryGroupService; this.threadPool = threadPool; } @@ -36,4 +37,10 @@ protected void onRequestStart(SearchRequestContext searchRequestContext) { final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER); queryGroupService.rejectIfNeeded(queryGroupId); } + + @Override + protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER); + queryGroupService.incrementFailuresFor(queryGroupId); + } } diff --git a/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java b/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java deleted file mode 100644 index 9d65392ccfdfb..0000000000000 --- a/server/src/main/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListener.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.wlm.listeners; - -import org.opensearch.action.search.SearchPhaseContext; -import org.opensearch.action.search.SearchRequestContext; -import org.opensearch.action.search.SearchRequestOperationsListener; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.wlm.QueryGroupService; -import org.opensearch.wlm.QueryGroupTask; - -/** - * Request listener to listen for failures and update query group failure stats - */ -public class WorkloadManagementRequestFailureListener extends SearchRequestOperationsListener { - private final QueryGroupService queryGroupService; - private final ThreadPool threadPool; - - public WorkloadManagementRequestFailureListener(QueryGroupService queryGroupService, ThreadPool threadPool) { - this.queryGroupService = queryGroupService; - this.threadPool = threadPool; - } - - @Override - protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { - final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER); - queryGroupService.incrementFailuresFor(queryGroupId); - } -} diff --git a/server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java similarity index 78% rename from server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java rename to server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java index 0afb37ab3f69c..0307ff623c408 100644 --- a/server/src/test/java/org/opensearch/wlm/listeners/WorkloadManagementRequestFailureListenerTests.java +++ b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java @@ -9,6 +9,7 @@ package org.opensearch.wlm.listeners; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -24,20 +25,26 @@ import java.util.List; import java.util.Map; -public class WorkloadManagementRequestFailureListenerTests extends OpenSearchTestCase { +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class QueryGroupRequestOperationListenerTests extends OpenSearchTestCase { public static final int ITERATIONS = 20; ThreadPool testThreadPool; QueryGroupService queryGroupService; Map queryGroupStateMap; String testQueryGroupId; - WorkloadManagementRequestFailureListener sut; + QueryGroupRequestOperationListener sut; public void setUp() throws Exception { super.setUp(); queryGroupStateMap = new HashMap<>(); testQueryGroupId = "safjgagnakg-3r3fads"; testThreadPool = new TestThreadPool("RejectionTestThreadPool"); + queryGroupService = mock(QueryGroupService.class); + sut = new QueryGroupRequestOperationListener(queryGroupService, testThreadPool); } public void tearDown() throws Exception { @@ -45,6 +52,21 @@ public void tearDown() throws Exception { testThreadPool.shutdown(); } + public void testRejectionCase() { + final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; + testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); + doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService).rejectIfNeeded(testQueryGroupId); + assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.onRequestStart(null)); + } + + public void testNonRejectionCase() { + final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; + testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); + doNothing().when(queryGroupService).rejectIfNeeded(testQueryGroupId); + + sut.onRequestStart(null); + } + public void testValidQueryGroupRequestFailure() throws IOException { QueryGroupStats expectedStats = new QueryGroupStats( @@ -74,7 +96,7 @@ public void testMultiThreadedValidQueryGroupRequestFailures() { queryGroupService = new QueryGroupService(queryGroupStateMap); - sut = new WorkloadManagementRequestFailureListener(queryGroupService, testThreadPool); + sut = new QueryGroupRequestOperationListener(queryGroupService, testThreadPool); List threads = new ArrayList<>(); for (int i = 0; i < ITERATIONS; i++) { @@ -154,7 +176,7 @@ private void assertSuccess( queryGroupService = new QueryGroupService(queryGroupStateMap); - sut = new WorkloadManagementRequestFailureListener(queryGroupService, testThreadPool); + sut = new QueryGroupRequestOperationListener(queryGroupService, testThreadPool); sut.onRequestFailure(null, null); QueryGroupStats actualStats = queryGroupService.nodeStats(); diff --git a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java deleted file mode 100644 index 19e82aca26153..0000000000000 --- a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestRejectionOperationListenerTests.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.wlm.listeners; - -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.wlm.QueryGroupService; -import org.opensearch.wlm.QueryGroupTask; - -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; - -public class QueryGroupRequestRejectionOperationListenerTests extends OpenSearchTestCase { - ThreadPool testThreadPool; - QueryGroupService queryGroupService; - QueryGroupRequestRejectionOperationListener sut; - - public void setUp() throws Exception { - super.setUp(); - testThreadPool = new TestThreadPool("RejectionTestThreadPool"); - queryGroupService = mock(QueryGroupService.class); - sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool); - } - - public void tearDown() throws Exception { - super.tearDown(); - testThreadPool.shutdown(); - } - - public void testRejectionCase() { - final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; - testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); - doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService).rejectIfNeeded(testQueryGroupId); - assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.onRequestStart(null)); - } - - public void testNonRejectionCase() { - final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t"; - testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId); - doNothing().when(queryGroupService).rejectIfNeeded(testQueryGroupId); - - sut.onRequestStart(null); - } -} From d89422bff6206a5654370252c07cd9b2ec2eab31 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Fri, 30 Aug 2024 11:40:56 -0700 Subject: [PATCH 6/7] address comments Signed-off-by: Kaushal Kumar --- server/src/main/java/org/opensearch/node/Node.java | 6 ++++-- .../src/main/java/org/opensearch/wlm/ResourceType.java | 7 +++++++ .../java/org/opensearch/wlm/stats/QueryGroupStats.java | 9 +++------ 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e3fb3718e63a2..08cf73b28bee8 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1021,8 +1021,10 @@ protected Node( final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the // queryGroupService - final QueryGroupRequestOperationListener queryGroupRequestRejectionListener = - new QueryGroupRequestOperationListener(queryGroupService, threadPool); + final QueryGroupRequestOperationListener queryGroupRequestRejectionListener = new QueryGroupRequestOperationListener( + queryGroupService, + threadPool + ); // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = diff --git a/server/src/main/java/org/opensearch/wlm/ResourceType.java b/server/src/main/java/org/opensearch/wlm/ResourceType.java index c3f48f5f793ce..2e8da4f57f36c 100644 --- a/server/src/main/java/org/opensearch/wlm/ResourceType.java +++ b/server/src/main/java/org/opensearch/wlm/ResourceType.java @@ -14,6 +14,7 @@ import org.opensearch.tasks.Task; import java.io.IOException; +import java.util.List; import java.util.function.Function; /** @@ -30,6 +31,8 @@ public enum ResourceType { private final Function getResourceUsage; private final boolean statsEnabled; + private static List sortedValues = List.of(CPU, MEMORY); + ResourceType(String name, Function getResourceUsage, boolean statsEnabled) { this.name = name; this.getResourceUsage = getResourceUsage; @@ -71,4 +74,8 @@ public long getResourceUsage(Task task) { public boolean hasStatsEnabled() { return statsEnabled; } + + public static List getSortedValues() { + return sortedValues; + } } diff --git a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java index 99cbc9348821a..9fc7039cd1852 100644 --- a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java +++ b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java @@ -170,12 +170,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(FAILURES, failures); builder.field(TOTAL_CANCELLATIONS, totalCancellations); - List> entryList = new ArrayList<>(resourceStats.entrySet()); - entryList.sort((k1, k2) -> k1.getKey().compareTo(k2.getKey())); - - for (Map.Entry resourceStat : entryList) { - ResourceType resourceType = resourceStat.getKey(); - ResourceStats resourceStats1 = resourceStat.getValue(); + for (ResourceType resourceType : ResourceType.getSortedValues()) { + ResourceStats resourceStats1 = resourceStats.get(resourceType); + if (resourceStats1 == null) continue; builder.startObject(resourceType.getName()); resourceStats1.toXContent(builder, params); builder.endObject(); From 59952e4e220d132d6171cf8cee1f7ff7b86f5881 Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Fri, 30 Aug 2024 11:47:34 -0700 Subject: [PATCH 7/7] rename listener instance Signed-off-by: Kaushal Kumar --- server/src/main/java/org/opensearch/node/Node.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 08cf73b28bee8..6373621c1143f 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1021,7 +1021,7 @@ protected Node( final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the // queryGroupService - final QueryGroupRequestOperationListener queryGroupRequestRejectionListener = new QueryGroupRequestOperationListener( + final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener( queryGroupService, threadPool ); @@ -1034,7 +1034,7 @@ protected Node( searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener, - queryGroupRequestRejectionListener + queryGroupRequestOperationListener ), pluginComponents.stream() .filter(p -> p instanceof SearchRequestOperationsListener)