Skip to content

Commit

Permalink
Merge pull request #1224 from metno/mda8
Browse files Browse the repository at this point in the history
MDA8
  • Loading branch information
lewisblake authored Jul 2, 2024
2 parents e6c9d66 + ace42f6 commit f179213
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,6 @@ sorted_out

# Vim temp files
*.swp


pytest.ini
23 changes: 19 additions & 4 deletions pyaerocom/colocation/colocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import traceback
import warnings
from collections import defaultdict
from datetime import datetime
from typing import Any, Callable

Expand All @@ -30,6 +31,8 @@
from pyaerocom.io import ReadCAMS2_83, ReadGridded, ReadUngridded
from pyaerocom.io.helpers import get_all_supported_ids_ungridded
from pyaerocom.io.mscw_ctm.reader import ReadMscwCtm
from pyaerocom.stats.mda8.const import MDA_VARS
from pyaerocom.stats.mda8.mda8 import mda8_colocated_data

from .colocated_data import ColocatedData
from .colocation_3d import ColocatedDataLists, colocate_vertical_profile_gridded
Expand Down Expand Up @@ -360,7 +363,7 @@ def run(self, var_list: list = None):
dictionaries comprising key / value pairs of obs variables and
associated instances of :class:`ColocatedData`.
"""
data_out = {}
data_out = defaultdict(lambda: dict())
# ToDo: see if the following could be solved via custom context manager
try:
vars_to_process = self.prepare_run(var_list)
Expand All @@ -378,9 +381,21 @@ def run(self, var_list: list = None):
coldata = self._run_helper(
mod_var, obs_var
) # note this can be ColocatedData or ColocatedDataLists
if mod_var not in data_out:
data_out[mod_var] = {}
data_out[mod_var][obs_var] = coldata

if obs_var in MDA_VARS:
mda8 = None
try:
mda8 = mda8_colocated_data(
coldata, obs_var=f"{obs_var}mda8", mod_var=f"{mod_var}mda8"
)
except ValueError:
logger.warning(
"Tried calculating mda8 for [%s, %s], but failed.", obs_var, mod_var
)
finally:
data_out[f"{mod_var}mda8"][f"{obs_var}mda8"] = mda8

self._processing_status.append([mod_var, obs_var, 1])
except Exception:
msg = f"Failed to perform analysis: {traceback.format_exc()}\n"
Expand All @@ -397,7 +412,7 @@ def run(self, var_list: list = None):
self._print_processing_status()
if self.colocation_setup.keep_data:
self.data = data_out
return data_out
return dict(data_out)

def get_nc_files_in_coldatadir(self):
"""
Expand Down
Empty file.
2 changes: 2 additions & 0 deletions pyaerocom/stats/mda8/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Variable names for which mda8 values are to be calculated.
MDA_VARS = ("conco3", "vmro3")
90 changes: 90 additions & 0 deletions pyaerocom/stats/mda8/mda8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import numpy as np
import xarray as xr

from pyaerocom.colocation.colocated_data import ColocatedData


def min_periods_max(x: np.ndarray, /, min_periods=1) -> float:
"""Calculates the max of a 1-dimensional array, returning
nan if not a minimum count of valid values exist.
:param x: 1-dimensional ndarray.
:param min_periods: minimum required non-nan values, defaults to 1
:return: A single value, which is either nan or a float.
"""
if x.ndim != 1:
raise ValueError(f"Unexpected number of dimensions. Got {x.ndim}, expected 1.")

length = np.sum(~np.isnan(x))
if length < min_periods:
return np.nan

return np.nanmax(x)


def mda8_colocated_data(coldat: ColocatedData, /, obs_var: str, mod_var: str) -> ColocatedData:
"""Applies the mda8 calculation to a colocated data object,
returning the new colocated data object.
:param data: The colocated data object.
:return: Colocated data object containing
"""
if not isinstance(coldat, ColocatedData):
raise ValueError(f"Unexpected type {type(coldat)}. Expected ColocatedData")

if coldat.ts_type != "hourly":
raise ValueError(f"Expected hourly timeseries. Got {coldat.ts_type}.")

# TODO: Currently order of dims matter in the implementation, so this check is
# stricter than it probably should be.
if coldat.dims != ("data_source", "time", "station_name"):
raise ValueError(
f"Unexpected dimensions. Got {coldat.dims}, expected ['data_source', 'time', 'station_name']."
)

cd = ColocatedData(_calc_mda8(coldat.data))
cd.data.attrs["var_name"] = [obs_var, mod_var]
return cd


def _calc_mda8(data: xr.DataArray) -> xr.DataArray:
"""Calculates the daily max 8h average for an array:
:param data: The DataArray for which to calculate the mda8. Input
should be a DataArray with dimensions ["data_source", "time", "station_name"]
(ie. the format of ColocatedData.data) representing hourly data.
:return: Equivalently structured DataArray, resampled along the "time"
dimension.
Note:
-----
The calculation for mda8 is defined as follows:
> Eight hours values: 75 % of values (i.e. 6 hours)
> Maximum daily 8-hour mean: 75 % of the hourly running eight hour
averages (i.e. 18 eight hour averages per
day)
> The maximum daily eight hour mean concentration will be selected by examining
> eight hour running averages, calculated from hourly data and updated each hour.
> Each eight hour average so calculated will be assigned to the day on which it
> ends i.e. the first calculation period for any one day will be the period from
> 17:00 on the previous day to 01:00 on that day; the last calculation period for
> any one day will be the period from 16:00 to 24:00 on that day.
"""
mda8 = _daily_max(_rolling_average_8hr(data))
mda8.attrs["ts_type"] = "daily"
return mda8


def _rolling_average_8hr(arr: xr.DataArray) -> xr.DataArray:
return arr.rolling(time=8, min_periods=6).mean()


def _daily_max(arr: xr.DataArray) -> xr.DataArray:
# TODO: Base is deprecated, and using offset="1h" is the proper way to do this.
# However this currently breaks the old-dependencies test in CI. Should be
# changed in the future.
return arr.resample(time="24H", base=1).reduce(
lambda x, axis: np.apply_along_axis(min_periods_max, 1, x, min_periods=18)
)
Empty file added tests/stats/mda8/__init__.py
Empty file.
190 changes: 190 additions & 0 deletions tests/stats/mda8/test_mda8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import numpy as np
import pytest
import xarray as xr

from pyaerocom.colocation.colocated_data import ColocatedData
from pyaerocom.colocation.colocation_3d import ColocatedDataLists
from pyaerocom.stats.mda8.mda8 import (
_calc_mda8,
_daily_max,
_rolling_average_8hr,
mda8_colocated_data,
)
from tests.fixtures.collocated_data import coldata


@pytest.fixture
def test_data(time, values) -> xr.DataArray:
return xr.DataArray(
[[[x] for x in values]],
dims=["data_source", "time", "station_name"],
coords={"time": time},
)


@pytest.mark.parametrize(
"time,values,exp_mda8",
(
pytest.param(
xr.date_range(start="2024-01-01 01:00", periods=49, freq="1h"),
[0] * 49,
[0, 0, np.nan],
id="zeros",
),
pytest.param(
xr.date_range(start="2024-01-01 01:00", periods=49, freq="1h"),
np.linspace(start=1, stop=49, num=49),
[20.5, 44.5, np.nan],
id="incrementing-by-1",
),
pytest.param(
xr.date_range(start="2024-01-01 13:00:00", periods=49, freq="1h"),
np.concatenate( # Sequence 1-7, 3x nan, 11-14, 3x nan, 28-49
(
np.arange(start=1, stop=8),
[np.nan] * 3,
np.arange(start=11, stop=25),
[np.nan] * 3,
np.arange(start=28, stop=50),
)
),
[np.nan] * 3,
id="with-nans",
),
),
)
def test_calc_mda8(test_data, exp_mda8):
mda8 = _calc_mda8(test_data)

assert mda8.shape[1] == len(exp_mda8)

assert all(mda8[0, :, 0] == pytest.approx(exp_mda8, abs=0, nan_ok=True))


def test_calc_mda8_with_gap():
arr1 = xr.DataArray(
[[[x] for x in np.linspace(start=1, stop=50, num=50)]],
dims=["data_source", "time", "station_name"],
coords={"time": xr.date_range(start="2024-01-01 01:00", periods=50, freq="1h")},
)

arr2 = xr.DataArray(
[[[x] for x in np.linspace(start=1, stop=50, num=50)]],
dims=["data_source", "time", "station_name"],
coords={"time": xr.date_range(start="2024-01-04 01:00", periods=50, freq="1h")},
)

arr = xr.concat((arr1, arr2), dim="time")

mda8 = _calc_mda8(arr)

assert mda8.shape == (1, 6, 1)
pytest.approx(mda8[0, :, 0], [20.5, 44.5, np.nan, 41.25, 44.5, np.nan], abs=10 * 10**-5)


@pytest.mark.parametrize("coldataset", ("fake_3d_hr",))
def test_coldata_to_mda8(coldata):
mda8 = mda8_colocated_data(coldata, obs_var="vmro3mda8", mod_var="vmro3mda8")

assert isinstance(mda8, ColocatedData)
assert mda8.metadata["ts_type"] == "daily"
assert mda8.metadata["var_name"] == ["vmro3mda8", "vmro3mda8"]
assert mda8.shape == (2, 9, 1)

assert mda8.data.values[0, :, 0] == pytest.approx(
[
np.nan,
np.nan,
np.nan,
1.18741556,
1.18777241,
1.18869106,
1.18879322,
1.18807846,
1.18700801,
],
abs=10**-5,
nan_ok=True,
)

assert mda8.data.values[1, :, 0] == pytest.approx(
[
np.nan,
1.57327333,
1.28884431,
1.28741556,
1.28777241,
1.28869106,
1.28879322,
1.28807846,
1.28700801,
],
abs=10**-5,
nan_ok=True,
)


@pytest.mark.parametrize(
"time,values,exp_ravg",
(
(
xr.date_range(start="2024-01-01 01:00", periods=48, freq="1h"),
[0, 1, 2, 3, 4, 5, 6, 7] * 6,
[np.nan] * 5 + [2.5, 3] + [3.5] * 41,
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan, 1, 2, 3, 4, 5, 6, 7] * 3,
[np.nan] * 6 + [3.5] + [4] * 17,
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan, np.nan, 2, 3, 4, 5, 6, 7] * 3,
[np.nan] * 7 + [4.5] * 17,
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan, np.nan, np.nan, 3, 4, 5, 6, 7] * 3,
[np.nan] * 24,
),
),
)
def test_rollingaverage(test_data, exp_ravg):
"""
Test of rolling average calculation
"""
ravg = _rolling_average_8hr(test_data)

assert all(ravg[0, :, 0] == pytest.approx(exp_ravg, abs=0, nan_ok=True))


@pytest.mark.parametrize(
"time,values,exp_daily_max",
(
(
xr.date_range(start="2024-01-01 01:00", periods=48, freq="1h"),
np.linspace(1, 48, num=48),
[24, 48],
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan] * 6 + list(range(7, 25)),
[24],
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan] * 7 + list(range(8, 25)),
[np.nan],
),
),
)
def test_daily_max(test_data, exp_daily_max):
"""
Tests the daily max calculation. Note that "the first calculation period
for any one day will be the period from 17:00 on the previous day to 01:00
on that day; the last calculation period for any one day will be the period
from 16:00 to 24:00 on that day
"""
dmax = _daily_max(test_data)

assert all(dmax[0, :, 0] == pytest.approx(exp_daily_max, abs=0, nan_ok=True))

0 comments on commit f179213

Please sign in to comment.