From 1d4ed75f827c6649660d30accb3773aac5be12fa Mon Sep 17 00:00:00 2001 From: Noy Arie Date: Thu, 28 Sep 2023 09:46:06 +0300 Subject: [PATCH 1/2] Ele 1706 add to freshness result criteria (#1175) * criteria + normalized status * use error * cleanup * fix mock objects * query new field * add description to alert * Revert "add description to alert" This reverts commit 5af234ffad10e4db290aa19b895ed46c255948cf. * Revert "query new field" This reverts commit b47260178f15b8d8fee63f42a92c98b756a8a706. * fix tests --- elementary/monitor/alerts/source_freshness.py | 35 +++++++++--------- .../alerts/get_source_freshness_alerts.sql | 37 +++++++++---------- .../normalized_source_freshness_status.sql | 7 ++++ tests/mocks/fetchers/alerts_fetcher_mock.py | 35 ++++++++++-------- .../monitor/api/alerts/test_alert_filters.py | 21 ++++++----- 5 files changed, 75 insertions(+), 60 deletions(-) create mode 100644 elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql diff --git a/elementary/monitor/alerts/source_freshness.py b/elementary/monitor/alerts/source_freshness.py index 085b17592..1155b807b 100644 --- a/elementary/monitor/alerts/source_freshness.py +++ b/elementary/monitor/alerts/source_freshness.py @@ -20,9 +20,10 @@ def __init__( max_loaded_at_time_ago_in_s: Optional[float], source_name: str, identifier: str, - freshness_error_after: str, - freshness_warn_after: str, - freshness_filter: str, + normalized_status: str, + error_after: str, + warn_after: str, + filter: Optional[str], path: str, error: str, **kwargs, @@ -42,9 +43,10 @@ def __init__( self.max_loaded_at_time_ago_in_s = max_loaded_at_time_ago_in_s self.source_name = source_name self.identifier = identifier - self.freshness_error_after = freshness_error_after - self.freshness_warn_after = freshness_warn_after - self.freshness_filter = freshness_filter + self.normalized_status = normalized_status + self.error_after = error_after + self.warn_after = warn_after + self.filter = filter self.path = path self.error = error self.alerts_table = SourceFreshnessAlert.TABLE_NAME @@ -55,7 +57,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: subscribers = self.slack_message_builder.prettify_and_dedup_list( self.subscribers or [] ) - icon = self.slack_message_builder.get_slack_status_icon(self.status) + icon = self.slack_message_builder.get_slack_status_icon(self.normalized_status) title = [ self.slack_message_builder.create_header_block( @@ -68,7 +70,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: self.slack_message_builder.create_context_block( [ f"*Source:* {self.source_name}.{self.identifier} |", - f"*Status:* {self.status}", + f"*Status:* {self.normalized_status}", ], ), self.slack_message_builder.create_context_block( @@ -84,7 +86,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: self.slack_message_builder.create_context_block( [ f"*Source:* {self.source_name}.{self.identifier} |", - f"*Status:* {self.status} |", + f"*Status:* {self.normalized_status} |", f"*{self.detected_at_str}*", ], ), @@ -123,32 +125,31 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: ) configuration = [] - if self.freshness_error_after: + + if self.error_after: configuration.append( self.slack_message_builder.create_context_block(["*Error after*"]) ) configuration.append( self.slack_message_builder.create_text_section_block( - f"`{self.freshness_error_after}`" + f"`{self.error_after}`" ) ) - if self.freshness_warn_after: + if self.warn_after: configuration.append( self.slack_message_builder.create_context_block(["*Warn after*"]) ) configuration.append( self.slack_message_builder.create_text_section_block( - f"`{self.freshness_warn_after}`" + f"`{self.warn_after}`" ) ) - if self.freshness_filter: + if self.filter: configuration.append( self.slack_message_builder.create_context_block(["*Filter*"]) ) configuration.append( - self.slack_message_builder.create_text_section_block( - f"`{self.freshness_filter}`" - ) + self.slack_message_builder.create_text_section_block(f"`{self.filter}`") ) if self.path: configuration.append( diff --git a/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql b/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql index c525ab1a3..77fb58bc2 100644 --- a/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql +++ b/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql @@ -2,24 +2,14 @@ -- depends_on: {{ ref('alerts_source_freshness') }} {% set select_pending_alerts_query %} with alerts_in_time_limit as ( - select * from {{ ref('alerts_source_freshness') }} + select *, {{ elementary_cli.normalized_source_freshness_status()}} from {{ ref('alerts_source_freshness') }} where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ elementary_cli.get_alerts_time_limit(days_back) }} ), - models as ( - select * from {{ ref('elementary', 'dbt_models') }} - ), - sources as ( select * from {{ ref('elementary', 'dbt_sources') }} ), - artifacts_meta as ( - select unique_id, meta from models - union all - select unique_id, meta from sources - ), - extended_alerts as ( select alerts_in_time_limit.alert_id, @@ -28,6 +18,7 @@ alerts_in_time_limit.detected_at, alerts_in_time_limit.max_loaded_at_time_ago_in_s, alerts_in_time_limit.status, + alerts_in_time_limit.normalized_status, alerts_in_time_limit.error, alerts_in_time_limit.unique_id, {# Currently alert_class_id equals to unique_id - might change in the future so we return both #} @@ -36,9 +27,9 @@ alerts_in_time_limit.schema_name, alerts_in_time_limit.source_name, alerts_in_time_limit.identifier, - alerts_in_time_limit.freshness_error_after, - alerts_in_time_limit.freshness_warn_after, - alerts_in_time_limit.freshness_filter, + alerts_in_time_limit.warn_after, + alerts_in_time_limit.error_after, + alerts_in_time_limit.filter, alerts_in_time_limit.tags, alerts_in_time_limit.meta, alerts_in_time_limit.owner, @@ -51,9 +42,12 @@ else suppression_status end as suppression_status, alerts_in_time_limit.sent_at, - artifacts_meta.meta as model_meta + sources.meta as model_meta, + sources.freshness_error_after, + sources.freshness_warn_after, + sources.freshness_filter from alerts_in_time_limit - left join artifacts_meta on alerts_in_time_limit.unique_id = artifacts_meta.unique_id + left join sources on alerts_in_time_limit.unique_id = sources.unique_id ) select * @@ -65,6 +59,10 @@ {% set alerts_dicts = elementary.agate_to_dicts(alerts_agate) %} {% set pending_alerts = [] %} {% for alert_dict in alerts_dicts %} + {% set error_after = alert_dict.get('error_after') %} + {% set warn_after = alert_dict.get('warn_after') %} + {% set filter = alert_dict.get('filter') %} + {% set pending_alert_dict = {'id': alert_dict.get('alert_id'), 'model_unique_id': alert_dict.get('unique_id'), 'alert_class_id': alert_dict.get('alert_class_id'), @@ -76,10 +74,11 @@ 'schema_name': alert_dict.get('schema_name'), 'source_name': alert_dict.get('source_name'), 'identifier': alert_dict.get('identifier'), - 'freshness_error_after': alert_dict.get('freshness_error_after'), - 'freshness_warn_after': alert_dict.get('freshness_warn_after'), - 'freshness_filter': alert_dict.get('freshness_filter'), + 'error_after': error_after if error_after is not none else alert_dict.get('freshness_error_after'), + 'warn_after': warn_after if warn_after is not none else alert_dict.get('freshness_warn_after'), + 'filter': filter if filter is not none else alert_dict.get('freshness_filter'), 'status': alert_dict.get('status'), + 'normalized_status': alert_dict.get('normalized_status'), 'owners': alert_dict.get('owner'), 'path': alert_dict.get('path'), 'error': alert_dict.get('error'), diff --git a/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql b/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql new file mode 100644 index 000000000..d8ebab057 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql @@ -0,0 +1,7 @@ +{% macro normalized_source_freshness_status() %} + case + when status = 'error' then 'fail' + when status = 'runtime error' then 'error' + else status + end as normalized_status +{% endmacro %} diff --git a/tests/mocks/fetchers/alerts_fetcher_mock.py b/tests/mocks/fetchers/alerts_fetcher_mock.py index ab5704115..3986d12c9 100644 --- a/tests/mocks/fetchers/alerts_fetcher_mock.py +++ b/tests/mocks/fetchers/alerts_fetcher_mock.py @@ -322,10 +322,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_1", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", @@ -347,10 +348,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_1", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", @@ -372,10 +374,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_2", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", @@ -397,10 +400,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_3", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", @@ -424,10 +428,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): schema_name="test_schema", source_name="source_3", identifier="identifier", - freshness_error_after="10", - freshness_warn_after="10", - freshness_filter="", + error_after="10", + warn_after="10", + filter="", status="fail", + normalized_status="fail", owners="[]", path="", error="", diff --git a/tests/unit/monitor/api/alerts/test_alert_filters.py b/tests/unit/monitor/api/alerts/test_alert_filters.py index 42d69849b..9366a12b4 100644 --- a/tests/unit/monitor/api/alerts/test_alert_filters.py +++ b/tests/unit/monitor/api/alerts/test_alert_filters.py @@ -171,14 +171,15 @@ def initial_alerts(): tags='["one", "two"]', owners='["jeff", "john"]', status="error", + normalized_status="fail", snapshotted_at="2023-08-15T12:26:06.884065+00:00", max_loaded_at="1969-12-31T00:00:00+00:00", max_loaded_at_time_ago_in_s=1692188766.884065, source_name="elementary_integration_tests", identifier="any_type_column_anomalies_validation", - freshness_error_after='{"count": null, "period": null}', - freshness_warn_after='{"count": 1, "period": "minute"}', - freshness_filter="null", + error_after='{"count": null, "period": null}', + warn_after='{"count": 1, "period": "minute"}', + filter="null", error="problemz", ), SourceFreshnessAlert( @@ -196,14 +197,15 @@ def initial_alerts(): tags='["one", "two"]', owners='["jeff", "john"]', status="warn", + normalized_status="warn", snapshotted_at="2023-08-15T12:26:06.884065+00:00", max_loaded_at="1969-12-31T00:00:00+00:00", max_loaded_at_time_ago_in_s=1692188766.884065, source_name="elementary_integration_tests", identifier="any_type_column_anomalies_validation", - freshness_error_after='{"count": null, "period": null}', - freshness_warn_after='{"count": 1, "period": "minute"}', - freshness_filter="null", + error_after='{"count": null, "period": null}', + warn_after='{"count": 1, "period": "minute"}', + filter="null", error="problemz", ), SourceFreshnessAlert( @@ -221,14 +223,15 @@ def initial_alerts(): tags='["one", "two"]', owners='["jeff", "john"]', status="runtime error", + normalized_status="error", snapshotted_at="2023-08-15T12:26:06.884065+00:00", max_loaded_at="1969-12-31T00:00:00+00:00", max_loaded_at_time_ago_in_s=1692188766.884065, source_name="elementary_integration_tests", identifier="any_type_column_anomalies_validation", - freshness_error_after='{"count": null, "period": null}', - freshness_warn_after='{"count": 1, "period": "minute"}', - freshness_filter="null", + error_after='{"count": null, "period": null}', + warn_after='{"count": 1, "period": "minute"}', + filter="null", error="problemz", ), ] From 6a8a27a76816f78806fe55f23a3c3483e0b87cd0 Mon Sep 17 00:00:00 2001 From: Noy Arie Date: Thu, 28 Sep 2023 10:49:24 +0300 Subject: [PATCH 2/2] Ele 1803 add test description for dbt sources (#1181) * query new field * add description to alert --- elementary/monitor/alerts/source_freshness.py | 20 +++++++++++++++++++ .../alerts/get_source_freshness_alerts.sql | 6 ++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/elementary/monitor/alerts/source_freshness.py b/elementary/monitor/alerts/source_freshness.py index 1155b807b..3fdeb099a 100644 --- a/elementary/monitor/alerts/source_freshness.py +++ b/elementary/monitor/alerts/source_freshness.py @@ -26,6 +26,7 @@ def __init__( filter: Optional[str], path: str, error: str, + freshness_description: Optional[str] = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -49,6 +50,7 @@ def __init__( self.filter = filter self.path = path self.error = error + self.freshness_description = freshness_description self.alerts_table = SourceFreshnessAlert.TABLE_NAME def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: @@ -100,6 +102,24 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema: ] ) + if self.freshness_description: + preview.extend( + [ + self.slack_message_builder.create_text_section_block( + "*Description*" + ), + self.slack_message_builder.create_context_block( + [self.freshness_description] + ), + ] + ) + else: + preview.append( + self.slack_message_builder.create_text_section_block( + "*Description*\n_No description_" + ) + ) + result = [] if self.status == "runtime error": result.extend( diff --git a/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql b/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql index 77fb58bc2..4804b06e0 100644 --- a/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql +++ b/elementary/monitor/dbt_project/macros/alerts/get_source_freshness_alerts.sql @@ -45,7 +45,8 @@ sources.meta as model_meta, sources.freshness_error_after, sources.freshness_warn_after, - sources.freshness_filter + sources.freshness_filter, + sources.freshness_description from alerts_in_time_limit left join sources on alerts_in_time_limit.unique_id = sources.unique_id ) @@ -85,7 +86,8 @@ 'tags': alert_dict.get('tags'), 'model_meta': alert_dict.get('model_meta'), 'suppression_status': alert_dict.get('suppression_status'), - 'sent_at': alert_dict.get('sent_at') + 'sent_at': alert_dict.get('sent_at'), + 'freshness_description': alert_dict.get('freshness_description') } %} {% do pending_alerts.append(pending_alert_dict) %} {% endfor %}