From ddd2fa8623789b169f62b3e66df8f032d1200199 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Thu, 18 Jul 2024 20:12:03 +0530 Subject: [PATCH] Add an assertion number of results to be equal to upload tasks Signed-off-by: Shivansh Arora --- .../AbstractRemoteWritableEntityManager.java | 8 +-- .../RemoteClusterStateAttributesManager.java | 10 ++-- .../remote/RemoteClusterStateService.java | 10 ++++ .../remote/RemoteGlobalMetadataManager.java | 12 ++--- .../remote/RemoteIndexMetadataManager.java | 10 ++-- ...tractRemoteWritableEntityManagerTests.java | 4 +- .../RemoteClusterStateServiceTests.java | 49 +++++++++++++++++++ 7 files changed, 81 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java index a72794e23426a..a168b31272c2f 100644 --- a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java @@ -43,13 +43,13 @@ protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity en * Returns an ActionListener for handling the write operation for the specified component, remote object, and latched action listener. * * @param component the component for which the write operation is performed - * @param remoteObject the remote object to be written + * @param remoteEntity the remote object to be written * @param listener the listener to be notified when the write operation completes * @return an ActionListener for handling the write operation */ protected abstract ActionListener getWriteActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ); @@ -58,13 +58,13 @@ protected abstract ActionListener getWriteActionListener( * remote object, and latched action listener. * * @param component the component for which the read operation is performed - * @param remoteObject the remote object to be read + * @param remoteEntity the remote object to be read * @param listener the listener to be notified when the read operation completes * @return an ActionListener for handling the read operation */ protected abstract ActionListener getReadActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 258e537577261..bcf7a404f850f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -80,24 +80,24 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableE @Override protected ActionListener getWriteActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ) { return ActionListener.wrap( - resp -> listener.onResponse(remoteObject.getUploadedMetadata()), - ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) + resp -> listener.onResponse(remoteEntity.getUploadedMetadata()), + ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex)) ); } @Override protected ActionListener getReadActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ) { return ActionListener.wrap( response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), - ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) + ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex)) ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 402fd14128627..f912125d6f059 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -695,6 +695,16 @@ UploadedMetadataResults writeMetadataInParallel( exceptionList.forEach(exception::addSuppressed); throw exception; } + if (results.size() != uploadTasks.size()) { + throw new RemoteStateTransferException( + String.format( + Locale.ROOT, + "Some metadata components were not uploaded successfully. Objects to be uploaded: %s, uploaded objects: %s", + String.join(", ", uploadTasks), + String.join(", ", results.keySet()) + ) + ); + } UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index f227ef8ae20e2..cef54e64890f9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -159,24 +159,24 @@ public class RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityMan @Override protected ActionListener getWriteActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ) { return ActionListener.wrap( - resp -> listener.onResponse(remoteObject.getUploadedMetadata()), - ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) + resp -> listener.onResponse(remoteEntity.getUploadedMetadata()), + ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex)) ); } @Override protected ActionListener getReadActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ) { return ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult(response, remoteObject.getType(), component)), - ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) + response -> listener.onResponse(new RemoteReadResult(response, remoteEntity.getType(), component)), + ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex)) ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index a132ae1493686..757f6a586926e 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -106,24 +106,24 @@ private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeo @Override protected ActionListener getWriteActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ) { return ActionListener.wrap( - resp -> listener.onResponse(remoteObject.getUploadedMetadata()), - ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) + resp -> listener.onResponse(remoteEntity.getUploadedMetadata()), + ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex)) ); } @Override protected ActionListener getReadActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ) { return ActionListener.wrap( response -> listener.onResponse(new RemoteReadResult(response, RemoteIndexMetadata.INDEX, component)), - ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) + ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex)) ); } } diff --git a/server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java b/server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java index 73d2bf2d58473..01661aca02701 100644 --- a/server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java +++ b/server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java @@ -46,7 +46,7 @@ private static class ConcreteRemoteWritableEntityManager extends AbstractRemoteW @Override protected ActionListener getWriteActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ) { return null; @@ -55,7 +55,7 @@ protected ActionListener getWriteActionListener( @Override protected ActionListener getReadActionListener( String component, - AbstractRemoteWritableBlobEntity remoteObject, + AbstractRemoteWritableBlobEntity remoteEntity, ActionListener listener ) { return null; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index c9ebc62d5b73c..785a4b1e389f8 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -45,6 +45,7 @@ import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.ParseField; import org.opensearch.core.action.ActionListener; @@ -80,6 +81,7 @@ import java.io.ByteArrayInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.rmi.Remote; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -121,6 +123,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks; import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; @@ -590,6 +593,52 @@ public void testFailWriteIncrementalMetadataWhenTermChanged() { ); } + public void testWriteMetadataInParallelIncompleteUpload() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + final RemoteClusterStateService rcssSpy = Mockito.spy(remoteClusterStateService); + rcssSpy.start(); + RemoteIndexMetadataManager mockedIndexManager = mock(RemoteIndexMetadataManager.class); + RemoteGlobalMetadataManager mockedGlobalMetadataManager = mock(RemoteGlobalMetadataManager.class); + RemoteClusterStateAttributesManager mockedClusterStateAttributeManager = mock(RemoteClusterStateAttributesManager.class); + ClusterMetadataManifest.UploadedMetadata mockedUploadedMetadata = mock(ClusterMetadataManifest.UploadedMetadata.class); + rcssSpy.setRemoteIndexMetadataManager(mockedIndexManager); + rcssSpy.setRemoteGlobalMetadataManager(mockedGlobalMetadataManager); + rcssSpy.setRemoteClusterStateAttributesManager(mockedClusterStateAttributeManager); + ArgumentCaptor listenerArgumentCaptor = ArgumentCaptor.forClass(LatchedActionListener.class); + + when(mockedGlobalMetadataManager.getGlobalMetadataUploadTimeout()).thenReturn(GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT); + when(mockedUploadedMetadata.getComponent()).thenReturn("test-component"); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedUploadedMetadata); + return null; + }).when(mockedIndexManager).writeAsync(any(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedUploadedMetadata); + return null; + }).when(mockedGlobalMetadataManager).writeAsync(anyString(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedUploadedMetadata); + return null; + }).when(mockedClusterStateAttributeManager).writeAsync(any(), any(), listenerArgumentCaptor.capture()); + + RemoteStateTransferException exception = expectThrows(RemoteStateTransferException.class, () -> rcssSpy.writeMetadataInParallel( + clusterState, + new ArrayList<>(clusterState.getMetadata().indices().values()), + emptyMap(), + clusterState.getMetadata().customs(), + true, + true, + true, + true, + true, + true, + clusterState.getCustoms(), + true, + emptyList() + )); + assertTrue(exception.getMessage().startsWith("Some metadata components were not uploaded successfully")); + } + public void testWriteIncrementalMetadataSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects();