diff --git a/elementary/monitor/alerts/source_freshness.py b/elementary/monitor/alerts/source_freshness.py index 085b17592..1155b807b 100644 --- a/elementary/monitor/alerts/source_freshness.py +++ b/elementary/monitor/alerts/source_freshness.py @@ -20,9 +20,10 @@ def __init__( max_loaded_at_time_ago_in_s: Optional[float], source_name: str, identifier: str, - freshness_error_after: str, - freshness_warn_after: str, - freshness_filter: str, + normalized_status: str, + error_after: str, + warn_after: str, + filter: Optional[str], path: str, error: str, **kwargs, @@ -42,9 +43,10 @@ def __init__( self.max_loaded_at_time_ago_in_s = max_loaded_at_time_ago_in_s self.source_name = source_name self.identifier = identifier - self.freshness_error_after = freshness_error_after - self.freshness_warn_after = freshness_warn_after - self.freshness_filter = freshness_filter + self.normalized_status = normalized_status + self.error_after = error_after + self.warn_after = warn_after + self.filter = filter self.path = path self.error = error self.alerts_table = SourceFreshnessAlert.TABLE_NAME @@ -55,7 +57,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: subscribers = self.slack_message_builder.prettify_and_dedup_list( self.subscribers or [] ) - icon = self.slack_message_builder.get_slack_status_icon(self.status) + icon = self.slack_message_builder.get_slack_status_icon(self.normalized_status) title = [ self.slack_message_builder.create_header_block( @@ -68,7 +70,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: self.slack_message_builder.create_context_block( [ f"*Source:* {self.source_name}.{self.identifier} |", - f"*Status:* {self.status}", + f"*Status:* {self.normalized_status}", ], ), self.slack_message_builder.create_context_block( @@ -84,7 +86,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: self.slack_message_builder.create_context_block( [ f"*Source:* {self.source_name}.{self.identifier} |", - f"*Status:* {self.status} |", + f"*Status:* {self.normalized_status} |", f"*{self.detected_at_str}*", ], ), @@ -123,32 +125,31 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: ) configuration = [] - if self.freshness_error_after: + + if self.error_after: configuration.append( self.slack_message_builder.create_context_block(["*Error after*"]) ) configuration.append( self.slack_message_builder.create_text_section_block( - f"`{self.freshness_error_after}`" + f"`{self.error_after}`" ) ) - if self.freshness_warn_after: + if self.warn_after: configuration.append( self.slack_message_builder.create_context_block(["*Warn after*"]) ) configuration.append( self.slack_message_builder.create_text_section_block( - f"`{self.freshness_warn_after}`" + f"`{self.warn_after}`" ) ) - if self.freshness_filter: + if self.filter: configuration.append( self.slack_message_builder.create_context_block(["*Filter*"]) ) configuration.append( - self.slack_message_builder.create_text_section_block( - f"`{self.freshness_filter}`" - ) + self.slack_message_builder.create_text_section_block(f"`{self.filter}`") ) if self.path: configuration.append( diff --git a/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql b/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql index c525ab1a3..77fb58bc2 100644 --- a/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql +++ b/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql @@ -2,24 +2,14 @@ -- depends_on: {{ ref('alerts_source_freshness') }} {% set select_pending_alerts_query %} with alerts_in_time_limit as ( - select * from {{ ref('alerts_source_freshness') }} + select *, {{ elementary_cli.normalized_source_freshness_status()}} from {{ ref('alerts_source_freshness') }} where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ elementary_cli.get_alerts_time_limit(days_back) }} ), - models as ( - select * from {{ ref('elementary', 'dbt_models') }} - ), - sources as ( select * from {{ ref('elementary', 'dbt_sources') }} ), - artifacts_meta as ( - select unique_id, meta from models - union all - select unique_id, meta from sources - ), - extended_alerts as ( select alerts_in_time_limit.alert_id, @@ -28,6 +18,7 @@ alerts_in_time_limit.detected_at, alerts_in_time_limit.max_loaded_at_time_ago_in_s, alerts_in_time_limit.status, + alerts_in_time_limit.normalized_status, alerts_in_time_limit.error, alerts_in_time_limit.unique_id, {# Currently alert_class_id equals to unique_id - might change in the future so we return both #} @@ -36,9 +27,9 @@ alerts_in_time_limit.schema_name, alerts_in_time_limit.source_name, alerts_in_time_limit.identifier, - alerts_in_time_limit.freshness_error_after, - alerts_in_time_limit.freshness_warn_after, - alerts_in_time_limit.freshness_filter, + alerts_in_time_limit.warn_after, + alerts_in_time_limit.error_after, + alerts_in_time_limit.filter, alerts_in_time_limit.tags, alerts_in_time_limit.meta, alerts_in_time_limit.owner, @@ -51,9 +42,12 @@ else suppression_status end as suppression_status, alerts_in_time_limit.sent_at, - artifacts_meta.meta as model_meta + sources.meta as model_meta, + sources.freshness_error_after, + sources.freshness_warn_after, + sources.freshness_filter from alerts_in_time_limit - left join artifacts_meta on alerts_in_time_limit.unique_id = artifacts_meta.unique_id + left join sources on alerts_in_time_limit.unique_id = sources.unique_id ) select * @@ -65,6 +59,10 @@ {% set alerts_dicts = elementary.agate_to_dicts(alerts_agate) %} {% set pending_alerts = [] %} {% for alert_dict in alerts_dicts %} + {% set error_after = alert_dict.get('error_after') %} + {% set warn_after = alert_dict.get('warn_after') %} + {% set filter = alert_dict.get('filter') %} + {% set pending_alert_dict = {'id': alert_dict.get('alert_id'), 'model_unique_id': alert_dict.get('unique_id'), 'alert_class_id': alert_dict.get('alert_class_id'), @@ -76,10 +74,11 @@ 'schema_name': alert_dict.get('schema_name'), 'source_name': alert_dict.get('source_name'), 'identifier': alert_dict.get('identifier'), - 'freshness_error_after': alert_dict.get('freshness_error_after'), - 'freshness_warn_after': alert_dict.get('freshness_warn_after'), - 'freshness_filter': alert_dict.get('freshness_filter'), + 'error_after': error_after if error_after is not none else alert_dict.get('freshness_error_after'), + 'warn_after': warn_after if warn_after is not none else alert_dict.get('freshness_warn_after'), + 'filter': filter if filter is not none else alert_dict.get('freshness_filter'), 'status': alert_dict.get('status'), + 'normalized_status': alert_dict.get('normalized_status'), 'owners': alert_dict.get('owner'), 'path': alert_dict.get('path'), 'error': alert_dict.get('error'), diff --git a/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql b/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql new file mode 100644 index 000000000..d8ebab057 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql @@ -0,0 +1,7 @@ +{% macro normalized_source_freshness_status() %} + case + when status = 'error' then 'fail' + when status = 'runtime error' then 'error' + else status + end as normalized_status +{% endmacro %} diff --git a/tests/mocks/fetchers/alerts_fetcher_mock.py b/tests/mocks/fetchers/alerts_fetcher_mock.py index ab5704115..3986d12c9 100644 --- a/tests/mocks/fetchers/alerts_fetcher_mock.py +++ b/tests/mocks/fetchers/alerts_fetcher_mock.py @@ -322,10 +322,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_1", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", @@ -347,10 +348,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_1", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", @@ -372,10 +374,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_2", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", @@ -397,10 +400,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_3", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", @@ -424,10 +428,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_3", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", diff --git a/tests/unit/monitor/api/alerts/test_alert_filters.py b/tests/unit/monitor/api/alerts/test_alert_filters.py index 42d69849b..9366a12b4 100644 --- a/tests/unit/monitor/api/alerts/test_alert_filters.py +++ b/tests/unit/monitor/api/alerts/test_alert_filters.py @@ -171,14 +171,15 @@ def initial_alerts(): tags='["one", "two"]', owners='["jeff", "john"]', status="error", + normalized_status="fail", snapshotted_at="2023-08-15T12:26:06.884065+00:00", max_loaded_at="1969-12-31T00:00:00+00:00", max_loaded_at_time_ago_in_s=1692188766.884065, source_name="elementary_integration_tests", identifier="any_type_column_anomalies_validation", - freshness_error_after='{"count": null, "period": null}', - freshness_warn_after='{"count": 1, "period": "minute"}', - freshness_filter="null", + error_after='{"count": null, "period": null}', + warn_after='{"count": 1, "period": "minute"}', + filter="null", error="problemz", ), SourceFreshnessAlert( @@ -196,14 +197,15 @@ def initial_alerts(): tags='["one", "two"]', owners='["jeff", "john"]', status="warn", + normalized_status="warn", snapshotted_at="2023-08-15T12:26:06.884065+00:00", max_loaded_at="1969-12-31T00:00:00+00:00", max_loaded_at_time_ago_in_s=1692188766.884065, source_name="elementary_integration_tests", identifier="any_type_column_anomalies_validation", - freshness_error_after='{"count": null, "period": null}', - freshness_warn_after='{"count": 1, "period": "minute"}', - freshness_filter="null", + error_after='{"count": null, "period": null}', + warn_after='{"count": 1, "period": "minute"}', + filter="null", error="problemz", ), SourceFreshnessAlert( @@ -221,14 +223,15 @@ def initial_alerts(): tags='["one", "two"]', owners='["jeff", "john"]', status="runtime error", + normalized_status="error", snapshotted_at="2023-08-15T12:26:06.884065+00:00", max_loaded_at="1969-12-31T00:00:00+00:00", max_loaded_at_time_ago_in_s=1692188766.884065, source_name="elementary_integration_tests", identifier="any_type_column_anomalies_validation", - freshness_error_after='{"count": null, "period": null}', - freshness_warn_after='{"count": 1, "period": "minute"}', - freshness_filter="null", + error_after='{"count": null, "period": null}', + warn_after='{"count": 1, "period": "minute"}', + filter="null", error="problemz", ), ]