Skip to content

Commit

Permalink
POC, stable tests
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Mierzwa <[email protected]>
  • Loading branch information
MaciejMierzwa committed Nov 2, 2023
1 parent 6d2711f commit d3a4a2c
Showing 1 changed file with 100 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
*/
package org.opensearch.security;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -70,6 +72,8 @@
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.core.CountRequest;
import org.opensearch.client.indices.CloseIndexRequest;
Expand All @@ -96,6 +100,7 @@
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.security.auditlog.AuditLog;
import org.opensearch.test.framework.AuditCompliance;
import org.opensearch.test.framework.AuditConfiguration;
import org.opensearch.test.framework.AuditFilters;
Expand Down Expand Up @@ -1116,6 +1121,37 @@ public void shouldUpdateDocumentsInBulk_negative() throws IOException {

@Test
public void shouldDeleteDocumentInBulk_positive() throws IOException {
/**
Proof of concept changes to make tests stable. The problem was caused by nodes rotation in this method:
{@link org.opensearch.client.RestClient#performRequest(Request request)} which uses {@link org.opensearch.client.RestClient#nextNodes()}
to iterate through nodes, in each http call. This would cause different amount of audit log messages depending on node hit by http request.
the process is:
1. create 5-node cluster, with 3 cluster managers
2. create 2-shard, 2-replica index, find out which node holds the data
3. create second high level rest client, using node found in previous step, for all calls. Minimize transport requests between cluster nodes.
*/
//create index
Settings sourceIndexSettings = Settings.builder()
.put("index.number_of_replicas", 2)
.put("index.number_of_shards", 2)
.build();
IndexOperationsHelper.createIndex(cluster, WRITE_SONG_INDEX_NAME, sourceIndexSettings);
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(ADMIN_USER)) {
// cat shards, get index and node name
// getNodeByName()
// get rest client of node
Request getIndicesRequest = new Request("GET", "/_cat/shards?v");
// High level client doesn't support _cat/shards API
Response getIndicesResponse = restHighLevelClient.getLowLevelClient().performRequest(getIndicesRequest);
String primaryNode = new BufferedReader(new InputStreamReader(getIndicesResponse.getEntity().getContent())).lines()
.map(s -> s.split("\\s+"))
.filter(strings -> strings[0].equals(WRITE_SONG_INDEX_NAME))
.filter(strings -> strings[2].equals("p"))
.map(strings -> strings[7])
.findAny().orElseThrow(() -> new IllegalStateException("no primary shard found"));
cluster.setPrimaryNode(primaryNode);
}

try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_WRITE_USER)) {
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(IMMEDIATE);
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("one").source(SONGS[0].asMap()));
Expand All @@ -1140,9 +1176,71 @@ public void shouldDeleteDocumentInBulk_positive() throws IOException {
}
auditLogsRule.assertExactly(2, userAuthenticated(LIMITED_WRITE_USER).withRestRequest(POST, "/_bulk"));
auditLogsRule.assertExactly(2, grantedPrivilege(LIMITED_WRITE_USER, "BulkRequest"));
auditLogsRule.assertExactly(2, grantedPrivilege(LIMITED_WRITE_USER, "CreateIndexRequest"));
auditLogsRule.assertExactly(4, grantedPrivilege(LIMITED_WRITE_USER, "PutMappingRequest"));
auditLogsRule.assertExactly(6, auditPredicate(INDEX_EVENT).withEffectiveUser(LIMITED_WRITE_USER));
auditLogsRule.assertExactly(4, auditPredicate(INDEX_EVENT).withEffectiveUser(LIMITED_WRITE_USER));
auditLogsRule.assertExactly(13, auditPredicate(null).withLayer(AuditLog.Origin.TRANSPORT));
}

@Test
public void shouldDeleteDocumentInBulk_positiveTransportRequests() throws IOException {
/**
Proof of concept changes to make tests stable. The problem was caused by nodes rotation in this method:
{@link org.opensearch.client.RestClient#performRequest(Request request)} which uses {@link org.opensearch.client.RestClient#nextNodes()}
to iterate through nodes, in each http call. This would cause different amount of audit log messages depending on node hit by http request.
the process is:
1. create 5-node cluster, with 3 cluster managers
2. create 2-shard, 2-replica index, find out which node is manager, doesn't hold data
3. create second high level rest client, using node found in previous step, for all calls. Maximize transport requests between cluster nodes.
*/
//create index
Settings sourceIndexSettings = Settings.builder()
.put("index.number_of_replicas", 2)
.put("index.number_of_shards", 2)
.build();
IndexOperationsHelper.createIndex(cluster, WRITE_SONG_INDEX_NAME, sourceIndexSettings);
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_WRITE_USER)) {
// cat shards, get index and node name
// getNodeByName()
// get rest client of node
Request getIndicesRequest = new Request("GET", "/_cat/nodes?v");
// High level client doesn't support _cat/shards API
Response getIndicesResponse = restHighLevelClient.getLowLevelClient().performRequest(getIndicesRequest);
String primaryNode = new BufferedReader(new InputStreamReader(getIndicesResponse.getEntity().getContent())).lines()
.map(s -> s.split("\\s+"))
.filter(strings -> strings[9].equals("*"))
.map(strings -> strings[10])
.findAny().orElseThrow(() -> new IllegalStateException("no cluster manager found"));
cluster.setPrimaryNode(primaryNode);
}

try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_WRITE_USER)) {

BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(IMMEDIATE);
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("one").source(SONGS[0].asMap()));
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("two").source(SONGS[1].asMap()));
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("three").source(SONGS[2].asMap()));
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("four").source(SONGS[3].asMap()));
assertThat(restHighLevelClient.bulk(bulkRequest, DEFAULT), successBulkResponse());
bulkRequest = new BulkRequest().setRefreshPolicy(IMMEDIATE);
bulkRequest.add(new DeleteRequest(WRITE_SONG_INDEX_NAME, "one"));
bulkRequest.add(new DeleteRequest(WRITE_SONG_INDEX_NAME, "three"));

BulkResponse response = restHighLevelClient.bulk(bulkRequest, DEFAULT);

assertThat(response, successBulkResponse());
assertThat(internalClient, not(clusterContainsDocument(WRITE_SONG_INDEX_NAME, "one")));
assertThat(internalClient, not(clusterContainsDocument(WRITE_SONG_INDEX_NAME, "three")));
assertThat(
internalClient,
clusterContainsDocumentWithFieldValue(WRITE_SONG_INDEX_NAME, "two", FIELD_TITLE, TITLE_SONG_1_PLUS_1)
);
assertThat(internalClient, clusterContainsDocumentWithFieldValue(WRITE_SONG_INDEX_NAME, "four", FIELD_TITLE, TITLE_POISON));
}
auditLogsRule.assertExactly(2, userAuthenticated(LIMITED_WRITE_USER).withRestRequest(POST, "/_bulk"));
auditLogsRule.assertExactly(2, grantedPrivilege(LIMITED_WRITE_USER, "BulkRequest"));
auditLogsRule.assertExactly(4, grantedPrivilege(LIMITED_WRITE_USER, "PutMappingRequest"));
auditLogsRule.assertExactly(4, auditPredicate(INDEX_EVENT).withEffectiveUser(LIMITED_WRITE_USER));
auditLogsRule.assertExactly(14, auditPredicate(null).withLayer(AuditLog.Origin.TRANSPORT));
}

@Test
Expand Down

0 comments on commit d3a4a2c

Please sign in to comment.