diff --git a/examples/15_Advanced_Choice.ipynb b/examples/15_Advanced_Choice.ipynb
new file mode 100644
index 00000000..7bd413c7
--- /dev/null
+++ b/examples/15_Advanced_Choice.ipynb
@@ -0,0 +1,522 @@
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "e225bed3",
+ "metadata": {},
+ "source": [
+ "# Introduction"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "eaa3f397",
+ "metadata": {},
+ "source": [
+ "This notebook applies a simple location and mode choice model to a PAM population."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d8729af8",
+ "metadata": {},
+ "source": [
+ "The `pam.planner.choice.ChoiceMNL` class allows the user to apply an MNL specification for selecting the location of activities and the mode for accessing them, given person characteristics, network conditions and/or zone attraction data.\n",
+ "\n",
+ "The typical workflow goes as follows:\n",
+ "\n",
+ "```\n",
+ "choice_model = ChoiceMNL(population, od, zones) # initialize the model and point to the data objects \n",
+ "choice_model.configure(u, scope) # configure the model by specifying a utility function and the scope of application.\n",
+ "choice_model.apply() # apply the model and update the population with the results.\n",
+ "\n",
+ "```"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "8a484a5b",
+ "metadata": {},
+ "source": [
+ "# Dependencies"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "82e63bf7",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import pandas as pd\n",
+ "from pam.read import read_matsim\n",
+ "from pam.planner import choice_location as choice\n",
+ "from pam.operations.cropping import link_population\n",
+ "from pam.planner.od import OD, Labels, ODMatrix, ODFactory\n",
+ "import random\n",
+ "import numpy as np\n",
+ "import os\n",
+ "from prettytable import PrettyTable\n",
+ "\n",
+ "import logging\n",
+ "logging.basicConfig(level=logging.DEBUG)\n",
+ "random.seed(0)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "4154e47e",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%load_ext autoreload\n",
+ "%autoreload 2"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d863cc2c",
+ "metadata": {},
+ "source": [
+ "# Data"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "6d13dd57",
+ "metadata": {},
+ "source": [
+ "We read an example population, and set the location of all activities to zone `a`:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "9542e3c7",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "population = read_matsim(os.path.join('..', 'tests', 'test_data', 'test_matsim_plansv12.xml'))\n",
+ "link_population(population)\n",
+ "for hid, pid, person in population.people():\n",
+ " for act in person.activities:\n",
+ " act.location.area = 'a' "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "e1163184",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Work locations and travel modes:\n",
+ "+--------+-----+----------+------+\n",
+ "| pid | seq | location | mode |\n",
+ "+--------+-----+----------+------+\n",
+ "| chris | 1 | a | car |\n",
+ "| fatema | 1 | a | bike |\n",
+ "| fred | 3 | a | walk |\n",
+ "| gerry | 3 | a | walk |\n",
+ "| nick | 1 | a | car |\n",
+ "+--------+-----+----------+------+\n"
+ ]
+ }
+ ],
+ "source": [
+ "def print_activity_locs(population, act_scope='work'):\n",
+ " summary = PrettyTable(['pid', 'seq', 'location', 'mode'])\n",
+ " for hid, pid, person in population.people():\n",
+ " for seq, act in enumerate(person.plan.activities):\n",
+ " if (act.act==act_scope) or (act_scope=='all'):\n",
+ " trmode = act.previous.mode if act.previous is not None else 'NA'\n",
+ " summary.add_row([pid, seq, act.location.area, trmode])\n",
+ " \n",
+ " print(summary)\n",
+ "\n",
+ "print('Work locations and travel modes:')\n",
+ "print_activity_locs(population, act_scope='work')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "63e93b1c",
+ "metadata": {},
+ "source": [
+ "Our `zones` dataset includes destination attraction data, for example the number of jobs or schools in each likely destination zone:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "3081485c",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "
+ "\n",
+ "
+ " \n",
+ " \n",
+ " | \n",
+ " jobs | \n",
+ " schools | \n",
+ "
+ " \n",
+ " zone | \n",
+ " | \n",
+ " | \n",
+ "
+ " \n",
+ " \n",
+ " \n",
+ " a | \n",
+ " 100 | \n",
+ " 3 | \n",
+ "
+ " \n",
+ " b | \n",
+ " 200 | \n",
+ " 1 | \n",
+ "
+ " \n",
+ "
+ "
+ ],
+ "text/plain": [
+ " jobs schools\n",
+ "zone \n",
+ "a 100 3\n",
+ "b 200 1"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "data_zones = pd.DataFrame(\n",
+ " {\n",
+ " 'zone': ['a', 'b'],\n",
+ " 'jobs': [100, 200],\n",
+ " 'schools': [3, 1]\n",
+ " }\n",
+ ").set_index('zone')\n",
+ "data_zones"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1f6c39ea",
+ "metadata": {},
+ "source": [
+ "The `od` object holds origin-destination data, for example travel time and travel distance between each origin and destination, for each travel mode:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "id": "15027351",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "Origin-destination dataset \n",
+ "--------------------------------------------------\n",
+ "Labels(vars=['time', 'distance'], origin_zones=('a', 'b'), destination_zones=('a', 'b'), mode=['car', 'bus'])\n",
+ "--------------------------------------------------\n",
+ "time - car:\n",
+ "[[20. 40.]\n",
+ " [40. 20.]]\n",
+ "--------------------------------------------------\n",
+ "time - bus:\n",
+ "[[30. 45.]\n",
+ " [45. 30.]]\n",
+ "--------------------------------------------------\n",
+ "distance - car:\n",
+ "[[5. 8.]\n",
+ " [8. 5.]]\n",
+ "--------------------------------------------------\n",
+ "distance - bus:\n",
+ "[[5. 9.]\n",
+ " [9. 5.]]\n",
+ "--------------------------------------------------"
+ ]
+ },
+ "execution_count": 6,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "zone_labels = ('a', 'b')\n",
+ "od = ODFactory.from_matrices([\n",
+ " ODMatrix('time', 'car', zone_labels, zone_labels, np.array([[20, 40], [40, 20]])),\n",
+ " ODMatrix('time', 'bus', zone_labels, zone_labels, np.array([[30, 45], [45, 30]])),\n",
+ " ODMatrix('distance', 'car', zone_labels, zone_labels, np.array([[5, 8], [8, 5]])),\n",
+ " ODMatrix('distance', 'bus', zone_labels, zone_labels, np.array([[5, 9], [9, 5]])) \n",
+ "])\n",
+ "od"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "55081a23",
+ "metadata": {},
+ "source": [
+ "The dimensions of the `od` object are always (in order): `variables`, `origin zone`, `destination zone`, and `mode`. It can be sliced using the respective labels under `od.labels`, for example:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "id": "243511f7",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "45.0"
+ ]
+ },
+ "execution_count": 7,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "od['time', 'a', 'b', 'bus']"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c27f75b6",
+ "metadata": {},
+ "source": [
+ "# Choice model"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "id": "760e9d2d",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "INFO:pam.planner.choice_location:Updated model configuration\n",
+ "INFO:pam.planner.choice_location:ChoiceConfiguration(u=None, scope=None, func_probabilities=, func_sampling=)\n"
+ ]
+ }
+ ],
+ "source": [
+ "planner = choice.ChoiceMNL(population, od, data_zones)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0373eaeb",
+ "metadata": {},
+ "source": [
+ "We configure the model by specifying:\n",
+ "* the scope of the model. For example, work activities.\n",
+ "* the utility formulation of each alternative.\n",
+ "\n",
+ "Both settings are defined as strings. The stings may comprise mathematical operators, coefficients, planner data objects (`od` / `zones`), and/or PAM population objects (`person`/ `act`). \n",
+ "\n",
+ "Coefficients can be passed either as a number, or as a list, with each element in the list corresponding to one of the modes in the `od` object."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "id": "f368c9df",
+ "metadata": {
+ "scrolled": false
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "INFO:pam.planner.choice_location:Updated model configuration\n",
+ "INFO:pam.planner.choice_location:ChoiceConfiguration(u=\"[0,-1]+(np.array([0,2])*(person.attributes['subpopulation']=='poor'))+([-0.05,-0.07]*od['time',person.home.area])+(0.4*np.log(zones['jobs']))\\n\", scope=\"act.act=='work'\", func_probabilities=, func_sampling=)\n"
+ ]
+ }
+ ],
+ "source": [
+ "scope = \"act.act=='work'\"\n",
+ "asc = [0, -1] # one value for each mode, 0->car, -1->\n",
+ "asc_shift_poor = [0, 2] # one value for each mode\n",
+ "beta_time = [-0.05, -0.07] # one value for each mode\n",
+ "beta_zones = 0.4\n",
+ "u = f\"\"\" \\\n",
+ " {asc} + \\\n",
+ " (np.array({asc_shift_poor}) * (person.attributes['subpopulation']=='poor')) + \\\n",
+ " ({beta_time} * od['time', person.home.area]) + \\\n",
+ " ({beta_zones} * np.log(zones['jobs']))\n",
+ "\"\"\"\n",
+ "\n",
+ "planner.configure(u=u, scope=scope)\n",
+ "# print('Utility specification: \\n', planner.u)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0f74e719",
+ "metadata": {},
+ "source": [
+ "The `.get_choice_set()` provides with with the utilities of each alternative, as perceived by each agent."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "id": "48fc5154",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Activities in scope: \n",
+ " [ChoiceIdx(pid='chris', hid='chris', seq=1, act=), ChoiceIdx(pid='fatema', hid='fatema', seq=1, act=), ChoiceIdx(pid='fred', hid='fred', seq=3, act=), ChoiceIdx(pid='gerry', hid='gerry', seq=3, act=), ChoiceIdx(pid='nick', hid='nick', seq=1, act=)]\n",
+ "\n",
+ "Alternatives: \n",
+ " [ChoiceLabel(destination='a', mode='car'), ChoiceLabel(destination='a', mode='bus'), ChoiceLabel(destination='b', mode='car'), ChoiceLabel(destination='b', mode='bus')]\n",
+ "\n",
+ "Choice set utilities: \n",
+ " [[ 0.84206807 -1.25793193 0.11932695 -2.03067305]\n",
+ " [ 0.84206807 0.74206807 0.11932695 -0.03067305]\n",
+ " [ 0.84206807 0.74206807 0.11932695 -0.03067305]\n",
+ " [ 0.84206807 0.74206807 0.11932695 -0.03067305]\n",
+ " [ 0.84206807 -1.25793193 0.11932695 -2.03067305]]\n"
+ ]
+ }
+ ],
+ "source": [
+ "choice_set = planner.get_choice_set()\n",
+ "print('Activities in scope: \\n', choice_set.idxs)\n",
+ "print('\\nAlternatives: \\n', choice_set.choice_labels)\n",
+ "print('\\nChoice set utilities: \\n', choice_set.u_choices)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2befcd05",
+ "metadata": {},
+ "source": [
+ "The `.apply()` method samples from the alternatives, and updates the location and mode of each activity accordingly:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "id": "e26b40d1",
+ "metadata": {
+ "scrolled": false
+ },
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "INFO:pam.planner.choice_location:Applying choice model...\n",
+ "INFO:pam.planner.choice_location:Configuration: \n",
+ "ChoiceConfiguration(u=\"[0,-1]+(np.array([0,2])*(person.attributes['subpopulation']=='poor'))+([-0.05,-0.07]*od['time',person.home.area])+(0.4*np.log(zones['jobs']))\\n\", scope=\"act.act=='work'\", func_probabilities=, func_sampling=)\n",
+ "INFO:pam.planner.choice_location:Choice model application complete.\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Sampled choices: \n",
+ " [ChoiceLabel(destination='b', mode='car'), ChoiceLabel(destination='b', mode='car'), ChoiceLabel(destination='a', mode='bus'), ChoiceLabel(destination='a', mode='car'), ChoiceLabel(destination='a', mode='car')]\n"
+ ]
+ }
+ ],
+ "source": [
+ "planner.apply()\n",
+ "print('Sampled choices: \\n', planner._selections.selections)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "01fb7d67",
+ "metadata": {},
+ "source": [
+ "The population's activity locations and travel modes have now been updated accordingly:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "id": "4bfa1fee",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------+-----+----------+------+\n",
+ "| pid | seq | location | mode |\n",
+ "+--------+-----+----------+------+\n",
+ "| chris | 1 | b | car |\n",
+ "| fatema | 1 | b | car |\n",
+ "| fred | 3 | a | bus |\n",
+ "| gerry | 3 | a | car |\n",
+ "| nick | 1 | a | car |\n",
+ "+--------+-----+----------+------+\n"
+ ]
+ }
+ ],
+ "source": [
+ "print_activity_locs(planner.population)"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.8.10"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
diff --git a/pam/planner/choice_location.py b/pam/planner/choice_location.py
new file mode 100644
index 00000000..f91649fc
--- /dev/null
+++ b/pam/planner/choice_location.py
@@ -0,0 +1,264 @@
+Location and mode choice models for activity modelling
+from copy import deepcopy
+from dataclasses import dataclass
+import itertools
+import logging
+from typing import Optional, List, NamedTuple, Callable, Union
+import numpy as np
+import pandas as pd
+from pam.activity import Activity, Leg
+from pam.core import Population
+from pam.operations.cropping import link_population
+from pam.planner.od import OD
+from pam.planner.zones import Zones
+from pam.planner.utils_planner import calculate_mnl_probabilities, sample_weighted, \
+ apply_mode_to_home_chain
+class ChoiceLabel(NamedTuple):
+ """ Destination and mode choice labels of a selected option """
+ destination: str
+ mode: str
+class ChoiceIdx(NamedTuple):
+ """ Choice set index """
+ pid: str
+ hid: str
+ seq: int
+ act: Activity
+class ChoiceSet(NamedTuple):
+ """ MNL Choice set """
+ idxs: List[ChoiceIdx]
+ u_choices: np.array
+ choice_labels: List[ChoiceLabel]
+class SelectionSet:
+ """ Calculate probabilities and select alternative """
+ choice_set: ChoiceSet
+ func_probabilities: Callable
+ func_sampling: Optional[Callable] = None
+ _selections = None
+ @property
+ def probabilities(self) -> np.array:
+ """
+ Probabilities for each alternative.
+ """
+ return np.apply_along_axis(
+ func1d=self.func_probabilities,
+ axis=1,
+ arr=self.choice_set.u_choices
+ )
+ def sample(self) -> List:
+ """
+ Sample from a set of alternative options.
+ """
+ sampled = np.apply_along_axis(
+ func1d=self.func_sampling,
+ axis=1,
+ arr=self.probabilities
+ )
+ sampled_labels = [self.choice_set.choice_labels[x] for x in sampled]
+ self._selections = sampled_labels
+ return sampled_labels
+ @property
+ def selections(self) -> List[ChoiceLabel]:
+ if self._selections is None:
+ self.sample()
+ return self._selections
+class ChoiceConfiguration:
+ """
+ :param u: The utility function specification, defined as a string.
+ The string may point to household, person, act, leg,
+ od, or zone data.
+ It can also include values and/or mathematical operations.
+ Parameters may be passed as single values, or as lists
+ (with each element in the list corresponding to one of the modes in the OD object)
+ For example: u='-[0,1] - (2 * od['time']) - (od['time'] * person.attributes['age']>60)
+ :param scope: The scope of the function (for example, work activities).
+ :param func_probabilities: The function for calculating the probability of each alternative
+ :param func_sampling: The function for sampling across alternatives, ie softmax
+ """
+ u: Optional[str] = None
+ scope: Optional[str] = None
+ func_probabilities: Optional[Callable] = None
+ func_sampling: Optional[Callable] = None
+ def validate(self, vars: List[str]) -> None:
+ """
+ Return an error if a value has not been set
+ """
+ for var in vars:
+ if getattr(self, var) is None:
+ raise ValueError(f'Setting {var} has not been set yet')
+class ChoiceModel:
+ def __init__(
+ self,
+ population: Population,
+ od: OD,
+ zones: Union[pd.DataFrame, Zones]
+ ) -> None:
+ """
+ Choice model interface.
+ :param population: A PAM population.
+ :param od: An object holding origin-destination.
+ :param zones: Zone-level data.
+ """
+ self.logger = logging.getLogger(__name__)
+ self.population = population
+ link_population(self.population)
+ self.od = od
+ self.zones = self.parse_zone_data(zones)
+ self.zones.data = self.zones.data.loc[list(od.labels.destination_zones)]
+ self.configuration = ChoiceConfiguration()
+ self._selections = None
+ @staticmethod
+ def parse_zone_data(zones: Union[pd.DataFrame, Zones]) -> Zones:
+ if isinstance(zones, Zones):
+ return deepcopy(zones)
+ elif isinstance(zones, pd.DataFrame):
+ return Zones(data=zones.copy())
+ def configure(self, **kwargs) -> None:
+ """
+ Specify the model.
+ :Keyword Arguments: Parameters of the ChoiceConfiguration class.
+ """
+ for k, v in kwargs.items():
+ if type(v) == str:
+ v = v.replace(' ', '')
+ setattr(self.configuration, k, v)
+ self.logger.info('Updated model configuration')
+ self.logger.info(self.configuration)
+ def apply(self, apply_location=True, apply_mode=True, once_per_agent=True,
+ apply_mode_to='chain'):
+ """
+ Apply the choice model to the PAM population,
+ updating the activity locations and mode choices in scope.
+ :param apply_location: Whether to update activities' location
+ :param apply_mode: Whether to update travel modes
+ :param once_per_agent: If True, the same selected option
+ is applied to all activities within scope of an agent.
+ :param apply_mode_to: `chain` or `previous_leg`:
+ Whether to apply the mode to the entire trip chain
+ that contains the activity,
+ or the leg preceding the activity.
+ """
+ self.logger.info('Applying choice model...')
+ self.logger.info(f'Configuration: \n{self.configuration}')
+ pid = None
+ destination = None
+ trmode = None
+ # update location and mode
+ for idx, selection in zip(self.selections.choice_set.idxs, self.selections.selections):
+ if not (once_per_agent and (pid == idx.pid)):
+ destination = selection.destination
+ trmode = selection.mode
+ pid = idx.pid
+ act = idx.act
+ if apply_location:
+ act.location.area = destination
+ if apply_mode and (act.previous is not None):
+ if apply_mode_to == 'chain':
+ apply_mode_to_home_chain(act, trmode)
+ elif apply_mode_to == 'previous_leg':
+ act.previous.mode = trmode
+ else:
+ raise ValueError(f'Invalid option {apply_mode_to}')
+ self.logger.info('Choice model application complete.')
+ def get_choice_set(self) -> ChoiceSet:
+ """
+ Construct an agent's choice set for each activity/leg within scope.
+ """
+ self.configuration.validate(['u', 'scope'])
+ od = self.od
+ zones = self.zones
+ u = self.configuration.u
+ scope = self.configuration.scope
+ idxs = []
+ u_choices = []
+ choice_labels = list(itertools.product(
+ od.labels.destination_zones,
+ od.labels.mode
+ ))
+ choice_labels = [ChoiceLabel(*x) for x in choice_labels]
+ # iterate across activities
+ for hid, hh in self.population:
+ for pid, person in hh:
+ for i, act in enumerate(person.activities):
+ if eval(scope):
+ idx_act = ChoiceIdx(
+ pid=pid,
+ hid=hid,
+ seq=i,
+ act=act
+ )
+ # calculate utilities for each alternative
+ u_act = eval(u)
+ # flatten location-mode combinations
+ u_act = u_act.flatten()
+ u_choices.append(u_act)
+ idxs.append(idx_act)
+ u_choices = np.array(u_choices)
+ # check dimensions
+ assert u_choices.shape[1] == len(choice_labels)
+ assert u_choices.shape[0] == len(idxs)
+ return ChoiceSet(idxs=idxs, u_choices=u_choices, choice_labels=choice_labels)
+ @property
+ def selections(self) -> SelectionSet:
+ self.configuration.validate(['func_probabilities', 'func_sampling'])
+ if self._selections is None:
+ self._selections = SelectionSet(
+ choice_set=self.get_choice_set(),
+ func_probabilities=self.configuration.func_probabilities,
+ func_sampling=self.configuration.func_sampling
+ )
+ return self._selections
+class ChoiceMNL(ChoiceModel):
+ """
+ Applies a Multinomial Logit Choice model.
+ """
+ def __init__(self, population: Population, od: OD, zones: pd.DataFrame) -> None:
+ super().__init__(population, od, zones)
+ self.configure(
+ func_probabilities=calculate_mnl_probabilities,
+ func_sampling=sample_weighted
+ )
diff --git a/pam/planner/od.py b/pam/planner/od.py
new file mode 100644
index 00000000..ce8f42f9
--- /dev/null
+++ b/pam/planner/od.py
@@ -0,0 +1,153 @@
+Manages origin-destination data required by the planner module.
+import itertools
+from typing import Union, Optional, List, NamedTuple
+import numpy as np
+import pandas as pd
+class Labels(NamedTuple):
+ """ Data labels for the origin-destination dataset """
+ vars: List
+ origin_zones: List
+ destination_zones: List
+ mode: List
+class OD:
+ """
+ Holds origin-destination matrices for a number of modes and variables.
+ """
+ def __init__(
+ self,
+ data: np.ndarray,
+ labels: Union[Labels, List, dict]
+ ) -> None:
+ """
+ :param data: A multi-dimensional numpy array of the origin-destination data.
+ - First dimension: variable (ie travel time, distance, etc)
+ - Second dimension: origin zone
+ - Third dimension: destination zone
+ - Fourth dimension: mode (ie car, bus, etc)
+ """
+ self.data = data
+ self.labels = self.parse_labels(labels)
+ self.data_checks()
+ def data_checks(self):
+ """
+ Check the integrity of input data and labels.
+ """
+ assert self.data.ndim == 4, \
+ "The number of matrix dimensions should be 4 (mode, variable, origin, destination)"
+ for i, (key, labels) in enumerate(zip(self.labels._fields, self.labels)):
+ assert len(labels) == self.data.shape[i], \
+ f"The number of {key} labels should match the number of elements" \
+ f"in dimension {i} of the OD dataset"
+ @staticmethod
+ def parse_labels(labels: Union[Labels, List, dict]) -> Labels:
+ """
+ Parse labels as a named tuple.
+ """
+ if not isinstance(labels, Labels):
+ if isinstance(labels, list):
+ return Labels(*labels)
+ elif isinstance(labels, dict):
+ return Labels(**labels)
+ else:
+ raise ValueError('Please provide a valid label type')
+ return labels
+ def __getitem__(self, args):
+ _args = args if isinstance(args, tuple) else tuple([args])
+ _args_encoded = tuple()
+ for i, (arg, labels) in enumerate(zip(_args, self.labels)):
+ if arg == slice(None) or isinstance(arg, int):
+ _args_encoded += (arg,)
+ elif arg in labels:
+ _args_encoded += (labels.index(arg),)
+ else:
+ raise IndexError(f'Invalid slice value {arg}')
+ return self.data.__getitem__(_args_encoded)
+ def __repr__(self) -> str:
+ divider = '-'*50 + '\n'
+ r = f'Origin-destination dataset \n{divider}'
+ r += f'{self.labels.__str__()}\n{divider}'
+ for var in self.labels.vars:
+ for trmode in self.labels.mode:
+ r += f'{var} - {trmode}:\n'
+ r += f'{self[var, :, :, trmode].__str__()}\n{divider}'
+ return r
+class ODMatrix(NamedTuple):
+ var: str
+ mode: str
+ origin_zones: tuple
+ destination_zones: tuple
+ matrix: np.array
+class ODFactory:
+ @classmethod
+ def from_matrices(cls, matrices: List[ODMatrix]) -> OD:
+ """
+ Creates an OD instance from a list of ODMatrices
+ """
+ # collect dimensions
+ labels = cls.prepare_labels(matrices)
+ cls.check(matrices, labels)
+ # create ndarray
+ od = np.zeros(shape=[len(x) for x in labels])
+ for mat in matrices:
+ od[
+ labels.vars.index(mat.var),
+ labels.mode.index(mat.mode)
+ ] = mat.matrix
+ # move mode to last dimension
+ od = np.moveaxis(od, 1, 3)
+ return OD(data=od, labels=labels)
+ @staticmethod
+ def prepare_labels(matrices: List[ODMatrix]) -> Labels:
+ labels = Labels(
+ vars=list(pd.unique([mat.var for mat in matrices])),
+ origin_zones=matrices[0].origin_zones,
+ destination_zones=matrices[0].destination_zones,
+ mode=list(pd.unique([mat.mode for mat in matrices])),
+ )
+ return labels
+ @staticmethod
+ def check(matrices: List[ODMatrix], labels: Labels) -> None:
+ # all matrices follow the same zoning system and are equal size
+ for mat in matrices:
+ assert mat.origin_zones == labels.origin_zones, \
+ 'Please check zone labels'
+ assert mat.destination_zones == labels.destination_zones, \
+ 'Please check zone labels'
+ assert mat.matrix.shape == matrices[0].matrix.shape, \
+ 'Please check matrix dimensions'
+ # all possible combinations are provided
+ combinations_matrices = [(var, trmode)
+ for (var, trmode, *others) in matrices]
+ combinations_labels = list(itertools.product(labels.vars, labels.mode))
+ for combination in combinations_labels:
+ assert combination in combinations_matrices, \
+ f'Combination {combination} missing from the input matrices'
+ # no duplicate combinations
+ assert len(combinations_matrices) == len(set(combinations_matrices)), \
+ 'No duplicate keys are allowed'
diff --git a/pam/planner/utils_planner.py b/pam/planner/utils_planner.py
new file mode 100644
index 00000000..faaceabb
--- /dev/null
+++ b/pam/planner/utils_planner.py
@@ -0,0 +1,75 @@
+import numpy as np
+import random
+from typing import Union, List
+from pam.activity import Plan, Activity, Leg
+def calculate_mnl_probabilities(x: Union[np.array, List]) -> np.array:
+ """
+ Calculates MNL probabilities from a set of alternatives.
+ """
+ return np.exp(x)/np.exp(x).sum()
+def sample_weighted(weights: np.array) -> int:
+ """
+ Weighted sampling.
+ Returns the index of the selection.
+ """
+ return random.choices(range(len(weights)), weights=weights, k=1)[0]
+def get_trip_chains(
+ plan: Plan,
+ act: str = 'home'
+) -> List[List[Union[Activity, Leg]]]:
+ """
+ Get trip chains starting and/or ending at a long-term activity
+ """
+ chains = []
+ chain = []
+ for elem in plan.day:
+ if isinstance(elem, Activity) and elem.act == act:
+ if len(chain) > 0:
+ chains.append(chain+[elem])
+ chain = []
+ chain.append(elem)
+ if len(chain) > 1:
+ chains += [chain] # add any remaining trips until the end of the day
+ return chains
+def apply_mode_to_home_chain(act: Activity, trmode: str):
+ """
+ Apply a transport mode across a home-based trip chain,
+ which comprises the specified activity.
+ :param act: The activity that is part of the trip chain.
+ :param trmode: The mode to apply to each leg of the chain.
+ """
+ if not 'next' in act.__dict__:
+ raise KeyError('Plan is not linked. Please use `pam.operations.cropping.link_plan` to link activities and legs.')
+ # apply forwards
+ elem = act.next
+ while (elem is not None) and (elem.act != 'home'):
+ if isinstance(elem, Leg):
+ elem.mode = trmode
+ elem = elem.next
+ # apply backwards
+ elem = act.previous
+ while (elem is not None) and (elem.act != 'home'):
+ if isinstance(elem, Leg):
+ elem.mode = trmode
+ elem = elem.previous
+def get_validate(obj, name: str):
+ """
+ Get an object's attribute, or raise an error if its value is None.
+ """
+ attr = getattr(obj, name)
+ if attr is None:
+ raise ValueError(f'Attribute {name} has not been set yet')
+ return attr
\ No newline at end of file
diff --git a/pam/planner/zones.py b/pam/planner/zones.py
new file mode 100644
index 00000000..d7f3d2bd
--- /dev/null
+++ b/pam/planner/zones.py
@@ -0,0 +1,27 @@
+Manages zone-level data required by the planner module.
+import pandas as pd
+import numpy as np
+class Zones:
+ def __init__(
+ self,
+ data: pd.DataFrame
+ ) -> None:
+ """
+ :param data: A dataframe with variables as columns and the zone as index
+ """
+ self.data = data
+ def __getattr__(self, __name: str) -> np.array:
+ return self.data[__name].values[:, np.newaxis]
+ def __getitem__(self, __name: str) -> np.array:
+ return self.__getattr__(__name)
+ def __repr__(self) -> str:
+ r = 'Attraction data\n'
+ r += repr(self.data)
+ return r
diff --git a/tests/test_22_planner_od.py b/tests/test_22_planner_od.py
new file mode 100644
index 00000000..a538daed
--- /dev/null
+++ b/tests/test_22_planner_od.py
@@ -0,0 +1,114 @@
+import pytest
+from pam.planner.od import OD, Labels, ODMatrix, ODFactory
+import pandas as pd
+import numpy as np
+from copy import deepcopy
+def data_od():
+ matrices = np.array(
+ [[[[20, 30], [40, 45]], [[40, 45], [20, 30]]],
+ [[[5, 5], [8, 9]], [[8, 9], [5, 5]]]]
+ )
+ return matrices
+def labels():
+ labels = {
+ 'mode': ['car', 'bus'],
+ 'vars': ['time', 'distance'],
+ 'origin_zones': ['a', 'b'],
+ 'destination_zones': ['a', 'b']
+ }
+ return labels
+def od(data_od, labels):
+ od = OD(
+ data=data_od,
+ labels=labels
+ )
+ return od
+def od_matrices():
+ zone_labels = ['a', 'b']
+ matrices = [
+ ODMatrix('time', 'car', zone_labels, zone_labels, np.array([[20, 40], [40, 20]])),
+ ODMatrix('time', 'bus', zone_labels, zone_labels, np.array([[30, 45], [45, 30]])),
+ ODMatrix('distance', 'car', zone_labels, zone_labels, np.array([[5, 8], [8, 5]])),
+ ODMatrix('distance', 'bus', zone_labels, zone_labels, np.array([[5, 9], [9, 5]]))
+ ]
+ return matrices
+def test_label_type_is_parsed_correctly(labels):
+ assert type(OD.parse_labels(labels)) is Labels
+ assert type(OD.parse_labels(list(labels.values()))) is Labels
+def test_inconsistent_labels_raise_error(data_od, labels):
+ _labels = deepcopy(labels)
+ for k, v in _labels.items():
+ v.pop()
+ with pytest.raises(AssertionError):
+ OD(
+ data=data_od,
+ labels=_labels
+ )
+def test_od_slicing_is_correctly_encoded(od):
+ np.testing.assert_equal(od[0], od['time'])
+ np.testing.assert_equal(od[1], od['distance'])
+ np.testing.assert_equal(od[1, 0], od['distance', 'a'])
+ np.testing.assert_equal(od[:], od.data)
+ np.testing.assert_equal(od[:, :, :, :], od.data)
+ np.testing.assert_equal(od['time', 'a', 'b', :], np.array([40, 45]))
+ with pytest.raises(IndexError):
+ od['_']
+def test_class_represantation_is_string(od):
+ assert type(od.__repr__()) == str
+def test_matrix_dimensions_stay_the_same(od):
+ """
+ Regression test: Label dimensions need to stay the same.
+ To apply the model correctly,
+ we need the first dimension to select the variable,
+ the second to select the origin,
+ the third to select the destination,
+ and the last to select the mode.
+ """
+ assert od.labels._fields == tuple(
+ ['vars', 'origin_zones', 'destination_zones', 'mode'])
+def test_create_od_from_matrices(od_matrices, od):
+ od_from_matrices = ODFactory.from_matrices(od_matrices)
+ np.testing.assert_equal(od_from_matrices.data, od.data)
+ assert od_from_matrices.labels == od.labels
+def test_od_factory_inconsistent_inputs_raise_error(od_matrices):
+ labels = Labels(
+ vars=['time', 'distance'],
+ origin_zones=('a', 'b'),
+ destination_zones=('a', 'b'),
+ mode=['car', 'bus']
+ )
+ # duplicate key
+ with pytest.raises(AssertionError):
+ ODFactory.check(od_matrices+[od_matrices[0]], labels)
+ # combination missing
+ with pytest.raises(AssertionError):
+ ODFactory.check(od_matrices[:-1], labels)
+ # inconsistent zoning
+ with pytest.raises(AssertionError):
+ ODFactory.check(od_matrices[:-1], labels)
\ No newline at end of file
diff --git a/tests/test_23_planner_zones.py b/tests/test_23_planner_zones.py
new file mode 100644
index 00000000..abd67e7b
--- /dev/null
+++ b/tests/test_23_planner_zones.py
@@ -0,0 +1,25 @@
+import pytest
+from pam.planner.zones import Zones
+import pandas as pd
+import numpy as np
+def data_zones():
+ df = pd.DataFrame(
+ {
+ 'zone': ['a', 'b'],
+ 'jobs': [100, 200],
+ 'schools': [3, 1]
+ }
+ ).set_index('zone')
+ return df
+def zones(data_zones):
+ return Zones(data=data_zones)
+def test_get_variable(zones):
+ np.testing.assert_equal(zones.jobs, np.array([[100], [200]]))
+ np.testing.assert_equal(zones['jobs'], zones.jobs)
\ No newline at end of file
diff --git a/tests/test_24_planner_choice_location.py b/tests/test_24_planner_choice_location.py
new file mode 100644
index 00000000..33ab4d66
--- /dev/null
+++ b/tests/test_24_planner_choice_location.py
@@ -0,0 +1,188 @@
+import pytest
+import numpy as np
+import random
+from tests.test_22_planner_od import data_od, labels, od
+from tests.test_23_planner_zones import data_zones
+from pam.planner.choice_location import ChoiceModel, ChoiceMNL, ChoiceSet, \
+ ChoiceConfiguration
+from pam.planner.utils_planner import sample_weighted
+import os
+from pam.read import read_matsim
+from pam.planner.od import OD
+test_plans = os.path.abspath(
+ os.path.join(os.path.dirname(__file__),
+ "test_data/test_matsim_plansv12.xml")
+def population():
+ population = read_matsim(test_plans, version=12)
+ for hid, pid, person in population.people():
+ for act in person.activities:
+ act.location.area = 'a'
+ return population
+test_plans_experienced = os.path.abspath(
+ os.path.join(os.path.dirname(__file__),
+ "test_data/test_matsim_experienced_plans_v12.xml")
+def population_experienced():
+ population = read_matsim(test_plans_experienced, version=12)
+ return population
+def choice_model(population, od, data_zones) -> ChoiceModel:
+ return ChoiceModel(population, od, data_zones)
+def choice_model_mnl(population, od, data_zones) -> ChoiceModel:
+ return ChoiceMNL(population, od, data_zones)
+def test_zones_are_aligned(population, od, data_zones):
+ choice_model = ChoiceModel(population, od, data_zones.loc[['b', 'a']])
+ zones_destination = choice_model.od.labels.destination_zones
+ zones_index = list(choice_model.zones.data.index)
+ assert zones_destination == zones_index
+@pytest.mark.parametrize('var', ['time', 'distance'])
+@pytest.mark.parametrize('zone', ['a', 'b'])
+def test_choice_set_reads_od_correctly(choice_model, var, zone):
+ choice_model.configure(
+ u=f"""1 / od['{var}', '{zone}']""",
+ scope="""act.act=='work'"""
+ )
+ choice_set = choice_model.get_choice_set()
+ assert choice_set.choice_labels == [
+ ('a', 'car'), ('a', 'bus'), ('b', 'car'), ('b', 'bus')
+ ]
+ for choice in choice_set.u_choices:
+ np.testing.assert_array_equal(
+ choice,
+ 1 / choice_model.od[var, zone].flatten()
+ )
+def test_list_parameters_correspond_to_modes(choice_model):
+ """
+ When utility parameters are passed as a list,
+ they should be applied to each respective mode.
+ """
+ asc = [0, 10] # 0 should be applied to car, 10 to bus
+ choice_model.configure(
+ u=f"""{asc} + od['time', 'b']""",
+ scope="""True"""
+ )
+ choice_set = choice_model.get_choice_set()
+ u_choices = choice_set.u_choices
+ choice_labels = choice_set.choice_labels
+ idx_car = [i for (i, (zone, trmode)) in enumerate(
+ choice_labels) if trmode == 'car']
+ idx_bus = [i for (i, (zone, trmode)) in enumerate(
+ choice_labels) if trmode == 'bus']
+ np.testing.assert_equal(
+ u_choices[0, idx_car],
+ choice_model.od['time', 'b', :, 'car']
+ )
+ np.testing.assert_equal(
+ u_choices[0, idx_bus],
+ 10 + choice_model.od['time', 'b', :, 'bus']
+ )
+def test_get_probabilities_along_dimension(choice_model):
+ choice_model.configure(
+ u=f"""1 / od['time', 'b']""",
+ scope="""True""",
+ func_probabilities=lambda x: x / sum(x),
+ func_sampling=sample_weighted
+ )
+ choices = choice_model.get_choice_set().u_choices
+ probs = choice_model.selections.probabilities
+ assert (probs.sum(axis=1) == 1).all()
+ np.testing.assert_almost_equal(
+ choices / choices.sum(1).reshape(-1, 1),
+ probs
+ )
+def test_apply_once_per_agent_same_locations(choice_model_mnl):
+ choice_model_mnl.configure(
+ u=f"""1 / od['time', 'b']""",
+ scope="""True""",
+ func_probabilities=lambda x: x / sum(x)
+ )
+ def assert_single_location(population):
+ for hid, pid, person in population.people():
+ locs = [act.location.area for act in person.plan.activities]
+ assert len(set(locs)) == 1
+ with pytest.raises(AssertionError):
+ random.seed(10)
+ choice_model_mnl.apply(once_per_agent=False)
+ assert_single_location(choice_model_mnl.population)
+ # only apply once per agent:
+ choice_model_mnl.apply(once_per_agent=True)
+ # now all locations should be in the same zone
+ # for each agent
+ assert_single_location(choice_model_mnl.population)
+def test_nonset_config_attribute_valitation_raise_error():
+ config = ChoiceConfiguration()
+ with pytest.raises(ValueError):
+ config.validate(['u', 'scope'])
+ config.u = '1'
+ config.scope = 'True'
+ config.validate(['u', 'scope'])
+def test_model_checks_config_requirements(mocker, choice_model_mnl):
+ mocker.patch.object(ChoiceConfiguration, 'validate')
+ choice_model_mnl.configure(
+ u=f"""1 / od['time', 'a']""",
+ scope="""act.act=='work'"""
+ )
+ choice_model_mnl.get_choice_set()
+ ChoiceConfiguration.validate.assert_called_once_with(['u', 'scope'])
+def test_utility_calculation(choice_model_mnl):
+ scope = "act.act=='work'"
+ asc = [0, -1]
+ asc_shift_poor = [0, 2]
+ beta_time = [-0.05, -0.07]
+ beta_zones = 0.4
+ u = f""" \
+ {asc} + \
+ (np.array({asc_shift_poor}) * (person.attributes['subpopulation']=='poor')) + \
+ ({beta_time} * od['time', person.home.area]) + \
+ ({beta_zones} * np.log(zones.jobs))
+ """
+ # ({beta_zones} * np.log(zones['jobs'].values[:, np.newaxis]))
+ choice_model_mnl.configure(u=u, scope=scope)
+ np.testing.assert_almost_equal(
+ np.array([
+ 0.8420680743952365,
+ -1.2579319256047636,
+ 0.11932694661921461,
+ -2.0306730533807857
+ ]),
+ choice_model_mnl.get_choice_set().u_choices[0],
+ )
diff --git a/tests/test_25_planner_utils.py b/tests/test_25_planner_utils.py
new file mode 100644
index 00000000..5843b280
--- /dev/null
+++ b/tests/test_25_planner_utils.py
@@ -0,0 +1,83 @@
+import pytest
+import numpy as np
+import random
+import os
+from pam.read import read_matsim
+from pam.activity import Leg
+from pam.operations.cropping import link_population
+from pam.planner.utils_planner import calculate_mnl_probabilities, \
+ sample_weighted, get_trip_chains, apply_mode_to_home_chain, \
+ get_validate
+test_plans_experienced = os.path.abspath(
+ os.path.join(os.path.dirname(__file__),
+ "test_data/test_matsim_experienced_plans_v12.xml")
+def population_experienced():
+ population = read_matsim(test_plans_experienced, version=12)
+ return population
+def test_weighted_sampling_zero_weight():
+ choices = np.array([0, 0, 1])
+ assert sample_weighted(choices) == 2
+def test_weighted_sampling_seed_results():
+ choices = np.array([10, 3, 9, 0, 11, 2])
+ random.seed(10)
+ assert sample_weighted(choices) == 2
+def test_mnl_probabilities_add_up():
+ choices = np.array([10, 3, 9, 0, 11, 2])
+ probs = calculate_mnl_probabilities(choices)
+ assert round(probs.sum(), 4) == 1
+def test_mnl_equal_weights_equal_probs():
+ n = 10
+ choices = np.array([10]*n)
+ probs = calculate_mnl_probabilities(choices)
+ assert (probs==(1/n)).all()
+def test_get_home_trip_chains(population_experienced):
+ person = population_experienced['agent_1']['agent_1']
+ person.plan.day[12].act = 'home'
+ chains = get_trip_chains(person.plan)
+ assert len(chains) == 2
+ assert chains[0][-1] == person.plan.day[12]
+ assert chains[1][0] == person.plan.day[12]
+def test_apply_mode_to_chain(population_experienced):
+ link_population(population_experienced)
+ person = population_experienced['agent_1']['agent_1']
+ person.plan.day[12].act = 'home'
+ chains = get_trip_chains(person.plan)
+ apply_mode_to_home_chain(
+ person.plan.day[10], 'gondola'
+ )
+ # mode is applied to all legs in the chain
+ legs = [elem for elem in chains[0] if isinstance(elem, Leg)]
+ assert all([leg.mode=='gondola' for leg in legs])
+ # ..but not to the next trip chain
+ legs = [elem for elem in chains[1] if isinstance(elem, Leg)]
+ assert all([leg.mode!='gondola' for leg in legs])
+def test_nonset_attribute_raises_error():
+ class A:
+ a = 'b'
+ b = None
+ a = A()
+ with pytest.raises(ValueError):
+ get_validate(a, 'b')
\ No newline at end of file