diff --git a/poetry.lock b/poetry.lock index ae6ead7a..d90d81e8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -722,13 +722,13 @@ files = [ [[package]] name = "dagster" -version = "1.7.9" +version = "1.7.10" description = "Dagster is an orchestration platform for the development, production, and observation of data assets." optional = false python-versions = "<3.13,>=3.8" files = [ - {file = "dagster-1.7.9-py3-none-any.whl", hash = "sha256:4c4541cebec5084e44dd5a85ab2f2e38407e18e1efcf82c3f4f2dc3c41501155"}, - {file = "dagster-1.7.9.tar.gz", hash = "sha256:91be5523d9721fc0ff27ef49f9d392f12966f787c6208a5fa3ceb3142cc4493c"}, + {file = "dagster-1.7.10-py3-none-any.whl", hash = "sha256:8ae23b45b9ed7b7598e4b0150c23cf86e76392a8db26028d1af522dac03d6321"}, + {file = "dagster-1.7.10.tar.gz", hash = "sha256:5e4ae3307f17584a5fa51c1aef94ff599cd3ace9ebcf0e047d21956d9035080f"}, ] [package.dependencies] @@ -736,7 +736,7 @@ alembic = ">=1.2.1,<1.6.3 || >1.6.3,<1.7.0 || >1.7.0,<1.11.0 || >1.11.0" click = ">=5.0" coloredlogs = ">=6.1,<=14.0" croniter = ">=0.3.34" -dagster-pipes = "1.7.9" +dagster-pipes = "1.7.10" docstring-parser = "*" filelock = "*" grpcio = ">=1.44.0" @@ -774,17 +774,17 @@ test = ["buildkite-test-collector", "docker", "fsspec (<2024.5.0)", "grpcio-tool [[package]] name = "dagster-dbt" -version = "0.23.9" +version = "0.23.10" description = "A Dagster integration for dbt" optional = false python-versions = "<3.13,>=3.8" files = [ - {file = "dagster-dbt-0.23.9.tar.gz", hash = "sha256:bca9db6590da40b1c9722169ad0eab459209999fb880f035a1c78a9ed4b16ba5"}, - {file = "dagster_dbt-0.23.9-py3-none-any.whl", hash = "sha256:38bc352899503b1ca5f6d5da85f2eaa28295d7fd484b98b97724afccf899d92b"}, + {file = "dagster-dbt-0.23.10.tar.gz", hash = "sha256:ba666146b0dc7219b72216d6255f3b7f2643c26a58f68352a939be9282a0ca07"}, + {file = "dagster_dbt-0.23.10-py3-none-any.whl", hash = "sha256:04090bab492398c7752983d3c5173f4af98b37d79b174c8914c28e5a45c9052d"}, ] [package.dependencies] -dagster = "1.7.9" +dagster = "1.7.10" dbt-core = ">=1.6,<1.9" Jinja2 = "*" networkx = "*" @@ -800,18 +800,18 @@ test = ["dagster-duckdb", "dagster-duckdb-pandas", "dbt-duckdb"] [[package]] name = "dagster-gcp" -version = "0.23.9" +version = "0.23.10" description = "Package for GCP-specific Dagster framework op and resource components." optional = false python-versions = "<3.13,>=3.8" files = [ - {file = "dagster-gcp-0.23.9.tar.gz", hash = "sha256:be9acc411bbc818f8f580c9c8ed9757aa0e98bcb442dd18b77014324d24437a2"}, - {file = "dagster_gcp-0.23.9-py3-none-any.whl", hash = "sha256:205c80e2936e51dc105ad8b3c5d87c2f618a6770eb2937226351aefbe4fb258b"}, + {file = "dagster-gcp-0.23.10.tar.gz", hash = "sha256:00efd9f3326ffe76b29eb757bcb5b05bb7d3b7fb271586df6f611b0f8083911b"}, + {file = "dagster_gcp-0.23.10-py3-none-any.whl", hash = "sha256:af41377e77227d38c51d626d769a80ebc420149d1eabdc0df9baba02522357d1"}, ] [package.dependencies] -dagster = "1.7.9" -dagster-pandas = "0.23.9" +dagster = "1.7.10" +dagster-pandas = "0.23.10" db-dtypes = "*" google-api-python-client = "*" google-cloud-bigquery = "*" @@ -823,17 +823,17 @@ pyarrow = ["pyarrow"] [[package]] name = "dagster-graphql" -version = "1.7.9" +version = "1.7.10" description = "The GraphQL frontend to python dagster." optional = false python-versions = "<3.13,>=3.8" files = [ - {file = "dagster-graphql-1.7.9.tar.gz", hash = "sha256:ca2f37a9d801f25d59ff10e6f18f3d657a6f228797a9bc3f6be0a4cc8d174af3"}, - {file = "dagster_graphql-1.7.9-py3-none-any.whl", hash = "sha256:99880e9a451e208ab6f180b5fdbdb5276e74f6c19100399411a1b9908b14aa13"}, + {file = "dagster-graphql-1.7.10.tar.gz", hash = "sha256:bbb771f7e7b4676bc584c150f99841949f6bcd2869753ee6122958537599cf54"}, + {file = "dagster_graphql-1.7.10-py3-none-any.whl", hash = "sha256:fcfd47e6488880c4e38dc26be11a9214b2044aa7f5a2713e4bbb2d60c133c094"}, ] [package.dependencies] -dagster = "1.7.9" +dagster = "1.7.10" gql = {version = ">=3,<4", extras = ["requests"]} graphene = ">=3,<4" requests = "*" @@ -841,60 +841,83 @@ starlette = "*" [[package]] name = "dagster-pandas" -version = "0.23.9" +version = "0.23.10" description = "Utilities and examples for working with pandas and dagster, an opinionated framework for expressing data pipelines" optional = false python-versions = "<3.13,>=3.8" files = [ - {file = "dagster-pandas-0.23.9.tar.gz", hash = "sha256:dfcdeb4d0e81599cf74449a3865162cdeaea2fbf7f46f8495faa57bf14e60cb5"}, - {file = "dagster_pandas-0.23.9-py3-none-any.whl", hash = "sha256:8834b07f17c9480f786e30cd41fbd2b0c874ae5f5e3f1b7ac6c8ca8ead2b7390"}, + {file = "dagster-pandas-0.23.10.tar.gz", hash = "sha256:02335276dad84ed85f0a09a853936795de3072d74903063fd503075e3820efaa"}, + {file = "dagster_pandas-0.23.10-py3-none-any.whl", hash = "sha256:7db9b8c7bce4a65eac287581c318dc77620b80cb3be40da8d69607b866e915fa"}, ] [package.dependencies] -dagster = "1.7.9" +dagster = "1.7.10" pandas = "*" [[package]] name = "dagster-pipes" -version = "1.7.9" +version = "1.7.10" description = "Toolkit for Dagster integrations with transform logic outside of Dagster" optional = false python-versions = "<3.13,>=3.8" files = [ - {file = "dagster-pipes-1.7.9.tar.gz", hash = "sha256:4d18fd71b065749d4ac3f20aa9ccba5fb0a54227f6c4eb0828a4ba00a182cf39"}, - {file = "dagster_pipes-1.7.9-py3-none-any.whl", hash = "sha256:91abe159d6e233653009f3e54eb6a18ae76328c6b76b172234b310286f75cb99"}, + {file = "dagster-pipes-1.7.10.tar.gz", hash = "sha256:7353ba30a815b4d34b7e57345a326646fc6f10e3cc672b50f5f193a58c24cc17"}, + {file = "dagster_pipes-1.7.10-py3-none-any.whl", hash = "sha256:1dec32e58efac56e584e865ba34ba8acc03d547af4866d034200c7abe7626677"}, ] +[[package]] +name = "dagster-polars" +version = "0.23.10" +description = "Dagster integration library for Polars" +optional = false +python-versions = "<3.13,>=3.8" +files = [ + {file = "dagster-polars-0.23.10.tar.gz", hash = "sha256:5cd087fd13084017a49134dd7e1e801d2f083990fc8b2086dd5fa2e826f5799c"}, + {file = "dagster_polars-0.23.10-py3-none-any.whl", hash = "sha256:8b73ee062dd2bddc66f39f4a5c54131607e5b45b36f3bc4e863e69a5cce61e24"}, +] + +[package.dependencies] +dagster = "1.7.10" +polars = ">=0.20.0" +pyarrow = ">=8.0.0" +typing-extensions = ">=4.7.0" +universal-pathlib = ">=0.1.4" + +[package.extras] +deltalake = ["deltalake (>=0.15.0)"] +gcp = ["dagster-gcp (>=0.19.5)"] +test = ["deepdiff (>=6.3.0)", "hypothesis[zoneinfo] (>=6.89.0)", "pytest (>=7.3.1,<8.0.0)", "pytest-cases (>=3.6.14)"] + [[package]] name = "dagster-postgres" -version = "0.23.9" +version = "0.23.10" description = "A Dagster integration for postgres" optional = false python-versions = "<3.13,>=3.8" files = [ - {file = "dagster-postgres-0.23.9.tar.gz", hash = "sha256:5ada17de5c19d0df7af09e310ea274f88949e3e57f90bb9fb2651eddc48b675a"}, - {file = "dagster_postgres-0.23.9-py3-none-any.whl", hash = "sha256:c0bc510d92e633505f928278157986d37de6eb0d5233dc9e9e00d922b5f344f7"}, + {file = "dagster-postgres-0.23.10.tar.gz", hash = "sha256:cf1148d9d5eb7936133d16fcdd33f53ac30ffe71d2aa4021b3b8db66279ebc0c"}, + {file = "dagster_postgres-0.23.10-py3-none-any.whl", hash = "sha256:7a97eaf7263c02e3c9c4f6d19f8629495ee52e5fa3a7cae6a3aa40a79f697b10"}, ] [package.dependencies] -dagster = "1.7.9" +dagster = "1.7.10" psycopg2-binary = "*" [[package]] name = "dagster-webserver" -version = "1.7.9" +version = "1.7.10" description = "Web UI for dagster." optional = false python-versions = "<3.13,>=3.8" files = [ - {file = "dagster-webserver-1.7.9.tar.gz", hash = "sha256:bc9a493bfc48b103bca32bb94258480de4e404ae36c93bb37d9901629ca24675"}, - {file = "dagster_webserver-1.7.9-py3-none-any.whl", hash = "sha256:fc11480ec039746c59210bc9f466e41fb3462e04baef75fe7c7df23fdbc0309e"}, + {file = "dagster-webserver-1.7.10.tar.gz", hash = "sha256:f2f147f1d7fc1ebc82e42677070b2a65f2cb095d649bc284eb07da66464b1171"}, + {file = "dagster_webserver-1.7.10-py3-none-any.whl", hash = "sha256:9380d1eaf8108be340c9d801eecd230a55d0ab0254435abe62afe902262b0fbd"}, ] [package.dependencies] click = ">=7.0,<9.0" -dagster = "1.7.9" -dagster-graphql = "1.7.9" +dagster = "1.7.10" +dagster-graphql = "1.7.10" starlette = "!=0.36.0" uvicorn = {version = "*", extras = ["standard"]} @@ -3111,13 +3134,13 @@ files = [ [[package]] name = "oss-directory" -version = "0.0.2" +version = "0.2.0" description = "Open source software directory" optional = false python-versions = "<3.13,>=3.11" files = [ - {file = "oss_directory-0.0.2-py3-none-any.whl", hash = "sha256:52c74e0a6654ab7f2376898ef65de9a9d9a36b4965b0438b34440cb4bb419e4a"}, - {file = "oss_directory-0.0.2.tar.gz", hash = "sha256:34c760faba708568797f4ee04983b63d4e58f135cd4399067993b0d6b4fb0c28"}, + {file = "oss_directory-0.2.0-py3-none-any.whl", hash = "sha256:6c1127c8e8e929a09306c8b6a87af2a52bf0a7c689ded5239e41f5aa79841e42"}, + {file = "oss_directory-0.2.0.tar.gz", hash = "sha256:b0b4f82b5bdccdd3c083c46f6530c082dd61fb8829231f6eeed74dcb55ca4481"}, ] [package.dependencies] @@ -5621,4 +5644,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = "^3.12,<3.13" -content-hash = "2c7aafddb4e82908988daf5065d1cd46061a98d66f2ea8758ce684e3a507b8e1" +content-hash = "c272d0b98f287a5f847057d20107a1c6ac727584ddcdedfe1e5dab5b3d772423" diff --git a/pyproject.toml b/pyproject.toml index b0b86b89..4fe76a53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,9 @@ requests = "^2.31.0" dagster-postgres = "^0.23.6" pytest = "^8.2.1" ruff = "^0.4.10" -oss-directory = "^0.0.2" +dagster-polars = "^0.23.10" +click = "^8.1.7" +oss-directory = "^0.2.0" [tool.poetry.scripts] diff --git a/warehouse/oso_dagster/assets/__init__.py b/warehouse/oso_dagster/assets/__init__.py index 1e506331..099062b3 100644 --- a/warehouse/oso_dagster/assets/__init__.py +++ b/warehouse/oso_dagster/assets/__init__.py @@ -10,3 +10,4 @@ from .optimism import * from .pgn import * from .zora import * +from .ossd import * diff --git a/warehouse/oso_dagster/assets/ossd.py b/warehouse/oso_dagster/assets/ossd.py index 8b137891..5b58db3a 100644 --- a/warehouse/oso_dagster/assets/ossd.py +++ b/warehouse/oso_dagster/assets/ossd.py @@ -1 +1,100 @@ +from typing import cast +from dagster import ( + multi_asset, + Output, + AssetOut, + AssetExecutionContext, + JsonMetadataValue, + Config, +) +from ossdirectory import fetch_data +import polars as pl +import arrow + + +class OSSDirectoryConfig(Config): + # By default, the oss-directory asset doesn't write anything if there aren't + # any changes from the previous materialization. This happens by comparing + # the sha of the current fetch_data and the sha stored in the metadata of + # the last materialization + force_write: bool = False + + +@multi_asset( + outs={ + "projects": AssetOut(is_required=False, key_prefix="ossd"), + "collections": AssetOut(is_required=False, key_prefix="ossd"), + }, + can_subset=True, +) +def ossdirectory_repo(context: AssetExecutionContext, config: OSSDirectoryConfig): + """Materializes both the projects/collections from the oss-directory repo + into separate dataframe assets. + """ + data = fetch_data() + + if not data.meta: + raise Exception("ossdirectory repository metadata is required") + + for output in context.op_execution_context.selected_output_names: + asset_key = context.asset_key_for_output(output) + latest_materialization = context.instance.get_latest_materialization_event( + asset_key=asset_key + ) + + # Check if there's a previous materialization. We can choose not to add + # any data to the database + if ( + latest_materialization + and latest_materialization.asset_materialization + and not config.force_write + ): + repo_meta = latest_materialization.asset_materialization.metadata.get( + "repo_meta", {} + ) + if repo_meta: + repo_meta = cast(JsonMetadataValue, repo_meta) + repo_meta_dict = cast(dict, repo_meta.data) + context.log.debug( + { + "message": "repo_meta", + "repo_meta": repo_meta_dict, + } + ) + # The previous sha for this asset and the current sha match. No + # need to update anything + if repo_meta_dict.get("sha", "") == data.meta.sha: + context.log.info(f"no changes for {output}") + continue + committed_dt = data.meta.committed_datetime + + df = pl.from_dicts(getattr(data, output)) + # Add sync time and commit sha to the dataframe + df = df.with_columns( + sha=pl.lit(bytes.fromhex(data.meta.sha)), + # We need to instantiate the datetime here using this pl.datetime + # constructor due to an issue with the datetime that's returned from + # ossdirectory that has a timezone type that seems to be + # incompatible with polars. + committed_time=pl.datetime( + committed_dt.year, + committed_dt.month, + committed_dt.day, + committed_dt.hour, + committed_dt.minute, + committed_dt.second, + ), + ) + + yield Output( + df, + output, + metadata={ + "repo_meta": { + "sha": data.meta.sha, + "committed": arrow.get(data.meta.committed_datetime).isoformat(), + "authored": arrow.get(data.meta.authored_datetime).isoformat(), + } + }, + ) diff --git a/warehouse/oso_dagster/compile.py b/warehouse/oso_dagster/compile.py index f88aa64f..9176ac7b 100644 --- a/warehouse/oso_dagster/compile.py +++ b/warehouse/oso_dagster/compile.py @@ -1,6 +1,7 @@ # ruff: noqa: E402 """ -Tool for handling dbt compilation in a docker container +Tool for handling dbt compilation in a docker container. This should not be used +locally. This is truly only for docker. """ import click diff --git a/warehouse/oso_dagster/definitions.py b/warehouse/oso_dagster/definitions.py index 4927df7c..75473d87 100644 --- a/warehouse/oso_dagster/definitions.py +++ b/warehouse/oso_dagster/definitions.py @@ -3,8 +3,9 @@ from dagster import Definitions, load_assets_from_modules from dagster_dbt import DbtCliResource from dagster_gcp import BigQueryResource, GCSResource +from dagster_polars import PolarsBigQueryIOManager -from .constants import main_dbt_project_dir, main_dbt_manifests +from . import constants from .schedules import schedules from .cbt import CBTResource from .factories import load_assets_factories_from_modules @@ -26,14 +27,19 @@ def load_definitions(): asset_factories = load_assets_factories_from_modules([assets]) asset_defs = load_assets_from_modules([assets]) + io_manager = PolarsBigQueryIOManager(project=constants.project_id) + + # Each of the dbt environments needs to be setup as a resource to be used in + # the dbt assets resources = { "gcs": gcs, "cbt": cbt, "bigquery": bigquery, + "io_manager": io_manager, } - for target in main_dbt_manifests: + for target in constants.main_dbt_manifests: resources[f"{target}_dbt"] = DbtCliResource( - project_dir=os.fspath(main_dbt_project_dir), target=target + project_dir=os.fspath(constants.main_dbt_project_dir), target=target ) return Definitions(