Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] LuceneChangesSnapshot to use accurate ops history #2452

Merged
merged 4 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
"Return empty object if field doesn't exist, but index does":
- skip:
version: "all"
reason: "AwaitsFix https://github.com/opensearch-project/OpenSearch/issues/2440"

- do:
indices.create:
index: test_index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ public void testShardChangesWithDefaultDocType() throws Exception {
}
IndexShard shard = indexService.getShard(0);
try (
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true);
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean());
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()
) {
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);
Expand Down
18 changes: 16 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,22 @@ public enum SearcherScope {
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
* This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}.
*/
public abstract Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException;
public abstract Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException;

/**
* Counts the number of history operations in the given sequence number range
* @param source source of the request
* @param fromSeqNo from sequence number; included
* @param toSeqNumber to sequence number; included
* @return number of history operations
*/
public abstract int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException;

public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2772,7 +2772,13 @@ long getNumDocUpdates() {
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
Expand All @@ -2782,7 +2788,8 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long
LuceneChangesSnapshot.DEFAULT_BATCH_SIZE,
fromSeqNo,
toSeqNo,
requiredFullRange
requiredFullRange,
accurateCount
);
searcher = null;
return snapshot;
Expand All @@ -2798,6 +2805,21 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long
}
}

public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
ensureOpen();
refreshIfNeeded(source, toSeqNo);
try (Searcher s = acquireSearcher(source, SearcherScope.INTERNAL)) {
return LuceneChangesSnapshot.countNumberOfHistoryOperations(s, fromSeqNo, toSeqNo);
} catch (IOException e) {
try {
maybeFailEngine(source, e);
} catch (Exception innerException) {
e.addSuppressed(innerException);
}
throw e;
}
}

public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
return getMinRetainedSeqNo() <= startingSeqNo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.util.ArrayUtil;
import org.opensearch.Version;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.index.mapper.SeqNoFieldMapper;
Expand Down Expand Up @@ -88,8 +91,14 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
* @param toSeqNo the maximum requesting seq# - inclusive
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
*/
LuceneChangesSnapshot(Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException {
LuceneChangesSnapshot(
Engine.Searcher engineSearcher,
int searchBatchSize,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
Expand All @@ -111,7 +120,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher.setQueryCache(null);
this.parallelArray = new ParallelArray(this.searchBatchSize);
final TopDocs topDocs = searchOperations(null);
final TopDocs topDocs = searchOperations(null, accurateCount);
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
this.scoreDocs = topDocs.scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
Expand Down Expand Up @@ -187,7 +196,7 @@ private int nextDocIndex() throws IOException {
// we have processed all docs in the current search - fetch the next batch
if (docIndex == scoreDocs.length && docIndex > 0) {
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
scoreDocs = searchOperations(prev).scoreDocs;
scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
docIndex = 0;
}
Expand Down Expand Up @@ -236,16 +245,31 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
}
}

private TopDocs searchOperations(ScoreDoc after) throws IOException {
final Query rangeQuery = new BooleanQuery.Builder().add(
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo),
BooleanClause.Occur.MUST
)
// exclude non-root nested documents
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
private static Query operationsRangeQuery(long fromSeqNo, long toSeqNo) {
return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
.add(Queries.newNonNestedFilter(Version.CURRENT), BooleanClause.Occur.MUST) // exclude non-root nested docs
.build();
}

static int countNumberOfHistoryOperations(Engine.Searcher searcher, long fromSeqNo, long toSeqNo) throws IOException {
if (fromSeqNo > toSeqNo || fromSeqNo < 0 || toSeqNo < 0) {
throw new IllegalArgumentException("Invalid sequence range; fromSeqNo [" + fromSeqNo + "] toSeqNo [" + toSeqNo + "]");
}
IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
return indexSearcher.count(operationsRangeQuery(fromSeqNo, toSeqNo));
}

private TopDocs searchOperations(FieldDoc after, boolean accurate) throws IOException {
final Query rangeQuery = operationsRangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
final TopFieldCollector topFieldCollector = TopFieldCollector.create(
sortedBySeqNo,
searchBatchSize,
after,
accurate ? Integer.MAX_VALUE : 0
);
indexSearcher.search(rangeQuery, topFieldCollector);
return topFieldCollector.topDocs();
}

private Translog.Operation readDocAsOp(int docIndex) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,23 @@ public Closeable acquireHistoryRetentionLock() {
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) {
return newEmptySnapshot();
}

@Override
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true)) {
return snapshot.totalOperations();
}
}

public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
// we can do operation-based recovery if we don't have to replay any operation.
return startingSeqNo > seqNoStats.getMaxSeqNo();
Expand Down
27 changes: 22 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2231,13 +2231,13 @@ public Closeable acquireHistoryRetentionLock() {
}

/**
*
* Creates a new history snapshot for reading operations since
* the provided starting seqno (inclusive) and ending seqno (inclusive)
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException {
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true);
nknize marked this conversation as resolved.
Show resolved Hide resolved
public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo, boolean accurateCount)
throws IOException {
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true, accurateCount);
}

/**
Expand All @@ -2257,6 +2257,17 @@ public long getMinRetainedSeqNo() {
return getEngine().getMinRetainedSeqNo();
}

/**
* Counts the number of history operations within the provided sequence numbers
* @param source source of the requester (e.g., peer-recovery)
* @param fromSeqNo from sequence number, included
* @param toSeqNo to sequence number, included
* @return number of history operations in the sequence number range
*/
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
return getEngine().countNumberOfHistoryOperations(source, fromSeqNo, toSeqNo);
}

/**
* Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive)
* and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading.
Expand All @@ -2268,8 +2279,14 @@ public long getMinRetainedSeqNo() {
* if any operation between {@code fromSeqNo} and {@code toSeqNo} is missing.
* This parameter should be only enabled when the entire requesting range is below the global checkpoint.
*/
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange);
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, accurateCount);
}

public List<Segment> segments(boolean verbose) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false);
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, true);
final Translog.Snapshot originalSnapshot = snapshot;
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class RecoverySourceHandler {
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();
private static final String PEER_RECOVERY_NAME = "peer-recovery";
public static final String PEER_RECOVERY_NAME = "peer-recovery";

public RecoverySourceHandler(
IndexShard shard,
Expand Down Expand Up @@ -272,7 +272,7 @@ && isTargetSameHistory()
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

try {
final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);
final int estimateNumOps = countNumberOfHistoryOperations(startingSeqNo);
final Releasable releaseStore = acquireStore(shard.store());
resources.add(releaseStore);
sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> {
Expand Down Expand Up @@ -319,7 +319,7 @@ && isTargetSameHistory()
sendFileStep.whenComplete(r -> {
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
}, onFailure);

prepareEngineStep.whenComplete(prepareEngineTime -> {
Expand All @@ -340,9 +340,15 @@ && isTargetSameHistory()

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
if (logger.isTraceEnabled()) {
logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));
logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo));
}
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false);
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(
PEER_RECOVERY_NAME,
startingSeqNo,
Long.MAX_VALUE,
false,
true
);
resources.add(phase2Snapshot);
retentionLock.close();

Expand Down Expand Up @@ -403,10 +409,13 @@ private boolean isTargetSameHistory() {
return targetHistoryUUID.equals(shard.getHistoryUUID());
}

private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException {
try (Translog.Snapshot snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
/**
* Counts the number of history operations from the starting sequence number
* @param startingSeqNo the starting sequence number to count; included
* @return number of history operations
*/
private int countNumberOfHistoryOperations(long startingSeqNo) throws IOException {
return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE);
}

static void runUnderPrimaryPermit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,11 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe

private boolean hasUncommittedOperations() throws IOException {
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
try (
Translog.Snapshot snapshot = indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false)
) {
return snapshot.totalOperations() > 0;
}
return indexShard.countNumberOfHistoryOperations(
RecoverySourceHandler.PEER_RECOVERY_NAME,
localCheckpointOfCommit + 1,
Long.MAX_VALUE
) > 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6362,8 +6362,12 @@ public void onFailure(Exception e) {
@Override
protected void doRun() throws Exception {
latch.await();
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true);
changes.close();
if (randomBoolean()) {
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true, randomBoolean());
changes.close();
} else {
engine.countNumberOfHistoryOperations("test", min, max);
}
}
});
snapshotThreads[i].start();
Expand Down
Loading