Skip to content

Commit

Permalink
Move dynamic chunking function (#35)
Browse files Browse the repository at this point in the history
* Move dynamic chunking function

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update pyproject.toml

* Update test_cmip_transforms.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update pyproject.toml

* Update cmip_transforms.py

* Update pyproject.toml

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update cmip_transforms.py

* Update test_cmip_transforms.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update test_cmip_transforms.py

* Update test_cmip_transforms.py

* Update cmip_transforms.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update pyproject.toml

* Update cmip_transforms.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update test_cmip_transforms.py

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jbusecke and pre-commit-ci[bot] authored May 8, 2024
1 parent 38bafdf commit 0567f0e
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 1 deletion.
78 changes: 78 additions & 0 deletions leap_data_management_utils/cmip_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,95 @@
"""

import datetime
import logging
import warnings
from dataclasses import dataclass

import apache_beam as beam
import xarray as xr
import zarr
from dask.utils import parse_bytes
from dynamic_chunks.algorithms import (
NoMatchingChunks,
even_divisor_algo,
iterative_ratio_increase_algo,
)
from google.cloud import bigquery
from pangeo_forge_recipes.transforms import Indexed, T
from tqdm.auto import tqdm

from leap_data_management_utils.cmip_testing import test_all
from leap_data_management_utils.data_management_transforms import BQInterface

# TODO: I am not sure the chunking function belongs here, but it clutters the recipe and I did not want
# To open a whole file for this.
logger = logging.getLogger(__name__)


## Dynamic Chunking Wrapper
def dynamic_chunking_func(ds: xr.Dataset) -> dict[str, int]:
logger.info(f'Input Dataset for dynamic chunking {ds =}')

target_chunk_size = '150MB'
target_chunks_aspect_ratio = {
'time': 10,
'x': 1,
'i': 1,
'ni': 1,
'xh': 1,
'nlon': 1,
'lon': 1, # TODO: Maybe import all the known spatial dimensions from xmip?
'y': 1,
'j': 1,
'nj': 1,
'yh': 1,
'nlat': 1,
'lat': 1,
}
size_tolerance = 0.5

# Some datasets are smaller than the target chunk size and should not be chunked at all
if ds.nbytes < parse_bytes(target_chunk_size):
target_chunks = dict(ds.dims)

else:
try:
target_chunks = even_divisor_algo(
ds,
target_chunk_size,
target_chunks_aspect_ratio,
size_tolerance,
allow_extra_dims=True,
)

except NoMatchingChunks:
warnings.warn(
'Primary algorithm using even divisors along each dimension failed '
'with. Trying secondary algorithm.'
f'Input {ds=}'
)
try:
target_chunks = iterative_ratio_increase_algo(
ds,
target_chunk_size,
target_chunks_aspect_ratio,
size_tolerance,
allow_extra_dims=True,
)
except NoMatchingChunks:
raise ValueError(
'Could not find any chunk combinations satisfying '
'the size constraint with either algorithm.'
f'Input {ds=}'
)
# If something fails
except Exception as e:
raise e
except Exception as e:
raise e
logger.info(f'Dynamic Chunking determined {target_chunks =}')
return target_chunks


@dataclass
class IIDEntry:
Expand Down
11 changes: 10 additions & 1 deletion leap_data_management_utils/tests/test_cmip_transforms.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import numpy as np
import pytest
import xarray as xr

from leap_data_management_utils.cmip_transforms import IIDEntry
from leap_data_management_utils.cmip_transforms import IIDEntry, dynamic_chunking_func


class TestIIDEntry:
Expand All @@ -24,4 +26,11 @@ def test_too_long(self):
IIDEntry(iid, store, retracted, tests_passed)


class TestDynamicChunks:
def test_too_small(self):
ds = xr.DataArray(np.random.rand(4, 6)).to_dataset(name='data')
chunks = dynamic_chunking_func(ds)
assert chunks == {'dim_0': 4, 'dim_1': 6}


# TODO Its super hard to test anything involving big query, because AFAIK there is no way to mock it.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pangeo-forge=[
"pangeo-forge-esgf",
"pangeo-forge-recipes",
"apache-beam",
"dynamic-chunks",
"dask"
]

catalog = [
Expand Down

0 comments on commit 0567f0e

Please sign in to comment.