From 29e62f939b04289619a67476ea31b12a312001e2 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 14:18:17 +0100 Subject: [PATCH 01/19] Add indexpath parent directory creation --- cfgrib/messages.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cfgrib/messages.py b/cfgrib/messages.py index f7d725fb..c3e4ef90 100644 --- a/cfgrib/messages.py +++ b/cfgrib/messages.py @@ -23,6 +23,7 @@ import os import pickle import typing as T +from pathlib import Path import attr import eccodes # type: ignore @@ -530,6 +531,10 @@ def from_indexpath_or_filestream( hash = hashlib.md5(repr(index_keys).encode("utf-8")).hexdigest() indexpath = indexpath.format(path=filestream.path, hash=hash, short_hash=hash[:5]) + if not Path(indexpath).parent.exists(): + # When reading from read-only partition, we can define indexes in another + # directory, eg. indexpath='/tmp/indexes/{path}.idx' + Path(indexpath).parent.mkdir(parents=True, exist_ok=True) try: with compat_create_exclusive(indexpath) as new_index_file: self = cls.from_fieldset(filestream, index_keys, computed_keys) From 2b1306b3cd0b69292e1789066780e5040c429d67 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 14:30:34 +0100 Subject: [PATCH 02/19] Add filter_heterogeneous argument in build_dataset_components --- cfgrib/dataset.py | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/cfgrib/dataset.py b/cfgrib/dataset.py index bf5eea9a..f79aa457 100644 --- a/cfgrib/dataset.py +++ b/cfgrib/dataset.py @@ -661,13 +661,31 @@ def build_dataset_components( time_dims: T.Sequence[str] = ("time", "step"), extra_coords: T.Dict[str, str] = {}, cache_geo_coords: bool = True, + filter_heterogeneous: bool = False, ) -> T.Tuple[T.Dict[str, int], T.Dict[str, Variable], T.Dict[str, T.Any], T.Dict[str, T.Any]]: dimensions = {} # type: T.Dict[str, int] variables = {} # type: T.Dict[str, Variable] filter_by_keys = index.filter_by_keys - for param_id in index.get("paramId", 
[]): - var_index = index.subindex(paramId=param_id) + subindex_kwargs_list = [{"paramId": param_id} for param_id in index.get("paramId", [])] + if filter_heterogeneous: + subindex_kwargs_list = [] + for param_id in index.get('paramId', []): + for type_of_level in index.get('typeOfLevel', []): + for step_type in index.get('stepType', []): + subindex_kwargs_list.append({ + 'paramId': param_id, + 'typeOfLevel': type_of_level, + 'stepType': step_type + }) + + + for subindex_kwargs in subindex_kwargs_list: + var_index = index.subindex(**subindex_kwargs) + if not var_index.header_values: + log.debug(f"No match for {subindex_kwargs}") + continue + try: dims, data_var, coord_vars = build_variable_components( var_index, @@ -696,6 +714,15 @@ def build_dataset_components( var_name = data_var.attributes.get("GRIB_cfVarName", "unknown") if "parameter" in encode_cf and var_name not in ("undef", "unknown"): short_name = var_name + + if filter_heterogeneous: + # Unique variable name looking like "t__surface__instant" + short_name = "__".join([ + short_name, + data_var.attributes.get("GRIB_typeOfLevel", "unknown"), + data_var.attributes.get("GRIB_stepType", "unknown"), + ]) + + try: dict_merge(variables, coord_vars) dict_merge(variables, {short_name: data_var}) From 1becba490aa8cf6f9c7aa0d4efe3143988185fd7 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 14:31:06 +0100 Subject: [PATCH 03/19] Add filter_heterogeneous args in xarray backend --- cfgrib/xarray_plugin.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cfgrib/xarray_plugin.py b/cfgrib/xarray_plugin.py index a9268208..a80b8dcd 100644 --- a/cfgrib/xarray_plugin.py +++ b/cfgrib/xarray_plugin.py @@ -98,6 +98,7 @@ def open_dataset( lock: T.Union[T.ContextManager[T.Any], None] = None, indexpath: str = messages.DEFAULT_INDEXPATH, filter_by_keys: T.Dict[str, T.Any] = {}, + filter_heterogeneous: bool = False, read_keys: T.Iterable[str] = (), encode_cf: T.Sequence[str] = ("parameter", "time", "geography", 
"vertical"), squeeze: bool = True, @@ -110,6 +111,7 @@ def open_dataset( filename_or_obj, indexpath=indexpath, filter_by_keys=filter_by_keys, + filter_heterogeneous=filter_heterogeneous, read_keys=read_keys, encode_cf=encode_cf, squeeze=squeeze, From 10ebae3b1775078c9336c4a010339823a4a669ec Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 14:31:22 +0100 Subject: [PATCH 04/19] Add updated tests --- tests/test_40_xarray_store.py | 9 +++++++++ tests/test_50_sample_data.py | 13 +++++++++++++ 2 files changed, 22 insertions(+) diff --git a/tests/test_40_xarray_store.py b/tests/test_40_xarray_store.py index 0f51613b..4d943425 100644 --- a/tests/test_40_xarray_store.py +++ b/tests/test_40_xarray_store.py @@ -126,6 +126,15 @@ def test_open_datasets() -> None: assert res[0].attrs["GRIB_centre"] == "ecmf" +def test_open_filter_heteorogeneous() -> None: + res = xarray_store.open_dataset(TEST_DATASETS, backend_kwargs={ + 'filter_heteorogeneous': True + }) + + assert isinstance(res, xr.Dataset) + assert res.attrs["GRIB_centre"] == "ecmf" + + def test_cached_geo_coords() -> None: ds1 = xarray_store.open_dataset(TEST_DATA_MULTIPLE_FIELDS) ds2 = xarray_store.open_dataset( diff --git a/tests/test_50_sample_data.py b/tests/test_50_sample_data.py index c4384dd7..ebe6150b 100644 --- a/tests/test_50_sample_data.py +++ b/tests/test_50_sample_data.py @@ -64,6 +64,19 @@ def test_open_datasets(grib_name: str) -> None: assert len(res) > 1 +@pytest.mark.parametrize( + "grib_name", ["hpa_and_pa", "t_on_different_level_types", "tp_on_different_grid_resolutions"] +) +def test_open_datasets(grib_name: str) -> None: + grib_path = os.path.join(SAMPLE_DATA_FOLDER, grib_name + ".grib") + + res = xarray_store.open_dataset(grib_path, backend_kwargs={ + 'filter_heteorogeneous': True + }) + + assert isinstance(res, xr.Dataset) + + @pytest.mark.parametrize( "grib_name", [ From 8ee54ad126adc22b23d34e3b46775f22b4f244e2 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 15:47:19 
+0100 Subject: [PATCH 05/19] Add param_id var --- cfgrib/dataset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cfgrib/dataset.py b/cfgrib/dataset.py index f79aa457..209f64ea 100644 --- a/cfgrib/dataset.py +++ b/cfgrib/dataset.py @@ -710,6 +710,7 @@ def build_dataset_components( fbks.append(fbk) error_message += "\n filter_by_keys=%r" % fbk raise DatasetBuildError(error_message, key, fbks) + param_id = subindex_kwargs['paramId'] short_name = data_var.attributes.get("GRIB_shortName", "paramId_%d" % param_id) var_name = data_var.attributes.get("GRIB_cfVarName", "unknown") if "parameter" in encode_cf and var_name not in ("undef", "unknown"): From 8e8b6f19ffab5fffeab37955da6133b2cc85c1dc Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 15:47:37 +0100 Subject: [PATCH 06/19] Working on tests --- tests/test_40_xarray_store.py | 6 +++--- tests/test_50_sample_data.py | 24 +++++++++++++++++++++--- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/tests/test_40_xarray_store.py b/tests/test_40_xarray_store.py index 4d943425..e34c84c0 100644 --- a/tests/test_40_xarray_store.py +++ b/tests/test_40_xarray_store.py @@ -126,9 +126,9 @@ def test_open_datasets() -> None: assert res[0].attrs["GRIB_centre"] == "ecmf" -def test_open_filter_heteorogeneous() -> None: - res = xarray_store.open_dataset(TEST_DATASETS, backend_kwargs={ - 'filter_heteorogeneous': True +def test_open_filter_heterogeneous() -> None: + res = xarray_store.open_dataset(TEST_DATA, backend_kwargs={ + 'filter_heterogeneous': True }) assert isinstance(res, xr.Dataset) diff --git a/tests/test_50_sample_data.py b/tests/test_50_sample_data.py index ebe6150b..3a16ac77 100644 --- a/tests/test_50_sample_data.py +++ b/tests/test_50_sample_data.py @@ -65,13 +65,31 @@ def test_open_datasets(grib_name: str) -> None: @pytest.mark.parametrize( - "grib_name", ["hpa_and_pa", "t_on_different_level_types", "tp_on_different_grid_resolutions"] + "grib_name", + [ + "era5-levels-members", + 
"fields_with_missing_values", + "lambert_grid", + "reduced_gg", + "regular_gg_sfc", + "regular_gg_pl", + "regular_gg_ml", + "regular_gg_ml_g2", + "regular_ll_sfc", + "regular_ll_msl", + "scanning_mode_64", + "single_gridpoint", + "spherical_harmonics", + "t_analysis_and_fc_0", + "hpa_and_pa", + "uv_on_different_levels", + ] ) -def test_open_datasets(grib_name: str) -> None: +def test_open_dataset_heterogeneous(grib_name: str) -> None: grib_path = os.path.join(SAMPLE_DATA_FOLDER, grib_name + ".grib") res = xarray_store.open_dataset(grib_path, backend_kwargs={ - 'filter_heteorogeneous': True + 'filter_heterogeneous': True }) assert isinstance(res, xr.Dataset) From c45e42d912dbd62d0ad70b2ac8056422fe2662a4 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 16:32:09 +0100 Subject: [PATCH 07/19] Working on tests --- tests/test_40_xarray_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_40_xarray_store.py b/tests/test_40_xarray_store.py index e34c84c0..a4bc6544 100644 --- a/tests/test_40_xarray_store.py +++ b/tests/test_40_xarray_store.py @@ -132,6 +132,7 @@ def test_open_filter_heterogeneous() -> None: }) assert isinstance(res, xr.Dataset) + assert "t__isobaricInhPa__instant" in res assert res.attrs["GRIB_centre"] == "ecmf" From 9eac9bd3a0200fd65b064c537112428035f2be38 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 16:35:28 +0100 Subject: [PATCH 08/19] Add build index cli --- cfgrib/__main__.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/cfgrib/__main__.py b/cfgrib/__main__.py index 0404c4e6..84718543 100644 --- a/cfgrib/__main__.py +++ b/cfgrib/__main__.py @@ -20,6 +20,7 @@ import json import os.path import typing as T +from pathlib import Path import click @@ -176,5 +177,35 @@ def dump(inpaths, variable, cdm, engine): print(ds_or_da) +@cfgrib_cli.command("build_index") +@click.argument("inpaths", nargs=-1, required=True) +@click.option("--index-basedir", default=None) 
+@click.option("--force", default=None) +def build_index(inpaths, index_basedir, force): + # type: (T.List[str], str, bool) -> None + from .messages import FileStream, FileIndex + from .dataset import compute_index_keys + + index_keys = compute_index_keys(("time", "step", "shortName"), {}) + indexpath = "{path}.idx" + if index_basedir: + indexpath = os.path.join(index_basedir, '{path}.idx') + + for fp in inpaths: + fp_idx = Path(indexpath.format(path=fp)) + if force: + fp_idx.unlink(missing_ok=True) + + print(f"{fp}: Creating index to {fp_idx}") + stream = FileStream(str(fp)) + index = FileIndex.from_indexpath_or_filestream( + filestream=stream, + index_keys=index_keys, + indexpath=indexpath + ) + + + + if __name__ == "__main__": # pragma: no cover cfgrib_cli() From d549e4c06a3421c13dc6b85c48cc199cbe73fa30 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 22:58:45 +0100 Subject: [PATCH 09/19] Working on ... --- cfgrib/__main__.py | 28 +++++----------------------- cfgrib/dataset.py | 14 ++++++++++++++ cfgrib/messages.py | 9 +++++++-- tests/test_30_dataset.py | 8 ++++++++ 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/cfgrib/__main__.py b/cfgrib/__main__.py index 84718543..73b6c62f 100644 --- a/cfgrib/__main__.py +++ b/cfgrib/__main__.py @@ -20,7 +20,6 @@ import json import os.path import typing as T -from pathlib import Path import click @@ -180,31 +179,14 @@ def dump(inpaths, variable, cdm, engine): @cfgrib_cli.command("build_index") @click.argument("inpaths", nargs=-1, required=True) @click.option("--index-basedir", default=None) -@click.option("--force", default=None) -def build_index(inpaths, index_basedir, force): +@click.option("--force-index-creation", default=None) +def build_index(inpaths, index_basedir, force_index_creation): # type: (T.List[str], str, bool) -> None - from .messages import FileStream, FileIndex - from .dataset import compute_index_keys - - index_keys = compute_index_keys(("time", "step", "shortName"), {}) - 
indexpath = "{path}.idx" - if index_basedir: - indexpath = os.path.join(index_basedir, '{path}.idx') + from .dataset import get_or_create_index for fp in inpaths: - fp_idx = Path(indexpath.format(path=fp)) - if force: - fp_idx.unlink(missing_ok=True) - - print(f"{fp}: Creating index to {fp_idx}") - stream = FileStream(str(fp)) - index = FileIndex.from_indexpath_or_filestream( - filestream=stream, - index_keys=index_keys, - indexpath=indexpath - ) - - + print(f"{fp}: Creating index") + get_or_create_index(str(fp), index_basedir, force_index_creation) if __name__ == "__main__": # pragma: no cover diff --git a/cfgrib/dataset.py b/cfgrib/dataset.py index bf5eea9a..209dc459 100644 --- a/cfgrib/dataset.py +++ b/cfgrib/dataset.py @@ -23,6 +23,7 @@ import logging import os import typing as T +from pathlib import Path import attr import numpy as np @@ -797,3 +798,16 @@ def open_file( index = open_fileindex(stream, indexpath, index_keys, filter_by_keys=filter_by_keys) return open_from_index(index, read_keys, time_dims, extra_coords, errors=errors, **kwargs) + + +def get_or_create_index(fp: str | Path, index_basedir: str | Path, force_index_creation: bool=False) -> messages.FileIndex: + """ Create a pygrib index file """ + index_keys = compute_index_keys() + stream = messages.FileStream(str(fp)) + index = messages.FileIndex.from_indexpath_or_filestream( + filestream=stream, + index_keys=index_keys, + indexpath=str(os.path.join(index_basedir, '{path}.idx')), + force_index_creation=force_index_creation + ) + return index diff --git a/cfgrib/messages.py b/cfgrib/messages.py index f7d725fb..6aa365df 100644 --- a/cfgrib/messages.py +++ b/cfgrib/messages.py @@ -520,9 +520,10 @@ class FileIndex(FieldsetIndex): @classmethod def from_indexpath_or_filestream( - cls, filestream, index_keys, indexpath=DEFAULT_INDEXPATH, computed_keys={}, log=LOG + cls, filestream, index_keys, indexpath=DEFAULT_INDEXPATH, computed_keys={}, log=LOG, + force_index_creation=False ): - # type: (FileStream, 
T.Sequence[str], str, ComputedKeysType, logging.Logger) -> FileIndex + # type: (FileStream, T.Sequence[str], str, ComputedKeysType, logging.Logger, bool) -> FileIndex # Reading and writing the index can be explicitly suppressed by passing indexpath==''. if not indexpath: @@ -530,6 +531,10 @@ def from_indexpath_or_filestream( hash = hashlib.md5(repr(index_keys).encode("utf-8")).hexdigest() indexpath = indexpath.format(path=filestream.path, hash=hash, short_hash=hash[:5]) + + if force_index_creation and os.path.exists(indexpath): + os.unlink(indexpath) + try: with compat_create_exclusive(indexpath) as new_index_file: self = cls.from_fieldset(filestream, index_keys, computed_keys) diff --git a/tests/test_30_dataset.py b/tests/test_30_dataset.py index 5523d3ee..94acb316 100644 --- a/tests/test_30_dataset.py +++ b/tests/test_30_dataset.py @@ -324,3 +324,11 @@ def test_missing_field_values() -> None: t2 = res.variables["t2m"] assert np.isclose(np.nanmean(t2.data[0, :, :]), 268.375) assert np.isclose(np.nanmean(t2.data[1, :, :]), 270.716) + + +def test_get_or_create_index(tmpdir) -> None: + index = dataset.get_or_create_index(TEST_DATA, os.path.join(tmpdir, "indexes")) + assert isinstance(index, messages.FileIndex) + + index = dataset.get_or_create_index(TEST_DATA, os.path.join(tmpdir, "indexes"), force_index_creation=True) + assert isinstance(index, messages.FileIndex) From 0909daf6e8071e1cf296a65d285f8691898bc3ae Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 23:04:25 +0100 Subject: [PATCH 10/19] Add index parent dir creation if needed --- cfgrib/messages.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cfgrib/messages.py b/cfgrib/messages.py index f7d725fb..98e5f257 100644 --- a/cfgrib/messages.py +++ b/cfgrib/messages.py @@ -23,6 +23,7 @@ import os import pickle import typing as T +from pathlib import Path import attr import eccodes # type: ignore @@ -530,6 +531,12 @@ def from_indexpath_or_filestream( hash = 
hashlib.md5(repr(index_keys).encode("utf-8")).hexdigest() indexpath = indexpath.format(path=filestream.path, hash=hash, short_hash=hash[:5]) + + if not Path(indexpath).parent.exists(): + # When reading from read-only partition, we can define indexes in another + # directory, eg. indexpath='/tmp/indexes/{path}.idx' + Path(indexpath).parent.mkdir(parents=True, exist_ok=True) + try: with compat_create_exclusive(indexpath) as new_index_file: self = cls.from_fieldset(filestream, index_keys, computed_keys) From 6a44b6f5e0f5eba09177ebd45a2d0e94099d8305 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 23:10:59 +0100 Subject: [PATCH 11/19] Add test --- tests/test_20_messages.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_20_messages.py b/tests/test_20_messages.py index 490e3e18..25928aee 100644 --- a/tests/test_20_messages.py +++ b/tests/test_20_messages.py @@ -197,6 +197,15 @@ def test_FileIndex_from_indexpath_or_filestream(tmpdir: py.path.local) -> None: ) assert isinstance(res, messages.FileIndex) + # trigger index dir creation + res = messages.FileIndex.from_indexpath_or_filestream( + messages.FileStream(str(grib_file)), + ["paramId"], + indexpath=str(tmpdir / "non-existent-folder" / "{path}.idx"), + ) + assert isinstance(res, messages.FileIndex) + assert (tmpdir / "non-existent-folder").exists() + def test_FileIndex_errors() -> None: computed_keys = {"error_key": (lambda m: bool(1 / 0), lambda m, v: None)} # pragma: no branch From 7192662a2ccb3f3305d4df719dd5fc6f39a44d26 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 23:13:08 +0100 Subject: [PATCH 12/19] Revert indexpath root creation, moved in another MR --- cfgrib/messages.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cfgrib/messages.py b/cfgrib/messages.py index c3e4ef90..f7d725fb 100644 --- a/cfgrib/messages.py +++ b/cfgrib/messages.py @@ -23,7 +23,6 @@ import os import pickle import typing as T -from pathlib import Path import attr import 
eccodes # type: ignore @@ -531,10 +530,6 @@ def from_indexpath_or_filestream( hash = hashlib.md5(repr(index_keys).encode("utf-8")).hexdigest() indexpath = indexpath.format(path=filestream.path, hash=hash, short_hash=hash[:5]) - if not Path(indexpath).parent.exists(): - # When reading from read-only partition, we can define indexes in another - # directory, eg. indexpath='/tmp/indexes/{path}.idx' - Path(indexpath).parent.mkdir(parents=True, exist_ok=True) try: with compat_create_exclusive(indexpath) as new_index_file: self = cls.from_fieldset(filestream, index_keys, computed_keys) From c149cb0a3c2dd74ebc7a8a609ac3981935d035bd Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 23:25:17 +0100 Subject: [PATCH 13/19] Add documentation --- README.rst | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/README.rst b/README.rst index 0482d25b..bcbdaf0b 100644 --- a/README.rst +++ b/README.rst @@ -649,6 +649,46 @@ Attributes: institution: US National Weather Service - NCEP] +Alternatively, you can as well use the ``filter_heterogeneous`` args, *cfgrib* will try to load +all heterogeneous variable at once, and them merge them to a single ``xr.Dataset```. + +Please note that variable name will be concatenation of ``{shortName}__{typeOfLevel}__{stepType}``, +cf. example below. + +.. code-block: python + +>>> import xarray as xr +>>> xr.open_dataset('nam.t00z.awp21100.tm00.grib2', engine='cfgrib', + backend_kwargs={'filter_heterogeneous': True}) + +Dimensions: (y: 65, x: 93, isobaricInhPa: 19, + pressureFromGroundLayer: 5, + heightAboveGroundLayer: 2) +Coordinates: (12/17) + time datetime64[ns] ... + step timedelta64[ns] ... + meanSea float64 ... + gh__cloudBase__instant (y, x) float32 ... + gh__cloudTop__instant (y, x) float32 ... + gh__maxWind__instant (y, x) float32 ... + ... ... + pres__cloudBase__instant (y, x) float32 ... + pres__cloudTop__instant (y, x) float32 ... + pres__maxWind__instant (y, x) float32 ... 
+ hlcy__heightAboveGroundLayer__instant (heightAboveGroundLayer, y, x) float32 ... + trpp__tropopause__instant (y, x) float32 ... + unknown__surface__instant (y, x) float32 ... +Attributes: + GRIB_edition: 2 + GRIB_centre: kwbc + GRIB_centreDescription: US National Weather Service - NCEP + GRIB_subCentre: 0 + Conventions: CF-1.7 + institution: US National Weather Service - NCEP + history: 2023-12-01T23:23 GRIB to CDM+CF via cfgrib-0.9.1... + + + Advanced usage ============== From 548c643ef28a0e4d01d7e7aa8b45b0b4a5f0fdc5 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Fri, 1 Dec 2023 23:28:51 +0100 Subject: [PATCH 14/19] Add documentation --- README.rst | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index bcbdaf0b..8a6ce81c 100644 --- a/README.rst +++ b/README.rst @@ -650,7 +650,7 @@ Attributes: Alternatively, you can as well use the ``filter_heterogeneous`` args, *cfgrib* will try to load -all heterogeneous variable at once, and them merge them to a single ``xr.Dataset```. +all heterogeneous variables at once, and them merge them to a single ``xr.Dataset```. Please note that variable name will be concatenation of ``{shortName}__{typeOfLevel}__{stepType}``, cf. example below. @@ -668,10 +668,25 @@ Coordinates: (12/17) time datetime64[ns] ... step timedelta64[ns] ... meanSea float64 ... + prmsl__meanSea__instant (y, x) float32 ... + gust__surface__instant (y, x) float32 ... + gh__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... gh__cloudBase__instant (y, x) float32 ... gh__cloudTop__instant (y, x) float32 ... gh__maxWind__instant (y, x) float32 ... - ... ... + gh__isothermZero__instant (y, x) float32 ... + t__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + t__cloudTop__instant (y, x) float32 ... + t__tropopause__instant (y, x) float32 ... + t__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + acpcp__surface__accum (y, x) float32 ... 
+ csnow__surface__instant (y, x) float32 ... + cicep__surface__instant (y, x) float32 ... + cfrzr__surface__instant (y, x) float32 ... + crain__surface__instant (y, x) float32 ... + cape__surface__instant (y, x) float32 ... + cin__surface__instant (y, x) float32 ... + pwat__atmosphereSingleLayer__instant (y, x) float32 ... pres__cloudBase__instant (y, x) float32 ... pres__cloudTop__instant (y, x) float32 ... pres__maxWind__instant (y, x) float32 ... From a1a3b7251ba63637aba9113d2a397a100dcae516 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Mon, 4 Dec 2023 15:47:52 +0100 Subject: [PATCH 15/19] Working on draft ... --- cfgrib/dataset.py | 48 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/cfgrib/dataset.py b/cfgrib/dataset.py index 209f64ea..71c07ea4 100644 --- a/cfgrib/dataset.py +++ b/cfgrib/dataset.py @@ -22,6 +22,7 @@ import json import logging import os +import random import typing as T import attr @@ -299,6 +300,13 @@ def __eq__(self, other): equal = (self.dimensions, self.attributes) == (other.dimensions, other.attributes) return equal and np.array_equal(self.data, other.data) + def merge_data(self, other_data): + data = np.unique(np.append(self.data, other_data)) + sort_reverse = self.attributes.get("stored_direction") == "decreasing" + self.data = np.array(sorted(data, reverse=sort_reverse)) + + return self + def expand_item(item, shape): # type: (T.Tuple[T.Any, ...], T.Sequence[int]) -> T.Tuple[T.List[int], ...] 
@@ -723,6 +731,46 @@ def build_dataset_components( data_var.attributes.get("GRIB_typeOfLevel", "unknown"), data_var.attributes.get("GRIB_stepType", "unknown"), ]) + import string, random + for coord_name in list(coord_vars.keys()): + if coord_name in variables: + if coord_vars[coord_name] == variables[coord_name]: + continue + + # When coordinates mismatches, create unique name + coord_name_count = len([x for x in variables.keys() if x.__contains__(coord_name)]) + coord_name_unique = f"{coord_name}{coord_name_count}"# + "".join(random.choices(string.ascii_lowercase, k=4)) + + # Update coord_vars + coord_vars[coord_name].dimensions = tuple( + [x.replace(coord_name, coord_name_unique) for x in coord_vars[coord_name].dimensions] + ) + # Update coord_vars dict + coord_vars[coord_name_unique] = coord_vars.pop(coord_name) + # Update data_var attributes, eg. 'time step isobaricInhPa latitude longitude valid_time' + data_var.attributes['coordinates'] = data_var.attributes['coordinates'].replace(coord_name, coord_name_unique) + # Update data_var dimensions, eg. 
('isobaricInhPa', 'y', 'x') + data_var.dimensions = tuple( + [x.replace(coord_name, coord_name_unique) for x in data_var.dimensions] + ) + + if coord_name in dims: + dims[coord_name_unique] = dims.pop(coord_name) + + + # # Do our best to merge coordinates variables and prevent exception below + # # By merging only coordinate Variable, we expect missing data to get nan + # for coord_name in set(coord_vars.keys()).intersection(set(variables.keys())): + # coord_existing = variables[coord_name] + # coord_var = coord_vars[coord_name] + # if coord_existing == coord_var: + # continue + # # Update coords data + # coord_existing.merge_data(coord_var.data) + # coord_vars[coord_name] = coord_existing + # # Update coords shape + # dims[coord_name] = len(coord_existing.data) + # dimensions[coord_name] = len(coord_existing.data) try: dict_merge(variables, coord_vars) From 6c35c39aa15efded755a445f4aa24883d86abf34 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Mon, 4 Dec 2023 16:36:26 +0100 Subject: [PATCH 16/19] Working on draft ... 
--- cfgrib/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cfgrib/dataset.py b/cfgrib/dataset.py index 71c07ea4..d29fdd1e 100644 --- a/cfgrib/dataset.py +++ b/cfgrib/dataset.py @@ -738,7 +738,7 @@ def build_dataset_components( continue # When coordinates mismatches, create unique name - coord_name_count = len([x for x in variables.keys() if x.__contains__(coord_name)]) + coord_name_count = len([x for x in variables.keys() if x.startswith(coord_name)]) coord_name_unique = f"{coord_name}{coord_name_count}"# + "".join(random.choices(string.ascii_lowercase, k=4)) # Update coord_vars From b0dd82948fc36b9378052a744e7ce89f74035b72 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Mon, 4 Dec 2023 17:06:30 +0100 Subject: [PATCH 17/19] Update README.rst --- README.rst | 119 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 86 insertions(+), 33 deletions(-) diff --git a/README.rst b/README.rst index 8a6ce81c..7852c29e 100644 --- a/README.rst +++ b/README.rst @@ -652,6 +652,8 @@ Attributes: Alternatively, you can as well use the ``filter_heterogeneous`` args, *cfgrib* will try to load all heterogeneous variables at once, and them merge them to a single ``xr.Dataset```. +When coordinates values mismatches, name will be incremented, eg. ``pressureFromGroundLayer1``. + Please note that variable name will be concatenation of ``{shortName}__{typeOfLevel}__{stepType}``, cf. example below. @@ -661,38 +663,89 @@ cf. example below. >>> xr.open_dataset('nam.t00z.awp21100.tm00.grib2', engine='cfgrib', backend_kwargs={'filter_heterogeneous': True}) -Dimensions: (y: 65, x: 93, isobaricInhPa: 19, - pressureFromGroundLayer: 5, - heightAboveGroundLayer: 2) -Coordinates: (12/17) - time datetime64[ns] ... - step timedelta64[ns] ... - meanSea float64 ... - prmsl__meanSea__instant (y, x) float32 ... - gust__surface__instant (y, x) float32 ... - gh__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... - gh__cloudBase__instant (y, x) float32 ... 
- gh__cloudTop__instant (y, x) float32 ... - gh__maxWind__instant (y, x) float32 ... - gh__isothermZero__instant (y, x) float32 ... - t__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... - t__cloudTop__instant (y, x) float32 ... - t__tropopause__instant (y, x) float32 ... - t__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... - acpcp__surface__accum (y, x) float32 ... - csnow__surface__instant (y, x) float32 ... - cicep__surface__instant (y, x) float32 ... - cfrzr__surface__instant (y, x) float32 ... - crain__surface__instant (y, x) float32 ... - cape__surface__instant (y, x) float32 ... - cin__surface__instant (y, x) float32 ... - pwat__atmosphereSingleLayer__instant (y, x) float32 ... - pres__cloudBase__instant (y, x) float32 ... - pres__cloudTop__instant (y, x) float32 ... - pres__maxWind__instant (y, x) float32 ... - hlcy__heightAboveGroundLayer__instant (heightAboveGroundLayer, y, x) float32 ... - trpp__tropopause__instant (y, x) float32 ... - unknown__surface__instant (y, x) float32 ... +Dimensions: (y: 65, x: 93, isobaricInhPa: 19, + pressureFromGroundLayer: 5, + isobaricInhPa1: 5, + pressureFromGroundLayer1: 2, + pressureFromGroundLayer2: 2, + heightAboveGroundLayer: 2) +Coordinates: + time datetime64[ns] ... + step timedelta64[ns] ... + meanSea float64 ... + latitude (y, x) float64 ... + longitude (y, x) float64 ... + valid_time datetime64[ns] ... + surface float64 ... + * isobaricInhPa (isobaricInhPa) float64 1e+03 950.0 ... 100.0 + cloudBase float64 ... + cloudTop float64 ... + maxWind float64 ... + isothermZero float64 ... + tropopause float64 ... + * pressureFromGroundLayer (pressureFromGroundLayer) float64 3e+03 ... 1.5... + * isobaricInhPa1 (isobaricInhPa1) float64 1e+03 850.0 ... 250.0 + heightAboveGround float64 ... + heightAboveGround1 float64 ... + heightAboveGround2 float64 ... 
+ * pressureFromGroundLayer1 (pressureFromGroundLayer1) float64 9e+03 1.8e+04 + * pressureFromGroundLayer2 (pressureFromGroundLayer2) float64 9e+03 1.8e+04 + atmosphereSingleLayer float64 ... + * heightAboveGroundLayer (heightAboveGroundLayer) float64 1e+03 3e+03 + pressureFromGroundLayer3 float64 ... + pressureFromGroundLayer4 float64 ... +Dimensions without coordinates: y, x +Data variables: + prmsl__meanSea__instant (y, x) float32 ... + gust__surface__instant (y, x) float32 ... + gh__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + gh__cloudBase__instant (y, x) float32 ... + gh__cloudTop__instant (y, x) float32 ... + gh__maxWind__instant (y, x) float32 ... + gh__isothermZero__instant (y, x) float32 ... + t__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + t__cloudTop__instant (y, x) float32 ... + t__tropopause__instant (y, x) float32 ... + t__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + r__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + r__isothermZero__instant (y, x) float32 ... + r__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + w__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + u__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + u__tropopause__instant (y, x) float32 ... + u__maxWind__instant (y, x) float32 ... + u__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + v__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ... + v__tropopause__instant (y, x) float32 ... + v__maxWind__instant (y, x) float32 ... + v__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ... + absv__isobaricInhPa__instant (isobaricInhPa1, y, x) float32 ... + mslet__meanSea__instant (y, x) float32 ... + sp__surface__instant (y, x) float32 ... + orog__surface__instant (y, x) float32 ... + t2m__heightAboveGround__instant (y, x) float32 ... + r2__heightAboveGround__instant (y, x) float32 ... 
+ u10__heightAboveGround__instant (y, x) float32 ... + v10__heightAboveGround__instant (y, x) float32 ... + tp__surface__accum (y, x) float32 ... + acpcp__surface__accum (y, x) float32 ... + csnow__surface__instant (y, x) float32 ... + cicep__surface__instant (y, x) float32 ... + cfrzr__surface__instant (y, x) float32 ... + crain__surface__instant (y, x) float32 ... + cape__surface__instant (y, x) float32 ... + cape__pressureFromGroundLayer__instant (pressureFromGroundLayer1, y, x) float32 ... + cin__surface__instant (y, x) float32 ... + cin__pressureFromGroundLayer__instant (pressureFromGroundLayer2, y, x) float32 ... + pwat__atmosphereSingleLayer__instant (y, x) float32 ... + pres__cloudBase__instant (y, x) float32 ... + pres__cloudTop__instant (y, x) float32 ... + pres__maxWind__instant (y, x) float32 ... + hlcy__heightAboveGroundLayer__instant (heightAboveGroundLayer, y, x) float32 ... + trpp__tropopause__instant (y, x) float32 ... + pli__pressureFromGroundLayer__instant (y, x) float32 ... + 4lftx__pressureFromGroundLayer__instant (y, x) float32 ... + unknown__surface__instant (y, x) float32 ... Attributes: GRIB_edition: 2 GRIB_centre: kwbc @@ -700,7 +753,7 @@ Attributes: GRIB_subCentre: 0 Conventions: CF-1.7 institution: US National Weather Service - NCEP - history: 2023-12-01T23:23 GRIB to CDM+CF via cfgrib-0.9.1... + history: 2023-12-04T16:56 GRIB to CDM+CF via cfgrib-0.9.1... 
From 8c18731b6354f1e82d5707666cc766bb189e3a23 Mon Sep 17 00:00:00 2001 From: Steph Bench Date: Mon, 4 Dec 2023 17:30:02 +0100 Subject: [PATCH 18/19] Factorize subindex kwargs generation --- cfgrib/dataset.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/cfgrib/dataset.py b/cfgrib/dataset.py index d29fdd1e..12ee5c81 100644 --- a/cfgrib/dataset.py +++ b/cfgrib/dataset.py @@ -22,7 +22,7 @@ import json import logging import os -import random +import itertools import typing as T import attr @@ -300,13 +300,6 @@ def __eq__(self, other): equal = (self.dimensions, self.attributes) == (other.dimensions, other.attributes) return equal and np.array_equal(self.data, other.data) - def merge_data(self, other_data): - data = np.unique(np.append(self.data, other_data)) - sort_reverse = self.attributes.get("stored_direction") == "decreasing" - self.data = np.array(sorted(data, reverse=sort_reverse)) - - return self - def expand_item(item, shape): # type: (T.Tuple[T.Any, ...], T.Sequence[int]) -> T.Tuple[T.List[int], ...] 
@@ -677,20 +670,20 @@ def build_dataset_components(
 
     subindex_kwargs_list = [{"paramId": param_id} for param_id in index.get("paramId", [])]
     if filter_heterogeneous:
+        # Generate all possible combinations of paramId/typeOfLevel/stepType
         subindex_kwargs_list = []
-        for param_id in index.get('paramId', []):
-            for type_of_level in index.get('typeOfLevel', []):
-                for step_type in index.get('stepType', []):
-                    subindex_kwargs_list.append({
-                        'paramId': param_id,
-                        'typeOfLevel': type_of_level,
-                        'stepType': step_type
-                    })
-
+        combinations = index.get('paramId', []), index.get('typeOfLevel', []), index.get('stepType', [])
+        for param_id, type_of_level, step_type in itertools.product(combinations):
+            subindex_kwargs_list.append({
+                'paramId': param_id,
+                'typeOfLevel': type_of_level,
+                'stepType': step_type
+            })
 
     for subindex_kwargs in subindex_kwargs_list:
         var_index = index.subindex(**subindex_kwargs)
         if not var_index.header_values:
+            # For some combinations, no match available
            log.debug(f"No match for {subindex_kwargs}")
            continue
 

From f505940f7179bb2c5ecee931ba7c16dcc2b83088 Mon Sep 17 00:00:00 2001
From: Steph Bench
Date: Mon, 4 Dec 2023 17:49:59 +0100
Subject: [PATCH 19/19] Factorize variable renaming

---
 cfgrib/dataset.py | 57 ++++++++++++++++++++----------------------
 1 file changed, 24 insertions(+), 33 deletions(-)

diff --git a/cfgrib/dataset.py b/cfgrib/dataset.py
index 12ee5c81..6c5edd08 100644
--- a/cfgrib/dataset.py
+++ b/cfgrib/dataset.py
@@ -300,6 +300,17 @@ def __eq__(self, other):
         equal = (self.dimensions, self.attributes) == (other.dimensions, other.attributes)
         return equal and np.array_equal(self.data, other.data)
 
+    def rename_coordinate(self, existing_coord_name, new_coord_name):
+        # type: (str, str) -> None
+        # Update data_var dimensions, eg. ('isobaricInhPa', 'y', 'x')
+        self.dimensions = tuple(
+            [x.replace(existing_coord_name, new_coord_name) for x in self.dimensions]
+        )
+        if 'coordinates' in self.attributes:
+            # Update data_var attributes, eg. 
'time step isobaricInhPa latitude longitude valid_time' + new_coordinates = self.attributes['coordinates'].replace(existing_coord_name, new_coord_name) + self.attributes['coordinates'] = new_coordinates + def expand_item(item, shape): # type: (T.Tuple[T.Any, ...], T.Sequence[int]) -> T.Tuple[T.List[int], ...] @@ -673,7 +684,7 @@ def build_dataset_components( # Generate all possible combinations of paramId/typeOfLevel/stepType subindex_kwargs_list = [] combinations = index.get('paramId', []), index.get('typeOfLevel', []), index.get('stepType', []) - for param_id, type_of_level, step_type in itertools.product(combinations): + for param_id, type_of_level, step_type in itertools.product(*combinations): subindex_kwargs_list.append({ 'paramId': param_id, 'typeOfLevel': type_of_level, @@ -724,46 +735,26 @@ def build_dataset_components( data_var.attributes.get("GRIB_typeOfLevel", "unknown"), data_var.attributes.get("GRIB_stepType", "unknown"), ]) - import string, random + + # Handle coordinates name/values collision for coord_name in list(coord_vars.keys()): if coord_name in variables: - if coord_vars[coord_name] == variables[coord_name]: + current_coord_var = coord_vars[coord_name] + existing_coord_var = variables[coord_name] + if current_coord_var == existing_coord_var: continue - # When coordinates mismatches, create unique name + # Coordinates have same name, but values not equal -> we need to rename to avoid collision + # Renaming will follow something like isobaricInhPa, isobaricInhPa1, etc. coord_name_count = len([x for x in variables.keys() if x.startswith(coord_name)]) - coord_name_unique = f"{coord_name}{coord_name_count}"# + "".join(random.choices(string.ascii_lowercase, k=4)) - - # Update coord_vars - coord_vars[coord_name].dimensions = tuple( - [x.replace(coord_name, coord_name_unique) for x in coord_vars[coord_name].dimensions] - ) - # Update coord_vars dict - coord_vars[coord_name_unique] = coord_vars.pop(coord_name) - # Update data_var attributes, eg. 
'time step isobaricInhPa latitude longitude valid_time' - data_var.attributes['coordinates'] = data_var.attributes['coordinates'].replace(coord_name, coord_name_unique) - # Update data_var dimensions, eg. ('isobaricInhPa', 'y', 'x') - data_var.dimensions = tuple( - [x.replace(coord_name, coord_name_unique) for x in data_var.dimensions] - ) + coord_name_unique = f"{coord_name}{coord_name_count}" + # Update attributes if coord_name in dims: dims[coord_name_unique] = dims.pop(coord_name) - - - # # Do our best to merge coordinates variables and prevent exception below - # # By merging only coordinate Variable, we expect missing data to get nan - # for coord_name in set(coord_vars.keys()).intersection(set(variables.keys())): - # coord_existing = variables[coord_name] - # coord_var = coord_vars[coord_name] - # if coord_existing == coord_var: - # continue - # # Update coords data - # coord_existing.merge_data(coord_var.data) - # coord_vars[coord_name] = coord_existing - # # Update coords shape - # dims[coord_name] = len(coord_existing.data) - # dimensions[coord_name] = len(coord_existing.data) + data_var.rename_coordinate(coord_name, coord_name_unique) + current_coord_var.rename_coordinate(coord_name, coord_name_unique) + coord_vars[coord_name_unique] = coord_vars.pop(coord_name) try: dict_merge(variables, coord_vars)