Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add verbose pipeline parameter to output each processor's execution details #16843

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add verbose pipeline parameter to output each processor's execution d…
…etails

Signed-off-by: Junwei Dai <[email protected]>
Junwei Dai committed Dec 13, 2024
commit 1c3b9461dc1eb2add5c7ccc168ee2bc444f43bc4
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
@@ -394,6 +395,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
List<ShardSearchFailure> failures = new ArrayList<>();
Clusters clusters = Clusters.EMPTY;
List<SearchExtBuilder> extBuilders = new ArrayList<>();
List<ProcessorExecutionDetail> processorResult = new ArrayList<>();
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@@ -530,7 +532,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
terminatedEarly,
profile,
numReducePhases,
extBuilders
extBuilders,
processorResult
);
return new SearchResponse(
searchResponseSections,
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
@@ -65,7 +66,6 @@
public class SearchResponseSections implements ToXContentFragment {

public static final ParseField EXT_FIELD = new ParseField("ext");

protected final SearchHits hits;
protected final Aggregations aggregations;
protected final Suggest suggest;
@@ -74,6 +74,7 @@ public class SearchResponseSections implements ToXContentFragment {
protected final Boolean terminatedEarly;
protected final int numReducePhases;
protected final List<SearchExtBuilder> searchExtBuilders = new ArrayList<>();
protected final List<ProcessorExecutionDetail> processorResult = new ArrayList<>();

public SearchResponseSections(
SearchHits hits,
@@ -84,7 +85,17 @@ public SearchResponseSections(
SearchProfileShardResults profileResults,
int numReducePhases
) {
this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList());
this(
hits,
aggregations,
suggest,
timedOut,
terminatedEarly,
profileResults,
numReducePhases,
Collections.emptyList(),
Collections.emptyList()
);
}

public SearchResponseSections(
@@ -95,7 +106,8 @@ public SearchResponseSections(
Boolean terminatedEarly,
SearchProfileShardResults profileResults,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilders
List<SearchExtBuilder> searchExtBuilders,
List<ProcessorExecutionDetail> processorResult
) {
this.hits = hits;
this.aggregations = aggregations;
@@ -104,6 +116,7 @@ public SearchResponseSections(
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
this.processorResult.addAll(processorResult);
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
}

@@ -166,13 +179,21 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
}
builder.endObject();
}

if (!processorResult.isEmpty()) {
builder.field("processor_result", processorResult);
}
return builder;
}

public List<SearchExtBuilder> getSearchExtBuilders() {
return Collections.unmodifiableList(this.searchExtBuilders);
}

public List<ProcessorExecutionDetail> getProcessorResult() {
return processorResult;
}

protected void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
Original file line number Diff line number Diff line change
@@ -256,6 +256,9 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
if (request.hasParam("timeout")) {
searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
}
if (request.hasParam("verbose_pipeline")) {
searchSourceBuilder.verbosePipeline(request.paramAsBoolean("verbose_pipeline", false));
}
if (request.hasParam("terminate_after")) {
int terminateAfter = request.paramAsInt("terminate_after", SearchContext.DEFAULT_TERMINATE_AFTER);
if (terminateAfter < 0) {
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchHits.java
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@
import org.apache.lucene.search.TotalHits.Relation;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -166,6 +167,22 @@ public SearchHit[] getHits() {
return this.hits;
}

/**
* Creates a deep copy of this SearchHits instance.
*
* @return a deep copy of the current SearchHits object
* @throws IOException if an I/O exception occurs during serialization or deserialization
*/
public SearchHits deepCopy() throws IOException {
junweid62 marked this conversation as resolved.
Show resolved Hide resolved
try (BytesStreamOutput out = new BytesStreamOutput()) {
this.writeTo(out);

try (StreamInput in = out.bytes().streamInput()) {
return new SearchHits(in);
}
}
}

/**
* Return the hit as the provided position.
*/
Original file line number Diff line number Diff line change
@@ -136,6 +136,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
public static final ParseField SLICE = new ParseField("slice");
public static final ParseField POINT_IN_TIME = new ParseField("pit");
public static final ParseField SEARCH_PIPELINE = new ParseField("search_pipeline");
public static final ParseField VERBOSE_SEARCH_PIPELINE = new ParseField("verbose_pipeline");

public static SearchSourceBuilder fromXContent(XContentParser parser) throws IOException {
return fromXContent(parser, true);
@@ -226,6 +227,8 @@ public static HighlightBuilder highlight() {

private String searchPipeline;

private boolean verbosePipeline;

/**
* Constructs a new search source builder.
*/
@@ -301,6 +304,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
}
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
searchPipeline = in.readOptionalString();
verbosePipeline = in.readBoolean();
}
}

@@ -384,6 +388,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeOptionalString(searchPipeline);
out.writeOptionalBoolean(verbosePipeline);
}
}

@@ -1142,6 +1147,26 @@ public SearchSourceBuilder pipeline(String searchPipeline) {
return this;
}

/**
* Enables or disables verbose mode for the search pipeline.
*
* When verbose mode is enabled, detailed information about each processor
* in the search pipeline is included in the search response. This includes
* the processor name, execution status, input, output, and time taken for processing.
*
* This parameter is primarily intended for debugging purposes, allowing users
* to track how data flows and transforms through the search pipeline.
*
*/
public SearchSourceBuilder verbosePipeline(boolean verbosePipeline) {
this.verbosePipeline = verbosePipeline;
return this;
}

public Boolean verbosePipeline() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why return type is a wrapper around primitive Boolean?

return verbosePipeline;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
@@ -1240,6 +1265,7 @@ private SearchSourceBuilder shallowCopy(
rewrittenBuilder.derivedFieldsObject = derivedFieldsObject;
rewrittenBuilder.derivedFields = derivedFields;
rewrittenBuilder.searchPipeline = searchPipeline;
rewrittenBuilder.verbosePipeline = verbosePipeline;
return rewrittenBuilder;
}

@@ -1309,6 +1335,8 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
profile = parser.booleanValue();
} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
searchPipeline = parser.text();
} else if (VERBOSE_SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
verbosePipeline = parser.booleanValue();
} else {
throw new ParsingException(
parser.getTokenLocation(),
@@ -1920,7 +1948,8 @@ public int hashCode() {
pointInTimeBuilder,
derivedFieldsObject,
derivedFields,
searchPipeline
searchPipeline,
verbosePipeline
);
}

@@ -1966,7 +1995,8 @@ public boolean equals(Object obj) {
&& Objects.equals(pointInTimeBuilder, other.pointInTimeBuilder)
&& Objects.equals(derivedFieldsObject, other.derivedFieldsObject)
&& Objects.equals(derivedFields, other.derivedFields)
&& Objects.equals(searchPipeline, other.searchPipeline);
&& Objects.equals(searchPipeline, other.searchPipeline)
&& Objects.equals(verbosePipeline, other.verbosePipeline);
}

@Override
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;

@@ -73,7 +74,17 @@ public InternalSearchResponse(
Boolean terminatedEarly,
int numReducePhases
) {
this(hits, aggregations, suggest, profileResults, timedOut, terminatedEarly, numReducePhases, Collections.emptyList());
this(
hits,
aggregations,
suggest,
profileResults,
timedOut,
terminatedEarly,
numReducePhases,
Collections.emptyList(),
Collections.emptyList()
);
}

public InternalSearchResponse(
@@ -84,9 +95,20 @@ public InternalSearchResponse(
boolean timedOut,
Boolean terminatedEarly,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilderList
List<SearchExtBuilder> searchExtBuilderList,
List<ProcessorExecutionDetail> processorResult
) {
super(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilderList);
super(
hits,
aggregations,
suggest,
timedOut,
terminatedEarly,
profileResults,
numReducePhases,
searchExtBuilderList,
processorResult
);
}

public InternalSearchResponse(StreamInput in) throws IOException {
@@ -98,7 +120,8 @@ public InternalSearchResponse(StreamInput in) throws IOException {
in.readOptionalBoolean(),
in.readOptionalWriteable(SearchProfileShardResults::new),
in.readVInt(),
readSearchExtBuildersOnOrAfter(in)
readSearchExtBuildersOnOrAfter(in),
readProcessorResultOnOrAfter(in)
);
}

@@ -112,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(profileResults);
out.writeVInt(numReducePhases);
writeSearchExtBuildersOnOrAfter(out, searchExtBuilders);
writeProcessorResultOnOrAfter(out, processorResult);
}

private static List<SearchExtBuilder> readSearchExtBuildersOnOrAfter(StreamInput in) throws IOException {
@@ -123,4 +147,15 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List<Searc
out.writeNamedWriteableList(searchExtBuilders);
}
}

private static List<ProcessorExecutionDetail> readProcessorResultOnOrAfter(StreamInput in) throws IOException {
return (in.getVersion().onOrAfter(Version.V_2_18_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList();
}

private static void writeProcessorResultOnOrAfter(StreamOutput out, List<ProcessorExecutionDetail> processorResult) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeCollection(processorResult, (o, detail) -> detail.writeTo(o));
junweid62 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to Josh's comment, please use out.writeList()

}
}

}
Loading