Skip to content

Commit

Permalink
Enable testing remote metadata for ES|QL CCS (elastic#116767) (elasti…
Browse files Browse the repository at this point in the history
…c#116784)

* Enable testing remote metadata for CCS

(cherry picked from commit 270d9d2)
  • Loading branch information
smalyshev authored Nov 14, 2024
1 parent 63b5f05 commit ec89252
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 4 deletions.
1 change: 1 addition & 0 deletions x-pack/plugin/esql/qa/server/multi-clusters/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ apply plugin: 'elasticsearch.bwc-test'
dependencies {
javaRestTestImplementation project(xpackModule('esql:qa:testFixtures'))
javaRestTestImplementation project(xpackModule('esql:qa:server'))
javaRestTestImplementation project(xpackModule('esql'))
}

def supportedVersion = bwcVersion -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -101,16 +105,25 @@ public MultiClusterSpecIT(

@Override
protected void shouldSkipTest(String testName) throws IOException {
boolean remoteMetadata = testCase.requiredCapabilities.contains(METADATA_FIELDS_REMOTE_TEST.capabilityName());
if (remoteMetadata) {
// remove the capability from the test to enable it
testCase.requiredCapabilities = testCase.requiredCapabilities.stream()
.filter(c -> c.equals("metadata_fields_remote_test") == false)
.toList();
}
super.shouldSkipTest(testName);
checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase);
assumeFalse("can't test with _index metadata", hasIndexMetadata(testCase.query));
// Do not run tests including "METADATA _index" unless marked with metadata_fields_remote_test,
// because they may produce inconsistent results with multiple clusters.
assumeFalse("can't test with _index metadata", (remoteMetadata == false) && hasIndexMetadata(testCase.query));
assumeTrue(
"Test " + testName + " is skipped on " + Clusters.oldVersion(),
isEnabled(testName, instructions, Clusters.oldVersion())
);
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats"));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats_v2"));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("join_planning_v1"));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
}

private TestFeatureService remoteFeaturesService() throws IOException {
Expand Down Expand Up @@ -151,6 +164,9 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw
return twoClients(localClient, remoteClient);
}

// These indices are used in metadata tests so we want them on remote only for consistency
public static final List<String> METADATA_INDICES = List.of("employees", "apps", "ul_logs");

/**
* Creates a new mock client that dispatches every request to both the local and remote clusters, excluding _bulk and _query requests.
* - '_bulk' requests are randomly sent to either the local or remote cluster to populate data. Some spec tests, such as AVG,
Expand All @@ -166,6 +182,8 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
String endpoint = request.getEndpoint();
if (endpoint.startsWith("/_query")) {
return localClient.performRequest(request);
} else if (endpoint.endsWith("/_bulk") && METADATA_INDICES.stream().anyMatch(i -> endpoint.equals("/" + i + "/_bulk"))) {
return remoteClient.performRequest(request);
} else if (endpoint.endsWith("/_bulk") && ENRICH_SOURCE_INDICES.stream().noneMatch(i -> endpoint.equals("/" + i + "/_bulk"))) {
return bulkClient.performRequest(request);
} else {
Expand Down Expand Up @@ -203,6 +221,9 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException {
return clones;
}

/**
* Convert FROM employees ... => FROM *:employees,employees
*/
static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCase testCase) {
String query = testCase.query;
String[] commands = query.split("\\|");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
simpleKeep
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index, _version | sort _index desc, emp_no | limit 2 | keep emp_no, _index, _version;

emp_no:integer |_index:keyword |_version:long
10001 |remote_cluster:employees |1
10002 |remote_cluster:employees |1
;

aliasWithSameName
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index, _version | sort _index desc, emp_no | limit 2 | eval _index = _index, _version = _version | keep emp_no, _index, _version;

emp_no:integer |_index:keyword |_version:long
10001 |remote_cluster:employees |1
10002 |remote_cluster:employees |1
;

inComparison
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index, _version | sort emp_no | where _index == "remote_cluster:employees" | where _version == 1 | keep emp_no | limit 2;

emp_no:integer
10001
10002
;

metaIndexInAggs
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
FROM employees METADATA _index, _id
| STATS max = MAX(emp_no) BY _index | SORT _index;

max:integer |_index:keyword
10100 |remote_cluster:employees
;

metaIndexAliasedInAggs
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index | eval _i = _index | stats max = max(emp_no) by _i | SORT _i;

max:integer |_i:keyword
10100 |remote_cluster:employees
;

metaVersionInAggs
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _version | stats min = min(emp_no) by _version;

min:integer |_version:long
10001 |1
;

metaVersionAliasedInAggs
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _version | eval _v = _version | stats min = min(emp_no) by _v;

min:integer |_v:long
10001 |1
;

inAggsAndAsGroups
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index, _version | stats max = max(_version) by _index | SORT _index;

max:long |_index:keyword
1 |remote_cluster:employees
;

inAggsAndAsGroupsAliased
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index, _version | eval _i = _index, _v = _version | stats max = max(_v) by _i | SORT _i;

max:long |_i:keyword
1 |remote_cluster:employees
;

inFunction
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index, _version | sort emp_no | where length(_index) == length("remote_cluster:employees") | where abs(_version) == 1 | keep emp_no | limit 2;

emp_no:integer
10001
10002
;

inArithmetics
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index, _version | eval i = _version + 2 | stats min = min(emp_no) by i;

min:integer |i:long
10001 |3
;

inSort
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index, _version | sort _version, _index desc, emp_no | keep emp_no, _version, _index | limit 2;

emp_no:integer |_version:long |_index:keyword
10001 |1 |remote_cluster:employees
10002 |1 |remote_cluster:employees
;

withMvFunction
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _version | eval i = mv_avg(_version) + 2 | stats min = min(emp_no) by i;

min:integer |i:double
10001 |3.0
;

overwritten
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
from employees metadata _index, _version | sort emp_no | eval _index = 3, _version = "version" | keep emp_no, _index, _version | limit 3;

emp_no:integer |_index:integer |_version:keyword
10001 |3 |version
10002 |3 |version
10003 |3 |version
;

multipleIndices
required_capability: metadata_fields
required_capability: metadata_fields_remote_test
FROM ul_logs, apps METADATA _index, _version
| WHERE id IN (13, 14) AND _version == 1
| EVAL key = CONCAT(_index, "_", TO_STR(id))
| SORT id, _index
| KEEP id, _index, _version, key
;

id:long |_index:keyword |_version:long |key:keyword
13 |remote_cluster:apps |1 |remote_cluster:apps_13
13 |remote_cluster:ul_logs |1 |remote_cluster:ul_logs_13
14 |remote_cluster:apps |1 |remote_cluster:apps_14
14 |remote_cluster:ul_logs |1 |remote_cluster:ul_logs_14

;
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@ public enum Cap {
ADD_LIMIT_INSIDE_MV_EXPAND,

DELAY_DEBUG_FN(Build.current().isSnapshot()),

/** Capability for remote metadata test */
METADATA_FIELDS_REMOTE_TEST(false),
/**
* WIP on Join planning
* - Introduce BinaryPlan and co
Expand Down

0 comments on commit ec89252

Please sign in to comment.