Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Prevent message collection from being updated after message count has been received (#2180) #3035

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,18 @@ public String getExceptionStackTrace() {
return (String) this.auditInfo.get(EXCEPTION);
}

public String getRequestBody() {
return (String) this.auditInfo.get(REQUEST_BODY);
}

public String getNodeId() {
return (String) this.auditInfo.get(NODE_ID);
}

public String getDocId() {
return (String) this.auditInfo.get(ID);
}

@Override
public String toString() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.opensearch.security.test.helper.rest.RestHelper.HttpResponse;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.AnyOf.anyOf;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -90,10 +92,11 @@ public void testSourceFilter() throws Exception {
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
});

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_DOC_READ"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Designation"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Salary"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("Gender"));
assertThat(message.getCategory(), equalTo(AuditCategory.COMPLIANCE_DOC_READ));
assertThat(message.getRequestBody(), not(containsString("Designation")));
assertThat(message.getRequestBody(), not(containsString("Salary")));
assertThat(message.getRequestBody(), containsString("Gender"));

Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
}

Expand Down Expand Up @@ -223,17 +226,26 @@ public void testSourceFilterMsearch() throws Exception {
+ "}"
+ System.lineSeparator();

TestAuditlogImpl.doThenWaitForMessages(() -> {
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
HttpResponse response = rh.executePostRequest("_msearch?pretty", search, encodeBasicHeader("admin", "admin"));
assertNotContains(response, "*exception*");
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
}, 2);
System.out.println(TestAuditlogImpl.sb.toString());
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_DOC_READ"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Salary"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("Gender"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("Designation"));
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));

final AuditMessage desginationMsg = messages.stream()
.filter(msg -> msg.getRequestBody().contains("Designation"))
.findFirst()
.orElseThrow();
assertThat(desginationMsg.getCategory(), equalTo(AuditCategory.COMPLIANCE_DOC_READ));
assertThat(desginationMsg.getRequestBody(), containsString("Designation"));
assertThat(desginationMsg.getRequestBody(), not(containsString("Salary")));

final AuditMessage genderMsg = messages.stream().filter(msg -> msg.getRequestBody().contains("Gender")).findFirst().orElseThrow();
assertThat(genderMsg.getCategory(), equalTo(AuditCategory.COMPLIANCE_DOC_READ));
assertThat(genderMsg.getRequestBody(), containsString("Gender"));
assertThat(genderMsg.getRequestBody(), not(containsString("Salary")));

Assert.assertTrue(validateMsgs(messages));
}

@Test
Expand All @@ -253,6 +265,15 @@ public void testInternalConfig() throws Exception {

setup(additionalSettings);

final List<String> expectedDocumentsTypes = List.of(
"config",
"actiongroups",
"internalusers",
"roles",
"rolesmapping",
"tenants",
"audit"
);
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
try (RestHighLevelClient restHighLevelClient = getRestClient(clusterInfo, "kirk-keystore.jks", "truststore.jks")) {
for (IndexRequest ir : new DynamicSecurityConfig().setSecurityRoles("roles_2.yml").getDynamicConfig(getResourceFolder())) {
Expand All @@ -268,23 +289,20 @@ public void testInternalConfig() throws Exception {
assertThat(response.getStatusCode(), equalTo(HttpStatus.SC_OK));
}, 14);

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_INTERNAL_CONFIG_READ"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_INTERNAL_CONFIG_WRITE"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("anonymous_auth_enabled"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("indices:data/read/suggest"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("internalusers"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("opendistro_security_all_access"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("indices:data/read/suggest"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJzZWFyY2hndWFy"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJBTEwiOlsiaW"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJhZG1pbiI6e"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJzZ19hb"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJzZ19hbGx"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("dvcmYiOnsiY2x"));
Assert.assertTrue(
TestAuditlogImpl.sb.toString().contains("\\\"op\\\":\\\"remove\\\",\\\"path\\\":\\\"/opendistro_security_worf\\\"")
);
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
final List<String> documentIds = messages.stream().map(AuditMessage::getDocId).distinct().collect(Collectors.toList());
assertThat(documentIds, equalTo(expectedDocumentsTypes));

messages.stream().collect(Collectors.groupingBy(AuditMessage::getDocId)).entrySet().forEach((e) -> {
final String docId = e.getKey();
final List<AuditMessage> messagesByDocId = e.getValue();
assertThat(
"Doc " + docId + " should have a read/write config message",
messagesByDocId.stream().map(AuditMessage::getCategory).collect(Collectors.toList()),
equalTo(List.of(AuditCategory.COMPLIANCE_INTERNAL_CONFIG_WRITE, AuditCategory.COMPLIANCE_INTERNAL_CONFIG_READ))
);
});

Assert.assertTrue(validateMsgs(messages));
}

@Test
Expand All @@ -301,7 +319,7 @@ public void testExternalConfig() throws Exception {
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_CONFIG_DISABLED_REST_CATEGORIES, "authenticated,GRANTED_PRIVILEGES")
.build();

TestAuditlogImpl.doThenWaitForMessages(() -> {
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
try {
setup(additionalSettings);
} catch (final Exception ex) {
Expand All @@ -318,10 +336,17 @@ public void testExternalConfig() throws Exception {
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
}, 4);

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("external_configuration"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_EXTERNAL_CONFIG"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("opensearch_yml"));
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
// Record the updated config, and then for each node record that the config was updated
assertThat(messages.get(0).getCategory(), equalTo(AuditCategory.COMPLIANCE_INTERNAL_CONFIG_WRITE));
assertThat(messages.get(1).getCategory(), equalTo(AuditCategory.COMPLIANCE_EXTERNAL_CONFIG));
assertThat(messages.get(2).getCategory(), equalTo(AuditCategory.COMPLIANCE_EXTERNAL_CONFIG));
assertThat(messages.get(3).getCategory(), equalTo(AuditCategory.COMPLIANCE_EXTERNAL_CONFIG));

// Make sure that the config update messsages are for each node in the cluster
assertThat(messages.get(1).getNodeId(), not(equalTo(messages.get(2).getNodeId())));
assertThat(messages.get(2).getNodeId(), not(equalTo(messages.get(3).getNodeId())));

Assert.assertTrue(validateMsgs(messages));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void testSSLPlainText() throws Exception {
() -> nonSslRestHelper().executeGetRequest("_search", encodeBasicHeader("admin", "admin"))
);
Assert.assertEquals("org.apache.http.NoHttpResponseException", ex.getCause().getClass().getName());
}, 4);
}, 1);

// All of the messages should be the same as the http client is attempting multiple times.
messages.stream().forEach((message) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,63 @@ public static synchronized void clear() {
* Perform an action and then wait until the expected number of messages have been found.
*/
public static List<AuditMessage> doThenWaitForMessages(final Runnable action, final int expectedCount) {
final CountDownLatch latch = new CountDownLatch(expectedCount);
final List<AuditMessage> missedMessages = new ArrayList<>();
final List<AuditMessage> messages = new ArrayList<>();
countDownRef.set(latch);
messagesRef.set(messages);

TestAuditlogImpl.sb = new StringBuffer();
TestAuditlogImpl.messages = messages;
final CountDownLatch latch = resetAuditStorage(expectedCount, messages);

try {
action.run();
final int maxSecondsToWaitForMessages = 1;
final boolean foundAll = latch.await(maxSecondsToWaitForMessages, TimeUnit.SECONDS);
if (!foundAll) {
boolean foundAll = false;
foundAll = latch.await(maxSecondsToWaitForMessages, TimeUnit.SECONDS);
// After the wait has prevent any new messages from being recieved
resetAuditStorage(0, missedMessages);
if (!foundAll || messages.size() != expectedCount) {
throw new MessagesNotFoundException(expectedCount, messages);
}
if (messages.size() != expectedCount) {
throw new RuntimeException(
"Unexpected number of messages, was expecting " + expectedCount + ", received " + messages.size()
);
}
} catch (final InterruptedException e) {
throw new RuntimeException("Unexpected exception", e);
}

// Do not check for missed messages if no messages were expected
if (expectedCount != 0) {
try {
Thread.sleep(100);
if (missedMessages.size() != 0) {
final String missedMessagesErrorMessage = new StringBuilder().append("Audit messages were missed! ")
.append("Found " + (missedMessages.size()) + " messages.")
.append("Messages found during this time: \n\n")
.append(missedMessages.stream().map(AuditMessage::toString).collect(Collectors.joining("\n")))
.toString();

throw new RuntimeException(missedMessagesErrorMessage);
}
} catch (final Exception e) {
throw new RuntimeException("Unexpected exception", e);
}
}

// Next usage of this class might be using raw stringbuilder / list so reset before that test might run
resetAuditStorage(0, new ArrayList<>());
return new ArrayList<>(messages);
}

/**
* Resets all of the mechanics for fresh messages to be captured
*
* @param expectedMessageCount The number of messages before the latch is signalled, indicating all messages have been recieved
* @param message Where messages will be stored after being recieved
*/
private static CountDownLatch resetAuditStorage(int expectedMessageCount, List<AuditMessage> messages) {
final CountDownLatch latch = new CountDownLatch(expectedMessageCount);
countDownRef.set(latch);
messagesRef.set(messages);

TestAuditlogImpl.sb = new StringBuffer();
TestAuditlogImpl.messages = messages;
return latch;
}

/**
* Perform an action and then wait until a single message has been found.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ protected final CloseableHttpClient getHTTPClient() throws Exception {

hcb.setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(60 * 1000).build());

return hcb.build();
return hcb.disableAutomaticRetries().build();
}

public static class HttpResponse {
Expand Down