diff --git a/CHANGELOG.md b/CHANGELOG.md index 7367bb5b9870b..d27f0fa592868 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)). - Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/)) - Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)). -- Add `verbose_pipeline` parameter to output each processor's execution details ([#14745](https://github.com/opensearch-project/OpenSearch/pull/14745)). +- Add `verbose_pipeline` parameter to output each processor's execution details ([#16843](https://github.com/opensearch-project/OpenSearch/pull/16843)). - Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678)) ### Dependencies diff --git a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java index 301e195dda672..ebd3a3cb9b92b 100644 --- a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java @@ -1167,7 +1167,7 @@ public SearchSourceBuilder verbosePipeline(boolean verbosePipeline) { return this; } - public Boolean verbosePipeline() { + public boolean verbosePipeline() { return verbosePipeline; } diff --git a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java index 532575fe097fa..0d0247542345d 100644 --- a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java +++ b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java @@ -149,12 +149,12 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List readProcessorResultOnOrAfter(StreamInput in) throws IOException { - return (in.getVersion().onOrAfter(Version.V_2_18_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList(); + return (in.getVersion().onOrAfter(Version.CURRENT)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList(); } private static void writeProcessorResultOnOrAfter(StreamOutput out, List processorResult) throws IOException { - if (out.getVersion().onOrAfter(Version.V_2_18_0)) { - out.writeCollection(processorResult, (o, detail) -> detail.writeTo(o)); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeList(processorResult); } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index 4600cda4e993a..c0794f42dfc79 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -148,15 +148,17 @@ void transformRequest(SearchRequest request, ActionListener reque SearchRequestProcessor processor = searchRequestProcessors.get(i); currentListener = ActionListener.wrap(r -> { ProcessorExecutionDetail detail = new ProcessorExecutionDetail(processor.getType()); - detail.addInput(r.source().shallowCopy()); + if (r.source().verbosePipeline()) { + detail.addInput(r.source().shallowCopy()); + } long start = relativeTimeSupplier.getAsLong(); beforeRequestProcessor(processor); processor.processRequestAsync(r, requestContext, ActionListener.wrap(rr -> { long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterRequestProcessor(processor, took); - detail.addOutput(rr.source().shallowCopy()); - detail.addTook(took); if (rr.source().verbosePipeline()) { + detail.addOutput(rr.source().shallowCopy()); + detail.addTook(took); requestContext.addProcessorExecutionDetail(detail); } nextListener.onResponse(rr); @@ -210,7 +212,7 @@ ActionListener transformResponseListener( ) { if (searchResponseProcessors.isEmpty()) { // No response transformation necessary - if (!requestContext.getProcessorExecutionDetails().isEmpty()) { + if (request.source() != null && request.source().verbosePipeline()) { ActionListener finalResponseListener = responseListener; return ActionListener.wrap(r -> { List details = requestContext.getProcessorExecutionDetails(); @@ -242,15 +244,17 @@ ActionListener transformResponseListener( responseListener = ActionListener.wrap(r -> { ProcessorExecutionDetail detail = new ProcessorExecutionDetail(processor.getType()); - detail.addInput(Arrays.asList(r.getHits().deepCopy().getHits())); + if (request.source().verbosePipeline()) { + detail.addInput(Arrays.asList(r.getHits().deepCopy().getHits())); + } beforeResponseProcessor(processor); final long start = relativeTimeSupplier.getAsLong(); processor.processResponseAsync(request, r, requestContext, ActionListener.wrap(rr -> { long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterResponseProcessor(processor, took); - detail.addOutput(Arrays.asList(rr.getHits().deepCopy().getHits())); - detail.addTook(took); if (request.source().verbosePipeline()) { + detail.addOutput(Arrays.asList(rr.getHits().deepCopy().getHits())); + detail.addTook(took); requestContext.addProcessorExecutionDetail(detail); rr.getInternalResponse().getProcessorResult().add(detail); } diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java b/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java index af30b3cbd3dd6..52f29011725cb 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java @@ -47,10 +47,12 @@ public Object getAttribute(String name) { * * @param detail the ProcessorExecutionDetail to add */ - @SuppressWarnings("unchecked") public void addProcessorExecutionDetail(ProcessorExecutionDetail detail) { - attributes.computeIfAbsent(PROCESSOR_EXECUTION_DETAILS_KEY, k -> new ArrayList()); - List details = (List) attributes.get(PROCESSOR_EXECUTION_DETAILS_KEY); + @SuppressWarnings("unchecked") + List details = (List) attributes.computeIfAbsent( + PROCESSOR_EXECUTION_DETAILS_KEY, + k -> new ArrayList<>() + ); details.add(detail); }