Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Update: Retry logic for the ee.data.computePixels update. #151

Merged
merged 12 commits into from
Mar 15, 2024
34 changes: 32 additions & 2 deletions xee/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class EarthEngineStore(common.AbstractDataStore):
'height': 256,
}

TILE_FETCH_KWARGS: Dict[str, int] = {
'max_retries': 6,
'initial_delay': 500,
}

SCALE_UNITS: Dict[str, int] = {
'degree': 1,
'metre': 10_000,
Expand Down Expand Up @@ -147,6 +152,7 @@ def open(
ee_init_kwargs: Optional[Dict[str, Any]] = None,
ee_init_if_necessary: bool = False,
executor_kwargs: Optional[Dict[str, Any]] = None,
tile_fetch_kwargs: Dict[str, int] = TILE_FETCH_KWARGS,
) -> 'EarthEngineStore':
if mode != 'r':
raise ValueError(
Expand All @@ -168,6 +174,7 @@ def open(
ee_init_kwargs=ee_init_kwargs,
ee_init_if_necessary=ee_init_if_necessary,
executor_kwargs=executor_kwargs,
tile_fetch_kwargs=tile_fetch_kwargs,
)

def __init__(
Expand All @@ -186,6 +193,7 @@ def __init__(
ee_init_kwargs: Optional[Dict[str, Any]] = None,
ee_init_if_necessary: bool = False,
executor_kwargs: Optional[Dict[str, Any]] = None,
tile_fetch_kwargs: Dict[str, int] = TILE_FETCH_KWARGS,
):
self.ee_init_kwargs = ee_init_kwargs
self.ee_init_if_necessary = ee_init_if_necessary
Expand All @@ -195,6 +203,8 @@ def __init__(
executor_kwargs = {}
self.executor_kwargs = executor_kwargs

self.tile_fetch_kwargs = tile_fetch_kwargs

self.image_collection = image_collection
if n_images != -1:
self.image_collection = image_collection.limit(n_images)
Expand Down Expand Up @@ -482,7 +492,15 @@ def image_to_array(
**kwargs,
}
raw = common.robust_getitem(
dabhicusp marked this conversation as resolved.
Show resolved Hide resolved
pixels_getter, params, catch=ee.ee_exception.EEException
pixels_getter,
params,
catch=ee.ee_exception.EEException,
max_retries=self.tile_fetch_kwargs.get(
dabhicusp marked this conversation as resolved.
Show resolved Hide resolved
'max_retries', self.TILE_FETCH_KWARGS.get('max_retries')
),
initial_delay=self.tile_fetch_kwargs.get(
'initial_delay', self.TILE_FETCH_KWARGS.get('initial_delay')
),
)

# Extract out the shape information from EE response.
Expand Down Expand Up @@ -965,6 +983,10 @@ def open_dataset(
ee_init_if_necessary: bool = False,
ee_init_kwargs: Optional[Dict[str, Any]] = None,
executor_kwargs: Optional[Dict[str, Any]] = None,
tile_fetch_kwargs: Dict[str, int] = {
dabhicusp marked this conversation as resolved.
Show resolved Hide resolved
'max_retries': 6,
'initial_delay': 500,
},
) -> xarray.Dataset: # type: ignore
"""Open an Earth Engine ImageCollection as an Xarray Dataset.

Expand Down Expand Up @@ -1037,7 +1059,14 @@ def open_dataset(
executor_kwargs (optional): A dictionary of keyword arguments to pass to
the ThreadPoolExecutor that handles the parallel computation of pixels
i.e. {'max_workers': 2}.

tile_fetch_kwargs (Dict): The necessary kwargs like `max_retries`,
`initial_delay` which helps while fetching data through calling
ee.data.computePixels(). i.e. {'max_retries' : 6, 'initial_delay': 500}.
- max_retries is maximum number of retry attempts for calling
ee.data.computePixels().By default, it is 6.
- initial_delay is the initial delay in milliseconds before the first
retry of calling ee.data.computePixels(). By default, it is 500.
(https://github.com/pydata/xarray/blob/main/xarray/backends/common.py#L181).
Returns:
An xarray.Dataset that streams in remote data from Earth Engine.
"""
Expand Down Expand Up @@ -1067,6 +1096,7 @@ def open_dataset(
ee_init_kwargs=ee_init_kwargs,
ee_init_if_necessary=ee_init_if_necessary,
executor_kwargs=executor_kwargs,
tile_fetch_kwargs=tile_fetch_kwargs,
)

store_entrypoint = backends_store.StoreBackendEntrypoint()
Expand Down
15 changes: 15 additions & 0 deletions xee/ext_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def setUp(self):
'2017-01-01', '2017-01-03'
),
n_images=64,
tile_fetch_kwargs={'max_retries': 10, 'initial_delay': 1500},
)
self.lnglat_store = xee.EarthEngineStore(
ee.ImageCollection.fromImages([ee.Image.pixelLonLat()]),
Expand All @@ -80,6 +81,7 @@ def setUp(self):
'2020-03-30', '2020-04-01'
),
n_images=64,
tile_fetch_kwargs={'max_retries': 9},
)
self.all_img_store = xee.EarthEngineStore(
ee.ImageCollection('LANDSAT/LC08/C01/T1').filterDate(
Expand Down Expand Up @@ -255,6 +257,19 @@ def __getitem__(self, params):

self.assertEqual(getter.count, 3)

def test_tile_fetch_kwargs(self):
arr = xee.EarthEngineBackendArray('B4', self.store)
self.assertEqual(arr.store.tile_fetch_kwargs['initial_delay'], 1500)
self.assertEqual(arr.store.tile_fetch_kwargs['max_retries'], 10)

arr1 = xee.EarthEngineBackendArray('longitude', self.lnglat_store)
self.assertEqual(arr1.store.tile_fetch_kwargs['initial_delay'], 500)
self.assertEqual(arr1.store.tile_fetch_kwargs['max_retries'], 6)

arr1 = xee.EarthEngineBackendArray('spi2y', self.conus_store)
self.assertNotIn('initial_delay', arr1.store.tile_fetch_kwargs)
self.assertEqual(arr1.store.tile_fetch_kwargs['max_retries'], 9)


class EEBackendEntrypointTest(absltest.TestCase):

Expand Down
Loading