Skip to content

Commit

Permalink
enrich tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelandis committed Apr 4, 2024
1 parent 8c5e6aa commit 0a09403
Showing 1 changed file with 237 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -262,6 +265,12 @@ public void populateData() throws Exception {
"privileges": ["read", "read_cross_cluster"],
"clusters": ["my_remote_cluster"]
}
],
"remote_cluster": [
{
"privileges": ["monitor_enrich"],
"clusters": ["my_remote_cluster"]
}
]
}""");
assertOK(adminClient().performRequest(putRoleRequest));
Expand All @@ -284,7 +293,7 @@ public void wipeData() throws Exception {
wipe.accept(client());
}

@AwaitsFix(bugUrl = "cross-clusters query doesn't work with RCS 2.0")
@SuppressWarnings("unchecked")
public void testCrossClusterQuery() throws Exception {
configureRemoteCluster();
populateData();
Expand All @@ -297,7 +306,15 @@ public void testCrossClusterQuery() throws Exception {
| LIMIT 2
| KEEP emp_id, department"""));
assertOK(response);
Map<String, Object> values = entityAsMap(response);
Map<String, Object> responseAsMap = entityAsMap(response);
List<?> columns = (List<?>) responseAsMap.get("columns");
List<?> values = (List<?>) responseAsMap.get("values");
assertEquals(2, columns.size());
assertEquals(2, values.size());
List<String> flatList = values.stream()
.flatMap(innerList -> innerList instanceof List ? ((List<String>) innerList).stream() : Stream.empty())
.collect(Collectors.toList());
assertThat(flatList, containsInAnyOrder("1", "3", "engineering", "sales"));
}
{
Response response = performRequestWithRemoteSearchUser(esqlRequest("""
Expand All @@ -306,31 +323,125 @@ public void testCrossClusterQuery() throws Exception {
| LIMIT 10"""));
assertOK(response);

}
// Check that authentication fails if we use a non-existent API key
updateClusterSettings(
randomBoolean()
? Settings.builder()
.put("cluster.remote.invalid_remote.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.build()
: Settings.builder()
.put("cluster.remote.invalid_remote.mode", "proxy")
.put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.build()
);
for (String indices : List.of("my_remote_cluster:employees,employees", "my_remote_cluster:employees")) {
ResponseException error = expectThrows(ResponseException.class, () -> {
var q = "FROM " + indices + "| SORT emp_id DESC | LIMIT 10";
performRequestWithLocalSearchUser(esqlRequest(q));
});
assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(401));
assertThat(error.getMessage(), containsString("unable to find apikey"));
Map<String, Object> responseAsMap = entityAsMap(response);
List<?> columns = (List<?>) responseAsMap.get("columns");
List<?> values = (List<?>) responseAsMap.get("values");
assertEquals(2, columns.size());
assertEquals(9, values.size());
List<String> flatList = values.stream()
.flatMap(innerList -> innerList instanceof List ? ((List<String>) innerList).stream() : Stream.empty())
.collect(Collectors.toList());
assertThat(
flatList,
containsInAnyOrder(
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"engineering",
"engineering",
"engineering",
"management",
"sales",
"sales",
"marketing",
"marketing",
"support"
)
);
}
}
}

@AwaitsFix(bugUrl = "cross-clusters enrich doesn't work with RCS 2.0")
@SuppressWarnings("unchecked")
@AwaitsFix(bugUrl = "this trips ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION)")
// comment out those assertions in EsqlIndexResolver and TransportFieldCapabilitiesAction to see this test pass
public void testCrossClusterQueryAgainstInvalidRemote() throws Exception {
configureRemoteCluster();
populateData();

// avoids getting 404 errors
updateClusterSettings(
randomBoolean()
? Settings.builder().put("cluster.remote.invalid_remote.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0)).build()
: Settings.builder()
.put("cluster.remote.invalid_remote.mode", "proxy")
.put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
.build()
);

// invalid remote with local index should return local results
var q = "FROM invalid_remote:employees,employees | SORT emp_id DESC | LIMIT 10";
Response response = performRequestWithRemoteSearchUser(esqlRequest(q));
assertOK(response);
Map<String, Object> responseAsMap = entityAsMap(response);
List<?> columns = (List<?>) responseAsMap.get("columns");
List<?> values = (List<?>) responseAsMap.get("values");
assertEquals(2, columns.size());
assertEquals(4, values.size());
List<String> flatList = values.stream()
.flatMap(innerList -> innerList instanceof List ? ((List<String>) innerList).stream() : Stream.empty())
.collect(Collectors.toList());
// local results
assertThat(flatList, containsInAnyOrder("2", "4", "6", "8", "support", "management", "engineering", "marketing"));

// only calling an invalid remote should error
ResponseException error = expectThrows(ResponseException.class, () -> {
var q2 = "FROM invalid_remote:employees | SORT emp_id DESC | LIMIT 10";
performRequestWithRemoteSearchUser(esqlRequest(q2));
});
assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(401));
assertThat(error.getMessage(), containsString("unable to find apikey"));
}

@SuppressWarnings("unchecked")
@AwaitsFix(bugUrl = "cross-clusters search should not require local index permissions")
// will work if you add change "indices": [] to : "indices": [ { "names": [""], "privileges": ["indices:data/read/esql"] } ]
// however that should not be required to executed search across clusters
public void testCrossClusterQueryWithOnlyRemotePrivs() throws Exception {
configureRemoteCluster();
populateData();

// Query cluster
final var putRoleRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);

putRoleRequest.setJsonEntity("""
{
"indices": [],
"remote_indices": [
{
"names": ["employees"],
"privileges": ["read", "read_cross_cluster"],
"clusters": ["my_remote_cluster"]
}
]
}""");
assertOK(adminClient().performRequest(putRoleRequest));

// Query cluster
Response response = performRequestWithRemoteSearchUser(esqlRequest("""
FROM my_remote_cluster:employees
| SORT emp_id ASC
| LIMIT 2
| KEEP emp_id, department"""));
assertOK(response);
Map<String, Object> responseAsMap = entityAsMap(response);
List<?> columns = (List<?>) responseAsMap.get("columns");
List<?> values = (List<?>) responseAsMap.get("values");
assertEquals(2, columns.size());
assertEquals(2, values.size());
List<String> flatList = values.stream()
.flatMap(innerList -> innerList instanceof List ? ((List<String>) innerList).stream() : Stream.empty())
.collect(Collectors.toList());
assertThat(flatList, containsInAnyOrder("1", "3", "engineering", "sales"));
}

@SuppressWarnings("unchecked")
public void testCrossClusterEnrich() throws Exception {
configureRemoteCluster();
populateData();
Expand All @@ -344,50 +455,112 @@ public void testCrossClusterEnrich() throws Exception {
| SORT size DESC
| LIMIT 2"""));
assertOK(response);
Map<String, Object> values = entityAsMap(response);
Map<String, Object> responseAsMap = entityAsMap(response);
List<?> columns = (List<?>) responseAsMap.get("columns");
List<?> values = (List<?>) responseAsMap.get("values");
assertEquals(2, columns.size());
assertEquals(2, values.size());
List<?> flatList = values.stream()
.flatMap(innerList -> innerList instanceof List ? ((List<?>) innerList).stream() : Stream.empty())
.collect(Collectors.toList());
assertThat(flatList, containsInAnyOrder(2, 3, "usa", "canada"));

// ESQL with enrich is denied when user has no access to enrich policies
final var putLocalSearchRoleRequest = new Request("PUT", "/_security/role/local_search");
putLocalSearchRoleRequest.setJsonEntity("""

// // ESQL with enrich is denied when user has no access to enrich policies
// final var putLocalSearchRoleRequest = new Request("PUT", "/_security/role/local_search");
// putLocalSearchRoleRequest.setJsonEntity("""
// {
// "indices": [
// {
// "names": ["employees"],
// "privileges": ["read"]
// }
// ],
// "cluster": [ ],
// "remote_indices": [
// {
// "names": ["employees"],
// "privileges": ["read", "read_cross_cluster"],
// "clusters": ["my_remote_cluster"]
// }
// ]
// }""");
// assertOK(adminClient().performRequest(putLocalSearchRoleRequest));
// final var putlocalSearchUserRequest = new Request("PUT", "/_security/user/local_search_user");
// putlocalSearchUserRequest.setJsonEntity("""
// {
// "password": "x-pack-test-password",
// "roles" : ["local_search"]
// }""");
// assertOK(adminClient().performRequest(putlocalSearchUserRequest));
// for (String indices : List.of("my_remote_cluster:employees,employees", "my_remote_cluster:employees")) {
// ResponseException error = expectThrows(ResponseException.class, () -> {
// var q = "FROM " + indices + "| ENRICH countries | STATS size=count(*) by country | SORT size | LIMIT 2";
// performRequestWithLocalSearchUser(esqlRequest(q));
// });
// assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(403));
// assertThat(
// error.getMessage(),
// containsString(
// "action [cluster:monitor/xpack/enrich/esql/resolve_policy] towards remote cluster [my_remote_cluster]"
// + " is unauthorized for user [local_search_user] with effective roles [local_search]"
// )
// );
// }
}
}

@SuppressWarnings("unchecked")
@AwaitsFix(bugUrl = "cross-clusters search should not require local index permissions")
// will work if you add change "indices": [] to : "indices": [ { "names": [""], "privileges": ["indices:data/read/esql"] } ]
// and if you change "cluster": [] to "cluster": [ "monitor_enrich" ]
// however that should not be required to executed search + enrich across clusters
public void testCrossClusterEnrichWithOnlyRemotePrivs() throws Exception {
configureRemoteCluster();
populateData();

// Query cluster
final var putRoleRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);

putRoleRequest.setJsonEntity("""
{
"indices": [],
"cluster": [],
"remote_indices": [
{
"indices": [
{
"names": ["employees"],
"privileges": ["read"]
}
],
"cluster": [ ],
"remote_indices": [
{
"names": ["employees"],
"privileges": ["read", "read_cross_cluster"],
"clusters": ["my_remote_cluster"]
}
]
}""");
assertOK(adminClient().performRequest(putLocalSearchRoleRequest));
final var putlocalSearchUserRequest = new Request("PUT", "/_security/user/local_search_user");
putlocalSearchUserRequest.setJsonEntity("""
"names": ["employees"],
"privileges": ["read", "read_cross_cluster"],
"clusters": ["my_remote_cluster"]
}
],
"remote_cluster": [
{
"password": "x-pack-test-password",
"roles" : ["local_search"]
}""");
assertOK(adminClient().performRequest(putlocalSearchUserRequest));
for (String indices : List.of("my_remote_cluster:employees,employees", "my_remote_cluster:employees")) {
ResponseException error = expectThrows(ResponseException.class, () -> {
var q = "FROM " + indices + "| ENRICH countries | STATS size=count(*) by country | SORT size | LIMIT 2";
performRequestWithLocalSearchUser(esqlRequest(q));
});
assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(
error.getMessage(),
containsString(
"action [cluster:monitor/xpack/enrich/esql/resolve_policy] towards remote cluster [my_remote_cluster]"
+ " is unauthorized for user [local_search_user] with effective roles [local_search]"
)
);
}
}
"privileges": ["monitor_enrich"],
"clusters": ["my_remote_cluster"]
}
]
}""");
assertOK(adminClient().performRequest(putRoleRequest));

// Query cluster
// ESQL with enrich is okay when user has access to enrich polices
Response response = performRequestWithRemoteSearchUser(esqlRequest("""
FROM my_remote_cluster:employees
| ENRICH countries
| STATS size=count(*) by country
| SORT size DESC
| LIMIT 2"""));
assertOK(response);

Map<String, Object> responseAsMap = entityAsMap(response);
List<?> columns = (List<?>) responseAsMap.get("columns");
List<?> values = (List<?>) responseAsMap.get("values");
assertEquals(2, columns.size());
assertEquals(2, values.size());
List<?> flatList = values.stream()
.flatMap(innerList -> innerList instanceof List ? ((List<?>) innerList).stream() : Stream.empty())
.collect(Collectors.toList());
assertThat(flatList, containsInAnyOrder(1, 3, "usa", "germany"));
}

protected Request esqlRequest(String command) throws IOException {
Expand Down

0 comments on commit 0a09403

Please sign in to comment.