Skip to content

Commit

Permalink
feat: add sensor that posts gcp metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Mar 23, 2024
1 parent 7a74ea4 commit 1badba3
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
station_names,
)
from luchtmeetnet_ingestion.resource_definitions import env_resources, environment
from luchtmeetnet_ingestion.sensors import post_gcp_metrics

try:
__version__ = metadata.version("luchtmeetnet_ingestion")
Expand All @@ -18,4 +19,5 @@
# Dagster code-location definitions: registers the air-quality assets, the
# resources selected for the current `environment`, and the sensor that posts
# run metrics to GCP.
definition = Definitions(
    assets=[air_quality_data, daily_air_quality_data, station_names, air_quality_station_names],
    resources=env_resources[environment],
    sensors=[post_gcp_metrics],
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import hashlib
import os
import warnings

import pandas as pd
from dagster import (
Expand All @@ -9,14 +10,14 @@
AutoMaterializeRule,
Backoff,
DataVersion,
ExperimentalWarning,
FreshnessPolicy,
Jitter,
MultiToSingleDimensionPartitionMapping,
Output,
RetryPolicy,
asset,
)
from dagster_utils.IO.gcp_metrics import GcpMetricsResource
from luchtmeetnet_ingestion.assets import const
from luchtmeetnet_ingestion.assets.utils import get_air_quality_data_for_partition_key
from luchtmeetnet_ingestion.IO.resources import LuchtMeetNetResource
Expand All @@ -27,23 +28,7 @@
)
from pandas.util import hash_pandas_object


def post_job_success(context: AssetExecutionContext, gcp_metrics: GcpMetricsResource, value: int):
    """Send a ``job_success`` data point for the current asset run to GCP.

    Args:
        context: Execution context; its logger, run tags, asset key,
            partition key and run id are read.
        gcp_metrics: Resource whose ``post_time_series`` sends the point.
        value: Forwarded as the point's ``bool_value``.
    """
    context.log.info("Posting metrics to GCP")
    context.log.debug(context.run.tags)

    # Labels identifying the run; the trigger fields are hard-coded placeholders.
    run_labels = dict(
        job_name=context.asset_key,
        partition=context.partition_key,
        run_id=context.run_id,
        trigger_type="asset",
        trigger_name="test",
    )

    # Post metrics to GCP
    gcp_metrics.post_time_series(
        series_type="custom.googleapis.com/dagster/job_success",
        value={"bool_value": value},
        metric_labels=run_labels,
    )
# Silence Dagster's ExperimentalWarning for everything defined in this module.
warnings.filterwarnings("ignore", category=ExperimentalWarning)


@asset(
Expand All @@ -61,7 +46,7 @@ def post_job_success(context: AssetExecutionContext, gcp_metrics: GcpMetricsReso
# backfill_policy=BackfillPolicy.multi_run(max_partitions_per_run=1),
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=None)
.with_rules(
AutoMaterializeRule.materialize_on_cron("0 3 * * *", all_partitions=False),
AutoMaterializeRule.materialize_on_cron("32 21 * * *", all_partitions=False),
)
.without_rules(
AutoMaterializeRule.skip_on_parent_outdated(),
Expand Down
86 changes: 86 additions & 0 deletions dags/luchtmeetnet_ingestion/src/luchtmeetnet_ingestion/sensors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os
import typing

import pendulum
from dagster import (
DagsterRunStatus,
DefaultSensorStatus,
RunsFilter,
SensorEvaluationContext,
sensor,
)
from dagster_utils.IO.gcp_metrics import GcpMetricsResource

# Deployment environment name, read from the ENVIRONMENT variable ("dev" when
# unset); attached as the "environment" label on every metric the sensor posts.
environment = os.getenv("ENVIRONMENT", "dev")


def post_job_success(context: SensorEvaluationContext, gcp_metrics: GcpMetricsResource, value: int):
    """Post a single ``job_success`` point to GCP, labelled from ``context``.

    NOTE(review): this reads ``context.run``, ``context.asset_key``,
    ``context.partition_key`` and ``context.run_id``; it is unclear whether a
    ``SensorEvaluationContext`` provides these, and nothing in the visible
    code calls this helper -- confirm before using it.

    Args:
        context: Context supplying the logger and the label values.
        gcp_metrics: Resource used to send the time-series point.
        value: Forwarded as the point's ``bool_value``.
    """
    logger = context.log
    logger.info("Posting metrics to GCP")
    context.log.debug(context.run.tags)

    metric_labels = {}
    metric_labels["job_name"] = context.asset_key
    metric_labels["partition"] = context.partition_key
    metric_labels["run_id"] = context.run_id
    # Trigger information is hard-coded here rather than derived from the run.
    metric_labels["trigger_type"] = "asset"
    metric_labels["trigger_name"] = "test"

    # Post metrics to GCP
    point_value = {"bool_value": value}
    gcp_metrics.post_time_series(
        series_type="custom.googleapis.com/dagster/job_success",
        value=point_value,
        metric_labels=metric_labels,
    )


def parse_run_trigger(tags: typing.Dict[str, str]) -> typing.Dict[str, str]:
    """Classify what launched a run from its Dagster system tags.

    Tags are checked in this order, first match wins: backfill, schedule,
    auto-materialize, sensor. A run carrying none of them is reported as
    "manual".

    Args:
        tags: The run's tags.

    Returns:
        A dict with ``"trigger_type"`` and ``"trigger_name"``. The name is the
        backfill id, schedule name or sensor name; it is ``"N/A"`` for
        auto-materialize and manual runs.
    """
    if tags.get("dagster/backfill") is not None:
        trigger_type = "backfill"
        trigger_name = tags["dagster/backfill"]
    elif tags.get("dagster/schedule_name") is not None:
        trigger_type = "schedule"
        trigger_name = tags["dagster/schedule_name"]
    elif tags.get("dagster/auto_materialize"):
        trigger_type = "auto_materialize"
        trigger_name = "N/A"
    elif tags.get("dagster/sensor_name") is not None:
        # Checked after auto-materialize so a run carrying both tags keeps its
        # "auto_materialize" classification; without this branch,
        # sensor-launched runs were reported as "manual".
        trigger_type = "sensor"
        trigger_name = tags["dagster/sensor_name"]
    else:
        trigger_type = "manual"
        trigger_name = "N/A"
    return {
        "trigger_type": trigger_type,
        "trigger_name": trigger_name,
    }


def _asset_name_for_run(dagster_run) -> str:
    """Return the asset label for a run, or "N/A" when it selects no assets."""
    selection = dagster_run.asset_selection
    if not selection:
        # Covers both a missing (None) and an empty selection.
        return "N/A"
    # Take the smallest name so the label is deterministic when a run selects
    # several assets (presumably an unordered set -- verify against Dagster).
    return min(asset_key.to_user_string() for asset_key in selection)


@sensor(
    description="This sensor retrieves recent run records and posts metrics to GCP.",
    default_status=DefaultSensorStatus.RUNNING,
)
def post_gcp_metrics(context: SensorEvaluationContext, gcp_metrics: GcpMetricsResource):
    """Post one ``job_success`` point to GCP for each newly finished run.

    The sensor cursor stores the largest run end time (epoch seconds, as a
    string) reported so far. Each evaluation fetches SUCCESS/FAILURE run
    records updated after that time, posts a point for every run that ended
    after the cursor, then advances the cursor.

    Args:
        context: Sensor context providing the cursor, the Dagster instance
            and the logger.
        gcp_metrics: Resource whose ``post_time_series`` sends the points.
    """
    cursor = float(context.cursor) if context.cursor else 0.0
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            statuses=[DagsterRunStatus.SUCCESS, DagsterRunStatus.FAILURE],
            updated_after=pendulum.from_timestamp(cursor, tz="UTC"),
        ),
    )
    context.log.info(f"Found {len(run_records)} runs")

    max_run_ts = cursor
    for run_record in run_records:
        dagster_run = run_record.dagster_run
        end_time = run_record.end_time
        if end_time is None:
            # Without an end time the point cannot be timestamped and the
            # cursor cannot be compared or advanced.
            context.log.warning(f"Skipping run {dagster_run.run_id}: no end time recorded")
            continue
        if end_time <= cursor:
            # The query filters on the record's update time, not its end time,
            # so a run reported in an earlier evaluation may be returned
            # again; skip it rather than post a duplicate point.
            continue
        labels = {
            "asset_name": _asset_name_for_run(dagster_run),
            "run_id": dagster_run.run_id,
            "partition_key": dagster_run.tags.get("dagster/partition", "N/A"),
            "environment": environment,
            **parse_run_trigger(dagster_run.tags),
        }
        gcp_metrics.post_time_series(
            series_type="custom.googleapis.com/dagster/job_success",
            value={"bool_value": 1 if dagster_run.is_success else 0},
            metric_labels=labels,
            timestamp=end_time,
        )
        max_run_ts = max(max_run_ts, end_time)
    if max_run_ts != cursor:
        context.log.debug(f"Setting cursor to {max_run_ts}")
        context.update_cursor(str(max_run_ts))
12 changes: 8 additions & 4 deletions shared/dagster_utils/src/dagster_utils/IO/gcp_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ class GcpMetricsResource(ConfigurableResource):
environment: str
project_id: str

@retry(attempts=3, seconds=5)
@retry(attempts=3, seconds=1)
def post_time_series(
self,
series_type: str,
value: typing.Union[int, float, bool],
metric_labels: typing.Dict[str, str],
timestamp: typing.Optional[float] = None,
):
client = monitoring_v3.MetricServiceClient()
project_name = f"projects/{self.project_id}"
Expand All @@ -28,9 +29,12 @@ def post_time_series(
for k, v in metric_labels.items():
series.metric.labels[k] = v
series.metric.labels["environment"] = self.environment
now = time.time()
seconds = int(now)
nanos = int((now - seconds) * 10**9)
if timestamp is None:
ts = time.time()
else:
ts = timestamp
seconds = int(ts)
nanos = int((ts - seconds) * 10**9)
interval = monitoring_v3.TimeInterval({"end_time": {"seconds": seconds, "nanos": nanos}})
point = monitoring_v3.Point({"interval": interval, "value": value})
series.points = [point]
Expand Down

0 comments on commit 1badba3

Please sign in to comment.