diff --git a/docs/changelog/118102.yaml b/docs/changelog/118102.yaml new file mode 100644 index 0000000000000..e5ec32cdddbec --- /dev/null +++ b/docs/changelog/118102.yaml @@ -0,0 +1,5 @@ +pr: 118102 +summary: "ESQL: Enterprise license enforcement for CCS" +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensedFeature.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensedFeature.java index d86c15aa14bc9..558303f7e0f0f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensedFeature.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensedFeature.java @@ -104,7 +104,7 @@ public boolean isNeedsActive() { return needsActive; } - /** Create a momentary feature for hte given license level */ + /** Create a momentary feature for the given license level */ public static Momentary momentary(String family, String name, License.OperationMode licenseLevel) { return new Momentary(family, name, licenseLevel, true); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java index 4f8a18e28aea1..3c7b089b4cd63 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java @@ -106,6 +106,7 @@ public class XPackLicenseState { messages.put(XPackField.CCR, XPackLicenseState::ccrAcknowledgementMessages); messages.put(XPackField.ENTERPRISE_SEARCH, XPackLicenseState::enterpriseSearchAcknowledgementMessages); messages.put(XPackField.REDACT_PROCESSOR, XPackLicenseState::redactProcessorAcknowledgementMessages); + messages.put(XPackField.ESQL, XPackLicenseState::esqlAcknowledgementMessages); ACKNOWLEDGMENT_MESSAGES = Collections.unmodifiableMap(messages); } @@ -243,6 +244,26 @@ private static String[] enterpriseSearchAcknowledgementMessages(OperationMode cu return Strings.EMPTY_ARRAY; } + private static String[] esqlAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) { + /* + * Provide an acknowledgement warning to customers that downgrade from Trial or Enterprise to a lower + * license level (Basic, Standard, Gold or Premium) that they will no longer be able to do CCS in ES|QL. + */ + switch (newMode) { + case BASIC: + case STANDARD: + case GOLD: + case PLATINUM: + switch (currentMode) { + case TRIAL: + case ENTERPRISE: + return new String[] { "ES|QL cross-cluster search will be disabled." }; + } + break; + } + return Strings.EMPTY_ARRAY; + } + private static String[] machineLearningAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) { switch (newMode) { case BASIC: diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java index e889d25cd7a96..d788a0b5abd37 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.XPackField; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -59,6 +60,12 @@ void assertAckMessages(String feature, OperationMode from, OperationMode to, int assertEquals(expectedMessages, gotMessages.length); } + void assertAckMessages(String feature, OperationMode from, OperationMode to, Set expectedMessages) { + String[] gotMessages = XPackLicenseState.ACKNOWLEDGMENT_MESSAGES.get(feature).apply(from, to); + Set actualMessages = Arrays.stream(gotMessages).collect(Collectors.toSet()); + assertThat(actualMessages, equalTo(expectedMessages)); + } + static T randomFrom(T[] values, Predicate filter) { return randomFrom(Arrays.stream(values).filter(filter).collect(Collectors.toList())); } @@ -143,6 +150,16 @@ public void testCcrAckTrialOrPlatinumToNotTrialOrPlatinum() { assertAckMessages(XPackField.CCR, randomTrialOrPlatinumMode(), randomBasicStandardOrGold(), 1); } + public void testEsqlAckToTrialOrPlatinum() { + assertAckMessages(XPackField.ESQL, randomMode(), randomFrom(TRIAL, ENTERPRISE), 0); + } + + public void testEsqlAckTrialOrEnterpriseToNotTrialOrEnterprise() { + for (OperationMode to : List.of(BASIC, STANDARD, GOLD, PLATINUM)) { + assertAckMessages(XPackField.ESQL, randomFrom(TRIAL, ENTERPRISE), to, Set.of("ES|QL cross-cluster search will be disabled.")); + } + } + public void testExpiredLicense() { // use standard feature which would normally be allowed at all license levels LicensedFeature feature = LicensedFeature.momentary("family", "enterpriseFeature", STANDARD); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java new file mode 100644 index 0000000000000..66ac32b33cd4d --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java @@ -0,0 +1,290 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.ingest.common.IngestCommonPlugin; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.license.LicenseService; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.protocol.xpack.XPackInfoRequest; +import org.elasticsearch.protocol.xpack.XPackInfoResponse; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.action.TransportXPackInfoAction; +import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; +import org.elasticsearch.xpack.core.action.XPackInfoFeatureResponse; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; +import org.elasticsearch.xpack.enrich.EnrichPlugin; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.junit.After; +import org.junit.Before; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; + +public abstract class AbstractEnrichBasedCrossClusterTestCase extends AbstractMultiClustersTestCase { + + public static String REMOTE_CLUSTER_1 = "c1"; + public static String REMOTE_CLUSTER_2 = "c2"; + + /** + * subclasses should override if they don't want enrich policies wiped after each test method run + */ + protected boolean tolerateErrorsWhenWipingEnrichPolicies() { + return false; + } + + @Override + protected List remoteClusterAlias() { + return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2); + } + + protected Collection allClusters() { + return CollectionUtils.appendToCopy(remoteClusterAlias(), LOCAL_CLUSTER); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(CrossClustersEnrichIT.LocalStateEnrich.class); + plugins.add(IngestCommonPlugin.class); + plugins.add(ReindexPlugin.class); + return plugins; + } + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build(); + } + + static final EnrichPolicy hostPolicy = new EnrichPolicy("match", null, List.of("hosts"), "ip", List.of("ip", "os")); + static final EnrichPolicy vendorPolicy = new EnrichPolicy("match", null, List.of("vendors"), "os", List.of("os", "vendor")); + + @Before + public void setupHostsEnrich() { + // the hosts policy are identical on every node + Map allHosts = Map.of( + "192.168.1.2", + "Windows", + "192.168.1.3", + "MacOS", + "192.168.1.4", + "Linux", + "192.168.1.5", + "Android", + "192.168.1.6", + "iOS", + "192.168.1.7", + "Windows", + "192.168.1.8", + "MacOS", + "192.168.1.9", + "Linux", + "192.168.1.10", + "Linux", + "192.168.1.11", + "Windows" + ); + for (String cluster : allClusters()) { + Client client = client(cluster); + client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get(); + for (Map.Entry h : allHosts.entrySet()) { + client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get(); + } + client.admin().indices().prepareRefresh("hosts").get(); + client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy)) + .actionGet(); + client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts")) + .actionGet(); + assertAcked(client.admin().indices().prepareDelete("hosts")); + } + } + + @Before + public void setupVendorPolicy() { + var localVendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Samsung", "Linux", "Redhat"); + var c1Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Google", "Linux", "Suse"); + var c2Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Sony", "Linux", "Ubuntu"); + var vendors = Map.of(LOCAL_CLUSTER, localVendors, REMOTE_CLUSTER_1, c1Vendors, REMOTE_CLUSTER_2, c2Vendors); + for (Map.Entry> e : vendors.entrySet()) { + Client client = client(e.getKey()); + client.admin().indices().prepareCreate("vendors").setMapping("os", "type=keyword", "vendor", "type=keyword").get(); + for (Map.Entry v : e.getValue().entrySet()) { + client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get(); + } + client.admin().indices().prepareRefresh("vendors").get(); + client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", vendorPolicy)) + .actionGet(); + client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors")) + .actionGet(); + assertAcked(client.admin().indices().prepareDelete("vendors")); + } + } + + @Before + public void setupEventsIndices() { + record Event(long timestamp, String user, String host) { + + } + List e0 = List.of( + new Event(1, "matthew", "192.168.1.3"), + new Event(2, "simon", "192.168.1.5"), + new Event(3, "park", "192.168.1.2"), + new Event(4, "andrew", "192.168.1.7"), + new Event(5, "simon", "192.168.1.20"), + new Event(6, "kevin", "192.168.1.2"), + new Event(7, "akio", "192.168.1.5"), + new Event(8, "luke", "192.168.1.2"), + new Event(9, "jack", "192.168.1.4") + ); + List e1 = List.of( + new Event(1, "andres", "192.168.1.2"), + new Event(2, "sergio", "192.168.1.6"), + new Event(3, "kylian", "192.168.1.8"), + new Event(4, "andrew", "192.168.1.9"), + new Event(5, "jack", "192.168.1.3"), + new Event(6, "kevin", "192.168.1.4"), + new Event(7, "akio", "192.168.1.7"), + new Event(8, "kevin", "192.168.1.21"), + new Event(9, "andres", "192.168.1.8") + ); + List e2 = List.of( + new Event(1, "park", "192.168.1.25"), + new Event(2, "akio", "192.168.1.5"), + new Event(3, "park", "192.168.1.2"), + new Event(4, "kevin", "192.168.1.3") + ); + for (var c : Map.of(LOCAL_CLUSTER, e0, REMOTE_CLUSTER_1, e1, REMOTE_CLUSTER_2, e2).entrySet()) { + Client client = client(c.getKey()); + client.admin() + .indices() + .prepareCreate("events") + .setMapping("timestamp", "type=long", "user", "type=keyword", "host", "type=ip") + .get(); + for (var e : c.getValue()) { + client.prepareIndex("events").setSource("timestamp", e.timestamp, "user", e.user, "host", e.host).get(); + } + client.admin().indices().prepareRefresh("events").get(); + } + } + + @After + public void wipeEnrichPolicies() { + for (String cluster : allClusters()) { + cluster(cluster).wipe(Set.of()); + for (String policy : List.of("hosts", "vendors")) { + if (tolerateErrorsWhenWipingEnrichPolicies()) { + try { + client(cluster).execute( + DeleteEnrichPolicyAction.INSTANCE, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy) + ); + } catch (Exception e) { + assertThat(e.getMessage(), containsString("Cluster is already closed")); + } + + } else { + client(cluster).execute( + DeleteEnrichPolicyAction.INSTANCE, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy) + ); + } + } + } + } + + static String enrichHosts(Enrich.Mode mode) { + return EsqlTestUtils.randomEnrichCommand("hosts", mode, hostPolicy.getMatchField(), hostPolicy.getEnrichFields()); + } + + static String enrichVendors(Enrich.Mode mode) { + return EsqlTestUtils.randomEnrichCommand("vendors", mode, vendorPolicy.getMatchField(), vendorPolicy.getEnrichFields()); + } + + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + if (randomBoolean()) { + request.profile(true); + } + if (ccsMetadataInResponse != null) { + request.includeCCSMetadata(ccsMetadataInResponse); + } + return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); + } + + public static Tuple randomIncludeCCSMetadata() { + return switch (randomIntBetween(1, 3)) { + case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE); + case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE); + case 3 -> new Tuple<>(null, Boolean.FALSE); + default -> throw new AssertionError("should not get here"); + }; + } + + public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin { + public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception { + super(settings, configPath); + + plugins.add(new EnrichPlugin(settings) { + @Override + protected XPackLicenseState getLicenseState() { + return this.getLicenseState(); + } + }); + } + + public static class EnrichTransportXPackInfoAction extends TransportXPackInfoAction { + @Inject + public EnrichTransportXPackInfoAction( + TransportService transportService, + ActionFilters actionFilters, + LicenseService licenseService, + NodeClient client + ) { + super(transportService, actionFilters, licenseService, client); + } + + @Override + protected List> infoActions() { + return Collections.singletonList(XPackInfoFeatureAction.ENRICH); + } + } + + @Override + protected Class> getInfoAction() { + return CrossClustersQueriesWithInvalidLicenseIT.LocalStateEnrich.EnrichTransportXPackInfoAction.class; + } + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java index c8206621de419..a2bba19db50fc 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java @@ -35,7 +35,6 @@ import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.junit.Before; import java.io.IOException; @@ -78,7 +77,7 @@ protected Map skipUnavailableForRemoteClusters() { @Override protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.add(EsqlPlugin.class); + plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action plugins.add(InternalExchangePlugin.class); plugins.add(PauseFieldPlugin.class); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java index 5c3e1974e924f..09ad97b08f357 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java @@ -8,36 +8,21 @@ package org.elasticsearch.xpack.esql.action; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.core.Tuple; -import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.reindex.ReindexPlugin; -import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; -import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.plan.logical.Enrich; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; -import org.junit.Before; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; -import static org.elasticsearch.xpack.esql.action.CrossClustersEnrichIT.enrichHosts; -import static org.elasticsearch.xpack.esql.action.CrossClustersEnrichIT.enrichVendors; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -47,151 +32,26 @@ * This IT test is the dual of CrossClustersEnrichIT, which tests "happy path" * and this one tests unavailable cluster scenarios using (most of) the same tests. */ -public class CrossClusterEnrichUnavailableClustersIT extends AbstractMultiClustersTestCase { - - public static String REMOTE_CLUSTER_1 = "c1"; - public static String REMOTE_CLUSTER_2 = "c2"; - - @Override - protected List remoteClusterAlias() { - return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2); - } +public class CrossClusterEnrichUnavailableClustersIT extends AbstractEnrichBasedCrossClusterTestCase { @Override protected boolean reuseClusters() { return false; } - private Collection allClusters() { - return CollectionUtils.appendToCopy(remoteClusterAlias(), LOCAL_CLUSTER); + @Override + protected boolean tolerateErrorsWhenWipingEnrichPolicies() { + // attempt to wipe will fail since some clusters are already closed + return true; } @Override protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.add(EsqlPlugin.class); - plugins.add(CrossClustersEnrichIT.LocalStateEnrich.class); - plugins.add(IngestCommonPlugin.class); - plugins.add(ReindexPlugin.class); + plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); return plugins; } - @Override - protected Settings nodeSettings() { - return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build(); - } - - @Before - public void setupHostsEnrich() { - // the hosts policy are identical on every node - Map allHosts = Map.of( - "192.168.1.2", - "Windows", - "192.168.1.3", - "MacOS", - "192.168.1.4", - "Linux", - "192.168.1.5", - "Android", - "192.168.1.6", - "iOS", - "192.168.1.7", - "Windows", - "192.168.1.8", - "MacOS", - "192.168.1.9", - "Linux", - "192.168.1.10", - "Linux", - "192.168.1.11", - "Windows" - ); - for (String cluster : allClusters()) { - Client client = client(cluster); - client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get(); - for (Map.Entry h : allHosts.entrySet()) { - client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get(); - } - client.admin().indices().prepareRefresh("hosts").get(); - client.execute( - PutEnrichPolicyAction.INSTANCE, - new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", CrossClustersEnrichIT.hostPolicy) - ).actionGet(); - client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts")) - .actionGet(); - assertAcked(client.admin().indices().prepareDelete("hosts")); - } - } - - @Before - public void setupVendorPolicy() { - var localVendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Samsung", "Linux", "Redhat"); - var c1Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Google", "Linux", "Suse"); - var c2Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Sony", "Linux", "Ubuntu"); - var vendors = Map.of(LOCAL_CLUSTER, localVendors, "c1", c1Vendors, "c2", c2Vendors); - for (Map.Entry> e : vendors.entrySet()) { - Client client = client(e.getKey()); - client.admin().indices().prepareCreate("vendors").setMapping("os", "type=keyword", "vendor", "type=keyword").get(); - for (Map.Entry v : e.getValue().entrySet()) { - client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get(); - } - client.admin().indices().prepareRefresh("vendors").get(); - client.execute( - PutEnrichPolicyAction.INSTANCE, - new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", CrossClustersEnrichIT.vendorPolicy) - ).actionGet(); - client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors")) - .actionGet(); - assertAcked(client.admin().indices().prepareDelete("vendors")); - } - } - - @Before - public void setupEventsIndices() { - record Event(long timestamp, String user, String host) {} - - List e0 = List.of( - new Event(1, "matthew", "192.168.1.3"), - new Event(2, "simon", "192.168.1.5"), - new Event(3, "park", "192.168.1.2"), - new Event(4, "andrew", "192.168.1.7"), - new Event(5, "simon", "192.168.1.20"), - new Event(6, "kevin", "192.168.1.2"), - new Event(7, "akio", "192.168.1.5"), - new Event(8, "luke", "192.168.1.2"), - new Event(9, "jack", "192.168.1.4") - ); - List e1 = List.of( - new Event(1, "andres", "192.168.1.2"), - new Event(2, "sergio", "192.168.1.6"), - new Event(3, "kylian", "192.168.1.8"), - new Event(4, "andrew", "192.168.1.9"), - new Event(5, "jack", "192.168.1.3"), - new Event(6, "kevin", "192.168.1.4"), - new Event(7, "akio", "192.168.1.7"), - new Event(8, "kevin", "192.168.1.21"), - new Event(9, "andres", "192.168.1.8") - ); - List e2 = List.of( - new Event(1, "park", "192.168.1.25"), - new Event(2, "akio", "192.168.1.5"), - new Event(3, "park", "192.168.1.2"), - new Event(4, "kevin", "192.168.1.3") - ); - for (var c : Map.of(LOCAL_CLUSTER, e0, "c1", e1, "c2", e2).entrySet()) { - Client client = client(c.getKey()); - client.admin() - .indices() - .prepareCreate("events") - .setMapping("timestamp", "type=long", "user", "type=keyword", "host", "type=ip") - .get(); - for (var e : c.getValue()) { - client.prepareIndex("events").setSource("timestamp", e.timestamp, "user", e.user, "host", e.host).get(); - } - client.admin().indices().prepareRefresh("events").get(); - } - } - public void testEnrichWithHostsPolicyAndDisconnectedRemotesWithSkipUnavailableTrue() throws IOException { setSkipUnavailable(REMOTE_CLUSTER_1, true); setSkipUnavailable(REMOTE_CLUSTER_2, true); @@ -645,19 +505,6 @@ public void testEnrichRemoteWithVendor() throws IOException { } } - protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { - EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); - request.query(query); - request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); - if (randomBoolean()) { - request.profile(true); - } - if (ccsMetadataInResponse != null) { - request.includeCCSMetadata(ccsMetadataInResponse); - } - return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); - } - private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) { assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); assertTrue(executionInfo.isCrossClusterSearch()); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java index d1c9b5cfb2ac7..f65764daafb8a 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java @@ -18,7 +18,6 @@ import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.xpack.esql.core.type.DataType; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import java.io.IOException; import java.util.ArrayList; @@ -54,8 +53,8 @@ protected boolean reuseClusters() { @Override protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.add(EsqlPlugin.class); - plugins.add(org.elasticsearch.xpack.esql.action.CrossClustersQueryIT.InternalExchangePlugin.class); + plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); + plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class); return plugins; } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index 5291ad3b0d039..17f5f81486651 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -33,7 +33,6 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.plugin.ComputeService; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.junit.Before; import java.util.ArrayList; @@ -62,7 +61,7 @@ protected List remoteClusterAlias() { @Override protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.add(EsqlPlugin.class); + plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); plugins.add(InternalExchangePlugin.class); plugins.add(PauseFieldPlugin.class); return plugins; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java index 57f85751999a5..4e6be6cc2bf74 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java @@ -7,218 +7,34 @@ package org.elasticsearch.xpack.esql.action; -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.client.internal.node.NodeClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.core.Tuple; -import org.elasticsearch.ingest.common.IngestCommonPlugin; -import org.elasticsearch.injection.guice.Inject; -import org.elasticsearch.license.LicenseService; -import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.protocol.xpack.XPackInfoRequest; -import org.elasticsearch.protocol.xpack.XPackInfoResponse; -import org.elasticsearch.reindex.ReindexPlugin; -import org.elasticsearch.test.AbstractMultiClustersTestCase; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; -import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.action.TransportXPackInfoAction; -import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; -import org.elasticsearch.xpack.core.action.XPackInfoFeatureResponse; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; -import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; -import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; -import org.elasticsearch.xpack.enrich.EnrichPlugin; -import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.plan.logical.Enrich; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; -import org.junit.After; -import org.junit.Before; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase { - - @Override - protected List remoteClusterAlias() { - return List.of("c1", "c2"); - } - - protected Collection allClusters() { - return CollectionUtils.appendToCopy(remoteClusterAlias(), LOCAL_CLUSTER); - } +public class CrossClustersEnrichIT extends AbstractEnrichBasedCrossClusterTestCase { @Override protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.add(EsqlPlugin.class); - plugins.add(LocalStateEnrich.class); - plugins.add(IngestCommonPlugin.class); - plugins.add(ReindexPlugin.class); + plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); return plugins; } - @Override - protected Settings nodeSettings() { - return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build(); - } - - static final EnrichPolicy hostPolicy = new EnrichPolicy("match", null, List.of("hosts"), "ip", List.of("ip", "os")); - static final EnrichPolicy vendorPolicy = new EnrichPolicy("match", null, List.of("vendors"), "os", List.of("os", "vendor")); - - @Before - public void setupHostsEnrich() { - // the hosts policy are identical on every node - Map allHosts = Map.of( - "192.168.1.2", - "Windows", - "192.168.1.3", - "MacOS", - "192.168.1.4", - "Linux", - "192.168.1.5", - "Android", - "192.168.1.6", - "iOS", - "192.168.1.7", - "Windows", - "192.168.1.8", - "MacOS", - "192.168.1.9", - "Linux", - "192.168.1.10", - "Linux", - "192.168.1.11", - "Windows" - ); - for (String cluster : allClusters()) { - Client client = client(cluster); - client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get(); - for (Map.Entry h : allHosts.entrySet()) { - client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get(); - } - client.admin().indices().prepareRefresh("hosts").get(); - client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy)) - .actionGet(); - client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts")) - .actionGet(); - assertAcked(client.admin().indices().prepareDelete("hosts")); - } - } - - @Before - public void setupVendorPolicy() { - var localVendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Samsung", "Linux", "Redhat"); - var c1Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Google", "Linux", "Suse"); - var c2Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Sony", "Linux", "Ubuntu"); - var vendors = Map.of(LOCAL_CLUSTER, localVendors, "c1", c1Vendors, "c2", c2Vendors); - for (Map.Entry> e : vendors.entrySet()) { - Client client = client(e.getKey()); - client.admin().indices().prepareCreate("vendors").setMapping("os", "type=keyword", "vendor", "type=keyword").get(); - for (Map.Entry v : e.getValue().entrySet()) { - client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get(); - } - client.admin().indices().prepareRefresh("vendors").get(); - client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", vendorPolicy)) - .actionGet(); - client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors")) - .actionGet(); - assertAcked(client.admin().indices().prepareDelete("vendors")); - } - } - - @Before - public void setupEventsIndices() { - record Event(long timestamp, String user, String host) { - - } - List e0 = List.of( - new Event(1, "matthew", "192.168.1.3"), - new Event(2, "simon", "192.168.1.5"), - new Event(3, "park", "192.168.1.2"), - new Event(4, "andrew", "192.168.1.7"), - new Event(5, "simon", "192.168.1.20"), - new Event(6, "kevin", "192.168.1.2"), - new Event(7, "akio", "192.168.1.5"), - new Event(8, "luke", "192.168.1.2"), - new Event(9, "jack", "192.168.1.4") - ); - List e1 = List.of( - new Event(1, "andres", "192.168.1.2"), - new Event(2, "sergio", "192.168.1.6"), - new Event(3, "kylian", "192.168.1.8"), - new Event(4, "andrew", "192.168.1.9"), - new Event(5, "jack", "192.168.1.3"), - new Event(6, "kevin", "192.168.1.4"), - new Event(7, "akio", "192.168.1.7"), - new Event(8, "kevin", "192.168.1.21"), - new Event(9, "andres", "192.168.1.8") - ); - List e2 = List.of( - new Event(1, "park", "192.168.1.25"), - new Event(2, "akio", "192.168.1.5"), - new Event(3, "park", "192.168.1.2"), - new Event(4, "kevin", "192.168.1.3") - ); - for (var c : Map.of(LOCAL_CLUSTER, e0, "c1", e1, "c2", e2).entrySet()) { - Client client = client(c.getKey()); - client.admin() - .indices() - .prepareCreate("events") - .setMapping("timestamp", "type=long", "user", "type=keyword", "host", "type=ip") - .get(); - for (var e : c.getValue()) { - client.prepareIndex("events").setSource("timestamp", e.timestamp, "user", e.user, "host", e.host).get(); - } - client.admin().indices().prepareRefresh("events").get(); - } - } - - @After - public void wipeEnrichPolicies() { - for (String cluster : allClusters()) { - cluster(cluster).wipe(Set.of()); - for (String policy : List.of("hosts", "vendors")) { - client(cluster).execute( - DeleteEnrichPolicyAction.INSTANCE, - new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy) - ); - } - } - } - - static String enrichHosts(Enrich.Mode mode) { - return EsqlTestUtils.randomEnrichCommand("hosts", mode, hostPolicy.getMatchField(), hostPolicy.getEnrichFields()); - } - - static String enrichVendors(Enrich.Mode mode) { - return EsqlTestUtils.randomEnrichCommand("vendors", mode, vendorPolicy.getMatchField(), vendorPolicy.getEnrichFields()); - } - public void testWithHostsPolicy() { for (var mode : Enrich.Mode.values()) { String query = "FROM events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; @@ -606,19 +422,6 @@ public void testEnrichCoordinatorThenEnrichRemote() { ); } - protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { - EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); - request.query(query); - request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); - if (randomBoolean()) { - request.profile(true); - } - if (ccsMetadataInResponse != null) { - request.includeCCSMetadata(ccsMetadataInResponse); - } - return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); - } - private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) { assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); assertTrue(executionInfo.isCrossClusterSearch()); @@ -637,49 +440,4 @@ private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInf assertThat(cluster.getFailedShards(), equalTo(0)); } } - - public static Tuple randomIncludeCCSMetadata() { - return switch (randomIntBetween(1, 3)) { - case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE); - case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE); - case 3 -> new Tuple<>(null, Boolean.FALSE); - default -> throw new AssertionError("should not get here"); - }; - } - - public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin { - - public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception { - super(settings, configPath); - - plugins.add(new EnrichPlugin(settings) { - @Override - protected XPackLicenseState getLicenseState() { - return this.getLicenseState(); - } - }); - } - - public static class EnrichTransportXPackInfoAction extends TransportXPackInfoAction { - @Inject - public EnrichTransportXPackInfoAction( - TransportService transportService, - ActionFilters actionFilters, - LicenseService licenseService, - NodeClient client - ) { - super(transportService, actionFilters, licenseService, client); - } - - @Override - protected List> infoActions() { - return Collections.singletonList(XPackInfoFeatureAction.ENRICH); - } - } - - @Override - protected Class> getInfoAction() { - return EnrichTransportXPackInfoAction.class; - } - } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueriesWithInvalidLicenseIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueriesWithInvalidLicenseIT.java new file mode 100644 index 0000000000000..1ed42b696d65e --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueriesWithInvalidLicenseIT.java @@ -0,0 +1,203 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Set; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +public class CrossClustersQueriesWithInvalidLicenseIT extends AbstractEnrichBasedCrossClusterTestCase { + + private static final String LICENSE_ERROR_MESSAGE = "A valid Enterprise license is required to run ES|QL cross-cluster searches."; + + @Override + protected Collection> nodePlugins(String clusterAlias) { + List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(EsqlPluginWithNonEnterpriseOrExpiredLicense.class); // key plugin for the test + return plugins; + } + + public void testBasicCrossClusterQuery() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + ElasticsearchStatusException e = expectThrows( + ElasticsearchStatusException.class, + () -> runQuery("FROM *,*:* | LIMIT 5", requestIncludeMeta) + ); + assertThat(e.getMessage(), containsString(LICENSE_ERROR_MESSAGE)); + } + + public void testMetadataCrossClusterQuery() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + ElasticsearchStatusException e = expectThrows( + ElasticsearchStatusException.class, + () -> runQuery("FROM events,*:* METADATA _index | SORT _index", requestIncludeMeta) + ); + assertThat(e.getMessage(), containsString(LICENSE_ERROR_MESSAGE)); + } + + public void testQueryAgainstNonMatchingClusterWildcardPattern() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + // since this wildcarded expression does not resolve to a valid remote cluster, it is not considered + // a cross-cluster search and thus should not throw a license error + String q = "FROM xremote*:events"; + { + String limit1 = q + " | STATS count(*)"; + try (EsqlQueryResponse resp = runQuery(limit1, requestIncludeMeta)) { + assertThat(resp.columns().size(), equalTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.isCrossClusterSearch(), is(false)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + } + + String limit0 = q + " | LIMIT 0"; + try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) { + assertThat(resp.columns().size(), equalTo(1)); + assertThat(getValuesList(resp).size(), equalTo(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.isCrossClusterSearch(), is(false)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + } + } + } + + public void testCCSWithLimit0() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + + // local only query does not need a valid Enterprise or Trial license + try (EsqlQueryResponse resp = runQuery("FROM events | LIMIT 0", requestIncludeMeta)) { + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(false)); + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + } + + // cross-cluster searches should fail with license error + String q = randomFrom("FROM events,c1:* | LIMIT 0", "FROM c1:* | LIMIT 0"); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> runQuery(q, requestIncludeMeta)); + assertThat(e.getMessage(), containsString(LICENSE_ERROR_MESSAGE)); + } + + public void testSearchesWhereNonExistentClusterIsSpecified() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + // this one query should be allowed since x* does not resolve to any known remote cluster + try (EsqlQueryResponse resp = runQuery("FROM events,x*:no_such_index* | STATS count(*)", requestIncludeMeta)) { + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + List> values = getValuesList(resp); + assertThat(values, hasSize(1)); + + assertNotNull(executionInfo); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER))); + assertThat(executionInfo.isCrossClusterSearch(), is(false)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + // since this not a CCS, only the overall took time in the EsqlExecutionInfo matters + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + } + + ElasticsearchStatusException e = expectThrows( + ElasticsearchStatusException.class, + () -> runQuery("FROM events,no_such_cluster:no_such_index* | STATS count(*)", requestIncludeMeta) + ); + // with a valid license this would throw "no such remote cluster" exception, but without a valid license, should get a license error + assertThat(e.getMessage(), containsString(LICENSE_ERROR_MESSAGE)); + } + + public void testEnrichWithHostsPolicy() { + // local-only queries do not need an Enterprise or Trial license + for (var mode : Enrich.Mode.values()) { + String query = "FROM events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, null)) { + List> rows = getValuesList(resp); + assertThat( + rows, + equalTo( + List.of( + List.of(2L, "Android"), + List.of(1L, "Linux"), + List.of(1L, "MacOS"), + List.of(4L, "Windows"), + Arrays.asList(1L, (String) null) + ) + ) + ); + assertFalse(resp.getExecutionInfo().isCrossClusterSearch()); + } + } + + // cross-cluster query should fail due to not having valid Enterprise or Trial license + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + + for (var mode : Enrich.Mode.values()) { + String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> runQuery(query, requestIncludeMeta)); + assertThat(e.getMessage(), containsString("A valid Enterprise license is required to run ES|QL cross-cluster searches.")); + } + + for (var mode : Enrich.Mode.values()) { + String query = "FROM *:events,events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> runQuery(query, requestIncludeMeta)); + assertThat(e.getMessage(), containsString("A valid Enterprise license is required to run ES|QL cross-cluster searches.")); + } + } + + public void testAggThenEnrichRemote() { + String query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | %s + | stats c = COUNT(*) by os + | %s + | sort vendor + """, enrichHosts(Enrich.Mode.ANY), enrichVendors(Enrich.Mode.REMOTE)); + var error = expectThrows(ElasticsearchStatusException.class, () -> runQuery(query, randomBoolean()).close()); + // with a valid license this would fail with "ENRICH with remote policy can't be executed after STATS", so ensure here + // that the license error is detected first and returned rather than a VerificationException + assertThat(error.getMessage(), containsString(LICENSE_ERROR_MESSAGE)); + } + + public void testEnrichCoordinatorThenEnrichRemote() { + String query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | %s + | %s + | sort vendor + """, enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.REMOTE)); + var error = expectThrows(ElasticsearchStatusException.class, () -> runQuery(query, randomBoolean()).close()); + assertThat( + error.getMessage(), + // with a valid license the error is "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy", + // so ensure here that the license error is detected first and returned rather than a VerificationException + containsString(LICENSE_ERROR_MESSAGE) + ); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 46bbad5551e6b..347ef419cab9b 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -32,7 +32,6 @@ import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.VerificationException; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.io.IOException; @@ -73,13 +72,13 @@ protected List remoteClusterAlias() { @Override protected Map skipUnavailableForRemoteClusters() { - return Map.of(REMOTE_CLUSTER_1, randomBoolean()); + return Map.of(REMOTE_CLUSTER_1, randomBoolean(), REMOTE_CLUSTER_2, randomBoolean()); } @Override protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.add(EsqlPlugin.class); + plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); plugins.add(InternalExchangePlugin.class); return plugins; } @@ -184,7 +183,7 @@ public void testSuccessfulPathways() { } public void testSearchesAgainstNonMatchingIndicesWithLocalOnly() { - Map testClusterInfo = setupClusters(2); + Map testClusterInfo = setupTwoClusters(); String localIndex = (String) testClusterInfo.get("local.index"); { @@ -905,7 +904,7 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() { // cluster-foo* matches nothing and so should not be present in the EsqlExecutionInfo try ( EsqlQueryResponse resp = runQuery( - "from logs-*,no_such_index*,cluster-a:no_such_index*,cluster-foo*:* | stats sum (v)", + "FROM logs-*,no_such_index*,cluster-a:no_such_index*,cluster-foo*:* | STATS sum (v)", requestIncludeMeta ) ) { @@ -1009,7 +1008,7 @@ public void testMetadataIndex() { try ( EsqlQueryResponse resp = runQuery( - "FROM logs*,*:logs* METADATA _index | stats sum(v) by _index | sort _index", + Strings.format("FROM logs*,%s:logs* METADATA _index | stats sum(v) by _index | sort _index", REMOTE_CLUSTER_1), requestIncludeMeta ) ) { @@ -1091,7 +1090,7 @@ public void testProfile() { final int remoteOnlyProfiles; { EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); - request.query("FROM *:logs* | stats sum(v)"); + request.query("FROM c*:logs* | stats sum(v)"); request.pragmas(pragmas); request.profile(true); try (EsqlQueryResponse resp = runQuery(request)) { @@ -1124,7 +1123,7 @@ public void testProfile() { final int allProfiles; { EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); - request.query("FROM logs*,*:logs* | stats total = sum(v)"); + request.query("FROM logs*,c*:logs* | stats total = sum(v)"); request.pragmas(pragmas); request.profile(true); try (EsqlQueryResponse resp = runQuery(request)) { @@ -1169,7 +1168,7 @@ public void testWarnings() throws Exception { int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); - request.query("FROM logs*,*:logs* | EVAL ip = to_ip(id) | STATS total = sum(v) by ip | LIMIT 10"); + request.query("FROM logs*,c*:logs* | EVAL ip = to_ip(id) | STATS total = sum(v) by ip | LIMIT 10"); InternalTestCluster cluster = cluster(LOCAL_CLUSTER); String node = randomFrom(cluster.getNodeNames()); CountDownLatch latch = new CountDownLatch(1); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithEnterpriseOrTrialLicense.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithEnterpriseOrTrialLicense.java new file mode 100644 index 0000000000000..34d09fc541572 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithEnterpriseOrTrialLicense.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.license.License; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.license.internal.XPackLicenseStatus; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; + +import static org.elasticsearch.test.ESTestCase.randomFrom; + +/** + * In IT tests, use this instead of the EsqlPlugin in order to use ES|QL features + * that require an Enteprise (or Trial) license. + */ +public class EsqlPluginWithEnterpriseOrTrialLicense extends EsqlPlugin { + protected XPackLicenseState getLicenseState() { + License.OperationMode operationMode = randomFrom(License.OperationMode.ENTERPRISE, License.OperationMode.TRIAL); + return new XPackLicenseState(() -> System.currentTimeMillis(), new XPackLicenseStatus(operationMode, true, "Test license expired")); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithNonEnterpriseOrExpiredLicense.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithNonEnterpriseOrExpiredLicense.java new file mode 100644 index 0000000000000..46c3f3f6204cd --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithNonEnterpriseOrExpiredLicense.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.license.License; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.license.internal.XPackLicenseStatus; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; + +import static org.elasticsearch.test.ESTestCase.randomBoolean; +import static org.elasticsearch.test.ESTestCase.randomFrom; + +/** + * In IT tests, use this instead of the EsqlPlugin in order to test ES|QL features + * using either a: + * - an active (non-expired) basic, standard, missing, gold or platinum Elasticsearch license, OR + * - an expired enterprise or trial license + */ +public class EsqlPluginWithNonEnterpriseOrExpiredLicense extends EsqlPlugin { + protected XPackLicenseState getLicenseState() { + License.OperationMode operationMode; + boolean active; + if (randomBoolean()) { + operationMode = randomFrom( + License.OperationMode.PLATINUM, + License.OperationMode.GOLD, + License.OperationMode.BASIC, + License.OperationMode.MISSING, + License.OperationMode.STANDARD + ); + active = true; + } else { + operationMode = randomFrom(License.OperationMode.ENTERPRISE, License.OperationMode.TRIAL); + active = false; // expired + } + + return new XPackLicenseState( + () -> System.currentTimeMillis(), + new XPackLicenseStatus(operationMode, active, "Test license expired") + ); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java index 7a733d73941e4..f01cc265e330b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java @@ -605,6 +605,10 @@ private void gatherMetrics(LogicalPlan plan, BitSet b) { functions.forEach(f -> metrics.incFunctionMetric(f)); } + public XPackLicenseState licenseState() { + return licenseState; + } + /** * Limit QL's comparisons to types we support. This should agree with * {@link EsqlBinaryComparison}'s checkCompatibility method diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlLicenseChecker.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlLicenseChecker.java new file mode 100644 index 0000000000000..0a52ee75de3b2 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlLicenseChecker.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.session; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.license.License; +import org.elasticsearch.license.LicensedFeature; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.rest.RestStatus; + +public class EsqlLicenseChecker { + + public static final LicensedFeature.Momentary CCS_FEATURE = LicensedFeature.momentary( + null, + "esql-ccs", + License.OperationMode.ENTERPRISE + ); + + /** + * Only call this method once you know the user is doing a cross-cluster query, as it will update + * the license_usage timestamp for the esql-ccs feature if the license is Enterprise (or Trial). + * @param licenseState + * @return true if the user has a license that allows ESQL CCS. + */ + public static boolean isCcsAllowed(XPackLicenseState licenseState) { + if (licenseState == null) { + return false; + } + return CCS_FEATURE.check(licenseState); + } + + /** + * @param licenseState existing license state. Need to extract info on the current installed license. + * @return ElasticsearchStatusException with an error message informing the caller what license is needed + * to run ES|QL cross-cluster searches and what license (if any) was found. + */ + public static ElasticsearchStatusException invalidLicenseForCcsException(XPackLicenseState licenseState) { + String message = "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: "; + if (licenseState == null) { + message += "none"; + } else { + message += licenseState.statusDescription(); + } + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 4f7c620bc8d12..83480f6651abf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -298,6 +298,9 @@ public void analyzedPlan( .map(e -> new EnrichPolicyResolver.UnresolvedPolicy((String) e.policyName().fold(), e.mode())) .collect(Collectors.toSet()); final List indices = preAnalysis.indices; + + EsqlSessionCCSUtils.checkForCcsLicense(indices, indicesExpressionGrouper, verifier.licenseState()); + // TODO: make a separate call for lookup indices final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( indices.stream().flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))).toArray(String[]::new) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 4fe2fef7e3f45..662572c466511 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -9,17 +9,24 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.indices.IndicesExpressionGrouper; +import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.TableInfo; import org.elasticsearch.xpack.esql.index.IndexResolution; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -255,6 +262,9 @@ static boolean missingIndicesIsFatal(String clusterAlias, EsqlExecutionInfo exec } private static boolean concreteIndexRequested(String indexExpression) { + if (Strings.isNullOrBlank(indexExpression)) { + return false; + } for (String expr : indexExpression.split(",")) { if (expr.charAt(0) == '<' || expr.startsWith("-<")) { // skip date math expressions @@ -288,4 +298,37 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { } } } + + /** + * Checks the index expression for the presence of remote clusters. If found, it will ensure that the caller + * has a valid Enterprise (or Trial) license on the querying cluster. + * @param indices index expression requested by user + * @param indicesGrouper grouper of index expressions by cluster alias + * @param licenseState license state on the querying cluster + * @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search. + */ + public static void checkForCcsLicense( + List indices, + IndicesExpressionGrouper indicesGrouper, + XPackLicenseState licenseState + ) { + for (TableInfo tableInfo : indices) { + Map groupedIndices; + try { + groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, tableInfo.id().index()); + } catch (NoSuchRemoteClusterException e) { + if (EsqlLicenseChecker.isCcsAllowed(licenseState)) { + throw e; + } else { + throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState); + } + } + // check if it is a cross-cluster query + if (groupedIndices.size() > 1 || groupedIndices.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false) { + if (EsqlLicenseChecker.isCcsAllowed(licenseState) == false) { + throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState); + } + } + } + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 60b632c443f8e..1000c05282fdb 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -8,10 +8,18 @@ package org.elasticsearch.xpack.esql.session; import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.index.IndexMode; +import org.elasticsearch.indices.IndicesExpressionGrouper; +import org.elasticsearch.license.License; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.license.internal.XPackLicenseStatus; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NoSeedNodeLeftException; @@ -20,9 +28,11 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.analysis.TableInfo; import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.IndexResolution; +import org.elasticsearch.xpack.esql.plan.TableIdentifier; import org.elasticsearch.xpack.esql.type.EsFieldTests; import java.util.ArrayList; @@ -32,8 +42,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.LongSupplier; import java.util.function.Predicate; +import java.util.stream.Collectors; +import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.checkForCcsLicense; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -627,4 +641,148 @@ public void testMissingIndicesIsFatal() { } } + + public void testCheckForCcsLicense() { + final TestIndicesExpressionGrouper indicesGrouper = new TestIndicesExpressionGrouper(); + + // this seems to be used only for tracking usage of features, not for checking if a license is expired + final LongSupplier currTime = () -> System.currentTimeMillis(); + + XPackLicenseState enterpriseLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.ENTERPRISE)); + XPackLicenseState trialLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.TRIAL)); + XPackLicenseState platinumLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.PLATINUM)); + XPackLicenseState goldLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.GOLD)); + XPackLicenseState basicLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.BASIC)); + XPackLicenseState standardLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.STANDARD)); + XPackLicenseState missingLicense = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.MISSING)); + XPackLicenseState nullLicense = null; + + final XPackLicenseStatus enterpriseStatus = inactiveLicenseStatus(License.OperationMode.ENTERPRISE); + XPackLicenseState enterpriseLicenseInactive = new XPackLicenseState(currTime, enterpriseStatus); + XPackLicenseState trialLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.TRIAL)); + XPackLicenseState platinumLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.PLATINUM)); + XPackLicenseState goldLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.GOLD)); + XPackLicenseState basicLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.BASIC)); + XPackLicenseState standardLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.STANDARD)); + XPackLicenseState missingLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.MISSING)); + + // local only search does not require an enterprise license + { + List indices = new ArrayList<>(); + indices.add(new TableInfo(new TableIdentifier(EMPTY, null, randomFrom("idx", "idx1,idx2*")))); + + checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseValid); + checkForCcsLicense(indices, indicesGrouper, platinumLicenseValid); + checkForCcsLicense(indices, indicesGrouper, goldLicenseValid); + checkForCcsLicense(indices, indicesGrouper, trialLicenseValid); + checkForCcsLicense(indices, indicesGrouper, basicLicenseValid); + checkForCcsLicense(indices, indicesGrouper, standardLicenseValid); + checkForCcsLicense(indices, indicesGrouper, missingLicense); + checkForCcsLicense(indices, indicesGrouper, nullLicense); + + checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseInactive); + checkForCcsLicense(indices, indicesGrouper, platinumLicenseInactive); + checkForCcsLicense(indices, indicesGrouper, goldLicenseInactive); + checkForCcsLicense(indices, indicesGrouper, trialLicenseInactive); + checkForCcsLicense(indices, indicesGrouper, basicLicenseInactive); + checkForCcsLicense(indices, indicesGrouper, standardLicenseInactive); + checkForCcsLicense(indices, indicesGrouper, missingLicenseInactive); + } + + // cross-cluster search requires a valid (active, non-expired) enterprise license OR a valid trial license + { + List indices = new ArrayList<>(); + final String indexExprWithRemotes = randomFrom("remote:idx", "idx1,remote:idx2*,remote:logs,c*:idx4"); + if (randomBoolean()) { + indices.add(new TableInfo(new TableIdentifier(EMPTY, null, indexExprWithRemotes))); + } else { + indices.add(new TableInfo(new TableIdentifier(EMPTY, null, randomFrom("idx", "idx1,idx2*")))); + indices.add(new TableInfo(new TableIdentifier(EMPTY, null, indexExprWithRemotes))); + } + + // licenses that work + checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseValid); + checkForCcsLicense(indices, indicesGrouper, trialLicenseValid); + + // all others fail --- + + // active non-expired non-Enterprise non-Trial licenses + assertLicenseCheckFails(indices, indicesGrouper, platinumLicenseValid, "active platinum license"); + assertLicenseCheckFails(indices, indicesGrouper, goldLicenseValid, "active gold license"); + assertLicenseCheckFails(indices, indicesGrouper, basicLicenseValid, "active basic license"); + assertLicenseCheckFails(indices, indicesGrouper, standardLicenseValid, "active standard license"); + assertLicenseCheckFails(indices, indicesGrouper, missingLicense, "active missing license"); + assertLicenseCheckFails(indices, indicesGrouper, nullLicense, "none"); + + // inactive/expired licenses + assertLicenseCheckFails(indices, indicesGrouper, enterpriseLicenseInactive, "expired enterprise license"); + assertLicenseCheckFails(indices, indicesGrouper, trialLicenseInactive, "expired trial license"); + assertLicenseCheckFails(indices, indicesGrouper, platinumLicenseInactive, "expired platinum license"); + assertLicenseCheckFails(indices, indicesGrouper, goldLicenseInactive, "expired gold license"); + assertLicenseCheckFails(indices, indicesGrouper, basicLicenseInactive, "expired basic license"); + assertLicenseCheckFails(indices, indicesGrouper, standardLicenseInactive, "expired standard license"); + assertLicenseCheckFails(indices, indicesGrouper, missingLicenseInactive, "expired missing license"); + } + } + + private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) { + return new XPackLicenseStatus(operationMode, true, null); + } + + private XPackLicenseStatus inactiveLicenseStatus(License.OperationMode operationMode) { + return new XPackLicenseStatus(operationMode, false, "License Expired 123"); + } + + private void assertLicenseCheckFails( + List indices, + TestIndicesExpressionGrouper indicesGrouper, + XPackLicenseState licenseState, + String expectedErrorMessageSuffix + ) { + ElasticsearchStatusException e = expectThrows( + ElasticsearchStatusException.class, + () -> checkForCcsLicense(indices, indicesGrouper, licenseState) + ); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat( + e.getMessage(), + equalTo( + "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: " + expectedErrorMessageSuffix + ) + ); + } + + static class TestIndicesExpressionGrouper implements IndicesExpressionGrouper { + @Override + public Map groupIndices(IndicesOptions indicesOptions, String[] indexExpressions) { + final Map originalIndicesMap = new HashMap<>(); + final String localKey = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + + for (String expr : indexExpressions) { + assertFalse(Strings.isNullOrBlank(expr)); + String[] split = expr.split(":", 2); + assertTrue("Bad index expression: " + expr, split.length < 3); + String clusterAlias; + String indexExpr; + if (split.length == 1) { + clusterAlias = localKey; + indexExpr = expr; + } else { + clusterAlias = split[0]; + indexExpr = split[1]; + + } + OriginalIndices currIndices = originalIndicesMap.get(clusterAlias); + if (currIndices == null) { + originalIndicesMap.put(clusterAlias, new OriginalIndices(new String[] { indexExpr }, indicesOptions)); + } else { + List indicesList = Arrays.stream(currIndices.indices()).collect(Collectors.toList()); + indicesList.add(indexExpr); + originalIndicesMap.put(clusterAlias, new OriginalIndices(indicesList.toArray(new String[0]), indicesOptions)); + } + } + return originalIndicesMap; + } + } + } diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java index 09449f81121fd..d6bad85161fd9 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.remotecluster; +import org.apache.http.client.methods.HttpGet; import org.elasticsearch.Build; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; @@ -22,6 +23,7 @@ import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.util.resource.Resource; import org.elasticsearch.test.junit.RunnableTestRuleAdapter; +import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; import org.junit.After; @@ -34,6 +36,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -51,6 +54,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTestCase { private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); @@ -342,6 +346,14 @@ public void testCrossClusterQuery() throws Exception { configureRemoteCluster(); populateData(); + Map esqlCcsLicenseFeatureUsage = fetchEsqlCcsFeatureUsageFromNode(client()); + + Object ccsLastUsedTimestampAtStartOfTest = null; + if (esqlCcsLicenseFeatureUsage.isEmpty() == false) { + // some test runs will have a usage value already, so capture that to compare at end of test + ccsLastUsedTimestampAtStartOfTest = esqlCcsLicenseFeatureUsage.get("last_used"); + } + // query remote cluster only Request request = esqlRequest(""" FROM my_remote_cluster:employees @@ -385,6 +397,15 @@ public void testCrossClusterQuery() throws Exception { | LIMIT 2 | KEEP emp_id, department""")); assertRemoteOnlyAgainst2IndexResults(response); + + // check that the esql-ccs license feature is now present and that the last_used field has been updated + esqlCcsLicenseFeatureUsage = fetchEsqlCcsFeatureUsageFromNode(client()); + assertThat(esqlCcsLicenseFeatureUsage.size(), equalTo(5)); + Object lastUsed = esqlCcsLicenseFeatureUsage.get("last_used"); + assertNotNull("lastUsed should not be null", lastUsed); + if (ccsLastUsedTimestampAtStartOfTest != null) { + assertThat(lastUsed.toString(), not(equalTo(ccsLastUsedTimestampAtStartOfTest.toString()))); + } } @SuppressWarnings("unchecked") @@ -1660,4 +1681,18 @@ void assertExpectedClustersForMissingIndicesTests(Map responseMa assertThat((int) shards.get("failed"), is(0)); } } + + private static Map fetchEsqlCcsFeatureUsageFromNode(RestClient client) throws IOException { + Request request = new Request(HttpGet.METHOD_NAME, "_license/feature_usage"); + request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue(USER, PASS))); + Response response = client.performRequest(request); + ObjectPath path = ObjectPath.createFromResponse(response); + List> features = path.evaluate("features"); + for (var feature : features) { + if ("esql-ccs".equals(feature.get("name"))) { + return feature; + } + } + return Collections.emptyMap(); + } } diff --git a/x-pack/qa/multi-cluster-search-security/legacy-with-basic-license/src/test/resources/rest-api-spec/test/querying_cluster/80_esql.yml b/x-pack/qa/multi-cluster-search-security/legacy-with-basic-license/src/test/resources/rest-api-spec/test/querying_cluster/80_esql.yml index 4c0bbfd7ec139..1b435c551fbe9 100644 --- a/x-pack/qa/multi-cluster-search-security/legacy-with-basic-license/src/test/resources/rest-api-spec/test/querying_cluster/80_esql.yml +++ b/x-pack/qa/multi-cluster-search-security/legacy-with-basic-license/src/test/resources/rest-api-spec/test/querying_cluster/80_esql.yml @@ -86,11 +86,12 @@ teardown: ignore: 404 --- -"Index data and search on the mixed cluster": +"ES|QL cross-cluster query fails with basic license": - skip: features: allowed_warnings - do: + catch: bad_request allowed_warnings: - "Line 1:21: Square brackets '[]' need to be removed in FROM METADATA declaration" headers: { Authorization: "Basic am9lOnMza3JpdC1wYXNzd29yZA==" } @@ -98,23 +99,11 @@ teardown: body: query: 'FROM *:esql*,esql_* | STATS total = sum(cost) by tag | SORT tag | LIMIT 10' - - match: {columns.0.name: "total"} - - match: {columns.0.type: "long"} - - match: {columns.1.name: "tag"} - - match: {columns.1.type: "keyword"} - - - match: {values.0.0: 2200} - - match: {values.0.1: "computer"} - - match: {values.1.0: 170} - - match: {values.1.1: "headphone"} - - match: {values.2.0: 2100 } - - match: {values.2.1: "laptop" } - - match: {values.3.0: 1000 } - - match: {values.3.1: "monitor" } - - match: {values.4.0: 550 } - - match: {values.4.1: "tablet" } + - match: { error.type: "status_exception" } + - match: { error.reason: "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: active basic license" } - do: + catch: bad_request allowed_warnings: - "Line 1:21: Square brackets '[]' need to be removed in FROM METADATA declaration" headers: { Authorization: "Basic am9lOnMza3JpdC1wYXNzd29yZA==" } @@ -128,28 +117,11 @@ teardown: lte: "2023-01-03" format: "yyyy-MM-dd" - - match: {columns.0.name: "_index"} - - match: {columns.0.type: "keyword"} - - match: {columns.1.name: "tag"} - - match: {columns.1.type: "keyword"} - - match: {columns.2.name: "cost" } - - match: {columns.2.type: "long" } - - - match: {values.0.0: "esql_local"} - - match: {values.0.1: "monitor"} - - match: {values.0.2: 250 } - - match: {values.1.0: "my_remote_cluster:esql_index" } - - match: {values.1.1: "tablet"} - - match: {values.1.2: 450 } - - match: {values.2.0: "my_remote_cluster:esql_index" } - - match: {values.2.1: "computer" } - - match: {values.2.2: 1200 } - - match: {values.3.0: "esql_local"} - - match: {values.3.1: "laptop" } - - match: {values.3.2: 2100 } + - match: { error.type: "status_exception" } + - match: { error.reason: "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: active basic license" } --- -"Enrich across clusters": +"ES|QL enrich query across clusters fails with basic license": - requires: cluster_features: ["gte_v8.13.0"] reason: "Enrich across clusters available in 8.13 or later" @@ -194,27 +166,14 @@ teardown: index: suggestions - do: + catch: bad_request headers: { Authorization: "Basic am9lOnMza3JpdC1wYXNzd29yZA==" } esql.query: body: query: 'FROM *:esql*,esql_* | STATS total = sum(cost) by tag | SORT total DESC | LIMIT 3 | ENRICH suggestions | KEEP tag, total, phrase' - - match: {columns.0.name: "tag"} - - match: {columns.0.type: "keyword"} - - match: {columns.1.name: "total" } - - match: {columns.1.type: "long" } - - match: {columns.2.name: "phrase" } - - match: {columns.2.type: "keyword" } - - - match: {values.0.0: "computer"} - - match: {values.0.1: 2200} - - match: {values.0.2: "best desktop for programming"} - - match: {values.1.0: "laptop"} - - match: {values.1.1: 2100 } - - match: {values.1.2: "the best battery life laptop"} - - match: {values.2.0: "monitor" } - - match: {values.2.1: 1000 } - - match: {values.2.2: "4k or 5k or 6K monitor?" } + - match: { error.type: "status_exception" } + - match: { error.reason: "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: active basic license" } - do: enrich.delete_policy: