diff --git a/recipes/fsspec_open_testing.ipynb b/recipes/fsspec_open_testing.ipynb new file mode 100644 index 0000000..bd19a45 --- /dev/null +++ b/recipes/fsspec_open_testing.ipynb @@ -0,0 +1,723 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import fsspec\n", + "import xarray as xr\n", + "from fsspec.implementations.sftp import SFTPFileSystem" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create SFTPFileSystem object to explore SFTP structure" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "metadata": {}, + "outputs": [], + "source": [ + "sftp_host = os.environ[\"GLEAM_FTP\"][7:-1] # host should not have sftp prefix\n", + "gleam_creds_sftp = dict(\n", + " username = os.environ[\"GLEAM_USER\"],\n", + " password = os.environ[\"GLEAM_PASSWORD\"],\n", + " port = int(os.environ[\"GLEAM_PORT\"])\n", + " )\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(sftp_host)" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [], + "source": [ + "# create link\n", + "fs_sftp = SFTPFileSystem(host=sftp_host, **gleam_creds_sftp)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Explore file structure" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['./data']" + ] + }, + "execution_count": 35, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "fs_sftp.ls(\"./\")" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['./data/CHANGELOG',\n", + " './data/README_GLEAM_v3.7.pdf',\n", + " './data/v3.7b',\n", + " './data/v3.8a']" + ] + }, + "execution_count": 36, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "fs_sftp.ls(\"./data\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fs_sftp.ls(\"./data/v3.7b\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fs_sftp.ls(\"./data/v3.7b/daily\")" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['./data/v3.7b/daily/2003/E_2003_GLEAM_v3.7b.nc',\n", + " './data/v3.7b/daily/2003/Eb_2003_GLEAM_v3.7b.nc',\n", + " './data/v3.7b/daily/2003/Ei_2003_GLEAM_v3.7b.nc',\n", + " './data/v3.7b/daily/2003/Ep_2003_GLEAM_v3.7b.nc',\n", + " './data/v3.7b/daily/2003/Es_2003_GLEAM_v3.7b.nc',\n", + " './data/v3.7b/daily/2003/Et_2003_GLEAM_v3.7b.nc',\n", + " './data/v3.7b/daily/2003/Ew_2003_GLEAM_v3.7b.nc',\n", + " './data/v3.7b/daily/2003/SMroot_2003_GLEAM_v3.7b.nc',\n", + " './data/v3.7b/daily/2003/SMsurf_2003_GLEAM_v3.7b.nc',\n", + " './data/v3.7b/daily/2003/S_2003_GLEAM_v3.7b.nc']" + ] + }, + "execution_count": 37, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "fs_sftp.ls(\"./data/v3.7b/daily/2003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# this will download the file\n", + "# fs_sftp.get('data/v3.7b/daily/2003/E_2003_GLEAM_v3.7b.nc', \"./download\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create dictionary of environmental variables for GLEAM" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "gleam_creds = dict(\n", + " username = os.environ[\"GLEAM_USER\"],\n", + " password = os.environ[\"GLEAM_PASSWORD\"],\n", + " port = int(os.environ[\"GLEAM_PORT\"])\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Check that open file objects will work" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ] + } + ], + "source": [ + "years = range(2003, 2023)\n", + "\n", + "for year in years:\n", + " # format file path on SFTP\n", + " filepath = f\"data/v3.7b/daily/{year}/Et_{year}_GLEAM_v3.7b.nc\"\n", + " # create full URL\n", + " urlpath = os.environ[\"GLEAM_FTP\"] + filepath\n", + "\n", + " # add to credentials to send to fsspec.open\n", + " gleam_creds[\"urlpath\"] = urlpath\n", + "\n", + " # create OpenFile object\n", + " file = fsspec.open(**gleam_creds)\n", + " print(file)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Start creating pangeo-forge-recipe" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "from pangeo_forge_recipes.patterns import ConcatDim, FilePattern\n", + "import apache_beam as beam\n", + "from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr\n", + "from tempfile import TemporaryDirectory" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create time range" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "metadata": {}, + "outputs": [], + "source": [ + "dates = pd.date_range(\"2003\", \"2005\", freq=\"A\")" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "ConcatDim(name='time', nitems_per_file=1)" + ] + }, + "execution_count": 23, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "time_concat_dim = ConcatDim(\"time\", dates, nitems_per_file=1)\n", + "time_concat_dim" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [], + "source": [ + "base_url = \"sftp://hydras.ugent.be/data/v3.7b/daily/{time:%Y}/Et_{time:%Y}_GLEAM_v3.7b.nc\"" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [], + "source": [ + "def make_url(time):\n", + " return base_url.format(time=time)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'sftp://hydras.ugent.be/data/v3.7b/daily/2003/Et_2003_GLEAM_v3.7b.nc'" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "make_url(dates[-1])" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pattern = FilePattern(make_url, time_concat_dim, fsspec_open_kwargs=gleam_creds)\n", + "pattern" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{Dimension(name='time', operation=): Position(value=0, indexed=False)}\n", + "sftp://hydras.ugent.be/data/v3.7b/daily/2002/Et_2002_GLEAM_v3.7b.nc\n", + "{Dimension(name='time', operation=): Position(value=1, indexed=False)}\n", + "sftp://hydras.ugent.be/data/v3.7b/daily/2003/Et_2003_GLEAM_v3.7b.nc\n" + ] + } + ], + "source": [ + "for index, url in pattern.items():\n", + " print(index)\n", + " print(url)\n", + " # Stop after the 3rd filepath (September 3rd, 1981)\n", + " if '19810903' in url:\n", + " break" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'/tmp/tmplepmcor8'" + ] + }, + "execution_count": 29, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "td = TemporaryDirectory()\n", + "target_path = td.name\n", + "target_name = \"output.zarr\"\n", + "target_path" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "<_ChainedPTransform(PTransform) label=[Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr] at 0x7f1bff49ae00>" + ] + }, + "execution_count": 30, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "transforms = (\n", + " beam.Create(pattern.items())\n", + " | OpenURLWithFSSpec()\n", + " | OpenWithXarray(file_type=pattern.file_type)\n", + " | StoreToZarr(\n", + " target_root=target_path,\n", + " store_name=target_name,\n", + " combine_dims=pattern.combine_dim_keys,\n", + " )\n", + ")\n", + "transforms" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Run!" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Custom TB Handler failed, unregistering\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Unexpected exception formatting exception. Falling back to standard exception\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Traceback (most recent call last):\n", + " File \"apache_beam/runners/common.py\", line 1418, in apache_beam.runners.common.DoFnRunner.process\n", + " File \"apache_beam/runners/common.py\", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process\n", + " File \"apache_beam/runners/common.py\", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/transforms/core.py\", line -1, in \n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/pangeo_forge_recipes/transforms.py\", line -1, in wrapper\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/pangeo_forge_recipes/openers.py\", line 35, in open_url\n", + " open_file = _get_opener(url, secrets, **kw)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/pangeo_forge_recipes/storage.py\", line 212, in _get_opener\n", + " return fsspec.open(fname, mode=\"rb\", **open_kwargs)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/core.py\", line 439, in open\n", + " out = open_files(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/core.py\", line 282, in open_files\n", + " fs, fs_token, paths = get_fs_token_paths(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/core.py\", line 609, in get_fs_token_paths\n", + " fs = filesystem(protocol, **inkwargs)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/registry.py\", line 267, in filesystem\n", + " return cls(**storage_options)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/spec.py\", line 79, in __call__\n", + " obj = super().__call__(*args, **kwargs)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/implementations/sftp.py\", line 48, in __init__\n", + " self._connect()\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/implementations/sftp.py\", line 54, in _connect\n", + " self.client.connect(self.host, **self.ssh_kwargs)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/paramiko/client.py\", line 386, in connect\n", + " sock.connect(addr)\n", + "TimeoutError: [Errno 110] Connection timed out\n", + "\n", + "During handling of the above exception, another exception occurred:\n", + "\n", + "Traceback (most recent call last):\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/interactiveshell.py\", line 3508, in run_code\n", + " exec(code_obj, self.user_global_ns, self.user_ns)\n", + " File \"/tmp/ipykernel_10748/1176484966.py\", line 1, in \n", + " with beam.Pipeline() as p:\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/pipeline.py\", line 600, in __exit__\n", + " self.result = self.run()\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/pipeline.py\", line 577, in run\n", + " return self.runner.run_pipeline(self, self._options)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py\", line 129, in run_pipeline\n", + " return runner.run_pipeline(pipeline, options)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py\", line 202, in run_pipeline\n", + " self._latest_run_result = self.run_via_runner_api(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py\", line 224, in run_via_runner_api\n", + " return self.run_stages(stage_context, stages)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py\", line 455, in run_stages\n", + " bundle_results = self._execute_bundle(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py\", line 783, in _execute_bundle\n", + " self._run_bundle(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py\", line 1012, in _run_bundle\n", + " result, splits = bundle_manager.process_bundle(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py\", line 1348, in process_bundle\n", + " result_future = self._worker_handler.control_conn.push(process_bundle_req)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py\", line 379, in push\n", + " response = self.worker.do_instruction(request)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py\", line 629, in do_instruction\n", + " return getattr(self, request_type)(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py\", line 667, in process_bundle\n", + " bundle_processor.process_bundle(instruction_id))\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py\", line 1061, in process_bundle\n", + " input_op_by_transform_id[element.transform_id].process_encoded(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py\", line 231, in process_encoded\n", + " self.output(decoded_value)\n", + " File \"apache_beam/runners/worker/operations.py\", line 526, in apache_beam.runners.worker.operations.Operation.output\n", + " File \"apache_beam/runners/worker/operations.py\", line 528, in apache_beam.runners.worker.operations.Operation.output\n", + " File \"apache_beam/runners/worker/operations.py\", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive\n", + " File \"apache_beam/runners/worker/operations.py\", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive\n", + " File \"apache_beam/runners/worker/operations.py\", line 907, in apache_beam.runners.worker.operations.DoOperation.process\n", + " File \"apache_beam/runners/worker/operations.py\", line 908, in apache_beam.runners.worker.operations.DoOperation.process\n", + " File \"apache_beam/runners/common.py\", line 1420, in apache_beam.runners.common.DoFnRunner.process\n", + " File \"apache_beam/runners/common.py\", line 1492, in apache_beam.runners.common.DoFnRunner._reraise_augmented\n", + " File \"apache_beam/runners/common.py\", line 1418, in apache_beam.runners.common.DoFnRunner.process\n", + " File \"apache_beam/runners/common.py\", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process\n", + " File \"apache_beam/runners/common.py\", line 1582, in apache_beam.runners.common._OutputHandler.handle_process_outputs\n", + " File \"apache_beam/runners/common.py\", line 1695, in apache_beam.runners.common._OutputHandler._write_value_to_tag\n", + " File \"apache_beam/runners/worker/operations.py\", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive\n", + " File \"apache_beam/runners/worker/operations.py\", line 907, in apache_beam.runners.worker.operations.DoOperation.process\n", + " File \"apache_beam/runners/worker/operations.py\", line 908, in apache_beam.runners.worker.operations.DoOperation.process\n", + " File \"apache_beam/runners/common.py\", line 1420, in apache_beam.runners.common.DoFnRunner.process\n", + " File \"apache_beam/runners/common.py\", line 1492, in apache_beam.runners.common.DoFnRunner._reraise_augmented\n", + " File \"apache_beam/runners/common.py\", line 1418, in apache_beam.runners.common.DoFnRunner.process\n", + " File \"apache_beam/runners/common.py\", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process\n", + " File \"apache_beam/runners/common.py\", line 1582, in apache_beam.runners.common._OutputHandler.handle_process_outputs\n", + " File \"apache_beam/runners/common.py\", line 1695, in apache_beam.runners.common._OutputHandler._write_value_to_tag\n", + " File \"apache_beam/runners/worker/operations.py\", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive\n", + " File \"apache_beam/runners/worker/operations.py\", line 907, in apache_beam.runners.worker.operations.DoOperation.process\n", + " File \"apache_beam/runners/worker/operations.py\", line 908, in apache_beam.runners.worker.operations.DoOperation.process\n", + " File \"apache_beam/runners/common.py\", line 1420, in apache_beam.runners.common.DoFnRunner.process\n", + " File \"apache_beam/runners/common.py\", line 1492, in apache_beam.runners.common.DoFnRunner._reraise_augmented\n", + " File \"apache_beam/runners/common.py\", line 1418, in apache_beam.runners.common.DoFnRunner.process\n", + " File \"apache_beam/runners/common.py\", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process\n", + " File \"apache_beam/runners/common.py\", line 1582, in apache_beam.runners.common._OutputHandler.handle_process_outputs\n", + " File \"apache_beam/runners/common.py\", line 1695, in apache_beam.runners.common._OutputHandler._write_value_to_tag\n", + " File \"apache_beam/runners/worker/operations.py\", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive\n", + " File \"apache_beam/runners/worker/operations.py\", line 907, in apache_beam.runners.worker.operations.DoOperation.process\n", + " File \"apache_beam/runners/worker/operations.py\", line 908, in apache_beam.runners.worker.operations.DoOperation.process\n", + " File \"apache_beam/runners/common.py\", line 1420, in apache_beam.runners.common.DoFnRunner.process\n", + " File \"apache_beam/runners/common.py\", line 1508, in apache_beam.runners.common.DoFnRunner._reraise_augmented\n", + " File \"apache_beam/runners/common.py\", line 1418, in apache_beam.runners.common.DoFnRunner.process\n", + " File \"apache_beam/runners/common.py\", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process\n", + " File \"apache_beam/runners/common.py\", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/apache_beam/transforms/core.py\", line -1, in \n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/pangeo_forge_recipes/transforms.py\", line -1, in wrapper\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/pangeo_forge_recipes/openers.py\", line 35, in open_url\n", + " open_file = _get_opener(url, secrets, **kw)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/pangeo_forge_recipes/storage.py\", line 212, in _get_opener\n", + " return fsspec.open(fname, mode=\"rb\", **open_kwargs)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/core.py\", line 439, in open\n", + " out = open_files(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/core.py\", line 282, in open_files\n", + " fs, fs_token, paths = get_fs_token_paths(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/core.py\", line 609, in get_fs_token_paths\n", + " fs = filesystem(protocol, **inkwargs)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/registry.py\", line 267, in filesystem\n", + " return cls(**storage_options)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/spec.py\", line 79, in __call__\n", + " obj = super().__call__(*args, **kwargs)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/implementations/sftp.py\", line 48, in __init__\n", + " self._connect()\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/fsspec/implementations/sftp.py\", line 54, in _connect\n", + " self.client.connect(self.host, **self.ssh_kwargs)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/paramiko/client.py\", line 386, in connect\n", + " sock.connect(addr)\n", + "RuntimeError: TimeoutError: [Errno 110] Connection timed out [while running '[31]: Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr/OpenURLWithFSSpec/Open with fsspec']\n", + "\n", + "During handling of the above exception, another exception occurred:\n", + "\n", + "Traceback (most recent call last):\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/interactiveshell.py\", line 1975, in wrapped\n", + " stb = handler(self,etype,value,tb,tb_offset=tb_offset)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/dask/base.py\", line 79, in _clean_ipython_traceback\n", + " stb = self.InteractiveTB.structured_traceback(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1428, in structured_traceback\n", + " return FormattedTB.structured_traceback(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1319, in structured_traceback\n", + " return VerboseTB.structured_traceback(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1172, in structured_traceback\n", + " formatted_exception = self.format_exception_as_a_whole(etype, evalue, etb, number_of_lines_of_context,\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1087, in format_exception_as_a_whole\n", + " frames.append(self.format_record(record))\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 969, in format_record\n", + " frame_info.lines, Colors, self.has_colors, lvals\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 792, in lines\n", + " return self._sd.lines\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/utils.py\", line 144, in cached_property_wrapper\n", + " value = obj.__dict__[self.func.__name__] = self.func(obj)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/core.py\", line 734, in lines\n", + " pieces = self.included_pieces\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/utils.py\", line 144, in cached_property_wrapper\n", + " value = obj.__dict__[self.func.__name__] = self.func(obj)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/core.py\", line 681, in included_pieces\n", + " pos = scope_pieces.index(self.executing_piece)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/utils.py\", line 144, in cached_property_wrapper\n", + " value = obj.__dict__[self.func.__name__] = self.func(obj)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/core.py\", line 660, in executing_piece\n", + " return only(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/executing/executing.py\", line 190, in only\n", + " raise NotOneValueFound('Expected one value, found 0')\n", + "executing.executing.NotOneValueFound: Expected one value, found 0\n", + "\n", + "During handling of the above exception, another exception occurred:\n", + "\n", + "Traceback (most recent call last):\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/interactiveshell.py\", line 3448, in run_ast_nodes\n", + " if await self.run_code(code, result, async_=asy):\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/interactiveshell.py\", line 3526, in run_code\n", + " self.CustomTB(etype, value, tb)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/interactiveshell.py\", line 1982, in wrapped\n", + " stb = self.InteractiveTB.structured_traceback(*sys.exc_info())\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1428, in structured_traceback\n", + " return FormattedTB.structured_traceback(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1319, in structured_traceback\n", + " return VerboseTB.structured_traceback(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1191, in structured_traceback\n", + " formatted_exceptions += self.format_exception_as_a_whole(etype, evalue, etb, lines_of_context,\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1087, in format_exception_as_a_whole\n", + " frames.append(self.format_record(record))\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 969, in format_record\n", + " frame_info.lines, Colors, self.has_colors, lvals\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 792, in lines\n", + " return self._sd.lines\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/utils.py\", line 144, in cached_property_wrapper\n", + " value = obj.__dict__[self.func.__name__] = self.func(obj)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/core.py\", line 734, in lines\n", + " pieces = self.included_pieces\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/utils.py\", line 144, in cached_property_wrapper\n", + " value = obj.__dict__[self.func.__name__] = self.func(obj)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/core.py\", line 681, in included_pieces\n", + " pos = scope_pieces.index(self.executing_piece)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/utils.py\", line 144, in cached_property_wrapper\n", + " value = obj.__dict__[self.func.__name__] = self.func(obj)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/core.py\", line 660, in executing_piece\n", + " return only(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/executing/executing.py\", line 190, in only\n", + " raise NotOneValueFound('Expected one value, found 0')\n", + "executing.executing.NotOneValueFound: Expected one value, found 0\n", + "\n", + "During handling of the above exception, another exception occurred:\n", + "\n", + "Traceback (most recent call last):\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/interactiveshell.py\", line 2105, in showtraceback\n", + " stb = self.InteractiveTB.structured_traceback(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1428, in structured_traceback\n", + " return FormattedTB.structured_traceback(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1319, in structured_traceback\n", + " return VerboseTB.structured_traceback(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1191, in structured_traceback\n", + " formatted_exceptions += self.format_exception_as_a_whole(etype, evalue, etb, lines_of_context,\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 1087, in format_exception_as_a_whole\n", + " frames.append(self.format_record(record))\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 969, in format_record\n", + " frame_info.lines, Colors, self.has_colors, lvals\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/IPython/core/ultratb.py\", line 792, in lines\n", + " return self._sd.lines\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/utils.py\", line 144, in cached_property_wrapper\n", + " value = obj.__dict__[self.func.__name__] = self.func(obj)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/core.py\", line 734, in lines\n", + " pieces = self.included_pieces\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/utils.py\", line 144, in cached_property_wrapper\n", + " value = obj.__dict__[self.func.__name__] = self.func(obj)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/core.py\", line 681, in included_pieces\n", + " pos = scope_pieces.index(self.executing_piece)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/utils.py\", line 144, in cached_property_wrapper\n", + " value = obj.__dict__[self.func.__name__] = self.func(obj)\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/stack_data/core.py\", line 660, in executing_piece\n", + " return only(\n", + " File \"/home/alaws/mambaforge/envs/pfrecip/lib/python3.10/site-packages/executing/executing.py\", line 190, in only\n", + " raise NotOneValueFound('Expected one value, found 0')\n", + "executing.executing.NotOneValueFound: Expected one value, found 0\n" + ] + } + ], + "source": [ + "with beam.Pipeline() as p:\n", + " p | transforms" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "pfrecip", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/recipes/local_recipe.py b/recipes/local_recipe.py new file mode 100644 index 0000000..c35a1fc --- /dev/null +++ b/recipes/local_recipe.py @@ -0,0 +1,41 @@ +import pandas as pd +from pangeo_forge_recipes.patterns import ConcatDim, FilePattern +import apache_beam as beam +from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr +from tempfile import TemporaryDirectory +import os + +gleam_creds = dict( + username = os.environ["GLEAM_USER"], + password = os.environ["GLEAM_PASSWORD"], + port = int(os.environ["GLEAM_PORT"]) + ) + +dates = pd.date_range("2003", "2005", freq="A") + +time_concat_dim = ConcatDim("time", dates, nitems_per_file=1) + +base_url = "sftp://hydras.ugent.be/data/v3.7b/daily/{time:%Y}/Et_{time:%Y}_GLEAM_v3.7b.nc" + +def make_url(time): + return base_url.format(time=time) + +pattern = FilePattern(make_url, time_concat_dim, fsspec_open_kwargs=gleam_creds) + +td = TemporaryDirectory() +target_path = td.name +target_name = "output.zarr" + +transforms = ( + beam.Create(pattern.items()) + | OpenURLWithFSSpec(open_kwargs=gleam_creds) + | OpenWithXarray(file_type=pattern.file_type) + | StoreToZarr( + target_root=target_path, + store_name=target_name, + combine_dims=pattern.combine_dim_keys, + ) +) + +with beam.Pipeline() as p: + p | transforms