Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make search pipelines asynchronous #10598

Merged
merged 3 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840))
- Force merge with `only_expunge_deletes` honors max segment size ([#10036](https://github.com/opensearch-project/OpenSearch/pull/10036))
- Add the means to extract the contextual properties from HttpChannel, TcpChannel and TransportChannel without excessive typecasting ([#10562](https://github.com/opensearch-project/OpenSearch/pull/10562))
- Search pipelines now support asynchronous request and response processors to avoid blocking on a transport thread ([#10598](https://github.com/opensearch-project/OpenSearch/pull/10598))
- [Remote Store] Add Remote Store backpressure rejection stats to `_nodes/stats` ([#10524](https://github.com/opensearch-project/OpenSearch/pull/10524))
- [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642))
- Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,24 +506,51 @@
ActionListener<SearchResponse> listener;
try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
listener = ActionListener.wrap(
r -> originalListener.onResponse(searchRequest.transformResponse(r)),
originalListener::onFailure
);
listener = searchRequest.transformResponseListener(originalListener);
} catch (Exception e) {
originalListener.onFailure(e);
return;
}

if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.categorize(searchRequest.source());
} catch (Exception e) {
logger.error("Error while trying to categorize the query.", e);
ActionListener<SearchRequest> requestTransformListener = ActionListener.wrap(sr -> {
if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.categorize(sr.source());
} catch (Exception e) {
logger.error("Error while trying to categorize the query.", e);
}

Check warning on line 521 in server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java#L518-L521

Added lines #L518 - L521 were not covered by tests
}
}

ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(
sr,
task,
timeProvider,
searchAsyncActionProvider,
listener,
searchRequestOperationsListener
);
if (sr.source() == null) {
rewriteListener.onResponse(sr.source());
} else {
Rewriteable.rewriteAndFetch(
sr.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
);
}
}, listener::onFailure);
searchRequest.transformRequest(requestTransformListener);
}

private ActionListener<SearchSourceBuilder> buildRewriteListener(
SearchRequest searchRequest,
Task task,
SearchTimeProvider timeProvider,
SearchAsyncActionProvider searchAsyncActionProvider,
ActionListener<SearchResponse> listener,
SearchRequestOperationsListener searchRequestOperationsListener
) {
return ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
Expand Down Expand Up @@ -634,15 +661,6 @@
}
}
}, listener::onFailure);
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(
searchRequest.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
);
}
}

static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
Expand Down
206 changes: 127 additions & 79 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.search.SearchPhaseResult;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -117,92 +119,138 @@

protected void onResponseProcessorFailed(Processor processor) {}

SearchRequest transformRequest(SearchRequest request) throws SearchPipelineProcessingException {
if (searchRequestProcessors.isEmpty() == false) {
long pipelineStart = relativeTimeSupplier.getAsLong();
beforeTransformRequest();
try {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
request.writeTo(bytesStreamOutput);
try (StreamInput in = bytesStreamOutput.bytes().streamInput()) {
try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) {
request = new SearchRequest(input);
}
}
}
for (SearchRequestProcessor processor : searchRequestProcessors) {
beforeRequestProcessor(processor);
long start = relativeTimeSupplier.getAsLong();
try {
request = processor.processRequest(request);
} catch (Exception e) {
onRequestProcessorFailed(processor);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from request processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
} else {
throw e;
}
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
}
void transformRequest(SearchRequest request, ActionListener<SearchRequest> requestListener) throws SearchPipelineProcessingException {
if (searchRequestProcessors.isEmpty()) {
requestListener.onResponse(request);
return;
}

try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
request.writeTo(bytesStreamOutput);
try (StreamInput in = bytesStreamOutput.bytes().streamInput()) {
try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) {
request = new SearchRequest(input);
}
} catch (Exception e) {
onTransformRequestFailure();
throw new SearchPipelineProcessingException(e);
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformRequest(took);
}
} catch (IOException e) {
requestListener.onFailure(new SearchPipelineProcessingException(e));
return;

Check warning on line 137 in server/src/main/java/org/opensearch/search/pipeline/Pipeline.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/pipeline/Pipeline.java#L135-L137

Added lines #L135 - L137 were not covered by tests
}
return request;

ActionListener<SearchRequest> finalListener = getTerminalSearchRequestActionListener(requestListener);

// Chain listeners back-to-front
ActionListener<SearchRequest> currentListener = finalListener;
for (int i = searchRequestProcessors.size() - 1; i >= 0; i--) {
final ActionListener<SearchRequest> nextListener = currentListener;
SearchRequestProcessor processor = searchRequestProcessors.get(i);
currentListener = ActionListener.wrap(r -> {
long start = relativeTimeSupplier.getAsLong();
beforeRequestProcessor(processor);
processor.processRequestAsync(r, ActionListener.wrap(rr -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
nextListener.onResponse(rr);
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
onRequestProcessorFailed(processor);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from request processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
nextListener.onResponse(r);
} else {
nextListener.onFailure(new SearchPipelineProcessingException(e));
}
}));
}, finalListener::onFailure);
}

beforeTransformRequest();
currentListener.onResponse(request);
}

SearchResponse transformResponse(SearchRequest request, SearchResponse response) throws SearchPipelineProcessingException {
if (searchResponseProcessors.isEmpty() == false) {
long pipelineStart = relativeTimeSupplier.getAsLong();
beforeTransformResponse();
try {
for (SearchResponseProcessor processor : searchResponseProcessors) {
beforeResponseProcessor(processor);
long start = relativeTimeSupplier.getAsLong();
try {
response = processor.processResponse(request, response);
} catch (Exception e) {
onResponseProcessorFailed(processor);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from response processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
} else {
throw e;
}
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
/**
 * Builds the listener that terminates the request-processor chain: it records the total
 * request-transformation latency, then hands the caller a {@link PipelinedRequest} tying the
 * transformed request back to this pipeline, or a {@link SearchPipelineProcessingException}
 * on failure.
 *
 * @param requestListener the caller's listener to notify once the whole chain completes
 * @return the terminal listener for the processor chain
 */
private ActionListener<SearchRequest> getTerminalSearchRequestActionListener(ActionListener<SearchRequest> requestListener) {
    final long startNanos = relativeTimeSupplier.getAsLong();

    return ActionListener.wrap(transformedRequest -> {
        afterTransformRequest(TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - startNanos));
        requestListener.onResponse(new PipelinedRequest(this, transformedRequest));
    }, exception -> {
        afterTransformRequest(TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - startNanos));
        onTransformRequestFailure();
        requestListener.onFailure(new SearchPipelineProcessingException(exception));
    });
}

ActionListener<SearchResponse> transformResponseListener(SearchRequest request, ActionListener<SearchResponse> responseListener) {
if (searchResponseProcessors.isEmpty()) {
// No response transformation necessary
return responseListener;
}

long[] pipelineStart = new long[1];

final ActionListener<SearchResponse> originalListener = responseListener;
responseListener = ActionListener.wrap(r -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformResponse(took);
originalListener.onResponse(r);
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformResponse(took);
onTransformResponseFailure();
originalListener.onFailure(e);
});
ActionListener<SearchResponse> finalListener = responseListener; // Jump directly to this one on exception.

for (int i = searchResponseProcessors.size() - 1; i >= 0; i--) {
final ActionListener<SearchResponse> currentFinalListener = responseListener;
final SearchResponseProcessor processor = searchResponseProcessors.get(i);

responseListener = ActionListener.wrap(r -> {
beforeResponseProcessor(processor);
msfroh marked this conversation as resolved.
Show resolved Hide resolved
final long start = relativeTimeSupplier.getAsLong();
processor.processResponseAsync(request, r, ActionListener.wrap(rr -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
currentFinalListener.onResponse(rr);
}, e -> {
onResponseProcessorFailed(processor);
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from response processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
// Pass the previous response through to the next processor in the chain
currentFinalListener.onResponse(r);
} else {
currentFinalListener.onFailure(new SearchPipelineProcessingException(e));
}
}
} catch (Exception e) {
onTransformResponseFailure();
throw new SearchPipelineProcessingException(e);
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformResponse(took);
}
}));
}, finalListener::onFailure);
}
return response;
final ActionListener<SearchResponse> chainListener = responseListener;
return ActionListener.wrap(r -> {
beforeTransformResponse();
pipelineStart[0] = relativeTimeSupplier.getAsLong();
chainListener.onResponse(r);
}, originalListener::onFailure);

}

<Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.search.SearchPhaseResults;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.search.SearchPhaseResult;

/**
Expand All @@ -27,8 +28,12 @@ public final class PipelinedRequest extends SearchRequest {
this.pipeline = pipeline;
}

public SearchResponse transformResponse(SearchResponse response) {
return pipeline.transformResponse(this, response);
/**
 * Asynchronously runs this request through the pipeline's search request processors.
 * The listener receives the transformed {@link SearchRequest} on success, or the
 * processing failure via {@link ActionListener#onFailure}.
 *
 * @param requestListener callback invoked with the transformed request or a failure
 */
public void transformRequest(ActionListener<SearchRequest> requestListener) {
pipeline.transformRequest(this, requestListener);
}

/**
 * Wraps the given listener so that the pipeline's search response processors are applied
 * to the {@link SearchResponse} before the listener is notified.
 *
 * @param responseListener the listener to receive the (possibly transformed) response
 * @return a listener that runs the pipeline's response processors first
 */
public ActionListener<SearchResponse> transformResponseListener(ActionListener<SearchResponse> responseListener) {
return pipeline.transformResponseListener(this, responseListener);
}

public <Result extends SearchPhaseResult> void transformSearchPhaseResults(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,7 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
pipeline = pipelineHolder.pipeline;
}
}
SearchRequest transformedRequest = pipeline.transformRequest(searchRequest);
return new PipelinedRequest(pipeline, transformedRequest);
return new PipelinedRequest(pipeline, searchRequest);
}

Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessorFactories() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,37 @@
package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.core.action.ActionListener;

/**
* Interface for a search pipeline processor that modifies a search request.
*/
public interface SearchRequestProcessor extends Processor {

/**
 * Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
 * executes.
 * <p>
 * Implement this method if the processor makes no asynchronous calls.
 * @param request the executed {@link SearchRequest}
 * @return a new {@link SearchRequest} (or the input {@link SearchRequest} if no changes)
 * @throws Exception if an error occurs during processing
 */
SearchRequest processRequest(SearchRequest request) throws Exception;

/**
 * Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
 * executes.
 * <p>
 * Expert method: Implement this if the processor needs to make asynchronous calls. Otherwise, implement processRequest.
 * <p>
 * The default implementation delegates synchronously to {@link #processRequest(SearchRequest)}:
 * the result is delivered via {@code requestListener.onResponse}, and any exception thrown
 * (by the transformation, or by the listener's own onResponse) is routed to
 * {@code requestListener.onFailure}.
 * @param request the executed {@link SearchRequest}
 * @param requestListener callback to be invoked on successful processing or on failure
 */
default void processRequestAsync(SearchRequest request, ActionListener<SearchRequest> requestListener) {
try {
requestListener.onResponse(processRequest(request));
} catch (Exception e) {
requestListener.onFailure(e);
}
}
}
Loading
Loading