Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for dynamic monitor metadata updates in remote monitors #1585

Merged
merged 1 commit into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.MonitorRunner
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
Expand All @@ -27,6 +28,7 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonit
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.TransportService
import java.io.IOException
import java.time.Instant
Expand Down Expand Up @@ -64,12 +66,27 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
logger.info(monitorMetadata.lastRunContext.toMutableMap().toString())
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
val updatedLastRunContext = lastRunContext.toMutableMap()

val remoteDocLevelMonitorInput = monitor.inputs[0] as RemoteDocLevelMonitorInput
val docLevelMonitorInput = remoteDocLevelMonitorInput.docLevelMonitorInput
var shards: Set<String> = mutableSetOf()
var concreteIndices = listOf<String>()

// Resolve all passed indices to concrete indices
val allConcreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
// cleanup old indices that are not monitored anymore from the same monitor
val runContextKeys = updatedLastRunContext.keys.toMutableSet()
for (ind in runContextKeys) {
if (!allConcreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}

try {
docLevelMonitorInput.indices.forEach { indexName ->
concreteIndices = IndexUtils.resolveAllIndices(
Expand All @@ -93,6 +110,39 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
}
}

concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
}

val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(
indexName,
monitorCtx.clusterService!!.state()
)
) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
} else {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
}

concreteIndices.forEach {
val shardCount = getShardsCount(monitorCtx.clusterService!!, it)
for (i in 0 until shardCount) {
Expand All @@ -111,7 +161,7 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
val docLevelMonitorFanOutResponses = monitorCtx.remoteMonitors[monitor.monitorType]!!.monitorRunner.doFanOut(
monitorCtx.clusterService!!,
monitor,
monitorMetadata,
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
executionId,
concreteIndices,
workflowRunContext,
Expand All @@ -120,12 +170,12 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
nodeMap,
nodeShardAssignments
)
updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, lastRunContext)
updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, updatedLastRunContext)
val triggerResults = buildTriggerResults(docLevelMonitorFanOutResponses)
val inputRunResults = buildInputRunResults(docLevelMonitorFanOutResponses)
if (!isTempMonitor) {
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = lastRunContext),
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}
Expand Down Expand Up @@ -216,17 +266,17 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
// fanOutResponse.lastRunContexts //updatedContexts for relevant shards
val indexLastRunContext = updatedLastRunContext[indexName] as MutableMap<String, Any>

if (fanOutResponse.lastRunContexts.contains("index") && fanOutResponse.lastRunContexts["index"] == indexName) {
fanOutResponse.lastRunContexts.keys.forEach {
if (fanOutResponse.lastRunContexts.contains(indexName)) {
(fanOutResponse.lastRunContexts[indexName] as Map<String, Any>).forEach {

val seq_no = fanOutResponse.lastRunContexts[it].toString().toIntOrNull()
val seq_no = it.value.toString().toIntOrNull()
if (
it != "shards_count" &&
it != "index" &&
it.key != "shards_count" &&
it.key != "index" &&
seq_no != null &&
seq_no >= 0
) {
indexLastRunContext[it] = seq_no
indexLastRunContext[it.key] = seq_no
}
}
}
Expand Down Expand Up @@ -309,4 +359,29 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
}
return InputRunResults(listOf(inputRunResults), if (!errors.isEmpty()) AlertingException.merge(*errors.toTypedArray()) else null)
}

private fun createdRecently(
monitor: Monitor,
periodStart: Instant,
periodEnd: Instant,
indexMetadata: IndexMetadata
): Boolean {
val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO
}
return updatedLastRunContext
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,15 @@ public void onFailure(Exception e) {
);
};
} else {
String indices = restRequest.param("index", "index");
List<String> index = List.of(indices.split(","));
SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput =
new SampleRemoteDocLevelMonitorInput("hello", Map.of("world", 1), 2);
BytesStreamOutput out2 = new BytesStreamOutput();
sampleRemoteDocLevelMonitorInput.writeTo(out2);
BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes();

DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", List.of("index"), emptyList());
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList());
RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput);

Monitor remoteDocLevelMonitor = new Monitor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ protected void doExecute(Task task, DocLevelMonitorFanOutRequest request, Action
SampleRemoteMonitorTrigger1 remoteMonitorTrigger = new SampleRemoteMonitorTrigger1(triggerSin);


((Map<String, Object>) lastRunContext.get(index)).put("0", 0);
if (lastRunContext.containsKey(index)) {
((Map<String, Object>) lastRunContext.get(index)).put("2", 0);
}
if (docLevelMonitorInput.getIndices().size() > 1 && lastRunContext.containsKey(docLevelMonitorInput.getIndices().get(1))) {
((Map<String, Object>) lastRunContext.get(docLevelMonitorInput.getIndices().get(1))).put("4", 0);
}
IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX)
.source(Map.of(sampleRemoteDocLevelMonitorInput.getA(), remoteMonitorTrigger.getA())).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest, new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -167,7 +168,112 @@ public void testSampleRemoteDocLevelMonitor() throws IOException, InterruptedExc
LoggingDeprecationHandler.INSTANCE,
searchResponse.getEntity().getContent()
).map();
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1 &&
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) > 0 &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("hello"));
return found.get();
} catch (IOException ex) {
return false;
}
}, 10, TimeUnit.SECONDS);
Assert.assertTrue(found.get());
}

@SuppressWarnings("unchecked")
public void testSampleRemoteDocLevelMonitorWithDynamicMetadataUpdate() throws IOException, InterruptedException {
createIndex("index1", Settings.builder().put("number_of_shards", "7").build());
Response response = makeRequest(client(), "POST", "_plugins/_sample_remote_monitor/monitor",
Map.of("run_monitor", "doc_level", "index", "index1"), null);
Assert.assertEquals("Unable to create remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

Map<String, Object> responseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
response.getEntity().getContent()
).map();
String monitorId = responseJson.get("_id").toString();

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

createIndex("index2", Settings.builder().put("number_of_shards", "7").build());
String updatedMonitor = String.format(Locale.ROOT, "{\"type\":\"monitor\",\"name\":\"remote_doc_level_monitor\",\"monitor_type\":\"remote_doc_level_monitor\"," +
"\"user\":{\"name\":\"\",\"backend_roles\":[],\"roles\":[],\"custom_attribute_names\":[],\"user_requested_tenant\":null},\"enabled\":true,\"schedule\":{\"period\":{\"interval\":5,\"unit\":\"MINUTES\"}}," +
"\"inputs\":[{\"remote_doc_level_monitor_input\":{\"size\":24,\"input\":\"BWhlbGxvCgEFd29ybGQBAAAAAQAAAAIA\",\"doc_level_input\":" +
"{\"doc_level_input\":{\"description\":\"description\",\"indices\":[%s],\"queries\":[]}}}}],\"" +
"triggers\":[{\"remote_monitor_trigger\":{\"id\":\"id\",\"name\":\"name\",\"severity\":\"1\"," +
"\"actions\":[{\"id\":\"id\",\"name\":\"name\",\"destination_id\":\"destinationId\",\"message_template\":{\"source\":\"Hello World\"," +
"\"lang\":\"mustache\"},\"throttle_enabled\":false,\"subject_template\":{\"source\":\"Hello World\",\"lang\":\"mustache\"}," +
"\"throttle\":{\"value\":60,\"unit\":\"MINUTES\"}}],\"size\":24,\"trigger\":\"BWhlbGxvCgEEdGVzdAM/gAAAAAAAAQAA\"}}]," +
"\"owner\":\"sample-remote-monitor-plugin\"}", "\"index1\", \"index2\"");
makeRequest(client(), "PUT", "/_plugins/_alerting/monitors/" + monitorId, Map.of(), new StringEntity(updatedMonitor, ContentType.APPLICATION_JSON));

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

makeRequest(client(), "DELETE", "/index1", Map.of(), null);

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

AtomicBoolean found = new AtomicBoolean(false);
OpenSearchRestTestCase.waitUntil(
() -> {
try {
Response searchResponse = makeRequest(client(), "POST", SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX + "/_search", Map.of(),
new StringEntity("{\"query\":{\"match_all\":{}}}", ContentType.APPLICATION_JSON));
Map<String, Object> searchResponseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
searchResponse.getEntity().getContent()
).map();
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) > 0 &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("hello"));
return found.get();
} catch (IOException ex) {
return false;
}
}, 10, TimeUnit.SECONDS);
Assert.assertTrue(found.get());
}

@SuppressWarnings("unchecked")
public void testSampleRemoteDocLevelMonitorWithAlias() throws IOException, InterruptedException {
String indexAlias = "test_alias";
createIndex("index-000001", Settings.EMPTY);
makeRequest(client(), "POST", "_aliases", Map.of(),
new StringEntity(String.format(Locale.ROOT, "{\"actions\":[{\"add\":{\"index\":\"index-000001\",\"alias\":\"%s\",\"is_write_index\":true}}]}", indexAlias), ContentType.APPLICATION_JSON));
Response response = makeRequest(client(), "POST", "_plugins/_sample_remote_monitor/monitor", Map.of("run_monitor", "doc_level", "index", indexAlias), null);
Assert.assertEquals("Unable to create remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

Map<String, Object> responseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
response.getEntity().getContent()
).map();
String monitorId = responseJson.get("_id").toString();

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));

makeRequest(client(), "POST", String.format(Locale.ROOT, "%s/_rollover", indexAlias), Map.of(),
new StringEntity("", ContentType.APPLICATION_JSON));

response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null);
Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
AtomicBoolean found = new AtomicBoolean(false);
OpenSearchRestTestCase.waitUntil(
() -> {
try {
Response searchResponse = makeRequest(client(), "POST", SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX + "/_search", Map.of(),
new StringEntity("{\"query\":{\"match_all\":{}}}", ContentType.APPLICATION_JSON));
Map<String, Object> searchResponseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
searchResponse.getEntity().getContent()
).map();
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) > 0 &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("hello"));
return found.get();
Expand Down
Loading