diff --git a/CHANGELOG.md b/CHANGELOG.md index 95f958aaaba10..ef02c07076e6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) - Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) - Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) +- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814)) - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 8ae25c6758195..5e91176ed0473 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -15,6 +15,8 @@ import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.FollowersChecker; +import org.opensearch.cluster.coordination.LeaderChecker; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; @@ -23,15 +25,20 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.Before; +import org.opensearch.test.disruption.NetworkDisruption; +import org.opensearch.test.transport.MockTransportService; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -44,12 +51,17 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAME = "remote-store-test-idx-1"; - @Before + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + public void setup() { internalCluster().startNodes(3); } public void testStatsResponseFromAllNodes() { + setup(); // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. @@ -118,6 +130,7 @@ public void testStatsResponseFromAllNodes() { } public void testStatsResponseAllShards() { + setup(); // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. @@ -175,6 +188,7 @@ public void testStatsResponseAllShards() { } public void testStatsResponseFromLocalNode() { + setup(); // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. @@ -236,6 +250,7 @@ public void testStatsResponseFromLocalNode() { } public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception { + setup(); // Scenario: // - Create index with single primary and single replica shard // - Disable Refresh Interval for the index @@ -325,6 +340,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce } public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception { + setup(); // Scenario: // - Create index with single primary and N-1 replica shards (N = no of data nodes) // - Disable Refresh Interval for the index @@ -416,6 +432,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr } public void testStatsOnShardRelocation() { + setup(); // Scenario: // - Create index with single primary and single replica shard // - Index documents @@ -471,6 +488,7 @@ public void testStatsOnShardRelocation() { } public void testStatsOnShardUnassigned() throws IOException { + setup(); // Scenario: // - Create index with single primary and two replica shard // - Index documents @@ -497,6 +515,7 @@ public void testStatsOnShardUnassigned() throws IOException { } public void testStatsOnRemoteStoreRestore() throws IOException { + setup(); // Creating an index with primary shard count == total nodes in cluster and 0 replicas int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount)); @@ -544,6 +563,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException { } public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception { + setup(); // Create an index with one primary and one replica shard createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1)); ensureGreen(INDEX_NAME); @@ -581,6 +601,58 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce }, 5, TimeUnit.SECONDS); } + public void testStatsCorrectnessOnFailover() { + Settings clusterSettings = Settings.builder() + .put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "100ms") + .put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "500ms") + .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "100ms") + .put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "500ms") + .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .put(nodeSettings(0)) + .build(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(clusterSettings); + internalCluster().startDataOnlyNodes(2, clusterSettings); + + // Create an index with one primary and one replica shard + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1)); + ensureGreen(INDEX_NAME); + + // Index some docs and refresh + indexDocs(); + refresh(INDEX_NAME); + + String primaryNode = primaryNodeName(INDEX_NAME); + String replicaNode = replicaNodeName(INDEX_NAME); + + // Start network disruption - primary node will be isolated + Set nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new)); + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.DISCONNECT + ); + internalCluster().setDisruptionScheme(networkDisruption); + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + ensureStableCluster(2, clusterManagerNode); + + RemoteStoreStatsResponse response = client(clusterManagerNode).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, "0"); + List matches = Arrays.stream(response.getRemoteStoreStats()) + .filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString())) + .collect(Collectors.toList()); + assertEquals(1, matches.size()); + RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats(); + assertEquals(0, segmentStats.refreshTimeLagMs); + + networkDisruption.stopDisrupting(); + internalCluster().clearDisruptionScheme(); + ensureStableCluster(3, clusterManagerNode); + ensureGreen(INDEX_NAME); + logger.info("Test completed"); + } + private void indexDocs() { for (int i = 0; i < randomIntBetween(5, 10); i++) { if (randomBoolean()) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7ac7da819b215..a0fca4f0a2ff0 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -682,6 +682,8 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, + RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, + RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 83bf8c82ee3dd..62e8faf33e1fa 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -221,6 +221,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for remote translog IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, + IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, // Settings for remote store enablement IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 74e0c786923cc..0246fe8947eb7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -83,9 +83,23 @@ public class RemoteClusterStateService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - // TODO make this two variable as dynamic setting [issue: #10688] - public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000; - public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000; + public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final Setting INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.index_metadata.upload_timeout", + INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.global_metadata.upload_timeout", + GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", @@ -141,6 +155,9 @@ public class RemoteClusterStateService implements Closeable { private BlobStoreTransferService blobStoreTransferService; private volatile TimeValue slowWriteLoggingThreshold; + private volatile TimeValue indexMetadataUploadTimeout; + private volatile TimeValue globalMetadataUploadTimeout; + private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); private final RemotePersistenceStats remoteStateStats; public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; @@ -171,7 +188,11 @@ public RemoteClusterStateService( this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; this.threadpool = threadPool; this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); + this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); + this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); + clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); } @@ -372,7 +393,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException ); try { - if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { + if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { // TODO: We should add metrics where transfer is timing out. [Issue: #10687] GlobalMetadataTransferException ex = new GlobalMetadataTransferException( String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete") @@ -427,7 +448,7 @@ private List writeIndexMetadataParallel(ClusterState clus } try { - if (latch.await(INDEX_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { + if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { IndexMetadataTransferException ex = new IndexMetadataTransferException( String.format( Locale.ROOT, @@ -626,6 +647,22 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } + private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) { + this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; + } + + private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTimeout) { + this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout; + } + + public TimeValue getIndexMetadataUploadTimeout() { + return this.indexMetadataUploadTimeout; + } + + public TimeValue getGlobalMetadataUploadTimeout() { + return this.globalMetadataUploadTimeout; + } + static String getManifestFileName(long term, long version, boolean committed) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest______C/P____ return String.join( diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 99d2b5a74c406..00e765d73f77f 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -668,6 +668,14 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + public static final Setting INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING = Setting.intSetting( + "index.remote_store.translog.keep_extra_gen", + 100, + 0, + Property.Dynamic, + Property.IndexScope + ); + private final Index index; private final Version version; private final Logger logger; @@ -680,6 +688,7 @@ public static IndexMergePolicy fromString(String text) { private final String remoteStoreTranslogRepository; private final String remoteStoreRepository; private final boolean isRemoteSnapshot; + private int remoteTranslogKeepExtraGen; private Version extendedCompatibilitySnapshotVersion; // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock @@ -850,6 +859,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY); + this.remoteTranslogKeepExtraGen = INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.get(settings); isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { @@ -1021,6 +1031,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setRemoteTranslogUploadBufferInterval ); + scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { @@ -1300,6 +1311,10 @@ public TimeValue getRemoteTranslogUploadBufferInterval() { return remoteTranslogUploadBufferInterval; } + public int getRemoteTranslogExtraKeep() { + return remoteTranslogKeepExtraGen; + } + /** * Returns true iff the remote translog buffer interval setting exists or in other words is explicitly set. */ @@ -1311,6 +1326,10 @@ public void setRemoteTranslogUploadBufferInterval(TimeValue remoteTranslogUpload this.remoteTranslogUploadBufferInterval = remoteTranslogUploadBufferInterval; } + public void setRemoteTranslogKeepExtraGen(int extraGen) { + this.remoteTranslogKeepExtraGen = extraGen; + } + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java index 1a9896540212e..4214a87049350 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java @@ -232,6 +232,63 @@ public RemoteTranslogTransferTracker.Stats stats() { ); } + @Override + public String toString() { + return "RemoteTranslogTransferStats{" + + "lastSuccessfulUploadTimestamp=" + + lastSuccessfulUploadTimestamp.get() + + "," + + "totalUploadsStarted=" + + totalUploadsStarted.get() + + "," + + "totalUploadsSucceeded=" + + totalUploadsSucceeded.get() + + "," + + "totalUploadsFailed=" + + totalUploadsFailed.get() + + "," + + "uploadBytesStarted=" + + uploadBytesStarted.get() + + "," + + "uploadBytesFailed=" + + uploadBytesFailed.get() + + "," + + "totalUploadTimeInMillis=" + + totalUploadTimeInMillis.get() + + "," + + "uploadBytesMovingAverage=" + + uploadBytesMovingAverageReference.get().getAverage() + + "," + + "uploadBytesPerSecMovingAverage=" + + uploadBytesPerSecMovingAverageReference.get().getAverage() + + "," + + "uploadTimeMovingAverage=" + + uploadTimeMsMovingAverageReference.get().getAverage() + + "," + + "lastSuccessfulDownloadTimestamp=" + + lastSuccessfulDownloadTimestamp.get() + + "," + + "totalDownloadsSucceeded=" + + totalDownloadsSucceeded.get() + + "," + + "downloadBytesSucceeded=" + + downloadBytesSucceeded.get() + + "," + + "totalDownloadTimeInMillis=" + + totalDownloadTimeInMillis.get() + + "," + + "downloadBytesMovingAverage=" + + downloadBytesMovingAverageReference.get().getAverage() + + "," + + "downloadBytesPerSecMovingAverage=" + + downloadBytesPerSecMovingAverageReference.get().getAverage() + + "," + + "downloadTimeMovingAverage=" + + downloadTimeMsMovingAverageReference.get().getAverage() + + "," + + "}"; + } + /** * Represents the tracker's state as seen in the stats API. * diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f990a3b56e856..fb4e9056153aa 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4774,6 +4774,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE * @throws IOException if exception occurs while reading segments from remote store. */ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException { + boolean syncSegmentSuccess = false; + long startTimeMs = System.currentTimeMillis(); assert indexSettings.isRemoteStoreEnabled(); logger.trace("Downloading segments from remote segment store"); RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory(); @@ -4823,9 +4825,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn : "There should not be any segments file in the dir"; store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } + syncSegmentSuccess = true; } catch (IOException e) { throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); } finally { + logger.trace( + "syncSegmentsFromRemoteSegmentStore success={} elapsedTime={}", + syncSegmentSuccess, + (System.currentTimeMillis() - startTimeMs) + ); store.decRef(); remoteStore.decRef(); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index c650edc31da8d..3e97b07abfb5d 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -123,14 +123,13 @@ public void beforeRefresh() throws IOException {} @Override protected void runAfterRefreshExactlyOnce(boolean didRefresh) { - if (shouldSync(didRefresh)) { + // We have 2 separate methods to check if sync needs to be done or not. This is required since we use the return boolean + // from isReadyForUpload to schedule refresh retries as the index shard or the primary mode are not in complete + // ready state. + if (shouldSync(didRefresh) && isReadyForUpload()) { segmentTracker.updateLocalRefreshTimeAndSeqNo(); try { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm()); - this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); - } + initializeRemoteDirectoryOnTermUpdate(); try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { Collection localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true); updateLocalSizeMapAndTracker(localSegmentsPostRefresh); @@ -160,20 +159,20 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) { } private boolean shouldSync(boolean didRefresh) { - // The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it - // is important to upload the zero state segments so that the restore does not break. return this.primaryTerm != indexShard.getOperationPrimaryTerm() + // If the readers change, didRefresh is always true. || didRefresh - || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty(); + // The third condition exists for uploading the zero state segments where the refresh has not changed the reader + // reference, but it is important to upload the zero state segments so that the restore does not break. + || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty() + // When the shouldSync is called the first time, then 1st condition on primary term is true. But after that + // we update the primary term and the same condition would not evaluate to true again in syncSegments. + // Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call. + || isRefreshAfterCommitSafe(); } private boolean syncSegments() { - if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) { - logger.debug( - "Skipped syncing segments with primaryMode={} indexShardState={}", - indexShard.getReplicationTracker().isPrimaryMode(), - indexShard.state() - ); + if (isReadyForUpload() == false) { // Following check is required to enable retry and make sure that we do not lose this refresh event // When primary shard is restored from remote store, the recovery happens first followed by changing // primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through @@ -323,6 +322,19 @@ private boolean isRefreshAfterCommit() throws IOException { && !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName))); } + /** + * Returns if the current refresh has happened after a commit. + * @return true if this refresh has happened on account of a commit. If otherwise or exception, returns false. + */ + private boolean isRefreshAfterCommitSafe() { + try { + return isRefreshAfterCommit(); + } catch (Exception e) { + logger.info("Exception occurred in isRefreshAfterCommitSafe", e); + } + return false; + } + void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint) throws IOException { final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint(); @@ -439,6 +451,48 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB } } + /** + * On primary term update, we (re)initialise the remote segment directory to reflect the latest metadata file that + * has been uploaded to remote store successfully. This method also updates the segment tracker about the latest + * uploaded segment files onto remote store. + */ + private void initializeRemoteDirectoryOnTermUpdate() throws IOException { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + logger.trace("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm()); + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + RemoteSegmentMetadata uploadedMetadata = this.remoteDirectory.init(); + + // During failover, the uploaded metadata would have names of files that have been uploaded to remote store. + // Here we update the tracker with latest remote uploaded files. + if (uploadedMetadata != null) { + segmentTracker.setLatestUploadedFiles(uploadedMetadata.getMetadata().keySet()); + } + } + } + + /** + * This checks for readiness of the index shard and primary mode. This has separated from shouldSync since we use the + * returned value of this method for scheduling retries in syncSegments method. + * @return true iff primaryMode is true and index shard is not in closed state. + */ + private boolean isReadyForUpload() { + boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED; + if (isReady == false) { + StringBuilder sb = new StringBuilder("Skipped syncing segments with"); + if (indexShard.getReplicationTracker() != null) { + sb.append(" primaryMode=").append(indexShard.getReplicationTracker().isPrimaryMode()); + } + if (indexShard.state() != null) { + sb.append(" indexShardState=").append(indexShard.state()); + } + if (indexShard.getEngineOrNull() != null) { + sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName()); + } + logger.trace(sb.toString()); + } + return isReady; + } + /** * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events */ diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 2dd9b1a545d4a..a305a774f5854 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -161,6 +161,7 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t remoteTranslogTransferTracker ); RemoteFsTranslog.download(translogTransferManager, location, logger); + logger.trace(remoteTranslogTransferTracker.toString()); } static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { @@ -173,15 +174,20 @@ static void download(TranslogTransferManager translogTransferManager, Path locat */ IOException ex = null; for (int i = 0; i <= DOWNLOAD_RETRIES; i++) { + boolean success = false; + long startTimeMs = System.currentTimeMillis(); try { downloadOnce(translogTransferManager, location, logger); + success = true; return; } catch (FileNotFoundException | NoSuchFileException e) { // continue till download retries ex = e; + } finally { + logger.trace("downloadOnce success={} timeElapsed={}", success, (System.currentTimeMillis() - startTimeMs)); } } - logger.debug("Exhausted all download retries during translog/checkpoint file download"); + logger.info("Exhausted all download retries during translog/checkpoint file download"); throw ex; } @@ -425,7 +431,7 @@ public void trimUnreferencedReaders() throws IOException { // cleans up remote translog files not referenced in latest uploaded metadata. // This enables us to restore translog from the metadata in case of failover or relocation. Set generationsToDelete = new HashSet<>(); - for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) { + for (long generation = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); generation >= 0; generation--) { if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) { break; } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 139b1664a544a..9950b5a08b89e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -102,6 +102,7 @@ public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private RemoteClusterStateService remoteClusterStateService; + private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; @@ -132,6 +133,7 @@ public void setup() { .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); NamedXContentRegistry xContentRegistry = new NamedXContentRegistry( Stream.of( NetworkModule.getNamedXContents().stream(), @@ -149,7 +151,7 @@ public void setup() { "test-node-id", repositoriesServiceSupplier, settings, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + clusterSettings, () -> 0L, threadPool ); @@ -1087,6 +1089,38 @@ public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Excepti assertBusy(() -> assertEquals(1, callCount.get())); } + public void testIndexMetadataUploadWaitTimeSetting() { + // verify default value + assertEquals( + RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT, + remoteClusterStateService.getIndexMetadataUploadTimeout() + ); + + // verify update index metadata upload timeout + int indexMetadataUploadTimeout = randomIntBetween(1, 10); + Settings newSettings = Settings.builder() + .put("cluster.remote_store.state.index_metadata.upload_timeout", indexMetadataUploadTimeout + "s") + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds()); + } + + public void testGlobalMetadataUploadWaitTimeSetting() { + // verify default value + assertEquals( + RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT, + remoteClusterStateService.getGlobalMetadataUploadTimeout() + ); + + // verify update global metadata upload timeout + int globalMetadataUploadTimeout = randomIntBetween(1, 10); + Settings newSettings = Settings.builder() + .put("cluster.remote_store.state.global_metadata.upload_timeout", globalMetadataUploadTimeout + "s") + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(globalMetadataUploadTimeout, remoteClusterStateService.getGlobalMetadataUploadTimeout().seconds()); + } + private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers, false); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 42e0df2dc90c1..3cb65610fab58 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -97,6 +97,7 @@ import java.util.zip.CheckedInputStream; import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG; import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; @@ -124,6 +125,8 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { private ThreadPool threadPool; private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; + + AtomicInteger writeCalls = new AtomicInteger(); BlobStoreRepository repository; BlobStoreTransferService blobStoreTransferService; @@ -163,13 +166,13 @@ public void tearDown() throws Exception { private RemoteFsTranslog create(Path path) throws IOException { final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); - return create(path, createRepository(), translogUUID); + return create(path, createRepository(), translogUUID, 0); } - private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { + private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID, int extraGenToKeep) throws IOException { this.repository = repository; globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogConfig translogConfig = getTranslogConfig(path, extraGenToKeep); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); threadPool = new TestThreadPool(getClass().getName()); blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool); @@ -185,10 +188,17 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin primaryMode::get, new RemoteTranslogTransferTracker(shardId, 10) ); + } + private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { + return create(path, repository, translogUUID, 0); } private TranslogConfig getTranslogConfig(final Path path) { + return getTranslogConfig(path, 0); + } + + private TranslogConfig getTranslogConfig(final Path path, int gensToKeep) { final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) // only randomize between nog age retention and a long one, so failures will have a chance of reproducing @@ -196,6 +206,7 @@ private TranslogConfig getTranslogConfig(final Path path) { .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b") .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), gensToKeep) .build(); return getTranslogConfig(path, settings); } @@ -372,6 +383,111 @@ public void testSimpleOperations() throws IOException { } + private TranslogConfig getConfig(int gensToKeep) { + Path tempDir = createTempDir(); + final TranslogConfig temp = getTranslogConfig(tempDir, gensToKeep); + final TranslogConfig config = new TranslogConfig( + temp.getShardId(), + temp.getTranslogPath(), + temp.getIndexSettings(), + temp.getBigArrays(), + new ByteSizeValue(1, ByteSizeUnit.KB), + "" + ); + return config; + } + + private ChannelFactory getChannelFactory() { + writeCalls = new AtomicInteger(); + final ChannelFactory channelFactory = (file, openOption) -> { + FileChannel delegate = FileChannel.open(file, openOption); + boolean success = false; + try { + // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation + final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); + + final FileChannel channel; + if (isCkpFile) { + channel = delegate; + } else { + channel = new FilterFileChannel(delegate) { + + @Override + public int write(ByteBuffer src) throws IOException { + writeCalls.incrementAndGet(); + return super.write(src); + } + }; + } + success = true; + return channel; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(delegate); + } + } + }; + return channelFactory; + } + + public void testExtraGenToKeep() throws Exception { + TranslogConfig config = getConfig(1); + ChannelFactory channelFactory = getChannelFactory(); + final Set persistedSeqNos = new HashSet<>(); + String translogUUID = Translog.createEmptyTranslog( + config.getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + channelFactory, + primaryTerm.get() + ); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); + ArrayList ops = new ArrayList<>(); + try ( + RemoteFsTranslog translog = new RemoteFsTranslog( + config, + translogUUID, + deletionPolicy, + () -> SequenceNumbers.NO_OPS_PERFORMED, + primaryTerm::get, + persistedSeqNos::add, + repository, + threadPool, + () -> Boolean.TRUE, + new RemoteTranslogTransferTracker(shardId, 10) + ) { + @Override + ChannelFactory getChannelFactory() { + return channelFactory; + } + } + ) { + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 1 })); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 1 })); + + // expose the new checkpoint (simulating a commit), before we trim the translog + translog.setMinSeqNoToKeep(2); + + // Trims from local + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 3, primaryTerm.get(), new byte[] { 1 })); + + // Trims from remote now + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + assertEquals( + 6, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + + } + } + public void testReadLocation() throws IOException { ArrayList ops = new ArrayList<>(); ArrayList locs = new ArrayList<>(); @@ -619,14 +735,22 @@ public void testSimpleOperationsUpload() throws Exception { // this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4 addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); assertEquals(2, translog.stats().estimatedNumberOfOperations()); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); translog.setMinSeqNoToKeep(2); - - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + // this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4 translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); assertEquals(1, translog.stats().estimatedNumberOfOperations()); - assertBusy(() -> assertEquals(4, translog.allUploaded().size())); + assertBusy(() -> { + assertEquals(4, translog.allUploaded().size()); + assertEquals( + 4, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + } public void testMetadataFileDeletion() throws Exception { @@ -1273,49 +1397,10 @@ public void testTranslogWriter() throws IOException { } public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { - Path tempDir = createTempDir(); - final TranslogConfig temp = getTranslogConfig(tempDir); - final TranslogConfig config = new TranslogConfig( - temp.getShardId(), - temp.getTranslogPath(), - temp.getIndexSettings(), - temp.getBigArrays(), - new ByteSizeValue(1, ByteSizeUnit.KB), - "" - ); - + final TranslogConfig config = getConfig(1); final Set persistedSeqNos = new HashSet<>(); - final AtomicInteger writeCalls = new AtomicInteger(); - - final ChannelFactory channelFactory = (file, openOption) -> { - FileChannel delegate = FileChannel.open(file, openOption); - boolean success = false; - try { - // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation - final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); - - final FileChannel channel; - if (isCkpFile) { - channel = delegate; - } else { - channel = new FilterFileChannel(delegate) { - - @Override - public int write(ByteBuffer src) throws IOException { - writeCalls.incrementAndGet(); - return super.write(src); - } - }; - } - success = true; - return channel; - } finally { - if (success == false) { - IOUtils.closeWhileHandlingException(delegate); - } - } - }; - + writeCalls = new AtomicInteger(); + final ChannelFactory channelFactory = getChannelFactory(); String translogUUID = Translog.createEmptyTranslog( config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED,