diff --git a/README.md b/README.md index 341cda4..8b21d5f 100644 --- a/README.md +++ b/README.md @@ -237,6 +237,8 @@ Scenarios created with the `matsim-application` contrib provide an interface tha # ------------------------------------------------------------------- # 5. CALIBRATION: Automatic calibration for MATSim scenario. +from matsim.calibration import create_calibration, ASCCalibrator, utils, study_as_df + modes = ["walk", "car", "pt", "bike"] fixed_mode = "walk" target = { @@ -246,11 +248,29 @@ target = { "car": 0.359 } -study, obj = calibration.create_mode_share_study("calib", "./matsim-scenario-1.0-SNAPSHOT.jar", - "./scenarios/input/scenario-v1.0-10pct.config.xml", - modes, fixed_mode, target) +def filter_persons(df): + return df[df.subpopulation == "person"] + +def filter_modes(df): + return df[df.main_mode.isin(modes)] +study, obj = create_calibration("calib", ASCCalibrator(modes, initial, target, fixed_mode=fixed_mode, + lr=utils.linear_scheduler(start=0.25, interval=10)), + "./matsim-scenario-1.0-SNAPSHOT.jar", + "./scenarios/input/scenario-v1.0-10pct.config.xml", + args="--config:controler.lastIteration 400", + jvm_args="-Xmx12G -Xmx12G -XX:+AlwaysPreTouch -XX:+UseParallelGC", + transform_persons=filter_persons, + transform_trips=filter_modes, + chain_runs=utils.default_chain_scheduler, debug=False) study.optimize(obj, 10) +df = study_as_df(study) +df.to_csv("report.csv") + +from matsim.calibration.plot import plot_study + +plot_study(study) + ``` diff --git a/matsim/calibration.py b/matsim/calibration.py deleted file mode 100644 index 770e5ea..0000000 --- a/matsim/calibration.py +++ /dev/null @@ -1,452 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import subprocess -import glob -import math -import shutil -import sys -from os import path, makedirs -from time import sleep - -from typing import Any, Union, Sequence, Dict, Callable, Tuple - -import yaml -import numpy as np -import pandas as pd -import geopandas -import optuna - -from optuna.trial import TrialState - -# This is there so that this file can be used standalone, as well as installed from the package -try: - import analysis -except ImportError: - from . 
import analysis - -def _completed_trials(study): - completed = filter(lambda s: s.state == TrialState.COMPLETE, study.trials) - return sorted(completed, key=lambda s: s.number) - -def _same_sign(x): - x = x.to_numpy() - return np.all(x >= 0) if x[0] >= 0 else np.all(x < 0) - -class ASCSampler(optuna.samplers.BaseSampler): - """ Sample asc according to obtained mode shares """ - - def __init__(self, mode_share, fixed_mode, initial_asc, lr, constraints): - - self.mode_share = mode_share - self.fixed_mode = fixed_mode - self.initial_asc = initial_asc - self.lr = lr - self.constraints = constraints - - def infer_relative_search_space(self, study, trial): - return {} - - def sample_relative(self, study, trial, search_space): - #study._storage.set_trial_system_attr(trial._trial_id, "search_space", self._search_space) - return {} - - def sample_independent(self, study, trial, param_name, param_distribution): - - if param_name == self.fixed_mode: - return 0 - - completed = _completed_trials(study) - - if len(completed) == 0: - asc = self.initial_asc[param_name] - - if self.constraints is not None and param_name in self.constraints: - asc = self.constraints[param_name](asc) - - return asc - - last = completed[-1] - - asc = last.params[param_name] - - step = self.calc_update(self.mode_share[param_name], last.user_attrs["%s_share" % param_name], - self.mode_share[self.fixed_mode], last.user_attrs["%s_share" % self.fixed_mode]) - - rate = 1.0 - if self.lr is not None: - - rate = self.lr(float(len(completed) + 1), param_name, step, self.mode_share, trial, study) - - # rate of None or 0 would be invalid - if not rate: - rate = 1 - - # numpy types need casting - rate = float(rate) - - trial.set_user_attr("%s_rate" % param_name, rate) - trial.set_user_attr("%s_step" % param_name, step) - - asc += rate * step - - # Call constraint if present - if self.constraints is not None and param_name in self.constraints: - asc = self.constraints[param_name](asc) - - return asc - - @staticmethod - def calc_update(z_i, m_i, z_0, m_0): - """ Calculates the asc update for one step """ - # Update asc - # (1) Let z_i be the observed share of mode i. (real data, to be reproduced) - # (2) Run the simulation to convergence. Obtain simulated mode shares m_i. - # (3) Do nothing for mode 0. For all other modes: add [ln(z_i) - ln(m_i)] – [ln(z_0) - ln(m_0)] to its ASC. - # (4) Goto 2. - return math.log(z_i) - math.log(m_i) - (math.log(z_0) - math.log(m_0)) - -def calc_adjusted_mode_share(sim: pd.DataFrame, survey: pd.DataFrame, - count_var: str = "trips", dist_var: str = "dist_group") -> Tuple[Any, pd.DataFrame]: - """ This function can be used if the given input trip data has a different distance distribution than the survey data. - It will rescale the survey data to match simulated data, which allows to calculate an adjusted overall mode share. 
- - :param sim: data frame containing shares for distance group and modes - :param survey: data frame containing shares from survey data - :param count_var: name of column containing the number of trips or share in 'sim' - :param dist_var: name of the column holding the distance group information - :return: tuple of optimization result and adjusted mode share - """ - from scipy.optimize import minimize - - sagg = sim.groupby(dist_var).sum() - sagg['share'] = sagg[count_var] / np.sum(sagg[count_var]) - - # Rescale the distance groups of the survey data so that it matches the distance group distribution of the simulation - # The overall mode share after this adjustment will the resulting adjusted mode share - def f(x, result=False): - adj = survey.copy() - - for i, t in enumerate(x): - adj.loc[adj[dist_var] == sagg.index[i], "share"] *= t - - adj.share = adj.share / np.sum(adj.share) - - agg = adj.groupby(dist_var).sum() - - # Minimized difference between adjusted and simulated distribution - err = sum((sagg.share - agg.share)**2) - - if result: - return adj - - return err - - # One variable for each distance group - x0 = np.ones(len(sagg.index)) / len(sagg.index) - - # Sum should be less than one - cons = [{'type': 'ineq', 'fun': lambda x: 1 - sum(x)}] - bnds = tuple((0, 1) for x in x0) - - res = minimize(f, x0, method='SLSQP', bounds=bnds, constraints=cons) - - df = f(res.x, True) - - return res, df - - -def study_as_df(study): - """ Convert study to dataframe """ - completed = _completed_trials(study) - - modes = study.user_attrs["modes"] - #fixed_mode = study.user_attrs["fixed_mode"] - - data = [] - - for i, trial in enumerate(completed): - - for j, m in enumerate(modes): - entry = { - "trial": i, - "mode": m, - "asc": trial.params[m], - "error": trial.values[j] - } - - for k, v in trial.user_attrs.items(): - if k.startswith(m + "_"): - entry[k[len(m) + 1:]] = v - - data.append(entry) - - return pd.DataFrame(data) - -def auto_lr_scheduler(start=9, interval=3, lookback=3, throttle=0.7): - """ Creates a function to be passed into the lr argument of the mode share study. This scheduler will - try to increase the step size automatically to speed up convergence. 
- - :param start: earliest run to start adjusting - :param interval: apply every x runs - :param lookback: look at x recent runs for the interpolation - :param throttle: reduce estimate as step size will most likely be over-estimated - """ - if start < 1: - raise ValueError("Start must be at least 1") - - if interval > start: - raise ValueError("Start must be less equal than interval") - - def _fn(n, mode, update, mode_share, trial, study): - - if n < start: - return 1 - - # Only run every x iterations - if (n - start) % interval != 0: - return 1 - - df = study_as_df(study) - df = df[df["mode"] == mode] - - updates = df.asc.diff(1) - changes = df.share.diff(1) - - # ensure that all previous updates and all previous reactions have been in the same direction - # otherwise there will be instability - if not _same_sign(updates.iloc[-lookback:]) or not _same_sign(changes.iloc[-lookback:]): - return 1 - - asc = df.asc.iloc[-1] - - x = df.share.iloc[-lookback:] - y = df.asc.iloc[-lookback:] - - A = np.vstack([x, np.ones(len(x))]).T - m, c = np.linalg.lstsq(A, y, rcond=None)[0] - - target = m*mode_share[mode] + c - new_update = (target - asc) * throttle - - # updates must be in same direction - if np.sign(update) != np.sign(new_update): - return 1 - - trial.set_user_attr("%s_auto_lr" % mode, True) - - return new_update / update - - - return _fn - -def default_chain_scheduler(completed): - """ Default function to determin when to chain runs. """ - - n = len(completed) - - if n <= 6: - return n % 2 == 0 - - if n <= 15: - return n % 5 == 0 - - if n <= 50: - return n % 10 == 0 - - return False - -def linear_lr_scheduler(start=0.6, end=1, interval=3): - """ Creates an lr scheduler that will interpolate linearly from start to end over the first n iterations. - - :param start: Initial learning rate. - :param end: Finial learning rate to reach. - :param interval: Number of runs until end rate should be reached. - """ - if interval < 2: - raise ValueError("N must be greater or equal 2.") - - def _fn(n, *args, **kwargs): - - if n > interval: - return end - - return start + (n - 1) * (end - start) / interval - - return _fn - -def create_mode_share_study(name: str, jar: str, config: str, - modes: Sequence[str], mode_share: Dict[str, float], - fixed_mode: str = "walk", - args: Union[str, Callable]="", jvm_args="", - initial_asc: Dict[str, float] = None, - transform_persons: Callable = None, - transform_trips: Callable = None, - chain_runs: Union[bool, int, Callable] = False, - lr: Callable[[int, str, float, Dict[str, float], optuna.Trial, optuna.Study], float] = None, - constraints: Dict[str, Callable] = None, - storage: optuna.storages.RDBStorage = None - ) -> Tuple[optuna.Study, Callable]: - """ Create or load an existing study for mode share calibration using asc values. - - This function returns the study and optimization objective as tuple. 
Which can be used like this: - study.optimize(obj, 10) - - :param name: name of the study - :param jar: path to executable jar file of the scenario - :param config: path to config file to run - :param modes: list of all relevant modes - :param mode_share: dict of target mode shares - :param fixed_mode: the fixed mode with asc=0 - :param args: additional arguments to the executable jar, can also be a callback function - :param jvm_args: additional jvm args - :param initial_asc: dict of initial asc values - :param transform_persons: callable to filter persons included in mode share - :param transform_trips: callable to modify trips included in mode share - :param chain_runs: automatically use the output plans of each run as input for the next, either True or number of iterations or callable - :param lr: learning rate schedule, will be called with (trial number, mode, asc update, mode_share, trial, study) - :param constraints: constraints for each mode, must return asc and will be called with original asc - :param storage: custom storage object to overwrite default sqlite backend - :return: tuple of study and optimization objective. - """ - - # Init with 0 - if initial_asc is None: - initial_asc = {} - for m in modes: - initial_asc[m] = 0 - - # Set some custom arguments to prevent known errors on NFS - if storage is None: - storage = optuna.storages.RDBStorage(url="sqlite:///%s.db" % name, - engine_kwargs={"connect_args": {"timeout": 100}, "isolation_level": "AUTOCOMMIT"}) - - # TODO: normalize target share for given modes - - study = optuna.create_study( - study_name=name, - storage=storage, - load_if_exists=True, - directions=["minimize"] * len(modes), - sampler=ASCSampler(mode_share, fixed_mode, initial_asc, lr, constraints) - ) - - study.set_user_attr("modes", modes) - study.set_user_attr("fixed_mode", fixed_mode) - - if not path.exists("params"): - makedirs("params") - - if not path.exists("runs"): - makedirs("runs") - - print("Running study with target shares:", mode_share) - - def f(trial): - - params_path = path.join("params", "run%d.yaml" % trial.number) - params = [] - - for mode in modes: - # preserve order - m = {"mode": mode} - - asc = trial.suggest_float(mode, sys.float_info.min, sys.float_info.max) - m["constant"] = asc - params.append(m) - - with open(params_path, "w") as f: - yaml.dump({"planCalcScore": {"scoringParameters": [{"modeParams": params}]}}, f, sort_keys=False) - - run_dir = "runs/%03d" % trial.number - - if os.path.exists(run_dir): - shutil.rmtree(run_dir) - - completed = _completed_trials(study) - - # Call args if necesarry - run_args = args(completed) if callable(args) else args - - cmd = "java %s -jar %s run --config %s --yaml %s --output %s --runId %03d %s" \ - % (jvm_args, jar, config, params_path, run_dir, trial.number, run_args) - - # Max fix extra whitespaces in args - cmd = cmd.strip() - - if chain_runs: - - out = None - for t in reversed(completed): - out = glob.glob("runs/%03d/*.output_plans.xml.gz" % t.number) - if out: - out = path.abspath(out[0]) - break - - if out: - last_chained = study.user_attrs.get("last_chained_run", None) - if (chain_runs is True or - (callable(chain_runs) and chain_runs(completed)) or - (type(chain_runs) == int and len(completed) % chain_runs == 0)): - cmd += " --config:plans.inputPlansFile=" + out - study.set_user_attr("last_chained_run", out) - - elif last_chained: - cmd += " --config:plans.inputPlansFile=" + last_chained - - else: - print("No output plans for chaining runs found.") - - # Extra whitespaces will break 
argument parsing - cmd = cmd.strip() - - print("Running cmd %s" % cmd) - - if os.name != 'nt': - cmd = cmd.split(" ") - - p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - try: - while p.poll() is None: - sleep(1) - - if p.returncode != 0: - raise Exception("Process returned with error code: %s" % p.returncode) - finally: - p.terminate() - - shares = analysis.calc_mode_share(run_dir, transform_persons=transform_persons, transform_trips=transform_trips) - - print("Obtained mode shares:", shares) - - for k, v in shares.items(): - trial.set_user_attr("%s_share" % k, v) - - res = [] - for mode in modes: - res.append(abs(mode_share[mode] - trial.user_attrs["%s_share" % mode])) - - return res - - return study, f - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(description="Calibration CLI") - parser.add_argument('file', nargs=1, type=str, help="Path to input db") - parser.add_argument("--name", type=str, default="calib", help="Calibration name") - parser.add_argument("--output", default=None, help="Output path") - args = parser.parse_args() - - study = optuna.load_study( - study_name=args.name, - storage="sqlite:///%s" % args.file[0], - ) - - if not args.output: - args.output = args.file[0] + ".csv" - - df = study_as_df(study) - df.to_csv(args.output, index=False) \ No newline at end of file diff --git a/matsim/calibration/__init__.py b/matsim/calibration/__init__.py new file mode 100644 index 0000000..77a4f47 --- /dev/null +++ b/matsim/calibration/__init__.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" Contains calibration related functions """ + +__all__ = ["create_calibration", "study_as_df", "ASCCalibrator", "ASCGroupCalibrator", "utils"] + +import glob +import os +import shutil +import subprocess +import sys +from os import path, makedirs +from time import sleep +from typing import Union, Sequence, Callable, Tuple + +import optuna +import yaml + +from . 
import utils +from .base import CalibratorBase, to_float +from .calib_asc import ASCCalibrator +from .calib_group_asc import ASCGroupCalibrator +from .utils import study_as_df +from .analysis import calc_mode_stats + + +class CalibrationSampler(optuna.samplers.BaseSampler): + """ Run calibrators on obtained mode shares """ + + def __init__(self, calibrators: Sequence[CalibratorBase]): + self.calibrators = {} + for c in calibrators: + if c.name not in self.calibrators: + self.calibrators[c.name] = c + else: + raise ValueError("Incompatible calibrators with same name occurred: %s (%s)" % (c.name, c)) + + def infer_relative_search_space(self, study, trial): + return {} + + def sample_relative(self, study, trial, search_space): + # study._storage.set_trial_system_attr(trial._trial_id, "search_space", self._search_space) + return {} + + def sample_independent(self, study, trial, param_name, param_distribution): + + prefix, _, param = param_name.partition("-") + c: CalibratorBase = self.calibrators[prefix] + + mode = param.rpartition("-")[2] if "-" in param else param + + completed = utils.completed_trials(study) + if len(completed) == 0: + initial = to_float(c.sample_initial(param)) + + if c.constraints is not None and mode in c.constraints: + initial = c.constraints[mode](mode, initial) + + return initial + + last = completed[-1] + last_param = last.params[param_name] + + step = to_float(c.update_step(param, last, completed)) + + rate = 1.0 + if c.lr is not None: + + rate = c.lr(float(len(completed) + 1), param_name, step, trial, study) + + # rate of None or 0 would be invalid + if not rate: + rate = 1 + + # numpy types need casting + rate = to_float(rate) + + # Need to use storage directly, frozen trial does not save attributes + study._storage.set_trial_user_attr(trial._trial_id, "%s_rate" % param_name, rate) + study._storage.set_trial_user_attr(trial._trial_id, "%s_step" % param_name, step) + + last_param += rate * step + + # Call constraint if present + if c.constraints is not None and mode in c.constraints: + last_param = c.constraints[mode](mode, last_param) + + return last_param + + +def create_calibration(name: str, calibrate: Union[CalibratorBase, Sequence[CalibratorBase]], + jar: Union[str, os.PathLike], config: Union[str, os.PathLike], + args: Union[str, Callable] = "", jvm_args="", + transform_persons: Callable = None, + transform_trips: Callable = None, + chain_runs: Union[bool, int, Callable] = False, + storage: optuna.storages.RDBStorage = None, + custom_cli: Callable = None, + debug: bool = False + ) -> Tuple[optuna.Study, Callable]: + """ Create or load an existing study for mode share calibration using asc values. + + This function returns the study and optimization objective as tuple. 
Which can be used like this: + study.optimize(obj, 10) + + :param name: name of the study + :param calibrate: (list) of calibrator instances + :param jar: path to executable jar file of the scenario + :param config: path to config file to run + :param args: additional arguments to the executable jar, can also be a callback function + :param jvm_args: additional jvm args + :param transform_persons: callable to filter persons included in mode share + :param transform_trips: callable to modify trips included in mode share + :param chain_runs: automatically use the output plans of each run as input for the next, either True or number of iterations or callable + :param storage: custom storage object to overwrite default sqlite backend + :param custom_cli: the scenario is not a matsim application and needs a different command line syntax + :param debug: enable debug output + :return: tuple of study and optimization objective. + """ + + # Init with 0 + if isinstance(calibrate, CalibratorBase): + calibrate = [calibrate] + + # Set some custom arguments to prevent known errors on NFS + if storage is None: + storage = optuna.storages.RDBStorage(url="sqlite:///%s.db" % name, + engine_kwargs={"connect_args": {"timeout": 100}, + "isolation_level": "AUTOCOMMIT"}) + + if not os.access(jar, os.R_OK): + raise ValueError("Can not access JAR File: %s" % jar) + + if not os.access(config, os.R_OK): + raise ValueError("Can not access config File: %s" % config) + + study = optuna.create_study( + study_name=name, + storage=storage, + load_if_exists=True, + directions=["minimize"] * sum(c.num_targets for c in calibrate), + sampler=CalibrationSampler(calibrate) + ) + + if not path.exists("params"): + makedirs("params") + + if not path.exists("runs"): + makedirs("runs") + + study.set_user_attr("calib", [c.name for c in calibrate]) + for c in calibrate: + c.init_study(study) + + def f(trial): + + params_path = path.join("params", "run%d.yaml" % trial.number) + params = {} + + for c in calibrate: + prefix = c.name + "-" + c.update_config(study, trial, prefix, params) + + with open(params_path, "w") as f: + yaml.dump(params, f, sort_keys=False) + + run_dir = "runs/%03d" % trial.number + + if os.path.exists(run_dir): + shutil.rmtree(run_dir) + + completed = utils.completed_trials(study) + + # Call args if necessary + run_args = args(completed) if callable(args) else args + + if custom_cli: + cmd = custom_cli(jvm_args, jar, config, params_path, run_dir, trial.number, run_args) + else: + cmd = "java %s -jar %s run --config %s --yaml %s --output %s --runId %03d %s" \ + % (jvm_args, jar, config, params_path, run_dir, trial.number, run_args) + + # Max fix extra whitespaces in args + cmd = cmd.strip() + + if chain_runs: + + out = None + for t in reversed(completed): + out = glob.glob("runs/%03d/*.output_plans.xml.gz" % t.number) + if out: + out = path.abspath(out[0]) + break + + if out: + last_chained = study.user_attrs.get("last_chained_run", None) + if (chain_runs is True or + (callable(chain_runs) and chain_runs(completed)) or + (type(chain_runs) == int and len(completed) % chain_runs == 0)): + cmd += " --config:plans.inputPlansFile=" + out + study.set_user_attr("last_chained_run", out) + + elif last_chained: + cmd += " --config:plans.inputPlansFile=" + last_chained + + else: + print("No output plans for chaining runs found.") + + # Extra whitespaces will break argument parsing + cmd = cmd.strip() + + print("Running cmd %s" % cmd) + + if os.name != 'nt': + cmd = cmd.split(" ") + + p = subprocess.Popen(cmd, + 
stdout=sys.stdout if debug else subprocess.DEVNULL, + stderr=sys.stderr if debug else subprocess.DEVNULL) + try: + while p.poll() is None: + sleep(1) + + if p.returncode != 0: + raise Exception("Process returned with error code: %s" % p.returncode) + finally: + p.terminate() + + mode_stats = calc_mode_stats(run_dir, + transform_persons=transform_persons, + transform_trips=transform_trips) + + trial.set_user_attr("mode_stats", mode_stats.to_dict(orient="tight")) + + err = [] + for c in calibrate: + e = c.calc_stats(trial, run_dir, transform_persons, transform_trips) + err.extend(to_float(f) for f in e) + + return err + + return study, f \ No newline at end of file diff --git a/matsim/calibration/__main__.py b/matsim/calibration/__main__.py new file mode 100644 index 0000000..576f745 --- /dev/null +++ b/matsim/calibration/__main__.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import optuna +import traceback + +from . import study_as_df + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(prog="matsim-calibration", description="Calibration CLI") + parser.add_argument('file', nargs=1, type=str, help="Path to input db") + parser.add_argument("--name", type=str, default="calib", help="Calibration name") + parser.add_argument("--output", default=None, help="Output path") + args = parser.parse_args() + + study = optuna.load_study( + study_name=args.name, + storage="sqlite:///%s" % args.file[0], + ) + + if not args.output: + args.output = args.file[0] + ".csv" + + df = study_as_df(study) + df.to_csv(args.output, index=False) + + try: + from .plot import plot_study + plot_study(study) + + except ImportError: + print("Could not plot study.") + traceback.print_exc() + diff --git a/matsim/analysis.py b/matsim/calibration/analysis.py similarity index 64% rename from matsim/analysis.py rename to matsim/calibration/analysis.py index 17234db..dc82a4a 100644 --- a/matsim/analysis.py +++ b/matsim/calibration/analysis.py @@ -3,15 +3,72 @@ import glob -import pandas as pd -import numpy as np +from typing import Any, Tuple + import geopandas +import numpy as np +import pandas as pd + + +def calc_adjusted_mode_share(sim: pd.DataFrame, survey: pd.DataFrame, + count_var: str = "trips", dist_var: str = "dist_group") -> Tuple[Any, pd.DataFrame]: + """ This function can be used if the given input trip data has a different distance distribution than the survey data. + It will rescale the survey data to match simulated data, which allows to calculate an adjusted overall mode share. 
+ + :param sim: data frame containing shares for distance group and modes + :param survey: data frame containing shares from survey data + :param count_var: name of column containing the number of trips or share in 'sim' + :param dist_var: name of the column holding the distance group information + :return: tuple of optimization result and adjusted mode share + """ + from scipy.optimize import minimize + + sagg = sim.groupby(dist_var).sum() + sagg['share'] = sagg[count_var] / np.sum(sagg[count_var]) + + # Rescale the distance groups of the survey data so that it matches the distance group distribution of the simulation + # The overall mode share after this adjustment will the resulting adjusted mode share + def f(x, result=False): + adj = survey.copy() + + for i, t in enumerate(x): + adj.loc[adj[dist_var] == sagg.index[i], "share"] *= t + + adj.share = adj.share / np.sum(adj.share) + + agg = adj.groupby(dist_var).sum() + + # Minimized difference between adjusted and simulated distribution + err = sum((sagg.share - agg.share)**2) + if result: + return adj -def read_trips_and_persons(run, transform_persons=None, transform_trips=None): + return err + + # One variable for each distance group + x0 = np.ones(len(sagg.index)) / len(sagg.index) + + # Sum should be less than one + cons = [{'type': 'ineq', 'fun': lambda x: 1 - sum(x)}] + bnds = tuple((0, 1) for x in x0) + + res = minimize(f, x0, method='SLSQP', bounds=bnds, constraints=cons) + + df = f(res.x, True) + + return res, df + + +def read_trips_and_persons(run, transform_persons=None, transform_trips=None) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Read trips and persons from run directory """ - trips = glob.glob(run.rstrip("/") + "/*.output_trips.csv.gz")[0] + # Return input as output + # This allows to re-use input for the calc functions + if type(run) is tuple and len(run) == 2: + return run + + trips = glob.glob(run.rstrip("/") + "/*.output_trips.csv.gz")[0] persons = glob.glob(run.rstrip("/") + "/*.output_persons.csv.gz")[0] df = pd.read_csv(trips, sep=";", dtype={"person": "str"}) @@ -66,7 +123,7 @@ def calc_mode_share(run, transform_persons=None, transform_trips=None): def calc_mode_stats(run, attrs=[], dist_bins = [0, 1000, 2000, 5000, 10000, 20000, np.inf], dist_labels = ["0 - 1000", "1000 - 2000", "2000 - 5000", "5000 - 10000", "10000 - 20000", "20000+"], - transform_persons=None, transform_trips=None): + transform_persons=None, transform_trips=None) -> pd.DataFrame: """ Calculate detailed mode statistics """ trips, _ = read_trips_and_persons(run, transform_persons, transform_trips) @@ -76,19 +133,17 @@ def aggr(x): data = dict(n=len(x), mean_dist=np.average(x.traveled_distance)) return pd.Series(data=data) - aggr = trips.groupby(attrs + ["dist_group", "main_mode"]).apply(aggr) + aggr = trips.groupby(attrs + ["dist_group", "main_mode"], observed=True).apply(aggr) aggr["n"].fillna(0, inplace=True) aggr["share"] = aggr.n / aggr.n.sum() aggr["share"].fillna(0, inplace=True) - - return aggr -def calc_population_stats(run, attrs=[], transform_persons=None, transform_trips=None): +def calc_population_stats(run, attrs=[], transform_persons=None, transform_trips=None) -> pd.DataFrame: """ Calculate population statistics """ trips, persons = read_trips_and_persons(run, transform_persons, transform_trips) @@ -126,8 +181,11 @@ def summarize(x): return pd.Series(data=data) - - aggr = persons.groupby(attrs).apply(summarize) + if attrs: + aggr = persons.groupby(attrs).apply(summarize) + else: + aggr = summarize(persons) + aggr = 
aggr.to_frame(0).T aggr["population_share"] = aggr.n / aggr.n.sum() diff --git a/matsim/calibration/base.py b/matsim/calibration/base.py new file mode 100644 index 0000000..417c67e --- /dev/null +++ b/matsim/calibration/base.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- + +import math +import os +from abc import ABC, abstractmethod +from typing import Union, Sequence, Dict, Tuple, Callable + +import optuna +import pandas as pd + +# Type alias for input variables +CalibrationInput = Union[str, os.PathLike, dict, pd.DataFrame] + + +def to_float(x): + return float(x.iloc[0]) if isinstance(x, pd.Series) else float(x) + + +def sanitize(x): + if 'main_mode' in x.columns and 'mode' not in x.columns: + x.rename(columns={'main_mode': 'mode'}, inplace=True) + + return x + + +class CalibratorBase(ABC): + """ Base class for calibrators. """ + + def __init__(self, + modes: Sequence[str], + initial: CalibrationInput, + target: CalibrationInput, + lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, + constraints: Dict[str, Callable[[str, float], float]] = None): + """Abstract constructors for all calibrations. Usually the same parameters should be made available subclasses. + + :param modes: list of all relevant modes + :param initial: dict of initial asc values + :param target: target shares as csv or dict + :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) + :param constraints: constraints for each mode, must return asc and will be called with original asc + """ + self.modes = modes + self.initial = sanitize(self.convert_input(initial)) + self.target = sanitize(self.convert_input(target)) + self.lr = lr + self.constraints = constraints + + @property + @abstractmethod + def name(self): + """ Short name of the calibrator. Will be used to differentiate attributes""" + + raise NotImplemented + + @property + def num_targets(self): + """ Number of targets/errors that will be returned in calc stats""" + return len(self.target) + + @abstractmethod + def init_study(self, study: optuna.Study): + """ Initialize study instance """ + raise NotImplemented + + @abstractmethod + def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, config: dict): + """ Calculate updated config for this trial. This method must interact with the trial using + `trial.suggest_float` and by using the given prefix . """ + raise NotImplemented + + @abstractmethod + def sample_initial(self, param: str) -> float: + """ Sample initial value for parameter """ + raise NotImplemented + + @abstractmethod + def update_step(self, param: str, last_trial: optuna.Trial, completed: Sequence[optuna.Trial]) -> float: + """ Return update step for param based on last trial """ + raise NotImplemented + + @abstractmethod + def calc_stats(self, trial: optuna.Trial, run_dir: str, + transform_persons: Callable = None, + transform_trips: Callable = None) -> Tuple[float]: + """ Calculate and store statistics for a trial. These usually needs to be accessed for calculating updates. 
+ + :return: reported error metrics + """ + raise NotImplemented + + @classmethod + def convert_input(cls, arg) -> pd.DataFrame: + """ Method call to create target values from input argument """ + if isinstance(arg, pd.DataFrame): + return arg + if isinstance(arg, dict): + # Convert simple dict of modes and values + return pd.DataFrame(index=arg.keys(), data=arg.values(), columns=["value"]).rename_axis(index="mode") + + return pd.read_csv(arg) + + @staticmethod + def get_mode_params(config: dict, mode: str): + """ Retrieve the mode params from config object. Create if they don't exist yet. """ + if "planCalcScore" not in config: + config["planCalcScore"] = {} + + if "scoringParameters" not in config["planCalcScore"]: + config["planCalcScore"]["scoringParameters"] = [] + + ps = config["planCalcScore"]["scoringParameters"] + + for p in ps: + if "modeParams" in p: + for m in p["modeParams"]: + if m["mode"] == mode: + return m + + m = {"mode": mode} + p["modeParams"].append(m) + return m + + m = {"mode": mode} + ps.append({"modeParams": [m]}) + return m + + @staticmethod + def calc_asc_update(z_i, m_i, z_0, m_0) -> float: + """ Calculates the asc update for one step """ + # Update asc + # (1) Let z_i be the observed share of mode i. (real data, to be reproduced) + # (2) Run the simulation to convergence. Obtain simulated mode shares m_i. + # (3) Do nothing for mode 0. For all other modes: add [ln(z_i) - ln(m_i)] – [ln(z_0) - ln(m_0)] to its ASC. + # (4) Goto 2. + if m_i == 0: + return 0 + + return math.log(z_i) - math.log(m_i) - (math.log(z_0) - math.log(m_0)) diff --git a/matsim/calibration/calib_asc.py b/matsim/calibration/calib_asc.py new file mode 100644 index 0000000..d537272 --- /dev/null +++ b/matsim/calibration/calib_asc.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- + +import sys +from typing import Sequence, Callable, Dict + +import optuna + +from .base import CalibratorBase, CalibrationInput, to_float +from .analysis import calc_mode_share + + +class ASCCalibrator(CalibratorBase): + """ Calibrates the alternative specific constant for desired modes """ + + def __init__(self, + modes: Sequence[str], + initial: CalibrationInput, + target: CalibrationInput, + fixed_mode: str = "walk", + lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, + constraints: Dict[str, Callable[[str, float], float]] = None): + """Abstract constructors for all calibrations. Usually the same parameters should be made available subclasses. 
+ + :param modes: list of all relevant modes + :param initial: dict of initial asc values + :param target: target shares as csv or dict + :param fixed_mode: the fixed mode with asc=0 + :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) + :param constraints: constraints for each mode, must return asc and will be called with original asc + """ + super().__init__(modes, initial, target, lr, constraints) + + self.fixed_mode = fixed_mode + + self.target = self.target.rename(columns={"value": "share"}, errors="ignore") + self.target = self.target.groupby("mode").agg(share=("share", "sum")) + + @property + def name(self): + return "asc" + + def init_study(self, study: optuna.Trial): + study.set_user_attr("modes", self.modes) + study.set_user_attr("fixed_mode", self.fixed_mode) + + print("Running study with target:", self.target) + + def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, config: dict): + + # Update constants + for mode in self.modes: + m = self.get_mode_params(config, mode) + m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) + + def sample_initial(self, param: str) -> float: + if param in self.initial.index: + return self.initial.loc[param] + + return 0 + + def update_step(self, param: str, last_trial: optuna.Trial, completed: Sequence[optuna.Trial]) -> float: + + if param == self.fixed_mode: + return 0 + + step = self.calc_asc_update(self.target.loc[param].share, last_trial.user_attrs["%s_share" % param], + self.target.loc[self.fixed_mode].share, + last_trial.user_attrs["%s_share" % self.fixed_mode]) + + return step + + def calc_stats(self, trial: optuna.Trial, run_dir: str, + transform_persons: Callable = None, + transform_trips: Callable = None) -> Sequence[float]: + + shares = calc_mode_share(run_dir, transform_persons=transform_persons, transform_trips=transform_trips) + print("Obtained mode shares:", shares) + + for k, v in shares.items(): + trial.set_user_attr("%s_share" % k, v) + trial.set_user_attr("%s_mae" % k, to_float(abs(self.target.loc[k] - v))) + + return [to_float(abs(self.target.loc[mode] - trial.user_attrs["%s_share" % mode])) for mode in self.modes] diff --git a/matsim/calibration/calib_dist.py b/matsim/calibration/calib_dist.py new file mode 100644 index 0000000..44749ea --- /dev/null +++ b/matsim/calibration/calib_dist.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- + +# TODO: collection of old code +# was just draft anyway, needs to be updated + +""" +m["marginalUtilityOfDistance_util_m"] = trial.suggest_float("dist_" + mode, sys.float_info.min, + sys.float_info.max) + + +def sample_dist_util(self, mode, last): + + df = pd.DataFrame.from_dict(last.user_attrs["mode_stats"], orient="tight") + target = self.target[self.target["mode"] == mode].reset_index(drop=True).copy() + + df = df.loc[target.dist_group].reset_index() + + # Trips distances shares over all modes + # Correction factor is calculated here + ref_dist = self.target.groupby("dist_group").agg(share=("share", "sum")) + sim_dist = df.groupby("dist_group").agg(share=("share", "sum")) + + correction = ref_dist.loc[target.dist_group] / sim_dist.loc[target.dist_group] + + df = df[df.main_mode == mode].reset_index(drop=True).copy() + + if "mean_dist" not in target.columns: + target["mean_dist"] = df.mean_dist + + df["correction"] = correction.values + + target.share = target.share / sum(target.share) + df.share = df.share / sum(df.share) + + real = (df.mean_dist * df.share * df.correction).sum() + 
target = (target.mean_dist * target.share).sum() + + # TODO: magnitude should depend on the asc + + # TODO: configurable parameter + return float(0.05 * (target - real) / (1000 * 1000)) + +""" \ No newline at end of file diff --git a/matsim/calibration/calib_group_asc.py b/matsim/calibration/calib_group_asc.py new file mode 100644 index 0000000..9b806c1 --- /dev/null +++ b/matsim/calibration/calib_group_asc.py @@ -0,0 +1,301 @@ +# -*- coding: utf-8 -*- + +import sys +from typing import Sequence, Callable, Dict, Literal +from functools import reduce +from collections import defaultdict + +import numpy as np +import optuna +import pandas as pd + +from .base import CalibratorBase, CalibrationInput +from .analysis import read_trips_and_persons + + +def get_sbb_params(config, group, value, mode): + """ Return parameter set for specific group """ + + if "SBBBehaviorGroups" not in config: + config["SBBBehaviorGroups"] = {} + + if "behaviorGroup" not in config["SBBBehaviorGroups"]: + config["SBBBehaviorGroups"]["behaviorGroup"] = [] + + groups = config["SBBBehaviorGroups"]["behaviorGroup"] + + for b_group in groups: + + if b_group["personAttribute"] == group: + + for v in b_group["personGroupAttributeValues"]: + if v["attributeValues"] == value: + + for m in v["absoluteModeCorrections"]: + if m == mode: + return m + + m = {"mode": mode} + v["absoluteModeCorrections"].append(m) + return m + + m = {"mode": mode} + b_group["personGroupAttributeValues"].append( + {"attributeValues": value, "absoluteModeCorrections": [m]} + ) + return m + + m = {"mode": mode} + groups.append({"personAttribute": group, + "personGroupAttributeValues": + [{"attributeValues": value, "absoluteModeCorrections": [m]}]}) + return m + + +def parse_group(p): + p = p.strip("[]") + return {x.split("=")[0]: x.split("=")[1] for x in p.split(",")} + + +class ASCGroupCalibrator(CalibratorBase): + """ Calibrates the alternative specific for specific subpopulations """ + + def __init__(self, + modes: Sequence[str], + initial: CalibrationInput, + target: CalibrationInput, + fixed_mode: str = "walk", + lr: Callable[[int, str, float, optuna.Trial, optuna.Study], float] = None, + constraints: Dict[str, Callable[[str, float], float]] = None, + calib_base: Literal['always', 'alternating', 'never'] = "always", + multi_groups: Sequence[str] = None, + corr_correction: float = 1, + config_format: Literal['default', 'sbb'] = "default"): + """Abstract constructors for all calibrations. Usually the same parameters should be made available subclasses. 
+ + :param modes: list of all relevant modes + :param initial: dict of initial asc values + :param target: target shares as csv or dict + :param fixed_mode: the fixed mode with asc=0 + :param lr: learning rate schedule, will be called with (trial number, mode, update, trial, study) + :param constraints: constraints for each mode, must return asc and will be called with original asc + :param calib_base: how to calibrate the ungrouped base ASC, can be always, alternating (every 2nd iteration) or never + :param multi_groups: Groups that contain multiple values, that will be split by "," + :param corr_correction: factor to reduce learning rates for possibly correlated groups, 0 disables this correction + :param config_format: use SBBBehaviorGroups for the parameter config + """ + super().__init__(modes, initial, target, lr, constraints) + + self.fixed_mode = fixed_mode + self.multi_groups = set(multi_groups) + self.corr_correction = corr_correction + self.config_format = config_format + + if "mode" not in self.target.columns and "main_mode" not in self.target.columns: + raise ValueError("Target must have 'mode' or 'main_mode' column") + + if calib_base not in ("always", "alternating", "never"): + raise ValueError("calib_base must be one of 'always', 'alternating' or 'never'") + + if config_format not in ("default", "sbb"): + raise ValueError("config_format must be one of 'default' or 'sbb'") + + self.target = self.target.rename(columns={"value": "share", "main_mode": "mode"}, errors="ignore") + + if "share" not in self.target.columns: + raise ValueError("Target must have 'share' column") + + self.groups = [t for t in self.target.columns if t not in ('mode', 'share', 'asc')] + + if calib_base != "never": + self.base = self.get_group(self.target, None) + self.base = self.base.groupby("mode").agg(target=("share", "sum")) + self.alternate_base = calib_base == "alternating" + + if len(self.base) == 0: + raise ValueError("Target must contain base without any attributes") + else: + self.base = None + self.alternate_base = False + + def get_group(self, df, groups: dict = None, use_multi=False): + """ Get data of one group""" + # return result for empty groups + if groups is None: + idx = reduce(lambda x, y: x & y, [pd.isna(df[g]) for g in self.groups]) + return df[idx] + + if use_multi: + idx = [] + for g, v in groups.items(): + if g in self.multi_groups: + vs = v.split(",") + idx.append(df[g].isin(vs)) + else: + idx.append(df[g] == v) + + idx = reduce(lambda x, y: x & y, idx) + else: + idx = reduce(lambda x, y: x & y, [df[g] == v for g, v in groups.items()]) + + return df[idx] + + @property + def name(self): + return "asc" + + @property + def num_targets(self): + return len(self.base) + + def init_study(self, study: optuna.Trial): + study.set_user_attr("modes", self.modes) + study.set_user_attr("groups", self.groups) + study.set_user_attr("fixed_mode", self.fixed_mode) + + print("Calibrating groups:", self.groups) + print("Running with base target:", self.base) + + def update_config(self, study: optuna.Study, trial: optuna.Trial, prefix: str, config: dict): + + # Base ascs for each mode + base = defaultdict(lambda: 0) + + if self.base is not None: + for mode in self.modes: + m = self.get_mode_params(config, mode) + m["constant"] = trial.suggest_float(prefix + mode, sys.float_info.min, sys.float_info.max) + base[mode] = m["constant"] + + step = 1 + # If all groups were fully correlated, update step needs to be divided by number of groups + if self.corr_correction > 0: + step = 1 / (len(self.groups) * 
self.corr_correction) + + # Grouped ascs + for g in self.groups: + for v in set(self.target[g]): + if pd.isna(v): + continue + + attr = (str(g) + "=" + str(v)) + for mode in self.modes: + # Update constants + if self.config_format == "sbb": + param = "[%s]-%s" % (attr, mode) + p = trial.suggest_float(prefix + param, sys.float_info.min, sys.float_info.max) + + # don't write certain values + if p == 0 and mode == self.fixed_mode: + continue + + m = get_sbb_params(config, g, v, mode) + + delta = (p - base[mode]) * step + m["deltaConstant"] = delta + trial.set_user_attr("%s_delta" % param, delta) + else: + raise ValueError("Currently only ssb config format is supported") + + def sample_initial(self, param: str) -> float: + attr, _, mode = param.rpartition("-") + + if mode in self.initial.index: + return self.initial.loc[mode] + + # TODO: Load from column with same format as input + + return 0 + + def update_step(self, param: str, last_trial: optuna.Trial, completed: Sequence[optuna.Trial]) -> float: + + if param == self.fixed_mode: + return 0 + + # Base mode shares + if not param.startswith("["): + # No update every 2nd iteration + if self.alternate_base and len(completed) % 2 == 1: + return 0 + + return self.calc_asc_update(self.base.loc[param].target, last_trial.user_attrs["%s_share" % param], + self.base.loc[self.fixed_mode].target, + last_trial.user_attrs["%s_share" % self.fixed_mode]) + + else: + p, _, mode = param.rpartition("-") + + # No update every other iteration + if self.alternate_base and len(completed) % 2 == 0: + return 0 + + if mode == self.fixed_mode: + return 0 + + t = self.get_group(self.target, parse_group(p)).set_index("mode") + + # TODO: the previous applied correction might be considered here as well + + return self.calc_asc_update(t.loc[mode].share, last_trial.user_attrs["%s_share" % param], + t.loc[self.fixed_mode].share, + last_trial.user_attrs["%s-%s_share" % (p, self.fixed_mode)]) + + def calc_stats(self, trial: optuna.Trial, run_dir: str, + transform_persons: Callable = None, + transform_trips: Callable = None) -> Sequence[float]: + + trips, persons = read_trips_and_persons(run_dir, + transform_persons=transform_persons, + transform_trips=transform_trips) + + if self.base is not None: + base_share = trips.groupby("main_mode").count()["trip_number"] / len(trips) + base_share = self.base.merge(base_share, left_index=True, right_index=True).rename( + columns={"trip_number": "share"}) + + base_share["mae"] = np.abs(base_share.share - base_share.target) + + for kv in base_share.itertuples(): + trial.set_user_attr("%s_share" % kv.Index, kv.share) + trial.set_user_attr("%s_mae" % kv.Index, kv.mae) + + print("Obtained base shares:") + print(base_share) + + errs = [] + + for g in self.groups: + for v in set(self.target[g]): + if pd.isna(v): + continue + + sub_trips = self.get_group(trips, {g: v}, g in self.multi_groups) + sub_share = sub_trips.groupby("main_mode").count()["trip_number"] / len(sub_trips) + sub_share.rename("share", inplace=True) + + if len(sub_trips) == 0: + print("Empty subgroup %s=%s" % (g, v), file=sys.stderr) + + target = self.get_group(self.target, {g: v}).rename(columns={"share": "target"}) + target = target.merge(sub_share, how="outer", left_on="mode", right_index=True) + target.share.fillna(0, inplace=True) + + target["mae"] = np.abs(target.share - target.target) + + attr = (str(g) + "=" + str(v)) + for kv in target.itertuples(): + trial.set_user_attr("[%s]-%s_share" % (attr, kv.mode), kv.share) + trial.set_user_attr("[%s]-%s_mae" % (attr, kv.mode), 
kv.mae) + + errs.append(target) + + errs = pd.concat(errs) + print("Grouped sum of errors:") + for g in self.groups: + sub_err = errs.groupby(g).agg(sum_mae=("mae", "sum")) + print(sub_err) + + for e in sub_err.itertuples(): + trial.set_user_attr("%s=%s_sum_mae" % (g, e.Index), e.sum_mae) + + return base_share.mae.to_numpy() diff --git a/matsim/calibration/constraints.py b/matsim/calibration/constraints.py new file mode 100644 index 0000000..1f16d82 --- /dev/null +++ b/matsim/calibration/constraints.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- + +def positive_constraint(mode, val): + """ Ensures parameter are positive """ + return max(0, val) + + +def negative_constraint(mode, val): + """ Ensure parameter are negative """ + return min(0, val) diff --git a/matsim/calibration/plot.py b/matsim/calibration/plot.py new file mode 100644 index 0000000..6892837 --- /dev/null +++ b/matsim/calibration/plot.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- + +import re + +import pandas as pd +import seaborn as sns + +from .utils import study_as_df + + +def plot_study(study): + """ Create convergence plots for a study """ + + modes = study.user_attrs["modes"] + groups = study.user_attrs.get("groups", None) + + df = study_as_df(study) + + df_all = df[["trial"] + [x + "_mae" for x in modes]] + + df_all = pd.melt(df_all, "trial", var_name="main_mode", value_name="mae") + df_all.main_mode = df_all.main_mode.str.removesuffix("_mae") + + ax = sns.lineplot(x="trial", y="mae", hue="main_mode", hue_order=modes, data=df_all) + fig = ax.get_figure() + + fig.savefig("mae_all.png") + + if groups: + + for g in groups: + + p = re.compile(r"\[%s=(.+)\]-(.+)_mae" % g) + + columns = df.columns[df.columns.str.startswith("[" + g) & df.columns.str.endswith("_mae")] + vars = [p.search(x) for x in columns] + + df_g = df[["trial"] + list(columns)] + + data = [] + + # convert wide to long + for idx, r in df_g.iterrows(): + for c, v in zip(columns, vars): + data.append({ + "trial": r["trial"], + "mae": r[c], + g: v[1], + "main_mode": v[2] + }) + + df_g = pd.DataFrame(data) + + grid = sns.FacetGrid(df_g, col=g) + grid.map_dataframe(sns.lineplot, x="trial", y="mae", hue="main_mode", hue_order=modes) + + grid.figure.savefig("mae_%s.png" % g) diff --git a/matsim/calibration/utils.py b/matsim/calibration/utils.py new file mode 100644 index 0000000..9594f28 --- /dev/null +++ b/matsim/calibration/utils.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- + +import numpy as np +import pandas as pd + +from optuna.trial import TrialState + + +def study_as_df(study): + """ Convert study to dataframe """ + completed = completed_trials(study) + + # modes = study.user_attrs["modes"] + # fixed_mode = study.user_attrs["fixed_mode"] + + data = [] + + for i, trial in enumerate(completed): + + entry = { + "trial": i, + "start": trial.datetime_start, + "duration": trial.duration, + } + + for k, v in trial.user_attrs.items(): + if type(v) in (float, int): + entry[k] = v + + data.append(entry) + + return pd.DataFrame(data) + + +def default_chain_scheduler(completed): + """ Default function to determine when to chain runs. """ + + n = len(completed) + + if n <= 6: + return n % 2 == 0 + + if n <= 15: + return n % 5 == 0 + + if n <= 50: + return n % 10 == 0 + + return False + + +def linear_scheduler(start=0.6, end=1, interval=3): + """ Creates a lr scheduler that will interpolate linearly from start to end over the first n iterations. + + :param start: Initial learning rate. + :param end: Final learning rate to reach. 
+ :param interval: Number of runs until end rate should be reached. + """ + if interval < 2: + raise ValueError("N must be greater or equal 2.") + + def _fn(n, *args, **kwargs): + + if n > interval: + return end + + return start + (n - 1) * (end - start) / interval + + return _fn + + +def completed_trials(study): + completed = filter(lambda s: s.state == TrialState.COMPLETE, study.trials) + return sorted(completed, key=lambda s: s.number) + + +def same_sign(x): + x = x.to_numpy() + return np.all(x >= 0) if x[0] >= 0 else np.all(x < 0) + + +def cli_oldstyle(yaml_arg="params"): + """ Produces CLI for non matsim application scenarios """ + + def _f(jvm_args, jar, config, params_path, run_dir, trial_number, run_args): + return "java %s -jar %s %s --config:controler.outputDirectory %s --config:controler.runId %03d --%s %s %s" % ( + jvm_args, jar, config, run_dir, trial_number, yaml_arg, params_path, run_args + ) + + return _f diff --git a/tests/test_calibration.py b/tests/test_calibration.py index 4465b7e..570541a 100644 --- a/tests/test_calibration.py +++ b/tests/test_calibration.py @@ -1,16 +1,11 @@ -import gzip -import pathlib - -import pytest - import numpy as np -from matsim.calibration import ASCSampler +from matsim.calibration import ASCCalibrator -def test_asc_sampler(): +def test_asc_sampler(): # variables are not needed for this test - sampler = ASCSampler(None, None, None, None, None) + sampler = ASCCalibrator([], {}, {}) np.random.seed(0) @@ -20,7 +15,6 @@ def test_asc_sampler(): # real shares (target) z = [0.1, 0.5, 0.3, 0.1] - # 100 iterations for it in range(200): modes = np.argmax(utils + ascs + np.random.normal(size=(1000, 4)), axis=1) @@ -29,7 +23,7 @@ def test_asc_sampler(): share = counts / np.sum(counts) for i in range(1, len(share)): - ascs[i] += sampler.calc_update(z[i], share[i], z[0], share[0]) + ascs[i] += sampler.calc_asc_update(z[i], share[i], z[0], share[0]) err = np.sum(np.abs(z - share)) @@ -37,5 +31,6 @@ def test_asc_sampler(): assert err < 0.12 + if __name__ == "__main__": - test_asc_sampler() \ No newline at end of file + test_asc_sampler()
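Usage note (illustrative, not part of the patch): besides the `ASCCalibrator` shown in the README hunk above, this patch introduces `ASCGroupCalibrator`, which calibrates mode constants per subpopulation and writes them as SBBBehaviorGroups deltas (`config_format="sbb"` is currently the only format accepted by `update_config`; `multi_groups` should be passed explicitly). The sketch below shows how it might be wired up. The `age` attribute, the subgroup shares, and the jar/config paths are placeholder assumptions (paths reuse the README example), and the scenario is assumed to define the corresponding person attribute.

```python
import pandas as pd

from matsim.calibration import create_calibration, ASCGroupCalibrator, utils

modes = ["walk", "car", "pt", "bike"]

# Target table: 'mode' and 'share' are required columns; every additional
# column (here the hypothetical person attribute "age") defines a group.
# Rows with an empty group value are the ungrouped base shares.
target = pd.DataFrame([
    {"mode": "walk", "share": 0.277, "age": None},
    {"mode": "bike", "share": 0.175, "age": None},
    {"mode": "pt",   "share": 0.189, "age": None},
    {"mode": "car",  "share": 0.359, "age": None},
    # invented shares for one subgroup
    {"mode": "walk", "share": 0.35, "age": "0-18"},
    {"mode": "bike", "share": 0.25, "age": "0-18"},
    {"mode": "pt",   "share": 0.30, "age": "0-18"},
    {"mode": "car",  "share": 0.10, "age": "0-18"},
])

calibrator = ASCGroupCalibrator(
    modes,
    initial={m: 0 for m in modes},
    target=target,
    fixed_mode="walk",
    lr=utils.linear_scheduler(start=0.25, interval=10),
    multi_groups=[],       # no comma-separated attribute values in this sketch
    config_format="sbb",   # grouped constants are written as SBBBehaviorGroups deltas
)

study, obj = create_calibration(
    "calib_groups", calibrator,
    "./matsim-scenario-1.0-SNAPSHOT.jar",                  # placeholder path as in the README example
    "./scenarios/input/scenario-v1.0-10pct.config.xml",    # placeholder path as in the README example
    args="--config:controler.lastIteration 400",
    jvm_args="-Xmx12G -XX:+UseParallelGC",
    chain_runs=utils.default_chain_scheduler,
)

study.optimize(obj, 10)
```

After some runs have completed, `python -m matsim.calibration calib_groups.db --name calib_groups` exports the study to `calib_groups.db.csv` and then attempts to generate the convergence plots from `matsim.calibration.plot`.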