diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java index e96dedaa3e6a0..5074971ab1a1f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java @@ -108,6 +108,9 @@ public void testRemoteCleanupDeleteStale() throws Exception { .add("cluster-state") .add(getClusterState().metadata().clusterUUID()); BlobPath manifestContainerPath = baseMetadataPath.add("manifest"); + RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateCleanupManager.class + ); // set cleanup interval to 100 ms to make the test faster ClusterUpdateSettingsResponse response = client().admin() @@ -117,6 +120,7 @@ public void testRemoteCleanupDeleteStale() throws Exception { .get(); assertTrue(response.isAcknowledged()); + assertBusy(() -> assertEquals(100, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis())); assertBusy(() -> { int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size(); @@ -128,7 +132,7 @@ public void testRemoteCleanupDeleteStale() throws Exception { "Current number of manifest files: " + manifestFiles, manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES ); - }, 500, TimeUnit.MILLISECONDS); + }); // disable the clean up to avoid race condition during shutdown response = client().admin() diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 4098993246073..b052b6e1a613d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -15,7 +15,6 @@ import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; @@ -121,7 +120,7 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction( LatchedActionListener<RemoteReadResult> listener ) { final ActionListener actionListener = ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), + response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure ); return () -> getStore(blobEntity).readAsync(blobEntity, actionListener); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index bd371ae671cf4..ada29fdb57c57 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -276,7 +276,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( ClusterState clusterState, ClusterMetadataManifest previousManifest ) throws IOException { - logger.info("WRITING INCREMENTAL STATE"); + logger.trace("WRITING INCREMENTAL STATE"); final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { @@ -766,7 +766,7 @@ private UploadedMetadataResults writeMetadataInParallel( throw new IllegalStateException("Unknown metadata component name " + name); } }); - logger.info("response {}", response.uploadedIndicesRoutingMetadata.toString()); + logger.trace("response {}", response.uploadedIndicesRoutingMetadata.toString()); return response; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index cd29114e05684..3053095368972 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -25,7 +25,6 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata; import org.opensearch.gateway.remote.model.RemoteCustomMetadata; @@ -194,7 +193,7 @@ CheckedRunnable<IOException> getAsyncMetadataReadAction( LatchedActionListener<RemoteReadResult> listener ) { ActionListener actionListener = ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult((ToXContent) response, readEntity.getType(), componentName)), + response -> listener.onResponse(new RemoteReadResult(response, readEntity.getType(), componentName)), listener::onFailure ); return () -> getStore(readEntity).readAsync(readEntity, actionListener); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index a84161b202a22..39c7c73fae1f8 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -136,24 +136,6 @@ IndexMetadata getIndexMetadata(ClusterMetadataManifest.UploadedIndexMetadata upl } } - /** - * Fetch latest index metadata from remote cluster state - * - * @param clusterMetadataManifest manifest file of cluster - * @param clusterUUID uuid of cluster state to refer to in remote - * @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map - */ - Map<String, IndexMetadata> getIndexMetadataMap(String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { - assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID()) - : "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch."; - Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>(); - for (ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) { - IndexMetadata indexMetadata = getIndexMetadata(uploadedIndexMetadata, clusterUUID); - remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata); - } - return remoteIndexMetadata; - } - public TimeValue getIndexMetadataUploadTimeout() { return this.indexMetadataUploadTimeout; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java index 328601139c150..1dc56712d4ab5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java @@ -131,16 +131,17 @@ public ClusterMetadataManifest deserialize(final InputStream inputStream) throws return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); } - private int getManifestCodecVersion() { + // package private for testing + int getManifestCodecVersion() { assert blobName != null; - String[] splitName = blobName.split(DELIMITER); + String[] splitName = getBlobFileName().split(DELIMITER); if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) { return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. } else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0 // is used. return ClusterMetadataManifest.CODEC_V0; } else { - throw new IllegalArgumentException("Manifest file name is corrupted"); + throw new IllegalArgumentException("Manifest file name is corrupted : " + blobName); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java index adee09eaeffef..06d3b88ae1ecf 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java @@ -8,24 +8,22 @@ package org.opensearch.gateway.remote.model; -import org.opensearch.core.xcontent.ToXContent; - /** * Container class for entity read from remote store */ public class RemoteReadResult { - ToXContent obj; + Object obj; String component; String componentName; - public RemoteReadResult(ToXContent obj, String component, String componentName) { + public RemoteReadResult(Object obj, String component, String componentName) { this.obj = obj; this.component = component; this.componentName = componentName; } - public ToXContent getObj() { + public Object getObj() { return obj; } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java new file mode 100644 index 0000000000000..2131576b118c9 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java @@ -0,0 +1,219 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.metadata.AliasMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.Nullable; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.gateway.remote.model.RemoteReadResult; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyIterable; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.core.action.ActionListener.onResponse; +import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_METADATA_FORMAT; + +public class RemoteIndexMetadataManagerTests extends OpenSearchTestCase { + + private RemoteIndexMetadataManager remoteIndexMetadataManager; + private BlobStoreRepository blobStoreRepository; + private BlobStoreTransferService blobStoreTransferService; + private Compressor compressor; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + @Before + public void setup() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + blobStoreTransferService = mock(BlobStoreTransferService.class); + compressor = new NoneCompressor(); + when(blobStoreRepository.getCompressor()).thenReturn(compressor); + remoteIndexMetadataManager = new RemoteIndexMetadataManager( + clusterSettings, + "test-cluster", + blobStoreRepository, + blobStoreTransferService, + threadPool + ); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testGetAsyncIndexMetadataWriteAction_Success() throws Exception { + IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10)); + BlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); + BlobStore blobStore = mock(BlobStore.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener = mock(LatchedActionListener.class); + ArgumentCaptor<ClusterMetadataManifest.UploadedMetadata> savedResult = ArgumentCaptor.forClass(ClusterMetadataManifest.UploadedMetadata.class); + String expectedFilePrefix = String.join( + DELIMITER, + "metadata", + RemoteStoreUtils.invertLong(indexMetadata.getVersion()) + ); + + doAnswer((invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); + + CheckedRunnable<IOException> runnable = remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction( + indexMetadata, + "cluster-uuid", + latchedActionListener + ); + runnable.run(); + assertBusy(() -> verify(latchedActionListener, times(1)).onResponse(savedResult.capture())); + + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = savedResult.getValue(); + assertNotNull(uploadedMetadata); + assertEquals(INDEX + "--" + indexMetadata.getIndex().getName(), uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(7, pathTokens.length); + assertEquals(INDEX, pathTokens[4]); + assertEquals(indexMetadata.getIndex().getUUID(), pathTokens[5]); + assertTrue(pathTokens[6].startsWith(expectedFilePrefix)); + } + + public void testGetAsyncIndexMetadataWriteAction_IOFailure() throws Exception { + IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10)); + BlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); + BlobStore blobStore = mock(BlobStore.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener = mock(LatchedActionListener.class); + ArgumentCaptor<Exception> savedException = ArgumentCaptor.forClass(Exception.class); + String expectedFilePrefix = String.join( + DELIMITER, + "metadata", + RemoteStoreUtils.invertLong(indexMetadata.getVersion()) + ); + + doAnswer((invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onFailure(new IOException("failure")); + return null; + })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); + + CheckedRunnable<IOException> runnable = remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction( + indexMetadata, + "cluster-uuid", + latchedActionListener + ); + runnable.run(); + assertBusy(() -> verify(latchedActionListener, times(1)).onFailure(savedException.capture())); + + Exception exception = savedException.getValue(); + assertNotNull(exception); + assertTrue(exception instanceof RemoteStateTransferException); + } + public void testGetAsyncIndexMetadataReadAction_Success() throws Exception { + IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10)); + String fileName = randomAlphaOfLength(10); + fileName = fileName + DELIMITER + '2'; + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + INDEX_METADATA_FORMAT.serialize(indexMetadata, fileName, compressor, FORMAT_PARAMS).streamInput() + ); + AtomicReference<IndexMetadata> actualResponse = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener<RemoteReadResult> latchedActionListener = new LatchedActionListener<>( + ActionListener.wrap(response -> actualResponse.set((IndexMetadata) response.getObj()), Assert::assertNull) + , latch + ); + + CheckedRunnable<IOException> runnable = remoteIndexMetadataManager.getAsyncIndexMetadataReadAction( + "cluster-uuid", + fileName, + latchedActionListener + ); + assertNotNull(runnable); + try { + runnable.run(); + latch.await(); + assertEquals(indexMetadata, actualResponse.get()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void testGetAsyncIndexMetadataReadAction_IOFailure() throws Exception { + String fileName = randomAlphaOfLength(10); + fileName = fileName + DELIMITER + '2'; + doThrow(new IOException("testing failure")).when(blobStoreTransferService).downloadBlob(anyIterable(), anyString()); + LatchedActionListener<RemoteReadResult> latchedActionListener = mock(LatchedActionListener.class); + + CheckedRunnable<IOException> runnable = remoteIndexMetadataManager.getAsyncIndexMetadataReadAction( + "cluster-uuid", + fileName, + latchedActionListener + ); + assertNotNull(runnable); + runnable.run(); + + verify(latchedActionListener, times(1)).onFailure(any(IOException.class)); + } + + private IndexMetadata getIndexMetadata(String name, @Nullable Boolean writeIndex, String... aliases) { + IndexMetadata.Builder builder = IndexMetadata.builder(name) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + ); + for (String alias : aliases) { + builder.putAlias(AliasMetadata.builder(alias).writeIndex(writeIndex).build()); + } + return builder.build(); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java index 7cb80a1600c03..938d7f9b84432 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java @@ -41,6 +41,8 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST; import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.hamcrest.Matchers.greaterThan; @@ -191,6 +193,16 @@ public void testGenerateBlobFileName() { assertThat(nameTokens[3], is("C")); assertThat(RemoteStoreUtils.invertLong(nameTokens[4]), lessThanOrEqualTo(System.currentTimeMillis())); assertThat(nameTokens[5], is(String.valueOf(MANIFEST_CURRENT_CODEC_VERSION))); + + String blobName = "/usr/local/random/path/to/manifest/manifest__1__2__3__4__2"; + RemoteClusterMetadataManifest remoteObjectForDownload = new RemoteClusterMetadataManifest( + blobName, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals("manifest__1__2__3__4__2", remoteObjectForDownload.generateBlobFileName()); + assertEquals(remoteObjectForDownload.getManifestCodecVersion(), 2); } public void testGetUploadedMetadata() throws IOException { @@ -236,6 +248,28 @@ public void testSerDe() throws IOException { assertThrows(IllegalArgumentException.class, () -> invalidRemoteObject.deserialize(new ByteArrayInputStream(new byte[0]))); } + public void testGetManifestCodecVersion() { + String manifestFileWithDelimiterInPath = + "123456789012_test-cluster/cluster-state/dsgYj10__Nkso7/manifest/manifest__9223372036854775806__9223372036854775804__C__9223370319103329556__2"; + RemoteClusterMetadataManifest remoteManifestForDownload = new RemoteClusterMetadataManifest( + manifestFileWithDelimiterInPath, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(CODEC_V2, remoteManifestForDownload.getManifestCodecVersion()); + + String v0ManifestFileWithDelimiterInPath = + "123456789012_test-cluster/cluster-state/dsgYj10__Nkso7/manifest/manifest__9223372036854775806__9223372036854775804__C__9223370319103329556"; + RemoteClusterMetadataManifest remoteManifestV0ForDownload = new RemoteClusterMetadataManifest( + v0ManifestFileWithDelimiterInPath, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(CODEC_V0, remoteManifestV0ForDownload.getManifestCodecVersion()); + } + private ClusterMetadataManifest getClusterMetadataManifest() { return ClusterMetadataManifest.builder() .opensearchVersion(Version.CURRENT)