Skip to content

Commit

Permalink
Add remote routing table changes in diff Manifest
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya authored and shiv0408 committed Jun 8, 2024
1 parent 255075c commit c83706a
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
Expand All @@ -36,12 +42,35 @@ public class RemoteRoutingTableService extends AbstractLifecycleComponent {
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;

private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
}


public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(RoutingTable before, RoutingTable after) {
return DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER
);
}


@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class ClusterStateDiffManifest implements ToXContentObject {
private static final String DELETES_FIELD = "deletes";
private static final String CLUSTER_BLOCKS_UPDATED_FIELD = "cluster_blocks_diff";
private static final String DISCOVERY_NODES_UPDATED_FIELD = "discovery_nodes_diff";
private static final String ROUTING_TABLE_DIFF = "routing_table_diff";
private static final String CLUSTER_STATE_CUSTOM_DIFF_FIELD = "cluster_state_custom_diff";

private final String fromStateUUID;
Expand All @@ -59,6 +61,8 @@ public class ClusterStateDiffManifest implements ToXContentObject {
private final List<String> indicesDeleted;
private final boolean clusterBlocksUpdated;
private final boolean discoveryNodesUpdated;
private final List<String> indicesRoutingUpdated;
private final List<String> indicesRoutingDeleted;
private final boolean hashesOfConsistentSettingsUpdated;
private final List<String> clusterStateCustomUpdated;
private final List<String> clusterStateCustomDeleted;
Expand Down Expand Up @@ -87,6 +91,13 @@ public class ClusterStateDiffManifest implements ToXContentObject {
}
}

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = RemoteRoutingTableService.getIndicesRoutingMapDiff(previousState.getRoutingTable(),
state.getRoutingTable());

indicesRoutingUpdated = new ArrayList<>();
routingTableDiff.getUpserts().forEach((k,v) -> indicesRoutingUpdated.add(k));

indicesRoutingDeleted = routingTableDiff.getDeletes();
hashesOfConsistentSettingsUpdated = !state.metadata().hashesOfConsistentSettings().equals(previousState.metadata().hashesOfConsistentSettings());
clusterStateCustomUpdated = new ArrayList<>();
clusterStateCustomDeleted = new ArrayList<>();
Expand Down Expand Up @@ -115,6 +126,8 @@ public ClusterStateDiffManifest(
List<String> indicesDeleted,
boolean clusterBlocksUpdated,
boolean discoveryNodesUpdated,
List<String>indicesRoutingUpdated,
List<String>indicesRoutingDeleted,
boolean hashesOfConsistentSettingsUpdated,
List<String> clusterStateCustomUpdated,
List<String> clusterStateCustomDeleted
Expand All @@ -131,6 +144,8 @@ public ClusterStateDiffManifest(
this.indicesDeleted = indicesDeleted;
this.clusterBlocksUpdated = clusterBlocksUpdated;
this.discoveryNodesUpdated = discoveryNodesUpdated;
this.indicesRoutingUpdated = indicesRoutingUpdated;
this.indicesRoutingDeleted = indicesRoutingDeleted;
this.hashesOfConsistentSettingsUpdated = hashesOfConsistentSettingsUpdated;
this.clusterStateCustomUpdated = clusterStateCustomUpdated;
this.clusterStateCustomDeleted = clusterStateCustomDeleted;
Expand Down Expand Up @@ -176,6 +191,19 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endObject();
builder.field(CLUSTER_BLOCKS_UPDATED_FIELD, clusterBlocksUpdated);
builder.field(DISCOVERY_NODES_UPDATED_FIELD, discoveryNodesUpdated);

builder.startObject(ROUTING_TABLE_DIFF);
builder.startArray(UPSERTS_FIELD);
for (String index : indicesRoutingUpdated) {
builder.value(index);
}
builder.endArray();
builder.startArray(DELETES_FIELD);
for (String index : indicesRoutingDeleted) {
builder.value(index);
}
builder.endArray();
builder.endObject();
builder.startObject(CLUSTER_STATE_CUSTOM_DIFF_FIELD);
builder.startArray(UPSERTS_FIELD);
for (String custom : clusterStateCustomUpdated) {
Expand Down Expand Up @@ -270,6 +298,21 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw
throw new XContentParseException("Unexpected token [" + token + "]");
}
}
} else if (currentFieldName.equals(ROUTING_TABLE_DIFF)) {
while ((parser.nextToken()) != XContentParser.Token.END_OBJECT) {
currentFieldName = parser.currentName();
parser.nextToken();
switch (currentFieldName) {
case UPSERTS_FIELD:
builder.indicesRoutingUpdated(parseStringList(parser));
break;
case DELETES_FIELD:
builder.indicesRoutingDeleted(parseStringList(parser));
break;
default:
throw new XContentParseException("Unexpected field [" + currentFieldName + "]");
}
}
} else if (currentFieldName.equals(CLUSTER_STATE_CUSTOM_DIFF_FIELD)) {
while ((parser.nextToken()) != XContentParser.Token.END_OBJECT) {
currentFieldName = parser.currentName();
Expand Down Expand Up @@ -420,6 +463,14 @@ public boolean isHashesOfConsistentSettingsUpdated() {
return hashesOfConsistentSettingsUpdated;
}

public List<String> getIndicesRoutingUpdated() {
return indicesRoutingUpdated;
}

public List<String> getIndicesRoutingDeleted() {
return indicesRoutingDeleted;
}

public List<String> getClusterStateCustomUpdated() {
return clusterStateCustomUpdated;
}
Expand All @@ -445,6 +496,8 @@ public static class Builder {
private List<String> indicesDeleted;
private boolean clusterBlocksUpdated;
private boolean discoveryNodesUpdated;
private List<String> indicesRoutingUpdated;
private List<String> indicesRoutingDeleted;
private boolean hashesOfConsistentSettingsUpdated;
private List<String> clusterStateCustomUpdated;
private List<String> clusterStateCustomDeleted;
Expand Down Expand Up @@ -516,6 +569,16 @@ public Builder discoveryNodesUpdated(boolean discoveryNodesUpdated) {
return this;
}

public Builder indicesRoutingUpdated(List<String> indicesRoutingUpdated) {
this.indicesRoutingUpdated = indicesRoutingUpdated;
return this;
}

public Builder indicesRoutingDeleted(List<String> indicesRoutingDeleted) {
this.indicesRoutingDeleted = indicesRoutingDeleted;
return this;
}

public Builder clusterStateCustomUpdated(List<String> clusterStateCustomUpdated) {
this.clusterStateCustomUpdated = clusterStateCustomUpdated;
return this;
Expand All @@ -540,6 +603,8 @@ public ClusterStateDiffManifest build() {
indicesDeleted,
clusterBlocksUpdated,
discoveryNodesUpdated,
indicesRoutingUpdated,
indicesRoutingDeleted,
hashesOfConsistentSettingsUpdated,
clusterStateCustomUpdated,
clusterStateCustomDeleted
Expand Down

0 comments on commit c83706a

Please sign in to comment.