From 4c50b799fbf52678940fbdd5e47aabfd26cce408 Mon Sep 17 00:00:00 2001 From: Ian Bolliger Date: Wed, 3 Apr 2024 04:09:11 +0000 Subject: [PATCH] update to SLIIDERS v1.2 --- HISTORY.rst | 5 +- .../models/create-surge-lookup-tables.ipynb | 136 ++++----- notebooks/models/fit-movefactor.ipynb | 210 ++++++------- .../models/run-pyCIAM-slrquantiles.ipynb | 30 +- notebooks/post-processing/zenodo-upload.ipynb | 278 +++++++++++------- notebooks/shared.py | 119 ++++---- pyCIAM/io.py | 28 +- pyCIAM/run.py | 66 +++-- pyCIAM/surge/__init__.py | 3 +- pyCIAM/surge/_calc.py | 8 +- pyCIAM/surge/damage_funcs.py | 7 +- pyCIAM/surge/lookup.py | 8 +- pyCIAM/utils.py | 22 +- 13 files changed, 501 insertions(+), 419 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 23d138e..4d32d89 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -1,11 +1,12 @@ History ======= -v1.2 (unreleased) -------------------- +v1.2.0 +------ * Point `data-acquisition.ipynb` to updated Zenodo deposit that fixes the dtype of `subsets` variable in `diaz2016_inputs_raw.zarr.zip` to be bool rather than int8 * Variable name bugfix in `data-acquisition.ipynb` * Add netcdf versions of SLIIDERS and the pyCIAM results to `upload-zenodo.ipynb` +* Update results in Zenodo record to use SLIIDERS v1.2 v1.1.2 ------ diff --git a/notebooks/models/create-surge-lookup-tables.ipynb b/notebooks/models/create-surge-lookup-tables.ipynb index 6d250e4..6568d90 100644 --- a/notebooks/models/create-surge-lookup-tables.ipynb +++ b/notebooks/models/create-surge-lookup-tables.ipynb @@ -30,8 +30,23 @@ { "cell_type": "code", "execution_count": 1, + "id": "6b183aa3-366a-4f5f-bb95-5ef4bf48d438", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": 2, "id": "cd4b2928-65a0-4733-9c71-fbefa85590be", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "import sys\n", @@ -41,16 +56,26 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 3, "id": "afd03880-8e46-4d81-867d-bfb6505ea788", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "/srv/conda/envs/notebook/lib/python3.9/site-packages/dask_gateway/client.py:21: FutureWarning: format_bytes is deprecated and will be removed in a future release. Please use dask.utils.format_bytes instead.\n", - " from distributed.utils import LoopRunner, format_bytes\n" + "/srv/conda/envs/notebook/lib/python3.10/site-packages/google/cloud/storage/transfer_manager.py:30: UserWarning: The module `transfer_manager` is a preview feature. Functionality and API may change. This warning will be removed in a future release.\n", + " warnings.warn(\n", + "/home/jovyan/git-repos/pyCIAM-public/notebooks/models/../shared.py:3: UserWarning: Shapely 2.0 is installed, but because PyGEOS is also installed, GeoPandas will still use PyGEOS by default for now. To force to use and test Shapely 2.0, you have to set the environment variable USE_PYGEOS=0. You can do this before starting the Python process, or in your code before importing geopandas:\n", + "\n", + "import os\n", + "os.environ['USE_PYGEOS'] = '0'\n", + "import geopandas\n", + "\n", + "In a future release, GeoPandas will switch to using Shapely by default. If you are using PyGEOS directly (calling PyGEOS functions on geometries from GeoPandas), this will then stop working and you are encouraged to migrate from PyGEOS to Shapely 2.0 (https://shapely.readthedocs.io/en/latest/migration_pygeos.html).\n", + " import geopandas as gpd\n" ] } ], @@ -75,7 +100,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 4, "id": "47979c48-7fff-49b4-b445-ea6bf746fa37", "metadata": { "tags": [ @@ -86,8 +111,8 @@ "source": [ "# When running on larger/scalable dask cluster, may wish to specify number of workers\n", "# Default is LocalCluster which will use the number of CPUs available on local machine\n", - "# N_WORKERS_MIN = 7\n", - "# N_WORKERS_MAX = 700\n", + "N_WORKERS_MIN = 7\n", + "N_WORKERS_MAX = 700\n", "SEG_CHUNKSIZE = 5\n", "\n", "PARAMS = pd.read_json(PATH_PARAMS)[\"values\"]" @@ -95,9 +120,11 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 5, "id": "6104bced", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "DMF_I = getattr(damage_funcs, PARAMS.dmf + \"_i\")\n", @@ -106,70 +133,31 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 6, "id": "dbb04b4a-14b8-4403-ad33-88bfe71bd8fc", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [ { "data": { - "text/html": [ - "
\n", - "
\n", - "
\n", - "

Client

\n", - "

Client-f992a2c6-c6d1-11ed-862e-665816eadc72

\n", - " \n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - "
Connection method: Cluster objectCluster type: dask_gateway.GatewayCluster
\n", - " Dashboard: /services/dask-gateway/clusters/daskhub-dev.e0f56d1ca0ae4a42b9352ad6b7204454/status\n", - "
\n", - "\n", - " \n", - "
\n", - "

Cluster Info

\n", - "
\n", - "

GatewayCluster

\n", - " \n", - "
\n", - "\n", - "
\n", - " \n", - "\n", - "
\n", - "
" - ], + "application/vnd.jupyter.widget-view+json": { + "model_id": "fcadb94377b4430d9c2964ac9c808c2a", + "version_major": 2, + "version_minor": 0 + }, "text/plain": [ - "" + "VBox(children=(HTML(value='

GatewayCluster

'), HBox(children=(HTML(value='\\n
\\n
<xarray.Dataset>\n",
        "Dimensions:   ()\n",
        "Data variables:\n",
-       "    K_2019    float64 6.5\n",
-       "    pop_2019  float64 9.5
" + " K_2019 float64 6.2\n", + " pop_2019 float64 8.9" ], "text/plain": [ "\n", "Dimensions: ()\n", "Data variables:\n", - " K_2019 float64 6.5\n", - " pop_2019 float64 9.5" + " K_2019 float64 6.2\n", + " pop_2019 float64 8.9" ] }, - "execution_count": 12, + "execution_count": 28, "metadata": {}, "output_type": "execute_result" } @@ -740,30 +746,30 @@ }, { "cell_type": "code", - "execution_count": 13, - "id": "61fa9dc4-bbeb-4710-bc84-65bdf6cd6a4c", - "metadata": {}, + "execution_count": 29, + "id": "29d533e9-8571-4a2f-9adc-b7a2d03af6d7", + "metadata": { + "tags": [] + }, "outputs": [ { "data": { "text/plain": [ - "(0.0, 45675703.08690297)" + "(0.0, 48382814.95746777)" ] }, - "execution_count": 13, + "execution_count": 29, "metadata": {}, "output_type": "execute_result" }, { "data": { - "image/png": "\n", + "image/png": "", "text/plain": [ - "
" + "
" ] }, - "metadata": { - "needs_background": "light" - }, + "metadata": {}, "output_type": "display_data" } ], @@ -778,9 +784,11 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 31, "id": "17c41589-b943-42a8-9522-67e24f3ddf47", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "for v in mvmt.variables:\n", @@ -791,9 +799,11 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 32, "id": "f9812bce-b427-4941-820d-700ed1262d1a", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [ { "data": { @@ -801,13 +811,13 @@ "(None, None)" ] }, - "execution_count": 15, + "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "client.cluster.close(), client.close()" + "cluster.close(), client.close()" ] }, { @@ -1692,7 +1702,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.10" + "version": "3.12.2" }, "widgets": { "application/vnd.jupyter.widget-state+json": { diff --git a/notebooks/models/run-pyCIAM-slrquantiles.ipynb b/notebooks/models/run-pyCIAM-slrquantiles.ipynb index b76556d..aa3ae75 100644 --- a/notebooks/models/run-pyCIAM-slrquantiles.ipynb +++ b/notebooks/models/run-pyCIAM-slrquantiles.ipynb @@ -11,7 +11,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": { "tags": [] }, @@ -24,12 +24,13 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": { "tags": [] }, "outputs": [], "source": [ + "from pyCIAM.run import execute_pyciam\n", "from shared import (\n", " AUTHOR,\n", " CONTACT,\n", @@ -47,14 +48,12 @@ " QUANTILES,\n", " STORAGE_OPTIONS,\n", " start_dask_cluster,\n", - ")\n", - "\n", - "from pyCIAM.run import execute_pyciam" + ")" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": { "tags": [ "parameters" @@ -68,10 +67,9 @@ "\n", "# When running on larger/scalable dask cluster, may wish to specify number of workers\n", "# Default is LocalCluster which will use the number of CPUs available on local machine\n", - "N_WORKERS = None\n", - "# N_WORKERS = 400\n", + "N_WORKERS = 100\n", "\n", - "SEG_CHUNKSIZE = 3\n", + "SEG_CHUNKSIZE = 2\n", "\n", "SEG_ADM_SUBSET = None\n", "\n", @@ -80,7 +78,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": { "tags": [] }, @@ -102,15 +100,14 @@ }, "outputs": [], "source": [ - "client = start_dask_cluster(\n", - " n_workers=N_WORKERS,\n", - ")\n", - "client" + "client, cluster = start_dask_cluster()\n", + "cluster.scale(N_WORKERS)\n", + "cluster" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 8, "metadata": { "tags": [] }, @@ -126,6 +123,7 @@ " output_path=PATH_OUTPUTS,\n", " seg_var_subset=SEG_ADM_SUBSET,\n", " surge_input_paths=PATHS_SURGE_LOOKUP,\n", + " pyciam_seg_chunksize=SEG_CHUNKSIZE,\n", " tmp_output_path=TMPPATH,\n", " quantiles=QUANTILES,\n", " dask_client_func=lambda: client,\n", @@ -156,7 +154,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.0" + "version": "3.12.2" }, "widgets": { "application/vnd.jupyter.widget-state+json": { diff --git a/notebooks/post-processing/zenodo-upload.ipynb b/notebooks/post-processing/zenodo-upload.ipynb index 8394580..3804624 100644 --- a/notebooks/post-processing/zenodo-upload.ipynb +++ b/notebooks/post-processing/zenodo-upload.ipynb @@ -35,7 +35,7 @@ "outputs": [], "source": [ "import json\n", - "from os import environ\n", + "import re\n", "from pathlib import Path\n", "from shutil import make_archive\n", "from tempfile import TemporaryDirectory\n", @@ -43,29 +43,62 @@ "import dask.config\n", "import requests\n", "import shared\n", - "import xarray as xr\n", + "from cloudpathlib import AnyPath\n", "from sliiders import settings as sset\n", - "from sliiders.io import open_zarr\n", "from zarr import ZipStore" ] }, { "cell_type": "code", "execution_count": 4, - "id": "f804bd08-6edb-43f6-8bbd-7a2b98281c29", + "id": "fc342d12-c698-4f64-aaea-fcbfa027431f", "metadata": {}, "outputs": [], "source": [ - "PATH_SLIIDERS_NC = sset.PATH_SLIIDERS.parent / (sset.PATH_SLIIDERS.stem + \".nc\")\n", - "PATH_OUTPUTS_NC = shared.PATH_OUTPUTS.parent / (shared.PATH_OUTPUTS.stem + \".nc\")\n", + "PATT_OUTPUTS_NC = shared.PATH_OUTPUTS.parent / (shared.PATH_OUTPUTS.stem + \"_{case}.nc\")\n", + "PATH_SLIIDERS_NC = sset.PATH_SLIIDERS.parent / (sset.PATH_SLIIDERS.stem + \".nc\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "a125e75e-2122-4bd1-a994-37f1dbaea124", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Processing noAdaptation\n", + "Processing protect10\n", + "Processing protect100\n", + "Processing protect1000\n", + "Processing protect10000\n", + "Processing retreat1\n", + "Processing retreat10\n", + "Processing retreat100\n", + "Processing retreat1000\n", + "Processing retreat10000\n", + "Processing optimalfixed\n" + ] + } + ], + "source": [ + "ds = shared.open_zarr(shared.PATH_OUTPUTS)\n", + "fpaths = []\n", + "for case in ds.case.values:\n", + " print(f\"Processing {case}\")\n", + " fpath = AnyPath(str(PATT_OUTPUTS_NC).format(case=case))\n", + " fpaths.append(fpath)\n", + " if not fpath.exists():\n", + " shared.save_dataset(ds.sel(case=case).load(), fpath)\n", "\n", - "shared.save_dataset(shared.open_zarr(sset.PATH_SLIIDERS).load(), PATH_SLIIDERS_NC)\n", - "shared.save_dataset(shared.open_zarr(sset.PATH_OUTPUTS).load(), PATH_OUTPUTS_NC)" + "shared.save_dataset(shared.open_zarr(sset.PATH_SLIIDERS).load(), PATH_SLIIDERS_NC)" ] }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 6, "id": "d37ca26f-091d-4cb6-be02-385464e0b687", "metadata": { "tags": [] @@ -74,10 +107,10 @@ { "data": { "text/plain": [ - "" + "" ] }, - "execution_count": 4, + "execution_count": 6, "metadata": {}, "output_type": "execute_result" } @@ -96,7 +129,7 @@ }, { "cell_type": "code", - "execution_count": 21, + "execution_count": 7, "id": "55a3e849-a554-49fa-882c-846acf76f3b8", "metadata": { "tags": [ @@ -108,16 +141,20 @@ "ACCESS_TOKEN = \"Q5z5IQ1m5Z9l1QS7ZYeV78IS5bqPmhzcFVo0KSNLoh2p39HRMPgFoJsCyQt5\"\n", "VERSION = \"1.2.0\"\n", "TITLES = {\n", - " \"SLIIDERS\": \"SLIIDERS: Sea Level Impacts Input Dataset by Elevation, Region, and Scenario\",\n", - " # \"pyCIAM\": \"Estimates of Global Coastal Losses Under Multiple Sea Level Rise Scenarios\",\n", + " # \"SLIIDERS\": (\n", + " # \"SLIIDERS: Sea Level Impacts Input Dataset by Elevation, Region, and Scenario\"\n", + " # ),\n", + " \"pyCIAM\": (\n", + " \"Estimates of Global Coastal Losses Under Multiple Sea Level Rise Scenarios\"\n", + " ),\n", "}\n", - "PYCIAM_CODE_PATH = Path(\"pyCIAM-1.1.2.zip\")\n", - "SLIIDERS_CODE_PATH = Path(\"/tmp/sliiders-1.2.zip\")" + "PYCIAM_CODE_PATH = Path(\"pyCIAM.zip\")\n", + "SLIIDERS_CODE_PATH = Path(\"/tmp/sliiders.zip\")" ] }, { "cell_type": "code", - "execution_count": 22, + "execution_count": 8, "id": "8725b84c-8a7f-4d88-96ec-849e963ef8ce", "metadata": {}, "outputs": [], @@ -127,7 +164,7 @@ }, { "cell_type": "code", - "execution_count": 23, + "execution_count": 9, "id": "956c40ee-e3e8-4257-bd0d-2c938546604d", "metadata": {}, "outputs": [], @@ -162,7 +199,7 @@ }, { "cell_type": "code", - "execution_count": 24, + "execution_count": 10, "id": "b5fcb2b8-3fb4-4789-bae0-18f7a66b7c51", "metadata": {}, "outputs": [], @@ -170,22 +207,25 @@ "# Metadata\n", "AUTHORS = [\n", " {\n", - " \"affiliation\": \"Energy & Resources Group, University of California, Berkeley; Global Policy Lab, Goldman School of Public Policy, University of California, Berkeley\",\n", + " \"affiliation\": \"United Nations Development Programme\",\n", " \"name\": \"Depsky, Nicholas\",\n", " \"orcid\": \"0000-0002-9441-9042\",\n", " },\n", " {\n", - " \"affiliation\": \"BlackRock; Global Policy Lab, Goldman School of Public Policy, University of California, Berkeley\",\n", + " \"affiliation\": (\n", + " \"Reask; Global Policy Lab, Goldman School of Public Policy, University of \"\n", + " \"California, Berkeley\"\n", + " ),\n", " \"name\": \"Bolliger, Ian\",\n", " \"orcid\": \"0000-0001-8055-297X\",\n", " },\n", " {\n", - " \"affiliation\": \"Global Policy Lab, Goldman School of Public Policy, University of California, Berkeley\",\n", + " \"affiliation\": \"Recidiviz\",\n", " \"name\": \"Allen, Daniel\",\n", " \"orcid\": \"0000-0001-5366-5178\",\n", " },\n", " {\n", - " \"affiliation\": \"Energy Policy Institute, University of Chicago\",\n", + " \"affiliation\": \"Columbia University\",\n", " \"name\": \"Choi, Jun Ho\",\n", " \"orcid\": \"0000-0003-0749-9222\",\n", " },\n", @@ -195,12 +235,15 @@ " \"orcid\": \"0000-0002-2414-045X\",\n", " },\n", " {\n", - " \"affiliation\": \"National Bureau of Economic Research; Energy Policy Institute, University of Chicago\",\n", + " \"affiliation\": (\n", + " \"National Bureau of Economic Research; Energy Policy Institute, University \"\n", + " \"of Chicago\"\n", + " ),\n", " \"name\": \"Greenstone, Michael\",\n", " \"orcid\": \"0000-0002-2364-2810\",\n", " },\n", " {\n", - " \"affiliation\": \"The Rhodium Group\",\n", + " \"affiliation\": \"BlackRock\",\n", " \"name\": \"Hamidi, Ali\",\n", " \"orcid\": \"0000-0001-6235-0303\",\n", " },\n", @@ -210,12 +253,18 @@ " \"orcid\": \"0000-0002-0514-7058\",\n", " },\n", " {\n", - " \"affiliation\": \"Global Policy Lab, Goldman School of Public Policy, University of California, Berkeley; National Bureau of Economic Research\",\n", + " \"affiliation\": (\n", + " \"Global Policy Lab, Goldman School of Public Policy, University of \"\n", + " \"California, Berkeley; National Bureau of Economic Research\"\n", + " ),\n", " \"name\": \"Hsiang, Solomon\",\n", " \"orcid\": \"0000-0002-2074-0829\",\n", " },\n", " {\n", - " \"affiliation\": \"Department of Earth & Planetary Sciences and Rutgers Institute of Earth, Ocean and Atmospheric Sciences, Rutgers University\",\n", + " \"affiliation\": (\n", + " \"Department of Earth & Planetary Sciences and Rutgers Institute of Earth, \"\n", + " \"Ocean and Atmospheric Sciences, Rutgers University\"\n", + " ),\n", " \"name\": \"Kopp, Robert E.\",\n", " \"orcid\": \"0000-0003-4016-9428\",\n", " },\n", @@ -232,7 +281,7 @@ }, { "cell_type": "code", - "execution_count": 25, + "execution_count": 11, "id": "55188020-c02f-4fa6-bf4c-cf1cd87e1754", "metadata": {}, "outputs": [], @@ -247,16 +296,17 @@ " sset.PATH_SEG_PTS_MANUAL,\n", " ],\n", " },\n", + " # uncomment Diaz inputs if a re-upload is necessary\n", " \"pyCIAM\": {\n", " \"products\": [\n", " shared.PATH_OUTPUTS,\n", - " PATH_OUTPUTS_NC,\n", - " shared.PATH_DIAZ_RES,\n", + " # shared.PATH_DIAZ_RES,\n", " shared.PATH_MOVEFACTOR_DATA,\n", + " *fpaths,\n", " ],\n", " \"inputs\": [\n", - " shared.PATH_DIAZ_INPUTS_RAW,\n", - " shared.PATH_SLR_AR5_QUANTILES,\n", + " # shared.PATH_DIAZ_INPUTS_RAW,\n", + " # shared.PATH_SLR_AR5_QUANTILES,\n", " shared.PATH_SLIIDERS_INCOME_INTERMEDIATE_FILE,\n", " shared.PATHS_SURGE_LOOKUP[\"seg\"],\n", " shared.PATHS_SURGE_LOOKUP[\"seg_adm\"],\n", @@ -282,26 +332,37 @@ }, { "cell_type": "code", - "execution_count": 40, + "execution_count": 12, "id": "50c10e4c-b3eb-4714-8d32-486ac878d041", "metadata": {}, "outputs": [], "source": [ "def create_draft_deposit(name, update_dict={}, overwrite=False):\n", " dep = EXISTING_DEPOSITS[name]\n", - " # create new deposit\n", - " deposition_id = dep[\"id\"]\n", - " if \"latest_draft\" not in dep[\"links\"]:\n", - " url = f\"{Z_URL}/{deposition_id}/actions/newversion\"\n", - " r = requests.post(url, params=PARAMS)\n", - " if r.status_code not in [200, 201]:\n", - " raise ValueError(f\"{r.status_code}: {r.text}\")\n", + "\n", + " # create new deposit if needed\n", + " r = requests.post(dep[\"links\"][\"newversion\"], params=PARAMS)\n", + " # case 1: this is already a new unpublished version\n", + " if (\n", + " r.status_code == 404\n", + " and r.json()[\"message\"] == \"The persistent identifier is not registered.\"\n", + " ):\n", + " pass\n", + " # case 2: this is a successful new version request and we need to grab the new\n", + " # version deposition\n", + " elif r.status_code in [200, 201]:\n", + " # returned value would be original deposit version in case of new version\n", + " # created\n", " dep = r.json()\n", + " # case 3: some other error\n", + " else:\n", + " raise ValueError(f\"{r.status_code}: {r.text}\")\n", + "\n", + " dep = requests.get(dep[\"links\"][\"latest_draft\"], params=PARAMS).json()\n", "\n", " if overwrite:\n", " new_id = dep[\"links\"][\"latest_draft\"].split(\"/\")[-1]\n", " files = requests.get(dep[\"links\"][\"files\"], params=PARAMS).json()\n", - " print(files)\n", " if len(files):\n", " for f in files:\n", " file_url = f\"{Z_URL}/{new_id}/files/{f['id']}\"\n", @@ -324,31 +385,22 @@ "\n", "\n", "def create_all_new_deposits(titles=TITLES, overwrite=False):\n", - " deps = {}\n", - " ids = {}\n", - " for t in titles.keys():\n", - " dep = create_draft_deposit(t, overwrite=overwrite)\n", - " ids[t] = int(dep[\"links\"][\"latest_draft\"].split(\"/\")[-1])\n", - " all_deps = requests.get(\n", - " Z_URL,\n", - " params={\"access_token\": ACCESS_TOKEN},\n", - " ).json()\n", - " for t in titles.keys():\n", - " dep = [d for d in all_deps if d[\"id\"] == ids[t]]\n", - " assert len(dep) == 1\n", - " deps[t] = dep[0]\n", - " return deps\n", + " return {t: create_draft_deposit(t, overwrite=overwrite) for t in titles.keys()}\n", + "\n", + "\n", + "def _get_zenodo_name(fname):\n", + " # drop a datestamp if it exists\n", + " zenodo_name = re.sub(r\"_\\d{8}\", \"\", fname.name)\n", + " # drop version from name\n", + " return \"-\".join([i for i in zenodo_name.split(\"-\") if shared.RES_VERS not in i])\n", "\n", "\n", - "def upload_file(deposit_link_dict, fname, root, zenodo_name=None, overwrite=False):\n", + "def upload_file(\n", + " deposit_link_dict, fname, zenodo_name=None, overwrite=False, existing_files={}\n", + "):\n", " if zenodo_name is None:\n", - " zenodo_name = fname.name\n", - " zenodo_name = root + zenodo_name\n", + " zenodo_name = _get_zenodo_name(fname)\n", "\n", - " existing_files = {\n", - " f[\"filename\"]: f\n", - " for f in requests.get(deposit_link_dict[\"files\"], params=PARAMS).json()\n", - " }\n", " if zenodo_name in existing_files:\n", " if not overwrite:\n", " print(\"...Skipping b/c already uploaded\")\n", @@ -362,27 +414,36 @@ " data=fp,\n", " )\n", "\n", - " if r.status_code != 200:\n", + " if r.status_code not in [200, 201]:\n", " raise ValueError(f\"{r.status_code}: {r.text}\")\n", " return r.json()\n", "\n", "\n", - "def upload_file_list(deposit, flist, root, overwrite=False):\n", + "def upload_file_list(deposit, flist, overwrite=False):\n", " out = []\n", - " existing_files = {\n", - " f[\"filename\"]: f\n", - " for f in requests.get(deposit[\"links\"][\"files\"], params=PARAMS).json()\n", - " }\n", + " existing_file_request = requests.get(deposit[\"links\"][\"files\"], params=PARAMS)\n", + " if existing_file_request.status_code == 404:\n", + " existing_files = {}\n", + " else:\n", + " existing_files = {f[\"filename\"]: f for f in existing_file_request.json()}\n", " for f in flist:\n", " print(f\"Uploading: {str(f)}\")\n", + " zenodo_name = _get_zenodo_name(f)\n", " if (\n", - " (root + f.name) in existing_files\n", - " or (root + f.name + \".zip\") in existing_files\n", + " zenodo_name in existing_files or (zenodo_name + \".zip\") in existing_files\n", " ) and not overwrite:\n", " print(\"...Skipping b/c already uploaded\")\n", " continue\n", " if f.is_file():\n", - " out.append(upload_file(deposit[\"links\"], f, root, overwrite=overwrite))\n", + " out.append(\n", + " upload_file(\n", + " deposit[\"links\"],\n", + " f,\n", + " overwrite=overwrite,\n", + " zenodo_name=zenodo_name,\n", + " existing_files=existing_files,\n", + " )\n", + " )\n", " elif f.is_dir():\n", " with TemporaryDirectory() as d:\n", " tmp_file = Path(d) / (f.name + \".zip\")\n", @@ -406,7 +467,7 @@ " upload_file(\n", " deposit[\"links\"],\n", " tmp_file,\n", - " root,\n", + " zenodo_name=zenodo_name,\n", " overwrite=overwrite,\n", " )\n", " )\n", @@ -417,7 +478,7 @@ }, { "cell_type": "code", - "execution_count": 41, + "execution_count": 13, "id": "868b1d78-5eb4-4fa6-bcfc-0dbd8c88d39b", "metadata": {}, "outputs": [], @@ -425,17 +486,9 @@ "draft_deps = create_all_new_deposits(overwrite=False)" ] }, - { - "cell_type": "markdown", - "id": "558e67ba-77cb-4355-9dfb-d11533f1a344", - "metadata": {}, - "source": [ - "Note that it seems to take some time for the \"bucket\" link to show up, which is needed to use Zenodo's \"new\" file API, which allows for uploads larger than 100MB. So if bucket is not appearing, you may need to wait a while (<1 day) to be able to run the file uploads below." - ] - }, { "cell_type": "code", - "execution_count": 44, + "execution_count": 14, "id": "ff8703bb-c630-418e-9f2f-c62b116ee7e8", "metadata": {}, "outputs": [ @@ -443,39 +496,48 @@ "name": "stdout", "output_type": "stream", "text": [ - "SLIIDERS\n" - ] - }, - { - "ename": "TypeError", - "evalue": "string indices must be integers, not 'str'", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", - "Cell \u001b[0;32mIn[44], line 6\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[38;5;28mprint\u001b[39m(name)\n\u001b[1;32m 4\u001b[0m this_dep \u001b[38;5;241m=\u001b[39m draft_deps[name]\n\u001b[0;32m----> 6\u001b[0m uploads[name] \u001b[38;5;241m=\u001b[39m \u001b[43mupload_file_list\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 7\u001b[0m \u001b[43m \u001b[49m\u001b[43mthis_dep\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkind\u001b[49m\u001b[43m[\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43minputs\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m]\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43minputs/\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43moverwrite\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\n\u001b[1;32m 8\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 9\u001b[0m uploads[name] \u001b[38;5;241m+\u001b[39m\u001b[38;5;241m=\u001b[39m upload_file_list(\n\u001b[1;32m 10\u001b[0m this_dep, kind[\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mproducts\u001b[39m\u001b[38;5;124m\"\u001b[39m], \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mproducts/\u001b[39m\u001b[38;5;124m\"\u001b[39m, overwrite\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 11\u001b[0m )\n\u001b[1;32m 12\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124msource\u001b[39m\u001b[38;5;124m\"\u001b[39m \u001b[38;5;129;01min\u001b[39;00m kind\u001b[38;5;241m.\u001b[39mkeys():\n", - "Cell \u001b[0;32mIn[40], line 83\u001b[0m, in \u001b[0;36mupload_file_list\u001b[0;34m(deposit, flist, root, overwrite)\u001b[0m\n\u001b[1;32m 81\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mupload_file_list\u001b[39m(deposit, flist, root, overwrite\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mFalse\u001b[39;00m):\n\u001b[1;32m 82\u001b[0m out \u001b[38;5;241m=\u001b[39m []\n\u001b[0;32m---> 83\u001b[0m existing_files \u001b[38;5;241m=\u001b[39m \u001b[43m{\u001b[49m\n\u001b[1;32m 84\u001b[0m \u001b[43m \u001b[49m\u001b[43mf\u001b[49m\u001b[43m[\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mfilename\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m]\u001b[49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[43mf\u001b[49m\n\u001b[1;32m 85\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mf\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mrequests\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mget\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdeposit\u001b[49m\u001b[43m[\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mlinks\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m]\u001b[49m\u001b[43m[\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mfiles\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m]\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mparams\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mPARAMS\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mjson\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 86\u001b[0m \u001b[43m \u001b[49m\u001b[43m}\u001b[49m\n\u001b[1;32m 87\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m flist:\n\u001b[1;32m 88\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mUploading: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00m\u001b[38;5;28mstr\u001b[39m(f)\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m)\n", - "Cell \u001b[0;32mIn[40], line 84\u001b[0m, in \u001b[0;36m\u001b[0;34m(.0)\u001b[0m\n\u001b[1;32m 81\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mupload_file_list\u001b[39m(deposit, flist, root, overwrite\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mFalse\u001b[39;00m):\n\u001b[1;32m 82\u001b[0m out \u001b[38;5;241m=\u001b[39m []\n\u001b[1;32m 83\u001b[0m existing_files \u001b[38;5;241m=\u001b[39m {\n\u001b[0;32m---> 84\u001b[0m \u001b[43mf\u001b[49m\u001b[43m[\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mfilename\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m]\u001b[49m: f\n\u001b[1;32m 85\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m requests\u001b[38;5;241m.\u001b[39mget(deposit[\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mlinks\u001b[39m\u001b[38;5;124m\"\u001b[39m][\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mfiles\u001b[39m\u001b[38;5;124m\"\u001b[39m], params\u001b[38;5;241m=\u001b[39mPARAMS)\u001b[38;5;241m.\u001b[39mjson()\n\u001b[1;32m 86\u001b[0m }\n\u001b[1;32m 87\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m flist:\n\u001b[1;32m 88\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mUploading: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00m\u001b[38;5;28mstr\u001b[39m(f)\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m)\n", - "\u001b[0;31mTypeError\u001b[0m: string indices must be integers, not 'str'" + "pyCIAM\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/sliiders/int/exposure/ypk/finalized/ypk_2000_2100_20240222.zarr\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/data/int/surge-lookup-v1.2-seg.zarr\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/data/int/surge-lookup-v1.2-seg_adm.zarr\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs.zarr\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/suboptimal_capital_by_movefactor.zarr\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_noAdaptation.nc\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_protect10.nc\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_protect100.nc\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_protect1000.nc\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_protect10000.nc\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_retreat1.nc\n", + "...Skipping b/c already uploaded\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_retreat10.nc\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_retreat100.nc\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_retreat1000.nc\n", + "Uploading: gs://rhg-data/impactlab-rhg/coastal/ciam_paper/results-v1.2/pyCIAM_outputs_optimalfixed.nc\n", + "Uploading: pyCIAM.zip\n" ] } ], "source": [ "uploads = {}\n", - "for name, kind in ORIGINAL_PATHS.items():\n", + "for name in TITLES:\n", " print(name)\n", + " kind = ORIGINAL_PATHS[name]\n", " this_dep = draft_deps[name]\n", "\n", - " uploads[name] = upload_file_list(\n", - " this_dep, kind[\"inputs\"], \"inputs/\", overwrite=False\n", - " )\n", - " uploads[name] += upload_file_list(\n", - " this_dep, kind[\"products\"], \"products/\", overwrite=False\n", - " )\n", - " if \"source\" in kind.keys():\n", - " uploads[name] += upload_file_list(\n", - " this_dep, kind[\"source\"], \"source/\", overwrite=False\n", - " )" + " uploads[name] = []\n", + " for filetype in [\"inputs\", \"products\", \"source\"]:\n", + " if filetype in kind:\n", + " uploads[name] += upload_file_list(this_dep, kind[filetype], overwrite=False)" ] } ], @@ -495,7 +557,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.8" + "version": "3.12.2" }, "widgets": { "application/vnd.jupyter.widget-state+json": { diff --git a/notebooks/shared.py b/notebooks/shared.py index 0825bb2..2d575fc 100644 --- a/notebooks/shared.py +++ b/notebooks/shared.py @@ -1,20 +1,20 @@ +import os from pathlib import Path +from zipfile import ZipFile import geopandas as gpd import xarray as xr -from distributed import Client -from distributed.diagnostics.plugin import UploadDirectory - +from dask_gateway import GatewayCluster from pyCIAM import __file__ DIR_SCRATCH = Path("/tmp/ciam-scratch") SLIIDERS_VERS = "v1.2" -RES_VERS = "v1.1" +RES_VERS = "v1.2" # Cloud Storage tools (will work with local storage as well but may need to be specifiec # for cloud buckets -STORAGE_OPTIONS = {} +STORAGE_OPTIONS = None def _to_fuse(path): @@ -25,9 +25,10 @@ def _to_fuse(path): QUANTILES = [0.17, 0.5, 0.83] # Output dataset attrs -HISTORY = """version 1.1: Version associated with Depsky et al. 2023""" +HISTORY = """version 1.1: Version associated with Depsky et al. 2023. +version 1.2: Updated to use SLIIDERS v1.2 as input.""" AUTHOR = "Ian Bolliger" -CONTACT = "ian.bolliger@blackrock.com" +CONTACT = "ian@reask.earth" # AR5 SLR projections info LOCALIZESL_COREFILES = { @@ -54,9 +55,9 @@ def _to_fuse(path): ################## # NECESSARY FOR EXAMPLE -PATH_PARAMS = Path.home() / "git-repos/pyciam/params.json" +PATH_PARAMS = Path.home() / "git-repos/pyCIAM/params.json" -PATH_PARAMS_DIAZ = Path.home() / "git-repos/pyciam/params_diaz.json" +PATH_PARAMS_DIAZ = Path.home() / "git-repos/pyCIAM/params_diaz.json" ################## # SOCIOECON INPUTS @@ -67,8 +68,9 @@ def _to_fuse(path): # NECESSARY FOR EXAMPLE PATH_SLIIDERS = DIR_RAW / f"sliiders-{SLIIDERS_VERS}.zarr" -# NECESSARY FOR EXAMPLE -PATH_SLIIDERS_SEG = DIR_INT / f"sliiders-{SLIIDERS_VERS}-seg.zarr" +PATH_SLIIDERS_SEG = PATH_SLIIDERS.parent / ( + PATH_SLIIDERS.stem + "-seg" + PATH_SLIIDERS.suffix +) # Diaz PATH_DIAZ_INPUTS_RAW = DIR_RAW / "diaz2016_inputs_raw.zarr" @@ -132,7 +134,7 @@ def _to_fuse(path): # FILES FOR PLOTTING RESULTS ############################ PATH_MOVEFACTOR_DATA = DIR_RES / "suboptimal_capital_by_movefactor.zarr" -PATH_SLIIDERS_INCOME_INTERMEDIATE_FILE = DIR_RAW / "ypk_2000_2100_20221122.zarr" +PATH_SLIIDERS_INCOME_INTERMEDIATE_FILE = DIR_RAW / "ypk_2000_2100_20240222.zarr" DIR_FIGS = Path("/home/jovyan/ciam-figures") DIR_SHP = DIR_RAW / "shapefiles" @@ -155,48 +157,48 @@ def _to_fuse(path): p.mkdir(exist_ok=True, parents=True) -def upload_pyciam(client, restart_client=True): - """Upload a local package to Dask Workers. After calling this function, the package - contained at ``pkg_dir`` will be available on all workers in your Dask cluster, - including those that are instantiated afterward. This package will take priority - over any existing packages of the same name. This is a useful tool for working with - remote dask clusters (e.g. via Dask Gateway) but is not needed for local clusters. - If you wish to use this, you should call this function from within - `start_dask_cluster`. - - Parameters - ---------- - client : :py:class:`distributed.Client` - The client object associated with your Dask cluster's scheduler. - pkg_dir : str or Path-like - Path to the package you wish to zip and upload - **kwargs - Passed directly to :py:class:`distributed.diagnostics.plugin.UploadDirectory` - """ - client.register_worker_plugin( - UploadDirectory( - Path(__file__).parents[1], - update_path=True, - restart=restart_client, - skip_words=( - ".git", - ".github", - ".pytest_cache", - "tests", - "docs", - "deploy", - "notebooks", - ".ipynb_checkpoints", - "__pycache__", - ".coverage", - "dockerignore", - ".gitignore", - ".gitlab-ci.yml", - ".gitmodules", - "pyclaw.log", - ), - ) - ) +def _zipdir( + path, + zip_filename, + skip_files=( + ".git", + ".github", + ".pytest_cache", + "tests", + "docs", + "deploy", + "notebooks", + ".ipynb_checkpoints", + "__pycache__", + ".coverage", + "dockerignore", + ".gitignore", + ".gitlab-ci.yml", + ".gitmodules", + "pyclaw.log", + "run_tests.sh", + ), +): + + with ZipFile(zip_filename, "w") as ziph: + for root, dirs, files in os.walk(path): + for file in files: + if any([f in file.split("/") for f in skip_files]): + continue + # Create a relative path for files to preserve the directory structure + # within the ZIP archive. This relative path is based on the directory + # being zipped, so files are stored in the same structure. + relative_path = os.path.relpath( + os.path.join(root, file), os.path.join(path, "..") + ) + ziph.write(os.path.join(root, file), arcname=relative_path) + + +def upload_pyciam(client): + package_dir = Path(__file__).parent + zip_filename = "/tmp/pyCIAM.zip" # Output ZIP file name + _zipdir(package_dir, zip_filename) + client.upload_file(zip_filename) def save(obj, path, *args, **kwargs): @@ -241,7 +243,8 @@ def read_shapefile(path, **kwargs): return gpd.read_file(_path, **kwargs) -def start_dask_cluster(**kwargs): - client = Client(**kwargs) - print(client.dashboard_link) - return client +def start_dask_cluster(profile="micro", **kwargs): + cluster = GatewayCluster(profile=profile, **kwargs) + client = cluster.get_client() + upload_pyciam(client) + return client, cluster diff --git a/pyCIAM/io.py b/pyCIAM/io.py index f0c9dc8..64a951e 100644 --- a/pyCIAM/io.py +++ b/pyCIAM/io.py @@ -2,19 +2,17 @@ outputs used in running pyCIAM. Functions - prep_sliiders - load_ciam_inputs - load_diaz_inputs +--------- +* prep_sliiders +* load_ciam_inputs +* load_diaz_inputs """ -from collections.abc import Iterable - import dask.array as da import numpy as np import pandas as pd import pint_xarray # noqa: F401 import xarray as xr - from pyCIAM.utils import spherical_nearest_neighbor as snn from .utils import _s2d @@ -297,11 +295,11 @@ def _load_lslr_for_ciam( def create_template_dataarray(dims, coords, chunks, dtype="float32", name=None): """A utility function helpful for creatting an empty, dask-backed - :py:class:`xarray.DataArray` with specific dimensions, coordinates, dtype, name, - and chunk structure. This is useful for "probabilistic" mode of pyCIAM, in which - you will run the model on a large ensemble of SLR trajectory realizations. In this - case, we save an empty Zarr store and then parallelize the model runs across - regions, with each processor writing to a region within the template store. + :py:class:`xarray.DataArray` with specific dimensions, coordinates, dtype, name, and + chunk structure. This is useful for "probabilistic" mode of pyCIAM, in which you + will run the model on a large ensemble of SLR trajectory realizations. In this case, + we save an empty Zarr store and then parallelize the model runs across regions, with + each processor writing to a region within the template store. Parameters ---------- @@ -336,10 +334,10 @@ def create_template_dataarray(dims, coords, chunks, dtype="float32", name=None): def create_template_dataset(var_dims, coords, chunks, dtypes): """A utility function helpful for creatting an empty, dask-backed :py:class:`xarray.Dataset` with specific variables, dimensions, coordinates, dtypes, - and chunk structure. This is useful for "probabilistic" mode of pyCIAM, in which - you will run the model on a large ensemble of SLR trajectory realizations. In this - case, we save an empty Zarr store and then parallelize the model runs across - regions, with each processor writing to a region within the template store. + and chunk structure. This is useful for "probabilistic" mode of pyCIAM, in which you + will run the model on a large ensemble of SLR trajectory realizations. In this case, + we save an empty Zarr store and then parallelize the model runs across regions, with + each processor writing to a region within the template store. Parameters ---------- diff --git a/pyCIAM/run.py b/pyCIAM/run.py index c5a0905..6e1378d 100644 --- a/pyCIAM/run.py +++ b/pyCIAM/run.py @@ -2,9 +2,10 @@ options are calculated. Functions - calc_costs - select_optimal_case - execute_pyciam +--------- +* calc_costs +* select_optimal_case +* execute_pyciam """ from collections import OrderedDict @@ -15,8 +16,6 @@ import xarray as xr from cloudpathlib import AnyPath, CloudPath from distributed import Client, wait -from rhg_compute_tools.xarray import dataarray_from_delayed - from pyCIAM.constants import CASE_DICT, CASES, COSTTYPES, PLIST, RLIST, SOLVCASES from pyCIAM.io import ( check_finished_zarr_workflow, @@ -39,6 +38,7 @@ collapse_econ_inputs_to_seg, subset_econ_inputs, ) +from rhg_compute_tools.xarray import dataarray_from_delayed def calc_costs( @@ -83,9 +83,9 @@ def calc_costs( elev_chunksize : int, default 1 Number of elevation slices to process simultaneously. Higher numbers improve efficiency through vectorization but result in a larger memory footprint. - ddf_i, dmf_i : func, default :py:func:`.damage_funcs.ddf_i`, :py:func:`.damage_funcs.dmf_i` - Damage functions relating physical capital loss and monetized mortality arising - from a certain depth of inundation. + ddf_i, dmf_i : func, default :py:func:`.damage_funcs.ddf_i`, + :py:func:`.damage_funcs.dmf_i`. Damage functions relating physical capital loss + and monetized mortality arising from a certain depth of inundation. diaz_protect_height : bool, default False If True, reduce the 1-in-10-year extreme sea level by 50% as in Diaz 2016. This hack should not be necessary when using the ESL heights from CoDEC (as in @@ -280,9 +280,9 @@ def calc_costs( surge_noadapt["stormCapital"] = surge_noadapt.stormCapital * inputs.K.sum( "elev" ) - surge_noadapt[ - "stormPopulation" - ] = surge_noadapt.stormPopulation * inputs.pop.sum("elev") + surge_noadapt["stormPopulation"] = ( + surge_noadapt.stormPopulation * inputs.pop.sum("elev") + ) else: if surge_lookup is None: rh_years = RH_heights.sel(at=at).drop("at") @@ -402,9 +402,9 @@ def calc_costs( surge_noadapt["stormCapital"] = surge_noadapt.stormCapital * inputs.K.sum( "elev" ) - surge_noadapt[ - "stormPopulation" - ] = surge_noadapt.stormPopulation * inputs.pop.sum("elev") + surge_noadapt["stormPopulation"] = ( + surge_noadapt.stormPopulation * inputs.pop.sum("elev") + ) surge = surge.stack(tmp=["adapttype", "return_period"]) surge = ( @@ -880,6 +880,7 @@ def execute_pyciam( surge_input_paths=None, output_path=None, tmp_output_path=AnyPath("pyciam_tmp_results.zarr"), + remove_tmpfile=True, overwrite=False, mc_dim="quantile", seg_var="seg_adm", @@ -895,7 +896,7 @@ def execute_pyciam( diaz_inputs=False, diaz_config=False, dask_client_func=Client, - storage_options={}, + storage_options=None, **model_kwargs ): """Execute the full pyCIAM model. The following inputs are assumed: @@ -957,6 +958,10 @@ def execute_pyciam( tmp_output_path : Path-like, default Path("pyciam_tmp_results.zarr") Path to temporary output zarr store that is written to and read from within this function. Ignored if `output_path` is not None. + remove_tmpfile : bool, default True + If True, remove the intermediate zarr store created before collapsing to + `adm_var` and rechunking. Setting to False can be useful for debugging if you + want to examine seg-adm level results. ovewrwrite : bool, default False If True, overwrite all intermediate output files mc_dim : str, default "quantile" @@ -1052,6 +1057,9 @@ def execute_pyciam( surge_input_paths = {k: AnyPath(v) for k, v in surge_input_paths.items()} slr_input_paths = [AnyPath(f) if f is not None else None for f in slr_input_paths] + if seg_var == "seg": + adm_var = "seg" + # read parameters params = pd.read_json(params_path)["values"] @@ -1095,7 +1103,6 @@ def execute_pyciam( if seg_var == "seg": econ_input_path_seg = econ_input_path else: - assert tmp_output_path is not None if overwrite or not econ_input_path_seg.is_dir(): collapse_econ_inputs_to_seg( econ_input_path, @@ -1382,13 +1389,15 @@ def execute_pyciam( .chunk({"year": 10}) .persist() ) - out["costs"] = ( - out.costs.groupby(ciam_in[adm_var]).sum().chunk({adm_var: this_chunksize}) - ).persist() - out["optimal_case"] = ( - out.optimal_case.load().groupby(ciam_in.seg).first(skipna=False).chunk() - ).persist() - out = out.drop(seg_var).unify_chunks() + if adm_var != seg_var: + out["costs"] = ( + out.costs.groupby(ciam_in[adm_var]).sum().chunk({adm_var: this_chunksize}) + ).persist() + out["optimal_case"] = ( + out.optimal_case.load().groupby(ciam_in.seg).first(skipna=False).chunk() + ).persist() + out = out.drop(seg_var) + out = out.unify_chunks() for v in out.data_vars: out[v].encoding.clear() @@ -1411,10 +1420,11 @@ def execute_pyciam( ) client.cluster.close() client.close() - if isinstance(tmp_output_path, CloudPath): - tmp_output_path.rmtree() - else: - rmtree(tmp_output_path) + if remove_tmpfile: + if isinstance(tmp_output_path, CloudPath): + tmp_output_path.rmtree() + else: + rmtree(tmp_output_path) def get_refA( @@ -1595,7 +1605,7 @@ def optimize_case( ) as ds: all_segs = ds.seg.load() - this_seg_adms = all_segs.seg_adm.isel({seg_var: all_segs.seg == seg}).values + this_seg_adms = all_segs[seg_var].isel({seg_var: all_segs.seg == seg}).values save_to_zarr_region( select_optimal_case( diff --git a/pyCIAM/surge/__init__.py b/pyCIAM/surge/__init__.py index a4af155..224fded 100644 --- a/pyCIAM/surge/__init__.py +++ b/pyCIAM/surge/__init__.py @@ -1 +1,2 @@ -"""This module contains functions related to calculating losses from extreme sea levels.""" +"""This module contains functions related to calculating losses from extreme sea +levels.""" diff --git a/pyCIAM/surge/_calc.py b/pyCIAM/surge/_calc.py index e3dc42f..5ddefa0 100644 --- a/pyCIAM/surge/_calc.py +++ b/pyCIAM/surge/_calc.py @@ -1,5 +1,5 @@ -"""This private module contains functions related to specific calculations within -pyCIAM that are called by the ``run`` module.""" +"""This private module contains functions related to specific calculations within pyCIAM +that are called by the ``run`` module.""" import numpy as np import xarray as xr @@ -12,8 +12,8 @@ def _get_surge_heights_probs( min_surge_ht, max_surge_ht, gumbel_params, n_surge_heights=100 ): """Create an array of ``n_surge_heights`` surge heights and associated probabilities - to apply in CIAM in order to sample an appropriate range of plausible surge heights. - """ + to apply in CIAM in order to sample an appropriate range of plausible surge + heights.""" # get gumbel params loc = gumbel_params.sel(params="loc", drop=True) diff --git a/pyCIAM/surge/damage_funcs.py b/pyCIAM/surge/damage_funcs.py index 95b675b..f4b45fe 100644 --- a/pyCIAM/surge/damage_funcs.py +++ b/pyCIAM/surge/damage_funcs.py @@ -2,8 +2,8 @@ relate storm surge depth to mortality and physical capital loss. As of April 7, 2022, the only functions available are those included in Diaz 2016. These define the "resilience-unadjusted" fractional damages. That is, for a region with resiliendce -factor (:math:`\rho`) of 1, what fraction of the exposed population and/or physical capital -will be lost conditional on a storm surge depth. +factor (:math:`\rho`) of 1, what fraction of the exposed population and/or physical +capital will be lost conditional on a storm surge depth. At the moment, functions must have an analytical integral and are actually defined by their integral, as can be seen with the suffix ``_i``. @@ -51,7 +51,8 @@ def diaz_ddf_i(depth_st, depth_end): def diaz_dmf_i(depth_st, depth_end, floodmortality=0.01): """Integral of mortality damage function as used in Diaz 2016, assuming unit - resilience (:math:`\rho`). It is just a constant fraction conditional on a unit of exposure being inundated. Note that kwargs are not optional and will raise an error + resilience (:math:`\rho`). It is just a constant fraction conditional on a unit of + exposure being inundated. Note that kwargs are not optional and will raise an error if not specified when called. Parameters diff --git a/pyCIAM/surge/lookup.py b/pyCIAM/surge/lookup.py index a6424c3..7e9d08d 100644 --- a/pyCIAM/surge/lookup.py +++ b/pyCIAM/surge/lookup.py @@ -17,7 +17,6 @@ create_surge_lookup """ - import dask.array as da import numpy as np import pandas as pd @@ -63,12 +62,15 @@ def _get_lslr_rhdiff_range( storage_options={}, ): """Get the range of lslr and rhdiff that we need to model to cover the full range - across scenario/mcs. The minimum LSLR value we'll need to model for the purposes of + across scenario/mcs. + + The minimum LSLR value we'll need to model for the purposes of assessing storm damage is the minimum across sites of: the site-level maximum of "0 minus the s10000 surge height" and "the minimum projected LSLR for all of the scenario/mcs we use in our binned LSL dataset". The maximum LSLR value is the maximum experienced at any site in any year for all of the sceanrio/mcs we use in - the binned LSL dataset.""" + the binned LSL dataset. + """ if isinstance(slr_0_years, int): slr_0_years = [slr_0_years] * len(slr_stores) diff --git a/pyCIAM/utils.py b/pyCIAM/utils.py index 8efde61..5430db3 100644 --- a/pyCIAM/utils.py +++ b/pyCIAM/utils.py @@ -1,4 +1,4 @@ -"""This private module contains miscellaneous functions to support pyCIAM""" +"""This private module contains miscellaneous functions to support pyCIAM.""" import shutil from pathlib import Path @@ -7,9 +7,8 @@ import pandas as pd import xarray as xr from cloudpathlib import CloudPath -from sklearn.neighbors import BallTree - from pyCIAM.constants import CASE_DICT +from sklearn.neighbors import BallTree def _s2d(ds): @@ -281,13 +280,18 @@ def weighted_avg(varname, wts_in): def subset_econ_inputs(ds, seg_var, seg_var_subset): if seg_var_subset is None: return ds - if seg_var == "seg": - return ds.sel(seg=ds.seg.str.contains(seg_var_subset)) - necessary_segs = np.unique( - ds.seg.sel({seg_var: ds[seg_var].str.contains(seg_var_subset)}) - ) - return ds.sel({seg_var: ds.seg.isin(necessary_segs)}) + if isinstance(seg_var_subset, str): + if seg_var == "seg": + subsetter = ds.seg.str.contains(seg_var_subset) + else: + subsetter = ds.seg.isin( + np.unique( + ds.seg.sel({seg_var: ds[seg_var].str.contains(seg_var_subset)}) + ) + ) + + return ds.sel({seg_var: subsetter}) def copy(path_src, path_trg):