Skip to content

Commit

Permalink
Pedro/messaging (#578)
Browse files Browse the repository at this point in the history
* fixes messaging

* fixes messaging

* messaging

* updates messaging

* updates messaging

* updates messaging

* Reinstates worker thread

* return worker thread to files

* return worker thread

* .

* upload docs

* fixes signals for qgis

* GTFS messages and linting

* GTFS messages

* GTFS signals

* .

* fixes docs

* update signals

* updates signals

* .

* update network skimming signals

* .

* Updates signal in GTFS loader

* Apply suggestions from code review

* updates signals

* removes double pbar from GTFS

* Updates assignment signals

* Updates linear approximation signals

* .

* removes unnecessary use of the temporary directory

* .

* major rework of the signal class

* lint

* ruff

* Update softwaredevelopment.rst

* Update .github/workflows/documentation.yml

Co-authored-by: Jamie Cook <[email protected]>

* update latex engine

* Update conf.py

* .

* .

* Update conf.py

* removes unicode char

* updates signals and does not create a pdf file

* fixes workflow

* Apply suggestions from code review

---------

Co-authored-by: pveigadecamargo <[email protected]>
Co-authored-by: Renata Imai <[email protected]>
Co-authored-by: Jamie Cook <[email protected]>
Co-authored-by: Jamie Cook <[email protected]>
  • Loading branch information
5 people authored Oct 31, 2024
1 parent b4e760a commit 692579c
Show file tree
Hide file tree
Showing 34 changed files with 441 additions and 306 deletions.
8 changes: 2 additions & 6 deletions .github/workflows/documentation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ jobs:
- name: Build documentation
run: |
jupyter nbconvert --to rst docs/source/useful_information/validation_benchmarking/IPF_benchmark.ipynb
sphinx-build -M latexpdf docs/source docs/source/_static
sphinx-build -b html docs/source docs/build
python3 -m zipfile -c AequilibraE.zip docs/build
cp AequilibraE.zip docs/source/_static
Expand Down Expand Up @@ -120,9 +119,6 @@ jobs:
SOURCE_DIR: 'docs/build/htmlv/' # optional: defaults to entire repository
DEST_DIR: 'python/' # optional: defaults to entire repository

- name: Rename documentation folder to push to S3 DEV
if: ${{ (github.event_name == 'pull_request') && (env.HAS_SECRETS == 'true') }}
run: mv docs/build/html/ docs/build/${{ github.event.number }}/

- name: Upload to DEV on S3
if: ${{ (github.event_name == 'pull_request') && (env.HAS_SECRETS == 'true') }}
Expand All @@ -134,5 +130,5 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_REGION: 'us-east-1' # optional: defaults to us-east-1
SOURCE_DIR: 'docs/build/' # optional: defaults to entire repository
DEST_DIR: 'python/dev/' # optional: defaults to entire repository
SOURCE_DIR: 'docs/build/html/' # optional: defaults to entire repository
DEST_DIR: 'python/dev/${{ github.event.number }}/' # optional: defaults to entire repository
23 changes: 0 additions & 23 deletions aequilibrae/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@
from aequilibrae import transit
from aequilibrae import project

try:
from aequilibrae.paths.AoN import path_computation
except Exception as e:
global_logger.warning(f"Failed to import compiled modules. {e.args}")
raise

from aequilibrae.distribution import Ipf, GravityApplication, GravityCalibration, SyntheticGravityModel
from aequilibrae.matrix import AequilibraeMatrix
from aequilibrae import distribution
Expand All @@ -32,20 +26,3 @@
from aequilibrae import paths

name = "aequilibrae"


def setup():
sys.dont_write_bytecode = True
cleaning()


def cleaning():
p = tempfile.gettempdir() + "/aequilibrae_*"
for f in glob.glob(p):
try:
os.unlink(f)
except Exception as err:
global_logger.warning(err.__str__())


setup()
9 changes: 0 additions & 9 deletions aequilibrae/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,8 @@ def clear(self):

def _setup_logger():
# CREATE THE GLOBAL LOGGER

par = Parameters._default
do_log = par["system"]["logging"]
temp_folder = tempfile.gettempdir()

logger = logging.getLogger("aequilibrae")
logger.setLevel(logging.DEBUG)

if not len(logger.handlers) and do_log:
log_file = os.path.join(temp_folder, "aequilibrae.log")
logger.addHandler(get_log_handler(log_file))
return logger


Expand Down
27 changes: 15 additions & 12 deletions aequilibrae/paths/all_or_nothing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,27 @@
except ImportError as ie:
global_logger.warning(f"Could not import procedures from the binary. {ie.args}")

from aequilibrae.utils.signal import SIGNAL
from aequilibrae.utils.aeq_signal import SIGNAL
from aequilibrae.utils.interface.worker_thread import WorkerThread

if False:
from .results import AssignmentResults
from .graph import Graph


class allOrNothing:
class allOrNothing(WorkerThread):
signal = SIGNAL(object)

def __init__(self, class_name, matrix, graph, results):
# type: (AequilibraeMatrix, Graph, AssignmentResults)->None
self.assignment: SIGNAL = None
# type: (str, AequilibraeMatrix, Graph, AssignmentResults)->None
WorkerThread.__init__(self, None)

self.class_name = class_name
self.matrix = matrix
self.graph = graph
self.results = results
self.aux_res = MultiThreadedAoN()
self.signal.emit(["start", self.matrix.zones, self.class_name])

if results._graph_id != graph._id:
raise ValueError("Results object not prepared. Use --> results.prepare(graph)")
Expand All @@ -42,16 +46,12 @@ def __init__(self, class_name, matrix, graph, results):
elif not np.array_equal(matrix.index, graph.centroids):
raise ValueError("Matrix and graph do not have compatible sets of centroids.")

def _build_signal(self):
if self.assignment is None:
self.assignment = SIGNAL(object)
self.assignment.emit(["start", self.matrix.zones, self.class_name])

def doWork(self):
self.execute()

def execute(self):
self._build_signal()
msg = f"All-or-Nothing - Traffic Class: {self.class_name} - Zones: 0/{self.matrix.zones}"
self.signal.emit(["set_text", msg])
self.report = []
self.cumulative = 0
self.aux_res.prepare(self.graph, self.results)
Expand All @@ -70,7 +70,9 @@ def execute(self):
pool.apply_async(self.func_assig_thread, args=(orig, all_threads))
pool.close()
pool.join()
self.assignment.emit(["update", self.matrix.index.shape[0], self.class_name])
val = self.matrix.index.shape[0]
msg = f"All-or-Nothing - Traffic Class: {self.class_name} - Zones: {val}/{self.matrix.zones}"
self.signal.emit(["set_text", msg])
# TODO: Multi-thread this sum
self.results.compact_link_loads = np.sum(self.aux_res.temp_link_loads, axis=0)
assign_link_loads(
Expand All @@ -89,4 +91,5 @@ def func_assig_thread(self, origin, all_threads):
if x != origin:
self.report.append(x)
if self.cumulative % 10 == 0:
self.assignment.emit(["update", self.cumulative, self.class_name])
msg = f"All-or-Nothing - Traffic Class: {self.class_name} - Zones: {self.cumulative}/{self.matrix.zones}"
self.signal.emit(["set_text", msg])
2 changes: 1 addition & 1 deletion aequilibrae/paths/connectivity_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from aequilibrae.paths.AoN import connectivity_multi_threaded

from aequilibrae.utils.core_setter import set_cores
from aequilibrae.utils.signal import SIGNAL
from aequilibrae.utils.aeq_signal import SIGNAL

sys.dont_write_bytecode = True

Expand Down
6 changes: 3 additions & 3 deletions aequilibrae/paths/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,13 @@ def __determine_types__(self, new_type, current_type):
self.logger.warning("Could not convert {} - {}".format(new_type, verr.__str__()))
if isinstance(new_type, int):
def_type = int
if current_type == float:
if current_type is float:
def_type = float
elif current_type == str:
elif current_type is str:
def_type = str
elif isinstance(new_type, float):
def_type = float
if current_type == str:
if current_type is str:
def_type = str
elif isinstance(new_type, str):
def_type = str
Expand Down
47 changes: 22 additions & 25 deletions aequilibrae/paths/linear_approximation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
if False:
from aequilibrae.paths.traffic_assignment import TrafficAssignment

from aequilibrae.utils.signal import SIGNAL
from aequilibrae.utils.python_signal import PythonSignal
from aequilibrae.utils.aeq_signal import SIGNAL, simple_progress
from aequilibrae.utils.interface.worker_thread import WorkerThread


class LinearApproximation:
def __init__(self, assig_spec, algorithm, project=None) -> None:
self.equilibration = SIGNAL(object)
self.assignment = SIGNAL(object)
if isinstance(self.assignment, PythonSignal):
self.assignment.pos = 1
class LinearApproximation(WorkerThread):
equilibration = SIGNAL(object)
assignment = SIGNAL(object)
signal = SIGNAL(object)

def __init__(self, assig_spec, algorithm, project=None) -> None:
WorkerThread.__init__(self, None)
self.signal.emit(["set_text", "Linear Approximation"])
self.logger = project.logger if project else logging.getLogger("aequilibrae")

self.project_path = project.project_base_path if project else gettempdir()
Expand Down Expand Up @@ -468,28 +469,28 @@ def execute(self): # noqa: C901

self.aons[c._id] = allOrNothing(c._id, c.matrix, c.graph, c._aon_results)

self.equilibration.emit(["start", self.max_iter, "Equilibrium Assignment"])
self.logger.info(f"{self.algorithm} Assignment STATS")
self.logger.info("Iteration, RelativeGap, stepsize")
for self.iter in range(1, self.max_iter + 1): # noqa: B020

msg = "Equilibrium Assignment"
for self.iter in simple_progress(range(1, self.max_iter + 1), self.signal, msg): # noqa: B020
self.iteration_issue = []
self.equilibration.emit(["key_value", "rgap", self.rgap])
self.equilibration.emit(["key_value", "iterations", self.iter])

aon_flows = []

self.__maybe_create_path_file_directories()

for c in self.traffic_classes: # type: TrafficClass
self.assignment.emit(["start", c.matrix.zones, "All-or-Nothing"])
msg = f"All-or-Nothing - Traffic Class: {c._id}"
self.signal.emit(["set_text", msg])
# cost = c.fixed_cost / c.vot + self.congested_time # now only once
cost = c.fixed_cost + self.congested_time
aggregate_link_costs(cost, c.graph.compact_cost, c.results.crosswalk)

aon = self.aons[c._id] # This is a new object every iteration, with new aux_res
self.assignment.emit(["refresh"])
self.assignment.emit(["reset"])
aon.assignment = self.assignment
self.signal.emit(["refresh"])
self.signal.emit(["reset"])
aon.signal = self.signal

aon.execute()
c._aon_results.link_loads *= c.pce
Expand Down Expand Up @@ -574,10 +575,10 @@ def execute(self): # noqa: C901

if self.algorithm == "all-or-nothing":
break

# Check convergence
# This needs to be done with the current costs, and not the future ones
converged = self.check_convergence() if self.iter > 1 else False
self.equilibration.emit(["update", self.iter, f"Equilibrium Assignment: RGap - {self.rgap:.3E}"])
self.vdf.apply_vdf(
self.congested_time,
self.fw_total_flow,
Expand All @@ -596,8 +597,6 @@ def execute(self): # noqa: C901
self.convergence_report["rgap"].append(self.rgap)
self.convergence_report["warnings"].append("; ".join(self.iteration_issue))
self.convergence_report["alpha"].append(self.stepsize)
self.equilibration.emit(["key_value", "rgap", self.rgap])
self.equilibration.emit(["key_value", "iterations", self.iter])

if self.algorithm in ["cfw", "bfw"]:
self.convergence_report["beta0"].append(self.betas[0])
Expand All @@ -620,16 +619,17 @@ def execute(self): # noqa: C901
idx = c.graph.skim_fields.index(self.time_field)
c.graph.skims[:, idx] = self.congested_time[:]

msg = f"Equilibrium Assignment - Iteration: {self.iter}/{self.max_iter} - RGap: {self.rgap:.6}"
self.signal.emit(["set_text", msg])

for c in self.traffic_classes:
c.results.link_loads /= c.pce
c.results.total_flows()

if (self.rgap > self.rgap_target) and (self.algorithm != "all-or-nothing"):
self.logger.error(f"Desired RGap of {self.rgap_target} was NOT reached")
self.logger.info(f"{self.algorithm} Assignment finished. {self.iter} iterations and {self.rgap} final gap")
self.equilibration.emit(["update", self.max_iter, f"Equilibrium Assignment: RGap - {self.rgap:.3E}"])
self.assignment.emit(["finished"])
self.equilibration.emit(["finished"])
self.signal.emit(["finished"])

def __derivative_of_objective_stepsize_dependent(self, stepsize, const_term):
"""The stepsize-dependent part of the derivative of the objective function. If fixed costs are defined,
Expand Down Expand Up @@ -715,6 +715,3 @@ def check_convergence(self):
if self.rgap_target >= self.rgap:
return True
return False

def signal_handler(self, val):
self.assignment.emit(val)
20 changes: 10 additions & 10 deletions aequilibrae/paths/network_skimming.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
from aequilibrae.paths.multi_threaded_skimming import MultiThreadedNetworkSkimming
from aequilibrae.paths.results.skim_results import SkimResults
from aequilibrae.utils.core_setter import set_cores
from aequilibrae.utils.signal import SIGNAL
from aequilibrae.utils.aeq_signal import SIGNAL
from aequilibrae.utils.interface.worker_thread import WorkerThread

sys.dont_write_bytecode = True


class NetworkSkimming:
class NetworkSkimming(WorkerThread):
signal = SIGNAL(object)

"""
.. code-block:: python
Expand Down Expand Up @@ -51,9 +54,8 @@ class NetworkSkimming:
>>> project.close()
"""

skimming = SIGNAL(object)

def __init__(self, graph, origins=None, project=None):
WorkerThread.__init__(self, None)
self.project = project
self.origins = origins
self.graph = graph
Expand All @@ -70,7 +72,7 @@ def doWork(self):

def execute(self):
"""Runs the skimming process as specified in the graph"""
self.skimming.emit(["zones finalized", 0])
self.signal.emit(["start", self.graph.num_zones, ""])
self.results.cores = self.cores
self.results.prepare(self.graph)
self.aux_res = MultiThreadedNetworkSkimming()
Expand All @@ -91,8 +93,8 @@ def execute(self):
self.procedure_id = uuid4().hex
self.procedure_date = str(datetime.today())

self.skimming.emit(["text skimming", "Saving Outputs"])
self.skimming.emit(["finished_threaded_procedure", None])
self.signal.emit(["set_text", "Saving Outputs"])
self.signal.emit(["finished"])

def set_cores(self, cores: int) -> None:
"""
Expand Down Expand Up @@ -143,6 +145,4 @@ def __func_skim_thread(self, origin, all_threads):
if x != origin:
self.report.append(x)

self.skimming.emit(["zones finalized", self.cumulative])
txt = str(self.cumulative) + " / " + str(self.matrix.zones)
self.skimming.emit(["text skimming", txt])
self.signal.emit(["update", self.cumulative, f"{self.cumulative}/{self.graph.num_zones}"])
14 changes: 8 additions & 6 deletions aequilibrae/project/network/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,25 @@
from aequilibrae.project.network.periods import Periods
from aequilibrae.project.project_creation import req_link_flds, req_node_flds, protected_fields
from aequilibrae.utils.db_utils import commit_and_close
from aequilibrae.utils.signal import SIGNAL
from aequilibrae.utils.aeq_signal import SIGNAL
from aequilibrae.utils.interface.worker_thread import WorkerThread
from aequilibrae.utils.qgis_utils import inside_qgis
from aequilibrae.utils.spatialite_utils import connect_spatialite


class Network:
class Network(WorkerThread):
"""
Network class. Member of an AequilibraE Project
"""

netsignal = SIGNAL(object)

req_link_flds = req_link_flds
req_node_flds = req_node_flds
protected_fields = protected_fields
link_types: LinkTypes = None
signal = SIGNAL(object)

def __init__(self, project) -> None:
WorkerThread.__init__(self, None)
from aequilibrae.paths import Graph

self.graphs = {} # type: Dict[Graph]
Expand Down Expand Up @@ -211,13 +213,13 @@ def create_from_osm(
polygons.append(subarea)
self.logger.info("Downloading data")
dwnloader = OSMDownloader(polygons, modes, logger=self.logger)
dwnloader.downloading = self.netsignal
dwnloader.signal = self.signal
dwnloader.doWork()

self.logger.info("Building Network")
self.builder = OSMBuilder(dwnloader.data, project=self.project, model_area=model_area, clean=clean)

self.builder.building = self.netsignal
self.builder.signal = self.signal
self.builder.doWork()

self.logger.info("Network built successfully")
Expand Down
Loading

0 comments on commit 692579c

Please sign in to comment.