Skip to content

Commit

Permalink
Add POJO classes required for cluster state publication from remote
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jun 8, 2024
1 parent 49282c1 commit 255075c
Show file tree
Hide file tree
Showing 10 changed files with 1,505 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,10 @@ public static boolean isSettingsMetadataEqual(Metadata metadata1, Metadata metad
return metadata1.persistentSettings.equals(metadata2.persistentSettings);
}

public static boolean isTransientSettingsMetadataEqual(Metadata metadata1, Metadata metadata2) {
return metadata1.transientSettings.equals(metadata2.transientSettings);
}

public static boolean isTemplatesMetadataEqual(Metadata metadata1, Metadata metadata2) {
return metadata1.templates.equals(metadata2.templates);
}
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.io.IOException;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER;

public class RemoteClusterStateAttributesManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
public static final String DISCOVERY_NODES = "nodes";
public static final String CLUSTER_BLOCKS = "blocks";
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
private final RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore;
private final RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore;
private final RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;

RemoteClusterStateAttributesManager(
RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore, RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore, RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore, Compressor compressor, NamedXContentRegistry namedXContentRegistry) {
this.clusterBlocksBlobStore = clusterBlocksBlobStore;
this.discoveryNodesBlobStore = discoveryNodesBlobStore;
this.customsBlobStore = customsBlobStore;
this.compressor = compressor;
this.namedXContentRegistry = namedXContentRegistry;
}

/**
* Allows async upload of Cluster State Attribute components to remote
*/
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
ClusterState clusterState,
String component,
ToXContent componentData,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
if (componentData instanceof DiscoveryNodes) {
RemoteDiscoveryNodes remoteObject = new RemoteDiscoveryNodes((DiscoveryNodes)componentData, clusterState.version(), clusterState.metadata().clusterUUID(), compressor, namedXContentRegistry);
return () -> discoveryNodesBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener));
} else if (componentData instanceof ClusterBlocks) {
RemoteClusterBlocks remoteObject = new RemoteClusterBlocks((ClusterBlocks) componentData, clusterState.version(), clusterState.metadata().clusterUUID(), compressor, namedXContentRegistry);
return () -> clusterBlocksBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener));
} else if (componentData instanceof ClusterState.Custom) {
RemoteClusterStateCustoms remoteObject = new RemoteClusterStateCustoms(
(ClusterState.Custom) componentData,
component,
clusterState.version(),
clusterState.metadata().clusterUUID(),
compressor,
namedXContentRegistry
);
return () -> customsBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener));
} else {
throw new RemoteStateTransferException("Remote object not found for "+ componentData.getClass());
}
}

private ActionListener<Void> getActionListener(String component, AbstractRemoteWritableBlobEntity remoteObject, LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener) {
return ActionListener.wrap(
resp -> latchedActionListener.onResponse(
remoteObject.getUploadedMetadata()
),
ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(component, ex))
);
}

public CheckedRunnable<IOException> getAsyncMetadataReadAction(
String clusterUUID,
String component,
String componentName,
String uploadedFilename,
LatchedActionListener<RemoteReadResult> listener
) {
final ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure);
if (component.equals(RemoteDiscoveryNodes.DISCOVERY_NODES)) {
RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(uploadedFilename, clusterUUID, compressor, namedXContentRegistry);
return () -> discoveryNodesBlobStore.readAsync(remoteDiscoveryNodes, actionListener);
} else if (component.equals(RemoteClusterBlocks.CLUSTER_BLOCKS)) {
RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(uploadedFilename, clusterUUID, compressor, namedXContentRegistry);
return () -> clusterBlocksBlobStore.readAsync(remoteClusterBlocks, actionListener);
} else if (component.equals(CLUSTER_STATE_CUSTOM)) {
final ActionListener customActionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, String.join(CUSTOM_DELIMITER, component, componentName))), listener::onFailure);
RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(uploadedFilename, componentName, clusterUUID, compressor, namedXContentRegistry);
return () -> customsBlobStore.readAsync(remoteClusterStateCustoms, customActionListener);
} else {
throw new RemoteStateTransferException("Remote object not found for "+ component);
}
}

public Map<String, ClusterState.Custom> getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) {
Map<String, ClusterState.Custom> updatedCustoms = new HashMap<>();
Set<String> currentCustoms = new HashSet<>(clusterState.customs().keySet());
for (Map.Entry<String, ClusterState.Custom> entry : previousClusterState.customs().entrySet()) {
if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) {
updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey()));
}
currentCustoms.remove(entry.getKey());
}
for (String custom : currentCustoms) {
updatedCustoms.put(custom, clusterState.customs().get(custom));
}
return updatedCustoms;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

/**
* Wrapper class for uploading/downloading {@link ClusterBlocks} to/from remote blob store
*/
public class RemoteClusterBlocks extends AbstractRemoteWritableBlobEntity<ClusterBlocks> {

public static final String CLUSTER_BLOCKS = "blocks";
public static final ChecksumBlobStoreFormat<ClusterBlocks> CLUSTER_BLOCKS_FORMAT = new ChecksumBlobStoreFormat<>(
"blocks",
METADATA_NAME_FORMAT,
ClusterBlocks::fromXContent
);

private ClusterBlocks clusterBlocks;
private long stateVersion;

public RemoteClusterBlocks(final ClusterBlocks clusterBlocks, long stateVersion, String clusterUUID,
final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) {
super(clusterUUID, compressor, namedXContentRegistry);
this.clusterBlocks = clusterBlocks;
this.stateVersion = stateVersion;
}

public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) {
super(clusterUUID, compressor, namedXContentRegistry);
this.blobName = blobName;
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of("transient"), CLUSTER_BLOCKS);
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/transient/<componentPrefix>__<inverted_state_version>__<inverted__timestamp>__<codec_version>
String blobFileName = String.join(
DELIMITER,
getBlobPathParameters().getFilePrefix(),
RemoteStoreUtils.invertLong(stateVersion),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION)
);
this.blobFileName = blobFileName;
return blobFileName;
}

@Override
public UploadedMetadata getUploadedMetadata() {
assert blobName != null;
return new UploadedMetadataAttribute(CLUSTER_BLOCKS, blobName);
}

@Override
public void set(final ClusterBlocks clusterBlocks) {
this.clusterBlocks = clusterBlocks;
}

@Override
public ClusterBlocks get() {
return clusterBlocks;
}


@Override
public InputStream serialize() throws IOException {
return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
}

@Override
public ClusterBlocks deserialize(final InputStream inputStream) throws IOException {
return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER;

public class RemoteClusterStateCustoms extends AbstractRemoteWritableBlobEntity<Custom> {
public static final String CLUSTER_STATE_CUSTOM = "cluster-state-custom";

public final ChecksumBlobStoreFormat<ClusterState.Custom> clusterStateCustomBlobStoreFormat;
private long stateVersion;
private String customType;
private ClusterState.Custom custom;

public RemoteClusterStateCustoms(final ClusterState.Custom custom, final String customType, final long stateVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) {
super(clusterUUID, compressor, namedXContentRegistry);
this.stateVersion = stateVersion;
this.customType = customType;
this.custom = custom;
this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>(
CLUSTER_STATE_CUSTOM,
METADATA_NAME_FORMAT,
parser -> ClusterState.Custom.fromXContent(parser, customType)
);
}

public RemoteClusterStateCustoms(final String blobName, final String customType, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) {
super(clusterUUID, compressor, namedXContentRegistry);
this.blobName = blobName;
this.customType = customType;
this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>(
CLUSTER_STATE_CUSTOM,
METADATA_NAME_FORMAT,
parser -> ClusterState.Custom.fromXContent(parser, customType)
);
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN), CLUSTER_STATE_CUSTOM);
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/ephemeral/<componentPrefix>__<inverted_state_version>__<inverted__timestamp>__<codec_version>
String blobFileName = String.join(
DELIMITER,
getBlobPathParameters().getFilePrefix(),
RemoteStoreUtils.invertLong(stateVersion),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION)
);
this.blobFileName = blobFileName;
return blobFileName;
}

@Override
public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {
assert blobName != null;
return new ClusterMetadataManifest.UploadedMetadataAttribute(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, customType), blobName);
}

@Override
public void set(Custom custom) {
this.custom = custom;
}

@Override
public ClusterState.Custom get() {
return custom;
}

@Override
public InputStream serialize() throws IOException {
return clusterStateCustomBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
}

@Override
public ClusterState.Custom deserialize(final InputStream inputStream) throws IOException {
return clusterStateCustomBlobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
}
}
Loading

0 comments on commit 255075c

Please sign in to comment.