Skip to content

Commit

Permalink
fix coordinator node logic
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 23, 2024
1 parent f44d05c commit e68ea5b
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT,
AlertingSettings.DOC_LEVEL_MONITOR_FAN_OUT_NODES,
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT,
LegacyOpenDistroAlertingSettings.BULK_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -83,6 +81,9 @@ import java.time.Instant
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import java.util.stream.Collectors
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
Expand All @@ -96,8 +97,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?,
executionId: String,
transportService: TransportService?
transportService: TransportService?,
): MonitorRunResult<DocumentLevelTriggerRunResult> {
if (transportService == null)
throw RuntimeException("transport service should not be null")
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
Expand Down Expand Up @@ -182,6 +185,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
val queryingStartTimeMillis = System.currentTimeMillis()
val docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse> = mutableListOf()
docLevelMonitorInput.indices.forEach { indexName ->

var concreteIndices = IndexUtils.resolveAllIndices(
Expand Down Expand Up @@ -278,8 +282,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
)

val shards = indexUpdatedRunContext.keys
val shards = mutableSetOf<String>()
shards.addAll(indexUpdatedRunContext.keys)
shards.remove("index")
shards.remove("shards_count")

Expand Down Expand Up @@ -307,57 +311,63 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.info(it1.id.toString())
}
}

val listener = GroupedActionListener(
object : ActionListener<Collection<DocLevelMonitorFanOutResponse>> {
override fun onResponse(response: Collection<DocLevelMonitorFanOutResponse>) {
logger.info("hit here1")
}

override fun onFailure(e: Exception) {
logger.info("hit here2")
}
},
nodeMap.size
)
val responseReader = Writeable.Reader {
DocLevelMonitorFanOutResponse(it)
}
for (node in nodeMap) {
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
node.key,
monitor,
monitorMetadata,
executionId,
listOf(indexExecutionContext),
nodeShardAssignments[node.key]!!.toList(),
workflowRunContext
)
transportService!!.sendRequest(
node.value,
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(listener, responseReader) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
val responses: Collection<DocLevelMonitorFanOutResponse> = suspendCoroutine { cont ->
val listener = GroupedActionListener(
object : ActionListener<Collection<DocLevelMonitorFanOutResponse>> {
override fun onResponse(response: Collection<DocLevelMonitorFanOutResponse>) {
logger.info("hit here1")
cont.resume(response)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
listener.onResponse(response)
override fun onFailure(e: Exception) {
logger.info("Fan out failed")
cont.resumeWithException(e)
}
}
},
nodeMap.size
)
val responseReader = Writeable.Reader {
DocLevelMonitorFanOutResponse(it)
}
for (node in nodeMap) {
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
node.key,
monitor,
dryrun,
monitorMetadata,
executionId,
listOf(indexExecutionContext),
nodeShardAssignments[node.key]!!.toList(),
workflowRunContext
)

transportService.sendRequest(
node.value,
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(listener, responseReader) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
listener.onResponse(response)
}
}
)
}
}
docLevelMonitorFanOutResponses.addAll(responses)
}
}

updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, updatedLastRunContext)
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
// TODO: Update the Document as part of the Trigger and return back the trigger action result
return monitorResult.copy(triggerResults = emptyMap())
return monitorResult.copy(triggerResults = buildTriggerResults(docLevelMonitorFanOutResponses))
} catch (e: Exception) {
val errorMessage = ExceptionsHelper.detailedMessage(e)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
Expand All @@ -382,6 +392,35 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}

private fun buildTriggerResults(
docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse>,
): Map<String, DocumentLevelTriggerRunResult> {
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
for (res in docLevelMonitorFanOutResponses) {
for (triggerId in res.triggerResults.keys) {
val documentLevelTriggerRunResult = res.triggerResults[triggerId]
if (documentLevelTriggerRunResult != null) {
if (false == triggerResults.contains(triggerId)) {
triggerResults[triggerId] = documentLevelTriggerRunResult
} else {
val currVal = triggerResults[triggerId]
val newTrigggeredDocs = mutableListOf<String>()
newTrigggeredDocs.addAll(currVal!!.triggeredDocs)
newTrigggeredDocs.addAll(documentLevelTriggerRunResult.triggeredDocs)
triggerResults.put(
triggerId,
currVal.copy(
triggeredDocs = newTrigggeredDocs,
error = if (currVal.error != null) currVal.error else documentLevelTriggerRunResult.error
)
)
}
}
}
}
return triggerResults
}

private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) {
monitorCtx.alertService!!.clearMonitorErrorAlert(monitor)
if (monitor.dataSources.alertsHistoryIndex != null) {
Expand Down Expand Up @@ -597,6 +636,35 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)
}

private fun updateLastRunContextFromFanOutResponses(
docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse>,
updatedLastRunContext: MutableMap<String, MutableMap<String, Any>>,
) {
// Prepare updatedLastRunContext for each index
for (indexName in updatedLastRunContext.keys) {
for (fanOutResponse in docLevelMonitorFanOutResponses) {
// fanOutResponse.lastRunContexts //updatedContexts for relevant shards
val indexLastRunContext = updatedLastRunContext[indexName] as MutableMap<String, Any>

if (fanOutResponse.lastRunContexts.contains(indexName)) {
val partialUpdatedIndexLastRunContext = fanOutResponse.lastRunContexts[indexName] as MutableMap<String, Any>
partialUpdatedIndexLastRunContext.keys.forEach {

val seq_no = partialUpdatedIndexLastRunContext[it].toString().toIntOrNull()
if (
it != "shards_count" &&
it != "index" &&
seq_no != null &&
seq_no != SequenceNumbers.UNASSIGNED_SEQ_NO.toInt()
) {
indexLastRunContext[it] = seq_no
}
}
}
}
}
}

private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
Expand Down Expand Up @@ -670,7 +738,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

private fun getShardsCount(clusterService: ClusterService, index: String): Int {
val allShards: List<ShardRouting> = clusterService!!.state().routingTable().allShards(index)
val allShards: List<ShardRouting> = clusterService.state().routingTable().allShards(index)
return allShards.filter { it.primary() }.size
}

Expand Down Expand Up @@ -1057,7 +1125,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsBytesSize: Long,
monitorCtx: MonitorRunnerExecutionContext,
): Boolean {
var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
var thresholdPercentage = monitorCtx.percQueryDocsSizeMemoryPercentageLimit
val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes
val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes

Expand All @@ -1068,23 +1136,23 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
numDocs: Int,
monitorCtx: MonitorRunnerExecutionContext,
): Boolean {
var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
var maxNumDocsThreshold = monitorCtx.percQueryMaxNumDocsInMemory
return numDocs >= maxNumDocsThreshold
}

private suspend fun getNodes(monitorCtx: MonitorRunnerExecutionContext): MutableMap<String, DiscoveryNode> {
return monitorCtx.clusterService!!.state().nodes.dataNodes
}

private suspend fun distributeShards(
private fun distributeShards(
monitorCtx: MonitorRunnerExecutionContext,
allNodes: List<String>,
shards: List<String>,
index: String
index: String,
): Map<String, MutableSet<ShardId>> {

val totalShards = shards.size
val totalNodes = allNodes.size.coerceAtMost(totalShards / 2)
val totalNodes = monitorCtx.totalNodesFanOut.coerceAtMost((totalShards + 1) / 2)
val shardsPerNode = totalShards / totalNodes
var shardsRemaining = totalShards % totalNodes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,9 @@ data class MonitorRunnerExecutionContext(
@Volatile var destinationContextFactory: DestinationContextFactory? = null,

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
)
Loading

0 comments on commit e68ea5b

Please sign in to comment.