Skip to content

Commit

Permalink
Address further PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed May 10, 2024
1 parent fb0b6aa commit 032ced2
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,6 @@ public String getIndexUUID() {
return index.getUUID();
}


/**
* Test whether the current index UUID is the same as the given one. Returns true if either are _na_
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ private static String uploadedFilename(Object[] fields) {
PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD);
}

static final String COMPONENT_PREFIX = "index--";
private final String indexName;
private final String indexUUID;
private final String uploadedFilename;
Expand All @@ -757,7 +758,7 @@ public String getUploadedFilePath() {

@Override
public String getComponent() {
return getIndexName();
return COMPONENT_PREFIX + getIndexName();
}

public String getUploadedFilename() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ public class RemoteClusterStateService implements Closeable {
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0);

/**
* Manifest format compatible with older codec v1, where global metadata was missing.
* Manifest format compatible with older codec v1, where codec versions/global metadata was introduced.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V1 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1);

/**
* Manifest format compatible with codec v2, where we introduced codec versions/global metadata.
* Manifest format compatible with codec v2, where global metadata file is replaced with multiple metadata attribute files
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
"cluster-metadata-manifest",
Expand Down Expand Up @@ -216,7 +216,7 @@ public class RemoteClusterStateService implements Closeable {
+ "updated : [{}], custom metadata updated : [{}]";
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V2;
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2;

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
Expand Down Expand Up @@ -335,33 +335,15 @@ public ClusterMetadataManifest writeIncrementalMetadata(
}
assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();

// Write Global Metadata

final boolean updateCoordinationMetadata = Metadata.isCoordinationMetadataEqual(
previousClusterState.metadata(),
clusterState.metadata()
) == false;
final boolean updateSettingsMetadata = Metadata.isSettingsMetadataEqual(
previousClusterState.metadata(),
clusterState.metadata()
) == false;
final boolean updateTemplatesMetadata = Metadata.isTemplatesMetadataEqual(
previousClusterState.metadata(),
clusterState.metadata()
) == false;
final Map<String, UploadedMetadataAttribute> previousStateCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, UploadedMetadataAttribute> customsToBeDeletedFromRemote = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, Metadata.Custom> customsToUpload = getUpdatedCustoms(clusterState, previousClusterState);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
for (final String custom : clusterState.metadata().customs().keySet()) {
// remove all the customs which are present currently
previousStateCustomMap.remove(custom);
customsToBeDeletedFromRemote.remove(custom);
}

// Write Index Metadata
final Map<String, IndexMetadata> previousStateIndexMetadataByName = new HashMap<>();
for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
previousStateIndexMetadataByName.put(indexMetadata.getIndex().getName(), indexMetadata);
}
final Map<String, IndexMetadata> indicesToBeDeletedFromRemote = new HashMap<>(previousClusterState.metadata().indices());

int numIndicesUpdated = 0;
int numIndicesUnchanged = 0;
Expand All @@ -374,7 +356,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
Map<String, IndexMetadata> prevIndexMetadataByName = new HashMap<>();
for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
String indexName = indexMetadata.getIndex().getName();
final IndexMetadata prevIndexMetadata = previousStateIndexMetadataByName.get(indexName);
final IndexMetadata prevIndexMetadata = indicesToBeDeletedFromRemote.get(indexName);
Long previousVersion = prevIndexMetadata != null ? prevIndexMetadata.getVersion() : null;
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
logger.debug(
Expand All @@ -388,57 +370,51 @@ public ClusterMetadataManifest writeIncrementalMetadata(
prevIndexMetadataByName.put(indexName, prevIndexMetadata);
} else {
numIndicesUnchanged++;
// index unchanged it shouldn't be deleted from remote
indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName());
}
previousStateIndexMetadataByName.remove(indexMetadata.getIndex().getName());
}
UploadedMetadataResults uploadedMetadataResults;
boolean firstUpload = !previousManifest.hasMetadataAttributesFiles();
// For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files,
// If file is empty and codec is 1 then write global metadata.
if (firstUpload) {
uploadedMetadataResults = writeMetadataInParallel(
clusterState,
toUpload,
prevIndexMetadataByName,
clusterState.metadata().customs(),
true,
true,
true
);
} else {
uploadedMetadataResults = writeMetadataInParallel(
clusterState,
toUpload,
prevIndexMetadataByName,
customsToUpload,
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata
);
}
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();
boolean updateCoordinationMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isCoordinationMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
;
boolean updateSettingsMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

uploadedMetadataResults = writeMetadataInParallel(
clusterState,
toUpload,
prevIndexMetadataByName,
firstUploadForSplitGlobalMetadata ? clusterState.metadata().customs() : customsToUpload,
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata
);

// update the map if the metadata was uploaded
uploadedMetadataResults.uploadedIndexMetadata.forEach(
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
);
allUploadedCustomMap.putAll(uploadedMetadataResults.uploadedCustomMetadataMap);
// remove the data for removed custom/indices
previousStateCustomMap.keySet().forEach(allUploadedCustomMap::remove);
previousStateIndexMetadataByName.keySet().forEach(allUploadedIndexMetadata::remove);
customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove);
indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove);

final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
new ArrayList<>(allUploadedIndexMetadata.values()),
previousManifest.getPreviousClusterUUID(),
firstUpload || updateCoordinationMetadata
? uploadedMetadataResults.uploadedCoordinationMetadata
: previousManifest.getCoordinationMetadata(),
firstUpload || updateSettingsMetadata
? uploadedMetadataResults.uploadedSettingsMetadata
: previousManifest.getSettingsMetadata(),
firstUpload || updateTemplatesMetadata
? uploadedMetadataResults.uploadedTemplatesMetadata
: previousManifest.getTemplatesMetadata(),
firstUpload || !customsToUpload.isEmpty() ? allUploadedCustomMap : previousManifest.getCustomMetadataMap(),
updateCoordinationMetadata ? uploadedMetadataResults.uploadedCoordinationMetadata : previousManifest.getCoordinationMetadata(),
updateSettingsMetadata ? uploadedMetadataResults.uploadedSettingsMetadata : previousManifest.getSettingsMetadata(),
updateTemplatesMetadata ? uploadedMetadataResults.uploadedTemplatesMetadata : previousManifest.getTemplatesMetadata(),
firstUploadForSplitGlobalMetadata || !customsToUpload.isEmpty()
? allUploadedCustomMap
: previousManifest.getCustomMetadataMap(),
false
);
deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
Expand Down Expand Up @@ -597,23 +573,23 @@ private UploadedMetadataResults writeMetadataInParallel(
}
UploadedMetadataResults response = new UploadedMetadataResults();
results.forEach((name, uploadedMetadata) -> {
if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class)) {
response.uploadedIndexMetadata.add((UploadedIndexMetadata) uploadedMetadata);
} else if (uploadedMetadata.getComponent().contains(CUSTOM_METADATA)) {
if (name.contains(CUSTOM_METADATA)) {
// component name for custom metadata will look like custom--<metadata-attribute>
String custom = name.split(DELIMITER)[0].split(CUSTOM_DELIMITER)[1];
response.uploadedCustomMetadataMap.put(
custom,
new UploadedMetadataAttribute(custom, uploadedMetadata.getUploadedFilename())
);
} else if (COORDINATION_METADATA.equals(uploadedMetadata.getComponent())) {
} else if (COORDINATION_METADATA.equals(name)) {
response.uploadedCoordinationMetadata = (UploadedMetadataAttribute) uploadedMetadata;
} else if (SETTING_METADATA.equals(uploadedMetadata.getComponent())) {
} else if (SETTING_METADATA.equals(name)) {
response.uploadedSettingsMetadata = (UploadedMetadataAttribute) uploadedMetadata;
} else if (TEMPLATES_METADATA.equals(uploadedMetadata.getComponent())) {
} else if (TEMPLATES_METADATA.equals(name)) {
response.uploadedTemplatesMetadata = (UploadedMetadataAttribute) uploadedMetadata;
} else if (name.contains(UploadedIndexMetadata.COMPONENT_PREFIX)) {
response.uploadedIndexMetadata.add((UploadedIndexMetadata) uploadedMetadata);
} else {
throw new IllegalStateException("Unexpected metadata component " + uploadedMetadata.getComponent());
throw new IllegalStateException("Unknown metadata component name " + name);
}
});
return response;
Expand Down Expand Up @@ -786,39 +762,6 @@ public void start() {
blobStoreRepository = (BlobStoreRepository) repository;
}

private ClusterMetadataManifest uploadV1Manifest(
ClusterState clusterState,
List<UploadedIndexMetadata> uploadedIndexMetadata,
String previousClusterUUID,
String globalMetadataFileName,
boolean committed
) throws IOException {
synchronized (this) {
final String manifestFileName = getManifestFileName(
clusterState.term(),
clusterState.version(),
committed,
ClusterMetadataManifest.CODEC_V1
);
ClusterMetadataManifest manifest = ClusterMetadataManifest.builder()
.clusterTerm(clusterState.term())
.stateVersion(clusterState.getVersion())
.clusterUUID(clusterState.metadata().clusterUUID())
.stateUUID(clusterState.stateUUID())
.opensearchVersion(Version.CURRENT)
.nodeId(nodeId)
.committed(committed)
.codecVersion(ClusterMetadataManifest.CODEC_V1)
.globalMetadataFileName(globalMetadataFileName)
.indices(uploadedIndexMetadata)
.previousClusterUUID(previousClusterUUID)
.clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted())
.build();
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
return manifest;
}
}

private ClusterMetadataManifest uploadManifest(
ClusterState clusterState,
List<UploadedIndexMetadata> uploadedIndexMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,55 @@ public void testIndexMetadataOnlyUpdated() throws IOException {
});
}

public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException {
// setup
mockBlobStoreObjects();

// Initial cluster state with index.
final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();
final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_");
String initialIndex = "test-index";
Index index1 = new Index("test-index-1", "index-uuid-1");
Index index2 = new Index("test-index-2", "index-uuid-2");
final Settings idxSettings1 = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID())
.build();
final IndexMetadata indexMetadata1 = new IndexMetadata.Builder(index1.getName()).settings(idxSettings1)
.numberOfShards(1)
.numberOfReplicas(0)
.build();
final Settings idxSettings2 = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, index2.getUUID())
.build();
final IndexMetadata indexMetadata2 = new IndexMetadata.Builder(index2.getName()).settings(idxSettings2)
.numberOfShards(1)
.numberOfReplicas(0)
.build();
ClusterState clusterState1 = ClusterState.builder(initialClusterState)
.metadata(
Metadata.builder(initialClusterState.getMetadata())
.put(indexMetadata1, false)
.put(indexMetadata2, false)
.remove(initialIndex)
.build()
)
.build();
ClusterMetadataManifest manifest1 = remoteClusterStateService.writeIncrementalMetadata(
initialClusterState,
clusterState1,
initialManifest
);
// verify that initial index is removed, and new index are added
assertTrue(initialManifest.getIndices().stream().anyMatch(indexMetadata -> indexMetadata.getIndexName().equals(initialIndex)));
assertFalse(manifest1.getIndices().stream().anyMatch(indexMetadata -> indexMetadata.getIndexName().equals(initialIndex)));
assertTrue(manifest1.getIndices().stream().anyMatch(indexMetadata -> indexMetadata.getIndexName().equals(index1.getName())));
assertTrue(manifest1.getIndices().stream().anyMatch(indexMetadata -> indexMetadata.getIndexName().equals(index2.getName())));

}

private void verifyMetadataAttributeOnlyUpdated(
Function<ClusterState, ClusterState> clusterStateUpdater,
BiConsumer<ClusterMetadataManifest, ClusterMetadataManifest> assertions
Expand Down

0 comments on commit 032ced2

Please sign in to comment.