Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload global cluster state to remote store #10404

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version");
private static final ParseField NODE_ID_FIELD = new ParseField("node_id");
private static final ParseField COMMITTED_FIELD = new ParseField("committed");
private static final ParseField GLOBAL_METADATA_FIELD = new ParseField("global_metadata");
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid");
private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed");
Expand Down Expand Up @@ -84,6 +85,10 @@ private static boolean clusterUUIDCommitted(Object[] fields) {
return (boolean) fields[9];
}

private static String globalMetadataFileName(Object[] fields) {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
return (String) fields[10];
}

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> new ClusterMetadataManifest(
Expand All @@ -94,6 +99,7 @@ private static boolean clusterUUIDCommitted(Object[] fields) {
opensearchVersion(fields),
nodeId(fields),
committed(fields),
globalMetadataFileName(fields),
indices(fields),
previousClusterUUID(fields),
clusterUUIDCommitted(fields)
Expand All @@ -115,8 +121,10 @@ private static boolean clusterUUIDCommitted(Object[] fields) {
);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED);
PARSER.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD);
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
}

private final String globalMetadataFileName;
private final List<UploadedIndexMetadata> indices;
private final long clusterTerm;
private final long stateVersion;
Expand Down Expand Up @@ -168,6 +176,10 @@ public boolean isClusterUUIDCommitted() {
return clusterUUIDCommitted;
}

public String getGlobalMetadataFileName() {
return globalMetadataFileName;
}

public ClusterMetadataManifest(
long clusterTerm,
long version,
Expand All @@ -176,6 +188,7 @@ public ClusterMetadataManifest(
Version opensearchVersion,
String nodeId,
boolean committed,
String globalMetadataFileName,
List<UploadedIndexMetadata> indices,
String previousClusterUUID,
boolean clusterUUIDCommitted
Expand All @@ -187,6 +200,7 @@ public ClusterMetadataManifest(
this.opensearchVersion = opensearchVersion;
this.nodeId = nodeId;
this.committed = committed;
this.globalMetadataFileName = globalMetadataFileName;
this.indices = Collections.unmodifiableList(indices);
this.previousClusterUUID = previousClusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
Expand All @@ -203,6 +217,11 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
this.previousClusterUUID = in.readString();
this.clusterUUIDCommitted = in.readBoolean();
if(in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.globalMetadataFileName = in.readString();
} else {
this.globalMetadataFileName = null;
}
}

public static Builder builder() {
Expand Down Expand Up @@ -231,6 +250,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endArray();
builder.field(PREVIOUS_CLUSTER_UUID.getPreferredName(), getPreviousClusterUUID());
builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted());
builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName());
return builder;
}

Expand All @@ -246,6 +266,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(indices);
out.writeString(previousClusterUUID);
out.writeBoolean(clusterUUIDCommitted);
if(out.getVersion().onOrAfter(Version.V_3_0_0)) {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
out.writeString(globalMetadataFileName);
}
}

@Override
Expand All @@ -266,12 +289,14 @@ public boolean equals(Object o) {
&& Objects.equals(nodeId, that.nodeId)
&& Objects.equals(committed, that.committed)
&& Objects.equals(previousClusterUUID, that.previousClusterUUID)
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted);
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted)
&& Objects.equals(globalMetadataFileName, that.globalMetadataFileName);
}

@Override
public int hashCode() {
return Objects.hash(
globalMetadataFileName,
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
indices,
clusterTerm,
stateVersion,
Expand Down Expand Up @@ -301,6 +326,7 @@ public static ClusterMetadataManifest fromXContent(XContentParser parser) throws
*/
public static class Builder {

private String globalMetadataFileName;
private List<UploadedIndexMetadata> indices;
private long clusterTerm;
private long stateVersion;
Expand All @@ -317,6 +343,11 @@ public Builder indices(List<UploadedIndexMetadata> indices) {
return this;
}

public Builder globalMetadataFileName(String globalMetadataFileName) {
this.globalMetadataFileName = globalMetadataFileName;
return this;
}

public Builder clusterTerm(long clusterTerm) {
this.clusterTerm = clusterTerm;
return this;
Expand Down Expand Up @@ -378,6 +409,7 @@ public Builder(ClusterMetadataManifest manifest) {
this.opensearchVersion = manifest.opensearchVersion;
this.nodeId = manifest.nodeId;
this.committed = manifest.committed;
this.globalMetadataFileName = manifest.globalMetadataFileName;
this.indices = new ArrayList<>(manifest.indices);
this.previousClusterUUID = manifest.previousClusterUUID;
this.clusterUUIDCommitted = manifest.clusterUUIDCommitted;
Expand All @@ -392,6 +424,7 @@ public ClusterMetadataManifest build() {
opensearchVersion,
nodeId,
committed,
globalMetadataFileName,
indices,
previousClusterUUID,
clusterUUIDCommitted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
import org.opensearch.cluster.metadata.ComposableIndexTemplateMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
Expand All @@ -27,6 +31,8 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
Expand Down Expand Up @@ -55,6 +61,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -105,9 +112,11 @@ public class RemoteClusterStateService implements Closeable {

private static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state";
private static final String INDEX_PATH_TOKEN = "index";
private static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
private static final String MANIFEST_PATH_TOKEN = "manifest";
private static final String MANIFEST_FILE_PREFIX = "manifest";
private static final String INDEX_METADATA_FILE_PREFIX = "metadata";
private static final String GLOBAL_METADATA_FILE_PREFIX = "global-metadata";

private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -159,12 +168,15 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri
return null;
}

// Write globalMetadata
String globalMetadataFile = writeGlobalMetadata(clusterState);

// any validations before/after upload ?
final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
clusterState,
new ArrayList<>(clusterState.metadata().indices().values())
);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, false);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, globalMetadataFile, false);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand Down Expand Up @@ -203,6 +215,17 @@ public ClusterMetadataManifest writeIncrementalMetadata(
return null;
}
assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();

// Write Global Metadata
final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previousClusterState.metadata(), clusterState.metadata()) == false;
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
String globalMetadataFile;
if(updateGlobalMeta) {
globalMetadataFile = writeGlobalMetadata(clusterState);
} else {
globalMetadataFile = previousManifest.getGlobalMetadataFileName();
}

// Write Index Metadata
final Map<String, Long> previousStateIndexMetadataVersionByName = new HashMap<>();
for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion());
Expand Down Expand Up @@ -245,6 +268,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
clusterState,
new ArrayList<>(allUploadedIndexMetadata.values()),
previousManifest.getPreviousClusterUUID(),
globalMetadataFile,
false
);
deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
Expand All @@ -270,6 +294,52 @@ public ClusterMetadataManifest writeIncrementalMetadata(
return manifest;
}

/**
* Uploads provided ClusterState's global Metadata to remote store in parallel.
* The call is blocking so the method waits for upload to finish and then return.
*
* @param clusterState current ClusterState
* @return String file name where globalMetadata file is stored.
*/
private String writeGlobalMetadata(ClusterState clusterState)
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
throws IOException {

AtomicReference<String> result = new AtomicReference<String>();
final BlobContainer globalMetadataContainer = globalMetadataContainer(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);
final String globalMetadataFilename = globalMetadataFileName(clusterState.metadata());

// latch to wait until upload is not finished
CountDownLatch latch = new CountDownLatch(1);
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
ActionListener<Void> completionListener = ActionListener.wrap(
resp -> {
result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename);
latch.countDown();
}, ex -> {
// TODO change the exception for Global Metadata
throw new IndexMetadataTransferException(ex.getMessage(), ex);
}
);

BlobStoreRepository.GLOBAL_METADATA_FORMAT.writeAsync(
clusterState.metadata(),
globalMetadataContainer,
globalMetadataFilename,
blobStoreRepository.getCompressor(),
completionListener
);

// TODO Add proper exception handling.
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return result.get();
}

/**
* Uploads provided IndexMetadata's to remote store in parallel. The call is blocking so the method waits for upload to finish and then return.
*
Expand Down Expand Up @@ -391,7 +461,7 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat
}
assert clusterState != null : "Last accepted cluster state is not set";
assert previousManifest != null : "Last cluster metadata manifest is not set";
return uploadManifest(clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), true);
return uploadManifest(clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), previousManifest.getGlobalMetadataFileName(), true);
}

@Override
Expand All @@ -415,6 +485,7 @@ public void start() {
private ClusterMetadataManifest uploadManifest(
ClusterState clusterState,
List<UploadedIndexMetadata> uploadedIndexMetadata,
String globalClusterMetadataFileName,
String previousClusterUUID,
boolean committed
) throws IOException {
Expand All @@ -428,6 +499,7 @@ private ClusterMetadataManifest uploadManifest(
Version.CURRENT,
nodeId,
committed,
globalClusterMetadataFileName,
uploadedIndexMetadata,
previousClusterUUID,
clusterState.metadata().clusterUUIDCommitted()
Expand Down Expand Up @@ -459,6 +531,12 @@ private BlobContainer indexMetadataContainer(String clusterName, String clusterU
.blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(INDEX_PATH_TOKEN).add(indexUUID));
}

private BlobContainer globalMetadataContainer(String clusterName, String clusterUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/
return blobStoreRepository.blobStore()
.blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(GLOBAL_METADATA_PATH_TOKEN));
}

private BlobContainer manifestContainer(String clusterName, String clusterUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest
return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID));
Expand Down Expand Up @@ -500,6 +578,15 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
);
}

private static String globalMetadataFileName(Metadata metadata) {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
return String.join(
DELIMITER,
GLOBAL_METADATA_FILE_PREFIX,
String.valueOf(metadata.version()),
String.valueOf(System.currentTimeMillis())
);
}

private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN);
}
Expand Down
Loading