Skip to content

Commit

Permalink
Merge branch 'master' into ele-1812-upgrade-requirements
Browse files Browse the repository at this point in the history
  • Loading branch information
NoyaArie authored Sep 28, 2023
2 parents 041465c + 6a8a27a commit c77248a
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 61 deletions.
55 changes: 38 additions & 17 deletions elementary/monitor/alerts/source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ def __init__(
max_loaded_at_time_ago_in_s: Optional[float],
source_name: str,
identifier: str,
freshness_error_after: str,
freshness_warn_after: str,
freshness_filter: str,
normalized_status: str,
error_after: str,
warn_after: str,
filter: Optional[str],
path: str,
error: str,
freshness_description: Optional[str] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -42,11 +44,13 @@ def __init__(
self.max_loaded_at_time_ago_in_s = max_loaded_at_time_ago_in_s
self.source_name = source_name
self.identifier = identifier
self.freshness_error_after = freshness_error_after
self.freshness_warn_after = freshness_warn_after
self.freshness_filter = freshness_filter
self.normalized_status = normalized_status
self.error_after = error_after
self.warn_after = warn_after
self.filter = filter
self.path = path
self.error = error
self.freshness_description = freshness_description
self.alerts_table = SourceFreshnessAlert.TABLE_NAME

def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema:
Expand All @@ -55,7 +59,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema:
subscribers = self.slack_message_builder.prettify_and_dedup_list(
self.subscribers or []
)
icon = self.slack_message_builder.get_slack_status_icon(self.status)
icon = self.slack_message_builder.get_slack_status_icon(self.normalized_status)

title = [
self.slack_message_builder.create_header_block(
Expand All @@ -68,7 +72,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema:
self.slack_message_builder.create_context_block(
[
f"*Source:* {self.source_name}.{self.identifier} |",
f"*Status:* {self.status}",
f"*Status:* {self.normalized_status}",
],
),
self.slack_message_builder.create_context_block(
Expand All @@ -84,7 +88,7 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema:
self.slack_message_builder.create_context_block(
[
f"*Source:* {self.source_name}.{self.identifier} |",
f"*Status:* {self.status} |",
f"*Status:* {self.normalized_status} |",
f"*{self.detected_at_str}*",
],
),
Expand All @@ -98,6 +102,24 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema:
]
)

if self.freshness_description:
preview.extend(
[
self.slack_message_builder.create_text_section_block(
"*Description*"
),
self.slack_message_builder.create_context_block(
[self.freshness_description]
),
]
)
else:
preview.append(
self.slack_message_builder.create_text_section_block(
"*Description*\n_No description_"
)
)

result = []
if self.status == "runtime error":
result.extend(
Expand All @@ -123,32 +145,31 @@ def to_slack(self, is_slack_workflow: bool = False) -> SlackMessageSchema:
)

configuration = []
if self.freshness_error_after:

if self.error_after:
configuration.append(
self.slack_message_builder.create_context_block(["*Error after*"])
)
configuration.append(
self.slack_message_builder.create_text_section_block(
f"`{self.freshness_error_after}`"
f"`{self.error_after}`"
)
)
if self.freshness_warn_after:
if self.warn_after:
configuration.append(
self.slack_message_builder.create_context_block(["*Warn after*"])
)
configuration.append(
self.slack_message_builder.create_text_section_block(
f"`{self.freshness_warn_after}`"
f"`{self.warn_after}`"
)
)
if self.freshness_filter:
if self.filter:
configuration.append(
self.slack_message_builder.create_context_block(["*Filter*"])
)
configuration.append(
self.slack_message_builder.create_text_section_block(
f"`{self.freshness_filter}`"
)
self.slack_message_builder.create_text_section_block(f"`{self.filter}`")
)
if self.path:
configuration.append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,14 @@
-- depends_on: {{ ref('alerts_source_freshness') }}
{% set select_pending_alerts_query %}
with alerts_in_time_limit as (
select * from {{ ref('alerts_source_freshness') }}
select *, {{ elementary_cli.normalized_source_freshness_status()}} from {{ ref('alerts_source_freshness') }}
where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ elementary_cli.get_alerts_time_limit(days_back) }}
),

models as (
select * from {{ ref('elementary', 'dbt_models') }}
),

sources as (
select * from {{ ref('elementary', 'dbt_sources') }}
),

artifacts_meta as (
select unique_id, meta from models
union all
select unique_id, meta from sources
),

extended_alerts as (
select
alerts_in_time_limit.alert_id,
Expand All @@ -28,6 +18,7 @@
alerts_in_time_limit.detected_at,
alerts_in_time_limit.max_loaded_at_time_ago_in_s,
alerts_in_time_limit.status,
alerts_in_time_limit.normalized_status,
alerts_in_time_limit.error,
alerts_in_time_limit.unique_id,
{# Currently alert_class_id equals to unique_id - might change in the future so we return both #}
Expand All @@ -36,9 +27,9 @@
alerts_in_time_limit.schema_name,
alerts_in_time_limit.source_name,
alerts_in_time_limit.identifier,
alerts_in_time_limit.freshness_error_after,
alerts_in_time_limit.freshness_warn_after,
alerts_in_time_limit.freshness_filter,
alerts_in_time_limit.warn_after,
alerts_in_time_limit.error_after,
alerts_in_time_limit.filter,
alerts_in_time_limit.tags,
alerts_in_time_limit.meta,
alerts_in_time_limit.owner,
Expand All @@ -51,9 +42,13 @@
else suppression_status
end as suppression_status,
alerts_in_time_limit.sent_at,
artifacts_meta.meta as model_meta
sources.meta as model_meta,
sources.freshness_error_after,
sources.freshness_warn_after,
sources.freshness_filter,
sources.freshness_description
from alerts_in_time_limit
left join artifacts_meta on alerts_in_time_limit.unique_id = artifacts_meta.unique_id
left join sources on alerts_in_time_limit.unique_id = sources.unique_id
)

select *
Expand All @@ -65,6 +60,10 @@
{% set alerts_dicts = elementary.agate_to_dicts(alerts_agate) %}
{% set pending_alerts = [] %}
{% for alert_dict in alerts_dicts %}
{% set error_after = alert_dict.get('error_after') %}
{% set warn_after = alert_dict.get('warn_after') %}
{% set filter = alert_dict.get('filter') %}

{% set pending_alert_dict = {'id': alert_dict.get('alert_id'),
'model_unique_id': alert_dict.get('unique_id'),
'alert_class_id': alert_dict.get('alert_class_id'),
Expand All @@ -76,17 +75,19 @@
'schema_name': alert_dict.get('schema_name'),
'source_name': alert_dict.get('source_name'),
'identifier': alert_dict.get('identifier'),
'freshness_error_after': alert_dict.get('freshness_error_after'),
'freshness_warn_after': alert_dict.get('freshness_warn_after'),
'freshness_filter': alert_dict.get('freshness_filter'),
'error_after': error_after if error_after is not none else alert_dict.get('freshness_error_after'),
'warn_after': warn_after if warn_after is not none else alert_dict.get('freshness_warn_after'),
'filter': filter if filter is not none else alert_dict.get('freshness_filter'),
'status': alert_dict.get('status'),
'normalized_status': alert_dict.get('normalized_status'),
'owners': alert_dict.get('owner'),
'path': alert_dict.get('path'),
'error': alert_dict.get('error'),
'tags': alert_dict.get('tags'),
'model_meta': alert_dict.get('model_meta'),
'suppression_status': alert_dict.get('suppression_status'),
'sent_at': alert_dict.get('sent_at')
'sent_at': alert_dict.get('sent_at'),
'freshness_description': alert_dict.get('freshness_description')
} %}
{% do pending_alerts.append(pending_alert_dict) %}
{% endfor %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{% macro normalized_source_freshness_status() %}
case
when status = 'error' then 'fail'
when status = 'runtime error' then 'error'
else status
end as normalized_status
{% endmacro %}
35 changes: 20 additions & 15 deletions tests/mocks/fetchers/alerts_fetcher_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs):
schema_name="test_schema",
source_name="source_1",
identifier="identifier",
freshness_error_after="10",
freshness_warn_after="10",
freshness_filter="",
error_after="10",
warn_after="10",
filter="",
status="fail",
normalized_status="fail",
owners="[]",
path="",
error="",
Expand All @@ -347,10 +348,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs):
schema_name="test_schema",
source_name="source_1",
identifier="identifier",
freshness_error_after="10",
freshness_warn_after="10",
freshness_filter="",
error_after="10",
warn_after="10",
filter="",
status="fail",
normalized_status="fail",
owners="[]",
path="",
error="",
Expand All @@ -372,10 +374,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs):
schema_name="test_schema",
source_name="source_2",
identifier="identifier",
freshness_error_after="10",
freshness_warn_after="10",
freshness_filter="",
error_after="10",
warn_after="10",
filter="",
status="fail",
normalized_status="fail",
owners="[]",
path="",
error="",
Expand All @@ -397,10 +400,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs):
schema_name="test_schema",
source_name="source_3",
identifier="identifier",
freshness_error_after="10",
freshness_warn_after="10",
freshness_filter="",
error_after="10",
warn_after="10",
filter="",
status="fail",
normalized_status="fail",
owners="[]",
path="",
error="",
Expand All @@ -424,10 +428,11 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs):
schema_name="test_schema",
source_name="source_3",
identifier="identifier",
freshness_error_after="10",
freshness_warn_after="10",
freshness_filter="",
error_after="10",
warn_after="10",
filter="",
status="fail",
normalized_status="fail",
owners="[]",
path="",
error="",
Expand Down
21 changes: 12 additions & 9 deletions tests/unit/monitor/api/alerts/test_alert_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,15 @@ def initial_alerts():
tags='["one", "two"]',
owners='["jeff", "john"]',
status="error",
normalized_status="fail",
snapshotted_at="2023-08-15T12:26:06.884065+00:00",
max_loaded_at="1969-12-31T00:00:00+00:00",
max_loaded_at_time_ago_in_s=1692188766.884065,
source_name="elementary_integration_tests",
identifier="any_type_column_anomalies_validation",
freshness_error_after='{"count": null, "period": null}',
freshness_warn_after='{"count": 1, "period": "minute"}',
freshness_filter="null",
error_after='{"count": null, "period": null}',
warn_after='{"count": 1, "period": "minute"}',
filter="null",
error="problemz",
),
SourceFreshnessAlert(
Expand All @@ -196,14 +197,15 @@ def initial_alerts():
tags='["one", "two"]',
owners='["jeff", "john"]',
status="warn",
normalized_status="warn",
snapshotted_at="2023-08-15T12:26:06.884065+00:00",
max_loaded_at="1969-12-31T00:00:00+00:00",
max_loaded_at_time_ago_in_s=1692188766.884065,
source_name="elementary_integration_tests",
identifier="any_type_column_anomalies_validation",
freshness_error_after='{"count": null, "period": null}',
freshness_warn_after='{"count": 1, "period": "minute"}',
freshness_filter="null",
error_after='{"count": null, "period": null}',
warn_after='{"count": 1, "period": "minute"}',
filter="null",
error="problemz",
),
SourceFreshnessAlert(
Expand All @@ -221,14 +223,15 @@ def initial_alerts():
tags='["one", "two"]',
owners='["jeff", "john"]',
status="runtime error",
normalized_status="error",
snapshotted_at="2023-08-15T12:26:06.884065+00:00",
max_loaded_at="1969-12-31T00:00:00+00:00",
max_loaded_at_time_ago_in_s=1692188766.884065,
source_name="elementary_integration_tests",
identifier="any_type_column_anomalies_validation",
freshness_error_after='{"count": null, "period": null}',
freshness_warn_after='{"count": 1, "period": "minute"}',
freshness_filter="null",
error_after='{"count": null, "period": null}',
warn_after='{"count": 1, "period": "minute"}',
filter="null",
error="problemz",
),
]
Expand Down

0 comments on commit c77248a

Please sign in to comment.