Skip to content

Commit

Permalink
Diffable metadata publish changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Harsh Garg committed Jul 2, 2024
1 parent 7baadfc commit ece68ce
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 42 deletions.
33 changes: 31 additions & 2 deletions server/src/main/java/org/opensearch/cluster/DiffableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -88,6 +89,16 @@ public static <K, T extends Diffable<T>> MapDiff<K, T, Map<K, T>> diff(
return new JdkMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance());
}

public static <K, T extends Diffable<T>> MapDiff<K, T, Map<K, T>> diff(
Map<K, T> before,
Map<K, T> after,
KeySerializer<K> keySerializer,
boolean maintainOrder
) {
assert after != null && before != null;
return new JdkMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance(), maintainOrder);
}

/**
* Calculates diff between two Maps of non-diffable objects
*/
Expand Down Expand Up @@ -138,7 +149,11 @@ protected JdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerial
}

JdkMapDiff(Map<K, T> before, Map<K, T> after, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
super(keySerializer, valueSerializer);
this(before, after, keySerializer, valueSerializer, false);
}

JdkMapDiff(Map<K, T> before, Map<K, T> after, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer, boolean maintainOrder) {
super(keySerializer, valueSerializer, maintainOrder);
assert after != null && before != null;

for (K key : before.keySet()) {
Expand All @@ -163,7 +178,8 @@ protected JdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerial

@Override
public Map<K, T> apply(Map<K, T> map) {
Map<K, T> builder = new HashMap<>(map);
Map<K, T> builder = maintainOrder ? new LinkedHashMap<>() : new LinkedHashMap<>(map);
System.out.println("Trying to apply diff for map " + map.getClass());

for (K part : deletes) {
builder.remove(part);
Expand Down Expand Up @@ -198,13 +214,24 @@ public abstract static class MapDiff<K, T, M> implements Diff<M> {
protected final Map<K, T> upserts; // additions or full updates
protected final KeySerializer<K> keySerializer;
protected final ValueSerializer<K, T> valueSerializer;
protected final boolean maintainOrder;

protected MapDiff(KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
deletes = new ArrayList<>();
diffs = new HashMap<>();
upserts = new HashMap<>();
this.maintainOrder = false;
}

protected MapDiff(KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer, boolean maintainOrder) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
deletes = new ArrayList<>();
diffs = new HashMap<>();
upserts = maintainOrder ? new LinkedHashMap<>() : new HashMap<>();
this.maintainOrder = maintainOrder;
}

protected MapDiff(
Expand All @@ -219,11 +246,13 @@ protected MapDiff(
this.deletes = deletes;
this.diffs = diffs;
this.upserts = upserts;
this.maintainOrder = false;
}

protected MapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.maintainOrder = false;
deletes = in.readList(keySerializer::readKey);
int diffsCount = in.readVInt();
diffs = diffsCount == 0 ? Collections.emptyMap() : new HashMap<>(diffsCount);
Expand Down
82 changes: 43 additions & 39 deletions server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
this.persistentSettings = persistentSettings;
this.settings = Settings.builder().put(persistentSettings).put(transientSettings).build();
this.hashesOfConsistentSettings = hashesOfConsistentSettings;
this.indices = Collections.unmodifiableMap(indices);
LinkedHashMap<String, IndexMetadata> linkedMap = new LinkedHashMap<>(indices);
this.indices = Collections.unmodifiableMap(linkedMap);
this.customs = Collections.unmodifiableMap(customs);
this.templates = new TemplatesMetadata(templates);
int totalNumberOfShards = 0;
Expand Down Expand Up @@ -775,6 +776,9 @@ public IndexMetadata index(Index index) {
if (metadata != null && metadata.getIndexUUID().equals(index.getUUID())) {
return metadata;
}
if (metadata == null) {
System.out.println("returning null for index from metadata class");
}
return null;
}

Expand Down Expand Up @@ -1040,7 +1044,7 @@ private static class MetadataDiff implements Diff<Metadata> {
transientSettings = after.transientSettings;
persistentSettings = after.persistentSettings;
hashesOfConsistentSettings = after.hashesOfConsistentSettings.diff(before.hashesOfConsistentSettings);
indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer());
indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer(), true);
templates = DiffableUtils.diff(
before.templates.getTemplates(),
after.templates.getTemplates(),
Expand Down Expand Up @@ -1211,31 +1215,31 @@ public Builder put(IndexMetadata.Builder indexMetadataBuilder) {
IndexMetadata indexMetadata = indexMetadataBuilder.build();
indices.put(indexMetadata.getIndex().getName(), indexMetadata);

// Create a list from the entry set of the TreeMap
List<Map.Entry<String, IndexMetadata>> indicesMapEnties = new ArrayList<>(indices.entrySet());
System.out.println("-----------------printing entries for list1-------------------------------");
for (Map.Entry<String, IndexMetadata> entry : indicesMapEnties) {
System.out.println("In list1 entries " + entry.getKey() + " value is " + entry.getValue());
}
// Sort the list by values using a custom comparator
Collections.sort(indicesMapEnties, (o1, o2) -> {
Long creationTimeDiffEpochs = o1.getValue().getCreationDate() - o2.getValue().getCreationDate();
if (creationTimeDiffEpochs == 0) {
return o1.getKey().compareTo(o2.getKey());
}
return creationTimeDiffEpochs > 0 ? 1 : -1;
});
System.out.println("-----------------printing entries for list2-------------------------------");
for (Map.Entry<String, IndexMetadata> entry : indicesMapEnties) {
System.out.println("In list2 entries " + entry.getKey() + " value is " + entry.getValue());
}

indices.clear();
System.out.println("-----------------printing entries for map-------------------------------");
indicesMapEnties.stream().map(mapEntry -> indices.put(mapEntry.getKey(), mapEntry.getValue()));
for (Map.Entry<String, IndexMetadata> entry : indices.entrySet()) {
System.out.println("entries " + entry.getKey() + " value is " + entry.getValue());
}
// // Create a list from the entry set of the TreeMap
// List<Map.Entry<String, IndexMetadata>> indicesMapEnties = new ArrayList<>(indices.entrySet());
// System.out.println("-----------------printing entries for list1-------------------------------");
// for (Map.Entry<String, IndexMetadata> entry : indicesMapEnties) {
// System.out.println("In list1 entries " + entry.getKey() + " value is " + entry.getValue());
// }
// // Sort the list by values using a custom comparator
// Collections.sort(indicesMapEnties, (o1, o2) -> {
// Long creationTimeDiffEpochs = o1.getValue().getCreationDate() - o2.getValue().getCreationDate();
// if (creationTimeDiffEpochs == 0) {
// return o1.getKey().compareTo(o2.getKey());
// }
// return creationTimeDiffEpochs > 0 ? 1 : -1;
// });
// System.out.println("-----------------printing entries for list2-------------------------------");
// for (Map.Entry<String, IndexMetadata> entry : indicesMapEnties) {
// System.out.println("In list2 entries " + entry.getKey() + " value is " + entry.getValue());
// }
//
// indices.clear();
// System.out.println("-----------------printing entries for map-------------------------------");
// indicesMapEnties.stream().forEach(mapEntry -> indices.put(mapEntry.getKey(), mapEntry.getValue()));
// for (Map.Entry<String, IndexMetadata> entry : indices.entrySet()) {
// System.out.println("entries " + entry.getKey() + " value is " + entry.getValue());
// }
return this;
}

Expand All @@ -1249,18 +1253,18 @@ public Builder put(IndexMetadata indexMetadata, boolean incrementVersion) {
}
indices.put(indexMetadata.getIndex().getName(), indexMetadata);

// Create a list from the entry set of the TreeMap
List<Map.Entry<String, IndexMetadata>> indicesMapEntries = new ArrayList<>(indices.entrySet());
// Sort the list by values using a custom comparator
Collections.sort(indicesMapEntries, (o1, o2) -> {
Long creationTimeDiffEpochs = o1.getValue().getCreationDate() - o2.getValue().getCreationDate();
if (creationTimeDiffEpochs == 0) {
return o1.getKey().compareTo(o2.getKey());
}
return creationTimeDiffEpochs > 0 ? 1 : -1;
});
indices.clear();
indicesMapEntries.stream().forEach(mapEntry -> indices.put(mapEntry.getKey(), mapEntry.getValue()));
// // Create a list from the entry set of the TreeMap
// List<Map.Entry<String, IndexMetadata>> indicesMapEntries = new ArrayList<>(indices.entrySet());
// // Sort the list by values using a custom comparator
// Collections.sort(indicesMapEntries, (o1, o2) -> {
// Long creationTimeDiffEpochs = o1.getValue().getCreationDate() - o2.getValue().getCreationDate();
// if (creationTimeDiffEpochs == 0) {
// return o1.getKey().compareTo(o2.getKey());
// }
// return creationTimeDiffEpochs > 0 ? 1 : -1;
// });
// indices.clear();
// indicesMapEntries.stream().forEach(mapEntry -> indices.put(mapEntry.getKey(), mapEntry.getValue()));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,9 @@ private void callClusterStateListener(
clusterManagerMetrics.clusterStateListenersHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - listenerStartTimeNS)),
Optional.of(Tags.create().addTag("Operation", listener.getClass().getSimpleName()))



);
}
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ public synchronized IndexService createIndex(
final boolean writeDanglingIndices
) throws IOException {
ensureChangesAllowed();
System.out.println("--------------------Index metadata for index ----------------" + indexMetadata.getCreationDate() + "name is " + indexMetadata.getIndex().getName());
if (indexMetadata.getIndexUUID().equals(IndexMetadata.INDEX_UUID_NA_VALUE)) {
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetadata.getIndexUUID() + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ private void createIndices(final ClusterState state) {
for (Map.Entry<Index, List<ShardRouting>> entry : indicesToCreate.entrySet()) {
final Index index = entry.getKey();
final IndexMetadata indexMetadata = state.metadata().index(index);
logger.debug("[{}] creating index", index);
logger.info("[{}] creating index", index);

AllocatedIndex<? extends Shard> indexService = null;
try {
Expand Down

0 comments on commit ece68ce

Please sign in to comment.