Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add request failure listener to track failures at query group level #15527

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Workload Management] Add query group level failure tracking ([#15527](https://github.com/opensearch-project/OpenSearch/pull/15527))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
13 changes: 8 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
import org.opensearch.wlm.listeners.QueryGroupRequestRejectionOperationListener;
import org.opensearch.wlm.listeners.WorkloadManagementRequestFailureListener;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -1019,11 +1020,12 @@ protected Node(
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
// queryGroupService
final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener =
new QueryGroupRequestRejectionOperationListener(
new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService
threadPool
);
new QueryGroupRequestRejectionOperationListener(queryGroupService, threadPool);
final WorkloadManagementRequestFailureListener workloadManagementRequestFailureListener =
new WorkloadManagementRequestFailureListener(queryGroupService, threadPool);

// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
Expand All @@ -1033,7 +1035,8 @@ protected Node(
searchRequestStats,
searchRequestSlowLog,
searchTaskRequestOperationsListener,
queryGroupRequestRejectionListener
queryGroupRequestRejectionListener,
workloadManagementRequestFailureListener
),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
Expand Down
50 changes: 49 additions & 1 deletion server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,59 @@
package org.opensearch.wlm;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.wlm.stats.QueryGroupState;
import org.opensearch.wlm.stats.QueryGroupStats;
import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder;

import java.util.HashMap;
import java.util.Map;

/**
* This is a stub at this point in time and will be replaced by an actual one in a couple of days
* As of now this is a stub and the main implementation PR will be raised soon. The coming PR will collate these changes with core QueryGroupService changes
*/
public class QueryGroupService {
// This map does not need to be concurrent since we will process the cluster state change serially and update
// this map with new additions and deletions of entries. QueryGroupState is thread safe
private final Map<String, QueryGroupState> queryGroupStateMap;

/**
* Creates a service backed by a new, empty {@link HashMap} of query group states.
*/
public QueryGroupService() {
this(new HashMap<>());
}

/**
* Creates a service that tracks state in the supplied map.
* The map is stored by reference; no copy is made.
* @param queryGroupStateMap mapping from query group id to its {@link QueryGroupState}
*/
public QueryGroupService(Map<String, QueryGroupState> queryGroupStateMap) {
this.queryGroupStateMap = queryGroupStateMap;
}

/**
 * Bumps the failure counter of the query group identified by {@code queryGroupId}.
 * The call is a no-op when no state is registered for that id.
 * @param queryGroupId query group identifier
 */
public void incrementFailuresFor(final String queryGroupId) {
    final QueryGroupState state = queryGroupStateMap.get(queryGroupId);
    // A missing entry means the request failed for a deleted query group,
    // or for a newly created one that has not been acknowledged yet.
    if (state != null) {
        state.failures.inc();
    }
}

/**
 * Converts the state of every tracked query group into its stats holder form.
 * @return node level query group stats
 */
public QueryGroupStats nodeStats() {
    final Map<String, QueryGroupStatsHolder> holdersById = new HashMap<>();
    // One stats holder per query group id, derived from that group's current state.
    queryGroupStateMap.forEach((id, state) -> holdersById.put(id, QueryGroupStatsHolder.from(state)));
    return new QueryGroupStats(holdersById);
}

/**
*
* @param queryGroupId query group identifier
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.listeners;

import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.QueryGroupTask;

/**
* Request listener to listen for failures and update query group failure stats
*/
public class WorkloadManagementRequestFailureListener extends SearchRequestOperationsListener {
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
private final QueryGroupService queryGroupService;
private final ThreadPool threadPool;

/**
* @param queryGroupService service whose failure counters are updated when a request fails
* @param threadPool thread pool whose thread context is read for the query group id header
*/
public WorkloadManagementRequestFailureListener(QueryGroupService queryGroupService, ThreadPool threadPool) {
this.queryGroupService = queryGroupService;
this.threadPool = threadPool;
}

@Override
protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
    // Read the query group id from the thread context header and record the failure against it.
    queryGroupService.incrementFailuresFor(
        threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER)
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/**
* this will track the cumulative failures in a query group
*/
final CounterMetric failures = new CounterMetric();
public final CounterMetric failures = new CounterMetric();

/**
* This will track total number of cancellations in the query group due to all resource type breaches
Expand Down Expand Up @@ -95,9 +95,18 @@
final ResourceType resourceType;
final CounterMetric cancellations = new CounterMetric();
final CounterMetric rejections = new CounterMetric();
private double lastRecordedUsage = 0;

/**
* Creates the state holder for the given resource type.
* @param resourceType resource type this state is associated with
*/
public ResourceTypeState(ResourceType resourceType) {
this.resourceType = resourceType;
}

/**
* Stores the given value as the last recorded usage.
* @param recordedUsage usage value to store; NOTE(review): units and valid range are not visible here, confirm with the caller
*/
public void setLastRecordedUsage(double recordedUsage) {
lastRecordedUsage = recordedUsage;
}

Check warning on line 106 in server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java#L105-L106

Added lines #L105 - L106 were not covered by tests

/**
* @return the value most recently passed to {@link #setLastRecordedUsage(double)}, or 0 if it was never set
*/
public double getLastRecordedUsage() {
return lastRecordedUsage;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.wlm.ResourceType;
import org.opensearch.wlm.stats.QueryGroupState.ResourceTypeState;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -52,7 +56,11 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("query_groups");
for (Map.Entry<String, QueryGroupStatsHolder> queryGroupStats : stats.entrySet()) {
// to keep the toXContent consistent
List<Map.Entry<String, QueryGroupStatsHolder>> entryList = new ArrayList<>(stats.entrySet());
entryList.sort((k1, k2) -> k1.getKey().compareTo(k2.getKey()));

for (Map.Entry<String, QueryGroupStatsHolder> queryGroupStats : entryList) {
builder.startObject(queryGroupStats.getKey());
queryGroupStats.getValue().toXContent(builder, params);
builder.endObject();
Expand Down Expand Up @@ -83,11 +91,14 @@ public static class QueryGroupStatsHolder implements ToXContentObject, Writeable
public static final String REJECTIONS = "rejections";
public static final String TOTAL_CANCELLATIONS = "total_cancellations";
public static final String FAILURES = "failures";
private final long completions;
private final long rejections;
private final long failures;
private final long totalCancellations;
private final Map<ResourceType, ResourceStats> resourceStats;
private long completions;
private long rejections;
private long failures;
private long totalCancellations;
private Map<ResourceType, ResourceStats> resourceStats;

// No-arg constructor needed by the static factory from(QueryGroupState), which
// creates an empty holder and then assigns each field.
public QueryGroupStatsHolder() {}

public QueryGroupStatsHolder(
long completions,
Expand All @@ -111,6 +122,28 @@ public QueryGroupStatsHolder(StreamInput in) throws IOException {
this.resourceStats = in.readMap((i) -> ResourceType.fromName(i.readString()), ResourceStats::new);
}

/**
 * Static factory that builds a {@link QueryGroupStatsHolder} from a {@link QueryGroupState}.
 * @param queryGroupState state to convert
 * @return QueryGroupStatsHolder populated from the counters and per-resource state of {@code queryGroupState}
 */
public static QueryGroupStatsHolder from(QueryGroupState queryGroupState) {
    // Convert each per-resource state into its stats counterpart.
    final Map<ResourceType, ResourceStats> perResourceStats = new HashMap<>();
    queryGroupState.getResourceState()
        .forEach((resourceType, resourceTypeState) -> perResourceStats.put(resourceType, ResourceStats.from(resourceTypeState)));

    final QueryGroupStatsHolder holder = new QueryGroupStatsHolder();
    holder.completions = queryGroupState.getCompletions();
    holder.rejections = queryGroupState.getTotalRejections();
    holder.failures = queryGroupState.getFailures();
    holder.totalCancellations = queryGroupState.getTotalCancellations();
    holder.resourceStats = perResourceStats;
    return holder;
}

/**
* Writes the {@param statsHolder} to {@param out}
* @param out StreamOutput
Expand All @@ -136,7 +169,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(REJECTIONS, rejections);
builder.field(FAILURES, failures);
builder.field(TOTAL_CANCELLATIONS, totalCancellations);
for (Map.Entry<ResourceType, ResourceStats> resourceStat : resourceStats.entrySet()) {

List<Map.Entry<ResourceType, ResourceStats>> entryList = new ArrayList<>(resourceStats.entrySet());
entryList.sort((k1, k2) -> k1.getKey().compareTo(k2.getKey()));

for (Map.Entry<ResourceType, ResourceStats> resourceStat : entryList) {
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
ResourceType resourceType = resourceStat.getKey();
ResourceStats resourceStats1 = resourceStat.getValue();
builder.startObject(resourceType.getName());
Expand Down Expand Up @@ -187,6 +224,19 @@ public ResourceStats(StreamInput in) throws IOException {
this.rejections = in.readVLong();
}

/**
 * Static factory that builds a {@link ResourceStats} from a {@link ResourceTypeState}.
 * @param resourceTypeState state to convert
 * @return ResourceStats object carrying the last recorded usage, cancellation count and rejection count
 */
public static ResourceStats from(ResourceTypeState resourceTypeState) {
    final double lastUsage = resourceTypeState.getLastRecordedUsage();
    final long cancellationCount = resourceTypeState.cancellations.count();
    final long rejectionCount = resourceTypeState.rejections.count();
    return new ResourceStats(lastUsage, cancellationCount, rejectionCount);
}

/**
* Writes the {@param stats} to {@param out}
* @param out StreamOutput
Expand Down
Loading
Loading