Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request level latency tracking #10351

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))

### Dependencies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
timeProvider.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -662,6 +663,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
timeProvider.getPhaseTook(),
failures,
clusters,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ public static void readMultiLineFormat(
} else if ("cancel_after_time_interval".equals(entry.getKey())
|| "cancelAfterTimeInterval".equals(entry.getKey())) {
searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null));
} else if ("phase_took".equals(entry.getKey())) {
searchRequest.setPhaseTook(nodeBooleanValue(value));
} else {
throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section");
}
Expand Down Expand Up @@ -374,6 +376,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild
if (request.getCancelAfterTimeInterval() != null) {
xContentBuilder.field("cancel_after_time_interval", request.getCancelAfterTimeInterval().getStringRep());
}
if (request.isPhaseTook() != null) {
xContentBuilder.field("phase_took", request.isPhaseTook());
}
xContentBuilder.endObject();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private String pipeline;

private Boolean phaseTook = null;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
Expand Down Expand Up @@ -209,6 +211,7 @@ private SearchRequest(
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
this.phaseTook = searchRequest.phaseTook;
}

/**
Expand Down Expand Up @@ -253,6 +256,9 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
pipeline = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
phaseTook = in.readOptionalBoolean();
}
}

@Override
Expand Down Expand Up @@ -284,6 +290,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalString(pipeline);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(phaseTook);
}
}

@Override
Expand Down Expand Up @@ -615,6 +624,20 @@ public void setPreFilterShardSize(int preFilterShardSize) {
this.preFilterShardSize = preFilterShardSize;
}

/**
* Returns value of user-provided phase_took query parameter for this search request.
*/
public Boolean isPhaseTook() {
return phaseTook;
}

/**
* Sets value of phase_took query param if provided by user. Defaults to <code>null</code>.
*/
public void setPhaseTook(Boolean phaseTook) {
this.phaseTook = phaseTook;
}

/**
* Returns a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold, or <code>null</code> if the threshold is unspecified.
Expand Down Expand Up @@ -719,7 +742,8 @@ public boolean equals(Object o) {
&& absoluteStartMillis == that.absoluteStartMillis
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
&& Objects.equals(pipeline, that.pipeline);
&& Objects.equals(pipeline, that.pipeline)
&& Objects.equals(phaseTook, that.phaseTook);
}

@Override
Expand All @@ -740,7 +764,8 @@ public int hashCode() {
localClusterAlias,
absoluteStartMillis,
ccsMinimizeRoundtrips,
cancelAfterTimeInterval
cancelAfterTimeInterval,
phaseTook
);
}

Expand Down Expand Up @@ -783,6 +808,8 @@ public String toString() {
+ cancelAfterTimeInterval
+ ", pipeline="
+ pipeline
+ ", phaseTook="
+ phaseTook
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.search;

import org.apache.lucene.search.TotalHits;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.StatusToXContentObject;
Expand Down Expand Up @@ -63,7 +64,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
Expand Down Expand Up @@ -94,6 +97,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
private final ShardSearchFailure[] shardFailures;
private final Clusters clusters;
private final long tookInMillis;
private final PhaseTook phaseTook;

public SearchResponse(StreamInput in) throws IOException {
super(in);
Expand All @@ -112,6 +116,11 @@ public SearchResponse(StreamInput in) throws IOException {
clusters = new Clusters(in);
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
phaseTook = in.readOptionalWriteable(PhaseTook::new);
} else {
phaseTook = null;
}
skippedShards = in.readVInt();
pointInTimeId = in.readOptionalString();
}
Expand All @@ -126,7 +135,32 @@ public SearchResponse(
ShardSearchFailure[] shardFailures,
Clusters clusters
) {
this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters, null);
this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, null, shardFailures, clusters, null);
}

public SearchResponse(
SearchResponseSections internalResponse,
String scrollId,
int totalShards,
int successfulShards,
int skippedShards,
long tookInMillis,
ShardSearchFailure[] shardFailures,
Clusters clusters,
String pointInTimeId
) {
this(
internalResponse,
scrollId,
totalShards,
successfulShards,
skippedShards,
tookInMillis,
null,
shardFailures,
clusters,
pointInTimeId
);
}

public SearchResponse(
Expand All @@ -136,6 +170,7 @@ public SearchResponse(
int successfulShards,
int skippedShards,
long tookInMillis,
PhaseTook phaseTook,
ShardSearchFailure[] shardFailures,
Clusters clusters,
String pointInTimeId
Expand All @@ -148,6 +183,7 @@ public SearchResponse(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.tookInMillis = tookInMillis;
this.phaseTook = phaseTook;
this.shardFailures = shardFailures;
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
assert scrollId == null || pointInTimeId == null : "SearchResponse can't have both scrollId ["
Expand Down Expand Up @@ -210,6 +246,13 @@ public TimeValue getTook() {
return new TimeValue(tookInMillis);
}

/**
* How long the request took in each search phase.
*/
public PhaseTook getPhaseTook() {
return phaseTook;
}

/**
* The total number of shards the search was executed on.
*/
Expand Down Expand Up @@ -298,6 +341,9 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field(POINT_IN_TIME_ID.getPreferredName(), pointInTimeId);
}
builder.field(TOOK.getPreferredName(), tookInMillis);
if (phaseTook != null) {
phaseTook.toXContent(builder, params);
}
builder.field(TIMED_OUT.getPreferredName(), isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(TERMINATED_EARLY.getPreferredName(), isTerminatedEarly());
Expand Down Expand Up @@ -337,6 +383,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
Boolean terminatedEarly = null;
int numReducePhases = 1;
long tookInMillis = -1;
PhaseTook phaseTook = null;
int successfulShards = -1;
int totalShards = -1;
int skippedShards = 0; // 0 for BWC
Expand Down Expand Up @@ -401,6 +448,24 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
parser.skipChildren();
}
}
} else if (PhaseTook.PHASE_TOOK.match(currentFieldName, parser.getDeprecationHandler())) {
Map<String, Long> phaseTookMap = new HashMap<>();

while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
try {
SearchPhaseName.valueOf(currentFieldName.toUpperCase(Locale.ROOT));
phaseTookMap.put(currentFieldName, parser.longValue());
} catch (final IllegalArgumentException ex) {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
phaseTook = new PhaseTook(phaseTookMap);
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
int successful = -1;
int total = -1;
Expand Down Expand Up @@ -472,6 +537,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
successfulShards,
skippedShards,
tookInMillis,
phaseTook,
failures.toArray(ShardSearchFailure.EMPTY_ARRAY),
clusters,
searchContextId
Expand All @@ -491,6 +557,9 @@ public void writeTo(StreamOutput out) throws IOException {
clusters.writeTo(out);
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(phaseTook);
}
out.writeVInt(skippedShards);
out.writeOptionalString(pointInTimeId);
}
Expand Down Expand Up @@ -604,6 +673,67 @@ public String toString() {
}
}

/**
* Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
* and how many of them were skipped.
*
* @opensearch.internal
*/
public static class PhaseTook implements ToXContentFragment, Writeable {
static final ParseField PHASE_TOOK = new ParseField("phase_took");
private final Map<String, Long> phaseTookMap;

public PhaseTook(Map<String, Long> phaseTookMap) {
this.phaseTookMap = phaseTookMap;
}

private PhaseTook(StreamInput in) throws IOException {
this(in.readMap(StreamInput::readString, StreamInput::readLong));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(phaseTookMap, StreamOutput::writeString, StreamOutput::writeLong);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(PHASE_TOOK.getPreferredName());

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
if (phaseTookMap.containsKey(searchPhaseName.getName())) {
builder.field(searchPhaseName.getName(), phaseTookMap.get(searchPhaseName.getName()));
} else {
builder.field(searchPhaseName.getName(), 0);
}
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
dzane17 marked this conversation as resolved.
Show resolved Hide resolved
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PhaseTook phaseTook = (PhaseTook) o;

if (phaseTook.phaseTookMap.equals(phaseTookMap)) {
return true;
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(phaseTookMap);
}
}

static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
searchTimeProvider.getPhaseTook(),
shardFailures,
clusters,
null
Expand Down
Loading
Loading