Goldsky Backfill and Automated Block number data checks (#1551)
* goldsky backfills and cleanup

* Working backfill job

* Adds automated data check for block number
ravenac95 authored May 28, 2024
1 parent 3a3b690 commit 0a79d92
Showing 13 changed files with 582 additions and 140 deletions.
25 changes: 20 additions & 5 deletions warehouse/oso_dagster/assets.py
@@ -5,9 +5,12 @@
 from dagster_dbt import DbtCliResource, dbt_assets, DagsterDbtTranslator
 from google.cloud.bigquery.schema import SchemaField
 from .constants import main_dbt_manifests, main_dbt_project_dir
-from .goldsky import (
+from .factories.goldsky import (
     GoldskyConfig,
     goldsky_asset,
+    traces_checks,
+    transactions_checks,
+    blocks_checks,
 )
 from .factories import interval_gcs_import_asset, SourceMode, Interval, IntervalGCSAsset

@@ -105,7 +108,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
-        # uncomment the following value to test
+        checks=[blocks_checks()],
     ),
 )

@@ -121,7 +124,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
         schema_overrides=[SchemaField(name="value", field_type="BYTES")],
-        # uncomment the following value to test
+        checks=[transactions_checks()],
     ),
 )

@@ -136,8 +139,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
-        # uncomment the following value to test
-        # max_objects_to_load=2,
+        checks=[traces_checks()],
     ),
 )

@@ -152,6 +154,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
+        checks=[blocks_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -169,6 +172,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
         schema_overrides=[SchemaField(name="value", field_type="BYTES")],
+        checks=[transactions_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -185,6 +189,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
+        checks=[traces_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -201,6 +206,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
+        checks=[blocks_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -218,6 +224,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
         schema_overrides=[SchemaField(name="value", field_type="BYTES")],
+        checks=[transactions_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -234,6 +241,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
+        checks=[traces_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -250,6 +258,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="block_timestamp",
         dedupe_model="optimism_dedupe.sql",
+        checks=[traces_checks()],
         # uncomment the following value to test
         # max_objects_to_load=2000,
     ),
@@ -266,6 +275,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
+        checks=[blocks_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -283,6 +293,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
         schema_overrides=[SchemaField(name="value", field_type="BYTES")],
+        checks=[transactions_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -299,6 +310,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
+        checks=[traces_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -315,6 +327,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
+        checks=[blocks_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -332,6 +345,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
         schema_overrides=[SchemaField(name="value", field_type="BYTES")],
+        checks=[transactions_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
@@ -348,6 +362,7 @@ def _generated_dbt_assets(context: AssetExecutionContext, **kwargs):
         destination_dataset_name="superchain",
         partition_column_name="block_timestamp",
         partition_column_transform=lambda c: f"TIMESTAMP_SECONDS(`{c}`)",
+        checks=[traces_checks()],
         # uncomment the following value to test
         # max_objects_to_load=1,
     ),
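A note on the checks wired in above: blocks_checks, transactions_checks, and traces_checks are factory functions from the new warehouse/oso_dagster/factories/goldsky/checks.py module, which this view does not display. As a rough sketch only (the real factory signatures are not visible in this diff), a block-number continuity check written with plain Dagster asset checks and a dagster-gcp BigQueryResource might look like the following; block_number_check, table_fqn, and the "number" column are all assumptions:

from dagster import AssetCheckResult, asset_check
from dagster_gcp import BigQueryResource


def block_number_check(asset_key: str, table_fqn: str):
    # Hypothetical helper: builds a check asserting block numbers have no gaps.
    @asset_check(asset=asset_key, name="block_number_continuity")
    def _check(bigquery: BigQueryResource) -> AssetCheckResult:
        # If block numbers are contiguous, the distinct count equals the span
        # between the minimum and maximum block number.
        query = f"""
            SELECT
              MAX(`number`) - MIN(`number`) + 1 AS expected,
              COUNT(DISTINCT `number`) AS actual
            FROM `{table_fqn}`
        """
        with bigquery.get_client() as client:
            row = list(client.query(query).result())[0]
        return AssetCheckResult(
            passed=bool(row.expected == row.actual),
            metadata={"expected": row.expected, "actual": row.actual},
        )

    return _check

The real factories presumably also bind chain-specific table names and column conventions for each Goldsky asset.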
25 changes: 21 additions & 4 deletions warehouse/oso_dagster/cbt/__init__.py
@@ -56,14 +56,31 @@ def __init__(
         search_paths: List[str],
     ):
         self.bigquery = bigquery
-        search_paths.append(
+        self.search_paths = [
             os.path.join(os.path.abspath(os.path.dirname(__file__)), "operations"),
-        )
-        loader = FileSystemLoader(search_paths)
+        ]
+        self.add_search_paths(search_paths)
+
+        self.log = log
+        self.load_env()
+
+    def load_env(self):
+        loader = FileSystemLoader(self.search_paths)
         self.env = Environment(
             loader=loader,
         )
-        self.log = log
+
+    def add_search_paths(self, search_paths: List[str]):
+        for p in search_paths:
+            if not p in self.search_paths:
+                self.search_paths.append(p)
+                self.load_env()
+
+    def query(self, model_file: str, timeout: float = 300, **vars):
+        with self.bigquery.get_client() as client:
+            rendered = self.render_model(model_file, **vars)
+            job = client.query(rendered)
+            return job.result()
 
     def transform(
         self,
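The new CBT.query method renders a Jinja SQL model from the configured search paths and runs it synchronously on BigQuery, returning the row iterator. Note that, as of this commit, the timeout parameter is accepted but not forwarded to client.query. A usage sketch; the model file, table value, and the oso_dagster.cbt import path are assumptions, while the bigquery, log, and search_paths keyword names come from the diff above:

import logging

from dagster_gcp import BigQueryResource

from oso_dagster.cbt import CBT  # assumed import path for this module

cbt = CBT(
    bigquery=BigQueryResource(project="my-gcp-project"),  # hypothetical project
    log=logging.getLogger(__name__),
    search_paths=["custom/models"],
)

# "block_number_gaps.sql" is a hypothetical Jinja model on a search path;
# keyword arguments become Jinja variables inside the template.
results = cbt.query("block_number_gaps.sql", table="superchain.base_blocks")
for row in results:
    print(dict(row))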
1 change: 1 addition & 0 deletions warehouse/oso_dagster/constants.py
@@ -4,6 +4,7 @@
 import pathlib
 
 import requests
+from dagster import DefaultSensorStatus
 from dagster_dbt import DbtCliResource
 
 main_dbt_project_dir = Path(__file__).joinpath("..", "..", "..").resolve()
1 change: 1 addition & 0 deletions warehouse/oso_dagster/definitions.py
@@ -42,6 +42,7 @@ def load_definitions():
         assets=asset_defs + asset_factories.assets,
         schedules=schedules,
         jobs=asset_factories.jobs,
+        asset_checks=asset_factories.checks,
         sensors=asset_factories.sensors,
         resources=resources,
     )
14 changes: 12 additions & 2 deletions warehouse/oso_dagster/factories/common.py
@@ -2,7 +2,12 @@
 from typing import List
 from dataclasses import dataclass, field
 
-from dagster import SensorDefinition, AssetsDefinition, JobDefinition
+from dagster import (
+    SensorDefinition,
+    AssetsDefinition,
+    JobDefinition,
+    AssetChecksDefinition,
+)
 
 
 class GenericGCSAsset:
@@ -18,6 +23,7 @@ class AssetFactoryResponse:
     assets: List[AssetsDefinition]
     sensors: List[SensorDefinition] = field(default_factory=lambda: [])
     jobs: List[JobDefinition] = field(default_factory=lambda: [])
+    checks: List[AssetChecksDefinition] = field(default_factory=lambda: [])
 
 
 def load_assets_factories_from_modules(
@@ -26,10 +32,14 @@ def load_assets_factories_from_modules(
     assets: List[AssetsDefinition] = []
     sensors: List[SensorDefinition] = []
     jobs: List[JobDefinition] = []
+    checks: List[AssetChecksDefinition] = []
     for module in modules:
         for _, obj in module.__dict__.items():
             if type(obj) == AssetFactoryResponse:
                 assets.extend(obj.assets)
                 sensors.extend(obj.sensors)
                 jobs.extend(obj.jobs)
-    return AssetFactoryResponse(assets=assets, sensors=sensors, jobs=jobs)
+                checks.extend(obj.checks)
+    return AssetFactoryResponse(
+        assets=assets, sensors=sensors, jobs=jobs, checks=checks
+    )
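load_assets_factories_from_modules scans module attributes for AssetFactoryResponse instances and now merges their checks alongside assets, sensors, and jobs; definitions.py (above) then hands the merged list to Definitions via asset_checks. A self-contained sketch of that contract, using a throwaway module object; the oso_dagster.factories.common import path and all other names here are hypothetical:

from types import ModuleType

from dagster import AssetCheckResult, asset, asset_check

from oso_dagster.factories.common import (
    AssetFactoryResponse,
    load_assets_factories_from_modules,
)


@asset
def example_asset() -> int:
    return 1


@asset_check(asset=example_asset)
def example_check() -> AssetCheckResult:
    return AssetCheckResult(passed=True)


# Any module attribute that is an AssetFactoryResponse gets picked up.
fake_module = ModuleType("fake_factories")
fake_module.example_response = AssetFactoryResponse(
    assets=[example_asset], checks=[example_check]
)

combined = load_assets_factories_from_modules([fake_module])
assert combined.checks == [example_check]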
8 changes: 5 additions & 3 deletions warehouse/oso_dagster/factories/gcs.py
@@ -196,9 +196,11 @@ def gcs_asset(
         }
     )
 
+    asset_config = config
+
     @op(name=f"{config.name}_clean_up_op")
     def gcs_clean_up_op(context: OpExecutionContext, config: dict):
-        context.log.info(f"Running clean up for {key}")
+        context.log.info(f"Running clean up for {asset_config.name}")
         print(config)
 
     @job(name=f"{config.name}_clean_up_job")
@@ -209,7 +211,7 @@ def gcs_clean_up_job():
         asset_key=gcs_asset.key,
         name=f"{config.name}_clean_up_sensor",
         job=gcs_clean_up_job,
-        default_status=DefaultSensorStatus.RUNNING,
+        default_status=DefaultSensorStatus.STOPPED,
     )
     def gcs_clean_up_sensor(
         context: SensorEvaluationContext, gcs: GCSResource, asset_event: EventLogEntry
@@ -220,7 +222,7 @@ def gcs_clean_up_sensor(
             run_config=RunConfig(
                 ops={
                     f"{config.name}_clean_up_op": {
-                        "config": {"asset_event": asset_event}
+                        "op_config": {"asset_event": asset_event}
                     }
                 }
             ),
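Two small fixes in this file are worth spelling out. The clean-up sensor now defaults to STOPPED so it does not run unless explicitly enabled, and the first hunk aliases config to asset_config because the op parameter named config shadows the enclosing factory variable inside gcs_clean_up_op. The shadowing issue is ordinary Python scoping, illustrated by this standalone sketch (all names hypothetical):

def make_handler(config: str):
    # The inner parameter also named "config" shadows the outer one, so the
    # factory-level value must be captured under a different name first.
    outer_config = config

    def handler(config: dict):
        print(f"outer={outer_config!r}, inner={config!r}")

    return handler


make_handler("factory-level")({"asset_event": "..."})
# prints: outer='factory-level', inner={'asset_event': '...'}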
3 changes: 3 additions & 0 deletions warehouse/oso_dagster/factories/goldsky/__init__.py
@@ -0,0 +1,3 @@
+from .assets import *
+from .dask import *
+from .checks import *