Skip to content

Commit

Permalink
Improve error messaging on exceptions from notification channel retri…
Browse files Browse the repository at this point in the history
…eval and fix bug (opensearch-project#451) (opensearch-project#452)

Signed-off-by: Ashish Agrawal <[email protected]>
(cherry picked from commit 32b336d)

Co-authored-by: Ashish Agrawal <[email protected]>
Signed-off-by: Angie Zhang <[email protected]>
  • Loading branch information
2 people authored and Angie Zhang committed Jun 28, 2022
1 parent 457bc36 commit 21a7301
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 49 deletions.
45 changes: 42 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

package org.opensearch.alerting

import org.opensearch.OpenSearchSecurityException
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetDestinationsRequest
import org.opensearch.alerting.action.GetDestinationsResponse
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AlertingConfigAccessor
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.Table
import org.opensearch.alerting.model.action.Action
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerExecutionContext
Expand Down Expand Up @@ -127,16 +132,50 @@ abstract class MonitorRunner {
monitorCtx: MonitorRunnerExecutionContext
): NotificationActionConfigs {
var destination: Destination? = null
val channel: NotificationConfigInfo? = getNotificationConfigInfo(monitorCtx.client as NodeClient, action.destinationId)
var notificationPermissionException: Exception? = null

var channel: NotificationConfigInfo? = null
try {
channel = getNotificationConfigInfo(monitorCtx.client as NodeClient, action.destinationId)
} catch (e: OpenSearchSecurityException) {
notificationPermissionException = e
}

// If the channel was not found, try to retrieve the Destination
if (channel == null) {
destination = try {
AlertingConfigAccessor.getDestinationInfo(monitorCtx.client!!, monitorCtx.xContentRegistry!!, action.destinationId)
val table = Table(
"asc",
"destination.name.keyword",
null,
1,
0,
null
)
val getDestinationsRequest = GetDestinationsRequest(
action.destinationId,
0L,
null,
table,
"ALL"
)

val getDestinationsResponse: GetDestinationsResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it)
}
getDestinationsResponse.destinations.firstOrNull()
} catch (e: IllegalStateException) {
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
null
} catch (e: OpenSearchSecurityException) {
if (notificationPermissionException != null)
throw notificationPermissionException
else
throw e
}

if (destination == null && notificationPermissionException != null)
throw notificationPermissionException
}

return NotificationActionConfigs(destination, channel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import kotlinx.coroutines.withContext
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.model.destination.email.EmailAccount
import org.opensearch.alerting.model.destination.email.EmailGroup
import org.opensearch.alerting.opensearchapi.suspendUntil
Expand All @@ -30,18 +29,6 @@ import org.opensearch.index.IndexNotFoundException
class AlertingConfigAccessor {
companion object {

suspend fun getMonitorInfo(client: Client, xContentRegistry: NamedXContentRegistry, monitorId: String): Monitor {
val jobSource = getAlertingConfigDocumentSource(client, "Monitor", monitorId)
return withContext(Dispatchers.IO) {
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
jobSource, XContentType.JSON
)
val monitor = Monitor.parse(xcp)
monitor
}
}

suspend fun getMonitorMetadata(client: Client, xContentRegistry: NamedXContentRegistry, metadataId: String): MonitorMetadata? {
return try {
val jobSource = getAlertingConfigDocumentSource(client, "Monitor Metadata", metadataId)
Expand All @@ -64,18 +51,6 @@ class AlertingConfigAccessor {
}
}

suspend fun getDestinationInfo(client: Client, xContentRegistry: NamedXContentRegistry, destinationId: String): Destination {
val jobSource = getAlertingConfigDocumentSource(client, "Destination", destinationId)
return withContext(Dispatchers.IO) {
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
jobSource, XContentType.JSON
)
val destination = Destination.parseWithType(xcp)
destination
}
}

suspend fun getEmailAccountInfo(client: Client, xContentRegistry: NamedXContentRegistry, emailAccountId: String): EmailAccount {
val source = getAlertingConfigDocumentSource(client, "Email account", emailAccountId)
return withContext(Dispatchers.IO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.client.Client
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory

Expand Down Expand Up @@ -125,26 +124,6 @@ fun defaultToPerExecutionAction(
return false
}

// TODO: Check if this can be more generic such that TransportIndexMonitorAction class can use this. Also see if this should be refactored
// to another class. Include tests for this as well.
suspend fun updateMonitor(client: Client, xContentRegistry: NamedXContentRegistry, settings: Settings, monitor: Monitor): IndexResponse {
/*val currentMonitor = AlertingConfigAccessor.getMonitorInfo(client, xContentRegistry, monitor.id)
var updateMonitor = monitor
// If both are enabled, use the current existing monitor enabled time, otherwise the next execution will be
// incorrect.
if (monitor.enabled && currentMonitor.enabled)
updateMonitor = monitor.copy(enabledTime = currentMonitor.enabledTime)*/

val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(monitor.toXContentWithUser(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
.id(monitor.id)
.timeout(AlertingSettings.INDEX_TIMEOUT.get(settings))

return client.suspendUntil { client.index(indexRequest, it) }
}

suspend fun updateMonitorMetadata(client: Client, settings: Settings, monitorMetadata: MonitorMetadata): IndexResponse {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.util.destinationmigration

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.model.destination.Destination
Expand Down Expand Up @@ -48,6 +49,8 @@ class NotificationApiUtils {
return try {
val res: GetNotificationConfigResponse = getNotificationConfig(client, GetNotificationConfigRequest(setOf(id)))
res.searchResult.objectList.firstOrNull()
} catch (e: OpenSearchSecurityException) {
throw e
} catch (e: OpenSearchStatusException) {
if (e.status() == RestStatus.NOT_FOUND) {
logger.debug("Notification config [$id] was not found")
Expand Down

0 comments on commit 21a7301

Please sign in to comment.