Skip to content

Commit

Permalink
Move TestCapturingListener to test/framework
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jun 19, 2024
1 parent f40f749 commit 3d3f2c5
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.TestCapturingListener;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -116,18 +117,17 @@ public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException,
}).when(blobStoreTransferService)
.uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class));
final CountDownLatch latch = new CountDownLatch(1);
final RemoteStateTestUtil.TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener =
new RemoteStateTestUtil.TestCapturingListener<>();
final TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
remoteClusterStateAttributesManager.getAsyncMetadataWriteAction(
DISCOVERY_NODES,
remoteDiscoveryNodes,
new LatchedActionListener<>(listener, latch)
).run();
latch.await();
assertNull(listener.failure);
assertNotNull(listener.result);
assertEquals(DISCOVERY_NODES, listener.result.getComponent());
String uploadedFileName = listener.result.getUploadedFilename();
assertNull(listener.getFailure());
assertNotNull(listener.getResult());
assertEquals(DISCOVERY_NODES, listener.getResult().getComponent());
String uploadedFileName = listener.getResult().getUploadedFilename();
String[] pathTokens = uploadedFileName.split(PATH_DELIMITER);
assertEquals(5, pathTokens.length);
assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]);
Expand All @@ -149,18 +149,18 @@ public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException,
);
RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor);
CountDownLatch latch = new CountDownLatch(1);
RemoteStateTestUtil.TestCapturingListener<RemoteReadResult> listener = new RemoteStateTestUtil.TestCapturingListener<>();
TestCapturingListener<RemoteReadResult> listener = new TestCapturingListener<>();
remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
DISCOVERY_NODES,
remoteObjForDownload,
new LatchedActionListener<>(listener, latch)
).run();
latch.await();
assertNull(listener.failure);
assertNotNull(listener.result);
assertEquals(CLUSTER_STATE_ATTRIBUTE, listener.result.getComponent());
assertEquals(DISCOVERY_NODES, listener.result.getComponentName());
DiscoveryNodes readDiscoveryNodes = (DiscoveryNodes) listener.result.getObj();
assertNull(listener.getFailure());
assertNotNull(listener.getResult());
assertEquals(CLUSTER_STATE_ATTRIBUTE, listener.getResult().getComponent());
assertEquals(DISCOVERY_NODES, listener.getResult().getComponentName());
DiscoveryNodes readDiscoveryNodes = (DiscoveryNodes) listener.getResult().getObj();
assertEquals(discoveryNodes.getSize(), readDiscoveryNodes.getSize());
discoveryNodes.getNodes().forEach((nodeId, node) -> assertEquals(readDiscoveryNodes.get(nodeId), node));
assertEquals(discoveryNodes.getClusterManagerNodeId(), readDiscoveryNodes.getClusterManagerNodeId());
Expand All @@ -175,18 +175,17 @@ public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException,
}).when(blobStoreTransferService)
.uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class));
final CountDownLatch latch = new CountDownLatch(1);
final RemoteStateTestUtil.TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener =
new RemoteStateTestUtil.TestCapturingListener<>();
final TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
remoteClusterStateAttributesManager.getAsyncMetadataWriteAction(
CLUSTER_BLOCKS,
remoteClusterBlocks,
new LatchedActionListener<>(listener, latch)
).run();
latch.await();
assertNull(listener.failure);
assertNotNull(listener.result);
assertEquals(CLUSTER_BLOCKS, listener.result.getComponent());
String uploadedFileName = listener.result.getUploadedFilename();
assertNull(listener.getFailure());
assertNotNull(listener.getResult());
assertEquals(CLUSTER_BLOCKS, listener.getResult().getComponent());
String uploadedFileName = listener.getResult().getUploadedFilename();
String[] pathTokens = uploadedFileName.split(PATH_DELIMITER);
assertEquals(5, pathTokens.length);
assertEquals(encodeString(CLUSTER_NAME), pathTokens[0]);
Expand All @@ -208,19 +207,19 @@ public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, I
);
RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(fileName, "cluster-uuid", compressor);
CountDownLatch latch = new CountDownLatch(1);
RemoteStateTestUtil.TestCapturingListener<RemoteReadResult> listener = new RemoteStateTestUtil.TestCapturingListener<>();
TestCapturingListener<RemoteReadResult> listener = new TestCapturingListener<>();

remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
CLUSTER_BLOCKS,
remoteClusterBlocks,
new LatchedActionListener<>(listener, latch)
).run();
latch.await();
assertNull(listener.failure);
assertNotNull(listener.result);
assertEquals(CLUSTER_STATE_ATTRIBUTE, listener.result.getComponent());
assertEquals(CLUSTER_BLOCKS, listener.result.getComponentName());
ClusterBlocks readClusterBlocks = (ClusterBlocks) listener.result.getObj();
assertNull(listener.getFailure());
assertNotNull(listener.getResult());
assertEquals(CLUSTER_STATE_ATTRIBUTE, listener.getResult().getComponent());
assertEquals(CLUSTER_BLOCKS, listener.getResult().getComponentName());
ClusterBlocks readClusterBlocks = (ClusterBlocks) listener.getResult().getObj();
assertEquals(clusterBlocks.global(), readClusterBlocks.global());
assertEquals(clusterBlocks.indices().keySet(), readClusterBlocks.indices().keySet());
for (String index : clusterBlocks.indices().keySet()) {
Expand All @@ -243,19 +242,18 @@ public void testGetAsyncMetadataWriteAction_Custom() throws IOException, Interru
return null;
}).when(blobStoreTransferService)
.uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class));
final RemoteStateTestUtil.TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener =
new RemoteStateTestUtil.TestCapturingListener<>();
final TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
final CountDownLatch latch = new CountDownLatch(1);
remoteClusterStateAttributesManager.getAsyncMetadataWriteAction(
CLUSTER_STATE_CUSTOM,
remoteClusterStateCustoms,
new LatchedActionListener<>(listener, latch)
).run();
latch.await();
assertNull(listener.failure);
assertNotNull(listener.result);
assertEquals(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, custom.getWriteableName()), listener.result.getComponent());
String uploadedFileName = listener.result.getUploadedFilename();
assertNull(listener.getFailure());
assertNotNull(listener.getResult());
assertEquals(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, custom.getWriteableName()), listener.getResult().getComponent());
String uploadedFileName = listener.getResult().getUploadedFilename();
String[] pathTokens = uploadedFileName.split(PATH_DELIMITER);
assertEquals(5, pathTokens.length);
assertEquals(encodeString(CLUSTER_NAME), pathTokens[0]);
Expand All @@ -282,19 +280,19 @@ public void testGetAsyncMetadataReadAction_Custom() throws IOException, Interrup
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
remoteClusterStateCustoms.clusterStateCustomsFormat.serialize(custom, fileName, compressor).streamInput()
);
RemoteStateTestUtil.TestCapturingListener<RemoteReadResult> capturingListener = new RemoteStateTestUtil.TestCapturingListener<>();
TestCapturingListener<RemoteReadResult> capturingListener = new TestCapturingListener<>();
final CountDownLatch latch = new CountDownLatch(1);
remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
CLUSTER_STATE_CUSTOM,
remoteClusterStateCustoms,
new LatchedActionListener<>(capturingListener, latch)
).run();
latch.await();
assertNull(capturingListener.failure);
assertNotNull(capturingListener.result);
assertEquals(custom, capturingListener.result.getObj());
assertEquals(CLUSTER_STATE_ATTRIBUTE, capturingListener.result.getComponent());
assertEquals(CLUSTER_STATE_CUSTOM, capturingListener.result.getComponentName());
assertNull(capturingListener.getFailure());
assertNotNull(capturingListener.getResult());
assertEquals(custom, capturingListener.getResult().getObj());
assertEquals(CLUSTER_STATE_ATTRIBUTE, capturingListener.getResult().getComponent());
assertEquals(CLUSTER_STATE_CUSTOM, capturingListener.getResult().getComponentName());
}

public void testGetAsyncMetadataWriteAction_Exception() throws IOException, InterruptedException {
Expand All @@ -308,17 +306,17 @@ public void testGetAsyncMetadataWriteAction_Exception() throws IOException, Inte
}).when(blobStoreTransferService)
.uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class));

RemoteStateTestUtil.TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> capturingListener =
new RemoteStateTestUtil.TestCapturingListener<>();
TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> capturingListener = new TestCapturingListener<>();
final CountDownLatch latch = new CountDownLatch(1);
remoteClusterStateAttributesManager.getAsyncMetadataWriteAction(
DISCOVERY_NODES,
remoteDiscoveryNodes,
new LatchedActionListener<>(capturingListener, latch)
).run();
latch.await();
assertNull(capturingListener.result);
assertTrue(capturingListener.failure instanceof RemoteStateTransferException);
assertNull(capturingListener.getResult());
assertTrue(capturingListener.getFailure() instanceof RemoteStateTransferException);
assertEquals(ioException, capturingListener.getFailure().getCause());
}

public void testGetAsyncMetadataReadAction_Exception() throws IOException, InterruptedException {
Expand All @@ -327,15 +325,15 @@ public void testGetAsyncMetadataReadAction_Exception() throws IOException, Inter
Exception ioException = new IOException("mock test exception");
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException);
CountDownLatch latch = new CountDownLatch(1);
RemoteStateTestUtil.TestCapturingListener<RemoteReadResult> capturingListener = new RemoteStateTestUtil.TestCapturingListener<>();
TestCapturingListener<RemoteReadResult> capturingListener = new TestCapturingListener<>();
remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
DISCOVERY_NODES,
remoteDiscoveryNodes,
new LatchedActionListener<>(capturingListener, latch)
).run();
latch.await();
assertNull(capturingListener.result);
assertEquals(ioException, capturingListener.failure);
assertNull(capturingListener.getResult());
assertEquals(ioException, capturingListener.getFailure());
}

public void testGetUpdatedCustoms() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import org.opensearch.core.action.ActionListener;

/**
* A simple implementation of {@link ActionListener} that captures the response and failures used for testing purposes.
*
* @param <T> the result type
*/
public class TestCapturingListener<T> implements ActionListener<T> {
private T result;
private Exception failure;

@Override
public void onResponse(T result) {
this.result = result;
}

@Override
public void onFailure(Exception e) {
this.failure = e;
}

public T getResult() {
return result;
}

public Exception getFailure() {
return failure;
}
}

0 comments on commit 3d3f2c5

Please sign in to comment.