From 9a57d713de98f7c06413b69a6b064455cb80fda3 Mon Sep 17 00:00:00 2001 From: GeorgeEfstathiadis <54844705+GeorgeEfstathiadis@users.noreply.github.com> Date: Fri, 19 Nov 2021 19:05:14 +0000 Subject: [PATCH] Added process_attributes function (#49) Add process_switches function, convert exist list to PossibleExits class, and add error checking to Attributes class --- forest/bonsai/simulate_gps_data.py | 174 +++++++++++++----- forest/bonsai/tests/test_simulate_gps_data.py | 141 +++++++++----- 2 files changed, 224 insertions(+), 91 deletions(-) diff --git a/forest/bonsai/simulate_gps_data.py b/forest/bonsai/simulate_gps_data.py index 80dd5cd5..2eedced4 100644 --- a/forest/bonsai/simulate_gps_data.py +++ b/forest/bonsai/simulate_gps_data.py @@ -8,7 +8,7 @@ from enum import Enum import os import time -from typing import Dict, List, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union import numpy as np import openrouteservice @@ -18,6 +18,19 @@ from forest.poplar.legacy.common_funcs import datetime2stamp, stamp2datetime R = 6.371*10**6 +ACTIVE_STATUS_LIST = range(11) +TRAVELLING_STATUS_LIST = range(11) + + +class PossibleExits(Enum): + """This class enumerates possible exits for attributes""" + CAFE = "cafe" + BAR = "bar" + RESTAURANT = "restaurant" + PARK = "park" + CINEMA = "cinema" + DANCE = "dance" + FITNESS = "fitness" class Vehicle(Enum): @@ -30,7 +43,7 @@ class Vehicle(Enum): class Occupation(Enum): """This class enumerates occupation for attributes""" - NONE = "" + NONE = "none" WORK = "office" SCHOOL = "university" @@ -170,27 +183,79 @@ def bounding_box(center: Tuple[float, float], radius: int) -> Tuple: return lat - lat_const, lon - lon_const, lat + lat_const, lon + lon_const -@dataclass class Attributes: - """This class holds the attributes needed to create an instance of a person + """This class holds the attributes needed to create an instance of a + Person class""" - Args: - vehicle used for distances and time of flights - main_occupation used for routine action in weekdays - active_status = 0-10 - used for probability in free time to take an action - or stay home - travelling status = 0-10 - used to derive amount of distance travelled - preferred_places = [x1, x2, x3] - used to sample action when free time - where x1-x3 are amenities (str) - """ - vehicle: Vehicle - main_occupation: Occupation - active_status: int - travelling_status: int - preferred_places: list + def __init__(self, + vehicle: Optional[str] = None, + main_employment: Optional[str] = None, + active_status: Optional[int] = None, + travelling_status: Optional[int] = None, + preferred_places: Optional[List[str]] = None, + **kwargs): + """Error check and generate missing data for attributes + + Args: + vehicle: used for distances and time of flights + main_occupation: used for routine action in weekdays + active_status: used for probability in free time to take an action + or stay home + travelling status: used to derive amount of distance travelled + preferred_places :used to sample action when free time + where x1-x3 are amenities (str) + Raises: + ValueError: for incorrect vehicle type + ValueError: for incorrect main_employment type + ValueError: if active_status is not between 0 and 10 + ValueError: if travelling_status is not between 0 and 10 + ValueError: if an exit from possible_exits is not correct type + """ + if vehicle is not None: + self.vehicle = Vehicle(vehicle) + else: + # exclude bus + self.vehicle = np.random.choice(list(Vehicle)[1:]) + + if main_employment is not None: + self.main_occupation = Occupation(main_employment) + else: + self.main_occupation = np.random.choice(list(Occupation)) + + if active_status is not None: + if active_status not in ACTIVE_STATUS_LIST: + raise ValueError("active_status must be between 0 and 10") + self.active_status = int(active_status) + else: + self.active_status = np.random.choice(ACTIVE_STATUS_LIST) + + if travelling_status is not None: + if travelling_status not in TRAVELLING_STATUS_LIST: + raise ValueError("travelling_status must be between 0 and 10") + self.travelling_status = int(travelling_status) + else: + self.travelling_status = np.random.choice(TRAVELLING_STATUS_LIST) + + if preferred_places is not None: + self.preferred_places = [] + for possible_exit in preferred_places: + possible_exit2 = PossibleExits(possible_exit) + self.preferred_places.append(possible_exit2) + + possible_exits2 = [ + x for x in PossibleExits + if x not in self.preferred_places + ] + + random_exits = np.random.choice( + possible_exits2, 3 - len(self.preferred_places), replace=False + ).tolist() + for choice in random_exits: + self.preferred_places.append(choice) + else: + self.preferred_places = np.random.choice( + list(PossibleExits), 3, replace=False + ).tolist() @dataclass @@ -271,15 +336,7 @@ def __init__(self, self.office_days = np.array([]) # define favorite places - self.possible_destinations = [ - "cafe", - "bar", - "restaurant", - "park", - "cinema", - "dance", - "fitness", - ] + self.possible_destinations = list(PossibleExits) # for a certain venue select 3 locations for each venue randomly # these will be considered the 3 favorite places to go @@ -290,24 +347,26 @@ def __init__(self, for possible_exit in self.possible_destinations: # if there are more than 3 sets of coordinates for an venue # select 3 at random, else select all of them as preferred - if len(local_places[possible_exit]) > 3: + if len(local_places[possible_exit.value]) > 3: random_places = np.random.choice( - range(len(local_places[possible_exit])), 3, replace=False + range(len(local_places[possible_exit.value])), + 3, replace=False, ).tolist() places_selected = [ tuple(place) - for place in np.array(local_places[possible_exit])[ + for place in np.array(local_places[possible_exit.value])[ random_places ] if tuple(place) != home_coordinates ] - setattr(self, possible_exit + "_places", places_selected) + setattr(self, possible_exit.value + "_places", places_selected) else: setattr( self, - possible_exit + "_places", + possible_exit.value + "_places", [ - tuple(place) for place in local_places[possible_exit] + tuple(place) for place + in local_places[possible_exit.value] if tuple(place) != home_coordinates ], ) @@ -315,13 +374,13 @@ def __init__(self, # create a list of the locations ordered by distance distances = [ great_circle_dist(*home_coordinates, *place) - for place in getattr(self, possible_exit + "_places") + for place in getattr(self, possible_exit.value + "_places") ] order = np.argsort(distances) setattr( self, - possible_exit + "_places_ordered", - np.array(getattr(self, possible_exit + "_places"))[ + possible_exit.value + "_places_ordered", + np.array(getattr(self, possible_exit.value + "_places"))[ order ].tolist(), ) @@ -329,7 +388,7 @@ def __init__(self, # remove all exits which have no places nearby possible_destinations2 = self.possible_destinations.copy() for act in possible_destinations2: - if len(getattr(self, act + "_places")) == 0: + if len(getattr(self, act.value + "_places")) == 0: self.possible_destinations.remove(act) # order preferred places by travelling_status @@ -340,7 +399,7 @@ def __init__(self, + (10 - self.attributes.travelling_status) ** 2 ) for act in self.possible_destinations: - act_places = getattr(self, act + "_places_ordered").copy() + act_places = getattr(self, act.value + "_places_ordered").copy() places = [] for i in range(len(act_places) - 1, -1, -1): @@ -348,7 +407,7 @@ def __init__(self, places.append(act_places[index]) del act_places[index] - setattr(self, act + "_places", places) + setattr(self, act.value + "_places", places) def set_travelling_status(self, travelling_status: int): """Update preferred locations of exits @@ -364,7 +423,7 @@ def set_travelling_status(self, travelling_status: int): travelling_status ** 2 + (10 - travelling_status) ** 2 ) for act in self.possible_destinations: - act_places = getattr(self, act + "_places_ordered").copy() + act_places = getattr(self, act.value + "_places_ordered").copy() places = [] for i in range(len(act_places) - 1, -1, -1): @@ -372,7 +431,7 @@ def set_travelling_status(self, travelling_status: int): places.append(act_places[index]) del act_places[index] - setattr(self, act + "_places", places) + setattr(self, act.value + "_places", places) def set_active_status(self, active_status: int): """Update active status. @@ -393,7 +452,7 @@ def set_active_status(self, active_status: int): ) self.office_days.sort() - def update_preferred_places(self, exit_code: str): + def update_preferred_places(self, exit_code: PossibleExits): """This function updates the set of preferred exits for the day, after an action has been performed. @@ -496,7 +555,7 @@ def choose_preferred_exit(self, current_time: float, # after venue has been selected, a location for that venue # needs to be selected as well. - action_locations = getattr(self, selected_action + "_places") + action_locations = getattr(self, selected_action.value + "_places") ratios2 = ratios[: len(action_locations)] probabilities2 = np.array(ratios2) probabilities2 = probabilities2 / sum(probabilities2) @@ -1035,6 +1094,29 @@ def prepare_data( ) +def process_switches( + attributes: Dict[str, Dict], key: str, +) -> Dict[str, int]: + """Preprocesses the attributes of each person. + + Args: + attributes: (dictionary) contains attributes of each person, + loaded from json file. + key: (str) a key from attributes.keys() + Returns: + switches: (dictionary) contains possible changes of attributes + in between of simulation + """ + switches = {} + + for x in attributes[key].keys(): + key_list = x.split("-") + if len(key_list) == 2: + switches[x] = attributes[key][x] + + return switches + + def sim_GPS_data(cycle,p,data_folder): ## only two parameters ## cycle is the sum of on-cycle and off_cycle, unit is minute diff --git a/forest/bonsai/tests/test_simulate_gps_data.py b/forest/bonsai/tests/test_simulate_gps_data.py index 1288b4f2..15608e60 100644 --- a/forest/bonsai/tests/test_simulate_gps_data.py +++ b/forest/bonsai/tests/test_simulate_gps_data.py @@ -6,9 +6,10 @@ import pytest from forest.bonsai.simulate_gps_data import ( - bounding_box, get_basic_path, get_path, Vehicle, Occupation, + bounding_box, get_basic_path, get_path, PossibleExits, Vehicle, Occupation, ActionType, Attributes, Person, gen_basic_traj, gen_basic_pause, gen_route_traj, gen_all_traj, remove_data, prepare_data, + process_switches, ) from forest.jasmine.data2mobmat import great_circle_dist @@ -426,23 +427,74 @@ def sample_locations(): } -def test_person_main_employment(sample_coordinates, sample_locations): - attributes = Attributes(vehicle=Vehicle.BUS, - main_occupation=Occupation.WORK, - active_status=6, - travelling_status=8, - preferred_places=["cinema", "bar", "park"]) +@pytest.fixture(scope="session") +def sample_attributes(): + """Sample attributes""" + return { + "User 1": + { + "main_employment": "none", + "vehicle": "car", + "travelling_status": 10, + "active_status": 0 + }, + + "Users 2-4": + { + "main_employment": "university", + "vehicle": "bicycle", + "travelling_status": 8, + "active_status": 8, + "active_status-16": 2 + }, + + "User 5": + { + "main_employment": "office", + "vehicle": "foot", + "travelling_status": 9, + "travelling_status-20": 1, + "preferred_places": ["cafe", "bar", "cinema"] + } + } + + +def test_attributes_user_missing_args(sample_attributes): + """Test processing attributes with missing arguments""" + + user_attrs = sample_attributes["User 1"] + attrs = Attributes(**user_attrs) + assert len(attrs.preferred_places) == 3 + + +def test_process_attributes_arguments_correct(sample_attributes): + """Test that given arguments are processed correctly""" + + user_attrs = sample_attributes["User 5"] + attrs = Attributes(**user_attrs) + assert ( + attrs.travelling_status == 9 + and attrs.preferred_places == [ + PossibleExits.CAFE, PossibleExits.BAR, PossibleExits.CINEMA + ] + ) + + +def test_person_main_employment( + sample_coordinates, sample_locations, sample_attributes +): + user_attrs = sample_attributes["User 5"] + attributes = Attributes(**user_attrs) random_person = Person(sample_coordinates, attributes, sample_locations) assert random_person.attributes.main_occupation == Occupation.WORK -def test_person_cafe_places(sample_coordinates, sample_locations): +def test_person_cafe_places( + sample_coordinates, sample_locations, sample_attributes +): """Test one place from cafe_places attribute is actual cafe""" - attributes = Attributes(vehicle=Vehicle.BUS, - main_occupation=Occupation.NONE, - active_status=2, - travelling_status=7, - preferred_places=["cafe", "cinema", "park"]) + user_attrs = sample_attributes["User 5"] + attributes = Attributes(**user_attrs) random_person = Person(sample_coordinates, attributes, sample_locations) cafe_place = ( random_person.cafe_places[0][0], @@ -451,13 +503,12 @@ def test_person_cafe_places(sample_coordinates, sample_locations): assert cafe_place in sample_locations["cafe"] -def test_person_office_address(sample_coordinates, sample_locations): +def test_person_office_address( + sample_coordinates, sample_locations, sample_attributes +): """Test person going to work office_address""" - attributes = Attributes(vehicle=Vehicle.BUS, - main_occupation=Occupation.WORK, - active_status=6, - travelling_status=7, - preferred_places=["cafe", "cinema", "park"]) + user_attrs = sample_attributes["User 5"] + attributes = Attributes(**user_attrs) random_person = Person(sample_coordinates, attributes, sample_locations) office_coordinates = ( random_person.office_coordinates[0], @@ -466,24 +517,20 @@ def test_person_office_address(sample_coordinates, sample_locations): assert office_coordinates in sample_locations["office"] -def test_person_office_days(sample_coordinates, sample_locations): +def test_person_office_days( + sample_coordinates, sample_locations, sample_attributes +): """Test person going to work office_address""" - attributes = Attributes(vehicle=Vehicle.BUS, - main_occupation=Occupation.WORK, - active_status=6, - travelling_status=7, - preferred_places=["cafe", "bar", "park"]) + user_attrs = sample_attributes["User 5"] + attributes = Attributes(**user_attrs) random_person = Person(sample_coordinates, attributes, sample_locations) assert len(random_person.office_days) <= 5 @pytest.fixture() -def sample_person(sample_coordinates, sample_locations): - attributes = Attributes(vehicle=Vehicle.BUS, - main_occupation=Occupation.WORK, - active_status=6, - travelling_status=7, - preferred_places=["cafe", "bar", "park"]) +def sample_person(sample_coordinates, sample_locations, sample_attributes): + user_attrs = sample_attributes["User 5"] + attributes = Attributes(**user_attrs) return Person(sample_coordinates, attributes, sample_locations) @@ -499,23 +546,16 @@ def test_set_active_status(sample_person): def test_update_preferred_places_case_first_option(sample_person): """Test changing preferred exits change first to second""" - sample_person.update_preferred_places("cafe") - assert sample_person.preferred_places_today == ["bar", "cafe", "park"] + sample_person.update_preferred_places(PossibleExits.CAFE) + assert sample_person.preferred_places_today == [ + PossibleExits.BAR, PossibleExits.CAFE, PossibleExits.CINEMA + ] def test_update_preferred_places_case_last_option(sample_person): """Test changing preferred exits remove last""" - sample_person.update_preferred_places("park") - assert "park" not in sample_person.preferred_places_today - - -def test_update_preferred_places_case_no_option(sample_person): - """Test changing preferred exits when selected exit not in preferred""" - sample_person.update_preferred_places("not_an_option") - assert ( - sample_person.preferred_places_today - == sample_person.attributes.preferred_places - ) + sample_person.update_preferred_places(PossibleExits.CINEMA) + assert PossibleExits.CINEMA not in sample_person.preferred_places_today def test_choose_preferred_exit_morning_home(sample_person): @@ -549,7 +589,7 @@ def test_choose_preferred_exit_random_exit(sample_person): def test_end_of_day_reset(sample_person): """Test end of day reset of preferred exits""" - sample_person.update_preferred_places("cafe") + sample_person.update_preferred_places(PossibleExits.CAFE) sample_person.end_of_day_reset() assert ( sample_person.attributes.preferred_places @@ -836,3 +876,14 @@ def test_prepare_data_timezones(generated_trajectory): final_data['timestamp'] == final_data['UTC time'] + 3600000 ) assert sum(boolean_series) == len(boolean_series) + + +def test_process_switches(sample_attributes): + """Test processing attributes with switch of behavior""" + + key = "User 5" + switches = process_switches(sample_attributes, key) + assert ( + list(switches.keys())[0] == "travelling_status-20" + and list(switches.values())[0] == 1 + )