Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload global cluster state to remote store #10404

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
public class ClusterMetadataManifest implements Writeable, ToXContentFragment {

public static final int CODEC_V0 = 0; // Default codec version, where we haven't introduced codec versions for manifest.
public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest.
public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file.

private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
Expand Down Expand Up @@ -97,7 +97,7 @@
return (String) fields[11];
}

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER = new ConstructingObjectParser<>(
private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V0 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> new ClusterMetadataManifest(
term(fields),
Expand All @@ -107,8 +107,8 @@
opensearchVersion(fields),
nodeId(fields),
committed(fields),
CODEC_V0, // Default codec version
null, // null global metadata for manifest files with v0 codec
CODEC_V0,
null,
indices(fields),
previousClusterUUID(fields),
clusterUUIDCommitted(fields)
Expand All @@ -134,7 +134,7 @@
);

static {
declareParser(PARSER, CODEC_V0);
declareParser(PARSER_V0, CODEC_V0);
declareParser(PARSER_V1, CODEC_V1);
}

Expand All @@ -154,7 +154,7 @@
parser.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID);
parser.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED);

if (codec_version == CODEC_V1) {
if (codec_version >= CODEC_V1) {
parser.declareInt(ConstructingObjectParser.constructorArg(), CODEC_VERSION_FIELD);
parser.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD);
}
Expand Down Expand Up @@ -264,8 +264,8 @@
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
} else {
this.codecVersion = CODEC_V0; // Default codec
this.globalMetadataFileName = null;

Check warning on line 268 in server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java#L267-L268

Added lines #L267 - L268 were not covered by tests
}
}

Expand Down Expand Up @@ -295,7 +295,7 @@
builder.endArray();
builder.field(PREVIOUS_CLUSTER_UUID.getPreferredName(), getPreviousClusterUUID());
builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted());
if (getCodecVersion() == CODEC_V1) {
if (onOrAfterCodecVersion(CODEC_V1)) {
builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion());
builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName());
}
Expand Down Expand Up @@ -366,11 +366,15 @@
return Strings.toString(MediaTypeRegistry.JSON, this);
}

public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
/**
 * Checks whether this manifest was written with at least the given codec version.
 *
 * @param codecVersion the codec version to compare against, e.g. {@code CODEC_V1}
 * @return {@code true} if this manifest's codec version is greater than or equal to
 *         {@code codecVersion}; {@code false} otherwise
 */
public boolean onOrAfterCodecVersion(int codecVersion) {
return this.codecVersion >= codecVersion;
}

/**
 * Parses a manifest using {@code PARSER_V0}, the parser registered for {@code CODEC_V0}.
 * That parser builds the manifest with {@code CODEC_V0} as its codec version and a
 * {@code null} global metadata file name.
 *
 * @param parser the content parser handed to {@code PARSER_V0}
 * @return the parsed manifest
 * @throws IOException declared by this method; presumably raised when the underlying content cannot be read
 */
public static ClusterMetadataManifest fromXContentV0(XContentParser parser) throws IOException {
return PARSER_V0.parse(parser, null);
}

public static ClusterMetadataManifest fromXContentV1(XContentParser parser) throws IOException {
public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException {
return PARSER_V1.parse(parser, null);
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

// TODO: make these two variables dynamic settings [issue: #10688]
public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000;
public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000;
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -101,18 +102,18 @@
/**
* Manifest format compatible with older codec v0, where codec version was missing.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V0 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0);

/**
* Manifest format compatible with codec v1, where we introduced codec versions/global metadata.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
"cluster-metadata-manifest",
METADATA_MANIFEST_NAME_FORMAT,
ClusterMetadataManifest::fromXContent
);

/**
* Manifest format compatible with codec v1, where we introduced codec versions/global metadata.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V1 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1);

/**
* Used to specify if cluster state metadata should be published to remote store
*/
Expand All @@ -129,6 +130,7 @@
public static final String MANIFEST_PATH_TOKEN = "manifest";
public static final String MANIFEST_FILE_PREFIX = "manifest";
public static final String METADATA_FILE_PREFIX = "metadata";
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
public static final int SPLITED_MANIFEST_FILE_LENGTH = 6; // file name manifest__term__version__C/P__timestamp__codecversion

private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -366,19 +368,19 @@

try {
if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
// TODO: We should add metrics where transfer is timing out.
// TODO: We should add metrics where transfer is timing out. [Issue: #10687]
GlobalMetadataTransferException ex = new GlobalMetadataTransferException(
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete")

Check warning on line 373 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L372-L373

Added lines #L372 - L373 were not covered by tests
);
throw ex;

Check warning on line 375 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L375

Added line #L375 was not covered by tests
}
} catch (InterruptedException ex) {
GlobalMetadataTransferException exception = new GlobalMetadataTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete - %s"),

Check warning on line 379 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L377-L379

Added lines #L377 - L379 were not covered by tests
ex
);
Thread.currentThread().interrupt();
throw exception;

Check warning on line 383 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L382-L383

Added lines #L382 - L383 were not covered by tests
}

return result.get();
Expand Down Expand Up @@ -566,7 +568,7 @@
private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
throws IOException {
final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID);
CLUSTER_METADATA_MANIFEST_FORMAT_V1.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor());
CLUSTER_METADATA_MANIFEST_FORMAT.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor());
}

private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) {
Expand Down Expand Up @@ -734,7 +736,7 @@
return validChain.get(0);
} catch (IOException e) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Error while fetching previous UUIDs from remote store for cluster name: %s", clusterName),

Check warning on line 739 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L739

Added line #L739 was not covered by tests
e
);
}
Expand Down Expand Up @@ -935,21 +937,24 @@

private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormat(String fileName) {
long codecVersion = getManifestCodecVersion(fileName);
if (codecVersion == ClusterMetadataManifest.CODEC_V0) {
if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
return CLUSTER_METADATA_MANIFEST_FORMAT;

Check warning on line 941 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L941

Added line #L941 was not covered by tests
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V1;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V0;
}

throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version");

Check warning on line 946 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L946

Added line #L946 was not covered by tests
}

private int getManifestCodecVersion(String fileName) {
if (fileName.split(DELIMITER).length < 6) { // Where codec is not part of file name, i.e. default codec version 0 is used.
String[] splitName = fileName.split(DELIMITER);
if (splitName.length == SPLITED_MANIFEST_FILE_LENGTH) {
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version.

Check warning on line 952 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L952

Added line #L952 was not covered by tests
} else if (splitName.length < SPLITED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0
// is used.
return ClusterMetadataManifest.CODEC_V0;
} else {
String[] splitName = fileName.split(DELIMITER);
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version.
throw new IllegalArgumentException("Manifest file name is corrupted");

Check warning on line 957 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L957

Added line #L957 was not covered by tests
}
}

Expand Down Expand Up @@ -977,8 +982,8 @@
static class GlobalMetadataTransferException extends RuntimeException {

public GlobalMetadataTransferException(String errorDesc) {
super(errorDesc);
}

Check warning on line 986 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L985-L986

Added lines #L985 - L986 were not covered by tests

public GlobalMetadataTransferException(String errorDesc, Throwable cause) {
super(errorDesc, cause);
Expand Down Expand Up @@ -1076,7 +1081,7 @@
Set<String> filesToKeep = new HashSet<>();
Set<String> staleManifestPaths = new HashSet<>();
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();

Check warning on line 1084 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1084

Added line #L1084 was not covered by tests
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
clusterName,
Expand All @@ -1085,7 +1090,7 @@
);
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());

Check warning on line 1093 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1093

Added line #L1093 was not covered by tests
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
Expand All @@ -1095,9 +1100,9 @@
);
staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
if (filesToKeep.contains(clusterMetadataManifest.getGlobalMetadataFileName()) == false) {
String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/");
staleGlobalMetadataPaths.add(
new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(

Check warning on line 1105 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1103-L1105

Added lines #L1103 - L1105 were not covered by tests
globalMetadataSplitPath[globalMetadataSplitPath.length - 1]
)
);
Expand All @@ -1117,7 +1122,7 @@
return;
}

deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));

Check warning on line 1125 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1125

Added line #L1125 was not covered by tests
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
} catch (IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

public class ClusterMetadataManifestTests extends OpenSearchTestCase {

public void testClusterMetadataManifestXContent() throws IOException {
public void testClusterMetadataManifestXContentV0() throws IOException {
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path");
ClusterMetadataManifest originalManifest = new ClusterMetadataManifest(
1L,
Expand All @@ -48,12 +48,12 @@ public void testClusterMetadataManifestXContent() throws IOException {
builder.endObject();

try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser);
final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV0(parser);
assertEquals(originalManifest, fromXContentManifest);
}
}

public void testClusterMetadataManifestXContentCodecV1() throws IOException {
public void testClusterMetadataManifestXContent() throws IOException {
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path");
ClusterMetadataManifest originalManifest = new ClusterMetadataManifest(
1L,
Expand All @@ -75,7 +75,7 @@ public void testClusterMetadataManifestXContentCodecV1() throws IOException {
builder.endObject();

try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV1(parser);
final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser);
assertEquals(originalManifest, fromXContentManifest);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException {

}

public void testWriteFullMetadataInParallelFailureForGlobalMetadata() throws IOException {
public void testWriteFullMetadataFailureForGlobalMetadata() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class);

Expand Down Expand Up @@ -364,8 +364,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
/*
* Here we will verify the migration of manifest file from codec V0 to codec V1.
*
* Initially codec version is 1 and global metadata is also null, we will perform index metadata update.
* In final manifest codec version should be 2 and
* Initially codec version is 0 and global metadata is also null, we will perform index metadata update.
* In final manifest codec version should be 1 and
* global metadata should be updated, even if it was not changed in this cluster state update
*/
public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOException {
Expand Down Expand Up @@ -445,9 +445,9 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException {
}

/*
* Here we will verify global metadata is not uploaded again if change is only in index metadata
* Here we will verify index metadata is not uploaded again if change is only in global metadata
*/
public void testGlobalMetadataNotUpdatingIndexMetadata() throws IOException {
public void testGlobalMetadataOnlyUpdated() throws IOException {
// setup
mockBlobStoreObjects();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
Expand Down Expand Up @@ -508,9 +508,9 @@ public void testGlobalMetadataNotUpdatingIndexMetadata() throws IOException {
}

/*
* Here we will verify index metadata is not uploaded again if change is only in global metadata
* Here we will verify global metadata is not uploaded again if change is only in index metadata
*/
public void testIndexMetadataNotUpdatingGlobalMetadata() throws IOException {
public void testIndexMetadataOnlyUpdated() throws IOException {
// setup
mockBlobStoreObjects();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
Expand Down
Loading