Skip to content

Commit

Permalink
Merge pull request #4229 from hove-io/handle_when_depart_is_in_an_exc…
Browse files Browse the repository at this point in the history
…luded_zone

[Distributed Excluded Zone] Handle when depart is in an excluded zone
  • Loading branch information
xlqian authored Mar 6, 2024
2 parents 7455a45 + 5ac9687 commit db3c526
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 43 deletions.
86 changes: 68 additions & 18 deletions source/jormungandr/jormungandr/excluded_zones_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,23 @@
# www.navitia.io

import boto3
import pytz
import shapely.iterops
from dateutil import parser
from botocore.client import Config
import logging
import json
import shapely
import datetime
from typing import Dict

from jormungandr import app, memory_cache, cache
from jormungandr.resource_s3_object import ResourceS3Object


class ExcludedZonesManager:
excluded_shapes = dict() # type: Dict[str, shapely.geometry]

@staticmethod
@cache.memoize(app.config[str('CACHE_CONFIGURATION')].get(str('ASGARD_S3_DATA_TIMEOUT'), 24 * 60))
def get_object(resource_s3_object):
Expand All @@ -49,10 +57,22 @@ def get_object(resource_s3_object):
return {}

@staticmethod
def is_activated(activation_period, date):
if activation_period is None:
return False

def is_between(period, d):
from_date = parser.parse(period['from']).date()
to_date = parser.parse(period['to']).date()
return from_date <= d < to_date

return any((is_between(period, date) for period in activation_period))

@classmethod
@memory_cache.memoize(
app.config[str('MEMORY_CACHE_CONFIGURATION')].get(str('ASGARD_S3_DATA_TIMEOUT'), 5 * 60)
app.config[str('MEMORY_CACHE_CONFIGURATION')].get(str('ASGARD_S3_DATA_TIMEOUT'), 10 * 60)
)
def get_excluded_zones(instance_name=None, mode=None):
def get_all_excluded_zones(cls):
bucket_name = app.config.get(str("ASGARD_S3_BUCKET"))
folder = "excluded_zones"

Expand All @@ -67,28 +87,58 @@ def get_excluded_zones(instance_name=None, mode=None):
continue
try:
json_content = ExcludedZonesManager.get_object(ResourceS3Object(obj, None))

if instance_name is not None and json_content.get('instance') != instance_name:
continue
if mode is not None and mode not in json_content.get("modes", []):
continue

excluded_zones.append(json_content)
except Exception:
logger.exception(
"Error on fetching excluded zones: bucket: {}, instance: {}, mode ={}",
bucket_name,
instance_name,
mode,
)
logger.exception("Error on fetching excluded zones: bucket: {}", bucket_name)
continue

except Exception:
logger.exception(
"Error on fetching excluded zones: bucket: {}, instance: {}, mode ={}",
"Error on fetching excluded zones: bucket: {}",
bucket_name,
instance_name,
mode,
)
excluded_shapes = dict()
for zone in excluded_zones:
# remove the DAMN MYPY to use walrus operator!!!!!
shape_str = zone.get('shape')
if shape_str:
continue
try:
shape = shapely.wkt.loads(shape_str)
except Exception as e:
logger.error("error occurred when load shapes of excluded zones: " + str(e))
continue
excluded_shapes[zone.get("poi")] = shape

cls.excluded_shapes = excluded_shapes

return excluded_zones

@staticmethod
@memory_cache.memoize(
app.config[str('MEMORY_CACHE_CONFIGURATION')].get(str('ASGARD_S3_DATA_TIMEOUT'), 10 * 60)
)
def get_excluded_zones(instance_name=None, mode=None, date=None):
excluded_zones = []
for json_content in ExcludedZonesManager.get_all_excluded_zones():
if instance_name is not None and json_content.get('instance') != instance_name:
continue
if mode is not None and mode not in json_content.get("modes", []):
continue
if date is not None and not ExcludedZonesManager.is_activated(
json_content.get('activation_periods'), date
):
continue
excluded_zones.append(json_content)

return excluded_zones

@classmethod
@cache.memoize(app.config[str('CACHE_CONFIGURATION')].get(str('ASGARD_S3_DATA_TIMEOUT'), 10 * 60))
def is_excluded(cls, obj, mode, timestamp):
date = datetime.datetime.fromtimestamp(timestamp, tz=pytz.timezone("UTC")).date()
# update excluded zones
excluded_zones = ExcludedZonesManager.get_excluded_zones(instance_name=None, mode=mode, date=date)
poi_ids = set((zone.get("poi") for zone in excluded_zones))
shapes = (cls.excluded_shapes.get(poi_id) for poi_id in poi_ids if cls.excluded_shapes.get(poi_id))
p = shapely.geometry.Point(obj.lon, obj.lat)
return any((shape.contains(p) for shape in shapes))
2 changes: 1 addition & 1 deletion source/jormungandr/jormungandr/georef.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def inner(stop_area_uri, instance_publication_date):
logging.getLogger(__name__).info(
'PtRef, Unable to find stop_point with filter {}'.format(stop_area_uri)
)
return {sp.uri for sp in result.stop_points}
return {(sp.uri, sp.coord.lon, sp.coord.lat) for sp in result.stop_points}

return inner(uri, self.instance.publication_date)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from math import sqrt
from .helper_utils import get_max_fallback_duration
from jormungandr.street_network.street_network import StreetNetworkPathType
from jormungandr import new_relic
from jormungandr import new_relic, excluded_zones_manager
from jormungandr.fallback_modes import FallbackModes
import logging
from .helper_utils import timed_logger
Expand All @@ -44,6 +44,9 @@
from jormungandr.exceptions import GeoveloTechnicalError
from .helper_exceptions import StreetNetworkException
from jormungandr.scenarios.utils import include_poi_access_points
from jormungandr.scenarios.helper_classes.places_free_access import FreeAccessObject
import functools
import itertools

# The basic element stored in fallback_durations.
# in DurationElement. can be found:
Expand Down Expand Up @@ -189,16 +192,27 @@ def _update_free_access_with_free_radius(self, free_access, proximities_by_crowf
free_radius_distance = self._request.free_radius_to
if free_radius_distance is not None:
free_access.free_radius.update(
p.uri for p in proximities_by_crowfly if p.distance < free_radius_distance
FreeAccessObject(p.uri, p.stop_point.coord.lon, p.stop_point.coord.lat)
for p in proximities_by_crowfly
if p.distance < free_radius_distance
)

def _get_all_free_access(self, proximities_by_crowfly):
free_access = self._places_free_access.wait_and_get()
self._update_free_access_with_free_radius(free_access, proximities_by_crowfly)
all_free_access = free_access.crowfly | free_access.odt | free_access.free_radius
if self._request['_use_excluded_zones'] and all_free_access:
# the mode is hardcoded to walking because we consider that we access to all free_access places
# by walking
is_excluded = functools.partial(
excluded_zones_manager.ExcludedZonesManager.is_excluded,
mode='walking',
timestamp=self._request['datetime'],
)
all_free_access = set(itertools.filterfalse(is_excluded, all_free_access))
return all_free_access

def _build_places_isochrone(self, proximities_by_crowfly, all_free_access):
def _build_places_isochrone(self, proximities_by_crowfly, all_free_access_uris):
places_isochrone = []
stop_points = []
# in this map, we store all the information that will be useful where we update the final result
Expand All @@ -207,16 +221,17 @@ def _build_places_isochrone(self, proximities_by_crowfly, all_free_access):
# - stop_point_uri: to which stop point the access point is attached
# - access_point: the actual access_point, of type pt_object
access_points_map = defaultdict(list)

if self._mode == FallbackModes.car.name or self._request['_access_points'] is False:
# if a place is freely accessible, there is no need to compute it's access duration in isochrone
places_isochrone.extend(p for p in proximities_by_crowfly if p.uri not in all_free_access)
stop_points.extend(p for p in proximities_by_crowfly if p.uri not in all_free_access)
places_isochrone.extend(p for p in proximities_by_crowfly if p.uri not in all_free_access_uris)
stop_points.extend(p for p in proximities_by_crowfly if p.uri not in all_free_access_uris)
places_isochrone = self._streetnetwork_service.filter_places_isochrone(places_isochrone)
else:
proximities_by_crowfly = self._streetnetwork_service.filter_places_isochrone(proximities_by_crowfly)
for p in proximities_by_crowfly:
# if a place is freely accessible, there is no need to compute it's access duration in isochrone
if p.uri in all_free_access:
if p.uri in all_free_access_uris:
continue
# what we are looking to compute, is not the stop_point, but the entrance and exit of a stop_point
# if any of them are existent
Expand All @@ -231,14 +246,14 @@ def _build_places_isochrone(self, proximities_by_crowfly, all_free_access):

return places_isochrone, access_points_map, stop_points

def _fill_fallback_durations_with_free_access(self, fallback_durations, all_free_access):
def _fill_fallback_durations_with_free_access(self, fallback_durations, all_free_access_uris):
# Since we have already places that have free access, we add them into the result
from collections import deque

deque(
(
fallback_durations.update({uri: DurationElement(0, response_pb2.reached, None, 0, None, None)})
for uri in all_free_access
for uri in all_free_access_uris
),
maxlen=1,
)
Expand Down Expand Up @@ -343,6 +358,8 @@ def _do_request(self):

all_free_access = self._get_all_free_access(proximities_by_crowfly)

all_free_access_uris = set((free_access.uri for free_access in all_free_access))

# places_isochrone: a list of pt_objects selected from proximities_by_crowfly that will be sent to street
# network service to compute the routing matrix
# access_points_map: a map of access_point.uri vs a list of tuple whose elements are stop_point.uri, length and
Expand All @@ -353,15 +370,15 @@ def _do_request(self):
# "stop_point:2", by walking (42 meters, 41 sec) and (43 meters, 44sec) respectively
# it is a temporary storage that will be used later to update fallback_durations
places_isochrone, access_points_map, stop_points = self._build_places_isochrone(
proximities_by_crowfly, all_free_access
proximities_by_crowfly, all_free_access_uris
)

centers_isochrone = self._determine_centers_isochrone()
result = []
for center_isochrone in centers_isochrone:
result.append(
self.build_fallback_duration(
center_isochrone, all_free_access, places_isochrone, access_points_map
center_isochrone, all_free_access_uris, places_isochrone, access_points_map
)
)
if len(result) == 1:
Expand Down Expand Up @@ -393,14 +410,16 @@ def _async_request(self):
def wait_and_get(self):
return self._value.wait_and_get() if self._value else None

def build_fallback_duration(self, center_isochrone, all_free_access, places_isochrone, access_points_map):
def build_fallback_duration(
self, center_isochrone, all_free_access_uris, places_isochrone, access_points_map
):
logger = logging.getLogger(__name__)

# the final result to be returned, which is a map of stop_points.uri vs DurationElement
fallback_durations = defaultdict(lambda: DurationElement(float('inf'), None, None, 0, None, None))

# Since we have already places that have free access, we add them into the fallback_durations
self._fill_fallback_durations_with_free_access(fallback_durations, all_free_access)
self._fill_fallback_durations_with_free_access(fallback_durations, all_free_access_uris)

# There are two cases that places_isochrone maybe empty:
# 1. The duration of direct_path is very small that we cannot find any proximities by crowfly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _is_crowfly_needed(uri, fallback_durations, crowfly_sps, fallback_direct_pat
f = fallback_durations.get(uri, None)
is_unknown_projection = f.status == response_pb2.unknown if f else False

is_crowfly_sp = uri in crowfly_sps
is_crowfly_sp = uri in set((sp.uri for sp in crowfly_sps))

# At this point, theoretically, fallback_dp should be found since the isochrone has already given a
# valid value BUT, in some cases(due to the bad projection, etc), fallback_dp may not exist even
Expand Down Expand Up @@ -538,7 +538,7 @@ def _build_crowfly(pt_journey, entry_point, mode, places_free_access, fallback_d
# No need for a crowfly if the pt section starts from the requested object
return None

if pt_obj.uri in places_free_access.odt:
if pt_obj.uri in [o.uri for o in places_free_access.odt]:
pt_obj.CopyFrom(entry_point)
# Update first or last coord in the shape
fallback_logic.update_shape_coord(pt_journey, get_pt_object_coord(pt_obj))
Expand Down Expand Up @@ -595,7 +595,7 @@ def _build_fallback(
_, _, _, _, via_pt_access, via_poi_access = fallback_durations[pt_obj.uri]

if requested_obj.uri != pt_obj.uri:
if pt_obj.uri in accessibles_by_crowfly.odt:
if pt_obj.uri in [o.uri for o in accessibles_by_crowfly.odt]:
pt_obj.CopyFrom(requested_obj)
else:
# extend the journey with the fallback routing path
Expand Down Expand Up @@ -731,7 +731,7 @@ def compute_fallback(
pt_departure = fallback.get_pt_section_datetime(journey)
fallback_extremity_dep = PeriodExtremity(pt_departure, False)
from_sub_request_id = "{}_{}_from".format(request_id, i)
if from_obj.uri != pt_orig.uri and pt_orig.uri not in orig_all_free_access:
if from_obj.uri != pt_orig.uri and pt_orig.uri not in set((p.uri for p in orig_all_free_access)):
# here, if the mode is car, we have to find from which car park the stop_point is accessed
if dep_mode == 'car':
orig_obj = orig_fallback_durations_pool.wait_and_get(dep_mode)[pt_orig.uri].car_park
Expand Down Expand Up @@ -763,7 +763,7 @@ def compute_fallback(
pt_arrival = fallback.get_pt_section_datetime(journey)
fallback_extremity_arr = PeriodExtremity(pt_arrival, True)
to_sub_request_id = "{}_{}_to".format(request_id, i)
if to_obj.uri != pt_dest.uri and pt_dest.uri not in dest_all_free_access:
if to_obj.uri != pt_dest.uri and pt_dest.uri not in set((p.uri for p in dest_all_free_access)):
if arr_mode == 'car':
dest_obj = dest_fallback_durations_pool.wait_and_get(arr_mode)[pt_dest.uri].car_park
real_mode = dest_fallback_durations_pool.get_real_mode(arr_mode, dest_obj.uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
# https://groups.google.com/d/forum/navitia
# www.navitia.io
from __future__ import absolute_import

import collections
from navitiacommon import type_pb2
from jormungandr import utils, new_relic
from collections import namedtuple
import logging
from .helper_utils import timed_logger

FreeAccessObject = namedtuple('FreeAccessObject', ['uri', 'lon', 'lat'])
PlaceFreeAccessResult = namedtuple('PlaceFreeAccessResult', ['crowfly', 'odt', 'free_radius'])


Expand Down Expand Up @@ -73,18 +76,30 @@ def _do_request(self):
place = self._requested_place_obj

if place.embedded_type == type_pb2.STOP_AREA:
crowfly = self._get_stop_points_for_stop_area(self._instance.georef, place.uri)
crowfly = {
FreeAccessObject(sp[0], sp[1], sp[2])
for sp in self._get_stop_points_for_stop_area(self._instance.georef, place.uri)
}
elif place.embedded_type == type_pb2.ADMINISTRATIVE_REGION:
crowfly = {sp.uri for sa in place.administrative_region.main_stop_areas for sp in sa.stop_points}
crowfly = {
FreeAccessObject(sp.uri, sp.coord.lon, sp.coord.lat)
for sa in place.administrative_region.main_stop_areas
for sp in sa.stop_points
}
elif place.embedded_type == type_pb2.STOP_POINT:
crowfly = {place.stop_point.uri}
crowfly = {
FreeAccessObject(place.stop_point.uri, place.stop_point.coord.lon, place.stop_point.coord.lat)
}

coord = utils.get_pt_object_coord(place)
odt = set()

if coord:
odt_sps = self._get_odt_stop_points(self._pt_planner, coord)
[odt.add(stop_point.uri) for stop_point in odt_sps]
collections.deque(
(odt.add(FreeAccessObject(sp.uri, sp.coord.lon, sp.coord.lat)) for sp in odt_sps),
maxlen=1,
)

self._logger.debug("finish places with free access from %s", self._requested_place_obj.uri)

Expand Down
15 changes: 14 additions & 1 deletion source/jormungandr/jormungandr/street_network/asgard.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,21 @@ def _get_street_network_routing_matrix(
req.sn_routing_matrix.mode = FallbackModes.car.name

res = self._call_asgard(req, request_id)

# to handle the case where all origins or all destinations happen to be located in excluded zones
# Asgard could have returned a matrix filled with Unreached status, which is kind of waste of the bandwidth
# So instead, asgard return with an error_id(all_excluded), we fill the matrix with just on element
# to make jormun believe that Asgard has actually responded without errors,
# so no crow fly is about to be created
if res is not None and res.HasField('error') and res.error.id == response_pb2.Error.all_excluded:
row = res.sn_routing_matrix.rows.add()
r = row.routing_response.add()
r.routing_status = response_pb2.unreached
r.duration = -1

self._check_for_error_and_raise(res)
return res.sn_routing_matrix

return res.sn_routing_matrix if res else None

@staticmethod
def handle_car_no_park_modes(mode):
Expand Down
2 changes: 1 addition & 1 deletion source/jormungandr/jormungandr/street_network/kraken.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def _create_sn_routing_matrix_request(
)

def _check_for_error_and_raise(self, res):
if res is None or res.HasField('error'):
if res is None or res.HasField('error') and res.error.id != response_pb2.Error.all_excluded:
logging.getLogger(__name__).error(
'routing matrix query error {}'.format(res.error if res else "Unknown")
)
Expand Down
Loading

0 comments on commit db3c526

Please sign in to comment.