diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 4c7b3fe25296e..bd6490af5c071 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -187,6 +187,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.FINAL_PIPELINE, MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING, ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING, + IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING, // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 45d9a57442049..d5ea7a037af5d 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -503,6 +503,16 @@ public final class IndexSettings { Setting.Property.IndexScope ); + /** + * Used to specify if the index should use segment replication. If false, document replication is used. + */ + public static final Setting INDEX_SEGMENT_REPLICATION_SETTING = Setting.boolSetting( + "index.replication.segment_replication", + false, + Property.IndexScope, + Property.Final + ); + private final Index index; private final Version version; private final Logger logger; diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 2d9cba2ee0926..ade8317f292ae 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -352,6 +352,8 @@ public Condition newCondition() { */ public abstract IndexResult index(Index index) throws IOException; + public abstract IndexResult addIndexOperationToTranslog(Index index) throws IOException; + /** * Perform document delete operation on the engine * @param delete operation to perform diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 1edd0c67c3317..c38cd11840130 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1025,23 +1025,7 @@ public IndexResult index(Index index) throws IOException { } } if (index.origin().isFromTranslog() == false) { - final Translog.Location location; - if (indexResult.getResultType() == Result.Type.SUCCESS) { - location = translog.add(new Translog.Index(index, indexResult)); - } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no - final NoOp noOp = new NoOp( - indexResult.getSeqNo(), - index.primaryTerm(), - index.origin(), - index.startTime(), - indexResult.getFailure().toString() - ); - location = innerNoOp(noOp).getTranslogLocation(); - } else { - location = null; - } - indexResult.setTranslogLocation(location); + addIndexOperationToTranslog(index, indexResult); } if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) { final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null; @@ -1076,6 +1060,42 @@ public IndexResult index(Index index) throws IOException { } } + @Override + public Engine.IndexResult addIndexOperationToTranslog(Index index) throws IOException { + IndexingStrategy plan = indexingStrategyForOperation(index); + /** + * Matches the logic in {@link #indexIntoLucene(Index, IndexingStrategy)} + */ + IndexResult indexResult = new IndexResult( + plan.versionForIndexing, + index.primaryTerm(), + index.seqNo(), + plan.currentNotFoundOrDeleted + ); + addIndexOperationToTranslog(index, indexResult); + indexResult.setTook(System.nanoTime() - index.startTime()); + indexResult.freeze(); + return indexResult; + } + + private void addIndexOperationToTranslog(Index index, IndexResult indexResult) throws IOException { + Translog.Location location = null; + if (indexResult.getResultType() == Result.Type.SUCCESS) { + location = translog.add(new Translog.Index(index, indexResult)); + } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no + final NoOp noOp = new NoOp( + indexResult.getSeqNo(), + index.primaryTerm(), + index.origin(), + index.startTime(), + indexResult.getFailure().toString() + ); + location = innerNoOp(noOp).getTranslogLocation(); + } + indexResult.setTranslogLocation(location); + } + protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { assert assertNonPrimaryOrigin(index); // needs to maintain the auto_id timestamp in case this replica becomes primary diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index d9cf8e2cd65fe..e936ae10b04ea 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -294,6 +294,12 @@ public IndexResult index(Index index) { throw new UnsupportedOperationException("indexing is not supported on a read-only engine"); } + @Override + public IndexResult addIndexOperationToTranslog(Index index) throws IOException { + assert false : "this should not be called"; + throw new UnsupportedOperationException("Translog operations are not supported on a read-only engine"); + } + @Override public DeleteResult delete(Delete delete) { assert false : "this should not be called"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b93fe71e3f70b..9c8fe44a9fbcd 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -830,21 +830,104 @@ public Engine.IndexResult applyIndexOperationOnReplica( boolean isRetry, SourceToParse sourceToParse ) throws IOException { - return applyIndexOperation( - getEngine(), + Boolean isSegRepEnabled = indexSettings.getValue(IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING); + if (isSegRepEnabled != null && isSegRepEnabled) { + Engine.Index index; + try { + index = parseSourceAndPrepareIndex( + seqNo, + opPrimaryTerm, + version, + null, + UNASSIGNED_SEQ_NO, + 0, + autoGeneratedTimeStamp, + isRetry, + Engine.Operation.Origin.REPLICA, + sourceToParse + ); + Mapping update = index.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + return new Engine.IndexResult(update); + } + } catch (Exception e) { + return handleIndexFailure(seqNo, opPrimaryTerm, version, e); + } + return getEngine().addIndexOperationToTranslog(index); + } else { + return applyIndexOperation( + getEngine(), + seqNo, + opPrimaryTerm, + version, + null, + UNASSIGNED_SEQ_NO, + 0, + autoGeneratedTimeStamp, + isRetry, + Engine.Operation.Origin.REPLICA, + sourceToParse + ); + } + } + + private Engine.Index parseSourceAndPrepareIndex( + long seqNo, + long opPrimaryTerm, + long version, + @Nullable VersionType versionType, + long ifSeqNo, + long ifPrimaryTerm, + long autoGeneratedTimeStamp, + boolean isRetry, + Engine.Operation.Origin origin, + SourceToParse sourceToParse + ) throws Exception { + assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ " + + opPrimaryTerm + + " ] > shard term [" + + getOperationPrimaryTerm() + + "]"; + ensureWriteAllowed(origin); + Engine.Index operation; + final String resolvedType = mapperService.resolveDocumentType(sourceToParse.type()); + final SourceToParse sourceWithResolvedType; + if (resolvedType.equals(sourceToParse.type())) { + sourceWithResolvedType = sourceToParse; + } else { + sourceWithResolvedType = new SourceToParse( + sourceToParse.index(), + resolvedType, + sourceToParse.id(), + sourceToParse.source(), + sourceToParse.getXContentType(), + sourceToParse.routing() + ); + } + return prepareIndex( + docMapper(resolvedType), + sourceWithResolvedType, seqNo, opPrimaryTerm, version, - null, - UNASSIGNED_SEQ_NO, - 0, + versionType, + origin, autoGeneratedTimeStamp, isRetry, - Engine.Operation.Origin.REPLICA, - sourceToParse + ifSeqNo, + ifPrimaryTerm ); } + private Engine.IndexResult handleIndexFailure(long seqNo, long opPrimaryTerm, long version, Exception e) { + // We treat any exception during parsing and or mapping update as a document level failure + // with the exception side effects of closing the shard. Since we don't have the shard, we + // can not raise an exception that may block any replication of previous operations to the + // replicas + verifyNotClosed(e); + return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo); + } + private Engine.IndexResult applyIndexOperation( Engine engine, long seqNo, @@ -858,52 +941,26 @@ private Engine.IndexResult applyIndexOperation( Engine.Operation.Origin origin, SourceToParse sourceToParse ) throws IOException { - assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ " - + opPrimaryTerm - + " ] > shard term [" - + getOperationPrimaryTerm() - + "]"; - ensureWriteAllowed(origin); Engine.Index operation; try { - final String resolvedType = mapperService.resolveDocumentType(sourceToParse.type()); - final SourceToParse sourceWithResolvedType; - if (resolvedType.equals(sourceToParse.type())) { - sourceWithResolvedType = sourceToParse; - } else { - sourceWithResolvedType = new SourceToParse( - sourceToParse.index(), - resolvedType, - sourceToParse.id(), - sourceToParse.source(), - sourceToParse.getXContentType(), - sourceToParse.routing() - ); - } - operation = prepareIndex( - docMapper(resolvedType), - sourceWithResolvedType, + operation = parseSourceAndPrepareIndex( seqNo, opPrimaryTerm, version, versionType, - origin, + ifSeqNo, + ifPrimaryTerm, autoGeneratedTimeStamp, isRetry, - ifSeqNo, - ifPrimaryTerm + origin, + sourceToParse ); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { return new Engine.IndexResult(update); } } catch (Exception e) { - // We treat any exception during parsing and or mapping update as a document level failure - // with the exception side effects of closing the shard. Since we don't have the shard, we - // can not raise an exception that may block any replication of previous operations to the - // replicas - verifyNotClosed(e); - return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo); + return handleIndexFailure(seqNo, opPrimaryTerm, version, e); } return index(engine, operation);