diff --git a/.github/workflows/dependabot_pr.yml b/.github/workflows/dependabot_pr.yml index e6feb3b852ad0..cfd3ac81c5bd3 100644 --- a/.github/workflows/dependabot_pr.yml +++ b/.github/workflows/dependabot_pr.yml @@ -7,7 +7,7 @@ jobs: permissions: pull-requests: write contents: write - if: ${{ github.actor == 'dependabot[bot]' }} + if: ${{ github.event.pull_request.user.login == 'dependabot[bot]' }} steps: - name: GitHub App token id: github_app_token diff --git a/.github/workflows/pull-request-checks.yml b/.github/workflows/pull-request-checks.yml index a62ea9cfa179b..eec363572478c 100644 --- a/.github/workflows/pull-request-checks.yml +++ b/.github/workflows/pull-request-checks.yml @@ -18,7 +18,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: peternied/check-pull-request-description-checklist@v1.1 - if: github.actor != 'dependabot[bot]' + if: github.event.pull_request.user.login != 'dependabot[bot]' with: checklist-items: | New functionality includes testing. diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/ClusterStateIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/ClusterStateIT.java new file mode 100644 index 0000000000000..2d606d27a34e0 --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/ClusterStateIT.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.upgrades; + +import org.opensearch.client.Request; +import org.opensearch.client.Response; + +import java.util.Map; + +public class ClusterStateIT extends AbstractRollingTestCase{ + public void testTemplateMetadataUpgrades() throws Exception { + if (CLUSTER_TYPE == ClusterType.OLD) { + String templateName = "my_template"; + Request putIndexTemplate = new Request("PUT", "_template/" + templateName); + putIndexTemplate.setJsonEntity("{\"index_patterns\": [\"pattern-1\", \"log-*\"]}"); + client().performRequest(putIndexTemplate); + verifyTemplateMetadataInClusterState(); + } else { + verifyTemplateMetadataInClusterState(); + } + } + + @SuppressWarnings("unchecked") + private static void verifyTemplateMetadataInClusterState() throws Exception { + Request request = new Request("GET", "_cluster/state/metadata"); + Response response = client().performRequest(request); + assertOK(response); + Map metadata = (Map) entityAsMap(response).get("metadata"); + assertNotNull(metadata.get("templates")); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index 3a023670c4abb..56fcf1de38e0f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -10,14 +10,28 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; +import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase { @@ -29,6 +43,14 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); } + private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { + prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); + Map indexStats = indexData(1, false, INDEX_NAME); + assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards); + ensureGreen(INDEX_NAME); + return indexStats; + } + public void testRemoteStateStats() { int shardCount = randomIntBetween(1, 2); int replicaCount = 1; @@ -112,6 +134,45 @@ public void testRemoteStateStatsFromAllNodes() { } } + public void testRemoteClusterStateMetadataSplit() throws IOException { + initialTestSetup(1, 0, 1, 1); + + RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateService.class + ); + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); + BlobPath globalMetadataPath = repository.basePath() + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add("cluster-state") + .add(getClusterState().metadata().clusterUUID()) + .add("global-metadata"); + + Map metadataFiles = repository.blobStore() + .blobContainer(globalMetadataPath) + .listBlobs() + .keySet() + .stream() + .map(fileName -> { + logger.info(fileName); + return fileName.split(DELIMITER)[0]; + }) + .collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum)); + + assertTrue(metadataFiles.containsKey(COORDINATION_METADATA)); + assertEquals(1, (int) metadataFiles.get(COORDINATION_METADATA)); + assertTrue(metadataFiles.containsKey(SETTING_METADATA)); + assertEquals(1, (int) metadataFiles.get(SETTING_METADATA)); + assertTrue(metadataFiles.containsKey(TEMPLATES_METADATA)); + assertEquals(1, (int) metadataFiles.get(TEMPLATES_METADATA)); + assertTrue(metadataFiles.keySet().stream().anyMatch(key -> key.startsWith(CUSTOM_METADATA))); + assertFalse(metadataFiles.containsKey(METADATA_FILE_PREFIX)); + } + private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) { // _nodes/stats/discovery must never fail due to any exception assertFalse(nodesStatsResponse.toString().contains("exception")); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java index 3f90732f1f13d..8c8209b80bfd8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java @@ -8,16 +8,30 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; +import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.datastream.DataStreamRolloverIT; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.admin.indices.template.put.PutComponentTemplateAction; +import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.metadata.ComponentTemplate; +import org.opensearch.cluster.metadata.ComponentTemplateMetadata; +import org.opensearch.cluster.metadata.ComposableIndexTemplate; +import org.opensearch.cluster.metadata.ComposableIndexTemplateMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexTemplateMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.Template; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; @@ -29,11 +43,13 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_SETTING; @@ -46,6 +62,11 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT { + static final String TEMPLATE_NAME = "remote-store-test-template"; + static final String COMPONENT_TEMPLATE_NAME = "remote-component-template1"; + static final String COMPOSABLE_TEMPLATE_NAME = "remote-composable-template1"; + static final Setting MOCK_SETTING = Setting.simpleString("mock-setting"); + static final String[] EXCLUDED_NODES = { "ex-1", "ex-2" }; @Override protected Settings nodeSettings(int nodeOrdinal) { @@ -87,6 +108,45 @@ public void testFullClusterRestore() throws Exception { Map indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1); String prevClusterUUID = clusterService().state().metadata().clusterUUID(); long prevClusterStateVersion = clusterService().state().version(); + // Step - 1.1 Add some cluster state elements + ActionFuture response = client().admin() + .indices() + .preparePutTemplate(TEMPLATE_NAME) + .addAlias(new Alias(INDEX_NAME)) + .setPatterns(Arrays.stream(INDEX_NAMES_WILDCARD.split(",")).collect(Collectors.toList())) + .execute(); + assertTrue(response.get().isAcknowledged()); + ActionFuture clusterUpdateSettingsResponse = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(SETTING_READ_ONLY_SETTING.getKey(), false).build()) + .execute(); + assertTrue(clusterUpdateSettingsResponse.get().isAcknowledged()); + // update coordination metadata + client().execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(EXCLUDED_NODES)); + // Add a custom metadata as component index template + ActionFuture componentTemplateResponse = client().execute( + PutComponentTemplateAction.INSTANCE, + new PutComponentTemplateAction.Request(COMPONENT_TEMPLATE_NAME).componentTemplate( + new ComponentTemplate(new Template(Settings.EMPTY, null, Collections.emptyMap()), 1L, Collections.emptyMap()) + ) + ); + assertTrue(componentTemplateResponse.get().isAcknowledged()); + ActionFuture composableTemplateResponse = client().execute( + PutComposableIndexTemplateAction.INSTANCE, + new PutComposableIndexTemplateAction.Request(COMPOSABLE_TEMPLATE_NAME).indexTemplate( + new ComposableIndexTemplate( + Arrays.stream(INDEX_NAMES_WILDCARD.split(",")).collect(Collectors.toList()), + new Template(Settings.EMPTY, null, Collections.emptyMap()), + Collections.singletonList(COMPONENT_TEMPLATE_NAME), + 1L, + 1L, + Collections.emptyMap(), + null + ) + ) + ); + assertTrue(composableTemplateResponse.get().isAcknowledged()); // Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata resetCluster(dataNodeCount, clusterManagerNodeCount); @@ -104,7 +164,24 @@ public void testFullClusterRestore() throws Exception { ); validateMetadata(List.of(INDEX_NAME)); verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true); - + clusterService().state() + .metadata() + .coordinationMetadata() + .getVotingConfigExclusions() + .stream() + .forEach(config -> assertTrue(Arrays.stream(EXCLUDED_NODES).anyMatch(node -> node.equals(config.getNodeId())))); + assertFalse(clusterService().state().metadata().templates().isEmpty()); + assertTrue(clusterService().state().metadata().templates().containsKey(TEMPLATE_NAME)); + assertFalse(clusterService().state().metadata().settings().isEmpty()); + assertFalse(clusterService().state().metadata().settings().getAsBoolean(SETTING_READ_ONLY_SETTING.getKey(), true)); + assertNotNull(clusterService().state().metadata().custom("component_template")); + ComponentTemplateMetadata componentTemplateMetadata = clusterService().state().metadata().custom("component_template"); + assertFalse(componentTemplateMetadata.componentTemplates().isEmpty()); + assertTrue(componentTemplateMetadata.componentTemplates().containsKey(COMPONENT_TEMPLATE_NAME)); + assertNotNull(clusterService().state().metadata().custom("index_template")); + ComposableIndexTemplateMetadata composableIndexTemplate = clusterService().state().metadata().custom("index_template"); + assertFalse(composableIndexTemplate.indexTemplates().isEmpty()); + assertTrue(composableIndexTemplate.indexTemplates().containsKey(COMPOSABLE_TEMPLATE_NAME)); } /** diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 59dc86ea28ed6..d016501dd0910 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -175,6 +175,16 @@ public enum XContentContext { public interface Custom extends NamedDiffable, ToXContentFragment, ClusterState.FeatureAware { EnumSet context(); + + static Custom fromXContent(XContentParser parser, String name) throws IOException { + // handling any Exception is caller's responsibility + return parser.namedObject(Custom.class, name, null); + } + + static Custom fromXContent(XContentParser parser) throws IOException { + String currentFieldName = parser.currentName(); + return fromXContent(parser, currentFieldName); + } } public static final Setting DEFAULT_REPLICA_COUNT_SETTING = Setting.intSetting( @@ -260,7 +270,7 @@ public interface Custom extends NamedDiffable, ToXContentFragment, Clust private final Settings settings; private final DiffableStringMap hashesOfConsistentSettings; private final Map indices; - private final Map templates; + private final TemplatesMetadata templates; private final Map customs; private final transient int totalNumberOfShards; // Transient ? not serializable anyway? @@ -304,7 +314,7 @@ public interface Custom extends NamedDiffable, ToXContentFragment, Clust this.hashesOfConsistentSettings = hashesOfConsistentSettings; this.indices = Collections.unmodifiableMap(indices); this.customs = Collections.unmodifiableMap(customs); - this.templates = Collections.unmodifiableMap(templates); + this.templates = new TemplatesMetadata(templates); int totalNumberOfShards = 0; int totalOpenIndexShards = 0; for (IndexMetadata cursor : indices.values()) { @@ -806,13 +816,17 @@ public Map getIndices() { } public Map templates() { - return this.templates; + return this.templates.getTemplates(); } public Map getTemplates() { return templates(); } + public TemplatesMetadata templatesMetadata() { + return this.templates; + } + public Map componentTemplates() { return Optional.ofNullable((ComponentTemplateMetadata) this.custom(ComponentTemplateMetadata.TYPE)) .map(ComponentTemplateMetadata::componentTemplates) @@ -927,7 +941,7 @@ public Iterator iterator() { } public static boolean isGlobalStateEquals(Metadata metadata1, Metadata metadata2) { - if (!metadata1.coordinationMetadata.equals(metadata2.coordinationMetadata)) { + if (!isCoordinationMetadataEqual(metadata1, metadata2)) { return false; } if (!metadata1.hashesOfConsistentSettings.equals(metadata2.hashesOfConsistentSettings)) { @@ -946,13 +960,29 @@ public static boolean isGlobalStateEquals(Metadata metadata1, Metadata metadata2 * Compares Metadata entities persisted in Remote Store. */ public static boolean isGlobalResourcesMetadataEquals(Metadata metadata1, Metadata metadata2) { - if (!metadata1.persistentSettings.equals(metadata2.persistentSettings)) { + if (!isSettingsMetadataEqual(metadata1, metadata2)) { return false; } - if (!metadata1.templates.equals(metadata2.templates())) { + if (!isTemplatesMetadataEqual(metadata1, metadata2)) { return false; } // Check if any persistent metadata needs to be saved + return isCustomMetadataEqual(metadata1, metadata2); + } + + public static boolean isCoordinationMetadataEqual(Metadata metadata1, Metadata metadata2) { + return metadata1.coordinationMetadata.equals(metadata2.coordinationMetadata); + } + + public static boolean isSettingsMetadataEqual(Metadata metadata1, Metadata metadata2) { + return metadata1.persistentSettings.equals(metadata2.persistentSettings); + } + + public static boolean isTemplatesMetadataEqual(Metadata metadata1, Metadata metadata2) { + return metadata1.templates.equals(metadata2.templates); + } + + public static boolean isCustomMetadataEqual(Metadata metadata1, Metadata metadata2) { int customCount1 = 0; for (Map.Entry cursor : metadata1.customs.entrySet()) { if (cursor.getValue().context().contains(XContentContext.GATEWAY)) { @@ -966,8 +996,7 @@ public static boolean isGlobalResourcesMetadataEquals(Metadata metadata1, Metada customCount2++; } } - if (customCount1 != customCount2) return false; - return true; + return customCount1 == customCount2; } @Override @@ -1016,7 +1045,11 @@ private static class MetadataDiff implements Diff { persistentSettings = after.persistentSettings; hashesOfConsistentSettings = after.hashesOfConsistentSettings.diff(before.hashesOfConsistentSettings); indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer()); - templates = DiffableUtils.diff(before.templates, after.templates, DiffableUtils.getStringKeySerializer()); + templates = DiffableUtils.diff( + before.templates.getTemplates(), + after.templates.getTemplates(), + DiffableUtils.getStringKeySerializer() + ); customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER); } @@ -1063,7 +1096,7 @@ public Metadata apply(Metadata part) { builder.persistentSettings(persistentSettings); builder.hashesOfConsistentSettings(hashesOfConsistentSettings.apply(part.hashesOfConsistentSettings)); builder.indices(indices.apply(part.indices)); - builder.templates(templates.apply(part.templates)); + builder.templates(templates.apply(part.templates.getTemplates())); builder.customs(customs.apply(part.customs)); return builder.build(); } @@ -1107,10 +1140,7 @@ public void writeTo(StreamOutput out) throws IOException { for (IndexMetadata indexMetadata : this) { indexMetadata.writeTo(out); } - out.writeVInt(templates.size()); - for (final IndexTemplateMetadata cursor : templates.values()) { - cursor.writeTo(out); - } + templates.writeTo(out); // filter out custom states not supported by the other node int numberOfCustoms = 0; for (final Custom cursor : customs.values()) { @@ -1174,7 +1204,7 @@ public Builder(Metadata metadata) { this.hashesOfConsistentSettings = metadata.hashesOfConsistentSettings; this.version = metadata.version; this.indices = new HashMap<>(metadata.indices); - this.templates = new HashMap<>(metadata.templates); + this.templates = new HashMap<>(metadata.templates.getTemplates()); this.customs = new HashMap<>(metadata.customs); this.previousMetadata = metadata; } @@ -1253,6 +1283,11 @@ public Builder templates(Map templates) { return this; } + public Builder templates(TemplatesMetadata templatesMetadata) { + this.templates.putAll(templatesMetadata.getTemplates()); + return this; + } + public Builder put(String name, ComponentTemplate componentTemplate) { Objects.requireNonNull(componentTemplate, "it is invalid to add a null component template: " + name); Map existingTemplates = Optional.ofNullable( @@ -1773,9 +1808,7 @@ public static void toXContent(Metadata metadata, XContentBuilder builder, ToXCon } builder.startObject("templates"); - for (final IndexTemplateMetadata cursor : metadata.templates().values()) { - IndexTemplateMetadata.Builder.toXContentWithTypes(cursor, builder, params); - } + metadata.templatesMetadata().toXContent(builder, params); builder.endObject(); if (context == XContentContext.API) { @@ -1838,12 +1871,10 @@ public static Metadata fromXContent(XContentParser parser) throws IOException { } else if ("hashes_of_consistent_settings".equals(currentFieldName)) { builder.hashesOfConsistentSettings(parser.mapStrings()); } else if ("templates".equals(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - builder.put(IndexTemplateMetadata.Builder.fromXContent(parser, parser.currentName())); - } + builder.templates(TemplatesMetadata.fromXContent(parser)); } else { try { - Custom custom = parser.namedObject(Custom.class, currentFieldName, null); + Custom custom = Custom.fromXContent(parser, currentFieldName); builder.putCustom(custom.getWriteableName(), custom); } catch (NamedObjectNotFoundException ex) { logger.warn("Skipping unknown custom object with type {}", currentFieldName); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java index e3689d046193c..9b52bdd1b16c5 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java @@ -202,6 +202,10 @@ public static RepositoriesMetadata fromXContent(XContentParser parser) throws IO XContentParser.Token token; List repository = new ArrayList<>(); while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.START_OBJECT) { + // move to next token if parsing the whole object + token = parser.nextToken(); + } if (token == XContentParser.Token.FIELD_NAME) { String name = parser.currentName(); if (parser.nextToken() != XContentParser.Token.START_OBJECT) { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java new file mode 100644 index 0000000000000..6ecc471c5e0ae --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.cluster.AbstractDiffable; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Metadata for legacy templates + * + * @opensearch.api + */ +@PublicApi(since = "2.15.0") +public class TemplatesMetadata extends AbstractDiffable implements ToXContentFragment { + public static TemplatesMetadata EMPTY_METADATA = builder().build(); + private final Map templates; + + public TemplatesMetadata() { + this(Collections.emptyMap()); + } + + public TemplatesMetadata(Map templates) { + this.templates = Collections.unmodifiableMap(templates); + } + + public static Builder builder() { + return new Builder(); + } + + public Map getTemplates() { + return this.templates; + } + + public static TemplatesMetadata fromXContent(XContentParser parser) throws IOException { + return Builder.fromXContent(parser); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + Builder.toXContent(this, builder, params); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(templates.size()); + for (final IndexTemplateMetadata cursor : templates.values()) { + cursor.writeTo(out); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TemplatesMetadata that = (TemplatesMetadata) o; + + return Objects.equals(templates, that.templates); + } + + @Override + public int hashCode() { + return templates != null ? templates.hashCode() : 0; + } + + /** + * Builder for the templates metadata + * + * @opensearch.api + */ + @PublicApi(since = "2.15.0") + public static class Builder { + private final Map templates; + + public Builder() { + this.templates = new HashMap(); + } + + public Builder(Map templates) { + this.templates = templates; + } + + public Builder put(IndexTemplateMetadata.Builder templateBuilder) { + return put(templateBuilder.build()); + } + + public Builder put(IndexTemplateMetadata template) { + templates.put(template.name(), template); + return this; + } + + public Builder removeTemplate(String templateName) { + templates.remove(templateName); + return this; + } + + public Builder templates(Map templates) { + this.templates.putAll(templates); + return this; + } + + public TemplatesMetadata build() { + return new TemplatesMetadata(templates); + } + + public static void toXContent(TemplatesMetadata templatesMetadata, XContentBuilder builder, Params params) throws IOException { + for (IndexTemplateMetadata cursor : templatesMetadata.getTemplates().values()) { + IndexTemplateMetadata.Builder.toXContentWithTypes(cursor, builder, params); + } + } + + public static TemplatesMetadata fromXContent(XContentParser parser) throws IOException { + Builder builder = new Builder(); + + XContentParser.Token token = parser.currentToken(); + String currentFieldName = parser.currentName(); + if (currentFieldName == null) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT) { + // move to the field name + token = parser.nextToken(); + } + currentFieldName = parser.currentName(); + } + if (currentFieldName != null) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + builder.put(IndexTemplateMetadata.Builder.fromXContent(parser, parser.currentName())); + } + } + return builder.build(); + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 4725f40076ce2..bf02c73ca560b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -16,6 +16,7 @@ import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ConstructingObjectParser; import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ObjectParser; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; @@ -23,8 +24,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; /** * Manifest file which contains the details of the uploaded entity metadata @@ -35,6 +40,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest. public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file. + public static final int CODEC_V2 = 2; // In Codec V2, there are seperate metadata files rather than a single global metadata file. private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); @@ -48,6 +54,37 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField INDICES_FIELD = new ParseField("indices"); private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid"); private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed"); + private static final ParseField UPLOADED_COORDINATOR_METADATA = new ParseField("uploaded_coordinator_metadata"); + private static final ParseField UPLOADED_SETTINGS_METADATA = new ParseField("uploaded_settings_metadata"); + private static final ParseField UPLOADED_TEMPLATES_METADATA = new ParseField("uploaded_templates_metadata"); + private static final ParseField UPLOADED_CUSTOM_METADATA = new ParseField("uploaded_custom_metadata"); + + private static ClusterMetadataManifest.Builder manifestV0Builder(Object[] fields) { + return ClusterMetadataManifest.builder() + .clusterTerm(term(fields)) + .stateVersion(version(fields)) + .clusterUUID(clusterUUID(fields)) + .stateUUID(stateUUID(fields)) + .opensearchVersion(opensearchVersion(fields)) + .nodeId(nodeId(fields)) + .committed(committed(fields)) + .codecVersion(CODEC_V0) + .indices(indices(fields)) + .previousClusterUUID(previousClusterUUID(fields)) + .clusterUUIDCommitted(clusterUUIDCommitted(fields)); + } + + private static ClusterMetadataManifest.Builder manifestV1Builder(Object[] fields) { + return manifestV0Builder(fields).codecVersion(codecVersion(fields)).globalMetadataFileName(globalMetadataFileName(fields)); + } + + private static ClusterMetadataManifest.Builder manifestV2Builder(Object[] fields) { + return manifestV0Builder(fields).codecVersion(codecVersion(fields)) + .coordinationMetadata(coordinationMetadata(fields)) + .settingMetadata(settingsMetadata(fields)) + .templatesMetadata(templatesMetadata(fields)) + .customMetadataMap(customMetadata(fields)); + } private static long term(Object[] fields) { return (long) fields[0]; @@ -97,47 +134,44 @@ private static String globalMetadataFileName(Object[] fields) { return (String) fields[11]; } + private static UploadedMetadataAttribute coordinationMetadata(Object[] fields) { + return (UploadedMetadataAttribute) fields[11]; + } + + private static UploadedMetadataAttribute settingsMetadata(Object[] fields) { + return (UploadedMetadataAttribute) fields[12]; + } + + private static UploadedMetadataAttribute templatesMetadata(Object[] fields) { + return (UploadedMetadataAttribute) fields[13]; + } + + private static Map customMetadata(Object[] fields) { + List customs = (List) fields[14]; + return customs.stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())); + } + private static final ConstructingObjectParser PARSER_V0 = new ConstructingObjectParser<>( "cluster_metadata_manifest", - fields -> new ClusterMetadataManifest( - term(fields), - version(fields), - clusterUUID(fields), - stateUUID(fields), - opensearchVersion(fields), - nodeId(fields), - committed(fields), - CODEC_V0, - null, - indices(fields), - previousClusterUUID(fields), - clusterUUIDCommitted(fields) - ) + fields -> manifestV0Builder(fields).build() ); private static final ConstructingObjectParser PARSER_V1 = new ConstructingObjectParser<>( "cluster_metadata_manifest", - fields -> new ClusterMetadataManifest( - term(fields), - version(fields), - clusterUUID(fields), - stateUUID(fields), - opensearchVersion(fields), - nodeId(fields), - committed(fields), - codecVersion(fields), - globalMetadataFileName(fields), - indices(fields), - previousClusterUUID(fields), - clusterUUIDCommitted(fields) - ) + fields -> manifestV1Builder(fields).build() ); - private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V1; + private static final ConstructingObjectParser PARSER_V2 = new ConstructingObjectParser<>( + "cluster_metadata_manifest", + fields -> manifestV2Builder(fields).build() + ); + + private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V2; static { declareParser(PARSER_V0, CODEC_V0); declareParser(PARSER_V1, CODEC_V1); + declareParser(PARSER_V2, CODEC_V2); } private static void declareParser(ConstructingObjectParser parser, long codec_version) { @@ -156,14 +190,40 @@ private static void declareParser(ConstructingObjectParser= CODEC_V1) { + if (codec_version == CODEC_V1) { parser.declareInt(ConstructingObjectParser.constructorArg(), CODEC_VERSION_FIELD); parser.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD); + } else if (codec_version >= CODEC_V2) { + parser.declareInt(ConstructingObjectParser.constructorArg(), CODEC_VERSION_FIELD); + parser.declareNamedObject( + ConstructingObjectParser.optionalConstructorArg(), + UploadedMetadataAttribute.PARSER, + UPLOADED_COORDINATOR_METADATA + ); + parser.declareNamedObject( + ConstructingObjectParser.optionalConstructorArg(), + UploadedMetadataAttribute.PARSER, + UPLOADED_SETTINGS_METADATA + ); + parser.declareNamedObject( + ConstructingObjectParser.optionalConstructorArg(), + UploadedMetadataAttribute.PARSER, + UPLOADED_TEMPLATES_METADATA + ); + parser.declareNamedObjects( + ConstructingObjectParser.optionalConstructorArg(), + UploadedMetadataAttribute.PARSER, + UPLOADED_CUSTOM_METADATA + ); } } private final int codecVersion; private final String globalMetadataFileName; + private final UploadedMetadataAttribute uploadedCoordinationMetadata; + private final UploadedMetadataAttribute uploadedSettingsMetadata; + private final UploadedMetadataAttribute uploadedTemplatesMetadata; + private final Map uploadedCustomMetadataMap; private final List indices; private final long clusterTerm; private final long stateVersion; @@ -223,6 +283,29 @@ public String getGlobalMetadataFileName() { return globalMetadataFileName; } + public UploadedMetadataAttribute getCoordinationMetadata() { + return uploadedCoordinationMetadata; + } + + public UploadedMetadataAttribute getSettingsMetadata() { + return uploadedSettingsMetadata; + } + + public UploadedMetadataAttribute getTemplatesMetadata() { + return uploadedTemplatesMetadata; + } + + public Map getCustomMetadataMap() { + return uploadedCustomMetadataMap; + } + + public boolean hasMetadataAttributesFiles() { + return uploadedCoordinationMetadata != null + || uploadedSettingsMetadata != null + || uploadedTemplatesMetadata != null + || !uploadedCustomMetadataMap.isEmpty(); + } + public ClusterMetadataManifest( long clusterTerm, long version, @@ -235,7 +318,11 @@ public ClusterMetadataManifest( String globalMetadataFileName, List indices, String previousClusterUUID, - boolean clusterUUIDCommitted + boolean clusterUUIDCommitted, + UploadedMetadataAttribute uploadedCoordinationMetadata, + UploadedMetadataAttribute uploadedSettingsMetadata, + UploadedMetadataAttribute uploadedTemplatesMetadata, + Map uploadedCustomMetadataMap ) { this.clusterTerm = clusterTerm; this.stateVersion = version; @@ -249,6 +336,12 @@ public ClusterMetadataManifest( this.indices = Collections.unmodifiableList(indices); this.previousClusterUUID = previousClusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; + this.uploadedCoordinationMetadata = uploadedCoordinationMetadata; + this.uploadedSettingsMetadata = uploadedSettingsMetadata; + this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; + this.uploadedCustomMetadataMap = Collections.unmodifiableMap( + uploadedCustomMetadataMap != null ? uploadedCustomMetadataMap : new HashMap<>() + ); } public ClusterMetadataManifest(StreamInput in) throws IOException { @@ -262,12 +355,29 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); this.previousClusterUUID = in.readString(); this.clusterUUIDCommitted = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_2_12_0)) { + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.codecVersion = in.readInt(); + this.uploadedCoordinationMetadata = new UploadedMetadataAttribute(in); + this.uploadedSettingsMetadata = new UploadedMetadataAttribute(in); + this.uploadedTemplatesMetadata = new UploadedMetadataAttribute(in); + this.uploadedCustomMetadataMap = Collections.unmodifiableMap( + in.readMap(StreamInput::readString, UploadedMetadataAttribute::new) + ); + this.globalMetadataFileName = null; + } else if (in.getVersion().onOrAfter(Version.V_2_12_0)) { this.codecVersion = in.readInt(); this.globalMetadataFileName = in.readString(); + this.uploadedCoordinationMetadata = null; + this.uploadedSettingsMetadata = null; + this.uploadedTemplatesMetadata = null; + this.uploadedCustomMetadataMap = null; } else { this.codecVersion = CODEC_V0; // Default codec this.globalMetadataFileName = null; + this.uploadedCoordinationMetadata = null; + this.uploadedSettingsMetadata = null; + this.uploadedTemplatesMetadata = null; + this.uploadedCustomMetadataMap = null; } } @@ -297,7 +407,29 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endArray(); builder.field(PREVIOUS_CLUSTER_UUID.getPreferredName(), getPreviousClusterUUID()); builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted()); - if (onOrAfterCodecVersion(CODEC_V1)) { + if (onOrAfterCodecVersion(CODEC_V2)) { + builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion()); + if (getCoordinationMetadata() != null) { + builder.startObject(UPLOADED_COORDINATOR_METADATA.getPreferredName()); + getCoordinationMetadata().toXContent(builder, params); + builder.endObject(); + } + if (getSettingsMetadata() != null) { + builder.startObject(UPLOADED_SETTINGS_METADATA.getPreferredName()); + getSettingsMetadata().toXContent(builder, params); + builder.endObject(); + } + if (getTemplatesMetadata() != null) { + builder.startObject(UPLOADED_TEMPLATES_METADATA.getPreferredName()); + getTemplatesMetadata().toXContent(builder, params); + builder.endObject(); + } + builder.startObject(UPLOADED_CUSTOM_METADATA.getPreferredName()); + for (UploadedMetadataAttribute attribute : getCustomMetadataMap().values()) { + attribute.toXContent(builder, params); + } + builder.endObject(); + } else if (onOrAfterCodecVersion(CODEC_V1)) { builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion()); builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); } @@ -316,7 +448,13 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(indices); out.writeString(previousClusterUUID); out.writeBoolean(clusterUUIDCommitted); - if (out.getVersion().onOrAfter(Version.V_2_12_0)) { + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeInt(codecVersion); + uploadedCoordinationMetadata.writeTo(out); + uploadedSettingsMetadata.writeTo(out); + uploadedTemplatesMetadata.writeTo(out); + out.writeMap(uploadedCustomMetadataMap, StreamOutput::writeString, (o, v) -> v.writeTo(o)); + } else if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeInt(codecVersion); out.writeString(globalMetadataFileName); } @@ -376,6 +514,10 @@ public static ClusterMetadataManifest fromXContentV0(XContentParser parser) thro return PARSER_V0.parse(parser, null); } + public static ClusterMetadataManifest fromXContentV1(XContentParser parser) throws IOException { + return PARSER_V1.parse(parser, null); + } + public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { return CURRENT_PARSER.parse(parser, null); } @@ -388,6 +530,10 @@ public static ClusterMetadataManifest fromXContent(XContentParser parser) throws public static class Builder { private String globalMetadataFileName; + private UploadedMetadataAttribute coordinationMetadata; + private UploadedMetadataAttribute settingsMetadata; + private UploadedMetadataAttribute templatesMetadata; + private Map customMetadataMap; private int codecVersion; private List indices; private long clusterTerm; @@ -415,6 +561,31 @@ public Builder globalMetadataFileName(String globalMetadataFileName) { return this; } + public Builder coordinationMetadata(UploadedMetadataAttribute coordinationMetadata) { + this.coordinationMetadata = coordinationMetadata; + return this; + } + + public Builder settingMetadata(UploadedMetadataAttribute settingsMetadata) { + this.settingsMetadata = settingsMetadata; + return this; + } + + public Builder templatesMetadata(UploadedMetadataAttribute templatesMetadata) { + this.templatesMetadata = templatesMetadata; + return this; + } + + public Builder customMetadataMap(Map customMetadataMap) { + this.customMetadataMap = customMetadataMap; + return this; + } + + public Builder put(String custom, UploadedMetadataAttribute customMetadata) { + this.customMetadataMap.put(custom, customMetadata); + return this; + } + public Builder clusterTerm(long clusterTerm) { this.clusterTerm = clusterTerm; return this; @@ -466,6 +637,7 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) { public Builder() { indices = new ArrayList<>(); + customMetadataMap = new HashMap<>(); } public Builder(ClusterMetadataManifest manifest) { @@ -477,6 +649,10 @@ public Builder(ClusterMetadataManifest manifest) { this.nodeId = manifest.nodeId; this.committed = manifest.committed; this.globalMetadataFileName = manifest.globalMetadataFileName; + this.coordinationMetadata = manifest.uploadedCoordinationMetadata; + this.settingsMetadata = manifest.uploadedSettingsMetadata; + this.templatesMetadata = manifest.uploadedTemplatesMetadata; + this.customMetadataMap = manifest.uploadedCustomMetadataMap; this.codecVersion = manifest.codecVersion; this.indices = new ArrayList<>(manifest.indices); this.previousClusterUUID = manifest.previousClusterUUID; @@ -496,18 +672,41 @@ public ClusterMetadataManifest build() { globalMetadataFileName, indices, previousClusterUUID, - clusterUUIDCommitted + clusterUUIDCommitted, + coordinationMetadata, + settingsMetadata, + templatesMetadata, + customMetadataMap ); } } + /** + * Interface representing uploaded metadata + */ + public interface UploadedMetadata { + /** + * Gets the component or part of the system this upload belongs to. + * + * @return A string identifying the component + */ + String getComponent(); + + /** + * Gets the name of the file that was uploaded + * + * @return The name of the uploaded file as a string + */ + String getUploadedFilename(); + } + /** * Metadata for uploaded index metadata * * @opensearch.internal */ - public static class UploadedIndexMetadata implements Writeable, ToXContentFragment { + public static class UploadedIndexMetadata implements UploadedMetadata, Writeable, ToXContentFragment { private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid"); @@ -536,6 +735,7 @@ private static String uploadedFilename(Object[] fields) { PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD); } + static final String COMPONENT_PREFIX = "index--"; private final String indexName; private final String indexUUID; private final String uploadedFilename; @@ -556,6 +756,11 @@ public String getUploadedFilePath() { return uploadedFilename; } + @Override + public String getComponent() { + return COMPONENT_PREFIX + getIndexName(); + } + public String getUploadedFilename() { String[] splitPath = uploadedFilename.split("/"); return splitPath[splitPath.length - 1]; @@ -613,4 +818,83 @@ public static UploadedIndexMetadata fromXContent(XContentParser parser) throws I return PARSER.parse(parser, null); } } + + /** + * Metadata for uploaded metadata attribute + * + * @opensearch.internal + */ + public static class UploadedMetadataAttribute implements UploadedMetadata, Writeable, ToXContentFragment { + private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename"); + + private static final ObjectParser.NamedObjectParser PARSER; + + static { + ConstructingObjectParser innerParser = new ConstructingObjectParser<>( + "uploaded_metadata_attribute", + true, + (Object[] parsedObject, String name) -> { + String uploadedFilename = (String) parsedObject[0]; + return new UploadedMetadataAttribute(name, uploadedFilename); + } + ); + innerParser.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD); + PARSER = ((p, c, name) -> innerParser.parse(p, name)); + } + + private final String attributeName; + private final String uploadedFilename; + + public UploadedMetadataAttribute(String attributeName, String uploadedFilename) { + this.attributeName = attributeName; + this.uploadedFilename = uploadedFilename; + } + + public UploadedMetadataAttribute(StreamInput in) throws IOException { + this.attributeName = in.readString(); + this.uploadedFilename = in.readString(); + } + + public String getAttributeName() { + return attributeName; + } + + @Override + public String getComponent() { + return getAttributeName(); + } + + public String getUploadedFilename() { + return uploadedFilename; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(attributeName); + out.writeString(uploadedFilename); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject(getAttributeName()) + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()) + .endObject(); + } + + public static UploadedMetadataAttribute fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null, parser.currentName()); + } + + @Override + public String toString() { + return "UploadedMetadataAttribute{" + + "attributeName='" + + attributeName + + '\'' + + ", uploadedFilename='" + + uploadedFilename + + '\'' + + '}'; + } + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 237f1f7a6abf2..9bf1dff2359ca 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -14,9 +14,12 @@ import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; @@ -29,9 +32,9 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; @@ -49,6 +52,7 @@ import java.util.Base64; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -63,6 +67,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static java.util.Objects.requireNonNull; import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; @@ -78,6 +83,7 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; public static final String DELIMITER = "__"; + public static final String CUSTOM_DELIMITER = "--"; private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); @@ -120,6 +126,30 @@ public class RemoteClusterStateService implements Closeable { Metadata::fromXContent ); + public static final ChecksumBlobStoreFormat COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "coordination", + METADATA_NAME_FORMAT, + CoordinationMetadata::fromXContent + ); + + public static final ChecksumBlobStoreFormat SETTINGS_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "settings", + METADATA_NAME_FORMAT, + Settings::fromXContent + ); + + public static final ChecksumBlobStoreFormat TEMPLATES_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "templates", + METADATA_NAME_FORMAT, + TemplatesMetadata::fromXContent + ); + + public static final ChecksumBlobStoreFormat CUSTOM_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "custom", + METADATA_NAME_FORMAT, + Metadata.Custom::fromXContent + ); + /** * Manifest format compatible with older codec v0, where codec version was missing. */ @@ -127,7 +157,13 @@ public class RemoteClusterStateService implements Closeable { new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0); /** - * Manifest format compatible with codec v1, where we introduced codec versions/global metadata. + * Manifest format compatible with older codec v1, where codec versions/global metadata was introduced. + */ + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V1 = + new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1); + + /** + * Manifest format compatible with codec v2, where global metadata file is replaced with multiple metadata attribute files */ public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( "cluster-metadata-manifest", @@ -151,6 +187,10 @@ public class RemoteClusterStateService implements Closeable { public static final String MANIFEST_PATH_TOKEN = "manifest"; public static final String MANIFEST_FILE_PREFIX = "manifest"; public static final String METADATA_FILE_PREFIX = "metadata"; + public static final String COORDINATION_METADATA = "coordination"; + public static final String SETTING_METADATA = "settings"; + public static final String TEMPLATES_METADATA = "templates"; + public static final String CUSTOM_METADATA = "custom"; public static final int SPLITED_MANIFEST_FILE_LENGTH = 6; // file name manifest__term__version__C/P__timestamp__codecversion private final String nodeId; @@ -168,9 +208,13 @@ public class RemoteClusterStateService implements Closeable { private volatile TimeValue metadataManifestUploadTimeout; private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; private final RemotePersistenceStats remoteStateStats; + private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; + private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + + "updated : [{}], custom metadata updated : [{}]"; public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; - public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V1; - public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; + public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V2; + public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2; // ToXContent Params with gateway mode. // We are using gateway context mode to persist all custom metadata. @@ -225,22 +269,23 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri return null; } - // TODO: we can upload global metadata and index metadata in parallel. [issue: #10645] - // Write globalMetadata - String globalMetadataFile = writeGlobalMetadata(clusterState); - - List toUpload = new ArrayList<>(clusterState.metadata().indices().values()); - // any validations before/after upload ? - final List allUploadedIndexMetadata = writeIndexMetadataParallel( + UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel( clusterState, - toUpload, - Collections.emptyMap() + new ArrayList<>(clusterState.metadata().indices().values()), + Collections.emptyMap(), + clusterState.metadata().customs(), + true, + true, + true ); final ClusterMetadataManifest manifest = uploadManifest( clusterState, - allUploadedIndexMetadata, + uploadedMetadataResults.uploadedIndexMetadata, previousClusterUUID, - globalMetadataFile, + uploadedMetadataResults.uploadedCoordinationMetadata, + uploadedMetadataResults.uploadedSettingsMetadata, + uploadedMetadataResults.uploadedTemplatesMetadata, + uploadedMetadataResults.uploadedCustomMetadataMap, false ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); @@ -251,13 +296,13 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices", durationMillis, slowWriteLoggingThreshold, - allUploadedIndexMetadata.size() + uploadedMetadataResults.uploadedIndexMetadata.size() ); } else { logger.info( "writing cluster state took [{}ms]; " + "wrote full state with [{}] indices and global metadata", durationMillis, - allUploadedIndexMetadata.size() + uploadedMetadataResults.uploadedIndexMetadata.size() ); } return manifest; @@ -283,26 +328,15 @@ public ClusterMetadataManifest writeIncrementalMetadata( } assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); - // Write Global Metadata - final boolean updateGlobalMetadata = Metadata.isGlobalStateEquals( - previousClusterState.metadata(), - clusterState.metadata() - ) == false; - String globalMetadataFile; - // For migration case from codec V0 to V1, we have added null check on global metadata file, - // If file is empty and codec is 1 then write global metadata. - if (updateGlobalMetadata || previousManifest.getGlobalMetadataFileName() == null) { - globalMetadataFile = writeGlobalMetadata(clusterState); - } else { - logger.debug("Global metadata has not updated in cluster state, skipping upload of it"); - globalMetadataFile = previousManifest.getGlobalMetadataFileName(); + final Map customsToBeDeletedFromRemote = new HashMap<>(previousManifest.getCustomMetadataMap()); + final Map customsToUpload = getUpdatedCustoms(clusterState, previousClusterState); + final Map allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap()); + for (final String custom : clusterState.metadata().customs().keySet()) { + // remove all the customs which are present currently + customsToBeDeletedFromRemote.remove(custom); } - // Write Index Metadata - final Map previousStateIndexMetadataByName = new HashMap<>(); - for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { - previousStateIndexMetadataByName.put(indexMetadata.getIndex().getName(), indexMetadata); - } + final Map indicesToBeDeletedFromRemote = new HashMap<>(previousClusterState.metadata().indices()); int numIndicesUpdated = 0; int numIndicesUnchanged = 0; @@ -315,7 +349,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( Map prevIndexMetadataByName = new HashMap<>(); for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { String indexName = indexMetadata.getIndex().getName(); - final IndexMetadata prevIndexMetadata = previousStateIndexMetadataByName.get(indexName); + final IndexMetadata prevIndexMetadata = indicesToBeDeletedFromRemote.get(indexName); Long previousVersion = prevIndexMetadata != null ? prevIndexMetadata.getVersion() : null; if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { logger.debug( @@ -330,137 +364,107 @@ public ClusterMetadataManifest writeIncrementalMetadata( } else { numIndicesUnchanged++; } - previousStateIndexMetadataByName.remove(indexMetadata.getIndex().getName()); + // index present in current cluster state + indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName()); } + UploadedMetadataResults uploadedMetadataResults; + // For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files, + // If file is empty and codec is 1 then write global metadata. + boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles(); + boolean updateCoordinationMetadata = firstUploadForSplitGlobalMetadata + || Metadata.isCoordinationMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; + ; + boolean updateSettingsMetadata = firstUploadForSplitGlobalMetadata + || Metadata.isSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; + boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata + || Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; + + uploadedMetadataResults = writeMetadataInParallel( + clusterState, + toUpload, + prevIndexMetadataByName, + firstUploadForSplitGlobalMetadata ? clusterState.metadata().customs() : customsToUpload, + updateCoordinationMetadata, + updateSettingsMetadata, + updateTemplatesMetadata + ); - List uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, prevIndexMetadataByName); - uploadedIndexMetadataList.forEach( + // update the map if the metadata was uploaded + uploadedMetadataResults.uploadedIndexMetadata.forEach( uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata) ); + allUploadedCustomMap.putAll(uploadedMetadataResults.uploadedCustomMetadataMap); + // remove the data for removed custom/indices + customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove); + indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove); - for (String removedIndexName : previousStateIndexMetadataByName.keySet()) { - allUploadedIndexMetadata.remove(removedIndexName); - } final ClusterMetadataManifest manifest = uploadManifest( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), previousManifest.getPreviousClusterUUID(), - globalMetadataFile, + updateCoordinationMetadata ? uploadedMetadataResults.uploadedCoordinationMetadata : previousManifest.getCoordinationMetadata(), + updateSettingsMetadata ? uploadedMetadataResults.uploadedSettingsMetadata : previousManifest.getSettingsMetadata(), + updateTemplatesMetadata ? uploadedMetadataResults.uploadedTemplatesMetadata : previousManifest.getTemplatesMetadata(), + firstUploadForSplitGlobalMetadata || !customsToUpload.isEmpty() + ? allUploadedCustomMap + : previousManifest.getCustomMetadataMap(), false ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateSucceeded(); remoteStateStats.stateTook(durationMillis); + ParameterizedMessage clusterStateUploadTimeMessage = new ParameterizedMessage( + CLUSTER_STATE_UPLOAD_TIME_LOG_STRING, + manifest.getStateVersion(), + durationMillis + ); + ParameterizedMessage metadataUpdateMessage = new ParameterizedMessage( + METADATA_UPDATE_LOG_STRING, + numIndicesUpdated, + numIndicesUnchanged, + updateCoordinationMetadata, + updateSettingsMetadata, + updateTemplatesMetadata, + customsToUpload.size() + ); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( - "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " - + "wrote metadata for [{}] indices and skipped [{}] unchanged indices, global metadata updated : [{}]", - durationMillis, + "{} which is above the warn threshold of [{}]; {}", + clusterStateUploadTimeMessage, slowWriteLoggingThreshold, - numIndicesUpdated, - numIndicesUnchanged, - updateGlobalMetadata + metadataUpdateMessage ); } else { - logger.info( - "writing cluster state for version [{}] took [{}ms]; " - + "wrote metadata for [{}] indices and skipped [{}] unchanged indices, global metadata updated : [{}]", - manifest.getStateVersion(), - durationMillis, - numIndicesUpdated, - numIndicesUnchanged, - updateGlobalMetadata - ); + logger.info("{}; {}", clusterStateUploadTimeMessage, metadataUpdateMessage); } return manifest; } - /** - * Uploads provided ClusterState's global Metadata to remote store in parallel. - * The call is blocking so the method waits for upload to finish and then return. - * - * @param clusterState current ClusterState - * @return String file name where globalMetadata file is stored. - */ - private String writeGlobalMetadata(ClusterState clusterState) throws IOException { - - AtomicReference result = new AtomicReference(); - AtomicReference exceptionReference = new AtomicReference(); - - final BlobContainer globalMetadataContainer = globalMetadataContainer( - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ); - final String globalMetadataFilename = globalMetadataFileName(clusterState.metadata()); - - // latch to wait until upload is not finished - CountDownLatch latch = new CountDownLatch(1); - - LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> { - logger.trace(String.format(Locale.ROOT, "GlobalMetadata uploaded successfully.")); - result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename); - }, ex -> { exceptionReference.set(ex); }), latch); - - GLOBAL_METADATA_FORMAT.writeAsyncWithUrgentPriority( - clusterState.metadata(), - globalMetadataContainer, - globalMetadataFilename, - blobStoreRepository.getCompressor(), - completionListener, - FORMAT_PARAMS - ); - - try { - if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { - // TODO: We should add metrics where transfer is timing out. [Issue: #10687] - RemoteStateTransferException ex = new RemoteStateTransferException( - String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete") - ); - throw ex; - } - } catch (InterruptedException ex) { - RemoteStateTransferException exception = new RemoteStateTransferException( - String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete - %s"), - ex - ); - Thread.currentThread().interrupt(); - throw exception; - } - if (exceptionReference.get() != null) { - throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get()); - } - return result.get(); - } - - /** - * Uploads provided IndexMetadata's to remote store in parallel. The call is blocking so the method waits for upload to finish and then return. - * - * @param clusterState current ClusterState - * @param toUpload list of IndexMetadata to upload - * @return {@code List} list of IndexMetadata uploaded to remote - */ - private List writeIndexMetadataParallel( + private UploadedMetadataResults writeMetadataInParallel( ClusterState clusterState, - List toUpload, - Map prevIndexMetadataByName + List indexToUpload, + Map prevIndexMetadataByName, + Map customToUpload, + boolean uploadCoordinationMetadata, + boolean uploadSettingsMetadata, + boolean uploadTemplateMetadata ) throws IOException { assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null"; - int latchCount = toUpload.size() + indexMetadataUploadListeners.size(); - List exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount)); - final CountDownLatch latch = new CountDownLatch(latchCount); - List result = new ArrayList<>(toUpload.size()); - - LatchedActionListener latchedActionListener = new LatchedActionListener<>( - ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> { - logger.trace( - String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName()) - ); - result.add(uploadedIndexMetadata); + int totalUploadTasks = indexToUpload.size() + indexMetadataUploadListeners.size() + customToUpload.size() + + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0); + CountDownLatch latch = new CountDownLatch(totalUploadTasks); + Map> uploadTasks = new HashMap<>(totalUploadTasks); + Map results = new HashMap<>(totalUploadTasks); + List exceptionList = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); + + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap((ClusterMetadataManifest.UploadedMetadata uploadedMetadata) -> { + logger.trace(String.format(Locale.ROOT, "Metadata component %s uploaded successfully.", uploadedMetadata.getComponent())); + results.put(uploadedMetadata.getComponent(), uploadedMetadata); }, ex -> { - assert ex instanceof RemoteStateTransferException; logger.error( - () -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()), + () -> new ParameterizedMessage("Exception during transfer of Metadata Fragment to Remote {}", ex.getMessage()), ex ); exceptionList.add(ex); @@ -468,20 +472,68 @@ private List writeIndexMetadataParallel( latch ); - for (IndexMetadata indexMetadata : toUpload) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 - writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener); + if (uploadSettingsMetadata) { + uploadTasks.put( + SETTING_METADATA, + getAsyncMetadataWriteAction( + clusterState, + SETTING_METADATA, + SETTINGS_METADATA_FORMAT, + clusterState.metadata().persistentSettings(), + listener + ) + ); + } + if (uploadCoordinationMetadata) { + uploadTasks.put( + COORDINATION_METADATA, + getAsyncMetadataWriteAction( + clusterState, + COORDINATION_METADATA, + COORDINATION_METADATA_FORMAT, + clusterState.metadata().coordinationMetadata(), + listener + ) + ); + } + if (uploadTemplateMetadata) { + uploadTasks.put( + TEMPLATES_METADATA, + getAsyncMetadataWriteAction( + clusterState, + TEMPLATES_METADATA, + TEMPLATES_METADATA_FORMAT, + clusterState.metadata().templatesMetadata(), + listener + ) + ); + } + customToUpload.forEach((key, value) -> { + String customComponent = String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, key); + uploadTasks.put( + customComponent, + getAsyncMetadataWriteAction(clusterState, customComponent, CUSTOM_METADATA_FORMAT, value, listener) + ); + }); + indexToUpload.forEach(indexMetadata -> { + uploadTasks.put(indexMetadata.getIndex().getName(), getIndexMetadataAsyncAction(clusterState, indexMetadata, listener)); + }); + + // start async upload of all required metadata files + for (CheckedRunnable uploadTask : uploadTasks.values()) { + uploadTask.run(); } - invokeIndexMetadataUploadListeners(toUpload, prevIndexMetadataByName, latch, exceptionList); + invokeIndexMetadataUploadListeners(indexToUpload, prevIndexMetadataByName, latch, exceptionList); try { - if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { + if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { + // TODO: We should add metrics where transfer is timing out. [Issue: #10687] RemoteStateTransferException ex = new RemoteStateTransferException( String.format( Locale.ROOT, - "Timed out waiting for transfer of index metadata to complete - %s", - toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining("")) + "Timed out waiting for transfer of following metadata to complete - %s", + String.join(", ", uploadTasks.keySet()) ) ); exceptionList.forEach(ex::addSuppressed); @@ -492,26 +544,47 @@ private List writeIndexMetadataParallel( RemoteStateTransferException exception = new RemoteStateTransferException( String.format( Locale.ROOT, - "Timed out waiting for transfer of index metadata to complete - %s", - toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining("")) + "Timed out waiting for transfer of metadata to complete - %s", + String.join(", ", uploadTasks.keySet()) ), ex ); Thread.currentThread().interrupt(); throw exception; } - if (exceptionList.size() > 0) { + if (!exceptionList.isEmpty()) { RemoteStateTransferException exception = new RemoteStateTransferException( String.format( Locale.ROOT, - "Exception during transfer of IndexMetadata to Remote %s", - toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining("")) + "Exception during transfer of following metadata to Remote - %s", + String.join(", ", uploadTasks.keySet()) ) ); exceptionList.forEach(exception::addSuppressed); throw exception; } - return result; + UploadedMetadataResults response = new UploadedMetadataResults(); + results.forEach((name, uploadedMetadata) -> { + if (name.contains(CUSTOM_METADATA)) { + // component name for custom metadata will look like custom-- + String custom = name.split(DELIMITER)[0].split(CUSTOM_DELIMITER)[1]; + response.uploadedCustomMetadataMap.put( + custom, + new UploadedMetadataAttribute(custom, uploadedMetadata.getUploadedFilename()) + ); + } else if (COORDINATION_METADATA.equals(name)) { + response.uploadedCoordinationMetadata = (UploadedMetadataAttribute) uploadedMetadata; + } else if (SETTING_METADATA.equals(name)) { + response.uploadedSettingsMetadata = (UploadedMetadataAttribute) uploadedMetadata; + } else if (TEMPLATES_METADATA.equals(name)) { + response.uploadedTemplatesMetadata = (UploadedMetadataAttribute) uploadedMetadata; + } else if (name.contains(UploadedIndexMetadata.COMPONENT_PREFIX)) { + response.uploadedIndexMetadata.add((UploadedIndexMetadata) uploadedMetadata); + } else { + throw new IllegalStateException("Unknown metadata component name " + name); + } + }); + return response; } /** @@ -578,11 +651,11 @@ private ActionListener getIndexMetadataUploadActionListener( * @param indexMetadata {@link IndexMetadata} to upload * @param latchedActionListener listener to respond back on after upload finishes */ - private void writeIndexMetadataAsync( + private CheckedRunnable getIndexMetadataAsyncAction( ClusterState clusterState, IndexMetadata indexMetadata, - LatchedActionListener latchedActionListener - ) throws IOException { + LatchedActionListener latchedActionListener + ) { final BlobContainer indexMetadataContainer = indexMetadataContainer( clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), @@ -600,7 +673,7 @@ private void writeIndexMetadataAsync( ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().toString(), ex)) ); - INDEX_METADATA_FORMAT.writeAsyncWithUrgentPriority( + return () -> INDEX_METADATA_FORMAT.writeAsyncWithUrgentPriority( indexMetadata, indexMetadataContainer, indexMetadataFilename, @@ -610,6 +683,36 @@ private void writeIndexMetadataAsync( ); } + /** + * Allows async upload of Metadata components to remote + */ + + private CheckedRunnable getAsyncMetadataWriteAction( + ClusterState clusterState, + String component, + ChecksumBlobStoreFormat componentMetadataBlobStore, + ToXContent componentMetadata, + LatchedActionListener latchedActionListener + ) { + final BlobContainer globalMetadataContainer = globalMetadataContainer( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ); + final String componentMetadataFilename = metadataAttributeFileName(component, clusterState.metadata().version()); + ActionListener completionListener = ActionListener.wrap( + resp -> latchedActionListener.onResponse(new UploadedMetadataAttribute(component, componentMetadataFilename)), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, ex)) + ); + return () -> componentMetadataBlobStore.writeAsyncWithUrgentPriority( + componentMetadata, + globalMetadataContainer, + componentMetadataFilename, + blobStoreRepository.getCompressor(), + completionListener, + FORMAT_PARAMS + ); + } + public RemoteClusterStateCleanupManager getCleanupManager() { return remoteClusterStateCleanupManager; } @@ -627,7 +730,10 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), - previousManifest.getGlobalMetadataFileName(), + previousManifest.getCoordinationMetadata(), + previousManifest.getSettingsMetadata(), + previousManifest.getTemplatesMetadata(), + previousManifest.getCustomMetadataMap(), true ); if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) { @@ -661,11 +767,19 @@ private ClusterMetadataManifest uploadManifest( ClusterState clusterState, List uploadedIndexMetadata, String previousClusterUUID, - String globalClusterMetadataFileName, + UploadedMetadataAttribute uploadedCoordinationMetadata, + UploadedMetadataAttribute uploadedSettingsMetadata, + UploadedMetadataAttribute uploadedTemplatesMetadata, + Map uploadedCustomMetadataMap, boolean committed ) throws IOException { synchronized (this) { - final String manifestFileName = getManifestFileName(clusterState.term(), clusterState.version(), committed); + final String manifestFileName = getManifestFileName( + clusterState.term(), + clusterState.version(), + committed, + MANIFEST_CURRENT_CODEC_VERSION + ); final ClusterMetadataManifest manifest = new ClusterMetadataManifest( clusterState.term(), clusterState.getVersion(), @@ -675,10 +789,14 @@ private ClusterMetadataManifest uploadManifest( nodeId, committed, MANIFEST_CURRENT_CODEC_VERSION, - globalClusterMetadataFileName, + null, uploadedIndexMetadata, previousClusterUUID, - clusterState.metadata().clusterUUIDCommitted() + clusterState.metadata().clusterUUIDCommitted(), + uploadedCoordinationMetadata, + uploadedSettingsMetadata, + uploadedTemplatesMetadata, + uploadedCustomMetadataMap ); writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); return manifest; @@ -699,7 +817,7 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust logger.trace(String.format(Locale.ROOT, "Manifest file uploaded successfully.")); }, ex -> { exceptionReference.set(ex); }), latch); - CLUSTER_METADATA_MANIFEST_FORMAT.writeAsyncWithUrgentPriority( + getClusterMetadataManifestBlobStoreFormat(fileName).writeAsyncWithUrgentPriority( uploadManifest, metadataManifestContainer, fileName, @@ -787,6 +905,31 @@ private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploa this.metadataManifestUploadTimeout = newMetadataManifestUploadTimeout; } + private Map getUpdatedCustoms(ClusterState currentState, ClusterState previousState) { + if (Metadata.isCustomMetadataEqual(previousState.metadata(), currentState.metadata())) { + return new HashMap<>(); + } + Map updatedCustom = new HashMap<>(); + Set currentCustoms = new HashSet<>(currentState.metadata().customs().keySet()); + for (Map.Entry cursor : previousState.metadata().customs().entrySet()) { + if (cursor.getValue().context().contains(Metadata.XContentContext.GATEWAY)) { + if (currentCustoms.contains(cursor.getKey()) + && !cursor.getValue().equals(currentState.metadata().custom(cursor.getKey()))) { + // If the custom metadata is updated, we need to upload the new version. + updatedCustom.put(cursor.getKey(), currentState.metadata().custom(cursor.getKey())); + } + currentCustoms.remove(cursor.getKey()); + } + } + for (String custom : currentCustoms) { + Metadata.Custom cursor = currentState.metadata().custom(custom); + if (cursor.context().contains(Metadata.XContentContext.GATEWAY)) { + updatedCustom.put(custom, cursor); + } + } + return updatedCustom; + } + public TimeValue getIndexMetadataUploadTimeout() { return this.indexMetadataUploadTimeout; } @@ -799,7 +942,7 @@ public TimeValue getMetadataManifestUploadTimeout() { return this.metadataManifestUploadTimeout; } - static String getManifestFileName(long term, long version, boolean committed) { + static String getManifestFileName(long term, long version, boolean committed, int codecVersion) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest______C/P____ return String.join( DELIMITER, @@ -808,7 +951,7 @@ static String getManifestFileName(long term, long version, boolean committed) { RemoteStoreUtils.invertLong(version), (committed ? "C" : "P"), // C for committed and P for published RemoteStoreUtils.invertLong(System.currentTimeMillis()), - String.valueOf(MANIFEST_CURRENT_CODEC_VERSION) // Keep the codec version at last place only, during read we reads last place to + String.valueOf(codecVersion) // Keep the codec version at last place only, during read we reads last place to // determine codec version. ); } @@ -837,6 +980,17 @@ private static String globalMetadataFileName(Metadata metadata) { ); } + private static String metadataAttributeFileName(String componentPrefix, Long metadataVersion) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/______ + return String.join( + DELIMITER, + componentPrefix, + RemoteStoreUtils.invertLong(metadataVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) + ); + } + BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN); } @@ -903,6 +1057,7 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID String.format(Locale.ROOT, "Latest cluster metadata manifest is not present for the provided clusterUUID: %s", clusterUUID) ); } + // Fetch Global Metadata Metadata globalMetadata = getGlobalMetadata(clusterName, clusterUUID, clusterMetadataManifest.get()); @@ -929,6 +1084,34 @@ private Metadata getGlobalMetadata(String clusterName, String clusterUUID, Clust splitPath[splitPath.length - 1], blobStoreRepository.getNamedXContentRegistry() ); + } else if (clusterMetadataManifest.hasMetadataAttributesFiles()) { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata( + clusterName, + clusterUUID, + clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename() + ); + Settings settingsMetadata = getSettingsMetadata( + clusterName, + clusterUUID, + clusterMetadataManifest.getSettingsMetadata().getUploadedFilename() + ); + TemplatesMetadata templatesMetadata = getTemplatesMetadata( + clusterName, + clusterUUID, + clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename() + ); + Metadata.Builder builder = new Metadata.Builder(); + builder.coordinationMetadata(coordinationMetadata); + builder.persistentSettings(settingsMetadata); + builder.templates(templatesMetadata); + clusterMetadataManifest.getCustomMetadataMap() + .forEach( + (key, value) -> builder.putCustom( + key, + getCustomsMetadata(clusterName, clusterUUID, value.getUploadedFilename(), key) + ) + ); + return builder.build(); } else { return Metadata.EMPTY_METADATA; } @@ -940,6 +1123,92 @@ private Metadata getGlobalMetadata(String clusterName, String clusterUUID, Clust } } + private CoordinationMetadata getCoordinationMetadata(String clusterName, String clusterUUID, String coordinationMetadataFileName) { + try { + // Fetch Coordination metadata + if (coordinationMetadataFileName != null) { + String[] splitPath = coordinationMetadataFileName.split("/"); + return COORDINATION_METADATA_FORMAT.read( + globalMetadataContainer(clusterName, clusterUUID), + splitPath[splitPath.length - 1], + blobStoreRepository.getNamedXContentRegistry() + ); + } else { + return CoordinationMetadata.EMPTY_METADATA; + } + } catch (IOException e) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Error while downloading Coordination Metadata - %s", coordinationMetadataFileName), + e + ); + } + } + + private Settings getSettingsMetadata(String clusterName, String clusterUUID, String settingsMetadataFileName) { + try { + // Fetch Settings metadata + if (settingsMetadataFileName != null) { + String[] splitPath = settingsMetadataFileName.split("/"); + return SETTINGS_METADATA_FORMAT.read( + globalMetadataContainer(clusterName, clusterUUID), + splitPath[splitPath.length - 1], + blobStoreRepository.getNamedXContentRegistry() + ); + } else { + return Settings.EMPTY; + } + } catch (IOException e) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Error while downloading Settings Metadata - %s", settingsMetadataFileName), + e + ); + } + } + + private TemplatesMetadata getTemplatesMetadata(String clusterName, String clusterUUID, String templatesMetadataFileName) { + try { + // Fetch Templates metadata + if (templatesMetadataFileName != null) { + String[] splitPath = templatesMetadataFileName.split("/"); + return TEMPLATES_METADATA_FORMAT.read( + globalMetadataContainer(clusterName, clusterUUID), + splitPath[splitPath.length - 1], + blobStoreRepository.getNamedXContentRegistry() + ); + } else { + return TemplatesMetadata.EMPTY_METADATA; + } + } catch (IOException e) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Error while downloading Templates Metadata - %s", templatesMetadataFileName), + e + ); + } + } + + private Metadata.Custom getCustomsMetadata(String clusterName, String clusterUUID, String customMetadataFileName, String custom) { + requireNonNull(customMetadataFileName); + try { + // Fetch Custom metadata + String[] splitPath = customMetadataFileName.split("/"); + ChecksumBlobStoreFormat customChecksumBlobStoreFormat = new ChecksumBlobStoreFormat<>( + "custom", + METADATA_NAME_FORMAT, + (parser -> Metadata.Custom.fromXContent(parser, custom)) + ); + return customChecksumBlobStoreFormat.read( + globalMetadataContainer(clusterName, clusterUUID), + splitPath[splitPath.length - 1], + blobStoreRepository.getNamedXContentRegistry() + ); + } catch (IOException e) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Error while downloading Custom Metadata - %s", customMetadataFileName), + e + ); + } + } + /** * Fetch latest ClusterMetadataManifest from remote state store * @@ -1183,6 +1452,8 @@ private ChecksumBlobStoreFormat getClusterMetadataManif long codecVersion = getManifestCodecVersion(fileName); if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { return CLUSTER_METADATA_MANIFEST_FORMAT; + } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { + return CLUSTER_METADATA_MANIFEST_FORMAT_V1; } else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { return CLUSTER_METADATA_MANIFEST_FORMAT_V0; } @@ -1227,4 +1498,34 @@ public RemoteStateTransferException(String errorDesc, Throwable cause) { public RemotePersistenceStats getStats() { return remoteStateStats; } + + private static class UploadedMetadataResults { + List uploadedIndexMetadata; + Map uploadedCustomMetadataMap; + UploadedMetadataAttribute uploadedCoordinationMetadata; + UploadedMetadataAttribute uploadedSettingsMetadata; + UploadedMetadataAttribute uploadedTemplatesMetadata; + + public UploadedMetadataResults( + List uploadedIndexMetadata, + Map uploadedCustomMetadataMap, + UploadedMetadataAttribute uploadedCoordinationMetadata, + UploadedMetadataAttribute uploadedSettingsMetadata, + UploadedMetadataAttribute uploadedTemplatesMetadata + ) { + this.uploadedIndexMetadata = uploadedIndexMetadata; + this.uploadedCustomMetadataMap = uploadedCustomMetadataMap; + this.uploadedCoordinationMetadata = uploadedCoordinationMetadata; + this.uploadedSettingsMetadata = uploadedSettingsMetadata; + this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; + } + + public UploadedMetadataResults() { + this.uploadedIndexMetadata = new ArrayList<>(); + this.uploadedCustomMetadataMap = new HashMap<>(); + this.uploadedCoordinationMetadata = null; + this.uploadedSettingsMetadata = null; + this.uploadedTemplatesMetadata = null; + } + } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 1c0dc7fc1ca2d..bd71aecf89101 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -930,20 +930,20 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep final VotingConfiguration initialConfig = VotingConfiguration.of(node1); final ClusterState clusterState = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L); final String previousClusterUUID = "prev-cluster-uuid"; - final ClusterMetadataManifest manifest = new ClusterMetadataManifest( - 0L, - 0L, - randomAlphaOfLength(10), - randomAlphaOfLength(10), - Version.CURRENT, - randomAlphaOfLength(10), - false, - 1, - randomAlphaOfLength(10), - Collections.emptyList(), - randomAlphaOfLength(10), - true - ); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(0L) + .stateVersion(0L) + .clusterUUID(randomAlphaOfLength(10)) + .stateUUID(randomAlphaOfLength(10)) + .opensearchVersion(Version.CURRENT) + .nodeId(randomAlphaOfLength(10)) + .committed(false) + .codecVersion(1) + .globalMetadataFileName(randomAlphaOfLength(10)) + .indices(Collections.emptyList()) + .previousClusterUUID(randomAlphaOfLength(10)) + .clusterUUIDCommitted(true) + .build(); Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)).thenReturn(manifest); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 6c9a3201656d7..0b3cd49140939 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -9,6 +9,9 @@ package org.opensearch.gateway.remote; import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexGraveyard; +import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -16,32 +19,38 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; public class ClusterMetadataManifestTests extends OpenSearchTestCase { public void testClusterMetadataManifestXContentV0() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); - ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( - 1L, - 1L, - "test-cluster-uuid", - "test-state-uuid", - Version.CURRENT, - "test-node-id", - false, - ClusterMetadataManifest.CODEC_V0, - null, - Collections.singletonList(uploadedIndexMetadata), - "prev-cluster-uuid", - true - ); + ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(1L) + .clusterUUID("test-cluster-uuid") + .stateUUID("test-state-uuid") + .opensearchVersion(Version.CURRENT) + .nodeId("test-node-id") + .committed(false) + .codecVersion(CODEC_V0) + .indices(Collections.singletonList(uploadedIndexMetadata)) + .previousClusterUUID("prev-cluster-uuid") + .clusterUUIDCommitted(true) + .build(); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -53,6 +62,33 @@ public void testClusterMetadataManifestXContentV0() throws IOException { } } + public void testClusterMetadataManifestXContentV1() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(1L) + .clusterUUID("test-cluster-uuid") + .stateUUID("test-state-uuid") + .opensearchVersion(Version.CURRENT) + .nodeId("test-node-id") + .committed(false) + .codecVersion(CODEC_V1) + .globalMetadataFileName("test-global-metadata-file") + .indices(Collections.singletonList(uploadedIndexMetadata)) + .previousClusterUUID("prev-cluster-uuid") + .clusterUUIDCommitted(true) + .build(); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV1(parser); + assertEquals(originalManifest, fromXContentManifest); + } + } + public void testClusterMetadataManifestXContent() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( @@ -63,11 +99,31 @@ public void testClusterMetadataManifestXContent() throws IOException { Version.CURRENT, "test-node-id", false, - ClusterMetadataManifest.CODEC_V1, - "test-global-metadata-file", + ClusterMetadataManifest.CODEC_V2, + null, Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", - true + true, + new UploadedMetadataAttribute(RemoteClusterStateService.COORDINATION_METADATA, "coordination-file"), + new UploadedMetadataAttribute(RemoteClusterStateService.SETTING_METADATA, "setting-file"), + new UploadedMetadataAttribute(RemoteClusterStateService.TEMPLATES_METADATA, "templates-file"), + Collections.unmodifiableList( + Arrays.asList( + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, + "custom--repositories-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, + "custom--index_graveyard-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + WeightedRoutingMetadata.TYPE, + "custom--weighted_routing_netadata-file" + ) + ) + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) ); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -89,11 +145,31 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { Version.CURRENT, "B10RX1f5RJenMQvYccCgSQ", true, - 1, - "test-global-metadata-file", + 2, + null, randomUploadedIndexMetadataList(), "yfObdx8KSMKKrXf8UyHhM", - true + true, + new UploadedMetadataAttribute(RemoteClusterStateService.COORDINATION_METADATA, "coordination-file"), + new UploadedMetadataAttribute(RemoteClusterStateService.SETTING_METADATA, "setting-file"), + new UploadedMetadataAttribute(RemoteClusterStateService.TEMPLATES_METADATA, "templates-file"), + Collections.unmodifiableList( + Arrays.asList( + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, + "custom--repositories-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, + "custom--index_graveyard-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + WeightedRoutingMetadata.TYPE, + "custom--weighted_routing_netadata-file" + ) + ) + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) ); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 733e87f73cc23..5f0c371a3137e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -15,7 +15,9 @@ import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexTemplateMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; @@ -39,6 +41,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.indices.IndicesModule; @@ -48,6 +51,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.TestCustomMetadata; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -61,32 +65,40 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; @@ -225,6 +237,11 @@ public void testWriteFullMetadataSuccess() throws IOException { assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); + assertThat(manifest.getGlobalMetadataFileName(), nullValue()); + assertThat(manifest.getCoordinationMetadata(), notNullValue()); + assertThat(manifest.getSettingsMetadata(), notNullValue()); + assertThat(manifest.getTemplatesMetadata(), notNullValue()); + assertFalse(manifest.getCustomMetadataMap().isEmpty()); } public void testWriteFullMetadataInParallelSuccess() throws IOException { @@ -233,16 +250,11 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); - AtomicReference capturedWriteContext = new AtomicReference<>(); + ConcurrentHashMap capturedWriteContext = new ConcurrentHashMap<>(); doAnswer((i) -> { actionListenerArgumentCaptor.getValue().onResponse(null); - return null; - }).doAnswer((i) -> { - actionListenerArgumentCaptor.getValue().onResponse(null); - capturedWriteContext.set(writeContextArgumentCaptor.getValue()); - return null; - }).doAnswer((i) -> { - actionListenerArgumentCaptor.getValue().onResponse(null); + WriteContext writeContext = writeContextArgumentCaptor.getValue(); + capturedWriteContext.put(writeContext.getFileName().split(DELIMITER)[0], writeContextArgumentCaptor.getValue()); return null; }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); @@ -265,37 +277,41 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); - assertThat(manifest.getGlobalMetadataFileName(), notNullValue()); + assertThat(manifest.getGlobalMetadataFileName(), nullValue()); + assertThat(manifest.getCoordinationMetadata(), notNullValue()); + assertThat(manifest.getSettingsMetadata(), notNullValue()); + assertThat(manifest.getTemplatesMetadata(), notNullValue()); + assertThat(manifest.getCustomMetadataMap().size(), not(0)); assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); - assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 3); - assertEquals(writeContextArgumentCaptor.getAllValues().size(), 3); + assertEquals(7, actionListenerArgumentCaptor.getAllValues().size()); + assertEquals(7, writeContextArgumentCaptor.getAllValues().size()); - byte[] writtenBytes = capturedWriteContext.get() + byte[] writtenBytes = capturedWriteContext.get("metadata") .getStreamProvider(Integer.MAX_VALUE) .provideStream(0) .getInputStream() .readAllBytes(); IndexMetadata writtenIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.deserialize( - capturedWriteContext.get().getFileName(), + capturedWriteContext.get("metadata").getFileName(), blobStoreRepository.getNamedXContentRegistry(), new BytesArray(writtenBytes) ); - assertEquals(capturedWriteContext.get().getWritePriority(), WritePriority.URGENT); + assertEquals(capturedWriteContext.get("metadata").getWritePriority(), WritePriority.URGENT); assertEquals(writtenIndexMetadata.getNumberOfShards(), 1); assertEquals(writtenIndexMetadata.getNumberOfReplicas(), 0); assertEquals(writtenIndexMetadata.getIndex().getName(), "test-index"); assertEquals(writtenIndexMetadata.getIndex().getUUID(), "index-uuid"); long expectedChecksum = RemoteTransferContainer.checksumOfChecksum(new ByteArrayIndexInput("metadata-filename", writtenBytes), 8); - if (capturedWriteContext.get().doRemoteDataIntegrityCheck()) { - assertEquals(capturedWriteContext.get().getExpectedChecksum().longValue(), expectedChecksum); + if (capturedWriteContext.get("metadata").doRemoteDataIntegrityCheck()) { + assertEquals(capturedWriteContext.get("metadata").getExpectedChecksum().longValue(), expectedChecksum); } else { - assertEquals(capturedWriteContext.get().getExpectedChecksum(), null); + assertEquals(capturedWriteContext.get("metadata").getExpectedChecksum(), null); } } @@ -353,7 +369,7 @@ public void testTimeoutWhileWritingManifestFile() throws IOException { remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)); } catch (Exception e) { assertTrue(e instanceof RemoteClusterStateService.RemoteStateTransferException); - assertTrue(e.getMessage().contains("Timed out waiting for transfer of manifest file to complete")); + assertTrue(e.getMessage().contains("Timed out waiting for transfer of following metadata to complete")); } } @@ -438,13 +454,28 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { } /* - * Here we will verify the migration of manifest file from codec V0 and V1. + * Here we will verify the migration of manifest file from codec V0. * * Initially codec version is 0 and global metadata is also null, we will perform index metadata update. - * In final manifest codec version should be 1 and - * global metadata should be updated, even if it was not changed in this cluster state update + * In final manifest codec version should be 2 and have metadata files updated, + * even if it was not changed in this cluster state update */ - public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOException { + public void testMigrationFromCodecV0ManifestToCodecV2Manifest() throws IOException { + verifyCodecMigrationManifest(ClusterMetadataManifest.CODEC_V0); + } + + /* + * Here we will verify the migration of manifest file from codec V1. + * + * Initially codec version is 1 and a global metadata file is there, we will perform index metadata update. + * In final manifest codec version should be 2 and have metadata files updated, + * even if it was not changed in this cluster state update + */ + public void testMigrationFromCodecV1ManifestToCodecV2Manifest() throws IOException { + verifyCodecMigrationManifest(ClusterMetadataManifest.CODEC_V1); + } + + private void verifyCodecMigrationManifest(int previousCodec) throws IOException { mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) @@ -464,7 +495,7 @@ public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOExcepti // previous manifest with codec 0 and null global metadata final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() - .codecVersion(ClusterMetadataManifest.CODEC_V0) + .codecVersion(previousCodec) .globalMetadataFileName(null) .indices(Collections.emptyList()) .build(); @@ -477,12 +508,28 @@ public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOExcepti ); // global metadata is updated - assertThat(manifestAfterUpdate.getGlobalMetadataFileName(), notNullValue()); + assertThat(manifestAfterUpdate.hasMetadataAttributesFiles(), is(true)); // Manifest file with codec version with 1 is updated. - assertThat(manifestAfterUpdate.getCodecVersion(), is(ClusterMetadataManifest.CODEC_V1)); + assertThat(manifestAfterUpdate.getCodecVersion(), is(MANIFEST_CURRENT_CODEC_VERSION)); } - public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { + public void testWriteIncrementalGlobalMetadataFromCodecV0Success() throws IOException { + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); + + verifyWriteIncrementalGlobalMetadataFromOlderCodecSuccess(previousManifest); + } + + public void testWriteIncrementalGlobalMetadataFromCodecV1Success() throws IOException { + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .codecVersion(1) + .globalMetadataFileName("global-metadata-file") + .indices(Collections.emptyList()) + .build(); + + verifyWriteIncrementalGlobalMetadataFromOlderCodecSuccess(previousManifest); + } + + private void verifyWriteIncrementalGlobalMetadataFromOlderCodecSuccess(ClusterMetadataManifest previousManifest) throws IOException { final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); @@ -490,12 +537,6 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) .build(); - final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() - .codecVersion(2) - .globalMetadataFileName("global-metadata-file") - .indices(Collections.emptyList()) - .build(); - remoteClusterStateService.start(); final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( previousClusterState, @@ -504,8 +545,8 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .codecVersion(2) .indices(Collections.emptyList()) - .globalMetadataFileName("mock-filename") .clusterTerm(1L) .stateVersion(1L) .stateUUID("state-uuid") @@ -513,130 +554,354 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { .previousClusterUUID("prev-cluster-uuid") .build(); - assertThat(manifest.getGlobalMetadataFileName(), notNullValue()); - assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); - assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); - assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); - assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertNull(manifest.getGlobalMetadataFileName()); + assertNotNull(manifest.getCoordinationMetadata()); + assertNotNull(manifest.getSettingsMetadata()); + assertNotNull(manifest.getTemplatesMetadata()); + assertNotEquals(0, manifest.getCustomMetadataMap().size()); + + assertEquals(expectedManifest.getClusterTerm(), manifest.getClusterTerm()); + assertEquals(expectedManifest.getStateVersion(), manifest.getStateVersion()); + assertEquals(expectedManifest.getClusterUUID(), manifest.getClusterUUID()); + assertEquals(expectedManifest.getStateUUID(), manifest.getStateUUID()); + assertEquals(expectedManifest.getCodecVersion(), manifest.getCodecVersion()); } - /* - * Here we will verify index metadata is not uploaded again if change is only in global metadata - */ - public void testGlobalMetadataOnlyUpdated() throws IOException { - // setup - mockBlobStoreObjects(); - final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); - final ClusterState initialClusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata).version(randomNonNegativeLong())) + public void testCoordinationMetadataOnlyUpdated() throws IOException { + // Updating the voting config, as updating the term will upload the full cluster state and other files will also get updated + Function updater = (initialClusterState) -> ClusterState.builder(initialClusterState) + .metadata( + Metadata.builder(initialClusterState.metadata()) + .coordinationMetadata( + CoordinationMetadata.builder(initialClusterState.coordinationMetadata()) + .addVotingConfigExclusion(new CoordinationMetadata.VotingConfigExclusion("excludedNodeId", "excludedNodeName")) + .build() + ) + .build() + ) .build(); - final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder() - .codecVersion(2) - .globalMetadataFileName("global-metadata-file") - .indices(Collections.emptyList()) + verifyMetadataAttributeOnlyUpdated(updater, (initialMetadata, metadataAfterUpdate) -> { + // Verify that index metadata information is same in manifest files + assertEquals(metadataAfterUpdate.getIndices().size(), initialMetadata.getIndices().size()); + IntStream.range(0, initialMetadata.getIndices().size()).forEach(i -> { + assertEquals(metadataAfterUpdate.getIndices().get(i).getIndexName(), initialMetadata.getIndices().get(i).getIndexName()); + assertEquals(metadataAfterUpdate.getIndices().get(i).getIndexUUID(), initialMetadata.getIndices().get(i).getIndexUUID()); + // since timestamp is part of file name, if file name is same we can confirm that file is not update in global metadata + // update + assertEquals( + metadataAfterUpdate.getIndices().get(i).getUploadedFilename(), + initialMetadata.getIndices().get(i).getUploadedFilename() + ); + }); + + // coordination metadata file would have changed + assertFalse( + metadataAfterUpdate.getCoordinationMetadata() + .getUploadedFilename() + .equalsIgnoreCase(initialMetadata.getCoordinationMetadata().getUploadedFilename()) + ); + // Other files will be equal + assertEquals( + metadataAfterUpdate.getSettingsMetadata().getUploadedFilename(), + initialMetadata.getSettingsMetadata().getUploadedFilename() + ); + assertEquals(metadataAfterUpdate.getTemplatesMetadata(), initialMetadata.getTemplatesMetadata()); + assertEquals(metadataAfterUpdate.getCustomMetadataMap(), initialMetadata.getCustomMetadataMap()); + }); + } + + public void testSettingsMetadataOnlyUpdated() throws IOException { + Function updater = (initialClusterState) -> ClusterState.builder(initialClusterState) + .metadata( + Metadata.builder(initialClusterState.metadata()).persistentSettings(Settings.builder().put("foo", "bar").build()).build() + ) .build(); - remoteClusterStateService.start(); - // Initial cluster state with index. - final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); - // Updating remote cluster state with changing index metadata - final ClusterMetadataManifest manifestAfterIndexMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( - initialClusterState, - clusterState, - initialManifest - ); + verifyMetadataAttributeOnlyUpdated(updater, (initialMetadata, metadataAfterUpdate) -> { + // Verify that index metadata information is same in manifest files + assertEquals(metadataAfterUpdate.getIndices().size(), initialMetadata.getIndices().size()); + IntStream.range(0, initialMetadata.getIndices().size()).forEach(i -> { + assertEquals(metadataAfterUpdate.getIndices().get(i).getIndexName(), initialMetadata.getIndices().get(i).getIndexName()); + assertEquals(metadataAfterUpdate.getIndices().get(i).getIndexUUID(), initialMetadata.getIndices().get(i).getIndexUUID()); + // since timestamp is part of file name, if file name is same we can confirm that file is not update in global metadata + // update + assertEquals( + metadataAfterUpdate.getIndices().get(i).getUploadedFilename(), + initialMetadata.getIndices().get(i).getUploadedFilename() + ); + }); - // new cluster state where only global metadata is different - Metadata newMetadata = Metadata.builder(clusterState.metadata()) - .persistentSettings(Settings.builder().put("cluster.blocks.read_only", true).build()) - .version(randomNonNegativeLong()) + // setting metadata file would have changed + assertFalse( + metadataAfterUpdate.getSettingsMetadata() + .getUploadedFilename() + .equalsIgnoreCase(initialMetadata.getSettingsMetadata().getUploadedFilename()) + ); + assertEquals(metadataAfterUpdate.getCoordinationMetadata(), initialMetadata.getCoordinationMetadata()); + assertEquals(metadataAfterUpdate.getTemplatesMetadata(), initialMetadata.getTemplatesMetadata()); + assertEquals(metadataAfterUpdate.getCustomMetadataMap(), initialMetadata.getCustomMetadataMap()); + }); + } + + public void testTemplatesMetadataOnlyUpdated() throws IOException { + Function updater = (initialClusterState) -> ClusterState.builder(initialClusterState) + .metadata( + Metadata.builder(initialClusterState.metadata()) + .templates( + TemplatesMetadata.builder() + .put( + IndexTemplateMetadata.builder("template" + randomAlphaOfLength(3)) + .patterns(Arrays.asList("bar-*", "foo-*")) + .settings( + Settings.builder() + .put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)) + .build() + ) + .build() + ) + .build() + ) + .build() + ) .build(); - ClusterState newClusterState = ClusterState.builder(clusterState).metadata(newMetadata).build(); - // updating remote cluster state with global metadata - final ClusterMetadataManifest manifestAfterGlobalMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( - clusterState, - newClusterState, - manifestAfterIndexMetadataUpdate - ); + verifyMetadataAttributeOnlyUpdated(updater, (initialMetadata, metadataAfterUpdate) -> { + // Verify that index metadata information is same in manifest files + assertEquals(metadataAfterUpdate.getIndices().size(), initialMetadata.getIndices().size()); + IntStream.range(0, initialMetadata.getIndices().size()).forEach(i -> { + assertEquals(metadataAfterUpdate.getIndices().get(i).getIndexName(), initialMetadata.getIndices().get(i).getIndexName()); + assertEquals(metadataAfterUpdate.getIndices().get(i).getIndexUUID(), initialMetadata.getIndices().get(i).getIndexUUID()); + // since timestamp is part of file name, if file name is same we can confirm that file is not update in global metadata + // update + assertEquals( + metadataAfterUpdate.getIndices().get(i).getUploadedFilename(), + initialMetadata.getIndices().get(i).getUploadedFilename() + ); + }); - // Verify that index metadata information is same in manifest files - assertThat(manifestAfterIndexMetadataUpdate.getIndices().size(), is(manifestAfterGlobalMetadataUpdate.getIndices().size())); - assertThat( - manifestAfterIndexMetadataUpdate.getIndices().get(0).getIndexName(), - is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getIndexName()) - ); - assertThat( - manifestAfterIndexMetadataUpdate.getIndices().get(0).getIndexUUID(), - is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getIndexUUID()) - ); + // template metadata file would have changed + assertFalse( + metadataAfterUpdate.getTemplatesMetadata() + .getUploadedFilename() + .equalsIgnoreCase(initialMetadata.getTemplatesMetadata().getUploadedFilename()) + ); + assertEquals(metadataAfterUpdate.getCoordinationMetadata(), initialMetadata.getCoordinationMetadata()); + assertEquals(metadataAfterUpdate.getSettingsMetadata(), initialMetadata.getSettingsMetadata()); + assertEquals(metadataAfterUpdate.getCustomMetadataMap(), initialMetadata.getCustomMetadataMap()); + }); + } - // since timestamp is part of file name, if file name is same we can confirm that file is not update in global metadata update - assertThat( - manifestAfterIndexMetadataUpdate.getIndices().get(0).getUploadedFilename(), - is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getUploadedFilename()) - ); + public void testCustomMetadataOnlyUpdated() throws IOException { + Function updater = (initialClusterState) -> ClusterState.builder(initialClusterState) + .metadata( + Metadata.builder(initialClusterState.metadata()) + .putCustom("custom_metadata_type", new CustomMetadata1("mock_custom_metadata")) + .build() + ) + .build(); - // global metadata file would have changed - assertFalse( - manifestAfterIndexMetadataUpdate.getGlobalMetadataFileName() - .equalsIgnoreCase(manifestAfterGlobalMetadataUpdate.getGlobalMetadataFileName()) + verifyMetadataAttributeOnlyUpdated(updater, (initialMetadata, metadataAfterUpdate) -> { + // Verify that index metadata information is same in manifest files + assertEquals(metadataAfterUpdate.getIndices().size(), initialMetadata.getIndices().size()); + IntStream.range(0, initialMetadata.getIndices().size()).forEach(i -> { + assertEquals(metadataAfterUpdate.getIndices().get(i).getIndexName(), initialMetadata.getIndices().get(i).getIndexName()); + assertEquals(metadataAfterUpdate.getIndices().get(i).getIndexUUID(), initialMetadata.getIndices().get(i).getIndexUUID()); + // since timestamp is part of file name, if file name is same we can confirm that file is not update in global metadata + // update + assertEquals( + metadataAfterUpdate.getIndices().get(i).getUploadedFilename(), + initialMetadata.getIndices().get(i).getUploadedFilename() + ); + // custom metadata map would have changed + assertNotEquals(metadataAfterUpdate.getCustomMetadataMap(), initialMetadata.getCustomMetadataMap()); + assertEquals(initialMetadata.getCustomMetadataMap().size() + 1, metadataAfterUpdate.getCustomMetadataMap().size()); + initialMetadata.getCustomMetadataMap().forEach((k, v) -> { + assertTrue(metadataAfterUpdate.getCustomMetadataMap().containsKey(k)); + assertEquals(v, metadataAfterUpdate.getCustomMetadataMap().get(k)); + }); + assertEquals(metadataAfterUpdate.getCoordinationMetadata(), initialMetadata.getCoordinationMetadata()); + assertEquals(metadataAfterUpdate.getSettingsMetadata(), initialMetadata.getSettingsMetadata()); + assertEquals(metadataAfterUpdate.getTemplatesMetadata(), initialMetadata.getTemplatesMetadata()); + }); + }); + } + + public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { + // setup + mockBlobStoreObjects(); + + // Initial cluster state with index. + final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + remoteClusterStateService.start(); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_"); + + ClusterState clusterState1 = ClusterState.builder(initialClusterState) + .metadata( + Metadata.builder(initialClusterState.metadata()) + .putCustom("custom1", new CustomMetadata1("mock_custom_metadata1")) + .putCustom("custom2", new CustomMetadata1("mock_custom_metadata2")) + .putCustom("custom3", new CustomMetadata1("mock_custom_metadata3")) + ) + .build(); + + ClusterMetadataManifest manifest1 = remoteClusterStateService.writeIncrementalMetadata( + initialClusterState, + clusterState1, + initialManifest ); + // remove custom1 from the cluster state, update custom2, custom3 is at it is, added custom4 + ClusterState clusterState2 = ClusterState.builder(initialClusterState) + .metadata( + Metadata.builder(initialClusterState.metadata()) + .putCustom("custom2", new CustomMetadata1("mock_updated_custom_metadata")) + .putCustom("custom3", new CustomMetadata1("mock_custom_metadata3")) + .putCustom("custom4", new CustomMetadata1("mock_custom_metadata4")) + ) + .build(); + ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1); + // custom1 is removed + assertFalse(manifest2.getCustomMetadataMap().containsKey("custom1")); + // custom2 is updated + assertNotEquals(manifest1.getCustomMetadataMap().get("custom2"), manifest2.getCustomMetadataMap().get("custom2")); + // custom3 is unchanged + assertEquals(manifest1.getCustomMetadataMap().get("custom3"), manifest2.getCustomMetadataMap().get("custom3")); + // custom4 is added + assertTrue(manifest2.getCustomMetadataMap().containsKey("custom4")); + assertFalse(manifest1.getCustomMetadataMap().containsKey("custom4")); } /* * Here we will verify global metadata is not uploaded again if change is only in index metadata */ public void testIndexMetadataOnlyUpdated() throws IOException { + Function updater = (initialState) -> ClusterState.builder(initialState) + .metadata( + Metadata.builder(initialState.metadata()) + .put( + IndexMetadata.builder("test" + randomAlphaOfLength(3)) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ) + .numberOfShards(1) + .numberOfReplicas(0) + ) + .build() + ) + .build(); + + verifyMetadataAttributeOnlyUpdated(updater, (initialMetadata, metadataAfterUpdate) -> { + assertEquals(metadataAfterUpdate.getCoordinationMetadata(), initialMetadata.getCoordinationMetadata()); + assertEquals(metadataAfterUpdate.getSettingsMetadata(), initialMetadata.getSettingsMetadata()); + assertEquals(metadataAfterUpdate.getTemplatesMetadata(), initialMetadata.getTemplatesMetadata()); + assertEquals(metadataAfterUpdate.getCustomMetadataMap(), initialMetadata.getCustomMetadataMap()); + assertEquals(initialMetadata.getIndices().size() + 1, metadataAfterUpdate.getIndices().size()); + }); + } + + public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException { // setup mockBlobStoreObjects(); - final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); - final ClusterState initialClusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + + // Initial cluster state with index. + final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + remoteClusterStateService.start(); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_"); + String initialIndex = "test-index"; + Index index1 = new Index("test-index-1", "index-uuid-1"); + Index index2 = new Index("test-index-2", "index-uuid-2"); + Settings idxSettings1 = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID()) .build(); - final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder() - .codecVersion(2) - .indices(Collections.emptyList()) + IndexMetadata indexMetadata1 = new IndexMetadata.Builder(index1.getName()).settings(idxSettings1) + .numberOfShards(1) + .numberOfReplicas(0) .build(); - remoteClusterStateService.start(); - - // Initial cluster state with global metadata. - final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build(); - - // Updating remote cluster state with changing global metadata - final ClusterMetadataManifest manifestAfterGlobalMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + Settings idxSettings2 = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index2.getUUID()) + .build(); + IndexMetadata indexMetadata2 = new IndexMetadata.Builder(index2.getName()).settings(idxSettings2) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + ClusterState clusterState1 = ClusterState.builder(initialClusterState) + .metadata( + Metadata.builder(initialClusterState.getMetadata()) + .put(indexMetadata1, true) + .put(indexMetadata2, true) + .remove(initialIndex) + .build() + ) + .build(); + ClusterMetadataManifest manifest1 = remoteClusterStateService.writeIncrementalMetadata( initialClusterState, - clusterState, + clusterState1, initialManifest ); + // verify that initial index is removed, and new index are added + assertEquals(1, initialManifest.getIndices().size()); + assertEquals(2, manifest1.getIndices().size()); + assertTrue(initialManifest.getIndices().stream().anyMatch(indexMetadata -> indexMetadata.getIndexName().equals(initialIndex))); + assertFalse(manifest1.getIndices().stream().anyMatch(indexMetadata -> indexMetadata.getIndexName().equals(initialIndex))); + // update index1, index2 is unchanged + indexMetadata1 = new IndexMetadata.Builder(indexMetadata1).version(indexMetadata1.getVersion() + 1).build(); + ClusterState clusterState2 = ClusterState.builder(clusterState1) + .metadata(Metadata.builder(clusterState1.getMetadata()).put(indexMetadata1, true).build()) + .build(); + ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1); + // index1 is updated + assertEquals(2, manifest2.getIndices().size()); + assertEquals( + 1, + manifest2.getIndices().stream().filter(uploadedIndex -> uploadedIndex.getIndexName().equals(index1.getName())).count() + ); + assertNotEquals( + manifest2.getIndices() + .stream() + .filter(uploadedIndex -> uploadedIndex.getIndexName().equals(index1.getName())) + .findFirst() + .get() + .getUploadedFilename(), + manifest1.getIndices() + .stream() + .filter(uploadedIndex -> uploadedIndex.getIndexName().equals(index1.getName())) + .findFirst() + .get() + .getUploadedFilename() + ); + } - // new cluster state where only Index metadata is different - final IndexMetadata indexMetadata = new IndexMetadata.Builder("test").settings( - Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") - .build() - ).numberOfShards(1).numberOfReplicas(0).build(); - Metadata newMetadata = Metadata.builder(clusterState.metadata()).put(indexMetadata, true).build(); - ClusterState newClusterState = ClusterState.builder(clusterState).metadata(newMetadata).build(); + private void verifyMetadataAttributeOnlyUpdated( + Function clusterStateUpdater, + BiConsumer assertions + ) throws IOException { + // setup + mockBlobStoreObjects(); - // updating remote cluster state with index metadata - final ClusterMetadataManifest manifestAfterIndexMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( - clusterState, - newClusterState, - manifestAfterGlobalMetadataUpdate - ); + // Initial cluster state with index. + final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + remoteClusterStateService.start(); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_"); - // Verify that global metadata information is same in manifest files after updating index Metadata - // since timestamp is part of file name, if file name is same we can confirm that file is not update in index metadata update - assertThat( - manifestAfterIndexMetadataUpdate.getGlobalMetadataFileName(), - is(manifestAfterGlobalMetadataUpdate.getGlobalMetadataFileName()) - ); + ClusterState newClusterState = clusterStateUpdater.apply(initialClusterState); - // Index metadata would have changed - assertThat(manifestAfterGlobalMetadataUpdate.getIndices().size(), is(0)); - assertThat(manifestAfterIndexMetadataUpdate.getIndices().size(), is(1)); + // updating remote cluster state with global metadata + final ClusterMetadataManifest manifestAfterMetadataUpdate; + if (initialClusterState.term() == newClusterState.term()) { + manifestAfterMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + initialClusterState, + newClusterState, + initialManifest + ); + } else { + manifestAfterMetadataUpdate = remoteClusterStateService.writeFullMetadata(newClusterState, initialClusterState.stateUUID()); + } + + assertions.accept(initialManifest, manifestAfterMetadataUpdate); } public void testReadLatestMetadataManifestFailedIOException() throws IOException { @@ -797,7 +1062,10 @@ public void testReadGlobalMetadata() throws IOException { .stateUUID("state-uuid") .clusterUUID("cluster-uuid") .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) - .globalMetadataFileName("global-metadata-file") + .coordinationMetadata(new UploadedMetadataAttribute(COORDINATION_METADATA, "mock-coordination-file")) + .settingMetadata(new UploadedMetadataAttribute(SETTING_METADATA, "mock-setting-file")) + .templatesMetadata(new UploadedMetadataAttribute(TEMPLATES_METADATA, "mock-templates-file")) + .put(IndexGraveyard.TYPE, new UploadedMetadataAttribute(IndexGraveyard.TYPE, "mock-custom-" +IndexGraveyard.TYPE+ "-file")) .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") @@ -806,7 +1074,7 @@ public void testReadGlobalMetadata() throws IOException { Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build(); mockBlobContainerForGlobalMetadata(mockBlobStoreObjects(), expectedManifest, expactedMetadata); - ClusterState newClusterState = remoteClusterStateService.getLatestClusterState( + ClusterState newClusterState = remoteClusterStateService.getLatestClusterState( clusterState.getClusterName().value(), clusterState.metadata().clusterUUID() ); @@ -832,7 +1100,7 @@ public void testReadGlobalMetadataIOException() throws IOException { .stateVersion(1L) .stateUUID("state-uuid") .clusterUUID("cluster-uuid") - .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) + .codecVersion(ClusterMetadataManifest.CODEC_V1) .globalMetadataFileName(globalIndexMetadataName) .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) @@ -1040,18 +1308,24 @@ public void testFileNames() { assertThat(splittedIndexMetadataFileName[1], is(RemoteStoreUtils.invertLong(indexMetadata.getVersion()))); assertThat(splittedIndexMetadataFileName[3], is(String.valueOf(INDEX_METADATA_CURRENT_CODEC_VERSION))); + verifyManifestFileNameWithCodec(MANIFEST_CURRENT_CODEC_VERSION); + verifyManifestFileNameWithCodec(ClusterMetadataManifest.CODEC_V1); + verifyManifestFileNameWithCodec(ClusterMetadataManifest.CODEC_V0); + } + + private void verifyManifestFileNameWithCodec(int codecVersion) { int term = randomIntBetween(5, 10); int version = randomIntBetween(5, 10); - String manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, true); + String manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, true, codecVersion); assertThat(manifestFileName.split(DELIMITER).length, is(6)); String[] splittedName = manifestFileName.split(DELIMITER); assertThat(splittedName[0], is(MANIFEST_FILE_PREFIX)); assertThat(splittedName[1], is(RemoteStoreUtils.invertLong(term))); assertThat(splittedName[2], is(RemoteStoreUtils.invertLong(version))); assertThat(splittedName[3], is("C")); - assertThat(splittedName[5], is(String.valueOf(MANIFEST_CURRENT_CODEC_VERSION))); + assertThat(splittedName[5], is(String.valueOf(codecVersion))); - manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, false); + manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, false, codecVersion); splittedName = manifestFileName.split(DELIMITER); assertThat(splittedName[3], is("P")); } @@ -1137,12 +1411,16 @@ private void mockObjectsForGettingPreviousClusterUUID( new UploadedIndexMetadata("index1", "index-uuid1", "key1"), new UploadedIndexMetadata("index2", "index-uuid2", "key2") ); + Map customMetadataMap = new HashMap<>(); final ClusterMetadataManifest clusterManifest1 = generateClusterMetadataManifest( "cluster-uuid1", clusterUUIDsPointers.get("cluster-uuid1"), randomAlphaOfLength(10), uploadedIndexMetadataList1, - "test-metadata1", + customMetadataMap, + new UploadedMetadataAttribute(COORDINATION_METADATA, "key3"), + new UploadedMetadataAttribute(SETTING_METADATA, "key4"), + new UploadedMetadataAttribute(TEMPLATES_METADATA, "key5"), clusterUUIDCommitted.getOrDefault("cluster-uuid1", true) ); Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build(); @@ -1161,7 +1439,7 @@ private void mockObjectsForGettingPreviousClusterUUID( .build(); Map indexMetadataMap1 = Map.of("index-uuid1", indexMetadata1, "index-uuid2", indexMetadata2); mockBlobContainerForGlobalMetadata(blobContainer1, clusterManifest1, metadata1); - mockBlobContainer(blobContainer1, clusterManifest1, indexMetadataMap1, ClusterMetadataManifest.CODEC_V1); + mockBlobContainer(blobContainer1, clusterManifest1, indexMetadataMap1, ClusterMetadataManifest.CODEC_V2); List uploadedIndexMetadataList2 = List.of( new UploadedIndexMetadata("index1", "index-uuid1", "key1"), @@ -1172,7 +1450,10 @@ private void mockObjectsForGettingPreviousClusterUUID( clusterUUIDsPointers.get("cluster-uuid2"), randomAlphaOfLength(10), uploadedIndexMetadataList2, - "test-metadata2", + customMetadataMap, + new UploadedMetadataAttribute(COORDINATION_METADATA, "key3"), + new UploadedMetadataAttribute(SETTING_METADATA, "key4"), + new UploadedMetadataAttribute(TEMPLATES_METADATA, "key5"), clusterUUIDCommitted.getOrDefault("cluster-uuid2", true) ); IndexMetadata indexMetadata3 = IndexMetadata.builder("index1") @@ -1190,7 +1471,7 @@ private void mockObjectsForGettingPreviousClusterUUID( .build(); Map indexMetadataMap2 = Map.of("index-uuid1", indexMetadata3, "index-uuid2", indexMetadata4); mockBlobContainerForGlobalMetadata(blobContainer2, clusterManifest2, metadata2); - mockBlobContainer(blobContainer2, clusterManifest2, indexMetadataMap2, ClusterMetadataManifest.CODEC_V1); + mockBlobContainer(blobContainer2, clusterManifest2, indexMetadataMap2, ClusterMetadataManifest.CODEC_V2); // differGlobalMetadata controls which one of IndexMetadata or Metadata object would be different // when comparing cluster-uuid3 and cluster-uuid1 state. @@ -1212,17 +1493,19 @@ private void mockObjectsForGettingPreviousClusterUUID( Metadata metadata3 = Metadata.builder() .persistentSettings(Settings.builder().put(Metadata.SETTING_READ_ONLY_SETTING.getKey(), !differGlobalMetadata).build()) .build(); - final ClusterMetadataManifest clusterManifest3 = generateClusterMetadataManifest( "cluster-uuid3", clusterUUIDsPointers.get("cluster-uuid3"), randomAlphaOfLength(10), uploadedIndexMetadataList3, - "test-metadata3", + customMetadataMap, + new UploadedMetadataAttribute(COORDINATION_METADATA, "key3"), + new UploadedMetadataAttribute(SETTING_METADATA, "key4"), + new UploadedMetadataAttribute(TEMPLATES_METADATA, "key5"), clusterUUIDCommitted.getOrDefault("cluster-uuid3", true) ); mockBlobContainerForGlobalMetadata(blobContainer3, clusterManifest3, metadata3); - mockBlobContainer(blobContainer3, clusterManifest3, indexMetadataMap3, ClusterMetadataManifest.CODEC_V1); + mockBlobContainer(blobContainer3, clusterManifest3, indexMetadataMap3, ClusterMetadataManifest.CODEC_V2); ArrayList mockBlobContainerOrderedList = new ArrayList<>( List.of(blobContainer1, blobContainer1, blobContainer3, blobContainer3, blobContainer2, blobContainer2) @@ -1242,7 +1525,7 @@ private void mockObjectsForGettingPreviousClusterUUID( when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); } - private ClusterMetadataManifest generateClusterMetadataManifest( + private ClusterMetadataManifest generateV1ClusterMetadataManifest( String clusterUUID, String previousClusterUUID, String stateUUID, @@ -1266,6 +1549,36 @@ private ClusterMetadataManifest generateClusterMetadataManifest( .build(); } + private ClusterMetadataManifest generateClusterMetadataManifest( + String clusterUUID, + String previousClusterUUID, + String stateUUID, + List uploadedIndexMetadata, + Map customMetadataMap, + UploadedMetadataAttribute coordinationMetadata, + UploadedMetadataAttribute settingsMetadata, + UploadedMetadataAttribute templatesMetadata, + Boolean isUUIDCommitted + ) { + return ClusterMetadataManifest.builder() + .indices(uploadedIndexMetadata) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID(stateUUID) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(previousClusterUUID) + .committed(true) + .clusterUUIDCommitted(isUUIDCommitted) + .coordinationMetadata(coordinationMetadata) + .settingMetadata(settingsMetadata) + .templatesMetadata(templatesMetadata) + .customMetadataMap(customMetadataMap) + .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) + .build(); + } + private BlobContainer mockBlobStoreObjects() { return mockBlobStoreObjects(BlobContainer.class); } @@ -1305,7 +1618,7 @@ private void mockBlobContainer( int codecVersion ) throws IOException { String manifestFileName = codecVersion >= ClusterMetadataManifest.CODEC_V1 - ? "manifest__manifestFileName__abcd__abcd__abcd__1" + ? "manifest__manifestFileName__abcd__abcd__abcd__" + codecVersion : "manifestFileName"; BlobMetadata blobMetadata = new PlainBlobMetadata(manifestFileName, 1); when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC)) @@ -1346,7 +1659,8 @@ private void mockBlobContainerForGlobalMetadata( ClusterMetadataManifest clusterMetadataManifest, Metadata metadata ) throws IOException { - String mockManifestFileName = "manifest__1__2__C__456__1"; + int codecVersion = clusterMetadataManifest.getCodecVersion(); + String mockManifestFileName = "manifest__1__2__C__456__" + codecVersion; BlobMetadata blobMetadata = new PlainBlobMetadata(mockManifestFileName, 1); when( blobContainer.listBlobsByPrefixInSortedOrder( @@ -1363,19 +1677,84 @@ private void mockBlobContainerForGlobalMetadata( FORMAT_PARAMS ); when(blobContainer.readBlob(mockManifestFileName)).thenReturn(new ByteArrayInputStream(bytes.streamInput().readAllBytes())); + if (codecVersion >= ClusterMetadataManifest.CODEC_V2) { + String coordinationFileName = getFileNameFromPath(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); + when(blobContainer.readBlob(RemoteClusterStateService.COORDINATION_METADATA_FORMAT.blobName(coordinationFileName))).thenAnswer( + (invocationOnMock) -> { + BytesReference bytesReference = RemoteClusterStateService.COORDINATION_METADATA_FORMAT.serialize( + metadata.coordinationMetadata(), + coordinationFileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); + } + ); - String[] splitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); - when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(splitPath[splitPath.length - 1]))).thenAnswer( - (invocationOnMock) -> { - BytesReference bytesGlobalMetadata = RemoteClusterStateService.GLOBAL_METADATA_FORMAT.serialize( - metadata, - "global-metadata-file", - blobStoreRepository.getCompressor(), - FORMAT_PARAMS + String settingsFileName = getFileNameFromPath(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); + when(blobContainer.readBlob(RemoteClusterStateService.SETTINGS_METADATA_FORMAT.blobName(settingsFileName))).thenAnswer( + (invocationOnMock) -> { + BytesReference bytesReference = RemoteClusterStateService.SETTINGS_METADATA_FORMAT.serialize( + metadata.persistentSettings(), + settingsFileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); + } + ); + + String templatesFileName = getFileNameFromPath(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); + when(blobContainer.readBlob(RemoteClusterStateService.TEMPLATES_METADATA_FORMAT.blobName(templatesFileName))).thenAnswer( + (invocationOnMock) -> { + BytesReference bytesReference = RemoteClusterStateService.TEMPLATES_METADATA_FORMAT.serialize( + metadata.templatesMetadata(), + templatesFileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); + } + ); + + Map customFileMap = clusterMetadataManifest.getCustomMetadataMap() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> getFileNameFromPath(entry.getValue().getUploadedFilename()))); + + for (Map.Entry entry : customFileMap.entrySet()) { + String custom = entry.getKey(); + String fileName = entry.getValue(); + when(blobContainer.readBlob(RemoteClusterStateService.CUSTOM_METADATA_FORMAT.blobName(fileName))).thenAnswer( + (invocation) -> { + BytesReference bytesReference = RemoteClusterStateService.CUSTOM_METADATA_FORMAT.serialize( + metadata.custom(custom), + fileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); + } ); - return new ByteArrayInputStream(bytesGlobalMetadata.streamInput().readAllBytes()); } - ); + } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { + String[] splitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); + when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(splitPath[splitPath.length - 1]))) + .thenAnswer((invocationOnMock) -> { + BytesReference bytesGlobalMetadata = RemoteClusterStateService.GLOBAL_METADATA_FORMAT.serialize( + metadata, + "global-metadata-file", + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesGlobalMetadata.streamInput().readAllBytes()); + }); + } + } + + private String getFileNameFromPath(String filePath) { + String[] splitPath = filePath.split("/"); + return splitPath[splitPath.length - 1]; } private static ClusterState.Builder generateClusterStateWithGlobalMetadata() { @@ -1405,7 +1784,9 @@ static ClusterState.Builder generateClusterStateWithOneIndex() { .numberOfReplicas(0) .build(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); - + final Settings settings = Settings.builder().put("mock-settings", true).build(); + final TemplatesMetadata templatesMetadata = TemplatesMetadata.EMPTY_METADATA; + final CustomMetadata1 customMetadata1 = new CustomMetadata1("custom-metadata-1"); return ClusterState.builder(ClusterName.DEFAULT) .version(1L) .stateUUID("state-uuid") @@ -1415,6 +1796,9 @@ static ClusterState.Builder generateClusterStateWithOneIndex() { .put(indexMetadata, true) .clusterUUID("cluster-uuid") .coordinationMetadata(coordinationMetadata) + .persistentSettings(settings) + .templates(templatesMetadata) + .putCustom(customMetadata1.getWriteableName(), customMetadata1) .build() ); } @@ -1423,4 +1807,27 @@ static DiscoveryNodes nodesWithLocalNodeClusterManager() { return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build(); } + private static class CustomMetadata1 extends TestCustomMetadata { + public static final String TYPE = "custom_md_1"; + + CustomMetadata1(String data) { + super(data); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + + @Override + public EnumSet context() { + return EnumSet.of(Metadata.XContentContext.GATEWAY); + } + } + }