Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrates KNN plugin with ConcurrentSearchRequestDecider interface #2111

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
### Enhancements
* Adds concurrent segment search support for mode auto [#2111](https://github.com/opensearch-project/k-NN/pull/2111)
### Bug Fixes
* Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946)
### Infrastructure
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/opensearch/knn/plugin/KNNPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.knn.index.KNNCircuitBreaker;
import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider;
import org.opensearch.knn.index.util.KNNClusterUtil;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.index.KNNSettings;
Expand Down Expand Up @@ -95,6 +96,7 @@
import org.opensearch.script.ScriptContext;
import org.opensearch.script.ScriptEngine;
import org.opensearch.script.ScriptService;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -349,4 +351,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return ImmutableList.of(new SystemIndexDescriptor(MODEL_INDEX_NAME, "Index for storing models used for k-NN indices"));
}

@Override
public Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.of(new KNNConcurrentSearchRequestDecider.Factory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import lombok.EqualsAndHashCode;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;

import java.util.Optional;

/**
* Decides if the knn query uses concurrent segment search
* As of 2.17, this is only used when
* - "index.search.concurrent_segment_search.mode": "auto" or
* - "search.concurrent_segment_search.mode": "auto"
*
* Note: the class is not thread-safe and a new instance needs to be created for each request
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
*/
@EqualsAndHashCode(callSuper = true)
public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider {

private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.NO_OP,
"Default decision"
);
private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.YES,
"Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index"
);

private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION;

@Override
public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) {
if (queryBuilder instanceof KNNQueryBuilder && indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we be able to look into cpu and memory? Is this done in a chain like fashion?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we be able to look into cpu and memory

The control is with core and core decides based on CPU and couple other conditions. I don't think they have any immediate plans to give that control to plugin as per the RFC and plugin RFC

Is this done in a chain like fashion?

I don't completely understand the question. There will be multiple deciders based on number of plugins, it will loop through each of them and evaluate the query builder with visitor pattern using QueryBuilder.visit which evaluates and sets a decision. Even if one decider decides NO it will not execute concurrent search. Let me know if that answers your question

knnDecision = YES;
} else {
knnDecision = DEFAULT_KNN_DECISION;
}
}

@Override
public ConcurrentSearchDecision getConcurrentSearchDecision() {
return knnDecision;
}

/**
* Returns {@link KNNConcurrentSearchRequestDecider} when index.knn is true
*/
public static class Factory implements ConcurrentSearchRequestDecider.Factory {
shatejas marked this conversation as resolved.
Show resolved Hide resolved
public Optional<ConcurrentSearchRequestDecider> create(final IndexSettings indexSettings) {
if (indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
return Optional.of(new KNNConcurrentSearchRequestDecider());
}
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.integ.search;

import com.google.common.primitives.Floats;
import lombok.SneakyThrows;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.junit.BeforeClass;
import org.opensearch.client.Response;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.KNNJsonIndexMappingsBuilder;
import org.opensearch.knn.KNNRestTestCase;
import org.opensearch.knn.KNNResult;
import org.opensearch.knn.TestUtils;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.plugin.script.KNNScoringUtil;

import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW;

/**
* Note that this is simply a sanity test to make sure that concurrent search code path is hit E2E and scores are intact
* There is no latency verification as it can be better encapsulated in nightly runs.
*/
public class ConcurrentSegmentSearchIT extends KNNRestTestCase {

static TestUtils.TestData testData;

@BeforeClass
public static void setUpClass() throws IOException {
if (ConcurrentSegmentSearchIT.class.getClassLoader() == null) {
throw new IllegalStateException("ClassLoader of ConcurrentSegmentSearchIT Class is null");
}
URL testIndexVectors = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_vectors_1000x128.json");
URL testQueries = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_queries_100x128.csv");
assert testIndexVectors != null;
assert testQueries != null;
testData = new TestUtils.TestData(testIndexVectors.getPath(), testQueries.getPath());
}

@SneakyThrows
public void testConcurrentSegmentSearch_thenSucceed() {
String indexName = "test-concurrent-segment";
String fieldName = "test-field-1";
int dimension = testData.indexData.vectors[0].length;
final XContentBuilder indexBuilder = createFaissHnswIndexMapping(fieldName, dimension);
Map<String, Object> mappingMap = xContentBuilderToMap(indexBuilder);
String mapping = indexBuilder.toString();
createKnnIndex(indexName, mapping);
assertEquals(new TreeMap<>(mappingMap), new TreeMap<>(getIndexMappingAsMap(indexName)));

// Index the test data
for (int i = 0; i < testData.indexData.docs.length; i++) {
addKnnDoc(
indexName,
Integer.toString(testData.indexData.docs[i]),
fieldName,
Floats.asList(testData.indexData.vectors[i]).toArray()
);
}
refreshAllNonSystemIndices();
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto"));

// Test search queries
int k = 10;
verifySearch(indexName, fieldName, k);

updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "all"));
Comment on lines +73 to +79
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the indexsetting should be reverted back to default otherwise for all other tests CSS will start to work.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its using a specific index name for the IT to make sure no other tests are affected.

Will delete the index to be safe

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.

verifySearch(indexName, fieldName, k);

deleteKNNIndex(indexName);
}

/*
{
"properties": {
"<fieldName>": {
"type": "knn_vector",
"dimension": <dimension>,
"method": {
"name": "hnsw",
"space_type": "l2",
"engine": "faiss",
"parameters": {
"m": 16,
"ef_construction": 128,
"ef_search": 128
}
}
}
}
*/
@SneakyThrows
private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do something like this:

createKnnIndex(testIndex, getKNNDefaultIndexSettings(), createKnnIndexMapping(TEST_FIELD, DIMENSIONS));

I would suggest checking KNNRestTestCase.java class as it has a lot of helper functions to create the index and not create another function in ITs for creating the mappings.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do something like this

The mapping is being asserted so I need to separate it out

I would suggest checking KNNRestTestCase.java class as it has a lot of helper functions to create the index and not create another function in ITs for creating the mappings.

The requests for tests should be ideally localized to tests so we unknowingly don't affect multiple different test scenarios. Switching to KNNJsonIndexMappingsBuilder but I would prefer this mapping to be localized to this test

return KNNJsonIndexMappingsBuilder.builder()
.fieldName(fieldName)
.dimension(dimension)
.method(
KNNJsonIndexMappingsBuilder.Method.builder()
.engine(KNNEngine.FAISS.getName())
.methodName(METHOD_HNSW)
.spaceType(SpaceType.L2.getValue())
.parameters(KNNJsonIndexMappingsBuilder.Method.Parameters.builder().efConstruction(128).efSearch(128).m(16).build())
.build()
)
.build()
.getIndexMappingBuilder();
}

@SneakyThrows
private void verifySearch(String indexName, String fieldName, int k) {
for (int i = 0; i < testData.queries.length; i++) {
final KNNQueryBuilder queryBuilder = KNNQueryBuilder.builder().fieldName(fieldName).vector(testData.queries[i]).k(k).build();
Response response = searchKNNIndex(indexName, queryBuilder, k);
String responseBody = EntityUtils.toString(response.getEntity());
List<KNNResult> knnResults = parseSearchResponse(responseBody, fieldName);
assertEquals(k, knnResults.size());

List<Float> actualScores = parseSearchResponseScore(responseBody, fieldName);
for (int j = 0; j < k; j++) {
float[] primitiveArray = knnResults.get(j).getVector();
assertEquals(
KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(testData.queries[i], primitiveArray), SpaceType.L2),
actualScores.get(j),
0.0001
);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.knn.KNNTestCase;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class KNNConcurrentSearchRequestDeciderTests extends KNNTestCase {

public void testDecider_thenSucceed() {
ConcurrentSearchDecision noop = new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "Default decision");

KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider();
assertDecision(noop, decider.getConcurrentSearchDecision());
IndexSettings indexSettingsMock = mock(IndexSettings.class);
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);

// Non KNNQueryBuilder
decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
assertDecision(noop, decider.getConcurrentSearchDecision());
decider.evaluateForQuery(
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
indexSettingsMock
);
assertDecision(noop, decider.getConcurrentSearchDecision());

when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
decider.evaluateForQuery(
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
indexSettingsMock
);
ConcurrentSearchDecision yes = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.YES,
"Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index"
);
assertDecision(yes, decider.getConcurrentSearchDecision());

decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
assertDecision(noop, decider.getConcurrentSearchDecision());
}

public void testDeciderFactory_thenSucceed() {
KNNConcurrentSearchRequestDecider.Factory factory = new KNNConcurrentSearchRequestDecider.Factory();
IndexSettings indexSettingsMock = mock(IndexSettings.class);
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
assertNotSame(factory.create(indexSettingsMock).get(), factory.create(indexSettingsMock).get());
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);
assertTrue(factory.create(indexSettingsMock).isEmpty());
}

private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) {
assertEquals(expected.getDecisionReason(), actual.getDecisionReason());
assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.NonNull;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.common.KNNConstants;

import java.io.IOException;

Expand All @@ -26,7 +27,7 @@ public class KNNJsonIndexMappingsBuilder {
private String vectorDataType;
private Method method;

public String getIndexMapping() throws IOException {
public XContentBuilder getIndexMappingBuilder() throws IOException {
if (nestedFieldName != null) {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -40,7 +41,7 @@ public String getIndexMapping() throws IOException {
addVectorDataType(xContentBuilder);
addMethod(xContentBuilder);
xContentBuilder.endObject().endObject().endObject().endObject().endObject();
return xContentBuilder.toString();
return xContentBuilder;
} else {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -51,10 +52,14 @@ public String getIndexMapping() throws IOException {
addVectorDataType(xContentBuilder);
addMethod(xContentBuilder);
xContentBuilder.endObject().endObject().endObject();
return xContentBuilder.toString();
return xContentBuilder;
}
}

public String getIndexMapping() throws IOException {
return getIndexMappingBuilder().toString();
}

private void addVectorDataType(final XContentBuilder xContentBuilder) throws IOException {
if (vectorDataType == null) {
return;
Expand Down Expand Up @@ -104,6 +109,7 @@ public static class Parameters {
private Encoder encoder;
private Integer efConstruction;
private Integer efSearch;
private Integer m;

private void addTo(final XContentBuilder xContentBuilder) throws IOException {
xContentBuilder.startObject("parameters");
Expand All @@ -113,6 +119,9 @@ private void addTo(final XContentBuilder xContentBuilder) throws IOException {
if (efSearch != null) {
xContentBuilder.field("ef_search", efSearch);
}
if (m != null) {
xContentBuilder.field(KNNConstants.METHOD_PARAMETER_M, m);
}
addEncoder(xContentBuilder);
xContentBuilder.endObject();
}
Expand Down
Loading