Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding QueryGroup schema #13669

Merged
merged 31 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d417d17
rebase with opensearch/main
kaushalmahi12 Apr 10, 2024
be38bae
add resourceLimitGroupId propagation logic from coordinator to data n…
kaushalmahi12 Apr 12, 2024
8651420
add sandbox schema
kaushalmahi12 Apr 15, 2024
07db8cc
add resourceLimitGroupTests
kaushalmahi12 Apr 16, 2024
f531c53
add resourceLimitGroupMetadata tests
kaushalmahi12 Apr 18, 2024
bfdfb88
run spotlessApply
kaushalmahi12 Apr 18, 2024
7631713
add mode field in ResourceLimitGroup schema
kaushalmahi12 Apr 18, 2024
49b63fa
fix breaking testcases
kaushalmahi12 Apr 19, 2024
0270812
add task cancellation skeleton
kaushalmahi12 Apr 22, 2024
942e142
add multitenant labels in searchSource builder
kaushalmahi12 May 8, 2024
0ca624c
write custom xcontent parser for ResourceLimitGroup
kaushalmahi12 May 14, 2024
163031f
remove unrelated changes
kaushalmahi12 May 14, 2024
c07e5f8
remove non-existing import fro cluster settings
kaushalmahi12 May 14, 2024
098e4df
remove non releated changes
kaushalmahi12 May 14, 2024
fba1dac
add _id as the resourceLimitGroup key
kaushalmahi12 May 15, 2024
7498237
add change to register resource limit group metadata
kaushalmahi12 May 20, 2024
34d22a9
add updatedAt in resource limit group
kaushalmahi12 Jun 3, 2024
a918d35
rename resourceLimitGroup to queryGroup
kaushalmahi12 Jun 13, 2024
a494037
address the comments on PR
kaushalmahi12 Jun 19, 2024
fb58576
rename the mode member var to resiliency mode
kaushalmahi12 Jun 19, 2024
3a1a3ef
address comments
kaushalmahi12 Jun 20, 2024
0da3b68
add change in CHANGELOG
kaushalmahi12 Jun 21, 2024
4508c9d
add tests for custom namedWritable QueryGroupMetadata
kaushalmahi12 Jun 21, 2024
1e45475
structure resourceLimits into an object
kaushalmahi12 Jun 25, 2024
769a46f
add QueryGroup.toXContent test case
kaushalmahi12 Jun 26, 2024
15076eb
Merge branch 'main' into feature/sandbox-schemaPR
kaushalmahi12 Jul 2, 2024
5a7f405
fix precommit errors
kaushalmahi12 Jul 2, 2024
5153a74
fix precommit errors
kaushalmahi12 Jul 2, 2024
c5a70e9
fix assemble errors
kaushalmahi12 Jul 2, 2024
42d3494
fix checkstyle errors
kaushalmahi12 Jul 2, 2024
8a45648
address comments
kaushalmahi12 Jul 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
- [Workload Management] Add QueryGroup schema ([13669](https://github.com/opensearch-project/OpenSearch/pull/13669))
- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
import org.opensearch.cluster.metadata.MetadataMappingService;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
import org.opensearch.cluster.metadata.QueryGroupMetadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.ViewMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
Expand Down Expand Up @@ -214,6 +215,8 @@ public static List<Entry> getNamedWriteables() {
DecommissionAttributeMetadata::new,
DecommissionAttributeMetadata::readDiffFrom
);

registerMetadataCustom(entries, QueryGroupMetadata.TYPE, QueryGroupMetadata::new, QueryGroupMetadata::readDiffFrom);
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,25 @@
return this;
}

public Builder queryGroups(final Map<String, QueryGroup> queryGroups) {
this.customs.put(QueryGroupMetadata.TYPE, new QueryGroupMetadata(queryGroups));
return this;

Check warning on line 1372 in server/src/main/java/org/opensearch/cluster/metadata/Metadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/Metadata.java#L1371-L1372

Added lines #L1371 - L1372 were not covered by tests
}

public Builder put(final QueryGroup queryGroup) {
Objects.requireNonNull(queryGroup, "queryGroup should not be null");
Map<String, QueryGroup> existing = new HashMap<>(getQueryGroups());
existing.put(queryGroup.get_id(), queryGroup);
return queryGroups(existing);

Check warning on line 1379 in server/src/main/java/org/opensearch/cluster/metadata/Metadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/Metadata.java#L1376-L1379

Added lines #L1376 - L1379 were not covered by tests
}

private Map<String, QueryGroup> getQueryGroups() {
return Optional.ofNullable(this.customs.get(QueryGroupMetadata.TYPE))
.map(o -> (QueryGroupMetadata) o)
.map(QueryGroupMetadata::queryGroups)
.orElse(Collections.emptyMap());

Check warning on line 1386 in server/src/main/java/org/opensearch/cluster/metadata/Metadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/Metadata.java#L1383-L1386

Added lines #L1383 - L1386 were not covered by tests
}

kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
private Map<String, View> getViews() {
return Optional.ofNullable(customs.get(ViewMetadata.TYPE))
.map(o -> (ViewMetadata) o)
Expand Down
317 changes: 317 additions & 0 deletions server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.ResourceType;
import org.joda.time.Instant;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Class to define the QueryGroup schema
* {
* "_id": "fafjafjkaf9ag8a9ga9g7ag0aagaga",
* "resourceLimits": {
* "jvm": 0.4
* },
* "resiliency_mode": "enforced",
* "name": "analytics",
* "updatedAt": 4513232415
* }
*/
@PublicApi(since = "2.15")
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
public class QueryGroup extends AbstractDiffable<QueryGroup> implements ToXContentObject {

public static final int MAX_CHARS_ALLOWED_IN_NAME = 50;
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
private final String name;
private final String _id;
private final ResiliencyMode resiliencyMode;
// It is an epoch in millis
private final long updatedAtInMillis;
private final Map<ResourceType, Object> resourceLimits;

public QueryGroup(String name, ResiliencyMode resiliencyMode, Map<ResourceType, Object> resourceLimits) {
this(name, UUIDs.randomBase64UUID(), resiliencyMode, resourceLimits, Instant.now().getMillis());
}

Check warning on line 53 in server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java#L52-L53

Added lines #L52 - L53 were not covered by tests

public QueryGroup(String name, String _id, ResiliencyMode resiliencyMode, Map<ResourceType, Object> resourceLimits, long updatedAt) {
Objects.requireNonNull(name, "QueryGroup.name can't be null");
Objects.requireNonNull(resourceLimits, "QueryGroup.resourceLimits can't be null");
Objects.requireNonNull(resiliencyMode, "QueryGroup.resiliencyMode can't be null");
Objects.requireNonNull(_id, "QueryGroup._id can't be null");

if (name.length() > MAX_CHARS_ALLOWED_IN_NAME) {
throw new IllegalArgumentException("QueryGroup.name shouldn't be more than 50 chars long");

Check warning on line 62 in server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java#L62

Added line #L62 was not covered by tests
}

if (resourceLimits.isEmpty()) {
throw new IllegalArgumentException("QueryGroup.resourceLimits should at least have 1 resource limit");
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
}
validateResourceLimits(resourceLimits);
if (!isValid(updatedAt)) {
throw new IllegalArgumentException("QueryGroup.updatedAtInMillis is not a valid epoch");

Check warning on line 70 in server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java#L70

Added line #L70 was not covered by tests
}
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved

this.name = name;
this._id = _id;
this.resiliencyMode = resiliencyMode;
this.resourceLimits = resourceLimits;
this.updatedAtInMillis = updatedAt;
}

private static boolean isValid(long updatedAt) {
long minValidTimestamp = Instant.ofEpochMilli(0L).getMillis();

// Use Instant.now() to get the current time in seconds since epoch
long currentSeconds = Instant.now().getMillis();

// Check if the timestamp is within a reasonable range
return minValidTimestamp <= updatedAt && updatedAt <= currentSeconds;
}

public QueryGroup(StreamInput in) throws IOException {
this(
in.readString(),
in.readString(),
ResiliencyMode.fromName(in.readString()),
in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readGenericValue),
in.readLong()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(_id);
out.writeString(resiliencyMode.getName());
out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeGenericValue);
out.writeLong(updatedAtInMillis);
}

private void validateResourceLimits(Map<ResourceType, Object> resourceLimits) {
for (Map.Entry<ResourceType, Object> resource : resourceLimits.entrySet()) {
Double threshold = (Double) resource.getValue();
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
Objects.requireNonNull(resource.getKey(), "resourceName can't be null");
Objects.requireNonNull(threshold, "resource limit threshold for" + resource.getKey().getName() + " : can't be null");

if (Double.compare(threshold, 1.0) > 0) {
throw new IllegalArgumentException("resource value should be less than 1.0");
}
}
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
builder.field("_id", _id);
builder.field("name", name);
builder.field("resiliency_mode", resiliencyMode.getName());
builder.field("updatedAt", updatedAtInMillis);
// write resource limits
builder.startObject("resourceLimits");
for (ResourceType resourceType : ResourceType.values()) {
if (resourceLimits.containsKey(resourceType)) {
builder.field(resourceType.getName(), resourceLimits.get(resourceType));
}
}
builder.endObject();

builder.endObject();
return builder;
}

public static QueryGroup fromXContent(final XContentParser parser) throws IOException {
if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
}

Builder builder = builder();

XContentParser.Token token = parser.currentToken();

if (token != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Expected START_OBJECT token but found [" + parser.currentName() + "]");

Check warning on line 151 in server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java#L151

Added line #L151 was not covered by tests
}

String fieldName = "";
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
// Map to hold resources
final Map<ResourceType, Object> resourceLimits = new HashMap<>();
while ((token = parser.nextToken()) != null) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else if (token.isValue()) {
if (fieldName.equals("_id")) {
builder._id(parser.text());
} else if (fieldName.equals("name")) {
builder.name(parser.text());
} else if (fieldName.equals("resiliency_mode")) {
builder.mode(parser.text());
} else if (fieldName.equals("updatedAt")) {
builder.updatedAt(parser.longValue());
} else {
throw new IllegalArgumentException(fieldName + " is not a valid field in QueryGroup");

Check warning on line 170 in server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java#L170

Added line #L170 was not covered by tests
}
} else if (token == XContentParser.Token.START_OBJECT) {

if (!fieldName.equals("resourceLimits")) {
throw new IllegalArgumentException(

Check warning on line 175 in server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java#L175

Added line #L175 was not covered by tests
"QueryGroup.resourceLimits is an object and expected token was { " + " but found " + token
);
}

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else {
resourceLimits.put(ResourceType.fromName(fieldName), parser.doubleValue());
}
}

}
}
builder.resourceLimits(resourceLimits);
return builder.build();
}

public static Diff<QueryGroup> readDiff(final StreamInput in) throws IOException {
return readDiffFrom(QueryGroup::new, in);

Check warning on line 195 in server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java#L195

Added line #L195 was not covered by tests
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QueryGroup that = (QueryGroup) o;
return Objects.equals(name, that.name)
&& Objects.equals(resourceLimits, that.resourceLimits)
&& Objects.equals(_id, that._id)
&& updatedAtInMillis == that.updatedAtInMillis;
}

@Override
public int hashCode() {
return Objects.hash(name, resourceLimits, updatedAtInMillis, _id);
}

public String getName() {
return name;
}

public ResiliencyMode getResiliencyMode() {
return resiliencyMode;
}

public Map<ResourceType, Object> getResourceLimits() {
return resourceLimits;
}

public String get_id() {
return _id;
}

public long getUpdatedAtInMillis() {
return updatedAtInMillis;
}

/**
* builder method for the {@link QueryGroup}
* @return Builder object
*/
public static Builder builder() {
return new Builder();
}

/**
* This enum models the different QueryGroup resiliency modes
* SOFT - means that this query group can consume more than query group resource limits if node is not in duress
* ENFORCED - means that it will never breach the assigned limits and will cancel as soon as the limits are breached
* MONITOR - it will not cause any cancellation but just log the eligible task cancellations
*/
@PublicApi(since = "2.15")
public enum ResiliencyMode {
SOFT("soft"),
ENFORCED("enforced"),
MONITOR("monitor");

private final String name;

ResiliencyMode(String mode) {
this.name = mode;
}

public String getName() {
return name;
}

public static ResiliencyMode fromName(String s) {
for (ResiliencyMode mode : values()) {
if (mode.getName().equalsIgnoreCase(s)) return mode;

}
throw new IllegalArgumentException("Invalid value for QueryGroupMode: " + s);

Check warning on line 269 in server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java#L269

Added line #L269 was not covered by tests
}

}

/**
* Builder class for {@link QueryGroup}
*/
@PublicApi(since = "2.15")
public static class Builder {
private String name;
private String _id;
private ResiliencyMode resiliencyMode;
private long updatedAt;
private Map<ResourceType, Object> resourceLimits;

private Builder() {}

public Builder name(String name) {
this.name = name;
return this;
}

public Builder _id(String _id) {
this._id = _id;
return this;
}

public Builder mode(String mode) {
this.resiliencyMode = ResiliencyMode.fromName(mode);
return this;
}

public Builder updatedAt(long updatedAt) {
this.updatedAt = updatedAt;
return this;
}

public Builder resourceLimits(Map<ResourceType, Object> resourceLimits) {
this.resourceLimits = resourceLimits;
return this;
}

public QueryGroup build() {
return new QueryGroup(name, _id, resiliencyMode, resourceLimits, updatedAt);
}

}
}
Loading
Loading