POC, stable tests
Signed-off-by: Maciej Mierzwa <[email protected]>
MaciejMierzwa committed Nov 2, 2023
1 parent 6d2711f commit 6296d7d
Showing 2 changed files with 106 additions and 3 deletions.
@@ -9,7 +9,9 @@
*/
package org.opensearch.security;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -70,6 +72,8 @@
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.core.CountRequest;
import org.opensearch.client.indices.CloseIndexRequest;
@@ -96,6 +100,7 @@
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.security.auditlog.AuditLog;
import org.opensearch.test.framework.AuditCompliance;
import org.opensearch.test.framework.AuditConfiguration;
import org.opensearch.test.framework.AuditFilters;
@@ -1116,6 +1121,37 @@ public void shouldUpdateDocumentsInBulk_negative() throws IOException {

@Test
public void shouldDeleteDocumentInBulk_positive() throws IOException {
/**
Proof of concept changes to make the tests stable. The problem was caused by node rotation in
{@link org.opensearch.client.RestClient#performRequest(Request)}, which uses {@link org.opensearch.client.RestClient#nextNodes()}
to pick a different node for each HTTP call, so the number of audit log messages differed depending on which node the request hit.
The process is:
1. create a 5-node cluster with 3 cluster managers
2. create a 2-shard, 2-replica index and find out which node holds the primary shard
3. create a second high-level REST client that uses the node found in the previous step for all calls, to minimize transport requests between cluster nodes.
*/
// create the index
Settings sourceIndexSettings = Settings.builder()
.put("index.number_of_replicas", 2)
.put("index.number_of_shards", 2)
.build();
IndexOperationsHelper.createIndex(cluster, WRITE_SONG_INDEX_NAME, sourceIndexSettings);
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(ADMIN_USER)) {
// use _cat/shards to find the name of the node that holds the primary shard of the index,
// then pin the cluster's REST clients to that node
Request catShardsRequest = new Request("GET", "/_cat/shards?v");
// the high-level client doesn't support the _cat/shards API, so use the low-level client
Response catShardsResponse = restHighLevelClient.getLowLevelClient().performRequest(catShardsRequest);
// _cat/shards?v columns: index, shard, prirep, state, docs, store, ip, node
String primaryNode = new BufferedReader(new InputStreamReader(catShardsResponse.getEntity().getContent())).lines()
.map(s -> s.split("\\s+"))
.filter(strings -> strings[0].equals(WRITE_SONG_INDEX_NAME))
.filter(strings -> strings[2].equals("p"))
.map(strings -> strings[7])
.findAny()
.orElseThrow(() -> new IllegalStateException("no primary shard found"));
cluster.setPrimaryNode(primaryNode);
}

try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_WRITE_USER)) {
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(IMMEDIATE);
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("one").source(SONGS[0].asMap()));
@@ -1140,9 +1176,71 @@ public void shouldDeleteDocumentInBulk_positive() throws IOException {
}
auditLogsRule.assertExactly(2, userAuthenticated(LIMITED_WRITE_USER).withRestRequest(POST, "/_bulk"));
auditLogsRule.assertExactly(2, grantedPrivilege(LIMITED_WRITE_USER, "BulkRequest"));
auditLogsRule.assertExactly(2, grantedPrivilege(LIMITED_WRITE_USER, "CreateIndexRequest"));
auditLogsRule.assertExactly(4, grantedPrivilege(LIMITED_WRITE_USER, "PutMappingRequest"));
auditLogsRule.assertExactly(6, auditPredicate(INDEX_EVENT).withEffectiveUser(LIMITED_WRITE_USER));
auditLogsRule.assertExactly(4, auditPredicate(INDEX_EVENT).withEffectiveUser(LIMITED_WRITE_USER));
auditLogsRule.assertExactly(13, auditPredicate(null).withLayer(AuditLog.Origin.TRANSPORT));
}

@Test
public void shouldDeleteDocumentInBulk_positiveTransportRequests() throws IOException {
/**
Proof of concept changes to make the tests stable. The problem was caused by node rotation in
{@link org.opensearch.client.RestClient#performRequest(Request)}, which uses {@link org.opensearch.client.RestClient#nextNodes()}
to pick a different node for each HTTP call, so the number of audit log messages differed depending on which node the request hit.
The process is:
1. create a 5-node cluster with 3 cluster managers
2. create a 2-shard, 2-replica index and find out which node is the elected cluster manager and holds no index data
3. create a second high-level REST client that uses the node found in the previous step for all calls, to maximize transport requests between cluster nodes.
*/
// create the index
Settings sourceIndexSettings = Settings.builder()
.put("index.number_of_replicas", 2)
.put("index.number_of_shards", 2)
.build();
IndexOperationsHelper.createIndex(cluster, WRITE_SONG_INDEX_NAME, sourceIndexSettings);
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_WRITE_USER)) {
// use _cat/nodes to find the elected cluster manager node,
// then pin the cluster's REST clients to that node
Request catNodesRequest = new Request("GET", "/_cat/nodes?v");
// the high-level client doesn't support the _cat/nodes API, so use the low-level client
Response catNodesResponse = restHighLevelClient.getLowLevelClient().performRequest(catNodesRequest);
String clusterManagerNode = new BufferedReader(new InputStreamReader(catNodesResponse.getEntity().getContent())).lines()
.map(s -> s.split("\\s+"))
// the elected cluster manager row is marked with "*"
.filter(strings -> strings[9].equals("*"))
.map(strings -> strings[10])
.findAny()
.orElseThrow(() -> new IllegalStateException("no cluster manager found"));
cluster.setPrimaryNode(clusterManagerNode);
}

try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_WRITE_USER)) {

BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(IMMEDIATE);
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("one").source(SONGS[0].asMap()));
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("two").source(SONGS[1].asMap()));
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("three").source(SONGS[2].asMap()));
bulkRequest.add(new IndexRequest(WRITE_SONG_INDEX_NAME).id("four").source(SONGS[3].asMap()));
assertThat(restHighLevelClient.bulk(bulkRequest, DEFAULT), successBulkResponse());
bulkRequest = new BulkRequest().setRefreshPolicy(IMMEDIATE);
bulkRequest.add(new DeleteRequest(WRITE_SONG_INDEX_NAME, "one"));
bulkRequest.add(new DeleteRequest(WRITE_SONG_INDEX_NAME, "three"));

BulkResponse response = restHighLevelClient.bulk(bulkRequest, DEFAULT);

assertThat(response, successBulkResponse());
assertThat(internalClient, not(clusterContainsDocument(WRITE_SONG_INDEX_NAME, "one")));
assertThat(internalClient, not(clusterContainsDocument(WRITE_SONG_INDEX_NAME, "three")));
assertThat(
internalClient,
clusterContainsDocumentWithFieldValue(WRITE_SONG_INDEX_NAME, "two", FIELD_TITLE, TITLE_SONG_1_PLUS_1)
);
assertThat(internalClient, clusterContainsDocumentWithFieldValue(WRITE_SONG_INDEX_NAME, "four", FIELD_TITLE, TITLE_POISON));
}
auditLogsRule.assertExactly(2, userAuthenticated(LIMITED_WRITE_USER).withRestRequest(POST, "/_bulk"));
auditLogsRule.assertExactly(2, grantedPrivilege(LIMITED_WRITE_USER, "BulkRequest"));
auditLogsRule.assertExactly(4, grantedPrivilege(LIMITED_WRITE_USER, "PutMappingRequest"));
auditLogsRule.assertExactly(4, auditPredicate(INDEX_EVENT).withEffectiveUser(LIMITED_WRITE_USER));
auditLogsRule.assertExactly(14, auditPredicate(null).withLayer(AuditLog.Origin.TRANSPORT));
}

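The root cause is easiest to see with the low-level client on its own: a RestClient built over several hosts rotates through them on successive calls, while a client built over a single host always hits the same node. A minimal sketch of the difference, not part of this commit — the localhost hosts and ports are illustrative assumptions:

import org.apache.http.HttpHost;

import org.opensearch.client.Request;
import org.opensearch.client.RestClient;

public class NodeRotationSketch {
    public static void main(String[] args) throws Exception {
        // built over three hosts: each performRequest() may reach a different
        // node (RestClient#nextNodes rotates), so per-node audit counts vary
        try (RestClient rotating = RestClient.builder(
            new HttpHost("localhost", 9200, "http"),
            new HttpHost("localhost", 9201, "http"),
            new HttpHost("localhost", 9202, "http")
        ).build()) {
            rotating.performRequest(new Request("GET", "/_cat/nodes?v"));
        }

        // built over a single host: every call takes the same HTTP entry point,
        // so the resulting audit log messages are deterministic
        try (RestClient pinned = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            pinned.performRequest(new Request("GET", "/_cat/nodes?v"));
        }
    }
}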
@@ -94,6 +94,7 @@ public class LocalCluster extends ExternalResource implements AutoCloseable, Ope
private final Map<String, LocalCluster> remotes;
private volatile LocalOpenSearchCluster localOpenSearchCluster;
private final List<TestIndex> testIndices;
private String primaryNode;

private boolean loadConfigurationIntoIndex;

@@ -156,6 +157,7 @@ public void before() throws Throwable {
@Override
protected void after() {
System.clearProperty(INIT_CONFIGURATION_DIR);
this.primaryNode = null;
close();
}

@@ -179,7 +181,7 @@ public String getClusterName() {

@Override
public InetSocketAddress getHttpAddress() {
return localOpenSearchCluster.clientNode().getHttpAddress();
return this.primaryNode == null
    ? localOpenSearchCluster.clientNode().getHttpAddress()
    : localOpenSearchCluster.getNodeByName(primaryNode).getHttpAddress();
}

public int getHttpPort() {
@@ -531,4 +533,7 @@ public TestCertificates getTestCertificates() {
return testCertificates;
}

public void setPrimaryNode(String primaryNode) {
this.primaryNode = primaryNode;
}
}
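With the LocalCluster change in place, a test pins all subsequent REST calls to one node through the new hook. A short usage sketch — the node name "data_node_2" is a hypothetical value that would come from the _cat APIs, as in the tests above:

// pin the cluster's HTTP address to a node found via _cat/shards or _cat/nodes
cluster.setPrimaryNode("data_node_2"); // hypothetical node name
try (RestHighLevelClient client = cluster.getRestHighLevelClient(ADMIN_USER)) {
    // getHttpAddress() now resolves the pinned node via getNodeByName(),
    // so every request from this client reaches the same node
}
// after() clears primaryNode, so later tests fall back to clientNode()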
