Skip to content

Commit

Permalink
Added UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Dhwanil Patel <[email protected]>
  • Loading branch information
dhwanilpatel committed Oct 9, 2023
1 parent d0d1ccf commit c2cf656
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https://github.com/opensearch-project/OpenSearch/pull/10110))
- Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618))
- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992))
- Upload global cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
this.previousClusterUUID = in.readString();
this.clusterUUIDCommitted = in.readBoolean();
if(in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.globalMetadataFileName = in.readString();
} else {
this.globalMetadataFileName = null;
Expand Down Expand Up @@ -266,7 +266,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(indices);
out.writeString(previousClusterUUID);
out.writeBoolean(clusterUUIDCommitted);
if(out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeString(globalMetadataFileName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,16 @@ public class RemoteClusterStateService implements Closeable {

private static final int CURRENT_GLOBAL_METADATA_VERSION = 1;

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
private static final ToXContent.Params FORMAT_PARAMS;
// TODO: For now using Gateway context, check if how we can cover all type of custom by default.
// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
public static final ToXContent.Params FORMAT_PARAMS;
static {
Map<String, String> params = new HashMap<>(1);
params.put(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY);
FORMAT_PARAMS = new ToXContent.MapParams(params);
}


public RemoteClusterStateService(
String nodeId,
Supplier<RepositoriesService> repositoriesService,
Expand Down Expand Up @@ -185,7 +185,13 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri
clusterState,
new ArrayList<>(clusterState.metadata().indices().values())
);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, globalMetadataFile, false);
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
allUploadedIndexMetadata,
previousClusterUUID,
globalMetadataFile,
false
);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand Down Expand Up @@ -228,7 +234,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
// Write Global Metadata
final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previousClusterState.metadata(), clusterState.metadata()) == false;
String globalMetadataFile;
if(updateGlobalMeta) {
if (updateGlobalMeta) {
globalMetadataFile = writeGlobalMetadata(clusterState);
} else {
globalMetadataFile = previousManifest.getGlobalMetadataFileName();
Expand Down Expand Up @@ -310,8 +316,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
* @param clusterState current ClusterState
* @return String file name where globalMetadata file is stored.
*/
private String writeGlobalMetadata(ClusterState clusterState)
throws IOException {
private String writeGlobalMetadata(ClusterState clusterState) throws IOException {

AtomicReference<String> result = new AtomicReference<String>();
final BlobContainer globalMetadataContainer = globalMetadataContainer(
Expand All @@ -323,22 +328,13 @@ private String writeGlobalMetadata(ClusterState clusterState)
// latch to wait until upload is not finished
CountDownLatch latch = new CountDownLatch(1);

LatchedActionListener completionListener = new LatchedActionListener<>(
ActionListener.wrap(
resp -> {
logger.trace(
String.format(Locale.ROOT, "GlobalMetadata uploaded successfully.")
);
result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename);
}, ex -> {
logger.error(
() -> new ParameterizedMessage("Exception during transfer of GlobalMetadata to Remote {}", ex.getMessage()),
ex
);
throw new GlobalMetadataTransferException(ex.getMessage(), ex);
}),
latch
);
LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> {
logger.trace(String.format(Locale.ROOT, "GlobalMetadata uploaded successfully."));
result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename);
}, ex -> {
logger.error(() -> new ParameterizedMessage("Exception during transfer of GlobalMetadata to Remote {}", ex.getMessage()), ex);
throw new GlobalMetadataTransferException(ex.getMessage(), ex);
}), latch);

BlobStoreRepository.GLOBAL_METADATA_FORMAT.writeAsync(
clusterState.metadata(),
Expand All @@ -352,19 +348,13 @@ private String writeGlobalMetadata(ClusterState clusterState)
try {
if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
GlobalMetadataTransferException ex = new GlobalMetadataTransferException(
String.format(
Locale.ROOT,
"Timed out waiting for transfer of global metadata to complete"
)
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete")
);
throw ex;
}
} catch (InterruptedException ex) {
GlobalMetadataTransferException exception = new GlobalMetadataTransferException(
String.format(
Locale.ROOT,
"Timed out waiting for transfer of index metadata to complete - %s"
),
String.format(Locale.ROOT, "Timed out waiting for transfer of index metadata to complete - %s"),
ex
);
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3336,7 +3336,12 @@ private void writeShardIndexBlobAtomic(
() -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(), indexGeneration, shardContainer.path())
);
final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration));
writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS), true);
writeAtomic(
shardContainer,
blobName,
INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS),
true
);
}

// Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ public void writeAsync(
}
}

public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params) throws IOException {
public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params)
throws IOException {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep
Version.CURRENT,
randomAlphaOfLength(10),
false,
randomAlphaOfLength(10),
Collections.emptyList(),
randomAlphaOfLength(10),
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void testClusterMetadataManifestXContent() throws IOException {
Version.CURRENT,
"test-node-id",
false,
"test-global-metadata-file",
Collections.singletonList(uploadedIndexMetadata),
"prev-cluster-uuid",
true
Expand All @@ -60,6 +61,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() {
Version.CURRENT,
"B10RX1f5RJenMQvYccCgSQ",
true,
"test-global-metadata-file",
randomUploadedIndexMetadataList(),
"yfObdx8KSMKKrXf8UyHhM",
true
Expand Down
Loading

0 comments on commit c2cf656

Please sign in to comment.