From c85ee6898b4e3e2f4a2037e42337a7eb8033de9e Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Tue, 24 Sep 2024 07:29:17 +0800 Subject: [PATCH 1/8] Fix search_as_you_type not supporting multi-fields (#15988) * Fix search_as_you_type not supporting multi-fields Signed-off-by: Gao Binlong * Modify change log Signed-off-by: Gao Binlong * Fix test failure Signed-off-by: Gao Binlong * Add more yaml test Signed-off-by: Gao Binlong --------- Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../mapper/SearchAsYouTypeFieldMapper.java | 13 +++- .../SearchAsYouTypeFieldMapperTests.java | 14 ++++ ...filtering.yml => 380_bitmap_filtering.yml} | 0 .../test/search/390_search_as_you_type.yml | 75 +++++++++++++++++++ 5 files changed, 101 insertions(+), 2 deletions(-) rename rest-api-spec/src/main/resources/rest-api-spec/test/search/{370_bitmap_filtering.yml => 380_bitmap_filtering.yml} (100%) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/search/390_search_as_you_type.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index 69a054acd24d2..c1041cdf4f64d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix wildcard query containing escaped character ([#15737](https://github.com/opensearch-project/OpenSearch/pull/15737)) - Fix case-insensitive query on wildcard field ([#15882](https://github.com/opensearch-project/OpenSearch/pull/15882)) - Add validation for the search backpressure cancellation settings ([#15501](https://github.com/opensearch-project/OpenSearch/pull/15501)) +- Fix search_as_you_type not supporting multi-fields ([#15988](https://github.com/opensearch-project/OpenSearch/pull/15988)) - Avoid infinite loop when `flat_object` field contains invalid token ([#15985](https://github.com/opensearch-project/OpenSearch/pull/15985)) - Fix infinite loop in nested agg ([#15931](https://github.com/opensearch-project/OpenSearch/pull/15931)) diff --git a/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/SearchAsYouTypeFieldMapper.java b/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/SearchAsYouTypeFieldMapper.java index 366e848416328..f08815ebbbd1e 100644 --- a/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/SearchAsYouTypeFieldMapper.java +++ b/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/SearchAsYouTypeFieldMapper.java @@ -264,7 +264,15 @@ public SearchAsYouTypeFieldMapper build(Mapper.BuilderContext context) { } ft.setPrefixField(prefixFieldType); ft.setShingleFields(shingleFieldTypes); - return new SearchAsYouTypeFieldMapper(name, ft, copyTo.build(), prefixFieldMapper, shingleFieldMappers, this); + return new SearchAsYouTypeFieldMapper( + name, + ft, + multiFieldsBuilder.build(this, context), + copyTo.build(), + prefixFieldMapper, + shingleFieldMappers, + this + ); } } @@ -623,12 +631,13 @@ public SpanQuery spanPrefixQuery(String value, SpanMultiTermQueryWrapper.SpanRew public SearchAsYouTypeFieldMapper( String simpleName, SearchAsYouTypeFieldType mappedFieldType, + MultiFields multiFields, CopyTo copyTo, PrefixFieldMapper prefixField, ShingleFieldMapper[] shingleFields, Builder builder ) { - super(simpleName, mappedFieldType, MultiFields.empty(), copyTo); + super(simpleName, mappedFieldType, multiFields, copyTo); this.prefixField = prefixField; this.shingleFields = shingleFields; this.maxShingleSize = builder.maxShingleSize.getValue(); diff --git a/modules/mapper-extras/src/test/java/org/opensearch/index/mapper/SearchAsYouTypeFieldMapperTests.java b/modules/mapper-extras/src/test/java/org/opensearch/index/mapper/SearchAsYouTypeFieldMapperTests.java index f55ad2e9d659c..7746cb714a019 100644 --- a/modules/mapper-extras/src/test/java/org/opensearch/index/mapper/SearchAsYouTypeFieldMapperTests.java +++ b/modules/mapper-extras/src/test/java/org/opensearch/index/mapper/SearchAsYouTypeFieldMapperTests.java @@ -298,6 +298,20 @@ private void assertMultiField(int shingleSize) throws IOException { } } + public void testSubField() throws IOException { + MapperService mapperService = createMapperService( + fieldMapping( + b -> b.field("type", "search_as_you_type") + .startObject("fields") + .startObject("subField") + .field("type", "keyword") + .endObject() + .endObject() + ) + ); + assertThat(mapperService.fieldType("field.subField"), instanceOf(KeywordFieldMapper.KeywordFieldType.class)); + } + public void testIndexOptions() throws IOException { DocumentMapper mapper = createDocumentMapper( fieldMapping(b -> b.field("type", "search_as_you_type").field("index_options", "offsets")) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/370_bitmap_filtering.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/380_bitmap_filtering.yml similarity index 100% rename from rest-api-spec/src/main/resources/rest-api-spec/test/search/370_bitmap_filtering.yml rename to rest-api-spec/src/main/resources/rest-api-spec/test/search/380_bitmap_filtering.yml diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/390_search_as_you_type.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/390_search_as_you_type.yml new file mode 100644 index 0000000000000..689b3edfd1066 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/390_search_as_you_type.yml @@ -0,0 +1,75 @@ +setup: + - do: + indices.create: + index: test_1 + body: + mappings: + properties: + text: + type: search_as_you_type + fields: + subField: + type: keyword + - do: + index: + index: test_1 + id: 1 + body: { text: test search as you type } + + - do: + indices.refresh: + index: [test_1] + +--- +teardown: + - do: + indices.delete: + index: test_1 + +# related issue: https://github.com/opensearch-project/OpenSearch/issues/5035 +--- +"Test search_as_you_type data type supports multi-fields": + - skip: + version: " - 2.99.99" + reason: "the bug was fixed since 3.0.0" + + - do: + indices.get_mapping: { + index: test_1 + } + + - match: {test_1.mappings.properties.text.type: search_as_you_type} + - match: {test_1.mappings.properties.text.fields.subField.type: keyword} + + - do: + search: + index: test_1 + body: + query: + multi_match: + query: "test search" + type: "bool_prefix" + + - match: {hits.total.value: 1} + + - do: + search: + index: test_1 + body: + query: + multi_match: + query: "test search" + type: "bool_prefix" + fields: ["text.subField"] + + - match: {hits.total.value: 1} + + - do: + search: + index: test_1 + body: + query: + term: + text.subField: "test search as you type" + + - match: {hits.total.value: 1} From eeb2f3997bb84f33f13b848e125051ecf2c2a1c7 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 24 Sep 2024 11:07:30 +0530 Subject: [PATCH 2/8] Fix flaky test in RemoteStoreMigrationSettingsUpdateIT (#16048) Signed-off-by: Ashish Singh --- .../RemoteStoreMigrationSettingsUpdateIT.java | 7 +++++-- .../java/org/opensearch/test/OpenSearchIntegTestCase.java | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java index 30c597e405f4e..d9e72dd137182 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java @@ -12,6 +12,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.TimeValue; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -68,7 +69,6 @@ public void testNewIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode() assertRemoteStoreBackedIndex(indexName2); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15793") public void testNewRestoredIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode() throws Exception { logger.info("Initialize cluster: gives non remote cluster manager"); initializeCluster(false); @@ -76,7 +76,10 @@ public void testNewRestoredIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMix logger.info("Add remote and non-remote nodes"); setClusterMode(MIXED.mode); addRemote = false; - String nonRemoteNodeName = internalCluster().startNode(); + Settings settings = Settings.builder() + .put(BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING.getKey(), snapshotShardPathFixedPrefix ? "c" : "") + .build(); + String nonRemoteNodeName = internalCluster().startNode(settings); addRemote = true; String remoteNodeName = internalCluster().startNode(); internalCluster().validateClusterFormed(); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index e474ef202b235..68a2b8086a92e 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -403,7 +403,7 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase { private static Boolean segmentsPathFixedPrefix; - private static Boolean snapshotShardPathFixedPrefix; + protected static Boolean snapshotShardPathFixedPrefix; private Path remoteStoreRepositoryPath; @@ -2904,7 +2904,7 @@ private static Settings buildRemoteStoreNodeAttributes( settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), randomBoolean()); settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.getKey(), translogPathFixedPrefix ? "a" : ""); settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.getKey(), segmentsPathFixedPrefix ? "b" : ""); - settings.put(BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING.getKey(), segmentsPathFixedPrefix ? "c" : ""); + settings.put(BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING.getKey(), snapshotShardPathFixedPrefix ? "c" : ""); return settings.build(); } From d6bda7dca75dc4d328b61abcd21b35a6bdfcab05 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 24 Sep 2024 15:08:24 +0800 Subject: [PATCH 3/8] Bump dnsjava:dnsjava from 3.6.1 to 3.6.2 in /test/fixtures/hdfs-fixture (#16041) * Bump dnsjava:dnsjava from 3.6.1 to 3.6.2 in /test/fixtures/hdfs-fixture Bumps [dnsjava:dnsjava](https://github.com/dnsjava/dnsjava) from 3.6.1 to 3.6.2. - [Release notes](https://github.com/dnsjava/dnsjava/releases) - [Changelog](https://github.com/dnsjava/dnsjava/blob/master/Changelog) - [Commits](https://github.com/dnsjava/dnsjava/compare/v3.6.1...v3.6.2) --- updated-dependencies: - dependency-name: dnsjava:dnsjava dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 1 + test/fixtures/hdfs-fixture/build.gradle | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1041cdf4f64d..8a32f10ebc691 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `ch.qos.logback:logback-core` from 1.5.6 to 1.5.8 ([#15946](https://github.com/opensearch-project/OpenSearch/pull/15946)) - Update protobuf from 3.25.4 to 3.25.5 ([#16011](https://github.com/opensearch-project/OpenSearch/pull/16011)) - Bump `actions/github-script` from 5 to 7 ([#16039](https://github.com/opensearch-project/OpenSearch/pull/16039)) +- Bump `dnsjava:dnsjava` from 3.6.1 to 3.6.2 ([#16041](https://github.com/opensearch-project/OpenSearch/pull/16041)) ### Changed diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index 20fd1058a181d..21fe28bfd835d 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -55,7 +55,7 @@ dependencies { exclude group: 'com.nimbusds' exclude module: "commons-configuration2" } - api "dnsjava:dnsjava:3.6.1" + api "dnsjava:dnsjava:3.6.2" api "org.codehaus.jettison:jettison:${versions.jettison}" api "org.apache.commons:commons-compress:${versions.commonscompress}" api "commons-codec:commons-codec:${versions.commonscodec}" From 848e5c676c780d11be78784da87359ae77f47f50 Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Tue, 24 Sep 2024 22:35:30 +0800 Subject: [PATCH 4/8] Update version check in yaml test file for the bug fix of avoid infinite loop in flat_object parsing (#16057) Signed-off-by: Gao Binlong --- .../main/resources/rest-api-spec/test/index/90_flat_object.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/90_flat_object.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/90_flat_object.yml index c0fc0090abedf..e8da81d7bee41 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/90_flat_object.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/index/90_flat_object.yml @@ -76,7 +76,7 @@ teardown: --- "Invalid docs": - skip: - version: "- 2.99.99" + version: "- 2.17.99" reason: "parsing of these objects would infinite loop prior to 2.18" # The following documents are invalid. - do: From 6a2911962539c6288a7f126d529e2393c792fdeb Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Tue, 24 Sep 2024 12:49:31 -0400 Subject: [PATCH 5/8] Add support for docker compose v2 in TestFixturesPlugin (#16049) * Update references for docker-compose to docker compose Signed-off-by: Craig Perkins * Support both Signed-off-by: Craig Perkins * Check if V2 is available Signed-off-by: Craig Perkins * Add to CHANGELOG Signed-off-by: Craig Perkins * Update CHANGELOG text Signed-off-by: Craig Perkins --------- Signed-off-by: Craig Perkins --- CHANGELOG.md | 1 + .../gradle/docker/DockerSupportService.java | 11 +++++++++++ .../gradle/testfixtures/TestFixturesPlugin.java | 9 +++++++-- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a32f10ebc691..dfec81c070bab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `dnsjava:dnsjava` from 3.6.1 to 3.6.2 ([#16041](https://github.com/opensearch-project/OpenSearch/pull/16041)) ### Changed +- Add support for docker compose v2 in TestFixturesPlugin ([#16049](https://github.com/opensearch-project/OpenSearch/pull/16049)) ### Deprecated diff --git a/buildSrc/src/main/java/org/opensearch/gradle/docker/DockerSupportService.java b/buildSrc/src/main/java/org/opensearch/gradle/docker/DockerSupportService.java index fc78792bb3551..77d7997d6d48d 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/docker/DockerSupportService.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/docker/DockerSupportService.java @@ -106,6 +106,7 @@ public DockerAvailability getDockerAvailability() { Version version = null; boolean isVersionHighEnough = false; boolean isComposeAvailable = false; + boolean isComposeV2Available = false; // Check if the Docker binary exists final Optional dockerBinary = getDockerPath(); @@ -129,6 +130,8 @@ public DockerAvailability getDockerAvailability() { if (lastResult.isSuccess() && composePath.isPresent()) { isComposeAvailable = runCommand(composePath.get(), "version").isSuccess(); } + + isComposeV2Available = runCommand(dockerPath, "compose", "version").isSuccess(); } } } @@ -138,6 +141,7 @@ public DockerAvailability getDockerAvailability() { this.dockerAvailability = new DockerAvailability( isAvailable, isComposeAvailable, + isComposeV2Available, isVersionHighEnough, dockerPath, version, @@ -356,6 +360,11 @@ public static class DockerAvailability { */ public final boolean isComposeAvailable; + /** + * True if docker compose is available. + */ + public final boolean isComposeV2Available; + /** * True if the installed Docker version is >= 17.05 */ @@ -379,6 +388,7 @@ public static class DockerAvailability { DockerAvailability( boolean isAvailable, boolean isComposeAvailable, + boolean isComposeV2Available, boolean isVersionHighEnough, String path, Version version, @@ -386,6 +396,7 @@ public static class DockerAvailability { ) { this.isAvailable = isAvailable; this.isComposeAvailable = isComposeAvailable; + this.isComposeV2Available = isComposeV2Available; this.isVersionHighEnough = isVersionHighEnough; this.path = path; this.version = version; diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testfixtures/TestFixturesPlugin.java b/buildSrc/src/main/java/org/opensearch/gradle/testfixtures/TestFixturesPlugin.java index e8772522b19a4..f65e231cd2e50 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testfixtures/TestFixturesPlugin.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testfixtures/TestFixturesPlugin.java @@ -171,7 +171,11 @@ public void execute(Task task) { .findFirst(); composeExtension.getExecutable().set(dockerCompose.isPresent() ? dockerCompose.get() : "/usr/bin/docker"); - composeExtension.getUseDockerComposeV2().set(false); + if (dockerSupport.get().getDockerAvailability().isComposeV2Available) { + composeExtension.getUseDockerComposeV2().set(true); + } else if (dockerSupport.get().getDockerAvailability().isComposeAvailable) { + composeExtension.getUseDockerComposeV2().set(false); + } tasks.named("composeUp").configure(t -> { // Avoid running docker-compose tasks in parallel in CI due to some issues on certain Linux distributions @@ -228,7 +232,8 @@ private void maybeSkipTask(Provider dockerSupport, TaskPro private void maybeSkipTask(Provider dockerSupport, Task task) { task.onlyIf(spec -> { - boolean isComposeAvailable = dockerSupport.get().getDockerAvailability().isComposeAvailable; + boolean isComposeAvailable = dockerSupport.get().getDockerAvailability().isComposeV2Available + || dockerSupport.get().getDockerAvailability().isComposeAvailable; if (isComposeAvailable == false) { LOGGER.info("Task {} requires docker-compose but it is unavailable. Task will be skipped.", task.getPath()); } From b3cc802a11a5685bd32652caecca7e396b00e021 Mon Sep 17 00:00:00 2001 From: rajiv-kv <157019998+rajiv-kv@users.noreply.github.com> Date: Wed, 25 Sep 2024 02:24:00 +0530 Subject: [PATCH 6/8] Change the version to 2.18 as term-check fallback is merged to 2.x (#16062) Signed-off-by: Rajiv Kumar Vaidyanathan --- .../support/clustermanager/term/GetTermVersionResponse.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/term/GetTermVersionResponse.java b/server/src/main/java/org/opensearch/action/support/clustermanager/term/GetTermVersionResponse.java index cf8e6085d4b32..f3fd756c6db4b 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/term/GetTermVersionResponse.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/term/GetTermVersionResponse.java @@ -41,7 +41,7 @@ public GetTermVersionResponse(ClusterStateTermVersion clusterStateTermVersion, b public GetTermVersionResponse(StreamInput in) throws IOException { super(in); this.clusterStateTermVersion = new ClusterStateTermVersion(in); - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_18_0)) { this.isStatePresentInRemote = in.readOptionalBoolean(); } else { this.isStatePresentInRemote = false; @@ -51,7 +51,7 @@ public GetTermVersionResponse(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { clusterStateTermVersion.writeTo(out); - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_18_0)) { out.writeOptionalBoolean(isStatePresentInRemote); } } From 12dadcf2aff9ce7e8f1bf4d6afe022684a7d6496 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Wed, 25 Sep 2024 08:35:11 +0530 Subject: [PATCH 7/8] Retry remote state download while bootstrap (#15950) * Retry remote state download while bootstrap Signed-off-by: Sooraj Sinha --- .../RemoteStoreClusterStateRestoreIT.java | 4 +- .../opensearch/gateway/GatewayMetaState.java | 60 +++++++++++-- .../GatewayMetaStatePersistedStateTests.java | 84 ++++++++++++++++++- 3 files changed, 134 insertions(+), 14 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java index d078ba05faa12..6a5adf5ea4fb7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java @@ -42,6 +42,7 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; +import java.io.IOError; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -339,10 +340,11 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh for (UploadedIndexMetadata md : manifest.getIndices()) { Files.move(segmentRepoPath.resolve(md.getUploadedFilename()), segmentRepoPath.resolve("cluster-state/")); } + internalCluster().stopAllNodes(); } catch (IOException e) { throw new RuntimeException(e); } - assertThrows(IllegalStateException.class, () -> addNewNodes(dataNodeCount, clusterManagerNodeCount)); + assertThrows(IOError.class, () -> internalCluster().client()); // Test is complete // Starting a node without remote state to ensure test cleanup diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index b3836edcd7d6c..f5da6df2689bd 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -66,13 +66,13 @@ import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.index.recovery.RemoteStoreRestoreService; -import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult; import org.opensearch.node.Node; import org.opensearch.plugins.MetadataUpgrader; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.Closeable; +import java.io.IOError; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Collections; @@ -109,6 +109,8 @@ public class GatewayMetaState implements Closeable { */ public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG"; + private final Logger logger = LogManager.getLogger(GatewayMetaState.class); + private PersistedStateRegistry persistedStateRegistry; public PersistedState getPersistedState() { @@ -175,15 +177,11 @@ public void start( ); if (ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID) == false) { // Load state from remote - final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore( - // Remote Metadata should always override local disk Metadata - // if local disk Metadata's cluster uuid is UNKNOWN_UUID - ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(), - lastKnownClusterUUID, - false, - new String[] {} + clusterState = restoreClusterStateWithRetries( + remoteStoreRestoreService, + clusterState, + lastKnownClusterUUID ); - clusterState = remoteRestoreResult.getClusterState(); } } remotePersistedState = new RemotePersistedState(remoteClusterStateService, lastKnownClusterUUID); @@ -258,6 +256,50 @@ public void start( } } + private ClusterState restoreClusterStateWithRetries( + RemoteStoreRestoreService remoteStoreRestoreService, + ClusterState clusterState, + String lastKnownClusterUUID + ) { + int maxAttempts = 5; + int delayInMills = 200; + for (int attempt = 1; attempt <= maxAttempts; attempt++) { + try { + logger.info("Attempt {} to restore cluster state", attempt); + return restoreClusterState(remoteStoreRestoreService, clusterState, lastKnownClusterUUID); + } catch (Exception e) { + if (attempt == maxAttempts) { + // Throw an Error so that the process is halted. + throw new IOError(e); + } + try { + TimeUnit.MILLISECONDS.sleep(delayInMills); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // Restore interrupted status + throw new RuntimeException(ie); + } + delayInMills = delayInMills * 2; + } + } + // This statement will never be reached. + return null; + } + + ClusterState restoreClusterState( + RemoteStoreRestoreService remoteStoreRestoreService, + ClusterState clusterState, + String lastKnownClusterUUID + ) { + return remoteStoreRestoreService.restore( + // Remote Metadata should always override local disk Metadata + // if local disk Metadata's cluster uuid is UNKNOWN_UUID + ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(), + lastKnownClusterUUID, + false, + new String[] {} + ).getClusterState(); + } + // exposed so it can be overridden by tests ClusterState prepareInitialClusterState(TransportService transportService, ClusterService clusterService, ClusterState clusterState) { assert clusterState.nodes().getLocalNode() == null : "prepareInitialClusterState must only be called once"; diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 5ea5241762753..efdb3076f419c 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -1244,14 +1244,72 @@ public void testGatewayForRemoteStateForInitialBootstrapBlocksApplied() throws I } } - private MockGatewayMetaState newGatewayForRemoteState( + public void testGatewayMetaStateRemoteStateDownloadRetries() throws IOException { + MockGatewayMetaState gateway = null; + MockGatewayMetaState gatewayMetaStateSpy = null; + try { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + when(remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster")).thenReturn("test-cluster-uuid"); + RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class); + when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenThrow( + new IllegalStateException("unable to download cluster state") + ).thenReturn(RemoteRestoreResult.build("test-cluster-uuid", null, ClusterState.EMPTY_STATE)); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + gateway = initializeGatewayForRemoteState(true); + gatewayMetaStateSpy = Mockito.spy(gateway); + startGatewayForRemoteState( + gatewayMetaStateSpy, + remoteClusterStateService, + remoteStoreRestoreService, + persistedStateRegistry, + ClusterState.EMPTY_STATE + ); + verify(gatewayMetaStateSpy, times(2)).restoreClusterState(Mockito.any(), Mockito.any(), Mockito.any()); + } finally { + IOUtils.close(gatewayMetaStateSpy); + } + } + + public void testGatewayMetaStateRemoteStateDownloadFailure() throws IOException { + MockGatewayMetaState gateway = null; + final MockGatewayMetaState gatewayMetaStateSpy; + try { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + when(remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster")).thenReturn("test-cluster-uuid"); + RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class); + when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenThrow( + new IllegalStateException("unable to download cluster state") + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + gateway = initializeGatewayForRemoteState(true); + gatewayMetaStateSpy = Mockito.spy(gateway); + assertThrows( + Error.class, + () -> startGatewayForRemoteState( + gatewayMetaStateSpy, + remoteClusterStateService, + remoteStoreRestoreService, + persistedStateRegistry, + ClusterState.EMPTY_STATE + ) + ); + verify(gatewayMetaStateSpy, times(5)).restoreClusterState(Mockito.any(), Mockito.any(), Mockito.any()); + } finally { + IOUtils.close(gateway); + } + } + + private MockGatewayMetaState initializeGatewayForRemoteState(boolean prepareFullState) { + return new MockGatewayMetaState(localNode, bigArrays, prepareFullState); + } + + private MockGatewayMetaState startGatewayForRemoteState( + MockGatewayMetaState gateway, RemoteClusterStateService remoteClusterStateService, RemoteStoreRestoreService remoteStoreRestoreService, PersistedStateRegistry persistedStateRegistry, - ClusterState currentState, - boolean prepareFullState + ClusterState currentState ) throws IOException { - MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays, prepareFullState); String randomRepoName = "randomRepoName"; String stateRepoTypeAttributeKey = String.format( Locale.getDefault(), @@ -1305,6 +1363,24 @@ private MockGatewayMetaState newGatewayForRemoteState( return gateway; } + private MockGatewayMetaState newGatewayForRemoteState( + RemoteClusterStateService remoteClusterStateService, + RemoteStoreRestoreService remoteStoreRestoreService, + PersistedStateRegistry persistedStateRegistry, + ClusterState currentState, + boolean prepareFullState + ) throws IOException { + MockGatewayMetaState gatewayMetaState = initializeGatewayForRemoteState(prepareFullState); + startGatewayForRemoteState( + gatewayMetaState, + remoteClusterStateService, + remoteStoreRestoreService, + persistedStateRegistry, + currentState + ); + return gatewayMetaState; + } + private static BigArrays getBigArrays() { return usually() ? BigArrays.NON_RECYCLING_INSTANCE From dc4dbced2f9046ab5debb5dfec4e9634217be0c2 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 25 Sep 2024 15:27:25 +0530 Subject: [PATCH 8/8] Optimize remote store GC flow with pinned timestamps (#15943) Signed-off-by: Sachin Kale --- ...rePinnedTimestampsGarbageCollectionIT.java | 433 ++++++++++++++++++ .../snapshots/DeleteSnapshotV2IT.java | 231 +++++----- .../store/RemoteSegmentStoreDirectory.java | 16 +- .../RemoteFsTimestampAwareTranslog.java | 171 ++++--- .../index/translog/RemoteFsTranslog.java | 13 +- .../transfer/TranslogTransferMetadata.java | 10 + .../RemoteStorePinnedTimestampService.java | 4 + .../blobstore/BlobStoreRepository.java | 43 +- .../snapshots/SnapshotsService.java | 10 +- .../RemoteSegmentStoreDirectoryTests.java | 3 +- .../RemoteFsTimestampAwareTranslogTests.java | 147 +++++- 11 files changed, 854 insertions(+), 227 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java new file mode 100644 index 0000000000000..0a2668c60d3bd --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -0,0 +1,433 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase { + static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), false) + .build(); + } + + private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { + long currentTime = System.currentTimeMillis(); + int maxRetry = 10; + while (maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + Thread.sleep(1000); + maxRetry--; + } + } + + ActionListener noOpActionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) {} + }; + + public void testLiveIndexNoPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(2, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 10) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 9); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertTrue(metadataFiles.size() >= numDocs + 1); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }, 30, TimeUnit.SECONDS); + } + + public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = 5; + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(5, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testLiveIndexWithPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + if (i == 2) { + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1)); + remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + } + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(3, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testIndexDeletionNoPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(2, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + + keepPinnedTimestampSchedulerUpdated(); + client().admin().indices().prepareDelete(INDEX_NAME).get(); + + assertBusy(() -> { + assertEquals(0, Files.list(translogMetadataPath).collect(Collectors.toList()).size()); + assertEquals(0, Files.list(translogDataPath).collect(Collectors.toList()).size()); + }); + } + + public void testIndexDeletionWithPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + if (i == 2) { + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1)); + remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + } + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(3, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }, 30, TimeUnit.SECONDS); + + keepPinnedTimestampSchedulerUpdated(); + client().admin().indices().prepareDelete(INDEX_NAME).get(); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + private void verifyTranslogDataFileCount(List metadataFiles, Path translogDataPath) throws IOException { + List mdFiles = metadataFiles.stream().map(p -> p.getFileName().toString()).collect(Collectors.toList()); + Set generations = new HashSet<>(); + for (String mdFile : mdFiles) { + Tuple minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(mdFile); + generations.addAll(LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())); + } + assertEquals(generations.size() * 2, Files.list(translogDataPath).collect(Collectors.toList()).size()); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java index 1d7a58384c0be..c4e3a478c8540 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java @@ -23,19 +23,31 @@ import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase { private static final String REMOTE_REPO_NAME = "remote-store-repo-name"; + private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { + long currentTime = System.currentTimeMillis(); + int maxRetry = 10; + while (maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + Thread.sleep(1000); + maxRetry--; + } + } + public void testDeleteShallowCopyV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); @@ -74,8 +86,8 @@ public void testDeleteShallowCopyV2() throws Exception { createIndex(indexName1, getRemoteStoreBackedIndexSettings()); createIndex(indexName2, getRemoteStoreBackedIndexSettings()); - final int numDocsInIndex1 = 10; - final int numDocsInIndex2 = 20; + final int numDocsInIndex1 = 1; + final int numDocsInIndex2 = 2; indexRandomDocs(indexName1, numDocsInIndex1); indexRandomDocs(indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); @@ -92,7 +104,7 @@ public void testDeleteShallowCopyV2() throws Exception { assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); createIndex(indexName3, getRemoteStoreBackedIndexSettings()); - indexRandomDocs(indexName3, 10); + indexRandomDocs(indexName3, 1); CreateSnapshotResponse createSnapshotResponse2 = client().admin() .cluster() .prepareCreateSnapshot(snapshotRepoName, snapshotName2) @@ -105,109 +117,101 @@ public void testDeleteShallowCopyV2() throws Exception { assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); assertAcked(client().admin().indices().prepareDelete(indexName1)); - Thread.sleep(100); - AcknowledgedResponse deleteResponse = client().admin() - .cluster() - .prepareDeleteSnapshot(snapshotRepoName, snapshotName2) - .setSnapshots(snapshotName2) - .get(); + AcknowledgedResponse deleteResponse = client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, snapshotName2).get(); assertTrue(deleteResponse.isAcknowledged()); // test delete non-existent snapshot assertThrows( SnapshotMissingException.class, - () -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").setSnapshots(snapshotName2).get() + () -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").get() ); - } - public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception { + public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); + Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath); + settings = Settings.builder() + .put(settings) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString()) + .build(); + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings); + internalCluster().startDataOnlyNode(settings); + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + ensureStableCluster(2); - internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - - String indexName1 = "testindex1"; - String indexName2 = "testindex2"; - String indexName3 = "testindex3"; - String snapshotRepoName = "test-create-snapshot-repo"; - String snapshotName1 = "test-create-snapshot1"; - String snapshotName2 = "test-create-snapshot2"; - Path absolutePath1 = randomRepoPath().toAbsolutePath(); - logger.info("Snapshot Path [{}]", absolutePath1); - - Client client = client(); - - assertAcked( - client.admin() - .cluster() - .preparePutRepository(snapshotRepoName) - .setType(FsRepository.TYPE) - .setSettings( - Settings.builder() - .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) - .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) - .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) - .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) - ) + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + clusterManagerName ); - createIndex(indexName1, getRemoteStoreBackedIndexSettings()); + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath)); - createIndex(indexName2, getRemoteStoreBackedIndexSettings()); + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, 25); - final int numDocsInIndex1 = 10; - final int numDocsInIndex2 = 20; - indexRandomDocs(indexName1, numDocsInIndex1); - indexRandomDocs(indexName2, numDocsInIndex2); - ensureGreen(indexName1, indexName2); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreEnabledIndexName) + .get() + .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); + logger.info("--> create two remote index shallow snapshots"); CreateSnapshotResponse createSnapshotResponse = client().admin() .cluster() - .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .prepareCreateSnapshot(snapshotRepoName, "snap1") .setWaitForCompletion(true) .get(); - SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo(); - assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); - assertThat(snapshotInfo.successfulShards(), greaterThan(0)); - assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); - assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); + SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo(); - createIndex(indexName3, getRemoteStoreBackedIndexSettings()); - indexRandomDocs(indexName3, 10); + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + Path shardPath = Path.of(String.valueOf(indexPath), "0"); - CreateSnapshotResponse createSnapshotResponse2 = client().admin() - .cluster() - .prepareCreateSnapshot(snapshotRepoName, snapshotName2) - .setWaitForCompletion(true) - .get(); - snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); - assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); - assertThat(snapshotInfo.successfulShards(), greaterThan(0)); - assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); - assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); + // delete remote store index + assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); + + logger.info("--> delete snapshot 1"); + + Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog"); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); - AcknowledgedResponse deleteResponse = client().admin() + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + keepPinnedTimestampSchedulerUpdated(); + + AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() .cluster() - .prepareDeleteSnapshot(snapshotRepoName, snapshotName1, snapshotName2) - .setSnapshots(snapshotName2) + .prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName()) .get(); - assertTrue(deleteResponse.isAcknowledged()); + assertAcked(deleteSnapshotResponse); - // test delete non-existent snapshot - assertThrows( - SnapshotMissingException.class, - () -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").setSnapshots(snapshotName2).get() - ); + // Delete is async. Give time for it + assertBusy(() -> { + try { + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath)); + } catch (NoSuchFileException e) { + fail(); + } + }, 60, TimeUnit.SECONDS); + assertBusy(() -> { + try { + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath)); + } catch (NoSuchFileException e) { + fail(); + } + }, 60, TimeUnit.SECONDS); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15692") - public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception { + public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath); @@ -242,11 +246,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .get() .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); - String numShards = client().admin() - .indices() - .prepareGetSettings(remoteStoreEnabledIndexName) - .get() - .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_NUMBER_OF_SHARDS); + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + Path shardPath = Path.of(String.valueOf(indexPath), "0"); + + Path segmentsPath = Path.of(String.valueOf(shardPath), "segments", "data"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog", "data", "1"); logger.info("--> create two remote index shallow snapshots"); CreateSnapshotResponse createSnapshotResponse = client().admin() @@ -256,6 +260,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .get(); SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo(); + List segmentsPostSnapshot1 = Files.list(segmentsPath).collect(Collectors.toList()); + List translogPostSnapshot1 = Files.list(translogPath).collect(Collectors.toList()); + + forceMerge(1); + refresh(remoteStoreEnabledIndexName); indexRandomDocs(remoteStoreEnabledIndexName, 25); CreateSnapshotResponse createSnapshotResponse2 = client().admin() @@ -264,46 +273,28 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .setWaitForCompletion(true) .get(); SnapshotInfo snapshotInfo2 = createSnapshotResponse2.getSnapshotInfo(); + + List segmentsPostSnapshot2 = Files.list(segmentsPath).collect(Collectors.toList()); + List translogPostSnapshot2 = Files.list(translogPath).collect(Collectors.toList()); + assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS)); assertThat(snapshotInfo2.successfulShards(), greaterThan(0)); assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards())); assertThat(snapshotInfo2.snapshotId().getName(), equalTo("snap2")); - // delete remote store index - assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); - - logger.info("--> delete snapshot 2"); + assertBusy(() -> assertTrue(translogPostSnapshot2.size() > translogPostSnapshot1.size()), 60, TimeUnit.SECONDS); + assertBusy(() -> assertTrue(segmentsPostSnapshot2.size() > segmentsPostSnapshot1.size()), 60, TimeUnit.SECONDS); - Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); - Path shardPath = Path.of(String.valueOf(indexPath), "0"); - Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); - Path translogPath = Path.of(String.valueOf(shardPath), "translog"); - - // Get total segments remote store directory file count for deleted index and shard 0 - int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath); - int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath); - - RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + keepPinnedTimestampSchedulerUpdated(); - AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() - .cluster() - .prepareDeleteSnapshot(snapshotRepoName, snapshotInfo2.snapshotId().getName()) - .get(); - assertAcked(deleteSnapshotResponse); - - Thread.sleep(5000); - - assertBusy(() -> { - try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountBeforeDeletingSnapshot1)); - } catch (Exception e) {} - }, 30, TimeUnit.SECONDS); - int segmentFilesCountAfterDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath); + // delete remote store index + assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); logger.info("--> delete snapshot 1"); RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + keepPinnedTimestampSchedulerUpdated(); // on snapshot deletion, remote store segment files should get cleaned up for deleted index - `remote-index-1` - deleteSnapshotResponse = clusterManagerClient.admin() + AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() .cluster() .prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName()) .get(); @@ -311,23 +302,21 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio // Delete is async. Give time for it assertBusy(() -> { - try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1)); - } catch (Exception e) {} + List segmentsPostDeletionOfSnapshot1 = Files.list(segmentsPath).collect(Collectors.toList()); + assertTrue(segmentsPostDeletionOfSnapshot1.size() < segmentsPostSnapshot2.size()); }, 60, TimeUnit.SECONDS); - - assertBusy(() -> { - try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1)); - } catch (Exception e) {} - }, 60, TimeUnit.SECONDS); - + // To uncomment following, we need to handle deletion of generations in translog cleanup flow + // List translogPostDeletionOfSnapshot1 = Files.list(translogPath).collect(Collectors.toList()); + // Delete is async. Give time for it + // assertBusy(() -> assertEquals(translogPostSnapshot2.size() - translogPostSnapshot1.size(), + // translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS); } private Settings snapshotV2Settings(Path remoteStoreRepoPath) { Settings settings = Settings.builder() .put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath)) .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), false) .build(); return settings; } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 72bf07d4b03b2..5be516166803e 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -854,7 +854,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } // Check last fetch status of pinned timestamps. If stale, return. - if (RemoteStoreUtils.isPinnedTimestampStateStale()) { + if (lastNMetadataFilesToKeep != 0 && RemoteStoreUtils.isPinnedTimestampStateStale()) { logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); return; } @@ -994,7 +994,8 @@ public static void remoteDirectoryCleanup( String remoteStoreRepoForIndex, String indexUUID, ShardId shardId, - RemoteStorePathStrategy pathStrategy + RemoteStorePathStrategy pathStrategy, + boolean forceClean ) { try { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( @@ -1003,8 +1004,12 @@ public static void remoteDirectoryCleanup( shardId, pathStrategy ); - remoteSegmentStoreDirectory.deleteStaleSegments(0); - remoteSegmentStoreDirectory.deleteIfEmpty(); + if (forceClean) { + remoteSegmentStoreDirectory.delete(); + } else { + remoteSegmentStoreDirectory.deleteStaleSegments(0); + remoteSegmentStoreDirectory.deleteIfEmpty(); + } } catch (Exception e) { staticLogger.error("Exception occurred while deleting directory", e); } @@ -1023,7 +1028,10 @@ private boolean deleteIfEmpty() throws IOException { logger.info("Remote directory still has files, not deleting the path"); return false; } + return delete(); + } + private boolean delete() { try { remoteDataDirectory.delete(); remoteMetadataDirectory.delete(); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 27d34ec0d05af..e61a9606175ee 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; @@ -61,6 +62,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { private final Map> oldFormatMetadataFileGenerationMap; private final Map> oldFormatMetadataFilePrimaryTermMap; private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + private final AtomicBoolean triggerTrimOnMinRemoteGenReferencedChange = new AtomicBoolean(false); public RemoteFsTimestampAwareTranslog( TranslogConfig config, @@ -105,6 +107,11 @@ protected void onDelete() { } } + @Override + protected void onMinRemoteGenReferencedChange() { + triggerTrimOnMinRemoteGenReferencedChange.set(true); + } + @Override public void trimUnreferencedReaders() throws IOException { trimUnreferencedReaders(false, true); @@ -135,14 +142,22 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. - if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) { + if (indexDeleted == false && (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get())) { return; } // This is to fail fast and avoid listing md files un-necessarily. if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); + return; + } + + // This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata + // call in each invocation of trimUnreferencedReaders + if (indexDeleted == false && triggerTrimOnMinRemoteGenReferencedChange.get() == false) { return; + } else if (triggerTrimOnMinRemoteGenReferencedChange.get()) { + triggerTrimOnMinRemoteGenReferencedChange.set(false); } // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. @@ -158,7 +173,7 @@ public void onResponse(List blobMetadata) { List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); try { - if (metadataFiles.size() <= 1) { + if (indexDeleted == false && metadataFiles.size() <= 1) { logger.debug("No stale translog metadata files found"); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); return; @@ -166,16 +181,12 @@ public void onResponse(List blobMetadata) { // Check last fetch status of pinned timestamps. If stale, return. if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); return; } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( - metadataFiles, - metadataFilePinnedTimestampMap, - logger - ); + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, indexDeleted); // If index is not deleted, make sure to keep latest metadata file if (indexDeleted == false) { @@ -194,10 +205,11 @@ public void onResponse(List blobMetadata) { metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + Set generationsToBeDeleted = getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - indexDeleted + indexDeleted ? Long.MAX_VALUE : getMinGenerationToKeepInRemote() ); logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); @@ -208,7 +220,11 @@ public void onResponse(List blobMetadata) { generationsToBeDeleted, remoteGenerationDeletionPermits::release ); + } else { + remoteGenerationDeletionPermits.release(); + } + if (metadataFilesToBeDeleted.isEmpty() == false) { // Delete stale metadata files translogTransferManager.deleteMetadataFilesAsync( metadataFilesToBeDeleted, @@ -217,11 +233,10 @@ public void onResponse(List blobMetadata) { // Update cache to keep only those metadata files that are not getting deleted oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted); - // Delete stale primary terms deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted); } else { - remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + remoteGenerationDeletionPermits.release(); } } catch (Exception e) { remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); @@ -237,18 +252,16 @@ public void onFailure(Exception e) { translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); } + private long getMinGenerationToKeepInRemote() { + return minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep(); + } + // Visible for testing protected Set getGenerationsToBeDeleted( List metadataFilesNotToBeDeleted, List metadataFilesToBeDeleted, - boolean indexDeleted + long minGenerationToKeepInRemote ) throws IOException { - long maxGenerationToBeDeleted = Long.MAX_VALUE; - - if (indexDeleted == false) { - maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); - } - Set generationsFromMetadataFilesToBeDeleted = new HashSet<>(); for (String mdFile : metadataFilesToBeDeleted) { Tuple minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager); @@ -262,21 +275,31 @@ protected Set getGenerationsToBeDeleted( Set generationsToBeDeleted = new HashSet<>(); for (long generation : generationsFromMetadataFilesToBeDeleted) { // Check if the generation is not referred by metadata file matching pinned timestamps - if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) { + // The check with minGenerationToKeep is redundant but kept as to make sure we don't delete generations + // that are not persisted in remote segment store yet. + if (generation < minGenerationToKeepInRemote && isGenerationPinned(generation, pinnedGenerations) == false) { generationsToBeDeleted.add(generation); } } return generationsToBeDeleted; } - protected List getMetadataFilesToBeDeleted(List metadataFiles) { - return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger); + protected List getMetadataFilesToBeDeleted(List metadataFiles, boolean indexDeleted) { + return getMetadataFilesToBeDeleted( + metadataFiles, + metadataFilePinnedTimestampMap, + getMinGenerationToKeepInRemote(), + indexDeleted, + logger + ); } // Visible for testing protected static List getMetadataFilesToBeDeleted( List metadataFiles, Map metadataFilePinnedTimestampMap, + long minGenerationToKeepInRemote, + boolean indexDeleted, Logger logger ) { Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); @@ -312,6 +335,22 @@ protected static List getMetadataFilesToBeDeleted( metadataFilesToBeDeleted.size() ); + if (indexDeleted == false) { + // Filter out metadata files based on minGenerationToKeep + List metadataFilesContainingMinGenerationToKeep = metadataFilesToBeDeleted.stream().filter(md -> { + long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md); + return maxGeneration == -1 || maxGeneration >= minGenerationToKeepInRemote; + }).collect(Collectors.toList()); + metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinGenerationToKeep); + + logger.trace( + "metadataFilesContainingMinGenerationToKeep.size = {}, metadataFilesToBeDeleted based on minGenerationToKeep filtering = {}, minGenerationToKeep = {}", + metadataFilesContainingMinGenerationToKeep.size(), + metadataFilesToBeDeleted.size(), + minGenerationToKeepInRemote + ); + } + return metadataFilesToBeDeleted; } @@ -472,50 +511,60 @@ protected static Tuple getMinMaxPrimaryTermFromMetadataFile( } } - public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException { - ActionListener> listMetadataFilesListener = new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + public static void cleanupOfDeletedIndex(TranslogTransferManager translogTransferManager, boolean forceClean) throws IOException { + if (forceClean) { + translogTransferManager.delete(); + } else { + ActionListener> listMetadataFilesListener = new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + + try { + if (metadataFiles.isEmpty()) { + staticLogger.debug("No stale translog metadata files found"); + return; + } + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + true, // This method gets called when the index is no longer present + staticLogger + ); + if (metadataFilesToBeDeleted.isEmpty()) { + staticLogger.debug("No metadata files to delete"); + return; + } + staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); - try { - if (metadataFiles.isEmpty()) { - staticLogger.debug("No stale translog metadata files found"); - return; - } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger); - if (metadataFilesToBeDeleted.isEmpty()) { - staticLogger.debug("No metadata files to delete"); - return; - } - staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); + // For all the files that we are keeping, fetch min and max generations + List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); + metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); + staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); - // For all the files that we are keeping, fetch min and max generations - List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); - metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); - staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + // Delete stale metadata files + translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); - // Delete stale metadata files - translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); + // Delete stale primary terms + deleteStaleRemotePrimaryTerms( + metadataFilesNotToBeDeleted, + translogTransferManager, + new HashMap<>(), + new AtomicLong(Long.MAX_VALUE), + staticLogger + ); + } catch (Exception e) { + staticLogger.error("Exception while cleaning up metadata and primary terms", e); + } + } - // Delete stale primary terms - deleteStaleRemotePrimaryTerms( - metadataFilesNotToBeDeleted, - translogTransferManager, - new HashMap<>(), - new AtomicLong(Long.MAX_VALUE), - staticLogger - ); - } catch (Exception e) { + @Override + public void onFailure(Exception e) { staticLogger.error("Exception while cleaning up metadata and primary terms", e); } - } - - @Override - public void onFailure(Exception e) { - staticLogger.error("Exception while cleaning up metadata and primary terms", e); - } - }; - translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + }; + translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 812852d107682..85f58f898826f 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -683,12 +683,17 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen @Override public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException { maxRemoteTranslogGenerationUploaded = generation; + long previousMinRemoteGenReferenced = minRemoteGenReferenced; minRemoteGenReferenced = getMinFileGeneration(); + if (previousMinRemoteGenReferenced != minRemoteGenReferenced) { + onMinRemoteGenReferencedChange(); + } logger.debug( - "Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}", + "Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}, minRemoteGenReferenced = {}", primaryTerm, generation, - maxSeqNo + maxSeqNo, + minRemoteGenReferenced ); } @@ -702,6 +707,10 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro } } + protected void onMinRemoteGenReferencedChange() { + + } + @Override public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) { return minSeqNoToKeep; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 3b8885055e8f7..7fe3305545085 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -170,6 +170,16 @@ public static Tuple getMinMaxTranslogGenerationFromFilename(String f } } + public static long getMaxGenerationFromFileName(String filename) { + String[] tokens = filename.split(METADATA_SEPARATOR); + try { + return RemoteStoreUtils.invertLong(tokens[2]); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception while getting max generation from: {}", filename), e); + return -1; + } + } + public static Tuple getMinMaxPrimaryTermFromFilename(String filename) { String[] tokens = filename.split(METADATA_SEPARATOR); if (tokens.length < 7) { diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 71133615ed056..1448c46583f6a 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -240,6 +240,10 @@ public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener< } } + public void forceSyncPinnedTimestamps() { + asyncUpdatePinnedTimestampTask.run(); + } + @Override public void close() throws IOException { asyncUpdatePinnedTimestampTask.close(); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 0292cecc36a81..14c201e819994 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -199,7 +199,6 @@ import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; -import static org.opensearch.snapshots.SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER; /** * BlobStore - based implementation of Snapshot Repository @@ -1309,7 +1308,7 @@ private void cleanUpRemoteStoreFilesForDeletedIndicesV2( } // iterate through all the indices and trigger remote store directory cleanup for deleted index segments for (String indexId : uniqueIndexIds) { - cleanRemoteStoreDirectoryIfNeeded(snapshotIds, indexId, repositoryData, remoteSegmentStoreDirectoryFactory); + cleanRemoteStoreDirectoryIfNeeded(snapshotIds, indexId, repositoryData, remoteSegmentStoreDirectoryFactory, false); } afterCleanupsListener.onResponse(null); } catch (Exception e) { @@ -1357,11 +1356,17 @@ private void removeSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.unpinTimestamp( timestampToUnpin, - repository + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotId.getUUID(), + SnapshotsService.getPinningEntity(repository, snapshotId.getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { - logger.debug("Timestamp {} unpinned successfully for snapshot {}", timestampToUnpin, snapshotId.getName()); + logger.info("Timestamp {} unpinned successfully for snapshot {}", timestampToUnpin, snapshotId.getName()); + try { + remoteStorePinnedTimestampService.forceSyncPinnedTimestamps(); + logger.debug("Successfully synced pinned timestamp state"); + } catch (Exception e) { + logger.warn("Exception while updating pinning timestamp state, snapshot deletion will continue", e); + } listener.onResponse(null); } @@ -1466,7 +1471,8 @@ public static void remoteDirectoryCleanupAsync( String indexUUID, ShardId shardId, String threadPoolName, - RemoteStorePathStrategy pathStrategy + RemoteStorePathStrategy pathStrategy, + boolean forceClean ) { threadpool.executor(threadPoolName) .execute( @@ -1476,7 +1482,8 @@ public static void remoteDirectoryCleanupAsync( remoteStoreRepoForIndex, indexUUID, shardId, - pathStrategy + pathStrategy, + forceClean ), indexUUID, shardId @@ -1532,7 +1539,8 @@ protected void releaseRemoteStoreLockAndCleanup( indexUUID, new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)), ThreadPool.Names.REMOTE_PURGE, - remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy() + remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy(), + false ); } } @@ -2095,7 +2103,7 @@ private void executeOneStaleIndexDelete( deleteResult = deleteResult.add(cleanUpStaleSnapshotShardPathsFile(matchingShardPaths, snapshotShardPaths)); if (remoteSegmentStoreDirectoryFactory != null) { - cleanRemoteStoreDirectoryIfNeeded(deletedSnapshots, indexSnId, oldRepoData, remoteSegmentStoreDirectoryFactory); + cleanRemoteStoreDirectoryIfNeeded(deletedSnapshots, indexSnId, oldRepoData, remoteSegmentStoreDirectoryFactory, true); } // Finally, we delete the [base_path]/indexId folder @@ -2167,7 +2175,8 @@ private void cleanRemoteStoreDirectoryIfNeeded( Collection deletedSnapshots, String indexSnId, RepositoryData oldRepoData, - RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory + RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, + boolean forceClean ) { assert (indexSnId != null); @@ -2219,9 +2228,16 @@ private void cleanRemoteStoreDirectoryIfNeeded( prevIndexMetadata.getIndexUUID(), shard, ThreadPool.Names.REMOTE_PURGE, - remoteStorePathStrategy + remoteStorePathStrategy, + forceClean + ); + remoteTranslogCleanupAsync( + remoteTranslogRepository, + shard, + remoteStorePathStrategy, + prevIndexMetadata, + forceClean ); - remoteTranslogCleanupAsync(remoteTranslogRepository, shard, remoteStorePathStrategy, prevIndexMetadata); } } } catch (Exception e) { @@ -2245,7 +2261,8 @@ private void remoteTranslogCleanupAsync( Repository remoteTranslogRepository, ShardId shardId, RemoteStorePathStrategy remoteStorePathStrategy, - IndexMetadata prevIndexMetadata + IndexMetadata prevIndexMetadata, + boolean forceClean ) { assert remoteTranslogRepository instanceof BlobStoreRepository; boolean indexMetadataEnabled = RemoteStoreUtils.determineTranslogMetadataEnabled(prevIndexMetadata); @@ -2262,7 +2279,7 @@ private void remoteTranslogCleanupAsync( indexMetadataEnabled ); try { - RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager); + RemoteFsTimestampAwareTranslog.cleanupOfDeletedIndex(translogTransferManager, forceClean); } catch (IOException e) { logger.error("Exception while cleaning up remote translog for shard: " + shardId, e); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index f6e550525a3e5..23f6deff3715d 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -613,7 +613,7 @@ private void updateSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.pinTimestamp( timestampToPin, - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { @@ -631,6 +631,10 @@ public void onFailure(Exception e) { ); } + public static String getPinningEntity(String repositoryName, String snapshotUUID) { + return repositoryName + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotUUID; + } + private void cloneSnapshotPinnedTimestamp( RepositoryData repositoryData, SnapshotId sourceSnapshot, @@ -640,8 +644,8 @@ private void cloneSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.cloneTimestamp( timestampToPin, - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + sourceSnapshot.getUUID(), - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + getPinningEntity(snapshot.getRepository(), sourceSnapshot.getUUID()), + getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index ecd6620dbea15..df3df81361a12 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -565,7 +565,8 @@ public void testCleanupAsync() throws Exception { repositoryName, indexUUID, shardId, - pathStrategy + pathStrategy, + false ); verify(remoteSegmentStoreDirectoryFactory).newDirectory(repositoryName, indexUUID, shardId, pathStrategy); verify(threadPool, times(0)).executor(ThreadPool.Names.REMOTE_PURGE); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 4ec68f7fb75b4..e6871414cf5e0 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -214,6 +214,7 @@ public void onFailure(Exception e) { // Old format metadata file String oldFormatMdFilename = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1"; assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(oldFormatMdFilename)); + assertEquals(Long.MAX_VALUE - 9223372036854774799L, TranslogTransferMetadata.getMaxGenerationFromFileName(oldFormatMdFilename)); // Node id containing separator String nodeIdWithSeparator = @@ -221,10 +222,14 @@ public void onFailure(Exception e) { Tuple minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(nodeIdWithSeparator); Long minGen = Long.MAX_VALUE - 9223372036438563958L; assertEquals(minGen, minMaxGen.v1()); + Long maxGen = Long.MAX_VALUE - 9223372036854774799L; + assertEquals(maxGen, minMaxGen.v2()); + assertEquals(Long.MAX_VALUE - 9223372036854774799L, TranslogTransferMetadata.getMaxGenerationFromFileName(nodeIdWithSeparator)); // Malformed md filename String malformedMdFileName = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node1__xyz__3__1"; assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(malformedMdFileName)); + assertEquals(Long.MAX_VALUE - 9223372036854774799L, TranslogTransferMetadata.getMaxGenerationFromFileName(malformedMdFileName)); } public void testGetMinMaxPrimaryTermFromFilename() throws Exception { @@ -330,43 +335,60 @@ public void testSimpleOperationsUpload() throws Exception { addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 })); - addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); - addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); - addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); assertBusy(() -> { assertEquals( - 16, + 10, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); }); - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); - RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); // Fetch pinned timestamps so that it won't be stale updatePinnedTimstampTask.run(); + translog.setMinSeqNoToKeep(3); + translog.trimUnreferencedReaders(); - translog.setMinSeqNoToKeep(4); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + // Fetch pinned timestamps so that it won't be stale + updatePinnedTimstampTask.run(); + translog.setMinSeqNoToKeep(6); translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + assertEquals(1, translog.readers.size()); + assertBusy(() -> { + assertEquals(2, translog.allUploaded().size()); + assertEquals(4, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 16, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 })); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 })); - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); // Fetch pinned timestamps so that it won't be stale updatePinnedTimstampTask.run(); translog.trimUnreferencedReaders(); - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); - assertEquals(5, translog.readers.size()); + + assertEquals(3, translog.readers.size()); assertBusy(() -> { - assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); - assertEquals(10, translog.allUploaded().size()); + assertEquals(6, translog.allUploaded().size()); + assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); assertEquals( - 10, + 12, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); - }, 60, TimeUnit.SECONDS); + }, 30, TimeUnit.SECONDS); } @Override @@ -397,7 +419,7 @@ public void testMetadataFileDeletion() throws Exception { ); updatePinnedTimstampTask.run(); translog.trimUnreferencedReaders(); - assertBusy(() -> { assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); }); + assertBusy(() -> { assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); }); } public void testMetadataFileDeletionWithPinnedTimestamps() throws Exception { @@ -568,7 +590,7 @@ public void testDrainSync() throws Exception { assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); assertBusy(() -> assertEquals(2, translog.allUploaded().size())); - assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); } @Override @@ -647,7 +669,7 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - true + Long.MAX_VALUE ); Set md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet()); @@ -683,7 +705,7 @@ public void testGetGenerationsToBeDeleted() throws IOException { Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - true + Long.MAX_VALUE ); Set md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet()); @@ -708,7 +730,10 @@ public void testGetMetadataFilesToBeDeletedNoExclusion() { "metadata__9223372036438563903__9223372036854775701__9223370311919910403__31__9223372036854775701__1" ); - assertEquals(metadataFiles, ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles)); + assertEquals( + metadataFiles, + RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), Long.MAX_VALUE, false, logger) + ); } public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeOnly() { @@ -724,7 +749,13 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeOnly() { "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + false, + logger + ); assertEquals(1, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); } @@ -746,7 +777,13 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnPinningOnly() throws "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + false, + logger + ); assertEquals(2, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(1)); @@ -769,11 +806,77 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeAndPinning() throw "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + false, + logger + ); assertEquals(1, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(0)); } + public void testGetMetadataFilesToBeDeletedExclusionBasedOnGenerationOnly() throws IOException { + long currentTimeInMillis = System.currentTimeMillis(); + String md1Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 200000); + String md2Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 300000); + String md3Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 600000); + + when(blobContainer.listBlobs()).thenReturn(Map.of()); + + updatePinnedTimstampTask.run(); + + List metadataFiles = List.of( + // MaxGen 7 + "metadata__9223372036438563903__9223372036854775800__" + md1Timestamp + "__31__9223372036854775106__1", + // MaxGen 12 + "metadata__9223372036438563903__9223372036854775795__" + md2Timestamp + "__31__9223372036854775803__1", + // MaxGen 10 + "metadata__9223372036438563903__9223372036854775798__" + md3Timestamp + "__31__9223372036854775701__1" + ); + + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + 10L, + false, + logger + ); + assertEquals(2, metadataFilesToBeDeleted.size()); + assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); + assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(1)); + } + + public void testGetMetadataFilesToBeDeletedExclusionBasedOnGenerationDeleteIndex() throws IOException { + long currentTimeInMillis = System.currentTimeMillis(); + String md1Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 200000); + String md2Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 300000); + String md3Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 600000); + + when(blobContainer.listBlobs()).thenReturn(Map.of()); + + updatePinnedTimstampTask.run(); + + List metadataFiles = List.of( + // MaxGen 7 + "metadata__9223372036438563903__9223372036854775800__" + md1Timestamp + "__31__9223372036854775106__1", + // MaxGen 12 + "metadata__9223372036438563903__9223372036854775795__" + md2Timestamp + "__31__9223372036854775803__1", + // MaxGen 17 + "metadata__9223372036438563903__9223372036854775790__" + md3Timestamp + "__31__9223372036854775701__1" + ); + + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + 10L, + true, + logger + ); + assertEquals(metadataFiles, metadataFilesToBeDeleted); + } + public void testIsGenerationPinned() { TreeSet> pinnedGenerations = new TreeSet<>(new TreeSet<>((o1, o2) -> { if (Objects.equals(o1.v1(), o2.v1()) == false) {