From 63834d9f9d803c38c3a64a154a7986cfb8c1eaa8 Mon Sep 17 00:00:00 2001 From: Ashish Date: Mon, 22 May 2023 09:21:11 +0530 Subject: [PATCH] [Remote segments] Add backpressure in write path on segments lag between local and remote store (#7459) Signed-off-by: Ashish Singh --- .../opensearch/index/shard/IndexShardIT.java | 1 + ...emoteStoreMockRepositoryIntegTestCase.java | 145 +++++++ .../RemoteStoreBackpressureIT.java | 43 ++ ...java => RemoteStoreBaseIntegTestCase.java} | 2 +- .../opensearch/remotestore/RemoteStoreIT.java | 2 +- .../RemoteStoreRefreshListenerIT.java | 140 +----- .../ReplicaToPrimaryPromotionIT.java | 2 +- .../action/bulk/TransportShardBulkAction.java | 17 +- .../common/settings/ClusterSettings.java | 1 - .../org/opensearch/index/IndexService.java | 7 +- .../RemoteRefreshSegmentPressureService.java | 66 +-- .../RemoteRefreshSegmentPressureSettings.java | 19 +- .../remote/RemoteRefreshSegmentTracker.java | 86 ++-- .../opensearch/index/shard/IndexShard.java | 10 +- .../shard/RemoteStoreRefreshListener.java | 404 +++++++++++++----- .../org/opensearch/indices/IndicesModule.java | 5 + .../opensearch/indices/IndicesService.java | 12 +- .../cluster/IndicesClusterStateService.java | 24 +- .../bulk/TransportShardBulkActionTests.java | 5 + ...oteRefreshSegmentPressureServiceTests.java | 26 +- ...teRefreshSegmentPressureSettingsTests.java | 22 +- .../RemoteRefreshSegmentTrackerTests.java | 14 +- .../RemoteStoreRefreshListenerTests.java | 127 +++++- ...dicesLifecycleListenerSingleNodeTests.java | 3 +- ...actIndicesClusterStateServiceTestCase.java | 4 +- ...ClusterStateServiceRandomUpdatesTests.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 5 +- .../index/shard/IndexShardTestCase.java | 21 +- .../snapshots/mockstore/MockRepository.java | 11 +- 29 files changed, 816 insertions(+), 411 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java rename server/src/internalClusterTest/java/org/opensearch/remotestore/{RemoteStoreBaseIT.java => RemoteStoreBaseIntegTestCase.java} (97%) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 11f187ac6e619..ba567c125c6e9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -701,6 +701,7 @@ public static final IndexShard newIndexShard( cbs, (indexSettings, shardRouting) -> new InternalTranslogFactory(), SegmentReplicationCheckpointPublisher.EMPTY, + null, null ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java new file mode 100644 index 0000000000000..dc312ffa6676d --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java @@ -0,0 +1,145 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.Before; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.util.FileSystemUtils; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; +import org.opensearch.test.FeatureFlagSetter; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase { + + protected static final String REPOSITORY_NAME = "my-segment-repo-1"; + protected static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); + } + + @Before + public void setup() { + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + } + + @Override + public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } + + protected Settings remoteStoreIndexSettings(int numberOfReplicas) { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + + protected void deleteRepo() { + logger.info("--> Deleting the repository={}", REPOSITORY_NAME); + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) { + logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation); + // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in + /// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the + // repository creation can happen without failure. + createRepository( + REPOSITORY_NAME, + "mock", + Settings.builder() + .put("location", repoLocation) + .put("random_control_io_exception_rate", ioFailureRate) + .put("skip_exception_on_verification_file", true) + .put("skip_exception_on_list_blobs", true) + // Skipping is required for metadata as it is part of recovery + .put("skip_exception_on_blobs", skipExceptionBlobList) + .put("max_failure_number", Long.MAX_VALUE) + ); + + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME); + logger.info("--> Created index={}", INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + logger.info("--> Cluster is yellow with no initializing shards"); + ensureGreen(INDEX_NAME); + logger.info("--> Cluster is green"); + } + + /** + * Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc. + * + * @param location the path to location where segment files are being searched. + * @return set of file names of all segment file or empty set if there was IOException thrown. + */ + protected Set getSegmentFiles(Path location) { + try { + return Arrays.stream(FileSystemUtils.files(location)) + .filter(path -> path.getFileName().toString().startsWith("_")) + .map(path -> path.getFileName().toString()) + .map(this::getLocalSegmentFilename) + .collect(Collectors.toSet()); + } catch (IOException exception) { + logger.error("Exception occurred while getting segment files", exception); + } + return Collections.emptySet(); + } + + private String getLocalSegmentFilename(String remoteFilename) { + return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0]; + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + protected void indexData(int numberOfIterations, boolean invokeFlush) { + logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush); + for (int i = 0; i < numberOfIterations; i++) { + int numberOfOperations = randomIntBetween(1, 5); + logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i); + for (int j = 0; j < numberOfOperations; j++) { + indexSingleDoc(); + } + if (invokeFlush) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java new file mode 100644 index 0000000000000..c3e4af0f05e35 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { + + public void testWritesRejected() { + Path location = randomRepoPath().toAbsolutePath(); + setup(location, 1d, "metadata"); + + Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build(); + ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(request) + .get(); + assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true"); + + logger.info("--> Indexing data"); + OpenSearchRejectedExecutionException ex = assertThrows( + OpenSearchRejectedExecutionException.class, + () -> indexData(randomIntBetween(10, 20), randomBoolean()) + ); + assertTrue(ex.getMessage().contains("rejected execution on primary shard")); + deleteRepo(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java similarity index 97% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIT.java rename to server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index bdaa5af222459..0914506e632dd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -26,7 +26,7 @@ import static java.util.Arrays.asList; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -public class RemoteStoreBaseIT extends OpenSearchIntegTestCase { +public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remore-store-repo"; protected static final int SHARD_COUNT = 1; protected static final int REPLICA_COUNT = 1; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 41ae0da9ccb72..f069950c11f17 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -34,7 +34,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreIT extends RemoteStoreBaseIT { +public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAME = "remote-store-test-idx-1"; private static final String TOTAL_OPERATIONS = "total-operations"; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 30e370f3a528c..eb95c2a270d1a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -8,79 +8,21 @@ package org.opensearch.remotestore; -import org.junit.After; -import org.junit.Before; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.core.util.FileSystemUtils; -import org.opensearch.index.IndexModule; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; -import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchIntegTestCase; -import java.io.IOException; import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collections; import java.util.Locale; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreRefreshListenerIT extends AbstractSnapshotIntegTestCase { - - private static final String REPOSITORY_NAME = "my-segment-repo-1"; - private static final String INDEX_NAME = "remote-store-test-idx-1"; - - @Override - protected Settings featureFlagSettings() { - return Settings.builder() - .put(super.featureFlagSettings()) - .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") - .put(FeatureFlags.REMOTE_STORE, "true") - .build(); - } - - @Before - public void setup() { - FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); - internalCluster().startClusterManagerOnlyNode(); - } - - @Override - public Settings indexSettings() { - return remoteStoreIndexSettings(0); - } - - private Settings remoteStoreIndexSettings(int numberOfReplicas) { - return Settings.builder() - .put(super.indexSettings()) - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) - .build(); - } - - @After - public void teardown() { - logger.info("--> Deleting the repository={}", REPOSITORY_NAME); - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); - } +public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { public void testRemoteRefreshRetryOnFailure() throws Exception { @@ -107,76 +49,16 @@ public void testRemoteRefreshRetryOnFailure() throws Exception { Set filesInRepo = getSegmentFiles(segmentDataRepoPath); assertTrue(filesInRepo.containsAll(filesInLocal)); }, 60, TimeUnit.SECONDS); + deleteRepo(); } - private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) { - logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation); - // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in - /// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the - // repository creation can happen without failure. - createRepository( - REPOSITORY_NAME, - "mock", - Settings.builder() - .put("location", repoLocation) - .put("random_control_io_exception_rate", ioFailureRate) - .put("skip_exception_on_verification_file", true) - .put("skip_exception_on_list_blobs", true) - .put("max_failure_number", Long.MAX_VALUE) - ); - - internalCluster().startDataOnlyNodes(1); - createIndex(INDEX_NAME); - logger.info("--> Created index={}", INDEX_NAME); - ensureYellowAndNoInitializingShards(INDEX_NAME); - logger.info("--> Cluster is yellow with no initializing shards"); - ensureGreen(INDEX_NAME); - logger.info("--> Cluster is green"); - } - - /** - * Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc. - * - * @param location the path to location where segment files are being searched. - * @return set of file names of all segment file or empty set if there was IOException thrown. - */ - private Set getSegmentFiles(Path location) { - try { - return Arrays.stream(FileSystemUtils.files(location)) - .filter(path -> path.getFileName().toString().startsWith("_")) - .map(path -> path.getFileName().toString()) - .map(this::getLocalSegmentFilename) - .collect(Collectors.toSet()); - } catch (IOException exception) { - logger.error("Exception occurred while getting segment files", exception); - } - return Collections.emptySet(); - } - - private String getLocalSegmentFilename(String remoteFilename) { - return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0]; - } - - private IndexResponse indexSingleDoc() { - return client().prepareIndex(INDEX_NAME) - .setId(UUIDs.randomBase64UUID()) - .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) - .get(); - } + public void testRemoteRefreshSegmentPressureSettingChanged() { + Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build(); + ClusterUpdateSettingsResponse response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get(); + assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true"); - private void indexData(int numberOfIterations, boolean invokeFlush) { - logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush); - for (int i = 0; i < numberOfIterations; i++) { - int numberOfOperations = randomIntBetween(1, 5); - logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i); - for (int j = 0; j < numberOfOperations; j++) { - indexSingleDoc(); - } - if (invokeFlush) { - flush(INDEX_NAME); - } else { - refresh(INDEX_NAME); - } - } + request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false).build(); + response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get(); + assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "false"); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java index fe8f3612be6bf..712747f7479ae 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java @@ -28,7 +28,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0) -public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIT { +public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIntegTestCase { private int shard_count = 5; @Override diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index c41a15ac41c4c..9e6b1bd59b386 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -76,6 +76,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.common.xcontent.XContentHelper; @@ -88,6 +89,7 @@ import org.opensearch.index.mapper.MapperException; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; @@ -135,6 +137,7 @@ public class TransportShardBulkAction extends TransportWriteAction globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -506,7 +508,8 @@ public synchronized IndexShard createShard( circuitBreakerService, translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, - remoteStore + remoteStore, + remoteRefreshSegmentPressureService ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java index 37935cc0eb29d..280381a7b6109 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java @@ -50,10 +50,9 @@ public class RemoteRefreshSegmentPressureService implements IndexEventListener { public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settings settings) { pressureSettings = new RemoteRefreshSegmentPressureSettings(clusterService, settings, this); lagValidators = Arrays.asList( - new RefreshSeqNoLagValidator(pressureSettings), + new ConsecutiveFailureValidator(pressureSettings), new BytesLagValidator(pressureSettings), - new TimeLagValidator(pressureSettings), - new ConsecutiveFailureValidator(pressureSettings) + new TimeLagValidator(pressureSettings) ); } @@ -87,11 +86,10 @@ public void afterIndexShardCreated(IndexShard indexShard) { @Override public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) { - if (indexShard.indexSettings().isRemoteStoreEnabled() == false) { - return; + RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = trackerMap.remove(shardId); + if (remoteRefreshSegmentTracker != null) { + logger.trace("Deleted tracker for shardId={}", shardId); } - trackerMap.remove(shardId); - logger.trace("Deleted tracker for shardId={}", shardId); } /** @@ -103,11 +101,16 @@ public boolean isSegmentsUploadBackpressureEnabled() { return pressureSettings.isRemoteRefreshSegmentPressureEnabled(); } + /** + * Validates if segments are lagging more than the limits. If yes, it would lead to rejections of the requests. + * + * @param shardId shardId for which the validation needs to be done. + */ public void validateSegmentsUploadLag(ShardId shardId) { RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId); - // Check if refresh checkpoint (a.k.a. seq number) lag is 2 or below - this is to handle segment merges that can - // increase the bytes to upload almost suddenly. - if (remoteRefreshSegmentTracker.getRefreshSeqNoLag() <= 1) { + // condition 1 - This will be null for non-remote backed indexes + // condition 2 - This will be zero if the remote store is + if (remoteRefreshSegmentTracker == null || remoteRefreshSegmentTracker.getRefreshSeqNoLag() == 0) { return; } @@ -167,43 +170,6 @@ private LagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { abstract String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId); } - /** - * Check if the remote store seq no lag is above the min seq no lag limit - * - * @opensearch.internal - */ - private static class RefreshSeqNoLagValidator extends LagValidator { - - private static final String NAME = "refresh_seq_no_lag"; - - private RefreshSeqNoLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { - super(pressureSettings); - } - - @Override - public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { - // Check if the remote store seq no lag is above the min seq no lag limit - return pressureTracker.getRefreshSeqNoLag() <= pressureSettings.getMinRefreshSeqNoLagLimit(); - } - - @Override - String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { - return String.format( - Locale.ROOT, - "rejected execution on primary shard:%s due to remote segments lagging behind local segments." - + "remote_refresh_seq_no:%s local_refresh_seq_no:%s", - shardId, - pressureTracker.getRemoteRefreshSeqNo(), - pressureTracker.getLocalRefreshSeqNo() - ); - } - - @Override - String name() { - return NAME; - } - } - /** * Check if the remote store is lagging more than the upload bytes average multiplied by a variance factor * @@ -219,6 +185,9 @@ private BytesLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) @Override public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + if (pressureTracker.getRefreshSeqNoLag() <= 1) { + return true; + } if (pressureTracker.isUploadBytesAverageReady() == false) { logger.trace("upload bytes moving average is not ready"); return true; @@ -262,6 +231,9 @@ private TimeLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) @Override public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + if (pressureTracker.getRefreshSeqNoLag() <= 1) { + return true; + } if (pressureTracker.isUploadTimeMsAverageReady() == false) { logger.trace("upload time moving average is not ready"); return true; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java index 6cb0d1d07e78b..2a098b8f7a89b 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java @@ -21,12 +21,10 @@ public class RemoteRefreshSegmentPressureSettings { private static class Defaults { - private static final long MIN_SEQ_NO_LAG_LIMIT = 5; - private static final long MIN_SEQ_NO_LAG_LIMIT_MIN_VALUE = 2; - private static final double BYTES_LAG_VARIANCE_FACTOR = 2.0; - private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 2.0; + private static final double BYTES_LAG_VARIANCE_FACTOR = 10.0; + private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 10.0; private static final double VARIANCE_FACTOR_MIN_VALUE = 1.0; - private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 10; + private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 5; private static final int MIN_CONSECUTIVE_FAILURES_LIMIT_MIN_VALUE = 1; private static final int UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE = 20; private static final int UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE = 20; @@ -41,14 +39,6 @@ private static class Defaults { Setting.Property.NodeScope ); - public static final Setting MIN_SEQ_NO_LAG_LIMIT = Setting.longSetting( - "remote_store.segment.pressure.seq_no_lag.limit", - Defaults.MIN_SEQ_NO_LAG_LIMIT, - Defaults.MIN_SEQ_NO_LAG_LIMIT_MIN_VALUE, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - public static final Setting BYTES_LAG_VARIANCE_FACTOR = Setting.doubleSetting( "remote_store.segment.pressure.bytes_lag.variance_factor", Defaults.BYTES_LAG_VARIANCE_FACTOR, @@ -123,9 +113,6 @@ public RemoteRefreshSegmentPressureSettings( this.remoteRefreshSegmentPressureEnabled = REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.get(settings); clusterSettings.addSettingsUpdateConsumer(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, this::setRemoteRefreshSegmentPressureEnabled); - this.minRefreshSeqNoLagLimit = MIN_SEQ_NO_LAG_LIMIT.get(settings); - clusterSettings.addSettingsUpdateConsumer(MIN_SEQ_NO_LAG_LIMIT, this::setMinRefreshSeqNoLagLimit); - this.bytesLagVarianceFactor = BYTES_LAG_VARIANCE_FACTOR.get(settings); clusterSettings.addSettingsUpdateConsumer(BYTES_LAG_VARIANCE_FACTOR, this::setBytesLagVarianceFactor); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index ac366806d9894..800cf176548a0 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -120,7 +120,7 @@ public class RemoteRefreshSegmentTracker { /** * Set of names of segment files that were uploaded as part of the most recent remote refresh. */ - private final Set latestUploadFiles = new HashSet<>(); + private final Set latestUploadedFiles = new HashSet<>(); /** * Keeps the bytes lag computed so that we do not compute it for every request. @@ -181,28 +181,28 @@ ShardId getShardId() { return shardId; } - long getLocalRefreshSeqNo() { + public long getLocalRefreshSeqNo() { return localRefreshSeqNo; } - void updateLocalRefreshSeqNo(long localRefreshSeqNo) { - assert localRefreshSeqNo > this.localRefreshSeqNo : "newLocalRefreshSeqNo=" + public void updateLocalRefreshSeqNo(long localRefreshSeqNo) { + assert localRefreshSeqNo >= this.localRefreshSeqNo : "newLocalRefreshSeqNo=" + localRefreshSeqNo - + ">=" + + " < " + "currentLocalRefreshSeqNo=" + this.localRefreshSeqNo; this.localRefreshSeqNo = localRefreshSeqNo; computeRefreshSeqNoLag(); } - long getLocalRefreshTimeMs() { + public long getLocalRefreshTimeMs() { return localRefreshTimeMs; } - void updateLocalRefreshTimeMs(long localRefreshTimeMs) { - assert localRefreshTimeMs > this.localRefreshTimeMs : "newLocalRefreshTimeMs=" + public void updateLocalRefreshTimeMs(long localRefreshTimeMs) { + assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs=" + localRefreshTimeMs - + ">=" + + " < " + "currentLocalRefreshTimeMs=" + this.localRefreshTimeMs; this.localRefreshTimeMs = localRefreshTimeMs; @@ -213,10 +213,10 @@ long getRemoteRefreshSeqNo() { return remoteRefreshSeqNo; } - void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) { - assert remoteRefreshSeqNo > this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo=" + public void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) { + assert remoteRefreshSeqNo >= this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo=" + remoteRefreshSeqNo - + ">=" + + " < " + "currentRemoteRefreshSeqNo=" + this.remoteRefreshSeqNo; this.remoteRefreshSeqNo = remoteRefreshSeqNo; @@ -227,10 +227,10 @@ long getRemoteRefreshTimeMs() { return remoteRefreshTimeMs; } - void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { - assert remoteRefreshTimeMs > this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs=" + public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { + assert remoteRefreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs=" + remoteRefreshTimeMs - + ">=" + + " < " + "currentRemoteRefreshTimeMs=" + this.remoteRefreshTimeMs; this.remoteRefreshTimeMs = remoteRefreshTimeMs; @@ -241,7 +241,7 @@ private void computeRefreshSeqNoLag() { refreshSeqNoLag = localRefreshSeqNo - remoteRefreshSeqNo; } - long getRefreshSeqNoLag() { + public long getRefreshSeqNoLag() { return refreshSeqNoLag; } @@ -249,73 +249,73 @@ private void computeTimeMsLag() { timeMsLag = localRefreshTimeMs - remoteRefreshTimeMs; } - long getTimeMsLag() { + public long getTimeMsLag() { return timeMsLag; } - long getBytesLag() { + public long getBytesLag() { return bytesLag; } - long getUploadBytesStarted() { + public long getUploadBytesStarted() { return uploadBytesStarted; } - void addUploadBytesStarted(long size) { + public void addUploadBytesStarted(long size) { uploadBytesStarted += size; } - long getUploadBytesFailed() { + public long getUploadBytesFailed() { return uploadBytesFailed; } - void addUploadBytesFailed(long size) { + public void addUploadBytesFailed(long size) { uploadBytesFailed += size; } - long getUploadBytesSucceeded() { + public long getUploadBytesSucceeded() { return uploadBytesSucceeded; } - void addUploadBytesSucceeded(long size) { + public void addUploadBytesSucceeded(long size) { uploadBytesSucceeded += size; } - long getInflightUploadBytes() { + public long getInflightUploadBytes() { return uploadBytesStarted - uploadBytesFailed - uploadBytesSucceeded; } - long getTotalUploadsStarted() { + public long getTotalUploadsStarted() { return totalUploadsStarted; } - void incrementTotalUploadsStarted() { + public void incrementTotalUploadsStarted() { totalUploadsStarted += 1; } - long getTotalUploadsFailed() { + public long getTotalUploadsFailed() { return totalUploadsFailed; } - void incrementTotalUploadsFailed() { + public void incrementTotalUploadsFailed() { totalUploadsFailed += 1; failures.record(true); } - long getTotalUploadsSucceeded() { + public long getTotalUploadsSucceeded() { return totalUploadsSucceeded; } - void incrementTotalUploadSucceeded() { + public void incrementTotalUploadsSucceeded() { totalUploadsSucceeded += 1; failures.record(false); } - long getInflightUploads() { + public long getInflightUploads() { return totalUploadsStarted - totalUploadsFailed - totalUploadsSucceeded; } - long getRejectionCount() { + public long getRejectionCount() { return rejectionCount.get(); } @@ -335,13 +335,19 @@ Map getLatestLocalFileNameLengthMap() { return latestLocalFileNameLengthMap; } - void setLatestLocalFileNameLengthMap(Map latestLocalFileNameLengthMap) { + public void setLatestLocalFileNameLengthMap(Map latestLocalFileNameLengthMap) { this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap; computeBytesLag(); } - void addToLatestUploadFiles(String file) { - this.latestUploadFiles.add(file); + public void addToLatestUploadedFiles(String file) { + this.latestUploadedFiles.add(file); + computeBytesLag(); + } + + public void setLatestUploadedFiles(Set files) { + this.latestUploadedFiles.clear(); + this.latestUploadedFiles.addAll(files); computeBytesLag(); } @@ -351,7 +357,7 @@ private void computeBytesLag() { } Set filesNotYetUploaded = latestLocalFileNameLengthMap.keySet() .stream() - .filter(f -> !latestUploadFiles.contains(f)) + .filter(f -> !latestUploadedFiles.contains(f)) .collect(Collectors.toSet()); this.bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum(); } @@ -368,7 +374,7 @@ boolean isUploadBytesAverageReady() { return uploadBytesMovingAverageReference.get().getAverage(); } - void addUploadBytes(long size) { + public void addUploadBytes(long size) { lastSuccessfulRemoteRefreshBytes = size; synchronized (uploadBytesMutex) { this.uploadBytesMovingAverageReference.get().record(size); @@ -394,7 +400,7 @@ boolean isUploadBytesPerSecAverageReady() { return uploadBytesPerSecMovingAverageReference.get().getAverage(); } - void addUploadBytesPerSec(long bytesPerSec) { + public void addUploadBytesPerSec(long bytesPerSec) { synchronized (uploadBytesPerSecMutex) { this.uploadBytesPerSecMovingAverageReference.get().record(bytesPerSec); } @@ -419,7 +425,7 @@ boolean isUploadTimeMsAverageReady() { return uploadTimeMsMovingAverageReference.get().getAverage(); } - void addUploadTimeMs(long timeMs) { + public void addUploadTimeMs(long timeMs) { synchronized (uploadTimeMsMutex) { this.uploadTimeMsMovingAverageReference.get().record(timeMs); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a6225569d86b4..8b542be222f25 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -143,6 +143,7 @@ import org.opensearch.index.merge.MergeStats; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.seqno.ReplicationTracker; @@ -328,8 +329,8 @@ Runnable getGlobalCheckpointSyncer() { private volatile boolean useRetentionLeasesInPeerRecovery; private final Store remoteStore; private final BiFunction translogFactorySupplier; - private final boolean isTimeSeriesIndex; + private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; public IndexShard( final ShardRouting shardRouting, @@ -354,7 +355,8 @@ public IndexShard( final CircuitBreakerService circuitBreakerService, final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, - @Nullable final Store remoteStore + @Nullable final Store remoteStore, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -449,6 +451,7 @@ public boolean shouldCache(Query query) { this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null) ? false : mapperService.documentMapper().mappers().containsTimeStampField(); + this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; } public ThreadPool getThreadPool() { @@ -3550,7 +3553,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro new RemoteStoreRefreshListener( this, // Add the checkpoint publisher if the Segment Replciation via remote store is enabled. - indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY + indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY, + remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()) ) ); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index ac9c35aaee6b5..88b71a92d7340 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -21,22 +21,26 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.common.CheckedFunction; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.threadpool.Scheduler; -import org.opensearch.threadpool.ThreadPool; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -86,6 +90,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final IndexShard indexShard; private final Directory storeDirectory; private final RemoteSegmentStoreDirectory remoteDirectory; + private final RemoteRefreshSegmentTracker segmentTracker; private final Map localSegmentChecksumMap; private long primaryTerm; @@ -98,9 +103,20 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry; + /** + * Keeps track of segment files and their size in bytes which are part of the most recent refresh. + */ + private final Map latestFileNameSizeOnLocalMap = ConcurrentCollections.newConcurrentMap(); + private final SegmentReplicationCheckpointPublisher checkpointPublisher; - public RemoteStoreRefreshListener(IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher) { + private final FileUploader fileUploader; + + public RemoteStoreRefreshListener( + IndexShard indexShard, + SegmentReplicationCheckpointPublisher checkpointPublisher, + RemoteRefreshSegmentTracker segmentTracker + ) { this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -114,14 +130,33 @@ public RemoteStoreRefreshListener(IndexShard indexShard, SegmentReplicationCheck logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); } } + this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; + this.fileUploader = new FileUploader(new UploadTracker() { + @Override + public void beforeUpload(String file) { + // Start tracking the upload bytes started + segmentTracker.addUploadBytesStarted(latestFileNameSizeOnLocalMap.get(file)); + } + + @Override + public void onSuccess(String file) { + // Track upload success + segmentTracker.addUploadBytesSucceeded(latestFileNameSizeOnLocalMap.get(file)); + segmentTracker.addToLatestUploadedFiles(file); + } + + @Override + public void onFailure(String file) { + // Track upload failure + segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file)); + } + }, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile); } @Override - public void beforeRefresh() throws IOException { - // Do Nothing - } + public void beforeRefresh() throws IOException {} /** * Upload new segment files created as part of the last refresh to the remote segment store. @@ -131,6 +166,11 @@ public void beforeRefresh() throws IOException { */ @Override public void afterRefresh(boolean didRefresh) { + + if (didRefresh) { + updateLocalRefreshTimeAndSeqNo(); + } + try { indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get(); } catch (InterruptedException | ExecutionException e) { @@ -139,108 +179,130 @@ public void afterRefresh(boolean didRefresh) { } private synchronized void syncSegments(boolean isRetry) { - boolean shouldRetry = false; + if (indexShard.getReplicationTracker().isPrimaryMode() == false) { + return; + } beforeSegmentsSync(isRetry); + long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); + long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime(); + boolean shouldRetry = true; try { - if (indexShard.getReplicationTracker().isPrimaryMode()) { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); + + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + this.remoteDirectory.init(); + } + try { + // if a new segments_N file is present in local that is not uploaded to remote store yet, it + // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. + // This is done to avoid delete post each refresh. + // Ideally, we want this to be done in async flow. (GitHub issue #4315) + if (isRefreshAfterCommit()) { + deleteStaleCommits(); } - try { - // if a new segments_N file is present in local that is not uploaded to remote store yet, it - // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. - // This is done to avoid delete post each refresh. - // Ideally, we want this to be done in async flow. (GitHub issue #4315) - if (isRefreshAfterCommit()) { - deleteStaleCommits(); - } + String segmentInfoSnapshotFilename = null; + try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); - - String segmentInfoSnapshotFilename = null; - try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { - SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); - - Collection localSegmentsPostRefresh = segmentInfos.files(true); - - List segmentInfosFiles = localSegmentsPostRefresh.stream() - .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) - .collect(Collectors.toList()); - Optional latestSegmentInfos = segmentInfosFiles.stream() - .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); - - if (latestSegmentInfos.isPresent()) { - // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain - // all the segments from last commit if they are merged away but not yet committed. - // Each metadata file in the remote segment store represents a commit and the following - // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed - // segments. - localSegmentsPostRefresh.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); - segmentInfosFiles.stream() - .filter(file -> !file.equals(latestSegmentInfos.get())) - .forEach(localSegmentsPostRefresh::remove); - - boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh); - if (uploadStatus) { - segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); - localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); - - remoteDirectory.uploadMetadata( - localSegmentsPostRefresh, - storeDirectory, - indexShard.getOperationPrimaryTerm(), - segmentInfos.getGeneration() - ); - localSegmentChecksumMap.keySet() - .stream() - .filter(file -> !localSegmentsPostRefresh.contains(file)) - .collect(Collectors.toSet()) - .forEach(localSegmentChecksumMap::remove); - onSuccessfulSegmentsSync(); - final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); - indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); - - checkpointPublisher.publish(indexShard, checkpoint); - } else { - shouldRetry = true; - } + long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); + Collection localSegmentsPostRefresh = segmentInfos.files(true); + + List segmentInfosFiles = localSegmentsPostRefresh.stream() + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .collect(Collectors.toList()); + Optional latestSegmentInfos = segmentInfosFiles.stream() + .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); + + if (latestSegmentInfos.isPresent()) { + // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain + // all the segments from last commit if they are merged away but not yet committed. + // Each metadata file in the remote segment store represents a commit and the following + // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed + // segments. + localSegmentsPostRefresh.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); + segmentInfosFiles.stream() + .filter(file -> !file.equals(latestSegmentInfos.get())) + .forEach(localSegmentsPostRefresh::remove); + + // Create a map of file name to size and update the refresh segment tracker + updateLocalSizeMapAndTracker(localSegmentsPostRefresh); + + // Start the segments files upload + boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh); + if (newSegmentsUploadStatus) { + segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); + localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); + + // Start metadata file upload + remoteDirectory.uploadMetadata( + localSegmentsPostRefresh, + storeDirectory, + indexShard.getOperationPrimaryTerm(), + segmentInfos.getGeneration() + ); + clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); + onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo); + indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); + checkpointPublisher.publish(indexShard, checkpoint); + // At this point since we have uploaded new segments, segment infos and segment metadata file, + // along with marking minSeqNoToKeep, upload has succeeded completely. + shouldRetry = false; } - } catch (EngineException e) { - shouldRetry = true; - logger.warn("Exception while reading SegmentInfosSnapshot", e); - } finally { - try { - if (segmentInfoSnapshotFilename != null) { - storeDirectory.deleteFile(segmentInfoSnapshotFilename); - } - } catch (IOException e) { - logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); + } + } catch (EngineException e) { + logger.warn("Exception while reading SegmentInfosSnapshot", e); + } finally { + try { + if (segmentInfoSnapshotFilename != null) { + storeDirectory.deleteFile(segmentInfoSnapshotFilename); } + } catch (IOException e) { + logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); } - } catch (IOException e) { - shouldRetry = true; - // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried - // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. - logger.warn("Exception while uploading new segments to the remote segment store", e); } + } catch (IOException e) { + // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried + // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. + logger.warn("Exception while uploading new segments to the remote segment store", e); } } catch (Throwable t) { - shouldRetry = true; logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); + } finally { + // Update the segment tracker with the final upload status as seen at the end + updateFinalUploadStatusInSegmentTracker(shouldRetry == false, bytesBeforeUpload, startTimeInNS); } afterSegmentsSync(isRetry, shouldRetry); } + /** + * Clears the stale files from the latest local segment checksum map. + * + * @param localSegmentsPostRefresh list of segment files present post refresh + */ + private void clearStaleFilesFromLocalSegmentChecksumMap(Collection localSegmentsPostRefresh) { + localSegmentChecksumMap.keySet() + .stream() + .filter(file -> !localSegmentsPostRefresh.contains(file)) + .collect(Collectors.toSet()) + .forEach(localSegmentChecksumMap::remove); + } + private void beforeSegmentsSync(boolean isRetry) { if (isRetry) { logger.info("Retrying to sync the segments to remote store"); } + // Start tracking total uploads started + segmentTracker.incrementTotalUploadsStarted(); } - private void onSuccessfulSegmentsSync() { + private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) { + // Update latest uploaded segment files name in segment tracker + segmentTracker.setLatestUploadedFiles(latestFileNameSizeOnLocalMap.keySet()); + // Update the remote refresh time and refresh seq no + updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshSeqNo); // Reset the backoffDelayIterator for the future failures resetBackOffDelayIterator(); // Cancel the scheduled cancellable retry if possible and set it to null @@ -303,25 +365,13 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s return segmentInfoSnapshotFilename; } - // Visible for testing - boolean uploadNewSegments(Collection localFiles) throws IOException { + private boolean uploadNewSegments(Collection localSegmentsPostRefresh) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); - localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { + localSegmentsPostRefresh.forEach(file -> { try { - return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); - } catch (IOException e) { - logger.info( - "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", - file - ); - return true; - } - }).forEach(file -> { - try { - remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); + fileUploader.uploadFile(file); } catch (IOException e) { uploadSuccess.set(false); - // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); } }); @@ -345,4 +395,160 @@ private void deleteStaleCommits() { logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); } } + + /** + * Updates the last refresh time and refresh seq no which is seen by local store. + */ + private void updateLocalRefreshTimeAndSeqNo() { + segmentTracker.updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L); + segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1); + } + + /** + * Updates the last refresh time and refresh seq no which is seen by remote store. + */ + private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshSeqNo) { + segmentTracker.updateRemoteRefreshTimeMs(refreshTimeMs); + segmentTracker.updateRemoteRefreshSeqNo(refreshSeqNo); + } + + /** + * Updates map of file name to size of the input segment files. Tries to reuse existing information by caching the size + * data, otherwise uses {@code storeDirectory.fileLength(file)} to get the size. This method also removes from the map + * such files that are not present in the list of segment files given in the input. + * + * @param segmentFiles list of segment files for which size needs to be known + */ + private void updateLocalSizeMapAndTracker(Collection segmentFiles) { + + // Update the map + segmentFiles.stream() + .filter(file -> !EXCLUDE_FILES.contains(file)) + .filter(file -> !latestFileNameSizeOnLocalMap.containsKey(file) || latestFileNameSizeOnLocalMap.get(file) == 0) + .forEach(file -> { + long fileSize = 0; + try { + fileSize = storeDirectory.fileLength(file); + } catch (IOException e) { + logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e); + } + latestFileNameSizeOnLocalMap.put(file, fileSize); + }); + + Set fileSet = new HashSet<>(segmentFiles); + // Remove keys from the fileSizeMap that do not exist in the latest segment files + latestFileNameSizeOnLocalMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false); + // Update the tracker + segmentTracker.setLatestLocalFileNameLengthMap(latestFileNameSizeOnLocalMap); + } + + private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { + if (uploadStatus) { + long bytesUploaded = segmentTracker.getUploadBytesSucceeded() - bytesBeforeUpload; + long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L; + + segmentTracker.incrementTotalUploadsSucceeded(); + segmentTracker.addUploadBytes(bytesUploaded); + segmentTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / timeTakenInMS); + segmentTracker.addUploadTimeMs(timeTakenInMS); + } else { + segmentTracker.incrementTotalUploadsFailed(); + } + } + + /** + * This class is a wrapper over the copying of file from local to remote store allowing to decorate the actual copy + * method along with adding hooks of code that can be run before, on success and on failure. + * + * @opensearch.internal + */ + private static class FileUploader { + + private final UploadTracker uploadTracker; + + private final RemoteSegmentStoreDirectory remoteDirectory; + + private final Directory storeDirectory; + + private final CheckedFunction checksumProvider; + + public FileUploader( + UploadTracker uploadTracker, + RemoteSegmentStoreDirectory remoteDirectory, + Directory storeDirectory, + CheckedFunction checksumProvider + ) { + this.uploadTracker = uploadTracker; + this.remoteDirectory = remoteDirectory; + this.storeDirectory = storeDirectory; + this.checksumProvider = checksumProvider; + } + + /** + * Calling this method will lead to before getting executed and then the actual upload. Based on the upload status, + * the onSuccess or onFailure method gets invoked. + * + * @param file the file which is to be uploaded. + * @throws IOException is thrown if the upload fails. + */ + private void uploadFile(String file) throws IOException { + if (skipUpload(file)) { + return; + } + uploadTracker.beforeUpload(file); + boolean success = false; + try { + performUpload(file); + uploadTracker.onSuccess(file); + success = true; + } finally { + if (!success) { + uploadTracker.onFailure(file); + } + } + } + + /** + * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. + * + * @param file that needs to be uploaded. + * @return true if the upload has to be skipped for the file. + */ + private boolean skipUpload(String file) { + try { + // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. + return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, checksumProvider.apply(file)); + } catch (IOException e) { + logger.error( + "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", + file + ); + } + return false; + } + + /** + * This method does the actual upload. + * + * @param file that needs to be uploaded. + * @throws IOException is thrown if the upload fails. + */ + private void performUpload(String file) throws IOException { + remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); + } + } + + /** + * A tracker class that is fed to FileUploader. + * + * @opensearch.internal + */ + interface UploadTracker { + + void beforeUpload(String file); + + void onSuccess(String file); + + void onFailure(String file); + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index a5276350d582a..b868f6aa35aee 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -37,6 +37,7 @@ import org.opensearch.action.admin.indices.rollover.MaxDocsCondition; import org.opensearch.action.admin.indices.rollover.MaxSizeCondition; import org.opensearch.action.resync.TransportResyncReplicationAction; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.ParseField; import org.opensearch.common.inject.AbstractModule; import org.opensearch.common.io.stream.NamedWriteableRegistry; @@ -69,6 +70,7 @@ import org.opensearch.index.mapper.SourceFieldMapper; import org.opensearch.index.mapper.TextFieldMapper; import org.opensearch.index.mapper.VersionFieldMapper; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -286,6 +288,9 @@ protected void configure() { bind(RetentionLeaseSyncer.class).asEagerSingleton(); bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); bind(SegmentReplicationPressureService.class).asEagerSingleton(); + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + bind(RemoteRefreshSegmentPressureService.class).asEagerSingleton(); + } } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b3843dfd114a9..cdad2c45638e5 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -120,6 +120,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.RetentionLeaseStats; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -941,14 +942,21 @@ public IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode + final DiscoveryNode sourceNode, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); + IndexShard indexShard = indexService.createShard( + shardRouting, + globalCheckpointSyncer, + retentionLeaseSyncer, + checkpointPublisher, + remoteRefreshSegmentPressureService + ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index f2a6583ae47bc..4a0fab82f9adc 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -56,6 +56,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.env.ShardLockObtainFailedException; @@ -64,6 +65,7 @@ import org.opensearch.index.IndexComponent; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -146,6 +148,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final SegmentReplicationCheckpointPublisher checkpointPublisher; + private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + @Inject public IndicesClusterStateService( final Settings settings, @@ -164,7 +168,8 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) { this( settings, @@ -183,7 +188,8 @@ public IndicesClusterStateService( snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, - retentionLeaseSyncer + retentionLeaseSyncer, + remoteRefreshSegmentPressureService ); } @@ -205,7 +211,8 @@ public IndicesClusterStateService( final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer + final RetentionLeaseSyncer retentionLeaseSyncer, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -215,6 +222,10 @@ public IndicesClusterStateService( ); indexEventListeners.add(segmentReplicationTargetService); indexEventListeners.add(segmentReplicationSourceService); + // if remote store feature is not enabled, do not wire the remote upload pressure service as an IndexEventListener. + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + indexEventListeners.add(remoteRefreshSegmentPressureService); + } this.segmentReplicationTargetService = segmentReplicationTargetService; this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); this.indicesService = indicesService; @@ -228,6 +239,7 @@ public IndicesClusterStateService( this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); + this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; } @Override @@ -657,7 +669,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR globalCheckpointSyncer, retentionLeaseSyncer, nodes.getLocalNode(), - sourceNode + sourceNode, + remoteRefreshSegmentPressureService ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1015,7 +1028,8 @@ T createShard( Consumer globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, - @Nullable DiscoveryNode sourceNode + @Nullable DiscoveryNode sourceNode, + RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException; /** diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index d412b5383bc89..cc7b5cb8dc845 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -77,6 +77,7 @@ import org.opensearch.index.mapper.Mapping; import org.opensearch.index.mapper.MetadataFieldMapper; import org.opensearch.index.mapper.RootObjectMapper; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -1072,6 +1073,7 @@ public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1102,6 +1104,7 @@ public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1132,6 +1135,7 @@ public void testHandlePrimaryTermValidationRequestSuccess() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1173,6 +1177,7 @@ private TransportShardBulkAction createAction() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index c5a6c0323a6f9..1bab4bbfd9d31 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -99,14 +99,10 @@ public void testValidateSegmentUploadLag() { pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); - // 1. Seq no - add data points to the pressure tracker RemoteRefreshSegmentTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); pressureTracker.updateLocalRefreshSeqNo(6); - Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); - assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("remote_refresh_seq_no:0 local_refresh_seq_no:6")); - // 2. time lag more than dynamic threshold + // 1. time lag more than dynamic threshold pressureTracker.updateRemoteRefreshSeqNo(3); AtomicLong sum = new AtomicLong(); IntStream.range(0, 20).forEach(i -> { @@ -115,16 +111,16 @@ public void testValidateSegmentUploadLag() { }); double avg = (double) sum.get() / 20; long currentMs = System.nanoTime() / 1_000_000; - pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 4 * avg)); + pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 12 * avg)); pressureTracker.updateRemoteRefreshTimeMs(currentMs); - e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); + Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("time_lag:38 ms dynamic_time_lag_threshold:19.0 ms")); + assertTrue(e.getMessage().contains("time_lag:114 ms dynamic_time_lag_threshold:95.0 ms")); pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg)); pressureService.validateSegmentsUploadLag(shardId); - // 3. bytes lag more than dynamic threshold + // 2. bytes lag more than dynamic threshold sum.set(0); IntStream.range(0, 20).forEach(i -> { pressureTracker.addUploadBytes(i); @@ -132,24 +128,24 @@ public void testValidateSegmentUploadLag() { }); avg = (double) sum.get() / 20; Map nameSizeMap = new HashMap<>(); - nameSizeMap.put("a", (long) (4 * avg)); + nameSizeMap.put("a", (long) (12 * avg)); pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("bytes_lag:38 dynamic_bytes_lag_threshold:19.0")); + assertTrue(e.getMessage().contains("bytes_lag:114 dynamic_bytes_lag_threshold:95.0")); nameSizeMap.put("a", (long) (2 * avg)); pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); pressureService.validateSegmentsUploadLag(shardId); - // 4. Consecutive failures more than the limit - IntStream.range(0, 10).forEach(ignore -> pressureTracker.incrementTotalUploadsFailed()); + // 3. Consecutive failures more than the limit + IntStream.range(0, 5).forEach(ignore -> pressureTracker.incrementTotalUploadsFailed()); pressureService.validateSegmentsUploadLag(shardId); pressureTracker.incrementTotalUploadsFailed(); e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("failure_streak_count:11 min_consecutive_failure_threshold:10")); - pressureTracker.incrementTotalUploadSucceeded(); + assertTrue(e.getMessage().contains("failure_streak_count:6 min_consecutive_failure_threshold:5")); + pressureTracker.incrementTotalUploadsSucceeded(); pressureService.validateSegmentsUploadLag(shardId); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java index 66b5d6c4c19d8..75b5b946e8bf8 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java @@ -54,17 +54,14 @@ public void testGetDefaultSettings() { // Check remote refresh segment pressure enabled is false assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); - // Check min sequence number lag limit default value - assertEquals(5L, pressureSettings.getMinRefreshSeqNoLagLimit()); - // Check bytes lag variance threshold default value - assertEquals(2.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); + assertEquals(10.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); // Check time lag variance threshold default value - assertEquals(2.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d); + assertEquals(10.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d); // Check minimum consecutive failures limit default value - assertEquals(10, pressureSettings.getMinConsecutiveFailuresLimit()); + assertEquals(5, pressureSettings.getMinConsecutiveFailuresLimit()); // Check upload bytes moving average window size default value assertEquals(20, pressureSettings.getUploadBytesMovingAverageWindowSize()); @@ -79,7 +76,6 @@ public void testGetDefaultSettings() { public void testGetConfiguredSettings() { Settings settings = Settings.builder() .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) @@ -96,9 +92,6 @@ public void testGetConfiguredSettings() { // Check remote refresh segment pressure enabled is true assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); - // Check min sequence number lag limit configured value - assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit()); - // Check bytes lag variance threshold configured value assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); @@ -127,7 +120,6 @@ public void testUpdateAfterGetDefaultSettings() { Settings newSettings = Settings.builder() .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) @@ -140,9 +132,6 @@ public void testUpdateAfterGetDefaultSettings() { // Check updated remote refresh segment pressure enabled is false assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); - // Check min sequence number lag limit - assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit()); - // Check bytes lag variance threshold updated assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); @@ -165,7 +154,6 @@ public void testUpdateAfterGetDefaultSettings() { public void testUpdateAfterGetConfiguredSettings() { Settings settings = Settings.builder() .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) @@ -180,7 +168,6 @@ public void testUpdateAfterGetConfiguredSettings() { ); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 80) .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 40.0) .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111) @@ -194,9 +181,6 @@ public void testUpdateAfterGetConfiguredSettings() { // Check updated remote refresh segment pressure enabled is true assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); - // Check min sequence number lag limit - assertEquals(80L, pressureSettings.getMinRefreshSeqNoLagLimit()); - // Check bytes lag variance threshold updated assertEquals(40.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java index d939d235ab3c7..4360fc0fe4011 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -243,9 +243,9 @@ public void testIncrementTotalUploadSucceeded() { pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() ); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(1, pressureTracker.getTotalUploadsSucceeded()); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(2, pressureTracker.getTotalUploadsSucceeded()); } @@ -260,7 +260,7 @@ public void testGetInflightUploads() { assertEquals(1, pressureTracker.getInflightUploads()); pressureTracker.incrementTotalUploadsStarted(); assertEquals(2, pressureTracker.getInflightUploads()); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(1, pressureTracker.getInflightUploads()); pressureTracker.incrementTotalUploadsFailed(); assertEquals(0, pressureTracker.getInflightUploads()); @@ -290,7 +290,7 @@ public void testGetConsecutiveFailureCount() { assertEquals(1, pressureTracker.getConsecutiveFailureCount()); pressureTracker.incrementTotalUploadsFailed(); assertEquals(2, pressureTracker.getConsecutiveFailureCount()); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(0, pressureTracker.getConsecutiveFailureCount()); } @@ -309,17 +309,17 @@ public void testComputeBytesLag() { pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); assertEquals(205L, pressureTracker.getBytesLag()); - pressureTracker.addToLatestUploadFiles("a"); + pressureTracker.addToLatestUploadedFiles("a"); assertEquals(105L, pressureTracker.getBytesLag()); fileSizeMap.put("c", 115L); pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); assertEquals(220L, pressureTracker.getBytesLag()); - pressureTracker.addToLatestUploadFiles("b"); + pressureTracker.addToLatestUploadedFiles("b"); assertEquals(115L, pressureTracker.getBytesLag()); - pressureTracker.addToLatestUploadFiles("c"); + pressureTracker.addToLatestUploadedFiles("c"); assertEquals(0L, pressureTracker.getBytesLag()); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index bcc3268658ab8..365fb0237f80f 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -18,13 +18,19 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -37,11 +43,14 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX; public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; + private ClusterService clusterService; private RemoteStoreRefreshListener remoteStoreRefreshListener; + private RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard = newStartedShard( @@ -53,7 +62,18 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { indexDocs(1, numberOfDocs); indexShard.refresh("test"); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + remoteStoreRefreshListener = new RemoteStoreRefreshListener( + indexShard, + SegmentReplicationCheckpointPublisher.EMPTY, + remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) + ); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -64,11 +84,9 @@ private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @After public void tearDown() throws Exception { - if (indexShard != null) { - Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - closeShards(indexShard); - } + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + closeShards(indexShard); super.tearDown(); } @@ -221,9 +239,19 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch + ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); + RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + assertEquals(0, segmentTracker.getBytesLag()); + assertEquals(0, segmentTracker.getRefreshSeqNoLag()); + assertEquals(0, segmentTracker.getTimeMsLag()); + assertEquals(0, segmentTracker.getTotalUploadsFailed()); } public void testRefreshSuccessOnSecondAttempt() throws Exception { @@ -235,9 +263,19 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch + ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); + RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + assertEquals(0, segmentTracker.getBytesLag()); + assertEquals(0, segmentTracker.getRefreshSeqNoLag()); + assertEquals(0, segmentTracker.getTimeMsLag()); + assertEquals(1, segmentTracker.getTotalUploadsFailed()); } public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception { @@ -249,12 +287,55 @@ public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch + ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); + RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + assertEquals(0, segmentTracker.getBytesLag()); + assertEquals(0, segmentTracker.getRefreshSeqNoLag()); + assertEquals(0, segmentTracker.getTimeMsLag()); + assertEquals(2, segmentTracker.getTotalUploadsFailed()); + + } + + public void testTrackerData() throws Exception { + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); + RemoteStoreRefreshListener listener = tuple.v1(); + RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteRefreshSegmentTracker tracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + assertNoLag(tracker); + indexDocs(100, randomIntBetween(100, 200)); + indexShard.refresh("test"); + listener.afterRefresh(true); + assertBusy(() -> assertNoLag(tracker)); + } + + private void assertNoLag(RemoteRefreshSegmentTracker tracker) { + assertEquals(0, tracker.getRefreshSeqNoLag()); + assertEquals(0, tracker.getBytesLag()); + assertEquals(0, tracker.getTimeMsLag()); + assertEquals(0, tracker.getRejectionCount()); + assertEquals(tracker.getUploadBytesStarted(), tracker.getUploadBytesSucceeded()); + assertTrue(tracker.getUploadBytesStarted() > 0); + assertEquals(0, tracker.getUploadBytesFailed()); + assertEquals(0, tracker.getInflightUploads()); + assertEquals(tracker.getTotalUploadsStarted(), tracker.getTotalUploadsSucceeded()); + assertTrue(tracker.getTotalUploadsStarted() > 0); + assertEquals(0, tracker.getTotalUploadsFailed()); + } + + private Tuple mockIndexShardWithRetryAndScheduleRefresh( + int succeedOnAttempt + ) throws IOException { + return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, null, null); } - private void mockIndexShardWithRetryAndScheduleRefresh( + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch @@ -262,7 +343,10 @@ private void mockIndexShardWithRetryAndScheduleRefresh( // Create index shard that we will be using to mock different methods in IndexShard for the unit test indexShard = newStartedShard( true, - Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + Settings.builder() + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(), new InternalEngineFactory() ); @@ -317,8 +401,25 @@ private void mockIndexShardWithRetryAndScheduleRefresh( return indexShard.getEngine(); }).when(shard).getEngine(); - RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY); - refreshListener.afterRefresh(false); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService( + clusterService, + Settings.EMPTY + ); + when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); + when(shard.shardId()).thenReturn(indexShard.shardId()); + remoteRefreshSegmentPressureService.afterIndexShardCreated(shard); + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener( + shard, + SegmentReplicationCheckpointPublisher.EMPTY, + remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) + ); + refreshListener.afterRefresh(true); + return Tuple.tuple(refreshListener, remoteRefreshSegmentPressureService); } private static class TestFilterDirectory extends FilterDirectory { diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0989bf869f18e..213a22539971f 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -153,7 +153,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + null ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 0619e3e3f62a2..c8e0460758df1 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -46,6 +46,7 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -262,7 +263,8 @@ public MockIndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode + final DiscoveryNode sourceNode, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 9a0402423416b..be64d89130950 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -581,7 +581,8 @@ private IndicesClusterStateService createIndicesClusterStateService( null, primaryReplicaSyncer, s -> {}, - RetentionLeaseSyncer.EMPTY + RetentionLeaseSyncer.EMPTY, + null ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 63ae57511dd34..a51dc1c770f26 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -171,6 +171,7 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; @@ -1901,7 +1902,8 @@ public void onFailure(final Exception e) { actionFilters ), RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + mock(RemoteRefreshSegmentPressureService.class) ); Map actions = new HashMap<>(); final SystemIndices systemIndices = new SystemIndices(emptyMap()); @@ -1952,6 +1954,7 @@ public void onFailure(final Exception e) { mock(ShardStateAction.class), mock(ThreadPool.class) ), + mock(RemoteRefreshSegmentPressureService.class), new SystemIndices(emptyMap()) ); actions.put( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b3833655ab1ea..b785574ca52b2 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -56,6 +56,7 @@ import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; @@ -89,6 +90,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -169,6 +171,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, @@ -204,12 +207,14 @@ public void onFailure(ReplicationState state, ReplicationFailedException e, bool protected ThreadPool threadPool; protected long primaryTerm; + protected ClusterService clusterService; @Override public void setUp() throws Exception { super.setUp(); threadPool = setUpThreadPool(); primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards + clusterService = createClusterService(threadPool); failOnShardFailures(); } @@ -221,6 +226,7 @@ protected ThreadPool setUpThreadPool() { public void tearDown() throws Exception { try { tearDownThreadPool(); + clusterService.close(); } finally { super.tearDown(); } @@ -564,8 +570,13 @@ protected IndexShard newShard( Collections.emptyList(), clusterSettings ); - if (remoteStore == null && indexSettings.isRemoteStoreEnabled()) { - remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); + + RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = null; + if (indexSettings.isRemoteStoreEnabled()) { + if (remoteStore == null) { + remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); + } + remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, indexSettings.getSettings()); } final BiFunction translogFactorySupplier = (settings, shardRouting) -> { @@ -601,9 +612,13 @@ protected IndexShard newShard( breakerService, translogFactorySupplier, checkpointPublisher, - remoteStore + remoteStore, + remoteRefreshSegmentPressureService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); + if (remoteRefreshSegmentPressureService != null) { + remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + } success = true; } finally { if (success == false) { diff --git a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java index 0e47130e424cd..fcaf9f6c900d3 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java @@ -122,6 +122,8 @@ public long getFailureCount() { private final boolean skipExceptionOnListBlobs; + private final List skipExceptionOnBlobs; + private final boolean useLuceneCorruptionException; private final long maximumNumberOfFailures; @@ -182,6 +184,7 @@ public MockRepository( randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); skipExceptionOnVerificationFile = metadata.settings().getAsBoolean("skip_exception_on_verification_file", false); skipExceptionOnListBlobs = metadata.settings().getAsBoolean("skip_exception_on_list_blobs", false); + skipExceptionOnBlobs = metadata.settings().getAsList("skip_exception_on_blobs"); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false); @@ -370,12 +373,14 @@ private int hashCode(String path) { private void maybeIOExceptionOrBlock(String blobName) throws IOException { if (INDEX_LATEST_BLOB.equals(blobName) // Condition 1 || skipExceptionOnVerificationFiles(blobName) // Condition 2 - || skipExceptionOnListBlobs(blobName)) { // Condition 3 + || skipExceptionOnListBlobs(blobName) // Condition 3 + || skipExceptionOnBlob(blobName)) { // Condition 4 // Condition 1 - Don't mess with the index.latest blob here, failures to write to it are ignored by // upstream logic and we have specific tests that cover the error handling around this blob. // Condition 2 & 3 - This condition has been added to allow creation of repository which throws IO // exception during normal remote store operations. However, if we fail during verification as well, // then we can not add the repository as well. + // Condition 4 - This condition allows to skip exception on specific blobName or blobPrefix return; } if (blobName.startsWith("__")) { @@ -582,5 +587,9 @@ private boolean isVerificationFile(String blobName) { private boolean skipExceptionOnListBlobs(String blobName) { return skipExceptionOnListBlobs && DUMMY_FILE_NAME_LIST_BLOBS.equals(blobName); } + + private boolean skipExceptionOnBlob(String blobName) { + return skipExceptionOnBlobs.contains(blobName); + } } }