From 692579ca4664ae48c8d438c3e4f686a7d25ce240 Mon Sep 17 00:00:00 2001 From: Pedro Camargo Date: Thu, 31 Oct 2024 23:23:48 +1000 Subject: [PATCH] Pedro/messaging (#578) * fixes messaging * fixes messaging * messaging * updates messaging * updates messaging * updates messaging * Reinstates worker thread * return worker thread to files * return worker thread * . * upload docs * fixes signals for qgis * GTFS messages and linting * GTFS messages * GTFS signals * . * fixes docs * update signals * updates signals * . * update network skimming signals * . * Updates signal in GTFS loader * Apply suggestions from code review * updates signals * removes double pbar from GTFS * Updates assignment signals * Updates linear approximation signals * . * removes unnecessary use of the temporary directory * . * major rework of the signal class * lint * ruff * Update softwaredevelopment.rst * Update .github/workflows/documentation.yml Co-authored-by: Jamie Cook * update latex engine * Update conf.py * . * . 
* Update conf.py * removes unicode char * updates signals and does not create a pdf file * fixes workflow * Apply suggestions from code review --------- Co-authored-by: pveigadecamargo Co-authored-by: Renata Imai Co-authored-by: Jamie Cook Co-authored-by: Jamie Cook --- .github/workflows/documentation.yml | 8 +- aequilibrae/__init__.py | 23 ------ aequilibrae/log.py | 9 --- aequilibrae/paths/all_or_nothing.py | 27 ++++--- aequilibrae/paths/connectivity_analysis.py | 2 +- aequilibrae/paths/graph.py | 6 +- aequilibrae/paths/linear_approximation.py | 47 +++++------ aequilibrae/paths/network_skimming.py | 20 ++--- aequilibrae/project/network/network.py | 14 ++-- .../project/network/osm/osm_builder.py | 44 ++++------ .../project/network/osm/osm_downloader.py | 26 +++--- aequilibrae/transit/gtfs_loader.py | 38 ++++----- aequilibrae/transit/lib_gtfs.py | 32 ++++---- aequilibrae/transit/map_matching_graph.py | 33 +++++--- aequilibrae/transit/parse_csv.py | 2 +- aequilibrae/transit/transit.py | 10 ++- aequilibrae/utils/aeq_signal.py | 43 ++++++++++ aequilibrae/utils/geo_index.py | 33 ++++---- aequilibrae/utils/interface/__init__.py | 0 aequilibrae/utils/interface/worker_thread.py | 27 +++++++ aequilibrae/utils/python_signal.py | 77 +++++++++--------- aequilibrae/utils/qgis_utils.py | 1 + aequilibrae/utils/signal.py | 15 ---- docs/source/conf.py | 3 +- .../creating_models/plot_create_from_gmns.py | 17 ++++ .../creating_models/plot_create_zoning.py | 24 +++++- .../plot_find_disconnected.py | 17 ++++ .../source/examples/skimming/plot_skimming.py | 17 ++++ .../development/softwaredevelopment.rst | 81 +++++++++++++------ .../IPF_benchmark.ipynb | 7 +- tests/aequilibrae/log/test_log.py | 15 ---- .../paths/test_transit_graph_builder.py | 2 +- .../project/test_osm_downloader.py | 4 - tests/aequilibrae/utils/test_signal.py | 23 ++++++ 34 files changed, 441 insertions(+), 306 deletions(-) create mode 100644 aequilibrae/utils/aeq_signal.py create mode 100644 
aequilibrae/utils/interface/__init__.py create mode 100644 aequilibrae/utils/interface/worker_thread.py delete mode 100644 aequilibrae/utils/signal.py create mode 100644 tests/aequilibrae/utils/test_signal.py diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml index bad2dacf5..5907925c0 100644 --- a/.github/workflows/documentation.yml +++ b/.github/workflows/documentation.yml @@ -57,7 +57,6 @@ jobs: - name: Build documentation run: | jupyter nbconvert --to rst docs/source/useful_information/validation_benchmarking/IPF_benchmark.ipynb - sphinx-build -M latexpdf docs/source docs/source/_static sphinx-build -b html docs/source docs/build python3 -m zipfile -c AequilibraE.zip docs/build cp AequilibraE.zip docs/source/_static @@ -120,9 +119,6 @@ jobs: SOURCE_DIR: 'docs/build/htmlv/' # optional: defaults to entire repository DEST_DIR: 'python/' # optional: defaults to entire repository - - name: Rename documentation folder to push to S3 DEV - if: ${{ (github.event_name == 'pull_request') && (env.HAS_SECRETS == 'true') }} - run: mv docs/build/html/ docs/build/${{ github.event.number }}/ - name: Upload to DEV on S3 if: ${{ (github.event_name == 'pull_request') && (env.HAS_SECRETS == 'true') }} @@ -134,5 +130,5 @@ jobs: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_REGION: 'us-east-1' # optional: defaults to us-east-1 - SOURCE_DIR: 'docs/build/' # optional: defaults to entire repository - DEST_DIR: 'python/dev/' # optional: defaults to entire repository \ No newline at end of file + SOURCE_DIR: 'docs/build/html/' # optional: defaults to entire repository + DEST_DIR: 'python/dev/${{ github.event.number }}/' # optional: defaults to entire repository \ No newline at end of file diff --git a/aequilibrae/__init__.py b/aequilibrae/__init__.py index 53dabf0ac..85ff7defe 100644 --- a/aequilibrae/__init__.py +++ b/aequilibrae/__init__.py @@ -10,12 +10,6 @@ from aequilibrae import 
transit from aequilibrae import project -try: - from aequilibrae.paths.AoN import path_computation -except Exception as e: - global_logger.warning(f"Failed to import compiled modules. {e.args}") - raise - from aequilibrae.distribution import Ipf, GravityApplication, GravityCalibration, SyntheticGravityModel from aequilibrae.matrix import AequilibraeMatrix from aequilibrae import distribution @@ -32,20 +26,3 @@ from aequilibrae import paths name = "aequilibrae" - - -def setup(): - sys.dont_write_bytecode = True - cleaning() - - -def cleaning(): - p = tempfile.gettempdir() + "/aequilibrae_*" - for f in glob.glob(p): - try: - os.unlink(f) - except Exception as err: - global_logger.warning(err.__str__()) - - -setup() diff --git a/aequilibrae/log.py b/aequilibrae/log.py index 22efd8922..0fc515ebf 100644 --- a/aequilibrae/log.py +++ b/aequilibrae/log.py @@ -43,17 +43,8 @@ def clear(self): def _setup_logger(): # CREATE THE GLOBAL LOGGER - - par = Parameters._default - do_log = par["system"]["logging"] - temp_folder = tempfile.gettempdir() - logger = logging.getLogger("aequilibrae") logger.setLevel(logging.DEBUG) - - if not len(logger.handlers) and do_log: - log_file = os.path.join(temp_folder, "aequilibrae.log") - logger.addHandler(get_log_handler(log_file)) return logger diff --git a/aequilibrae/paths/all_or_nothing.py b/aequilibrae/paths/all_or_nothing.py index 387b073e9..dffe96b88 100644 --- a/aequilibrae/paths/all_or_nothing.py +++ b/aequilibrae/paths/all_or_nothing.py @@ -12,23 +12,27 @@ except ImportError as ie: global_logger.warning(f"Could not import procedures from the binary. 
{ie.args}") -from aequilibrae.utils.signal import SIGNAL +from aequilibrae.utils.aeq_signal import SIGNAL +from aequilibrae.utils.interface.worker_thread import WorkerThread if False: from .results import AssignmentResults from .graph import Graph -class allOrNothing: +class allOrNothing(WorkerThread): + signal = SIGNAL(object) + def __init__(self, class_name, matrix, graph, results): - # type: (AequilibraeMatrix, Graph, AssignmentResults)->None - self.assignment: SIGNAL = None + # type: (str, AequilibraeMatrix, Graph, AssignmentResults)->None + WorkerThread.__init__(self, None) self.class_name = class_name self.matrix = matrix self.graph = graph self.results = results self.aux_res = MultiThreadedAoN() + self.signal.emit(["start", self.matrix.zones, self.class_name]) if results._graph_id != graph._id: raise ValueError("Results object not prepared. Use --> results.prepare(graph)") @@ -42,16 +46,12 @@ def __init__(self, class_name, matrix, graph, results): elif not np.array_equal(matrix.index, graph.centroids): raise ValueError("Matrix and graph do not have compatible sets of centroids.") - def _build_signal(self): - if self.assignment is None: - self.assignment = SIGNAL(object) - self.assignment.emit(["start", self.matrix.zones, self.class_name]) - def doWork(self): self.execute() def execute(self): - self._build_signal() + msg = f"All-or-Nothing - Traffic Class: {self.class_name} - Zones: 0/{self.matrix.zones}" + self.signal.emit(["set_text", msg]) self.report = [] self.cumulative = 0 self.aux_res.prepare(self.graph, self.results) @@ -70,7 +70,9 @@ def execute(self): pool.apply_async(self.func_assig_thread, args=(orig, all_threads)) pool.close() pool.join() - self.assignment.emit(["update", self.matrix.index.shape[0], self.class_name]) + val = self.matrix.index.shape[0] + msg = f"All-or-Nothing - Traffic Class: {self.class_name} - Zones: {val}/{self.matrix.zones}" + self.signal.emit(["set_text", msg]) # TODO: Multi-thread this sum self.results.compact_link_loads = 
np.sum(self.aux_res.temp_link_loads, axis=0) assign_link_loads( @@ -89,4 +91,5 @@ def func_assig_thread(self, origin, all_threads): if x != origin: self.report.append(x) if self.cumulative % 10 == 0: - self.assignment.emit(["update", self.cumulative, self.class_name]) + msg = f"All-or-Nothing - Traffic Class: {self.class_name} - Zones: {self.cumulative}/{self.matrix.zones}" + self.signal.emit(["set_text", msg]) diff --git a/aequilibrae/paths/connectivity_analysis.py b/aequilibrae/paths/connectivity_analysis.py index c1958d099..740492984 100644 --- a/aequilibrae/paths/connectivity_analysis.py +++ b/aequilibrae/paths/connectivity_analysis.py @@ -4,7 +4,7 @@ from aequilibrae.paths.AoN import connectivity_multi_threaded from aequilibrae.utils.core_setter import set_cores -from aequilibrae.utils.signal import SIGNAL +from aequilibrae.utils.aeq_signal import SIGNAL sys.dont_write_bytecode = True diff --git a/aequilibrae/paths/graph.py b/aequilibrae/paths/graph.py index a3ac40f91..407c6fcb0 100644 --- a/aequilibrae/paths/graph.py +++ b/aequilibrae/paths/graph.py @@ -518,13 +518,13 @@ def __determine_types__(self, new_type, current_type): self.logger.warning("Could not convert {} - {}".format(new_type, verr.__str__())) if isinstance(new_type, int): def_type = int - if current_type == float: + if current_type is float: def_type = float - elif current_type == str: + elif current_type is str: def_type = str elif isinstance(new_type, float): def_type = float - if current_type == str: + if current_type is str: def_type = str elif isinstance(new_type, str): def_type = str diff --git a/aequilibrae/paths/linear_approximation.py b/aequilibrae/paths/linear_approximation.py index 3c7f56f39..8a5760d0d 100644 --- a/aequilibrae/paths/linear_approximation.py +++ b/aequilibrae/paths/linear_approximation.py @@ -19,17 +19,18 @@ if False: from aequilibrae.paths.traffic_assignment import TrafficAssignment -from aequilibrae.utils.signal import SIGNAL -from aequilibrae.utils.python_signal 
import PythonSignal +from aequilibrae.utils.aeq_signal import SIGNAL, simple_progress +from aequilibrae.utils.interface.worker_thread import WorkerThread -class LinearApproximation: - def __init__(self, assig_spec, algorithm, project=None) -> None: - self.equilibration = SIGNAL(object) - self.assignment = SIGNAL(object) - if isinstance(self.assignment, PythonSignal): - self.assignment.pos = 1 +class LinearApproximation(WorkerThread): + equilibration = SIGNAL(object) + assignment = SIGNAL(object) + signal = SIGNAL(object) + def __init__(self, assig_spec, algorithm, project=None) -> None: + WorkerThread.__init__(self, None) + self.signal.emit(["set_text", "Linear Approximation"]) self.logger = project.logger if project else logging.getLogger("aequilibrae") self.project_path = project.project_base_path if project else gettempdir() @@ -468,28 +469,28 @@ def execute(self): # noqa: C901 self.aons[c._id] = allOrNothing(c._id, c.matrix, c.graph, c._aon_results) - self.equilibration.emit(["start", self.max_iter, "Equilibrium Assignment"]) self.logger.info(f"{self.algorithm} Assignment STATS") self.logger.info("Iteration, RelativeGap, stepsize") - for self.iter in range(1, self.max_iter + 1): # noqa: B020 + + msg = "Equilibrium Assignment" + for self.iter in simple_progress(range(1, self.max_iter + 1), self.signal, msg): # noqa: B020 self.iteration_issue = [] - self.equilibration.emit(["key_value", "rgap", self.rgap]) - self.equilibration.emit(["key_value", "iterations", self.iter]) aon_flows = [] self.__maybe_create_path_file_directories() for c in self.traffic_classes: # type: TrafficClass - self.assignment.emit(["start", c.matrix.zones, "All-or-Nothing"]) + msg = f"All-or-Nothing - Traffic Class: {c._id}" + self.signal.emit(["set_text", msg]) # cost = c.fixed_cost / c.vot + self.congested_time # now only once cost = c.fixed_cost + self.congested_time aggregate_link_costs(cost, c.graph.compact_cost, c.results.crosswalk) aon = self.aons[c._id] # This is a new object every 
iteration, with new aux_res - self.assignment.emit(["refresh"]) - self.assignment.emit(["reset"]) - aon.assignment = self.assignment + self.signal.emit(["refresh"]) + self.signal.emit(["reset"]) + aon.signal = self.signal aon.execute() c._aon_results.link_loads *= c.pce @@ -574,10 +575,10 @@ def execute(self): # noqa: C901 if self.algorithm == "all-or-nothing": break + # Check convergence # This needs to be done with the current costs, and not the future ones converged = self.check_convergence() if self.iter > 1 else False - self.equilibration.emit(["update", self.iter, f"Equilibrium Assignment: RGap - {self.rgap:.3E}"]) self.vdf.apply_vdf( self.congested_time, self.fw_total_flow, @@ -596,8 +597,6 @@ def execute(self): # noqa: C901 self.convergence_report["rgap"].append(self.rgap) self.convergence_report["warnings"].append("; ".join(self.iteration_issue)) self.convergence_report["alpha"].append(self.stepsize) - self.equilibration.emit(["key_value", "rgap", self.rgap]) - self.equilibration.emit(["key_value", "iterations", self.iter]) if self.algorithm in ["cfw", "bfw"]: self.convergence_report["beta0"].append(self.betas[0]) @@ -620,6 +619,9 @@ def execute(self): # noqa: C901 idx = c.graph.skim_fields.index(self.time_field) c.graph.skims[:, idx] = self.congested_time[:] + msg = f"Equilibrium Assignment - Iteration: {self.iter}/{self.max_iter} - RGap: {self.rgap:.6}" + self.signal.emit(["set_text", msg]) + for c in self.traffic_classes: c.results.link_loads /= c.pce c.results.total_flows() @@ -627,9 +629,7 @@ def execute(self): # noqa: C901 if (self.rgap > self.rgap_target) and (self.algorithm != "all-or-nothing"): self.logger.error(f"Desired RGap of {self.rgap_target} was NOT reached") self.logger.info(f"{self.algorithm} Assignment finished. 
{self.iter} iterations and {self.rgap} final gap") - self.equilibration.emit(["update", self.max_iter, f"Equilibrium Assignment: RGap - {self.rgap:.3E}"]) - self.assignment.emit(["finished"]) - self.equilibration.emit(["finished"]) + self.signal.emit(["finished"]) def __derivative_of_objective_stepsize_dependent(self, stepsize, const_term): """The stepsize-dependent part of the derivative of the objective function. If fixed costs are defined, @@ -715,6 +715,3 @@ def check_convergence(self): if self.rgap_target >= self.rgap: return True return False - - def signal_handler(self, val): - self.assignment.emit(val) diff --git a/aequilibrae/paths/network_skimming.py b/aequilibrae/paths/network_skimming.py index fbf4ebe73..34a673eaf 100644 --- a/aequilibrae/paths/network_skimming.py +++ b/aequilibrae/paths/network_skimming.py @@ -11,12 +11,15 @@ from aequilibrae.paths.multi_threaded_skimming import MultiThreadedNetworkSkimming from aequilibrae.paths.results.skim_results import SkimResults from aequilibrae.utils.core_setter import set_cores -from aequilibrae.utils.signal import SIGNAL +from aequilibrae.utils.aeq_signal import SIGNAL +from aequilibrae.utils.interface.worker_thread import WorkerThread sys.dont_write_bytecode = True -class NetworkSkimming: +class NetworkSkimming(WorkerThread): + signal = SIGNAL(object) + """ .. 
code-block:: python @@ -51,9 +54,8 @@ class NetworkSkimming: >>> project.close() """ - skimming = SIGNAL(object) - def __init__(self, graph, origins=None, project=None): + WorkerThread.__init__(self, None) self.project = project self.origins = origins self.graph = graph @@ -70,7 +72,7 @@ def doWork(self): def execute(self): """Runs the skimming process as specified in the graph""" - self.skimming.emit(["zones finalized", 0]) + self.signal.emit(["start", self.graph.num_zones, ""]) self.results.cores = self.cores self.results.prepare(self.graph) self.aux_res = MultiThreadedNetworkSkimming() @@ -91,8 +93,8 @@ def execute(self): self.procedure_id = uuid4().hex self.procedure_date = str(datetime.today()) - self.skimming.emit(["text skimming", "Saving Outputs"]) - self.skimming.emit(["finished_threaded_procedure", None]) + self.signal.emit(["set_text", "Saving Outputs"]) + self.signal.emit(["finished"]) def set_cores(self, cores: int) -> None: """ @@ -143,6 +145,4 @@ def __func_skim_thread(self, origin, all_threads): if x != origin: self.report.append(x) - self.skimming.emit(["zones finalized", self.cumulative]) - txt = str(self.cumulative) + " / " + str(self.matrix.zones) - self.skimming.emit(["text skimming", txt]) + self.signal.emit(["update", self.cumulative, f"{self.cumulative}/{self.graph.num_zones}"]) diff --git a/aequilibrae/project/network/network.py b/aequilibrae/project/network/network.py index 3fbb58ee8..914b89300 100644 --- a/aequilibrae/project/network/network.py +++ b/aequilibrae/project/network/network.py @@ -23,23 +23,25 @@ from aequilibrae.project.network.periods import Periods from aequilibrae.project.project_creation import req_link_flds, req_node_flds, protected_fields from aequilibrae.utils.db_utils import commit_and_close -from aequilibrae.utils.signal import SIGNAL +from aequilibrae.utils.aeq_signal import SIGNAL +from aequilibrae.utils.interface.worker_thread import WorkerThread +from aequilibrae.utils.qgis_utils import inside_qgis from 
aequilibrae.utils.spatialite_utils import connect_spatialite -class Network: +class Network(WorkerThread): """ Network class. Member of an AequilibraE Project """ - netsignal = SIGNAL(object) - req_link_flds = req_link_flds req_node_flds = req_node_flds protected_fields = protected_fields link_types: LinkTypes = None + signal = SIGNAL(object) def __init__(self, project) -> None: + WorkerThread.__init__(self, None) from aequilibrae.paths import Graph self.graphs = {} # type: Dict[Graph] @@ -211,13 +213,13 @@ def create_from_osm( polygons.append(subarea) self.logger.info("Downloading data") dwnloader = OSMDownloader(polygons, modes, logger=self.logger) - dwnloader.downloading = self.netsignal + dwnloader.signal = self.signal dwnloader.doWork() self.logger.info("Building Network") self.builder = OSMBuilder(dwnloader.data, project=self.project, model_area=model_area, clean=clean) - self.builder.building = self.netsignal + self.builder.signal = self.signal self.builder.doWork() self.logger.info("Network built successfully") diff --git a/aequilibrae/project/network/osm/osm_builder.py b/aequilibrae/project/network/osm/osm_builder.py index 5032df488..a42cfc810 100644 --- a/aequilibrae/project/network/osm/osm_builder.py +++ b/aequilibrae/project/network/osm/osm_builder.py @@ -14,18 +14,20 @@ from aequilibrae.parameters import Parameters from aequilibrae.project.project_creation import remove_triggers, add_triggers from aequilibrae.utils.db_utils import commit_and_close, read_and_close, list_columns -from aequilibrae.utils.signal import SIGNAL +from aequilibrae.utils.aeq_signal import SIGNAL, simple_progress +from aequilibrae.utils.interface.worker_thread import WorkerThread from aequilibrae.utils.spatialite_utils import connect_spatialite from .model_area_gridding import geometry_grid -class OSMBuilder: - building = SIGNAL(object) +class OSMBuilder(WorkerThread): + signal = SIGNAL(object) def __init__(self, data, project, model_area: Polygon, clean: bool) -> None: + 
WorkerThread.__init__(self, None) project.logger.info("Preparing OSM builder") - self.__emit_all(["text", "Preparing OSM builder"]) + self.signal.emit(["set_text", "Preparing OSM builder"]) self.project = project or get_active_project() self.logger = self.project.logger @@ -44,9 +46,6 @@ def __init__(self, data, project, model_area: Polygon, clean: bool) -> None: gc.collect() self.links_df = data["links"] - def __emit_all(self, *args): - self.building.emit(*args) - def doWork(self): with commit_and_close(connect_spatialite(self.path)) as conn: self.__update_table_structure(conn) @@ -59,7 +58,7 @@ def doWork(self): conn.commit() self.__do_clean(conn) - self.__emit_all(["finished_threaded_procedure", 0]) + self.signal.emit(["finished"]) def importing_network(self, conn): self.logger.info("Importing the network") @@ -69,18 +68,12 @@ def importing_network(self, conn): self.node_df.set_index(["osm_id"], inplace=True) self.__process_link_chunk() - shape_ = self.links_df.shape[0] - message_step = max(1, floor(shape_ / 100)) - self.__emit_all(["maxValue", shape_]) self.logger.info("Geo-procesing links") - self.__emit_all(["text", "Adding network links"]) geometries = [] self.links_df.set_index(["osm_id"], inplace=True) - for counter, (idx, link) in enumerate(self.links_df.iterrows()): - self.__emit_all(["Value", counter]) - if counter % message_step == 0: - self.logger.info(f"Creating segments from {counter:,} out of {shape_ :,} OSM link objects") + + for idx, link in simple_progress(self.links_df.iterrows(), self.signal, "Adding network links"): # How can I link have less than two points? 
if not isinstance(link["nodes"], list): @@ -127,7 +120,7 @@ def importing_network(self, conn): self.node_df.to_parquet(osm_data_path / "nodes.parquet") self.logger.info("Adding nodes to file") - self.__emit_all(["text", "Adding nodes to file"]) + self.signal.emit(["set_text", "Adding nodes to file"]) # Removing the triggers before adding all nodes makes things a LOT faster remove_triggers(conn, self.logger, "network") @@ -158,7 +151,7 @@ def importing_network(self, conn): del self.links_df gc.collect() self.logger.info("Adding links to file") - self.__emit_all(["text", "Adding links to file"]) + self.signal.emit(["set_text", "Adding links to file"]) conn.executemany(insert_qry, links_df) def _build_geometry(self, nodes: List[int]) -> str: @@ -180,21 +173,18 @@ def __do_clean(self, conn): def __process_link_chunk(self): self.logger.info("Processing link modes, types and fields") - self.__emit_all(["text", "Processing link modes, types and fields"]) + self.signal.emit(["set_text", "Processing link modes, types and fields"]) # It is hard to define an optimal chunk_size, so let's assume that 1GB is a good size per chunk # And let's also assume that each row will be 200 fields at 8 bytes each # This makes 2Gb roughly equal to 2.6 million rows, so 2 million would so. 
chunk_size = 1_000_000 list_dfs = [self.links_df.iloc[i : i + chunk_size] for i in range(0, self.links_df.shape[0], chunk_size)] - self.links_df = pd.DataFrame([]) + self.links_df = [] # Initialize link types with read_and_close(self.project.path_to_file) as conn: self.__all_ltp = pd.read_sql('SELECT link_type_id, link_type, "" as highway from link_types', conn) - self.__emit_all(["maxValue", len(list_dfs)]) - for i, df in enumerate(list_dfs): - self.logger.info(f"Processing chunk {i + 1}/{len(list_dfs)}") - self.__emit_all(["Value", i]) + for df in simple_progress(list_dfs, self.signal, "Processing chunks"): if "tags" in df.columns: # It is critical to reset the index for the concat below to work df.reset_index(drop=True, inplace=True) @@ -206,14 +196,14 @@ def __process_link_chunk(self): else: self.logger.error("OSM link data does not have tags. Skipping an entire data chunk") df = pd.DataFrame([]) - list_dfs[i] = df - self.links_df = pd.concat(list_dfs, ignore_index=True) + self.links_df.append(df) + self.links_df = pd.concat(self.links_df, ignore_index=True) def __build_link_types(self, df): data = [] df = df.fillna(value={"highway": "missing"}) df.highway = df.highway.str.lower() - for i, lt in enumerate(df.highway.unique()): + for lt in df.highway.unique(): if str(lt) in self.__all_ltp.highway.values: continue data.append([*self.__define_link_type(str(lt)), str(lt)]) diff --git a/aequilibrae/project/network/osm/osm_downloader.py b/aequilibrae/project/network/osm/osm_downloader.py index 29a145ab5..47f2c9a02 100644 --- a/aequilibrae/project/network/osm/osm_downloader.py +++ b/aequilibrae/project/network/osm/osm_downloader.py @@ -22,17 +22,17 @@ from aequilibrae.context import get_logger from aequilibrae.parameters import Parameters -from aequilibrae.utils.signal import SIGNAL +from aequilibrae.utils.aeq_signal import SIGNAL, simple_progress +from aequilibrae.utils.interface.worker_thread import WorkerThread from .osm_params import http_headers, memory -class 
OSMDownloader: - downloading = SIGNAL(object) - - def __emit_all(self, *args): - self.downloading.emit(*args) +class OSMDownloader(WorkerThread): + signal = SIGNAL(object) def __init__(self, polygons: List[Polygon], modes, logger: logging.Logger = None): + WorkerThread.__init__(self, None) + self.logger = logger or get_logger() self.polygons = polygons self.filter = self.get_osm_filter(modes) @@ -52,16 +52,11 @@ def doWork(self): "{memory}[out:json][timeout:{timeout}];({infrastructure}{filters}({south:.6f},{west:.6f}," "{north:.6f},{east:.6f});>;);out;" ) - self.__emit_all(["maxValue", len(self.polygons)]) - self.__emit_all(["Value", 0]) m = "" if memory > 0: m = f"[maxsize: {memory}]" - for counter, poly in enumerate(self.polygons): - msg = f"Downloading polygon {counter + 1} of {len(self.polygons)}" - self.logger.info(msg) - self.__emit_all(["Value", counter]) - self.__emit_all(["text", msg]) + msg = f"Total polygons: {len(self.polygons)}" + for poly in simple_progress(self.polygons, self.signal, msg): west, south, east, north = poly.bounds query_str = query_template.format( north=north, @@ -83,8 +78,7 @@ def doWork(self): del json gc.collect() - self.__emit_all(["Value", len(self.polygons)]) - self.__emit_all(["text", "Downloading finished. Processing data"]) + self.signal.emit(["set_text", "Downloading finished. Processing data"]) for lst, table in [(self._links, "links"), (self._nodes, "nodes")]: df = pd.DataFrame([]) if len(lst) > 0: @@ -95,7 +89,7 @@ def doWork(self): lst.clear() gc.collect() - self.__emit_all(["FinishedDownloading", 0]) + self.signal.emit(["finished"]) def overpass_request(self, data, pause_duration=None, timeout=180, error_pause_duration=None): """Send a request to the Overpass API via HTTP POST and return the JSON response. 
diff --git a/aequilibrae/transit/gtfs_loader.py b/aequilibrae/transit/gtfs_loader.py index 9cfea2525..9b70be646 100644 --- a/aequilibrae/transit/gtfs_loader.py +++ b/aequilibrae/transit/gtfs_loader.py @@ -18,15 +18,18 @@ from aequilibrae.transit.functions.get_srid import get_srid from aequilibrae.transit.parse_csv import parse_csv from aequilibrae.transit.transit_elements import Fare, Agency, FareRule, Service, Trip, Stop, Route -from aequilibrae.utils.signal import SIGNAL +from aequilibrae.utils.aeq_signal import SIGNAL, simple_progress +from aequilibrae.utils.interface.worker_thread import WorkerThread -class GTFSReader: - """Loader for GTFS data. Not meant to be used directly by the user""" - +class GTFSReader(WorkerThread): signal = SIGNAL(object) + """Loader for GTFS data. Not meant to be used directly by the user""" + def __init__(self): + WorkerThread.__init__(self, None) + self.__capacities__ = {} self.__pces__ = {} self.__max_speeds__ = {} @@ -90,36 +93,29 @@ def __load_date(self): self.logger.debug("Starting __load_date") self.zip_archive = zipfile.ZipFile(self.archive_dir) - self.signal.emit(["start", 7, "Loading routes"]) self.__load_routes_table() - self.signal.emit(["update", 1, "Loading stops"]) self.__load_stops_table() - self.signal.emit(["update", 2, "Loading stop times"]) self.__load_stop_times() - self.signal.emit(["update", 3, "Loading shapes"]) self.__load_shapes_table() - self.signal.emit(["update", 4, "Loading trips"]) self.__load_trips_table() - self.signal.emit(["update", 5, "De-conflicting stop times"]) self.__deconflict_stop_times() - self.signal.emit(["update", 6, "Loading fares"]) self.__load_fare_data() self.zip_archive.close() - self.signal.emit(["finished"]) self.signal = SIGNAL(object) def __deconflict_stop_times(self) -> None: self.logger.info("Starting deconflict_stop_times") + msg = "De-conflicting stop times (Step: 6/12)" total_fast = 0 - for prog_counter, route in enumerate(self.trips): + for route in 
simple_progress(self.trips, self.signal, msg): max_speeds = self.__max_speeds__.get(self.routes[route].route_type, pd.DataFrame([])) for pattern in self.trips[route]: # type: Trip for trip in self.trips[route][pattern]: @@ -200,6 +196,7 @@ def __load_fare_data(self): self.logger.debug("Starting __load_fare_data") fareatttxt = "fare_attributes.txt" self.fare_attributes = {} + self.signal.emit(["set_text", "Loading fare data (Step: 7/12)"]) if fareatttxt in self.zip_archive.namelist(): self.logger.debug(' Loading "fare_attributes" table') @@ -263,7 +260,8 @@ def __load_shapes_table(self): lats, lons = self.transformer.transform(shapes[:]["shape_pt_lat"], shapes[:]["shape_pt_lon"]) shapes[:]["shape_pt_lat"][:] = lats[:] shapes[:]["shape_pt_lon"][:] = lons[:] - for i, shape_id in enumerate(all_shape_ids): + + for shape_id in simple_progress(all_shape_ids, self.signal, "Loading shapes (Step: 4/12)"): items = shapes[shapes["shape_id"] == shape_id] items = items[np.argsort(items["shape_pt_sequence"])] shape = LineString(list(zip(items["shape_pt_lon"], items["shape_pt_lat"]))) @@ -299,7 +297,7 @@ def __load_trips_table(self): self.trips = {str(x): {} for x in np.unique(trips_array["route_id"])} - for i, line in enumerate(trips_array): + for line in simple_progress(trips_array, self.signal, "Loading trips (Step: 5/12)"): trip = Trip() trip._populate(line, trips_array.dtype.names) trip.route_id = self.routes[trip.route].route_id @@ -421,10 +419,11 @@ def __load_stop_times(self): df = df.merge(stop_list, on="stop") df.sort_values(["trip_id", "stop_sequence"], inplace=True) df = df.assign(source_time=0) - for trip_id, data in [[trip_id, x] for trip_id, x in df.groupby(df["trip_id"])]: + + msg = "Loading stop times (Step: 3/12)" + for trip_id, data in simple_progress(df.groupby(df["trip_id"]), self.signal, msg): data.loc[:, "stop_sequence"] = np.arange(data.shape[0]) self.stop_times[trip_id] = data - counter += data.shape[0] def __load_stops_table(self): 
self.logger.debug("Starting __load_stops_table") @@ -443,7 +442,7 @@ def __load_stops_table(self): stops[:]["stop_lat"][:] = lats[:] stops[:]["stop_lon"][:] = lons[:] - for i, line in enumerate(stops): + for line in simple_progress(stops, self.signal, "Loading stops (Step: 2/12)"): s = Stop(self.agency.agency_id, line, stops.dtype.names) s.agency = self.agency.agency s.srid = self.srid @@ -474,7 +473,7 @@ def __load_routes_table(self): for route_type, pce in self.__pces__.items(): routes.loc[routes.route_type == route_type, ["pce"]] = pce - for i, line in routes.iterrows(): + for i, line in simple_progress(routes.iterrows(), self.signal, "Loading routes (Step: 1/12)"): r = Route(self.agency.agency_id) r.populate(line.values, routes.columns) self.routes[r.route] = r @@ -485,6 +484,7 @@ def __load_feed_calendar(self): has_cal, has_caldate = True, True + self.signal.emit(["set_text", "Loading feed calendar"]) caltxt = "calendar.txt" if caltxt in self.zip_archive.namelist(): self.logger.debug(' Loading "calendar" table') diff --git a/aequilibrae/transit/lib_gtfs.py b/aequilibrae/transit/lib_gtfs.py index bae1e6c12..c19a7e784 100644 --- a/aequilibrae/transit/lib_gtfs.py +++ b/aequilibrae/transit/lib_gtfs.py @@ -12,16 +12,17 @@ from aequilibrae.transit.constants import Constants, PATTERN_ID_MULTIPLIER from aequilibrae.transit.functions.get_srid import get_srid from aequilibrae.transit.transit_elements import Link, Pattern, mode_correspondence -from aequilibrae.utils.signal import SIGNAL +from aequilibrae.utils.aeq_signal import SIGNAL, simple_progress +from aequilibrae.utils.interface.worker_thread import WorkerThread from .gtfs_loader import GTFSReader from .map_matching_graph import MMGraph -class GTFSRouteSystemBuilder: - """Container for GTFS feeds providing data retrieval for the importer""" - +class GTFSRouteSystemBuilder(WorkerThread): signal = SIGNAL(object) + """Container for GTFS feeds providing data retrieval for the importer""" + def __init__( self, network, 
agency_identifier, file_path, day="", description="", capacities=None, pces=None ): # noqa: B006 @@ -39,6 +40,8 @@ def __init__( **description** (:obj:`str`, *Optional*): Description for this feed (e.g. 'CTA19 fixed by John after coffee') """ + WorkerThread.__init__(self, None) + self.__network = network self.project = get_active_project(False) self.archive_dir = None # type: str @@ -143,15 +146,12 @@ def map_match(self, route_types=[3]) -> None: # noqa: B006 if any(not isinstance(item, int) for item in route_types): raise TypeError("All route types must be integers") - self.signal.emit(["start", len(self.select_patterns), "Map-matching patterns"]) - for i, pat in enumerate(self.select_patterns.values()): - self.signal.emit(["update", i, f"Map-matching pattern {pat.pattern_id}"]) + for pat in simple_progress(self.select_patterns.values(), self.signal, "Map-matching patterns"): if pat.route_type in route_types: pat.map_match() msg = pat.get_error("stop_from_pattern") if msg is not None: self.logger.warning(msg) - self.signal.emit(["finished"]) def set_agency_identifier(self, agency_id: str) -> None: """Adds agency ID to this GTFS for use on import. 
@@ -199,7 +199,9 @@ def load_date(self, service_date: str) -> None: self.gtfs_data.load_data(service_date) self.logger.info(" Building data structures") + self.__build_data() + self.gtfs_data.agency.service_date = self.day def doWork(self): @@ -223,17 +225,17 @@ def save_to_disk(self): """Saves all transit elements built in memory to disk""" with closing(database_connection("transit")) as conn: - for counter, (_, pattern) in enumerate(self.select_patterns.items()): + for pattern in simple_progress(self.select_patterns.values(), self.signal, "Saving patterns (Step: 10/12)"): pattern.save_to_database(conn, commit=False) conn.commit() self.gtfs_data.agency.save_to_database(conn) - for counter, trip in enumerate(self.select_trips): + for trip in simple_progress(self.select_trips, self.signal, "Saving trips (Step: 11/12)"): trip.save_to_database(conn, commit=False) conn.commit() - for counter, (_, link) in enumerate(self.select_links.items()): + for link in simple_progress(self.select_links.values(), self.signal, "Saving links (Step: 11/12)"): link.save_to_database(conn, commit=False) conn.commit() @@ -254,7 +256,7 @@ def save_to_disk(self): for fare_rule in self.gtfs_data.fare_rules: fare_rule.save_to_database(conn) - for counter, (_, stop) in enumerate(self.select_stops.items()): + for stop in simple_progress(self.select_stops.values(), self.signal, "Saving stops (Step: 12/12)"): if stop.zone in zone_ids: stop.zone_id = zone_ids[stop.zone] if self.__has_taz: @@ -280,10 +282,9 @@ def __build_data(self): if self.__do_execute_map_matching: self.builds_link_graphs_with_broken_stops() + msg = f"Loading data for {self.day} (Step: 9/12) - " c = Constants() - self.signal.emit(["start", len(self.select_routes), f"Loading data for {self.day}"]) - for counter, (route_id, route) in enumerate(self.select_routes.items()): - self.signal.emit(["update", counter]) + for route_id, route in simple_progress(self.select_routes.items(), self.signal, msg): new_trips = 
self._get_trips_by_date_and_route(route_id, self.day) all_pats = [trip.pattern_hash for trip in new_trips] @@ -308,7 +309,6 @@ def __build_data(self): route.shape = self.__build_route_shape(patterns) route.pattern_id = trip.pattern_id - self.signal.emit(["finished"]) def __build_new_pattern(self, route, route_id, trip) -> Pattern: self.logger.debug(f"New Pattern ID {trip.pattern_id} for route ID {route_id}") diff --git a/aequilibrae/transit/map_matching_graph.py b/aequilibrae/transit/map_matching_graph.py index eaafd71f1..41707d758 100644 --- a/aequilibrae/transit/map_matching_graph.py +++ b/aequilibrae/transit/map_matching_graph.py @@ -18,18 +18,19 @@ from aequilibrae.transit.constants import DRIVING_SIDE from aequilibrae.transit.functions.compute_line_bearing import compute_line_bearing from aequilibrae.transit.transit_elements import mode_correspondence -from aequilibrae.utils.signal import SIGNAL +from aequilibrae.utils.aeq_signal import simple_progress +from aequilibrae.utils.interface.worker_thread import WorkerThread GRAPH_VERSION = 1 CONNECTOR_SPEED = 1 -class MMGraph: +class MMGraph(WorkerThread): """Build specialized map-matching graphs. 
Not designed to be used by the final user""" - signal = SIGNAL(object) - def __init__(self, lib_gtfs): + WorkerThread.__init__(self, None) + self.project = lib_gtfs.project self.stops = lib_gtfs.gtfs_data.stops self.lib_gtfs = lib_gtfs @@ -49,6 +50,7 @@ def __init__(self, lib_gtfs): self.distance_to_project = -1 self.df = pd.DataFrame([]) self.logger = logger + self.signal = lib_gtfs.signal def build_graph_with_broken_stops(self, mode_id: int, distance_to_project=200): """Build the graph for links for a certain mode while splitting the closest links at stops' projection @@ -60,6 +62,7 @@ def build_graph_with_broken_stops(self, mode_id: int, distance_to_project=200): Defaults to 50m """ self.logger.debug(f"Called build_graph_with_broken_stops for mode_id={mode_id}") + self.mode_id = mode_id self.distance_to_project = distance_to_project self.__mode = mode_correspondence[self.mode_id] @@ -100,34 +103,46 @@ def build_graph_with_broken_stops(self, mode_id: int, distance_to_project=200): return self.__build_graph_from_scratch() def __build_graph_from_cache(self): - self.logger.info(f"Loading map-matching graph from disk for mode_id={self.mode_id}") + msg = f"Loading map-matching graph from disk for mode_id={self.mode_id} (Step: 8/12)" + self.logger.info(msg) + self.signal.emit(["set_text", msg]) + net = pd.read_csv(self.__df_file) centroid_corresp = pd.read_csv(self.__centroids_file) centroids = np.copy(centroid_corresp.centroid_id.values) centroid_corresp.set_index("node_id", inplace=True) for stop in self.stops.values(): stop.__map_matching_id__[self.mode_id] = centroid_corresp.loc[stop.stop_id, "centroid_id"] + return self.__graph_from_broken_net(centroids, net) def __build_graph_from_scratch(self): - self.logger.info(f"Creating map-matching graph from scratch for mode_id={self.mode_id}") + msg = f"Creating map-matching graph from scratch for mode_id={self.mode_id} (Step: 8/12)" + self.logger.info(msg) + self.signal.emit(["set_text", msg]) + self.df = 
self.df.assign(original_id=self.df.link_id, is_connector=0, geo=self.df.wkt.apply(shapely.wkt.loads)) self.df.loc[self.df.link_id < 0, "link_id"] = self.df.link_id * -1 + self.df.link_id.max() + 1 - # We make sure all link IDs are in proper order + # We make sure all link IDs are in proper order self.max_link_id = self.df.link_id.max() + 1 self.max_node_id = self.df[["a_node", "b_node"]].max().max() + 1 + # Build initial index self._idx = GeoIndex() - for counter, (_, record) in enumerate(self.df.iterrows()): + msg = "Building graphs - Indexing links (Step: 8/12)" + for _, record in simple_progress(self.df.iterrows(), self.signal, msg): self._idx.insert(feature_id=record.link_id, geometry=record.geo) + # We will progressively break links at stops' projection # But only on the right side of the link (no boarding at the opposing link's side) centroids = [] self.node_corresp = [] self.df = self.df.assign(direction=1, free_flow_time=np.inf, wrong_side=0, closest=1, to_remove=0) self.__all_links = {rec.link_id: rec for _, rec in self.df.iterrows()} - for counter, (stop_id, stop) in enumerate(self.stops.items()): + + msg = "Building graphs - Breaking links (Step: 8/12)" + for stop_id, stop in simple_progress(self.stops.items(), self.signal, msg): stop.__map_matching_id__[self.mode_id] = self.max_node_id self.node_corresp.append([stop_id, self.max_node_id]) centroids.append(stop.__map_matching_id__[self.mode_id]) diff --git a/aequilibrae/transit/parse_csv.py b/aequilibrae/transit/parse_csv.py index 408febda2..e390827ef 100644 --- a/aequilibrae/transit/parse_csv.py +++ b/aequilibrae/transit/parse_csv.py @@ -55,7 +55,7 @@ def parse_csv(file_name: str, column_order=[]): # noqa B006 if int(data.shape.__len__()) > 0: # handle the case of int data given as a float string for j, (_, dtype) in enumerate(new_data_dt): - if dtype == int: + if dtype is int: for item in data: item[j] = item[j].split(".")[0] diff --git a/aequilibrae/transit/transit.py b/aequilibrae/transit/transit.py 
index fd19a72d9..1229d8c7d 100644 --- a/aequilibrae/transit/transit.py +++ b/aequilibrae/transit/transit.py @@ -12,11 +12,15 @@ from aequilibrae.paths.graph import TransitGraph from aequilibrae.project.database_connection import database_connection from aequilibrae.utils.db_utils import read_and_close +from aequilibrae.utils.aeq_signal import SIGNAL +from aequilibrae.utils.interface.worker_thread import WorkerThread import sqlite3 import pandas as pd -class Transit: +class Transit(WorkerThread): + + transit = SIGNAL(object) default_capacities = { 0: [150, 300], # Tram, Streetcar, Light rail 1: [280, 560], # Subway/metro @@ -38,6 +42,7 @@ def __init__(self, project): **project** (:obj:`Project`, *Optional*): The Project to connect to. By default, uses the currently active project """ + WorkerThread.__init__(self, None) self.project_base_path = project.project_base_path self.logger = logger @@ -71,6 +76,9 @@ def new_gtfs_builder(self, agency, file_path, day="", description="") -> GTFSRou capacities=self.default_capacities, pces=self.default_pces, ) + + gtfs.signal = self.transit + gtfs.gtfs_data.signal = self.transit return gtfs def create_transit_database(self): diff --git a/aequilibrae/utils/aeq_signal.py b/aequilibrae/utils/aeq_signal.py new file mode 100644 index 000000000..ca4329439 --- /dev/null +++ b/aequilibrae/utils/aeq_signal.py @@ -0,0 +1,43 @@ +from aequilibrae.utils.qgis_utils import inside_qgis + + +def noop(_): + pass + + +if inside_qgis: + from PyQt5.QtCore import pyqtSignal as SIGNAL # type: ignore + + noop(SIGNAL.__class__) # This should be no-op but it stops PyCharm from "optimising" the above import +else: + from aequilibrae.utils.python_signal import PythonSignal as SIGNAL # type: ignore + + noop(SIGNAL.__class__) # This should be no-op but it stops PyCharm from "optimising" the above import + + +class simple_progress(object): + """A `tqdm` style iterable wrapper using aequilibrae signals""" + + def __init__(self, thing, signal, msg=None): + 
self.thing = thing + self.iterator = None + + try: + num_elements = len(self.thing) + except (TypeError, AttributeError): + num_elements = 0 + + self.msg = msg or f"{{}}/{num_elements}" + self.signal = signal + self.signal.emit(["start", num_elements, self.msg.format(0)]) + self.counter = 1 + + def __iter__(self): + self.iterator = iter(self.thing) + return self + + def __next__(self): + current = next(self.iterator) + self.signal.emit(["update", self.counter, self.msg.format(self.counter)]) + self.counter = self.counter + 1 + return current diff --git a/aequilibrae/utils/geo_index.py b/aequilibrae/utils/geo_index.py index 048799478..d554f1e35 100644 --- a/aequilibrae/utils/geo_index.py +++ b/aequilibrae/utils/geo_index.py @@ -1,23 +1,16 @@ -import importlib.util as iutil import warnings from typing import Union, List from shapely.geometry import Point, Polygon, LineString, MultiPoint, MultiPolygon, MultiLineString from shapely.wkb import loads -rtree_avail = iutil.find_spec("rtree") is not None -qgis = iutil.find_spec("qgis") is not None -if qgis: +from aequilibrae.utils.qgis_utils import inside_qgis, rtree_avail + +if inside_qgis: from qgis.core import QgsSpatialIndex as Index from qgis.core import QgsGeometry, QgsFeature - - env = "QGIS" -elif rtree_avail: - from rtree.index import Index as Index - - env = "Python" else: - env = "NOT AVAILABLE" + from rtree import Index class GeoIndex: @@ -28,14 +21,16 @@ def __init__(self): self.built = False def build_from_layer(self, layer) -> dict: - if env != "QGIS": + if inside_qgis: warnings.warn("This method works inside QGIS only") self.built = True self.idx = Index(layer.getFeatures()) return {f.id(): loads(f.geometry().asWkb().data()) for f in layer.getFeatures()} def insert( - self, feature_id: int, geometry: Union[Point, Polygon, LineString, MultiPoint, MultiPolygon, MultiLineString] + self, + feature_id: int, + geometry: Union[Point, Polygon, LineString, MultiPoint, MultiPolygon, MultiLineString], ) -> None: 
"""Inserts a valid shapely geometry in the index @@ -44,14 +39,14 @@ def insert( **geo** (:obj:`Shapely.geometry`): Any valid shapely geometry """ self.built = True - if env == "QGIS": + if inside_qgis: g = QgsGeometry() g.fromWkb(geometry.wkb) feature = QgsFeature() feature.setGeometry(g) feature.setId(feature_id) self.idx.addFeature(feature) - elif env == "Python": + elif rtree_avail: self.idx.insert(feature_id, geometry.bounds) else: warnings.warn("You need RTREE to build a spatial index") @@ -67,24 +62,24 @@ def nearest(self, geo: Union[Point, Polygon, LineString, MultiPoint, MultiPolygo :Returns: **neighbors** (:obj:`List[int]`): List of IDs of the closest neighbors in the index """ - if env == "QGIS": + if inside_qgis: g = QgsGeometry() g.fromWkb(geo.wkb) return self.idx.nearestNeighbor(g, num_results) - elif env == "Python": + elif rtree_avail: return self.idx.nearest(geo.bounds, num_results) else: warnings.warn("You need RTREE to build a spatial index") def delete(self, feature_id, geometry: Union[Point, Polygon, LineString, MultiPoint, MultiPolygon]): - if env == "QGIS": + if inside_qgis: g = QgsGeometry() g.fromWkb(geometry.wkb) feature = QgsFeature() feature.setGeometry(g) feature.setId(feature_id) self.idx.deleteFeature(feature) - elif env == "Python": + elif rtree_avail: self.idx.delete(feature_id, geometry.bounds) else: warnings.warn("You need RTREE to build a spatial index") diff --git a/aequilibrae/utils/interface/__init__.py b/aequilibrae/utils/interface/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/aequilibrae/utils/interface/worker_thread.py b/aequilibrae/utils/interface/worker_thread.py new file mode 100644 index 000000000..f537886bf --- /dev/null +++ b/aequilibrae/utils/interface/worker_thread.py @@ -0,0 +1,27 @@ +from aequilibrae.utils.qgis_utils import inside_qgis + +if inside_qgis: + from PyQt5.QtCore import QThread + from PyQt5.QtCore import pyqtSignal + + class WorkerThread(QThread): + if inside_qgis: + 
jobFinished = pyqtSignal(object) + + def __init__(self, parentThread): + QThread.__init__(self, parentThread) + + def run(self): + self.running = True + success = self.doWork() + if inside_qgis: + self.jobFinished.emit(success) + + def stop(self): + self.running = False + +else: + + class WorkerThread: # type: ignore + def __init__(self, *arg): + pass diff --git a/aequilibrae/utils/python_signal.py b/aequilibrae/utils/python_signal.py index 25b7b7da0..f7b9bcfde 100644 --- a/aequilibrae/utils/python_signal.py +++ b/aequilibrae/utils/python_signal.py @@ -3,6 +3,8 @@ import warnings from random import choice +from aequilibrae.utils.qgis_utils import inside_qgis + missing_tqdm = iutil.find_spec("tqdm") is None if not missing_tqdm: @@ -12,74 +14,75 @@ else: from tqdm import tqdm # type: ignore -qgis = iutil.find_spec("qgis") is not None - -show_status = os.environ.get("AEQ_SHOW_PROGRESS", "FALSE") == "TRUE" +show_status = os.environ.get("AEQ_SHOW_PROGRESS", "TRUE") == "TRUE" class PythonSignal: # type: ignore """ - This class only manages where the updating information will flow to, either emitting signals - to the QGIS interface to update is progress bars or to update the terminal progress bars - powered by tqdm + This class provides a pure python equivalent of the Signal passing infrastructure present in QGIS. + It takes updates in the same format as the QGIS progress bar manager used in QAequilibrae and translates + them into TQDM syntax. 
- Structure of data is the following: + The expected structure of update data is a list where the first element is string describing the desired action: - ['action', 'bar hierarchy', 'value', 'text', 'master'] + ['action', *args] - 'action': 'start', 'update', or 'finished_*_processing' (the last one applies in QGIS) - 'position': Position (0 for top, 1 for bottom) - 'value': Numerical value for the action (total or current) - 'text': Whatever label to be updated - 'master': The corresponding master bar for this task - """ + The currently supported formats for actions are listed here: + + 1. ['finished'] - close out the current progress bar + 2. ['refresh'] - force the current progress bar to refresh + 3. ['reset'] - reset the current progress bar + 4. ['start', num_elements: int, desc: str] - start a new progress bar + 5. ['set_position', pos: int] - set the position of the current progress bar + 6. ['set_text', desc: str] - set the description of the current progress bar + 7. ['update', pos: int, desc: str] - set both pos and desc of current progress bar - deactivate = not show_status # by default don't use progress bars in tests + """ def __init__(self, object): self.color = choice(["green", "magenta", "cyan", "blue", "red", "yellow"]) self.pbar = None # type: tqdm self.keydata = {} self.position = 0 + self.deactivate = not show_status # by default don't use progress bars in tests def emit(self, val): if self.deactivate: return - if val[0] == "set_position": - self.position = val[1] - if val[0] == "finished": - if self.pbar is not None: - self.pbar.close() + action = val[0] - elif val[0] == "refresh": + # handle actions which just send a signal onto the progress bar + if action in ["finished", "refresh", "reset"]: if self.pbar is not None: - self.pbar.refresh() + method = {"finished": "close", "refresh": "refresh", "reset": "reset"}[action] + getattr(self.pbar, method)() - elif val[0] == "reset": - if self.pbar is not None: - self.pbar.reset() + elif action == 
"set_position": + self.position = val[1] - elif val[0] == "key_value": - self.keydata[val[1]] = val[2] + elif action == "set_text": + desc = str(val[1]).ljust(50) + if self.pbar is not None and self.pbar.desc != desc: + self.pbar.set_description(desc, refresh=True) - elif val[0] == "start": - if missing_tqdm and not qgis: + elif action == "start": + if missing_tqdm and not inside_qgis: self.deactivate = True warnings.warn("No progress bars will be shown. Please install tqdm to see them") + return + + # Close any existing bars if self.pbar is not None: self.pbar.close() - desc = str(val[2]).rjust(50) + + # Create a new bar with the given capacity + desc = str(val[2]).ljust(50) self.pbar = tqdm(total=val[1], colour=self.color, leave=False, desc=desc, position=self.position) - elif val[0] == "update": + elif action == "update": self.pbar.update(val[1] - self.pbar.n) if len(val) > 2: - desc = str(val[2]).rjust(50) + desc = str(val[2]).ljust(50) if self.pbar.desc != desc: self.pbar.set_description(desc, refresh=True) - - elif val[0] == "set_text": - desc = str(val[1]).rjust(50) - if self.pbar.desc != desc: - self.pbar.set_description(desc, refresh=True) diff --git a/aequilibrae/utils/qgis_utils.py b/aequilibrae/utils/qgis_utils.py index b87a159ea..d066031a2 100644 --- a/aequilibrae/utils/qgis_utils.py +++ b/aequilibrae/utils/qgis_utils.py @@ -2,3 +2,4 @@ # If we can find the qgis module to import ... 
we are running inside qgis inside_qgis = iutil.find_spec("qgis") is not None +rtree_avail = iutil.find_spec("rtree") is not None diff --git a/aequilibrae/utils/signal.py b/aequilibrae/utils/signal.py deleted file mode 100644 index a5a467030..000000000 --- a/aequilibrae/utils/signal.py +++ /dev/null @@ -1,15 +0,0 @@ -import importlib.util as iutil - - -def noop(_): - pass - - -if iutil.find_spec("qgis") is not None: - from PyQt5.QtCore import pyqtSignal as SIGNAL # type: ignore - - noop(SIGNAL.__class__) # This should be no-op but it stops PyCharm from "optimising" the above import -else: - from aequilibrae.utils.python_signal import PythonSignal as SIGNAL # type: ignore - - noop(SIGNAL.__class__) # This should be no-op but it stops PyCharm from "optimising" the above import diff --git a/docs/source/conf.py b/docs/source/conf.py index cd90b2050..5650d40ea 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -155,8 +155,7 @@ htmlhelp_basename = "AequilibraEdoc" # -- Options for LaTeX output ------------------------------------------------ - -latex_elements = {} +# latex_elements = {} # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, diff --git a/docs/source/examples/creating_models/plot_create_from_gmns.py b/docs/source/examples/creating_models/plot_create_from_gmns.py index 1ebbad0ea..8ebc3134a 100644 --- a/docs/source/examples/creating_models/plot_create_from_gmns.py +++ b/docs/source/examples/creating_models/plot_create_from_gmns.py @@ -1,3 +1,20 @@ +# --- +# jupyter: +# jupytext: +# cell_metadata_filter: -all +# custom_cell_magics: kql +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.11.2 +# kernelspec: +# display_name: venv +# language: python +# name: python3 +# --- + +# %% """ .. 
_import_from_gmns: diff --git a/docs/source/examples/creating_models/plot_create_zoning.py b/docs/source/examples/creating_models/plot_create_zoning.py index e8f03afad..ab2088968 100644 --- a/docs/source/examples/creating_models/plot_create_zoning.py +++ b/docs/source/examples/creating_models/plot_create_zoning.py @@ -1,3 +1,20 @@ +# --- +# jupyter: +# jupytext: +# cell_metadata_filter: -all +# custom_cell_magics: kql +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.11.2 +# kernelspec: +# display_name: venv +# language: python +# name: python3 +# --- + +# %% """ .. _create_zones: @@ -37,7 +54,11 @@ from math import sqrt from shapely.geometry import Point import shapely.wkb + from aequilibrae.utils.create_example import create_example, list_examples +from aequilibrae.utils.aeq_signal import simple_progress, SIGNAL +s = SIGNAL(object) + # sphinx_gallery_thumbnail_path = "images/plot_create_zoning.png" # %% @@ -110,9 +131,10 @@ nd.renumber(i + 1300) # %% + # Now we can add them to the model and add centroids to them while we are at it. zoning = project.zoning -for i, zone_geo in enumerate(grid): +for i, zone_geo in enumerate(simple_progress(grid, s, "Add zone centroids")): zone = zoning.new(i + 1) zone.geometry = zone_geo zone.save() diff --git a/docs/source/examples/other_applications/plot_find_disconnected.py b/docs/source/examples/other_applications/plot_find_disconnected.py index 1b98fe5e8..4221fb0c6 100644 --- a/docs/source/examples/other_applications/plot_find_disconnected.py +++ b/docs/source/examples/other_applications/plot_find_disconnected.py @@ -1,3 +1,20 @@ +# --- +# jupyter: +# jupytext: +# cell_metadata_filter: -all +# custom_cell_magics: kql +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.11.2 +# kernelspec: +# display_name: venv +# language: python +# name: python3 +# --- + +# %% """ .. 
_find_disconnected_links: diff --git a/docs/source/examples/skimming/plot_skimming.py b/docs/source/examples/skimming/plot_skimming.py index f1fec8057..78388541f 100644 --- a/docs/source/examples/skimming/plot_skimming.py +++ b/docs/source/examples/skimming/plot_skimming.py @@ -1,3 +1,20 @@ +# --- +# jupyter: +# jupytext: +# cell_metadata_filter: -all +# custom_cell_magics: kql +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.11.2 +# kernelspec: +# display_name: venv +# language: python +# name: python3 +# --- + +# %% """ .. _example_usage_skimming: diff --git a/docs/source/useful_information/development/softwaredevelopment.rst b/docs/source/useful_information/development/softwaredevelopment.rst index 0b78b7675..60b036649 100644 --- a/docs/source/useful_information/development/softwaredevelopment.rst +++ b/docs/source/useful_information/development/softwaredevelopment.rst @@ -10,20 +10,20 @@ AequilibraE and lists the requirements for all pull requests to be merged into m Software Design and requirements -------------------------------- -The most important piece of AequilibraE's backend is, without a doubt, `numpy `__. +The most important piece of AequilibraE's backend is, without a doubt, `NumPy `__. Whenever vectorization is not possible through the use of NumPy functions, compiled code is developed in order to accelerate computation. All compiled code is written in `Cython `_. We have not yet found an ideal source of recommendations for developing AequilibraE, but a good initial take can be -found in `this article. `__ +found in `this article `_. Development Install ------------------- As it goes with most Python packages, we recommend using a dedicated virtual environment to develop AequilibraE. 
-AequilibraE is currently tested for Python 3.9, 3.10, 3.11 & 3.12, but we recommend using Python 3.9 or 2.10 for +AequilibraE is currently tested for Python 3.9, 3.10, 3.11 & 3.12, but we recommend using Python 3.11 or 3.12 for development. We also assume you are using `PyCharm `_ or @@ -40,11 +40,11 @@ Non-Windows Windows ~~~~~~~ -Make sure to clone the AequilibraE repository and run the following from withinthat cloned repo using an elevated command prompt. +Make sure to clone the AequilibraE repository and run the following from within that cloned repo using an elevated command prompt. + +Python 3.12 (or whatever version you chose) needs to be installed, and the following instructions assume you are +using `Chocolatey `_ as a package manager. -Python 3.9 (or whatever version you chose) needs to be installed, and the -following instructions assume you are using `Chocolatey -`_ as a package manager. :: cinst python3 --version 3.9 @@ -62,7 +62,6 @@ Setup Pycharm with the virtual environment you just created. Settings -> Project -> Project Interpreter -> Gear Icon -> Add -> Existing VEnv - Development Guidelines ----------------------- @@ -74,8 +73,8 @@ Style * Python code should follow (mostly) the `pycodestyle style guide `_ * Python docstrings should follow the `reStructuredText Docstring Format `_ -* We are big fans of auto-code formatting. - For that, we use `Black `_ +* We are big fans of auto-code formatting. For that, we use `ruff `_ and + `Black `_. * Negating some of what we have said so far, we use maximum line length of 120 characters Imports @@ -138,23 +137,25 @@ AequilibraE uses the de-facto Python standard for `versioning - MAJOR designates a major revision number for the software. Usually, raising a major revision number means that you are adding a lot of features, breaking backward-compatibility or drastically changing the API. -- MINOR usually groups moderate changes to the software like bug fixes or minor improvements. 
Most of the time, end \ - users can upgrade with no risks their software to a new minor release. In case an API changes, the end users will be \ +- MINOR usually groups moderate changes to the software like bug fixes or minor improvements. Most of the time, end + users can upgrade with no risks their software to a new minor release. In case an API changes, the end users will be notified with deprecation warnings. In other words, API stability is usually a promise between two minor releases. - Some software use a third level: MICRO. This level is used when the release cycle of minor release is quite long. In that case, micro releases are dedicated to bug fixes. -AequilibraE's development is happening mostly within the Minor and Micro levels, as we are still in version 0 +AequilibraE's development is happening mostly within the Minor and Micro levels. Testing ~~~~~~~~ -AequilibraE testing is done with three tools: +AequilibraE style checking is done with two tools: -* `Flake8 `_, a tool to check Python code style +* `ruff `_, a tool to check Python code style * `Black `_, The uncompromising code formatter +And testing is done using `pytest `_. + Testing is done for Windows, MacOs and Ubuntu Linux on all supported Python versions, and we use GitHub Actions to run these tests. These tests need to pass and additionally somebody has to manually review the code before merging it into master (or returning for corrections). @@ -164,16 +165,17 @@ are now the correct results. In order to update the test targets, first determi failing and then review the failing lines in the source files. These are easy to identify since each test ultimately comes down to one of Python's various types of ``assert`` statements. Once you identify which ``assert`` is failing, you can work your way back through the code that creates the test targets in -order to update it. After updating the test targets, re-run the tests to confirm the new code passes all +order to update it. 
After updating the test targets, re-run the tests to confirm the new code passes all the tests. Documentation ~~~~~~~~~~~~~~ -All the AequilibraE documentation is (unfortunately) written in `reStructuredText -`__ and built with `Sphinx `__. -Although Restructured Text is often unnecessarily convoluted to write, Sphinx is capable of converting it to standard- -looking html pages, while also bringing the docstring documentation along for the ride. +All the AequilibraE documentation is (unfortunately) written in +`reStructuredText `_ and built with +`Sphinx `_. +Although reStructuredText is often unnecessarily convoluted to write, Sphinx is capable of converting it to standard- +looking HTML pages, while also bringing the docstring documentation along for the ride. To build the documentation, first make sure the required packages are installed. If you have correctly setup the dev environment above, then nothing else is needed. However, if you have incorrectly only run:: @@ -184,21 +186,52 @@ Then you will have to run:: python -m pipenv install --dev - Next, build the documentation in html format with the following commands run from the ``root`` folder:: sphinx-apidoc -T -o docs/source/generated aequilibrae cd docs make html +Working with progress bars +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +From version 1.1.0, AequilibraE is capable of displaying progress bars in Jupyter Notebooks using +`TQDM `_. For the companion QGIS plugin, `PyQt5 `_ +is used to emit messages in progress bars. + +AequilibraE provides a wrapper class `SIGNAL` that will use the appropriate underlying mechanism to display +the progress bars. + +.. 
code-block:: python + + from aequilibrae.utils.aeq_signal import SIGNAL + + class MyClass: + signal = SIGNAL(object) + + def my_method(self): + self.signal.emit(['start', 10, 'running my method']) + for i in range(0, 10): + self.signal.emit(['update', i, f"Current val: {i}"]) + sleep(0.4) + + +Calling `MyClass().my_method()` will generate a progress bar in the following form (outside QGIS). + +.. code-block:: text + + running my method : 30%|█████▍ | 3/10 [00:01<00:02, 2.50it/s] + +The full set of emitted signals which can be used to control progress bars is given in `python_signal.py` + Releases ~~~~~~~~~ -AequilibraE releases are automatically uploaded to the `Python Package Index -`_ (pypi) at each new GitHub release (2 to 6 times per year). +AequilibraE releases are automatically uploaded to the `Python Package Index +`_ (PyPI) at each new GitHub release (2 to 6 times per year). Acknowledgement ~~~~~~~~~~~~~~~ A LOT of the structure around the documentation was borrowed (copied) from the excellent project `ActivitySim -`_ \ No newline at end of file +`_. 
\ No newline at end of file diff --git a/docs/source/useful_information/validation_benchmarking/IPF_benchmark.ipynb b/docs/source/useful_information/validation_benchmarking/IPF_benchmark.ipynb index 1759d4d8a..8fc00f073 100644 --- a/docs/source/useful_information/validation_benchmarking/IPF_benchmark.ipynb +++ b/docs/source/useful_information/validation_benchmarking/IPF_benchmark.ipynb @@ -23,8 +23,7 @@ "of using both 32-bit and 64-bit floating-point seed matrices, which has direct impact\n", "on cache use and consequently computational performance.\n", "\n", - "In this section, we compare the\n", - "runtime of AequilibraE's current implementation of IPF, \n", + "In this section, we compare the runtime of AequilibraE's current implementation of IPF, \n", "with a general IPF algorithm written in pure Python, available\n", "[here](https://github.com/joshchea/python-tdm/blob/master/scripts/CalcDistribution.py).\n", "\n", @@ -32,8 +31,8 @@ "code. From the figure below, we can notice that the runtimes were practically the same for the\n", "instances with 1,000 zones or less. 
As the number of zones increases, AequilibraE demonstrated to be slightly faster\n", "than the benchmark python code, while applying IPF to a 32-bit NumPy array (np.float32) was significantly faster.\n", - "It's worth mentioning that\n", - "the user can set up a threshold for AequilibraE's IPF function, as well as use more than one\n", + "\n", + "It's worth mentioning that the user can set up a threshold for AequilibraE's IPF function, as well as use more than one\n", "core to speed up the fitting process.\n", "\n", "![AequilibraE's IPF runtime](../../images/ipf_runtime_aequilibrae_vs_benchmark.png)\n", diff --git a/tests/aequilibrae/log/test_log.py b/tests/aequilibrae/log/test_log.py index 405e51ad4..273a283b1 100644 --- a/tests/aequilibrae/log/test_log.py +++ b/tests/aequilibrae/log/test_log.py @@ -9,8 +9,6 @@ from ...data import siouxfalls_project from aequilibrae.project import Project -from aequilibrae import log -from aequilibrae.log import _setup_logger class TestLog(TestCase): @@ -58,23 +56,10 @@ def get_logger_file(cls, logger, name="aequilibrae") -> str: return handlers[0].baseFilename - def test_logger_has_single_handler_named_aquilibrae(self): - _setup_logger() - logger = _setup_logger() - assert len(self.get_handlers(logger)) == 1 - - def test_default_logger_handler(self): - logger = _setup_logger() - assert self.get_logger_file(logger).endswith("aequilibrae.log") - def test_project_logger(self, create_project): project = create_project() assert self.get_logger_file(project.logger).startswith(project.project_base_path) - def test_activate_project_leaves_global_logger_intact(self, create_project): - project = create_project() - assert self.get_logger_file(log.global_logger) != self.get_logger_file(project.logger) - def test_multiple_projects_have_separate_logger(self, create_project): a = create_project() b = create_project() diff --git a/tests/aequilibrae/paths/test_transit_graph_builder.py b/tests/aequilibrae/paths/test_transit_graph_builder.py index 
230585216..dd4735b64 100644 --- a/tests/aequilibrae/paths/test_transit_graph_builder.py +++ b/tests/aequilibrae/paths/test_transit_graph_builder.py @@ -38,7 +38,7 @@ def setUp(self) -> None: def tearDown(self) -> None: self.project.close() - def test_create_line_gemoetry(self): + def test_create_line_geometry(self): self.project.network.build_graphs() for connector_method in ["overlapping_regions", "nearest_neighbour"]: for method in ["connector project match", "direct"]: diff --git a/tests/aequilibrae/project/test_osm_downloader.py b/tests/aequilibrae/project/test_osm_downloader.py index 7c5fe3bb7..e0d0289b0 100644 --- a/tests/aequilibrae/project/test_osm_downloader.py +++ b/tests/aequilibrae/project/test_osm_downloader.py @@ -1,4 +1,3 @@ -import importlib.util as iutil import os from random import random from tempfile import gettempdir @@ -8,9 +7,6 @@ from aequilibrae.project.network.osm.osm_downloader import OSMDownloader -spec = iutil.find_spec("PyQt5") -pyqt = spec is not None - class TestOSMDownloader(TestCase): def setUp(self) -> None: diff --git a/tests/aequilibrae/utils/test_signal.py b/tests/aequilibrae/utils/test_signal.py new file mode 100644 index 000000000..55874eb5f --- /dev/null +++ b/tests/aequilibrae/utils/test_signal.py @@ -0,0 +1,23 @@ +from aequilibrae.utils.aeq_signal import simple_progress + + +class EmitCapture: + """A signal like object that just captures any emits that are given to it""" + + def __init__(self): + self.emits = [] + + def emit(self, e): + self.emits.append(e) + + +def test_simple_progress(): + emit_capturer = EmitCapture() + for i in simple_progress([1, 2, 3], emit_capturer): + print(i) + assert emit_capturer.emits == [ + ["start", 3, "0/3"], + ["update", 1, "1/3"], + ["update", 2, "2/3"], + ["update", 3, "3/3"], + ]