Skip to content

Commit

Permalink
[Backport 2.x] [Remote Routing Table] Initial commit for index routin…
Browse files Browse the repository at this point in the history
…g table manifest (opensearch-project#14066)

* [Remote Routing Table] Remote Index routing table manifest (opensearch-project#13577)

Co-authored-by: Bukhtawar Khan <[email protected]>
Co-authored-by: Himshikha Gupta <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
3 people authored and kkewwei committed Jul 24, 2024
1 parent 7bf7e63 commit c641bc4
Show file tree
Hide file tree
Showing 9 changed files with 533 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest.
public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file.
public static final int CODEC_V2 = 2; // In Codec V2, there are seperate metadata files rather than a single global metadata file.
public static final int CODEC_V3 = 3; // In Codec V3, we introduce index routing-metadata in manifest file.

private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
Expand All @@ -58,6 +59,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
private static final ParseField UPLOADED_SETTINGS_METADATA = new ParseField("uploaded_settings_metadata");
private static final ParseField UPLOADED_TEMPLATES_METADATA = new ParseField("uploaded_templates_metadata");
private static final ParseField UPLOADED_CUSTOM_METADATA = new ParseField("uploaded_custom_metadata");
private static final ParseField ROUTING_TABLE_VERSION_FIELD = new ParseField("routing_table_version");
private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing");

private static ClusterMetadataManifest.Builder manifestV0Builder(Object[] fields) {
return ClusterMetadataManifest.builder()
Expand Down Expand Up @@ -86,6 +89,12 @@ private static ClusterMetadataManifest.Builder manifestV2Builder(Object[] fields
.customMetadataMap(customMetadata(fields));
}

private static ClusterMetadataManifest.Builder manifestV3Builder(Object[] fields) {
return manifestV2Builder(fields).codecVersion(codecVersion(fields))
.routingTableVersion(routingTableVersion(fields))
.indicesRouting(indicesRouting(fields));
}

private static long term(Object[] fields) {
return (long) fields[0];
}
Expand Down Expand Up @@ -151,6 +160,14 @@ private static Map<String, UploadedMetadataAttribute> customMetadata(Object[] fi
return customs.stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity()));
}

private static long routingTableVersion(Object[] fields) {
return (long) fields[15];
}

private static List<UploadedIndexMetadata> indicesRouting(Object[] fields) {
return (List<UploadedIndexMetadata>) fields[16];
}

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V0 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> manifestV0Builder(fields).build()
Expand All @@ -166,12 +183,18 @@ private static Map<String, UploadedMetadataAttribute> customMetadata(Object[] fi
fields -> manifestV2Builder(fields).build()
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V2;
private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V3 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> manifestV3Builder(fields).build()
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V3;

static {
declareParser(PARSER_V0, CODEC_V0);
declareParser(PARSER_V1, CODEC_V1);
declareParser(PARSER_V2, CODEC_V2);
declareParser(PARSER_V3, CODEC_V3);
}

private static void declareParser(ConstructingObjectParser<ClusterMetadataManifest, Void> parser, long codec_version) {
Expand Down Expand Up @@ -216,6 +239,14 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
UPLOADED_CUSTOM_METADATA
);
}
if (codec_version >= CODEC_V3) {
parser.declareLong(ConstructingObjectParser.constructorArg(), ROUTING_TABLE_VERSION_FIELD);
parser.declareObjectArray(
ConstructingObjectParser.constructorArg(),
(p, c) -> UploadedIndexMetadata.fromXContent(p),
INDICES_ROUTING_FIELD
);
}
}

private final int codecVersion;
Expand All @@ -234,6 +265,8 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
private final boolean committed;
private final String previousClusterUUID;
private final boolean clusterUUIDCommitted;
private final long routingTableVersion;
private final List<UploadedIndexMetadata> indicesRouting;

public List<UploadedIndexMetadata> getIndices() {
return indices;
Expand Down Expand Up @@ -306,6 +339,14 @@ public boolean hasMetadataAttributesFiles() {
|| !uploadedCustomMetadataMap.isEmpty();
}

public long getRoutingTableVersion() {
return routingTableVersion;
}

public List<UploadedIndexMetadata> getIndicesRouting() {
return indicesRouting;
}

public ClusterMetadataManifest(
long clusterTerm,
long version,
Expand All @@ -322,7 +363,9 @@ public ClusterMetadataManifest(
UploadedMetadataAttribute uploadedCoordinationMetadata,
UploadedMetadataAttribute uploadedSettingsMetadata,
UploadedMetadataAttribute uploadedTemplatesMetadata,
Map<String, UploadedMetadataAttribute> uploadedCustomMetadataMap
Map<String, UploadedMetadataAttribute> uploadedCustomMetadataMap,
long routingTableVersion,
List<UploadedIndexMetadata> indicesRouting
) {
this.clusterTerm = clusterTerm;
this.stateVersion = version;
Expand All @@ -336,6 +379,8 @@ public ClusterMetadataManifest(
this.indices = Collections.unmodifiableList(indices);
this.previousClusterUUID = previousClusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
this.routingTableVersion = routingTableVersion;
this.indicesRouting = Collections.unmodifiableList(indicesRouting);
this.uploadedCoordinationMetadata = uploadedCoordinationMetadata;
this.uploadedSettingsMetadata = uploadedSettingsMetadata;
this.uploadedTemplatesMetadata = uploadedTemplatesMetadata;
Expand Down Expand Up @@ -364,20 +409,26 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
in.readMap(StreamInput::readString, UploadedMetadataAttribute::new)
);
this.globalMetadataFileName = null;
this.routingTableVersion = in.readLong();
this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
} else if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
this.uploadedCoordinationMetadata = null;
this.uploadedSettingsMetadata = null;
this.uploadedTemplatesMetadata = null;
this.uploadedCustomMetadataMap = null;
this.routingTableVersion = -1;
this.indicesRouting = null;
} else {
this.codecVersion = CODEC_V0; // Default codec
this.globalMetadataFileName = null;
this.uploadedCoordinationMetadata = null;
this.uploadedSettingsMetadata = null;
this.uploadedTemplatesMetadata = null;
this.uploadedCustomMetadataMap = null;
this.routingTableVersion = -1;
this.indicesRouting = null;
}
}

Expand All @@ -401,7 +452,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startArray(INDICES_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indices) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
Expand Down Expand Up @@ -433,6 +486,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion());
builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName());
}
if (onOrAfterCodecVersion(CODEC_V3)) {
builder.field(ROUTING_TABLE_VERSION_FIELD.getPreferredName(), getRoutingTableVersion());
builder.startArray(INDICES_ROUTING_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
}
return builder;
}

Expand All @@ -454,6 +519,8 @@ public void writeTo(StreamOutput out) throws IOException {
uploadedSettingsMetadata.writeTo(out);
uploadedTemplatesMetadata.writeTo(out);
out.writeMap(uploadedCustomMetadataMap, StreamOutput::writeString, (o, v) -> v.writeTo(o));
out.writeLong(routingTableVersion);
out.writeCollection(indicesRouting);
} else if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeInt(codecVersion);
out.writeString(globalMetadataFileName);
Expand All @@ -480,7 +547,9 @@ public boolean equals(Object o) {
&& Objects.equals(previousClusterUUID, that.previousClusterUUID)
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted)
&& Objects.equals(globalMetadataFileName, that.globalMetadataFileName)
&& Objects.equals(codecVersion, that.codecVersion);
&& Objects.equals(codecVersion, that.codecVersion)
&& Objects.equals(routingTableVersion, that.routingTableVersion)
&& Objects.equals(indicesRouting, that.indicesRouting);
}

@Override
Expand All @@ -497,7 +566,9 @@ public int hashCode() {
nodeId,
committed,
previousClusterUUID,
clusterUUIDCommitted
clusterUUIDCommitted,
routingTableVersion,
indicesRouting
);
}

Expand All @@ -518,6 +589,10 @@ public static ClusterMetadataManifest fromXContentV1(XContentParser parser) thro
return PARSER_V1.parse(parser, null);
}

public static ClusterMetadataManifest fromXContentV2(XContentParser parser) throws IOException {
return PARSER_V2.parse(parser, null);
}

public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException {
return CURRENT_PARSER.parse(parser, null);
}
Expand Down Expand Up @@ -545,12 +620,24 @@ public static class Builder {
private String previousClusterUUID;
private boolean committed;
private boolean clusterUUIDCommitted;
private long routingTableVersion;
private List<UploadedIndexMetadata> indicesRouting;

public Builder indices(List<UploadedIndexMetadata> indices) {
this.indices = indices;
return this;
}

public Builder routingTableVersion(long routingTableVersion) {
this.routingTableVersion = routingTableVersion;
return this;
}

public Builder indicesRouting(List<UploadedIndexMetadata> indicesRouting) {
this.indicesRouting = indicesRouting;
return this;
}

public Builder codecVersion(int codecVersion) {
this.codecVersion = codecVersion;
return this;
Expand Down Expand Up @@ -625,6 +712,10 @@ public List<UploadedIndexMetadata> getIndices() {
return indices;
}

public List<UploadedIndexMetadata> getIndicesRouting() {
return indicesRouting;
}

public Builder previousClusterUUID(String previousClusterUUID) {
this.previousClusterUUID = previousClusterUUID;
return this;
Expand All @@ -638,6 +729,7 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) {
public Builder() {
indices = new ArrayList<>();
customMetadataMap = new HashMap<>();
indicesRouting = new ArrayList<>();
}

public Builder(ClusterMetadataManifest manifest) {
Expand All @@ -657,6 +749,8 @@ public Builder(ClusterMetadataManifest manifest) {
this.indices = new ArrayList<>(manifest.indices);
this.previousClusterUUID = manifest.previousClusterUUID;
this.clusterUUIDCommitted = manifest.clusterUUIDCommitted;
this.routingTableVersion = manifest.routingTableVersion;
this.indicesRouting = new ArrayList<>(manifest.indicesRouting);
}

public ClusterMetadataManifest build() {
Expand All @@ -676,7 +770,9 @@ public ClusterMetadataManifest build() {
coordinationMetadata,
settingsMetadata,
templatesMetadata,
customMetadataMap
customMetadataMap,
routingTableVersion,
indicesRouting
);
}

Expand Down Expand Up @@ -776,11 +872,9 @@ public String getIndexUUID() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
return builder.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
.field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath())
.endObject();
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public class RemoteClusterStateService implements Closeable {
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}]";
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V2;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2;

// ToXContent Params with gateway mode.
Expand Down Expand Up @@ -795,7 +795,10 @@ private ClusterMetadataManifest uploadManifest(
uploadedCoordinationMetadata,
uploadedSettingsMetadata,
uploadedTemplatesMetadata,
uploadedCustomMetadataMap
uploadedCustomMetadataMap,
clusterState.routingTable().version(),
// TODO: Add actual list of changed indices routing with index routing upload flow.
new ArrayList<>()
);
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
return manifest;
Expand Down
Loading

0 comments on commit c641bc4

Please sign in to comment.