diff --git a/.gitignore b/.gitignore index 2151e666ea209..03e2446ec4f13 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,7 @@ out/ !.idea/eclipseCodeFormatter.xml !.idea/externalDependencies.xml !.idea/inspectionProfiles/Project_Default.xml -!.idea/runConfigurations/Debug_Elasticsearch.xml +!.idea/runConfigurations/ !.idea/scopes/x_pack.xml # These files are generated in the main tree by IntelliJ diff --git a/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml b/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml new file mode 100644 index 0000000000000..94bb079398ffd --- /dev/null +++ b/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml @@ -0,0 +1,11 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Debug_Elasticsearch__node_3_.xml b/.idea/runConfigurations/Debug_Elasticsearch__node_3_.xml new file mode 100644 index 0000000000000..aaef20fec729b --- /dev/null +++ b/.idea/runConfigurations/Debug_Elasticsearch__node_3_.xml @@ -0,0 +1,11 @@ + + + + \ No newline at end of file diff --git a/TESTING.asciidoc b/TESTING.asciidoc index 186ac155000d5..f574633a90eca 100644 --- a/TESTING.asciidoc +++ b/TESTING.asciidoc @@ -392,7 +392,8 @@ port of `5007`. NOTE: In the case of test clusters using multiple nodes, multiple debuggers will need to be attached on incrementing ports. For example, for a 3 node cluster ports `5007`, `5008`, and `5009` will attempt to attach to a listening -debugger. +debugger. You can use the "Debug Elasticsearch (node 2)" and "(node 3)" run +configurations should you need to debug a multi-node cluster. You can also use a combination of both flags to debug both tests and server. This is only applicable to Java REST tests. diff --git a/build-tools-internal/src/main/groovy/elasticsearch.runtime-jdk-provision.gradle b/build-tools-internal/src/main/groovy/elasticsearch.runtime-jdk-provision.gradle index 5b87a40ffb211..4a35fd9d447da 100644 --- a/build-tools-internal/src/main/groovy/elasticsearch.runtime-jdk-provision.gradle +++ b/build-tools-internal/src/main/groovy/elasticsearch.runtime-jdk-provision.gradle @@ -61,6 +61,7 @@ configure(allprojects) { project.getTasks().withType(ThirdPartyAuditTask.class).configureEach { if (BuildParams.getIsRuntimeJavaHomeSet() == false) { javaHome.set(providers.provider(() -> "${project.jdks.provisioned_runtime.javaHomePath}")) + targetCompatibility.set(providers.provider(() -> JavaVersion.toVersion(project.jdks.provisioned_runtime.major))) } } } diff --git a/docs/changelog/94000.yaml b/docs/changelog/94000.yaml new file mode 100644 index 0000000000000..debbf2fd205c7 --- /dev/null +++ b/docs/changelog/94000.yaml @@ -0,0 +1,6 @@ +pr: 94000 +summary: Introduce redirect method on IngestDocument +area: Ingest Node +type: enhancement +issues: + - 83653 diff --git a/modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java b/modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java index d42679ddb13e9..c9cd33c5a2b62 100644 --- a/modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java +++ b/modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import 
org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -240,11 +239,8 @@ public void testErrorRecordingOnRetention() throws Exception { String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1L); // mark the first generation index as read-only so deletion fails when we enable the retention configuration - UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(firstGenerationIndex); - updateSettingsRequest.settings(Settings.builder().put(READ_ONLY.settingName(), true)); + updateIndexSettings(Settings.builder().put(READ_ONLY.settingName(), true), firstGenerationIndex); try { - client().admin().indices().updateSettings(updateSettingsRequest); - // TODO replace this with an API call to update the lifecycle for the data stream once available PlainActionFuture.get( fut -> internalCluster().getCurrentMasterNodeInstance(ClusterService.class) diff --git a/modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleErrorStore.java b/modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleErrorStore.java index 27a8c5ed5943a..4b1febc67ca41 100644 --- a/modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleErrorStore.java +++ b/modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleErrorStore.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.core.Nullable; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -23,38 +24,45 @@ */ public class DataLifecycleErrorStore { - private final ConcurrentMap targetToError = new ConcurrentHashMap<>(); + private final ConcurrentMap indexNameToError = new ConcurrentHashMap<>(); /** - * Records a string representation of the provided exception for the provided target. - * If an error was already recorded for the provided target this will override that error. + * Records a string representation of the provided exception for the provided index. + * If an error was already recorded for the provided index this will override that error. */ - public void recordError(String target, Exception e) { - targetToError.put(target, org.elasticsearch.common.Strings.toString(((builder, params) -> { + public void recordError(String indexName, Exception e) { + indexNameToError.put(indexName, org.elasticsearch.common.Strings.toString(((builder, params) -> { ElasticsearchException.generateThrowableXContent(builder, EMPTY_PARAMS, e); return builder; }))); } /** - * Clears the recorded error for the provided target (if any exists) + * Clears the recorded error for the provided index (if any exists) */ - public void clearRecordedError(String target) { - targetToError.remove(target); + public void clearRecordedError(String indexName) { + indexNameToError.remove(indexName); } /** * Clears all the errors recorded in the store. */ public void clearStore() { - targetToError.clear(); + indexNameToError.clear(); } /** - * Retrieves the recorderd error for the provided target. + * Retrieves the recorderd error for the provided index. */ @Nullable - public String getError(String target) { - return targetToError.get(target); + public String getError(String indexName) { + return indexNameToError.get(indexName); + } + + /** + * Return an immutable view (a snapshot) of the tracked indices at the moment this method is called. 
+ */ + public List getAllIndices() { + return List.copyOf(indexNameToError.keySet()); } } diff --git a/modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleService.java b/modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleService.java index 0259d1e0f605c..3d52e53e4ece5 100644 --- a/modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleService.java +++ b/modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleService.java @@ -47,8 +47,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.LongSupplier; /** @@ -136,13 +134,6 @@ public void clusterChanged(ClusterChangedEvent event) { errorStore.clearStore(); } } - if (event.localNodeMaster()) { - // only execute if we're the master - List indicesDeleted = event.indicesDeleted(); - for (Index deleted : indicesDeleted) { - errorStore.clearRecordedError(deleted.getName()); - } - } } @Override @@ -210,9 +201,12 @@ void run(ClusterState state) { */ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) { Metadata metadata = clusterService.state().metadata(); - for (Index index : dataStream.getIndices()) { - if (dataStream.isIndexManagedByDLM(index, metadata::index) == false) { - errorStore.clearRecordedError(index.getName()); + for (String indexName : errorStore.getAllIndices()) { + IndexMetadata indexMeta = metadata.index(indexName); + if (indexMeta == null) { + errorStore.clearRecordedError(indexName); + } else if (dataStream.isIndexManagedByDLM(indexMeta.getIndex(), metadata::index) == false) { + errorStore.clearRecordedError(indexName); } } } @@ -223,7 +217,7 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) { RolloverRequest rolloverRequest = getDefaultRolloverRequest(dataStream.getName()); transportActionsDeduplicator.executeOnce( rolloverRequest, - new ErrorRecordingActionListener(writeIndex.getName(), errorStore::recordError, errorStore::clearRecordedError), + new ErrorRecordingActionListener(writeIndex.getName(), errorStore), (req, reqListener) -> rolloverDataStream(writeIndex.getName(), rolloverRequest, reqListener) ); } @@ -247,7 +241,7 @@ private void maybeExecuteRetention(ClusterState state, DataStream dataStream) { // time to delete the index transportActionsDeduplicator.executeOnce( deleteRequest, - new ErrorRecordingActionListener(indexName, errorStore::recordError, errorStore::clearRecordedError), + new ErrorRecordingActionListener(indexName, errorStore), (req, reqListener) -> deleteIndex(deleteRequest, retention, reqListener) ); } @@ -348,23 +342,21 @@ static TimeValue getRetentionConfiguration(DataStream dataStream) { */ static class ErrorRecordingActionListener implements ActionListener { private final String targetIndex; - private final BiConsumer recordError; - private final Consumer clearErrorRecord; + private final DataLifecycleErrorStore errorStore; - ErrorRecordingActionListener(String targetIndex, BiConsumer recordError, Consumer clearErrorRecord) { + ErrorRecordingActionListener(String targetIndex, DataLifecycleErrorStore errorStore) { this.targetIndex = targetIndex; - this.recordError = recordError; - this.clearErrorRecord = clearErrorRecord; + this.errorStore = errorStore; } @Override public void onResponse(Void unused) { - clearErrorRecord.accept(targetIndex); + errorStore.clearRecordedError(targetIndex); } @Override public void onFailure(Exception e) { - recordError.accept(targetIndex, e); + 
errorStore.recordError(targetIndex, e); } } diff --git a/modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleErrorStoreTests.java b/modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleErrorStoreTests.java index be4b94d736b45..570a266603bfb 100644 --- a/modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleErrorStoreTests.java +++ b/modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleErrorStoreTests.java @@ -11,6 +11,10 @@ import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import java.util.List; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -27,6 +31,8 @@ public void setupServices() { public void testRecordAndRetrieveError() { errorStore.recordError("test", new NullPointerException("testing")); assertThat(errorStore.getError("test"), is(notNullValue())); + assertThat(errorStore.getAllIndices().size(), is(1)); + assertThat(errorStore.getAllIndices().get(0), is("test")); } public void testRetrieveAfterClear() { @@ -34,4 +40,32 @@ public void testRetrieveAfterClear() { errorStore.clearStore(); assertThat(errorStore.getError("test"), is(nullValue())); } + + public void testGetAllIndicesIsASnapshotViewOfTheStore() { + Stream.iterate(0, i -> i + 1).limit(5).forEach(i -> errorStore.recordError("test" + i, new NullPointerException("testing"))); + List initialAllIndices = errorStore.getAllIndices(); + assertThat(initialAllIndices.size(), is(5)); + assertThat( + initialAllIndices, + containsInAnyOrder(Stream.iterate(0, i -> i + 1).limit(5).map(i -> "test" + i).toArray(String[]::new)) + ); + + // let's add some more items to the store and clear a couple of the initial ones + Stream.iterate(5, i -> i + 1).limit(5).forEach(i -> errorStore.recordError("test" + i, new NullPointerException("testing"))); + errorStore.clearRecordedError("test0"); + errorStore.clearRecordedError("test1"); + // the initial list should remain unchanged + assertThat(initialAllIndices.size(), is(5)); + assertThat( + initialAllIndices, + containsInAnyOrder(Stream.iterate(0, i -> i + 1).limit(5).map(i -> "test" + i).toArray(String[]::new)) + ); + + // calling getAllIndices again should reflect the latest state + assertThat(errorStore.getAllIndices().size(), is(8)); + assertThat( + errorStore.getAllIndices(), + containsInAnyOrder(Stream.iterate(2, i -> i + 1).limit(8).map(i -> "test" + i).toArray(String[]::new)) + ); + } } diff --git a/modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleServiceTests.java b/modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleServiceTests.java index 2679d514acccd..5f92e8c8e6eb7 100644 --- a/modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleServiceTests.java +++ b/modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleServiceTests.java @@ -17,7 +17,6 @@ import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition; import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataLifecycle; @@ -36,7 +35,6 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; -import 
org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; @@ -58,6 +56,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -70,6 +70,7 @@ public class DataLifecycleServiceTests extends ESTestCase { private DataLifecycleService dataLifecycleService; private List clientSeenRequests; private NoOpClient client; + private ClusterService clusterService; @Before public void setupServices() { @@ -77,7 +78,7 @@ public void setupServices() { Set> builtInClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); builtInClusterSettings.add(DataLifecycleService.DLM_POLL_INTERVAL_SETTING); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, builtInClusterSettings); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings); + clusterService = createClusterService(threadPool, clusterSettings); now = System.currentTimeMillis(); Clock clock = Clock.fixed(Instant.ofEpochMilli(now), ZoneId.of(randomFrom(ZoneId.getAvailableZoneIds()))); @@ -100,6 +101,7 @@ public void setupServices() { public void cleanup() { clientSeenRequests.clear(); dataLifecycleService.close(); + clusterService.close(); threadPool.shutdownNow(); client.close(); } @@ -218,8 +220,8 @@ public void testDeletedIndicesAreRemovedFromTheErrorStore() throws IOException { builder, dataStreamName, numBackingIndices, - Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy").put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT), - null + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT), + new DataLifecycle() ); builder.put(dataStream); String nodeId = "localNode"; @@ -247,9 +249,9 @@ public void testDeletedIndicesAreRemovedFromTheErrorStore() throws IOException { } newStateBuilder.metadata(metaBuilder); ClusterState stateWithDeletedIndices = newStateBuilder.nodes(buildNodes(nodeId).masterNodeId(nodeId)).build(); - ClusterChangedEvent event = new ClusterChangedEvent("_na_", stateWithDeletedIndices, previousState); + setState(clusterService, stateWithDeletedIndices); - dataLifecycleService.clusterChanged(event); + dataLifecycleService.run(stateWithDeletedIndices); for (Index deletedIndex : deletedIndices) { assertThat(dataLifecycleService.getErrorStore().getError(deletedIndex.getName()), nullValue()); @@ -258,54 +260,6 @@ public void testDeletedIndicesAreRemovedFromTheErrorStore() throws IOException { assertThat(dataLifecycleService.getErrorStore().getError(dataStream.getWriteIndex().getName()), notNullValue()); } - public void testErrorStoreIsNotUpdatedIfWeAreNotMaster() { - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - int numBackingIndices = 3; - Metadata.Builder builder = Metadata.builder(); - DataStream dataStream = createDataStream( - builder, - dataStreamName, - numBackingIndices, - Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy").put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT), - null - ); - builder.put(dataStream); - String localNode = "localNode"; - String 
masterNodeId = "some_other_node"; - DiscoveryNodes.Builder nodesBuilder = buildNodes(localNode).add(getNode(masterNodeId)).masterNodeId(masterNodeId); - ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build(); - - // all backing indices are in the error store - for (Index index : dataStream.getIndices()) { - dataLifecycleService.getErrorStore().recordError(index.getName(), new NullPointerException("bad")); - } - Index writeIndex = dataStream.getWriteIndex(); - // all indices but the write index are deleted - List deletedIndices = dataStream.getIndices().stream().filter(index -> index.equals(writeIndex) == false).toList(); - - ClusterState.Builder newStateBuilder = ClusterState.builder(previousState); - newStateBuilder.stateUUID(UUIDs.randomBase64UUID()); - Metadata.Builder metaBuilder = Metadata.builder(previousState.metadata()); - for (Index index : deletedIndices) { - metaBuilder.remove(index.getName()); - IndexGraveyard.Builder graveyardBuilder = IndexGraveyard.builder(metaBuilder.indexGraveyard()); - graveyardBuilder.addTombstone(index); - metaBuilder.indexGraveyard(graveyardBuilder.build()); - } - newStateBuilder.metadata(metaBuilder); - ClusterState stateWithDeletedIndices = newStateBuilder.nodes( - buildNodes(localNode).add(getNode(masterNodeId)).masterNodeId(masterNodeId) - ).build(); - ClusterChangedEvent event = new ClusterChangedEvent("_na_", stateWithDeletedIndices, previousState); - - dataLifecycleService.clusterChanged(event); - - for (Index index : dataStream.getIndices()) { - // all the errors shoudl still be in the error store as the node where DLM runs is not the master node - assertThat(dataLifecycleService.getErrorStore().getError(index.getName()), notNullValue()); - } - } - public void testErrorStoreIsClearedOnBackingIndexBecomingUnmanaged() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); int numBackingIndices = 3; diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml index 6a6e8f071024b..d9154174379bd 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml @@ -87,11 +87,7 @@ teardown: index: foo id: "1" - match: { _source.a: true } -# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances -# (See issue https://github.com/elastic/elasticsearch/issues/83653). -# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior -#- match: { _source.accumulator: [ "non-repeated-value" ] } -- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] } +- match: { _source.accumulator: [ "non-repeated-value" ] } # only the foo index - do: @@ -150,11 +146,7 @@ teardown: index: foo id: "1" - match: { _source.a: true } -# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances -# (See issue https://github.com/elastic/elasticsearch/issues/83653). 
-# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior -#- match: { _source.accumulator: [ "non-repeated-value" ] } -- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] } +- match: { _source.accumulator: [ "non-repeated-value" ] } # only the foo index - do: diff --git a/qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/RcsCcsCommonYamlTestSuiteIT.java b/qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/RcsCcsCommonYamlTestSuiteIT.java index 2a191223b42fb..6b681d5ef1143 100644 --- a/qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/RcsCcsCommonYamlTestSuiteIT.java +++ b/qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/RcsCcsCommonYamlTestSuiteIT.java @@ -17,8 +17,10 @@ import org.apache.lucene.tests.util.TimeUnits; import org.elasticsearch.Version; import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; @@ -38,9 +40,11 @@ import org.junit.rules.TestRule; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.unmodifiableList; import static org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT.CCS_APIS; @@ -65,6 +69,7 @@ public class RcsCcsCommonYamlTestSuiteIT extends ESClientYamlSuiteTestCase { private static TestCandidateAwareClient searchYamlTestClient; // the remote cluster is the one we write index operations etc... to private static final String REMOTE_CLUSTER_NAME = "remote_cluster"; + private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); private static LocalClusterConfigProvider commonClusterConfig = cluster -> cluster.module("x-pack-async-search") .module("aggregations") @@ -98,10 +103,58 @@ public class RcsCcsCommonYamlTestSuiteIT extends ESClientYamlSuiteTestCase { .setting("node.roles", "[data,ingest,master,remote_cluster_client]") .setting("cluster.remote.connections_per_cluster", "1") .apply(commonClusterConfig) + .keystore("cluster.remote." 
+ REMOTE_CLUSTER_NAME + ".credentials", () -> { + if (API_KEY_MAP_REF.get() == null) { + try { + API_KEY_MAP_REF.set(createCrossClusterAccessApiKey()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return (String) API_KEY_MAP_REF.get().get("encoded"); + }) .rolesFile(Resource.fromClasspath("roles.yml")) .user("remote_search_user", "x-pack-test-password", "remote_search_role") .build(); + private static Map createCrossClusterAccessApiKey() throws IOException { + assert fulfillingCluster != null; + final var createApiKeyRequest = new Request("POST", "/_security/api_key"); + createApiKeyRequest.setJsonEntity(""" + { + "name": "cross_cluster_access_key", + "role_descriptors": { + "role": { + "cluster": ["cross_cluster_access"], + "index": [ + { + "names": ["*"], + "privileges": ["read", "read_cross_cluster"], + "allow_restricted_indices": true + } + ] + } + } + }"""); + createApiKeyRequest.setOptions( + RequestOptions.DEFAULT.toBuilder() + .addHeader("Authorization", basicAuthHeaderValue("test_admin", new SecureString("x-pack-test-password".toCharArray()))) + ); + + final int numberOfFcNodes = fulfillingCluster.getHttpAddresses().split(",").length; + final String url = fulfillingCluster.getHttpAddress(randomIntBetween(0, numberOfFcNodes - 1)); + final int portSeparator = url.lastIndexOf(':'); + final var httpHost = new HttpHost(url.substring(0, portSeparator), Integer.parseInt(url.substring(portSeparator + 1)), "http"); + RestClientBuilder builder = RestClient.builder(httpHost); + configureClient(builder, Settings.EMPTY); + builder.setStrictDeprecationMode(true); + try (RestClient fulfillingClusterClient = builder.build()) { + final Response createApiKeyResponse = fulfillingClusterClient.performRequest(createApiKeyRequest); + assertOK(createApiKeyResponse); + return responseAsMap(createApiKeyResponse); + } + } + @ClassRule // Use a RuleChain to ensure that remote cluster is started before local cluster public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); @@ -206,30 +259,7 @@ public void initSearchClient() throws IOException { } private static void configureRemoteCluster() throws IOException { - final var createApiKeyRequest = new Request("POST", "/_security/api_key"); - createApiKeyRequest.setJsonEntity(""" - { - "name": "cross_cluster_access_key", - "role_descriptors": { - "role": { - "cluster": ["cross_cluster_access"], - "index": [ - { - "names": ["*"], - "privileges": ["read", "read_cross_cluster"], - "allow_restricted_indices": true - } - ] - } - } - }"""); - final Response createApiKeyResponse = adminClient().performRequest(createApiKeyRequest); - assertOK(createApiKeyResponse); - final Map apiKeyMap = responseAsMap(createApiKeyResponse); - final String encodedCrossClusterAccessApiKey = (String) apiKeyMap.get("encoded"); - - final Settings.Builder builder = Settings.builder() - .put("cluster.remote." + REMOTE_CLUSTER_NAME + ".credentials", encodedCrossClusterAccessApiKey); + final Settings.Builder builder = Settings.builder(); if (randomBoolean()) { builder.put("cluster.remote." + REMOTE_CLUSTER_NAME + ".mode", "proxy") .put("cluster.remote." 
+ REMOTE_CLUSTER_NAME + ".proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0)); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index a2afa9a6285ae..003d8bcf9fafe 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.ingest.Processor; @@ -48,6 +49,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -97,6 +99,26 @@ public void testFinalPipelineCantChangeDestination() { ); } + public void testFinalPipelineCantRerouteDestination() { + final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); + createIndex("index", settings); + + final BytesReference finalPipelineBody = new BytesArray(""" + {"processors": [{"reroute": {}}]}"""); + client().admin().cluster().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); + + final IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get() + ); + assertThat( + e, + hasToString( + endsWith("final pipeline [final_pipeline] can't change the target index (from [index] to [target]) for document [1]") + ) + ); + } + public void testFinalPipelineOfOldDestinationIsNotInvoked() { Settings settings = Settings.builder() .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") @@ -187,6 +209,73 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() { assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); } + public void testDefaultPipelineOfRerouteDestinationIsInvoked() { + Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + createIndex("index", settings); + + settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); + createIndex("target", settings); + + BytesReference defaultPipelineBody = new BytesArray(""" + {"processors": [{"reroute": {}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + + BytesReference targetPipeline = new BytesArray(""" + {"processors": [{"final": {}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) + .actionGet(); + + IndexResponse indexResponse = client().prepareIndex("index") + .setId("1") + .setSource(Map.of("field", "value")) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + SearchResponse target = client().prepareSearch("target").get(); + assertEquals(1, target.getHits().getTotalHits().value); + assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); + } 
+ + public void testAvoidIndexingLoop() { + Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + createIndex("index", settings); + + settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); + createIndex("target", settings); + + BytesReference defaultPipelineBody = new BytesArray(""" + {"processors": [{"reroute": {"dest": "target"}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + + BytesReference targetPipeline = new BytesArray(""" + {"processors": [{"reroute": {"dest": "index"}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) + .actionGet(); + + IllegalStateException exception = expectThrows( + IllegalStateException.class, + () -> client().prepareIndex("index") + .setId("1") + .setSource(Map.of("dest", "index")) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get() + ); + assertThat( + exception.getMessage(), + equalTo("index cycle detected while processing pipeline [target_default_pipeline] for document [1]: [index, target, index]") + ); + } + public void testFinalPipeline() { final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); createIndex("index", settings); @@ -393,6 +482,26 @@ public String getType() { return "changing_dest"; } + }, + "reroute", + (processorFactories, tag, description, config) -> { + final String dest = Objects.requireNonNullElse( + ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"), + "target" + ); + return new AbstractProcessor(tag, description) { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.reroute(dest); + return ingestDocument; + } + + @Override + public String getType() { + return "reroute"; + } + + }; } ); } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 2910ab11e8c94..3aa9a68bd3d3a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) { handler.accept(ingestDocument, null); return; } @@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC Processor processor; IngestMetric metric; // iteratively execute any sync processors - while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) { + while (currentProcessor < processorsWithMetrics.size() + && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false + && ingestDocument.isReroute() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); @@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC } assert currentProcessor <= processorsWithMetrics.size(); - if 
(currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) { handler.accept(ingestDocument, null); return; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index f471926087ae5..07f7856323fb7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -62,8 +62,8 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); - private boolean doNoSelfReferencesCheck = false; + private boolean reroute = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); @@ -80,6 +80,7 @@ public IngestDocument(IngestDocument other) { new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); + this.reroute = other.reroute; } /** @@ -903,6 +904,29 @@ public String toString() { return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}'; } + public void reroute(String destIndex) { + getMetadata().setIndex(destIndex); + reroute = true; + } + + /** + * The document is redirected to another target. + * This implies that we'll skip the current pipeline and invoke the default pipeline of the new target + * + * @return whether the document is redirected to another target + */ + boolean isReroute() { + return reroute; + } + + /** + * Set the {@link #reroute} flag to false so that subsequent calls to {@link #isReroute()} will return false until/unless + * {@link #reroute(String)} is called. 
+ */ + void resetReroute() { + reroute = false; + } + public enum Metadata { INDEX(IndexFieldMapper.NAME), TYPE("_type"), diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 83d982e1f4176..3cfa5d6fd444e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -68,6 +69,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -441,6 +443,10 @@ public static boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineReques * Returns the pipeline by the specified id */ public Pipeline getPipeline(String id) { + if (id == null) { + return null; + } + PipelineHolder holder = pipelines.get(id); if (holder != null) { return holder.pipeline; @@ -644,21 +650,8 @@ protected void doRun() { continue; } - final String pipelineId = indexRequest.getPipeline(); - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - final String finalPipelineId = indexRequest.getFinalPipeline(); - indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); - boolean hasFinalPipeline = true; - final List pipelines; - if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false - && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = List.of(pipelineId, finalPipelineId); - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - pipelines = List.of(pipelineId); - hasFinalPipeline = false; - } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = List.of(finalPipelineId); - } else { + PipelineIterator pipelines = getAndResetPipelines(indexRequest); + if (pipelines.hasNext() == false) { i++; continue; } @@ -693,8 +686,9 @@ public void onFailure(Exception e) { }); IngestDocument ingestDocument = newIngestDocument(indexRequest); - executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener); - + LinkedHashSet indexRecursionDetection = new LinkedHashSet<>(); + indexRecursionDetection.add(indexRequest.index()); + executePipelines(pipelines, indexRequest, ingestDocument, documentListener, indexRecursionDetection); i++; } } @@ -702,21 +696,99 @@ public void onFailure(Exception e) { }); } + /** + * Returns the pipelines of the request, and updates the request so that it no longer references + * any pipelines (both the default and final pipeline are set to the noop pipeline). + */ + private PipelineIterator getAndResetPipelines(IndexRequest indexRequest) { + final String pipelineId = indexRequest.getPipeline(); + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + final String finalPipelineId = indexRequest.getFinalPipeline(); + indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); + return new PipelineIterator(pipelineId, finalPipelineId); + } + + /** + * A triple for tracking the non-null id of a pipeline, the pipeline itself, and whether the pipeline is a final pipeline. 
+ * + * @param id the non-null id of the pipeline + * @param pipeline a possibly-null reference to the pipeline for the given pipeline id + * @param isFinal true if the pipeline is a final pipeline + */ + private record PipelineSlot(String id, @Nullable Pipeline pipeline, boolean isFinal) { + public PipelineSlot { + Objects.requireNonNull(id); + } + } + + private class PipelineIterator implements Iterator { + + private final String defaultPipeline; + private final String finalPipeline; + private final Iterator pipelineSlotIterator; + + private PipelineIterator(String defaultPipeline, String finalPipeline) { + this.defaultPipeline = NOOP_PIPELINE_NAME.equals(defaultPipeline) ? null : defaultPipeline; + this.finalPipeline = NOOP_PIPELINE_NAME.equals(finalPipeline) ? null : finalPipeline; + this.pipelineSlotIterator = iterator(); + } + + public PipelineIterator withoutDefaultPipeline() { + return new PipelineIterator(null, finalPipeline); + } + + private Iterator iterator() { + PipelineSlot defaultPipelineSlot = null, finalPipelineSlot = null; + if (defaultPipeline != null) { + defaultPipelineSlot = new PipelineSlot(defaultPipeline, getPipeline(defaultPipeline), false); + } + if (finalPipeline != null) { + finalPipelineSlot = new PipelineSlot(finalPipeline, getPipeline(finalPipeline), true); + } + + if (defaultPipeline != null && finalPipeline != null) { + return List.of(defaultPipelineSlot, finalPipelineSlot).iterator(); + } else if (finalPipeline != null) { + return List.of(finalPipelineSlot).iterator(); + } else if (defaultPipeline != null) { + return List.of(defaultPipelineSlot).iterator(); + } else { + return Collections.emptyIterator(); + } + } + + @Override + public boolean hasNext() { + return pipelineSlotIterator.hasNext(); + } + + @Override + public PipelineSlot next() { + return pipelineSlotIterator.next(); + } + } + private void executePipelines( - final Iterator pipelineIds, - final boolean hasFinalPipeline, + final PipelineIterator pipelines, final IndexRequest indexRequest, final IngestDocument ingestDocument, - final ActionListener listener + final ActionListener listener, + final Set indexRecursionDetection ) { - assert pipelineIds.hasNext(); - final String pipelineId = pipelineIds.next(); + assert pipelines.hasNext(); + PipelineSlot slot = pipelines.next(); + final String pipelineId = slot.id(); + final Pipeline pipeline = slot.pipeline(); + final boolean isFinalPipeline = slot.isFinal(); + + // reset the reroute flag, at the start of a new pipeline execution this document hasn't been rerouted yet + ingestDocument.resetReroute(); + try { - final PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { + if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } - final Pipeline pipeline = holder.pipeline; + final String originalIndex = indexRequest.indices()[0]; executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; @@ -765,12 +837,12 @@ private void executePipelines( return; // document failed! 
} - Iterator newPipelineIds = pipelineIds; - boolean newHasFinalPipeline = hasFinalPipeline; + PipelineIterator newPipelines = pipelines; final String newIndex = indexRequest.indices()[0]; if (Objects.equals(originalIndex, newIndex) == false) { - if (hasFinalPipeline && pipelineIds.hasNext() == false) { + // final pipelines cannot change the target index (either directly or by way of a reroute) + if (isFinalPipeline) { listener.onFailure( new IllegalStateException( format( @@ -783,20 +855,40 @@ private void executePipelines( ) ); return; // document failed! - } else { - indexRequest.isPipelineResolved(false); - resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata()); - if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { - newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); - newHasFinalPipeline = true; - } else { - newPipelineIds = Collections.emptyIterator(); - } + } + + // check for cycles in the visited indices + if (indexRecursionDetection.add(newIndex) == false) { + List indexRoute = new ArrayList<>(indexRecursionDetection); + indexRoute.add(newIndex); + listener.onFailure( + new IllegalStateException( + format( + "index cycle detected while processing pipeline [%s] for document [%s]: %s", + pipelineId, + indexRequest.id(), + indexRoute + ) + ) + ); + return; // document failed! + } + + // clear the current pipeline, then re-resolve the pipelines for this request + indexRequest.setPipeline(null); + indexRequest.isPipelineResolved(false); + resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata()); + newPipelines = getAndResetPipelines(indexRequest); + + // for backwards compatibility, when a pipeline changes the target index for a document without using the reroute + // mechanism, do not invoke the default pipeline of the new target index + if (ingestDocument.isReroute() == false) { + newPipelines = newPipelines.withoutDefaultPipeline(); } } - if (newPipelineIds.hasNext()) { - executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener); + if (newPipelines.hasNext()) { + executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection); } else { // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 3abc8b19f0662..641e87bbe3bae 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -104,7 +104,6 @@ public void listenForUpdates(ClusterSettings clusterSettings) { List> remoteClusterSettings = Stream.of( RemoteClusterService.REMOTE_CLUSTER_COMPRESS, RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, - TcpTransport.isUntrustedRemoteClusterEnabled() ? 
RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS : null, RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index b8f9898e519ba..b2ad7e94db425 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -57,11 +57,14 @@ final class RemoteClusterConnection implements Closeable { * @param settings the nodes settings object * @param clusterAlias the configured alias of the cluster to connect to * @param transportService the local nodes transport service + * @param credentialsProtected Whether the remote cluster is protected by a credentials, i.e. it has a credentials configured + * via secure setting. This means the remote cluster uses the new configurable access RCS model + * (as opposed to the basic model). */ - RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) { + RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, boolean credentialsProtected) { this.transportService = transportService; this.clusterAlias = clusterAlias; - ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings); + ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings, credentialsProtected); this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService)); this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings); // we register the transport service here as a listener to make sure we notify handlers on disconnect etc. 
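Because cluster.remote.<alias>.credentials is now a secure setting, the credential has to be supplied through the keystore (SecureSettings) rather than as a plain string setting, and RemoteClusterService captures the set of credentials-protected aliases at construction time before passing the flag down to each RemoteClusterConnection. A rough sketch of that wiring, assuming a hypothetical alias "my_remote" and a placeholder credential value:

import java.util.Set;

import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.RemoteClusterService;

class SecureRemoteCredentialsSketch {
    // Illustrative only: the alias and the credential value are assumptions.
    static Set<String> credentialsProtectedAliases() {
        MockSecureSettings secureSettings = new MockSecureSettings();
        secureSettings.setString("cluster.remote.my_remote.credentials", "encoded-cross-cluster-api-key");
        Settings settings = Settings.builder().setSecureSettings(secureSettings).build();

        // The aliases that have a credential configured; for each of them the connection is built
        // with credentialsProtected=true and therefore uses the dedicated remote-cluster transport profile.
        return RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).keySet();
    }
}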
diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 831b966d17a82..5911952c0d34e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -21,6 +21,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.SecureSetting; +import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -122,10 +124,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl ) ); - public static final Setting.AffixSetting REMOTE_CLUSTER_CREDENTIALS = Setting.affixKeySetting( + public static final Setting.AffixSetting REMOTE_CLUSTER_CREDENTIALS = Setting.affixKeySetting( "cluster.remote.", "credentials", - key -> Setting.simpleString(key, v -> {}, Setting.Property.Dynamic, Setting.Property.NodeScope, Setting.Property.Filtered) + key -> SecureSetting.secureString(key, null) ); public static final String REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME = "cluster:internal/remote_cluster/handshake"; @@ -143,14 +145,16 @@ public boolean isRemoteClusterServerEnabled() { private final TransportService transportService; private final Map remoteClusters = ConcurrentCollections.newConcurrentMap(); + private final Set credentialsProtectedRemoteClusters; RemoteClusterService(Settings settings, TransportService transportService) { super(settings); this.enabled = DiscoveryNode.isRemoteClusterClient(settings); this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings); this.transportService = transportService; + this.credentialsProtectedRemoteClusters = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).keySet(); - if (RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.get(settings)) { + if (remoteClusterServerEnabled) { registerRemoteClusterHandshakeRequestHandler(transportService); } } @@ -320,7 +324,12 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, if (remote == null) { // this is a new cluster we have to add a new representation Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); - remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); + remote = new RemoteClusterConnection( + finalSettings, + clusterAlias, + transportService, + credentialsProtectedRemoteClusters.contains(clusterAlias) + ); remoteClusters.put(clusterAlias, remote); remote.ensureConnected(listener); } else if (remote.shouldRebuildConnection(newSettings)) { @@ -332,7 +341,12 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, } remoteClusters.remove(clusterAlias); Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); - remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); + remote = new RemoteClusterConnection( + finalSettings, + clusterAlias, + transportService, + credentialsProtectedRemoteClusters.contains(clusterAlias) + ); remoteClusters.put(clusterAlias, remote); remote.ensureConnected(listener); } else { diff --git 
a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 21dd72691bc7a..f0b837fa27cc5 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -44,7 +44,6 @@ import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS; public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable { @@ -141,8 +140,8 @@ public Writeable.Reader getReader() { connectionManager.addListener(this); } - static ConnectionProfile buildConnectionProfile(String clusterAlias, Settings settings) { - final String transportProfile = REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(clusterAlias).exists(settings) + static ConnectionProfile buildConnectionProfile(String clusterAlias, Settings settings, boolean credentialsProtected) { + final String transportProfile = credentialsProtected ? RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE : TransportSettings.DEFAULT_PROFILE; diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 4bc581594d8a4..327649a9819ba 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -532,6 +532,85 @@ public void testMultipleProcessorsDoNotIgnoreFailures() { } } + public void testSkipPipeline() { + TestProcessor processor1 = new TestProcessor(doc -> doc.reroute("foo")); + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipAsyncProcessor() { + TestProcessor processor1 = new TestProcessor(doc -> doc.reroute("foo")) { + @Override + public boolean isAsync() { + return true; + } + }; + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipProcessorIgnoreFailure() { + TestProcessor processor1 = new TestProcessor(doc -> { + doc.reroute("foo"); + throw new RuntimeException("simulate processor failure after calling 
skipCurrentPipeline()"); + }); + TestProcessor processor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(true, List.of(processor1, processor2), List.of(), relativeTimeProvider); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testDontSkipFailureProcessor() { + TestProcessor processor = new TestProcessor(doc -> { + doc.reroute("foo"); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor failureProcessor1 = new TestProcessor(doc -> {}); + TestProcessor failureProcessor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor), + List.of(failureProcessor1, failureProcessor2), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(failureProcessor1.getInvokedCounter(), equalTo(1)); + assertThat(failureProcessor2.getInvokedCounter(), equalTo(1)); + } + private TestProcessor getTestProcessor(String tag, boolean isAsync, boolean shouldThrowException) { return new TestProcessor( tag, diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index de84cb0cc3a73..37b86567b7097 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -48,7 +48,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { private final String clusterAlias = "cluster-alias"; private final String modeKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(); private final Settings settings = Settings.builder().put(modeKey, "proxy").build(); - private final ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile("cluster", settings); + private final ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile("cluster", settings, false); private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Override diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 5b38da57e98b2..693983998ef2e 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import 
org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.xcontent.XContentHelper; @@ -248,7 +249,7 @@ public void run() { AtomicReference exceptionReference = new AtomicReference<>(); String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode)); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, randomBoolean())) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -308,7 +309,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.acceptIncomingRequests(); String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, seedNodes); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, randomBoolean())) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -448,15 +449,16 @@ private void doTestGetConnectionInfo(boolean hasClusterCredentials) throws Excep .put(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections) .build(); if (hasClusterCredentials) { - settings = Settings.builder() - .put(settings) - .put( - RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(clusterAlias).getKey(), - randomAlphaOfLength(20) - ) - .build(); + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString( + RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(clusterAlias).getKey(), + randomAlphaOfLength(20) + ); + settings = Settings.builder().put(settings).setSecureSettings(secureSettings).build(); } - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { + try ( + RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, hasClusterCredentials) + ) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -635,16 +637,17 @@ private void doTestCollectNodes(boolean hasClusterCredentials) throws Exception Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode)); if (hasClusterCredentials) { - settings = Settings.builder() - .put(settings) - .put( - RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(clusterAlias).getKey(), - randomAlphaOfLength(20) - ) - .build(); + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString( + RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(clusterAlias).getKey(), + randomAlphaOfLength(20) + ); + settings = Settings.builder().put(settings).setSecureSettings(secureSettings).build(); } - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { + try ( + RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, hasClusterCredentials) + ) { CountDownLatch responseLatch = new CountDownLatch(1); AtomicReference> reference = new AtomicReference<>(); AtomicReference 
failReference = new AtomicReference<>(); @@ -687,7 +690,7 @@ public void testNoChannelsExceptREG() throws Exception { String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode)); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, false)) { PlainActionFuture plainActionFuture = new PlainActionFuture<>(); connection.ensureConnected(plainActionFuture); plainActionFuture.get(10, TimeUnit.SECONDS); @@ -753,7 +756,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, seedNodes); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, randomBoolean())) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -842,7 +845,7 @@ public void testGetConnection() throws Exception { service.acceptIncomingRequests(); String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode)); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, false)) { PlainActionFuture.get(fut -> connection.ensureConnected(fut.map(x -> null))); for (int i = 0; i < 10; i++) { // always a direct connection as the remote node is already connected diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 6ba327b3fa817..bc891988b8ad2 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.IOUtils; @@ -1129,6 +1130,95 @@ public void testRemoteClusterServiceNotEnabledGetCollectNodes() { } } + public void testUseDifferentTransportProfileForCredentialsProtectedRemoteClusters() throws IOException, InterruptedException { + final List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService c1 = startTransport( + "cluster_1", + knownNodes, + Version.CURRENT, + TransportVersion.CURRENT, + Settings.builder() + .put(RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.getKey(), "true") + .put(RemoteClusterPortSettings.PORT.getKey(), "0") + .build() + ); + MockTransportService c2 = startTransport("cluster_2", knownNodes, Version.CURRENT, TransportVersion.CURRENT); + ) { + final DiscoveryNode c1Node = c1.getLocalDiscoNode().withTransportAddress(c1.boundRemoteAccessAddress().publishAddress()); + final DiscoveryNode c2Node = c2.getLocalDiscoNode(); + + final MockSecureSettings secureSettings = new MockSecureSettings(); + 
secureSettings.setString("cluster.remote.cluster_1.credentials", randomAlphaOfLength(10)); + final Settings settings = Settings.builder().setSecureSettings(secureSettings).build(); + try ( + MockTransportService transportService = MockTransportService.createNewService( + settings, + Version.CURRENT, + TransportVersion.CURRENT, + threadPool, + null + ) + ) { + // remote cluster_1 has a credentials and uses the _remote_cluster transport profile + transportService.addConnectBehavior(c1Node.getAddress(), (transport, discoveryNode, profile, listener) -> { + assertThat(profile.getTransportProfile(), equalTo("_remote_cluster")); + transport.openConnection(discoveryNode, profile, listener); + }); + // remote cluster_2 has no credentials and uses legacy model + transportService.addConnectBehavior(c2Node.getAddress(), (transport, discoveryNode, profile, listener) -> { + assertThat(profile.getTransportProfile(), equalTo("default")); + transport.openConnection(discoveryNode, profile, listener); + }); + transportService.start(); + transportService.acceptIncomingRequests(); + try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + service.initializeRemoteClusters(); + + final CountDownLatch firstLatch = new CountDownLatch(1); + final Settings.Builder firstRemoteClusterSettingsBuilder = Settings.builder(); + final boolean firstRemoteClusterProxyMode = randomBoolean(); + if (firstRemoteClusterProxyMode) { + firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.mode", "proxy") + .put("cluster.remote.cluster_1.proxy_address", c1Node.getAddress().toString()); + } else { + firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.seeds", c1Node.getAddress().toString()); + } + service.updateRemoteCluster("cluster_1", firstRemoteClusterSettingsBuilder.build(), connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + final Settings.Builder secondRemoteClusterSettingsBuilder = Settings.builder(); + final boolean secondRemoteClusterProxyMode = randomBoolean(); + if (secondRemoteClusterProxyMode) { + secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.mode", "proxy") + .put("cluster.remote.cluster_2.proxy_address", c2Node.getAddress().toString()); + } else { + secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.seeds", c2Node.getAddress().toString()); + } + service.updateRemoteCluster("cluster_2", secondRemoteClusterSettingsBuilder.build(), connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + if (firstRemoteClusterProxyMode) { + assertFalse(service.isRemoteNodeConnected("cluster_1", c1Node)); + } else { + assertTrue(service.isRemoteNodeConnected("cluster_1", c1Node)); + } + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + if (secondRemoteClusterProxyMode) { + assertFalse(service.isRemoteNodeConnected("cluster_2", c2Node)); + } else { + assertTrue(service.isRemoteNodeConnected("cluster_2", c2Node)); + } + // No local node connection + assertEquals(0, transportService.getConnectionManager().size()); + } + } + } + } + private static Settings createSettings(String clusterAlias, List seeds) { Settings.Builder builder = Settings.builder(); builder.put( diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java index 
acb94a2fd2430..c61dc93f962c6 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java @@ -10,12 +10,12 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; -import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.MockSecureSettings; +import org.elasticsearch.common.settings.SecureSetting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.NodeRoleSettings; import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matchers; import java.util.concurrent.TimeUnit; @@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.not; public class RemoteClusterSettingsTests extends ESTestCase { @@ -75,34 +76,22 @@ public void testSeedsDefault() { assertThat(REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), emptyCollectionOf(String.class)); } - public void testAuthorizationDefault() { + public void testCredentialsDefault() { final String alias = randomAlphaOfLength(8); - assertThat(REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), emptyString()); + final Settings.Builder builder = Settings.builder(); + if (randomBoolean()) { + builder.setSecureSettings(new MockSecureSettings()); + } + assertThat(REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(alias).get(builder.build()).toString(), emptyString()); } - public void testAuthorizationFiltered() { + public void testCredentialsIsSecureSetting() { final String alias = randomAlphaOfLength(8); - assertThat( - REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(alias).getProperties(), - Matchers.hasItem(Setting.Property.Filtered) - ); + assertThat(REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(alias), isA(SecureSetting.class)); } public void testProxyDefault() { final String alias = randomAlphaOfLength(8); assertThat(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), equalTo("")); } - - public void testRemoteClusterEmptyOrNullApiKey() { - // simple validation - Settings settings = Settings.builder() - .put("cluster.remote.cluster1.credentials", "apikey") - .put("cluster.remote.cluster3.credentials", (String) null) - .build(); - try { - REMOTE_CLUSTER_CREDENTIALS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings)); - } catch (Throwable t) { - fail("Cluster Settings must be able to accept a null, empty, or non-empty string. 
Exception: " + t.getMessage()); - } - } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 837235cd94c04..5d461e906a266 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS; import static org.mockito.Mockito.mock; public class RemoteConnectionStrategyTests extends ESTestCase { @@ -112,7 +111,7 @@ public void testCorrectChannelNumber() { for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) { String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(); Settings proxySettings = Settings.builder().put(settingKey, strategy.name()).build(); - ConnectionProfile proxyProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, proxySettings); + ConnectionProfile proxyProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, proxySettings, randomBoolean()); assertEquals( "Incorrect number of channels for " + strategy.name(), strategy.getNumberOfChannels(), @@ -126,9 +125,7 @@ public void testTransportProfile() { // New rcs connection with credentials for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) { - String settingKey = REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(clusterAlias).getKey(); - final Settings settings = Settings.builder().put(settingKey, randomAlphaOfLength(20)).build(); - ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings); + ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, Settings.EMPTY, true); assertEquals( "Incorrect transport profile for " + strategy.name(), RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE, @@ -138,7 +135,7 @@ public void testTransportProfile() { // Legacy ones without credentials for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) { - ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, Settings.EMPTY); + ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, Settings.EMPTY, false); assertEquals( "Incorrect transport profile for " + strategy.name(), TransportSettings.DEFAULT_PROFILE, diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index f07b8a30bfc27..1233afff28e37 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import 
org.elasticsearch.common.transport.BoundTransportAddress; @@ -69,13 +70,15 @@ public void setUp() throws Exception { hasClusterCredentials = randomBoolean(); final Settings.Builder builder = Settings.builder().put(modeKey, "sniff"); if (hasClusterCredentials) { - builder.put( + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString( RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getConcreteSettingForNamespace(clusterAlias).getKey(), randomAlphaOfLength(20) ); + builder.setSecureSettings(secureSettings); } clientSettings = builder.build(); - profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, clientSettings); + profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, clientSettings, hasClusterCredentials); } @Override diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityTestCase.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityTestCase.java index 535542476d68c..cfe9c8eafc870 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityTestCase.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityTestCase.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; @@ -25,9 +26,10 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.ObjectPath; import org.junit.AfterClass; -import org.junit.Before; +import org.junit.BeforeClass; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Map; @@ -70,12 +72,25 @@ public abstract class AbstractRemoteClusterSecurityTestCase extends ESRestTestCa protected static ElasticsearchCluster queryCluster; protected static RestClient fulfillingClusterClient; - @Before - public void initFulfillingClusterClient() throws IOException { - if (fulfillingClusterClient == null) { - assert fulfillingCluster != null; - fulfillingClusterClient = buildClient(fulfillingCluster.getHttpAddress(0)); + @BeforeClass + public static void initFulfillingClusterClient() { + if (fulfillingClusterClient != null) { + return; } + assert fulfillingCluster != null; + final int numberOfFcNodes = fulfillingCluster.getHttpAddresses().split(",").length; + final String url = fulfillingCluster.getHttpAddress(randomIntBetween(0, numberOfFcNodes - 1)); + + final int portSeparator = url.lastIndexOf(':'); + final var httpHost = new HttpHost(url.substring(0, portSeparator), Integer.parseInt(url.substring(portSeparator + 1)), "http"); + RestClientBuilder builder = RestClient.builder(httpHost); + try { + configureClient(builder, Settings.EMPTY); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + builder.setStrictDeprecationMode(true); + fulfillingClusterClient = builder.build(); } @AfterClass @@ -94,22 +109,40 @@ protected Settings restClientSettings() { return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); } - protected 
String configureRemoteClustersWithApiKey(String indicesPrivilegesJson) throws Exception { - return configureRemoteClustersWithApiKey(indicesPrivilegesJson, randomBoolean()); + protected static Map createCrossClusterAccessApiKey(String indicesPrivilegesJson) { + initFulfillingClusterClient(); + // Create API key on FC + final var createApiKeyRequest = new Request("POST", "/_security/api_key"); + createApiKeyRequest.setJsonEntity(Strings.format(""" + { + "name": "cross_cluster_access_key", + "role_descriptors": { + "role": { + "cluster": ["cross_cluster_access"], + "index": %s + } + } + }""", indicesPrivilegesJson)); + try { + final Response createApiKeyResponse = performRequestAgainstFulfillingCluster(createApiKeyRequest); + assertOK(createApiKeyResponse); + return responseAsMap(createApiKeyResponse); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + protected void configureRemoteClusters() throws Exception { + // This method assumes the cross cluster access API key is already configured in the keystore + configureRemoteClusters(randomBoolean()); + } /** * Returns API key ID of cross cluster access API key. */ - protected String configureRemoteClustersWithApiKey(String indicesPrivilegesJson, boolean isProxyMode) throws Exception { - // Create API key on FC - final Map apiKeyMap = createCrossClusterAccessApiKey(indicesPrivilegesJson); - final String encodedCrossClusterAccessApiKey = (String) apiKeyMap.get("encoded"); - - // Update remote cluster settings on QC with the API key - final Settings.Builder builder = Settings.builder() - .put("cluster.remote.my_remote_cluster.credentials", encodedCrossClusterAccessApiKey); - ; + protected void configureRemoteClusters(boolean isProxyMode) throws Exception { + // This method assumes the cross cluster access API key is already configured in the keystore + final Settings.Builder builder = Settings.builder(); if (isProxyMode) { builder.put("cluster.remote.my_remote_cluster.mode", "proxy") .put("cluster.remote.my_remote_cluster.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0)); @@ -133,44 +166,13 @@ protected String configureRemoteClustersWithApiKey(String indicesPrivilegesJson, } assertThat(ObjectPath.eval("my_remote_cluster.cluster_credentials", remoteInfoMap), equalTo("::es_redacted::")); }); - - return (String) apiKeyMap.get("id"); } - protected Map createCrossClusterAccessApiKey(String indicesPrivilegesJson) throws IOException { - // Create API key on FC - final var createApiKeyRequest = new Request("POST", "/_security/api_key"); - createApiKeyRequest.setJsonEntity(Strings.format(""" - { - "name": "cross_cluster_access_key", - "role_descriptors": { - "role": { - "cluster": ["cross_cluster_access"], - "index": %s - } - } - }""", indicesPrivilegesJson)); - final Response createApiKeyResponse = performRequestAgainstFulfillingCluster(createApiKeyRequest); - assertOK(createApiKeyResponse); - final Map apiKeyMap = responseAsMap(createApiKeyResponse); - return apiKeyMap; - } - - protected Response performRequestAgainstFulfillingCluster(Request request) throws IOException { + protected static Response performRequestAgainstFulfillingCluster(Request request) throws IOException { request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue(USER, PASS))); return fulfillingClusterClient.performRequest(request); } - private RestClient buildClient(final String url) throws IOException { - final int portSeparator = url.lastIndexOf(':'); - final var httpHost = new HttpHost( - url.substring(0, 
portSeparator), - Integer.parseInt(url.substring(portSeparator + 1)), - getProtocol() - ); - return buildClient(Settings.EMPTY, new HttpHost[] { httpHost }); - } - // TODO centralize common usage of this across all tests protected static String randomEncodedApiKey() { return Base64.getEncoder().encodeToString((UUIDs.base64UUID() + ":" + UUIDs.base64UUID()).getBytes(StandardCharsets.UTF_8)); diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java index ce68b14d67052..8a49dbcb181fc 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java @@ -25,12 +25,19 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; public class RemoteClusterSecurityApiKeyRestIT extends AbstractRemoteClusterSecurityTestCase { + private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); + static { fulfillingCluster = ElasticsearchCluster.local() .name("fulfilling-cluster") @@ -48,6 +55,20 @@ public class RemoteClusterSecurityApiKeyRestIT extends AbstractRemoteClusterSecu .apply(commonClusterConfig) .setting("xpack.security.remote_cluster_client.ssl.enabled", "true") .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt") + .keystore("cluster.remote.my_remote_cluster.credentials", () -> { + if (API_KEY_MAP_REF.get() == null) { + final Map apiKeyMap = createCrossClusterAccessApiKey(""" + [ + { + "names": ["index*", "not_found_index"], + "privileges": ["read", "read_cross_cluster"] + } + ]"""); + API_KEY_MAP_REF.set(apiKeyMap); + } + return (String) API_KEY_MAP_REF.get().get("encoded"); + }) + .keystore("cluster.remote.invalid_remote.credentials", randomEncodedApiKey()) .build(); } @@ -56,13 +77,8 @@ public class RemoteClusterSecurityApiKeyRestIT extends AbstractRemoteClusterSecu public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); public void testCrossClusterSearchWithApiKey() throws Exception { - final String remoteAccessApiKeyId = configureRemoteClustersWithApiKey(""" - [ - { - "names": ["index*", "not_found_index"], - "privileges": ["read", "read_cross_cluster"] - } - ]"""); + configureRemoteClusters(); + final String remoteAccessApiKeyId = (String) API_KEY_MAP_REF.get().get("id"); // Fulfilling cluster { @@ -296,14 +312,19 @@ public void testCrossClusterSearchWithApiKey() throws Exception { ); // Check that authentication fails if we use a non-existent cross cluster access API key - updateClusterSettings(Settings.builder().put("cluster.remote.my_remote_cluster.credentials", randomEncodedApiKey()).build()); + updateClusterSettings( + Settings.builder() + .put("cluster.remote.invalid_remote.mode", "proxy") + .put("cluster.remote.invalid_remote.proxy_address", 
fulfillingCluster.getRemoteClusterServerEndpoint(0)) + .build() + ); final ResponseException exception4 = expectThrows( ResponseException.class, - () -> performRequestWithApiKey(new Request("GET", "/my_remote_cluster:index1/_search"), apiKeyEncoded) + () -> performRequestWithApiKey(new Request("GET", "/invalid_remote:index1/_search"), apiKeyEncoded) ); - assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(401)); - assertThat(exception4.getMessage(), containsString("unable to authenticate user")); - assertThat(exception4.getMessage(), containsString("unable to find apikey")); + // TODO: improve the error code and message + assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(500)); + assertThat(exception4.getMessage(), containsString("Unable to open any proxy connections to cluster [invalid_remote]")); } } diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java index 38c24ba85b196..9befbf45eeb2e 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.hamcrest.Matchers.anEmptyMap; @@ -39,6 +40,8 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTestCase { + private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); + static { fulfillingCluster = ElasticsearchCluster.local() .name("fulfilling-cluster") @@ -57,6 +60,21 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe .apply(commonClusterConfig) .setting("xpack.security.remote_cluster_client.ssl.enabled", "true") .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt") + .keystore("cluster.remote.my_remote_cluster.credentials", () -> { + if (API_KEY_MAP_REF.get() == null) { + final Map apiKeyMap = createCrossClusterAccessApiKey(""" + [ + { + "names": ["index*", "not_found_index", "shared-metrics"], + "privileges": ["read", "read_cross_cluster"] + } + ]"""); + API_KEY_MAP_REF.set(apiKeyMap); + } + return (String) API_KEY_MAP_REF.get().get("encoded"); + }) + // Define a bogus API key for another remote cluster + .keystore("cluster.remote.invalid_remote.credentials", randomEncodedApiKey()) .rolesFile(Resource.fromClasspath("roles.yml")) .user(REMOTE_METRIC_USER, PASS.toString(), "read_remote_shared_metrics") .build(); @@ -67,13 +85,8 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); public void testCrossClusterSearch() throws Exception { - final String crossClusterAccessApiKeyId = configureRemoteClustersWithApiKey(""" - [ - { - "names": ["index*", "not_found_index", "shared-metrics"], - "privileges": ["read", "read_cross_cluster"] - } - ]"""); + configureRemoteClusters(); + final String crossClusterAccessApiKeyId = (String) API_KEY_MAP_REF.get().get("id"); // Fulfilling cluster { @@ -257,14 +270,19 @@ 
public void testCrossClusterSearch() throws Exception { ); // Check that authentication fails if we use a non-existent API key - updateClusterSettings(Settings.builder().put("cluster.remote.my_remote_cluster.credentials", randomEncodedApiKey()).build()); + updateClusterSettings( + Settings.builder() + .put("cluster.remote.invalid_remote.mode", "proxy") + .put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0)) + .build() + ); final ResponseException exception4 = expectThrows( ResponseException.class, - () -> performRequestWithRemoteSearchUser(new Request("GET", "/my_remote_cluster:index1/_search")) + () -> performRequestWithRemoteSearchUser(new Request("GET", "/invalid_remote:index1/_search")) ); - assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(401)); - assertThat(exception4.getMessage(), containsString("unable to authenticate user")); - assertThat(exception4.getMessage(), containsString("unable to find apikey")); + // TODO: improve the error code and message + assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(500)); + assertThat(exception4.getMessage(), containsString("Unable to open any proxy connections to cluster [invalid_remote]")); } } diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecuritySpecialUserIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecuritySpecialUserIT.java index 66ae1df757018..1176204b988f6 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecuritySpecialUserIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecuritySpecialUserIT.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -32,6 +34,8 @@ public class RemoteClusterSecuritySpecialUserIT extends AbstractRemoteClusterSecurityTestCase { + private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); + static { fulfillingCluster = ElasticsearchCluster.local() .name("fulfilling-cluster") @@ -54,6 +58,20 @@ public class RemoteClusterSecuritySpecialUserIT extends AbstractRemoteClusterSec .setting("xpack.security.remote_cluster_client.ssl.enabled", "true") .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt") .user(REMOTE_SEARCH_USER, PASS.toString(), "read_remote_shared_metrics") + .keystore("cluster.remote.my_remote_cluster.credentials", () -> { + if (API_KEY_MAP_REF.get() == null) { + final Map apiKeyMap = createCrossClusterAccessApiKey(""" + [ + { + "names": ["shared-*", "apm-1", ".security*"], + "privileges": ["read", "read_cross_cluster"], + "allow_restricted_indices": true + } + ]"""); + API_KEY_MAP_REF.set(apiKeyMap); + } + return (String) API_KEY_MAP_REF.get().get("encoded"); + }) .build(); } @@ -62,14 +80,8 @@ public class RemoteClusterSecuritySpecialUserIT extends AbstractRemoteClusterSec public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); public void testAnonymousUserFromQueryClusterWorks() throws Exception { - final String crossClusterAccessApiKeyId = configureRemoteClustersWithApiKey(""" - [ - { - 
"names": ["shared-*", "apm-1", ".security*"], - "privileges": ["read", "read_cross_cluster"], - "allow_restricted_indices": true - } - ]"""); + configureRemoteClusters(); + final String crossClusterAccessApiKeyId = (String) API_KEY_MAP_REF.get().get("id"); // Fulfilling cluster { diff --git a/x-pack/plugin/security/qa/security-trial/build.gradle b/x-pack/plugin/security/qa/security-trial/build.gradle index 7119b02b84aed..1b1d5495f1d37 100644 --- a/x-pack/plugin/security/qa/security-trial/build.gradle +++ b/x-pack/plugin/security/qa/security-trial/build.gradle @@ -24,6 +24,11 @@ testClusters.matching { it.name == 'javaRestTest' }.configureEach { setting 'xpack.security.authc.api_key.enabled', 'true' setting 'xpack.security.remote_cluster_client.ssl.enabled', 'false' + keystore 'cluster.remote.my_remote_cluster_a.credentials', 'cluster_a_credentials' + keystore 'cluster.remote.my_remote_cluster_b.credentials', 'cluster_b_credentials' + keystore 'cluster.remote.my_remote_cluster_a_1.credentials', 'cluster_a_credentials' + keystore 'cluster.remote.my_remote_cluster_a_2.credentials', 'cluster_a_credentials' + rolesFile file('src/javaRestTest/resources/roles.yml') user username: "admin_user", password: "admin-password" user username: "security_test_user", password: "security-test-password", role: "security_test_role" diff --git a/x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/crossclusteraccess/CrossClusterAccessHeadersForCcsRestIT.java b/x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/crossclusteraccess/CrossClusterAccessHeadersForCcsRestIT.java index 1fe225aab6818..2e4ffc198e4b1 100644 --- a/x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/crossclusteraccess/CrossClusterAccessHeadersForCcsRestIT.java +++ b/x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/crossclusteraccess/CrossClusterAccessHeadersForCcsRestIT.java @@ -71,7 +71,6 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; -import static org.elasticsearch.common.UUIDs.randomBase64UUID; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; @@ -85,6 +84,8 @@ public static void checkFeatureFlag() { private static final String CLUSTER_A = "my_remote_cluster_a"; private static final String CLUSTER_B = "my_remote_cluster_b"; + private static final String CLUSTER_A_CREDENTIALS = "cluster_a_credentials"; + private static final String CLUSTER_B_CREDENTIALS = "cluster_b_credentials"; private static final String REMOTE_SEARCH_USER = "remote_search_user"; private static final SecureString PASSWORD = new SecureString("super-secret-password".toCharArray()); private static final String REMOTE_SEARCH_ROLE = "remote_search"; @@ -141,13 +142,12 @@ public void tearDown() throws Exception { public void testCrossClusterAccessHeadersSentSingleRemote() throws Exception { final BlockingQueue capturedHeaders = ConcurrentCollections.newBlockingQueue(); try (MockTransportService remoteTransport = startTransport("remoteNodeA", threadPool, capturedHeaders)) { - final String encodedCredential = randomBase64UUID(random()); final TransportAddress remoteAddress = remoteTransport.getOriginalTransport() .profileBoundAddresses() .get("_remote_cluster") .publishAddress(); final boolean useProxyMode = randomBoolean(); - setupClusterSettings(CLUSTER_A, encodedCredential, 
remoteAddress, useProxyMode); + setupClusterSettings(CLUSTER_A, remoteAddress, useProxyMode); final boolean alsoSearchLocally = randomBoolean(); final boolean minimizeRoundtrips = randomBoolean(); final Request searchRequest = new Request( @@ -173,7 +173,7 @@ public void testCrossClusterAccessHeadersSentSingleRemote() throws Exception { List.copyOf(capturedHeaders), useProxyMode, minimizeRoundtrips, - encodedCredential, + CLUSTER_A_CREDENTIALS, this::assertCrossClusterAccessSubjectInfoMatchesNativeUser, new RoleDescriptorsIntersection( List.of( @@ -211,20 +211,16 @@ public void testCrossClusterAccessHeadersSentMultipleRemotes() throws Exception MockTransportService remoteTransportA = startTransport("remoteNodeA", threadPool, capturedHeadersByCluster.get(CLUSTER_A)); MockTransportService remoteTransportB = startTransport("remoteNodeB", threadPool, capturedHeadersByCluster.get(CLUSTER_B)) ) { - final String clusterCredentialA = randomBase64UUID(random()); final boolean useProxyModeA = randomBoolean(); setupClusterSettings( CLUSTER_A, - clusterCredentialA, remoteTransportA.getOriginalTransport().profileBoundAddresses().get("_remote_cluster").publishAddress(), useProxyModeA ); - final String clusterCredentialB = randomBase64UUID(random()); final boolean useProxyModeB = randomBoolean(); setupClusterSettings( CLUSTER_B, - clusterCredentialB, remoteTransportB.getOriginalTransport().profileBoundAddresses().get("_remote_cluster").publishAddress(), useProxyModeB ); @@ -248,7 +244,7 @@ public void testCrossClusterAccessHeadersSentMultipleRemotes() throws Exception List.copyOf(capturedHeadersByCluster.get(CLUSTER_A)), useProxyModeA, minimizeRoundtrips, - clusterCredentialA, + CLUSTER_A_CREDENTIALS, this::assertCrossClusterAccessSubjectInfoMatchesNativeUser, new RoleDescriptorsIntersection( List.of( @@ -276,7 +272,7 @@ public void testCrossClusterAccessHeadersSentMultipleRemotes() throws Exception List.copyOf(capturedHeadersByCluster.get(CLUSTER_B)), useProxyModeB, minimizeRoundtrips, - clusterCredentialB, + CLUSTER_B_CREDENTIALS, this::assertCrossClusterAccessSubjectInfoMatchesNativeUser, new RoleDescriptorsIntersection( List.of( @@ -358,20 +354,16 @@ public void testApiKeyCrossClusterAccessHeadersSentMultipleRemotes() throws Exce MockTransportService remoteTransportA = startTransport("remoteNodeA", threadPool, capturedHeadersByCluster.get(CLUSTER_A)); MockTransportService remoteTransportB = startTransport("remoteNodeB", threadPool, capturedHeadersByCluster.get(CLUSTER_B)) ) { - final String clusterCredentialA = randomBase64UUID(random()); final boolean useProxyModeA = randomBoolean(); setupClusterSettings( CLUSTER_A, - clusterCredentialA, remoteTransportA.getOriginalTransport().profileBoundAddresses().get("_remote_cluster").publishAddress(), useProxyModeA ); - final String clusterCredentialB = randomBase64UUID(random()); final boolean useProxyModeB = randomBoolean(); setupClusterSettings( CLUSTER_B, - clusterCredentialB, remoteTransportB.getOriginalTransport().profileBoundAddresses().get("_remote_cluster").publishAddress(), useProxyModeB ); @@ -391,7 +383,7 @@ public void testApiKeyCrossClusterAccessHeadersSentMultipleRemotes() throws Exce List.copyOf(capturedHeadersByCluster.get(CLUSTER_A)), useProxyModeA, minimizeRoundtrips, - clusterCredentialA, + CLUSTER_A_CREDENTIALS, this::assertCrossClusterAccessSubjectInfoMatchesApiKey, new RoleDescriptorsIntersection( List.of( @@ -435,7 +427,7 @@ public void testApiKeyCrossClusterAccessHeadersSentMultipleRemotes() throws Exce 
List.copyOf(capturedHeadersByCluster.get(CLUSTER_B)), useProxyModeB, minimizeRoundtrips, - clusterCredentialB, + CLUSTER_B_CREDENTIALS, this::assertCrossClusterAccessSubjectInfoMatchesApiKey, new RoleDescriptorsIntersection( List.of( @@ -725,13 +717,12 @@ private void testCcsWithApiKeyCrossClusterAccessAuthenticationAgainstSingleClust ) throws IOException { final BlockingQueue capturedHeaders = ConcurrentCollections.newBlockingQueue(); try (MockTransportService remoteTransport = startTransport("remoteNode-" + cluster, threadPool, capturedHeaders)) { - final String clusterCredential = randomBase64UUID(random()); final TransportAddress remoteAddress = remoteTransport.getOriginalTransport() .profileBoundAddresses() .get("_remote_cluster") .publishAddress(); final boolean useProxyMode = randomBoolean(); - setupClusterSettings(cluster, clusterCredential, remoteAddress, useProxyMode); + setupClusterSettings(cluster, remoteAddress, useProxyMode); final boolean alsoSearchLocally = randomBoolean(); final boolean minimizeRoundtrips = randomBoolean(); final Request searchRequest = new Request( @@ -753,7 +744,7 @@ private void testCcsWithApiKeyCrossClusterAccessAuthenticationAgainstSingleClust List.copyOf(capturedHeaders), useProxyMode, minimizeRoundtrips, - clusterCredential, + CLUSTER_A_CREDENTIALS, this::assertCrossClusterAccessSubjectInfoMatchesApiKey, expectedRoleDescriptorsIntersection ); @@ -839,19 +830,12 @@ private void updateOrBulkUpdateApiKey(String id, String roleDescriptors) throws } } - private void setupClusterSettings( - final String clusterAlias, - final String clusterCredential, - final TransportAddress remoteAddress, - boolean useProxyMode - ) throws IOException { + private void setupClusterSettings(final String clusterAlias, final TransportAddress remoteAddress, boolean useProxyMode) + throws IOException { if (useProxyMode) { - updateRemoteClusterSettings( - clusterAlias, - Map.of("mode", "proxy", "proxy_address", remoteAddress.toString(), "credentials", clusterCredential) - ); + updateRemoteClusterSettings(clusterAlias, Map.of("mode", "proxy", "proxy_address", remoteAddress.toString())); } else { - updateRemoteClusterSettings(clusterAlias, Map.of("seeds", remoteAddress.toString(), "credentials", clusterCredential)); + updateRemoteClusterSettings(clusterAlias, Map.of("seeds", remoteAddress.toString())); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 436977d8f84d7..db5ebc30f1ba7 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -905,10 +905,7 @@ Collection createComponents( ipFilter.set(new IPFilter(settings, auditTrailService, clusterService.getClusterSettings(), getLicenseState())); components.add(ipFilter.get()); - final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = new RemoteClusterCredentialsResolver( - settings, - clusterService.getClusterSettings() - ); + final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = new RemoteClusterCredentialsResolver(settings); DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterService.getClusterSettings()); final CrossClusterAccessAuthenticationService crossClusterAccessAuthcService = new CrossClusterAccessAuthenticationService( diff --git 
a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolver.java index f1adc1f1342c0..3b80cd678a568 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolver.java @@ -10,10 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.xpack.security.authc.ApiKeyService; import java.util.Map; @@ -23,38 +21,24 @@ public class RemoteClusterCredentialsResolver { - private static final Logger LOGGER = LogManager.getLogger(RemoteClusterCredentialsResolver.class); + private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsResolver.class); - private final Map apiKeys = ConcurrentCollections.newConcurrentMap(); + private final Map clusterCredentials; - public RemoteClusterCredentialsResolver(final Settings settings, final ClusterSettings clusterSettings) { - if (TcpTransport.isUntrustedRemoteClusterEnabled()) { - for (final Map.Entry entry : REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).entrySet()) { - if (Strings.isEmpty(entry.getValue()) == false) { - update(entry.getKey(), entry.getValue()); - } - } - clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_CREDENTIALS, this::update, (clusterAlias, credentials) -> {}); - } + public RemoteClusterCredentialsResolver(final Settings settings) { + this.clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings); + logger.debug( + "Read cluster credentials for remote clusters [{}]", + Strings.collectionToCommaDelimitedString(clusterCredentials.keySet()) + ); } public Optional resolve(final String clusterAlias) { - if (TcpTransport.isUntrustedRemoteClusterEnabled()) { - final String apiKey = apiKeys.get(clusterAlias); - return apiKey == null - ? Optional.empty() - : Optional.of(new RemoteClusterCredentials(clusterAlias, ApiKeyService.withApiKeyPrefix(apiKey))); - } - return Optional.empty(); - } - - private void update(final String clusterAlias, final String credentials) { - if (Strings.isEmpty(credentials)) { - apiKeys.remove(clusterAlias); - LOGGER.debug("Credentials value for cluster alias [{}] removed", clusterAlias); + final SecureString apiKey = clusterCredentials.get(clusterAlias); + if (apiKey == null) { + return Optional.empty(); } else { - final boolean notFound = Strings.isEmpty(apiKeys.put(clusterAlias, credentials)); - LOGGER.debug("Credentials value for cluster alias [{}] {}", clusterAlias, (notFound ? 
"added" : "updated")); + return Optional.of(new RemoteClusterCredentials(clusterAlias, ApiKeyService.withApiKeyPrefix(apiKey.toString()))); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolverTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolverTests.java index 192ad27daeba3..debb50384e217 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolverTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolverTests.java @@ -6,26 +6,11 @@ */ package org.elasticsearch.xpack.security.transport; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.xpack.security.authc.ApiKeyService; -import org.junit.BeforeClass; -import java.nio.charset.StandardCharsets; -import java.util.Base64; import java.util.Optional; import static org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver.RemoteClusterCredentials; @@ -34,116 +19,20 @@ public class RemoteClusterCredentialsResolverTests extends ESTestCase { - private ThreadPool threadPool; - private ClusterService clusterService; - - @BeforeClass - public static void checkFeatureFlag() { - assumeTrue("untrusted remote cluster feature flag must be enabled", TcpTransport.isUntrustedRemoteClusterEnabled()); - } - - @Override - public void setUp() throws Exception { - super.setUp(); - this.threadPool = new TestThreadPool(getTestName()); - this.clusterService = ClusterServiceUtils.createClusterService(this.threadPool); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - clusterService.close(); - terminate(threadPool); - } - - public void testRemoteClusterApiKeyChanges() { + public void testResolveRemoteClusterCredentials() { final String clusterNameA = "clusterA"; - final String clusterNameB = "clusterB"; final String clusterDoesNotExist = randomAlphaOfLength(10); - final Settings.Builder initialSettingsBuilder = Settings.builder(); - initialSettingsBuilder.put("cluster.remote." + clusterNameA + ".credentials", "initialize"); - if (randomBoolean()) { - initialSettingsBuilder.put("cluster.remote." 
+ clusterNameB + ".credentials", ""); - } - final Settings initialSettings = initialSettingsBuilder.build(); - RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = new RemoteClusterCredentialsResolver( - initialSettings, - clusterService.getClusterSettings() - ); - assertThat( - remoteClusterCredentialsResolver.resolve(clusterNameA), - is(equalTo(remoteClusterCredentials(clusterNameA, "initialize"))) - ); - assertThat(remoteClusterCredentialsResolver.resolve(clusterNameB), is(Optional.empty())); - assertThat(remoteClusterCredentialsResolver.resolve(clusterDoesNotExist), is(Optional.empty())); - final DiscoveryNode masterNodeA = clusterService.state().nodes().getMasterNode(); - - // Add clusterB authorization setting - final String clusterBapiKey1 = randomApiKey(); - final Settings newSettingsAddClusterB = Settings.builder() - .put("cluster.remote." + clusterNameA + ".credentials", "addB") - .put("cluster.remote." + clusterNameB + ".credentials", clusterBapiKey1) - .build(); - final ClusterState newClusterState1 = createClusterState(clusterNameA, masterNodeA, newSettingsAddClusterB); - ClusterServiceUtils.setState(clusterService, newClusterState1); - assertThat(remoteClusterCredentialsResolver.resolve(clusterNameA), is(equalTo(remoteClusterCredentials(clusterNameA, "addB")))); - assertThat( - remoteClusterCredentialsResolver.resolve(clusterNameB), - is(equalTo(remoteClusterCredentials(clusterNameB, clusterBapiKey1))) - ); - assertThat(remoteClusterCredentialsResolver.resolve(clusterDoesNotExist), is(Optional.empty())); - - // Change clusterB authorization setting - final String clusterBapiKey2 = randomApiKey(); - final Settings newSettingsUpdateClusterB = Settings.builder() - .put("cluster.remote." + clusterNameA + ".credentials", "editB") - .put("cluster.remote." + clusterNameB + ".credentials", clusterBapiKey2) - .build(); - final ClusterState newClusterState2 = createClusterState(clusterNameA, masterNodeA, newSettingsUpdateClusterB); - ClusterServiceUtils.setState(clusterService, newClusterState2); - assertThat(remoteClusterCredentialsResolver.resolve(clusterNameA), is(equalTo(remoteClusterCredentials(clusterNameA, "editB")))); - assertThat( - remoteClusterCredentialsResolver.resolve(clusterNameB), - is(equalTo(remoteClusterCredentials(clusterNameB, clusterBapiKey2))) - ); - assertThat(remoteClusterCredentialsResolver.resolve(clusterDoesNotExist), is(Optional.empty())); - - // Remove clusterB authorization setting - final Settings.Builder newSettingsOmitClusterBBuilder = Settings.builder(); - newSettingsOmitClusterBBuilder.put("cluster.remote." + clusterNameA + ".credentials", "omitB"); - if (randomBoolean()) { - initialSettingsBuilder.put("cluster.remote." 
+ clusterNameB + ".credentials", ""); - } - final Settings newSettingsOmitClusterB = newSettingsOmitClusterBBuilder.build(); - final ClusterState newClusterState3 = createClusterState(clusterNameA, masterNodeA, newSettingsOmitClusterB); - ClusterServiceUtils.setState(clusterService, newClusterState3); - assertThat(remoteClusterCredentialsResolver.resolve(clusterNameA), is(equalTo(remoteClusterCredentials(clusterNameA, "omitB")))); - assertThat(remoteClusterCredentialsResolver.resolve(clusterNameB), is(Optional.empty())); - assertThat(remoteClusterCredentialsResolver.resolve(clusterDoesNotExist), is(Optional.empty())); - } - - private static Optional remoteClusterCredentials(String clusterAlias, String encodedApiKeyValue) { - return Optional.of(new RemoteClusterCredentials(clusterAlias, ApiKeyService.withApiKeyPrefix(encodedApiKeyValue))); - } - - private static ClusterState createClusterState(final String clusterName, final DiscoveryNode masterNode, final Settings newSettings) { - final DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); - discoBuilder.add(masterNode); - discoBuilder.masterNodeId(masterNode.getId()); - - final ClusterState.Builder state = ClusterState.builder(new ClusterName(clusterName)); - state.nodes(discoBuilder); - state.metadata(Metadata.builder().persistentSettings(newSettings).generateClusterUuidIfNeeded()); - state.routingTable(RoutingTable.builder().build()); - return state.build(); - } - - private String randomApiKey() { - final String id = "apikey_" + randomAlphaOfLength(6); - // Sufficient for testing. See ApiKeyService and ApiKeyService.ApiKeyCredentials for actual API Key generation. - try (SecureString secret = UUIDs.randomBase64UUIDSecureString()) { - final String apiKey = id + ":" + secret; - return Base64.getEncoder().withoutPadding().encodeToString(apiKey.getBytes(StandardCharsets.UTF_8)); - } + final Settings.Builder builder = Settings.builder(); + + final String secret = randomAlphaOfLength(20); + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("cluster.remote." 
+ clusterNameA + ".credentials", secret); + final Settings settings = builder.setSecureSettings(secureSettings).build(); + RemoteClusterCredentialsResolver remoteClusterAuthorizationResolver = new RemoteClusterCredentialsResolver(settings); + final Optional remoteClusterCredentials = remoteClusterAuthorizationResolver.resolve(clusterNameA); + assertThat(remoteClusterCredentials.isPresent(), is(true)); + assertThat(remoteClusterCredentials.get().clusterAlias(), equalTo(clusterNameA)); + assertThat(remoteClusterCredentials.get().credentials(), equalTo(ApiKeyService.withApiKeyPrefix(secret))); + assertThat(remoteClusterAuthorizationResolver.resolve(clusterDoesNotExist), is(Optional.empty())); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index dc86c03199af9..a3ba66848602e 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -150,7 +150,7 @@ public void testSendAsync() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings, clusterService.getClusterSettings()) + new RemoteClusterCredentialsResolver(settings) ); ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener @@ -201,7 +201,7 @@ public void testSendAsyncSwitchToSystem() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings, clusterService.getClusterSettings()) + new RemoteClusterCredentialsResolver(settings) ); ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener @@ -245,7 +245,7 @@ public void testSendWithoutUser() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings, clusterService.getClusterSettings()) + new RemoteClusterCredentialsResolver(settings) ) { @Override void assertNoAuthentication(String action) {} @@ -307,7 +307,7 @@ public void testSendToNewerVersionSetsCorrectVersion() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings, clusterService.getClusterSettings()) + new RemoteClusterCredentialsResolver(settings) ); ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener @@ -375,7 +375,7 @@ public void testSendToOlderVersionSetsCorrectVersion() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings, clusterService.getClusterSettings()) + new 
RemoteClusterCredentialsResolver(settings) ); ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener @@ -439,7 +439,7 @@ public void testSetUserBasedOnActionOrigin() { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings, clusterService.getClusterSettings()) + new RemoteClusterCredentialsResolver(settings) ); final AtomicBoolean calledWrappedSender = new AtomicBoolean(false); @@ -1117,7 +1117,7 @@ public void testProfileFiltersCreatedDifferentlyForDifferentTransportAndRemoteCl new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings, clusterService.getClusterSettings()) + new RemoteClusterCredentialsResolver(settings) ); final Map profileFilters = securityServerTransportInterceptor.getProfileFilters(); @@ -1170,7 +1170,7 @@ public void testNoProfileFilterForRemoteClusterWhenTheFeatureIsDisabled() { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings, clusterService.getClusterSettings()) + new RemoteClusterCredentialsResolver(settings) ); final Map profileFilters = securityServerTransportInterceptor.getProfileFilters();
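
Note on the credentials handling in this patch: after the change, cluster.remote.<alias>.credentials is a secure (keystore-backed) setting, and RemoteClusterCredentialsResolver reads it once from the node settings at construction instead of listening for dynamic cluster-settings updates. The snippet below is a minimal sketch of that behaviour, modelled on the updated RemoteClusterCredentialsResolverTests above; the wrapper class, the "my_remote_cluster" alias and the placeholder secret are illustrative only, and MockSecureSettings is the test-framework helper used by those tests.

    import org.elasticsearch.common.settings.MockSecureSettings;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver;

    // Illustrative sketch only; not part of this change.
    class RemoteClusterCredentialsResolverSketch {
        public static void main(String[] args) {
            // Credentials now live in the keystore (secure settings), not in dynamic cluster settings.
            MockSecureSettings secureSettings = new MockSecureSettings();
            secureSettings.setString("cluster.remote.my_remote_cluster.credentials", "encoded-api-key"); // placeholder value
            Settings settings = Settings.builder().setSecureSettings(secureSettings).build();

            // The resolver snapshots the per-alias credentials once at construction.
            RemoteClusterCredentialsResolver resolver = new RemoteClusterCredentialsResolver(settings);

            // An alias with a keystore entry resolves to an "ApiKey "-prefixed credential;
            // any alias without one resolves to Optional.empty().
            System.out.println(resolver.resolve("my_remote_cluster").isPresent());  // true
            System.out.println(resolver.resolve("some_other_cluster").isPresent()); // false
        }
    }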