Skip to content

Commit

Permalink
Fix sort order in getMaxSeqNoFromSegmentInfos and mute testRemoteTran…
Browse files Browse the repository at this point in the history
…slogRestore (opensearch-project#6230)

Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale authored and mch2 committed Mar 4, 2023
1 parent 72f6115 commit 82e7e65
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ protected Settings featureFlagSettings() {

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
Expand Down Expand Up @@ -161,10 +162,12 @@ public void testRemoteStoreRestoreFromRemoteSegmentStore() throws IOException {
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);

verifyRestoredData(indexStats, false);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
public void testRemoteTranslogRestore() throws IOException {
internalCluster().startNodes(3);
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
Expand All @@ -177,6 +180,7 @@ public void testRemoteTranslogRestore() throws IOException {
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);

verifyRestoredData(indexStats, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ protected long getMaxSeqNoFromSearcher(IndexSearcher searcher) throws IOExceptio
ScoreDoc[] docs = searcher.search(
Queries.newMatchAllQuery(),
1,
new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.DOC, true))
new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG, true))
).scoreDocs;
if (docs.length == 0) {
return SequenceNumbers.NO_OPS_PERFORMED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7684,4 +7684,57 @@ public void testGetMaxSeqNoFromSegmentInfos() throws IOException {
engine.close();
}

public void testGetMaxSeqNoFromSegmentInfosConcurrentWrites() throws IOException, BrokenBarrierException, InterruptedException {
IOUtils.close(store, engine);

final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings()).put("index.refresh_interval", "300s");
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Store store = createStore();
InternalEngine engine = createEngine(indexSettings, store, createTempDir(), newMergePolicy());

final int numIndexingThreads = scaledRandomIntBetween(3, 8);
final int numDocsPerThread = randomIntBetween(500, 1000);
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
final List<Thread> indexingThreads = new ArrayList<>();
final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads);
// create N indexing threads to index documents simultaneously
for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
final int threadIdx = threadNum;
Thread indexingThread = new Thread(() -> {
try {
barrier.await(); // wait for all threads to start at the same time
// index random number of docs
for (int i = 0; i < numDocsPerThread; i++) {
final String id = "thread" + threadIdx + "#" + i;
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
doneLatch.countDown();
}

});
indexingThreads.add(indexingThread);
}

// start the indexing threads
for (Thread thread : indexingThreads) {
thread.start();
}
barrier.await(); // wait for indexing threads to all be ready to start
assertTrue(doneLatch.await(10, TimeUnit.SECONDS));

engine.refresh("test");

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = engine.getSegmentInfosSnapshot()) {
assertEquals(numIndexingThreads * numDocsPerThread - 1, engine.getMaxSeqNoFromSegmentInfos(segmentInfosGatedCloseable.get()));
}

store.close();
engine.close();
}
}

0 comments on commit 82e7e65

Please sign in to comment.