Skip to content

Commit

Permalink
Fix NRTMock Engine failing tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Sep 29, 2023
1 parent d797d67 commit 19815a8
Show file tree
Hide file tree
Showing 19 changed files with 97 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ protected boolean forbidPrivateIndexSettings() {
return false;
}

@Override
protected boolean useSegmentReplication() {
return false;
}

public void testCreateShrinkIndexToN() throws Exception {

assumeFalse("https://github.com/elastic/elasticsearch/issues/34080", Constants.WINDOWS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ protected boolean forbidPrivateIndexSettings() {
return false;
}

@Override
protected boolean useSegmentReplication() {
return false;
}

public void testCreateSplitIndexToN() throws IOException {
int[][] possibleShardSplits = new int[][] { { 2, 4, 8 }, { 3, 6, 12 }, { 1, 2, 4 } };
int[] shardSplits = randomFrom(possibleShardSplits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@

public class ForceMergeIT extends OpenSearchIntegTestCase {

@Override
protected boolean useSegmentReplication() {
return false;
}

public void testForceMergeUUIDConsistent() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
final String index = "test-index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ protected int numberOfReplicas() {
return 2;
}

@Override
protected boolean useSegmentReplication() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockTransportService.TestPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected boolean useSegmentReplication() {
return false;
}

public void testBulkWeirdScenario() throws Exception {
String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodes(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public class DiskThresholdDeciderIT extends OpenSearchIntegTestCase {

private FileSystem defaultFileSystem;

@Override
protected boolean useSegmentReplication() {
return false;
}

@Before
public void installFilesystemProvider() {
assertNull(defaultFileSystem);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {

@Override
protected boolean useSegmentReplication() {
return false;
}

private enum ConflictMode {
none,
external,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@
import static org.hamcrest.Matchers.notNullValue;

public class ExplainActionIT extends OpenSearchIntegTestCase {

@Override
protected boolean useSegmentReplication() {
return false;
}

public void testSimple() throws Exception {
assertAcked(prepareCreate("test").addAlias(new Alias("alias")).setSettings(Settings.builder().put("index.refresh_interval", -1)));
ensureGreen("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ReplicaShardAllocatorIT extends OpenSearchIntegTestCase {

@Override
protected boolean useSegmentReplication() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class IndexPrimaryRelocationIT extends OpenSearchIntegTestCase {

@Override
protected boolean addMockNRTReplicationEngine() {
return false;
}

private static final int RELOCATION_COUNT = 15;

public Settings indexSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static void waitForCurrentReplicas(String index, List<String> nodes) thro
assertBusy(() -> {
for (String node : nodes) {
final IndexShard indexShard = getIndexShard(node, index);
indexShard.getReplicationEngine().ifPresent((engine) -> { assertFalse(engine.hasRefreshPending()); });
indexShard.getReplicationEngineForTests().ifPresent((engine) -> { assertFalse(engine.hasRefreshPending()); });
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public IndexStatsIT(Settings settings) {
super(settings);
}

@Override
protected boolean useSegmentReplication() {
return false;
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteIndexRecoveryIT extends IndexRecoveryIT {

@Override
protected boolean useSegmentReplication() {
return false;
}

protected static final String REPOSITORY_NAME = "test-remote-store-repo";

protected Path repositoryPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@

public class SimpleRoutingIT extends OpenSearchIntegTestCase {

@Override
protected boolean useSegmentReplication() {
return false;
}

@Override
protected int minimumNumberOfShards() {
return 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase {

@Override
protected boolean useSegmentReplication() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;

/**
* The default internal engine (can be overridden by plugins)
*
Expand Down Expand Up @@ -382,7 +385,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
if (commitUserData.containsKey(Engine.MIN_RETAINED_SEQNO)) {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(Engine.MIN_RETAINED_SEQNO));
} else {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(MAX_SEQ_NO)) + 1;
}
return new SoftDeletesPolicy(
translogManager::getLastSyncedGlobalCheckpoint,
Expand Down Expand Up @@ -1815,9 +1818,7 @@ public boolean shouldPeriodicallyFlush() {
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
final long localCheckpointOfLastCommit = Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
);
final long localCheckpointOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(LOCAL_CHECKPOINT_KEY));
return translogManager.shouldPeriodicallyFlush(
localCheckpointOfLastCommit,
config().getIndexSettings().getFlushThresholdSize().getBytes()
Expand Down Expand Up @@ -1855,9 +1856,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (hasUncommittedChanges
|| force
|| shouldPeriodicallyFlush
|| getProcessedLocalCheckpoint() > Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
)) {
|| getProcessedLocalCheckpoint() > Long.parseLong(lastCommittedSegmentInfos.userData.get(LOCAL_CHECKPOINT_KEY))) {
translogManager.ensureCanFlush();
try {
translogManager.rollTranslogGeneration();
Expand Down Expand Up @@ -2516,8 +2515,8 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog
*/
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
commitData.put(MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
Expand Down
11 changes: 9 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ private Engine.IndexResult applyIndexOperation(
UNASSIGNED_SEQ_NO,
0
);
return getEngine().index(index);
return index(engine, index);
}
assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ "
+ opPrimaryTerm
Expand Down Expand Up @@ -1571,6 +1571,12 @@ public Optional<NRTReplicationEngine> getReplicationEngine() {
}
}

public Optional<NRTReplicationEngine> getReplicationEngineForTests() {
return Optional.ofNullable(getEngineOrNull())
.filter((engine) -> engine instanceof NRTReplicationEngine)
.map((engine) -> (NRTReplicationEngine) engine);
}

public void finalizeReplication(SegmentInfos infos) throws IOException {
if (getReplicationEngine().isPresent()) {
getReplicationEngine().get().updateSegments(infos);
Expand Down Expand Up @@ -4419,7 +4425,8 @@ final long getLastSearcherAccess() {
* Returns true if this shard has some scheduled refresh that is pending because of search-idle.
*/
public final boolean hasRefreshPending() {
return pendingRefreshLocation.get() != null;
final Boolean nrtPending = getReplicationEngine().map(NRTReplicationEngine::hasRefreshPending).orElse(false);
return pendingRefreshLocation.get() != null || nrtPending;
}

private void setRefreshPending(Engine engine) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
// if the shard is in any state
if (replicaShard.state().equals(IndexShardState.CLOSED)) {
// ignore if shard is closed
logger.trace(() -> "Ignoring checkpoint, Shard is closed");
logger.info(() -> "Ignoring checkpoint, Shard is closed");
return;
}
updateLatestReceivedCheckpoint(receivedCheckpoint, replicaShard);
Expand Down Expand Up @@ -281,7 +281,7 @@ public void onReplicationFailure(
});
}
} else {
logger.trace(
logger.info(
() -> new ParameterizedMessage("Ignoring checkpoint, shard not started {} {}", receivedCheckpoint, replicaShard.state())
);
}
Expand Down Expand Up @@ -330,7 +330,7 @@ protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaSha
final TransportRequestOptions options = TransportRequestOptions.builder()
.withTimeout(recoverySettings.internalActionTimeout())
.build();
logger.trace(
logger.info(
() -> new ParameterizedMessage(
"Updating Primary shard that replica {}-{} is synced to checkpoint {}",
replicaShard.shardId(),
Expand All @@ -347,7 +347,7 @@ protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaSha
final ActionListener<Void> listener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.trace(
logger.info(
() -> new ParameterizedMessage(
"Successfully updated replication checkpoint {} for replica {}",
replicaShard.shardId(),
Expand Down Expand Up @@ -386,7 +386,7 @@ private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) {
protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) {
final ReplicationCheckpoint latestPublishedCheckpoint = latestReceivedCheckpoint.get(replicaShard.shardId());
if (latestPublishedCheckpoint != null && latestPublishedCheckpoint.isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
logger.trace(
logger.info(
() -> new ParameterizedMessage(
"Processing latest received checkpoint for shard {} {}",
replicaShard.shardId(),
Expand Down Expand Up @@ -448,7 +448,7 @@ void startReplication(final SegmentReplicationTarget target) {
target.fail(e, false);
return;
}
logger.trace(() -> new ParameterizedMessage("Added new replication to collection {}", target.description()));
logger.info(() -> new ParameterizedMessage("Added new replication to collection {}", target.description()));
threadPool.generic().execute(new ReplicationRunner(replicationId));
}

Expand Down Expand Up @@ -573,7 +573,7 @@ private void forceReplication(ForceSyncRequest request, ActionListener<Transport
@Override
public void onReplicationDone(SegmentReplicationState state) {
try {
logger.trace(
logger.info(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Force replication Sync complete to {}, timing data: {}",
shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1697,7 +1697,7 @@ protected static Collection<IndexShard> getReplicaShards(String... node) {
public static void waitForCurrentReplicas(Collection<IndexShard> shards) throws Exception {
assertBusy(() -> {
for (IndexShard indexShard : shards) {
indexShard.getReplicationEngine().ifPresent((engine) -> assertFalse(engine.hasRefreshPending()));
indexShard.getReplicationEngineForTests().ifPresent((engine) -> assertFalse(engine.hasRefreshPending()));
}
});
}
Expand Down

0 comments on commit 19815a8

Please sign in to comment.