diff --git a/CHANGELOG.md b/CHANGELOG.md index ea2cc9568ac58..c49be12b043c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189)) - Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562)) - Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204)) +- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839)) - Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261)) ### Dependencies @@ -104,7 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `tibdex/github-app-token` from 1.5.0 to 2.1.0 ([#10125](https://github.com/opensearch-project/OpenSearch/pull/10125)) - Bump `org.wiremock:wiremock-standalone` from 2.35.0 to 3.1.0 ([#9752](https://github.com/opensearch-project/OpenSearch/pull/9752)) - Bump `com.google.http-client:google-http-client-jackson2` from 1.43.2 to 1.43.3 ([#10126](https://github.com/opensearch-project/OpenSearch/pull/10126)) -- Bump `org.xerial.snappy:snappy-java` from 1.1.10.3 to 1.1.10.4 ([#10206](https://github.com/opensearch-project/OpenSearch/pull/10206)) +- Bump `org.xerial.snappy:snappy-java` from 1.1.10.3 to 1.1.10.5 ([#10206](https://github.com/opensearch-project/OpenSearch/pull/10206), [#10299](https://github.com/opensearch-project/OpenSearch/pull/10299)) - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.0 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208)) - Bump `codecov/codecov-action` from 2 to 3 ([#10209](https://github.com/opensearch-project/OpenSearch/pull/10209)) - Bump `org.bouncycastle:bcpkix-jdk15to18` from 1.75 to 1.76 ([10219](https://github.com/opensearch-project/OpenSearch/pull/10219))` @@ -125,6 +126,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Tracing Framework] Add support for SpanKind. ([#10122](https://github.com/opensearch-project/OpenSearch/pull/10122)) - Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246)) - Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200)) +- Enable remote segment upload backpressure by default ([#10356](https://github.com/opensearch-project/OpenSearch/pull/10356)) ### Deprecated diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java index d3c28b3a9cb5e..79b7e4aca6c2f 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java @@ -25,7 +25,10 @@ */ @InternalApi class DefaultTracer implements Tracer { - static final String THREAD_NAME = "th_name"; + /** + * Current thread name. + */ + static final String THREAD_NAME = "thread.name"; private final TracingTelemetry tracingTelemetry; private final TracerContextStorage tracerContextStorage; diff --git a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java index 65852c4fc5bd0..381a35bbc11e1 100644 --- a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java @@ -47,6 +47,8 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.function.Function; @@ -192,4 +194,13 @@ protected ByteSizeValue chunkSize() { public boolean isReadOnly() { return readonly; } + + @Override + public List> getRestrictedSystemRepositorySettings() { + List> restrictedSettings = new ArrayList<>(); + restrictedSettings.addAll(super.getRestrictedSystemRepositorySettings()); + restrictedSettings.add(Repository.BASE_PATH_SETTING); + restrictedSettings.add(Repository.LOCATION_MODE_SETTING); + return restrictedSettings; + } } diff --git a/plugins/repository-azure/src/test/java/org/opensearch/repositories/azure/AzureRepositorySettingsTests.java b/plugins/repository-azure/src/test/java/org/opensearch/repositories/azure/AzureRepositorySettingsTests.java index c8be30dbaf865..3356e5174592a 100644 --- a/plugins/repository-azure/src/test/java/org/opensearch/repositories/azure/AzureRepositorySettingsTests.java +++ b/plugins/repository-azure/src/test/java/org/opensearch/repositories/azure/AzureRepositorySettingsTests.java @@ -34,16 +34,20 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.BlobStoreTestUtil; import org.opensearch.test.OpenSearchTestCase; import org.junit.AfterClass; +import java.util.List; + import reactor.core.scheduler.Schedulers; import static org.hamcrest.Matchers.is; @@ -179,4 +183,21 @@ public void testChunkSize() { ); } + public void testSystemRepositoryDefault() { + assertThat(azureRepository(Settings.EMPTY).isSystemRepository(), is(false)); + } + + public void testSystemRepositoryOn() { + assertThat(azureRepository(Settings.builder().put("system_repository", true).build()).isSystemRepository(), is(true)); + } + + public void testRestrictedSettingsDefault() { + List> restrictedSettings = azureRepository(Settings.EMPTY).getRestrictedSystemRepositorySettings(); + assertThat(restrictedSettings.size(), is(5)); + assertTrue(restrictedSettings.contains(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING)); + assertTrue(restrictedSettings.contains(BlobStoreRepository.READONLY_SETTING)); + assertTrue(restrictedSettings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY)); + assertTrue(restrictedSettings.contains(AzureRepository.Repository.BASE_PATH_SETTING)); + assertTrue(restrictedSettings.contains(AzureRepository.Repository.LOCATION_MODE_SETTING)); + } } diff --git a/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java index c42cd1802f6e9..13671bc2aa8d6 100644 --- a/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -46,6 +46,8 @@ import org.opensearch.repositories.RepositoryException; import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -138,6 +140,15 @@ protected ByteSizeValue chunkSize() { return chunkSize; } + @Override + public List> getRestrictedSystemRepositorySettings() { + List> restrictedSettings = new ArrayList<>(); + restrictedSettings.addAll(super.getRestrictedSystemRepositorySettings()); + restrictedSettings.add(BUCKET); + restrictedSettings.add(BASE_PATH); + return restrictedSettings; + } + /** * Get a given setting from the repository settings, throwing a {@link RepositoryException} if the setting does not exist or is empty. */ diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 59111e94df22e..8e69fc39f128e 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -62,7 +62,9 @@ import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.threadpool.Scheduler; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -388,6 +390,15 @@ protected ByteSizeValue chunkSize() { return chunkSize; } + @Override + public List> getRestrictedSystemRepositorySettings() { + List> restrictedSettings = new ArrayList<>(); + restrictedSettings.addAll(super.getRestrictedSystemRepositorySettings()); + restrictedSettings.add(BUCKET_SETTING); + restrictedSettings.add(BASE_PATH_SETTING); + return restrictedSettings; + } + @Override protected void doClose() { final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java index 533c3aa17009d..9ea8d98505161 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java @@ -36,17 +36,20 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.repositories.RepositoryException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.BlobStoreTestUtil; import org.opensearch.test.OpenSearchTestCase; import org.hamcrest.Matchers; import java.nio.file.Path; +import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.containsString; @@ -133,6 +136,19 @@ public void testDefaultBufferSize() { } } + public void testRestrictedSettingsDefault() { + final RepositoryMetadata metadata = new RepositoryMetadata("dummy-repo", "mock", Settings.EMPTY); + try (S3Repository s3repo = createS3Repo(metadata)) { + List> restrictedSettings = s3repo.getRestrictedSystemRepositorySettings(); + assertThat(restrictedSettings.size(), is(5)); + assertTrue(restrictedSettings.contains(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING)); + assertTrue(restrictedSettings.contains(BlobStoreRepository.READONLY_SETTING)); + assertTrue(restrictedSettings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY)); + assertTrue(restrictedSettings.contains(S3Repository.BUCKET_SETTING)); + assertTrue(restrictedSettings.contains(S3Repository.BASE_PATH_SETTING)); + } + } + private S3Repository createS3Repo(RepositoryMetadata metadata) { return new S3Repository( metadata, diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index bb08b19df765b..07f85496f13cf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -650,7 +650,15 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul } } }; - final IndexShard newShard = newIndexShard(indexService, shard, wrapper, getInstanceFromNode(CircuitBreakerService.class), listener); + NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class); + final IndexShard newShard = newIndexShard( + indexService, + shard, + wrapper, + getInstanceFromNode(CircuitBreakerService.class), + env.nodeId(), + listener + ); shardRef.set(newShard); recoverShard(newShard); @@ -674,6 +682,7 @@ public static final IndexShard newIndexShard( final IndexShard shard, CheckedFunction wrapper, final CircuitBreakerService cbs, + final String nodeId, final IndexingOperationListener... listeners ) throws IOException { ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry()); @@ -702,7 +711,8 @@ public static final IndexShard newIndexShard( SegmentReplicationCheckpointPublisher.EMPTY, null, null, - () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL + () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, + nodeId ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java index bc55f6cc2cbcb..9d4d8aa24bd51 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java @@ -33,7 +33,6 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase { @@ -107,11 +106,11 @@ public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailu .build(); } - protected void deleteRepo() { - logger.info("--> Deleting the repository={}", REPOSITORY_NAME); - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); - logger.info("--> Deleting the repository={}", TRANSLOG_REPOSITORY_NAME); - assertAcked(clusterAdmin().prepareDeleteRepository(TRANSLOG_REPOSITORY_NAME)); + protected void cleanupRepo() { + logger.info("--> Cleanup the repository={}", REPOSITORY_NAME); + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).execute().actionGet(); + logger.info("--> Cleanup the repository={}", TRANSLOG_REPOSITORY_NAME); + clusterAdmin().prepareCleanupRepository(TRANSLOG_REPOSITORY_NAME).execute().actionGet(); } protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) { @@ -125,6 +124,8 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep settings.put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT); } + disableRepoConsistencyCheck("Remote Store Creates System Repository"); + internalCluster().startClusterManagerOnlyNode(settings.build()); String dataNodeName = internalCluster().startDataOnlyNode(settings.build()); createIndex(INDEX_NAME); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java index 4eb1cc7703735..c957f1b338bfe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java @@ -23,7 +23,6 @@ import java.nio.file.Path; import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteIndexRecoveryIT extends IndexRecoveryIT { @@ -57,7 +56,7 @@ public Settings indexSettings() { @After public void teardown() { - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); } @Override diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 4ebccb9b9e551..865b2d13f189e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -57,7 +57,7 @@ public void setup() { @After public void teardown() { - assertAcked(clusterAdmin().prepareDeleteRepository(BASE_REMOTE_REPO)); + clusterAdmin().prepareCleanupRepository(BASE_REMOTE_REPO).get(); } @Override @@ -422,7 +422,7 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2); } - public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionException, InterruptedException { + public void testRestoreShallowSnapshotRepository() throws ExecutionException, InterruptedException { String indexName1 = "testindex1"; String snapshotRepoName = "test-restore-snapshot-repo"; String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX; @@ -464,22 +464,7 @@ public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionExce assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards())); assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS)); - createRepository(BASE_REMOTE_REPO, "fs", absolutePath2); - - RestoreSnapshotResponse restoreSnapshotResponse = client.admin() - .cluster() - .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) - .setWaitForCompletion(true) - .setIndices(indexName1) - .setRenamePattern(indexName1) - .setRenameReplacement(restoredIndexName1) - .get(); - - assertTrue(restoreSnapshotResponse.getRestoreInfo().failedShards() > 0); - - ensureRed(restoredIndexName1); - - client().admin().indices().close(Requests.closeIndexRequest(restoredIndexName1)).get(); + client().admin().indices().close(Requests.closeIndexRequest(indexName1)).get(); createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath); RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin() .cluster() diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java index d02c5bf54fbed..3462054c23630 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java @@ -112,7 +112,7 @@ private void validateBackpressure( stats = stats(); indexDocAndRefresh(initialSource, initialDocsToIndex); assertEquals(rejectionCount, stats.rejectionCount); - deleteRepo(); + cleanupRepo(); } private RemoteSegmentTransferTracker.Stats stats() { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 157f8e41fee24..bf536557b1485 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -27,6 +27,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchIntegTestCase; @@ -50,7 +51,6 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; @@ -314,8 +314,8 @@ public void teardown() { clusterSettingsSuppliedByTest = false; assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME); assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME); - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME)); + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); + clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get(); } public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) { @@ -343,11 +343,18 @@ public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) { .custom(RepositoriesMetadata.TYPE); RepositoryMetadata actualRepository = repositories.repository(repositoryName); + final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName); + for (String nodeName : internalCluster().getNodeNames()) { ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName); DiscoveryNode node = clusterService.localNode(); RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName); - assertTrue(actualRepository.equalsIgnoreGenerations(expectedRepository)); + + // Validated that all the restricted settings are entact on all the nodes. + repository.getRestrictedSystemRepositorySettings() + .stream() + .forEach(setting -> assertEquals(setting.get(actualRepository.settings()), setting.get(expectedRepository.settings()))); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index b97e93f323fb2..88760b7bbfad2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -56,7 +56,7 @@ public void testRemoteRefreshRetryOnFailure() throws Exception { logger.info("Local files = {}, Repo files = {}", sortedFilesInLocal, sortedFilesInRepo); assertTrue(filesInRepo.containsAll(filesInLocal)); }, 90, TimeUnit.SECONDS); - deleteRepo(); + cleanupRepo(); } public void testRemoteRefreshSegmentPressureSettingChanged() { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRepositoryRegistrationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRepositoryRegistrationIT.java index 4d56a1e94e3fc..002a149f0c286 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRepositoryRegistrationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRepositoryRegistrationIT.java @@ -8,6 +8,10 @@ package org.opensearch.remotestore; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.NetworkDisruption; @@ -94,4 +98,68 @@ public void testMultiNodeClusterRandomNodeRecoverNetworkIsolation() { internalCluster().clearDisruptionScheme(); } + + public void testMultiNodeClusterRandomNodeRecoverNetworkIsolationPostNonRestrictedSettingsUpdate() { + Set nodesInOneSide = internalCluster().startNodes(3).stream().collect(Collectors.toCollection(HashSet::new)); + Set nodesInAnotherSide = internalCluster().startNodes(3).stream().collect(Collectors.toCollection(HashSet::new)); + ensureStableCluster(6); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInAnotherSide), + NetworkDisruption.DISCONNECT + ); + internalCluster().setDisruptionScheme(networkDisruption); + + networkDisruption.startDisrupting(); + + final Client client = client(nodesInOneSide.iterator().next()); + RepositoryMetadata repositoryMetadata = client.admin() + .cluster() + .prepareGetRepositories(REPOSITORY_NAME) + .get() + .repositories() + .get(0); + Settings.Builder updatedSettings = Settings.builder().put(repositoryMetadata.settings()).put("chunk_size", new ByteSizeValue(20)); + updatedSettings.remove("system_repository"); + + client.admin() + .cluster() + .preparePutRepository(repositoryMetadata.name()) + .setType(repositoryMetadata.type()) + .setSettings(updatedSettings) + .get(); + + ensureStableCluster(3, nodesInOneSide.stream().findAny().get()); + networkDisruption.stopDisrupting(); + + ensureStableCluster(6); + + internalCluster().clearDisruptionScheme(); + } + + public void testNodeRestartPostNonRestrictedSettingsUpdate() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startNodes(3); + + final Client client = client(); + RepositoryMetadata repositoryMetadata = client.admin() + .cluster() + .prepareGetRepositories(REPOSITORY_NAME) + .get() + .repositories() + .get(0); + Settings.Builder updatedSettings = Settings.builder().put(repositoryMetadata.settings()).put("chunk_size", new ByteSizeValue(20)); + updatedSettings.remove("system_repository"); + + client.admin() + .cluster() + .preparePutRepository(repositoryMetadata.name()) + .setType(repositoryMetadata.type()) + .setSettings(updatedSettings) + .get(); + + internalCluster().restartRandomDataNode(); + + ensureStableCluster(4); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java index 45c3ef7f5bae5..23864c35ad154 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java @@ -17,7 +17,6 @@ import java.nio.file.Path; import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; /** * This class runs Segment Replication Integ test suite with remote store enabled. @@ -50,6 +49,6 @@ public void setup() { @After public void teardown() { - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationWithRemoteStorePressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationWithRemoteStorePressureIT.java index 0da4d81a8871e..6cfc76b7e3223 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationWithRemoteStorePressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationWithRemoteStorePressureIT.java @@ -17,7 +17,6 @@ import java.nio.file.Path; import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; /** * This class executes the SegmentReplicationPressureIT suite with remote store integration enabled. @@ -49,6 +48,6 @@ public void setup() { @After public void teardown() { - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java index 98fab139f4902..3dfde6f472525 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java @@ -108,8 +108,15 @@ public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String nam } public void testRateLimitedRemoteUploads() throws Exception { + clusterSettingsSuppliedByTest = true; overrideBuildRepositoryMetadata = true; - internalCluster().startNode(); + Settings.Builder clusterSettings = Settings.builder() + .put(remoteStoreClusterSettings(REPOSITORY_NAME, repositoryLocation, REPOSITORY_2_NAME, repositoryLocation)); + clusterSettings.put( + String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, REPOSITORY_NAME), + MockFsRepositoryPlugin.TYPE + ); + internalCluster().startNode(clusterSettings.build()); Client client = client(); logger.info("--> updating repository"); assertAcked( @@ -119,7 +126,6 @@ public void testRateLimitedRemoteUploads() throws Exception { .setType(MockFsRepositoryPlugin.TYPE) .setSettings( Settings.builder() - .put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true) .put("location", repositoryLocation) .put("compress", compress) .put("max_remote_upload_bytes_per_sec", "1kb") diff --git a/server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java index f149d538cc47a..b8415f4b41815 100644 --- a/server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java @@ -108,4 +108,16 @@ public void testUpdateRepository() { final Repository updatedRepository = repositoriesService.repository(repositoryName); assertThat(updatedRepository, updated ? not(sameInstance(originalRepository)) : sameInstance(originalRepository)); } + + public void testSystemRepositoryCantBeCreated() { + internalCluster(); + final String repositoryName = "test-repo"; + final Client client = client(); + final Settings.Builder repoSettings = Settings.builder().put("system_repository", true).put("location", randomRepoPath()); + + assertThrows( + RepositoryException.class, + () -> client.admin().cluster().preparePutRepository(repositoryName).setType(FsRepository.TYPE).setSettings(repoSettings).get() + ); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java index 066d82483ae91..83f93ab9ff6b5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java @@ -215,9 +215,6 @@ public void testShallowCloneNameAvailability() throws Exception { final Path shallowSnapshotRepoPath = randomRepoPath(); createRepository(shallowSnapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy(shallowSnapshotRepoPath)); - final Path remoteStoreRepoPath = randomRepoPath(); - createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); - final String indexName = "index-1"; createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SystemRepositoryIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SystemRepositoryIT.java new file mode 100644 index 0000000000000..f50fc691fb232 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SystemRepositoryIT.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.snapshots; + +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.repositories.RepositoryException; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +import java.nio.file.Path; + +import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SystemRepositoryIT extends AbstractSnapshotIntegTestCase { + protected Path absolutePath; + final String systemRepoName = "system-repo-name"; + + @Before + public void setup() { + absolutePath = randomRepoPath().toAbsolutePath(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(systemRepoName, absolutePath)) + .build(); + } + + public void testRestrictedSettingsCantBeUpdated() { + disableRepoConsistencyCheck("System repository is being used for the test"); + + internalCluster().startNode(); + final Client client = client(); + final Settings.Builder repoSettings = Settings.builder().put("location", randomRepoPath()); + + RepositoryException e = expectThrows( + RepositoryException.class, + () -> client.admin().cluster().preparePutRepository(systemRepoName).setType("mock").setSettings(repoSettings).get() + ); + assertEquals( + e.getMessage(), + "[system-repo-name] trying to modify an unmodifiable attribute type of system " + + "repository from current value [fs] to new value [mock]" + ); + } + + public void testSystemRepositoryNonRestrictedSettingsCanBeUpdated() { + disableRepoConsistencyCheck("System repository is being used for the test"); + + internalCluster().startNode(); + final Client client = client(); + final Settings.Builder repoSettings = Settings.builder().put("location", absolutePath).put("chunk_size", new ByteSizeValue(20)); + + assertAcked( + client.admin().cluster().preparePutRepository(systemRepoName).setType(FsRepository.TYPE).setSettings(repoSettings).get() + ); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java index 644f23d2bafe6..1eadab6b1352e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java @@ -100,7 +100,7 @@ protected void clusterManagerOperation( ClusterState state, final ActionListener listener ) { - repositoriesService.registerRepository( + repositoriesService.registerOrUpdateRepository( request, ActionListener.delegateFailure( listener, diff --git a/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java index 1ab402e1bde4e..a5ef337c3b62a 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java @@ -54,6 +54,8 @@ import java.util.EnumSet; import java.util.List; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING; + /** * Contains metadata about registered snapshot repositories * @@ -288,8 +290,12 @@ public static void toXContent(RepositoryMetadata repository, XContentBuilder bui if (repository.cryptoMetadata() != null) { repository.cryptoMetadata().toXContent(repository.cryptoMetadata(), builder, params); } + Settings settings = repository.settings(); + if (SYSTEM_REPOSITORY_SETTING.get(settings)) { + settings = repository.settings().filter(s -> !s.equals(SYSTEM_REPOSITORY_SETTING.getKey())); + } builder.startObject("settings"); - repository.settings().toXContent(builder, params); + settings.toXContent(builder, params); builder.endObject(); if (params.paramAsBoolean(HIDE_GENERATIONS_PARAM, false) == false) { diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index bc64bd85cac04..7e847d04d30e3 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -298,6 +298,7 @@ static int resolvePublishPort(Settings settings, List boundAdd } public void onException(HttpChannel channel, Exception e) { + channel.handleException(e); if (lifecycle.started() == false) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); diff --git a/server/src/main/java/org/opensearch/http/HttpChannel.java b/server/src/main/java/org/opensearch/http/HttpChannel.java index 99aaed23c69b8..6dcdaf9034413 100644 --- a/server/src/main/java/org/opensearch/http/HttpChannel.java +++ b/server/src/main/java/org/opensearch/http/HttpChannel.java @@ -43,6 +43,11 @@ * @opensearch.internal */ public interface HttpChannel extends CloseableChannel { + /** + * Notify HTTP channel that exception happens and the response may not be sent (for example, timeout) + * @param ex the exception being raised + */ + default void handleException(Exception ex) {} /** * Sends an http response to the channel. The listener will be executed once the send process has been diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index ca0cc307e460b..fdda8d4ce2497 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -519,7 +519,8 @@ public synchronized IndexShard createShard( this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, remoteStore, remoteStoreStatsTrackerFactory, - clusterRemoteTranslogBufferIntervalSupplier + clusterRemoteTranslogBufferIntervalSupplier, + nodeEnv.nodeId() ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/merge/MergeStats.java b/server/src/main/java/org/opensearch/index/merge/MergeStats.java index fc5bac24a60d6..a284cec247ff1 100644 --- a/server/src/main/java/org/opensearch/index/merge/MergeStats.java +++ b/server/src/main/java/org/opensearch/index/merge/MergeStats.java @@ -82,7 +82,7 @@ public MergeStats(StreamInput in) throws IOException { totalStoppedTimeInMillis = in.readVLong(); totalThrottledTimeInMillis = in.readVLong(); totalBytesPerSecAutoThrottle = in.readVLong(); - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_11_0)) { unreferencedFileCleanUpsPerformed = in.readOptionalVLong(); } } @@ -297,7 +297,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(totalStoppedTimeInMillis); out.writeVLong(totalThrottledTimeInMillis); out.writeVLong(totalBytesPerSecAutoThrottle); - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_11_0)) { out.writeOptionalVLong(unreferencedFileCleanUpsPerformed); } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java index 864fe24c282a2..e66aa3444c214 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureSettings.java @@ -30,7 +30,7 @@ static class Defaults { public static final Setting REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED = Setting.boolSetting( "remote_store.segment.pressure.enabled", - false, + true, Setting.Property.Dynamic, Setting.Property.NodeScope ); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 114d07589b0c0..0ca9e0209c5ec 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -8,7 +8,13 @@ package org.opensearch.index.remote; +import org.opensearch.common.collect.Tuple; + import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; /** * Utils for remote store @@ -69,4 +75,31 @@ public static String getSegmentName(String filename) { return filename.substring(0, endIdx); } + + /** + * + * @param mdFiles List of segment/translog metadata files + * @param fn Function to extract PrimaryTerm_Generation and Node Id from metadata file name . + * fn returns null if node id is not part of the file name + */ + static public void verifyNoMultipleWriters(List mdFiles, Function> fn) { + Map nodesByPrimaryTermAndGen = new HashMap<>(); + mdFiles.forEach(mdFile -> { + Tuple nodeIdByPrimaryTermAndGen = fn.apply(mdFile); + if (nodeIdByPrimaryTermAndGen != null) { + if (nodesByPrimaryTermAndGen.containsKey(nodeIdByPrimaryTermAndGen.v1()) + && (!nodesByPrimaryTermAndGen.get(nodeIdByPrimaryTermAndGen.v1()).equals(nodeIdByPrimaryTermAndGen.v2()))) { + throw new IllegalStateException( + "Multiple metadata files from different nodes" + + nodeIdByPrimaryTermAndGen.v1() + + " and " + + nodeIdByPrimaryTermAndGen.v2() + + "having same primary term and generations detected" + ); + } + nodesByPrimaryTermAndGen.put(nodeIdByPrimaryTermAndGen.v1(), nodeIdByPrimaryTermAndGen.v2()); + } + }); + } + } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index bc9d839624740..5818b2d866854 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -367,7 +367,8 @@ public IndexShard( @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore, final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - final Supplier clusterRemoteTranslogBufferIntervalSupplier + final Supplier clusterRemoteTranslogBufferIntervalSupplier, + final String nodeId ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -413,7 +414,7 @@ public IndexShard( logger.debug("state: [CREATED]"); this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); - this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); + this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId); final String aId = shardRouting.allocationId().getId(); final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id()); this.pendingPrimaryTerm = primaryTerm; @@ -556,6 +557,10 @@ protected RemoteStoreStatsTrackerFactory getRemoteStoreStatsTrackerFactory() { return remoteStoreStatsTrackerFactory; } + public String getNodeId() { + return translogConfig.getNodeId(); + } + @Override public void updateShardState( final ShardRouting newRouting, diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 695c01367171a..698e61f6f7a09 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -346,7 +346,8 @@ void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos se segmentInfosSnapshot, storeDirectory, translogFileGeneration, - replicationCheckpoint + replicationCheckpoint, + indexShard.getNodeId() ); } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index b23d2d7d0a3f8..21a84f2b8c903 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.Version; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.store.ByteArrayIndexInput; @@ -114,6 +115,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final AtomicLong metadataUploadCounter = new AtomicLong(0); + public static final int METADATA_FILES_TO_FETCH = 10; + public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, @@ -187,9 +190,11 @@ public RemoteSegmentMetadata readLatestMetadataFile() throws IOException { List metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ); + RemoteStoreUtils.verifyNoMultipleWriters(metadataFiles, MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen); + if (metadataFiles.isEmpty() == false) { String latestMetadataFile = metadataFiles.get(0); logger.trace("Reading latest Metadata file {}", latestMetadataFile); @@ -306,12 +311,13 @@ static String getMetadataFilePrefixForCommit(long primaryTerm, long generation) } // Visible for testing - static String getMetadataFilename( + public static String getMetadataFilename( long primaryTerm, long generation, long translogGeneration, long uploadCounter, - int metadataVersion + int metadataVersion, + String nodeId ) { return String.join( SEPARATOR, @@ -320,6 +326,7 @@ static String getMetadataFilename( RemoteStoreUtils.invertLong(generation), RemoteStoreUtils.invertLong(translogGeneration), RemoteStoreUtils.invertLong(uploadCounter), + nodeId, RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(metadataVersion) ); @@ -334,6 +341,19 @@ static long getPrimaryTerm(String[] filenameTokens) { static long getGeneration(String[] filenameTokens) { return RemoteStoreUtils.invertLong(filenameTokens[2]); } + + public static Tuple getNodeIdByPrimaryTermAndGen(String filename) { + String[] tokens = filename.split(SEPARATOR); + if (tokens.length < 8) { + // For versions < 2.11, we don't have node id. + return null; + } + String primaryTermAndGen = String.join(SEPARATOR, tokens[1], tokens[2], tokens[3]); + + String nodeId = tokens[5]; + return new Tuple<>(primaryTermAndGen, nodeId); + } + } /** @@ -593,6 +613,7 @@ public boolean containsFile(String localFilename, String checksum) { * @param storeDirectory instance of local directory to temporarily create metadata file before upload * @param translogGeneration translog generation * @param replicationCheckpoint ReplicationCheckpoint of primary shard + * @param nodeId node id * @throws IOException in case of I/O error while uploading the metadata file */ public void uploadMetadata( @@ -600,7 +621,8 @@ public void uploadMetadata( SegmentInfos segmentInfosSnapshot, Directory storeDirectory, long translogGeneration, - ReplicationCheckpoint replicationCheckpoint + ReplicationCheckpoint replicationCheckpoint, + String nodeId ) throws IOException { synchronized (this) { String metadataFilename = MetadataFilenameUtils.getMetadataFilename( @@ -608,7 +630,8 @@ public void uploadMetadata( segmentInfosSnapshot.getGeneration(), translogGeneration, metadataUploadCounter.incrementAndGet(), - RemoteSegmentMetadata.CURRENT_VERSION + RemoteSegmentMetadata.CURRENT_VERSION, + nodeId ); try { try (IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT)) { diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 004d1bfdca36d..29c825fd383c5 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -242,15 +242,10 @@ public static TranslogTransferManager buildTranslogTransferManager( @Override public boolean ensureSynced(Location location) throws IOException { - try { - assert location.generation <= current.getGeneration(); - if (location.generation == current.getGeneration()) { - ensureOpen(); - return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation); - } - } catch (final Exception ex) { - closeOnTragicEvent(ex); - throw ex; + assert location.generation <= current.getGeneration(); + if (location.generation == current.getGeneration()) { + ensureOpen(); + return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation); } return false; } @@ -327,7 +322,8 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException { generation, location, readers, - Translog::getCommitCheckpointFileName + Translog::getCommitCheckpointFileName, + config.getNodeId() ).build() ) { return translogTransferManager.transferSnapshot( @@ -354,14 +350,8 @@ private boolean syncToDisk() throws IOException { @Override public void sync() throws IOException { - try { - if (syncToDisk() || syncNeeded()) { - prepareAndUpload(primaryTermSupplier.getAsLong(), null); - } - } catch (final Exception e) { - tragedy.setTragicException(e); - closeOnTragicEvent(e); - throw e; + if (syncToDisk() || syncNeeded()) { + prepareAndUpload(primaryTermSupplier.getAsLong(), null); } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogConfig.java b/server/src/main/java/org/opensearch/index/translog/TranslogConfig.java index cac88bee82a73..6e75ebd847b5e 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogConfig.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogConfig.java @@ -56,6 +56,7 @@ public final class TranslogConfig { private final ShardId shardId; private final Path translogPath; private final ByteSizeValue bufferSize; + private final String nodeId; /** * Creates a new TranslogConfig instance @@ -64,16 +65,24 @@ public final class TranslogConfig { * @param indexSettings the index settings used to set internal variables * @param bigArrays a bigArrays instance used for temporarily allocating write operations */ - public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays) { - this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE); + public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, String nodeId) { + this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId); } - TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, ByteSizeValue bufferSize) { + TranslogConfig( + ShardId shardId, + Path translogPath, + IndexSettings indexSettings, + BigArrays bigArrays, + ByteSizeValue bufferSize, + String nodeId + ) { this.bufferSize = bufferSize; this.indexSettings = indexSettings; this.shardId = shardId; this.translogPath = translogPath; this.bigArrays = bigArrays; + this.nodeId = nodeId; } /** @@ -110,4 +119,8 @@ public Path getTranslogPath() { public ByteSizeValue getBufferSize() { return bufferSize; } + + public String getNodeId() { + return nodeId; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java index 0d85123b60c75..25fcdc614172a 100644 --- a/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java @@ -194,7 +194,8 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState, shardPath.getShardId(), translogPath, indexSettings, - BigArrays.NON_RECYCLING_INSTANCE + BigArrays.NON_RECYCLING_INSTANCE, + "" ); long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id()); // We open translog to check for corruption, do not clean anything. diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index 10dec13c81e1a..fb78731246a07 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -40,11 +40,14 @@ public class TranslogCheckpointTransferSnapshot implements TransferSnapshot, Clo private final long primaryTerm; private long minTranslogGeneration; - TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) { + private String nodeId; + + TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size, String nodeId) { translogCheckpointFileInfoTupleSet = new HashSet<>(size); this.size = size; this.generation = generation; this.primaryTerm = primaryTerm; + this.nodeId = nodeId; } private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) { @@ -63,7 +66,13 @@ public Set getTranslogFileSnapshots() { @Override public TranslogTransferMetadata getTranslogTransferMetadata() { - return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, translogCheckpointFileInfoTupleSet.size() * 2); + return new TranslogTransferMetadata( + primaryTerm, + generation, + minTranslogGeneration, + translogCheckpointFileInfoTupleSet.size() * 2, + nodeId + ); } @Override @@ -110,19 +119,22 @@ public static class Builder { private final List readers; private final Function checkpointGenFileNameMapper; private final Path location; + private final String nodeId; public Builder( long primaryTerm, long generation, Path location, List readers, - Function checkpointGenFileNameMapper + Function checkpointGenFileNameMapper, + String nodeId ) { this.primaryTerm = primaryTerm; this.generation = generation; this.readers = readers; this.checkpointGenFileNameMapper = checkpointGenFileNameMapper; this.location = location; + this.nodeId = nodeId; } public TranslogCheckpointTransferSnapshot build() throws IOException { @@ -134,7 +146,8 @@ public TranslogCheckpointTransferSnapshot build() throws IOException { TranslogCheckpointTransferSnapshot translogTransferSnapshot = new TranslogCheckpointTransferSnapshot( primaryTerm, generation, - readers.size() + readers.size(), + nodeId ); for (TranslogReader reader : readers) { final long readerGeneration = reader.getGeneration(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index d7e50cdcf32e4..d988b8a6254ff 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -24,6 +24,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.remote.RemoteTranslogTransferTracker; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; @@ -64,6 +65,8 @@ public class TranslogTransferManager { private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; + private static final int METADATA_FILES_TO_FETCH = 10; + private final Logger logger; private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; @@ -275,6 +278,10 @@ public TranslogTransferMetadata readMetadata() throws IOException { LatchedActionListener> latchedActionListener = new LatchedActionListener<>( ActionListener.wrap(blobMetadataList -> { if (blobMetadataList.isEmpty()) return; + RemoteStoreUtils.verifyNoMultipleWriters( + blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()), + TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen + ); String filename = blobMetadataList.get(0).name(); boolean downloadStatus = false; long downloadStartTime = System.nanoTime(), bytesToRead = 0; @@ -295,6 +302,9 @@ public TranslogTransferMetadata readMetadata() throws IOException { } } }, e -> { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } logger.error(() -> new ParameterizedMessage("Exception while listing metadata files"), e); exceptionSetOnce.set((IOException) e); }), @@ -305,7 +315,7 @@ public TranslogTransferMetadata readMetadata() throws IOException { transferService.listAllInSortedOrder( remoteMetadataTransferPath, TranslogTransferMetadata.METADATA_PREFIX, - 1, + METADATA_FILES_TO_FETCH, latchedActionListener ); latch.await(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index a8b3404d3f2ce..07ec9160db1e3 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog.transfer; import org.opensearch.common.SetOnce; +import org.opensearch.common.collect.Tuple; import org.opensearch.index.remote.RemoteStoreUtils; import java.util.Arrays; @@ -30,7 +31,7 @@ public class TranslogTransferMetadata { private final long minTranslogGeneration; - private int count; + private final int count; private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); @@ -46,12 +47,22 @@ public class TranslogTransferMetadata { private final long createdAt; - public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { + private final String nodeId; + + public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count, String nodeId) { this.primaryTerm = primaryTerm; this.generation = generation; this.minTranslogGeneration = minTranslogGeneration; this.count = count; this.createdAt = System.currentTimeMillis(); + this.nodeId = nodeId; + } + + /* + Used only at the time of download . Since details are read from content , nodeId is not available + */ + public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { + this(primaryTerm, generation, minTranslogGeneration, count, ""); } public long getPrimaryTerm() { @@ -89,11 +100,33 @@ public String getFileName() { RemoteStoreUtils.invertLong(primaryTerm), RemoteStoreUtils.invertLong(generation), RemoteStoreUtils.invertLong(createdAt), + nodeId, String.valueOf(CURRENT_VERSION) ) ); } + public static Tuple, String> getNodeIdByPrimaryTermAndGeneration(String filename) { + String[] tokens = filename.split(METADATA_SEPARATOR); + if (tokens.length < 6) { + // For versions < 2.11, we don't have node id + return null; + } + return new Tuple<>(new Tuple<>(RemoteStoreUtils.invertLong(tokens[1]), RemoteStoreUtils.invertLong(tokens[2])), tokens[4]); + } + + public static Tuple getNodeIdByPrimaryTermAndGen(String filename) { + String[] tokens = filename.split(METADATA_SEPARATOR); + if (tokens.length < 6) { + // For versions < 2.11, we don't have node id. + return null; + } + String primaryTermAndGen = String.join(METADATA_SEPARATOR, tokens[1], tokens[2]); + + String nodeId = tokens[4]; + return new Tuple<>(primaryTermAndGen, nodeId); + } + @Override public int hashCode() { return Objects.hash(primaryTerm, generation); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index 26c078353d12a..ca2413a057a6b 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -17,6 +17,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryException; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -133,10 +134,19 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode boolean repositoryAlreadyPresent = false; for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) { if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) { - if (newRepositoryMetadata.equalsIgnoreGenerations(existingRepositoryMetadata)) { + try { + // This will help in handling two scenarios - + // 1. When a fresh cluster is formed and a node tries to join the cluster, the repository + // metadata constructed from the node attributes of the joining node will be validated + // against the repository information provided by existing nodes in cluster state. + // 2. It's possible to update repository settings except the restricted ones post the + // creation of a system repository and if a node drops we will need to allow it to join + // even if the non-restricted system repository settings are now different. + repositoriesService.get().ensureValidSystemRepositoryUpdate(newRepositoryMetadata, existingRepositoryMetadata); + newRepositoryMetadata = existingRepositoryMetadata; repositoryAlreadyPresent = true; break; - } else { + } catch (RepositoryException e) { throw new IllegalStateException( "new repository metadata [" + newRepositoryMetadata diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 3d6679b3ef80e..0361a71116a16 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -84,6 +84,7 @@ import java.util.stream.Stream; import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING; /** * Service responsible for maintaining and providing access to snapshot repositories on nodes. @@ -154,7 +155,7 @@ public RepositoriesService( } /** - * Registers new repository in the cluster + * Registers new repository or updates an existing repository in the cluster *

* This method can be only called on the cluster-manager node. It tries to create a new repository on the master * and if it was successful it adds new repository to cluster metadata. @@ -162,7 +163,7 @@ public RepositoriesService( * @param request register repository request * @param listener register repository listener */ - public void registerRepository(final PutRepositoryRequest request, final ActionListener listener) { + public void registerOrUpdateRepository(final PutRepositoryRequest request, final ActionListener listener) { assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]"; final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata( @@ -236,14 +237,30 @@ public ClusterState execute(ClusterState currentState) { List repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { - if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) { - if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) { + RepositoryMetadata updatedRepositoryMetadata = newRepositoryMetadata; + if (isSystemRepositorySettingPresent(repositoryMetadata.settings())) { + Settings updatedSettings = Settings.builder() + .put(newRepositoryMetadata.settings()) + .put(SYSTEM_REPOSITORY_SETTING.getKey(), true) + .build(); + updatedRepositoryMetadata = new RepositoryMetadata( + newRepositoryMetadata.name(), + newRepositoryMetadata.type(), + updatedSettings, + newRepositoryMetadata.cryptoMetadata() + ); + } + if (repositoryMetadata.name().equals(updatedRepositoryMetadata.name())) { + if (updatedRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) { // Previous version is the same as this one no update is needed. return currentState; } ensureCryptoSettingsAreSame(repositoryMetadata, request); found = true; - repositoriesMetadata.add(newRepositoryMetadata); + if (isSystemRepositorySettingPresent(repositoryMetadata.settings())) { + ensureValidSystemRepositoryUpdate(updatedRepositoryMetadata, repositoryMetadata); + } + repositoriesMetadata.add(updatedRepositoryMetadata); } else { repositoriesMetadata.add(repositoryMetadata); } @@ -315,6 +332,7 @@ public ClusterState execute(ClusterState currentState) { for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { if (Regex.simpleMatch(request.name(), repositoryMetadata.name())) { ensureRepositoryNotInUse(currentState, repositoryMetadata.name()); + ensureNotSystemRepository(repositoryMetadata); logger.info("delete repository [{}]", repositoryMetadata.name()); changed = true; } else { @@ -682,6 +700,15 @@ public static void validateRepositoryMetadataSettings( + minVersionInCluster ); } + // Validation to not allow users to create system repository via put repository call. + if (isSystemRepositorySettingPresent(repositoryMetadataSettings)) { + throw new RepositoryException( + repositoryName, + "setting " + + SYSTEM_REPOSITORY_SETTING.getKey() + + " cannot provide system repository setting; this setting is managed by OpenSearch" + ); + } } private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) { @@ -756,6 +783,65 @@ public void updateRepositoriesMap(Map repos) { } } + private static void ensureNotSystemRepository(RepositoryMetadata repositoryMetadata) { + if (isSystemRepositorySettingPresent(repositoryMetadata.settings())) { + throw new RepositoryException(repositoryMetadata.name(), "cannot delete a system repository"); + } + } + + private static boolean isSystemRepositorySettingPresent(Settings repositoryMetadataSettings) { + return SYSTEM_REPOSITORY_SETTING.get(repositoryMetadataSettings); + } + + private static boolean isValueEqual(String key, String newValue, String currentValue) { + if (newValue == null && currentValue == null) { + return true; + } + if (newValue == null) { + throw new IllegalArgumentException("[" + key + "] cannot be empty, " + "current value [" + currentValue + "]"); + } + if (newValue.equals(currentValue) == false) { + throw new IllegalArgumentException( + "trying to modify an unmodifiable attribute " + + key + + " of system repository from " + + "current value [" + + currentValue + + "] to new value [" + + newValue + + "]" + ); + } + return true; + } + + public void ensureValidSystemRepositoryUpdate(RepositoryMetadata newRepositoryMetadata, RepositoryMetadata currentRepositoryMetadata) { + if (isSystemRepositorySettingPresent(currentRepositoryMetadata.settings())) { + try { + isValueEqual("type", newRepositoryMetadata.type(), currentRepositoryMetadata.type()); + + Repository repository = repositories.get(currentRepositoryMetadata.name()); + Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings(); + Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings(); + + List restrictedSettings = repository.getRestrictedSystemRepositorySettings() + .stream() + .map(setting -> setting.getKey()) + .collect(Collectors.toList()); + + for (String restrictedSettingKey : restrictedSettings) { + isValueEqual( + restrictedSettingKey, + newRepositoryMetadataSettings.get(restrictedSettingKey), + currentRepositoryMetadataSettings.get(restrictedSettingKey) + ); + } + } catch (IllegalArgumentException e) { + throw new RepositoryException(currentRepositoryMetadata.name(), e.getMessage()); + } + } + } + @Override protected void doStart() { diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 10f3dc2b6b340..02ba60cb23c4d 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; import org.opensearch.common.lifecycle.LifecycleComponent; +import org.opensearch.common.settings.Setting; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.mapper.MapperService; @@ -55,6 +56,8 @@ import java.io.IOException; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; @@ -342,6 +345,14 @@ void restoreShard( ActionListener listener ); + /** + * Returns the list of restricted system repository settings that cannot be mutated post repository creation. + * @return list of settings + */ + default List> getRestrictedSystemRepositorySettings() { + return Collections.emptyList(); + } + /** * Returns Snapshot Shard Metadata for remote store interop enabled snapshot. *

diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 490ebda24bf60..3481e43cf4c72 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -149,6 +149,7 @@ import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -3138,6 +3139,11 @@ public InputStream maybeRateLimitSnapshots(InputStream stream) { return maybeRateLimit(stream, () -> snapshotRateLimiter, snapshotRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT); } + @Override + public List> getRestrictedSystemRepositorySettings() { + return Arrays.asList(SYSTEM_REPOSITORY_SETTING, READONLY_SETTING, REMOTE_STORE_INDEX_SHALLOW_COPY); + } + @Override public RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( SnapshotId snapshotId, diff --git a/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java b/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java index 3009466f03635..d432bab93bd71 100644 --- a/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java @@ -50,6 +50,8 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; import java.util.function.Function; /** @@ -187,4 +189,12 @@ protected ByteSizeValue chunkSize() { public BlobPath basePath() { return basePath; } + + @Override + public List> getRestrictedSystemRepositorySettings() { + List> restrictedSettings = new ArrayList<>(); + restrictedSettings.addAll(super.getRestrictedSystemRepositorySettings()); + restrictedSettings.add(LOCATION_SETTING); + return restrictedSettings; + } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index af2f925f89726..1c25d8c71f948 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.IndexCommit; import org.opensearch.Version; +import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.SnapshotsInProgress; @@ -73,6 +74,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.nio.file.NoSuchFileException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -401,18 +403,32 @@ private void snapshot( try { if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) { long startTime = threadPool.relativeTimeInMillis(); + long primaryTerm = indexShard.getOperationPrimaryTerm(); // we flush first to make sure we get the latest writes snapshotted wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true); - long primaryTerm = indexShard.getOperationPrimaryTerm(); - final IndexCommit snapshotIndexCommit = wrappedSnapshot.get(); + IndexCommit snapshotIndexCommit = wrappedSnapshot.get(); long commitGeneration = snapshotIndexCommit.getGeneration(); - indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); + try { + indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); + } catch (NoSuchFileException e) { + wrappedSnapshot.close(); + logger.warn( + "Exception while acquiring lock on primaryTerm = {} and generation = {}", + primaryTerm, + commitGeneration + ); + indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true)); + wrappedSnapshot = indexShard.acquireLastIndexCommit(false); + snapshotIndexCommit = wrappedSnapshot.get(); + commitGeneration = snapshotIndexCommit.getGeneration(); + indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); + } try { repository.snapshotRemoteStoreIndexShard( indexShard.store(), snapshot.getSnapshotId(), indexId, - wrappedSnapshot.get(), + snapshotIndexCommit, getShardStateId(indexShard, snapshotIndexCommit), snapshotStatus, primaryTerm, diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java index 9229d334dea01..03848e8e58207 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java @@ -56,6 +56,13 @@ public static HttpChannel create(HttpChannel delegate, Span span, Tracer tracer) } } + @Override + public void handleException(Exception ex) { + span.addEvent("The HttpChannel was closed without sending the response"); + span.setError(ex); + span.endSpan(); + } + @Override public void close() { delegate.close(); diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java index abddfcc6cebc1..538bf82a1dbec 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java @@ -101,6 +101,7 @@ public void handleRejection(Exception exp) { try (SpanScope scope = tracer.withSpanInScope(span)) { delegate.handleRejection(exp); } finally { + span.setError(exp); span.endSpan(); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java index 90e94e52515ce..8992af18edb48 100644 --- a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java @@ -57,7 +57,7 @@ public interface TransportResponseHandler extends W * It should be used to clear up the resources held by the {@link TransportResponseHandler}. * @param exp exception */ - default void handleRejection(Exception exp) {}; + default void handleRejection(Exception exp) {} default TransportResponseHandler wrap(Function converter, Writeable.Reader reader) { final TransportResponseHandler self = this; diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index efa13438cffcf..b8bb73bb89a82 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -4005,7 +4005,7 @@ public void testRecoverFromForeignTranslog() throws IOException { final Path badTranslogLog = createTempDir(); final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); Translog translog = new LocalTranslog( - new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), badUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED, @@ -4022,7 +4022,8 @@ public void testRecoverFromForeignTranslog() throws IOException { shardId, translog.location(), config.getIndexSettings(), - BigArrays.NON_RECYCLING_INSTANCE + BigArrays.NON_RECYCLING_INSTANCE, + "" ); EngineConfig brokenConfig = new EngineConfig.Builder().shardId(shardId) @@ -7711,7 +7712,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getTranslogConfig().getShardId(), createTempDir(), config.getTranslogConfig().getIndexSettings(), - config.getTranslogConfig().getBigArrays() + config.getTranslogConfig().getBigArrays(), + "" ); EngineConfig configWithWarmer = new EngineConfig.Builder().shardId(config.getShardId()) .threadPool(config.getThreadPool()) diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java index ada8d9983aa3d..de610083f3327 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java @@ -58,7 +58,7 @@ public void tearDown() throws Exception { public void testIsSegmentsUploadBackpressureEnabled() { remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY); pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY, remoteStoreStatsTrackerFactory); - assertFalse(pressureService.isSegmentsUploadBackpressureEnabled()); + assertTrue(pressureService.isSegmentsUploadBackpressureEnabled()); Settings newSettings = Settings.builder() .put(RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), "true") diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java index f5514b8936a2f..064c6c10eba02 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java @@ -48,7 +48,7 @@ public void testGetDefaultSettings() { ); // Check remote refresh segment pressure enabled is false - assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); + assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); // Check bytes lag variance threshold default value assertEquals(10.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java index 9afa75dd601b2..181c846d394a0 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java @@ -8,10 +8,85 @@ package org.opensearch.index.remote; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.test.OpenSearchTestCase; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.opensearch.index.remote.RemoteStoreUtils.verifyNoMultipleWriters; +import static org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX; +import static org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR; +import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; + public class RemoteStoreUtilsTests extends OpenSearchTestCase { + private final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 23, + 34, + 1, + 1, + "node-1" + ); + + private final String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 23, + 34, + 2, + 1, + "node-2" + ); + private final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 13, + 34, + 1, + 1, + "node-1" + ); + + private final String oldMetadataFilename = getOldSegmentMetadataFilename(12, 23, 34, 1, 1); + + /* + Gives segment metadata filename for <2.11 version + */ + public static String getOldSegmentMetadataFilename( + long primaryTerm, + long generation, + long translogGeneration, + long uploadCounter, + int metadataVersion + ) { + return String.join( + SEPARATOR, + METADATA_PREFIX, + RemoteStoreUtils.invertLong(primaryTerm), + RemoteStoreUtils.invertLong(generation), + RemoteStoreUtils.invertLong(translogGeneration), + RemoteStoreUtils.invertLong(uploadCounter), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(metadataVersion) + ); + } + + public static String getOldTranslogMetadataFilename(long primaryTerm, long generation, int metadataVersion) { + return String.join( + METADATA_SEPARATOR, + METADATA_PREFIX, + RemoteStoreUtils.invertLong(primaryTerm), + RemoteStoreUtils.invertLong(generation), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(metadataVersion) + ); + } + public void testInvertToStrInvalid() { assertThrows(IllegalArgumentException.class, () -> RemoteStoreUtils.invertLong(-1)); } @@ -60,4 +135,48 @@ public void testGetSegmentNameUnderscoreDelimiterOverrides() { public void testGetSegmentNameException() { assertThrows(IllegalArgumentException.class, () -> RemoteStoreUtils.getSegmentName("dvd")); } + + public void testVerifyMultipleWriters_Segment() { + List mdFiles = new ArrayList<>(); + mdFiles.add(metadataFilename); + mdFiles.add(metadataFilename2); + mdFiles.add(oldMetadataFilename); + verifyNoMultipleWriters(mdFiles, RemoteSegmentStoreDirectory.MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen); + + mdFiles.add(metadataFilenameDup); + assertThrows( + IllegalStateException.class, + () -> verifyNoMultipleWriters(mdFiles, RemoteSegmentStoreDirectory.MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen) + ); + } + + public void testVerifyMultipleWriters_Translog() throws InterruptedException { + TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2, "node-1"); + String mdFilename = tm.getFileName(); + Thread.sleep(1); + TranslogTransferMetadata tm2 = new TranslogTransferMetadata(1, 1, 1, 2, "node-1"); + String mdFilename2 = tm2.getFileName(); + List bmList = new LinkedList<>(); + bmList.add(new PlainBlobMetadata(mdFilename, 1)); + bmList.add(new PlainBlobMetadata(mdFilename2, 1)); + bmList.add(new PlainBlobMetadata(getOldTranslogMetadataFilename(1, 1, 1), 1)); + RemoteStoreUtils.verifyNoMultipleWriters( + bmList.stream().map(BlobMetadata::name).collect(Collectors.toList()), + TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen + ); + + bmList = new LinkedList<>(); + bmList.add(new PlainBlobMetadata(mdFilename, 1)); + TranslogTransferMetadata tm3 = new TranslogTransferMetadata(1, 1, 1, 2, "node-2"); + bmList.add(new PlainBlobMetadata(tm3.getFileName(), 1)); + List finalBmList = bmList; + assertThrows( + IllegalStateException.class, + () -> RemoteStoreUtils.verifyNoMultipleWriters( + finalBmList.stream().map(BlobMetadata::name).collect(Collectors.toList()), + TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen + ) + ); + } + } diff --git a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java index afe6e47bec7b2..a45b25f04060b 100644 --- a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java @@ -133,7 +133,8 @@ public void setupListeners() throws Exception { shardId, createTempDir("translog"), indexSettings, - BigArrays.NON_RECYCLING_INSTANCE + BigArrays.NON_RECYCLING_INSTANCE, + "" ); Engine.EventListener eventListener = new Engine.EventListener() { @Override diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 415efd4ac23b6..5a13f57db2c87 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH; import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; import static org.mockito.ArgumentMatchers.any; @@ -138,7 +139,8 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { return Collections.singletonList("dummy string"); } throw new IOException(); - }).when(remoteMetadataDirectory).listFilesByPrefixInLexicographicOrder(MetadataFilenameUtils.METADATA_PREFIX, 1); + }).when(remoteMetadataDirectory) + .listFilesByPrefixInLexicographicOrder(MetadataFilenameUtils.METADATA_PREFIX, METADATA_FILES_TO_FETCH); SegmentInfos segmentInfos; try (Store indexShardStore = indexShard.store()) { @@ -166,7 +168,10 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { // Validate that the stream of metadata file of remoteMetadataDirectory has been opened only once and the // listFilesByPrefixInLexicographicOrder has been called twice. verify(remoteMetadataDirectory, times(1)).getBlobStream(any()); - verify(remoteMetadataDirectory, times(2)).listFilesByPrefixInLexicographicOrder(MetadataFilenameUtils.METADATA_PREFIX, 1); + verify(remoteMetadataDirectory, times(2)).listFilesByPrefixInLexicographicOrder( + MetadataFilenameUtils.METADATA_PREFIX, + METADATA_FILES_TO_FETCH + ); } public void testAfterRefresh() throws IOException { @@ -579,4 +584,5 @@ private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentSto } } } + } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index d7bbe52aa3905..cad5e47531cc6 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -35,6 +35,7 @@ import org.mockito.ArgumentCaptor; +import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -78,7 +79,12 @@ public void testNewDirectory() throws IOException { latchedActionListener.onResponse(List.of()); return null; }).when(blobContainer) - .listBlobsByPrefixInSortedOrder(any(), eq(1), eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), any(ActionListener.class)); + .listBlobsByPrefixInSortedOrder( + any(), + eq(METADATA_FILES_TO_FETCH), + eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), + any(ActionListener.class) + ); when(repositoriesService.repository("remote_store_repository")).thenReturn(repository); @@ -93,7 +99,7 @@ public void testNewDirectory() throws IOException { verify(blobContainer).listBlobsByPrefixInSortedOrder( eq(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX), - eq(1), + eq(METADATA_FILES_TO_FETCH), eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), any() ); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 8d99d98fbaaf4..f154dddb0e7cc 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -62,6 +62,7 @@ import org.mockito.Mockito; +import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH; import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; import static org.hamcrest.CoreMatchers.is; @@ -90,9 +91,39 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private SegmentInfos segmentInfos; private ThreadPool threadPool; - private final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, 34, 1, 1); - private final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 13, 34, 1, 1); - private final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(10, 38, 34, 1, 1); + private final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 23, + 34, + 1, + 1, + "node-1" + ); + + private final String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 23, + 34, + 2, + 1, + "node-2" + ); + private final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 13, + 34, + 1, + 1, + "node-1" + ); + private final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 10, + 38, + 34, + 1, + 1, + "node-1" + ); @Before public void setup() throws IOException { @@ -183,7 +214,7 @@ public void testGetPrimaryTermGenerationUuid() { } public void testInitException() throws IOException { - when(remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, 1)).thenThrow( + when(remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, METADATA_FILES_TO_FETCH)).thenThrow( new IOException("Error") ); @@ -202,6 +233,13 @@ public void testInitNoMetadataFile() throws IOException { assertEquals(Set.of(), actualCache.keySet()); } + public void testInitMultipleMetadataFile() throws IOException { + when(remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, METADATA_FILES_TO_FETCH)).thenReturn( + List.of(metadataFilename, metadataFilenameDup) + ); + assertThrows(IllegalStateException.class, () -> remoteSegmentStoreDirectory.init()); + } + private Map> populateMetadata() throws IOException { List metadataFiles = new ArrayList<>(); @@ -212,7 +250,7 @@ private Map> populateMetadata() throws IOException { when( remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ) ).thenReturn(List.of(metadataFilename)); when( @@ -262,7 +300,7 @@ public void testInit() throws IOException { when( remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ) ).thenReturn(List.of(metadataFilename)); @@ -318,7 +356,7 @@ public void testFileLength() throws IOException { assertEquals(uploadedSegments.get("_0.si").getLength(), remoteSegmentStoreDirectory.fileLength("_0.si")); } - public void testFileLenghtNoSuchFile() throws IOException { + public void testFileLengthNoSuchFile() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -693,7 +731,7 @@ public void testContainsFile() throws IOException { when( remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ) ).thenReturn(metadataFiles); @@ -739,7 +777,8 @@ public void testUploadMetadataEmpty() throws IOException { segmentInfos, storeDirectory, 34L, - indexShard.getLatestReplicationCheckpoint() + indexShard.getLatestReplicationCheckpoint(), + "" ) ); } @@ -757,7 +796,7 @@ public void testUploadMetadataNonEmpty() throws IOException { when( remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ) ).thenReturn(metadataFiles); Map> metadataFilenameContentMapping = Map.of( @@ -785,7 +824,8 @@ public void testUploadMetadataNonEmpty() throws IOException { segInfos, storeDirectory, generation, - indexShard.getLatestReplicationCheckpoint() + indexShard.getLatestReplicationCheckpoint(), + "" ); verify(remoteMetadataDirectory).copyFrom( @@ -832,7 +872,8 @@ public void testUploadMetadataMissingSegment() throws IOException { segmentInfos, storeDirectory, 12L, - indexShard.getLatestReplicationCheckpoint() + indexShard.getLatestReplicationCheckpoint(), + "" ) ); verify(indexOutput).close(); @@ -855,7 +896,7 @@ public void testNoMetadataHeaderCorruptIndexException() throws IOException { when( remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ) ).thenReturn(metadataFiles); @@ -878,7 +919,7 @@ public void testInvalidCodecHeaderCorruptIndexException() throws IOException { when( remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ) ).thenReturn(metadataFiles); @@ -903,7 +944,7 @@ public void testHeaderMinVersionCorruptIndexException() throws IOException { when( remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ) ).thenReturn(metadataFiles); @@ -928,7 +969,7 @@ public void testHeaderMaxVersionCorruptIndexException() throws IOException { when( remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ) ).thenReturn(metadataFiles); @@ -953,7 +994,7 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException { when( remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - 1 + METADATA_FILES_TO_FETCH ) ).thenReturn(metadataFiles); @@ -1114,12 +1155,12 @@ private void indexDocs(int startDocId, int numberOfDocs) throws IOException { } public void testMetadataFileNameOrder() { - String file1 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 21, 23, 1, 1); - String file2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 38, 1, 1); - String file3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(18, 12, 26, 1, 1); - String file4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 10, 1); - String file5 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 1, 1); - String file6 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 5, 1); + String file1 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 21, 23, 1, 1, ""); + String file2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 38, 1, 1, ""); + String file3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(18, 12, 26, 1, 1, ""); + String file4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 10, 1, ""); + String file5 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 1, 1, ""); + String file6 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 5, 1, ""); List actualList = new ArrayList<>(List.of(file1, file2, file3, file4, file5, file6)); actualList.sort(String::compareTo); diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index 2de36574064cb..c098d11a3487f 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -38,7 +38,7 @@ public void testRecoveryFromTranslog() throws IOException { LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); try { translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -68,7 +68,7 @@ public void testRecoveryFromTranslog() throws IOException { translogManager.syncTranslog(); translogManager.close(); translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -117,7 +117,7 @@ public void testTranslogRollsGeneration() throws IOException { LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); try { translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -147,7 +147,7 @@ public void testTranslogRollsGeneration() throws IOException { translogManager.syncTranslog(); translogManager.close(); translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -182,7 +182,7 @@ public void testTrimOperationsFromTranslog() throws IOException { LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); try { translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -214,7 +214,7 @@ public void testTrimOperationsFromTranslog() throws IOException { translogManager.close(); translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), @@ -253,7 +253,7 @@ public void testTranslogSync() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); AtomicReference translogManagerAtomicReference = new AtomicReference<>(); translogManager = new InternalTranslogManager( - new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""), primaryTerm, globalCheckpoint::get, createTranslogDeletionPolicy(INDEX_SETTINGS), diff --git a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java index dbfc66d6de4b3..4997067b75198 100644 --- a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java @@ -291,7 +291,7 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting ); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings); - return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); + return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, ""); } private Location addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { @@ -1452,7 +1452,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { temp.getTranslogPath(), temp.getIndexSettings(), temp.getBigArrays(), - new ByteSizeValue(1, ByteSizeUnit.KB) + new ByteSizeValue(1, ByteSizeUnit.KB), + "" ); final Set persistedSeqNos = new HashSet<>(); @@ -1550,7 +1551,8 @@ public void testTranslogWriterFsyncedWithLocalTranslog() throws IOException { temp.getTranslogPath(), temp.getIndexSettings(), temp.getBigArrays(), - new ByteSizeValue(1, ByteSizeUnit.KB) + new ByteSizeValue(1, ByteSizeUnit.KB), + "" ); final Set persistedSeqNos = new HashSet<>(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 233d6f319b797..b2310010620f7 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -67,6 +67,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -206,7 +207,7 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting ); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings); - return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); + return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, ""); } private BlobStoreRepository createRepository() { @@ -1049,7 +1050,7 @@ public void testSyncUpTo() throws IOException { } } - public void testSyncUpFailure() throws IOException { + public void testSyncUpLocationFailure() throws IOException { int translogOperations = randomIntBetween(1, 20); int count = 0; fail.failAlways(); @@ -1101,6 +1102,26 @@ public void testSyncUpFailure() throws IOException { assertDownloadStatsNoDownloads(statsTracker); } + public void testSyncUpAlwaysFailure() throws IOException { + int translogOperations = randomIntBetween(1, 20); + int count = 0; + fail.failAlways(); + for (int op = 0; op < translogOperations; op++) { + translog.add( + new Translog.Index(String.valueOf(op), count, primaryTerm.get(), Integer.toString(count).getBytes(StandardCharsets.UTF_8)) + ); + try { + translog.sync(); + fail("io exception expected"); + } catch (IOException e) { + assertTrue("at least one operation pending", translog.syncNeeded()); + } + } + assertTrue(translog.isOpen()); + fail.failNever(); + translog.sync(); + } + public void testSyncUpToStream() throws IOException { int iters = randomIntBetween(5, 10); for (int i = 0; i < iters; i++) { @@ -1258,7 +1279,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { temp.getTranslogPath(), temp.getIndexSettings(), temp.getBigArrays(), - new ByteSizeValue(1, ByteSizeUnit.KB) + new ByteSizeValue(1, ByteSizeUnit.KB), + "" ); final Set persistedSeqNos = new HashSet<>(); @@ -1360,7 +1382,8 @@ public void testTranslogWriterFsyncDisabledInRemoteFsTranslog() throws IOExcepti temp.getTranslogPath(), temp.getIndexSettings(), temp.getBigArrays(), - new ByteSizeValue(1, ByteSizeUnit.KB) + new ByteSizeValue(1, ByteSizeUnit.KB), + "" ); final Set persistedSeqNos = new HashSet<>(); diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java index 43b4d2c9847ab..e17d2770f014a 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java @@ -74,7 +74,7 @@ protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOExc } protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException { - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""); String translogUUID = Translog.createEmptyTranslog( translogPath, SequenceNumbers.NO_OPS_PERFORMED, diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 6fc4557a75675..a48dbdcdacb71 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -16,6 +16,7 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.common.collect.Tuple; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; @@ -251,8 +252,8 @@ public void testReadMetadataNoFile() throws IOException { assertNoDownloadStats(false); } - // This should happen most of the time - Just a single metadata file - public void testReadMetadataSingleFile() throws IOException { + // This should happen most of the time - + public void testReadMetadataFile() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, @@ -260,12 +261,16 @@ public void testReadMetadataSingleFile() throws IOException { null, remoteTranslogTransferTracker ); - TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); - String mdFilename = tm.getFileName(); + TranslogTransferMetadata metadata1 = new TranslogTransferMetadata(1, 1, 1, 2); + String mdFilename1 = metadata1.getFileName(); + + TranslogTransferMetadata metadata2 = new TranslogTransferMetadata(1, 0, 1, 2); + String mdFilename2 = metadata2.getFileName(); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata(mdFilename, 1)); + bmList.add(new PlainBlobMetadata(mdFilename1, 1)); + bmList.add(new PlainBlobMetadata(mdFilename2, 1)); latchedActionListener.onResponse(bmList); return null; }).when(transferService) @@ -273,7 +278,7 @@ public void testReadMetadataSingleFile() throws IOException { TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); long delayForMdDownload = 1; - when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenAnswer(invocation -> { + when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename1))).thenAnswer(invocation -> { Thread.sleep(delayForMdDownload); return new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)); }); @@ -496,4 +501,36 @@ private void assertTlogCkpDownloadStats() { // Expect delay for both tlog and ckp file assertTrue(remoteTranslogTransferTracker.getTotalDownloadTimeInMillis() >= 2 * delayForBlobDownload); } + + public void testGetPrimaryTermAndGeneration() { + String tm = new TranslogTransferMetadata(1, 2, 1, 2, "node-1").getFileName(); + assertEquals(new Tuple<>(new Tuple<>(1L, 2L), "node-1"), TranslogTransferMetadata.getNodeIdByPrimaryTermAndGeneration(tm)); + } + + public void testMetadataConflict() throws InterruptedException { + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath, + null, + remoteTranslogTransferTracker + ); + TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2, "node-1"); + String mdFilename = tm.getFileName(); + Thread.sleep(1); + TranslogTransferMetadata tm2 = new TranslogTransferMetadata(1, 1, 1, 2, "node-2"); + String mdFilename2 = tm2.getFileName(); + + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(3); + List bmList = new LinkedList<>(); + bmList.add(new PlainBlobMetadata(mdFilename, 1)); + bmList.add(new PlainBlobMetadata(mdFilename2, 1)); + latchedActionListener.onResponse(bmList); + return null; + }).when(transferService) + .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); + + assertThrows(RuntimeException.class, translogTransferManager::readMetadata); + } } diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 889d0dc6ddb14..cc4f4f7b6413d 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -94,6 +94,7 @@ import java.util.function.Consumer; import java.util.function.Function; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -201,6 +202,13 @@ public void testRegisterRejectsInvalidRepositoryNames() { } } + public void testUpdateOrRegisterRejectsForSystemRepository() { + String repoName = "name"; + PutRepositoryRequest request = new PutRepositoryRequest(repoName); + request.settings(Settings.builder().put(SYSTEM_REPOSITORY_SETTING.getKey(), true).build()); + expectThrows(RepositoryException.class, () -> repositoriesService.registerOrUpdateRepository(request, null)); + } + public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() { String repoName = "name"; expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); @@ -310,22 +318,22 @@ public void testRepositoryUpdateWithDifferentCryptoMetadata() { assertNotNull(repository.cryptoHandler); assertEquals(kpTypeA, repository.cryptoHandler.kpType); - expectThrows(IllegalArgumentException.class, () -> repositoriesService.registerRepository(request, null)); + expectThrows(IllegalArgumentException.class, () -> repositoriesService.registerOrUpdateRepository(request, null)); CryptoSettings cryptoSettings = new CryptoSettings(keyProviderName); cryptoSettings.keyProviderType(kpTypeA); cryptoSettings.settings(Settings.builder().put("key-1", "val-1")); request.cryptoSettings(cryptoSettings); - expectThrows(IllegalArgumentException.class, () -> repositoriesService.registerRepository(request, null)); + expectThrows(IllegalArgumentException.class, () -> repositoriesService.registerOrUpdateRepository(request, null)); cryptoSettings.settings(Settings.builder()); cryptoSettings.keyProviderName("random"); - expectThrows(IllegalArgumentException.class, () -> repositoriesService.registerRepository(request, null)); + expectThrows(IllegalArgumentException.class, () -> repositoriesService.registerOrUpdateRepository(request, null)); cryptoSettings.keyProviderName(keyProviderName); assertEquals(kpTypeA, repository.cryptoHandler.kpType); - repositoriesService.registerRepository(request, null); + repositoriesService.registerOrUpdateRepository(request, null); } public void testCryptoManagerClusterStateChanges() { @@ -355,7 +363,7 @@ public void testCryptoManagerClusterStateChanges() { verified, repositoryMetadata ); - repositoriesService.registerRepository(request, null); + repositoriesService.registerOrUpdateRepository(request, null); MeteredRepositoryTypeA repository = (MeteredRepositoryTypeA) repositoriesService.repository(repoName); assertNotNull(repository.cryptoHandler); assertEquals(kpTypeA, repository.cryptoHandler.kpType); @@ -375,7 +383,7 @@ public void testCryptoManagerClusterStateChanges() { verified, repositoryMetadata ); - repositoriesService.registerRepository(request, null); + repositoriesService.registerOrUpdateRepository(request, null); repository = (MeteredRepositoryTypeA) repositoriesService.repository(repoName); assertNotNull(repository.cryptoHandler); @@ -397,7 +405,7 @@ public void testCryptoManagerClusterStateChanges() { verified, repositoryMetadata ); - repositoriesService.registerRepository(request, null); + repositoriesService.registerOrUpdateRepository(request, null); repository = (MeteredRepositoryTypeA) repositoriesService.repository(repoName); assertNotNull(repository.cryptoHandler); assertEquals(kpTypeA, repository.cryptoHandler.kpType); @@ -418,7 +426,7 @@ public void testCryptoManagerClusterStateChanges() { verified, repositoryMetadata ); - repositoriesService.registerRepository(request, null); + repositoriesService.registerOrUpdateRepository(request, null); repository = (MeteredRepositoryTypeA) repositoriesService.repository(repoName); assertNotNull(repository.cryptoHandler); assertEquals(kpTypeB, repository.cryptoHandler.kpType); @@ -530,7 +538,7 @@ private ClusterState emptyState() { private void assertThrowsOnRegister(String repoName) { PutRepositoryRequest request = new PutRepositoryRequest(repoName); - expectThrows(RepositoryException.class, () -> repositoriesService.registerRepository(request, null)); + expectThrows(RepositoryException.class, () -> repositoriesService.registerOrUpdateRepository(request, null)); } private static class TestCryptoProvider implements CryptoHandler { diff --git a/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java index 303f60283f69f..d9f599714805b 100644 --- a/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/fs/FsRepositoryTests.java @@ -58,6 +58,7 @@ import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.index.shard.ShardId; @@ -70,6 +71,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.BlobStoreTestUtil; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; @@ -90,6 +92,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.is; public class FsRepositoryTests extends OpenSearchTestCase { @@ -218,6 +221,31 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { } } + public void testRestrictedSettingsDefault() { + Path repo = createTempDir(); + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), repo.toAbsolutePath()) + .put("location", repo) + .put(FsRepository.BASE_PATH_SETTING.getKey(), "my_base_path") + .build(); + RepositoryMetadata metadata = new RepositoryMetadata("test", "fs", settings); + FsRepository repository = new FsRepository( + metadata, + new Environment(settings, null), + NamedXContentRegistry.EMPTY, + BlobStoreTestUtil.mockClusterService(), + new RecoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ); + + List> restrictedSettings = repository.getRestrictedSystemRepositorySettings(); + assertThat(restrictedSettings.size(), is(4)); + assertTrue(restrictedSettings.contains(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING)); + assertTrue(restrictedSettings.contains(BlobStoreRepository.READONLY_SETTING)); + assertTrue(restrictedSettings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY)); + assertTrue(restrictedSettings.contains(FsRepository.LOCATION_SETTING)); + } + private void runGeneric(ThreadPool threadPool, Runnable runnable) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); threadPool.generic().submit(() -> { diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index 4aad96eddaeb5..7149efe27d694 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -72,5 +72,5 @@ dependencies { exclude group: "com.squareup.okio" } runtimeOnly "com.squareup.okio:okio:3.5.0" - runtimeOnly "org.xerial.snappy:snappy-java:1.1.10.4" + runtimeOnly "org.xerial.snappy:snappy-java:1.1.10.5" } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 15f9ee546fe6b..43289a7c89524 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -527,7 +527,7 @@ protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOExc } protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException { - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""); String translogUUID = Translog.createEmptyTranslog( translogPath, SequenceNumbers.NO_OPS_PERFORMED, @@ -872,7 +872,13 @@ public EngineConfig config( final Engine.EventListener eventListener ) { final IndexWriterConfig iwc = newIndexWriterConfig(); - final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + final TranslogConfig translogConfig = new TranslogConfig( + shardId, + translogPath, + indexSettings, + BigArrays.NON_RECYCLING_INSTANCE, + "" + ); final List extRefreshListenerList = externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); @@ -939,7 +945,7 @@ protected EngineConfig config( .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .build() ); - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, ""); return new EngineConfig.Builder().shardId(config.getShardId()) .threadPool(config.getThreadPool()) .indexSettings(indexSettings) diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 4505570b8ebe0..466c00d0648dc 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -697,7 +697,8 @@ protected IndexShard newShard( checkpointPublisher, remoteStore, remoteStoreStatsTrackerFactory, - () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL + () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, + "dummy-node" ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { diff --git a/test/framework/src/main/java/org/opensearch/test/TestCluster.java b/test/framework/src/main/java/org/opensearch/test/TestCluster.java index 61742cd4fb827..a9d9abf69ee41 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/TestCluster.java @@ -42,10 +42,12 @@ import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexTemplateMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.index.IndexNotFoundException; import org.opensearch.indices.IndexTemplateMissingException; import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.hamcrest.OpenSearchAssertions; import java.io.Closeable; @@ -253,7 +255,18 @@ public void wipeRepositories(String... repositories) { } for (String repository : repositories) { try { - client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet(); + List repositoryMetadata = client().admin() + .cluster() + .prepareGetRepositories(repository) + .execute() + .actionGet() + .repositories(); + if (repositoryMetadata.isEmpty() == false + && BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.get(repositoryMetadata.get(0).settings()) == true) { + client().admin().cluster().prepareCleanupRepository(repository).execute().actionGet(); + } else { + client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet(); + } } catch (RepositoryMissingException ex) { // ignore }