Skip to content

Commit

Permalink
Merge branch 'main' into geometry_bounds
Browse files Browse the repository at this point in the history
  • Loading branch information
dabhicusp committed Mar 28, 2024
2 parents b8960a4 + 5fe4395 commit c0d8aa2
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
id: pip-cache
run: |
python -m pip install --upgrade pip wheel
echo "::set-output name=dir::$(pip cache dir)"
echo "dir=$(pip cache dir)" >> "$GITHUB_OUTPUT"
- name: pip cache
uses: actions/cache@v2
with:
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ i = ee.ImageCollection(ee.Image("LANDSAT/LC08/C02/T1_TOA/LC08_044034_20140318"))
ds = xarray.open_dataset(i, engine='ee')
```

Open any Earth Engine ImageCollection to match an existing transform:

```python
raster = rioxarray.open_rasterio(...) # assume crs + transform is set
ds = xr.open_dataset(
'ee://ECMWF/ERA5_LAND/HOURLY',
engine='ee',
geometry=tuple(raster.rio.bounds()), # must be in EPSG:4326
projection=ee.Projection(
crs=str(raster.rio.crs), transform=raster.rio.transform()[:6]
),
)
```

See [examples](https://github.com/google/Xee/tree/main/examples) or [docs](https://github.com/google/Xee/tree/main/docs) for more uses and integrations.

## How to run integration tests
Expand Down
119 changes: 83 additions & 36 deletions xee/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import math
import os
import sys
from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union
from typing import Any, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, Union
from urllib import parse
import warnings

Expand Down Expand Up @@ -102,6 +102,11 @@ class EarthEngineStore(common.AbstractDataStore):
'height': 256,
}

GETITEM_KWARGS: Dict[str, int] = {
'max_retries': 6,
'initial_delay': 500,
}

SCALE_UNITS: Dict[str, int] = {
'degree': 1,
'metre': 10_000,
Expand Down Expand Up @@ -139,13 +144,15 @@ def open(
crs: Optional[str] = None,
scale: Optional[float] = None,
projection: Optional[ee.Projection] = None,
geometry: Optional[ee.Geometry] = None,
geometry: ee.Geometry | Tuple[float, float, float, float] | None = None,
primary_dim_name: Optional[str] = None,
primary_dim_property: Optional[str] = None,
mask_value: Optional[float] = None,
request_byte_limit: int = REQUEST_BYTE_LIMIT,
ee_init_kwargs: Optional[Dict[str, Any]] = None,
ee_init_if_necessary: bool = False,
executor_kwargs: Optional[Dict[str, Any]] = None,
getitem_kwargs: Optional[Dict[str, int]] = None,
) -> 'EarthEngineStore':
if mode != 'r':
raise ValueError(
Expand All @@ -166,6 +173,8 @@ def open(
request_byte_limit=request_byte_limit,
ee_init_kwargs=ee_init_kwargs,
ee_init_if_necessary=ee_init_if_necessary,
executor_kwargs=executor_kwargs,
getitem_kwargs=getitem_kwargs,
)

def __init__(
Expand All @@ -176,17 +185,26 @@ def __init__(
crs: Optional[str] = None,
scale: Union[float, int, None] = None,
projection: Optional[ee.Projection] = None,
geometry: Optional[ee.Geometry] = None,
geometry: ee.Geometry | Tuple[float, float, float, float] | None = None,
primary_dim_name: Optional[str] = None,
primary_dim_property: Optional[str] = None,
mask_value: Optional[float] = None,
request_byte_limit: int = REQUEST_BYTE_LIMIT,
ee_init_kwargs: Optional[Dict[str, Any]] = None,
ee_init_if_necessary: bool = False,
executor_kwargs: Optional[Dict[str, Any]] = None,
getitem_kwargs: Optional[Dict[str, int]] = None,
):
self.ee_init_kwargs = ee_init_kwargs
self.ee_init_if_necessary = ee_init_if_necessary

# Initialize executor_kwargs
if executor_kwargs is None:
executor_kwargs = {}
self.executor_kwargs = executor_kwargs

self.getitem_kwargs = {**self.GETITEM_KWARGS, **(getitem_kwargs or {})}

self.image_collection = image_collection
if n_images != -1:
self.image_collection = image_collection.limit(n_images)
Expand Down Expand Up @@ -228,26 +246,7 @@ def __init__(
self.scale_x, self.scale_y = transform.a, transform.e
self.scale = np.sqrt(np.abs(transform.determinant))

# Parse the dataset bounds from the native projection (either from the CRS
# or the image geometry) and translate it to the representation that will be
# used for all internal `computePixels()` calls.
try:
if isinstance(geometry, ee.Geometry):
x_min_0, y_min_0, x_max_0, y_max_0 = _ee_bounds_to_bounds(
self.get_info['bounds']
)
else:
x_min_0, y_min_0, x_max_0, y_max_0 = self.crs.area_of_use.bounds
except AttributeError:
# `area_of_use` is probable `None`. Parse the geometry from the first
# image instead (calculated in self.get_info())
x_min_0, y_min_0, x_max_0, y_max_0 = _ee_bounds_to_bounds(
self.get_info['bounds']
)

x_min, y_min = self.transform(x_min_0, y_min_0)
x_max, y_max = self.transform(x_max_0, y_max_0)
self.bounds = x_min, y_min, x_max, y_max
self.bounds = self._determine_bounds(geometry=geometry)

max_dtype = self._max_itemsize()

Expand Down Expand Up @@ -466,16 +465,19 @@ def image_to_array(
example, a `grid` dictionary.
Returns:
An numpy array containing the pixels computed based on the given image.
A numpy array containing the pixels computed based on the given image.
"""
image = image.unmask(self.mask_value)
image = image.unmask(self.mask_value, False)
params = {
'expression': image,
'fileFormat': 'NUMPY_NDARRAY',
**kwargs,
}
raw = common.robust_getitem(
pixels_getter, params, catch=ee.ee_exception.EEException
pixels_getter,
params,
catch=ee.ee_exception.EEException,
**self.getitem_kwargs,
)

# Extract out the shape information from EE response.
Expand Down Expand Up @@ -587,7 +589,7 @@ def _get_tile_from_ee(
)
target_image = ee.Image.pixelCoordinates(ee.Projection(self.crs_arg))
return tile_index, self.image_to_array(
target_image, grid=bbox, dtype=np.float32, bandIds=[band_id]
target_image, grid=bbox, dtype=np.float64, bandIds=[band_id]
)

def _process_coordinate_data(
Expand All @@ -603,14 +605,47 @@ def _process_coordinate_data(
for i in range(tile_count)
]
tiles = [None] * tile_count
with concurrent.futures.ThreadPoolExecutor() as pool:
with concurrent.futures.ThreadPoolExecutor(**self.executor_kwargs) as pool:
for i, arr in pool.map(
self._get_tile_from_ee,
list(zip(data, itertools.cycle([coordinate_type]))),
):
tiles[i] = arr.flatten()
return np.concatenate(tiles)

def _determine_bounds(
self,
geometry: ee.Geometry | Tuple[float, float, float, float] | None = None,
) -> Tuple[float, float, float, float]:
if geometry is None:
try:
x_min_0, y_min_0, x_max_0, y_max_0 = self.crs.area_of_use.bounds
except AttributeError:
# `area_of_use` is probably `None`. Parse the geometry from the first
# image instead (calculated in self.get_info())
x_min_0, y_min_0, x_max_0, y_max_0 = _ee_bounds_to_bounds(
self.get_info['bounds']
)
elif isinstance(geometry, ee.Geometry):
x_min_0, y_min_0, x_max_0, y_max_0 = _ee_bounds_to_bounds(
self.get_info['bounds']
)
elif isinstance(geometry, Sequence):
if len(geometry) != 4:
raise ValueError(
'geometry must be a tuple or list of length 4, or a ee.Geometry, '
f'but got {geometry!r}'
)
x_min_0, y_min_0, x_max_0, y_max_0 = geometry
else:
raise ValueError(
'geometry must be a tuple or list of length 4, a ee.Geometry, or'
f' None but got {type(geometry)}'
)
x_min, y_min = self.transform(x_min_0, y_min_0)
x_max, y_max = self.transform(x_max_0, y_max_0)
return x_min, y_min, x_max, y_max

def get_variables(self) -> utils.Frozen[str, xarray.Variable]:
vars_ = [(name, self.open_store_variable(name)) for name in self._bands()]

Expand Down Expand Up @@ -694,7 +729,7 @@ def _parse_dtype(data_type: types.DataType):


def _ee_bounds_to_bounds(bounds: ee.Bounds) -> types.Bounds:
coords = np.array(bounds['coordinates'], dtype=np.float32)[0]
coords = np.array(bounds['coordinates'], dtype=np.float64)[0]
x_min, y_min, x_max, y_max = (
min(coords[:, 0]),
min(coords[:, 1]),
Expand Down Expand Up @@ -723,8 +758,7 @@ def __init__(self, variable_name: str, ee_store: EarthEngineStore):
# It looks like different bands have different dimensions & transforms!
# Can we get this into consistent dimensions?
self._info = ee_store._band_attrs(variable_name)
self.dtype = _parse_dtype(self._info['data_type'])

self.dtype = np.dtype(np.float32)
x_min, y_min, x_max, y_max = self.bounds

# Make sure the size is at least 1x1.
Expand Down Expand Up @@ -863,8 +897,9 @@ def _raw_indexing_method(
for _ in range(shape[0])
]

# TODO(#11): Allow users to configure this via kwargs.
with concurrent.futures.ThreadPoolExecutor() as pool:
with concurrent.futures.ThreadPoolExecutor(
**self.store.executor_kwargs
) as pool:
for (i, j, k), arr in pool.map(
self._make_tile, self._tile_indexes(key[0], bbox)
):
Expand Down Expand Up @@ -949,13 +984,15 @@ def open_dataset(
crs: Optional[str] = None,
scale: Union[float, int, None] = None,
projection: Optional[ee.Projection] = None,
geometry: Optional[ee.Geometry] = None,
geometry: ee.Geometry | Tuple[float, float, float, float] | None = None,
primary_dim_name: Optional[str] = None,
primary_dim_property: Optional[str] = None,
ee_mask_value: Optional[float] = None,
request_byte_limit: int = REQUEST_BYTE_LIMIT,
ee_init_if_necessary: bool = False,
ee_init_kwargs: Optional[Dict[str, Any]] = None,
executor_kwargs: Optional[Dict[str, Any]] = None,
getitem_kwargs: Optional[Dict[str, int]] = None,
) -> xarray.Dataset: # type: ignore
"""Open an Earth Engine ImageCollection as an Xarray Dataset.
Expand Down Expand Up @@ -1008,7 +1045,8 @@ def open_dataset(
coalesce all variables upon opening. By default, the scale and reference
system is set by the the `crs` and `scale` arguments.
geometry (optional): Specify an `ee.Geometry` to define the regional
bounds when opening the data. When not set, the bounds are defined by
bounds when opening the data or a bbox specifying [x_min, y_min, x_max,
y_max] in EPSG:4326. When not set, the bounds are defined by
the CRS's 'area_of_use` boundaries. If those aren't present, the bounds
are derived from the geometry of the first image of the collection.
primary_dim_name (optional): Override the name of the primary dimension of
Expand All @@ -1025,7 +1063,14 @@ def open_dataset(
frameworks.
ee_init_kwargs: keywords to pass to Earth Engine Initialize when
attempting to auto init for remote workers.
executor_kwargs (optional): A dictionary of keyword arguments to pass to
the ThreadPoolExecutor that handles the parallel computation of pixels
i.e. {'max_workers': 2}.
getitem_kwargs (optional): Exponential backoff kwargs passed into the
xarray function to index the array (`robust_getitem`).
- 'max_retries', the maximum number of retry attempts. Defaults to 6.
- 'initial_delay', the initial delay in milliseconds before the first
retry. Defaults to 500.
Returns:
An xarray.Dataset that streams in remote data from Earth Engine.
"""
Expand Down Expand Up @@ -1054,6 +1099,8 @@ def open_dataset(
request_byte_limit=request_byte_limit,
ee_init_kwargs=ee_init_kwargs,
ee_init_if_necessary=ee_init_if_necessary,
executor_kwargs=executor_kwargs,
getitem_kwargs=getitem_kwargs,
)

store_entrypoint = backends_store.StoreBackendEntrypoint()
Expand Down
Loading

0 comments on commit c0d8aa2

Please sign in to comment.