From 7de6eb7f24952de3666f5a6f809e6c02a303b5f5 Mon Sep 17 00:00:00 2001 From: Al-Iqram Elahee Date: Sat, 7 Oct 2023 22:18:46 -0600 Subject: [PATCH 1/5] feat: Allow users to configure the internal thread pool (#11) --- configurable-thread-pool | 1 + xee/ext.py | 9 ++++++--- 2 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 configurable-thread-pool diff --git a/configurable-thread-pool b/configurable-thread-pool new file mode 100644 index 0000000..98b9386 --- /dev/null +++ b/configurable-thread-pool @@ -0,0 +1 @@ +Branch 'configurable-thread-pool' set up to track remote branch 'main' from 'origin'. diff --git a/xee/ext.py b/xee/ext.py index 65bd9e0..c1865bf 100644 --- a/xee/ext.py +++ b/xee/ext.py @@ -634,7 +634,7 @@ def reduce_bands(x, acc): return target_image def _raw_indexing_method( - self, key: tuple[Union[int, slice], ...] + self, key: tuple[Union[int, slice], ...], executor_kwargs: Optional[dict] = None ) -> np.typing.ArrayLike: key, squeeze_axes = self._key_to_slices(key) @@ -682,8 +682,11 @@ def _raw_indexing_method( for _ in range(shape[0]) ] - # TODO(#11): Allow users to configure this via kwargs. 
- with concurrent.futures.ThreadPoolExecutor() as pool: + # If executor_kwargs is None, use an empty dictionary + if executor_kwargs is None: + executor_kwargs = {} + # Pass executor_kwargs to ThreadPoolExecutor + with concurrent.futures.ThreadPoolExecutor(**executor_kwargs) as pool: for (i, j, k), arr in pool.map( self._make_tile, self._tile_indexes(key[0], bbox) ): From 2d464f2718945c4ad18ddfed3b049928bb18dce0 Mon Sep 17 00:00:00 2001 From: Al-Iqram Elahee Date: Wed, 11 Oct 2023 23:34:24 -0600 Subject: [PATCH 2/5] Moved executor_kwargs logic to constructor and removed unnecessary file --- xee/ext.py | 16 +++++++++++----- xee/ext_integration_test.py | 12 ++++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/xee/ext.py b/xee/ext.py index c1865bf..f75e16a 100644 --- a/xee/ext.py +++ b/xee/ext.py @@ -89,6 +89,12 @@ class EarthEngineStore(common.AbstractDataStore): DEFAULT_MASK_VALUE = np.iinfo(np.int32).max + def __init__(self, executor_kwargs=None): + if executor_kwargs is None: + self.executor_kwargs = {} + else: + self.executor_kwargs = executor_kwargs + @classmethod def open( cls, @@ -634,7 +640,7 @@ def reduce_bands(x, acc): return target_image def _raw_indexing_method( - self, key: tuple[Union[int, slice], ...], executor_kwargs: Optional[dict] = None + self, key: tuple[Union[int, slice], ...] 
) -> np.typing.ArrayLike: key, squeeze_axes = self._key_to_slices(key) @@ -682,11 +688,9 @@ def _raw_indexing_method( for _ in range(shape[0]) ] - # If executor_kwargs is None, use an empty dictionary - if executor_kwargs is None: - executor_kwargs = {} + # Pass executor_kwargs to ThreadPoolExecutor - with concurrent.futures.ThreadPoolExecutor(**executor_kwargs) as pool: + with concurrent.futures.ThreadPoolExecutor(**self.executor_kwargs) as pool: for (i, j, k), arr in pool.map( self._make_tile, self._tile_indexes(key[0], bbox) ): @@ -775,6 +779,7 @@ def open_dataset( primary_dim_name: Optional[str] = None, primary_dim_property: Optional[str] = None, ee_mask_value: Optional[float] = None, + executor_kwargs: Optional[dict] = None, ) -> xarray.Dataset: """Open an Earth Engine ImageCollection as an Xarray Dataset. @@ -862,6 +867,7 @@ def open_dataset( primary_dim_name=primary_dim_name, primary_dim_property=primary_dim_property, mask_value=ee_mask_value, + executor_kwargs=executor_kwargs, ) store_entrypoint = backends_store.StoreBackendEntrypoint() diff --git a/xee/ext_integration_test.py b/xee/ext_integration_test.py index 0f69ad0..630de51 100644 --- a/xee/ext_integration_test.py +++ b/xee/ext_integration_test.py @@ -358,6 +358,18 @@ def test_data_sanity_check(self): self.assertNotEqual(temperature_2m.min(), 0.0) self.assertNotEqual(temperature_2m.max(), 0.0) + def test_open_dataset_with_executor_kwargs(self): + executor_kwargs = {'max_workers': 2} + ds = self.entry.open_dataset( + 'ee://LANDSAT/LC08/C01/T1', + drop_variables=tuple(f'B{i}' for i in range(3, 12)), + scale=25.0, + n_images=3, + executor_kwargs=executor_kwargs, + ) + + self.assertEqual(ds.thread_pool.max_workers, executor_kwargs['max_workers']) + if __name__ == '__main__': absltest.main() From d445578fdc430ae21137b206cbde3c9e4af23d4b Mon Sep 17 00:00:00 2001 From: Al-Iqram Elahee Date: Wed, 11 Oct 2023 23:36:45 -0600 Subject: [PATCH 3/5] Remove unnecessary configurable-thread-pool file --- 
configurable-thread-pool | 1 - 1 file changed, 1 deletion(-) delete mode 100644 configurable-thread-pool diff --git a/configurable-thread-pool b/configurable-thread-pool deleted file mode 100644 index 98b9386..0000000 --- a/configurable-thread-pool +++ /dev/null @@ -1 +0,0 @@ -Branch 'configurable-thread-pool' set up to track remote branch 'main' from 'origin'. From 42312702d5cfb3aff2b75dfe29f142369aea5547 Mon Sep 17 00:00:00 2001 From: Al-Iqram Elahee Date: Thu, 12 Oct 2023 15:30:00 -0600 Subject: [PATCH 4/5] Added executor_kwargs to EarthEngineStore --- xee/ext.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/xee/ext.py b/xee/ext.py index f75e16a..883cdb4 100644 --- a/xee/ext.py +++ b/xee/ext.py @@ -89,12 +89,6 @@ class EarthEngineStore(common.AbstractDataStore): DEFAULT_MASK_VALUE = np.iinfo(np.int32).max - def __init__(self, executor_kwargs=None): - if executor_kwargs is None: - self.executor_kwargs = {} - else: - self.executor_kwargs = executor_kwargs - @classmethod def open( cls, @@ -140,6 +134,7 @@ def __init__( primary_dim_name: Optional[str] = None, primary_dim_property: Optional[str] = None, mask_value: Optional[float] = None, + executor_kwargs: Optional[dict] = None, ): self.image_collection = image_collection if n_images != -1: @@ -170,6 +165,11 @@ def __init__( coordinates=f'{self.primary_dim_name} {x_dim_name} {y_dim_name}', crs=self.crs_arg, ) + # Initialize executor_kwargs + if executor_kwargs is None: + self.executor_kwargs = {} + else: + self.executor_kwargs = executor_kwargs # Scale in the projection's units. Typically, either meters or degrees. # If we use the default CRS i.e. EPSG:3857, the units is in meters. 
From 4cd311d8cca9b7e786788b80eb46fdd72cd69bed Mon Sep 17 00:00:00 2001 From: alhridoy Date: Sun, 15 Oct 2023 02:31:28 -0600 Subject: [PATCH 5/5] Added executor_kwargs parameter to EarthEngineStore and open_dataset, and updated the docstring --- xee/ext.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/xee/ext.py b/xee/ext.py index 883cdb4..e82c6ab 100644 --- a/xee/ext.py +++ b/xee/ext.py @@ -136,6 +136,12 @@ def __init__( mask_value: Optional[float] = None, executor_kwargs: Optional[dict] = None, ): + # Initialize executor_kwargs + if executor_kwargs is None: + self.executor_kwargs = {} + else: + self.executor_kwargs = executor_kwargs + self.image_collection = image_collection if n_images != -1: self.image_collection = image_collection.limit(n_images) @@ -165,12 +171,6 @@ def __init__( coordinates=f'{self.primary_dim_name} {x_dim_name} {y_dim_name}', crs=self.crs_arg, ) - # Initialize executor_kwargs - if executor_kwargs is None: - self.executor_kwargs = {} - else: - self.executor_kwargs = executor_kwargs - # Scale in the projection's units. Typically, either meters or degrees. # If we use the default CRS i.e. EPSG:3857, the units is in meters. default_scale = self.SCALE_UNITS.get(self.scale_units, 1) @@ -640,7 +640,7 @@ def reduce_bands(x, acc): return target_image def _raw_indexing_method( - self, key: tuple[Union[int, slice], ...] + self, key: tuple[Union[int, slice], ...] ) -> np.typing.ArrayLike: key, squeeze_axes = self._key_to_slices(key) @@ -841,7 +841,9 @@ def open_dataset( 'system:time_start'. ee_mask_value (optional): Value to mask to EE nodata values. By default, this is 'np.iinfo(np.int32).max' i.e. 2147483647. - + executor_kwargs (optional): A dictionary of keyword arguments to pass to + the ThreadPoolExecutor that handles the parallel computation of pixels. + Returns: An xarray.Dataset that streams in remote data from Earth Engine. """