diff --git a/src/integrationTest/java/org/opensearch/security/SearchOperationTest.java b/src/integrationTest/java/org/opensearch/security/SearchOperationTest.java index 91ef9f7999..451317298d 100644 --- a/src/integrationTest/java/org/opensearch/security/SearchOperationTest.java +++ b/src/integrationTest/java/org/opensearch/security/SearchOperationTest.java @@ -9,7 +9,9 @@ */ package org.opensearch.security; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -70,6 +72,8 @@ import org.opensearch.client.Client; import org.opensearch.client.ClusterAdminClient; import org.opensearch.client.IndicesAdminClient; +import org.opensearch.client.Request; +import org.opensearch.client.Response; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.core.CountRequest; import org.opensearch.client.indices.CloseIndexRequest; @@ -96,6 +100,7 @@ import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.core.rest.RestStatus; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.security.auditlog.AuditLog; import org.opensearch.test.framework.AuditCompliance; import org.opensearch.test.framework.AuditConfiguration; import org.opensearch.test.framework.AuditFilters; @@ -1116,6 +1121,37 @@ public void shouldUpdateDocumentsInBulk_negative() throws IOException { @Test public void shouldDeleteDocumentInBulk_positive() throws IOException { + /** + Proof of concept changes to make tests stable. The problem was caused by nodes rotation in this method: + {@link org.opensearch.client.RestClient#performRequest(Request request)} which uses {@link org.opensearch.client.RestClient#nextNodes()} + to iterate through nodes, in each http call. This would cause different amount of audit log messages depending on node hit by http request. + the process is: + 1. create 5-node cluster, with 3 cluster managers + 2. create 2-shard, 2-replica index, find out which node holds the data + 3. create second high level rest client, using node found in previous step, for all calls. Minimize transport requests between cluster nodes. + */ + //create index + Settings sourceIndexSettings = Settings.builder() + .put("index.number_of_replicas", 2) + .put("index.number_of_shards", 2) + .build(); + IndexOperationsHelper.createIndex(cluster, WRITE_SONG_INDEX_NAME, sourceIndexSettings); + try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(ADMIN_USER)) { + // cat shards, get index and node name + // getNodeByName() + // get rest client of node + Request getIndicesRequest = new Request("GET", "/_cat/shards?v"); + // High level client doesn't support _cat/shards API + Response getIndicesResponse = restHighLevelClient.getLowLevelClient().performRequest(getIndicesRequest); + String primaryNode = new BufferedReader(new InputStreamReader(getIndicesResponse.getEntity().getContent())).lines() + .map(s -> s.split("\\s+")) + .filter(strings -> strings[0].equals(WRITE_SONG_INDEX_NAME)) + .filter(strings -> strings[2].equals("p")) + .map(strings -> strings[7]) + .findAny().orElseThrow(() -> new IllegalStateException("no primary shard found")); + cluster.setPrimaryNode(primaryNode); + } + try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_WRITE_USER)) { BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(IMMEDIATE); bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("one").source(SONGS[0].asMap())); @@ -1140,9 +1176,71 @@ public void shouldDeleteDocumentInBulk_positive() throws IOException { } auditLogsRule.assertExactly(2, userAuthenticated(LIMITED_WRITE_USER).withRestRequest(POST, "/_bulk")); auditLogsRule.assertExactly(2, grantedPrivilege(LIMITED_WRITE_USER, "BulkRequest")); - auditLogsRule.assertExactly(2, grantedPrivilege(LIMITED_WRITE_USER, "CreateIndexRequest")); auditLogsRule.assertExactly(4, grantedPrivilege(LIMITED_WRITE_USER, "PutMappingRequest")); - auditLogsRule.assertExactly(6, auditPredicate(INDEX_EVENT).withEffectiveUser(LIMITED_WRITE_USER)); + auditLogsRule.assertExactly(4, auditPredicate(INDEX_EVENT).withEffectiveUser(LIMITED_WRITE_USER)); + auditLogsRule.assertExactly(13, auditPredicate(null).withLayer(AuditLog.Origin.TRANSPORT)); + } + + @Test + public void shouldDeleteDocumentInBulk_positiveTransportRequests() throws IOException { + /** + Proof of concept changes to make tests stable. The problem was caused by nodes rotation in this method: + {@link org.opensearch.client.RestClient#performRequest(Request request)} which uses {@link org.opensearch.client.RestClient#nextNodes()} + to iterate through nodes, in each http call. This would cause different amount of audit log messages depending on node hit by http request. + the process is: + 1. create 5-node cluster, with 3 cluster managers + 2. create 2-shard, 2-replica index, find out which node is manager, doesn't hold data + 3. create second high level rest client, using node found in previous step, for all calls. Maximize transport requests between cluster nodes. + */ + //create index + Settings sourceIndexSettings = Settings.builder() + .put("index.number_of_replicas", 2) + .put("index.number_of_shards", 2) + .build(); + IndexOperationsHelper.createIndex(cluster, WRITE_SONG_INDEX_NAME, sourceIndexSettings); + try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_WRITE_USER)) { + // cat shards, get index and node name + // getNodeByName() + // get rest client of node + Request getIndicesRequest = new Request("GET", "/_cat/nodes?v"); + // High level client doesn't support _cat/shards API + Response getIndicesResponse = restHighLevelClient.getLowLevelClient().performRequest(getIndicesRequest); + String primaryNode = new BufferedReader(new InputStreamReader(getIndicesResponse.getEntity().getContent())).lines() + .map(s -> s.split("\\s+")) + .filter(strings -> strings[9].equals("*")) + .map(strings -> strings[10]) + .findAny().orElseThrow(() -> new IllegalStateException("no cluster manager found")); + cluster.setPrimaryNode(primaryNode); + } + + try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_WRITE_USER)) { + + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(IMMEDIATE); + bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("one").source(SONGS[0].asMap())); + bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("two").source(SONGS[1].asMap())); + bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("three").source(SONGS[2].asMap())); + bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("four").source(SONGS[3].asMap())); + assertThat(restHighLevelClient.bulk(bulkRequest, DEFAULT), successBulkResponse()); + bulkRequest = new BulkRequest().setRefreshPolicy(IMMEDIATE); + bulkRequest.add(new DeleteRequest(WRITE_SONG_INDEX_NAME, "one")); + bulkRequest.add(new DeleteRequest(WRITE_SONG_INDEX_NAME, "three")); + + BulkResponse response = restHighLevelClient.bulk(bulkRequest, DEFAULT); + + assertThat(response, successBulkResponse()); + assertThat(internalClient, not(clusterContainsDocument(WRITE_SONG_INDEX_NAME, "one"))); + assertThat(internalClient, not(clusterContainsDocument(WRITE_SONG_INDEX_NAME, "three"))); + assertThat( + internalClient, + clusterContainsDocumentWithFieldValue(WRITE_SONG_INDEX_NAME, "two", FIELD_TITLE, TITLE_SONG_1_PLUS_1) + ); + assertThat(internalClient, clusterContainsDocumentWithFieldValue(WRITE_SONG_INDEX_NAME, "four", FIELD_TITLE, TITLE_POISON)); + } + auditLogsRule.assertExactly(2, userAuthenticated(LIMITED_WRITE_USER).withRestRequest(POST, "/_bulk")); + auditLogsRule.assertExactly(2, grantedPrivilege(LIMITED_WRITE_USER, "BulkRequest")); + auditLogsRule.assertExactly(4, grantedPrivilege(LIMITED_WRITE_USER, "PutMappingRequest")); + auditLogsRule.assertExactly(4, auditPredicate(INDEX_EVENT).withEffectiveUser(LIMITED_WRITE_USER)); + auditLogsRule.assertExactly(14, auditPredicate(null).withLayer(AuditLog.Origin.TRANSPORT)); } @Test diff --git a/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalCluster.java b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalCluster.java index 64207ead5b..5480e1c664 100644 --- a/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalCluster.java +++ b/src/integrationTest/java/org/opensearch/test/framework/cluster/LocalCluster.java @@ -94,6 +94,7 @@ public class LocalCluster extends ExternalResource implements AutoCloseable, Ope private final Map remotes; private volatile LocalOpenSearchCluster localOpenSearchCluster; private final List testIndices; + private String primaryNode; private boolean loadConfigurationIntoIndex; @@ -156,6 +157,7 @@ public void before() throws Throwable { @Override protected void after() { System.clearProperty(INIT_CONFIGURATION_DIR); + this.primaryNode = null; close(); } @@ -179,7 +181,7 @@ public String getClusterName() { @Override public InetSocketAddress getHttpAddress() { - return localOpenSearchCluster.clientNode().getHttpAddress(); + return this.primaryNode == null ? localOpenSearchCluster.clientNode().getHttpAddress() : localOpenSearchCluster.getNodeByName(primaryNode).getHttpAddress(); } public int getHttpPort() { @@ -531,4 +533,7 @@ public TestCertificates getTestCertificates() { return testCertificates; } + public void setPrimaryNode(String primaryNode) { + this.primaryNode = primaryNode; + } }