Skip to content

Commit

Permalink
Integrates KNN plugin with ConcurrentSearchRequestDecider interface
Browse files Browse the repository at this point in the history
This allows knn queries to enable concurrency when index.search.concurrent_segment_search.mode or
search.concurrent_segment_search.mode in auto mode. Without this the
default behavior of auto mode is non-concurrent search

Signed-off-by: Tejas Shah <[email protected]>
  • Loading branch information
shatejas committed Sep 18, 2024
1 parent 004fcc0 commit 886d57e
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
### Enhancements
* Adds concurrent segment search support for mode auto [#2111](https://github.com/opensearch-project/k-NN/pull/2111)
### Bug Fixes
* Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946)
### Infrastructure
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/opensearch/knn/plugin/KNNPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.knn.index.KNNCircuitBreaker;
import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider;
import org.opensearch.knn.index.util.KNNClusterUtil;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.index.KNNSettings;
Expand Down Expand Up @@ -95,6 +96,7 @@
import org.opensearch.script.ScriptContext;
import org.opensearch.script.ScriptEngine;
import org.opensearch.script.ScriptService;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -349,4 +351,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return ImmutableList.of(new SystemIndexDescriptor(MODEL_INDEX_NAME, "Index for storing models used for k-NN indices"));
}

@Override
public Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.of(new KNNConcurrentSearchRequestDecider.Factory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import lombok.EqualsAndHashCode;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;

import java.util.Optional;

/**
* Decides if the knn query uses concurrent segment search
* As of 2.17, this is only used when
* - "index.search.concurrent_segment_search.mode": "auto" or
* - "search.concurrent_segment_search.mode": "auto"
*
* Note: the class is not thread-safe and a new instance needs to be created for each request
*/
@EqualsAndHashCode(callSuper = true)
public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider {

private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.NO_OP,
"Default decision"
);
private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.YES,
"Enable concurrent search for knn"
);

private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION;

@Override
public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) {
if (queryBuilder instanceof KNNQueryBuilder && indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
knnDecision = YES;
} else {
knnDecision = DEFAULT_KNN_DECISION;
}
}

@Override
public ConcurrentSearchDecision getConcurrentSearchDecision() {
return knnDecision;
}

/**
* Returns {@link KNNConcurrentSearchRequestDecider} when index.knn is true
*/
public static class Factory implements ConcurrentSearchRequestDecider.Factory {
public Optional<ConcurrentSearchRequestDecider> create(final IndexSettings indexSettings) {
if (indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
return Optional.of(new KNNConcurrentSearchRequestDecider());
}
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.integ.search;

import com.google.common.primitives.Floats;
import lombok.SneakyThrows;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.junit.BeforeClass;
import org.opensearch.client.Response;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.KNNRestTestCase;
import org.opensearch.knn.KNNResult;
import org.opensearch.knn.TestUtils;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.plugin.script.KNNScoringUtil;

import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.opensearch.knn.common.KNNConstants.KNN_ENGINE;
import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW;
import static org.opensearch.knn.common.KNNConstants.METHOD_PARAMETER_SPACE_TYPE;
import static org.opensearch.knn.common.KNNConstants.NAME;
import static org.opensearch.knn.common.KNNConstants.PARAMETERS;

/**
* Note that this is simply a sanity test to make sure that concurrent search code path is hit E2E and scores are intact
* There is no latency verification as it can be better encapsulated in nightly runs.
*/
public class ConcurrentSegmentSearchIT extends KNNRestTestCase {

static TestUtils.TestData testData;

@BeforeClass
public static void setUpClass() throws IOException {
if (ConcurrentSegmentSearchIT.class.getClassLoader() == null) {
throw new IllegalStateException("ClassLoader of FaissIT Class is null");
}
URL testIndexVectors = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_vectors_1000x128.json");
URL testQueries = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_queries_100x128.csv");
assert testIndexVectors != null;
assert testQueries != null;
testData = new TestUtils.TestData(testIndexVectors.getPath(), testQueries.getPath());
}

@SneakyThrows
public void testConcurrentSegmentSearch() {
String indexName = "test-concurrent-segment";
String fieldName = "test-field-1";
int dimension = testData.indexData.vectors[0].length;
final XContentBuilder indexBuilder = createFaissHnswIndexMapping(fieldName, dimension);
Map<String, Object> mappingMap = xContentBuilderToMap(indexBuilder);
String mapping = indexBuilder.toString();
createKnnIndex(indexName, mapping);
assertEquals(new TreeMap<>(mappingMap), new TreeMap<>(getIndexMappingAsMap(indexName)));

// Index the test data
for (int i = 0; i < testData.indexData.docs.length; i++) {
addKnnDoc(
indexName,
Integer.toString(testData.indexData.docs[i]),
fieldName,
Floats.asList(testData.indexData.vectors[i]).toArray()
);
}
refreshAllNonSystemIndices();
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto"));

// Test search queries
int k = 10;
verifySearch(indexName, fieldName, k);

updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "all"));
verifySearch(indexName, fieldName, k);
}

/*
{
"properties": {
"<fieldName>": {
"type": "knn_vector",
"dimension": <dimension>,
"method": {
"name": "hnsw",
"space_type": "l2",
"engine": "faiss",
"parameters": {
"m": 16,
"ef_construction": 128,
"ef_search": 128
}
}
}
}
*/
@SneakyThrows
private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) {
// Create an index
return XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(fieldName)
.field("type", "knn_vector")
.field("dimension", dimension)
.startObject(KNNConstants.KNN_METHOD)
.field(NAME, METHOD_HNSW)
.field(METHOD_PARAMETER_SPACE_TYPE, SpaceType.L2.getValue())
.field(KNN_ENGINE, KNNEngine.FAISS.getName())
.startObject(PARAMETERS)
.field(KNNConstants.METHOD_PARAMETER_M, 16)
.field(KNNConstants.METHOD_PARAMETER_EF_CONSTRUCTION, 128)
.field(KNNConstants.METHOD_PARAMETER_EF_SEARCH, 128)
.endObject()
.endObject()
.endObject()
.endObject()
.endObject();
}

@SneakyThrows
private void verifySearch(String indexName, String fieldName, int k) {
for (int i = 0; i < testData.queries.length; i++) {
final KNNQueryBuilder queryBuilder = KNNQueryBuilder.builder().fieldName(fieldName).vector(testData.queries[i]).k(k).build();
Response response = searchKNNIndex(indexName, queryBuilder, k);
String responseBody = EntityUtils.toString(response.getEntity());
List<KNNResult> knnResults = parseSearchResponse(responseBody, fieldName);
assertEquals(k, knnResults.size());

List<Float> actualScores = parseSearchResponseScore(responseBody, fieldName);
for (int j = 0; j < k; j++) {
float[] primitiveArray = knnResults.get(j).getVector();
assertEquals(
KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(testData.queries[i], primitiveArray), SpaceType.L2),
actualScores.get(j),
0.0001
);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.knn.KNNTestCase;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class KNNConcurrentSearchRequestDeciderTests extends KNNTestCase {

public void testDecider() {
ConcurrentSearchDecision noop = new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "Default decision");

KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider();
assertDecision(noop, decider.getConcurrentSearchDecision());
IndexSettings indexSettingsMock = mock(IndexSettings.class);
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);

// Non KNNQueryBuilder
decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
assertDecision(noop, decider.getConcurrentSearchDecision());
decider.evaluateForQuery(
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
indexSettingsMock
);
assertDecision(noop, decider.getConcurrentSearchDecision());

when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
decider.evaluateForQuery(
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
indexSettingsMock
);
ConcurrentSearchDecision yes = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.YES,
"Enable concurrent search for knn"
);
assertDecision(yes, decider.getConcurrentSearchDecision());

decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
assertDecision(noop, decider.getConcurrentSearchDecision());
}

public void testDeciderFactory() {
KNNConcurrentSearchRequestDecider.Factory factory = new KNNConcurrentSearchRequestDecider.Factory();
IndexSettings indexSettingsMock = mock(IndexSettings.class);
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
assertNotSame(factory.create(indexSettingsMock).get(), factory.create(indexSettingsMock).get());
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);
assertTrue(factory.create(indexSettingsMock).isEmpty());
}

private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) {
assertEquals(expected.getDecisionReason(), actual.getDecisionReason());
assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus());
}
}

0 comments on commit 886d57e

Please sign in to comment.