Skip to content

Commit

Permalink
[Segment Replication] Support realtime TermVector requests with Segme…
Browse files Browse the repository at this point in the history
…nt Replication (opensearch-project#9585)

* support realtime TermVector and MultiTermVector requests with segment replication.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix TermVector requests with segrep.

Signed-off-by: Rishikesh1159 <[email protected]>

* Refactoring.

Signed-off-by: Rishikesh1159 <[email protected]>

* Address comments on PR.

Signed-off-by: Rishikesh1159 <[email protected]>

---------

Signed-off-by: Rishikesh1159 <[email protected]>
Signed-off-by: Rishikesh Pasham <[email protected]>
  • Loading branch information
Rishikesh1159 authored Sep 8, 2023
1 parent 8e5e54b commit caf4c80
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
Expand All @@ -38,6 +39,8 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.termvectors.TermVectorsRequestBuilder;
import org.opensearch.action.termvectors.TermVectorsResponse;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
Expand All @@ -57,6 +60,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
Expand All @@ -81,6 +85,7 @@
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -1622,4 +1627,154 @@ public void testRealtimeMultiGetRequestsUnsuccessful() {
assertTrue(mgetResponse.getResponses()[1].isFailed());

}

/**
 * Tests whether segment replication supports realtime termvector requests, i.e. whether a document that was
 * indexed but never refreshed (refresh interval is disabled) is still returned by a realtime termvectors request,
 * both with and without custom routing.
 *
 * @throws IOException if building the mapping or a document source fails
 */
public void testRealtimeTermVectorRequestsSuccessful() throws IOException {
    // First data node is started before the index exists; the node name itself is not needed by the test.
    internalCluster().startDataOnlyNode();
    XContentBuilder mapping = jsonBuilder().startObject()
        .startObject("properties")
        .startObject("field")
        .field("type", "text")
        .field("term_vector", "with_positions_offsets_payloads")
        .field("analyzer", "tv_test")
        .endObject()
        .endObject()
        .endObject();
    // refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime termvectors
    assertAcked(
        prepareCreate(INDEX_NAME).setMapping(mapping)
            .addAlias(new Alias("alias"))
            .setSettings(
                Settings.builder()
                    .put(indexSettings())
                    .put("index.analysis.analyzer.tv_test.tokenizer", "standard")
                    .put("index.refresh_interval", -1)
                    .putList("index.analysis.analyzer.tv_test.filter", "lowercase")
            )
    );
    final String replica = internalCluster().startDataOnlyNode();
    ensureGreen(INDEX_NAME);
    final String routing = routingKeyForShard(INDEX_NAME, 0);
    final String firstDocId = Integer.toString(1);
    final String secondDocId = Integer.toString(2);

    // nothing has been indexed yet, so the document must not exist
    TermVectorsResponse response = client(replica).prepareTermVectors(indexOrAlias(), firstDocId).get();
    assertFalse(response.isExists());

    // index doc 1
    client().prepareIndex(INDEX_NAME)
        .setId(firstDocId)
        .setSource(jsonBuilder().startObject().field("field", "the quick brown fox jumps over the lazy dog").endObject())
        .execute()
        .actionGet();

    // non realtime termvectors 1: the doc has not been refreshed, so it must not be visible
    response = client().prepareTermVectors(indexOrAlias(), firstDocId).setRealtime(false).get();
    assertFalse(response.isExists());

    // realtime termvectors 1
    TermVectorsRequestBuilder resp = client().prepareTermVectors(indexOrAlias(), firstDocId)
        .setPayloads(true)
        .setOffsets(true)
        .setPositions(true)
        .setRealtime(true)
        .setSelectedFields();
    response = resp.execute().actionGet();
    assertThat(response.getIndex(), equalTo(INDEX_NAME));
    assertThat("doc id: " + firstDocId + " doesn't exist but should", response.isExists(), equalTo(true));
    Fields fields = response.getFields();
    assertThat(fields.size(), equalTo(1));

    // index doc 2 with routing
    client().prepareIndex(INDEX_NAME)
        .setId(secondDocId)
        .setRouting(routing)
        .setSource(jsonBuilder().startObject().field("field", "the quick brown fox jumps over the lazy dog").endObject())
        .execute()
        .actionGet();

    // realtime termvectors 2 with routing (realtime requested explicitly so the test does not depend on the default)
    resp = client().prepareTermVectors(indexOrAlias(), secondDocId)
        .setPayloads(true)
        .setOffsets(true)
        .setPositions(true)
        .setRealtime(true)
        .setRouting(routing)
        .setSelectedFields();
    response = resp.execute().actionGet();
    assertThat(response.getIndex(), equalTo(INDEX_NAME));
    // report the id that was actually requested (doc 2), not doc 1
    assertThat("doc id: " + secondDocId + " doesn't exist but should", response.isExists(), equalTo(true));
    fields = response.getFields();
    assertThat(fields.size(), equalTo(1));

}

/**
 * Tests the cases in which a realtime termvector request on a two-shard index does not find an indexed but
 * unrefreshed document: when the request is explicitly sent with preference {@code _replica}, and when it is
 * routed with a routing key that belongs to the other shard.
 *
 * @throws IOException if building the mapping or a document source fails
 */
public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException {
    // First data node is started before the index exists; the node name itself is not needed by the test.
    internalCluster().startDataOnlyNode();
    XContentBuilder mapping = jsonBuilder().startObject()
        .startObject("properties")
        .startObject("field")
        .field("type", "text")
        .field("term_vector", "with_positions_offsets_payloads")
        .field("analyzer", "tv_test")
        .endObject()
        .endObject()
        .endObject();
    // refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime termvectors
    assertAcked(
        prepareCreate(INDEX_NAME).setMapping(mapping)
            .addAlias(new Alias("alias"))
            .setSettings(
                // Base settings are applied exactly once and first, so the test-specific overrides below
                // can never be overwritten by them.
                Settings.builder()
                    .put(indexSettings())
                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
                    .put("index.analysis.analyzer.tv_test.tokenizer", "standard")
                    .put("index.refresh_interval", -1)
                    .putList("index.analysis.analyzer.tv_test.filter", "lowercase")
            )
    );
    final String replica = internalCluster().startDataOnlyNode();
    ensureGreen(INDEX_NAME);
    final String routing = routingKeyForShard(INDEX_NAME, 0);
    final String routingOtherShard = routingKeyForShard(INDEX_NAME, 1);
    final String docId = Integer.toString(1);

    // index doc 1 on shard 0
    client().prepareIndex(INDEX_NAME)
        .setId(docId)
        .setSource(jsonBuilder().startObject().field("field", "the quick brown fox jumps over the lazy dog").endObject())
        .setRouting(routing)
        .execute()
        .actionGet();

    // non realtime termvectors 1
    TermVectorsResponse response = client().prepareTermVectors(indexOrAlias(), docId).setRealtime(false).get();
    assertFalse(response.isExists());

    // realtime termvectors (preference = _replica)
    TermVectorsRequestBuilder resp = client(replica).prepareTermVectors(indexOrAlias(), docId)
        .setPayloads(true)
        .setOffsets(true)
        .setPositions(true)
        .setPreference(Preference.REPLICA.type())
        .setRealtime(true)
        .setSelectedFields();
    response = resp.execute().actionGet();

    assertFalse(response.isExists());
    assertThat(response.getIndex(), equalTo(INDEX_NAME));

    // realtime termvectors (with routing set to the shard that does not hold the doc)
    resp = client(replica).prepareTermVectors(indexOrAlias(), docId)
        .setPayloads(true)
        .setOffsets(true)
        .setPositions(true)
        .setRealtime(true)
        .setRouting(routingOtherShard)
        .setSelectedFields();
    response = resp.execute().actionGet();

    assertFalse(response.isExists());
    assertThat(response.getIndex(), equalTo(INDEX_NAME));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand Down Expand Up @@ -87,15 +88,24 @@ public TransportTermVectorsAction(

@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {

String preference = request.request().preference;
// For a real time request on a seg rep index, use primary shard as the preferred query shard.
if (request.request().realtime()
&& preference == null
&& state.getMetadata().isSegmentReplicationEnabled(request.concreteIndex())) {
preference = Preference.PRIMARY.type();
}

if (request.request().doc() != null && request.request().routing() == null) {
// artificial document without routing specified, ignore its "id" and use either random shard or according to preference
GroupShardsIterator<ShardIterator> groupShardsIter = clusterService.operationRouting()
.searchShards(state, new String[] { request.concreteIndex() }, null, request.request().preference());
.searchShards(state, new String[] { request.concreteIndex() }, null, preference);
return groupShardsIter.iterator().next();
}

return clusterService.operationRouting()
.getShards(state, request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference());
.getShards(state, request.concreteIndex(), request.request().id(), request.request().routing(), preference);
}

@Override
Expand Down

0 comments on commit caf4c80

Please sign in to comment.