Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local Test Passed > Testing Dataflow #1

Merged
merged 4 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion configs/config_local_hub.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# This logic only works locally on the LEAP-Pangeo hub (or similar Jupyterhubs)
import os
import subprocess
user = os.environ['JUPYTERHUB_USER']

#TODO: factor this out into an importable function and import here and in config_local.py
Expand All @@ -8,11 +9,11 @@
repo_path = subprocess.check_output(['git', 'rev-parse', '--show-toplevel'], text=True).strip()
# Use os.path.basename to get the repository name from the path
repo_name = os.path.basename(repo_path)
return repo_name
except subprocess.CalledProcessError as e:
raise

BUCKET_PREFIX = f"gs://leap-scratch/{user}/{repo_name}"
print(f"{BUCKET_PREFIX=}")

c.Bake.prune = 1
c.Bake.bakery_class = "pangeo_forge_runner.bakery.local.LocalDirectBakery"
Expand Down
13 changes: 4 additions & 9 deletions feedstock/catalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@ tags:
- my-custom-tag
- zarr
stores:
- id: "small"
name: "The cool small Proto Dataset"
url: "gs://leap-scratch/data-library/feedstocks/latest/proto_feedstock/small.zarr"
"ncviewjs:rechunking":
- path: "gs://some-bucket/small.zarr"
use_case: "multiscales"
- id: "metaflux-daily"
url: "gs://leap-scratch/data-library/feedstocks/latest/metaflux_feedstock/metaflux_daily.zarr"

- id: "large"
name: "The even cooler large Proto Dataset" # no pyramids
url: "gs://leap-scratch/data-library/feedstocks/latest/proto_feedstock/large.zarr"
- id: "metaflux-monthly"
url: "gs://leap-scratch/data-library/feedstocks/latest/metaflux_feedstock/metaflux_monthly.zarr"
33 changes: 20 additions & 13 deletions feedstock/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
title: "LEAP Data Library Prototype"
title: "MetaFlux"
description: >
A prototype test for the LEAP Data Library refactor
MetaFlux: Meta-learning global carbon fluxes from sparse spatiotemporal observations
recipes:
- id: "small"
object: "recipe:small"
- id: "large"
object: "recipe:large"
- id: "metaflux-daily"
object: "recipe:METAFLUX_GPP_RECO_daily"
- id: "metaflux-monthly"
object: "recipe:METAFLUX_GPP_RECO_monthly"

maintainers:
- name: "Julius Busecke"
orcid: "0000-0001-8571-865X"
github: jbusecke

provenance:
providers:
- name: "Julius"
description: "Just a guy testing some recipes. Nothing to see here."
- name: "Zenodo"
description: "Zenodo"
roles:
- host
url: https://zenodo.org/record/7761881#.ZFv9OS-B30p
- name: "Nathaniel et al."
description: "Authors of MetaFlux: Meta-learning global carbon fluxes from sparse spatiotemporal observations"
roles:
- producer
- licensor
license: "Just a Test"
maintainers:
- name: "Julius Busecke"
orcid: "0000-0001-8571-865X"
github: jbusecke
license: "CC-BY-4.0"
140 changes: 80 additions & 60 deletions feedstock/recipe.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""
A synthetic prototype recipe
MetaFlux is a global, long-term carbon flux dataset of gross
primary production and ecosystem respiration that is generated
using meta-learning. This dataset will be added to the existing
rodeo forecast model in order to improve its performances."
"""

import zarr
import os
from dataclasses import dataclass
from typing import List, Dict
from typing import List, Dict, Any
import apache_beam as beam
from datetime import datetime, timezone
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
Expand Down Expand Up @@ -34,11 +37,15 @@ def _copy(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
# We do need the gs:// prefix?
# TODO: Determine this dynamically from zarr.storage.FSStore
source = f"gs://{os.path.normpath(store.path)}/" # FIXME more elegant. `.copytree` needs trailing slash
fs = gcsfs.GCSFileSystem() # FIXME: How can we generalize this?
fs.cp(source, self.target, recursive=True)
# return a new store with the new path that behaves exactly like the input
# to this stage (so we can slot this stage right before testing/logging stages)
return zarr.storage.FSStore(self.target)
if self.target is False:
# dont do anything
return store
else:
fs = gcsfs.GCSFileSystem() # FIXME: How can we generalize this?
fs.cp(source, self.target, recursive=True)
# return a new store with the new path that behaves exactly like the input
# to this stage (so we can slot this stage right before testing/logging stages)
return zarr.storage.FSStore(self.target)

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | "Copying Store" >> beam.Map(self._copy)
Expand All @@ -61,10 +68,26 @@ def expand(
return pcoll | "Injecting Attributes" >> beam.Map(self._update_zarr_attrs)


# TODO: Both these stages are generally useful. They should at least be in the utils package, maybe in recipes?
def get_pangeo_forge_build_attrs() -> dict[str, Any]:
    """Return provenance attributes (commit permalink and build timestamp) for the recipe output.

    Reads the GitHub Actions environment variables ``GITHUB_SERVER_URL``,
    ``GITHUB_REPOSITORY`` and ``GITHUB_SHA`` to build a permalink to the
    exact commit this build ran from.

    Returns:
        dict with two string entries:
        ``pangeo_forge_build_git_hash`` (commit URL) and
        ``pangeo_forge_build_timestamp`` (ISO-8601, UTC).

    Raises:
        KeyError: if any of the required GitHub Actions env vars is unset
            (callers are expected to invoke this only when
            ``GITHUB_ACTIONS == "true"``).
    """
    # NOTE(review): the previous version also did
    #   catalog_meta = yaml.load(open("feedstock/catalog.yaml"))
    # here, but the result was never used inside this function, the file
    # handle was never closed, and `yaml.load` without a Loader is
    # unsafe/deprecated — so the dead line is removed.

    # Permalink to the commit that produced this build.
    git_url_hash = (
        f"{os.environ['GITHUB_SERVER_URL']}/"
        f"{os.environ['GITHUB_REPOSITORY']}/"
        f"commit/{os.environ['GITHUB_SHA']}"
    )
    # Timezone-aware UTC timestamp so the attribute is unambiguous.
    timestamp = datetime.now(timezone.utc).isoformat()

    return {
        "pangeo_forge_build_git_hash": git_url_hash,
        "pangeo_forge_build_timestamp": timestamp,
    }


# TODO: Both these stages are generally useful. They should at least be in the utils package, maybe in recipes?


def find_recipe_meta(catalog_meta: List[Dict[str, str]], r_id: str) -> Dict[str, str]:
Expand All @@ -79,70 +102,67 @@ def find_recipe_meta(catalog_meta: List[Dict[str, str]], r_id: str) -> Dict[str,
return None # Return None if no matching dictionary is found


# Set up injection attributes
# This is for demonstration purposes only and should be discussed with the broader LEAP/PGF community
# - Bake in information from the top level of the meta.yaml
# - Add a timestamp
# - Add the git hash
# - Add link to the meta.yaml on main
# - Add the recipe id

git_url_hash = f"{os.environ['GITHUB_SERVER_URL']}/{os.environ['GITHUB_REPOSITORY']}/commit/{os.environ['GITHUB_SHA']}"
timestamp = datetime.now(timezone.utc).isoformat()
# load the global config values (we will have to decide where these ultimately live)
catalog_meta = yaml.load(open("feedstock/catalog.yaml"))

injection_attrs = {
"pangeo_forge_build_git_hash": git_url_hash,
"pangeo_forge_build_timestamp": timestamp,
}
print("DETECTING GITHUB ACTIONS RUN")
if os.getenv("GITHUB_ACTIONS") == "true":
print("Running inside GitHub Actions.")

# Get final store path from catalog.yaml input
target_daily = find_recipe_meta(catalog_meta["stores"], "metaflux-daily")["url"]
target_monthly = find_recipe_meta(catalog_meta["stores"], "metaflux-monthly")["url"]
pgf_build_attrs = get_pangeo_forge_build_attrs()
else:
print("Running locally. Deactivating final copy stage.")
# this deactivates the final copy stage for local testing execution
target_daily = False
target_monthly = False
pgf_build_attrs = {}

print("Final output locations")
print(f"{target_daily=}")
print(f"{target_monthly=}")
print(f"{pgf_build_attrs=}")

# Common Parameters
years = range(2001, 2022)
months = range(1, 13)
dataset_url = "https://zenodo.org/record/7761881/files"

## Monthly version
input_urls_a = [
"gs://cmip6/pgf-debugging/hanging_bug/file_a.nc",
"gs://cmip6/pgf-debugging/hanging_bug/file_b.nc",
]
input_urls_b = [
"gs://cmip6/pgf-debugging/hanging_bug/file_a_huge.nc",
"gs://cmip6/pgf-debugging/hanging_bug/file_b_huge.nc",
]
input_urls_monthly = [f"{dataset_url}/METAFLUX_GPP_RECO_monthly_{y}.nc" for y in years]

pattern_a = pattern_from_file_sequence(input_urls_a, concat_dim="time")
pattern_b = pattern_from_file_sequence(input_urls_b, concat_dim="time")

print(f"{catalog_meta=}")
target_small = find_recipe_meta(catalog_meta["stores"], "small")["url"]
target_large = find_recipe_meta(catalog_meta["stores"], "large")["url"]
print(f"{target_small=}")
print(f"{target_large=}")

# small recipe
small = (
beam.Create(pattern_a.items())
| OpenURLWithFSSpec()
pattern_monthly = pattern_from_file_sequence(input_urls_monthly, concat_dim="time")
METAFLUX_GPP_RECO_monthly = (
beam.Create(pattern_monthly.items())
| OpenURLWithFSSpec() # open_kwargs=open_kwargs
| OpenWithXarray()
| StoreToZarr(
store_name="small.zarr",
# FIXME: This is brittle. it needs to be named exactly like in meta.yaml...
# Can we inject this in the same way as the root?
# Maybe its better to find another way and avoid injections entirely...
combine_dims=pattern_a.combine_dim_keys,
store_name="METAFLUX_GPP_RECO_monthly.zarr",
combine_dims=pattern_monthly.combine_dim_keys,
)
| InjectAttrs(injection_attrs)
| InjectAttrs(pgf_build_attrs)
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
| Copy(target=target_small)
| Copy(target=target_monthly)
)

# larger recipe
large = (
beam.Create(pattern_b.items())
| OpenURLWithFSSpec()
## daily version
input_urls_daily = [
f"{dataset_url}/METAFLUX_GPP_RECO_daily_{y}{m:02}.nc" for y in years for m in months
]
pattern_daily = pattern_from_file_sequence(input_urls_daily, concat_dim="time")
METAFLUX_GPP_RECO_daily = (
beam.Create(pattern_daily.items())
| OpenURLWithFSSpec() # open_kwargs=open_kwargs
| OpenWithXarray()
| StoreToZarr(
store_name="large.zarr",
combine_dims=pattern_b.combine_dim_keys,
store_name="METAFLUX_GPP_RECO_daily.zarr",
combine_dims=pattern_daily.combine_dim_keys,
)
| InjectAttrs(injection_attrs)
| InjectAttrs(pgf_build_attrs)
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
| Copy(target=target_large)
| Copy(target=target_daily)
)