From 7e440af9e4effc74e398ad77c2f5b00d514eee0f Mon Sep 17 00:00:00 2001 From: Sai Date: Wed, 8 Sep 2021 20:22:24 +0530 Subject: [PATCH] Changes to support retrieval of operations from translog based on specified range (#1210) * Changes to support retrieval of operations from translog based on specified range Signed-off-by: Sai Kumar * Addressed CR comments Signed-off-by: Sai Kumar * Added testcases for internal engine Signed-off-by: Sai Kumar --- .../org/opensearch/index/engine/Engine.java | 9 +++ .../index/engine/InternalEngine.java | 11 +++ .../MissingHistoryOperationsException.java | 2 +- .../opensearch/index/shard/IndexShard.java | 11 +++ .../opensearch/index/translog/Translog.java | 23 +++++-- .../index/engine/InternalEngineTests.java | 42 ++++++++++++ .../index/translog/TranslogTests.java | 68 +++++++++++++++++++ .../index/engine/EngineTestCase.java | 16 +++++ 8 files changed, 176 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index a6322b5cca008..53f077c44d5a9 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -742,6 +742,15 @@ public enum SearcherScope { public abstract Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException; + /** + * Creates a new history snapshot from either Lucene/Translog for reading operations whose seqno in the requesting + * seqno range (both inclusive). + */ + public Translog.Snapshot newChangesSnapshot(String source, HistorySource historySource, MapperService mapperService, + long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { + return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange); + } + /** * Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive). * The returned snapshot can be retrieved from either Lucene index or translog files. diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 50ad79ae27c87..0df1747156672 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2702,6 +2702,17 @@ private void ensureSoftDeletesEnabled() { } } + @Override + public Translog.Snapshot newChangesSnapshot(String source, HistorySource historySource, MapperService mapperService, + long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { + if(historySource == HistorySource.INDEX) { + return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange); + } else { + return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange); + } + } + + @Override public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/engine/MissingHistoryOperationsException.java b/server/src/main/java/org/opensearch/index/engine/MissingHistoryOperationsException.java index 39defce5b1e59..7c7a32a57bf9a 100644 --- a/server/src/main/java/org/opensearch/index/engine/MissingHistoryOperationsException.java +++ b/server/src/main/java/org/opensearch/index/engine/MissingHistoryOperationsException.java @@ -38,7 +38,7 @@ */ public final class MissingHistoryOperationsException extends IllegalStateException { - MissingHistoryOperationsException(String message) { + public MissingHistoryOperationsException(String message) { super(message); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 0b3311f872c63..53977ca2c600d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2011,6 +2011,17 @@ public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySourc return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo); } + /** + * + * Creates a new history snapshot for reading operations since + * the provided starting seqno (inclusive) and ending seqno (inclusive) + * The returned snapshot can be retrieved from either Lucene index or translog files. + */ + public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, + long startingSeqNo, long endSeqNo) throws IOException { + return getEngine().newChangesSnapshot(reason, source, mapperService, startingSeqNo, endSeqNo, true); + } + /** * Checks if we have a completed history of operations since the given starting seqno (inclusive). * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)} diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 15104a4fc32be..3151e36785342 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -53,6 +53,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.MissingHistoryOperationsException; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.AbstractIndexShardComponent; import org.opensearch.index.shard.IndexShardComponent; @@ -614,7 +615,11 @@ final Checkpoint getLastSyncedCheckpoint() { // for testing public Snapshot newSnapshot() throws IOException { - return newSnapshot(0, Long.MAX_VALUE); + return newSnapshot(0, Long.MAX_VALUE, false); + } + + public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException { + return newSnapshot(fromSeqNo, toSeqNo, false); } /** @@ -624,7 +629,7 @@ public Snapshot newSnapshot() throws IOException { * @param toSeqNo the upper bound of the range (inclusive) * @return the new snapshot */ - public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException { + public Snapshot newSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo; assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo; try (ReleasableLock ignored = readLock.acquire()) { @@ -633,7 +638,7 @@ public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException { .filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) .map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new); final Snapshot snapshot = newMultiSnapshot(snapshots); - return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo); + return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo, requiredFullRange); } } @@ -959,14 +964,17 @@ default int skippedOperations() { private static final class SeqNoFilterSnapshot implements Snapshot { private final Snapshot delegate; private int filteredOpsCount; + private int opsCount; + private boolean requiredFullRange; private final long fromSeqNo; // inclusive private final long toSeqNo; // inclusive - SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) { + SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo, boolean requiredFullRange) { assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]"; this.delegate = delegate; this.fromSeqNo = fromSeqNo; this.toSeqNo = toSeqNo; + this.requiredFullRange = requiredFullRange; } @Override @@ -980,15 +988,20 @@ public int skippedOperations() { } @Override - public Operation next() throws IOException { + public Operation next() throws IOException, MissingHistoryOperationsException { Translog.Operation op; while ((op = delegate.next()) != null) { if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) { + opsCount++; return op; } else { filteredOpsCount++; } } + if(requiredFullRange && (toSeqNo - fromSeqNo +1) != opsCount) { + throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " + + "and to_seqno [" + toSeqNo + "] found"); + } return null; } diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 01b8481e3ce8e..b6e24c2f1ac8a 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -5417,6 +5417,48 @@ public void testTrimUnsafeCommits() throws Exception { } } + public void testHistoryBasedOnSource() throws Exception { + final List operations = generateSingleDocHistory(false, + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "1"); + final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy( + Lucene.SOFT_DELETES_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy()); + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10)); + final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + Set expectedSeqNos = new HashSet<>(); + try (Store store = createStore(); + Engine engine = createEngine(config(indexSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) { + for (Engine.Operation op : operations) { + if (op instanceof Engine.Index) { + Engine.IndexResult indexResult = engine.index((Engine.Index) op); + assertThat(indexResult.getFailure(), nullValue()); + expectedSeqNos.add(indexResult.getSeqNo()); + } else { + Engine.DeleteResult deleteResult = engine.delete((Engine.Delete) op); + assertThat(deleteResult.getFailure(), nullValue()); + expectedSeqNos.add(deleteResult.getSeqNo()); + } + if (rarely()) { + engine.refresh("test"); + } + if (rarely()) { + engine.flush(); + } + if (rarely()) { + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); + } + } + MapperService mapperService = createMapperService("test"); + List luceneOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.INDEX, mapperService); + List translogOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.TRANSLOG, mapperService); + assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray())); + assertThat(translogOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray())); + } + } + public void testLuceneHistoryOnPrimary() throws Exception { final List operations = generateSingleDocHistory(false, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "1"); diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogTests.java index 0c6bbceff80d8..221a2e65bfba7 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogTests.java @@ -75,6 +75,7 @@ import org.opensearch.index.VersionType; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.Engine.Operation.Origin; +import org.opensearch.index.engine.MissingHistoryOperationsException; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.ParseContext.Document; import org.opensearch.index.mapper.ParsedDocument; @@ -120,6 +121,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Random; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -737,6 +739,72 @@ public void testRangeSnapshot() throws Exception { } } + private Long populateTranslogOps(boolean withMissingOps) throws IOException { + long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + final int generations = between(2, 20); + long currentSeqNo = 0L; + List firstGenOps = null; + Map> operationsByGen = new HashMap<>(); + for (int gen = 0; gen < generations; gen++) { + List seqNos = new ArrayList<>(); + int numOps = randomIntBetween(4, 10); + for (int i = 0; i < numOps; i++, currentSeqNo++) { + minSeqNo = SequenceNumbers.min(minSeqNo, currentSeqNo); + maxSeqNo = SequenceNumbers.max(maxSeqNo, currentSeqNo); + seqNos.add(currentSeqNo); + } + Collections.shuffle(seqNos, new Random(100)); + List ops = new ArrayList<>(seqNos.size()); + for (long seqNo : seqNos) { + Translog.Index op = new Translog.Index("_doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{randomByte()}); + boolean shouldAdd = !withMissingOps || seqNo % 4 != 0; + if(shouldAdd) { + translog.add(op); + ops.add(op); + } + } + operationsByGen.put(translog.currentFileGeneration(), ops); + if(firstGenOps == null) { + firstGenOps = ops; + } + translog.rollGeneration(); + if (rarely()) { + translog.rollGeneration(); // empty generation + } + } + return currentSeqNo; + } + + public void testFullRangeSnapshot() throws Exception { + // Successful snapshot + long nextSeqNo = populateTranslogOps(false); + long fromSeqNo = 0L; + long toSeqNo = Math.max(nextSeqNo - 1, fromSeqNo + 15); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) { + int totOps = 0; + for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) { + totOps++; + } + assertEquals(totOps, toSeqNo - fromSeqNo + 1); + } + } + + public void testFullRangeSnapshotWithFailures() throws Exception { + long nextSeqNo = populateTranslogOps(true); + long fromSeqNo = 0L; + long toSeqNo = Math.max(nextSeqNo-1, fromSeqNo + 15); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) { + int totOps = 0; + for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) { + totOps++; + } + fail("Should throw exception for missing operations"); + } catch(MissingHistoryOperationsException e) { + assertTrue(e.getMessage().contains("Not all operations between from_seqno")); + } + } + public void assertFileIsPresent(Translog translog, long id) { if (Files.exists(translog.location().resolve(Translog.getFilename(id)))) { return; diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index c4689363bdea1..ec223dc962d4b 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -1081,6 +1081,22 @@ public static List readAllOperationsInLucene(Engine engine, return operations; } + /** + * Reads all engine operations that have been processed by the engine from Lucene index/Translog based on source. + */ + public static List readAllOperationsBasedOnSource(Engine engine, Engine.HistorySource historySource, + MapperService mapper) throws IOException { + final List operations = new ArrayList<>(); + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", historySource, mapper, + 0, Long.MAX_VALUE, false)) { + Translog.Operation op; + while ((op = snapshot.next()) != null){ + operations.add(op); + } + } + return operations; + } + /** * Asserts the provided engine has a consistent document history between translog and Lucene index. */