Merge branch '2.x' into 2.x
Signed-off-by: Marc Handalian <[email protected]>
mch2 authored Dec 6, 2023
2 parents f52ebc4 + d475227 commit 732597d
Showing 32 changed files with 1,448 additions and 467 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -8,6 +8,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Admission control] Add Resource usage collector service and resource usage tracker ([#10695](https://github.com/opensearch-project/OpenSearch/pull/10695))
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10696](https://github.com/opensearch-project/OpenSearch/pull/10696))
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Search Pipelines] Add request-scoped state shared between processors (and three new processors) ([#9405](https://github.com/opensearch-project/OpenSearch/pull/9405))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- [Remote Store] Add repository stats for remote store ([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- [Remote cluster state] Upload global metadata in cluster state to remote store ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
@@ -34,7 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
- Bump `netty` from 4.1.99.Final to 4.1.100.Final ([#10564](https://github.com/opensearch-project/OpenSearch/pull/10564))
- Bump Lucene from 9.7.0 to 9.8.0 ([#10276](https://github.com/opensearch-project/OpenSearch/pull/10276))
- Bump `commons-io:commons-io` from 2.13.0 to 2.15.0 ([#10294](https://github.com/opensearch-project/OpenSearch/pull/10294), [#11001](https://github.com/opensearch-project/OpenSearch/pull/11001), [#11002](https://github.com/opensearch-project/OpenSearch/pull/11002))
- Bump `commons-io:commons-io` from 2.13.0 to 2.15.1 ([#10294](https://github.com/opensearch-project/OpenSearch/pull/10294), [#11001](https://github.com/opensearch-project/OpenSearch/pull/11001), [#11002](https://github.com/opensearch-project/OpenSearch/pull/11002), [#11446](https://github.com/opensearch-project/OpenSearch/pull/11446))
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.25.0 to 2.25.1 ([#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
- Bump `de.thetaphi:forbiddenapis` from 3.5.1 to 3.6 ([#10508](https://github.com/opensearch-project/OpenSearch/pull/10508))
- Bump OpenTelemetry from 1.30.1 to 1.31.0 ([#10617](https://github.com/opensearch-project/OpenSearch/pull/10617))
@@ -55,6 +57,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `actions/setup-java` from 3 to 4 ([#11447](https://github.com/opensearch-project/OpenSearch/pull/11447))
- Bump `org.apache.xmlbeans:xmlbeans` from 5.1.1 to 5.2.0 ([#11448](https://github.com/opensearch-project/OpenSearch/pull/11448))
- Bump `org.apache.maven:maven-model` from 3.9.4 to 3.9.6 ([#11445](https://github.com/opensearch-project/OpenSearch/pull/11445))
- Bump `commons-net:commons-net` from 3.9.0 to 3.10.0 ([#11450](https://github.com/opensearch-project/OpenSearch/pull/11450))
- Bump `org.apache.zookeeper:zookeeper` from 3.9.0 to 3.9.1 ([#10506](https://github.com/opensearch-project/OpenSearch/pull/10506))

### Changed
2 changes: 1 addition & 1 deletion buildSrc/build.gradle
@@ -109,7 +109,7 @@ dependencies {
  api 'com.netflix.nebula:nebula-publishing-plugin:20.3.0'
  api 'com.netflix.nebula:gradle-info-plugin:12.1.6'
  api 'org.apache.rat:apache-rat:0.15'
  api 'commons-io:commons-io:2.11.0'
  api 'commons-io:commons-io:2.15.1'
  api "net.java.dev.jna:jna:5.13.0"
  api 'com.github.johnrengelman:shadow:8.1.1'
  api 'org.jdom:jdom2:2.0.6.1'
@@ -0,0 +1,126 @@ (new file)
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.search.pipeline.common;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Helper for map abstractions passed to scripting processors. Throws {@link UnsupportedOperationException} for almost
 * all methods. Subclasses just need to implement get and put.
 */
abstract class BasicMap implements Map<String, Object> {

    /**
     * No-args constructor.
     */
    protected BasicMap() {}

    private static final String UNSUPPORTED_OP_ERR = " Method not supported in Search pipeline script";

    @Override
    public boolean isEmpty() {
        throw new UnsupportedOperationException("isEmpty" + UNSUPPORTED_OP_ERR);
    }

    public int size() {
        throw new UnsupportedOperationException("size" + UNSUPPORTED_OP_ERR);
    }

    public boolean containsKey(Object key) {
        return get(key) != null;
    }

    public boolean containsValue(Object value) {
        throw new UnsupportedOperationException("containsValue" + UNSUPPORTED_OP_ERR);
    }

    public Object remove(Object key) {
        throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR);
    }

    public void putAll(Map<? extends String, ?> m) {
        throw new UnsupportedOperationException("putAll" + UNSUPPORTED_OP_ERR);
    }

    public void clear() {
        throw new UnsupportedOperationException("clear" + UNSUPPORTED_OP_ERR);
    }

    public Set<String> keySet() {
        throw new UnsupportedOperationException("keySet" + UNSUPPORTED_OP_ERR);
    }

    public Collection<Object> values() {
        throw new UnsupportedOperationException("values" + UNSUPPORTED_OP_ERR);
    }

    public Set<Map.Entry<String, Object>> entrySet() {
        throw new UnsupportedOperationException("entrySet" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public Object getOrDefault(Object key, Object defaultValue) {
        throw new UnsupportedOperationException("getOrDefault" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public void forEach(BiConsumer<? super String, ? super Object> action) {
        throw new UnsupportedOperationException("forEach" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public void replaceAll(BiFunction<? super String, ? super Object, ?> function) {
        throw new UnsupportedOperationException("replaceAll" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public Object putIfAbsent(String key, Object value) {
        throw new UnsupportedOperationException("putIfAbsent" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public boolean remove(Object key, Object value) {
        throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public boolean replace(String key, Object oldValue, Object newValue) {
        throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public Object replace(String key, Object value) {
        throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public Object computeIfAbsent(String key, Function<? super String, ?> mappingFunction) {
        throw new UnsupportedOperationException("computeIfAbsent" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public Object computeIfPresent(String key, BiFunction<? super String, ? super Object, ?> remappingFunction) {
        throw new UnsupportedOperationException("computeIfPresent" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public Object compute(String key, BiFunction<? super String, ? super Object, ?> remappingFunction) {
        throw new UnsupportedOperationException("compute" + UNSUPPORTED_OP_ERR);
    }

    @Override
    public Object merge(String key, Object value, BiFunction<? super Object, ? super Object, ?> remappingFunction) {
        throw new UnsupportedOperationException("merge" + UNSUPPORTED_OP_ERR);
    }
}
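
For illustration only (not part of this commit): the Javadoc above says a concrete subclass only needs to implement get and put, so a minimal sketch could back those two methods with a plain HashMap and inherit the UnsupportedOperationException behavior for everything else. The class name below is hypothetical, and the sketch assumes it sits in the same package as the package-private BasicMap.

package org.opensearch.search.pipeline.common;

import java.util.HashMap;
import java.util.Map;

// Hypothetical example subclass, not part of this commit.
class HashMapBackedBasicMap extends BasicMap {

    private final Map<String, Object> backing = new HashMap<>();

    @Override
    public Object get(Object key) {
        // containsKey() in BasicMap is defined as get(key) != null, so it works without extra code here.
        return backing.get(key);
    }

    @Override
    public Object put(String key, Object value) {
        return backing.put(key, value);
    }
}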
@@ -0,0 +1,122 @@ (new file)
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.document.DocumentField;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.search.pipeline.common.helpers.SearchResponseUtil;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * A simple implementation of field collapsing on search responses. Note that this is not going to work as well as
 * field collapsing at the shard level, as implemented with the "collapse" parameter in a search request. Mostly
 * just using this to demo the oversample / truncate_hits processors.
 */
public class CollapseResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
    /**
     * Key to reference this processor type from a search pipeline.
     */
    public static final String TYPE = "collapse";
    static final String COLLAPSE_FIELD = "field";
    private final String collapseField;

    private CollapseResponseProcessor(String tag, String description, boolean ignoreFailure, String collapseField) {
        super(tag, description, ignoreFailure);
        this.collapseField = Objects.requireNonNull(collapseField);
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @Override
    public SearchResponse processResponse(SearchRequest request, SearchResponse response) {

        if (response.getHits() != null) {
            if (response.getHits().getCollapseField() != null) {
                throw new IllegalStateException(
                    "Cannot collapse on " + collapseField + ". Results already collapsed on " + response.getHits().getCollapseField()
                );
            }
            Map<String, SearchHit> collapsedHits = new LinkedHashMap<>();
            List<Object> collapseValues = new ArrayList<>();
            for (SearchHit hit : response.getHits()) {
                Object fieldValue = null;
                DocumentField docField = hit.getFields().get(collapseField);
                if (docField != null) {
                    if (docField.getValues().size() > 1) {
                        throw new IllegalStateException(
                            "Failed to collapse " + hit.getId() + ": doc has multiple values for field " + collapseField
                        );
                    }
                    fieldValue = docField.getValues().get(0);
                } else if (hit.getSourceAsMap() != null) {
                    fieldValue = hit.getSourceAsMap().get(collapseField);
                }
                String fieldValueString;
                if (fieldValue == null) {
                    fieldValueString = "__missing__";
                } else {
                    fieldValueString = fieldValue.toString();
                }

                // Results are already sorted by sort criterion. Only keep the first hit for each field.
                if (collapsedHits.containsKey(fieldValueString) == false) {
                    collapsedHits.put(fieldValueString, hit);
                    collapseValues.add(fieldValue);
                }
            }
            SearchHit[] newHits = new SearchHit[collapsedHits.size()];
            int i = 0;
            for (SearchHit collapsedHit : collapsedHits.values()) {
                newHits[i++] = collapsedHit;
            }
            SearchHits searchHits = new SearchHits(
                newHits,
                response.getHits().getTotalHits(),
                response.getHits().getMaxScore(),
                response.getHits().getSortFields(),
                collapseField,
                collapseValues.toArray()
            );
            return SearchResponseUtil.replaceHits(searchHits, response);
        }
        return response;
    }

    static class Factory implements Processor.Factory<SearchResponseProcessor> {

        @Override
        public CollapseResponseProcessor create(
            Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
            String tag,
            String description,
            boolean ignoreFailure,
            Map<String, Object> config,
            PipelineContext pipelineContext
        ) {
            String collapseField = ConfigurationUtils.readStringProperty(TYPE, tag, config, COLLAPSE_FIELD);
            return new CollapseResponseProcessor(tag, description, ignoreFailure, collapseField);
        }
    }

}
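
For illustration only (not part of this commit): a rough sketch of how this processor could be instantiated and applied, mirroring what a pipeline definition such as {"collapse": {"field": "color"}} would configure. The wrapper class and method names are invented, and the sketch assumes it lives in the same package so it can reach the package-private Factory and COLLAPSE_FIELD.

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical usage sketch, not part of this commit.
class CollapseProcessorExample {

    static SearchResponse collapseByColor(SearchRequest request, SearchResponse response) {
        Map<String, Object> config = new HashMap<>();
        config.put(CollapseResponseProcessor.COLLAPSE_FIELD, "color"); // i.e. "field": "color"
        CollapseResponseProcessor processor = new CollapseResponseProcessor.Factory()
            .create(Collections.emptyMap(), "demo-tag", "collapse on color", false, config, null);
        // Keeps only the first (already best-sorted) hit per distinct value of "color".
        return processor.processResponse(request, response);
    }
}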
@@ -0,0 +1,83 @@ (new file)
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchService;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.StatefulSearchRequestProcessor;
import org.opensearch.search.pipeline.common.helpers.ContextUtils;

import java.util.Map;

import static org.opensearch.search.pipeline.common.helpers.ContextUtils.applyContextPrefix;

/**
 * Multiplies the "size" parameter on the {@link SearchRequest} by the given scaling factor, storing the original value
 * in the request context as "original_size".
 */
public class OversampleRequestProcessor extends AbstractProcessor implements StatefulSearchRequestProcessor {

    /**
     * Key to reference this processor type from a search pipeline.
     */
    public static final String TYPE = "oversample";
    static final String SAMPLE_FACTOR = "sample_factor";
    static final String ORIGINAL_SIZE = "original_size";
    private final double sampleFactor;
    private final String contextPrefix;

    private OversampleRequestProcessor(String tag, String description, boolean ignoreFailure, double sampleFactor, String contextPrefix) {
        super(tag, description, ignoreFailure);
        this.sampleFactor = sampleFactor;
        this.contextPrefix = contextPrefix;
    }

    @Override
    public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) {
        if (request.source() != null) {
            int originalSize = request.source().size();
            if (originalSize == -1) {
                originalSize = SearchService.DEFAULT_SIZE;
            }
            requestContext.setAttribute(applyContextPrefix(contextPrefix, ORIGINAL_SIZE), originalSize);
            int newSize = (int) Math.ceil(originalSize * sampleFactor);
            request.source().size(newSize);
        }
        return request;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    static class Factory implements Processor.Factory<SearchRequestProcessor> {
        @Override
        public OversampleRequestProcessor create(
            Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
            String tag,
            String description,
            boolean ignoreFailure,
            Map<String, Object> config,
            PipelineContext pipelineContext
        ) {
            double sampleFactor = ConfigurationUtils.readDoubleProperty(TYPE, tag, config, SAMPLE_FACTOR);
            if (sampleFactor < 1.0) {
                throw ConfigurationUtils.newConfigurationException(TYPE, tag, SAMPLE_FACTOR, "Value must be >= 1.0");
            }
            String contextPrefix = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, ContextUtils.CONTEXT_PREFIX_PARAMETER);
            return new OversampleRequestProcessor(tag, description, ignoreFailure, sampleFactor, contextPrefix);
        }
    }
}
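
For illustration only (not part of this commit): with sample_factor 2.5, a request with "size": 10 is rewritten to "size": 25 (ceil(10 * 2.5)), and the original size is stashed in the request context under the ORIGINAL_SIZE key so a later response processor (for example the truncate_hits processor mentioned above) can trim back down to it. The wrapper class below is hypothetical and assumes same-package access to the package-private Factory and SAMPLE_FACTOR.

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.PipelineProcessingContext;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical usage sketch, not part of this commit.
class OversampleProcessorExample {

    static SearchRequest oversample(PipelineProcessingContext requestContext) {
        Map<String, Object> config = new HashMap<>();
        config.put(OversampleRequestProcessor.SAMPLE_FACTOR, 2.5); // i.e. "sample_factor": 2.5
        OversampleRequestProcessor processor = new OversampleRequestProcessor.Factory()
            .create(Collections.emptyMap(), "demo-tag", "oversample by 2.5x", false, config, null);

        SearchRequest request = new SearchRequest().source(new SearchSourceBuilder().size(10));
        // After processing: request.source().size() == 25, and the original size (10) is stored
        // in the request context under the (optionally prefixed) ORIGINAL_SIZE attribute.
        return processor.processRequest(request, requestContext);
    }
}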