Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Shailendra Singh <[email protected]>
  • Loading branch information
Shailendra Singh committed Jul 12, 2024
1 parent 5b81f3a commit 02e0313
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 48 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
RoutingTable before,
RoutingTable after
) {
return DiffableUtils.diff(Map.of(), Map.of(), DiffableUtils.getStringKeySerializer(), CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER);
return DiffableUtils.diff(Map.of(), Map.of(), DiffableUtils.getStringKeySerializer(), CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,6 @@
* @opensearch.internal
*/
public interface RemoteRoutingTableService extends LifecycleComponent {
public static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};

public static final DiffableUtils.DiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER =
new DiffableUtils.DiffableValueSerializer<String, IndexRoutingTable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class ClusterStateDiffManifest implements ToXContentFragment, Writeable {
private final List<String> clusterStateCustomUpdated;
private final List<String> clusterStateCustomDeleted;

public ClusterStateDiffManifest(ClusterState state, ClusterState previousState) {
public ClusterStateDiffManifest(ClusterState state, ClusterState previousState, DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableIncrementalDiff) {
fromStateUUID = previousState.stateUUID();
toStateUUID = state.stateUUID();
coordinationMetadataUpdated = !Metadata.isCoordinationMetadataEqual(state.metadata(), previousState.metadata());
Expand All @@ -104,24 +104,12 @@ public ClusterStateDiffManifest(ClusterState state, ClusterState previousState)
customMetadataUpdated.addAll(customDiff.getUpserts().keySet());
customMetadataDeleted = customDiff.getDeletes();

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = DiffableUtils.diff(
previousState.getRoutingTable().getIndicesRouting(),
state.getRoutingTable().getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER
);

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff1 = DiffableUtils.diff(
previousState.getRoutingTable().getIndicesRouting(),
state.getRoutingTable().getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER
);

indicesRoutingUpdated = new ArrayList<>();
routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingUpdated.add(k));

indicesRoutingDeleted = routingTableDiff.getDeletes();
indicesRoutingDeleted = new ArrayList<>();
if (routingTableIncrementalDiff != null) {
routingTableIncrementalDiff.getUpserts().forEach((k, v) -> indicesRoutingUpdated.add(k));
indicesRoutingDeleted.addAll(routingTableIncrementalDiff.getDeletes());
}
hashesOfConsistentSettingsUpdated = !state.metadata()
.hashesOfConsistentSettings()
.equals(previousState.metadata().hashesOfConsistentSettings());
Expand Down Expand Up @@ -590,7 +578,8 @@ public static class Builder {
private List<String> clusterStateCustomUpdated;
private List<String> clusterStateCustomDeleted;

public Builder() {}
public Builder() {
}

public Builder fromStateUUID(String fromStateUUID) {
this.fromStateUUID = fromStateUUID;
Expand Down Expand Up @@ -667,7 +656,7 @@ public Builder indicesRoutingDeleted(List<String> indicesRoutingDeleted) {
return this;
}

public Builder indicesRoutingDiffPath(String indicesRoutingDiffPath){
public Builder indicesRoutingDiffPath(String indicesRoutingDiffPath) {
this.indicesRoutingDiff = indicesRoutingDiffPath;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,15 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()),
null
);
ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest(clusterState, ClusterState.EMPTY_STATE, null);
if (uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata != null) {
clusterStateDiffManifest.setIndicesRoutingDiffPath(uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata.getUploadedFilePath());
}
final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
clusterState,
uploadedMetadataResults,
previousClusterUUID,
new ClusterStateDiffManifest(clusterState, ClusterState.EMPTY_STATE),
clusterStateDiffManifest,
false
);

Expand Down Expand Up @@ -341,8 +345,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
.getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable());

Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiffs = routingTableIncrementalDiff.getDiffs();
routingTableIncrementalDiff.getDiffs().forEach((k,v)-> indicesRoutingToUpload.add(clusterState.getRoutingTable().index(k)));
routingTableIncrementalDiff.getUpserts().forEach((k,v)-> indicesRoutingToUpload.add(v));
routingTableIncrementalDiff.getDiffs().forEach((k, v) -> indicesRoutingToUpload.add(clusterState.getRoutingTable().index(k)));
routingTableIncrementalDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v));

UploadedMetadataResults uploadedMetadataResults;
// For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files,
Expand Down Expand Up @@ -424,11 +428,15 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
routingTableIncrementalDiff.getDeletes()
);

ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest(clusterState, previousClusterState, routingTableIncrementalDiff);
if (uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata != null) {
clusterStateDiffManifest.setIndicesRoutingDiffPath(uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata.getUploadedFilePath());
}
final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
clusterState,
uploadedMetadataResults,
previousManifest.getPreviousClusterUUID(),
new ClusterStateDiffManifest(clusterState, previousClusterState),
clusterStateDiffManifest,
false
);

Expand Down Expand Up @@ -506,7 +514,7 @@ UploadedMetadataResults writeMetadataInParallel(
+ (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0)
+ (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0) + (uploadTransientSettingMetadata ? 1 : 0)
+ clusterStateCustomToUpload.size() + (uploadHashesOfConsistentSettings ? 1 : 0) + indicesRoutingToUpload.size()
+ (indexRoutingTableDiff != null && !indexRoutingTableDiff.isEmpty()? 1 : 0);
+ (indexRoutingTableDiff != null && !indexRoutingTableDiff.isEmpty() ? 1 : 0);
CountDownLatch latch = new CountDownLatch(totalUploadTasks);
BlobPath clusterBasePath = getClusterMetadataBasePath(
blobStoreRepository,
Expand Down Expand Up @@ -678,7 +686,7 @@ UploadedMetadataResults writeMetadataInParallel(
});
indicesRoutingToUpload.forEach(indexRoutingTable -> {
uploadTasks.put(
InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX,
InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(),
remoteRoutingTableService.getIndexRoutingAsyncAction(
clusterState,
indexRoutingTable,
Expand Down Expand Up @@ -748,7 +756,7 @@ UploadedMetadataResults writeMetadataInParallel(
&& uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) {
response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata);
} else if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class)
&& uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_DIFF_METADATA_PREFIX)){
&& uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_DIFF_METADATA_PREFIX)) {
response.uploadedIndicesRoutingDiffMetadata = (UploadedIndexMetadata) uploadedMetadata;
} else if (name.startsWith(CUSTOM_METADATA)) {
// component name for custom metadata will look like custom--<metadata-attribute>
Expand Down Expand Up @@ -1024,7 +1032,7 @@ private ClusterState readClusterStateInParallel(
? 1
: 0) + (readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0)
+ (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0) + clusterStateCustomToRead.size()
+ indicesRoutingToRead.size() + (readIndexRoutingTableDiff ? 1:0);
+ indicesRoutingToRead.size() + (readIndexRoutingTableDiff ? 1 : 0);
CountDownLatch latch = new CountDownLatch(totalReadTasks);
List<CheckedRunnable<IOException>> asyncMetadataReadActions = new ArrayList<>();
List<RemoteReadResult> readResults = Collections.synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -1346,7 +1354,7 @@ public ClusterState getClusterStateForManifest(
includeEphemeral ? manifest.getIndicesRouting() : emptyList(),
includeEphemeral && manifest.getHashesOfConsistentSettings() != null,
includeEphemeral ? manifest.getClusterStateCustomMap() : emptyMap(),
manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath()!=null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
includeEphemeral && manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath() != null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
includeEphemeral
);
} else {
Expand Down Expand Up @@ -1428,7 +1436,7 @@ public ClusterState getClusterStateUsingDiff(
updatedIndexRouting,
diff.isHashesOfConsistentSettingsUpdated(),
updatedClusterStateCustom,
manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath()!=null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath() != null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
true
);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.routingtable;

import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Represents the differences between {@link IndexRoutingTable}s for uploading and downloading index routing table diff data to/from a remote store.
*
* @opensearch.internal
*/
public class RemoteIndexShardRoutingTableDiff implements Diff<IndexRoutingTable> {

private final List<IndexShardRoutingTable> indexShardRoutingTables;

/**
* Constructs a new RemoteIndexShardRoutingTableDiff with the given shard routing tables.
*
* @param indexShardRoutingTables the list of {@link IndexShardRoutingTable}s representing the differences.
*/
public RemoteIndexShardRoutingTableDiff(List<IndexShardRoutingTable> indexShardRoutingTables) {
this.indexShardRoutingTables = indexShardRoutingTables;
}

/**
* Applies the differences to the provided {@link IndexRoutingTable}.
*
* @param part the original {@link IndexRoutingTable}.
* @return the updated {@link IndexRoutingTable} with the applied differences.
*/
@Override
public IndexRoutingTable apply(IndexRoutingTable part) {
IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex());
for (IndexShardRoutingTable shardRoutingTable : part) {
builder.addIndexShard(shardRoutingTable); // Add existing shards to builder
}

// Apply the diff: update or add the new shard routing tables
for (IndexShardRoutingTable diffShard : indexShardRoutingTables) {
builder.addIndexShard(diffShard);
}
return builder.build();
}

/**
* Writes the shard routing tables to the provided {@link StreamOutput}.
*
* @param out the output stream to write to.
* @throws IOException if an I/O exception occurs while writing.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indexShardRoutingTables.size());
for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) {
IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out);
}
}

/**
* Reads the shard routing tables from the provided {@link StreamInput}.
*
* @param in the input stream to read from.
* @return a new {@link RemoteIndexShardRoutingTableDiff} with the read shard routing tables.
* @throws IOException if an I/O exception occurs while reading.
*/
public static RemoteIndexShardRoutingTableDiff readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
List<IndexShardRoutingTable> indexShardRoutingTables = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
indexShardRoutingTables.add(shardRoutingTable);
}
return new RemoteIndexShardRoutingTableDiff(indexShardRoutingTables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() {
.diffManifest(
new ClusterStateDiffManifest(
RemoteClusterStateServiceTests.generateClusterStateWithOneIndex().build(),
ClusterState.EMPTY_STATE
ClusterState.EMPTY_STATE,
null
)
)
.build();
Expand Down Expand Up @@ -523,7 +524,8 @@ public void testClusterMetadataManifestXContentV2() throws IOException {
.diffManifest(
new ClusterStateDiffManifest(
RemoteClusterStateServiceTests.generateClusterStateWithOneIndex().build(),
ClusterState.EMPTY_STATE
ClusterState.EMPTY_STATE,
null
)
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private ClusterStateDiffManifest updateAndVerifyState(
}
ClusterState updatedClusterState = clusterStateBuilder.metadata(metadataBuilder.build()).build();

ClusterStateDiffManifest manifest = new ClusterStateDiffManifest(updatedClusterState, initialState);
ClusterStateDiffManifest manifest = new ClusterStateDiffManifest(updatedClusterState, initialState, null);
assertEquals(indicesToAdd.stream().map(im -> im.getIndex().getName()).collect(toList()), manifest.getIndicesUpdated());
assertEquals(indicesToRemove, manifest.getIndicesDeleted());
assertEquals(new ArrayList<>(customsToAdd.keySet()), manifest.getCustomMetadataUpdated());
Expand Down

0 comments on commit 02e0313

Please sign in to comment.