OSS Directory as a dagster asset (#1720)
* oss directory setup

* updates

* Add date columns to the tables

* fix

* Adds more inline docs
ravenac95 authored Jun 27, 2024
1 parent 325135d commit 7f07b58
Showing 6 changed files with 174 additions and 42 deletions.
97 changes: 60 additions & 37 deletions poetry.lock

(Generated file; diff not rendered.)

4 changes: 3 additions & 1 deletion pyproject.toml
@@ -42,7 +42,9 @@ requests = "^2.31.0"
 dagster-postgres = "^0.23.6"
 pytest = "^8.2.1"
 ruff = "^0.4.10"
-oss-directory = "^0.0.2"
+dagster-polars = "^0.23.10"
+click = "^8.1.7"
+oss-directory = "^0.2.0"
 
 
 [tool.poetry.scripts]
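The oss-directory dependency is bumped to ^0.2.0 because the new asset below depends on its fetch_data() helper, and dagster-polars is added for the BigQuery IO manager wired up in definitions.py. As a rough orientation, here is a hedged sketch (not part of the commit) of what fetch_data() exposes, inferred only from how the asset module below uses it; anything beyond those attribute names is illustrative.

# Hedged sketch: attributes are taken from their use in
# warehouse/oso_dagster/assets/ossd.py; the exact return type is an assumption.
from ossdirectory import fetch_data

data = fetch_data()
print(data.meta.sha)                 # commit sha of the fetched snapshot
print(data.meta.committed_datetime)  # commit timestamp (timezone-aware)
print(len(data.projects))            # projects as a list of dicts
print(len(data.collections))         # collections as a list of dicts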
1 change: 1 addition & 0 deletions warehouse/oso_dagster/assets/__init__.py
@@ -10,3 +10,4 @@
 from .optimism import *
 from .pgn import *
 from .zora import *
+from .ossd import *
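Re-exporting the new module here is what lets the load_assets_from_modules call in definitions.py (further down) discover the projects/collections assets. A quick hedged check, assuming the package import path oso_dagster.assets:

# Hedged sketch, not part of the commit: confirm the new ossd assets are
# discovered through the star import above.
from dagster import AssetKey, load_assets_from_modules

from oso_dagster import assets

discovered = load_assets_from_modules([assets])
keys = {key for a in discovered if hasattr(a, "keys") for key in a.keys}
assert AssetKey(["ossd", "projects"]) in keys
assert AssetKey(["ossd", "collections"]) in keys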
99 changes: 99 additions & 0 deletions warehouse/oso_dagster/assets/ossd.py
@@ -1 +1,100 @@
from typing import cast

from dagster import (
    multi_asset,
    Output,
    AssetOut,
    AssetExecutionContext,
    JsonMetadataValue,
    Config,
)
from ossdirectory import fetch_data
import polars as pl
import arrow


class OSSDirectoryConfig(Config):
    # By default, the oss-directory asset doesn't write anything if there aren't
    # any changes from the previous materialization. This happens by comparing
    # the sha of the current fetch_data and the sha stored in the metadata of
    # the last materialization
    force_write: bool = False


@multi_asset(
    outs={
        "projects": AssetOut(is_required=False, key_prefix="ossd"),
        "collections": AssetOut(is_required=False, key_prefix="ossd"),
    },
    can_subset=True,
)
def ossdirectory_repo(context: AssetExecutionContext, config: OSSDirectoryConfig):
    """Materializes both the projects/collections from the oss-directory repo
    into separate dataframe assets.
    """
    data = fetch_data()

    if not data.meta:
        raise Exception("ossdirectory repository metadata is required")

    for output in context.op_execution_context.selected_output_names:
        asset_key = context.asset_key_for_output(output)
        latest_materialization = context.instance.get_latest_materialization_event(
            asset_key=asset_key
        )

        # Check if there's a previous materialization. We can choose not to add
        # any data to the database
        if (
            latest_materialization
            and latest_materialization.asset_materialization
            and not config.force_write
        ):
            repo_meta = latest_materialization.asset_materialization.metadata.get(
                "repo_meta", {}
            )
            if repo_meta:
                repo_meta = cast(JsonMetadataValue, repo_meta)
                repo_meta_dict = cast(dict, repo_meta.data)
                context.log.debug(
                    {
                        "message": "repo_meta",
                        "repo_meta": repo_meta_dict,
                    }
                )
                # The previous sha for this asset and the current sha match. No
                # need to update anything
                if repo_meta_dict.get("sha", "") == data.meta.sha:
                    context.log.info(f"no changes for {output}")
                    continue
        committed_dt = data.meta.committed_datetime

        df = pl.from_dicts(getattr(data, output))
        # Add sync time and commit sha to the dataframe
        df = df.with_columns(
            sha=pl.lit(bytes.fromhex(data.meta.sha)),
            # We need to instantiate the datetime here using this pl.datetime
            # constructor due to an issue with the datetime that's returned from
            # ossdirectory that has a timezone type that seems to be
            # incompatible with polars.
            committed_time=pl.datetime(
                committed_dt.year,
                committed_dt.month,
                committed_dt.day,
                committed_dt.hour,
                committed_dt.minute,
                committed_dt.second,
            ),
        )

        yield Output(
            df,
            output,
            metadata={
                "repo_meta": {
                    "sha": data.meta.sha,
                    "committed": arrow.get(data.meta.committed_datetime).isoformat(),
                    "authored": arrow.get(data.meta.authored_datetime).isoformat(),
                }
            },
        )
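The force_write flag on OSSDirectoryConfig is passed through Dagster run config. Below is a hedged sketch (not part of the commit) of forcing a re-write when materializing these assets locally; the module path mirrors the repo layout, and the RunConfig shape follows Dagster's documented pattern for passing a Config object to an op/multi_asset by name.

# Hedged sketch only: force a write even when the upstream sha is unchanged.
# Locally this falls back to Dagster's default filesystem io_manager rather
# than the BigQuery IO manager registered in definitions.py.
from dagster import RunConfig, materialize

from oso_dagster.assets.ossd import OSSDirectoryConfig, ossdirectory_repo

if __name__ == "__main__":
    result = materialize(
        [ossdirectory_repo],
        run_config=RunConfig(
            ops={"ossdirectory_repo": OSSDirectoryConfig(force_write=True)}
        ),
    )
    assert result.success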
3 changes: 2 additions & 1 deletion warehouse/oso_dagster/compile.py
@@ -1,6 +1,7 @@
 # ruff: noqa: E402
 """
-Tool for handling dbt compilation in a docker container
+Tool for handling dbt compilation in a docker container. This should not be used
+locally. This is truly only for docker.
 """
 
 import click
12 changes: 9 additions & 3 deletions warehouse/oso_dagster/definitions.py
@@ -3,8 +3,9 @@
 from dagster import Definitions, load_assets_from_modules
 from dagster_dbt import DbtCliResource
 from dagster_gcp import BigQueryResource, GCSResource
+from dagster_polars import PolarsBigQueryIOManager
 
-from .constants import main_dbt_project_dir, main_dbt_manifests
+from . import constants
 from .schedules import schedules
 from .cbt import CBTResource
 from .factories import load_assets_factories_from_modules
@@ -26,14 +27,19 @@ def load_definitions():
     asset_factories = load_assets_factories_from_modules([assets])
     asset_defs = load_assets_from_modules([assets])
 
+    io_manager = PolarsBigQueryIOManager(project=constants.project_id)
+
+    # Each of the dbt environments needs to be setup as a resource to be used in
+    # the dbt assets
     resources = {
         "gcs": gcs,
         "cbt": cbt,
         "bigquery": bigquery,
+        "io_manager": io_manager,
     }
-    for target in main_dbt_manifests:
+    for target in constants.main_dbt_manifests:
         resources[f"{target}_dbt"] = DbtCliResource(
-            project_dir=os.fspath(main_dbt_project_dir), target=target
+            project_dir=os.fspath(constants.main_dbt_project_dir), target=target
         )
 
     return Definitions(
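With PolarsBigQueryIOManager registered as the default io_manager, the dataframes yielded by ossdirectory_repo are written to BigQuery, and downstream assets can take them as polars inputs. A hedged sketch of such a consumer follows; the asset name and logic are illustrative and not part of the commit.

# Hedged sketch: a downstream asset that reads the ossd/projects table back
# through the io_manager as a polars DataFrame and derives a trivial summary.
import polars as pl
from dagster import AssetIn, asset


@asset(ins={"projects": AssetIn(key_prefix="ossd")})
def ossd_project_count(projects: pl.DataFrame) -> pl.DataFrame:
    # The IO manager loads the BigQuery table backing ossd/projects; we only
    # record how many project rows the latest sync contains.
    return pl.DataFrame({"project_count": [projects.height]})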
