diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java
index 047d632c44392..c242d98b4b65c 100644
--- a/server/src/main/java/org/opensearch/index/engine/Engine.java
+++ b/server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -169,6 +169,12 @@ public final EngineConfig config() {
 
     protected abstract SegmentInfos getLastCommittedSegmentInfos();
 
+    /**
+     * Returns the latest active {@link SegmentInfos} from the engine, which may be ahead
+     * of the last committed infos.
+     * @return the latest {@link SegmentInfos}
+     */
+    protected abstract SegmentInfos getLatestSegmentInfos();
+
     public MergeStats getMergeStats() {
         return new MergeStats();
     }
@@ -176,6 +182,17 @@ public MergeStats getMergeStats() {
     /** returns the history uuid for the engine */
     public abstract String getHistoryUUID();
 
+    /**
+     * Reads the stored history UUID from the given commit's user data.
+     *
+     * @throws IllegalStateException if the commit does not contain a history UUID
+     */
+    String loadHistoryUUID(Map<String, String> commitData) {
+        final String uuid = commitData.get(HISTORY_UUID_KEY);
+        if (uuid == null) {
+            throw new IllegalStateException("commit doesn't contain history uuid");
+        }
+        return uuid;
+    }
+
     /** Returns how many bytes we are currently moving from heap to disk */
     public abstract long getWritingBytes();
 
diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
index 0ea4a96a72362..4ae6646ed14f0 100644
--- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
+++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -97,6 +97,7 @@ public final class EngineConfig {
     private final CircuitBreakerService circuitBreakerService;
     private final LongSupplier globalCheckpointSupplier;
     private final Supplier<RetentionLeases> retentionLeasesSupplier;
+    private final boolean isReadOnlyReplica;
 
     /**
      * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -228,6 +229,66 @@ public EngineConfig(
         LongSupplier primaryTermSupplier,
         TombstoneDocSupplier tombstoneDocSupplier
     ) {
+        this(
+            shardId,
+            threadPool,
+            indexSettings,
+            warmer,
+            store,
+            mergePolicy,
+            analyzer,
+            similarity,
+            codecService,
+            eventListener,
+            queryCache,
+            queryCachingPolicy,
+            translogConfig,
+            translogDeletionPolicyFactory,
+            flushMergesAfter,
+            externalRefreshListener,
+            internalRefreshListener,
+            indexSort,
+            circuitBreakerService,
+            globalCheckpointSupplier,
+            retentionLeasesSupplier,
+            primaryTermSupplier,
+            tombstoneDocSupplier,
+            false
+        );
+    }
+
+    /**
+     * Creates a new {@link org.opensearch.index.engine.EngineConfig}
+     */
+    EngineConfig(
+        ShardId shardId,
+        ThreadPool threadPool,
+        IndexSettings indexSettings,
+        Engine.Warmer warmer,
+        Store store,
+        MergePolicy mergePolicy,
+        Analyzer analyzer,
+        Similarity similarity,
+        CodecService codecService,
+        Engine.EventListener eventListener,
+        QueryCache queryCache,
+        QueryCachingPolicy queryCachingPolicy,
+        TranslogConfig translogConfig,
+        TranslogDeletionPolicyFactory translogDeletionPolicyFactory,
+        TimeValue flushMergesAfter,
+        List<ReferenceManager.RefreshListener> externalRefreshListener,
+        List<ReferenceManager.RefreshListener> internalRefreshListener,
+        Sort indexSort,
+        CircuitBreakerService circuitBreakerService,
+        LongSupplier globalCheckpointSupplier,
+        Supplier<RetentionLeases> retentionLeasesSupplier,
+        LongSupplier primaryTermSupplier,
+        TombstoneDocSupplier tombstoneDocSupplier,
+        boolean isReadOnlyReplica
+    ) {
+        if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) {
+            throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
+        }
         this.shardId = shardId;
         this.indexSettings = indexSettings;
         this.threadPool = threadPool;
@@ -266,6 +327,7 @@ public EngineConfig(
         this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
         this.primaryTermSupplier = primaryTermSupplier;
         this.tombstoneDocSupplier = tombstoneDocSupplier;
+        this.isReadOnlyReplica = isReadOnlyReplica;
     }
 
     /**
@@ -460,6 +522,16 @@ public LongSupplier getPrimaryTermSupplier() {
         return primaryTermSupplier;
     }
 
+    /**
+     * Returns whether this engine should be wired as a read-only replica.
+     * This is used with Segment Replication, where the engine implementation depends on
+     * whether the shard is a primary or a replica.
+     * @return true if this engine should be wired as a read-only replica.
+     */
+    public boolean isReadOnlyReplica() {
+        return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
+    }
+
     /**
      * A supplier supplies tombstone documents which will be used in soft-update methods.
      * The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java
index afab57905a9a7..c8aec3570f8b5 100644
--- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java
+++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java
@@ -146,7 +146,8 @@ public EngineConfig newEngineConfig(
         LongSupplier globalCheckpointSupplier,
         Supplier<RetentionLeases> retentionLeasesSupplier,
         LongSupplier primaryTermSupplier,
-        EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
+        EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
+        boolean isReadOnlyReplica
     ) {
         CodecService codecServiceToUse = codecService;
         if (codecService == null && this.codecServiceFactory != null) {
@@ -176,7 +177,8 @@ public EngineConfig newEngineConfig(
             globalCheckpointSupplier,
             retentionLeasesSupplier,
             primaryTermSupplier,
-            tombstoneDocSupplier
+            tombstoneDocSupplier,
+            isReadOnlyReplica
         );
     }
 
diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index eb91478b97adc..e60e650372ec4 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -49,6 +49,7 @@
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.ShuffleForcedMergePolicy;
 import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
+import org.apache.lucene.index.StandardDirectoryReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
 import org.apache.lucene.search.BooleanClause;
@@ -648,17 +649,6 @@ public long getWritingBytes() {
         return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
     }
 
-    /**
-     * Reads the current stored history ID from the IW commit data.
-     */
-    private String loadHistoryUUID(Map<String, String> commitData) {
-        final String uuid = commitData.get(HISTORY_UUID_KEY);
-        if (uuid == null) {
-            throw new IllegalStateException("commit doesn't contain history uuid");
-        }
-        return uuid;
-    }
-
     private ExternalReaderManager createReaderManager(RefreshWarmerListener externalRefreshListener) throws EngineException {
         boolean success = false;
         OpenSearchReaderManager internalReaderManager = null;
@@ -2298,6 +2288,23 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
         return lastCommittedSegmentInfos;
     }
 
+    @Override
+    public SegmentInfos getLatestSegmentInfos() {
+        OpenSearchDirectoryReader reader = null;
+        try {
+            reader = internalReaderManager.acquire();
+            return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos();
+        } catch (IOException e) {
+            throw new EngineException(shardId, e.getMessage(), e);
+        } finally {
+            try {
+                // guard against a failed acquire(), in which case the reader is still null and must not be released
+                if (reader != null) {
+                    internalReaderManager.release(reader);
+                }
+            } catch (IOException e) {
+                throw new EngineException(shardId, e.getMessage(), e);
+            }
+        }
+    }
+
     @Override
     protected final void writerSegmentStats(SegmentsStats stats) {
         stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
new file mode 100644
index 0000000000000..106643198cc3b
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -0,0 +1,482 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.engine;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
+import org.apache.lucene.search.ReferenceManager;
+import org.opensearch.common.concurrent.GatedCloseable;
+import org.opensearch.common.lucene.Lucene;
+import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
+import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.concurrent.ReleasableLock;
+import org.opensearch.core.internal.io.IOUtils;
+import org.opensearch.index.seqno.LocalCheckpointTracker;
+import org.opensearch.index.seqno.SeqNoStats;
+import org.opensearch.index.seqno.SequenceNumbers;
+import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
+import org.opensearch.index.translog.Translog;
+import org.opensearch.index.translog.TranslogConfig;
+import org.opensearch.index.translog.TranslogDeletionPolicy;
+import org.opensearch.index.translog.TranslogStats;
+import org.opensearch.search.suggest.completion.CompletionStats;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.BiFunction;
+import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
+import java.util.stream.Stream;
+
+/**
+ * This is an {@link Engine} implementation intended for replica shards when Segment Replication
+ * is enabled. This engine does not create an IndexWriter; instead, it refreshes a
+ * {@link NRTReplicationReaderManager} with new segments when they are received from an external source.
+ *
+ * @opensearch.internal
+ */
+public class NRTReplicationEngine extends Engine {
+
+    private volatile SegmentInfos lastCommittedSegmentInfos;
+    private final NRTReplicationReaderManager readerManager;
+    private final CompletionStatsCache completionStatsCache;
+    private final LocalCheckpointTracker localCheckpointTracker;
+    private final Translog translog;
+
+    public NRTReplicationEngine(EngineConfig engineConfig) {
+        super(engineConfig);
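+        // hold a reference to the store for the lifetime of this engine; it is released in closeNoLock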
+        store.incRef();
+        NRTReplicationReaderManager readerManager = null;
+        try {
+            lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
+            readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
+            final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
+                this.lastCommittedSegmentInfos.getUserData().entrySet()
+            );
+            this.localCheckpointTracker = new LocalCheckpointTracker(commitInfo.maxSeqNo, commitInfo.localCheckpoint);
+            this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
+            this.readerManager = readerManager;
+            this.readerManager.addListener(completionStatsCache);
+            this.translog = openTranslog(
+                engineConfig,
+                getTranslogDeletionPolicy(engineConfig),
+                engineConfig.getGlobalCheckpointSupplier(),
+                localCheckpointTracker::markSeqNoAsPersisted
+            );
+        } catch (IOException e) {
+            IOUtils.closeWhileHandlingException(store::decRef, readerManager);
+            throw new EngineCreationFailureException(shardId, "failed to create engine", e);
+        }
+    }
+
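+    /**
+     * Update the engine with a new set of {@link SegmentInfos} received from the primary and refresh its reader.
+     *
+     * @param infos the new {@link SegmentInfos} to refresh the reader with
+     * @param seqNo the primary's processed checkpoint up to which these segments cover operations
+     * @throws IOException if the refresh fails
+     */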
+    public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
+        // Update the current infos reference on the Engine's reader.
+        readerManager.updateSegments(infos);
+
+        // only roll the translog and update the "lastCommitted" infos reference if the incoming segments have a
+        // higher generation, i.e. they are part of a new commit point. We can still refresh with incoming
+        // SegmentInfos that are not part of a commit point.
+        if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
+            this.lastCommittedSegmentInfos = infos;
+            rollTranslogGeneration();
+        }
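+        // mark operations up to the primary's checkpoint as processed, since the new segments cover them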
+        localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
+    }
+
+    @Override
+    public String getHistoryUUID() {
+        return loadHistoryUUID(lastCommittedSegmentInfos.userData);
+    }
+
+    @Override
+    public long getWritingBytes() {
+        return 0;
+    }
+
+    @Override
+    public CompletionStats completionStats(String... fieldNamePatterns) {
+        return completionStatsCache.get(fieldNamePatterns);
+    }
+
+    @Override
+    public long getIndexThrottleTimeInMillis() {
+        return 0;
+    }
+
+    @Override
+    public boolean isThrottled() {
+        return false;
+    }
+
+    @Override
+    public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
+        try (ReleasableLock lock = readLock.acquire()) {
+            ensureOpen();
+            translog.trimOperations(belowTerm, aboveSeqNo);
+        } catch (Exception e) {
+            try {
+                failEngine("translog operations trimming failed", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            throw new EngineException(shardId, "failed to trim translog operations", e);
+        }
+    }
+
+    @Override
+    public IndexResult index(Index index) throws IOException {
+        ensureOpen();
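+        // replicas do not index into Lucene; the operation is only appended to the translog for durability and recovery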
+        IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
+        final Translog.Location location = translog.add(new Translog.Index(index, indexResult));
+        indexResult.setTranslogLocation(location);
+        indexResult.setTook(System.nanoTime() - index.startTime());
+        indexResult.freeze();
+        localCheckpointTracker.advanceMaxSeqNo(index.seqNo());
+        return indexResult;
+    }
+
+    @Override
+    public DeleteResult delete(Delete delete) throws IOException {
+        ensureOpen();
+        DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
+        final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
+        deleteResult.setTranslogLocation(location);
+        deleteResult.setTook(System.nanoTime() - delete.startTime());
+        deleteResult.freeze();
+        localCheckpointTracker.advanceMaxSeqNo(delete.seqNo());
+        return deleteResult;
+    }
+
+    @Override
+    public NoOpResult noOp(NoOp noOp) throws IOException {
+        ensureOpen();
+        NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
+        final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
+        noOpResult.setTranslogLocation(location);
+        noOpResult.setTook(System.nanoTime() - noOp.startTime());
+        noOpResult.freeze();
+        localCheckpointTracker.advanceMaxSeqNo(noOp.seqNo());
+        return noOpResult;
+    }
+
+    @Override
+    public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
+        return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
+    }
+
+    @Override
+    protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(SearcherScope scope) {
+        return readerManager;
+    }
+
+    @Override
+    public boolean isTranslogSyncNeeded() {
+        return translog.syncNeeded();
+    }
+
+    @Override
+    public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
+        boolean synced = translog.ensureSynced(locations);
+        if (synced) {
+            translog.trimUnreferencedReaders();
+        }
+        return synced;
+    }
+
+    @Override
+    public void syncTranslog() throws IOException {
+        translog.sync();
+        translog.trimUnreferencedReaders();
+    }
+
+    @Override
+    public Closeable acquireHistoryRetentionLock() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public Translog.Snapshot newChangesSnapshot(
+        String source,
+        long fromSeqNo,
+        long toSeqNo,
+        boolean requiredFullRange,
+        boolean accurateCount
+    ) throws IOException {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
+        return false;
+    }
+
+    @Override
+    public long getMinRetainedSeqNo() {
+        return localCheckpointTracker.getProcessedCheckpoint();
+    }
+
+    @Override
+    public TranslogStats getTranslogStats() {
+        return translog.stats();
+    }
+
+    @Override
+    public Translog.Location getTranslogLastWriteLocation() {
+        return translog.getLastWriteLocation();
+    }
+
+    @Override
+    public long getPersistedLocalCheckpoint() {
+        return localCheckpointTracker.getPersistedCheckpoint();
+    }
+
+    public long getProcessedLocalCheckpoint() {
+        return localCheckpointTracker.getProcessedCheckpoint();
+    }
+
+    @Override
+    public SeqNoStats getSeqNoStats(long globalCheckpoint) {
+        return localCheckpointTracker.getStats(globalCheckpoint);
+    }
+
+    @Override
+    public long getLastSyncedGlobalCheckpoint() {
+        return translog.getLastSyncedGlobalCheckpoint();
+    }
+
+    @Override
+    public long getIndexBufferRAMBytesUsed() {
+        return 0;
+    }
+
+    @Override
+    public List<Segment> segments(boolean verbose) {
+        return Arrays.asList(getSegmentInfo(getLatestSegmentInfos(), verbose));
+    }
+
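+    // refresh is a no-op on the replica; the reader is only refreshed through updateSegments when
+    // new segments arrive from the primary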
+    @Override
+    public void refresh(String source) throws EngineException {}
+
+    @Override
+    public boolean maybeRefresh(String source) throws EngineException {
+        return false;
+    }
+
+    @Override
+    public void writeIndexingBuffer() throws EngineException {}
+
+    @Override
+    public boolean shouldPeriodicallyFlush() {
+        return false;
+    }
+
+    @Override
+    public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}
+
+    @Override
+    public void trimUnreferencedTranslogFiles() throws EngineException {
+        try (ReleasableLock lock = readLock.acquire()) {
+            ensureOpen();
+            translog.trimUnreferencedReaders();
+        } catch (Exception e) {
+            try {
+                failEngine("translog trimming failed", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            throw new EngineException(shardId, "failed to trim translog", e);
+        }
+    }
+
+    @Override
+    public boolean shouldRollTranslogGeneration() {
+        return translog.shouldRollGeneration();
+    }
+
+    @Override
+    public void rollTranslogGeneration() throws EngineException {
+        try (ReleasableLock ignored = readLock.acquire()) {
+            ensureOpen();
+            translog.rollGeneration();
+            translog.trimUnreferencedReaders();
+        } catch (Exception e) {
+            try {
+                failEngine("translog trimming failed", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            throw new EngineException(shardId, "failed to roll translog", e);
+        }
+    }
+
+    @Override
+    public void forceMerge(
+        boolean flush,
+        int maxNumSegments,
+        boolean onlyExpungeDeletes,
+        boolean upgrade,
+        boolean upgradeOnlyAncientSegments,
+        String forceMergeUUID
+    ) throws EngineException, IOException {}
+
+    @Override
+    public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException {
+        try {
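+            // nothing needs to be released on close; the replica holds no IndexWriter or deletion policy retaining the commit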
+            final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory());
+            return new GatedCloseable<>(indexCommit, () -> {});
+        } catch (IOException e) {
+            throw new EngineException(shardId, "Unable to build latest IndexCommit", e);
+        }
+    }
+
+    @Override
+    public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
+        return acquireLastIndexCommit(false);
+    }
+
+    @Override
+    public SafeCommitInfo getSafeCommitInfo() {
+        return new SafeCommitInfo(localCheckpointTracker.getProcessedCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());
+    }
+
+    @Override
+    protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
+        if (isClosed.compareAndSet(false, true)) {
+            assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
+                : "Either the write lock must be held or the engine must be currently be failing itself";
+            try {
+                IOUtils.close(readerManager, translog, store::decRef);
+            } catch (Exception e) {
+                logger.warn("failed to close engine", e);
+            } finally {
+                logger.debug("engine closed [{}]", reason);
+                closedLatch.countDown();
+            }
+        }
+    }
+
+    @Override
+    public void activateThrottling() {}
+
+    @Override
+    public void deactivateThrottling() {}
+
+    @Override
+    public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public int fillSeqNoGaps(long primaryTerm) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
+        throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog.");
+    }
+
+    @Override
+    public void skipTranslogRecovery() {
+        // Do nothing.
+    }
+
+    @Override
+    public void maybePruneDeletes() {}
+
+    @Override
+    public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {}
+
+    @Override
+    public long getMaxSeqNoOfUpdatesOrDeletes() {
+        return localCheckpointTracker.getMaxSeqNo();
+    }
+
+    @Override
+    public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {}
+
+    public Translog getTranslog() {
+        return translog;
+    }
+
+    @Override
+    public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
+        final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
+        translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
+        translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
+    }
+
+    @Override
+    protected SegmentInfos getLastCommittedSegmentInfos() {
+        return lastCommittedSegmentInfos;
+    }
+
+    @Override
+    protected SegmentInfos getLatestSegmentInfos() {
+        return readerManager.getSegmentInfos();
+    }
+
+    protected LocalCheckpointTracker getLocalCheckpointTracker() {
+        return localCheckpointTracker;
+    }
+
+    private DirectoryReader getDirectoryReader() throws IOException {
+        // for segment replication, replicas should create the reader directly from the store; we don't want an open IndexWriter on replicas.
+        return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
+    }
+
+    private Translog openTranslog(
+        EngineConfig engineConfig,
+        TranslogDeletionPolicy translogDeletionPolicy,
+        LongSupplier globalCheckpointSupplier,
+        LongConsumer persistedSequenceNumberConsumer
+    ) throws IOException {
+        final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
+        final Map<String, String> userData = lastCommittedSegmentInfos.getUserData();
+        final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
+        // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
+        return new Translog(
+            translogConfig,
+            translogUUID,
+            translogDeletionPolicy,
+            globalCheckpointSupplier,
+            engineConfig.getPrimaryTermSupplier(),
+            persistedSequenceNumberConsumer
+        );
+    }
+
+    private TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
+        TranslogDeletionPolicy customTranslogDeletionPolicy = null;
+        if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
+            customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
+                .create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
+        }
+        return Objects.requireNonNullElseGet(
+            customTranslogDeletionPolicy,
+            () -> new DefaultTranslogDeletionPolicy(
+                engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
+                engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
+                engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
+            )
+        );
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java
new file mode 100644
index 0000000000000..45fe3086ac3f6
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java
@@ -0,0 +1,25 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.engine;
+
+/**
+ * Engine factory implementation used with Segment Replication that wires up replica shards with an {@link NRTReplicationEngine}
+ * and primary shards with an {@link InternalEngine}.
+ *
+ * @opensearch.internal
+ */
+public class NRTReplicationEngineFactory implements EngineFactory {
+    @Override
+    public Engine newReadWriteEngine(EngineConfig config) {
+        if (config.isReadOnlyReplica()) {
+            return new NRTReplicationEngine(config);
+        }
+        return new InternalEngine(config);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
new file mode 100644
index 0000000000000..16e615672a26f
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
@@ -0,0 +1,92 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.engine;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
+import org.apache.lucene.index.StandardDirectoryReader;
+import org.opensearch.common.lucene.Lucene;
+import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}.
+ * The manager holds a reference to the latest {@link SegmentInfos} object that is used to refresh a reader.
+ *
+ * @opensearch.internal
+ */
+public class NRTReplicationReaderManager extends OpenSearchReaderManager {
+
+    private final static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class);
+    private volatile SegmentInfos currentInfos;
+
+    /**
+     * Creates and returns a new NRTReplicationReaderManager from the given
+     * already-opened {@link OpenSearchDirectoryReader}, stealing
+     * the incoming reference.
+     *
+     * @param reader the {@link OpenSearchDirectoryReader} to use for future reopens
+     */
+    NRTReplicationReaderManager(OpenSearchDirectoryReader reader) {
+        super(reader);
+        currentInfos = unwrapStandardReader(reader).getSegmentInfos();
+    }
+
+    @Override
+    protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException {
+        Objects.requireNonNull(referenceToRefresh);
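+        // collect the currently open leaves so StandardDirectoryReader.open can reuse SegmentReaders for unchanged segments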
+        final List<LeafReader> subs = new ArrayList<>();
+        final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh);
+        for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
+            subs.add(ctx.reader());
+        }
+        DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
+        final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
+            innerReader,
+            Lucene.SOFT_DELETES_FIELD
+        );
+        logger.trace(
+            () -> new ParameterizedMessage("updated to SegmentInfosVersion={} reader={}", currentInfos.getVersion(), innerReader)
+        );
+        return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
+    }
+
+    /**
+     * Update this manager's SegmentInfos reference and refresh the reader.
+     *
+     * @param infos the new {@link SegmentInfos} to refresh against
+     * @throws IOException if the refresh fails
+     */
+    public synchronized void updateSegments(SegmentInfos infos) throws IOException {
+        currentInfos = infos;
+        maybeRefresh();
+    }
+
+    public SegmentInfos getSegmentInfos() {
+        return currentInfos;
+    }
+
+    private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
+        final DirectoryReader delegate = reader.getDelegate();
+        if (delegate instanceof SoftDeletesDirectoryReaderWrapper) {
+            return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate();
+        }
+        return (StandardDirectoryReader) delegate;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
index 2e3155a4d173e..23a86d8da5599 100644
--- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
@@ -270,6 +270,11 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
         return lastCommittedSegmentInfos;
     }
 
+    @Override
+    protected SegmentInfos getLatestSegmentInfos() {
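+        // a read-only engine never refreshes, so the last committed infos are also the latest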
+        return lastCommittedSegmentInfos;
+    }
+
     @Override
     public String getHistoryUUID() {
         return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY);
diff --git a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java
index dbcc5e2190006..d75893080c0d7 100644
--- a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java
@@ -156,7 +156,7 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) {
     public synchronized void fastForwardProcessedSeqNo(final long seqNo) {
         advanceMaxSeqNo(seqNo);
         final long currentProcessedCheckpoint = processedCheckpoint.get();
-        if (shouldUpdateSeqNo(seqNo, currentProcessedCheckpoint, persistedCheckpoint) == false) {
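+        // fast forward only ever moves the processed checkpoint forward; bail out if it is already at or past seqNo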
+        if (seqNo <= currentProcessedCheckpoint) {
             return;
         }
         processedCheckpoint.compareAndSet(currentProcessedCheckpoint, seqNo);
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 60a3305370c2a..995a92e94aeb3 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -3160,7 +3160,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
             globalCheckpointSupplier,
             replicationTracker::getRetentionLeases,
             () -> getOperationPrimaryTerm(),
-            tombstoneDocSupplier()
+            tombstoneDocSupplier(),
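+            // wire as a read-only replica only when segment replication is enabled and this shard is not the primary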
+            indexSettings.isSegRepEnabled() && shardRouting.primary() == false
         );
     }
 
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index 5ce10069aaa89..79fd2893fb78c 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -109,6 +109,7 @@
 import org.opensearch.index.engine.EngineConfigFactory;
 import org.opensearch.index.engine.EngineFactory;
 import org.opensearch.index.engine.InternalEngineFactory;
+import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.engine.NoOpEngine;
 import org.opensearch.index.fielddata.IndexFieldDataCache;
 import org.opensearch.index.flush.FlushStats;
@@ -764,6 +765,9 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
             .filter(maybe -> Objects.requireNonNull(maybe).isPresent())
             .collect(Collectors.toList());
         if (engineFactories.isEmpty()) {
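+            // default to the segment replication factory, which wires replicas with an NRTReplicationEngine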
+            if (idxSettings.isSegRepEnabled()) {
+                return new NRTReplicationEngineFactory();
+            }
             return new InternalEngineFactory();
         } else if (engineFactories.size() == 1) {
             assert engineFactories.get(0).isPresent();
diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java
index 8030619500278..7ddd92ea7b36e 100644
--- a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java
@@ -65,7 +65,8 @@ public void testCreateEngineConfigFromFactory() {
             null,
             () -> new RetentionLeases(0, 0, Collections.emptyList()),
             null,
-            null
+            null,
+            false
         );
 
         assertNotNull(config.getCodec());
@@ -141,7 +142,8 @@ public void testCreateCodecServiceFromFactory() {
             null,
             () -> new RetentionLeases(0, 0, Collections.emptyList()),
             null,
-            null
+            null,
+            false
         );
         assertNotNull(config.getCodec());
     }
diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java
new file mode 100644
index 0000000000000..6aa00bb9312dd
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java
@@ -0,0 +1,239 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.engine;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.SegmentInfos;
+import org.hamcrest.MatcherAssert;
+import org.opensearch.common.concurrent.GatedCloseable;
+import org.opensearch.common.lucene.Lucene;
+import org.opensearch.common.lucene.search.Queries;
+import org.opensearch.index.mapper.ParsedDocument;
+import org.opensearch.index.seqno.SequenceNumbers;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.translog.TestTranslog;
+import org.opensearch.index.translog.Translog;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+
+public class NRTReplicationEngineTests extends EngineTestCase {
+
+    public void testCreateEngine() throws IOException {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (
+            final Store nrtEngineStore = createStore();
+            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
+        ) {
+            final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
+            final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
+            assertEquals(latestSegmentInfos.version, lastCommittedSegmentInfos.version);
+            assertEquals(latestSegmentInfos.getGeneration(), lastCommittedSegmentInfos.getGeneration());
+            assertEquals(latestSegmentInfos.getUserData(), lastCommittedSegmentInfos.getUserData());
+            assertEquals(latestSegmentInfos.files(true), lastCommittedSegmentInfos.files(true));
+
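+            // a fresh replica engine starts from an empty commit, so no segments are exposed yet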
+            assertTrue(nrtEngine.segments(true).isEmpty());
+
+            try (final GatedCloseable<IndexCommit> indexCommitGatedCloseable = nrtEngine.acquireLastIndexCommit(false)) {
+                final IndexCommit indexCommit = indexCommitGatedCloseable.get();
+                assertEquals(indexCommit.getUserData(), lastCommittedSegmentInfos.getUserData());
+                assertTrue(indexCommit.getFileNames().containsAll(lastCommittedSegmentInfos.files(true)));
+            }
+        }
+    }
+
+    public void testEngineWritesOpsToTranslog() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+
+        try (
+            final Store nrtEngineStore = createStore();
+            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
+        ) {
+            List<Engine.Operation> operations = generateHistoryOnReplica(
+                between(1, 500),
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean()
+            );
+            for (Engine.Operation op : operations) {
+                applyOperation(engine, op);
+                applyOperation(nrtEngine, op);
+            }
+
+            assertEquals(nrtEngine.getTranslogLastWriteLocation(), engine.getTranslogLastWriteLocation());
+            assertEquals(nrtEngine.getLastSyncedGlobalCheckpoint(), engine.getLastSyncedGlobalCheckpoint());
+
+            // the nrtEngine does not index into Lucene, so get the doc ids from the regular engine.
+            final List<DocIdSeqNoAndSource> docs = getDocIds(engine, true);
+
+            // recover a new engine from the nrtEngine's translog.
+            nrtEngine.syncTranslog();
+            try (InternalEngine engine = new InternalEngine(nrtEngine.config())) {
+                engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+                assertEquals(getDocIds(engine, true), docs);
+            }
+            assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog());
+        }
+    }
+
+    public void testUpdateSegments() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+
+        try (
+            final Store nrtEngineStore = createStore();
+            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
+        ) {
+            // add docs to the primary engine.
+            List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())
+                .stream()
+                .filter(op -> op.operationType().equals(Engine.Operation.TYPE.INDEX))
+                .collect(Collectors.toList());
+            for (Engine.Operation op : operations) {
+                applyOperation(engine, op);
+                applyOperation(nrtEngine, op);
+            }
+
+            engine.refresh("test");
+
+            nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
+            assertMatchingSegmentsAndCheckpoints(nrtEngine);
+
+            // assert a doc from the operations exists.
+            final ParsedDocument parsedDoc = createParsedDoc(operations.stream().findFirst().get().id(), null);
+            try (Engine.GetResult getResult = engine.get(newGet(true, parsedDoc), engine::acquireSearcher)) {
+                assertThat(getResult.exists(), equalTo(true));
+                assertThat(getResult.docIdAndVersion(), notNullValue());
+            }
+
+            try (Engine.GetResult getResult = nrtEngine.get(newGet(true, parsedDoc), nrtEngine::acquireSearcher)) {
+                assertThat(getResult.exists(), equalTo(true));
+                assertThat(getResult.docIdAndVersion(), notNullValue());
+            }
+
+            // Flush the primary and update the NRTEngine with the latest committed infos.
+            engine.flush();
+            nrtEngine.syncTranslog(); // to advance persisted checkpoint
+
+            Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
+
+            try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
+                assertThat(snapshot.totalOperations(), equalTo(operations.size()));
+                assertThat(
+                    TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
+                    equalTo(seqNos)
+                );
+            }
+
+            nrtEngine.updateSegments(engine.getLastCommittedSegmentInfos(), engine.getProcessedLocalCheckpoint());
+            assertMatchingSegmentsAndCheckpoints(nrtEngine);
+
+            assertEquals(
+                nrtEngine.getTranslog().getGeneration().translogFileGeneration,
+                engine.getTranslog().getGeneration().translogFileGeneration
+            );
+
+            try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
+                assertThat(snapshot.totalOperations(), equalTo(operations.size()));
+                assertThat(
+                    TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
+                    equalTo(seqNos)
+                );
+            }
+
+            // Ensure the same hit count between engines.
+            int expectedDocCount;
+            try (final Engine.Searcher test = engine.acquireSearcher("test")) {
+                expectedDocCount = test.count(Queries.newMatchAllQuery());
+                assertSearcherHits(nrtEngine, expectedDocCount);
+            }
+            assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog());
+        }
+    }
+
+    public void testTrimTranslogOps() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+
+        try (
+            final Store nrtEngineStore = createStore();
+            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
+        ) {
+            List<Engine.Operation> operations = generateHistoryOnReplica(
+                between(1, 100),
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean()
+            );
+            applyOperations(nrtEngine, operations);
+            Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
+            try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
+                assertThat(snapshot.totalOperations(), equalTo(operations.size()));
+                assertThat(
+                    TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
+                    equalTo(seqNos)
+                );
+            }
+            nrtEngine.rollTranslogGeneration();
+            nrtEngine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED);
+            try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
+                assertThat(snapshot.totalOperations(), equalTo(0));
+                assertNull(snapshot.next());
+            }
+        }
+    }
+
+    private void assertMatchingSegmentsAndCheckpoints(NRTReplicationEngine nrtEngine) throws IOException {
+        assertEquals(engine.getPersistedLocalCheckpoint(), nrtEngine.getPersistedLocalCheckpoint());
+        assertEquals(engine.getProcessedLocalCheckpoint(), nrtEngine.getProcessedLocalCheckpoint());
+        assertEquals(engine.getLocalCheckpointTracker().getMaxSeqNo(), nrtEngine.getLocalCheckpointTracker().getMaxSeqNo());
+        assertEquals(engine.getLatestSegmentInfos().files(true), nrtEngine.getLatestSegmentInfos().files(true));
+        assertEquals(engine.getLatestSegmentInfos().getUserData(), nrtEngine.getLatestSegmentInfos().getUserData());
+        assertEquals(engine.getLatestSegmentInfos().getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
+        assertEquals(engine.segments(true), nrtEngine.segments(true));
+    }
+
+    private void assertSearcherHits(Engine engine, int hits) {
+        try (final Engine.Searcher test = engine.acquireSearcher("test")) {
+            MatcherAssert.assertThat(test, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(hits));
+        }
+    }
+
+    private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
+        Lucene.cleanLuceneIndex(store.directory());
+        final Path translogDir = createTempDir();
+        final EngineConfig replicaConfig = config(
+            defaultSettings,
+            store,
+            translogDir,
+            NoMergePolicy.INSTANCE,
+            null,
+            null,
+            globalCheckpoint::get
+        );
+        if (Lucene.indexExists(store.directory()) == false) {
+            store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion);
+            final String translogUuid = Translog.createEmptyTranslog(
+                replicaConfig.getTranslogConfig().getTranslogPath(),
+                SequenceNumbers.NO_OPS_PERFORMED,
+                shardId,
+                primaryTerm.get()
+            );
+            store.associateIndexWithNewTranslog(translogUuid);
+        }
+        return new NRTReplicationEngine(replicaConfig);
+    }
+}
diff --git a/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java
index 237066e549b09..3a450e1f72a8d 100644
--- a/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java
+++ b/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java
@@ -332,59 +332,22 @@ public void testContains() {
         assertThat(tracker.hasProcessed(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo)));
     }
 
-    public void testFastForwardProcessedNoPersistentUpdate() {
+    public void testFastForwardProcessedSeqNo() {
         // base case with no persistent checkpoint update
         long seqNo1;
         assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
         seqNo1 = tracker.generateSeqNo();
         assertThat(seqNo1, equalTo(0L));
         tracker.fastForwardProcessedSeqNo(seqNo1);
-        assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
-    }
+        assertThat(tracker.getProcessedCheckpoint(), equalTo(seqNo1));
 
-    public void testFastForwardProcessedPersistentUpdate() {
-        // base case with persistent checkpoint update
-        long seqNo1;
-        assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
-        seqNo1 = tracker.generateSeqNo();
-        assertThat(seqNo1, equalTo(0L));
-
-        tracker.markSeqNoAsPersisted(seqNo1);
-        assertThat(tracker.getPersistedCheckpoint(), equalTo(0L));
+        // idempotent case
         tracker.fastForwardProcessedSeqNo(seqNo1);
         assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
         assertThat(tracker.hasProcessed(0L), equalTo(true));
-        assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));
 
-        // idempotent case
-        tracker.fastForwardProcessedSeqNo(seqNo1);
+        tracker.fastForwardProcessedSeqNo(-1);
         assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
         assertThat(tracker.hasProcessed(0L), equalTo(true));
-        assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));
-
-    }
-
-    public void testFastForwardProcessedPersistentUpdate2() {
-        long seqNo1, seqNo2;
-        assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
-        seqNo1 = tracker.generateSeqNo();
-        seqNo2 = tracker.generateSeqNo();
-        assertThat(seqNo1, equalTo(0L));
-        assertThat(seqNo2, equalTo(1L));
-        tracker.markSeqNoAsPersisted(seqNo1);
-        tracker.markSeqNoAsPersisted(seqNo2);
-        assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
-        assertThat(tracker.getPersistedCheckpoint(), equalTo(1L));
-
-        tracker.fastForwardProcessedSeqNo(seqNo2);
-        assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
-        assertThat(tracker.hasProcessed(seqNo1), equalTo(true));
-        assertThat(tracker.hasProcessed(seqNo2), equalTo(true));
-
-        tracker.fastForwardProcessedSeqNo(seqNo1);
-        assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
-        assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true));
-        assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false));
-        assertThat(tracker.getMaxSeqNo(), equalTo(1L));
     }
 }
diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
index bf9671964a210..49d0c089f072b 100644
--- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
@@ -101,6 +101,8 @@
 import org.opensearch.index.engine.EngineTestCase;
 import org.opensearch.index.engine.InternalEngine;
 import org.opensearch.index.engine.InternalEngineFactory;
+import org.opensearch.index.engine.NRTReplicationEngine;
+import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.engine.ReadOnlyEngine;
 import org.opensearch.index.fielddata.FieldDataStats;
 import org.opensearch.index.fielddata.IndexFieldData;
@@ -136,6 +138,7 @@
 import org.opensearch.indices.recovery.RecoveryTarget;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
+import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.repositories.IndexId;
 import org.opensearch.snapshots.Snapshot;
 import org.opensearch.snapshots.SnapshotId;
@@ -4167,14 +4170,14 @@ public void testSnapshotWhileResettingEngine() throws Exception {
             @Override
             public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo)
                 throws IOException {
-                InternalEngine internalEngine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
+                InternalEngine engine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
                 readyToSnapshotLatch.countDown();
                 try {
                     snapshotDoneLatch.await();
                 } catch (InterruptedException e) {
                     throw new AssertionError(e);
                 }
-                return internalEngine;
+                return engine;
             }
         });
 
@@ -4447,6 +4450,27 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
         closeShards(readonlyShard);
     }
 
+    public void testReadOnlyReplicaEngineConfig() throws IOException {
+        Settings primarySettings = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .build();
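+        // without the SEGMENT replication type set, even a non-primary shard is wired with a standard InternalEngine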
+        final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory());
+        assertFalse(primaryShard.getEngine().config().isReadOnlyReplica());
+        assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class);
+
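+        // with SEGMENT replication enabled, the same factory wires the replica with a read-only NRTReplicationEngine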
+        Settings replicaSettings = Settings.builder()
+            .put(primarySettings)
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .build();
+        final IndexShard replicaShard = newStartedShard(false, replicaSettings, new NRTReplicationEngineFactory());
+        assertTrue(replicaShard.getEngine().config().isReadOnlyReplica());
+        assertEquals(replicaShard.getEngine().getClass(), NRTReplicationEngine.class);
+
+        closeShards(primaryShard, replicaShard);
+    }
+
     public void testCloseShardWhileEngineIsWarming() throws Exception {
         CountDownLatch warmerStarted = new CountDownLatch(1);
         CountDownLatch warmerBlocking = new CountDownLatch(1);
diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java
index 2bce5a7c81794..66c697d83510b 100644
--- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java
@@ -328,24 +328,26 @@ public void tearDown() throws Exception {
         super.tearDown();
         try {
             if (engine != null && engine.isClosed.get() == false) {
-                engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
-                assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
-                assertNoInFlightDocuments(engine);
-                assertMaxSeqNoInCommitUserData(engine);
-                assertAtMostOneLuceneDocumentPerSequenceNumber(engine);
+                assertEngineCleanedUp(engine, engine.getTranslog());
             }
             if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
-                replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
-                assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine);
-                assertNoInFlightDocuments(replicaEngine);
-                assertMaxSeqNoInCommitUserData(replicaEngine);
-                assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine);
+                assertEngineCleanedUp(replicaEngine, replicaEngine.getTranslog());
             }
         } finally {
             IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
         }
     }
 
+    protected void assertEngineCleanedUp(Engine engine, Translog translog) throws Exception {
+        if (engine.isClosed.get() == false) {
+            translog.getDeletionPolicy().assertNoOpenTranslogRefs();
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
+            assertNoInFlightDocuments(engine);
+            assertMaxSeqNoInCommitUserData(engine);
+            assertAtMostOneLuceneDocumentPerSequenceNumber(engine);
+        }
+    }
+
     protected static ParseContext.Document testDocumentWithTextField() {
         return testDocumentWithTextField("test");
     }