From 7faae255c2593921638e4a91898dce8d2867bede Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Thu, 6 Jun 2024 14:20:00 +0530 Subject: [PATCH] [Remote Routing Table] Initial commit for index routing table manifest (#13577) * Initial commit for index routing table manifest Co-authored-by: Bukhtawar Khan Co-authored-by: Himshikha Gupta Co-authored-by: Arpit Bandejiya --- .../stream}/BufferedChecksumStreamInput.java | 4 +- .../stream}/BufferedChecksumStreamOutput.java | 3 +- .../remote/ClusterMetadataManifest.java | 112 ++++++++++++++++-- .../remote/RemoteClusterStateService.java | 7 +- .../routingtable/IndexRoutingTableHeader.java | 81 +++++++++++++ .../routingtable/RemoteIndexRoutingTable.java | 100 ++++++++++++++++ .../remote/routingtable/package-info.java | 12 ++ .../index/translog/BaseTranslogReader.java | 1 + .../opensearch/index/translog/Translog.java | 2 + .../index/translog/TranslogHeader.java | 2 + .../index/translog/TranslogSnapshot.java | 1 + .../index/translog/TranslogWriter.java | 1 + .../remote/ClusterMetadataManifestTests.java | 110 ++++++++++++++++- .../RemoteClusterStateServiceTests.java | 10 +- .../IndexRoutingTableHeaderTests.java | 32 +++++ .../RemoteIndexRoutingTableTests.java | 87 ++++++++++++++ .../snapshots/BlobStoreFormatTests.java | 2 +- 17 files changed, 543 insertions(+), 24 deletions(-) rename {server/src/main/java/org/opensearch/index/translog => libs/core/src/main/java/org/opensearch/core/common/io/stream}/BufferedChecksumStreamInput.java (96%) rename {server/src/main/java/org/opensearch/index/translog => libs/core/src/main/java/org/opensearch/core/common/io/stream}/BufferedChecksumStreamOutput.java (96%) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/routingtable/package-info.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java diff --git a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BufferedChecksumStreamInput.java similarity index 96% rename from server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java rename to libs/core/src/main/java/org/opensearch/core/common/io/stream/BufferedChecksumStreamInput.java index f75f27b7bcb91..41680961b36e9 100644 --- a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BufferedChecksumStreamInput.java @@ -30,12 +30,10 @@ * GitHub history for details. */ -package org.opensearch.index.translog; +package org.opensearch.core.common.io.stream; import org.apache.lucene.store.BufferedChecksum; import org.apache.lucene.util.BitUtil; -import org.opensearch.core.common.io.stream.FilterStreamInput; -import org.opensearch.core.common.io.stream.StreamInput; import java.io.EOFException; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BufferedChecksumStreamOutput.java similarity index 96% rename from server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java rename to libs/core/src/main/java/org/opensearch/core/common/io/stream/BufferedChecksumStreamOutput.java index 9e96664c79cc5..422f956c0cd47 100644 --- a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BufferedChecksumStreamOutput.java @@ -30,11 +30,10 @@ * GitHub history for details. */ -package org.opensearch.index.translog; +package org.opensearch.core.common.io.stream; import org.apache.lucene.store.BufferedChecksum; import org.opensearch.common.annotation.PublicApi; -import org.opensearch.core.common.io.stream.StreamOutput; import java.io.IOException; import java.util.zip.CRC32; diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index bf02c73ca560b..b3b1bf37f8696 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -41,6 +41,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest. public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file. public static final int CODEC_V2 = 2; // In Codec V2, there are seperate metadata files rather than a single global metadata file. + public static final int CODEC_V3 = 3; // In Codec V3, we introduce index routing-metadata in manifest file. private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); @@ -58,6 +59,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField UPLOADED_SETTINGS_METADATA = new ParseField("uploaded_settings_metadata"); private static final ParseField UPLOADED_TEMPLATES_METADATA = new ParseField("uploaded_templates_metadata"); private static final ParseField UPLOADED_CUSTOM_METADATA = new ParseField("uploaded_custom_metadata"); + private static final ParseField ROUTING_TABLE_VERSION_FIELD = new ParseField("routing_table_version"); + private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing"); private static ClusterMetadataManifest.Builder manifestV0Builder(Object[] fields) { return ClusterMetadataManifest.builder() @@ -86,6 +89,12 @@ private static ClusterMetadataManifest.Builder manifestV2Builder(Object[] fields .customMetadataMap(customMetadata(fields)); } + private static ClusterMetadataManifest.Builder manifestV3Builder(Object[] fields) { + return manifestV2Builder(fields).codecVersion(codecVersion(fields)) + .routingTableVersion(routingTableVersion(fields)) + .indicesRouting(indicesRouting(fields)); + } + private static long term(Object[] fields) { return (long) fields[0]; } @@ -151,6 +160,14 @@ private static Map customMetadata(Object[] fi return customs.stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())); } + private static long routingTableVersion(Object[] fields) { + return (long) fields[15]; + } + + private static List indicesRouting(Object[] fields) { + return (List) fields[16]; + } + private static final ConstructingObjectParser PARSER_V0 = new ConstructingObjectParser<>( "cluster_metadata_manifest", fields -> manifestV0Builder(fields).build() @@ -166,12 +183,18 @@ private static Map customMetadata(Object[] fi fields -> manifestV2Builder(fields).build() ); - private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V2; + private static final ConstructingObjectParser PARSER_V3 = new ConstructingObjectParser<>( + "cluster_metadata_manifest", + fields -> manifestV3Builder(fields).build() + ); + + private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V3; static { declareParser(PARSER_V0, CODEC_V0); declareParser(PARSER_V1, CODEC_V1); declareParser(PARSER_V2, CODEC_V2); + declareParser(PARSER_V3, CODEC_V3); } private static void declareParser(ConstructingObjectParser parser, long codec_version) { @@ -216,6 +239,14 @@ private static void declareParser(ConstructingObjectParser= CODEC_V3) { + parser.declareLong(ConstructingObjectParser.constructorArg(), ROUTING_TABLE_VERSION_FIELD); + parser.declareObjectArray( + ConstructingObjectParser.constructorArg(), + (p, c) -> UploadedIndexMetadata.fromXContent(p), + INDICES_ROUTING_FIELD + ); + } } private final int codecVersion; @@ -234,6 +265,8 @@ private static void declareParser(ConstructingObjectParser indicesRouting; public List getIndices() { return indices; @@ -306,6 +339,14 @@ public boolean hasMetadataAttributesFiles() { || !uploadedCustomMetadataMap.isEmpty(); } + public long getRoutingTableVersion() { + return routingTableVersion; + } + + public List getIndicesRouting() { + return indicesRouting; + } + public ClusterMetadataManifest( long clusterTerm, long version, @@ -322,7 +363,9 @@ public ClusterMetadataManifest( UploadedMetadataAttribute uploadedCoordinationMetadata, UploadedMetadataAttribute uploadedSettingsMetadata, UploadedMetadataAttribute uploadedTemplatesMetadata, - Map uploadedCustomMetadataMap + Map uploadedCustomMetadataMap, + long routingTableVersion, + List indicesRouting ) { this.clusterTerm = clusterTerm; this.stateVersion = version; @@ -336,6 +379,8 @@ public ClusterMetadataManifest( this.indices = Collections.unmodifiableList(indices); this.previousClusterUUID = previousClusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; + this.routingTableVersion = routingTableVersion; + this.indicesRouting = Collections.unmodifiableList(indicesRouting); this.uploadedCoordinationMetadata = uploadedCoordinationMetadata; this.uploadedSettingsMetadata = uploadedSettingsMetadata; this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; @@ -364,6 +409,8 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { in.readMap(StreamInput::readString, UploadedMetadataAttribute::new) ); this.globalMetadataFileName = null; + this.routingTableVersion = in.readLong(); + this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); } else if (in.getVersion().onOrAfter(Version.V_2_12_0)) { this.codecVersion = in.readInt(); this.globalMetadataFileName = in.readString(); @@ -371,6 +418,8 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.uploadedSettingsMetadata = null; this.uploadedTemplatesMetadata = null; this.uploadedCustomMetadataMap = null; + this.routingTableVersion = -1; + this.indicesRouting = null; } else { this.codecVersion = CODEC_V0; // Default codec this.globalMetadataFileName = null; @@ -378,6 +427,8 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.uploadedSettingsMetadata = null; this.uploadedTemplatesMetadata = null; this.uploadedCustomMetadataMap = null; + this.routingTableVersion = -1; + this.indicesRouting = null; } } @@ -401,7 +452,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startArray(INDICES_FIELD.getPreferredName()); { for (UploadedIndexMetadata uploadedIndexMetadata : indices) { + builder.startObject(); uploadedIndexMetadata.toXContent(builder, params); + builder.endObject(); } } builder.endArray(); @@ -433,6 +486,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion()); builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); } + if (onOrAfterCodecVersion(CODEC_V3)) { + builder.field(ROUTING_TABLE_VERSION_FIELD.getPreferredName(), getRoutingTableVersion()); + builder.startArray(INDICES_ROUTING_FIELD.getPreferredName()); + { + for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) { + builder.startObject(); + uploadedIndexMetadata.toXContent(builder, params); + builder.endObject(); + } + } + builder.endArray(); + } return builder; } @@ -454,6 +519,8 @@ public void writeTo(StreamOutput out) throws IOException { uploadedSettingsMetadata.writeTo(out); uploadedTemplatesMetadata.writeTo(out); out.writeMap(uploadedCustomMetadataMap, StreamOutput::writeString, (o, v) -> v.writeTo(o)); + out.writeLong(routingTableVersion); + out.writeCollection(indicesRouting); } else if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeInt(codecVersion); out.writeString(globalMetadataFileName); @@ -480,7 +547,9 @@ public boolean equals(Object o) { && Objects.equals(previousClusterUUID, that.previousClusterUUID) && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted) && Objects.equals(globalMetadataFileName, that.globalMetadataFileName) - && Objects.equals(codecVersion, that.codecVersion); + && Objects.equals(codecVersion, that.codecVersion) + && Objects.equals(routingTableVersion, that.routingTableVersion) + && Objects.equals(indicesRouting, that.indicesRouting); } @Override @@ -497,7 +566,9 @@ public int hashCode() { nodeId, committed, previousClusterUUID, - clusterUUIDCommitted + clusterUUIDCommitted, + routingTableVersion, + indicesRouting ); } @@ -518,6 +589,10 @@ public static ClusterMetadataManifest fromXContentV1(XContentParser parser) thro return PARSER_V1.parse(parser, null); } + public static ClusterMetadataManifest fromXContentV2(XContentParser parser) throws IOException { + return PARSER_V2.parse(parser, null); + } + public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { return CURRENT_PARSER.parse(parser, null); } @@ -545,12 +620,24 @@ public static class Builder { private String previousClusterUUID; private boolean committed; private boolean clusterUUIDCommitted; + private long routingTableVersion; + private List indicesRouting; public Builder indices(List indices) { this.indices = indices; return this; } + public Builder routingTableVersion(long routingTableVersion) { + this.routingTableVersion = routingTableVersion; + return this; + } + + public Builder indicesRouting(List indicesRouting) { + this.indicesRouting = indicesRouting; + return this; + } + public Builder codecVersion(int codecVersion) { this.codecVersion = codecVersion; return this; @@ -625,6 +712,10 @@ public List getIndices() { return indices; } + public List getIndicesRouting() { + return indicesRouting; + } + public Builder previousClusterUUID(String previousClusterUUID) { this.previousClusterUUID = previousClusterUUID; return this; @@ -638,6 +729,7 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) { public Builder() { indices = new ArrayList<>(); customMetadataMap = new HashMap<>(); + indicesRouting = new ArrayList<>(); } public Builder(ClusterMetadataManifest manifest) { @@ -657,6 +749,8 @@ public Builder(ClusterMetadataManifest manifest) { this.indices = new ArrayList<>(manifest.indices); this.previousClusterUUID = manifest.previousClusterUUID; this.clusterUUIDCommitted = manifest.clusterUUIDCommitted; + this.routingTableVersion = manifest.routingTableVersion; + this.indicesRouting = new ArrayList<>(manifest.indicesRouting); } public ClusterMetadataManifest build() { @@ -676,7 +770,9 @@ public ClusterMetadataManifest build() { coordinationMetadata, settingsMetadata, templatesMetadata, - customMetadataMap + customMetadataMap, + routingTableVersion, + indicesRouting ); } @@ -776,11 +872,9 @@ public String getIndexUUID() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject() - .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) + return builder.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) - .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()) - .endObject(); + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index fdf6b2f94ba6b..d0593dcd51475 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -216,7 +216,7 @@ public class RemoteClusterStateService implements Closeable { + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}]"; public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; - public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V2; + public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3; public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2; // ToXContent Params with gateway mode. @@ -806,7 +806,10 @@ private ClusterMetadataManifest uploadManifest( uploadedCoordinationMetadata, uploadedSettingsMetadata, uploadedTemplatesMetadata, - uploadedCustomMetadataMap + uploadedCustomMetadataMap, + clusterState.routingTable().version(), + // TODO: Add actual list of changed indices routing with index routing upload flow. + new ArrayList<>() ); writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); return manifest; diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java new file mode 100644 index 0000000000000..5baea6adba0c7 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.store.InputStreamDataInput; +import org.apache.lucene.store.OutputStreamDataOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.EOFException; +import java.io.IOException; + +/** + * The stored header information for the individual index routing table + */ +public class IndexRoutingTableHeader implements Writeable { + + public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec"; + public static final int INITIAL_VERSION = 1; + public static final int CURRENT_VERSION = INITIAL_VERSION; + private final String indexName; + + public IndexRoutingTableHeader(String indexName) { + this.indexName = indexName; + } + + /** + * Reads the contents on the stream into the corresponding {@link IndexRoutingTableHeader} + * + * @param in streamInput + * @throws IOException exception thrown on failing to read from stream. + */ + public IndexRoutingTableHeader(StreamInput in) throws IOException { + try { + readHeaderVersion(in); + indexName = in.readString(); + } catch (EOFException e) { + throw new IOException("index routing header truncated", e); + } + } + + private void readHeaderVersion(final StreamInput in) throws IOException { + try { + CodecUtil.checkHeader(new InputStreamDataInput(in), INDEX_ROUTING_HEADER_CODEC, INITIAL_VERSION, CURRENT_VERSION); + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { + throw new IOException("index routing table header corrupted", e); + } + } + + /** + * Write the IndexRoutingTable to given stream. + * + * @param out stream to write + * @throws IOException exception thrown on failing to write to stream. + */ + public void writeTo(StreamOutput out) throws IOException { + try { + CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); + out.writeString(indexName); + out.flush(); + } catch (IOException e) { + throw new IOException("Failed to write IndexRoutingTable header", e); + } + } + + public String getIndexName() { + return indexName; + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java new file mode 100644 index 0000000000000..17c55190da07f --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.index.Index; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +/** + * Remote store object for IndexRoutingTable + */ +public class RemoteIndexRoutingTable implements Writeable { + + private final IndexRoutingTable indexRoutingTable; + + public RemoteIndexRoutingTable(IndexRoutingTable indexRoutingTable) { + this.indexRoutingTable = indexRoutingTable; + } + + /** + * Reads data from inputStream and creates RemoteIndexRoutingTable object with the {@link IndexRoutingTable} + * @param inputStream input stream with index routing data + * @param index index for the current routing data + * @throws IOException exception thrown on failing to read from stream. + */ + public RemoteIndexRoutingTable(InputStream inputStream, Index index) throws IOException { + try { + try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new InputStreamStreamInput(inputStream), "assertion")) { + // Read the Table Header first and confirm the index + IndexRoutingTableHeader indexRoutingTableHeader = new IndexRoutingTableHeader(in); + assert indexRoutingTableHeader.getIndexName().equals(index.getName()); + + int numberOfShardRouting = in.readVInt(); + IndexRoutingTable.Builder indicesRoutingTable = IndexRoutingTable.builder(index); + for (int idx = 0; idx < numberOfShardRouting; idx++) { + IndexShardRoutingTable indexShardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in); + indicesRoutingTable.addIndexShard(indexShardRoutingTable); + } + verifyCheckSum(in); + indexRoutingTable = indicesRoutingTable.build(); + } + } catch (EOFException e) { + throw new IOException("Indices Routing table is corrupted", e); + } + } + + public IndexRoutingTable getIndexRoutingTable() { + return indexRoutingTable; + } + + /** + * Writes {@link IndexRoutingTable} to the given stream + * @param streamOutput output stream to write + * @throws IOException exception thrown on failing to write to stream. + */ + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + try { + BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(streamOutput); + IndexRoutingTableHeader indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName()); + indexRoutingTableHeader.writeTo(out); + out.writeVInt(indexRoutingTable.shards().size()); + for (IndexShardRoutingTable next : indexRoutingTable) { + IndexShardRoutingTable.Builder.writeTo(next, out); + } + out.writeLong(out.getChecksum()); + out.flush(); + } catch (IOException e) { + throw new IOException("Failed to write IndexRoutingTable to stream", e); + } + } + + private void verifyCheckSum(BufferedChecksumStreamInput in) throws IOException { + long expectedChecksum = in.getChecksum(); + long readChecksum = in.readLong(); + if (readChecksum != expectedChecksum) { + throw new IOException( + "checksum verification failed - expected: 0x" + + Long.toHexString(expectedChecksum) + + ", got: 0x" + + Long.toHexString(readChecksum) + ); + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/package-info.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/package-info.java new file mode 100644 index 0000000000000..a6cb2251a5dd7 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package containing class to perform operations on remote routing table. + */ +package org.opensearch.gateway.remote.routingtable; diff --git a/server/src/main/java/org/opensearch/index/translog/BaseTranslogReader.java b/server/src/main/java/org/opensearch/index/translog/BaseTranslogReader.java index d6fa2a2e53de3..37af1dcbeab8b 100644 --- a/server/src/main/java/org/opensearch/index/translog/BaseTranslogReader.java +++ b/server/src/main/java/org/opensearch/index/translog/BaseTranslogReader.java @@ -32,6 +32,7 @@ package org.opensearch.index.translog; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput; import org.opensearch.core.common.io.stream.ByteBufferStreamInput; import org.opensearch.index.seqno.SequenceNumbers; diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 842e9c77d2350..87e0c21b8203c 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -48,6 +48,8 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.shard.ShardId; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java index 7b5be9505f27a..66a9fe08d06b5 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java @@ -40,6 +40,8 @@ import org.apache.lucene.store.OutputStreamDataOutput; import org.apache.lucene.util.BytesRef; import org.opensearch.common.io.Channels; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogSnapshot.java b/server/src/main/java/org/opensearch/index/translog/TranslogSnapshot.java index 89718156cbbe8..521472f4d64a0 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogSnapshot.java @@ -32,6 +32,7 @@ package org.opensearch.index.translog; import org.opensearch.common.io.Channels; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput; import org.opensearch.index.seqno.SequenceNumbers; import java.io.EOFException; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java index 86f7567f3333d..b0c7d51c3e43b 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java @@ -50,6 +50,7 @@ import org.opensearch.core.Assertions; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.seqno.SequenceNumbers; diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 0b3cd49140939..d1f559eb75f85 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -99,7 +99,7 @@ public void testClusterMetadataManifestXContent() throws IOException { Version.CURRENT, "test-node-id", false, - ClusterMetadataManifest.CODEC_V2, + ClusterMetadataManifest.CODEC_V3, null, Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", @@ -123,7 +123,9 @@ public void testClusterMetadataManifestXContent() throws IOException { "custom--weighted_routing_netadata-file" ) ) - ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())), + 1L, + randomUploadedIndexMetadataList() ); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -169,7 +171,9 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { "custom--weighted_routing_netadata-file" ) ) - ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())), + 1L, + randomUploadedIndexMetadataList() ); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( @@ -309,6 +313,106 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { } } + public void testClusterMetadataManifestXContentV2() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); + ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( + 1L, + 1L, + "test-cluster-uuid", + "test-state-uuid", + Version.CURRENT, + "test-node-id", + false, + ClusterMetadataManifest.CODEC_V2, + null, + Collections.singletonList(uploadedIndexMetadata), + "prev-cluster-uuid", + true, + uploadedMetadataAttribute, + uploadedMetadataAttribute, + uploadedMetadataAttribute, + Collections.unmodifiableList( + Arrays.asList( + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, + "custom--repositories-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, + "custom--index_graveyard-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + WeightedRoutingMetadata.TYPE, + "custom--weighted_routing_netadata-file" + ) + ) + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())), + 0, + new ArrayList<>() + ); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV2(parser); + assertEquals(originalManifest, fromXContentManifest); + } + } + + public void testClusterMetadataManifestXContentV3() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); + ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( + 1L, + 1L, + "test-cluster-uuid", + "test-state-uuid", + Version.CURRENT, + "test-node-id", + false, + ClusterMetadataManifest.CODEC_V3, + null, + Collections.singletonList(uploadedIndexMetadata), + "prev-cluster-uuid", + true, + uploadedMetadataAttribute, + uploadedMetadataAttribute, + uploadedMetadataAttribute, + Collections.unmodifiableList( + Arrays.asList( + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, + "custom--repositories-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, + "custom--index_graveyard-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + WeightedRoutingMetadata.TYPE, + "custom--weighted_routing_netadata-file" + ) + ) + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())), + 1L, + Collections.singletonList(uploadedIndexMetadata) + ); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser); + assertEquals(originalManifest, fromXContentManifest); + } + } + private List randomUploadedIndexMetadataList() { final int size = randomIntBetween(1, 10); final List uploadedIndexMetadataList = new ArrayList<>(size); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 324023061ac46..4a53770c76d88 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -548,7 +548,7 @@ private void verifyWriteIncrementalGlobalMetadataFromOlderCodecSuccess(ClusterMe ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() - .codecVersion(2) + .codecVersion(3) .indices(Collections.emptyList()) .clusterTerm(1L) .stateVersion(1L) @@ -1072,6 +1072,8 @@ public void testReadGlobalMetadata() throws IOException { .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") + .routingTableVersion(1) + .indicesRouting(List.of()) .build(); Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build(); @@ -1469,7 +1471,7 @@ private void mockObjectsForGettingPreviousClusterUUID( .build(); Map indexMetadataMap1 = Map.of("index-uuid1", indexMetadata1, "index-uuid2", indexMetadata2); mockBlobContainerForGlobalMetadata(blobContainer1, clusterManifest1, metadata1); - mockBlobContainer(blobContainer1, clusterManifest1, indexMetadataMap1, ClusterMetadataManifest.CODEC_V2); + mockBlobContainer(blobContainer1, clusterManifest1, indexMetadataMap1, ClusterMetadataManifest.CODEC_V3); List uploadedIndexMetadataList2 = List.of( new UploadedIndexMetadata("index1", "index-uuid1", "key1"), @@ -1501,7 +1503,7 @@ private void mockObjectsForGettingPreviousClusterUUID( .build(); Map indexMetadataMap2 = Map.of("index-uuid1", indexMetadata3, "index-uuid2", indexMetadata4); mockBlobContainerForGlobalMetadata(blobContainer2, clusterManifest2, metadata2); - mockBlobContainer(blobContainer2, clusterManifest2, indexMetadataMap2, ClusterMetadataManifest.CODEC_V2); + mockBlobContainer(blobContainer2, clusterManifest2, indexMetadataMap2, ClusterMetadataManifest.CODEC_V3); // differGlobalMetadata controls which one of IndexMetadata or Metadata object would be different // when comparing cluster-uuid3 and cluster-uuid1 state. @@ -1535,7 +1537,7 @@ private void mockObjectsForGettingPreviousClusterUUID( clusterUUIDCommitted.getOrDefault("cluster-uuid3", true) ); mockBlobContainerForGlobalMetadata(blobContainer3, clusterManifest3, metadata3); - mockBlobContainer(blobContainer3, clusterManifest3, indexMetadataMap3, ClusterMetadataManifest.CODEC_V2); + mockBlobContainer(blobContainer3, clusterManifest3, indexMetadataMap3, ClusterMetadataManifest.CODEC_V3); ArrayList mockBlobContainerOrderedList = new ArrayList<>( List.of(blobContainer1, blobContainer1, blobContainer3, blobContainer3, blobContainer2, blobContainer2) diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java new file mode 100644 index 0000000000000..a3f0ac36a40f1 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class IndexRoutingTableHeaderTests extends OpenSearchTestCase { + + public void testIndexRoutingTableHeader() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + IndexRoutingTableHeader header = new IndexRoutingTableHeader(indexName); + try (BytesStreamOutput out = new BytesStreamOutput()) { + header.writeTo(out); + + BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); + IndexRoutingTableHeader headerRead = new IndexRoutingTableHeader(in); + assertEquals(indexName, headerRead.getIndexName()); + + } + } + +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java new file mode 100644 index 0000000000000..72066d8afb45b --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public class RemoteIndexRoutingTableTests extends OpenSearchTestCase { + + public void testRoutingTableInput() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { + RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); + try (BytesStreamOutput streamOutput = new BytesStreamOutput();) { + indexRouting.writeTo(streamOutput); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( + streamOutput.bytes().streamInput(), + metadata.index(indexName).getIndex() + ); + IndexRoutingTable indexRoutingTable = remoteIndexRoutingTable.getIndexRoutingTable(); + assertEquals(numberOfShards, indexRoutingTable.getShards().size()); + assertEquals(metadata.index(indexName).getIndex(), indexRoutingTable.getIndex()); + assertEquals( + numberOfShards * (1 + numberOfReplicas), + indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size() + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + public void testRoutingTableInputStreamWithInvalidIndex() { + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + AtomicInteger assertionError = new AtomicInteger(); + initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { + RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); + try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { + indexRouting.writeTo(streamOutput); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( + streamOutput.bytes().streamInput(), + metadata.index("invalid-index").getIndex() + ); + } catch (AssertionError e) { + assertionError.getAndIncrement(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + assertEquals(1, assertionError.get()); + } + +} diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index c5f36fcc01983..95a8267734a07 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -49,13 +49,13 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.opensearch.test.OpenSearchTestCase;