Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add SortResponseProcessor to Search Pipelines (#14785) #14868

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604))
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add SortResponseProcessor to Search Pipelines (([#14785](https://github.com/opensearch-project/OpenSearch/issues/14785)))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
new TruncateHitsResponseProcessor.Factory(),
CollapseResponseProcessor.TYPE,
new CollapseResponseProcessor.Factory(),
SplitResponseProcessor.TYPE,
new SplitResponseProcessor.Factory()
SortResponseProcessor.TYPE,
new SortResponseProcessor.Factory()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Processor that sorts an array of items.
* Throws exception is the specified field is not an array.
*/
public class SortResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/** Key to reference this processor type from a search pipeline. */
public static final String TYPE = "sort";
/** Key defining the array field to be sorted. */
public static final String SORT_FIELD = "field";
/** Optional key defining the sort order. */
public static final String SORT_ORDER = "order";
/** Optional key to put the sorted values in a different field. */
public static final String TARGET_FIELD = "target_field";
/** Default sort order if not specified */
public static final String DEFAULT_ORDER = "asc";

/** Enum defining how elements will be sorted */
public enum SortOrder {
/** Sort in ascending (natural) order */
ASCENDING("asc"),
/** Sort in descending (reverse) order */
DESCENDING("desc");

private final String direction;

SortOrder(String direction) {
this.direction = direction;
}

@Override
public String toString() {
return this.direction;
}

/**
* Converts the string representation of the enum value to the enum.
* @param value A string ("asc" or "desc")
* @return the corresponding enum value
*/
public static SortOrder fromString(String value) {
if (value == null) {
throw new IllegalArgumentException("Sort direction cannot be null");
}

if (value.equals(ASCENDING.toString())) {
return ASCENDING;
} else if (value.equals(DESCENDING.toString())) {
return DESCENDING;
}
throw new IllegalArgumentException("Sort direction [" + value + "] not recognized." + " Valid values are: [asc, desc]");
}
}

private final String sortField;
private final SortOrder sortOrder;
private final String targetField;

SortResponseProcessor(
String tag,
String description,
boolean ignoreFailure,
String sortField,
SortOrder sortOrder,
String targetField
) {
super(tag, description, ignoreFailure);
this.sortField = Objects.requireNonNull(sortField);
this.sortOrder = Objects.requireNonNull(sortOrder);
this.targetField = targetField == null ? sortField : targetField;
}

/**
* Getter function for sortField
* @return sortField
*/
public String getSortField() {
return sortField;
}

/**
* Getter function for targetField
* @return targetField
*/
public String getTargetField() {
return targetField;
}

/**
* Getter function for sortOrder
* @return sortOrder
*/
public SortOrder getSortOrder() {
return sortOrder;
}

@Override
public String getType() {
return TYPE;
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
Map<String, DocumentField> fields = hit.getFields();
if (fields.containsKey(sortField)) {
DocumentField docField = hit.getFields().get(sortField);
if (docField == null) {
throw new IllegalArgumentException("field [" + sortField + "] is null, cannot sort.");
}
hit.setDocumentField(targetField, new DocumentField(targetField, getSortedValues(docField.getValues())));
}
if (hit.hasSource()) {
BytesReference sourceRef = hit.getSourceRef();
Tuple<? extends MediaType, Map<String, Object>> typeAndSourceMap = XContentHelper.convertToMap(
sourceRef,
false,
(MediaType) null
);

Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
if (sourceAsMap.containsKey(sortField)) {
Object val = sourceAsMap.get(sortField);
if (val instanceof List) {
@SuppressWarnings("unchecked")
List<Object> listVal = (List<Object>) val;
sourceAsMap.put(targetField, getSortedValues(listVal));
}
XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent());
builder.map(sourceAsMap);
hit.sourceRef(BytesReference.bytes(builder));
}
}
}
return response;
}

private List<Object> getSortedValues(List<Object> values) {
return values.stream()
.map(this::downcastToComparable)
.sorted(sortOrder.equals(SortOrder.ASCENDING) ? Comparator.naturalOrder() : Comparator.reverseOrder())
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
private Comparable<Object> downcastToComparable(Object obj) {
if (obj instanceof Comparable) {
return (Comparable<Object>) obj;
} else if (obj == null) {
throw new IllegalArgumentException("field [" + sortField + "] contains a null value.]");
} else {
throw new IllegalArgumentException("field [" + sortField + "] of type [" + obj.getClass().getName() + "] is not comparable.]");
}
}

static class Factory implements Processor.Factory<SearchResponseProcessor> {

@Override
public SortResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
String sortField = ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_FIELD);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD, sortField);
try {
SortOrder sortOrder = SortOrder.fromString(
ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_ORDER, DEFAULT_ORDER)
);
return new SortResponseProcessor(tag, description, ignoreFailure, sortField, sortOrder, targetField);
} catch (IllegalArgumentException e) {
throw ConfigurationUtils.newConfigurationException(TYPE, tag, SORT_ORDER, e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
throw new IllegalArgumentException("field [" + splitField + "] is null, cannot split.");
}
Object val = docField.getValue();
if (val == null || !String.class.isAssignableFrom(val.getClass())) {
if (!(val instanceof String)) {
throw new IllegalArgumentException("field [" + splitField + "] is not a string, cannot split");
}
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testAllowlistNotSpecified() throws IOException {
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
assertEquals(Set.of("oversample", "filter_query", "script"), plugin.getRequestProcessors(createParameters(settings)).keySet());
assertEquals(
Set.of("rename_field", "truncate_hits", "collapse", "split"),
Set.of("rename_field", "truncate_hits", "collapse", "sort"),
plugin.getResponseProcessors(createParameters(settings)).keySet()
);
assertEquals(Set.of(), plugin.getSearchPhaseResultsProcessors(createParameters(settings)).keySet());
Expand Down
Loading
Loading