Skip to content

Commit

Permalink
add multitenant labels in searchSource builder
Browse files Browse the repository at this point in the history
Signed-off-by: Kaushal Kumar <[email protected]>
  • Loading branch information
kaushalmahi12 committed May 8, 2024
1 parent d2a19ee commit 3afd9db
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -840,8 +840,6 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
// Note that, we have to disable this shortcut for queries that create a context (scroll and search context).
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null);

// Propagate the resource limit group from co-ordinator to data nodes
shardRequest.setResourceLimitGroupId(getTask().getResourceLimitGroupName());
return shardRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private Boolean phaseTook = null;

private String resourceLimitGroupId;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
Expand Down Expand Up @@ -264,10 +262,6 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
phaseTook = in.readOptionalBoolean();
}

if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
resourceLimitGroupId = in.readOptionalString();
}
}

@Override
Expand Down Expand Up @@ -302,9 +296,6 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalBoolean(phaseTook);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalString(resourceLimitGroupId);
}
}

@Override
Expand Down Expand Up @@ -706,16 +697,6 @@ public SearchRequest pipeline(String pipeline) {
public String pipeline() {
return pipeline;
}

public SearchRequest resourceLimitGroupId(String resourceLimitGroupId) {
this.resourceLimitGroupId = resourceLimitGroupId;
return this;
}

public String resourceLimitGroupId() {
return resourceLimitGroupId;
}

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
Expand Down Expand Up @@ -770,8 +751,7 @@ public boolean equals(Object o) {
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
&& Objects.equals(pipeline, that.pipeline)
&& Objects.equals(phaseTook, that.phaseTook)
&& Objects.equals(resourceLimitGroupId, that.resourceLimitGroupId);
&& Objects.equals(phaseTook, that.phaseTook);
}

@Override
Expand All @@ -793,8 +773,7 @@ public int hashCode() {
absoluteStartMillis,
ccsMinimizeRoundtrips,
cancelAfterTimeInterval,
phaseTook,
resourceLimitGroupId
phaseTook
);
}

Expand Down Expand Up @@ -839,8 +818,6 @@ public String toString() {
+ pipeline
+ ", phaseTook="
+ phaseTook
+ ", resourceLimitGroupId="
+ resourceLimitGroupId
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.index.query.Rewriteable;
import org.opensearch.search.MultiTenantLabel;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.SearchShardTarget;
Expand Down Expand Up @@ -122,8 +123,7 @@
import java.util.stream.StreamSupport;

import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.opensearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
import static org.opensearch.action.search.SearchType.QUERY_THEN_FETCH;
import static org.opensearch.action.search.SearchType.*;
import static org.opensearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort;

/**
Expand Down Expand Up @@ -166,6 +166,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
public static final String NOT_PROVIDED = "NOT_PROVIDED";

private final NodeClient client;
private final ThreadPool threadPool;
Expand Down Expand Up @@ -1105,7 +1106,13 @@ private void executeSearch(
localShardIterators.size() + remoteShardIterators.size()
);

task.setResourceLimitGroupName(searchRequest.resourceLimitGroupId());
// Set tenant for this request in the task for tracking the tasks across tenants
Map<String, Object> multiTenantLabels = searchRequest.source().multiTenantLabels();
String tenant = NOT_PROVIDED;
if (multiTenantLabels != null) {
tenant = (String) multiTenantLabels.get(MultiTenantLabel.TENANT_LABEL.name());
}
task.setResourceLimitGroupName(tenant);

searchAsyncActionProvider.asyncSearchAction(
task,
Expand Down
25 changes: 7 additions & 18 deletions server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -835,10 +835,10 @@ public Map<String, View> views() {
return Optional.ofNullable((ViewMetadata) this.custom(ViewMetadata.TYPE)).map(ViewMetadata::views).orElse(Collections.emptyMap());
}

public Map<String, ResourceLimitGroup> resourceLimitGroups() {
public Set<ResourceLimitGroup> resourceLimitGroups() {
return Optional.ofNullable((ResourceLimitGroupMetadata) this.custom(ResourceLimitGroupMetadata.TYPE))
.map(ResourceLimitGroupMetadata::resourceLimitGroups)
.orElse(Collections.emptyMap());
.orElse(Collections.emptySet());

}

Expand Down Expand Up @@ -1336,34 +1336,23 @@ public Builder removeDataStream(String name) {
return this;
}

public Builder resourceLimitGroups(final Map<String, ResourceLimitGroup> resourceLimitGroups) {
public Builder resourceLimitGroups(final Set<ResourceLimitGroup> resourceLimitGroups) {
this.customs.put(ResourceLimitGroupMetadata.TYPE, new ResourceLimitGroupMetadata(resourceLimitGroups));
return this;
}

public ResourceLimitGroup getResourceLimitGroup(final String resourceLimitGroupName) {
return getResourceLimitGroups().get(resourceLimitGroupName);
}

public Builder put(final ResourceLimitGroup resourceLimitGroup) {
Objects.requireNonNull(resourceLimitGroup, "resourceLimitGroup should not be null");
Map<String, ResourceLimitGroup> existing = new HashMap<>(getResourceLimitGroups());
existing.put(resourceLimitGroup.getName(), resourceLimitGroup);
return resourceLimitGroups(existing);
}

public Builder removeResourceLimitGroup(final String resourceLimitGroupName) {
Objects.requireNonNull(resourceLimitGroupName, "resourceLimitGroup should not be null");
Map<String, ResourceLimitGroup> existing = new HashMap<>(getResourceLimitGroups());
existing.remove(resourceLimitGroupName);
Set<ResourceLimitGroup> existing = getResourceLimitGroups();
existing.add(resourceLimitGroup);
return resourceLimitGroups(existing);
}

private Map<String, ResourceLimitGroup> getResourceLimitGroups() {
private Set<ResourceLimitGroup> getResourceLimitGroups() {
return Optional.ofNullable(this.customs.get(ResourceLimitGroupMetadata.TYPE))
.map(o -> (ResourceLimitGroupMetadata) o)
.map(ResourceLimitGroupMetadata::resourceLimitGroups)
.orElse(Collections.emptyMap());
.orElse(Collections.emptySet());
}

private Map<String, View> getViews() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public class ResourceLimitGroup extends AbstractDiffable<ResourceLimitGroup> imp
private final List<ResourceLimit> resourceLimits;
private final ResourceLimitGroupMode mode;

private static final List<String> ALLOWED_RESOURCES = List.of("jvm");
// list of resources that are allowed to be present in the ResourceLimitGroupSchema
public static final List<String> ALLOWED_RESOURCES = List.of("jvm");

public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField RESOURCE_LIMITS_FIELD = new ParseField("resourceLimits");
Expand Down Expand Up @@ -96,10 +97,10 @@ public ResourceLimitGroup(StreamInput in) throws IOException {
@ExperimentalApi
public static class ResourceLimit implements Writeable, ToXContentObject {
private final String resourceName;
private final Double value;
private final Double threshold;

static final ParseField RESOURCE_NAME_FIELD = new ParseField("resourceName");
static final ParseField RESOURCE_VALUE_FIELD = new ParseField("value");
static final ParseField RESOURCE_VALUE_FIELD = new ParseField("threshold");

public static final ConstructingObjectParser<ResourceLimit, Void> PARSER = new ConstructingObjectParser<>(
"ResourceLimitParser",
Expand All @@ -125,7 +126,7 @@ public ResourceLimit(String resourceName, Double value) {
);
}
this.resourceName = resourceName;
this.value = value;
this.threshold = value;
}

public ResourceLimit(StreamInput in) throws IOException {
Expand All @@ -140,7 +141,7 @@ public ResourceLimit(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(resourceName);
out.writeDouble(value);
out.writeDouble(threshold);
}

/**
Expand All @@ -153,7 +154,7 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(RESOURCE_NAME_FIELD.getPreferredName(), resourceName);
builder.field(RESOURCE_VALUE_FIELD.getPreferredName(), value);
builder.field(RESOURCE_VALUE_FIELD.getPreferredName(), threshold);
builder.endObject();
return builder;
}
Expand All @@ -167,20 +168,20 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ResourceLimit that = (ResourceLimit) o;
return Objects.equals(resourceName, that.resourceName) && Objects.equals(value, that.value);
return Objects.equals(resourceName, that.resourceName) && Objects.equals(threshold, that.threshold);
}

@Override
public int hashCode() {
return Objects.hash(resourceName, value);
return Objects.hash(resourceName, threshold);
}

public String getResourceName() {
return resourceName;
}

public Double getValue() {
return value;
public Double getThreshold() {
return threshold;
}
}

Expand Down
Loading

0 comments on commit 3afd9db

Please sign in to comment.