Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reindex ITs #446

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,60 @@ private String registerModelGroup() {
return modelGroupId;
}

protected void createIndexWithPipeline(String indexName, String indexMappingFileName, String pipelineName) throws Exception {
createIndexWithConfiguration(
indexName,
Files.readString(Path.of(classLoader.getResource("processor/" + indexMappingFileName).toURI())),
pipelineName
);
}

/**
* Ingest a document to index.
* @param indexName
* @param ingestDocument
* @throws Exception
*/
protected String ingestDocument(String indexName, String ingestDocument) throws Exception {
Response response = makeRequest(
client(),
"POST",
indexName + "/_doc?refresh",
null,
toHttpEntity(ingestDocument),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
return (String) map.get("result");
}

/**
* Reindex from one index to another
* @param fromIndexName
* @param toIndexName
* @throws Exception
*/
protected void reindex(String fromIndexName, String toIndexName) throws Exception {
Response response = makeRequest(
client(),
"POST",
"/_reindex?refresh",
null,
toHttpEntity("{\"source\":{\"index\":\"" + fromIndexName + "\"},\"dest\":{\"index\":\"" + toIndexName + "\"}}"),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals(0, ((List) map.get("failures")).size());
}

/**
* Enumeration for types of pipeline processors, used to lookup resources like create
* processor request as those are type specific
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,31 @@

package org.opensearch.neuralsearch.processor;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import lombok.SneakyThrows;

import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.After;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.neuralsearch.common.BaseSparseEncodingIT;

import com.google.common.collect.ImmutableList;

public class SparseEncodingProcessIT extends BaseSparseEncodingIT {

private static final String INDEX_NAME = "sparse_encoding_index";

private static final String PIPELINE_NAME = "pipeline-sparse-encoding";

private static final String INGEST_DOCUMENT = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";

@After
@SneakyThrows
public void tearDown() {
Expand All @@ -42,47 +44,26 @@ public void tearDown() {
public void testSparseEncodingProcessor() throws Exception {
String modelId = prepareModel();
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING);
createSparseEncodingIndex();
ingestDocument();
createIndexWithPipeline(INDEX_NAME, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
String result = ingestDocument(INDEX_NAME, INGEST_DOCUMENT);
assertEquals("created", result);
assertEquals(1, getDocCount(INDEX_NAME));
}

private void createSparseEncodingIndex() throws Exception {
createIndexWithConfiguration(
INDEX_NAME,
Files.readString(Path.of(classLoader.getResource("processor/SparseEncodingIndexMappings.json").toURI())),
PIPELINE_NAME
);
}

private void ingestDocument() throws Exception {
String ingestDocument = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";
Response response = makeRequest(
client(),
"POST",
INDEX_NAME + "/_doc?refresh",
null,
toHttpEntity(ingestDocument),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals("created", map.get("result"));
public void testSparseEncodingProcessorWithReindex() throws Exception {
// create a simple index and indexing data into this index.
String fromIndexName = "test-reindex-from";
createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null);
String result = ingestDocument(fromIndexName, "{ \"text\": \"hello world\" }");
assertEquals("created", result);
// create text embedding index for reindex
String modelId = prepareModel();
String toIndexName = "test-reindex-to";
String pipelineName = "pipeline-text-sparse-encoding";
createPipelineProcessor(modelId, pipelineName);
createIndexWithPipeline(toIndexName, "SparseEncodingIndexMappings.json", pipelineName);
reindex(fromIndexName, toIndexName);
assertEquals(1, getDocCount(toIndexName));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,32 @@

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import lombok.SneakyThrows;

import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.After;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.neuralsearch.common.BaseNeuralSearchIT;

import com.google.common.collect.ImmutableList;

public class TextEmbeddingProcessorIT extends BaseNeuralSearchIT {

private static final String INDEX_NAME = "text_embedding_index";

private static final String PIPELINE_NAME = "pipeline-hybrid";

private static final String TEXT_EMBEDDING_DOCUMENT = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";

@After
@SneakyThrows
public void tearDown() {
Expand All @@ -43,52 +48,32 @@ public void testTextEmbeddingProcessor() throws Exception {
String modelId = uploadTextEmbeddingModel();
loadModel(modelId);
createPipelineProcessor(modelId, PIPELINE_NAME);
createTextEmbeddingIndex();
ingestDocument();
createIndexWithPipeline(INDEX_NAME, "IndexMappings.json", PIPELINE_NAME);
String result = ingestDocument(INDEX_NAME, TEXT_EMBEDDING_DOCUMENT);
assertEquals("created", result);
assertEquals(1, getDocCount(INDEX_NAME));
}

public void testTextEmbeddingProcessorWithReindexOperation() throws Exception {
// create a simple index and indexing data into this index.
String fromIndexName = "test-reindex-from";
createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null);
String result = ingestDocument(fromIndexName, "{ \"text\": \"hello world\" }");
assertEquals("created", result);
// create text embedding index for reindex
String modelId = uploadTextEmbeddingModel();
loadModel(modelId);
String toIndexName = "test-reindex-to";
String pipelineName = "pipeline-text-embedding";
createPipelineProcessor(modelId, pipelineName);
createIndexWithPipeline(toIndexName, "IndexMappings.json", pipelineName);
reindex(fromIndexName, toIndexName);
assertEquals(1, getDocCount(toIndexName));
}

private String uploadTextEmbeddingModel() throws Exception {
String requestBody = Files.readString(Path.of(classLoader.getResource("processor/UploadModelRequestBody.json").toURI()));
return uploadModel(requestBody);
}

private void createTextEmbeddingIndex() throws Exception {
createIndexWithConfiguration(
INDEX_NAME,
Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())),
PIPELINE_NAME
);
}

private void ingestDocument() throws Exception {
String ingestDocument = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";
Response response = makeRequest(
client(),
"POST",
INDEX_NAME + "/_doc?refresh",
null,
toHttpEntity(ingestDocument),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals("created", map.get("result"));
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"settings":{
"default_pipeline": "pipeline-sparse-encoding"
"default_pipeline": "%s"
},
"mappings": {
"properties": {
Expand All @@ -23,4 +23,4 @@
}
}
}
}
}
Loading