diff --git a/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java b/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java index 07fd3cc891b8d..8028aa2035d63 100644 --- a/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java +++ b/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java @@ -1,23 +1,20 @@ package org.opensearch.cluster.store; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Function; -import java.util.stream.Collectors; import org.opensearch.core.ParseField; +import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParser.Token; /** * Marker file which contains the details of the uploaded entity metadata @@ -32,8 +29,8 @@ public class ClusterMetadataMarker implements Writeable, ToXContentFragment { private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid"); - private static List indices(Object[] fields) { - return new ArrayList<>((List) fields[0]); + private static Map indices(Object[] fields) { + return (Map) fields[0]; } private static long term(Object[] fields) { @@ -52,13 +49,18 @@ private static String stateUUID(Object[] fields) { return (String) fields[4]; } - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "cluster_metadata_marker", - fields -> new ClusterMetadataMarker(indices(fields), term(fields), version(fields), clusterUUID(fields), stateUUID(fields)) - ); + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("cluster_metadata_marker", + fields -> new ClusterMetadataMarker(indices(fields), term(fields), version(fields), clusterUUID(fields), stateUUID(fields))); static { - PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> UploadedIndexMetadata.fromXContent(p), INDICES_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map uploadMetadataMap = new HashMap<>(); + while (p.nextToken() != Token.END_OBJECT) { + UploadedIndexMetadata uploadMetadata = UploadedIndexMetadata.fromXContent(p); + uploadMetadataMap.put(uploadMetadata.getIndexName(), uploadMetadata); + } + return uploadMetadataMap; + }, INDICES_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), TERM_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); @@ -99,33 +101,23 @@ public ClusterMetadataMarker(Map indices, long te this.stateUUID = stateUUID; } - public ClusterMetadataMarker(List indices, long term, long version, String clusterUUID, String stateUUID) { - this.indices = Collections.unmodifiableMap(toMap(indices)); - this.term = term; - this.version = version; - this.clusterUUID = clusterUUID; - this.stateUUID = stateUUID; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startArray(INDICES_FIELD.getPreferredName()); + builder.startObject(INDICES_FIELD.getPreferredName()); { for (UploadedIndexMetadata uploadedIndexMetadata : indices.values()) { uploadedIndexMetadata.toXContent(builder, params); } } - builder.endArray(); - builder.field(TERM_FIELD.getPreferredName(), getTerm()) - .field(VERSION_FIELD.getPreferredName(), getVersion()) - .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID()) - .field(STATE_UUID_FIELD.getPreferredName(), getStateUUID()); + builder.endObject(); + builder.field(TERM_FIELD.getPreferredName(), getTerm()).field(VERSION_FIELD.getPreferredName(), getVersion()) + .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID()).field(STATE_UUID_FIELD.getPreferredName(), getStateUUID()); return builder; } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeCollection(indices.values()); + out.writeMap(indices, StreamOutput::writeString, (stream, uploadedMetadata) -> uploadedMetadata.writeTo(stream)); out.writeVLong(term); out.writeVLong(version); out.writeString(clusterUUID); @@ -150,16 +142,13 @@ public int hashCode() { return Objects.hash(indices, term, version, clusterUUID, stateUUID); } - public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); } - private static Map toMap(final Collection uploadedIndexMetadataList) { - // use a linked hash map to preserve order - return uploadedIndexMetadataList.stream().collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity(), (left, right) -> { - assert left.getIndexName().equals(right.getIndexName()) : "expected [" + left.getIndexName() + "] to equal [" + right.getIndexName() + "]"; - throw new IllegalStateException("duplicate index name [" + left.getIndexName() + "]"); - }, LinkedHashMap::new)); + public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); } /** @@ -262,15 +251,15 @@ public String getIndexUUID() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject() - .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) - .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) - .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()) + return builder.startObject(getIndexName()).field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) + .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()).field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()) .endObject(); } @Override public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexName); + out.writeString(indexUUID); out.writeString(uploadedFilename); } @@ -292,6 +281,11 @@ public int hashCode() { return Objects.hash(indexName, indexUUID, uploadedFilename); } + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + public static UploadedIndexMetadata fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } diff --git a/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java index ff88a4d4f08bd..535c011dd7a98 100644 --- a/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java @@ -16,6 +16,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; @@ -55,7 +56,7 @@ public RemoteClusterStateService(Supplier repositoriesServi } public ClusterMetadataMarker writeFullMetadata(long currentTerm, ClusterState clusterState) throws IOException { - if (!clusterState.nodes().isLocalNodeElectedClusterManager()) { + if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); return null; } @@ -85,19 +86,20 @@ private void setRepository() { return; } if (IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING.get(settings)) { - String remoteStoreRepo = CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.get(settings); - Repository repository = repositoriesService.get().repository(remoteStoreRepo); + final String remoteStoreRepo = CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.get(settings); + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; } else { logger.info("remote store is not enabled"); } - } catch (Exception e) { - logger.error("set repo exception", e); + } catch (RepositoryMissingException e) { + logger.error("Remote state repository is missing", e); } } - public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterState previousClusterState, ClusterState clusterState, ClusterMetadataMarker previousMarker) throws IOException { + public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterState previousClusterState, ClusterState clusterState, + ClusterMetadataMarker previousMarker) throws IOException { assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); final Map indexMetadataVersionByName = new HashMap<>(); for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { @@ -108,15 +110,14 @@ public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterS int numIndicesUnchanged = 0; final Map allUploadedIndexMetadata = new HashMap<>(previousMarker.getIndices()); for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { - // Is it okay to use indexName as key ? final Long previousVersion = indexMetadataVersionByName.get(indexMetadata.getIndex().getName()); if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", indexMetadata.getIndex(), previousVersion, indexMetadata.getVersion()); numIndicesUpdated++; - String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(), + final String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(), indexMetadata, indexMetadataFileName(indexMetadata)); - UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(), + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(), indexMetadataKey); allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); } else { @@ -136,11 +137,11 @@ public ClusterState getLatestClusterState(String clusterUUID) { return null; } - //todo exception handling - public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map uploadedIndexMetadata) throws IOException { + public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map uploadedIndexMetadata) + throws IOException { synchronized (this) { - String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version()); - ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndexMetadata, clusterState.term(), clusterState.getVersion(), + final String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version()); + final ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndexMetadata, clusterState.term(), clusterState.getVersion(), clusterState.metadata().clusterUUID(), clusterState.stateUUID()); writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName); @@ -149,28 +150,17 @@ public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map