Skip to content

Commit

Permalink
moves InjectAttrs & Copy into data_managment_transforms.py (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
norlandrhagen authored Apr 17, 2024
1 parent b586c70 commit 28df142
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions leap_data_management_utils/data_management_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,43 @@ def _register_dataset_to_catalog(self, store: zarr.storage.FSStore) -> zarr.stor

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | beam.Map(self._register_dataset_to_catalog)


@dataclass
class Copy(beam.PTransform):
target: str

def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
import os

import gcsfs
import zarr

# We do need the gs:// prefix?
# TODO: Determine this dynamically from zarr.storage.FSStore
source = f'gs://{os.path.normpath(store.path)}/' # FIXME more elegant. `.copytree` needs trailing slash
fs = gcsfs.GCSFileSystem() # FIXME: How can we generalize this?
fs.cp(source, self.target, recursive=True)
# return a new store with the new path that behaves exactly like the input
# to this stage (so we can slot this stage right before testing/logging stages)
return zarr.storage.FSStore(self.target)

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | 'Copying Store' >> beam.Map(self._copy)


@dataclass
class InjectAttrs(beam.PTransform):
inject_attrs: dict

def _update_zarr_attrs(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
# TODO: Can we get a warning here if the store does not exist?
attrs = zarr.open(store, mode='a').attrs
attrs.update(self.inject_attrs)
# ? Should we consolidate here? We are explicitly doing that later...
return store

def expand(
self, pcoll: beam.PCollection[zarr.storage.FSStore]
) -> beam.PCollection[zarr.storage.FSStore]:
return pcoll | 'Injecting Attributes' >> beam.Map(self._update_zarr_attrs)

0 comments on commit 28df142

Please sign in to comment.