diff --git a/hierarchicalforecast/_modidx.py b/hierarchicalforecast/_modidx.py
index 122f24d..0e27fa2 100644
--- a/hierarchicalforecast/_modidx.py
+++ b/hierarchicalforecast/_modidx.py
@@ -202,8 +202,6 @@
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.aggregate': ( 'src/utils.html#aggregate',
'hierarchicalforecast/utils.py'),
- 'hierarchicalforecast.utils.cov2corr': ( 'src/utils.html#cov2corr',
- 'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.is_strictly_hierarchical': ( 'src/utils.html#is_strictly_hierarchical',
'hierarchicalforecast/utils.py'),
'hierarchicalforecast.utils.level_to_outputs': ( 'src/utils.html#level_to_outputs',
diff --git a/hierarchicalforecast/probabilistic_methods.py b/hierarchicalforecast/probabilistic_methods.py
index c5cf844..d5a90c8 100644
--- a/hierarchicalforecast/probabilistic_methods.py
+++ b/hierarchicalforecast/probabilistic_methods.py
@@ -11,7 +11,7 @@
from scipy.stats import norm
from sklearn.preprocessing import OneHotEncoder
-from .utils import is_strictly_hierarchical, cov2corr
+from .utils import is_strictly_hierarchical
# %% ../nbs/src/probabilistic_methods.ipynb 6
class Normality:
@@ -64,7 +64,8 @@ def __init__(
# Base Normality Errors assume independence/diagonal covariance
# TODO: replace bilinearity with elementwise row multiplication
- R1 = cov2corr(self.W)
+ std_ = np.sqrt(np.diag(self.W))
+ R1 = self.W / np.outer(std_, std_)
Wh = [np.diag(sigma) @ R1 @ np.diag(sigma).T for sigma in self.sigmah.T]
# Reconciled covariances across forecast horizon
diff --git a/hierarchicalforecast/utils.py b/hierarchicalforecast/utils.py
index aabeaa8..2688559 100644
--- a/hierarchicalforecast/utils.py
+++ b/hierarchicalforecast/utils.py
@@ -15,7 +15,7 @@
from narwhals.typing import Frame, FrameT
from numba import njit, prange
from sklearn.preprocessing import OneHotEncoder
-from typing import Dict, List, Optional, Iterable, Union, Sequence
+from typing import Dict, List, Optional, Union, Sequence
# %% ../nbs/src/utils.ipynb 6
# Global variables
@@ -44,7 +44,7 @@ def __exit__(self, exc_type, exc_value, traceback):
)
# %% ../nbs/src/utils.ipynb 8
-def is_strictly_hierarchical(S: np.ndarray, tags: Dict[str, np.ndarray]):
+def is_strictly_hierarchical(S: np.ndarray, tags: Dict[str, np.ndarray]) -> bool:
# main idea:
# if S represents a strictly hierarchical structure
# the number of paths before the bottom level
@@ -60,25 +60,10 @@ def is_strictly_hierarchical(S: np.ndarray, tags: Dict[str, np.ndarray]):
nodes = levels_.popitem()[1].size
return paths == nodes
-# %% ../nbs/src/utils.ipynb 9
-def cov2corr(cov, return_std=False):
- """convert covariance matrix to correlation matrix
- **Parameters:**
- `cov`: array_like, 2d covariance matrix.
- `return_std`: bool=False, if True returned std.
- **Returns:**
- `corr`: ndarray (subclass) correlation matrix
- """
- cov = np.asanyarray(cov)
- std_ = np.sqrt(np.diag(cov))
- corr = cov / np.outer(std_, std_)
- if return_std:
- return corr, std_
- else:
- return corr
-
-# %% ../nbs/src/utils.ipynb 11
-def _to_upper_hierarchy(bottom_split, bottom_values, upper_key):
+# %% ../nbs/src/utils.ipynb 10
+def _to_upper_hierarchy(
+ bottom_split: List[str], bottom_values: str, upper_key: str
+) -> List[str]:
upper_split = upper_key.split("/")
upper_idxs = [bottom_split.index(i) for i in upper_split]
@@ -88,7 +73,7 @@ def join_upper(bottom_value):
return [join_upper(val) for val in bottom_values]
-# %% ../nbs/src/utils.ipynb 14
+# %% ../nbs/src/utils.ipynb 11
def aggregate(
df: Frame,
spec: List[List[str]],
@@ -139,16 +124,18 @@ def aggregate(
raise ValueError("Sparse output is only supported for Pandas DataFrames.")
for col in df_nw.columns:
- assert (
- not df_nw[col].is_null().any()
- ), f"Column {col} contains null values. Make sure no column in the DataFrame contains null values."
+ if df_nw[col].is_null().any():
+ raise ValueError(
+ f"Column {col} contains null values. Make sure no column in the DataFrame contains null values."
+ )
# Check whether all columns in the spec are in the df
aggregation_cols_in_spec = list(
dict.fromkeys([col for cols in spec for col in cols])
)
for col in aggregation_cols_in_spec:
- assert col in df_nw.columns, f"Column {col} in spec not present in df"
+ if col not in df_nw.columns:
+ raise ValueError(f"Column {col} in spec not present in df")
# Prepare the aggregation dictionary
agg_dict = dict(
@@ -247,7 +234,7 @@ def aggregate(
return Y_df, S_df, tags
-# %% ../nbs/src/utils.ipynb 30
+# %% ../nbs/src/utils.ipynb 25
class HierarchicalPlot:
"""Hierarchical Plot
@@ -532,9 +519,9 @@ def plot_hierarchical_predictions_gap(
plt.grid()
plt.show()
-# %% ../nbs/src/utils.ipynb 51
+# %% ../nbs/src/utils.ipynb 46
# convert levels to output quantile names
-def level_to_outputs(level: Iterable[int]):
+def level_to_outputs(level: List[int]) -> tuple[List[float], List[str]]:
"""Converts list of levels into output names matching StatsForecast and NeuralForecast methods.
**Parameters:**
@@ -558,7 +545,7 @@ def level_to_outputs(level: Iterable[int]):
# convert quantiles to output quantile names
-def quantiles_to_outputs(quantiles: Iterable[float]):
+def quantiles_to_outputs(quantiles: List[float]) -> tuple[List[float], List[str]]:
"""Converts list of quantiles into output names matching StatsForecast and NeuralForecast methods.
**Parameters:**
@@ -577,7 +564,7 @@ def quantiles_to_outputs(quantiles: Iterable[float]):
output_names.append("-median")
return quantiles, output_names
-# %% ../nbs/src/utils.ipynb 52
+# %% ../nbs/src/utils.ipynb 47
# given input array of sample forecasts and inptut quantiles/levels,
# output a Pandas Dataframe with columns of quantile predictions
def samples_to_quantiles_df(
@@ -586,10 +573,11 @@ def samples_to_quantiles_df(
dates: List[str],
quantiles: Optional[List[float]] = None,
level: Optional[List[int]] = None,
- model_name: Optional[str] = "model",
+ model_name: str = "model",
id_col: str = "unique_id",
time_col: str = "ds",
-):
+ backend: str = "pandas",
+) -> tuple[List[float], FrameT]:
"""Transform Random Samples into HierarchicalForecast input.
Auxiliary function to create compatible HierarchicalForecast input `Y_hat_df` dataframe.
@@ -602,26 +590,35 @@ def samples_to_quantiles_df(
`model_name`: string. Name of forecasting model.
`id_col` : str='unique_id', column that identifies each serie.
`time_col` : str='ds', column that identifies each timestep, its values can be timestamps or integers.
+ `backend` : str='pandas', backend to use for the output dataframe, either 'pandas' or 'polars'.
**Returns:**
`quantiles`: float list in [0., 1.]. quantiles to estimate from y distribution .
- `Y_hat_df`: pd.DataFrame. With base quantile forecasts with columns ds and models to reconcile indexed by unique_id.
+ `Y_hat_df`: DataFrame. With base quantile forecasts with columns ds and models to reconcile indexed by unique_id.
"""
# Get the shape of the array
n_series, n_samples, horizon = samples.shape
- assert n_series == len(unique_ids)
- assert horizon == len(dates)
- assert (quantiles is not None) ^ (
- level is not None
- ) # check exactly one of quantiles/levels has been input
+ if n_series != len(unique_ids):
+ raise ValueError(
+ f"Number of unique_ids ({len(unique_ids)}) must match the number of series ({n_series})."
+ )
+ if horizon != len(dates):
+ raise ValueError(
+ f"Number of dates ({len(dates)}) must match third dimension of samples array ({horizon})."
+ )
+ if not ((quantiles is None) ^ (level is None)):
+ raise ValueError("Either quantiles or level must be provided, but not both.")
+
+ namespace = sys.modules.get(backend, None)
+ if namespace is None:
+ raise ValueError(f"DataFrame backend {backend} not installed.")
# create initial dictionary
forecasts_mean = np.mean(samples, axis=1).flatten()
unique_ids = np.repeat(unique_ids, horizon)
ds = np.tile(dates, n_series)
- data = pd.DataFrame({id_col: unique_ids, time_col: ds, model_name: forecasts_mean})
# create quantiles and quantile names
if level is not None:
@@ -642,11 +639,15 @@ def samples_to_quantiles_df(
) # [Q,H,N] -> [N,H,Q]
forecasts_quantiles = forecasts_quantiles.reshape(-1, len(_quantiles))
- df = pd.DataFrame(data=forecasts_quantiles, columns=col_names)
+ df_nw = nw.from_dict(
+ {id_col: unique_ids, time_col: ds, model_name: forecasts_mean},
+ native_namespace=namespace,
+ )
+ df_nw = df_nw.with_columns(**dict(zip(col_names, forecasts_quantiles.T)))
- return _quantiles, pd.concat([data, df], axis=1).set_index(id_col)
+ return _quantiles, df_nw.to_native()
-# %% ../nbs/src/utils.ipynb 59
+# %% ../nbs/src/utils.ipynb 55
# Masked empirical covariance matrix
@njit(
"Array(float64, 2, 'F')(Array(float64, 2, 'C'), Array(bool, 2, 'C'))",
@@ -685,7 +686,7 @@ def _ma_cov(residuals: np.ndarray, not_nan_mask: np.ndarray):
return W
-# %% ../nbs/src/utils.ipynb 60
+# %% ../nbs/src/utils.ipynb 56
# Shrunk covariance matrix using the Schafer-Strimmer method
@@ -836,7 +837,7 @@ def _shrunk_covariance_schaferstrimmer_with_nans(
return W
-# %% ../nbs/src/utils.ipynb 62
+# %% ../nbs/src/utils.ipynb 58
# Lasso cyclic coordinate descent
@njit(
"Array(float64, 1, 'C')(Array(float64, 2, 'C'), Array(float64, 1, 'C'), float64, int64, float64)",
diff --git a/nbs/src/core.ipynb b/nbs/src/core.ipynb
index af26b38..83c5e33 100644
--- a/nbs/src/core.ipynb
+++ b/nbs/src/core.ipynb
@@ -69,7 +69,7 @@
"import os\n",
"\n",
"from fastcore.test import test_close, test_eq, test_fail\n",
- "from nbdev.showdoc import add_docs, show_doc\n",
+ "from nbdev.showdoc import show_doc\n",
"import pandas as pd"
]
},
@@ -1282,7 +1282,7 @@
"#| hide\n",
"from statsforecast import StatsForecast\n",
"from statsforecast.utils import generate_series\n",
- "from statsforecast.models import RandomWalkWithDrift, AutoETS"
+ "from statsforecast.models import RandomWalkWithDrift"
]
},
{
@@ -1294,7 +1294,7 @@
"#| hide\n",
"# test unbalanced dataset\n",
"max_tenure = 24\n",
- "dates = pd.date_range(start='2019-01-31', freq='M', periods=max_tenure)\n",
+ "dates = pd.date_range(start='2019-01-31', freq='ME', periods=max_tenure)\n",
"cohort_tenure = [24, 23, 22, 21]\n",
"\n",
"ts_list = []\n",
@@ -1302,7 +1302,7 @@
"# Create ts for each cohort\n",
"for i in range(len(cohort_tenure)):\n",
" ts_list.append(\n",
- " generate_series(n_series=1, freq='M', min_length=cohort_tenure[i], max_length=cohort_tenure[i]).reset_index() \\\n",
+ " generate_series(n_series=1, freq='ME', min_length=cohort_tenure[i], max_length=cohort_tenure[i]).reset_index() \\\n",
" .assign(ult=i) \\\n",
" .assign(ds=dates[-cohort_tenure[i]:]) \\\n",
" .drop(columns=['unique_id'])\n",
@@ -1328,7 +1328,7 @@
" models=[\n",
" RandomWalkWithDrift(),\n",
" ],\n",
- " freq='M',\n",
+ " freq='ME',\n",
" n_jobs=1,\n",
")\n",
"\n",
@@ -1343,8 +1343,8 @@
"fitted_df = fcst.forecast_fitted_values()\n",
"\n",
"fcst_df = hrec.reconcile(\n",
- " Y_hat_df=fcst_df.reset_index(),\n",
- " Y_df=fitted_df.reset_index(),\n",
+ " Y_hat_df=fcst_df,\n",
+ " Y_df=fitted_df,\n",
" S=S_df,\n",
" tags=tags,\n",
")"
@@ -1380,15 +1380,18 @@
" ]\n",
")\n",
"\n",
- "fcst_df = fcst.forecast(df=train_df, h=12, fitted=True)\n",
+ "fcst_df_pl = fcst.forecast(df=train_df, h=12, fitted=True)\n",
"fitted_df = fcst.forecast_fitted_values()\n",
"\n",
- "fcst_df = hrec.reconcile(\n",
- " Y_hat_df=fcst_df,\n",
+ "fcst_df_pl = hrec.reconcile(\n",
+ " Y_hat_df=fcst_df_pl,\n",
" Y_df=fitted_df,\n",
" S=S_df_pl,\n",
" tags=tags,\n",
- ")"
+ ")\n",
+ "\n",
+ "# Test equivalence\n",
+ "pd.testing.assert_frame_equal(fcst_df, fcst_df_pl.to_pandas())"
]
},
{
@@ -1744,7 +1747,7 @@
"import pandas as pd\n",
"\n",
"from statsforecast.core import StatsForecast\n",
- "from statsforecast.models import AutoETS, Naive\n",
+ "from statsforecast.models import AutoETS\n",
"\n",
"from hierarchicalforecast.utils import aggregate\n",
"from hierarchicalforecast.core import HierarchicalReconciliation\n",
diff --git a/nbs/src/probabilistic_methods.ipynb b/nbs/src/probabilistic_methods.ipynb
index d280f89..f91e770 100644
--- a/nbs/src/probabilistic_methods.ipynb
+++ b/nbs/src/probabilistic_methods.ipynb
@@ -42,7 +42,7 @@
"from scipy.stats import norm\n",
"from sklearn.preprocessing import OneHotEncoder\n",
"\n",
- "from hierarchicalforecast.utils import is_strictly_hierarchical, cov2corr"
+ "from hierarchicalforecast.utils import is_strictly_hierarchical"
]
},
{
@@ -120,7 +120,8 @@
"\n",
" # Base Normality Errors assume independence/diagonal covariance\n",
" # TODO: replace bilinearity with elementwise row multiplication\n",
- " R1 = cov2corr(self.W)\n",
+ " std_ = np.sqrt(np.diag(self.W))\n",
+ " R1 = self.W / np.outer(std_, std_) \n",
" Wh = [np.diag(sigma) @ R1 @ np.diag(sigma).T for sigma in self.sigmah.T]\n",
"\n",
" # Reconciled covariances across forecast horizon\n",
diff --git a/nbs/src/utils.ipynb b/nbs/src/utils.ipynb
index e226694..479b6bc 100644
--- a/nbs/src/utils.ipynb
+++ b/nbs/src/utils.ipynb
@@ -50,7 +50,7 @@
"from narwhals.typing import Frame, FrameT\n",
"from numba import njit, prange\n",
"from sklearn.preprocessing import OneHotEncoder\n",
- "from typing import Dict, List, Optional, Iterable, Union, Sequence"
+ "from typing import Dict, List, Optional, Union, Sequence"
]
},
{
@@ -62,8 +62,7 @@
"source": [
"#| hide\n",
"import os\n",
- "import warnings\n",
- "from nbdev.showdoc import add_docs, show_doc\n",
+ "from nbdev.showdoc import show_doc\n",
"from fastcore.test import test_eq, test_close, test_fail\n",
"from statsforecast.utils import generate_series"
]
@@ -126,7 +125,7 @@
"source": [
"#| exporti\n",
"def is_strictly_hierarchical(S: np.ndarray, \n",
- " tags: Dict[str, np.ndarray]):\n",
+ " tags: Dict[str, np.ndarray]) -> bool:\n",
" # main idea:\n",
" # if S represents a strictly hierarchical structure\n",
" # the number of paths before the bottom level\n",
@@ -143,31 +142,6 @@
" return paths == nodes"
]
},
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "da433b2e",
- "metadata": {},
- "outputs": [],
- "source": [
- "#| exporti\n",
- "def cov2corr(cov, return_std=False):\n",
- " \"\"\" convert covariance matrix to correlation matrix\n",
- " **Parameters:**
\n",
- " `cov`: array_like, 2d covariance matrix.
\n",
- " `return_std`: bool=False, if True returned std.
\n",
- " **Returns:**
\n",
- " `corr`: ndarray (subclass) correlation matrix\n",
- " \"\"\"\n",
- " cov = np.asanyarray(cov)\n",
- " std_ = np.sqrt(np.diag(cov))\n",
- " corr = cov / np.outer(std_, std_)\n",
- " if return_std:\n",
- " return corr, std_\n",
- " else:\n",
- " return corr"
- ]
- },
{
"cell_type": "markdown",
"id": "3a1f4267",
@@ -184,7 +158,7 @@
"outputs": [],
"source": [
"#| exporti\n",
- "def _to_upper_hierarchy(bottom_split, bottom_values, upper_key):\n",
+ "def _to_upper_hierarchy(bottom_split: List[str], bottom_values: str, upper_key: str) -> List[str]:\n",
" upper_split = upper_key.split('/')\n",
" upper_idxs = [bottom_split.index(i) for i in upper_split]\n",
"\n",
@@ -195,135 +169,6 @@
" return [join_upper(val) for val in bottom_values]"
]
},
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "f9fdc577",
- "metadata": {},
- "outputs": [],
- "source": [
- "#| hide\n",
- "import warnings"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "6be82d73",
- "metadata": {},
- "outputs": [],
- "source": [
- "#| hide\n",
- "def aggregate_old(\n",
- " df: pd.DataFrame,\n",
- " spec: List[List[str]],\n",
- " exog_vars: Optional[Dict[str, Union[str, List[str]]]] = None,\n",
- " is_balanced: bool = False,\n",
- " sparse_s: bool = False,\n",
- "):\n",
- " \"\"\"Utils Aggregation Function.\n",
- " Aggregates bottom level series contained in the pandas DataFrame `df` according\n",
- " to levels defined in the `spec` list.\n",
- "\n",
- " Parameters\n",
- " ----------\n",
- " df : pandas DataFrame\n",
- " Dataframe with columns `['ds', 'y']` and columns to aggregate.\n",
- " spec : list of list of str\n",
- " List of levels. Each element of the list should contain a list of columns of `df` to aggregate.\n",
- " exog_vars: dictionary of string keys & values that can either be a list of strings or a single string\n",
- " keys correspond to column names and the values represent the aggregation(s) that will be applied to each column. Accepted values are those from Pandas aggregation Functions, check the Pandas docs for guidance\n",
- " is_balanced : bool (default=False)\n",
- " Deprecated.\n",
- " sparse_s : bool (default=False)\n",
- " Return `S_df` as a sparse dataframe.\n",
- "\n",
- " Returns\n",
- " -------\n",
- " Y_df : pandas DataFrame\n",
- " Hierarchically structured series.\n",
- " S_df : pandas DataFrame\n",
- " Summing dataframe.\n",
- " tags : dict\n",
- " Aggregation indices.\n",
- " \"\"\"\n",
- " # Checks\n",
- " if df.isnull().values.any():\n",
- " raise ValueError('`df` contains null values')\n",
- " if is_balanced:\n",
- " warnings.warn(\n",
- " \"`is_balanced` is deprecated and will be removed in a future version. \"\n",
- " \"Don't set this argument to suppress this warning.\",\n",
- " category=DeprecationWarning,\n",
- " )\n",
- "\n",
- " \n",
- " # compute aggregations and tags\n",
- " spec = sorted(spec, key=len)\n",
- " bottom = spec[-1]\n",
- " aggs = []\n",
- " tags = {}\n",
- " # Prepare the aggregation dictionary\n",
- " agg_dict = {\n",
- " \"y\": (\"y\", \"sum\")\n",
- " }\n",
- "\n",
- " # Check if exog_vars are present in df & add to the aggregation dictionary if it is not None\n",
- " if exog_vars is not None:\n",
- " missing_vars = [var for var in exog_vars.keys() if var not in df.columns]\n",
- " if missing_vars:\n",
- " raise ValueError(f\"The following exogenous variables are not present in the DataFrame: {', '.join(missing_vars)}\") \n",
- " else:\n",
- " # Update agg_dict to handle multiple aggregations for each exog_vars key\n",
- " for key, agg_func in exog_vars.items():\n",
- " # Ensure agg_func is a list\n",
- " if isinstance(agg_func, str): # If it's a single string, convert to list\n",
- " agg_func = [agg_func]\n",
- " elif not isinstance(agg_func, list): # Raise an error if it's neither\n",
- " raise ValueError(f\"Aggregation functions for '{key}' must be a string or a list of strings.\")\n",
- " \n",
- " for func in agg_func:\n",
- " agg_dict[f\"{key}_{func}\"] = (key, func) # Update the agg_dict with the new naming structure\n",
- "\n",
- " # Perform the aggregation\n",
- " for levels in spec:\n",
- " agg = df.groupby(levels + ['ds'], observed=True).agg(**agg_dict)\n",
- " if not agg.index.is_monotonic_increasing:\n",
- " agg = agg.sort_index()\n",
- " agg = agg.reset_index('ds')\n",
- " group = agg.index.get_level_values(0)\n",
- " if not pd.api.types.is_string_dtype(group.dtype):\n",
- " group = group.astype(str)\n",
- " for level in levels[1:]:\n",
- " group = group + '/' + agg.index.get_level_values(level).str.replace('/', '_')\n",
- " agg.index = group\n",
- " agg.index.name = 'unique_id'\n",
- " tags['/'.join(levels)] = group.unique().values\n",
- " aggs.append(agg)\n",
- " Y_df = pd.concat(aggs)\n",
- "\n",
- " # construct S\n",
- " bottom_key = '/'.join(bottom)\n",
- " bottom_levels = tags[bottom_key]\n",
- " S = np.empty((len(bottom_levels), len(spec)), dtype=object)\n",
- " for j, levels in enumerate(spec[:-1]):\n",
- " S[:, j] = _to_upper_hierarchy(bottom, bottom_levels, '/'.join(levels))\n",
- " S[:, -1] = tags[bottom_key]\n",
- "\n",
- " categories = list(tags.values())\n",
- " try:\n",
- " encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float64)\n",
- " except TypeError: # sklearn < 1.2\n",
- " encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float64) \n",
- " S = encoder.fit_transform(S).T\n",
- " if sparse_s:\n",
- " df_constructor = pd.DataFrame.sparse.from_spmatrix\n",
- " else:\n",
- " df_constructor = pd.DataFrame\n",
- " S_df = df_constructor(S, index=np.hstack(categories), columns=bottom_levels)\n",
- " return Y_df, S_df, tags"
- ]
- },
{
"cell_type": "code",
"execution_count": null,
@@ -382,12 +227,14 @@
" raise ValueError(\"Sparse output is only supported for Pandas DataFrames.\")\n",
" \n",
" for col in df_nw.columns:\n",
- " assert not df_nw[col].is_null().any(), f\"Column {col} contains null values. Make sure no column in the DataFrame contains null values.\"\n",
+ " if df_nw[col].is_null().any():\n",
+ " raise ValueError(f\"Column {col} contains null values. Make sure no column in the DataFrame contains null values.\")\n",
"\n",
" # Check whether all columns in the spec are in the df\n",
" aggregation_cols_in_spec = list(dict.fromkeys([col for cols in spec for col in cols]))\n",
" for col in aggregation_cols_in_spec:\n",
- " assert col in df_nw.columns, f\"Column {col} in spec not present in df\"\n",
+ " if col not in df_nw.columns:\n",
+ " raise ValueError(f\"Column {col} in spec not present in df\")\n",
"\n",
" # Prepare the aggregation dictionary \n",
" agg_dict = dict(zip(target_cols, tuple(zip(target_cols, len(target_cols)*[\"sum\"]))))\n",
@@ -569,23 +416,6 @@
" test_eq(tags[tag], tags_f[tag]) "
]
},
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "db5a1398",
- "metadata": {},
- "outputs": [],
- "source": [
- "#| hide\n",
- "# test against old aggregation function\n",
- "Y_df_old, S_df_old, tags_old = aggregate_old(df, spec)\n",
- "\n",
- "test_eq(Y_df_old.reset_index(), Y_df)\n",
- "test_eq(S_df_old.reset_index(names=\"unique_id\"), S_df) \n",
- "for tag in tags:\n",
- " test_eq(tags[tag], tags_old[tag])"
- ]
- },
{
"cell_type": "code",
"execution_count": null,
@@ -633,10 +463,7 @@
" ['pen', 'ult'],\n",
"]\n",
"\n",
- "hier_df, S_df, tags = aggregate(df=df, spec=hier_levels)\n",
- "hier_df_old, S_df_old, _ = aggregate_old(df=df, spec=hier_levels)\n",
- "test_eq(S_df, S_df_old.reset_index(names=\"unique_id\"))\n",
- "test_eq(hier_df, hier_df_old.reset_index(names=\"unique_id\"))"
+ "hier_df, S_df, tags = aggregate(df=df, spec=hier_levels)"
]
},
{
@@ -676,11 +503,6 @@
"test_eq(hier_df[\"unique_id\"].unique(), S_df[\"unique_id\"])\n",
"test_eq(len(tags), len(hiers_strictly)) \n",
"\n",
- "# Test old vs new\n",
- "hier_df_old, S_df_old, tags_old = aggregate_old(df=df, spec=hiers_strictly)\n",
- "test_eq(hier_df, hier_df_old.reset_index())\n",
- "test_eq(S_df, S_df_old.reset_index(names=\"unique_id\"))\n",
- "\n",
"# grouped structure\n",
"hiers_grouped = [['Country'],\n",
" ['Country', 'State'], \n",
@@ -696,14 +518,7 @@
"test_eq(hier_df[\"unique_id\"].nunique(), 425)\n",
"test_eq(S_df.shape, (425, 305))\n",
"test_eq(hier_df[\"unique_id\"].unique(), S_df[\"unique_id\"])\n",
- "test_eq(len(tags), len(hiers_grouped))\n",
- "\n",
- "# Test old vs new - equivalent up to a different sorting, tbd if this is fine\n",
- "hier_df_old, S_df_old, tags_old = aggregate_old(df=df, spec=hiers_grouped)\n",
- "test_eq(hier_df.sort_values(by=[\"unique_id\", \"ds\"], ignore_index=True), \n",
- " hier_df_old.reset_index().sort_values(by=[\"unique_id\", \"ds\"], ignore_index=True))\n",
- "test_eq(S_df.sort_values(by=\"unique_id\", ignore_index=True)[S_df.columns], \n",
- " S_df_old.reset_index(names=\"unique_id\").sort_values(by=\"unique_id\", ignore_index=True)[S_df.columns])"
+ "test_eq(len(tags), len(hiers_grouped))"
]
},
{
@@ -777,41 +592,6 @@
")"
]
},
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "f4b3828f-bbcc-4116-a969-49c78c33bf72",
- "metadata": {},
- "outputs": [],
- "source": [
- "#| hide\n",
- "# Test equality of aggregation and aggregation_before\n",
- "for name, spec in zip(['strict', 'grouped'], [hiers_strictly, hiers_grouped]):\n",
- " with CodeTimer(f'{name} aggregation before'):\n",
- " Y_df_old, S_df_old, tags_old = aggregate_old(df=df, spec=spec)\n",
- " \n",
- " with CodeTimer(f'{name} aggregation now'):\n",
- " Y_df, S_df, tags = aggregate(df=df, spec=spec)\n",
- "\n",
- " Y_df = Y_df.sort_values(by=[\"unique_id\", \"ds\"], ignore_index=True)\n",
- " Y_df_old = Y_df_old.reset_index().sort_values(by=[\"unique_id\", \"ds\"], ignore_index=True)\n",
- "\n",
- " S_df = S_df.sort_values(by=\"unique_id\", ignore_index=True)[S_df.columns]\n",
- " S_df_old = S_df_old.reset_index(names=\"unique_id\").sort_values(by=\"unique_id\", ignore_index=True)[S_df.columns]\n",
- " \n",
- " np.testing.assert_allclose(\n",
- " Y_df['y'].values,\n",
- " Y_df_old['y'].values,\n",
- " )\n",
- " np.testing.assert_equal(S_df.values, S_df_old.values)\n",
- " \n",
- " test_eq(S_df.columns, S_df_old.columns)\n",
- " test_eq(S_df.index, S_df_old.index)\n",
- " \n",
- " test_eq(Y_df.columns, Y_df_old.columns)\n",
- " test_eq(Y_df.index, Y_df_old.index)"
- ]
- },
{
"cell_type": "code",
"execution_count": null,
@@ -1421,7 +1201,7 @@
"#| exporti\n",
"\n",
"# convert levels to output quantile names\n",
- "def level_to_outputs(level:Iterable[int]):\n",
+ "def level_to_outputs(level: List[int]) -> tuple[List[float], List[str]]:\n",
" \"\"\" Converts list of levels into output names matching StatsForecast and NeuralForecast methods.\n",
"\n",
" **Parameters:**
\n",
@@ -1444,7 +1224,7 @@
" return quantiles, output_names\n",
"\n",
"# convert quantiles to output quantile names\n",
- "def quantiles_to_outputs(quantiles:Iterable[float]):\n",
+ "def quantiles_to_outputs(quantiles: List[float]) -> tuple[List[float], List[str]]:\n",
" \"\"\"Converts list of quantiles into output names matching StatsForecast and NeuralForecast methods.\n",
"\n",
" **Parameters:**
\n",
@@ -1480,10 +1260,11 @@
" dates: List[str], \n",
" quantiles: Optional[List[float]] = None,\n",
" level: Optional[List[int]] = None, \n",
- " model_name: Optional[str] = \"model\",\n",
+ " model_name: str = \"model\",\n",
" id_col: str = 'unique_id',\n",
" time_col: str = 'ds',\n",
- " ):\n",
+ " backend: str = 'pandas',\n",
+ " ) -> tuple[List[float], FrameT]:\n",
" \"\"\" Transform Random Samples into HierarchicalForecast input.\n",
" Auxiliary function to create compatible HierarchicalForecast input `Y_hat_df` dataframe.\n",
"\n",
@@ -1496,24 +1277,35 @@
" `model_name`: string. Name of forecasting model.
\n",
" `id_col` : str='unique_id', column that identifies each serie.
\n",
" `time_col` : str='ds', column that identifies each timestep, its values can be timestamps or integers.
\n",
+ " `backend` : str='pandas', backend to use for the output dataframe, either 'pandas' or 'polars'.
\n",
"\n",
" **Returns:**
\n",
" `quantiles`: float list in [0., 1.]. quantiles to estimate from y distribution .
\n",
- " `Y_hat_df`: pd.DataFrame. With base quantile forecasts with columns ds and models to reconcile indexed by unique_id.\n",
+ " `Y_hat_df`: DataFrame. With base quantile forecasts with columns ds and models to reconcile indexed by unique_id.\n",
" \"\"\"\n",
" \n",
" # Get the shape of the array\n",
" n_series, n_samples, horizon = samples.shape\n",
"\n",
- " assert n_series == len(unique_ids)\n",
- " assert horizon == len(dates)\n",
- " assert (quantiles is not None) ^ (level is not None) #check exactly one of quantiles/levels has been input\n",
+ " if n_series != len(unique_ids):\n",
+ " raise ValueError(\n",
+ " f\"Number of unique_ids ({len(unique_ids)}) must match the number of series ({n_series}).\"\n",
+ " )\n",
+ " if horizon != len(dates):\n",
+ " raise ValueError(\n",
+ " f\"Number of dates ({len(dates)}) must match third dimension of samples array ({horizon}).\"\n",
+ " )\n",
+ " if not ((quantiles is None) ^ (level is None)):\n",
+ " raise ValueError(\"Either quantiles or level must be provided, but not both.\")\n",
+ "\n",
+ " namespace = sys.modules.get(backend, None)\n",
+ " if namespace is None:\n",
+ " raise ValueError(f\"DataFrame backend {backend} not installed.\")\n",
"\n",
" #create initial dictionary\n",
" forecasts_mean = np.mean(samples, axis=1).flatten()\n",
" unique_ids = np.repeat(unique_ids, horizon)\n",
" ds = np.tile(dates, n_series)\n",
- " data = pd.DataFrame({id_col:unique_ids, time_col:ds, model_name:forecasts_mean})\n",
"\n",
" #create quantiles and quantile names\n",
" if level is not None:\n",
@@ -1529,11 +1321,11 @@
"\n",
" forecasts_quantiles = np.transpose(forecasts_quantiles, (1,2,0)) # [Q,H,N] -> [N,H,Q]\n",
" forecasts_quantiles = forecasts_quantiles.reshape(-1,len(_quantiles))\n",
- "\n",
- " df = pd.DataFrame(data=forecasts_quantiles, \n",
- " columns=col_names)\n",
" \n",
- " return _quantiles, pd.concat([data,df], axis=1).set_index(id_col)"
+ " df_nw = nw.from_dict({id_col:unique_ids, time_col:ds, model_name:forecasts_mean}, native_namespace=namespace)\n",
+ " df_nw = df_nw.with_columns(**dict(zip(col_names, forecasts_quantiles.T))) \n",
+ "\n",
+ " return _quantiles, df_nw.to_native()"
]
},
{
@@ -1604,10 +1396,45 @@
")\n",
"test_eq(\n",
" ret_df_1.columns,\n",
- " ['ds', 'model', 'model-median', 'model-lo-90', 'model-lo-50', 'model-lo-10', 'model-hi-10', 'model-hi-50', 'model-hi-90']\n",
+ " ['unique_id', 'ds', 'model', 'model-median', 'model-lo-90', 'model-lo-50', 'model-lo-10', 'model-hi-10', 'model-hi-50', 'model-hi-90']\n",
+ ")\n",
+ "test_eq(\n",
+ " ret_df_1[\"unique_id\"].values,\n",
+ " ['id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1',\n",
+ " 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2',\n",
+ " 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3']\n",
+ ")\n",
+ "test_eq(\n",
+ " ret_quantiles_1, ret_quantiles_2\n",
+ ")\n",
+ "test_eq(\n",
+ " ret_df_1[\"unique_id\"], ret_df_2[\"unique_id\"]\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "d6eeb27e",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "#| hide\n",
+ "# polars\n",
+ "\n",
+ "ret_quantiles_1, ret_df_1 = samples_to_quantiles_df(samples, unique_ids, dates, level=level, backend='polars')\n",
+ "ret_quantiles_2, ret_df_2 = samples_to_quantiles_df(samples, unique_ids, dates, quantiles=quantiles, backend='polars')\n",
+ "\n",
+ "test_eq(\n",
+ " ret_quantiles_1,\n",
+ " quantiles\n",
+ ")\n",
+ "test_eq(\n",
+ " ret_df_1.columns,\n",
+ " ['unique_id', 'ds', 'model', 'model-median', 'model-lo-90', 'model-lo-50', 'model-lo-10', 'model-hi-10', 'model-hi-50', 'model-hi-90']\n",
")\n",
"test_eq(\n",
- " ret_df_1.index,\n",
+ " list(ret_df_1[\"unique_id\"]),\n",
" ['id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1', 'id1',\n",
" 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2', 'id2',\n",
" 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3', 'id3']\n",
@@ -1616,7 +1443,7 @@
" ret_quantiles_1, ret_quantiles_2\n",
")\n",
"test_eq(\n",
- " ret_df_1.index, ret_df_2.index\n",
+ " ret_df_1[\"unique_id\"], ret_df_2[\"unique_id\"]\n",
")"
]
},