Skip to content

Commit

Permalink
Segment Replication - Remove redundant replica doc parsing on writes. (
Browse files Browse the repository at this point in the history
…#7279) (#7281)

This change removes unnecessary doc parsing currently performed on replicas by
updating applyIndexOperationOnReplicas to pass a doc id from the primary.


(cherry picked from commit 66e49a6)

Signed-off-by: Marc Handalian <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent c1b986f commit bb32f75
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ private static Engine.Result performOpOnReplica(
indexRequest.routing()
);
result = replica.applyIndexOperationOnReplica(
primaryResponse.getId(),
primaryResponse.getSeqNo(),
primaryResponse.getPrimaryTerm(),
primaryResponse.getVersion(),
Expand Down
33 changes: 33 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -916,13 +916,31 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
}

public Engine.IndexResult applyIndexOperationOnReplica(
String id,
long seqNo,
long opPrimaryTerm,
long version,
long autoGeneratedTimeStamp,
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
if (indexSettings.isSegRepEnabled()) {
Engine.Index index = new Engine.Index(
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getXContentType(), null),
seqNo,
opPrimaryTerm,
version,
null,
Engine.Operation.Origin.REPLICA,
System.nanoTime(),
autoGeneratedTimeStamp,
isRetry,
UNASSIGNED_SEQ_NO,
0
);
return getEngine().index(index);
}
return applyIndexOperation(
getEngine(),
seqNo,
Expand Down Expand Up @@ -1133,6 +1151,21 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary(
}

public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String id) throws IOException {
if (indexSettings.isSegRepEnabled()) {
final Engine.Delete delete = new Engine.Delete(
id,
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
seqNo,
opPrimaryTerm,
version,
null,
Engine.Operation.Origin.REPLICA,
System.nanoTime(),
UNASSIGNED_SEQ_NO,
0
);
return getEngine().delete(delete);
}
return applyDeleteOperation(
getEngine(),
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception {
final IndexShard remainingReplica = shards.getReplicas().get(1);
// slip the extra document into the replica
remainingReplica.applyIndexOperationOnReplica(
"id",
remainingReplica.getLocalCheckpoint() + 1,
remainingReplica.getOperationPrimaryTerm(),
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -2236,6 +2237,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
shard.applyDeleteOperationOnReplica(1, primaryTerm, 2, "id");
shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
0,
primaryTerm,
1,
Expand All @@ -2244,6 +2246,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
new SourceToParse(shard.shardId().getIndexName(), "id", new BytesArray("{}"), XContentType.JSON)
);
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
3,
primaryTerm,
3,
Expand All @@ -2254,6 +2257,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
// Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery.
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
2,
primaryTerm,
3,
Expand All @@ -2262,6 +2266,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
new SourceToParse(shard.shardId().getIndexName(), "id-2", new BytesArray("{}"), XContentType.JSON)
);
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
5,
primaryTerm,
1,
Expand Down Expand Up @@ -2409,6 +2414,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException {
updateMappings(otherShard, shard.indexSettings().getIndexMetadata());
SourceToParse sourceToParse = new SourceToParse(shard.shardId().getIndexName(), "1", new BytesArray("{}"), XContentType.JSON);
otherShard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
1,
otherShard.getOperationPrimaryTerm(),
1,
Expand Down Expand Up @@ -2536,6 +2542,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
final String indexName = shard.shardId().getIndexName();
// Index #0, index #1
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
0,
primaryTerm,
1,
Expand All @@ -2546,6 +2553,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
flushShard(shard);
shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here.
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
1,
primaryTerm,
1,
Expand All @@ -2558,6 +2566,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
shard.getEngine().rollTranslogGeneration();
shard.markSeqNoAsNoop(1, primaryTerm, "test");
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
2,
primaryTerm,
1,
Expand Down Expand Up @@ -3948,6 +3957,7 @@ private Result indexOnReplicaWithGaps(final IndexShard indexShard, final int ope
XContentType.JSON
);
indexShard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
i,
indexShard.getOperationPrimaryTerm(),
1,
Expand Down Expand Up @@ -4577,6 +4587,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
seqNo++; // create gaps in sequence numbers
}
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
seqNo,
shard.getOperationPrimaryTerm(),
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.CheckpointInfoResponse;
Expand Down Expand Up @@ -176,6 +178,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON));
}

assertEqualTranslogOperations(shards, primaryShard);
primaryShard.refresh("Test");
replicateSegments(primaryShard, shards.getReplicas());

Expand All @@ -189,7 +192,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
);
}
}

assertEqualTranslogOperations(shards, primaryShard);
primaryShard.refresh("Test");
replicateSegments(primaryShard, shards.getReplicas());
shards.assertAllEqual(numDocs);
Expand All @@ -204,6 +207,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
shards.delete(new DeleteRequest(index.getName()).id(String.valueOf(i)));
}
}
assertEqualTranslogOperations(shards, primaryShard);
primaryShard.refresh("Test");
replicateSegments(primaryShard, shards.getReplicas());
final List<DocIdSeqNoAndSource> docsAfterDelete = getDocIdAndSeqNos(primaryShard);
Expand Down Expand Up @@ -753,6 +757,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
for (IndexShard shard : shards.getReplicas()) {
assertDocCounts(shard, numDocs, numDocs);
}
assertEqualTranslogOperations(shards, oldPrimary);

// 2. Create ops that are in the replica's xlog, not in the index.
// index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs
Expand All @@ -761,6 +766,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
final int totalDocs = numDocs + additonalDocs;

assertDocCounts(oldPrimary, totalDocs, totalDocs);
assertEqualTranslogOperations(shards, oldPrimary);
for (IndexShard shard : shards.getReplicas()) {
assertDocCounts(shard, totalDocs, numDocs);
}
Expand Down Expand Up @@ -1083,4 +1089,20 @@ private void assertEqualCommittedSegments(IndexShard primary, IndexShard... repl
assertTrue(diff.missing.isEmpty());
}
}

private void assertEqualTranslogOperations(ReplicationGroup shards, IndexShard primaryShard) throws IOException {
try (final Translog.Snapshot snapshot = getTranslog(primaryShard).newSnapshot()) {
List<Translog.Operation> operations = new ArrayList<>();
Translog.Operation op;
while ((op = snapshot.next()) != null) {
final Translog.Operation newOp = op;
operations.add(newOp);
}
for (IndexShard replica : shards.getReplicas()) {
try (final Translog.Snapshot replicaSnapshot = getTranslog(replica).newSnapshot()) {
assertThat(replicaSnapshot, SnapshotMatchers.containsOperationsInAnyOrder(operations));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -182,6 +183,7 @@ private SeqNoStats populateRandomData(IndexShard shard) throws IOException {
Randomness.shuffle(seqNos);
for (long seqNo : seqNos) {
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
seqNo,
1,
shard.getOperationPrimaryTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -181,6 +182,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
// index #0
orgReplica.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
0,
primaryTerm,
1,
Expand All @@ -190,6 +192,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
);
// index #3
orgReplica.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
3,
primaryTerm,
1,
Expand All @@ -201,6 +204,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
// index #2
orgReplica.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
2,
primaryTerm,
1,
Expand All @@ -212,6 +216,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
// index #5 -> force NoOp #4.
orgReplica.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
5,
primaryTerm,
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1122,6 +1123,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source
final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;
shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates
result = shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
seqNo,
shard.getOperationPrimaryTerm(),
0,
Expand Down

0 comments on commit bb32f75

Please sign in to comment.