diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 000000000..ad21f079b --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,21 @@ +# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.245.0/containers/python-3/.devcontainer/base.Dockerfile + +# [Choice] Python version (use -bullseye variants on local arm64/Apple Silicon): 3, 3.10, 3.9, 3.8, 3.7, 3.6, 3-bullseye, 3.10-bullseye, 3.9-bullseye, 3.8-bullseye, 3.7-bullseye, 3.6-bullseye, 3-buster, 3.10-buster, 3.9-buster, 3.8-buster, 3.7-buster, 3.6-buster +ARG VARIANT="3.10-bullseye" +FROM mcr.microsoft.com/vscode/devcontainers/python:0-${VARIANT} + +# [Choice] Node.js version: none, lts/*, 16, 14, 12, 10 +ARG NODE_VERSION="none" +RUN if [ "${NODE_VERSION}" != "none" ]; then su vscode -c "umask 0002 && . /usr/local/share/nvm/nvm.sh && nvm install ${NODE_VERSION} 2>&1"; fi + +# [Optional] If your pip requirements rarely change, uncomment this section to add them to the image. +# COPY requirements.txt /tmp/pip-tmp/ +# RUN pip3 --disable-pip-version-check --no-cache-dir install -r /tmp/pip-tmp/requirements.txt \ +# && rm -rf /tmp/pip-tmp + +# [Optional] Uncomment this section to install additional OS packages. +# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ +# && apt-get -y install --no-install-recommends + +# [Optional] Uncomment this line to install global node packages. +# RUN su vscode -c "source /usr/local/share/nvm/nvm.sh && npm install -g " 2>&1 \ No newline at end of file diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 000000000..f6a8f8434 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,58 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the README at: +// https://github.com/microsoft/vscode-dev-containers/tree/v0.245.0/containers/python-3 +{ + "name": "Python 3", + "build": { + "dockerfile": "Dockerfile", + "context": "..", + "args": { + // Update 'VARIANT' to pick a Python version: 3, 3.10, 3.9, 3.8, 3.7, 3.6 + // Append -bullseye or -buster to pin to an OS version. + // Use -bullseye variants on local on arm64/Apple Silicon. + "VARIANT": "3.10-bullseye", + // Options + "NODE_VERSION": "none" + } + }, + + // Configure tool-specific properties. + "customizations": { + // Configure properties specific to VS Code. + "vscode": { + // Set *default* container specific settings.json values on container create. + "settings": { + "python.defaultInterpreterPath": "/usr/local/bin/python", + "python.linting.enabled": true, + "python.linting.pylintEnabled": true, + "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8", + "python.formatting.blackPath": "/usr/local/py-utils/bin/black", + "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf", + "python.linting.banditPath": "/usr/local/py-utils/bin/bandit", + "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8", + "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy", + "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle", + "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle", + "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint" + }, + + // Add the IDs of extensions you want installed when the container is created. + "extensions": [ + "ms-python.python", + "ms-python.vscode-pylance" + ] + } + }, + + // Use 'forwardPorts' to make a list of ports inside the container available locally. 
+ // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + "postCreateCommand": "pip3 install --user -r requirements.txt", + + // Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. + "remoteUser": "vscode", + "features": { + "git": "latest", + "jupyterlab": "latest" + } +} diff --git a/.gitignore b/.gitignore index d2631e985..42831aee9 100644 --- a/.gitignore +++ b/.gitignore @@ -96,3 +96,12 @@ build/ coverage.xml #### End snippet +.DS_Store + +# Images +*.png +*.csv + +# Aequilibrae Procedure Log +*.log +*.qgz \ No newline at end of file diff --git a/aequilibrae/__init__.py b/aequilibrae/__init__.py index c1a45f007..5c2fc4859 100644 --- a/aequilibrae/__init__.py +++ b/aequilibrae/__init__.py @@ -28,6 +28,7 @@ from aequilibrae.paths.assignment_paths import AssignmentPaths from aequilibrae.project import Project from aequilibrae.paths.results import AssignmentResults, SkimResults, PathResults +from aequilibrae.paths.odme import ODME from aequilibrae import paths diff --git a/aequilibrae/paths/__init__.py b/aequilibrae/paths/__init__.py index 61af0cc6e..5dabcd4db 100644 --- a/aequilibrae/paths/__init__.py +++ b/aequilibrae/paths/__init__.py @@ -9,6 +9,8 @@ from aequilibrae.paths.vdf import VDF from aequilibrae.paths.graph import Graph, TransitGraph +from aequilibrae.paths.odme import ODME + from aequilibrae import global_logger diff --git a/aequilibrae/paths/odme.py b/aequilibrae/paths/odme.py new file mode 100644 index 000000000..8fd50456d --- /dev/null +++ b/aequilibrae/paths/odme.py @@ -0,0 +1,637 @@ +""" +ODME Infrastructure (User Interaction Class): +""" + +from typing import Tuple +from uuid import uuid4 +from datetime import datetime +from os.path import join +from pathlib import Path +import json +import importlib.util as iutil +import numpy as np +import pandas as pd + +from aequilibrae.paths import TrafficAssignment, TrafficClass +from aequilibrae.paths.odme_ import ScalingFactors, ODMEResults + +from aequilibrae.context import get_active_project +from aequilibrae.matrix import AequilibraeMatrix + +# Checks if we can display OMX +spec = iutil.find_spec("openmatrix") +has_omx = spec is not None +if has_omx: + import openmatrix as omx + + +class ODME(object): + """Origin-Destination Matrix Estimation class. + + For a comprehensive example on use, see the Use examples page. + + .. 
code-block:: python

+
+        >>> from aequilibrae import TrafficAssignment, TrafficClass, Graph, Project, ODME
+        >>> project = Project.from_path("/tmp/test_project")
+        >>> project.network.build_graphs()
+
+        >>> graph = project.network.graphs['c']  # we grab the graph for cars
+        >>> graph.set_blocked_centroid_flows(False)
+
+        >>> matrix = project.matrices.get_matrix("demand_omx")
+        >>> matrix.computational_view(['car'])  # the demand matrix is what we want to estimate
+
+        # See the TrafficAssignment class for details on how to create an assignment
+        >>> assignment = TrafficAssignment()
+        # Make sure the matrix we want to perturb is included in the TrafficClass
+        >>> assignclass = TrafficClass("car", graph, matrix)
+        >>> assignment.set_classes([assignclass])
+        >>> assignment.set_vdf("BPR")
+        >>> assignment.set_vdf_parameters({"alpha": 0.15, "beta": 4.0})
+        >>> assignment.set_capacity_field("capacity")
+        >>> assignment.set_time_field("free_flow_time")
+        >>> assignment.max_iter = 5
+        >>> assignment.set_algorithm("msa")
+
+        # We now need count volume data; suppose we have these in a csv file:
+        >>> import pandas as pd
+        >>> counts = pd.read_csv("/tmp/test_data.csv")
+
+        # We can now run the ODME procedure; see the Use examples page for a more
+        # comprehensive overview of the options available when initialising.
+        >>> odme = ODME(assignment, counts)
+        >>> odme.execute()  # see Use examples for optional arguments
+
+        # There are multiple ways to deal with the output of ODME; the simplest
+        # is to save the output demand matrices and procedure statistics to the
+        # project database.
+        >>> odme.save_to_project("odme_test", "odme_test.omx", project=project)
+
+        # We can also get statistics in memory immediately as pandas dataframes
+        >>> results = odme.results
+
+        # Dataframe of the cumulative factors applied to the input demands
+        >>> cumulative_factors = results.get_cumulative_factors()
+
+        # Statistics on the procedure across each iteration
+        >>> iterative_stats = results.get_iteration_statistics()
+
+        # Statistics on the procedure tracking each link (count volumes)
+        >>> link_stats = results.get_link_statistics()
+    """
+
+    # Input count volume columns (assigned volumes will be added during execution)
+    COUNT_VOLUME_COLS = ["class", "link_id", "direction", "obs_volume"]
+    GMEAN_LIMIT = 0.01  # FACTOR LIMITING VARIABLE - FOR TESTING PURPOSES - DEFUNCT!
+    ALL_ALGORITHMS = ["gmean", "spiess", "reg_spiess"]
+    DEFAULT_STOP_CRIT = {"max_outer": 50, "max_inner": 50, "convergence_crit": 10**-4, "inner_convergence": 10**-4}
+
+    def __init__(
+        self,
+        assignment: TrafficAssignment,
+        count_volumes: pd.DataFrame,
+        stop_crit=None,
+        algorithm: str = "spiess",
+        alpha: float = None,
+    ) -> None:
+        """
+        Parameters:
+            assignment: the TrafficAssignment object - should be initialised with volume
+                delay functions and their parameters and an assignment algorithm,
+                as well as TrafficClass's containing initial demand matrices (which
+                will be duplicated so input data is not corrupted). It doesn't
+                need to have preset select links (these will be overwritten).
+            count_volumes: a dataframe detailing the links, the class they are associated with,
+                the direction and their observed volume.
+            stop_crit: the maximum number of iterations and the convergence criterion
+                (see ODME.DEFAULT_STOP_CRIT for formatting).
+            algorithm: specification for which gradient-descent based algorithm to use
+                (see ODME.ALL_ALGORITHMS for options).
+ alpha: used as a hyper-parameter for regularised spiess (see technical document for + details). + + NOTE - certain functionality is only implemented for single class ODME - see docstrings for + such cases. + """ + self.__check_inputs(count_volumes, stop_crit, alpha, algorithm) + + self.assignment = assignment + self.classes = assignment.classes + self.__duplicate_matrices() + + self.class_names = [user_class._id for user_class in self.classes] + self.names_to_indices = {name: index for index, name in enumerate(self.class_names)} + + # The following are implicitly ordered by the list of Traffic Classes: + self.aequilibrae_matrices = [user_class.matrix for user_class in self.classes] + self.demands = [user_class.matrix.matrix_view for user_class in self.classes] + # Reshaping matrices because when computational_view is done with a single class we get + # n x n instead of n x n x 1 - NOTE: this may be redundant now + for i, demand in enumerate(self.demands): + if len(demand.shape) == 2: + self.demands[i] = demand[:, :, np.newaxis] + + self.original_demands = [np.copy(matrix) for matrix in self.demands] + + # Observed Links & Associated Volumes + self.count_volumes = count_volumes.copy(deep=True) + + # Select Link: + self._sl_matrices = dict() + self.__set_select_links() + + # Algorithm Specifications: + self._norms = self.__get_norms(algorithm) + self._algorithm = algorithm + + # Objective Function: + self._obj_func = None + self.__init_objective_func() + self.last_convergence = None + # Component of objective function from flows/regularisation: + self.flow_obj, self.reg_obj = None, None + # Initially inf to ensure inner iterations begin + self.convergence_change = float("inf") + + # Stopping criterion + if not stop_crit: + stop_crit = self.DEFAULT_STOP_CRIT + self.max_outer = stop_crit["max_outer"] + self.max_inner = stop_crit["max_inner"] + self.outer_convergence_crit = stop_crit["convergence_crit"] + self.inner_convergence_crit = stop_crit["inner_convergence"] + + # Hyper-parameters for regularisation: + if algorithm in ["reg_spiess"]: + if alpha is None or alpha > 1 or alpha < 0: # THIS CHECK SHOULD PROBABLY BE MORE ROBUST + raise ValueError("Hyper-parameter alpha should be between 0 and 1") + self.alpha = alpha + self.beta = 1 - alpha + + # Results/Statistics: + self.results = ODMEResults(self) + + # Procedure Information: + self.procedure_date = "" + self.procedure_id = "" + + # Utilities: + def __check_inputs(self, counts: pd.DataFrame, stop_crit: dict, alpha: float, algorithm: str) -> None: + """ + Ensures all user input is of correct format/value. + NOTE - we do not check if the assignment is given properly, + this is assumed to either be done correctly or for any errors + to be picked up by the TrafficAssignment class. 
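+
+        As an illustration, a minimal valid count volumes dataframe (all values here
+        are hypothetical) would be:
+
+            pd.DataFrame({"class": ["car"], "link_id": [38],
+                          "direction": [1], "obs_volume": [1500.0]})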
+ """ + # Check algorithm + if not isinstance(algorithm, str): + raise ValueError("Algorithm must be input as a string") + elif algorithm not in self.ALL_ALGORITHMS: + raise ValueError( + f"'{algorithm}' is not a valid algorithm.\n" + + "Currently implemented algorithms include:\n" + + "\n".join(self.ALL_ALGORITHMS) + ) + + # Check stopping criteria if given + stop_error = False + if stop_crit is not None: + keys = self.DEFAULT_STOP_CRIT.keys() + if not isinstance(stop_crit, dict): + stop_error = True + else: + for key in keys: + if key not in stop_crit: + stop_error = True + elif key in ["max_outer", "max_inner"]: + if not isinstance(stop_crit[key], int): + stop_error = True + elif stop_crit[key] < 1: + stop_error = True + else: + if not isinstance(stop_crit[key], (float, int)): + stop_error = True + elif stop_crit[key] < 0: + stop_error = True + + if stop_error: + raise ValueError( + "Stopping criterion must be given as a dictionary as follows," + + "(key -> type of value):" + + "max_outer -> positive integer" + + "max_inner -> positive integer" + + "convergence_crit -> non-negative integer/float" + + "inner_convergence -> non-negative integer/float" + ) + + # Check count volumes + counts_error = False + if not isinstance(counts, pd.DataFrame): + counts_error = True + elif len(counts) < 1: + counts_error = True + elif len(counts.columns) != len(self.COUNT_VOLUME_COLS): + counts_error = True + + if not counts_error: + for col in counts.columns: + if col not in self.COUNT_VOLUME_COLS: + counts_error = True + + if not counts_error: + observed = counts["obs_volume"] + if not (pd.api.types.is_float_dtype(observed) or pd.api.types.is_integer_dtype(observed)): + counts_error = True + elif not np.all(observed >= 0): + counts_error = True + + if not counts_error: + if counts.duplicated(subset=["class", "link_id", "direction"]).any(): + counts_error = True + + if counts_error: + raise ValueError( + "Count volumes must be a non-empty pandas dataframe with columns:\n" + + "\n".join(self.COUNT_VOLUME_COLS) + + "\n and all observed volumes must be non-negative floats or integers, and" + + "only a single count volume should be given for a" + + "particular class, link_id and direction" + ) + + # Check alpha value if given + if alpha is not None: + if not isinstance(alpha, (float, int)): + raise ValueError("Input alpha should be a float or integer (0 to 1)") + elif alpha > 1 or alpha < 0: + raise ValueError("Input alpha should be between 0 and 1") + + def __duplicate_matrices(self): + """ + Duplicates the given matrices in memory only and replaces the TrafficClass objects. + """ + # Loop through TrafficClasses - create new and replace, then set classes + new_classes = [] + for usr_cls in self.classes: + mat = usr_cls.matrix.copy(cores=usr_cls.matrix.view_names, memory_only=True) + mat.computational_view() + + new_cls = TrafficClass(usr_cls._id, usr_cls.graph, mat) + new_cls.set_pce(usr_cls.pce) + if usr_cls.fixed_cost_field: + new_cls.set_fixed_cost(usr_cls.fixed_cost_field, usr_cls.fc_multiplier) + new_cls.set_vot(usr_cls.vot) + new_classes.append(new_cls) + + self.assignment.set_classes(new_classes) + self.classes = self.assignment.classes + + def estimate_alpha(self, alpha: float) -> float: + """ + Estimates a starting hyper-paramater for regularised + spiess given a number between 0-1. 
+ + NOTE - currently only implemented for single class + """ + demand_sum = np.sum(self.demands[0]) + flow_sum = np.sum(self.count_volumes["obs_volume"]) + return (alpha * demand_sum) / ((alpha * flow_sum) + ((1 - alpha) * demand_sum)) + + def __get_norms(self, algo: str) -> Tuple[int, int]: + """ + Sets the specifications for the objective function for the algorithm chosen. + """ + if algo in ["gmean", "spiess"]: + return (2, 0) + elif algo in ["reg_spiess"]: + return (2, 2) + + def __set_select_links(self) -> None: + """ + Sets all select links for each class and for each associated set of count volumes. + """ + c_v = self.count_volumes + for user_class in self.classes: + user_class.set_select_links( + { + self.get_sl_key(row): [(row["link_id"], row["direction"])] + for _, row in c_v[c_v["class"] == user_class._id].iterrows() + } + ) + + @staticmethod + def get_sl_key(row: pd.Series) -> str: + """ + Given a particular row from the observervations (count_volumes) returns + a key corresponding to it for use in all select link extraction. + + NOTE - this is intended for internal use only - it has no relevance outside + of this class and classes within the odme_ submodule. + """ + return f"sl_{row['class']}_{row['link_id']}_{row['direction']}" + + def __set_convergence_values(self, new_convergence: float) -> None: + """ + Given a new convergence value calculates the difference between the previous convergence + and new convergence, and sets appropriate values. + """ + if self.last_convergence: + self.convergence_change = abs(self.last_convergence - new_convergence) + self.last_convergence = new_convergence + + def __init_objective_func(self) -> None: + """ + Initialises the objective function - depends on algorithm chosen by user. + + Current objective functions have 2 parts which are summed: + 1. The p-norm raised to the power p of the error vector for observed flows. + 2. The p-norm raised to the power p of the error matrix (treated as a n^2 vector) + for the demand matrix. + + NOTE - currently (1.) must always be present, but (2.) (the regularisation term) + need not be present. + """ + p_1 = self._norms[0] + p_2 = self._norms[1] + + def __reg_obj_func(self) -> None: + """ + Objective function containing regularisation term. + + # NOTE - pce not yet included for multi-class + """ + # Get flow term: + self.flow_obj = 0 + for name, usr_cls in zip(self.class_names, self.classes): + counts = self.count_volumes[self.count_volumes["class"] == name].reset_index(drop=True) + obs_vals = counts["obs_volume"].to_numpy() + assign_vals = counts["assign_volume"].to_numpy() + self.flow_obj += usr_cls.pce * np.sum(np.abs(obs_vals - assign_vals) ** p_1) / p_1 + self.flow_obj *= self.alpha + + # Get regularised term: + self.reg_obj = self.beta * np.sum(np.abs(self.original_demands[0] - self.demands[0]) ** p_2) / p_2 + + # Sum terms: + self.__set_convergence_values(self.flow_obj + self.reg_obj) + + def __obj_func(self) -> None: + """ + Objective function with no regularisation term. 
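+
+            Computes the flow component only, i.e. for each class
+            pce * sum(|obs_volume - assign_volume| ** p_1) / p_1, summed over classes.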
+ + # NOTE - pce not yet included for multi-class + """ + # Get flow term: + self.flow_obj = 0 + for name, usr_cls in zip(self.class_names, self.classes): + counts = self.count_volumes[self.count_volumes["class"] == name].reset_index(drop=True) + obs_vals = counts["obs_volume"].to_numpy() + assign_vals = counts["assign_volume"].to_numpy() + self.flow_obj += usr_cls.pce * np.sum(np.abs(obs_vals - assign_vals) ** p_1) / p_1 + self.__set_convergence_values(self.flow_obj) + + if p_2: + self._obj_func = __reg_obj_func + else: + self._obj_func = __obj_func + + # Output/Results: + def save_to_project(self, name: str, file_name: str, project=None) -> None: + """Saves the final demand matrix output to the project file + + :Arguments: + **name** (:obj:`str`): Name of the desired matrix record + **file_name** (:obj:`str`): Name for the matrix file name. AEM and OMX supported + **project** (:obj:`Project`, Optional): Project we want to save the results to. + Defaults to the active project + """ + project = project or get_active_project() + mats = project.matrices + file_path = join(mats.fldr, file_name) + + if Path(file_name).suffix.lower() == ".omx": + self.__save_as_omx(file_path) + elif ".aem" in file_name: + self.__save_as_aem(file_path) + else: # unsupported file-type + raise ValueError("Only supporting .omx and .aem") + + record = mats.new_record(name, file_name) + record.procedure_id = self.procedure_id + record.timestamp = self.procedure_date + record.procedure = "Origin-Destination Matrix Estimation" + record.report = json.dumps( + { + "iterations": self.results.get_iteration_statistics().to_dict(), + "by_link": self.results.get_link_statistics().to_dict(), + } + ) + record.save() + + def __save_as_omx(self, file_path: str) -> None: + """Saves the final demand matrix output as a .omx file to the project file + + :Arguments: + **file_path** (:obj:`str`): File path for the matrix file name (must end with .omx) + """ + omx_mat = omx.open_file(file_path, "w") + for cls_name, matrix in zip(self.class_names, self.aequilibrae_matrices): + for core in matrix.view_names: + omx_mat[f"{cls_name}_{core}"] = matrix.matrix[core] + + index = self.aequilibrae_matrices[0].current_index + omx_mat.create_mapping(index, self.aequilibrae_matrices[0].index) + + for cls_name, matrix in zip(self.class_names, self.aequilibrae_matrices): + for core in matrix.view_names: + description = f"ODME Procedure {self.procedure_id}" + omx_mat[f"{cls_name}_{core}"].attrs.description = description + omx_mat.close() + + def __save_as_aem(self, file_path: str) -> None: + """ + Saves the final demand matrix output as a .aem file to the project file + + :Arguments: + **file_path** (:obj:`str`): File path for the matrix file name (must end with .aem) + """ + mat = AequilibraeMatrix() + matrix_names = [] + for cls_name, matrix in zip(self.class_names, self.aequilibrae_matrices): + for core in matrix.view_names: + matrix_names.append(f"{cls_name}_{core}") + + args = { + "zones": self.aequilibrae_matrices[0].zones, + "matrix_names": matrix_names, + "index_names": self.aequilibrae_matrices[0].index_names, + "memory_only": False, + "file_name": file_path, + } + mat.create_empty(**args) + mat.indices[:, :] = self.aequilibrae_matrices[0].indices[:, :] + for cls_name, matrix in zip(self.class_names, self.aequilibrae_matrices): + for core in matrix.view_names: + mat.matrix[f"{cls_name}_{core}"][:, :] = matrix.matrix[core][:, :] + mat.description = f"ODME Procedure {self.procedure_id}" + mat.save() + mat.close() + del mat + + def 
get_demands(self) -> list[np.ndarray]: + """ + Returns all demand matrices (can be called before or after execution). + """ + return self.demands + + # ODME Execution: + def execute(self, verbose=False, print_rate=1) -> None: + """ + Run ODME algorithm until either the maximum iterations has been reached, + or the convergence criterion has been met. + + Parameters: + verbose: if true will print output to screen during runtime so user + can track ODME progress. + print_rate: the rate at which to print output to user (if verbose is true). + Does nothing if verbose is false. + """ + # Initialise Procedure: + self.results.init_timer() + self.procedure_date = str(datetime.today()) + self.procedure_id = uuid4().hex + + # Create values for SL matrices & assigned flows + self.__perform_assignment() + + # Outer iterations + outer = 0 + while outer < self.max_outer and self.last_convergence > self.outer_convergence_crit: + # Log stats before running algorithm: + outer += 1 + self.results.log_iter(ODMEResults.OUTER) + + # Inner iterations: + # Ensure at least 1 inner iteration is run per outer loop + self.convergence_change = float("inf") + inner = 0 + while inner < self.max_inner and self.convergence_change > self.inner_convergence_crit: + inner += 1 + self.__execute_inner_iter() + self.results.log_iter(ODMEResults.INNER) + + if verbose and outer % print_rate == 0: + print(f"Outer iteration {outer} is complete.") + + # Reassign values at the end of each outer loop + self.__perform_assignment() + + # Add final stats following final assignment: + self.results.log_iter(ODMEResults.FINAL_LOG) + + # TODO - check whether the demand matrix replacement at the start of the following + # function is sufficient - we may want to replace the matrix.matrices value + # and call matrix.computational_view() (with appropriate arguments) instead. + def __perform_assignment(self) -> None: + """ + Uses current demand matrix to perform an assignment, then save + the assigned flows and select link matrices. Also recalculates the + objective function following an assignment. + + This function will only be called at the start of an outer + iteration & during the final convergence test. + """ + # Change the demand matrices within the TrafficClass's to the current + # demand matrices that have been calculated from the previous outer iteration. + for aeq_matrix, demand in zip(self.aequilibrae_matrices, self.demands): + aeq_matrix.matrix_view = demand + + # Perform the assignment + self.assignment.execute() + + # Store reference to select link demand matrices as proportion matrices + for assignclass, demand in zip(self.classes, self.demands): + sl_matrices = assignclass.results.select_link_od.matrix + for link in sl_matrices: + self._sl_matrices[link] = np.nan_to_num(sl_matrices[link] / demand) + + # Extract and store array of assigned volumes to the select links + self.__extract_volumes() + + # Recalculate convergence values + self._obj_func(self) + + def __extract_volumes(self) -> None: + """ + Extracts and stores assigned volumes (corresponding for those for which we have + observations - ie count volumes). + + NOTE - this does not take into account pce, ie this is the number of vehicles, not + 'flow'. 
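+        To work in PCE-weighted flow instead, these volumes would need to be scaled
+        by each class's pce value.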
+ """ + assign_df = self.assignment.results().reset_index(drop=False).fillna(0) + # Dictionary to select correct column of results dataframe + col = dict() + for cls_name, matrix in zip(self.class_names, self.aequilibrae_matrices): + name = matrix.view_names[0] + col[cls_name] = {1: f"{name}_ab", -1: f"{name}_ba", 0: f"{name}_tot"} + + # For extracting a single assigned flow: + def extract_volume(row) -> None: + """ + Extracts volume corresponding to particular link (from row) and return it. + For inner iterations need to calculate this via __calculate_volumes + """ + return assign_df.loc[assign_df["link_id"] == row["link_id"], col[row["class"]][row["direction"]]].values[0] + + # Extract a flow for each count volume: + self.count_volumes["assign_volume"] = self.count_volumes.apply(extract_volume, axis=1) + + def __execute_inner_iter(self) -> None: + """ + Runs an inner iteration of the ODME algorithm. + This assumes the SL matrices stay constant and modifies the current demand matrices. + """ + # Element-wise multiplication of demand matrices by scaling factors + factors = self.__get_scaling_factors() + self.demands = [demand * factor for demand, factor in zip(self.demands, factors)] + + # Recalculate the link flows + self.__calculate_volumes() + + # Recalculate convergence level: + self._obj_func(self) + + def __get_scaling_factors(self) -> list[np.ndarray]: + """ + Returns scaling matrices for each user class - depending on algorithm chosen. + + NOTE - we expect any algorithm to return a list of factor matrices in order of the + stored user classes. + """ + algorithm = ScalingFactors(self, self._algorithm) + factors = algorithm.generate() + self.results.record_factor_stats(factors) + return factors + + def __calculate_volumes(self) -> None: + """ + Calculates and stores link volumes using current sl_matrices & demand matrices. + """ + + # Calculate a single flow: + def __calculate_volume(self, row: pd.Series) -> float: + """ + Given a single row of the count volumes dataframe, + calculates the appropriate corresponding assigned + volume. 
+ """ + sl_matrix = self._sl_matrices[self.get_sl_key(row)] + demand_matrix = self.demands[self.names_to_indices[row["class"]]] + return np.sum(sl_matrix * demand_matrix) + + # Calculate flows for all rows: + self.count_volumes["assign_volume"] = self.count_volumes.apply( + lambda row: __calculate_volume(self, row), axis=1 + ) diff --git a/aequilibrae/paths/odme_/__init__.py b/aequilibrae/paths/odme_/__init__.py new file mode 100644 index 000000000..2841cfb08 --- /dev/null +++ b/aequilibrae/paths/odme_/__init__.py @@ -0,0 +1,16 @@ +""" +============= +ODME related code +============= + +This is copied from paths.results.__init__.py +""" + +__author__ = "Mittun Sudhahar ($Author: Mittun Sudhahar $)" +__version__ = "1.0" +__revision__ = "$Revision: 1 $" +__date__ = "$Date: 2024-10-01$" + +# NEED TO CHECK WHETHER THIS SHOULD BE HERE OR WHETHER THIS FILE SHOULD JUST BE LEFT BLANK +from .scaling_factors import ScalingFactors +from .results import ODMEResults diff --git a/aequilibrae/paths/odme_/results.py b/aequilibrae/paths/odme_/results.py new file mode 100644 index 000000000..b82aa67c0 --- /dev/null +++ b/aequilibrae/paths/odme_/results.py @@ -0,0 +1,272 @@ +""" +Results/statistics class to help record and analyse the ODME procedure: +""" + +import time +import numpy as np +import pandas as pd + + +# ADD FUNCTIONALITY TO SPLIT DATA INTO A: DEPENDENT ON ITERATION, B: DEPENDENT ON COUNT VOLUME +# ADD A WAY TO SAVE B TO CSV's +class ODMEResults(object): + """Results and statistics of an ODME procedure. + + See Use examples or docstring of ODME for how to use. + User interaction methods include: + + get_cumulative_factors() + get_iteration_statistics() + get_link_statistics() + """ + + # Columns for various dataframes: + + # This one get written to the procedure_report + ITERATION_COLS = [ + "class", + "Outer Loop #", + "Inner Loop #", + "Total Iteration #", + "Total Run Time (s)", + "Loop Time (s)", + "Convergence", + "Inner Convergence", + "Flow Objective", + "Reg Objective", + "mean_factor", + "median_factor", + "std_deviation_factor", + "variance_factor", + "min_factor", + "max_factor", + ] + + # This only for debugging + LINK_COLS = [ + "class", + "link_id", + "direction", + "Outer Loop #", + "Inner Loop #", + "Total Iteration #", + "obs_volume", + "assign_volume", + "Assigned - Observed", + ] + + # For logging different iterations: + INNER, OUTER, FINAL_LOG = 0, 1, 2 + + def __init__(self, odme: "ODME") -> None: + """ + Initialises necessary fields from odme object in order to generate + statistics and results. + + Parameters: + odme: an ODME object which this class will track. + """ + # ODME object: + self.odme = odme + + # Statistics depending on each iterations + self.iteration_stats = [] + # Information on factors to be logged with implicit ordering by ODME classes + self.current_factors = None + + # Statistics depending on each link + self.link_stats = [] + + # Iteration number data: + self.total_iter, self.outer, self.inner = 0, 0, 0 + + # Time data for logging information + self.total_time = 0 + self.loop_time = None + self.time = None + + # User Interaction Methods: + def get_cumulative_factors(self) -> pd.DataFrame: + """ + Return the cumulative factors (ratio of final to initial matrix) in a pandas dataframe. 
+ """ + cumulative_factors = [] + for initial, final, name in zip(self.odme.original_demands, self.odme.demands, self.odme.class_names): + # Get cumulative factors for this demand matrix and store them: + factors = np.nan_to_num(final / initial, nan=1) + cumulative_factors.append( + pd.DataFrame({"class": [name for _ in range(final.size)], "Factors": factors.ravel()}) + ) + + return pd.concat(cumulative_factors, ignore_index=True) + + def get_iteration_statistics(self) -> pd.DataFrame: + """ + Returns pandas dataframe of all statistics relevant to each iteration. + See self.ITERATION_COLS for details. + """ + if len(self.iteration_stats) != 0: + return pd.concat(self.iteration_stats, ignore_index=True) + else: + return pd.DataFrame(columns=self.ITERATION_COLS) + + def get_link_statistics(self) -> pd.DataFrame: + """ + Returns pandas dataframe of all statistics relevant to each link. + See self.LINK_COLS for details. + """ + if len(self.link_stats) != 0: + return pd.concat(self.link_stats, ignore_index=True) + else: + return pd.DataFrame(columns=self.LINK_COLS) + + # Statistic Recording Methods: + def log_iter(self, iter_type: int) -> None: + """ + Logs statistics for a given iteration type (inner/outer/final log). + + Parameters: + iter_type: the type of iteration to log (includes INNER, OUTER, FINAL_LOG) + """ + if iter_type == self.INNER: + self.__prepare_inner() + elif iter_type == self.OUTER: + self.__prepare_outer() + elif iter_type == self.FINAL_LOG: + self.__prepare_final() + else: + raise ValueError(f"'{iter_type}' is not a valid type of iteration!") + + self.__log_stats() + + def __log_stats(self) -> None: + """ + Computes statistics regarding previous iteration and stores them in the statistics list. + """ + # Compute Timing: + self.__update_times() + + # Update Iteration Statistics + self.__update_iteration_stats() + + # Update Link Statistics + self.__update_link_stats() + + def __update_iteration_stats(self) -> None: + """ + Appends the newest set of statistics for the last iteration. + """ + # Create Data: + for cls_name, factor_stats in zip(self.odme.class_names, self.current_factors): + data = dict() + + data["class"] = [cls_name] + data["Outer Loop #"] = [self.outer] + data["Inner Loop #"] = [self.inner] + data["Total Iteration #"] = [self.total_iter] + data["Total Run Time (s)"] = [self.total_time] + data["Loop Time (s)"] = [self.loop_time] + data["Convergence"] = [self.odme.last_convergence] + data["Inner Convergence"] = [self.odme.convergence_change] + data["Flow Objective"] = [self.odme.flow_obj] + data["Reg Objective"] = [self.odme.reg_obj] # Only relevant for reg_spiess + data["mean_factor"] = factor_stats["mean_factor"] + data["median_factor"] = factor_stats["median_factor"] + data["std_deviation_factor"] = factor_stats["std_deviation_factor"] + data["variance_factor"] = factor_stats["variance_factor"] + data["min_factor"] = factor_stats["min_factor"] + data["max_factor"] = factor_stats["max_factor"] + + # Add the new row of statistics + self.iteration_stats.append(pd.DataFrame(data)) + + def __update_times(self): + """ + Updates the times for the last iteration. + """ + old_time = self.time + self.time = time.time() + self.loop_time = self.time - old_time + self.total_time += self.loop_time + + def __update_link_stats(self) -> None: + """ + Appends the newest set of link statistics. 
+ """ + data = self.odme.count_volumes.copy(deep=True) + data["Outer Loop #"] = [self.outer for _ in range(len(data))] + data["Inner Loop #"] = [self.inner for _ in range(len(data))] + data["Total Iteration #"] = [self.total_iter for _ in range(len(data))] + data["Assigned - Observed"] = ( + self.odme.count_volumes["assign_volume"].to_numpy() - self.odme.count_volumes["obs_volume"].to_numpy() + ) + self.link_stats.append(data) + + def record_factor_stats(self, factors: list[np.ndarray]) -> None: + """ + Logs information on the current scaling matrix (ie + factor statistics per iteration per class). + """ + # Create statistics on all new factors: + self.current_factors = [] + for factor in factors: + self.current_factors.append( + { + "mean_factor": np.mean(factor), + "median_factor": np.median(factor), + "std_deviation_factor": np.std(factor), + "variance_factor": np.var(factor), + "min_factor": np.min(factor), + "max_factor": np.max(factor), + } + ) + + # Extra Utilities: + def init_timer(self) -> None: + """ + Initialises the internal times (for statistics purposes). + + Should be run when the ODME procedure begins execution. + """ + self.time = time.time() + + def __prepare_outer(self) -> None: + """ + Prepares logging of outer iteration + """ + self.outer += 1 + self.inner = 0 + self.total_iter += 1 + self.__reset_current_factors() + + def __prepare_inner(self) -> None: + """ + Prepares logging of inner iteration + """ + self.inner += 1 + self.total_iter += 1 + + def __prepare_final(self) -> None: + """ + Prepares final log before ODME procedure is expected to end + """ + self.outer += 1 + self.inner = 0 + self.__reset_current_factors() + + def __reset_current_factors(self) -> None: + """ + Resets the set of current factors to be missing values. + """ + self.current_factors = [] + for _ in self.odme.classes: + self.current_factors.append( + { + "mean_factor": None, + "median_factor": None, + "std_deviation_factor": None, + "variance_factor": None, + "min_factor": None, + "max_factor": None, + } + ) diff --git a/aequilibrae/paths/odme_/scaling_factors.py b/aequilibrae/paths/odme_/scaling_factors.py new file mode 100644 index 000000000..6ae5662c2 --- /dev/null +++ b/aequilibrae/paths/odme_/scaling_factors.py @@ -0,0 +1,394 @@ +""" +Implementation of ODME algorithms to obtain scaling matrices at each iteration: +""" + +from typing import Tuple +import numpy as np +import scipy.stats as spstats + + +class ScalingFactors(object): + """ODME Algorithms (Scaling Factor Generation) + + Class should not need to be used by users, only developers. + To add a new algorithm simply add it to the ALL_ALGORITHMS list here and + in the ODME class, and then update __set_algorithm and ensure your method + output a list of factor matrices for each input demand matrix as per speciciations. + """ + + ALL_ALGORITHMS = ["gmean", "spiess", "reg_spiess"] + + def __init__(self, odme: "ODME", algorithm: str) -> None: + """ + Initialises necessary fields from odme object in order to generate + a set of scaling matrices for the current iteration of the odme + procedure. + + Parameters: + odme: the ODME object containing all fields pertaining to the odme procedure + algorithm: the algorithm to use to generate scaling factors. 
+ """ + # Set the algorithm to be used + self.algo_name = algorithm + self.__set_algorithm() + + # Partition count volumes per class for ease of use + self._c_v = odme.count_volumes + self.class_names = odme.class_names + self._class_counts = { + name: self._c_v[self._c_v["class"] == name].reset_index(drop=True) for name in self.class_names + } + + # Extra Data for Convenience + self.names_to_indices = odme.names_to_indices + + # Matrix Data + self._sl_matrices = odme._sl_matrices + self.demand_matrices = odme.demands + self.original_demands = odme.original_demands + + # Algorithm Specific Data + if algorithm in ["reg_spiess"]: + self._alpha, self._beta = odme.alpha, odme.beta + + # The ODME object itself + self.odme = odme + + def __set_algorithm(self) -> None: + """ + Set the algorithm to be used to obtain scaling factors. + """ + if self.algo_name == "gmean": + self._algorithm = self.__geometric_mean + + elif self.algo_name == "spiess": + self._algorithm = self.__spiess + + elif self.algo_name == "reg_spiess": + self._algorithm = self.__reg_spiess + + else: # Should never be called - should be dealt with in ODME class + raise ValueError( + f"Invalid algorithm name: {self.algo_name}" "Valid algorithms are: " "\n".join(self.ALL_ALGORITHMS) + ) + + def generate(self) -> list[np.ndarray]: + """ + Returns scaling factors for this iteration of the ODME procedure. + """ + return self._algorithm() + + # gmean (Geometric Mean): + def __geometric_mean(self) -> list[np.ndarray]: + """ + Calculates scaling factor based on geometric mean of ratio between + proportionally (via SL matrix) assigned flow & observed flows. + + MULTI-CLASS UNDER DEVELOPMENT! (REQUIRES TESTING) + NOTE - This algorithm is defunct, it has no advantage over spiess + and only existed for initial testing purposes. + """ + # Steps: + # 1. For each link create a factor f_a given by \hat v_a / v_a + # 2. Create a matrix of factors for each link where for a given OD pair i, + # the factor at i is f_a if the corresponding value in the SL matrix + # is non-zero, and 1 otherwise. + # 3. Return the geometric mean of all the factor matrices component-wise + # NOTE - This may be slower due to holding all these matrices in memory + # simultaneously. It is possible to do this e.g element-wise or row-wise + # to save on memory usage. Need to test this later on. + # NOTE - by not approximating step size we may over-correct massively. 
+ + scaling_factors = [] + # Steps 1 & 2: + for demand, name in zip(self.demand_matrices, self.class_names): + observed = self._c_v[self._c_v["class"] == name] + + # If there are no observations leave matrix unchanged + if len(observed) == 0: + scaling_factors.append(np.ones(demand.shape)) + continue + + factors = np.empty((len(observed), *(demand.shape))) + for j, row in self._c_v.iterrows(): + # Create factor matrix: + if row["obs_volume"] != 0 and row["assign_volume"] != 0: + + # Modulate factor by select link dependency: + link_factor = (row["obs_volume"] / row["assign_volume"]) - 1 + sl_matrix = self._sl_matrices[self.odme.get_sl_key(row)] + factor_matrix = sl_matrix * link_factor + + # Apply factor limiting: + # factor_matrix = np.clip(factor_matrix, -self.GMEAN_LIMIT, self.GMEAN_LIMIT) + + # Center factors at 1: + factor_matrix = factor_matrix + 1 + + # If assigned or observed value is 0 we cannot do anything right now + else: + factor_matrix = np.ones(demand.shape) + + # Add factor matrix + factors[j, :, :] = factor_matrix + + # If the assigned volume was 0 (or both 0) no OD pair can have any effect + factors = np.nan_to_num(factors, nan=1, posinf=1, neginf=1) + + # Add the factors for this class to the array: + scaling_factors.append(spstats.gmean(factors, axis=0)) + + # Step 3: + return scaling_factors + + # spiess (Spiess, 1990) + def __spiess(self) -> list[np.ndarray]: + """ + Calculates scaling factor based on gradient descent method via SL matrix, + assigned flow & observed flows as described by (Spiess, 1990) + """ + # Derivative matrices for spiess algorithm: + gradient_matrices = self.__get_derivative_matrices_spiess() + + # Get optimum step sizes for current iteration: + step_sizes = self.__get_step_sizes_spiess(gradient_matrices) + + # Get scaling factors: + scaling_factors = [1 - (step * gradient) for step, gradient in zip(step_sizes, gradient_matrices)] + return scaling_factors + + def __get_derivative_matrices_spiess(self) -> list[np.ndarray]: + """ + Returns derivative matrix (see (Spiess, 1990) and technical documentation) + """ + # There are probably some numpy/cython ways to do this in parallel and + # without storing too many things in memory. + derivatives = [] + # Create a derivative matrix for each user class: + for demand, user_class in zip(self.demand_matrices, self.class_names): + observed = self._class_counts[user_class] + factors = np.empty((len(observed), *(demand.shape))) + for j, row in observed.iterrows(): + sl_matrix = self._sl_matrices[self.odme.get_sl_key(row)] + factors[j, :, :] = sl_matrix * (row["assign_volume"] - row["obs_volume"]) + + # Add derivative matrix to list of derivatives: + derivatives.append(np.sum(factors, axis=0)) + + return derivatives + + def __get_step_sizes_spiess(self, gradients: list[np.ndarray]) -> list[float]: + """ + Returns estimate of optimal step size (see (Spiess, 1990) and technical documentation) + + Parameters: + gradients: The previously calculated gradient matrices - required for calculating + derivative of link flows with respect to step size. + """ + # Note, we could reduce the number of bounds we need to calculate + # (see technical documentation) - however this likely doesn't give rise to + # any significant speed increase. 
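+        # The minimising step below comes from a linear model of link flows in the
+        # step size: lambda* = sum(flow_derivatives * (obs - assign)) /
+        # sum(flow_derivatives ** 2), clipped afterwards to the feasibility bounds.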
+ all_bounds = self.__get_step_size_limits_spiess(gradients) + + # Calculate step-sizes (lambdas) for each gradient matrix: + lambdas = [] + for bounds, user_class, gradient in zip(all_bounds, self.class_names, gradients): + # Calculating link flow derivatives: + flow_derivatives = self.__get_flow_derivatives_spiess(user_class, gradient) + + # Calculate minimising step length: + errors = self.__get_flow_errors(user_class) + min_lambda = np.sum(flow_derivatives * errors) / np.sum(np.square(flow_derivatives)) + + # If all flow derivatives are 0 we should not perturb matrix (i.e, step-size = 0) + if np.isnan(min_lambda): + min_lambda = 0 + + # Check minimising lambda is within bounds: + lambdas.append(self.__enforce_bounds(min_lambda, *bounds)) + + return lambdas + + def __get_flow_derivatives_spiess(self, user_class: str, gradient: np.ndarray) -> np.ndarray: + """ + Returns an array of flow derivatives (v_a' in technical documentation) + for the particular class. + + Parameters: + user_class: the name of the class from which to find flow derivatives + gradient: the gradient for the relevant class + """ + data = self._class_counts[user_class] + demand = self.demand_matrices[self.names_to_indices[user_class]] + + # Calculating link flow derivatives: + flow_derivatives = np.empty(len(data)) + for j, row in data.iterrows(): + sl_matrix = self._sl_matrices[self.odme.get_sl_key(row)] + flow_derivatives[j] = -np.sum(demand * sl_matrix * gradient) + + return flow_derivatives + + def __get_flow_errors(self, user_class: str) -> np.ndarray: + """ + For a particular class returns an array of errors + of the form (observed - assigned,...) for each count + volume given for that class. + """ + data = self._class_counts[user_class] + return data["obs_volume"].to_numpy() - data["assign_volume"].to_numpy() + + def __enforce_bounds(self, value: float, upper: float, lower: float) -> float: + """ + Given a value, and an upper and lower bounds returns the value + if it sits between the bounds, and otherwise whichever bounds was + violated. + + E.g. self.__enforce_bounds(1.1, 10, 2) = 2 + + Parameters: + value: the values to check + upper: the upper bound + lower: the lower bound + """ + if value > upper: + return upper # Upper Bound Violated + elif value < lower: + return lower # Lower Bound Violated + else: + return value # Bounds Not Violated + + def __get_step_size_limits_spiess(self, gradients: list[np.ndarray]) -> list[Tuple[float, float]]: + """ + Returns bounds for step size in order of upper bound, then lower bound (see (Spiess, 1990) + and technical documentation) for each gradient matrix. + + Parameters: + gradient: The currently calculating gradient matrix - required for calculating + derivative of link flows with respect to step size. 
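+
+        For each class: the upper bound is 1 / min(gradient) over cells with positive
+        demand and positive gradient, the lower bound is 1 / max(gradient) over cells
+        with positive demand and negative gradient, and empty sets yield +/- infinity.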
+ """ + bounds = [] + # Create each bound and check for edge cases with empty sets: + for demand, gradient in zip(self.demand_matrices, gradients): + # Upper bound: + upper_mask = np.logical_and(demand > 0, gradient > 0) + if np.any(upper_mask): + upper_lim = 1 / np.min(gradient[upper_mask]) + else: + upper_lim = float("inf") + + # Lower bound: + lower_mask = np.logical_and(demand > 0, gradient < 0) + if np.any(lower_mask): + lower_lim = 1 / np.max(gradient[lower_mask]) + else: + lower_lim = float("-inf") + + bounds.append((upper_lim, lower_lim)) # Tuple[float, float] + + return bounds + + # regularised spiess - see technical documentation for details + # TODO - generalise fully to multi-class (including within technical + # documentation) and experiment/record results to ascertain efficacy + # of algorithm. + def __reg_spiess(self) -> list[np.ndarray]: + """ + Calculates scaling factor based on gradient descent method via SL matrix, + assigned flow & observed flows for the algorithm by Spiess with a + regularisation term attached. + + NOTE - multi-class ODME is NOT yet supported for this algorithm, + the technical documentation has not been updated to check for certain + how this should be implemented for such cases. + """ + # Derivative matrices for spiess algorithm: + gradient_matrices = self.__get_derivative_matrices_reg_spiess() + + # Get optimum step sizes for current iteration: + step_sizes = self.__get_step_sizes_reg_spiess(gradient_matrices) + + # Get scaling factors: + scaling_factors = [1 - (step * gradient) for step, gradient in zip(step_sizes, gradient_matrices)] + + return scaling_factors + + def __get_derivative_matrices_reg_spiess(self) -> list[np.ndarray]: + """ + Returns derivative matrix (see notes in SMP Teams) + + NOTE - multi-class ODME is NOT yet supported for this algorithm, + the technical documentation has not been updated to check for certain + how this should be implemented for such cases. + """ + spiess_grads = self.__get_derivative_matrices_spiess() + g_hats = self.original_demands + reg_grads = [demand - g_hat for demand, g_hat in zip(self.demand_matrices, g_hats)] + + return [ + (self._alpha * spiess) + (self._beta * regularisation) + for regularisation, spiess in zip(reg_grads, spiess_grads) + ] + + def __get_step_sizes_reg_spiess(self, gradients: list[np.ndarray]) -> list[float]: + """ + Returns estimate of optimal step size (see technical documentation) + + Parameters: + gradients: The previously calculated gradient matrices - required for calculating + derivative of link flows with respect to step size and finding 'eta' term + (see same paper - basically rate of change of objective w.r.t. change in demand + across iteration application of 'f'). + + NOTE - multi-class ODME is NOT yet supported for this algorithm, + the technical documentation has not been updated to check for certain + how this should be implemented for such cases. Single class also required more testing + """ + # NOTE - as per technical doc, bounds do not change from reg_spiess to spiess for + # single class. 
+ all_bounds = self.__get_step_size_limits_spiess(gradients) + + # Calculate step-sizes (lambdas) for each gradient matrix: + lambdas = [] + for gradient, user_class, bounds in zip(gradients, self.class_names, all_bounds): + # Calculating flow components for step size: + flow_derivatives = self.__get_flow_derivatives_spiess(user_class, gradient) + flow_errors = self.__get_flow_errors(user_class) + + # Calculate demand components of step size + demand_errors = self.__get_demand_errors(user_class) + demand_derivative = self.__get_demand_derivative(user_class, gradient) + + # Calculate minimising step length: MAY WANT TO MAKE THIS A SEPARATE FUNCTION + min_lambda = ( + (self._alpha * np.sum(flow_derivatives * flow_errors)) + + (self._beta * np.sum(demand_errors * demand_derivative)) + ) / ( + (self._alpha * np.sum(np.square(flow_derivatives))) + + (self._beta * np.sum(np.square(demand_derivative))) + ) + + # If all flow derivatives are 0 we should not perturb matrix (i.e, step-size = 0) + if np.isnan(min_lambda): + min_lambda = 0 + + # Check minimising lambda is within bounds: + lambdas.append(self.__enforce_bounds(min_lambda, *bounds)) + + return lambdas + + def __get_demand_errors(self, user_class: str) -> np.ndarray: + """ + Returns array of errors between current and initial demand matrices + of the form (initial - current,...) + """ + index = self.names_to_indices[user_class] + return self.original_demands[index] - self.demand_matrices[index] + + def __get_demand_derivative(self, user_class: str, gradient: np.ndarray) -> np.ndarray: + """ + Returns array of 'eta' terms (see technical documentation) + for a given class. + """ + demand = self.demand_matrices[self.names_to_indices[user_class]] + return -(demand * gradient) diff --git a/aequilibrae/paths/traffic_assignment.py b/aequilibrae/paths/traffic_assignment.py index a83b120d7..a2ec20bd0 100644 --- a/aequilibrae/paths/traffic_assignment.py +++ b/aequilibrae/paths/traffic_assignment.py @@ -70,8 +70,14 @@ def execute(self, log_specification=True) -> None: """Processes assignment""" if log_specification: self.log_specification() + + self._prepare_execute() self.assignment.execute() + @abstractmethod + def _prepare_execute(self): + pass + @abstractmethod def log_specification(self): pass @@ -502,6 +508,20 @@ def __validate_parameters(self, kwargs) -> bool: raise ValueError("List of functions {} for vdf {} has an inadequate set of parameters".format(q, self.vdf)) return True + def _prepare_execute(self) -> None: + """ + Reset's arrays used relating to congested times in assignment algorithm. + + Allows for re-use of assignment for ODME procedure. + """ + self.__dict__["congested_time"] = np.array(self.free_flow_tt, copy=True) + + # Re-instatiate arrays for use in assignment algorithm. 
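+        # A fresh LinearApproximation instance ensures no state is carried over from
+        # a previous call to execute(), which is what lets ODME re-run the same
+        # assignment repeatedly.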
+ if self.algorithm in ["all-or-nothing", "msa", "frank-wolfe", "cfw", "bfw"]: + self.assignment = LinearApproximation(self, self.algorithm, project=self.project) + else: + raise ValueError("Algorithm not listed in the case selection") + def log_specification(self): self.logger.info("Traffic Class specification") for cls in self.classes: @@ -983,3 +1003,7 @@ def set_frequency_field(self, frequency_field: str) -> None: """ self._check_field(frequency_field) self._config["Frequency field"] = frequency_field + + def _prepare_execute(self) -> None: + """Does nothing, included for base class compatibility""" + pass diff --git a/aequilibrae/project/data/matrices.py b/aequilibrae/project/data/matrices.py index 9b6923675..6bd18d35e 100644 --- a/aequilibrae/project/data/matrices.py +++ b/aequilibrae/project/data/matrices.py @@ -6,7 +6,7 @@ from aequilibrae.matrix import AequilibraeMatrix from aequilibrae.project.data.matrix_record import MatrixRecord from aequilibrae.project.table_loader import TableLoader -from aequilibrae.utils.db_utils import commit_and_close +from aequilibrae.utils.db_utils import add_column_unless_exists, commit_and_close class Matrices: @@ -14,6 +14,7 @@ class Matrices: def __init__(self, project): self.project = project + self._ensure_report_column() self.logger = project.logger self.__items = {} self.__fields = [] @@ -195,3 +196,6 @@ def new_record(self, name: str, file_name: str, matrix=None) -> MatrixRecord: def _clear(self): """Eliminates records from memory. For internal use only""" self.__items.clear() + + def _ensure_report_column(self): + add_column_unless_exists(self.project.conn, "matrices", "report", "TEXT") diff --git a/aequilibrae/project/data/matrix_record.py b/aequilibrae/project/data/matrix_record.py index dec285bef..b5a8197d2 100644 --- a/aequilibrae/project/data/matrix_record.py +++ b/aequilibrae/project/data/matrix_record.py @@ -1,3 +1,4 @@ +import json from os import unlink from os.path import isfile, join @@ -47,6 +48,11 @@ def update_cores(self): """Updates this matrix record with the matrix core count in disk""" self.__dict__["cores"] = self.__get_cores() + @property + def report(self): + """Retrieves the underlying report and decodes from JSON""" + return json.loads(self.__dict__["report"]) + def get_data(self) -> AequilibraeMatrix: """Returns the actual matrix for further computation @@ -67,7 +73,11 @@ def __setattr__(self, instance, value) -> None: elif instance == "file_name": raise ValueError("There is another matrix record for this file") - self.__dict__[instance] = value + if instance == "report": + self.__dict__[instance] = json.dumps(value) + else: + self.__dict__[instance] = value + if instance in ["file_name", "cores"]: self.__dict__["cores"] = self.__get_cores() diff --git a/aequilibrae/project/database_specification/network/tables/matrices.sql b/aequilibrae/project/database_specification/network/tables/matrices.sql index 2e42de392..b2e84e606 100644 --- a/aequilibrae/project/database_specification/network/tables/matrices.sql +++ b/aequilibrae/project/database_specification/network/tables/matrices.sql @@ -24,7 +24,9 @@ create TABLE if not exists matrices (name TEXT NOT NULL PRIMARY KEY procedure TEXT, procedure_id TEXT, timestamp DATETIME DEFAULT current_timestamp, - description TEXT); + description TEXT, + report TEXT +); --# diff --git a/docs/source/examples/full_workflows/plot_matrix_estimation.py b/docs/source/examples/full_workflows/plot_matrix_estimation.py new file mode 100644 index 000000000..afbf822b7 --- /dev/null +++ 
b/docs/source/examples/full_workflows/plot_matrix_estimation.py
@@ -0,0 +1,198 @@
+"""
+.. _plot_matrix_estimation:
+
+Matrix Estimation
+=================
+
+In this example, we show how to use the Origin-Destination Matrix Estimation (ODME)
+procedure to deform a given set of demand matrices so that they more accurately
+reproduce known count volumes.
+"""
+
+import os
+import uuid
+import zipfile
+from os.path import join, dirname
+from uuid import uuid4
+from tempfile import gettempdir
+import pandas as pd
+import matplotlib.pyplot as plt
+import seaborn as sns
+import random
+
+from aequilibrae import TrafficAssignment, TrafficClass, Graph, Project, ODME
+from aequilibrae.utils.create_example import create_example
+
+import warnings
+warnings.filterwarnings('ignore')
+
+# %% [markdown]
+# To begin ODME, open the relevant project and demand matrix, and create the relevant
+# graphs with any desired parameters. Here we use the example 'Sioux Falls' project,
+# which has a single car user class. ODME can be performed with multiple user classes
+# (although not multiple cores per class for the moment).
+
+# %%
+# Create example project on the Sioux Falls network
+fldr = join(gettempdir(), uuid4().hex)
+project = create_example(fldr)
+
+project.network.build_graphs()
+car_graph = project.network.graphs["c"]  # type: Graph
+
+car_graph.set_graph("free_flow_time")
+car_graph.set_blocked_centroid_flows(False)
+matrix = project.matrices.get_matrix("demand_omx")
+matrix.computational_view()  # Make sure to set the computational view appropriately.
+
+# %% [markdown]
+# The user must create TrafficAssignment and TrafficClass objects and set the desired
+# assignment parameters; this is the assignment ODME will use when trying to make the
+# assigned volumes conform to the user input count volumes. If an invalid assignment
+# object is given, ODME does not guarantee any particular behaviour - see
+# TrafficAssignment (REFERENCE HERE) for more information.
+
+# %%
+assignment = TrafficAssignment()
+assignclass = TrafficClass("car", car_graph, matrix)
+assignment.set_classes([assignclass])
+assignment.set_vdf("BPR")
+assignment.set_vdf_parameters({"alpha": 0.15, "beta": 4.0})
+assignment.set_capacity_field("capacity")
+assignment.set_time_field("free_flow_time")
+assignment.max_iter = 5
+assignment.set_algorithm("msa")
+
+# %% [markdown]
+# Now we need to produce a set of count data - in practice this would be determined
+# experimentally. Here we run an assignment and increase the assigned volumes by up
+# to 5% on an arbitrarily chosen set of links (this ensures the ODME procedure has
+# something to do), synthetically generating our data. See TrafficAssignment
+# (REFERENCE HERE) for information on the assignment execution.
+
+# %%
+# The set of links for our counts:
+links = [1, 14, 17, 25, 38]
+
+assignment.execute()
+assign_df = assignment.results().reset_index(drop=False).fillna(0)
+flow = lambda link: assign_df.loc[assign_df["link_id"] == link, "matrix_ab"].values[0]
+
+# The columns of our data dataframe must be: 'class', 'link_id', 'direction', 'obs_volume'
+# where 'obs_volume' is the count data.
+columns = ODME.COUNT_VOLUME_COLS
+
+# Synthetically generated data (increasing counts by up to 5%):
+random.seed(0)
+data = [["car", link, 1, flow(link) * random.uniform(1, 1.05)] for link in links]
+
+counts = pd.DataFrame(data=data, columns=columns)
+
+# %% [markdown]
+# Now that we have generated our data, we can set our parameters for ODME.
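+# The available stopping criteria are the maximum numbers of outer and inner
+# iterations, plus the outer and inner convergence thresholds on the objective
+# (see ODME.DEFAULT_STOP_CRIT for the expected keys).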
+# Feel free to use any of the algorithms available (see ODME.ALL_ALGORITHMS) and experiment with
+# the stopping criteria. Stopping criteria, if specified, must be in the format shown below.
+
+# %%
+# We will run a quick ODME procedure:
+stop_crit = {"max_outer": 10,
+             "max_inner": 40,
+             "convergence_crit": 1,
+             "inner_convergence": 0.1}
+
+algorithm = "spiess"
+odme = ODME(assignment,
+            counts,
+            stop_crit=stop_crit,
+            algorithm=algorithm,
+            alpha=0.5  # Only used for regularised spiess
+            )
+
+# %% [markdown]
+# We can now run the ODME procedure. Note that when we execute ODME it will make a copy of all
+# input matrices and leave the original matrices unchanged.
+
+# %%
+odme.execute(verbose=True, print_rate=2)  # verbose/print_rate allow us to track the progress of ODME as it occurs
+
+# %% [markdown]
+# We can save the results using the save_to_project() method, which both saves the new matrices and
+# generates a procedure report (stored in the project database via SQLite).
+
+# %%
+# To save the results to the project we call the following method
+# (both .omx and .aem are supported by save_to_project()):
+odme.save_to_project("example_doc", "example_doc.omx", project=project)
+
+# %% [markdown]
+# From here, we can use the methods below to extract and then visualise/process the results
+# (without saving them). To access the matrices themselves, use the TrafficAssignment object to
+# reach the new TrafficClasses within which the matrices are contained.
+
+# %%
+# To get the demand matrices as a list of numpy arrays:
+new_demands = odme.get_demands()
+
+# A dataframe containing extensive statistics for each iteration:
+iteration_stats = odme.results.get_iteration_statistics()  # use iteration_stats.columns to see all data provided
+
+# A dataframe containing statistics pertaining to each link:
+link_stats = odme.results.get_link_statistics()  # use link_stats.columns to see all data provided
+
+# An unordered list of cumulative factors applied to each demand matrix
+# (the same as dividing the final demand matrix by the initial demand):
+cumulative_factors = odme.results.get_cumulative_factors()
+
+# %% [markdown]
+# From here the results can be analysed by the user - below we first compute a quick summary error
+# metric and then provide a few sample plots of what can be visualised.
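+
+# %% [markdown]
+# The next cell is a small illustrative sketch rather than part of the ODME API: it assumes only
+# the 'Total Iteration #' and 'Assigned - Observed' columns of the link statistics dataframe,
+# which are the same columns used by the plots further below.
+
+# %%
+# Root-mean-square error between assigned and observed volumes at the final iteration:
+final_iter = link_stats["Total Iteration #"].max()
+final_errors = link_stats.loc[link_stats["Total Iteration #"] == final_iter, "Assigned - Observed"]
+rmse = (final_errors**2).mean() ** 0.5
+print(f"RMSE at final iteration ({final_iter}): {rmse:.2f}")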
+
+# %%
+# Uncomment this code to generate a directory within which to save plots:
+# os.mkdir("odme_plots")
+
+# %%
+# Plotting the error in assigned volumes at each count location across iterations:
+sns.relplot(x='Total Iteration #',
+            y='Assigned - Observed',
+            hue='link_id',
+            kind='line',
+            data=link_stats,
+            markers=True,
+            dashes=False)
+plt.title(f"Assigned Link Error over Iterations (using {algorithm})")
+plt.tight_layout()
+
+# Uncomment this code to save the plot:
+# plt.savefig(f"odme_plots/link_err_{algorithm}.png", dpi=300)
+
+# %%
+# Plotting the factors applied across each iteration:
+plt.plot(iteration_stats['Total Iteration #'], iteration_stats['mean_factor'], color="red")
+plt.plot(iteration_stats['Total Iteration #'], iteration_stats['min_factor'], color="green")
+plt.plot(iteration_stats['Total Iteration #'], iteration_stats['max_factor'], color="blue")
+
+plt.xlabel("Iterations")
+plt.ylabel("Factor Size")
+plt.title("Factor Distribution across Iterations")
+plt.tight_layout()
+
+# Uncomment this code to save the plot:
+# plt.savefig(f"odme_plots/factor_iter_{algorithm}.png", dpi=300)
+
+# %%
+# Plotting a histogram to visualise the total (cumulative) factors applied to the matrix:
+sns.histplot(cumulative_factors['Factors'], bins=40, kde=False, color='skyblue')
+plt.title("Cumulative Factor Distribution")
+plt.tight_layout()
+
+# Uncomment this code to save the plot:
+# plt.savefig(f"odme_plots/cum_factor_{algorithm}.png", dpi=300)
+
+# %%
+# Plotting how the objective function changes across iterations:
+sns.lineplot(x='Total Iteration #',
+             y='Convergence',
+             data=iteration_stats,
+             markers=True,
+             dashes=False)
+plt.title("Objective Function over Iterations")
+
+# We can extract and visualise when each new outer iteration begins:
+outer_iterations = iteration_stats[iteration_stats["Inner Loop #"] == 0]["Total Iteration #"]
+for outer_iteration in outer_iterations:
+    plt.axvline(x=outer_iteration, color='lightgrey', linestyle='--', linewidth=1)
+plt.tight_layout()
+
+# Uncomment this code to save the plot:
+# plt.savefig(f"odme_plots/obj_func_{algorithm}.png", dpi=300)
+
+# %% [markdown]
+# Finally, we close the project and matrices:
+
+# %%
+matrix.close()
+project.close()
diff --git a/docs/source/modeling_with_aequilibrae/modeling_concepts/matrix_estimation.rst b/docs/source/modeling_with_aequilibrae/modeling_concepts/matrix_estimation.rst
new file mode 100644
index 000000000..9574fc1d3
--- /dev/null
+++ b/docs/source/modeling_with_aequilibrae/modeling_concepts/matrix_estimation.rst
@@ -0,0 +1,133 @@
+Matrix estimation via AequilibraE's ODME Procedure
+---------------------------------------------------
+
+Origin-Destination Matrix Estimation involves fitting an initial estimate for an
+OD matrix to more accurately represent count volume data. Within AequilibraE the
+**ODME** object handles this procedure. The **ODME** object relies heavily on the
+input **TrafficAssignment** object, details of which :ref:`are given here `.
+
+AequilibraE ODME Overview
+~~~~~~~~~~~~~~~~~~~~~~~~~
+The **ODME** infrastructure involves 2 layers of iterative processes (i.e. a bilevel optimisation
+problem): there is an 'outer loop', and within each outer iteration there is an 'inner loop'.
+The outer (or upper) optimisation task attempts to converge upon a local solution (one close to
+the initial demand matrices) which accurately fits the count volume data.
+Within the inner (or lower) optimisation problem, we assume that paths remain constant under
+small changes to the demand matrices, and optimise under that assumption (see
+:ref:`technical documentation ` for details). At the beginning of each outer iteration we
+recompute paths to compensate for the congestion generated over the previous inner loop. The
+optimisation performed within this inner loop is what typically distinguishes each iterative
+gradient-based algorithm: the inner loop iteratively finds factor matrices which, under the
+current assumptions/constraints, best improve the solution. The implementations of the
+algorithms themselves can be seen in the **ScalingFactors** class, whilst the main loop
+infrastructure is contained in the **ODME** class. There is also a results/statistics gathering
+class called **ODMEResults** which operates alongside the **ODME** class.
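+
+The loop structure can be sketched schematically as follows - this is an illustration only, and
+the names inside the loop are not the actual internals of the **ODME** class:
+
+.. code-block:: python
+
+    # Schematic of the ODME bilevel loop (illustrative names, not the real API)
+    while objective > convergence_crit and outer < max_outer:
+        recompute_paths()  # assign with the current demand matrices
+        inner = 0
+        while change_in_convergence > inner_convergence and inner < max_inner:
+            factors = compute_scaling_factors()  # algorithm-specific, e.g. "spiess"
+            demand_matrices = apply_factors(demand_matrices, factors)
+            inner += 1
+        outer += 1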
+
+ODME Parameters
+~~~~~~~~~~~~~~~
+There are a few parameters the user must specify upon initialisation of the ODME object:
+
+* **assignment**: A TrafficAssignment object containing all necessary information to assign flows
+  to each link given input demand matrices (see :ref:`the TrafficAssignment page `
+  for details).
+
+* **count_volumes**: The data collected on which **ODME** will attempt to optimise the demand matrices.
+  These should be given in a pandas dataframe as shown in :ref:`plot_matrix_estimation` (a minimal
+  sketch is also given after this list).
+
+* **stop_crit**: Given as a dictionary (see :ref:`plot_matrix_estimation` for details) which provides
+  the criteria controlling the iterative process - the maximum number of iterations for both the
+  inner and outer loops, and the convergence threshold (a threshold on an objective function
+  determining when to exit the loop) for each of them. Note that the inner convergence criterion
+  specifies a change in convergence rather than a fixed threshold, since an inner loop is not
+  always expected to converge to 0.
+
+* **algorithm**: The algorithm to be used - there are currently 3 algorithms implemented, of which
+  "spiess" is the most reliable.
+
+* **alpha**: A hyper-parameter currently used only by the regularised spiess algorithm.
+  See the :ref:`technical documentation ` for details.
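+
+A minimal sketch of assembling the count volume dataframe (the link IDs and volumes below are
+illustrative only; the required column names - 'class', 'link_id', 'direction', 'obs_volume' -
+are available as ``ODME.COUNT_VOLUME_COLS``):
+
+.. code-block:: python
+
+    import pandas as pd
+
+    data = [
+        ["car", 1, 1, 1250.0],   # class name, link_id, direction, observed volume
+        ["car", 38, 1, 870.0],
+    ]
+    count_volumes = pd.DataFrame(data=data, columns=ODME.COUNT_VOLUME_COLS)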
+
+Execution
+~~~~~~~~~
+See the :ref:`plot_matrix_estimation` page for a full example of how to execute **ODME**. Given that
+you have already created an appropriate **TrafficAssignment** object and have the **count_volumes**
+dataframe loaded, you can execute the following:
+
+.. code-block:: python
+
+    odme = ODME(assignment, count_volumes)
+    odme.execute(verbose=True, print_rate=5)
+
+See :ref:`plot_matrix_estimation` for all optional arguments. During execution, **verbose** prints
+progress updates to the terminal regarding how many outer loops have passed, at the rate specified
+by **print_rate**.
+
+Algorithms
+~~~~~~~~~~
+
+For now, the only algorithms available in AequilibraE are:
+
+* Spiess:
+The original paper by Spiess (1990) is :ref:`given here ` and we have implemented the
+algorithm shown there. See the :ref:`technical documentation ` for details on this. The
+main goal of Spiess at each inner loop is to minimise the following objective function:
+
+.. math:: Z(demand) = \sum\limits_{a \in counts} \left(flow(a, demand) - count(a)\right)^2
+
+* Regularised Spiess:
+This is a more novel algorithm intended to ensure the solution we obtain stays close to the
+initial set of demand matrices. How tightly this is controlled depends on the input
+hyper-parameter alpha. See the :ref:`technical documentation ` for details. This
+procedure still requires testing to determine how useful it is - users should feel free to
+try it out for themselves.
+
+Stopping Criteria
+~~~~~~~~~~~~~~~~~
+The main hyper-parameters of each iterative gradient-based ODME procedure are the stopping criteria
+(although some algorithms have additional parameters). There are 4 stopping criteria, as follows
+(a sketch of the corresponding dictionary is given after this list):
+
+* **max_outer**: An integer representing the maximum number of outer iterations allowed - this will
+  supersede the regular convergence criterion if that is not met. Note that this effectively places
+  a maximum time limit on the procedure - a large portion of time is spent recomputing paths at
+  each outer iteration, and hence increasing this is expensive timewise. However, the more outer
+  iterations completed, the more accurate the solution will typically be, so there is a tradeoff
+  to be determined by the user.
+
+* **max_inner**: This is the maximum number of inner iterations allowed; it stops the procedure in
+  the event that the lower level optimisation problem is unable to converge. This is usually not
+  much of a problem, as the procedure is typically able to converge given the assumption that
+  paths are constant (see :ref:`technical documentation ` for details).
+
+* **convergence_crit**: A float which represents the threshold for the objective function which the
+  procedure aims to reach. The smaller this value, the longer the procedure will continue. Typically
+  a smaller value here corresponds to a more accurate solution - however, in many cases it is not
+  possible to converge to 0 across outer iterations, and the outer maximum iteration limit will be
+  hit regardless.
+
+* **inner_convergence**: The inner convergence criterion actually represents the change in
+  convergence across iterations. It determines when the inner loop is no longer converging quickly
+  enough to have a significant effect, and thus when it should stop. Whilst the lower level
+  optimisation problem will be solved more accurately with a smaller **inner_convergence**, this
+  can cause the assumption of constant paths to be violated across an inner iteration, so it is up
+  to the user to determine how small to set this parameter. Note that in most cases this should be
+  no larger than the **convergence_crit** parameter.
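+
+An illustrative sketch of the stopping criteria dictionary (the values are examples only; all four
+keys must be supplied together, as a partially specified dictionary is rejected):
+
+.. code-block:: python
+
+    stop_crit = {
+        "max_outer": 10,           # maximum number of outer iterations
+        "max_inner": 40,           # maximum number of inner iterations per outer loop
+        "convergence_crit": 1,     # threshold on the outer objective function
+        "inner_convergence": 0.1,  # threshold on the change in convergence within an inner loop
+    }
+    odme = ODME(assignment, count_volumes, stop_crit=stop_crit)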
+
+Results
+~~~~~~~
+There are 2 ways to extract the results of **ODME** - you can load them in memory with the
+**get_demands()** method or save them to disk using the **save_to_project** method. However,
+aside from these we may also want to determine the effectiveness of the **ODME** procedure itself.
+Within the :ref:`plot_matrix_estimation` notebook there are a number of examples of plots showing
+how the error in link flows and the factor sizes evolve over the iterations. In particular, the
+plots on link flow errors are useful to determine whether the solution is converging appropriately
+(although for regularised spiess this is not always intended to directly converge). Another
+important plot is the cumulative factor distribution - this is useful for comparing different
+algorithms/runs of **ODME** in order to determine the relative change to the initial demand
+matrices. Refer to the example notebook at :ref:`plot_matrix_estimation` for more; the main code
+for obtaining results is:
+
+.. code-block:: python
+
+    odme.save_to_project("example_doc", "example_doc.omx", project=project)
+    new_demands = odme.get_demands()
+
+    iteration_stats = odme.results.get_iteration_statistics()  # Statistics over iterations
+    link_stats = odme.results.get_link_statistics()  # Statistics tracking links
+    cumulative_factors = odme.results.get_cumulative_factors()  # Cumulative factor distribution
diff --git a/docs/source/modeling_with_aequilibrae/modeling_concepts/multi_class_equilibrium.rst b/docs/source/modeling_with_aequilibrae/modeling_concepts/multi_class_equilibrium.rst
index ae9d673c5..a0446d4e6 100644
--- a/docs/source/modeling_with_aequilibrae/modeling_concepts/multi_class_equilibrium.rst
+++ b/docs/source/modeling_with_aequilibrae/modeling_concepts/multi_class_equilibrium.rst
@@ -149,7 +149,7 @@ Other opportunities for parallelization, such as the computation of costs
 and its derivatives (required during the line-search optimization step), as
 well as all linear combination operations for vectors and matrices have been
 achieved through the use of OpenMP in pure Cython code. These implementations can be
-cound on a file called *parallel_numpy.pyx* if you are curious to look at.
+found in a file called *parallel_numpy.pyx*, if you are curious to look at them.

 Much of the gains of going back to Cython to parallelize these functions came
 from making in-place computation using previously existing arrays, as the
diff --git a/tests/aequilibrae/paths/test_odme.py b/tests/aequilibrae/paths/test_odme.py
new file mode 100644
index 000000000..74188bdea
--- /dev/null
+++ b/tests/aequilibrae/paths/test_odme.py
@@ -0,0 +1,498 @@
+"""Basic tests for ODME infrastructure."""
+
+import os
+import uuid
+import zipfile
+from os.path import join, dirname
+from tempfile import gettempdir
+from unittest import TestCase
+import pandas as pd
+import numpy as np
+
+from aequilibrae import TrafficAssignment, TrafficClass, Graph, Project, ODME
+from aequilibrae.utils.create_example import create_example
+from ...data import siouxfalls_project
+
+# NOTE - we cannot test using bfw/cfw until Issue #493 is resolved.
+
+
+class TestODMESingleClass(TestCase):
+    """
+    Basic unit tests for ODME single class execution
+    """
+
+    def setUp(self) -> None:
+        # Set up data:
+        os.environ["PATH"] = os.path.join(gettempdir(), "temp_data") + ";" + os.environ["PATH"]
+        proj_path = os.path.join(gettempdir(), "test_odme_files" + uuid.uuid4().hex)
+        os.mkdir(proj_path)
+        zipfile.ZipFile(join(dirname(siouxfalls_project), "sioux_falls_single_class.zip")).extractall(proj_path)
+
+        # Initialise project:
+        self.project = Project()
+        self.project.open(proj_path)
+        self.project.network.build_graphs()
+        self.car_graph = self.project.network.graphs["c"]  # type: Graph
+
+        self.car_graph.set_graph("free_flow_time")
+        self.car_graph.set_blocked_centroid_flows(False)
+
+        # Using copy ensures we can manipulate either .aem or .omx matrices
+        self.matrix = self.project.matrices.get_matrix("demand_aem").copy(memory_only=True)
+        self.matrix.computational_view()
+
+        # Initial assignment parameters:
+        self.assignment = TrafficAssignment()
+        self.assignclass = TrafficClass("car", self.car_graph, self.matrix)
+        self.assignment.set_classes([self.assignclass])
+        self.assignment.set_vdf("BPR")
+        self.assignment.set_vdf_parameters({"alpha": 0.15, "beta": 4.0})
+        self.assignment.set_vdf_parameters({"alpha": "b", "beta": "power"})
+        self.assignment.set_capacity_field("capacity")
+        self.assignment.set_time_field("free_flow_time")
+        self.assignment.max_iter = 5
+        self.assignment.set_algorithm("msa")
+
+        # Extra data for convenience:
+        self.index = self.car_graph.nodes_to_indices
+        # Extend dimensions by 1 to enable an AequilibraeMatrix to be used
+        self.dims = self.matrix.matrix_view.shape + (1,)
+
+        # Change this to test different algorithms
+        self.algorithm = "spiess"
+
+    def tearDown(self) -> None:
+        self.matrix.close()
+        self.project.close()
+
+    # 1) Edge Cases
+    def test_basic_1_1(self) -> None:
+        """
+        Check that running ODME with a 0 demand matrix and a single count
+        volume of 0 returns a 0 matrix.
+        """
+        # Set synthetic demand matrix & count volumes
+        self.matrix.matrices = np.zeros(self.dims)
+        count_volumes = pd.DataFrame(data=[["car", 1, 1, 0]], columns=ODME.COUNT_VOLUME_COLS)
+
+        # Run ODME algorithm.
+        odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+        odme.execute()
+
+        # Check result:
+        np.testing.assert_allclose(
+            np.zeros(self.dims),
+            odme.get_demands()[0],
+            err_msg="0 demand matrix with single count volume of 0 does not return 0 matrix",
+        )
+
+    def test_basic_1_2(self) -> None:
+        """
+        Given a demand matrix with 0 demand at certain ODs and many count
+        volumes, the new demand matrix following ODME should still have 0
+        demand at those ODs.
+        """
+        # Set synthetic demand matrix & count volumes
+        demand = np.ones(self.dims)
+        zeroes = [(18, 6), (5, 11), (11, 5), (23, 2), (13, 19), (19, 21), (19, 24), (17, 5)]
+        for orig, dest in zeroes:
+            demand[self.index[orig], self.index[dest], 0] = 0
+        self.matrix.matrices = demand
+
+        data = [
+            ["car", 9, 1, 30],
+            ["car", 11, 1, 2500],
+            ["car", 35, 1, 0],
+            ["car", 18, 1, 100],
+            ["car", 6, 1, 2],
+            ["car", 65, 1, 85],
+            ["car", 23, 1, 0],
+        ]
+        count_volumes = pd.DataFrame(data=data, columns=ODME.COUNT_VOLUME_COLS)
+
+        # Run ODME algorithm.
+        odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+        odme.execute()
+
+        # Check result:
+        err_msg = "Demand matrix with many 0 entries has non-zero demand " + "following ODME at one of those entries"
+        for orig, dest in zeroes:
+            np.testing.assert_array_equal(
+                odme.get_demands()[0][self.index[orig], self.index[dest], 0],
+                0,
+                err_msg=err_msg,
+            )
+
+    def test_basic_1_3(self) -> None:
+        """
+        Given count volumes which are identical to the assigned volumes of an
+        initial demand matrix - ODME should not change this demand matrix (since
+        we are looking for a local solution and this already provides one).
+
+        Also checks that the shape of the resulting matrix matches the initial
+        demand matrix.
+
+        Checks with many count volumes.
+        """
+        init_demand = np.copy(self.matrix.matrix_view)
+
+        # Extract assigned flow on various links
+        self.assignment.execute()
+        assign_df = self.assignment.results().reset_index(drop=False).fillna(0)
+        links = [1, 2, 4, 5, 6, 8, 11, 12, 14, 19, 23, 26, 32, 38, 49, 52, 64, 71, 72]
+        flows = [assign_df.loc[assign_df["link_id"] == link, "matrix_ab"].values[0] for link in links]
+
+        # Perform ODME with unchanged count volumes
+        count_volumes = pd.DataFrame(
+            data=[["car", link, 1, flows[i]] for i, link in enumerate(links)], columns=ODME.COUNT_VOLUME_COLS
+        )
+        odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+        odme.execute()
+
+        # Check results
+        np.testing.assert_allclose(
+            init_demand[:, :, np.newaxis],
+            odme.get_demands()[0],
+            err_msg=(
+                "Demand matrix changed when given many links with observed "
+                + "volume equal to initial assigned volumes"
+            ),
+        )
+
+    # 2) Input Validity
+    def test_basic_2_1(self) -> None:
+        """
+        Check ValueError is raised if count volumes are not given appropriately.
+        These include:
+        - no count volumes given
+        - negative count volumes given
+        - duplicate count volumes given
+        - non-float/integer count volumes given
+        """
+        # No count volumes:
+        with self.assertRaises(ValueError):
+            ODME(self.assignment, pd.DataFrame(data=[], columns=ODME.COUNT_VOLUME_COLS), algorithm=self.algorithm)
+
+        # Negative count volumes:
+        links = [1, 3, 10, 30, 36, 41, 49, 57, 62, 66, 69, 70]
+        count_volumes = pd.DataFrame(data=[["car", link, 1, -link] for link in links], columns=ODME.COUNT_VOLUME_COLS)
+        with self.assertRaises(ValueError):
+            ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+
+        # Duplicate count volumes:
+        count_volumes = pd.DataFrame(data=[["car", 1, 1, i] for i in range(5)], columns=ODME.COUNT_VOLUME_COLS)
+        with self.assertRaises(ValueError):
+            ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+
+        # Non-float/integer count volumes:
+        count_volumes = pd.DataFrame(
+            data=[["car", 1, 1, "7"], ["car", 10, 1, [1]], ["car", 15, 1, (1, 2)]], columns=ODME.COUNT_VOLUME_COLS
+        )
+        with self.assertRaises(ValueError):
+            ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+
+    def test_basic_2_2(self) -> None:
+        """
+        Check ValueError is raised if invalid stopping criteria are given
+        or stopping criteria are given with missing entries.
+        """
+        count_volumes = pd.DataFrame(data=[["car", 1, 1, 1]], columns=ODME.COUNT_VOLUME_COLS)
+
+        # Check invalid (0) max iterations
+        stop_crit = {"max_outer": 0, "max_inner": 0, "convergence_crit": 10**-4, "inner_convergence": 10**-4}
+        with self.assertRaises(ValueError):
+            ODME(self.assignment, count_volumes, stop_crit=stop_crit, algorithm=self.algorithm)
+
+        # Check invalid (negative) convergence
+        stop_crit = {"max_outer": 10, "max_inner": 10, "convergence_crit": -(10**-4), "inner_convergence": -(10**-4)}
+        with self.assertRaises(ValueError):
+            ODME(self.assignment, count_volumes, stop_crit=stop_crit, algorithm=self.algorithm)
+
+        # Check missing criteria
+        stop_crit = {
+            "max_outer": 10,
+            "max_inner": 10,
+            "convergence_crit": 10**-4,
+        }
+        with self.assertRaises(ValueError):
+            ODME(self.assignment, count_volumes, stop_crit=stop_crit, algorithm=self.algorithm)
+
+    # Simple Test Cases (Exact Results Expected For Spiess):
+    def test_basic_3_1(self) -> None:
+        """
+        Test for a single count volume observation which is double the currently assigned flow,
+        when setting only 2 OD pairs to be non-zero such that all flow enters this link.
+        NOTE - we are using small flows with little congestion.
+
+        Link is 38, and OD pairs are 13-12 & 24-12 (bottom left area of graph in QGIS)
+
+        Check that the new flow following ODME matches the observed flow.
+        """
+        # Set synthetic demand matrix
+        demand = np.zeros(self.dims)
+        demand[self.index[13], self.index[12], 0] = 1
+        demand[self.index[24], self.index[12], 0] = 1
+        self.matrix.matrices = demand
+
+        # Extract assigned flow on link 38
+        self.assignment.execute()
+        assign_df = self.assignment.results().reset_index(drop=False).fillna(0)
+        old_flow = assign_df.loc[assign_df["link_id"] == 38, "matrix_ab"].values[0]
+
+        # Perform ODME with doubled link flow on link 38
+        count_volumes = pd.DataFrame(data=[["car", 38, 1, 2 * old_flow]], columns=ODME.COUNT_VOLUME_COLS)
+        odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+        odme.execute()
+
+        # Get results
+        new_demand = odme.get_demands()[0]
+        self.assignment.execute()
+        assign_df = self.assignment.results().reset_index(drop=False).fillna(0)
+        new_flow = assign_df.loc[assign_df["link_id"] == 38, "matrix_ab"].values[0]
+
+        # Assert link flow is doubled:
+        self.assertAlmostEqual(new_flow, 2 * old_flow)
+
+        # Assert only appropriate O-D's have increased non-zero demand
+        # (index the final axis explicitly so we compare scalars, not 1-element arrays)
+        od_13_12 = new_demand[self.index[13], self.index[12], 0]
+        od_24_12 = new_demand[self.index[24], self.index[12], 0]
+        self.assertAlmostEqual(np.sum(new_demand), od_13_12 + od_24_12)
+        self.assertTrue(od_13_12 > 1 or od_24_12 > 1)
+
+    def test_basic_3_2(self) -> None:
+        """
+        Test for two count volume observations with competing priorities for links (i.e. differing
+        observed volumes). Only has 1 non-zero OD pair which influences both links.
+
+        Links are 5 & 35, and OD pair is 13-1 (see left side of graph in QGIS)
+
+        Check that the error between assigned volumes and observed volumes is balanced across both links.
+        We expect the flows on 5 and 35 to be equal and to lie between the two count volumes.
+        """
+        # Set synthetic demand matrix
+        demand = np.zeros(self.dims)
+        demand[self.index[13], self.index[1], 0] = 10
+        self.matrix.matrices = demand
+
+        # Perform ODME with competing link flows on 5 & 35
+        count_volumes = pd.DataFrame(data=[["car", 5, 1, 100], ["car", 35, 1, 50]], columns=ODME.COUNT_VOLUME_COLS)
+        odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+        odme.execute()
+
+        # Get Results:
+        new_demand = odme.get_demands()[0]
+        self.assignment.execute()
+        assign_df = self.assignment.results().reset_index(drop=False).fillna(0)
+        flow_5 = assign_df.loc[assign_df["link_id"] == 5, "matrix_ab"].values[0]
+        flow_35 = assign_df.loc[assign_df["link_id"] == 35, "matrix_ab"].values[0]
+
+        # Assert link flows are equal:
+        self.assertAlmostEqual(flow_5, flow_35, msg=f"Expected balanced flows but are: {flow_5} and {flow_35}")
+
+        # Assert link flows lie between the two observed volumes:
+        self.assertTrue(flow_5 > 50 and flow_5 < 100, msg="Expected flows to be between 50 & 100")
+
+        # Assert only appropriate O-D's have had demand changed
+        od_13_1 = new_demand[self.index[13], self.index[1], 0]
+        self.assertAlmostEqual(np.sum(new_demand), od_13_1, msg="Unexpected OD pair has non-zero demand")
+
+    # Saving Test
+    def test_save(self) -> None:
+        """
+        Checks that we can save matrices to the project appropriately.
+        """
+        # Create zero matrix to check we save this one
+        self.matrix.matrices = np.zeros(self.dims)
+
+        # Placeholder count volumes:
+        counts = pd.DataFrame(data=[["car", 1, 1, 0]], columns=ODME.COUNT_VOLUME_COLS)
+
+        # Create ODME object then save to appropriate location
+        odme = ODME(self.assignment, counts)
+        suffix = "omx"
+        fname = f"test.{suffix}"
+        name = f"test_{suffix}"
+        odme.save_to_project(name, fname, project=self.project)
+
+        matrix = self.project.matrices.get_matrix(name)
+        matrix.computational_view()
+
+        # Check saved matrix is the same as the initial matrix:
+        np.testing.assert_allclose(
+            np.zeros(self.dims),
+            matrix.matrix_view[:, :, np.newaxis],  # Need new axis as this is single class
+            err_msg="Saved 0 demand matrix but did not get 0 demand matrix when extracted!",
+        )
+
+
+class TestODMEMultiClass(TestCase):
+    """
+    Basic unit tests for ODME multiple class execution
+    """
+
+    def setUp(self) -> None:
+        # Download example project
+        os.environ["PATH"] = os.path.join(gettempdir(), "temp_data") + ";" + os.environ["PATH"]
+
+        # Create graphs
+        proj_path = os.path.join(gettempdir(), "test_odme_traffic_assignment_" + uuid.uuid4().hex)
+        self.project = create_example(proj_path)
+        self.project.network.build_graphs()
+        self.car_graph = self.project.network.graphs["c"]  # type: Graph
+        self.truck_graph = self.project.network.graphs["T"]  # type: Graph
+        self.moto_graph = self.project.network.graphs["M"]  # type: Graph
+
+        for graph in [self.car_graph, self.truck_graph, self.moto_graph]:
+            graph.set_skimming(["free_flow_time"])
+            graph.set_graph("free_flow_time")
+            graph.set_blocked_centroid_flows(False)
+
+        # Open matrices (note - we copy them to get a memory-only, non-OMX version):
+        self.car_matrix = self.project.matrices.get_matrix("demand_mc").copy(memory_only=True)
+        self.car_matrix.computational_view(["car"])
+
+        self.truck_matrix = self.project.matrices.get_matrix("demand_mc").copy(memory_only=True)
+        self.truck_matrix.computational_view(["trucks"])
+
+        self.moto_matrix = self.project.matrices.get_matrix("demand_mc").copy(memory_only=True)
+        self.moto_matrix.computational_view(["motorcycle"])
+
+        # Create assignment object and assign classes
+        self.assignment = TrafficAssignment()
+        self.carclass = TrafficClass("car", self.car_graph, self.car_matrix)
+        self.carclass.set_pce(1.0)
+        self.truckclass = TrafficClass("truck", self.truck_graph, self.truck_matrix)
+        self.truckclass.set_pce(2.5)
+        self.motoclass = TrafficClass("motorcycle", self.moto_graph, self.moto_matrix)
+        self.motoclass.set_pce(0.2)
+
+        self.assignment.set_classes([self.carclass, self.truckclass, self.motoclass])
+
+        # Set assignment parameters
+        self.assignment.set_vdf("BPR")
+        self.assignment.set_vdf_parameters({"alpha": 0.15, "beta": 4.0})
+        self.assignment.set_vdf_parameters({"alpha": "b", "beta": "power"})
+
+        self.assignment.set_capacity_field("capacity")
+        self.assignment.set_time_field("free_flow_time")
+
+        self.assignment.max_iter = 5
+        self.assignment.set_algorithm("msa")
+
+        # Store extra variables needed for ODME/demand matrix manipulation:
+        self.car_index = self.car_graph.nodes_to_indices
+        self.truck_index = self.truck_graph.nodes_to_indices
+        self.moto_index = self.moto_graph.nodes_to_indices
+
+        self.user_classes = self.assignment.classes
+        self.class_ids = [user_class._id for user_class in self.user_classes]
+        self.matrices = [user_class.matrix for user_class in self.user_classes]
+        self.matrix_view_names = [matrix.view_names[0] for matrix in self.matrices]
+        self.matrix_dims = [matrix.matrices.shape for matrix in self.matrices]
+        self.matrix_view_dims = [matrix.matrix_view.shape + (1,) for matrix in self.matrices]
+        self.class_to_matrix_idx = [matrix.names.index(matrix.view_names[0]) for matrix in self.matrices]
+        self.indexes = [self.car_index, self.truck_index, self.moto_index]
+
+        # Currently tested algorithm:
+        self.algorithm = "spiess"
+
+    def tearDown(self) -> None:
+        for mat in [self.car_matrix, self.truck_matrix, self.moto_matrix]:
+            mat.close()
+        self.project.close()
+
+    # Basic Zeros Test
+    def test_all_zeros(self) -> None:
+        """
+        Check that running ODME on 3 user classes with all 0 demand matrices
+        returns 0 demand matrices when given count volumes of 0 for each class.
+        """
+        # Set synthetic demand matrix & count volumes
+        for dims, matrix in zip(self.matrix_dims, self.matrices):
+            matrix.matrices = np.zeros(dims)
+
+        count_volumes = pd.DataFrame(
+            data=[[user_class, 1, 1, 0] for user_class in self.class_ids], columns=ODME.COUNT_VOLUME_COLS
+        )
+
+        # Run ODME algorithm.
+        odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+        odme.execute()
+        demands = odme.get_demands()
+
+        # Check for each class that the matrix is still 0's.
+        for demand, dims, matrix, mname in zip(demands, self.matrix_view_dims, self.matrices, self.class_ids):
+            np.testing.assert_allclose(
+                demand, np.zeros(dims), err_msg=f"The {mname} matrix was changed from 0 when initially a 0 matrix!"
+            )
+
+    # Input Validity
+    def test_mc_inputs(self) -> None:
+        """
+        Checks ValueErrors are raised for invalid inputs involving duplicate
+        count volumes with multiple classes.
+        """
+        # Duplicate count volumes:
+        data = [[cls_id, 10, 1, i] for i in range(3) for cls_id in self.class_ids]
+        count_volumes = pd.DataFrame(data=data, columns=ODME.COUNT_VOLUME_COLS)
+        with self.assertRaises(ValueError):
+            ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+
+    # Simple MC Test Case
+    def test_simple_mc(self) -> None:
+        """
+        Tests whether ODME can handle multiple classes with multiple
+        links with competing priorities.
+        Serves as an extension to test 3.2 for the single class.
+
+        Links with competing priorities are again links 5/35 (upper left going downwards).
+        The single OD pair with non-zero demand for each class (differing across classes)
+        is OD pair 13->1.
+        """
+        # Set synthetic demand matrices
+        ods = [10, 20, 50]
+        for dims, matrix, index, o_d, idx in zip(
+            self.matrix_dims, self.matrices, self.indexes, ods, self.class_to_matrix_idx
+        ):
+            matrix.matrices = np.zeros(dims)
+            matrix.matrices[index[13], index[1], idx] = o_d
+
+        # Perform ODME with competing link flows on 5 & 35
+        flows = [[100, 50], [30, 10], [20, 60]]
+        count_volumes = pd.DataFrame(
+            data=[
+                ["car", 5, 1, flows[0][0]],
+                ["car", 35, 1, flows[0][1]],
+                ["truck", 5, 1, flows[1][0]],
+                ["truck", 35, 1, flows[1][1]],
+                ["motorcycle", 5, 1, flows[2][0]],
+                ["motorcycle", 35, 1, flows[2][1]],
+            ],
+            columns=ODME.COUNT_VOLUME_COLS,
+        )
+        odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm)
+        odme.execute()
+
+        # Get Results:
+        demands = odme.get_demands()
+        self.assignment.execute()
+        assign_df = self.assignment.results().reset_index(drop=False).fillna(0)
+
+        # Check Results:
+        for index, demand, idx in zip(self.indexes, demands, self.class_to_matrix_idx):
+            # Assert only appropriate O-D's have had demand changed
+            od_13_1 = demand[index[13], index[1], 0]
+            self.assertAlmostEqual(np.sum(demand), od_13_1, msg="Unexpected OD pair has non-zero demand")
+
+        for flow, name in zip(flows, self.matrix_view_names):
+            flow_5 = assign_df.loc[assign_df["link_id"] == 5, f"{name}_ab"].values[0]
+            flow_35 = assign_df.loc[assign_df["link_id"] == 35, f"{name}_ab"].values[0]
+
+            # Assert link flows are equal:
+            self.assertAlmostEqual(flow_5, flow_35, msg=f"Expected balanced flows but are: {flow_5} and {flow_35}")
+
+            # Assert link flows lie between the two observed volumes:
+            self.assertTrue(
+                flow_5 > min(flow) and flow_5 < max(flow), msg=f"Expected flows to be between {min(flow)} & {max(flow)}"
+            )
diff --git a/tests/aequilibrae/project/test_matrices.py b/tests/aequilibrae/project/test_matrices.py
index da1d761e4..5f1edbe88 100644
--- a/tests/aequilibrae/project/test_matrices.py
+++ b/tests/aequilibrae/project/test_matrices.py
@@ -8,6 +8,7 @@
 from shutil import copytree
 import uuid
 from tempfile import gettempdir
+
 from ...data import siouxfalls_project
 from aequilibrae.project import Project

@@ -46,6 +47,15 @@ def test_set_record(self):
         rec.file_name = "SiouxFalls.omx"
         self.assertEqual(rec.cores, 1, "Setting a file that exists did not correct the number of cores")

+    def test_report(self):
+        rec = self.matrices.get_record("omx2")
+        rec.report = {"a": 7}
+        self.assertEqual(rec.report, {"a": 7}, "Can store a dictionary")
+        rec.save()
+
+        self.matrices.reload()
+        self.assertEqual(self.matrices.get_record("omx2").report, {"a": 7}, "Can store a dictionary")
+
     def test_clear_database(self):
         self.__mat_count(3, "The test data started wrong")