Skip to content

Commit

Permalink
Merge pull request #1716 from elementary-data/ele-3682-alerts-grouping
Browse files Browse the repository at this point in the history
Ele 3682 alerts grouping
  • Loading branch information
MikaKerman authored Oct 14, 2024
2 parents 60fd215 + a074335 commit b7e70f7
Show file tree
Hide file tree
Showing 12 changed files with 425 additions and 95 deletions.
10 changes: 9 additions & 1 deletion elementary/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from google.auth.exceptions import DefaultCredentialsError # type: ignore[import]

from elementary.exceptions.exceptions import InvalidArgumentsError
from elementary.monitor.alerts.group_of_alerts import GroupingType
from elementary.monitor.alerts.grouping_type import GroupingType
from elementary.utils.ordered_yaml import OrderedYaml


Expand Down Expand Up @@ -35,6 +35,8 @@ class Config:

DEFAULT_TARGET_PATH = os.getcwd() + "/edr_target"

DEFAULT_GROUP_ALERTS_THRESHOLD = 100

def __init__(
self,
config_dir: str = DEFAULT_CONFIG_DIR,
Expand All @@ -49,6 +51,7 @@ def __init__(
slack_token: Optional[str] = None,
slack_channel_name: Optional[str] = None,
slack_group_alerts_by: Optional[str] = None,
group_alerts_threshold: Optional[int] = None,
timezone: Optional[str] = None,
aws_profile_name: Optional[str] = None,
aws_region_name: Optional[str] = None,
Expand Down Expand Up @@ -124,6 +127,11 @@ def __init__(
slack_config.get("group_alerts_by"),
GroupingType.BY_ALERT.value,
)
self.group_alerts_threshold = self._first_not_none(
group_alerts_threshold,
slack_config.get("group_alerts_threshold"),
self.DEFAULT_GROUP_ALERTS_THRESHOLD,
)

teams_config = config.get(self._TEAMS, {})
self.teams_webhook = self._first_not_none(
Expand Down
7 changes: 7 additions & 0 deletions elementary/monitor/alerts/alerts_groups/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .alerts_group import AlertsGroup
from .grouped_by_table import GroupedByTableAlerts

__all__ = [
"AlertsGroup",
"GroupedByTableAlerts",
]
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Union
from typing import Dict, List, Union

from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import (
ReportLinkData,
get_model_test_runs_link,
)
from elementary.utils.models import get_shortened_model_name


class GroupingType(Enum):
BY_ALERT = "alert"
BY_TABLE = "table"


class GroupedByTableAlerts:
class AlertsGroup:
def __init__(
self,
alerts: List[Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]],
Expand All @@ -30,26 +19,28 @@ def __init__(
self._sort_alerts()

@property
def model_unique_id(self) -> Optional[str]:
return self.alerts[0].model_unique_id

@property
def model(self) -> str:
return get_shortened_model_name(self.model_unique_id)
def summary(self) -> str:
return f"{len(self.alerts)} issues detected"

@property
def detected_at(self) -> datetime:
# We return the minimum alert detected at time as the group detected at time
return min(alert.detected_at or datetime.max for alert in self.alerts)

@property
def report_url(self) -> Optional[str]:
return self.alerts[0].report_url
def status(self) -> str:
if self.model_errors or self.test_errors:
return "error"
elif self.test_failures:
return "failure"
else:
return "warn"

@property
def data(self) -> List[Dict]:
return [alert.data for alert in self.alerts]

@property
def unified_meta(self) -> Dict:
# If a model level unified meta is defined, we use is.
# Else we use one of the tests level unified metas.
model_unified_meta = dict()
test_unified_meta = dict()
for alert in self.alerts:
Expand All @@ -62,29 +53,6 @@ def unified_meta(self) -> Dict:
test_unified_meta = alert_unified_meta
return model_unified_meta or test_unified_meta

@property
def data(self) -> List[Dict]:
return [alert.data for alert in self.alerts]

@property
def summary(self) -> str:
return f"{self.model}: {len(self.alerts)} issues detected"

@property
def status(self) -> str:
if self.model_errors or self.test_errors:
return "error"
elif self.test_failures:
return "failure"
else:
return "warn"

def get_report_link(self) -> Optional[ReportLinkData]:
if not self.model_errors:
return get_model_test_runs_link(self.report_url, self.model_unique_id)

return None

def _sort_alerts(self):
for alert in self.alerts:
if isinstance(alert, ModelAlertModel):
Expand Down
32 changes: 32 additions & 0 deletions elementary/monitor/alerts/alerts_groups/grouped_by_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Optional

from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup
from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import (
ReportLinkData,
get_model_test_runs_link,
)
from elementary.utils.models import get_shortened_model_name


class GroupedByTableAlerts(AlertsGroup):
@property
def model_unique_id(self) -> Optional[str]:
return self.alerts[0].model_unique_id

@property
def model(self) -> str:
return get_shortened_model_name(self.model_unique_id)

@property
def report_url(self) -> Optional[str]:
return self.alerts[0].report_url

@property
def summary(self) -> str:
return f"{self.model}: {len(self.alerts)} issues detected"

def get_report_link(self) -> Optional[ReportLinkData]:
if not self.model_errors:
return get_model_test_runs_link(self.report_url, self.model_unique_id)

return None
6 changes: 6 additions & 0 deletions elementary/monitor/alerts/grouping_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from enum import Enum


class GroupingType(str, Enum):
BY_ALERT = "alert"
BY_TABLE = "table"
8 changes: 8 additions & 0 deletions elementary/monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ def get_cli_properties() -> dict:
default=None,
help="DEPRECATED! - A slack webhook URL for sending alerts to a specific channel.",
)
@click.option(
"--group-alerts-threshold",
type=int,
default=Config.DEFAULT_GROUP_ALERTS_THRESHOLD,
help="The threshold for all alerts in a single message.",
)
@click.option(
"--timezone",
"-tz",
Expand Down Expand Up @@ -276,6 +282,7 @@ def monitor(
deprecated_slack_webhook,
slack_token,
slack_channel_name,
group_alerts_threshold,
timezone,
config_dir,
profiles_dir,
Expand Down Expand Up @@ -322,6 +329,7 @@ def monitor(
slack_webhook=slack_webhook,
slack_token=slack_token,
slack_channel_name=slack_channel_name,
group_alerts_threshold=group_alerts_threshold,
timezone=timezone,
env=env,
slack_group_alerts_by=group_by,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from alive_progress import alive_it

from elementary.config.config import Config
from elementary.monitor.alerts.group_of_alerts import GroupedByTableAlerts, GroupingType
from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts
from elementary.monitor.alerts.grouping_type import GroupingType
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
Expand Down Expand Up @@ -251,18 +252,19 @@ def _send_alerts(

alerts_with_progress_bar = alive_it(alerts, title="Sending alerts")
sent_successfully_alerts = []
for alert in alerts_with_progress_bar:
sent_successfully = self.alerts_integration.send_alert(alert=alert)
for alert, sent_successfully in self.alerts_integration.send_alerts(
alerts_with_progress_bar, self.config.group_alerts_threshold
):
if sent_successfully:
if isinstance(alert, GroupedByTableAlerts):
if isinstance(alert, AlertsGroup):
sent_successfully_alerts.extend(alert.alerts)
else:
sent_successfully_alerts.append(alert)
else:
if isinstance(alert, GroupedByTableAlerts):
for grouped_alert in alert.alerts:
if isinstance(alert, AlertsGroup):
for inner_alert in alert.alerts:
logger.error(
f"Could not send the alert - {grouped_alert.id}. Full alert: {json.dumps(grouped_alert.data)}"
f"Could not send the alert - {inner_alert.id}. Full alert: {json.dumps(inner_alert.data)}"
)
else:
logger.error(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from abc import ABC, abstractmethod
from typing import Union
from typing import Generator, List, Sequence, Tuple, Union

from elementary.monitor.alerts.group_of_alerts import GroupedByTableAlerts
from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
from elementary.utils.log import get_logger

logger = get_logger(__name__)


class BaseIntegration(ABC):
Expand All @@ -22,9 +25,10 @@ def _get_alert_template(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
],
*args,
**kwargs
**kwargs,
):
if isinstance(alert, TestAlertModel):
if alert.is_elementary_test:
Expand All @@ -40,6 +44,8 @@ def _get_alert_template(
return self._get_source_freshness_template(alert)
elif isinstance(alert, GroupedByTableAlerts):
return self._get_group_by_table_template(alert)
elif isinstance(alert, AlertsGroup):
return self._get_alerts_group_template(alert)

@abstractmethod
def _get_dbt_test_template(self, alert: TestAlertModel, *args, **kwargs):
Expand Down Expand Up @@ -69,6 +75,10 @@ def _get_group_by_table_template(
):
raise NotImplementedError

@abstractmethod
def _get_alerts_group_template(self, alert: AlertsGroup, *args, **kwargs):
raise NotImplementedError

@abstractmethod
def _get_fallback_template(
self,
Expand All @@ -79,7 +89,7 @@ def _get_fallback_template(
GroupedByTableAlerts,
],
*args,
**kwargs
**kwargs,
):
raise NotImplementedError

Expand All @@ -91,12 +101,83 @@ def send_alert(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
],
*args,
**kwargs
**kwargs,
) -> bool:
raise NotImplementedError

def _group_alerts(
self,
alerts: Sequence[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
]
],
threshold: int,
) -> Sequence[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
]
]:
flattened_alerts: List[
Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]
] = []
for alert in alerts:
if isinstance(alert, AlertsGroup):
flattened_alerts.extend(alert.alerts)
else:
flattened_alerts.append(alert)

if len(flattened_alerts) >= threshold:
logger.info(f"Grouping {len(flattened_alerts)} alerts into one")
return [
AlertsGroup(alerts=flattened_alerts),
]
return alerts

def send_alerts(
self,
alerts: Sequence[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
]
],
group_alerts_threshold: int,
*args,
**kwargs,
) -> Generator[
Tuple[
Union[
TestAlertModel,
ModelAlertModel,
SourceFreshnessAlertModel,
],
bool,
],
None,
None,
]:
grouped_alerts = self._group_alerts(alerts, group_alerts_threshold)
for alert in grouped_alerts:
if isinstance(alert, AlertsGroup):
sent_successfully = self.send_alert(alert, *args, **kwargs)
for inner_alert in alert.alerts:
yield inner_alert, sent_successfully
else:
yield alert, self.send_alert(alert, *args, **kwargs)

@abstractmethod
def send_test_message(self, *args, **kwargs) -> bool:
raise NotImplementedError
Loading

0 comments on commit b7e70f7

Please sign in to comment.