Skip to content

Commit

Permalink
Merge pull request #32 from leap-stc/remove_data_transforms
Browse files Browse the repository at this point in the history
Added import of Copy and InjectAttrs to data_management_transforms
  • Loading branch information
jbusecke authored Apr 17, 2024
2 parents 456cfaa + e18a2b0 commit 311be42
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 45 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Click on the button on the top left to use this repository as a template for you

>[!WARNING]
> - Make sure to create the repo under the `leap-stc` github organization, not your personal account! If you already did that, you can always transfer the ownership afterwards.
> - Name your feedstock according to your data `<your_data>_feedstock`.
> - Name your feedstock according to your data `<your_data>_feedstock`.
Now you can locally check out the repository.

Expand Down
45 changes: 1 addition & 44 deletions feedstock/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
A synthetic prototype recipe
"""

import zarr
import os
from dataclasses import dataclass
from typing import List, Dict
import apache_beam as beam
from datetime import datetime, timezone
from leap_data_management_utils.data_management_transforms import Copy, InjectAttrs
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
Expand All @@ -21,48 +20,6 @@
yaml = YAML(typ="safe")


# copied from cmip feedstock (TODO: move to central repo?)
@dataclass
class Copy(beam.PTransform):
target: str

def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
import os
import zarr
import gcsfs

# We do need the gs:// prefix?
# TODO: Determine this dynamically from zarr.storage.FSStore
source = f"gs://{os.path.normpath(store.path)}/" # FIXME more elegant. `.copytree` needs trailing slash
fs = gcsfs.GCSFileSystem() # FIXME: How can we generalize this?
fs.cp(source, self.target, recursive=True)
# return a new store with the new path that behaves exactly like the input
# to this stage (so we can slot this stage right before testing/logging stages)
return zarr.storage.FSStore(self.target)

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | "Copying Store" >> beam.Map(self._copy)


@dataclass
class InjectAttrs(beam.PTransform):
inject_attrs: dict

def _update_zarr_attrs(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
# TODO: Can we get a warning here if the store does not exist?
attrs = zarr.open(store, mode="a").attrs
attrs.update(self.inject_attrs)
# ? Should we consolidate here? We are explicitly doing that later...
return store

def expand(
self, pcoll: beam.PCollection[zarr.storage.FSStore]
) -> beam.PCollection[zarr.storage.FSStore]:
return pcoll | "Injecting Attributes" >> beam.Map(self._update_zarr_attrs)


# TODO: Both these stages are generally useful. They should at least be in the utils package, maybe in recipes?

# load the global config values (we will have to decide where these ultimately live)
catalog_meta = yaml.load(open("feedstock/catalog.yaml"))

Expand Down
1 change: 1 addition & 0 deletions feedstock/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pangeo-forge-recipes==0.10.7
apache-beam[gcp]
gcsfs
leap-data-management-utils==0.0.3

0 comments on commit 311be42

Please sign in to comment.