diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
index 4c7b3fe25296e..528d6cc9f5e23 100644
--- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -187,6 +187,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         IndexSettings.FINAL_PIPELINE,
         MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
         ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
+        IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
+        IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,

         // validate that built-in similarities don't get redefined
         Setting.groupSetting("index.similarity.", (s) -> {
diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java
index 45d9a57442049..aa69417af1897 100644
--- a/server/src/main/java/org/opensearch/index/IndexSettings.java
+++ b/server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -503,6 +503,27 @@ public final class IndexSettings {
         Setting.Property.IndexScope
     );

+    /**
+     * Expert: sets the amount of time to wait for merges (during {@link org.apache.lucene.index.IndexWriter#commit}
+     * or {@link org.apache.lucene.index.IndexWriter#getReader(boolean, boolean)}) returned by MergePolicy.findFullFlushMerges(...).
+     * If this time is reached, we proceed with the commit based on segments merged up to that point. The merges are not
+     * aborted, and will still run to completion independent of the commit or getReader call, like natural segment merges.
+     */
+    public static final Setting<TimeValue> INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME = Setting.timeSetting(
+        "index.merge_on_flush.max_full_flush_merge_wait_time",
+        new TimeValue(10, TimeUnit.SECONDS),
+        new TimeValue(0, TimeUnit.MILLISECONDS),
+        Property.Dynamic,
+        Property.IndexScope
+    );
+
+    public static final Setting<Boolean> INDEX_MERGE_ON_FLUSH_ENABLED = Setting.boolSetting(
+        "index.merge_on_flush.enabled",
+        false,
+        Property.IndexScope,
+        Property.Dynamic
+    );
+
     private final Index index;
     private final Version version;
     private final Logger logger;
@@ -584,6 +605,15 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
      */
     private volatile int maxRegexLength;

+    /**
+     * The max amount of time to wait for merges on a full flush
+     */
+    private volatile TimeValue maxFullFlushMergeWaitTime;
+    /**
+     * Is merge on flush enabled or not
+     */
+    private volatile boolean mergeOnFlushEnabled;
+
     /**
      * Returns the default search fields for this index.
      */
@@ -696,6 +726,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
         mappingTotalFieldsLimit = scopedSettings.get(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING);
         mappingDepthLimit = scopedSettings.get(INDEX_MAPPING_DEPTH_LIMIT_SETTING);
         mappingFieldNameLengthLimit = scopedSettings.get(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING);
+        maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
+        mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);

         scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
         scopedSettings.addSettingsUpdateConsumer(
@@ -765,6 +797,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
         scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING, this::setMappingTotalFieldsLimit);
         scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DEPTH_LIMIT_SETTING, this::setMappingDepthLimit);
         scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
+        scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
+        scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
     }

     private void setSearchIdleAfter(TimeValue searchIdleAfter) {
@@ -1328,4 +1362,20 @@ public long getMappingFieldNameLengthLimit() {
     private void setMappingFieldNameLengthLimit(long value) {
         this.mappingFieldNameLengthLimit = value;
     }
+
+    private void setMaxFullFlushMergeWaitTime(TimeValue timeValue) {
+        this.maxFullFlushMergeWaitTime = timeValue;
+    }
+
+    private void setMergeOnFlushEnabled(boolean enabled) {
+        this.mergeOnFlushEnabled = enabled;
+    }
+
+    public TimeValue getMaxFullFlushMergeWaitTime() {
+        return this.maxFullFlushMergeWaitTime;
+    }
+
+    public boolean isMergeOnFlushEnabled() {
+        return mergeOnFlushEnabled;
+    }
 }
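Both settings are dynamic and index-scoped, so merge-on-flush can be toggled on a live index
without reopening it. A minimal usage sketch, assuming a node Client and an index named
"my-index" (both illustrative, not part of this change):

    import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
    import org.opensearch.client.Client;
    import org.opensearch.common.settings.Settings;

    // Hypothetical helper: turns merge-on-flush on for an existing index and
    // shortens the wait; the "5s" value is an assumption for illustration.
    static void enableMergeOnFlush(Client client) {
        UpdateSettingsRequest request = new UpdateSettingsRequest("my-index").settings(
            Settings.builder()
                .put("index.merge_on_flush.enabled", true) // default: false
                .put("index.merge_on_flush.max_full_flush_merge_wait_time", "5s") // default: 10s, floor: 0ms
        );
        client.admin().indices().updateSettings(request).actionGet();
    }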
diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index 84090047d68e8..6bef118e0b61f 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -50,6 +50,7 @@
 import org.apache.lucene.index.ShuffleForcedMergePolicy;
 import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -2425,6 +2426,21 @@ private IndexWriterConfig getIndexWriterConfig() {
             // to enable it.
             mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
         }
+
+        if (config().getIndexSettings().isMergeOnFlushEnabled()) {
+            final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
+            if (maxFullFlushMergeWaitMillis > 0) {
+                iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
+                mergePolicy = new MergeOnFlushMergePolicy(mergePolicy);
+            } else {
+                logger.warn(
+                    "{} is enabled but {} is set to 0, merge on flush will not be activated",
+                    IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
+                    IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
+                );
+            }
+        }
+
         iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
         iwc.setSimilarity(engineConfig.getSimilarity());
         iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
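MergeOnFlushMergePolicy ships in Lucene's sandbox module (lucene-sandbox). The sketch below
illustrates the general idea rather than the actual Lucene implementation: on a full flush
(commit or refresh), delegate merge selection to the wrapped policy, so IndexWriter can wait
up to setMaxFullFlushMergeWaitMillis for those merges before publishing the new segments;
merges that miss the deadline simply finish in the background.

    import java.io.IOException;
    import org.apache.lucene.index.FilterMergePolicy;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.MergeTrigger;
    import org.apache.lucene.index.SegmentInfos;

    // Illustrative sketch only: reuse the wrapped policy's regular merge
    // selection when IndexWriter asks for full-flush merges.
    class MergeOnFullFlushSketch extends FilterMergePolicy {
        MergeOnFullFlushSketch(MergePolicy in) {
            super(in);
        }

        @Override
        public MergeSpecification findFullFlushMerges(MergeTrigger trigger, SegmentInfos infos, MergeContext context)
            throws IOException {
            // The base implementation returns no merges; delegating to
            // findMerges makes freshly flushed segments merge candidates.
            return in.findMerges(trigger, infos, context);
        }
    }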
diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java
index 361013149578e..c33adf3bcb558 100644
--- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java
@@ -494,6 +494,212 @@ public void testSegments() throws Exception {
         }
     }

+    public void testMergeSegmentsOnCommitIsDisabled() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+
+        final Settings.Builder settings = Settings.builder()
+            .put(defaultSettings.getSettings())
+            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(0))
+            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
+        final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
+
+        try (
+            Store store = createStore();
+            InternalEngine engine = createEngine(
+                config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
+            )
+        ) {
+            assertThat(engine.segments(false), empty());
+            int numDocsFirstSegment = randomIntBetween(5, 50);
+            Set<String> liveDocsFirstSegment = new HashSet<>();
+            for (int i = 0; i < numDocsFirstSegment; i++) {
+                String id = Integer.toString(i);
+                ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
+                engine.index(indexForDoc(doc));
+                liveDocsFirstSegment.add(id);
+            }
+            engine.refresh("test");
+            List<Segment> segments = engine.segments(randomBoolean());
+            assertThat(segments, hasSize(1));
+            assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
+            assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
+            assertFalse(segments.get(0).committed);
+            int deletes = 0;
+            int updates = 0;
+            int appends = 0;
+            int iterations = scaledRandomIntBetween(1, 50);
+            for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
+                String idToUpdate = randomFrom(liveDocsFirstSegment);
+                liveDocsFirstSegment.remove(idToUpdate);
+                ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
+                if (randomBoolean()) {
+                    engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
+                    deletes++;
+                } else {
+                    engine.index(indexForDoc(doc));
+                    updates++;
+                }
+                if (randomBoolean()) {
+                    engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
+                    appends++;
+                }
+            }
+
+            boolean committed = randomBoolean();
+            if (committed) {
+                engine.flush();
+            }
+
+            engine.refresh("test");
+            segments = engine.segments(randomBoolean());
+
+            // merge on flush is effectively disabled (wait time of 0), so both segments remain
+            assertThat(segments, hasSize(2));
+            assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
+            assertThat(segments.get(0).getDeletedDocs(), equalTo(updates + deletes));
+            assertThat(segments.get(0).committed, equalTo(committed));
+
+            assertThat(segments.get(1).getNumDocs(), equalTo(updates + appends));
+            assertThat(segments.get(1).getDeletedDocs(), equalTo(deletes)); // delete tombstones
+            assertThat(segments.get(1).committed, equalTo(committed));
+        }
+    }
+
+    public void testMergeSegmentsOnCommit() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+
+        final Settings.Builder settings = Settings.builder()
+            .put(defaultSettings.getSettings())
+            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
+            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
+        final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
+
+        try (
+            Store store = createStore();
+            InternalEngine engine = createEngine(
+                config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
+            )
+        ) {
+            assertThat(engine.segments(false), empty());
+            int numDocsFirstSegment = randomIntBetween(5, 50);
+            Set<String> liveDocsFirstSegment = new HashSet<>();
+            for (int i = 0; i < numDocsFirstSegment; i++) {
+                String id = Integer.toString(i);
+                ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
+                engine.index(indexForDoc(doc));
+                liveDocsFirstSegment.add(id);
+            }
+            engine.refresh("test");
+            List<Segment> segments = engine.segments(randomBoolean());
+            assertThat(segments, hasSize(1));
+            assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
+            assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
+            assertFalse(segments.get(0).committed);
+            int deletes = 0;
+            int updates = 0;
+            int appends = 0;
+            int iterations = scaledRandomIntBetween(1, 50);
+            for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
+                String idToUpdate = randomFrom(liveDocsFirstSegment);
+                liveDocsFirstSegment.remove(idToUpdate);
+                ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
+                if (randomBoolean()) {
+                    engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
+                    deletes++;
+                } else {
+                    engine.index(indexForDoc(doc));
+                    updates++;
+                }
+                if (randomBoolean()) {
+                    engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
+                    appends++;
+                }
+            }
+
+            boolean committed = randomBoolean();
+            if (committed) {
+                engine.flush();
+            }
+
+            engine.refresh("test");
+            segments = engine.segments(randomBoolean());
+
+            // All segments have to be merged into one
+            assertThat(segments, hasSize(1));
+            assertThat(segments.get(0).getNumDocs(), equalTo(numDocsFirstSegment + appends - deletes));
+            assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
+            assertThat(segments.get(0).committed, equalTo(committed));
+        }
+    }
+
+    // this test writes documents to the engine while concurrently flushing/committing
+    public void testConcurrentMergeSegmentsOnCommit() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+
+        final Settings.Builder settings = Settings.builder()
+            .put(defaultSettings.getSettings())
+            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
+            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
+        final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
+
+        try (
+            Store store = createStore();
+            InternalEngine engine = createEngine(
+                config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
+            )
+        ) {
+            final int numIndexingThreads = scaledRandomIntBetween(3, 8);
+            final int numDocsPerThread = randomIntBetween(500, 1000);
+            final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
+            final List<Thread> indexingThreads = new ArrayList<>();
+            final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads);
+            // create N indexing threads to index documents simultaneously
+            for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
+                final int threadIdx = threadNum;
+                Thread indexingThread = new Thread(() -> {
+                    try {
+                        barrier.await(); // wait for all threads to start at the same time
+                        // index a random number of docs
+                        for (int i = 0; i < numDocsPerThread; i++) {
+                            final String id = "thread" + threadIdx + "#" + i;
+                            ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
+                            engine.index(indexForDoc(doc));
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        doneLatch.countDown();
+                    }
+                });
+                indexingThreads.add(indexingThread);
+            }
+
+            // start the indexing threads
+            for (Thread thread : indexingThreads) {
+                thread.start();
+            }
+            barrier.await(); // wait for indexing threads to all be ready to start
+            assertThat(doneLatch.await(10, TimeUnit.SECONDS), is(true));
+
+            boolean committed = randomBoolean();
+            if (committed) {
+                engine.flush();
+            }
+
+            engine.refresh("test");
+            List<Segment> segments = engine.segments(randomBoolean());
+
+            // All segments have to be merged into one
+            assertThat(segments, hasSize(1));
+            assertThat(segments.get(0).getNumDocs(), equalTo(numIndexingThreads * numDocsPerThread));
+            assertThat(segments.get(0).committed, equalTo(committed));
+        }
+    }
+
     public void testCommitStats() throws IOException {
         final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
         final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
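For completeness, the engine wiring above mirrors what a standalone Lucene setup would do. A
self-contained sketch follows (values are illustrative; OpenSearch derives them from the index
settings, and lucene-sandbox must be on the classpath):

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.TieredMergePolicy;
    import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
    import org.apache.lucene.store.ByteBuffersDirectory;

    public class MergeOnFlushDemo {
        public static void main(String[] args) throws Exception {
            IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
            // corresponds to index.merge_on_flush.max_full_flush_merge_wait_time
            iwc.setMaxFullFlushMergeWaitMillis(10_000);
            // corresponds to index.merge_on_flush.enabled
            iwc.setMergePolicy(new MergeOnFlushMergePolicy(new TieredMergePolicy()));
            try (IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), iwc)) {
                // segments flushed by commit() may now be merged before the
                // commit returns, so readers see fewer, larger segments
                writer.commit();
            }
        }
    }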