Xee support for distributed worker initialization/authentication
This adds an Earth Engine initialization check to dataset operations so that remote workers can call Earth Engine. It also adds docs for submitting a Dataflow job using Xee.

Dataflow jobs would fail with Xee because the remote workers did not have the EE client library initialized. This adds a check to all calls on the `EarthEngineBackendArray` object so that any call to EE initializes the client library if it is not already initialized. It also adds a parameter that the user must set to opt in to this automatic initialization on workers, so that users are explicit about, and aware of, EE attempting to initialize.

There was discussion on issue #99 regarding documentation for how to initialize/authenticate on a distributed cluster, so this also includes a Dataflow example (docs, cloud setup, script) that users can start from.
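
For illustration, here is a minimal sketch of how an open call can opt in to the new behavior (the dataset ID, project, and endpoint URL below are placeholders for illustration, not part of this change):

```python
import xarray as xr
import xee  # provides the Earth Engine backend entrypoint for xarray

# Sketch only: with ee_init_if_necessary=True, calls made by the backend on a
# remote worker first initialize the EE client library if it is not already
# initialized, forwarding ee_init_kwargs to ee.Initialize(). The project and
# opt_url values below are placeholder assumptions.
ds = xr.open_dataset(
    'ee://NASA/GPM_L3/IMERG_V06',
    engine=xee.EarthEngineBackendEntrypoint,
    ee_init_if_necessary=True,
    ee_init_kwargs={
        'project': 'my-cloud-project',
        'opt_url': 'https://earthengine-highvolume.googleapis.com',
    },
)
```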

close #51

PiperOrigin-RevId: 596966033
Xee authors committed Feb 5, 2024
1 parent 6700ed1 commit 1d15522
Showing 7 changed files with 278 additions and 3 deletions.
5 changes: 5 additions & 0 deletions examples/dataflow/Dockerfile
@@ -0,0 +1,5 @@
FROM apache/beam_python3.9_sdk:2.51.0

COPY requirements.txt ./

RUN pip install -r requirements.txt
117 changes: 117 additions & 0 deletions examples/dataflow/README.md
@@ -0,0 +1,117 @@
# Xee Dataflow Example

This example illustrates how to run an Xee Beam process using Dataflow on Google Cloud Platform.

The example requires a Google Cloud account and will incur charges!

## Cloud setup

To begin, a fair amount of Cloud resource setup is needed to execute the workflow on a Cloud project.

This example assumes you have the [Google Cloud SDK installed](https://cloud.google.com/sdk/docs/install) and an [Earth Engine project setup with your Cloud Project](https://developers.google.com/earth-engine/cloud/earthengine_cloud_project_setup).


### Set environment variables

These environment variables are used throughout the example to make life easier when working across different Cloud environments. They capture Cloud project information and set the names used for the infrastructure created in this example.

```shell
PROJECT=$(gcloud config get-value project)

REGION=us-central1

REPO=xee-dataflow
CONTAINER=beam-runner

SA_NAME=xee-dataflow-controller
SERVICE_ACCOUNT=${SA_NAME}@${PROJECT}.iam.gserviceaccount.com
```

### Create custom Docker Container with dependencies

One of the suggested ways to handle external dependencies within a Beam pipeline is to [use a custom Docker Container](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#custom-containers) with the pipeline. This is useful because each remote worker needs to install dependencies when it spins up, and a pre-built container makes that much quicker.

To do this with Google Cloud, one must first create an [Artifact Registry](https://cloud.google.com/artifact-registry/docs/overview) repository where the Docker Container can be stored and then build/push the container to the registry repository.

To create an Artifact Registry repository run the following command:

```shell
gcloud artifacts repositories create $REPO \
--location=$REGION \
--repository-format=docker \
--description="Repository for hosting the Docker images to test xee with Dataflow" \
--async
```

The next step is to build the Docker Container and push it to the repository just created. This is done using [Cloud Build](https://cloud.google.com/build/docs/overview) with a configuration file. The config file defines how the image is built and where it is stored.

The `cloudbuild.yaml` file has placeholder variables that need to be replaced with information from your Cloud environment. Open the file in your favorite text editor and replace "REGION" with the Cloud Region you selected, "YOUR_PROJECT" with your Cloud Project ID, "REPO" with the Artifact Registry repository name, and "CONTAINER" with the container name.

Alternatively, you can replace them with the following command:

```shell
sed -i 's/REGION/'"$REGION"'/g; s/YOUR_PROJECT/'"$PROJECT"'/g; s/REPO/'"$REPO"'/g; s/CONTAINER/'"$CONTAINER"'/g' cloudbuild.yaml
```

Run the following command to build the container to use with Dataflow:

```shell
gcloud builds submit --config cloudbuild.yaml
```

### Create a Cloud Storage bucket

This example outputs data to a Cloud Storage bucket, so one needs to be created for the pipeline. To do so, run the following command:

```shell
gsutil mb -l $REGION gs://xee-out-${PROJECT}
```

Cloud Storage bucket names need to be globally unique, so this uses the Cloud Project ID (also globally unique) in the name.

### Create a Service Account

Service Accounts (SAs) are used to authorize remote workers to make calls to different services. It is good practice to create an SA for a specific process so that the roles assigned to any individual SA are limited to what that process requires.

To create an SA, run the following command:

```shell
gcloud iam service-accounts create ${SA_NAME} \
--description="Controller service account for services used with Dataflow" \
--display-name="Xee Dataflow Controller SA"
```

Next, assign the required roles to the Service Account so it can properly manage workers and read/write data.

```shell
roles=("roles/earthengine.writer" "roles/serviceusage.serviceUsageConsumer" "roles/storage.objectAdmin" "roles/artifactregistry.reader" "roles/dataflow.worker")

for role in "${roles[@]}"
do
  gcloud projects add-iam-policy-binding ${PROJECT} \
    --member=serviceAccount:${SERVICE_ACCOUNT} \
    --role=${role}
done
```

Now that all of the Cloud infrastructure is set up, it is time to run the pipeline!

## Run the pipeline

This example focuses on pulling data from Earth Engine, transforming the data into Zarr format, and storing the results. The script `ee_to_zarr_dataflow.py` defines the pipeline, and the command-line arguments passed to it define how it is executed with Dataflow.

```shell
python ee_to_zarr_dataflow.py \
--input NASA/GPM_L3/IMERG_V06 \
--output gs://xee-out-${PROJECT} \
--target_chunks='time=6' \
--runner DataflowRunner \
--project $PROJECT \
--region $REGION \
--temp_location gs://xee-out-${PROJECT}/tmp/ \
--service_account_email $SERVICE_ACCOUNT \
--sdk_location=container \
--sdk_container_image=${REGION}-docker.pkg.dev/${PROJECT}/${REPO}/${CONTAINER} \
--job_name imerg-dataflow-$(date '+%Y%m%d%H%M%S')
```
4 changes: 4 additions & 0 deletions examples/dataflow/cloudbuild.yaml
@@ -0,0 +1,4 @@
steps:
- name: 'gcr.io/cloud-builders/docker'
args: [ 'build', '-t', 'REGION-docker.pkg.dev/YOUR_PROJECT/REPO/CONTAINER', '.' ]
images: ['REGION-docker.pkg.dev/YOUR_PROJECT/REPO/CONTAINER']
117 changes: 117 additions & 0 deletions examples/dataflow/ee_to_zarr_dataflow.py
@@ -0,0 +1,117 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
r"""Exports EE ImageCollections to Zarr using Xarray-Beam."""


import logging
from typing import Dict, List

from absl import app
from absl import flags
import apache_beam as beam
import xarray as xr
import xarray_beam as xbeam
import xee

import ee

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


_INPUT = flags.DEFINE_string(
    'input', '', help='The input Earth Engine ImageCollection.'
)
_CRS = flags.DEFINE_string(
    'crs',
    'EPSG:4326',
    help='Coordinate Reference System for output Zarr.',
)
_SCALE = flags.DEFINE_float('scale', 0.25, help='Scale factor for output Zarr.')
_TARGET_CHUNKS = flags.DEFINE_string(
    'target_chunks',
    '',
    help=(
        'Chunks to apply to the output Zarr dataset, in the form of '
        'comma-separated dimension=size pairs, e.g., '
        "--target_chunks='x=10,y=10'. Omitted dimensions are not changed and "
        'a chunksize of -1 indicates not to chunk a dimension.'
    ),
)
_OUTPUT = flags.DEFINE_string('output', '', help='The output zarr path.')
_RUNNER = flags.DEFINE_string('runner', None, help='beam.runners.Runner')


# pylint: disable=unused-argument
def _parse_dataflow_flags(argv: List[str]) -> List[str]:
  parser = flags.argparse_flags.ArgumentParser(
      description='parser for dataflow flags',
      allow_abbrev=False,
  )
  _, dataflow_args = parser.parse_known_args()
  return dataflow_args


# Borrowed from the xbeam examples:
# https://github.com/google/xarray-beam/blob/4f4fcb965a65b5d577601af311d0e0142ee38076/examples/xbeam_rechunk.py#L41
def _parse_chunks_str(chunks_str: str) -> Dict[str, int]:
  chunks = {}
  parts = chunks_str.split(',')
  for part in parts:
    k, v = part.split('=')
    chunks[k] = int(v)
  return chunks
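
# For example (hypothetical input): _parse_chunks_str('time=6,lat=512')
# returns {'time': 6, 'lat': 512}.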


def main(argv: list[str]) -> None:
  assert _INPUT.value, 'Must specify --input'
  assert _OUTPUT.value, 'Must specify --output'

  source_chunks = {'time': 24}
  target_chunks = dict(source_chunks, **_parse_chunks_str(_TARGET_CHUNKS.value))

  ee.Initialize()

  input_coll = (
      ee.ImageCollection(_INPUT.value)
      .limit(100, 'system:time_start', True)
      .select('precipitationCal')
  )

  ds = xr.open_dataset(
      input_coll,
      crs=_CRS.value,
      scale=_SCALE.value,
      engine=xee.EarthEngineBackendEntrypoint,
  )
  template = xbeam.make_template(ds)
  itemsize = max(variable.dtype.itemsize for variable in template.values())

  with beam.Pipeline(runner=_RUNNER.value, argv=argv) as root:
    _ = (
        root
        | xbeam.DatasetToChunks(ds, source_chunks)
        | xbeam.Rechunk(
            ds.sizes,
            source_chunks,
            target_chunks,
            itemsize=itemsize,
        )
        | xbeam.ChunksToZarr(_OUTPUT.value, template, target_chunks)
    )


if __name__ == '__main__':
  app.run(main, flags_parser=_parse_dataflow_flags)
1 change: 1 addition & 0 deletions examples/dataflow/requirements.txt
@@ -0,0 +1 @@
xee[examples]
9 changes: 6 additions & 3 deletions pyproject.toml
@@ -41,11 +41,14 @@ tests = [
"rasterio",
"rioxarray",
]
examples = [
"apache_beam[gcp]",
"xarray-beam",
dataflow = [
"absl-py",
"apache-beam[gcp]",
"gcsfs",
"xarray-beam",
]
examples = [
"xee[dataflow]",
]

[project.urls]
28 changes: 28 additions & 0 deletions xee/ext.py
@@ -144,6 +144,8 @@ def open(
      primary_dim_property: Optional[str] = None,
      mask_value: Optional[float] = None,
      request_byte_limit: int = REQUEST_BYTE_LIMIT,
      ee_init_kwargs: Optional[Dict[str, Any]] = None,
      ee_init_if_necessary: bool = False,
  ) -> 'EarthEngineStore':
    if mode != 'r':
      raise ValueError(
@@ -162,6 +164,8 @@
        primary_dim_property=primary_dim_property,
        mask_value=mask_value,
        request_byte_limit=request_byte_limit,
        ee_init_kwargs=ee_init_kwargs,
        ee_init_if_necessary=ee_init_if_necessary,
    )

  def __init__(
@@ -177,7 +181,12 @@ def __init__(
      primary_dim_property: Optional[str] = None,
      mask_value: Optional[float] = None,
      request_byte_limit: int = REQUEST_BYTE_LIMIT,
      ee_init_kwargs: Optional[Dict[str, Any]] = None,
      ee_init_if_necessary: bool = False,
  ):
    self.ee_init_kwargs = ee_init_kwargs
    self.ee_init_if_necessary = ee_init_if_necessary

    self.image_collection = image_collection
    if n_images != -1:
      self.image_collection = image_collection.limit(n_images)
@@ -722,6 +731,15 @@ def __init__(self, variable_name: str, ee_store: EarthEngineStore):
    if isinstance(self.store.chunks, dict):
      self._apparent_chunks = self.store.chunks.copy()

  def _ee_init_check(self):
    if not ee.data.is_initialized() and self.store.ee_init_if_necessary:
      warnings.warn(
          'Earth Engine is not initialized on worker. '
          'Attempting to initialize using application default credentials.'
      )

      ee.Initialize(**(self.store.ee_init_kwargs or {}))

  def __getitem__(self, key: indexing.ExplicitIndexer) -> np.typing.ArrayLike:
    return indexing.explicit_indexing_adapter(
        key,
@@ -761,6 +779,7 @@ def _slice_collection(self, image_slice: slice) -> ee.Image:
    """Reduce the ImageCollection into an Image with bands as index slices."""
    # Get the right range of Images in the collection, either a single image or
    # a range of images...
    self._ee_init_check()
    start, stop, stride = image_slice.indices(self.shape[0])

    # If the input images have IDs, just slice them. Otherwise, we need to do
@@ -930,6 +949,8 @@ def open_dataset(
      primary_dim_property: Optional[str] = None,
      ee_mask_value: Optional[float] = None,
      request_byte_limit: int = REQUEST_BYTE_LIMIT,
      ee_init_if_necessary: bool = False,
      ee_init_kwargs: Optional[Dict[str, Any]] = None,
  ) -> xarray.Dataset:  # type: ignore
    """Open an Earth Engine ImageCollection as an Xarray Dataset.
@@ -994,6 +1015,11 @@ def open_dataset(
        this is 'np.iinfo(np.int32).max' i.e. 2147483647.
      request_byte_limit: the max allowed bytes to request at a time from Earth
        Engine. By default, it is 48MBs.
      ee_init_if_necessary: boolean flag to set if auto initialize for Earth
        Engine should be attempted. Set to True if using distributed compute
        frameworks.
      ee_init_kwargs: keywords to pass to Earth Engine Initialize when
        attempting to auto init for remote workers.

    Returns:
      An xarray.Dataset that streams in remote data from Earth Engine.
@@ -1021,6 +1047,8 @@
        primary_dim_property=primary_dim_property,
        mask_value=ee_mask_value,
        request_byte_limit=request_byte_limit,
        ee_init_kwargs=ee_init_kwargs,
        ee_init_if_necessary=ee_init_if_necessary,
    )

    store_entrypoint = backends_store.StoreBackendEntrypoint()
