Skip to content

Commit

Permalink
Add POJO classes required for cluster state publication from remote (o…
Browse files Browse the repository at this point in the history
…pensearch-project#14006) (opensearch-project#14172)

* Add POJO classes required for cluster state publication from remote
* Use InputStreams rather than XContent for serialization for ehpemeral objects
Signed-off-by: Shivansh Arora <[email protected]>

* Add remote routing table changes in diff Manifest
Signed-off-by: Arpit Bandejiya <[email protected]>

(cherry picked from commit a9d2050)
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
shiv0408 authored and kkewwei committed Jul 24, 2024
1 parent cecc5f9 commit c550d74
Show file tree
Hide file tree
Showing 26 changed files with 3,399 additions and 233 deletions.
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/cluster/DiffableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,18 @@ public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
* @opensearch.internal
*/
public abstract static class NonDiffableValueSerializer<K, V> implements ValueSerializer<K, V> {
private static final NonDiffableValueSerializer ABSTRACT_INSTANCE = new NonDiffableValueSerializer<>() {
@Override
public void write(Object value, StreamOutput out) {
throw new UnsupportedOperationException();
}

@Override
public Object read(StreamInput in, Object key) {
throw new UnsupportedOperationException();
}
};

@Override
public boolean supportsDiffableValues() {
return false;
Expand All @@ -513,6 +525,10 @@ public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
public Diff<V> readDiff(StreamInput in, K key) throws IOException {
throw new UnsupportedOperationException();
}

public static <K, V> NonDiffableValueSerializer<K, V> getAbstractInstance() {
return ABSTRACT_INSTANCE;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static DiffableStringMap readFrom(StreamInput in) throws IOException {
return map.isEmpty() ? EMPTY : new DiffableStringMap(map);
}

DiffableStringMap(final Map<String, String> map) {
public DiffableStringMap(final Map<String, String> map) {
this.innerMap = Collections.unmodifiableMap(map);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,10 @@ public static boolean isSettingsMetadataEqual(Metadata metadata1, Metadata metad
return metadata1.persistentSettings.equals(metadata2.persistentSettings);
}

public static boolean isTransientSettingsMetadataEqual(Metadata metadata1, Metadata metadata2) {
return metadata1.transientSettings.equals(metadata2.transientSettings);
}

public static boolean isTemplatesMetadataEqual(Metadata metadata1, Metadata metadata2) {
return metadata1.templates.equals(metadata2.templates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Interface for RemoteRoutingTableService. Exposes methods to orchestrate upload and download of routing table from remote store.
*/
public interface RemoteRoutingTableService extends LifecycleComponent {
static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
public static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
*
* @opensearch.internal
*/
public class RemoteClusterStateAttributesManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
public static final String DISCOVERY_NODES = "nodes";
public static final String CLUSTER_BLOCKS = "blocks";
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
private final RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore;
private final RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore;
private final RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;
private final NamedWriteableRegistry namedWriteableRegistry;

RemoteClusterStateAttributesManager(
RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore,
RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore,
RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore,
Compressor compressor,
NamedXContentRegistry namedXContentRegistry,
NamedWriteableRegistry namedWriteableRegistry
) {
this.clusterBlocksBlobStore = clusterBlocksBlobStore;
this.discoveryNodesBlobStore = discoveryNodesBlobStore;
this.customsBlobStore = customsBlobStore;
this.compressor = compressor;
this.namedXContentRegistry = namedXContentRegistry;
this.namedWriteableRegistry = namedWriteableRegistry;
}

/**
* Allows async upload of Cluster State Attribute components to remote
*/
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
RemoteClusterStateBlobStore remoteEntityStore,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return () -> remoteEntityStore.writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener));
}

private ActionListener<Void> getActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return ActionListener.wrap(
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex))
);
}

public CheckedRunnable<IOException> getAsyncMetadataReadAction(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
RemoteClusterStateBlobStore remoteEntityStore,
LatchedActionListener<RemoteReadResult> listener
) {
final ActionListener actionListener = ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)),
listener::onFailure
);
return () -> remoteEntityStore.readAsync(blobEntity, actionListener);
}

public Map<String, ClusterState.Custom> getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) {
Map<String, ClusterState.Custom> updatedCustoms = new HashMap<>();
Set<String> currentCustoms = new HashSet<>(clusterState.customs().keySet());
for (Map.Entry<String, ClusterState.Custom> entry : previousClusterState.customs().entrySet()) {
if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) {
updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey()));
}
currentCustoms.remove(entry.getKey());
}
for (String custom : currentCustoms) {
updatedCustoms.put(custom, clusterState.customs().get(custom));
}
return updatedCustoms;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@

import static java.util.Objects.requireNonNull;
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -171,12 +172,6 @@ public class RemoteClusterStateService implements Closeable {
/**
* Manifest format compatible with codec v2, where global metadata file is replaced with multiple metadata attribute files
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V2 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV2);

/**
* Manifest format compatible with codec v3, where global metadata file is replaced with multiple metadata attribute files
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
"cluster-metadata-manifest",
METADATA_MANIFEST_NAME_FORMAT,
Expand Down Expand Up @@ -226,7 +221,6 @@ public class RemoteClusterStateService implements Closeable {
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2;

// ToXContent Params with gateway mode.
Expand Down Expand Up @@ -836,26 +830,25 @@ private RemoteClusterStateManifestInfo uploadManifest(
committed,
MANIFEST_CURRENT_CODEC_VERSION
);
final ClusterMetadataManifest manifest = new ClusterMetadataManifest(
clusterState.term(),
clusterState.getVersion(),
clusterState.metadata().clusterUUID(),
clusterState.stateUUID(),
Version.CURRENT,
nodeId,
committed,
MANIFEST_CURRENT_CODEC_VERSION,
null,
uploadedIndexMetadata,
previousClusterUUID,
clusterState.metadata().clusterUUIDCommitted(),
uploadedCoordinationMetadata,
uploadedSettingsMetadata,
uploadedTemplatesMetadata,
uploadedCustomMetadataMap,
clusterState.routingTable().version(),
uploadedIndicesRouting
);
final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder()
.clusterTerm(clusterState.term())
.stateVersion(clusterState.getVersion())
.clusterUUID(clusterState.metadata().clusterUUID())
.stateUUID(clusterState.stateUUID())
.opensearchVersion(Version.CURRENT)
.nodeId(nodeId)
.committed(committed)
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
.indices(uploadedIndexMetadata)
.previousClusterUUID(previousClusterUUID)
.clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted())
.coordinationMetadata(uploadedCoordinationMetadata)
.settingMetadata(uploadedSettingsMetadata)
.templatesMetadata(uploadedTemplatesMetadata)
.customMetadataMap(uploadedCustomMetadataMap)
.routingTableVersion(clusterState.routingTable().version())
.indicesRouting(uploadedIndicesRouting)
.build();
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
return new RemoteClusterStateManifestInfo(manifest, manifestFileName);
}
Expand Down Expand Up @@ -1540,8 +1533,6 @@ private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManif
long codecVersion = getManifestCodecVersion(fileName);
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) {
return CLUSTER_METADATA_MANIFEST_FORMAT;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V2) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V2;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V1;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@
package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
Expand All @@ -21,19 +28,99 @@
public class RemoteClusterStateUtils {

public static final String DELIMITER = "__";
public static final String PATH_DELIMITER = "/";
public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
public static final String METADATA_NAME_FORMAT = "%s.dat";
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";
public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state";
public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
public static final String CLUSTER_STATE_EPHEMERAL_PATH_TOKEN = "ephemeral";
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;
public static final String CUSTOM_DELIMITER = "--";
public static final String PATH_DELIMITER = "/";
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(
Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY)
);

public static BlobPath getCusterMetadataBasePath(BlobStoreRepository blobStoreRepository, String clusterName, String clusterUUID) {
return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID);
}

public static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
}

public static String getFormattedFileName(String fileName, int codecVersion) {
if (codecVersion < ClusterMetadataManifest.CODEC_V2) {
return String.format(Locale.ROOT, METADATA_NAME_FORMAT, fileName);
}
return fileName;
}

static BlobContainer clusterUUIDContainer(BlobStoreRepository blobStoreRepository, String clusterName) {
return blobStoreRepository.blobStore()
.blobContainer(
blobStoreRepository.basePath()
.add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
.add(CLUSTER_STATE_PATH_TOKEN)
);
}

/**
* Container class to keep metadata of all uploaded attributes
*/
public static class UploadedMetadataResults {
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadata;
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedCustomMetadataMap;
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedClusterStateCustomMetadataMap;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTransientSettingsMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks;
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndicesRoutingMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedHashesOfConsistentSettings;

public UploadedMetadataResults(
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadata,
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedCustomMetadataMap,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTransientSettingsMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks,
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndicesRoutingMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedHashesOfConsistentSettings,
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedClusterStateCustomMap
) {
this.uploadedIndexMetadata = uploadedIndexMetadata;
this.uploadedCustomMetadataMap = uploadedCustomMetadataMap;
this.uploadedCoordinationMetadata = uploadedCoordinationMetadata;
this.uploadedSettingsMetadata = uploadedSettingsMetadata;
this.uploadedTransientSettingsMetadata = uploadedTransientSettingsMetadata;
this.uploadedTemplatesMetadata = uploadedTemplatesMetadata;
this.uploadedDiscoveryNodes = uploadedDiscoveryNodes;
this.uploadedClusterBlocks = uploadedClusterBlocks;
this.uploadedIndicesRoutingMetadata = uploadedIndicesRoutingMetadata;
this.uploadedHashesOfConsistentSettings = uploadedHashesOfConsistentSettings;
this.uploadedClusterStateCustomMetadataMap = uploadedClusterStateCustomMap;
}

public UploadedMetadataResults() {
this.uploadedIndexMetadata = new ArrayList<>();
this.uploadedCustomMetadataMap = new HashMap<>();
this.uploadedCoordinationMetadata = null;
this.uploadedSettingsMetadata = null;
this.uploadedTransientSettingsMetadata = null;
this.uploadedTemplatesMetadata = null;
this.uploadedDiscoveryNodes = null;
this.uploadedClusterBlocks = null;
this.uploadedIndicesRoutingMetadata = new ArrayList<>();
this.uploadedHashesOfConsistentSettings = null;
this.uploadedClusterStateCustomMetadataMap = new HashMap<>();
}
}
}
Loading

0 comments on commit c550d74

Please sign in to comment.