Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/index create dir #360

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,114 @@ Attributes:
institution: US National Weather Service - NCEP]


Alternatively, you can use the ``filter_heterogeneous`` argument; *cfgrib* will then try to load
all heterogeneous variables at once and merge them into a single ``xr.Dataset``.

When coordinate values mismatch, the coordinate name is suffixed with an increasing number, e.g. ``pressureFromGroundLayer1``.

Please note that each variable name will be the concatenation ``{shortName}__{typeOfLevel}__{stepType}``,
cf. the example below.

.. code-block:: python

>>> import xarray as xr
>>> xr.open_dataset('nam.t00z.awp21100.tm00.grib2', engine='cfgrib',
backend_kwargs={'filter_heterogeneous': True})
<xarray.Dataset>
Dimensions: (y: 65, x: 93, isobaricInhPa: 19,
pressureFromGroundLayer: 5,
isobaricInhPa1: 5,
pressureFromGroundLayer1: 2,
pressureFromGroundLayer2: 2,
heightAboveGroundLayer: 2)
Coordinates:
time datetime64[ns] ...
step timedelta64[ns] ...
meanSea float64 ...
latitude (y, x) float64 ...
longitude (y, x) float64 ...
valid_time datetime64[ns] ...
surface float64 ...
* isobaricInhPa (isobaricInhPa) float64 1e+03 950.0 ... 100.0
cloudBase float64 ...
cloudTop float64 ...
maxWind float64 ...
isothermZero float64 ...
tropopause float64 ...
* pressureFromGroundLayer (pressureFromGroundLayer) float64 3e+03 ... 1.5...
* isobaricInhPa1 (isobaricInhPa1) float64 1e+03 850.0 ... 250.0
heightAboveGround float64 ...
heightAboveGround1 float64 ...
heightAboveGround2 float64 ...
* pressureFromGroundLayer1 (pressureFromGroundLayer1) float64 9e+03 1.8e+04
* pressureFromGroundLayer2 (pressureFromGroundLayer2) float64 9e+03 1.8e+04
atmosphereSingleLayer float64 ...
* heightAboveGroundLayer (heightAboveGroundLayer) float64 1e+03 3e+03
pressureFromGroundLayer3 float64 ...
pressureFromGroundLayer4 float64 ...
Dimensions without coordinates: y, x
Data variables:
prmsl__meanSea__instant (y, x) float32 ...
gust__surface__instant (y, x) float32 ...
gh__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
gh__cloudBase__instant (y, x) float32 ...
gh__cloudTop__instant (y, x) float32 ...
gh__maxWind__instant (y, x) float32 ...
gh__isothermZero__instant (y, x) float32 ...
t__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
t__cloudTop__instant (y, x) float32 ...
t__tropopause__instant (y, x) float32 ...
t__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ...
r__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
r__isothermZero__instant (y, x) float32 ...
r__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ...
w__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
u__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
u__tropopause__instant (y, x) float32 ...
u__maxWind__instant (y, x) float32 ...
u__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ...
v__isobaricInhPa__instant (isobaricInhPa, y, x) float32 ...
v__tropopause__instant (y, x) float32 ...
v__maxWind__instant (y, x) float32 ...
v__pressureFromGroundLayer__instant (pressureFromGroundLayer, y, x) float32 ...
absv__isobaricInhPa__instant (isobaricInhPa1, y, x) float32 ...
mslet__meanSea__instant (y, x) float32 ...
sp__surface__instant (y, x) float32 ...
orog__surface__instant (y, x) float32 ...
t2m__heightAboveGround__instant (y, x) float32 ...
r2__heightAboveGround__instant (y, x) float32 ...
u10__heightAboveGround__instant (y, x) float32 ...
v10__heightAboveGround__instant (y, x) float32 ...
tp__surface__accum (y, x) float32 ...
acpcp__surface__accum (y, x) float32 ...
csnow__surface__instant (y, x) float32 ...
cicep__surface__instant (y, x) float32 ...
cfrzr__surface__instant (y, x) float32 ...
crain__surface__instant (y, x) float32 ...
cape__surface__instant (y, x) float32 ...
cape__pressureFromGroundLayer__instant (pressureFromGroundLayer1, y, x) float32 ...
cin__surface__instant (y, x) float32 ...
cin__pressureFromGroundLayer__instant (pressureFromGroundLayer2, y, x) float32 ...
pwat__atmosphereSingleLayer__instant (y, x) float32 ...
pres__cloudBase__instant (y, x) float32 ...
pres__cloudTop__instant (y, x) float32 ...
pres__maxWind__instant (y, x) float32 ...
hlcy__heightAboveGroundLayer__instant (heightAboveGroundLayer, y, x) float32 ...
trpp__tropopause__instant (y, x) float32 ...
pli__pressureFromGroundLayer__instant (y, x) float32 ...
4lftx__pressureFromGroundLayer__instant (y, x) float32 ...
unknown__surface__instant (y, x) float32 ...
Attributes:
GRIB_edition: 2
GRIB_centre: kwbc
GRIB_centreDescription: US National Weather Service - NCEP
GRIB_subCentre: 0
Conventions: CF-1.7
institution: US National Weather Service - NCEP
history: 2023-12-04T16:56 GRIB to CDM+CF via cfgrib-0.9.1...



Advanced usage
==============

Expand Down
13 changes: 13 additions & 0 deletions cfgrib/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,18 @@ def dump(inpaths, variable, cdm, engine):
print(ds_or_da)


@cfgrib_cli.command("build_index")
@click.argument("inpaths", nargs=-1, required=True)
@click.option("--index-basedir", default=None)
@click.option("--force-index-creation", is_flag=True, default=False)
def build_index(inpaths, index_basedir, force_index_creation):
    # type: (T.List[str], str, bool) -> None
    """Create (or re-create) the on-disk index for each input GRIB file.

    ``--index-basedir`` selects an alternative directory for the index
    files (useful for read-only data partitions); ``--force-index-creation``
    deletes any pre-existing index before rebuilding it.
    """
    from .dataset import get_or_create_index

    for fp in inpaths:
        print(f"{fp}: Creating index")
        get_or_create_index(str(fp), index_basedir, force_index_creation)


if __name__ == "__main__": # pragma: no cover
cfgrib_cli()
78 changes: 76 additions & 2 deletions cfgrib/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import json
import logging
import os
import itertools
import typing as T
from pathlib import Path

import attr
import numpy as np
Expand Down Expand Up @@ -299,6 +301,17 @@ def __eq__(self, other):
equal = (self.dimensions, self.attributes) == (other.dimensions, other.attributes)
return equal and np.array_equal(self.data, other.data)

def rename_coordinate(self, existing_coord_name, new_coord_name):
    # type: (str, str) -> None
    """Rename one coordinate in this variable's dimensions and attributes.

    Only whole names are matched: renaming ``isobaricInhPa`` must not
    touch a sibling dimension such as ``isobaricInhPa1`` (a plain
    ``str.replace`` would corrupt it, since the de-duplicated names
    produced by ``build_dataset_components`` share a common prefix).
    """
    # Update data_var dimensions, eg. ('isobaricInhPa', 'y', 'x')
    self.dimensions = tuple(
        new_coord_name if dim == existing_coord_name else dim for dim in self.dimensions
    )
    if 'coordinates' in self.attributes:
        # Update data_var attributes, eg. 'time step isobaricInhPa latitude longitude valid_time'
        # The attribute is a space-separated list, so rename token-wise.
        tokens = self.attributes['coordinates'].split()
        tokens = [new_coord_name if tok == existing_coord_name else tok for tok in tokens]
        self.attributes['coordinates'] = ' '.join(tokens)


def expand_item(item, shape):
# type: (T.Tuple[T.Any, ...], T.Sequence[int]) -> T.Tuple[T.List[int], ...]
Expand Down Expand Up @@ -661,13 +674,31 @@ def build_dataset_components(
time_dims: T.Sequence[str] = ("time", "step"),
extra_coords: T.Dict[str, str] = {},
cache_geo_coords: bool = True,
filter_heterogeneous: bool = False,
) -> T.Tuple[T.Dict[str, int], T.Dict[str, Variable], T.Dict[str, T.Any], T.Dict[str, T.Any]]:
dimensions = {} # type: T.Dict[str, int]
variables = {} # type: T.Dict[str, Variable]
filter_by_keys = index.filter_by_keys

for param_id in index.get("paramId", []):
var_index = index.subindex(paramId=param_id)
subindex_kwargs_list = [{"paramId": param_id} for param_id in index.get("paramId", [])]
if filter_heterogeneous:
# Generate all possible combinations of paramId/typeOfLevel/stepType
subindex_kwargs_list = []
combinations = index.get('paramId', []), index.get('typeOfLevel', []), index.get('stepType', [])
for param_id, type_of_level, step_type in itertools.product(*combinations):
subindex_kwargs_list.append({
'paramId': param_id,
'typeOfLevel': type_of_level,
'stepType': step_type
})

for subindex_kwargs in subindex_kwargs_list:
var_index = index.subindex(**subindex_kwargs)
if not var_index.header_values:
# For some combinations, no match availables
log.debug(f"No match for {subindex_kwargs}")
continue

try:
dims, data_var, coord_vars = build_variable_components(
var_index,
Expand All @@ -692,10 +723,40 @@ def build_dataset_components(
fbks.append(fbk)
error_message += "\n filter_by_keys=%r" % fbk
raise DatasetBuildError(error_message, key, fbks)
param_id = subindex_kwargs['paramId']
short_name = data_var.attributes.get("GRIB_shortName", "paramId_%d" % param_id)
var_name = data_var.attributes.get("GRIB_cfVarName", "unknown")
if "parameter" in encode_cf and var_name not in ("undef", "unknown"):
short_name = var_name

if filter_heterogeneous:
# Unique variable name looking like "t__surface__instant"
short_name = "__".join([
short_name,
data_var.attributes.get("GRIB_typeOfLevel", "unknown"),
data_var.attributes.get("GRIB_stepType", "unknown"),
])

# Handle coordinates name/values collision
for coord_name in list(coord_vars.keys()):
if coord_name in variables:
current_coord_var = coord_vars[coord_name]
existing_coord_var = variables[coord_name]
if current_coord_var == existing_coord_var:
continue

# Coordinates have same name, but values not equal -> we need to rename to avoid collision
# Renaming will follow something like isobaricInhPa, isobaricInhPa1, etc.
coord_name_count = len([x for x in variables.keys() if x.startswith(coord_name)])
coord_name_unique = f"{coord_name}{coord_name_count}"

# Update attributes
if coord_name in dims:
dims[coord_name_unique] = dims.pop(coord_name)
data_var.rename_coordinate(coord_name, coord_name_unique)
current_coord_var.rename_coordinate(coord_name, coord_name_unique)
coord_vars[coord_name_unique] = coord_vars.pop(coord_name)

try:
dict_merge(variables, coord_vars)
dict_merge(variables, {short_name: data_var})
Expand Down Expand Up @@ -797,3 +858,16 @@ def open_file(
index = open_fileindex(stream, indexpath, index_keys, filter_by_keys=filter_by_keys)

return open_from_index(index, read_keys, time_dims, extra_coords, errors=errors, **kwargs)


def get_or_create_index(fp, index_basedir=None, force_index_creation=False):
    # type: (T.Union[str, Path], T.Optional[T.Union[str, Path]], bool) -> messages.FileIndex
    """Return the :class:`messages.FileIndex` for *fp*, creating the index file if needed.

    :param fp: path to the GRIB file to index
    :param index_basedir: directory where the ``.idx`` file is stored; when
        ``None`` (e.g. the CLI default) the index is placed next to the GRIB
        file using the default index path template
    :param force_index_creation: delete any pre-existing index first
    """
    # NOTE: ``str | Path`` annotations would raise at import time on
    # Python < 3.10, so type comments are used like in the rest of the file.
    index_keys = compute_index_keys()
    stream = messages.FileStream(str(fp))
    if index_basedir is None:
        # No base directory given: fall back to the default template,
        # which stores the index alongside the GRIB file.
        indexpath = messages.DEFAULT_INDEXPATH
    else:
        indexpath = os.path.join(str(index_basedir), '{path}.idx')
    index = messages.FileIndex.from_indexpath_or_filestream(
        filestream=stream,
        index_keys=index_keys,
        indexpath=indexpath,
        force_index_creation=force_index_creation,
    )
    return index
15 changes: 13 additions & 2 deletions cfgrib/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import pickle
import typing as T
from pathlib import Path

import attr
import eccodes # type: ignore
Expand Down Expand Up @@ -520,16 +521,26 @@ class FileIndex(FieldsetIndex):

@classmethod
def from_indexpath_or_filestream(
cls, filestream, index_keys, indexpath=DEFAULT_INDEXPATH, computed_keys={}, log=LOG
cls, filestream, index_keys, indexpath=DEFAULT_INDEXPATH, computed_keys={}, log=LOG,
force_index_creation=False
):
# type: (FileStream, T.Sequence[str], str, ComputedKeysType, logging.Logger) -> FileIndex
# type: (FileStream, T.Sequence[str], str, ComputedKeysType, logging.Logger, bool) -> FileIndex

# Reading and writing the index can be explicitly suppressed by passing indexpath==''.
if not indexpath:
return cls.from_fieldset(filestream, index_keys, computed_keys)

hash = hashlib.md5(repr(index_keys).encode("utf-8")).hexdigest()
indexpath = indexpath.format(path=filestream.path, hash=hash, short_hash=hash[:5])

if not Path(indexpath).parent.exists():
# When reading from read-only partition, we can define indexes in another
# directory, eg. indexpath='/tmp/indexes/{path}.idx'
Path(indexpath).parent.mkdir(parents=True, exist_ok=True)

if force_index_creation and os.path.exists(indexpath):
os.unlink(indexpath)

try:
with compat_create_exclusive(indexpath) as new_index_file:
self = cls.from_fieldset(filestream, index_keys, computed_keys)
Expand Down
2 changes: 2 additions & 0 deletions cfgrib/xarray_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def open_dataset(
lock: T.Union[T.ContextManager[T.Any], None] = None,
indexpath: str = messages.DEFAULT_INDEXPATH,
filter_by_keys: T.Dict[str, T.Any] = {},
filter_heterogeneous: bool = False,
read_keys: T.Iterable[str] = (),
encode_cf: T.Sequence[str] = ("parameter", "time", "geography", "vertical"),
squeeze: bool = True,
Expand All @@ -110,6 +111,7 @@ def open_dataset(
filename_or_obj,
indexpath=indexpath,
filter_by_keys=filter_by_keys,
filter_heterogeneous=filter_heterogeneous,
read_keys=read_keys,
encode_cf=encode_cf,
squeeze=squeeze,
Expand Down
9 changes: 9 additions & 0 deletions tests/test_20_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,15 @@ def test_FileIndex_from_indexpath_or_filestream(tmpdir: py.path.local) -> None:
)
assert isinstance(res, messages.FileIndex)

# trigger index dir creation
res = messages.FileIndex.from_indexpath_or_filestream(
messages.FileStream(str(grib_file)),
["paramId"],
indexpath=str(tmpdir / "non-existent-folder" / "{path}.idx"),
)
assert isinstance(res, messages.FileIndex)
assert (tmpdir / "non-existent-folder").exists()


def test_FileIndex_errors() -> None:
computed_keys = {"error_key": (lambda m: bool(1 / 0), lambda m, v: None)} # pragma: no branch
Expand Down
8 changes: 8 additions & 0 deletions tests/test_30_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,11 @@ def test_missing_field_values() -> None:
t2 = res.variables["t2m"]
assert np.isclose(np.nanmean(t2.data[0, :, :]), 268.375)
assert np.isclose(np.nanmean(t2.data[1, :, :]), 270.716)


def test_get_or_create_index(tmpdir) -> None:
    """Build an index from scratch, then again with forced re-creation."""
    basedir = os.path.join(tmpdir, "indexes")
    for force in (False, True):
        index = dataset.get_or_create_index(TEST_DATA, basedir, force_index_creation=force)
        assert isinstance(index, messages.FileIndex)
10 changes: 10 additions & 0 deletions tests/test_40_xarray_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ def test_open_datasets() -> None:
assert res[0].attrs["GRIB_centre"] == "ecmf"


def test_open_filter_heterogeneous() -> None:
    """Opening with filter_heterogeneous merges all variables into one Dataset."""
    kwargs = {"filter_heterogeneous": True}
    res = xarray_store.open_dataset(TEST_DATA, backend_kwargs=kwargs)

    assert isinstance(res, xr.Dataset)
    assert res.attrs["GRIB_centre"] == "ecmf"
    # Variable names follow the {shortName}__{typeOfLevel}__{stepType} scheme.
    assert "t__isobaricInhPa__instant" in res


def test_cached_geo_coords() -> None:
ds1 = xarray_store.open_dataset(TEST_DATA_MULTIPLE_FIELDS)
ds2 = xarray_store.open_dataset(
Expand Down
31 changes: 31 additions & 0 deletions tests/test_50_sample_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,37 @@ def test_open_datasets(grib_name: str) -> None:
assert len(res) > 1


@pytest.mark.parametrize(
    "grib_name",
    [
        "era5-levels-members",
        "fields_with_missing_values",
        "lambert_grid",
        "reduced_gg",
        "regular_gg_sfc",
        "regular_gg_pl",
        "regular_gg_ml",
        "regular_gg_ml_g2",
        "regular_ll_sfc",
        "regular_ll_msl",
        "scanning_mode_64",
        "single_gridpoint",
        "spherical_harmonics",
        "t_analysis_and_fc_0",
        "hpa_and_pa",
        "uv_on_different_levels",
    ],
)
def test_open_dataset_heterogeneous(grib_name: str) -> None:
    """Each sample GRIB opens as a single merged Dataset in heterogeneous mode."""
    grib_path = os.path.join(SAMPLE_DATA_FOLDER, f"{grib_name}.grib")
    backend_kwargs = {"filter_heterogeneous": True}

    res = xarray_store.open_dataset(grib_path, backend_kwargs=backend_kwargs)

    assert isinstance(res, xr.Dataset)

@pytest.mark.parametrize(
"grib_name",
[
Expand Down