Skip to content

Commit

Permalink
Factor out CMIP recipe stages (#27)
Browse files Browse the repository at this point in the history
* Factor out CMIP recipe stages

Move more of the CMIP6 feedstock-specific transforms into this repo.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update cmip_transforms.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add CI and add dependencies

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jbusecke and pre-commit-ci[bot] authored Apr 30, 2024
1 parent 8c07e4b commit f599f6f
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 1 deletion.
47 changes: 47 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: CI

# Run on pushes to main, on every pull request, and weekly (Mondays 13:00 UTC)
# so that breakage from upstream dependency releases is noticed.
on:
  push:
    branches:
      - "main"
  pull_request:
    branches:
      # "**" also matches branch names containing "/" (e.g. "feature/x");
      # a single "*" does not.
      - "**"
  schedule:
    - cron: "0 13 * * 1"

jobs:
  build:
    defaults:
      run:
        # Login shell for every `run` step; no need to repeat it per step.
        shell: bash -l {0}
    strategy:
      fail-fast: false
      matrix:
        os: ["ubuntu-latest"]
        python-version: ["3.9", "3.10", "3.11"]
    runs-on: ${{ matrix.os }}
    steps:
      - name: 🫙 Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0 # checkout tags (which is not done by default)
      - name: 🔁 Setup Python
        id: setup-python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: pip
          cache-dependency-path: pyproject.toml
      - name: 🎯 Check cache hit
        run: echo '${{ steps.setup-python.outputs.cache-hit }}'
      - name: 🌈 Install leap-data-management-utils package
        run: |
          python -m pip install -e ".[test]"
      - name: 🔎 Check current version
        run: python -c "import leap_data_management_utils; print(leap_data_management_utils.__version__)"
      - name: 🏄‍♂️ Run Tests
        # `python -m pytest` guarantees the interpreter set up above is used;
        # `py.test` is only kept by pytest as a legacy alias.
        run: |
          python -m pytest leap_data_management_utils/tests -v
62 changes: 62 additions & 0 deletions leap_data_management_utils/cmip_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import apache_beam as beam
import zarr
from google.cloud import bigquery
from pangeo_forge_recipes.transforms import Indexed, T

from leap_data_management_utils.cmip_testing import test_all
from leap_data_management_utils.data_management_transforms import BQInterface


Expand Down Expand Up @@ -192,3 +194,63 @@ def _log_to_bigquery(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
    """Apply ``self._log_to_bigquery`` to every element of ``pcoll``."""
    log_each_store = beam.Map(self._log_to_bigquery)
    return pcoll | log_each_store


@dataclass
class Preprocessor(beam.PTransform):
    """
    Preprocessing stage for indexed xarray datasets.

    Applies two element-wise steps, in this order:

    1. Every data variable other than the one named by the dataset's
       ``variable_id`` attribute is turned into a coordinate.
    2. Characters that cannot be encoded as UTF-8 are stripped from all
       string-valued dataset attributes.
    """

    @staticmethod
    def _keep_only_variable_id(item: Indexed[T]) -> Indexed[T]:
        """
        Demote all data variables except ``ds.attrs['variable_id']`` to coords.

        Many netcdfs contain variables other than the one specified in the
        `variable_id` facet; those are set as coordinates here.

        Raises:
            KeyError: if the dataset has data variables but no
                ``variable_id`` attribute.
        """
        index, ds = item
        print(f'Preprocessing before {ds =}')
        new_coords_vars = [var for var in ds.data_vars if var != ds.attrs['variable_id']]
        # `set_coords` returns a new object; the incoming dataset is untouched.
        ds = ds.set_coords(new_coords_vars)
        print(f'Preprocessing after {ds =}')
        return index, ds

    @staticmethod
    def _sanitize_attrs(item: Indexed[T]) -> Indexed[T]:
        """
        Drop characters that cannot be encoded as UTF-8 from string attributes.

        Note that this does *not* remove non-ASCII characters in general:
        ordinary Unicode text round-trips through UTF-8 unchanged. Only
        unencodable code points (e.g. lone surrogates) are discarded.
        See https://github.com/pangeo-forge/pangeo-forge-recipes/issues/586

        Non-string attributes are left as they are.
        """
        index, ds = item
        sanitized = {}
        for att, att_value in ds.attrs.items():
            if not isinstance(att_value, str):
                continue
            new_value = att_value.encode('utf-8', 'ignore').decode()
            if new_value != att_value:
                # `!r` escapes the offending characters. Printing the raw value
                # could itself raise UnicodeEncodeError on a strict UTF-8
                # stdout, i.e. for exactly the strings handled here.
                print(
                    f'Sanitized datasets attributes field {att}: \n {att_value!r} \n ----> \n {new_value}'
                )
                sanitized[att] = new_value
        if sanitized:
            # Build a new dataset instead of writing into `ds.attrs` in place:
            # this avoids mutating the mapping while iterating it and avoids
            # modifying the input element handed to us by Beam.
            ds = ds.assign_attrs(sanitized)
        return index, ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        """Chain the coordinate fix and the attribute sanitization over ``pcoll``."""
        return (
            pcoll
            | 'Fix coordinates' >> beam.Map(self._keep_only_variable_id)
            | 'Sanitize Attrs' >> beam.Map(self._sanitize_attrs)
        )


@dataclass
class TestDataset(beam.PTransform):
    """
    Test stage for data written to zarr store.

    Each store in the input collection is passed, together with ``iid``,
    to ``leap_data_management_utils.cmip_testing.test_all`` and is then
    emitted unchanged so that further stages can be chained.
    """

    # The class name starts with "Test", so pytest would try to collect it
    # whenever it is imported into a test module. Opt out explicitly.
    # Deliberately un-annotated so that it does not become a dataclass field.
    __test__ = False

    # Identifier forwarded as the second argument to `test_all`.
    iid: str

    def _test(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
        """Run ``test_all`` on ``store`` and return the store unchanged."""
        # NOTE(review): presumably `test_all` raises when a check fails - confirm
        # against cmip_testing; its return value is ignored here.
        test_all(store, self.iid)
        return store

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        """Apply ``_test`` to every element of ``pcoll``."""
        return pcoll | 'Testing - Running all tests' >> beam.Map(self._test)
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"google-cloud-bigquery",
"pandas",
"pangeo-forge-esgf",
"pangeo-forge-recipes",
"pydantic-core",
"pydantic>=2",
"pyyaml",
Expand All @@ -40,7 +41,12 @@ dependencies = [

[project.optional-dependencies]
test = [
"pre-commit",
"pytest"
]

# Development extra: everything from the `test` extra plus pre-commit.
# The self-reference must use the distribution name of this package.
dev = [
    "leap-data-management-utils[test]",
    "pre-commit"
]


Expand Down

0 comments on commit f599f6f

Please sign in to comment.