From a487f0b8a3fd73f299b4bdd13017b94c74d206c6 Mon Sep 17 00:00:00 2001 From: rakow Date: Fri, 14 Apr 2023 12:37:29 +0200 Subject: [PATCH 01/29] improved calibration to support multiple attributes --- matsim/analysis.py | 20 ++++-- matsim/calibration.py | 134 +++++++++++++++++++++++++++----------- tests/test_calibration.py | 9 +-- 3 files changed, 114 insertions(+), 49 deletions(-) diff --git a/matsim/analysis.py b/matsim/analysis.py index 17234db..c2373f9 100644 --- a/matsim/analysis.py +++ b/matsim/analysis.py @@ -10,8 +10,13 @@ def read_trips_and_persons(run, transform_persons=None, transform_trips=None): """ Read trips and persons from run directory """ - trips = glob.glob(run.rstrip("/") + "/*.output_trips.csv.gz")[0] + # Return input as output + # This allows to re-use input for the calc functions + if type(run) is tuple and len(run) == 2: + return run + + trips = glob.glob(run.rstrip("/") + "/*.output_trips.csv.gz")[0] persons = glob.glob(run.rstrip("/") + "/*.output_persons.csv.gz")[0] df = pd.read_csv(trips, sep=";", dtype={"person": "str"}) @@ -66,7 +71,7 @@ def calc_mode_share(run, transform_persons=None, transform_trips=None): def calc_mode_stats(run, attrs=[], dist_bins = [0, 1000, 2000, 5000, 10000, 20000, np.inf], dist_labels = ["0 - 1000", "1000 - 2000", "2000 - 5000", "5000 - 10000", "10000 - 20000", "20000+"], - transform_persons=None, transform_trips=None): + transform_persons=None, transform_trips=None) -> pd.DataFrame: """ Calculate detailed mode statistics """ trips, _ = read_trips_and_persons(run, transform_persons, transform_trips) @@ -83,12 +88,10 @@ def aggr(x): aggr["share"] = aggr.n / aggr.n.sum() aggr["share"].fillna(0, inplace=True) - - return aggr -def calc_population_stats(run, attrs=[], transform_persons=None, transform_trips=None): +def calc_population_stats(run, attrs=[], transform_persons=None, transform_trips=None) -> pd.DataFrame: """ Calculate population statistics """ trips, persons = read_trips_and_persons(run, transform_persons, transform_trips) @@ -126,8 +129,11 @@ def summarize(x): return pd.Series(data=data) - - aggr = persons.groupby(attrs).apply(summarize) + if attrs: + aggr = persons.groupby(attrs).apply(summarize) + else: + aggr = summarize(persons) + aggr = aggr.to_frame(0).T aggr["population_share"] = aggr.n / aggr.n.sum() diff --git a/matsim/calibration.py b/matsim/calibration.py index 0380ef3..de6c242 100644 --- a/matsim/calibration.py +++ b/matsim/calibration.py @@ -34,14 +34,15 @@ def _same_sign(x): x = x.to_numpy() return np.all(x >= 0) if x[0] >= 0 else np.all(x < 0) -class ASCSampler(optuna.samplers.BaseSampler): +class CalibrationSampler(optuna.samplers.BaseSampler): """ Sample asc according to obtained mode shares """ - def __init__(self, mode_share, fixed_mode, initial_asc, lr, constraints): + def __init__(self, target, mode_share, fixed_mode, initial, lr, constraints): + self.target = target self.mode_share = mode_share self.fixed_mode = fixed_mode - self.initial_asc = initial_asc + self.initial = initial self.lr = lr self.constraints = constraints @@ -54,25 +55,24 @@ def sample_relative(self, study, trial, search_space): def sample_independent(self, study, trial, param_name, param_distribution): - if param_name == self.fixed_mode: - return 0 + param, _, mode = param_name.partition("_") completed = _completed_trials(study) - if len(completed) == 0: - asc = self.initial_asc[param_name] + initial = self.initial.get(param, {}).get(mode, 0) - if self.constraints is not None and param_name in self.constraints: - asc = self.constraints[param_name](asc) + if self.constraints is not None and param in self.constraints and mode in self.constraints[param]: + initial = self.constraints[param](initial) - return asc + return initial last = completed[-1] + last_param = last.params[param_name] - asc = last.params[param_name] - - step = self.calc_update(self.mode_share[param_name], last.user_attrs["%s_share" % param_name], - self.mode_share[self.fixed_mode], last.user_attrs["%s_share" % self.fixed_mode]) + if param == "asc": + step = self.sample_asc(mode, last) + elif param == "dist": + step = self.sample_dist_util(mode, last) rate = 1.0 if self.lr is not None: @@ -89,16 +89,56 @@ def sample_independent(self, study, trial, param_name, param_distribution): trial.set_user_attr("%s_rate" % param_name, rate) trial.set_user_attr("%s_step" % param_name, step) - asc += rate * step + last_param += rate * step # Call constraint if present - if self.constraints is not None and param_name in self.constraints: - asc = self.constraints[param_name](asc) + if self.constraints is not None and param in self.constraints and mode in self.constraints[param]: + last_param = self.constraints[param](last_param) + + return last_param + + def sample_asc(self, mode, last): + + if mode == self.fixed_mode: + return 0 + + step = self.calc_asc_update(self.mode_share.loc[mode], last.user_attrs["%s_share" % mode], + self.mode_share.loc[self.fixed_mode], last.user_attrs["%s_share" % self.fixed_mode]) + + return step + + def sample_dist_util(self, mode, last): + + df = pd.DataFrame.from_dict(last.user_attrs["mode_stats"], orient="tight") + target = self.target[self.target["mode"] == mode].reset_index(drop=True).copy() + + df = df.loc[target.dist_group].reset_index() - return asc + # Trips distances shares over all modes + # Correction factor is calculated here + ref_dist = self.target.groupby("dist_group").agg(share=("share", "sum")) + sim_dist = df.groupby("dist_group").agg(share=("share", "sum")) + + correction = ref_dist.loc[target.dist_group] / sim_dist.loc[target.dist_group] + + df = df[df.main_mode == mode].reset_index(drop=True).copy() + + if "mean_dist" not in target.columns: + target["mean_dist"] = df.mean_dist + + df["correction"] = correction.values + + target.share = target.share / sum(target.share) + df.share = df.share / sum(df.share) + + real = (df.mean_dist * df.share * df.correction).sum() + target = (target.mean_dist * target.share).sum() + + # TODO: configurable parameter + return float(0.2 * (target - real) / (1000 * 1000)) @staticmethod - def calc_update(z_i, m_i, z_0, m_0): + def calc_asc_update(z_i, m_i, z_0, m_0): """ Calculates the asc update for one step """ # Update asc # (1) Let z_i be the observed share of mode i. (real data, to be reproduced) @@ -276,17 +316,19 @@ def _fn(n, *args, **kwargs): return _fn -def create_mode_share_study(name: str, jar: str, config: str, - modes: Sequence[str], mode_share: Dict[str, float], +def create_mode_calibration_study(name: str, params: set, + jar: Union[str, os.PathLike], config: Union[str, os.PathLike], + modes: Sequence[str], target: Union[str, os.PathLike], fixed_mode: str = "walk", args: Union[str, Callable]="", jvm_args="", - initial_asc: Dict[str, float] = None, + initial_params: Dict[str, Dict[str, float]] = None, transform_persons: Callable = None, transform_trips: Callable = None, chain_runs: Union[bool, int, Callable] = False, lr: Callable[[int, str, float, Dict[str, float], optuna.Trial, optuna.Study], float] = None, - constraints: Dict[str, Callable] = None, - storage: optuna.storages.RDBStorage = None + constraints: Dict[str, Dict[str, Callable]] = None, + storage: optuna.storages.RDBStorage = None, + debug: bool = False ) -> Tuple[optuna.Study, Callable]: """ Create or load an existing study for mode share calibration using asc values. @@ -294,10 +336,11 @@ def create_mode_share_study(name: str, jar: str, config: str, study.optimize(obj, 10) :param name: name of the study + :param params: parameters to calibrate :param jar: path to executable jar file of the scenario :param config: path to config file to run :param modes: list of all relevant modes - :param mode_share: dict of target mode shares + :param target: csv file with target shares :param fixed_mode: the fixed mode with asc=0 :param args: additional arguments to the executable jar, can also be a callback function :param jvm_args: additional jvm args @@ -308,29 +351,35 @@ def create_mode_share_study(name: str, jar: str, config: str, :param lr: learning rate schedule, will be called with (trial number, mode, asc update, mode_share, trial, study) :param constraints: constraints for each mode, must return asc and will be called with original asc :param storage: custom storage object to overwrite default sqlite backend + :param debug: enable debug output :return: tuple of study and optimization objective. """ # Init with 0 - if initial_asc is None: - initial_asc = {} - for m in modes: - initial_asc[m] = 0 + if initial_params is None: + for p in params: + for m in modes: + initial_params[m] = 0 # Set some custom arguments to prevent known errors on NFS if storage is None: storage = optuna.storages.RDBStorage(url="sqlite:///%s.db" % name, engine_kwargs={"connect_args": {"timeout": 100}, "isolation_level": "AUTOCOMMIT"}) + + target = pd.read_csv(target) + + mode_share = target.groupby("mode").agg(share=("share", "sum")) study = optuna.create_study( study_name=name, storage=storage, load_if_exists=True, directions=["minimize"] * len(modes), - sampler=ASCSampler(mode_share, fixed_mode, initial_asc, lr, constraints) + sampler=CalibrationSampler(target, mode_share, fixed_mode, initial_params, lr, constraints) ) study.set_user_attr("modes", modes) + study.set_user_attr("params", params) study.set_user_attr("fixed_mode", fixed_mode) if not path.exists("params"): @@ -339,23 +388,27 @@ def create_mode_share_study(name: str, jar: str, config: str, if not path.exists("runs"): makedirs("runs") - print("Running study with target shares:", mode_share) + print("Running study with target:", mode_share) def f(trial): params_path = path.join("params", "run%d.yaml" % trial.number) - params = [] + mode_params = [] for mode in modes: # preserve order m = {"mode": mode} - asc = trial.suggest_float(mode, sys.float_info.min, sys.float_info.max) - m["constant"] = asc - params.append(m) + if "asc" in params: + m["constant"] = trial.suggest_float("asc_" + mode, sys.float_info.min, sys.float_info.max) + + if "util_dist" in params or "dist" in params: + m["marginalUtilityOfDistance_util_m"] = trial.suggest_float("dist_" + mode, sys.float_info.min, sys.float_info.max) + + mode_params.append(m) with open(params_path, "w") as f: - yaml.dump({"planCalcScore": {"scoringParameters": [{"modeParams": params}]}}, f, sort_keys=False) + yaml.dump({"planCalcScore": {"scoringParameters": [{"modeParams": mode_params}]}}, f, sort_keys=False) run_dir = "runs/%03d" % trial.number @@ -404,7 +457,9 @@ def f(trial): if os.name != 'nt': cmd = cmd.split(" ") - p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + p = subprocess.Popen(cmd, + stdout=sys.stdout if debug else subprocess.DEVNULL, + stderr=sys.stderr if debug else subprocess.DEVNULL) try: while p.poll() is None: sleep(1) @@ -415,6 +470,9 @@ def f(trial): p.terminate() shares = analysis.calc_mode_share(run_dir, transform_persons=transform_persons, transform_trips=transform_trips) + mode_stats = analysis.calc_mode_stats(run_dir, transform_persons=transform_persons, transform_trips=transform_trips) + + trial.set_user_attr("mode_stats", mode_stats.to_dict(orient="tight")) print("Obtained mode shares:", shares) @@ -423,7 +481,7 @@ def f(trial): res = [] for mode in modes: - res.append(abs(mode_share[mode] - trial.user_attrs["%s_share" % mode])) + res.append(abs(mode_share.loc[mode] - trial.user_attrs["%s_share" % mode])) return res diff --git a/tests/test_calibration.py b/tests/test_calibration.py index 4465b7e..4aee3c7 100644 --- a/tests/test_calibration.py +++ b/tests/test_calibration.py @@ -1,16 +1,18 @@ import gzip import pathlib +import math import pytest import numpy as np +import pandas as pd -from matsim.calibration import ASCSampler +from matsim.calibration import CalibrationSampler def test_asc_sampler(): # variables are not needed for this test - sampler = ASCSampler(None, None, None, None, None) + sampler = CalibrationSampler(None, None, None, None, None) np.random.seed(0) @@ -20,7 +22,6 @@ def test_asc_sampler(): # real shares (target) z = [0.1, 0.5, 0.3, 0.1] - # 100 iterations for it in range(200): modes = np.argmax(utils + ascs + np.random.normal(size=(1000, 4)), axis=1) @@ -29,7 +30,7 @@ def test_asc_sampler(): share = counts / np.sum(counts) for i in range(1, len(share)): - ascs[i] += sampler.calc_update(z[i], share[i], z[0], share[0]) + ascs[i] += sampler.calc_asc_update(z[i], share[i], z[0], share[0]) err = np.sum(np.abs(z - share)) From d0d5492c85dc7de63e208a4643a21ecdc19c3a3b Mon Sep 17 00:00:00 2001 From: rakow Date: Mon, 17 Apr 2023 12:31:38 +0200 Subject: [PATCH 02/29] add constraints, tune parameters --- matsim/calibration.py | 26 ++++++++++++++++++-------- setup.py | 2 +- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/matsim/calibration.py b/matsim/calibration.py index de6c242..e240965 100644 --- a/matsim/calibration.py +++ b/matsim/calibration.py @@ -34,6 +34,14 @@ def _same_sign(x): x = x.to_numpy() return np.all(x >= 0) if x[0] >= 0 else np.all(x < 0) +def positive_constraint(mode, val): + """ Ensures parameter are positive """ + return max(0, val) + +def negative_constraint(mode, val): + """ Ensure parameter are negative """ + return min(0, val) + class CalibrationSampler(optuna.samplers.BaseSampler): """ Sample asc according to obtained mode shares """ @@ -61,8 +69,8 @@ def sample_independent(self, study, trial, param_name, param_distribution): if len(completed) == 0: initial = self.initial.get(param, {}).get(mode, 0) - if self.constraints is not None and param in self.constraints and mode in self.constraints[param]: - initial = self.constraints[param](initial) + if self.constraints is not None and param in self.constraints: + initial = self.constraints[param](mode, initial) return initial @@ -92,8 +100,8 @@ def sample_independent(self, study, trial, param_name, param_distribution): last_param += rate * step # Call constraint if present - if self.constraints is not None and param in self.constraints and mode in self.constraints[param]: - last_param = self.constraints[param](last_param) + if self.constraints is not None and param in self.constraints: + last_param = self.constraints[param](mode, last_param) return last_param @@ -102,8 +110,8 @@ def sample_asc(self, mode, last): if mode == self.fixed_mode: return 0 - step = self.calc_asc_update(self.mode_share.loc[mode], last.user_attrs["%s_share" % mode], - self.mode_share.loc[self.fixed_mode], last.user_attrs["%s_share" % self.fixed_mode]) + step = self.calc_asc_update(self.mode_share.loc[mode].share, last.user_attrs["%s_share" % mode], + self.mode_share.loc[self.fixed_mode].share, last.user_attrs["%s_share" % self.fixed_mode]) return step @@ -134,8 +142,10 @@ def sample_dist_util(self, mode, last): real = (df.mean_dist * df.share * df.correction).sum() target = (target.mean_dist * target.share).sum() + # TODO: magnitude should depend on the asc + # TODO: configurable parameter - return float(0.2 * (target - real) / (1000 * 1000)) + return float(0.05 * (target - real) / (1000 * 1000)) @staticmethod def calc_asc_update(z_i, m_i, z_0, m_0): @@ -326,7 +336,7 @@ def create_mode_calibration_study(name: str, params: set, transform_trips: Callable = None, chain_runs: Union[bool, int, Callable] = False, lr: Callable[[int, str, float, Dict[str, float], optuna.Trial, optuna.Study], float] = None, - constraints: Dict[str, Dict[str, Callable]] = None, + constraints: Dict[str, Callable[[str, float], float]] = None, storage: optuna.storages.RDBStorage = None, debug: bool = False ) -> Tuple[optuna.Study, Callable]: diff --git a/setup.py b/setup.py index f7d1e31..015ab07 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ install_requires=[ "protobuf >= 3.10.0", "xopen", - "pandas", # "shapely", "geopandas >= 0.6.0" + "pandas >= 1.4.0", # "shapely", "geopandas >= 0.6.0" ], extras_require = { 'calibration': ["optuna >= 2.7.0"] From 242fd928092b539c652b3a25d8b1c1501033c2cc Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Fri, 29 Sep 2023 15:11:01 +0200 Subject: [PATCH 03/29] restructure calibration for more modularity --- matsim/calibration.py | 520 --------------------------- matsim/calibration/__init__.py | 315 ++++++++++++++++ matsim/{ => calibration}/analysis.py | 56 ++- matsim/calibration/base.py | 122 +++++++ matsim/calibration/calib_asc.py | 76 ++++ matsim/calibration/calib_dist.py | 43 +++ matsim/calibration/constraints.py | 10 + tests/test_calibration.py | 16 +- 8 files changed, 625 insertions(+), 533 deletions(-) delete mode 100644 matsim/calibration.py create mode 100644 matsim/calibration/__init__.py rename matsim/{ => calibration}/analysis.py (71%) create mode 100644 matsim/calibration/base.py create mode 100644 matsim/calibration/calib_asc.py create mode 100644 matsim/calibration/calib_dist.py create mode 100644 matsim/calibration/constraints.py diff --git a/matsim/calibration.py b/matsim/calibration.py deleted file mode 100644 index 30be58e..0000000 --- a/matsim/calibration.py +++ /dev/null @@ -1,520 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import subprocess -import glob -import math -import shutil -import sys -from os import path, makedirs -from time import sleep - -from typing import Any, Union, Sequence, Dict, Callable, Tuple - -import yaml -import numpy as np -import pandas as pd -import geopandas -import optuna - -from optuna.trial import TrialState - -# This is there so that this file can be used standalone, as well as installed from the package -try: - import analysis -except ImportError: - from . import analysis - -def _completed_trials(study): - completed = filter(lambda s: s.state == TrialState.COMPLETE, study.trials) - return sorted(completed, key=lambda s: s.number) - -def _same_sign(x): - x = x.to_numpy() - return np.all(x >= 0) if x[0] >= 0 else np.all(x < 0) - -def positive_constraint(mode, val): - """ Ensures parameter are positive """ - return max(0, val) - -def negative_constraint(mode, val): - """ Ensure parameter are negative """ - return min(0, val) - -class CalibrationSampler(optuna.samplers.BaseSampler): - """ Sample asc according to obtained mode shares """ - - def __init__(self, target, mode_share, fixed_mode, initial, lr, constraints): - - self.target = target - self.mode_share = mode_share - self.fixed_mode = fixed_mode - self.initial = initial - self.lr = lr - self.constraints = constraints - - def infer_relative_search_space(self, study, trial): - return {} - - def sample_relative(self, study, trial, search_space): - #study._storage.set_trial_system_attr(trial._trial_id, "search_space", self._search_space) - return {} - - def sample_independent(self, study, trial, param_name, param_distribution): - - param, _, mode = param_name.partition("_") - - completed = _completed_trials(study) - if len(completed) == 0: - initial = self.initial.get(param, {}).get(mode, 0) - - if self.constraints is not None and param in self.constraints: - initial = self.constraints[param](mode, initial) - - return initial - - last = completed[-1] - last_param = last.params[param_name] - - if param == "asc": - step = self.sample_asc(mode, last) - elif param == "dist": - step = self.sample_dist_util(mode, last) - - rate = 1.0 - if self.lr is not None: - - rate = self.lr(float(len(completed) + 1), param_name, step, self.mode_share, trial, study) - - # rate of None or 0 would be invalid - if not rate: - rate = 1 - - # numpy types need casting - rate = float(rate) - - trial.set_user_attr("%s_rate" % param_name, rate) - trial.set_user_attr("%s_step" % param_name, step) - - last_param += rate * step - - # Call constraint if present - if self.constraints is not None and param in self.constraints: - last_param = self.constraints[param](mode, last_param) - - return last_param - - def sample_asc(self, mode, last): - - if mode == self.fixed_mode: - return 0 - - step = self.calc_asc_update(self.mode_share.loc[mode].share, last.user_attrs["%s_share" % mode], - self.mode_share.loc[self.fixed_mode].share, last.user_attrs["%s_share" % self.fixed_mode]) - - return step - - def sample_dist_util(self, mode, last): - - df = pd.DataFrame.from_dict(last.user_attrs["mode_stats"], orient="tight") - target = self.target[self.target["mode"] == mode].reset_index(drop=True).copy() - - df = df.loc[target.dist_group].reset_index() - - # Trips distances shares over all modes - # Correction factor is calculated here - ref_dist = self.target.groupby("dist_group").agg(share=("share", "sum")) - sim_dist = df.groupby("dist_group").agg(share=("share", "sum")) - - correction = ref_dist.loc[target.dist_group] / sim_dist.loc[target.dist_group] - - df = df[df.main_mode == mode].reset_index(drop=True).copy() - - if "mean_dist" not in target.columns: - target["mean_dist"] = df.mean_dist - - df["correction"] = correction.values - - target.share = target.share / sum(target.share) - df.share = df.share / sum(df.share) - - real = (df.mean_dist * df.share * df.correction).sum() - target = (target.mean_dist * target.share).sum() - - # TODO: magnitude should depend on the asc - - # TODO: configurable parameter - return float(0.05 * (target - real) / (1000 * 1000)) - - @staticmethod - def calc_asc_update(z_i, m_i, z_0, m_0): - """ Calculates the asc update for one step """ - # Update asc - # (1) Let z_i be the observed share of mode i. (real data, to be reproduced) - # (2) Run the simulation to convergence. Obtain simulated mode shares m_i. - # (3) Do nothing for mode 0. For all other modes: add [ln(z_i) - ln(m_i)] – [ln(z_0) - ln(m_0)] to its ASC. - # (4) Goto 2. - return math.log(z_i) - math.log(m_i) - (math.log(z_0) - math.log(m_0)) - -def calc_adjusted_mode_share(sim: pd.DataFrame, survey: pd.DataFrame, - count_var: str = "trips", dist_var: str = "dist_group") -> Tuple[Any, pd.DataFrame]: - """ This function can be used if the given input trip data has a different distance distribution than the survey data. - It will rescale the survey data to match simulated data, which allows to calculate an adjusted overall mode share. - - :param sim: data frame containing shares for distance group and modes - :param survey: data frame containing shares from survey data - :param count_var: name of column containing the number of trips or share in 'sim' - :param dist_var: name of the column holding the distance group information - :return: tuple of optimization result and adjusted mode share - """ - from scipy.optimize import minimize - - sagg = sim.groupby(dist_var).sum() - sagg['share'] = sagg[count_var] / np.sum(sagg[count_var]) - - # Rescale the distance groups of the survey data so that it matches the distance group distribution of the simulation - # The overall mode share after this adjustment will the resulting adjusted mode share - def f(x, result=False): - adj = survey.copy() - - for i, t in enumerate(x): - adj.loc[adj[dist_var] == sagg.index[i], "share"] *= t - - adj.share = adj.share / np.sum(adj.share) - - agg = adj.groupby(dist_var).sum() - - # Minimized difference between adjusted and simulated distribution - err = sum((sagg.share - agg.share)**2) - - if result: - return adj - - return err - - # One variable for each distance group - x0 = np.ones(len(sagg.index)) / len(sagg.index) - - # Sum should be less than one - cons = [{'type': 'ineq', 'fun': lambda x: 1 - sum(x)}] - bnds = tuple((0, 1) for x in x0) - - res = minimize(f, x0, method='SLSQP', bounds=bnds, constraints=cons) - - df = f(res.x, True) - - return res, df - - -def study_as_df(study): - """ Convert study to dataframe """ - completed = _completed_trials(study) - - modes = study.user_attrs["modes"] - #fixed_mode = study.user_attrs["fixed_mode"] - - data = [] - - for i, trial in enumerate(completed): - - for j, m in enumerate(modes): - entry = { - "trial": i, - "mode": m, - "asc": trial.params[m], - "error": trial.values[j] - } - - for k, v in trial.user_attrs.items(): - if k.startswith(m + "_"): - entry[k[len(m) + 1:]] = v - - data.append(entry) - - return pd.DataFrame(data) - -def auto_lr_scheduler(start=9, interval=3, lookback=3, throttle=0.7): - """ Creates a function to be passed into the lr argument of the mode share study. This scheduler will - try to increase the step size automatically to speed up convergence. - - :param start: earliest run to start adjusting - :param interval: apply every x runs - :param lookback: look at x recent runs for the interpolation - :param throttle: reduce estimate as step size will most likely be over-estimated - """ - if start < 1: - raise ValueError("Start must be at least 1") - - if interval > start: - raise ValueError("Start must be less equal than interval") - - def _fn(n, mode, update, mode_share, trial, study): - - if n < start: - return 1 - - # Only run every x iterations - if (n - start) % interval != 0: - return 1 - - df = study_as_df(study) - df = df[df["mode"] == mode] - - updates = df.asc.diff(1) - changes = df.share.diff(1) - - # ensure that all previous updates and all previous reactions have been in the same direction - # otherwise there will be instability - if not _same_sign(updates.iloc[-lookback:]) or not _same_sign(changes.iloc[-lookback:]): - return 1 - - asc = df.asc.iloc[-1] - - x = df.share.iloc[-lookback:] - y = df.asc.iloc[-lookback:] - - A = np.vstack([x, np.ones(len(x))]).T - m, c = np.linalg.lstsq(A, y, rcond=None)[0] - - target = m*mode_share[mode] + c - new_update = (target - asc) * throttle - - # updates must be in same direction - if np.sign(update) != np.sign(new_update): - return 1 - - trial.set_user_attr("%s_auto_lr" % mode, True) - - return new_update / update - - - return _fn - -def default_chain_scheduler(completed): - """ Default function to determin when to chain runs. """ - - n = len(completed) - - if n <= 6: - return n % 2 == 0 - - if n <= 15: - return n % 5 == 0 - - if n <= 50: - return n % 10 == 0 - - return False - -def linear_lr_scheduler(start=0.6, end=1, interval=3): - """ Creates an lr scheduler that will interpolate linearly from start to end over the first n iterations. - - :param start: Initial learning rate. - :param end: Finial learning rate to reach. - :param interval: Number of runs until end rate should be reached. - """ - if interval < 2: - raise ValueError("N must be greater or equal 2.") - - def _fn(n, *args, **kwargs): - - if n > interval: - return end - - return start + (n - 1) * (end - start) / interval - - return _fn - -def create_mode_calibration_study(name: str, params: set, - jar: Union[str, os.PathLike], config: Union[str, os.PathLike], - modes: Sequence[str], target: Union[str, os.PathLike], - fixed_mode: str = "walk", - args: Union[str, Callable]="", jvm_args="", - initial_params: Dict[str, Dict[str, float]] = None, - transform_persons: Callable = None, - transform_trips: Callable = None, - chain_runs: Union[bool, int, Callable] = False, - lr: Callable[[int, str, float, Dict[str, float], optuna.Trial, optuna.Study], float] = None, - constraints: Dict[str, Callable[[str, float], float]] = None, - storage: optuna.storages.RDBStorage = None, - debug: bool = False - ) -> Tuple[optuna.Study, Callable]: - """ Create or load an existing study for mode share calibration using asc values. - - This function returns the study and optimization objective as tuple. Which can be used like this: - study.optimize(obj, 10) - - :param name: name of the study - :param params: parameters to calibrate - :param jar: path to executable jar file of the scenario - :param config: path to config file to run - :param modes: list of all relevant modes - :param target: csv file with target shares - :param fixed_mode: the fixed mode with asc=0 - :param args: additional arguments to the executable jar, can also be a callback function - :param jvm_args: additional jvm args - :param initial_asc: dict of initial asc values - :param transform_persons: callable to filter persons included in mode share - :param transform_trips: callable to modify trips included in mode share - :param chain_runs: automatically use the output plans of each run as input for the next, either True or number of iterations or callable - :param lr: learning rate schedule, will be called with (trial number, mode, asc update, mode_share, trial, study) - :param constraints: constraints for each mode, must return asc and will be called with original asc - :param storage: custom storage object to overwrite default sqlite backend - :param debug: enable debug output - :return: tuple of study and optimization objective. - """ - - # Init with 0 - if initial_params is None: - for p in params: - for m in modes: - initial_params[m] = 0 - - # Set some custom arguments to prevent known errors on NFS - if storage is None: - storage = optuna.storages.RDBStorage(url="sqlite:///%s.db" % name, - engine_kwargs={"connect_args": {"timeout": 100}, "isolation_level": "AUTOCOMMIT"}) - - target = pd.read_csv(target) - - mode_share = target.groupby("mode").agg(share=("share", "sum")) - - # TODO: normalize target share for given modes - - study = optuna.create_study( - study_name=name, - storage=storage, - load_if_exists=True, - directions=["minimize"] * len(modes), - sampler=CalibrationSampler(target, mode_share, fixed_mode, initial_params, lr, constraints) - ) - - study.set_user_attr("modes", modes) - study.set_user_attr("params", params) - study.set_user_attr("fixed_mode", fixed_mode) - - if not path.exists("params"): - makedirs("params") - - if not path.exists("runs"): - makedirs("runs") - - print("Running study with target:", mode_share) - - def f(trial): - - params_path = path.join("params", "run%d.yaml" % trial.number) - mode_params = [] - - for mode in modes: - # preserve order - m = {"mode": mode} - - if "asc" in params: - m["constant"] = trial.suggest_float("asc_" + mode, sys.float_info.min, sys.float_info.max) - - if "util_dist" in params or "dist" in params: - m["marginalUtilityOfDistance_util_m"] = trial.suggest_float("dist_" + mode, sys.float_info.min, sys.float_info.max) - - mode_params.append(m) - - with open(params_path, "w") as f: - yaml.dump({"planCalcScore": {"scoringParameters": [{"modeParams": mode_params}]}}, f, sort_keys=False) - - run_dir = "runs/%03d" % trial.number - - if os.path.exists(run_dir): - shutil.rmtree(run_dir) - - completed = _completed_trials(study) - - # Call args if necesarry - run_args = args(completed) if callable(args) else args - - cmd = "java %s -jar %s run --config %s --yaml %s --output %s --runId %03d %s" \ - % (jvm_args, jar, config, params_path, run_dir, trial.number, run_args) - - # Max fix extra whitespaces in args - cmd = cmd.strip() - - if chain_runs: - - out = None - for t in reversed(completed): - out = glob.glob("runs/%03d/*.output_plans.xml.gz" % t.number) - if out: - out = path.abspath(out[0]) - break - - if out: - last_chained = study.user_attrs.get("last_chained_run", None) - if (chain_runs is True or - (callable(chain_runs) and chain_runs(completed)) or - (type(chain_runs) == int and len(completed) % chain_runs == 0)): - cmd += " --config:plans.inputPlansFile=" + out - study.set_user_attr("last_chained_run", out) - - elif last_chained: - cmd += " --config:plans.inputPlansFile=" + last_chained - - else: - print("No output plans for chaining runs found.") - - # Extra whitespaces will break argument parsing - cmd = cmd.strip() - - print("Running cmd %s" % cmd) - - if os.name != 'nt': - cmd = cmd.split(" ") - - p = subprocess.Popen(cmd, - stdout=sys.stdout if debug else subprocess.DEVNULL, - stderr=sys.stderr if debug else subprocess.DEVNULL) - try: - while p.poll() is None: - sleep(1) - - if p.returncode != 0: - raise Exception("Process returned with error code: %s" % p.returncode) - finally: - p.terminate() - - shares = analysis.calc_mode_share(run_dir, transform_persons=transform_persons, transform_trips=transform_trips) - mode_stats = analysis.calc_mode_stats(run_dir, transform_persons=transform_persons, transform_trips=transform_trips) - - trial.set_user_attr("mode_stats", mode_stats.to_dict(orient="tight")) - - print("Obtained mode shares:", shares) - - for k, v in shares.items(): - trial.set_user_attr("%s_share" % k, v) - - res = [] - for mode in modes: - res.append(abs(mode_share.loc[mode] - trial.user_attrs["%s_share" % mode])) - - return res - - return study, f - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(description="Calibration CLI") - parser.add_argument('file', nargs=1, type=str, help="Path to input db") - parser.add_argument("--name", type=str, default="calib", help="Calibration name") - parser.add_argument("--output", default=None, help="Output path") - args = parser.parse_args() - - study = optuna.load_study( - study_name=args.name, - storage="sqlite:///%s" % args.file[0], - ) - - if not args.output: - args.output = args.file[0] + ".csv" - - df = study_as_df(study) - df.to_csv(args.output, index=False) \ No newline at end of file diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py new file mode 100644 index 0000000..055bb45 --- /dev/null +++ b/matsim/calibration/__init__.py @@ -0,0 +1,315 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" Contains calibration related functions """ + +__all__ = ["create_calibration", "ASCCalibrator"] + +import os +import shutil +import subprocess +import sys +from os import path, makedirs +from time import sleep +from typing import Union, Sequence, Callable + +import optuna +import yaml +from optuna.trial import TrialState + +from .analysis import * +from .base import CalibratorBase +from .calib_asc import ASCCalibrator + + +def _completed_trials(study): + completed = filter(lambda s: s.state == TrialState.COMPLETE, study.trials) + return sorted(completed, key=lambda s: s.number) + + +def _same_sign(x): + x = x.to_numpy() + return np.all(x >= 0) if x[0] >= 0 else np.all(x < 0) + + +class CalibrationSampler(optuna.samplers.BaseSampler): + """ Run calibrators on obtained mode shares """ + + def __init__(self, calibrators): + self.calibrators = {c.name: c for c in calibrators} + + def infer_relative_search_space(self, study, trial): + return {} + + def sample_relative(self, study, trial, search_space): + # study._storage.set_trial_system_attr(trial._trial_id, "search_space", self._search_space) + return {} + + def sample_independent(self, study, trial, param_name, param_distribution): + + prefix, _, param = param_name.partition("_") + c : CalibratorBase = self.calibrators[prefix] + + completed = _completed_trials(study) + if len(completed) == 0: + initial = c.sample_initial(param) + + if c.constraints is not None and param in c.constraints: + initial = c.constraints[param](param, initial) + + return initial + + last = completed[-1] + last_param = last.params[param_name] + + step = c.update_step(param, last) + + rate = 1.0 + if c.lr is not None: + + rate = c.lr(float(len(completed) + 1), param_name, step, trial, study) + + # rate of None or 0 would be invalid + if not rate: + rate = 1 + + # numpy types need casting + rate = float(rate) + + trial.set_user_attr("%s_rate" % param_name, rate) + trial.set_user_attr("%s_step" % param_name, step) + + last_param += rate * step + + # Call constraint if present + if c.constraints is not None and param in c.constraints: + last_param = c.constraints[param](param, last_param) + + return last_param + + + +def study_as_df(study): + """ Convert study to dataframe """ + completed = _completed_trials(study) + + modes = study.user_attrs["modes"] + # fixed_mode = study.user_attrs["fixed_mode"] + + data = [] + + for i, trial in enumerate(completed): + + for j, m in enumerate(modes): + entry = { + "trial": i, + "mode": m, + "asc": trial.params[m], + "error": trial.values[j] + } + + for k, v in trial.user_attrs.items(): + if k.startswith(m + "_"): + entry[k[len(m) + 1:]] = v + + data.append(entry) + + return pd.DataFrame(data) + +def default_chain_scheduler(completed): + """ Default function to determine when to chain runs. """ + + n = len(completed) + + if n <= 6: + return n % 2 == 0 + + if n <= 15: + return n % 5 == 0 + + if n <= 50: + return n % 10 == 0 + + return False + + +def linear_lr_scheduler(start=0.6, end=1, interval=3): + """ Creates a lr scheduler that will interpolate linearly from start to end over the first n iterations. + + :param start: Initial learning rate. + :param end: Finial learning rate to reach. + :param interval: Number of runs until end rate should be reached. + """ + if interval < 2: + raise ValueError("N must be greater or equal 2.") + + def _fn(n, *args, **kwargs): + + if n > interval: + return end + + return start + (n - 1) * (end - start) / interval + + return _fn + + +def create_calibration(name: str, calibrate: Union[CalibratorBase, Sequence[CalibratorBase]], + jar: Union[str, os.PathLike], config: Union[str, os.PathLike], + args: Union[str, Callable] = "", jvm_args="", + transform_persons: Callable = None, + transform_trips: Callable = None, + chain_runs: Union[bool, int, Callable] = False, + storage: optuna.storages.RDBStorage = None, + debug: bool = False + ) -> Tuple[optuna.Study, Callable]: + """ Create or load an existing study for mode share calibration using asc values. + + This function returns the study and optimization objective as tuple. Which can be used like this: + study.optimize(obj, 10) + + :param name: name of the study + :param calibrate: (list) of calibrator instances + :param jar: path to executable jar file of the scenario + :param config: path to config file to run + :param args: additional arguments to the executable jar, can also be a callback function + :param jvm_args: additional jvm args + :param transform_persons: callable to filter persons included in mode share + :param transform_trips: callable to modify trips included in mode share + :param chain_runs: automatically use the output plans of each run as input for the next, either True or number of iterations or callable + :param storage: custom storage object to overwrite default sqlite backend + :param debug: enable debug output + :return: tuple of study and optimization objective. + """ + + # Init with 0 + if isinstance(calibrate, CalibratorBase): + calibrate = [calibrate] + + # Set some custom arguments to prevent known errors on NFS + if storage is None: + storage = optuna.storages.RDBStorage(url="sqlite:///%s.db" % name, + engine_kwargs={"connect_args": {"timeout": 100}, + "isolation_level": "AUTOCOMMIT"}) + + study = optuna.create_study( + study_name=name, + storage=storage, + load_if_exists=True, + directions=["minimize"] * sum(len(c.target) for c in calibrate), + sampler=CalibrationSampler(calibrate) + ) + + if not path.exists("params"): + makedirs("params") + + if not path.exists("runs"): + makedirs("runs") + + study.set_user_attr("calib", [c.name for c in calibrate]) + for c in calibrate: + c.init_study(study) + + def f(trial): + + params_path = path.join("params", "run%d.yaml" % trial.number) + params = {} + + for c in calibrate: + prefix = c.name + "_" + c.update_config(trial, prefix, params) + + with open(params_path, "w") as f: + yaml.dump(params, f, sort_keys=False) + + run_dir = "runs/%03d" % trial.number + + if os.path.exists(run_dir): + shutil.rmtree(run_dir) + + completed = _completed_trials(study) + + # Call args if necessary + run_args = args(completed) if callable(args) else args + + cmd = "java %s -jar %s run --config %s --yaml %s --output %s --runId %03d %s" \ + % (jvm_args, jar, config, params_path, run_dir, trial.number, run_args) + + # Max fix extra whitespaces in args + cmd = cmd.strip() + + if chain_runs: + + out = None + for t in reversed(completed): + out = glob.glob("runs/%03d/*.output_plans.xml.gz" % t.number) + if out: + out = path.abspath(out[0]) + break + + if out: + last_chained = study.user_attrs.get("last_chained_run", None) + if (chain_runs is True or + (callable(chain_runs) and chain_runs(completed)) or + (type(chain_runs) == int and len(completed) % chain_runs == 0)): + cmd += " --config:plans.inputPlansFile=" + out + study.set_user_attr("last_chained_run", out) + + elif last_chained: + cmd += " --config:plans.inputPlansFile=" + last_chained + + else: + print("No output plans for chaining runs found.") + + # Extra whitespaces will break argument parsing + cmd = cmd.strip() + + print("Running cmd %s" % cmd) + + if os.name != 'nt': + cmd = cmd.split(" ") + + p = subprocess.Popen(cmd, + stdout=sys.stdout if debug else subprocess.DEVNULL, + stderr=sys.stderr if debug else subprocess.DEVNULL) + try: + while p.poll() is None: + sleep(1) + + if p.returncode != 0: + raise Exception("Process returned with error code: %s" % p.returncode) + finally: + p.terminate() + + mode_stats = analysis.calc_mode_stats(run_dir, transform_persons=transform_persons,transform_trips=transform_trips) + + trial.set_user_attr("mode_stats", mode_stats.to_dict(orient="tight")) + + res = [] + for c in calibrate: + c.calc_stats(trial, run_dir, transform_persons, transform_trips) + + res.extend(c.error_metrics(trial)) + + return res + + return study, f + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Calibration CLI") + parser.add_argument('file', nargs=1, type=str, help="Path to input db") + parser.add_argument("--name", type=str, default="calib", help="Calibration name") + parser.add_argument("--output", default=None, help="Output path") + args = parser.parse_args() + + study = optuna.load_study( + study_name=args.name, + storage="sqlite:///%s" % args.file[0], + ) + + if not args.output: + args.output = args.file[0] + ".csv" + + df = study_as_df(study) + df.to_csv(args.output, index=False) diff --git a/matsim/analysis.py b/matsim/calibration/analysis.py similarity index 71% rename from matsim/analysis.py rename to matsim/calibration/analysis.py index c2373f9..014d7bc 100644 --- a/matsim/analysis.py +++ b/matsim/calibration/analysis.py @@ -3,9 +3,61 @@ import glob -import pandas as pd -import numpy as np +from typing import Any, Tuple + import geopandas +import numpy as np +import pandas as pd + + +def calc_adjusted_mode_share(sim: pd.DataFrame, survey: pd.DataFrame, + count_var: str = "trips", dist_var: str = "dist_group") -> Tuple[Any, pd.DataFrame]: + """ This function can be used if the given input trip data has a different distance distribution than the survey data. + It will rescale the survey data to match simulated data, which allows to calculate an adjusted overall mode share. + + :param sim: data frame containing shares for distance group and modes + :param survey: data frame containing shares from survey data + :param count_var: name of column containing the number of trips or share in 'sim' + :param dist_var: name of the column holding the distance group information + :return: tuple of optimization result and adjusted mode share + """ + from scipy.optimize import minimize + + sagg = sim.groupby(dist_var).sum() + sagg['share'] = sagg[count_var] / np.sum(sagg[count_var]) + + # Rescale the distance groups of the survey data so that it matches the distance group distribution of the simulation + # The overall mode share after this adjustment will the resulting adjusted mode share + def f(x, result=False): + adj = survey.copy() + + for i, t in enumerate(x): + adj.loc[adj[dist_var] == sagg.index[i], "share"] *= t + + adj.share = adj.share / np.sum(adj.share) + + agg = adj.groupby(dist_var).sum() + + # Minimized difference between adjusted and simulated distribution + err = sum((sagg.share - agg.share)**2) + + if result: + return adj + + return err + + # One variable for each distance group + x0 = np.ones(len(sagg.index)) / len(sagg.index) + + # Sum should be less than one + cons = [{'type': 'ineq', 'fun': lambda x: 1 - sum(x)}] + bnds = tuple((0, 1) for x in x0) + + res = minimize(f, x0, method='SLSQP', bounds=bnds, constraints=cons) + + df = f(res.x, True) + + return res, df def read_trips_and_persons(run, transform_persons=None, transform_trips=None): diff --git a/matsim/calibration/base.py b/matsim/calibration/base.py new file mode 100644 index 0000000..fa9888c --- /dev/null +++ b/matsim/calibration/base.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- + +import math +import os +from abc import ABC, abstractmethod +from typing import Union, Sequence, Dict, Callable + +import optuna +import pandas as pd + +# Type alias for input variables +CalibrationInput = Union[str, os.PathLike, dict, pd.DataFrame] + + +class CalibratorBase(ABC): + """ Base class for calibrators. """ + + def __init__(self, + modes: Sequence[str], + initial: CalibrationInput, + target: CalibrationInput, + lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, + constraints: Dict[str, Callable[[str, float], float]] = None): + """Abstract constructors for all calibrations. Usually the same parameters should be made available subclasses. + + :param modes: list of all relevant modes + :param initial: dict of initial asc values + :param target: target shares as csv or dict + :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) + :param constraints: constraints for each mode, must return asc and will be called with original asc + """ + self.modes = modes + self.initial = self.convert_input(initial) + self.target = self.convert_input(target) + self.lr = lr + self.constraints = constraints + + @property + @abstractmethod + def name(self): + """ Short name of the calibrator. Will be used to differentiate attributes""" + + raise NotImplemented + + @abstractmethod + def init_study(self, study: optuna.Study): + """ Initialize study instance """ + raise NotImplemented + + @abstractmethod + def update_config(self, trial: optuna.Trial, prefix: str, config: dict): + """ Calculate updated config for this trial. This method must interact with the trial using + `trial.suggest_float` and by using the given prefix . """ + raise NotImplemented + + @abstractmethod + def update_step(self, param: str, last_trial: optuna.Trial) -> float: + """ Return update step for param based on last trial """ + raise NotImplemented + + @abstractmethod + def calc_stats(self, trial: optuna.Trial, run_dir: str, + transform_persons: Callable = None, + transform_trips: Callable = None) -> Sequence[float]: + """ Calculate and store statistics for a trial. These usually needs to be accessed for calculating updates. + + :return: reported error metrics + """ + raise NotImplemented + + def sample_initial(self, param: str) -> float: + """ Sample initial value for parameter """ + if param in self.initial: + return self.initial[param] + + return 0 + + @classmethod + def convert_input(cls, arg) -> pd.DataFrame: + """ Method call to create target values from input argument """ + if isinstance(arg, pd.DataFrame): + return arg + if isinstance(arg, dict): + # Convert simple dict of modes and values + return pd.DataFrame(index=arg.keys(), data=arg.values(), columns=["value"]).rename_axis(index="mode") + + return pd.read_csv(arg) + + @staticmethod + def get_mode_params(config: dict, mode: str): + """ Retrieve the mode params from config object. Create if they don't exist yet. """ + if "planCalcScore" not in config: + config["planCalcScore"] = {} + + if "scoringParameters" not in config["planCalcScore"]: + config["planCalcScore"]["scoringParameters"] = [] + + ps = config["planCalcScore"]["scoringParameters"] + + for p in ps: + if "modeParams" in p: + for m in p["modeParams"]: + if m["mode"] == mode: + return m + + m = {"mode": mode} + p["modeParams"].append(m) + return m + + m = {"mode": mode} + ps.append({"modeParams": [m]}) + return m + + @staticmethod + def calc_asc_update(z_i, m_i, z_0, m_0) -> float: + """ Calculates the asc update for one step """ + # Update asc + # (1) Let z_i be the observed share of mode i. (real data, to be reproduced) + # (2) Run the simulation to convergence. Obtain simulated mode shares m_i. + # (3) Do nothing for mode 0. For all other modes: add [ln(z_i) - ln(m_i)] – [ln(z_0) - ln(m_0)] to its ASC. + # (4) Goto 2. + return math.log(z_i) - math.log(m_i) - (math.log(z_0) - math.log(m_0)) diff --git a/matsim/calibration/calib_asc.py b/matsim/calibration/calib_asc.py new file mode 100644 index 0000000..2b4d06c --- /dev/null +++ b/matsim/calibration/calib_asc.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- + +import sys +from typing import Sequence, Callable, Dict + +import optuna + +from .base import CalibratorBase, CalibrationInput +from .analysis import calc_mode_share + + +class ASCCalibrator(CalibratorBase): + """ Calibrates the alternative specific constant for desired modes """ + + def __init__(self, + modes: Sequence[str], + initial: CalibrationInput, + target: CalibrationInput, + fixed_mode: str = "walk", + lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, + constraints: Dict[str, Callable[[str, float], float]] = None): + """Abstract constructors for all calibrations. Usually the same parameters should be made available subclasses. + + :param modes: list of all relevant modes + :param initial: dict of initial asc values + :param target: target shares as csv or dict + :param fixed_mode: the fixed mode with asc=0 + :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) + :param constraints: constraints for each mode, must return asc and will be called with original asc + """ + super().__init__(modes, initial, target, lr, constraints) + + self.fixed_mode = fixed_mode + + self.target = self.target.rename(columns={"value": "share"}, errors="ignore") + self.target = self.target.groupby("mode").agg(share=("share", "sum")) + + @property + def name(self): + return "asc" + + def init_study(self, study: optuna.Trial): + study.set_user_attr("modes", self.modes) + study.set_user_attr("fixed_mode", self.fixed_mode) + + print("Running study with target:", self.target) + + def update_config(self, trial: optuna.Trial, prefix: str, config: dict): + + # Update constants + for mode in self.modes: + m = self.get_mode_params(config, mode) + m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) + + def update_step(self, param: str, last_trial: optuna.Trial) -> float: + + if param == self.fixed_mode: + return 0 + + step = self.calc_asc_update(self.target.loc[param].share, last_trial.user_attrs["%s_share" % param], + self.target.loc[self.fixed_mode].share, + last_trial.user_attrs["%s_share" % self.fixed_mode]) + + return step + + def calc_stats(self, trial: optuna.Trial, run_dir: str, + transform_persons: Callable = None, + transform_trips: Callable = None) -> Sequence[float]: + + shares = calc_mode_share(run_dir, transform_persons=transform_persons, transform_trips=transform_trips) + print("Obtained mode shares:", shares) + + for k, v in shares.items(): + trial.set_user_attr("%s_share" % k, v) + + return [(abs(self.target.loc[mode] - trial.user_attrs["%s_share" % mode])) for mode in self.modes] diff --git a/matsim/calibration/calib_dist.py b/matsim/calibration/calib_dist.py new file mode 100644 index 0000000..44749ea --- /dev/null +++ b/matsim/calibration/calib_dist.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- + +# TODO: collection of old code +# was just draft anyway, needs to be updated + +""" +m["marginalUtilityOfDistance_util_m"] = trial.suggest_float("dist_" + mode, sys.float_info.min, + sys.float_info.max) + + +def sample_dist_util(self, mode, last): + + df = pd.DataFrame.from_dict(last.user_attrs["mode_stats"], orient="tight") + target = self.target[self.target["mode"] == mode].reset_index(drop=True).copy() + + df = df.loc[target.dist_group].reset_index() + + # Trips distances shares over all modes + # Correction factor is calculated here + ref_dist = self.target.groupby("dist_group").agg(share=("share", "sum")) + sim_dist = df.groupby("dist_group").agg(share=("share", "sum")) + + correction = ref_dist.loc[target.dist_group] / sim_dist.loc[target.dist_group] + + df = df[df.main_mode == mode].reset_index(drop=True).copy() + + if "mean_dist" not in target.columns: + target["mean_dist"] = df.mean_dist + + df["correction"] = correction.values + + target.share = target.share / sum(target.share) + df.share = df.share / sum(df.share) + + real = (df.mean_dist * df.share * df.correction).sum() + target = (target.mean_dist * target.share).sum() + + # TODO: magnitude should depend on the asc + + # TODO: configurable parameter + return float(0.05 * (target - real) / (1000 * 1000)) + +""" \ No newline at end of file diff --git a/matsim/calibration/constraints.py b/matsim/calibration/constraints.py new file mode 100644 index 0000000..1f16d82 --- /dev/null +++ b/matsim/calibration/constraints.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- + +def positive_constraint(mode, val): + """ Ensures parameter are positive """ + return max(0, val) + + +def negative_constraint(mode, val): + """ Ensure parameter are negative """ + return min(0, val) diff --git a/tests/test_calibration.py b/tests/test_calibration.py index 4aee3c7..570541a 100644 --- a/tests/test_calibration.py +++ b/tests/test_calibration.py @@ -1,18 +1,11 @@ -import gzip -import pathlib -import math - -import pytest - import numpy as np -import pandas as pd -from matsim.calibration import CalibrationSampler +from matsim.calibration import ASCCalibrator -def test_asc_sampler(): +def test_asc_sampler(): # variables are not needed for this test - sampler = CalibrationSampler(None, None, None, None, None) + sampler = ASCCalibrator([], {}, {}) np.random.seed(0) @@ -38,5 +31,6 @@ def test_asc_sampler(): assert err < 0.12 + if __name__ == "__main__": - test_asc_sampler() \ No newline at end of file + test_asc_sampler() From 2e3710dfb0c0628151d6888c491e4928083f276c Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Fri, 29 Sep 2023 15:50:47 +0200 Subject: [PATCH 04/29] improve API for scenarios with different CLI --- matsim/calibration/__init__.py | 100 ++++++-------------------------- matsim/calibration/base.py | 4 +- matsim/calibration/calib_asc.py | 1 - matsim/calibration/utils.py | 88 ++++++++++++++++++++++++++++ 4 files changed, 108 insertions(+), 85 deletions(-) create mode 100644 matsim/calibration/utils.py diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index 055bb45..d8b31ef 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """ Contains calibration related functions """ -__all__ = ["create_calibration", "ASCCalibrator"] +__all__ = ["create_calibration", "ASCCalibrator", "utils"] import os import shutil @@ -10,27 +10,17 @@ import sys from os import path, makedirs from time import sleep -from typing import Union, Sequence, Callable +from typing import Union, Sequence, Callable, Tuple import optuna import yaml from optuna.trial import TrialState -from .analysis import * +from . import utils from .base import CalibratorBase from .calib_asc import ASCCalibrator -def _completed_trials(study): - completed = filter(lambda s: s.state == TrialState.COMPLETE, study.trials) - return sorted(completed, key=lambda s: s.number) - - -def _same_sign(x): - x = x.to_numpy() - return np.all(x >= 0) if x[0] >= 0 else np.all(x < 0) - - class CalibrationSampler(optuna.samplers.BaseSampler): """ Run calibrators on obtained mode shares """ @@ -49,7 +39,7 @@ def sample_independent(self, study, trial, param_name, param_distribution): prefix, _, param = param_name.partition("_") c : CalibratorBase = self.calibrators[prefix] - completed = _completed_trials(study) + completed = utils.completed_trials(study) if len(completed) == 0: initial = c.sample_initial(param) @@ -87,71 +77,6 @@ def sample_independent(self, study, trial, param_name, param_distribution): return last_param - -def study_as_df(study): - """ Convert study to dataframe """ - completed = _completed_trials(study) - - modes = study.user_attrs["modes"] - # fixed_mode = study.user_attrs["fixed_mode"] - - data = [] - - for i, trial in enumerate(completed): - - for j, m in enumerate(modes): - entry = { - "trial": i, - "mode": m, - "asc": trial.params[m], - "error": trial.values[j] - } - - for k, v in trial.user_attrs.items(): - if k.startswith(m + "_"): - entry[k[len(m) + 1:]] = v - - data.append(entry) - - return pd.DataFrame(data) - -def default_chain_scheduler(completed): - """ Default function to determine when to chain runs. """ - - n = len(completed) - - if n <= 6: - return n % 2 == 0 - - if n <= 15: - return n % 5 == 0 - - if n <= 50: - return n % 10 == 0 - - return False - - -def linear_lr_scheduler(start=0.6, end=1, interval=3): - """ Creates a lr scheduler that will interpolate linearly from start to end over the first n iterations. - - :param start: Initial learning rate. - :param end: Finial learning rate to reach. - :param interval: Number of runs until end rate should be reached. - """ - if interval < 2: - raise ValueError("N must be greater or equal 2.") - - def _fn(n, *args, **kwargs): - - if n > interval: - return end - - return start + (n - 1) * (end - start) / interval - - return _fn - - def create_calibration(name: str, calibrate: Union[CalibratorBase, Sequence[CalibratorBase]], jar: Union[str, os.PathLike], config: Union[str, os.PathLike], args: Union[str, Callable] = "", jvm_args="", @@ -159,6 +84,7 @@ def create_calibration(name: str, calibrate: Union[CalibratorBase, Sequence[Cali transform_trips: Callable = None, chain_runs: Union[bool, int, Callable] = False, storage: optuna.storages.RDBStorage = None, + custom_cli: Callable = None, debug: bool = False ) -> Tuple[optuna.Study, Callable]: """ Create or load an existing study for mode share calibration using asc values. @@ -176,6 +102,7 @@ def create_calibration(name: str, calibrate: Union[CalibratorBase, Sequence[Cali :param transform_trips: callable to modify trips included in mode share :param chain_runs: automatically use the output plans of each run as input for the next, either True or number of iterations or callable :param storage: custom storage object to overwrite default sqlite backend + :param custom_cli: the scenario is not a matsim application and needs a different command line syntax :param debug: enable debug output :return: tuple of study and optimization objective. """ @@ -190,6 +117,12 @@ def create_calibration(name: str, calibrate: Union[CalibratorBase, Sequence[Cali engine_kwargs={"connect_args": {"timeout": 100}, "isolation_level": "AUTOCOMMIT"}) + if not os.access(jar, os.R_OK): + raise ValueError("Can not access JAR File: %s" % jar) + + if not os.access(config, os.R_OK): + raise ValueError("Can not access config File: %s" % config) + study = optuna.create_study( study_name=name, storage=storage, @@ -225,13 +158,16 @@ def f(trial): if os.path.exists(run_dir): shutil.rmtree(run_dir) - completed = _completed_trials(study) + completed = utils.completed_trials(study) # Call args if necessary run_args = args(completed) if callable(args) else args - cmd = "java %s -jar %s run --config %s --yaml %s --output %s --runId %03d %s" \ - % (jvm_args, jar, config, params_path, run_dir, trial.number, run_args) + if custom_cli: + cmd = custom_cli(jvm_args, jar, config, params_path, run_dir, trial.number, run_args) + else: + cmd = "java %s -jar %s run --config %s --yaml %s --output %s --runId %03d %s" \ + % (jvm_args, jar, config, params_path, run_dir, trial.number, run_args) # Max fix extra whitespaces in args cmd = cmd.strip() diff --git a/matsim/calibration/base.py b/matsim/calibration/base.py index fa9888c..02204bd 100644 --- a/matsim/calibration/base.py +++ b/matsim/calibration/base.py @@ -70,8 +70,8 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, def sample_initial(self, param: str) -> float: """ Sample initial value for parameter """ - if param in self.initial: - return self.initial[param] + if param in self.initial.index: + return float(self.initial.loc[param]) return 0 diff --git a/matsim/calibration/calib_asc.py b/matsim/calibration/calib_asc.py index 2b4d06c..fa6912f 100644 --- a/matsim/calibration/calib_asc.py +++ b/matsim/calibration/calib_asc.py @@ -8,7 +8,6 @@ from .base import CalibratorBase, CalibrationInput from .analysis import calc_mode_share - class ASCCalibrator(CalibratorBase): """ Calibrates the alternative specific constant for desired modes """ diff --git a/matsim/calibration/utils.py b/matsim/calibration/utils.py new file mode 100644 index 0000000..61ebcbf --- /dev/null +++ b/matsim/calibration/utils.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- + +from optuna.trial import TrialState + +def study_as_df(study): + """ Convert study to dataframe """ + completed = completed_trials(study) + + modes = study.user_attrs["modes"] + # fixed_mode = study.user_attrs["fixed_mode"] + + data = [] + + for i, trial in enumerate(completed): + + for j, m in enumerate(modes): + entry = { + "trial": i, + "mode": m, + "asc": trial.params[m], + "error": trial.values[j] + } + + for k, v in trial.user_attrs.items(): + if k.startswith(m + "_"): + entry[k[len(m) + 1:]] = v + + data.append(entry) + + return pd.DataFrame(data) + + +def default_chain_scheduler(completed): + """ Default function to determine when to chain runs. """ + + n = len(completed) + + if n <= 6: + return n % 2 == 0 + + if n <= 15: + return n % 5 == 0 + + if n <= 50: + return n % 10 == 0 + + return False + + +def linear_scheduler(start=0.6, end=1, interval=3): + """ Creates a lr scheduler that will interpolate linearly from start to end over the first n iterations. + + :param start: Initial learning rate. + :param end: Finial learning rate to reach. + :param interval: Number of runs until end rate should be reached. + """ + if interval < 2: + raise ValueError("N must be greater or equal 2.") + + def _fn(n, *args, **kwargs): + + if n > interval: + return end + + return start + (n - 1) * (end - start) / interval + + return _fn + + +def completed_trials(study): + completed = filter(lambda s: s.state == TrialState.COMPLETE, study.trials) + return sorted(completed, key=lambda s: s.number) + + +def same_sign(x): + x = x.to_numpy() + return np.all(x >= 0) if x[0] >= 0 else np.all(x < 0) + + +def cli_oldstyle(yaml_arg="params"): + """ Produces CLI for non matsim application scenarios """ + + def _f(jvm_args, jar, config, params_path, run_dir, trial_number, run_args): + return "java %s -jar %s %s --config:controler.outputDirectory %s --config:controler.runId %03d --%s %s %s" % ( + jvm_args, jar, config, run_dir, trial_number, yaml_arg, params_path, run_args + ) + + return _f From e557d7fd71a9c9d4295c6bee4424f8636a7b6ec5 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Wed, 4 Oct 2023 13:59:44 +0200 Subject: [PATCH 05/29] fix imports and warnings --- matsim/calibration/__init__.py | 11 ++++++----- matsim/calibration/analysis.py | 2 +- matsim/calibration/base.py | 4 ++-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index d8b31ef..809a622 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -8,6 +8,8 @@ import shutil import subprocess import sys +import glob + from os import path, makedirs from time import sleep from typing import Union, Sequence, Callable, Tuple @@ -219,13 +221,12 @@ def f(trial): trial.set_user_attr("mode_stats", mode_stats.to_dict(orient="tight")) - res = [] + err = [] for c in calibrate: - c.calc_stats(trial, run_dir, transform_persons, transform_trips) - - res.extend(c.error_metrics(trial)) + e = c.calc_stats(trial, run_dir, transform_persons, transform_trips) + err.extend(e) - return res + return err return study, f diff --git a/matsim/calibration/analysis.py b/matsim/calibration/analysis.py index 014d7bc..5e52328 100644 --- a/matsim/calibration/analysis.py +++ b/matsim/calibration/analysis.py @@ -133,7 +133,7 @@ def aggr(x): data = dict(n=len(x), mean_dist=np.average(x.traveled_distance)) return pd.Series(data=data) - aggr = trips.groupby(attrs + ["dist_group", "main_mode"]).apply(aggr) + aggr = trips.groupby(attrs + ["dist_group", "main_mode"], observed=True).apply(aggr) aggr["n"].fillna(0, inplace=True) diff --git a/matsim/calibration/base.py b/matsim/calibration/base.py index 02204bd..97b5cbc 100644 --- a/matsim/calibration/base.py +++ b/matsim/calibration/base.py @@ -3,7 +3,7 @@ import math import os from abc import ABC, abstractmethod -from typing import Union, Sequence, Dict, Callable +from typing import Union, Sequence, Dict, Tuple, Callable import optuna import pandas as pd @@ -61,7 +61,7 @@ def update_step(self, param: str, last_trial: optuna.Trial) -> float: @abstractmethod def calc_stats(self, trial: optuna.Trial, run_dir: str, transform_persons: Callable = None, - transform_trips: Callable = None) -> Sequence[float]: + transform_trips: Callable = None) -> Tuple[float]: """ Calculate and store statistics for a trial. These usually needs to be accessed for calculating updates. :return: reported error metrics From 8119a66826fb90a1a501d712751c9c14283f4234 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Wed, 4 Oct 2023 14:43:58 +0200 Subject: [PATCH 06/29] fix more warnings --- matsim/calibration/__init__.py | 11 ++++++----- matsim/calibration/base.py | 4 +++- setup.py | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index 809a622..eb71a74 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -15,11 +15,12 @@ from typing import Union, Sequence, Callable, Tuple import optuna +import pandas as pd import yaml from optuna.trial import TrialState from . import utils -from .base import CalibratorBase +from .base import CalibratorBase, to_float from .calib_asc import ASCCalibrator @@ -43,7 +44,7 @@ def sample_independent(self, study, trial, param_name, param_distribution): completed = utils.completed_trials(study) if len(completed) == 0: - initial = c.sample_initial(param) + initial = to_float(c.sample_initial(param)) if c.constraints is not None and param in c.constraints: initial = c.constraints[param](param, initial) @@ -53,7 +54,7 @@ def sample_independent(self, study, trial, param_name, param_distribution): last = completed[-1] last_param = last.params[param_name] - step = c.update_step(param, last) + step = to_float(c.update_step(param, last)) rate = 1.0 if c.lr is not None: @@ -65,7 +66,7 @@ def sample_independent(self, study, trial, param_name, param_distribution): rate = 1 # numpy types need casting - rate = float(rate) + rate = to_float(rate) trial.set_user_attr("%s_rate" % param_name, rate) trial.set_user_attr("%s_step" % param_name, step) @@ -224,7 +225,7 @@ def f(trial): err = [] for c in calibrate: e = c.calc_stats(trial, run_dir, transform_persons, transform_trips) - err.extend(e) + err.extend(to_float(f) for f in e) return err diff --git a/matsim/calibration/base.py b/matsim/calibration/base.py index 97b5cbc..4054fa2 100644 --- a/matsim/calibration/base.py +++ b/matsim/calibration/base.py @@ -11,6 +11,8 @@ # Type alias for input variables CalibrationInput = Union[str, os.PathLike, dict, pd.DataFrame] +def to_float(x): + return float(x.iloc[0]) if isinstance(x, pd.Series) else float(x) class CalibratorBase(ABC): """ Base class for calibrators. """ @@ -71,7 +73,7 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, def sample_initial(self, param: str) -> float: """ Sample initial value for parameter """ if param in self.initial.index: - return float(self.initial.loc[param]) + return self.initial.loc[param] return 0 diff --git a/setup.py b/setup.py index 1bd9b63..9dde4d2 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ "pandas >= 1.4.0", ], extras_require={ - 'calibration': ["optuna >= 2.7.0", "shapely", "geopandas >= 0.6.0"], + 'calibration': ["optuna >= 3.3.0", "shapely", "geopandas >= 0.6.0"], # m2cgen has problems with newer xgb, see this issue # https://github.com/BayesWitnesses/m2cgen/issues/581 'scenariogen': ["sumolib", "traci", "lxml", "optax", "requests", "tqdm", "scikit-learn", "xgboost==1.7.1", "lightgbm", From 9dcfe2aeebcaccee989bf9945d46d47e95e77c1d Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Wed, 4 Oct 2023 16:55:26 +0200 Subject: [PATCH 07/29] fix imports --- matsim/calibration/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/matsim/calibration/utils.py b/matsim/calibration/utils.py index 61ebcbf..e0f5afd 100644 --- a/matsim/calibration/utils.py +++ b/matsim/calibration/utils.py @@ -1,7 +1,11 @@ # -*- coding: utf-8 -*- +import numpy as np +import pandas as pd + from optuna.trial import TrialState + def study_as_df(study): """ Convert study to dataframe """ completed = completed_trials(study) From e80bd8518a361a64639a0bacc8471ad4939c9b82 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Thu, 5 Oct 2023 18:04:41 +0200 Subject: [PATCH 08/29] created new grouped asc calibration --- matsim/calibration/__init__.py | 30 +++-- matsim/calibration/analysis.py | 2 +- matsim/calibration/base.py | 25 ++-- matsim/calibration/calib_asc.py | 7 + matsim/calibration/calib_group_asc.py | 181 ++++++++++++++++++++++++++ 5 files changed, 222 insertions(+), 23 deletions(-) create mode 100644 matsim/calibration/calib_group_asc.py diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index eb71a74..6d66e7f 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -2,26 +2,26 @@ # -*- coding: utf-8 -*- """ Contains calibration related functions """ -__all__ = ["create_calibration", "ASCCalibrator", "utils"] +__all__ = ["create_calibration", "ASCCalibrator", "ASCGroupCalibrator", "utils"] +import glob import os import shutil import subprocess import sys -import glob - from os import path, makedirs from time import sleep from typing import Union, Sequence, Callable, Tuple import optuna -import pandas as pd import yaml -from optuna.trial import TrialState from . import utils from .base import CalibratorBase, to_float from .calib_asc import ASCCalibrator +from .calib_group_asc import ASCGroupCalibrator +from .utils import study_as_df +from .analysis import calc_mode_stats class CalibrationSampler(optuna.samplers.BaseSampler): @@ -39,15 +39,17 @@ def sample_relative(self, study, trial, search_space): def sample_independent(self, study, trial, param_name, param_distribution): - prefix, _, param = param_name.partition("_") - c : CalibratorBase = self.calibrators[prefix] + prefix, _, param = param_name.partition("-") + c: CalibratorBase = self.calibrators[prefix] + + mode = param.rpartition("-")[2] if "-" in param else param completed = utils.completed_trials(study) if len(completed) == 0: initial = to_float(c.sample_initial(param)) - if c.constraints is not None and param in c.constraints: - initial = c.constraints[param](param, initial) + if c.constraints is not None and mode in c.constraints: + initial = c.constraints[mode](mode, initial) return initial @@ -74,8 +76,8 @@ def sample_independent(self, study, trial, param_name, param_distribution): last_param += rate * step # Call constraint if present - if c.constraints is not None and param in c.constraints: - last_param = c.constraints[param](param, last_param) + if c.constraints is not None and mode in c.constraints: + last_param = c.constraints[mode](mode, last_param) return last_param @@ -150,7 +152,7 @@ def f(trial): params = {} for c in calibrate: - prefix = c.name + "_" + prefix = c.name + "-" c.update_config(trial, prefix, params) with open(params_path, "w") as f: @@ -218,7 +220,9 @@ def f(trial): finally: p.terminate() - mode_stats = analysis.calc_mode_stats(run_dir, transform_persons=transform_persons,transform_trips=transform_trips) + mode_stats = calc_mode_stats(run_dir, + transform_persons=transform_persons, + transform_trips=transform_trips) trial.set_user_attr("mode_stats", mode_stats.to_dict(orient="tight")) diff --git a/matsim/calibration/analysis.py b/matsim/calibration/analysis.py index 5e52328..dc82a4a 100644 --- a/matsim/calibration/analysis.py +++ b/matsim/calibration/analysis.py @@ -60,7 +60,7 @@ def f(x, result=False): return res, df -def read_trips_and_persons(run, transform_persons=None, transform_trips=None): +def read_trips_and_persons(run, transform_persons=None, transform_trips=None) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Read trips and persons from run directory """ # Return input as output diff --git a/matsim/calibration/base.py b/matsim/calibration/base.py index 4054fa2..ff13869 100644 --- a/matsim/calibration/base.py +++ b/matsim/calibration/base.py @@ -11,9 +11,18 @@ # Type alias for input variables CalibrationInput = Union[str, os.PathLike, dict, pd.DataFrame] + def to_float(x): return float(x.iloc[0]) if isinstance(x, pd.Series) else float(x) + +def sanitize(x): + if 'main_mode' in x.columns and 'mode' not in x.columns: + x.rename(columns={'main_mode': 'mode'}, inplace=True) + + return x + + class CalibratorBase(ABC): """ Base class for calibrators. """ @@ -32,8 +41,8 @@ def __init__(self, :param constraints: constraints for each mode, must return asc and will be called with original asc """ self.modes = modes - self.initial = self.convert_input(initial) - self.target = self.convert_input(target) + self.initial = sanitize(self.convert_input(initial)) + self.target = sanitize(self.convert_input(target)) self.lr = lr self.constraints = constraints @@ -55,6 +64,11 @@ def update_config(self, trial: optuna.Trial, prefix: str, config: dict): `trial.suggest_float` and by using the given prefix . """ raise NotImplemented + @abstractmethod + def sample_initial(self, param: str) -> float: + """ Sample initial value for parameter """ + raise NotImplemented + @abstractmethod def update_step(self, param: str, last_trial: optuna.Trial) -> float: """ Return update step for param based on last trial """ @@ -70,13 +84,6 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, """ raise NotImplemented - def sample_initial(self, param: str) -> float: - """ Sample initial value for parameter """ - if param in self.initial.index: - return self.initial.loc[param] - - return 0 - @classmethod def convert_input(cls, arg) -> pd.DataFrame: """ Method call to create target values from input argument """ diff --git a/matsim/calibration/calib_asc.py b/matsim/calibration/calib_asc.py index fa6912f..ec6cecb 100644 --- a/matsim/calibration/calib_asc.py +++ b/matsim/calibration/calib_asc.py @@ -8,6 +8,7 @@ from .base import CalibratorBase, CalibrationInput from .analysis import calc_mode_share + class ASCCalibrator(CalibratorBase): """ Calibrates the alternative specific constant for desired modes """ @@ -51,6 +52,12 @@ def update_config(self, trial: optuna.Trial, prefix: str, config: dict): m = self.get_mode_params(config, mode) m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) + def sample_initial(self, param: str) -> float: + if param in self.initial.index: + return self.initial.loc[param] + + return 0 + def update_step(self, param: str, last_trial: optuna.Trial) -> float: if param == self.fixed_mode: diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py new file mode 100644 index 0000000..5eac6ab --- /dev/null +++ b/matsim/calibration/calib_group_asc.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- + +import sys +from typing import Sequence, Callable, Dict, Literal +from functools import reduce + +import numpy as np +import optuna +import pandas as pd + +from .base import CalibratorBase, CalibrationInput +from .analysis import read_trips_and_persons + + +def get_sbb_params(config, group, value, mode): + """ Return parameter set for specific group """ + + if "SBBBehaviorGroups" not in config: + config["SBBBehaviorGroups"] = {} + + if "behaviorGroup" not in config["SBBBehaviorGroups"]: + config["SBBBehaviorGroups"]["behaviorGroup"] = [] + + groups = config["SBBBehaviorGroups"]["behaviorGroup"] + + for b_group in groups: + + if b_group["personAttribute"] == group: + + for v in b_group["personGroupAttributeValues"]: + if v["attributeValues"] == value: + + for m in v["absoluteModeCorrections"]: + if m == mode: + return m + + m = {"mode": mode} + v["absoluteModeCorrections"].append(m) + return m + + m = {"mode": mode} + b_group["personGroupAttributeValues"].append( + {"attributeValues": value, "absoluteModeCorrections": [m]} + ) + return m + + m = {"mode": mode} + groups.append({"personAttribute": group, + "personGroupAttributeValues": + [{"attributeValues": value, "absoluteModeCorrections": [m]}]}) + return m + + +class ASCGroupCalibrator(CalibratorBase): + """ Calibrates the alternative specific for specific subpopulations """ + + def __init__(self, + modes: Sequence[str], + initial: CalibrationInput, + target: CalibrationInput, + fixed_mode: str = "walk", + lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, + constraints: Dict[str, Callable[[str, float], float]] = None, + config_format: Literal['default', 'sbb'] = "default"): + """Abstract constructors for all calibrations. Usually the same parameters should be made available subclasses. + + :param modes: list of all relevant modes + :param initial: dict of initial asc values + :param target: target shares as csv or dict + :param fixed_mode: the fixed mode with asc=0 + :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) + :param constraints: constraints for each mode, must return asc and will be called with original asc + :param config_format: use SBBBehaviorGroups for the parameter config + """ + super().__init__(modes, initial, target, lr, constraints) + + self.fixed_mode = fixed_mode + self.config_format = config_format + + if "mode" not in self.target.columns: + raise ValueError("Target must have 'mode' or 'main_mode' column") + + self.target = self.target.rename(columns={"value": "share"}, errors="ignore") + + if "mode" not in self.target.columns: + raise ValueError("Target must have 'share' column") + + self.groups = [t for t in self.target.columns if t not in ('mode', 'share', 'asc')] + + self.base = self.get_group(self.target, None) + self.base = self.base.groupby("mode").agg(share=("share", "sum")) + + if len(self.base) == 0: + raise ValueError("Target must contain base without any attributes") + + def get_group(self, df, groups: dict = None): + """ Get data of one group""" + # return result for empty groups + if groups is None: + idx = reduce(lambda x, y: x & y, [pd.isna(df[g]) for g in self.groups]) + return df[idx] + + @property + def name(self): + return "asc" + + def init_study(self, study: optuna.Trial): + study.set_user_attr("modes", self.modes) + study.set_user_attr("groups", self.groups) + study.set_user_attr("fixed_mode", self.fixed_mode) + + print("Calibrating groups:", self.groups) + print("Running with base target:", self.base) + + def update_config(self, trial: optuna.Trial, prefix: str, config: dict): + + # Base ascs for each mode + base = {} + + for mode in self.modes: + m = self.get_mode_params(config, mode) + m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) + base[mode] = m["constant"] + + for g in self.groups: + for v in set(self.target[g]): + if pd.isna(v): + continue + + attr = (str(g) + "=" + str(v)) + for mode in self.modes: + # TODO: build config correctly + # Update constants + if self.config_format == "sbb": + p = trial.suggest_float(prefix + "[%s]-%s" % (attr, mode), + sys.float_info.min, sys.float_info.max) + + # don't write certain values + if p == 0 and mode == self.fixed_mode: + continue + + m = get_sbb_params(config, g, v, mode) + m["deltaConstant"] = p + + else: + raise ValueError("Currently only ssb config format is supported") + + def sample_initial(self, param: str) -> float: + attr, _, mode = param.rpartition("-") + + if mode in self.initial.index and attr == "": + return self.initial.loc[mode] + + # TODO: Load from column with same format as input + + return 0 + + def update_step(self, param: str, last_trial: optuna.Trial) -> float: + + if param == self.fixed_mode: + return 0 + + step = self.calc_asc_update(self.target.loc[param].share, last_trial.user_attrs["%s_share" % param], + self.target.loc[self.fixed_mode].share, + last_trial.user_attrs["%s_share" % self.fixed_mode]) + + return step + + def calc_stats(self, trial: optuna.Trial, run_dir: str, + transform_persons: Callable = None, + transform_trips: Callable = None) -> Sequence[float]: + + trips, persons = read_trips_and_persons(run_dir, + transform_persons=transform_persons, + transform_trips=transform_trips) + + errs = [] + + print("TODO") + + return [(abs(self.target.loc[mode] - trial.user_attrs["%s_share" % mode])) for mode in self.modes] From ced2854bd38ad2f2fc38a360cf3f528410798470 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Fri, 6 Oct 2023 14:23:32 +0200 Subject: [PATCH 09/29] first running version --- matsim/calibration/__init__.py | 11 +++- matsim/calibration/base.py | 8 +++ matsim/calibration/calib_group_asc.py | 84 +++++++++++++++++++++++---- 3 files changed, 89 insertions(+), 14 deletions(-) diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index 6d66e7f..245ab56 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -27,8 +27,13 @@ class CalibrationSampler(optuna.samplers.BaseSampler): """ Run calibrators on obtained mode shares """ - def __init__(self, calibrators): - self.calibrators = {c.name: c for c in calibrators} + def __init__(self, calibrators: Sequence[CalibratorBase]): + self.calibrators = {} + for c in calibrators: + if c.name not in self.calibrators: + self.calibrators[c.name] = c + else: + raise ValueError("Incompatible calibrators with same name occurred: %s (%s)" % (c.name, c)) def infer_relative_search_space(self, study, trial): return {} @@ -132,7 +137,7 @@ def create_calibration(name: str, calibrate: Union[CalibratorBase, Sequence[Cali study_name=name, storage=storage, load_if_exists=True, - directions=["minimize"] * sum(len(c.target) for c in calibrate), + directions=["minimize"] * sum(c.num_targets for c in calibrate), sampler=CalibrationSampler(calibrate) ) diff --git a/matsim/calibration/base.py b/matsim/calibration/base.py index ff13869..6d43ccd 100644 --- a/matsim/calibration/base.py +++ b/matsim/calibration/base.py @@ -53,6 +53,11 @@ def name(self): raise NotImplemented + @property + def num_targets(self): + """ Number of targets/errors that will be returned in calc stats""" + return len(self.target) + @abstractmethod def init_study(self, study: optuna.Study): """ Initialize study instance """ @@ -128,4 +133,7 @@ def calc_asc_update(z_i, m_i, z_0, m_0) -> float: # (2) Run the simulation to convergence. Obtain simulated mode shares m_i. # (3) Do nothing for mode 0. For all other modes: add [ln(z_i) - ln(m_i)] – [ln(z_0) - ln(m_0)] to its ASC. # (4) Goto 2. + if m_i == 0: + return 0 + return math.log(z_i) - math.log(m_i) - (math.log(z_0) - math.log(m_0)) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 5eac6ab..269d1a7 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -50,6 +50,9 @@ def get_sbb_params(config, group, value, mode): [{"attributeValues": value, "absoluteModeCorrections": [m]}]}) return m +def parse_group(p): + p = p.strip("[]") + return {x.split("=")[0]:x.split("=")[1] for x in p.split(",")} class ASCGroupCalibrator(CalibratorBase): """ Calibrates the alternative specific for specific subpopulations """ @@ -88,7 +91,7 @@ def __init__(self, self.groups = [t for t in self.target.columns if t not in ('mode', 'share', 'asc')] self.base = self.get_group(self.target, None) - self.base = self.base.groupby("mode").agg(share=("share", "sum")) + self.base = self.base.groupby("mode").agg(target=("share", "sum")) if len(self.base) == 0: raise ValueError("Target must contain base without any attributes") @@ -100,10 +103,17 @@ def get_group(self, df, groups: dict = None): idx = reduce(lambda x, y: x & y, [pd.isna(df[g]) for g in self.groups]) return df[idx] + idx = reduce(lambda x, y: x & y, [df[g] == v for g, v in groups.items()]) + return df[idx] + @property def name(self): return "asc" + @property + def num_targets(self): + return len(self.base) + def init_study(self, study: optuna.Trial): study.set_user_attr("modes", self.modes) study.set_user_attr("groups", self.groups) @@ -122,6 +132,7 @@ def update_config(self, trial: optuna.Trial, prefix: str, config: dict): m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) base[mode] = m["constant"] + # Grouped ascs for g in self.groups: for v in set(self.target[g]): if pd.isna(v): @@ -129,18 +140,17 @@ def update_config(self, trial: optuna.Trial, prefix: str, config: dict): attr = (str(g) + "=" + str(v)) for mode in self.modes: - # TODO: build config correctly # Update constants if self.config_format == "sbb": p = trial.suggest_float(prefix + "[%s]-%s" % (attr, mode), - sys.float_info.min, sys.float_info.max) + sys.float_info.min, sys.float_info.max) # don't write certain values if p == 0 and mode == self.fixed_mode: continue m = get_sbb_params(config, g, v, mode) - m["deltaConstant"] = p + m["deltaConstant"] = p - base[mode] else: raise ValueError("Currently only ssb config format is supported") @@ -148,7 +158,7 @@ def update_config(self, trial: optuna.Trial, prefix: str, config: dict): def sample_initial(self, param: str) -> float: attr, _, mode = param.rpartition("-") - if mode in self.initial.index and attr == "": + if mode in self.initial.index: return self.initial.loc[mode] # TODO: Load from column with same format as input @@ -160,11 +170,23 @@ def update_step(self, param: str, last_trial: optuna.Trial) -> float: if param == self.fixed_mode: return 0 - step = self.calc_asc_update(self.target.loc[param].share, last_trial.user_attrs["%s_share" % param], - self.target.loc[self.fixed_mode].share, - last_trial.user_attrs["%s_share" % self.fixed_mode]) + # Base mode shares + if not param.startswith("["): + return self.calc_asc_update(self.base.loc[param].target, last_trial.user_attrs["%s_share" % param], + self.base.loc[self.fixed_mode].target, + last_trial.user_attrs["%s_share" % self.fixed_mode]) + + else: + p, _, mode = param.rpartition("-") + + if mode == self.fixed_mode: + return 0 + + t = self.get_group(self.target, parse_group(p)).set_index("mode") - return step + return self.calc_asc_update(t.loc[mode].share, last_trial.user_attrs["%s_share" % param], + t.loc[self.fixed_mode].share, + last_trial.user_attrs["%s-%s_share" % (p, self.fixed_mode)]) def calc_stats(self, trial: optuna.Trial, run_dir: str, transform_persons: Callable = None, @@ -174,8 +196,48 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, transform_persons=transform_persons, transform_trips=transform_trips) + base_share = trips.groupby("main_mode").count()["trip_number"] / len(trips) + base_share = self.base.merge(base_share, left_index=True, right_index=True).rename( + columns={"trip_number": "share"}) + + for kv in base_share.itertuples(): + trial.set_user_attr("%s_share" % kv.Index, kv.share) + + base_share["mae"] = np.abs(base_share.share - base_share.target) + + print("Obtained base shares:") + print(base_share) + errs = [] - print("TODO") + for g in self.groups: + for v in set(self.target[g]): + if pd.isna(v): + continue + + sub_trips = self.get_group(trips, {g: v}) + sub_share = sub_trips.groupby("main_mode").count()["trip_number"] / len(sub_trips) + sub_share.rename("share", inplace=True) + + if len(sub_trips) == 0: + print("Empty subgroup %s=%s" % (g, v), file=sys.stderr) + + target = self.get_group(self.target, {g: v}).rename(columns={"share": "target"}) + target = target.merge(sub_share, how="outer", left_on="mode", right_index=True) + target.share.fillna(0, inplace=True) + + target["mae"] = np.abs(target.share - target.target) + + attr = (str(g) + "=" + str(v)) + for kv in target.itertuples(): + trial.set_user_attr("[%s]-%s_share" % (attr, kv.mode), kv.share) + + errs.append(target) + + errs = pd.concat(errs) + print("Grouped sum of errors:") + for g in self.groups: + sub_err = errs.groupby(g).agg(sum_mae=("mae", "sum")) + print(sub_err) - return [(abs(self.target.loc[mode] - trial.user_attrs["%s_share" % mode])) for mode in self.modes] + return base_share.mae.to_numpy() From 3cc814805474041cd09abac960b1740b2a7e682f Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Fri, 6 Oct 2023 17:19:00 +0200 Subject: [PATCH 10/29] persist error metrics --- matsim/calibration/__init__.py | 2 +- matsim/calibration/calib_group_asc.py | 8 ++++++-- matsim/calibration/utils.py | 2 ++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index 245ab56..9235f41 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """ Contains calibration related functions """ -__all__ = ["create_calibration", "ASCCalibrator", "ASCGroupCalibrator", "utils"] +__all__ = ["create_calibration", "study_as_df", "ASCCalibrator", "ASCGroupCalibrator", "utils"] import glob import os diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 269d1a7..33d4635 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -200,10 +200,11 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, base_share = self.base.merge(base_share, left_index=True, right_index=True).rename( columns={"trip_number": "share"}) + base_share["mae"] = np.abs(base_share.share - base_share.target) + for kv in base_share.itertuples(): trial.set_user_attr("%s_share" % kv.Index, kv.share) - - base_share["mae"] = np.abs(base_share.share - base_share.target) + trial.set_user_attr("%s_mae" % kv.Index, kv.mae) print("Obtained base shares:") print(base_share) @@ -240,4 +241,7 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, sub_err = errs.groupby(g).agg(sum_mae=("mae", "sum")) print(sub_err) + for e in sub_err.itertuples(): + trial.set_user_attr("%s=%s_sum_mae" % (g, e.Index), e.sum_mae) + return base_share.mae.to_numpy() diff --git a/matsim/calibration/utils.py b/matsim/calibration/utils.py index e0f5afd..ef231f0 100644 --- a/matsim/calibration/utils.py +++ b/matsim/calibration/utils.py @@ -13,6 +13,8 @@ def study_as_df(study): modes = study.user_attrs["modes"] # fixed_mode = study.user_attrs["fixed_mode"] + # TODO: rework for multiple studies + data = [] for i, trial in enumerate(completed): From 3e9a6404929732cc94e9307dbf3eb3e3dbdb504b Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Fri, 6 Oct 2023 17:20:46 +0200 Subject: [PATCH 11/29] persist error metrics for normal asc as well --- matsim/calibration/calib_asc.py | 1 + 1 file changed, 1 insertion(+) diff --git a/matsim/calibration/calib_asc.py b/matsim/calibration/calib_asc.py index ec6cecb..4fa6ebb 100644 --- a/matsim/calibration/calib_asc.py +++ b/matsim/calibration/calib_asc.py @@ -78,5 +78,6 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, for k, v in shares.items(): trial.set_user_attr("%s_share" % k, v) + trial.set_user_attr("%s_mae" % k, abs(self.target.loc[k] - v)) return [(abs(self.target.loc[mode] - trial.user_attrs["%s_share" % mode])) for mode in self.modes] From ee965e401ab84ebee28b5877dee44bb162b2b2d5 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Wed, 18 Oct 2023 11:34:58 +0200 Subject: [PATCH 12/29] write error metrics --- matsim/calibration/__init__.py | 23 +---------------------- matsim/calibration/__main__.py | 26 ++++++++++++++++++++++++++ matsim/calibration/calib_group_asc.py | 1 + matsim/calibration/utils.py | 24 ++++++++++-------------- 4 files changed, 38 insertions(+), 36 deletions(-) create mode 100644 matsim/calibration/__main__.py diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index 9235f41..778dc01 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -238,25 +238,4 @@ def f(trial): return err - return study, f - - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(description="Calibration CLI") - parser.add_argument('file', nargs=1, type=str, help="Path to input db") - parser.add_argument("--name", type=str, default="calib", help="Calibration name") - parser.add_argument("--output", default=None, help="Output path") - args = parser.parse_args() - - study = optuna.load_study( - study_name=args.name, - storage="sqlite:///%s" % args.file[0], - ) - - if not args.output: - args.output = args.file[0] + ".csv" - - df = study_as_df(study) - df.to_csv(args.output, index=False) + return study, f \ No newline at end of file diff --git a/matsim/calibration/__main__.py b/matsim/calibration/__main__.py new file mode 100644 index 0000000..62e5d44 --- /dev/null +++ b/matsim/calibration/__main__.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import optuna + +from . import study_as_df + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(prog="matsim-calibration", description="Calibration CLI") + parser.add_argument('file', nargs=1, type=str, help="Path to input db") + parser.add_argument("--name", type=str, default="calib", help="Calibration name") + parser.add_argument("--output", default=None, help="Output path") + args = parser.parse_args() + + study = optuna.load_study( + study_name=args.name, + storage="sqlite:///%s" % args.file[0], + ) + + if not args.output: + args.output = args.file[0] + ".csv" + + df = study_as_df(study) + df.to_csv(args.output, index=False) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 33d4635..7c16a93 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -232,6 +232,7 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, attr = (str(g) + "=" + str(v)) for kv in target.itertuples(): trial.set_user_attr("[%s]-%s_share" % (attr, kv.mode), kv.share) + trial.set_user_attr("[%s]-%s_mae" % (attr, kv.mode), kv.mae) errs.append(target) diff --git a/matsim/calibration/utils.py b/matsim/calibration/utils.py index ef231f0..bc174c0 100644 --- a/matsim/calibration/utils.py +++ b/matsim/calibration/utils.py @@ -10,28 +10,24 @@ def study_as_df(study): """ Convert study to dataframe """ completed = completed_trials(study) - modes = study.user_attrs["modes"] + # modes = study.user_attrs["modes"] # fixed_mode = study.user_attrs["fixed_mode"] - # TODO: rework for multiple studies - data = [] for i, trial in enumerate(completed): - for j, m in enumerate(modes): - entry = { - "trial": i, - "mode": m, - "asc": trial.params[m], - "error": trial.values[j] - } + entry = { + "trial": i, + "start": trial.datetime_start, + "duration": trial.duration, + } - for k, v in trial.user_attrs.items(): - if k.startswith(m + "_"): - entry[k[len(m) + 1:]] = v + for k, v in trial.user_attrs.items(): + if type(v) in (float, int): + entry[k] = v - data.append(entry) + data.append(entry) return pd.DataFrame(data) From 7e3227997a08ba3698348cb4f63f2c9fafa72fce Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Thu, 19 Oct 2023 13:14:51 +0200 Subject: [PATCH 13/29] reduce step size for correlated groups --- matsim/calibration/calib_group_asc.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 7c16a93..22dabaa 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -64,6 +64,7 @@ def __init__(self, fixed_mode: str = "walk", lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, constraints: Dict[str, Callable[[str, float], float]] = None, + corr_correction: float = 0.5, config_format: Literal['default', 'sbb'] = "default"): """Abstract constructors for all calibrations. Usually the same parameters should be made available subclasses. @@ -73,11 +74,13 @@ def __init__(self, :param fixed_mode: the fixed mode with asc=0 :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) :param constraints: constraints for each mode, must return asc and will be called with original asc + :param corr_correction: factor to reduce learning rates for possibly correlated groups, 0 disables this correction :param config_format: use SBBBehaviorGroups for the parameter config """ super().__init__(modes, initial, target, lr, constraints) self.fixed_mode = fixed_mode + self.corr_correction = corr_correction self.config_format = config_format if "mode" not in self.target.columns: @@ -172,7 +175,13 @@ def update_step(self, param: str, last_trial: optuna.Trial) -> float: # Base mode shares if not param.startswith("["): - return self.calc_asc_update(self.base.loc[param].target, last_trial.user_attrs["%s_share" % param], + step = 1 + + # If all groups would be fully correlated, update step needs to be divided by number of groups + if self.corr_correction > 0: + step = 1 / (len(self.groups) * self.corr_correction) + + return step * self.calc_asc_update(self.base.loc[param].target, last_trial.user_attrs["%s_share" % param], self.base.loc[self.fixed_mode].target, last_trial.user_attrs["%s_share" % self.fixed_mode]) From 59d1dbd48dcb650d6e70e62c31537ab8868245cd Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Fri, 20 Oct 2023 12:13:34 +0200 Subject: [PATCH 14/29] fix corr correction --- matsim/calibration/calib_group_asc.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 22dabaa..8b4324f 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -175,13 +175,7 @@ def update_step(self, param: str, last_trial: optuna.Trial) -> float: # Base mode shares if not param.startswith("["): - step = 1 - - # If all groups would be fully correlated, update step needs to be divided by number of groups - if self.corr_correction > 0: - step = 1 / (len(self.groups) * self.corr_correction) - - return step * self.calc_asc_update(self.base.loc[param].target, last_trial.user_attrs["%s_share" % param], + return self.calc_asc_update(self.base.loc[param].target, last_trial.user_attrs["%s_share" % param], self.base.loc[self.fixed_mode].target, last_trial.user_attrs["%s_share" % self.fixed_mode]) @@ -193,7 +187,12 @@ def update_step(self, param: str, last_trial: optuna.Trial) -> float: t = self.get_group(self.target, parse_group(p)).set_index("mode") - return self.calc_asc_update(t.loc[mode].share, last_trial.user_attrs["%s_share" % param], + step = 1 + # If all groups would be fully correlated, update step needs to be divided by number of groups + if self.corr_correction > 0: + step = 1 / (len(self.groups) * self.corr_correction) + + return step * self.calc_asc_update(t.loc[mode].share, last_trial.user_attrs["%s_share" % param], t.loc[self.fixed_mode].share, last_trial.user_attrs["%s-%s_share" % (p, self.fixed_mode)]) From d47f051a70cce5b8b24272bd4a23b2b4803ab8c7 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Mon, 23 Oct 2023 13:24:43 +0200 Subject: [PATCH 15/29] add functions for plotting --- matsim/calibration/__main__.py | 10 ++++++ matsim/calibration/plot.py | 57 ++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 matsim/calibration/plot.py diff --git a/matsim/calibration/__main__.py b/matsim/calibration/__main__.py index 62e5d44..576f745 100644 --- a/matsim/calibration/__main__.py +++ b/matsim/calibration/__main__.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import optuna +import traceback from . import study_as_df @@ -24,3 +25,12 @@ df = study_as_df(study) df.to_csv(args.output, index=False) + + try: + from .plot import plot_study + plot_study(study) + + except ImportError: + print("Could not plot study.") + traceback.print_exc() + diff --git a/matsim/calibration/plot.py b/matsim/calibration/plot.py new file mode 100644 index 0000000..6892837 --- /dev/null +++ b/matsim/calibration/plot.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- + +import re + +import pandas as pd +import seaborn as sns + +from .utils import study_as_df + + +def plot_study(study): + """ Create convergence plots for a study """ + + modes = study.user_attrs["modes"] + groups = study.user_attrs.get("groups", None) + + df = study_as_df(study) + + df_all = df[["trial"] + [x + "_mae" for x in modes]] + + df_all = pd.melt(df_all, "trial", var_name="main_mode", value_name="mae") + df_all.main_mode = df_all.main_mode.str.removesuffix("_mae") + + ax = sns.lineplot(x="trial", y="mae", hue="main_mode", hue_order=modes, data=df_all) + fig = ax.get_figure() + + fig.savefig("mae_all.png") + + if groups: + + for g in groups: + + p = re.compile(r"\[%s=(.+)\]-(.+)_mae" % g) + + columns = df.columns[df.columns.str.startswith("[" + g) & df.columns.str.endswith("_mae")] + vars = [p.search(x) for x in columns] + + df_g = df[["trial"] + list(columns)] + + data = [] + + # convert wide to long + for idx, r in df_g.iterrows(): + for c, v in zip(columns, vars): + data.append({ + "trial": r["trial"], + "mae": r[c], + g: v[1], + "main_mode": v[2] + }) + + df_g = pd.DataFrame(data) + + grid = sns.FacetGrid(df_g, col=g) + grid.map_dataframe(sns.lineplot, x="trial", y="mae", hue="main_mode", hue_order=modes) + + grid.figure.savefig("mae_%s.png" % g) From 119a0e7ad3d82f8f34b821f448d1f1de29549a8b Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Thu, 26 Oct 2023 09:40:55 +0200 Subject: [PATCH 16/29] with bug with incorrect datatype --- matsim/calibration/calib_asc.py | 6 +++--- matsim/calibration/calib_group_asc.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/matsim/calibration/calib_asc.py b/matsim/calibration/calib_asc.py index 4fa6ebb..32d6d40 100644 --- a/matsim/calibration/calib_asc.py +++ b/matsim/calibration/calib_asc.py @@ -5,7 +5,7 @@ import optuna -from .base import CalibratorBase, CalibrationInput +from .base import CalibratorBase, CalibrationInput, to_float from .analysis import calc_mode_share @@ -78,6 +78,6 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, for k, v in shares.items(): trial.set_user_attr("%s_share" % k, v) - trial.set_user_attr("%s_mae" % k, abs(self.target.loc[k] - v)) + trial.set_user_attr("%s_mae" % k, to_float(abs(self.target.loc[k] - v))) - return [(abs(self.target.loc[mode] - trial.user_attrs["%s_share" % mode])) for mode in self.modes] + return [to_float(abs(self.target.loc[mode] - trial.user_attrs["%s_share" % mode])) for mode in self.modes] diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 8b4324f..a6fd132 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -64,7 +64,7 @@ def __init__(self, fixed_mode: str = "walk", lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, constraints: Dict[str, Callable[[str, float], float]] = None, - corr_correction: float = 0.5, + corr_correction: float = 1, config_format: Literal['default', 'sbb'] = "default"): """Abstract constructors for all calibrations. Usually the same parameters should be made available subclasses. From 07b81dda281289e0ad0ca47ffd93e457a90808fb Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Thu, 26 Oct 2023 11:03:16 +0200 Subject: [PATCH 17/29] save rate and step for debugging --- matsim/calibration/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index 778dc01..a6b9e6e 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -75,8 +75,9 @@ def sample_independent(self, study, trial, param_name, param_distribution): # numpy types need casting rate = to_float(rate) - trial.set_user_attr("%s_rate" % param_name, rate) - trial.set_user_attr("%s_step" % param_name, step) + # Need to use storage directly, frozen trial does not save attributes + study._storage.set_trial_user_attr(trial._trial_id, "%s_rate" % param_name, rate) + study._storage.set_trial_user_attr(trial._trial_id, "%s_step" % param_name, step) last_param += rate * step From 30ee1ab8064121c6e0d43579fdc91a92e36ca99e Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Fri, 27 Oct 2023 11:52:32 +0200 Subject: [PATCH 18/29] prepare multi groups, fix some input value checks --- matsim/calibration/calib_group_asc.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index a6fd132..01d7e0f 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -64,6 +64,7 @@ def __init__(self, fixed_mode: str = "walk", lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, constraints: Dict[str, Callable[[str, float], float]] = None, + multi_groups: Sequence[str] = None, corr_correction: float = 1, config_format: Literal['default', 'sbb'] = "default"): """Abstract constructors for all calibrations. Usually the same parameters should be made available subclasses. @@ -74,21 +75,23 @@ def __init__(self, :param fixed_mode: the fixed mode with asc=0 :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) :param constraints: constraints for each mode, must return asc and will be called with original asc + :param multi_groups: Groups that contain multiple values, that will be split by "," :param corr_correction: factor to reduce learning rates for possibly correlated groups, 0 disables this correction :param config_format: use SBBBehaviorGroups for the parameter config """ super().__init__(modes, initial, target, lr, constraints) self.fixed_mode = fixed_mode + self.multi_groups = multi_groups self.corr_correction = corr_correction self.config_format = config_format - if "mode" not in self.target.columns: + if "mode" not in self.target.columns and "main_mode" not in self.target.columns: raise ValueError("Target must have 'mode' or 'main_mode' column") - self.target = self.target.rename(columns={"value": "share"}, errors="ignore") + self.target = self.target.rename(columns={"value": "share", "main_mode": "mode"}, errors="ignore") - if "mode" not in self.target.columns: + if "share" not in self.target.columns: raise ValueError("Target must have 'share' column") self.groups = [t for t in self.target.columns if t not in ('mode', 'share', 'asc')] From bf00be570566c217f1602b4f13cff7b8c857e8ae Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Mon, 30 Oct 2023 09:30:41 +0100 Subject: [PATCH 19/29] argument to deactivate the base calibration --- matsim/calibration/calib_group_asc.py | 45 ++++++++++++++++----------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 01d7e0f..4d5362c 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -3,6 +3,7 @@ import sys from typing import Sequence, Callable, Dict, Literal from functools import reduce +from collections import defaultdict import numpy as np import optuna @@ -64,6 +65,7 @@ def __init__(self, fixed_mode: str = "walk", lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, constraints: Dict[str, Callable[[str, float], float]] = None, + calib_base: bool = True, multi_groups: Sequence[str] = None, corr_correction: float = 1, config_format: Literal['default', 'sbb'] = "default"): @@ -75,6 +77,7 @@ def __init__(self, :param fixed_mode: the fixed mode with asc=0 :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) :param constraints: constraints for each mode, must return asc and will be called with original asc + :param calib_base: whether to calibrate the ungrouped base ASC :param multi_groups: Groups that contain multiple values, that will be split by "," :param corr_correction: factor to reduce learning rates for possibly correlated groups, 0 disables this correction :param config_format: use SBBBehaviorGroups for the parameter config @@ -96,11 +99,15 @@ def __init__(self, self.groups = [t for t in self.target.columns if t not in ('mode', 'share', 'asc')] - self.base = self.get_group(self.target, None) - self.base = self.base.groupby("mode").agg(target=("share", "sum")) + # If self.base is None, base calibration won't be performed + if calib_base: + self.base = self.get_group(self.target, None) + self.base = self.base.groupby("mode").agg(target=("share", "sum")) - if len(self.base) == 0: - raise ValueError("Target must contain base without any attributes") + if len(self.base) == 0: + raise ValueError("Target must contain base without any attributes") + else: + self.base = None def get_group(self, df, groups: dict = None): """ Get data of one group""" @@ -131,12 +138,13 @@ def init_study(self, study: optuna.Trial): def update_config(self, trial: optuna.Trial, prefix: str, config: dict): # Base ascs for each mode - base = {} + base = defaultdict(lambda: 0) - for mode in self.modes: - m = self.get_mode_params(config, mode) - m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) - base[mode] = m["constant"] + if self.base is not None: + for mode in self.modes: + m = self.get_mode_params(config, mode) + m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) + base[mode] = m["constant"] # Grouped ascs for g in self.groups: @@ -207,18 +215,19 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, transform_persons=transform_persons, transform_trips=transform_trips) - base_share = trips.groupby("main_mode").count()["trip_number"] / len(trips) - base_share = self.base.merge(base_share, left_index=True, right_index=True).rename( - columns={"trip_number": "share"}) + if self.base is not None: + base_share = trips.groupby("main_mode").count()["trip_number"] / len(trips) + base_share = self.base.merge(base_share, left_index=True, right_index=True).rename( + columns={"trip_number": "share"}) - base_share["mae"] = np.abs(base_share.share - base_share.target) + base_share["mae"] = np.abs(base_share.share - base_share.target) - for kv in base_share.itertuples(): - trial.set_user_attr("%s_share" % kv.Index, kv.share) - trial.set_user_attr("%s_mae" % kv.Index, kv.mae) + for kv in base_share.itertuples(): + trial.set_user_attr("%s_share" % kv.Index, kv.share) + trial.set_user_attr("%s_mae" % kv.Index, kv.mae) - print("Obtained base shares:") - print(base_share) + print("Obtained base shares:") + print(base_share) errs = [] From abddb1814d9971270c32689c4f210b6dedcde27d Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Wed, 22 Nov 2023 16:22:47 +0100 Subject: [PATCH 20/29] change how the correction factor works --- matsim/calibration/calib_group_asc.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 4d5362c..d068fa1 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -146,6 +146,11 @@ def update_config(self, trial: optuna.Trial, prefix: str, config: dict): m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) base[mode] = m["constant"] + step = 1 + # If all groups were fully correlated, update step needs to be divided by number of groups + if self.corr_correction > 0: + step = 1 / (len(self.groups) * self.corr_correction) + # Grouped ascs for g in self.groups: for v in set(self.target[g]): @@ -164,7 +169,7 @@ def update_config(self, trial: optuna.Trial, prefix: str, config: dict): continue m = get_sbb_params(config, g, v, mode) - m["deltaConstant"] = p - base[mode] + m["deltaConstant"] = (p - base[mode]) * step else: raise ValueError("Currently only ssb config format is supported") @@ -198,12 +203,7 @@ def update_step(self, param: str, last_trial: optuna.Trial) -> float: t = self.get_group(self.target, parse_group(p)).set_index("mode") - step = 1 - # If all groups would be fully correlated, update step needs to be divided by number of groups - if self.corr_correction > 0: - step = 1 / (len(self.groups) * self.corr_correction) - - return step * self.calc_asc_update(t.loc[mode].share, last_trial.user_attrs["%s_share" % param], + return self.calc_asc_update(t.loc[mode].share, last_trial.user_attrs["%s_share" % param], t.loc[self.fixed_mode].share, last_trial.user_attrs["%s-%s_share" % (p, self.fixed_mode)]) From 96d30cff5a4059cf03730e69d623f70e49de8d10 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Thu, 23 Nov 2023 13:38:37 +0100 Subject: [PATCH 21/29] save delta values as attributes --- matsim/calibration/calib_group_asc.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index d068fa1..774ca84 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -51,9 +51,11 @@ def get_sbb_params(config, group, value, mode): [{"attributeValues": value, "absoluteModeCorrections": [m]}]}) return m + def parse_group(p): p = p.strip("[]") - return {x.split("=")[0]:x.split("=")[1] for x in p.split(",")} + return {x.split("=")[0]: x.split("=")[1] for x in p.split(",")} + class ASCGroupCalibrator(CalibratorBase): """ Calibrates the alternative specific for specific subpopulations """ @@ -161,18 +163,21 @@ def update_config(self, trial: optuna.Trial, prefix: str, config: dict): for mode in self.modes: # Update constants if self.config_format == "sbb": - p = trial.suggest_float(prefix + "[%s]-%s" % (attr, mode), - sys.float_info.min, sys.float_info.max) + param = "[%s]-%s" % (attr, mode) + p = trial.suggest_float(prefix + param, sys.float_info.min, sys.float_info.max) # don't write certain values if p == 0 and mode == self.fixed_mode: continue m = get_sbb_params(config, g, v, mode) - m["deltaConstant"] = (p - base[mode]) * step - else: - raise ValueError("Currently only ssb config format is supported") + delta = (p - base[mode]) * step + m["deltaConstant"] = delta + trial.set_user_attr("%s_delta" % param, delta) + + else: + raise ValueError("Currently only ssb config format is supported") def sample_initial(self, param: str) -> float: attr, _, mode = param.rpartition("-") @@ -203,6 +208,8 @@ def update_step(self, param: str, last_trial: optuna.Trial) -> float: t = self.get_group(self.target, parse_group(p)).set_index("mode") + # TODO: the previous applied correction might be considered here as well + return self.calc_asc_update(t.loc[mode].share, last_trial.user_attrs["%s_share" % param], t.loc[self.fixed_mode].share, last_trial.user_attrs["%s-%s_share" % (p, self.fixed_mode)]) From 0e17499304e11c8d6d73b1d3791bff86bd44f70b Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Fri, 24 Nov 2023 09:55:18 +0100 Subject: [PATCH 22/29] option to alternate base and group calibration --- matsim/calibration/__init__.py | 2 +- matsim/calibration/base.py | 2 +- matsim/calibration/calib_asc.py | 2 +- matsim/calibration/calib_group_asc.py | 34 ++++++++++++++++++++------- 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index a6b9e6e..928ae12 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -159,7 +159,7 @@ def f(trial): for c in calibrate: prefix = c.name + "-" - c.update_config(trial, prefix, params) + c.update_config(study, trial, prefix, params) with open(params_path, "w") as f: yaml.dump(params, f, sort_keys=False) diff --git a/matsim/calibration/base.py b/matsim/calibration/base.py index 6d43ccd..26db386 100644 --- a/matsim/calibration/base.py +++ b/matsim/calibration/base.py @@ -64,7 +64,7 @@ def init_study(self, study: optuna.Study): raise NotImplemented @abstractmethod - def update_config(self, trial: optuna.Trial, prefix: str, config: dict): + def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, config: dict): """ Calculate updated config for this trial. This method must interact with the trial using `trial.suggest_float` and by using the given prefix . """ raise NotImplemented diff --git a/matsim/calibration/calib_asc.py b/matsim/calibration/calib_asc.py index 32d6d40..2aba03d 100644 --- a/matsim/calibration/calib_asc.py +++ b/matsim/calibration/calib_asc.py @@ -45,7 +45,7 @@ def init_study(self, study: optuna.Trial): print("Running study with target:", self.target) - def update_config(self, trial: optuna.Trial, prefix: str, config: dict): + def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, config: dict): # Update constants for mode in self.modes: diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 774ca84..a1cc267 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -11,6 +11,7 @@ from .base import CalibratorBase, CalibrationInput from .analysis import read_trips_and_persons +from .utils import completed_trials def get_sbb_params(config, group, value, mode): @@ -67,7 +68,7 @@ def __init__(self, fixed_mode: str = "walk", lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, constraints: Dict[str, Callable[[str, float], float]] = None, - calib_base: bool = True, + calib_base: Literal['always', 'alternating', 'never'] = "always", multi_groups: Sequence[str] = None, corr_correction: float = 1, config_format: Literal['default', 'sbb'] = "default"): @@ -79,7 +80,7 @@ def __init__(self, :param fixed_mode: the fixed mode with asc=0 :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) :param constraints: constraints for each mode, must return asc and will be called with original asc - :param calib_base: whether to calibrate the ungrouped base ASC + :param calib_base: how to calibrate the ungrouped base ASC, can be always, alternating (every 2nd iteration) or never :param multi_groups: Groups that contain multiple values, that will be split by "," :param corr_correction: factor to reduce learning rates for possibly correlated groups, 0 disables this correction :param config_format: use SBBBehaviorGroups for the parameter config @@ -94,6 +95,9 @@ def __init__(self, if "mode" not in self.target.columns and "main_mode" not in self.target.columns: raise ValueError("Target must have 'mode' or 'main_mode' column") + if calib_base not in ("always", "alternating", "never"): + raise ValueError("calib_base must be one of 'always', 'alternating' or 'never'") + self.target = self.target.rename(columns={"value": "share", "main_mode": "mode"}, errors="ignore") if "share" not in self.target.columns: @@ -101,15 +105,16 @@ def __init__(self, self.groups = [t for t in self.target.columns if t not in ('mode', 'share', 'asc')] - # If self.base is None, base calibration won't be performed - if calib_base: + if calib_base != "never": self.base = self.get_group(self.target, None) self.base = self.base.groupby("mode").agg(target=("share", "sum")) + self.alternate_base = calib_base == "alternating" if len(self.base) == 0: raise ValueError("Target must contain base without any attributes") else: self.base = None + self.alternate_base = False def get_group(self, df, groups: dict = None): """ Get data of one group""" @@ -137,16 +142,27 @@ def init_study(self, study: optuna.Trial): print("Calibrating groups:", self.groups) print("Running with base target:", self.base) - def update_config(self, trial: optuna.Trial, prefix: str, config: dict): + def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, config: dict): # Base ascs for each mode base = defaultdict(lambda: 0) + completed = completed_trials(study) + if self.base is not None: - for mode in self.modes: - m = self.get_mode_params(config, mode) - m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) - base[mode] = m["constant"] + + if not self.alternate_base or len(completed) % 2 == 0: + for mode in self.modes: + m = self.get_mode_params(config, mode) + m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) + base[mode] = m["constant"] + else: + # Use attribute from last trial + last_trial = completed[-1] + for mode in self.modes: + m = self.get_mode_params(config, mode) + m["constant"] = last_trial.params[prefix + mode] + base[mode] = m["constant"] step = 1 # If all groups were fully correlated, update step needs to be divided by number of groups From e4d7f1e166d048841661a0ece91619033901a902 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Sat, 25 Nov 2023 10:28:50 +0100 Subject: [PATCH 23/29] alternate group calibration --- matsim/calibration/calib_group_asc.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index a1cc267..0f7fc50 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -180,7 +180,13 @@ def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, c # Update constants if self.config_format == "sbb": param = "[%s]-%s" % (attr, mode) - p = trial.suggest_float(prefix + param, sys.float_info.min, sys.float_info.max) + + if not self.alternate_base or len(completed) == 0 or len(completed) % 2 == 1: + p = trial.suggest_float(prefix + param, sys.float_info.min, sys.float_info.max) + else: + # Use from previous iteration + last_trial = completed[-1] + p = last_trial.params[prefix + mode] # don't write certain values if p == 0 and mode == self.fixed_mode: From 8896f089fb6d38b553bd1aae44859ea24ed2788a Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Sat, 25 Nov 2023 10:40:29 +0100 Subject: [PATCH 24/29] better error message --- matsim/calibration/calib_group_asc.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 0f7fc50..1ddf16b 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -98,6 +98,9 @@ def __init__(self, if calib_base not in ("always", "alternating", "never"): raise ValueError("calib_base must be one of 'always', 'alternating' or 'never'") + if config_format not in ("default", "sbb"): + raise ValueError("config_format must be one of 'default' or 'sbb'") + self.target = self.target.rename(columns={"value": "share", "main_mode": "mode"}, errors="ignore") if "share" not in self.target.columns: From f697ff8700c697baf2a77ddd6fd2bca58a43235a Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Sat, 25 Nov 2023 10:42:25 +0100 Subject: [PATCH 25/29] fix indent error --- matsim/calibration/calib_group_asc.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 1ddf16b..6a6a654 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -200,9 +200,8 @@ def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, c delta = (p - base[mode]) * step m["deltaConstant"] = delta trial.set_user_attr("%s_delta" % param, delta) - - else: - raise ValueError("Currently only ssb config format is supported") + else: + raise ValueError("Currently only ssb config format is supported") def sample_initial(self, param: str) -> float: attr, _, mode = param.rpartition("-") From 5f53f4b681094242ef08a56b22fd5177d2f0f7f4 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Mon, 27 Nov 2023 14:41:31 +0100 Subject: [PATCH 26/29] implement alternating updates in the update step method --- matsim/calibration/__init__.py | 2 +- matsim/calibration/base.py | 2 +- matsim/calibration/calib_asc.py | 2 +- matsim/calibration/calib_group_asc.py | 37 ++++++++++----------------- 4 files changed, 17 insertions(+), 26 deletions(-) diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py index 928ae12..77a4f47 100644 --- a/matsim/calibration/__init__.py +++ b/matsim/calibration/__init__.py @@ -61,7 +61,7 @@ def sample_independent(self, study, trial, param_name, param_distribution): last = completed[-1] last_param = last.params[param_name] - step = to_float(c.update_step(param, last)) + step = to_float(c.update_step(param, last, completed)) rate = 1.0 if c.lr is not None: diff --git a/matsim/calibration/base.py b/matsim/calibration/base.py index 26db386..417c67e 100644 --- a/matsim/calibration/base.py +++ b/matsim/calibration/base.py @@ -75,7 +75,7 @@ def sample_initial(self, param: str) -> float: raise NotImplemented @abstractmethod - def update_step(self, param: str, last_trial: optuna.Trial) -> float: + def update_step(self, param: str, last_trial: optuna.Trial, completed: Sequence[optuna.Trial]) -> float: """ Return update step for param based on last trial """ raise NotImplemented diff --git a/matsim/calibration/calib_asc.py b/matsim/calibration/calib_asc.py index 2aba03d..d537272 100644 --- a/matsim/calibration/calib_asc.py +++ b/matsim/calibration/calib_asc.py @@ -58,7 +58,7 @@ def sample_initial(self, param: str) -> float: return 0 - def update_step(self, param: str, last_trial: optuna.Trial) -> float: + def update_step(self, param: str, last_trial: optuna.Trial, completed: Sequence[optuna.Trial]) -> float: if param == self.fixed_mode: return 0 diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 6a6a654..9264e1b 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -150,22 +150,11 @@ def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, c # Base ascs for each mode base = defaultdict(lambda: 0) - completed = completed_trials(study) - if self.base is not None: - - if not self.alternate_base or len(completed) % 2 == 0: - for mode in self.modes: - m = self.get_mode_params(config, mode) - m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) - base[mode] = m["constant"] - else: - # Use attribute from last trial - last_trial = completed[-1] - for mode in self.modes: - m = self.get_mode_params(config, mode) - m["constant"] = last_trial.params[prefix + mode] - base[mode] = m["constant"] + for mode in self.modes: + m = self.get_mode_params(config, mode) + m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) + base[mode] = m["constant"] step = 1 # If all groups were fully correlated, update step needs to be divided by number of groups @@ -183,13 +172,7 @@ def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, c # Update constants if self.config_format == "sbb": param = "[%s]-%s" % (attr, mode) - - if not self.alternate_base or len(completed) == 0 or len(completed) % 2 == 1: - p = trial.suggest_float(prefix + param, sys.float_info.min, sys.float_info.max) - else: - # Use from previous iteration - last_trial = completed[-1] - p = last_trial.params[prefix + mode] + p = trial.suggest_float(prefix + param, sys.float_info.min, sys.float_info.max) # don't write certain values if p == 0 and mode == self.fixed_mode: @@ -213,13 +196,17 @@ def sample_initial(self, param: str) -> float: return 0 - def update_step(self, param: str, last_trial: optuna.Trial) -> float: + def update_step(self, param: str, last_trial: optuna.Trial, completed: Sequence[optuna.Trial]) -> float: if param == self.fixed_mode: return 0 # Base mode shares if not param.startswith("["): + # No update every 2nd iteration + if self.alternate_base and len(completed) % 2 == 1: + return 0 + return self.calc_asc_update(self.base.loc[param].target, last_trial.user_attrs["%s_share" % param], self.base.loc[self.fixed_mode].target, last_trial.user_attrs["%s_share" % self.fixed_mode]) @@ -227,6 +214,10 @@ def update_step(self, param: str, last_trial: optuna.Trial) -> float: else: p, _, mode = param.rpartition("-") + # No update every other iteration + if self.alternate_base and len(completed) % 2 == 0: + return 0 + if mode == self.fixed_mode: return 0 From ed738d0e0217b946d2f473a860108e1d838a282d Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Thu, 30 Nov 2023 12:22:37 +0100 Subject: [PATCH 27/29] fix usage of multi groups --- matsim/calibration/calib_group_asc.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py index 9264e1b..9b806c1 100644 --- a/matsim/calibration/calib_group_asc.py +++ b/matsim/calibration/calib_group_asc.py @@ -11,7 +11,6 @@ from .base import CalibratorBase, CalibrationInput from .analysis import read_trips_and_persons -from .utils import completed_trials def get_sbb_params(config, group, value, mode): @@ -88,7 +87,7 @@ def __init__(self, super().__init__(modes, initial, target, lr, constraints) self.fixed_mode = fixed_mode - self.multi_groups = multi_groups + self.multi_groups = set(multi_groups) self.corr_correction = corr_correction self.config_format = config_format @@ -119,14 +118,26 @@ def __init__(self, self.base = None self.alternate_base = False - def get_group(self, df, groups: dict = None): + def get_group(self, df, groups: dict = None, use_multi=False): """ Get data of one group""" # return result for empty groups if groups is None: idx = reduce(lambda x, y: x & y, [pd.isna(df[g]) for g in self.groups]) return df[idx] - idx = reduce(lambda x, y: x & y, [df[g] == v for g, v in groups.items()]) + if use_multi: + idx = [] + for g, v in groups.items(): + if g in self.multi_groups: + vs = v.split(",") + idx.append(df[g].isin(vs)) + else: + idx.append(df[g] == v) + + idx = reduce(lambda x, y: x & y, idx) + else: + idx = reduce(lambda x, y: x & y, [df[g] == v for g, v in groups.items()]) + return df[idx] @property @@ -258,7 +269,7 @@ def calc_stats(self, trial: optuna.Trial, run_dir: str, if pd.isna(v): continue - sub_trips = self.get_group(trips, {g: v}) + sub_trips = self.get_group(trips, {g: v}, g in self.multi_groups) sub_share = sub_trips.groupby("main_mode").count()["trip_number"] / len(sub_trips) sub_share.rename("share", inplace=True) From 650862dcd75045fd5457fc20609fbfaa0f43fd38 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Fri, 1 Dec 2023 11:28:31 +0100 Subject: [PATCH 28/29] update readme to new calibration --- README.md | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 341cda4..8b21d5f 100644 --- a/README.md +++ b/README.md @@ -237,6 +237,8 @@ Scenarios created with the `matsim-application` contrib provide an interface tha # ------------------------------------------------------------------- # 5. CALIBRATION: Automatic calibration for MATSim scenario. +from matsim.calibration import create_calibration, ASCCalibrator, utils, study_as_df + modes = ["walk", "car", "pt", "bike"] fixed_mode = "walk" target = { @@ -246,11 +248,29 @@ target = { "car": 0.359 } -study, obj = calibration.create_mode_share_study("calib", "./matsim-scenario-1.0-SNAPSHOT.jar", - "./scenarios/input/scenario-v1.0-10pct.config.xml", - modes, fixed_mode, target) +def filter_persons(df): + return df[df.subpopulation == "person"] + +def filter_modes(df): + return df[df.main_mode.isin(modes)] +study, obj = create_calibration("calib", ASCCalibrator(modes, initial, target, fixed_mode=fixed_mode, + lr=utils.linear_scheduler(start=0.25, interval=10)), + "./matsim-scenario-1.0-SNAPSHOT.jar", + "./scenarios/input/scenario-v1.0-10pct.config.xml", + args="--config:controler.lastIteration 400", + jvm_args="-Xmx12G -Xmx12G -XX:+AlwaysPreTouch -XX:+UseParallelGC", + transform_persons=filter_persons, + transform_trips=filter_modes, + chain_runs=utils.default_chain_scheduler, debug=False) study.optimize(obj, 10) +df = study_as_df(study) +df.to_csv("report.csv") + +from matsim.calibration.plot import plot_study + +plot_study(study) + ``` From f7615c177868be1a32b43115367b34605574eb99 Mon Sep 17 00:00:00 2001 From: Christian Rakow Date: Mon, 11 Dec 2023 09:27:47 +0100 Subject: [PATCH 29/29] fix typo --- matsim/calibration/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/matsim/calibration/utils.py b/matsim/calibration/utils.py index bc174c0..9594f28 100644 --- a/matsim/calibration/utils.py +++ b/matsim/calibration/utils.py @@ -53,7 +53,7 @@ def linear_scheduler(start=0.6, end=1, interval=3): """ Creates a lr scheduler that will interpolate linearly from start to end over the first n iterations. :param start: Initial learning rate. - :param end: Finial learning rate to reach. + :param end: Final learning rate to reach. :param interval: Number of runs until end rate should be reached. """ if interval < 2: