From 9fd0cdb20d70f8a925485a8d943c9d9f1ec97af2 Mon Sep 17 00:00:00 2001 From: Raghuvansh Raj Date: Thu, 9 Nov 2023 20:10:01 +0530 Subject: [PATCH] Changes made to reproduce retry_on_conflict error in TransportShardBulkAction Signed-off-by: Raghuvansh Raj --- .../index/engine/InternalEngine.java | 107 ++++++++++-------- 1 file changed, 59 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 8e1627af274c5..1609c985a6288 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1052,54 +1052,65 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { final VersionValue versionValue = resolveDocVersion(index, index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO); final long currentVersion; final boolean currentNotFoundOrDeleted; - if (versionValue == null) { - currentVersion = Versions.NOT_FOUND; - currentNotFoundOrDeleted = true; - } else { - currentVersion = versionValue.version; - currentNotFoundOrDeleted = versionValue.isDelete(); - } - if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentNotFoundOrDeleted) { - final VersionConflictEngineException e = new VersionConflictEngineException( - shardId, - index.id(), - index.getIfSeqNo(), - index.getIfPrimaryTerm(), - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM - ); - plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion); - } else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO - && (versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm())) { - final VersionConflictEngineException e = new VersionConflictEngineException( - shardId, - index.id(), - index.getIfSeqNo(), - index.getIfPrimaryTerm(), - versionValue.seqNo, - versionValue.term - ); - plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); - } else if (index.versionType().isVersionConflictForWrites(currentVersion, index.version(), currentNotFoundOrDeleted)) { - final VersionConflictEngineException e = new VersionConflictEngineException( - shardId, - index, - currentVersion, - currentNotFoundOrDeleted - ); - plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); - } else { - final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); - if (reserveError != null) { - plan = IndexingStrategy.failAsTooManyDocs(reserveError); - } else { - plan = IndexingStrategy.processNormally( - currentNotFoundOrDeleted, - canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()), - reservingDocs - ); - } - } + + // always throw a VersionConflictEngineException + final VersionConflictEngineException e = new VersionConflictEngineException( + shardId, + index.id(), + index.getIfSeqNo(), + index.getIfPrimaryTerm(), + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ); + plan = IndexingStrategy.skipDueToVersionConflict(e, true, Versions.NOT_FOUND); +// if (versionValue == null) { +// currentVersion = Versions.NOT_FOUND; +// currentNotFoundOrDeleted = true; +// } else { +// currentVersion = versionValue.version; +// currentNotFoundOrDeleted = versionValue.isDelete(); +// } +// if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentNotFoundOrDeleted) { +// final VersionConflictEngineException e = new VersionConflictEngineException( +// shardId, +// index.id(), +// index.getIfSeqNo(), +// index.getIfPrimaryTerm(), +// SequenceNumbers.UNASSIGNED_SEQ_NO, +// SequenceNumbers.UNASSIGNED_PRIMARY_TERM +// ); +// plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion); +// } else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO +// && (versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm())) { +// final VersionConflictEngineException e = new VersionConflictEngineException( +// shardId, +// index.id(), +// index.getIfSeqNo(), +// index.getIfPrimaryTerm(), +// versionValue.seqNo, +// versionValue.term +// ); +// plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); +// } else if (index.versionType().isVersionConflictForWrites(currentVersion, index.version(), currentNotFoundOrDeleted)) { +// final VersionConflictEngineException e = new VersionConflictEngineException( +// shardId, +// index, +// currentVersion, +// currentNotFoundOrDeleted +// ); +// plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); +// } else { +// final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); +// if (reserveError != null) { +// plan = IndexingStrategy.failAsTooManyDocs(reserveError); +// } else { +// plan = IndexingStrategy.processNormally( +// currentNotFoundOrDeleted, +// canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()), +// reservingDocs +// ); +// } +// } } return plan; }