From 4c4c02faf78e97094ab193b367c8cd516ea3aa27 Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Tue, 28 Jul 2020 14:47:30 +0100 Subject: [PATCH 01/15] development files added to gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index b6e4761..3efbf71 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# Development +copy-to-devel-docker.sh + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] From 8f888e9e4e05703e433f2e8b4b2aaea8c1e40284 Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Tue, 28 Jul 2020 14:49:32 +0100 Subject: [PATCH 02/15] MAJOR CHANGE for speed up: import libsumo as traci --- example/marlenvironment.py | 7 +++++-- rllibsumoutils/sumoconnector.py | 3 +-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/example/marlenvironment.py b/example/marlenvironment.py index 54a4573..064d161 100644 --- a/example/marlenvironment.py +++ b/example/marlenvironment.py @@ -168,8 +168,11 @@ def get_observation(self, agent): if agent in self.simulation.veh_subscriptions: speed = round(self.simulation.veh_subscriptions[agent][tc.VAR_SPEED] * MS_TO_KMH) leader = self.simulation.veh_subscriptions[agent][tc.VAR_LEADER] - if leader: - distance = round(leader[1]) + if leader: ## compatible with traci + veh, dist = leader + if veh: + ## compatible with libsumo + distance = round(dist) ret = [speed, distance] print('Observation: {}'.format(ret)) return ret diff --git a/rllibsumoutils/sumoconnector.py b/rllibsumoutils/sumoconnector.py index 6a0a736..d492007 100644 --- a/rllibsumoutils/sumoconnector.py +++ b/rllibsumoutils/sumoconnector.py @@ -14,7 +14,7 @@ # """ Import SUMO library """ if 'SUMO_HOME' in os.environ: sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools')) - import traci + import libsumo as traci import traci.constants as tc else: raise Exception("Please declare environment variable 'SUMO_HOME'") @@ -169,4 +169,3 @@ def end_simulation(self): pass ################################################################################################ - \ No newline at end of file From 30fa18cf10f4e6602416bc1aef4888e5937bdc96 Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Tue, 28 Jul 2020 14:49:54 +0100 Subject: [PATCH 03/15] PPO policy_conf['multiagent'] fix --- example/train.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/example/train.py b/example/train.py index a923df2..a4b086a 100644 --- a/example/train.py +++ b/example/train.py @@ -106,11 +106,9 @@ def _main(): marl_env.get_obs_space(agent), marl_env.get_action_space(agent), agent_policy_params) - policy_conf['multiagent'] = { - 'policies': policies, - 'policy_mapping_fn': lambda agent_id: agent_id, - 'policies_to_train': ['ppo_policy'], - } + policy_conf['multiagent']['policies'] = policies + policy_conf['multiagent']['policy_mapping_fn'] = lambda agent_id: agent_id + policy_conf['multiagent']['policies_to_train'] = ['ppo_policy'] policy_conf['env_config'] = env_config trainer = ppo.PPOTrainer(env='test_env', config=policy_conf) From 70619efde6a76e6182e0794b1b3b10865d1c1536 Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Tue, 28 Jul 2020 16:24:16 +0100 Subject: [PATCH 04/15] specific ppo trainer --- example/{train.py => ppotrain.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename example/{train.py => ppotrain.py} (100%) diff --git a/example/train.py b/example/ppotrain.py similarity index 100% rename from example/train.py rename to example/ppotrain.py From 44b8c6fb6787a0ee5790722dbd93012c71cabe2e Mon Sep 17 
00:00:00 2001 From: Lara CODECA Date: Thu, 30 Jul 2020 19:30:38 +0100 Subject: [PATCH 05/15] libsumo - logger - configurations - parallel environments - ppo & a3c trainers --- example/a3ctrain.py | 120 +++++++++++++++++++++++++++ example/marlenvironment.py | 128 +++++++++++++++++++---------- example/ppotrain.py | 94 +++++++++------------ rllibsumoutils/sumoconnector.py | 139 ++++++++++++++++++++++---------- rllibsumoutils/sumoutils.py | 98 ++++++++++++---------- 5 files changed, 392 insertions(+), 187 deletions(-) create mode 100644 example/a3ctrain.py diff --git a/example/a3ctrain.py b/example/a3ctrain.py new file mode 100644 index 0000000..15752e6 --- /dev/null +++ b/example/a3ctrain.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 + +""" Example Trainer for RLLIB + SUMO Utlis + + Author: Lara CODECA + + This program and the accompanying materials are made available under the + terms of the Eclipse Public License 2.0 which is available at + http://www.eclipse.org/legal/epl-2.0. +""" + +from copy import deepcopy +import logging +import pathlib +from pprint import pformat +import random +import sys +import traceback + +import ray + +from ray.rllib.agents.a3c import a3c +from ray.tune.logger import pretty_print + +import marlenvironment + +logging.basicConfig(level=logging.WARN) +logger = logging.getLogger('a3ctrain') + +def _main(): + """ Training example """ + + # Initialize RAY. + ray.tune.registry.register_env('test_env', marlenvironment.env_creator) + ray.init(memory=52428800, object_store_memory=78643200) ## minimum values + + # Algorithm. + policy_class = a3c.A3CTFPolicy + + # https://github.com/ray-project/ray/blob/releases/0.8.6/rllib/agents/trainer.py#L44 + # https://github.com/ray-project/ray/blob/releases/0.8.6/rllib/agents/a3c/a3c.py#L14 + policy_conf = a3c.DEFAULT_CONFIG + policy_conf['batch_mode'] = 'complete_episodes' + policy_conf['collect_metrics_timeout'] = 86400 # 1 day timeout + policy_conf['ignore_worker_failures'] = False + policy_conf['log_level'] = 'WARN' + policy_conf['min_iter_time_s'] = 5 + policy_conf['monitor'] = True + policy_conf['no_done_at_end'] = True + policy_conf['num_envs_per_worker'] = 1 + policy_conf['num_workers'] = 2 + policy_conf['remote_env_batch_wait_ms'] = 1000 + policy_conf['remote_worker_envs'] = False + policy_conf['rollout_fragment_length'] = 1 + policy_conf['seed'] = 666 + policy_conf['sgd_minibatch_size'] = 1 + policy_conf['simple_optimizer'] = True + policy_conf['timesteps_per_iteration'] = 1000 + policy_conf['train_batch_size'] = 1 + policy_conf['use_gae'] = False + + # https://github.com/ray-project/ray/blob/releases/0.8.6/rllib/models/catalog.py#L37 + policy_conf['model']['use_lstm'] = True + + # Load default Scenario configuration for the LEARNING ENVIRONMENT + scenario_config = deepcopy(marlenvironment.DEFAULT_SCENARIO_CONFING) + scenario_config['seed'] = 42 + scenario_config['log_level'] = 'INFO' + scenario_config['sumo_config']['sumo_cfg'] = '{}/scenario/sumo.cfg.xml'.format( + pathlib.Path(__file__).parent.absolute()) + scenario_config['sumo_config']['sumo_params'] = ['--collision.action', 'warn'] + scenario_config['sumo_config']['end_of_sim'] = 3600 # [s] + scenario_config['sumo_config']['update_freq'] = 10 # number of traci.simulationStep() + # for each environment step. + scenario_config['sumo_config']['log_level'] = 'INFO' + logger.info('Scenario Configuration: \n %s', pformat(scenario_config)) + + # Associate the agents with their configuration. 
+ agent_init = { + 'agent_0': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING), + 'agent_1': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING), + } + logger.info('Agents Configuration: \n %s', pformat(agent_init)) + + ## MARL Environment Init + env_config = { + 'agent_init': agent_init, + 'scenario_config': scenario_config, + } + marl_env = marlenvironment.SUMOTestMultiAgentEnv(env_config) + + # Config for the A3C trainer from the MARLEnv + policies = {} + for agent in marl_env.get_agents(): + agent_policy_params = {} + policies[agent] = (policy_class, + marl_env.get_obs_space(agent), + marl_env.get_action_space(agent), + agent_policy_params) + policy_conf['multiagent']['policies'] = policies + policy_conf['multiagent']['policy_mapping_fn'] = lambda agent_id: agent_id + policy_conf['multiagent']['policies_to_train'] = ['a3c_policy'] + policy_conf['env_config'] = env_config + + logger.info('a3cA3C Configuration: \n %s', pformat(policy_conf)) + trainer = a3c.A3CTrainer(env='test_env', + config=policy_conf) + + # Single training iteration, just for testing. + try: + result = trainer.train() + print('Results: \n {}'.format(pretty_print(result))) + except Exception: + EXC_TYPE, EXC_VALUE, EXC_TRACEBACK = sys.exc_info() + traceback.print_exception(EXC_TYPE, EXC_VALUE, EXC_TRACEBACK, file=sys.stdout) + finally: + ray.shutdown() + +if __name__ == '__main__': + _main() diff --git a/example/marlenvironment.py b/example/marlenvironment.py index 064d161..9b6ba0f 100644 --- a/example/marlenvironment.py +++ b/example/marlenvironment.py @@ -8,6 +8,7 @@ """ import collections +import logging import os import sys from pprint import pformat @@ -17,7 +18,7 @@ import gym from ray.rllib.env import MultiAgentEnv -from rllibsumoutils.sumoutils import SUMOUtils +from rllibsumoutils.sumoutils import SUMOUtils, sumo_default_config # """ Import SUMO library """ if 'SUMO_HOME' in os.environ: @@ -27,15 +28,21 @@ else: sys.exit("please declare environment variable 'SUMO_HOME'") +#################################################################################################### + +logger = logging.getLogger(__name__) + +#################################################################################################### + def env_creator(config): """ Environment creator used in the environment registration. 
""" - print('[env_creator] Environment creation: SUMOTestMultiAgentEnv') + logger.info('Environment creation: SUMOTestMultiAgentEnv') return SUMOTestMultiAgentEnv(config) -MS_TO_KMH = 3.6 - #################################################################################################### +MS_TO_KMH = 3.6 + class SUMOSimulationWrapper(SUMOUtils): """ A wrapper for the interaction with the SUMO simulation """ @@ -63,19 +70,19 @@ def _default_step_action(self, agents): pass # get collisions collisions = self.traci_handler.simulation.getCollidingVehiclesIDList() - print('Collisions: {}'.format(collisions)) + logger.debug('Collisions: %s', pformat(collisions)) for veh in collisions: self.collisions[veh] += 1 # get subscriptions self.veh_subscriptions = self.traci_handler.vehicle.getAllSubscriptionResults() for veh, vals in self.veh_subscriptions.items(): - print('Subs:', veh, vals) + logger.debug('Subs: %s, %s', pformat(veh), pformat(vals)) running = set() for agent in agents: if agent in self.veh_subscriptions: running.add(agent) if len(running) == 0: - print('All the agent left the simulation..') + logger.info('All the agent left the simulation..') self.end_simulation() return True @@ -90,22 +97,21 @@ def __init__(self, agent, config): self.action_to_meaning = dict() for pos, action in enumerate(config['actions']): self.action_to_meaning[pos] = config['actions'][action] - print('CONFIG: agent "{}"'.format(self.agent_id)) - print('CONFIG: \n{}'.format(pformat(self.config))) - print('CONFIG: actions \n{}'.format(pformat(self.action_to_meaning))) + logger.debug('Agent "%s" configuration \n %s', self.agent_id, pformat(self.config)) def step(self, action, sumo_handler): """ Implements the logic of each specific action passed as input. """ - print('Agent {}: action {}'.format(self.agent_id, action)) + logger.debug('Agent %s: action %d', self.agent_id, action) # Subscriptions EXAMPLE: # {'agent_0': {64: 14.603468282230542, 104: None}, # 'agent_1': {64: 12.922797055918513, # 104: ('veh.19', 27.239870121802596)}} - print('Subscriptions: {}'.format(sumo_handler.veh_subscriptions[self.agent_id])) + logger.debug('Subscriptions: %s', pformat(sumo_handler.veh_subscriptions[self.agent_id])) previous_speed = sumo_handler.veh_subscriptions[self.agent_id][tc.VAR_SPEED] new_speed = previous_speed + self.action_to_meaning[action] - print('Before {}, After {}'.format(previous_speed, new_speed)) + logger.debug('Before %.2f', previous_speed) sumo_handler.traci_handler.vehicle.setSpeed(self.agent_id, new_speed) + logger.debug('After %.2f', new_speed) return def reset(self, sumo_handler): @@ -122,11 +128,33 @@ def reset(self, sumo_handler): self.agent_id, route, departLane='best', departSpeed='max') sumo_handler.traci_handler.vehicle.subscribeLeader(self.agent_id) sumo_handler.traci_handler.vehicle.subscribe(self.agent_id, varIDs=[tc.VAR_SPEED, ]) - print('Agent {} reset'.format(self.agent_id)) + logger.info('Agent %s reset done.', self.agent_id) return self.agent_id, self.config['start'] #################################################################################################### +DEFAULT_SCENARIO_CONFING = { + 'sumo_config': sumo_default_config(), + 'agent_rnd_order': True, + 'log_level': 'WARN', + 'seed': 42, + 'misc': { + 'max_distance': 5000, # [m] + } +} + +DEFAULT_AGENT_CONFING = { + 'origin': 'road', + 'destination': 'road', + 'start': 0, + 'actions': { # increase/decrease the speed of: + 'acc': 1.0, # [m/s] + 'none': 0.0, # [m/s] + 'dec': -1.0, # [m/s] + }, + 'max_speed': 130, # km/h +} + class 
SUMOTestMultiAgentEnv(MultiAgentEnv): """ A RLLIB environment for testing MARL environments with SUMO simulations. @@ -139,14 +167,26 @@ def __init__(self, config): self._config = config + # logging + level = logging.getLevelName(config['scenario_config']['log_level']) + logger.setLevel(level) + + # SUMO Connector self.simulation = None + + # Random number generator self.rndgen = RandomState(config['scenario_config']['seed']) + # Agent initialization self.agents_init_list = dict() self.agents = dict() for agent, agent_config in self._config['agent_init'].items(): self.agents[agent] = SUMOAgent(agent, agent_config) - self.resetted = False + + # Environment initialization + self.resetted = True + self.episodes = 0 + self.steps = 0 def seed(self, seed): """ Set the seed of a possible random number generator. """ @@ -156,6 +196,11 @@ def get_agents(self): """ Returns a list of the agents. """ return self.agents.keys() + def __del__(self): + logger.info('Environment destruction: SUMOTestMultiAgentEnv') + if self.simulation: + del self.simulation + ######################################### OBSERVATIONS ######################################### def get_observation(self, agent): @@ -164,7 +209,7 @@ def get_observation(self, agent): See http://sumo.sourceforge.net/pydoc/traci._simulation.html """ speed = 0 - distance = self._config['scenario_config']['misc']['max-distance'] + distance = self._config['scenario_config']['misc']['max_distance'] if agent in self.simulation.veh_subscriptions: speed = round(self.simulation.veh_subscriptions[agent][tc.VAR_SPEED] * MS_TO_KMH) leader = self.simulation.veh_subscriptions[agent][tc.VAR_LEADER] @@ -174,7 +219,7 @@ def get_observation(self, agent): ## compatible with libsumo distance = round(dist) ret = [speed, distance] - print('Observation: {}'.format(ret)) + logger.debug('Agent %s --> Obs: %s', agent, pformat(ret)) return ret def compute_observations(self, agents): @@ -182,19 +227,18 @@ def compute_observations(self, agents): obs = dict() for agent in agents: obs[agent] = self.get_observation(agent) - print('Observations: \n{}'.format(pformat(obs))) return obs ########################################### REWARDS ############################################ def get_reward(self, agent): """ Return the reward for a given agent. """ - speed = self.agents[agent].config['max-speed'] # if the agent is not in the subscriptions + speed = self.agents[agent].config['max_speed'] # if the agent is not in the subscriptions # and this function is called, the agent has # reached the end of the road if agent in self.simulation.veh_subscriptions: speed = round(self.simulation.veh_subscriptions[agent][tc.VAR_SPEED] * MS_TO_KMH) - print('Agent {} --> reward {}'.format(agent, speed)) + logger.debug('Agent %s --> Reward %d', agent, speed) return speed def compute_rewards(self, agents): @@ -209,18 +253,14 @@ def compute_rewards(self, agents): def reset(self): """ Resets the env and returns observations from ready agents. 
""" self.resetted = True + self.episodes += 1 + self.steps = 0 # Reset the SUMO simulation if self.simulation: del self.simulation - config = { - 'sumo_cfg': self._config['scenario_config']['sumo-cfg-file'], - 'sumo_params': self._config['scenario_config']['sumo-params'], - 'end_of_sim': self._config['scenario_config']['misc']['end-of-sim'], - 'update_freq': self._config['scenario_config']['misc']['algo-update-freq'], - } - self.simulation = SUMOSimulationWrapper(config) + self.simulation = SUMOSimulationWrapper(self._config['scenario_config']['sumo_config']) # Reset the agents waiting_agents = list() @@ -256,7 +296,9 @@ def step(self, action_dict): infos (dict): Optional info values for each agent id. """ self.resetted = False - print('============== SUMOTestMultiAgentEnv:step ==============') + self.steps += 1 + logger.debug('====> [SUMOTestMultiAgentEnv:step] Episode: %d - Step: %d <====', + self.episodes, self.steps) dones = {} dones['__all__'] = False @@ -264,22 +306,22 @@ def step(self, action_dict): # may need to be shuffled afterwards, but it # is a matter of consistency instead of using # whatever insertion order was used in the dict - if self._config['scenario_config']['agent-rnd-order']: + if self._config['scenario_config']['agent_rnd_order']: ## randomize the agent order to minimize SUMO's insertion queues impact - print('Shuffling the order of the agents.') + logger.debug('Shuffling the order of the agents.') self.rndgen.shuffle(shuffled_agents) # in-place shuffle # Take action for agent in shuffled_agents: self.agents[agent].step(action_dict[agent], self.simulation) - print('Before SUMO') + logger.debug('Before SUMO') ongoing_simulation = self.simulation.step(until_end=False, agents=set(action_dict.keys())) - print('After SUMO') + logger.debug('After SUMO') ## end of the episode if not ongoing_simulation: - print('Reached the end of the SUMO simulation.') + logger.info('Reached the end of the SUMO simulation.') dones['__all__'] = True obs, rewards, infos = {}, {}, {} @@ -290,7 +332,7 @@ def step(self, action_dict): # punish the agent and remove it from the simulation dones[agent] = True obs[agent] = [0, 0] - rewards[agent] = - self.agents[agent].config['max-speed'] + rewards[agent] = - self.agents[agent].config['max_speed'] # infos[agent] = 'Collision' self.simulation.traci_handler.remove(agent, reason=tc.REMOVE_VAPORIZED) else: @@ -299,11 +341,11 @@ def step(self, action_dict): rewards[agent] = self.get_reward(agent) # infos[agent] = '' - print('Observations: {}'.format(obs)) - print('Rewards: {}'.format(rewards)) - print('Dones: {}'.format(dones)) - print('Info: {}'.format(infos)) - print('========================================================') + logger.debug('Observations: %s', pformat(obs)) + logger.debug('Rewards: %s', pformat(rewards)) + logger.debug('Dones: %s', pformat(dones)) + logger.debug('Info: %s', pformat(infos)) + logger.debug('========================================================') return obs, rewards, dones, infos ################################## ACTIONS & OBSERATIONS SPACE ################################# @@ -322,11 +364,11 @@ def get_set_of_actions(self, agent): def get_obs_space_size(self, agent): """ Returns the size of the observation space. 
""" - return ((self.agents[agent].config['max-speed'] + 1) * - (self._config['scenario_config']['misc']['max-distance'] + 1)) + return ((self.agents[agent].config['max_speed'] + 1) * + (self._config['scenario_config']['misc']['max_distance'] + 1)) def get_obs_space(self, agent): """ Returns the observation space. """ return gym.spaces.MultiDiscrete( - [self.agents[agent].config['max-speed'] + 1, - self._config['scenario_config']['misc']['max-distance'] + 1]) + [self.agents[agent].config['max_speed'] + 1, + self._config['scenario_config']['misc']['max_distance'] + 1]) diff --git a/example/ppotrain.py b/example/ppotrain.py index a4b086a..4149eb1 100644 --- a/example/ppotrain.py +++ b/example/ppotrain.py @@ -10,7 +10,9 @@ """ from copy import deepcopy +import logging import pathlib +from pprint import pformat import random import sys import traceback @@ -22,70 +24,50 @@ import marlenvironment +logging.basicConfig(level=logging.WARN) +logger = logging.getLogger('ppotrain') + def _main(): """ Training example """ + # Initialize RAY. + ray.tune.registry.register_env('test_env', marlenvironment.env_creator) + ray.init(memory=52428800, object_store_memory=78643200) ## minimum values + # Algorithm. policy_class = ppo.PPOTFPolicy # https://github.com/ray-project/ray/blob/releases/0.8.3/rllib/agents/trainer.py#L41 # https://github.com/ray-project/ray/blob/releases/0.8.3/rllib/agents/ppo/ppo.py#L15 policy_conf = ppo.DEFAULT_CONFIG - policy_conf['log_level'] = 'INFO' + policy_conf['batch_mode'] = 'complete_episodes' + policy_conf['log_level'] = 'WARN' + policy_conf['min_iter_time_s'] = 5 policy_conf['num_workers'] = 2 - policy_conf['train_batch_size'] = 1 - policy_conf['sgd_minibatch_size'] = 1 policy_conf['rollout_fragment_length'] = 1 + policy_conf['seed'] = 42 + policy_conf['sgd_minibatch_size'] = 1 policy_conf['simple_optimizer'] = True - policy_conf['batch_mode'] = 'complete_episodes' - policy_params = {} + policy_conf['train_batch_size'] = 1 # Load default Scenario configuration for the LEARNING ENVIRONMENT - scenario_config = { - "agent-rnd-order": True, - "sumo-cfg-file": "{}/scenario/sumo.cfg.xml".format( - pathlib.Path(__file__).parent.absolute()), - "sumo-params": ['--collision.action', 'warn'], - "seed": 42, - "misc": { - "algo-update-freq": 10, # number of traci.simulationStep() each learning step. - "end-of-sim": 3600, # [s] - "max-distance": 5000, # [m] - } - } - - # Initialize RAY. - ray.tune.registry.register_env('test_env', marlenvironment.env_creator) - ray.init(memory=52428800, object_store_memory=78643200) ## minimum values + scenario_config = deepcopy(marlenvironment.DEFAULT_SCENARIO_CONFING) + scenario_config['seed'] = 42 + scenario_config['log_level'] = 'INFO' + scenario_config['sumo_config']['sumo_cfg'] = '{}/scenario/sumo.cfg.xml'.format( + pathlib.Path(__file__).parent.absolute()) + scenario_config['sumo_config']['sumo_params'] = ['--collision.action', 'warn'] + scenario_config['sumo_config']['end_of_sim'] = 3600 # [s] + scenario_config['sumo_config']['update_freq'] = 10 # number of traci.simulationStep() + # for each learning step. + scenario_config['sumo_config']['log_level'] = 'INFO' + logger.info('Scenario Configuration: \n %s', pformat(scenario_config)) # Associate the agents with their configuration. 
agent_init = { - "agent_0": { - "origin": "road", - "destination": "road", - "start": 0, - "actions": { # increase/decrease the speed of: - "acc": 1.0, # [m/s] - "none": 0.0, # [m/s] - "dec": -1.0, # [m/s] - }, - "max-speed": 130, # km/h - "seed": 75834444, - "init": [0, 0], - }, - "agent_1": { - "origin": "road", - "destination": "road", - "start": 0, - "actions": { - "acc": 1.0, - "none": 0.0, - "dec": -1.0, - }, - "max-speed": 130, - "seed": 44447583, - "init": [0, 0], - } + 'agent_0': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING), + 'agent_1': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING), } + logger.info('Agents Configuration: \n %s', pformat(agent_init)) ## MARL Environment Init env_config = { @@ -97,11 +79,7 @@ def _main(): # Config for the PPO trainer from the MARLEnv policies = {} for agent in marl_env.get_agents(): - agent_policy_params = deepcopy(policy_params) - from_val, to_val = agent_init[agent]['init'] - agent_policy_params['init'] = lambda: random.randint(from_val, to_val) - agent_policy_params['actions'] = marl_env.get_set_of_actions(agent) - agent_policy_params['seed'] = agent_init[agent]['seed'] + agent_policy_params = {} policies[agent] = (policy_class, marl_env.get_obs_space(agent), marl_env.get_action_space(agent), @@ -110,18 +88,20 @@ def _main(): policy_conf['multiagent']['policy_mapping_fn'] = lambda agent_id: agent_id policy_conf['multiagent']['policies_to_train'] = ['ppo_policy'] policy_conf['env_config'] = env_config + + logger.info('PPO Configuration: \n %s', pformat(policy_conf)) trainer = ppo.PPOTrainer(env='test_env', config=policy_conf) # Single training iteration, just for testing. - result = trainer.train() - print(pretty_print(result)) - -if __name__ == '__main__': try: - _main() + result = trainer.train() + print('Results: \n {}'.format(pretty_print(result))) except Exception: EXC_TYPE, EXC_VALUE, EXC_TRACEBACK = sys.exc_info() traceback.print_exception(EXC_TYPE, EXC_VALUE, EXC_TRACEBACK, file=sys.stdout) finally: ray.shutdown() + +if __name__ == '__main__': + _main() diff --git a/rllibsumoutils/sumoconnector.py b/rllibsumoutils/sumoconnector.py index d492007..4fa80a6 100644 --- a/rllibsumoutils/sumoconnector.py +++ b/rllibsumoutils/sumoconnector.py @@ -9,11 +9,15 @@ import logging import os +from random import random import sys +from datetime import datetime # """ Import SUMO library """ if 'SUMO_HOME' in os.environ: sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools')) + import sumolib + # import traci import libsumo as traci import traci.constants as tc else: @@ -21,41 +25,73 @@ #################################################################################################### -LOGGER = logging.getLogger(__name__) +logging.basicConfig() +logger = logging.getLogger(__name__) + +#################################################################################################### + +DEFAULT_CONFIG = { + # SUMO configuration file. Required. String. + 'sumo_cfg': None, + # Overides . + # Required when using multiple environments at the same time. String. + 'sumo_output': '', + # Additional parameter for the SUMO command line. + # It cannot contain --output-prefix. List of strings. + 'sumo_params': None, + # Enable TraCI trace file. Boolean. + 'trace_file': False, + # SUMO Simulation ending time, in seconds. Float. + 'end_of_sim': None, + # SUMO update frequency in number of traci.simulationStep() calls. Integer. + 'update_freq': 1, + # SUMO tripinfo file as defined in the sumo configuration file in . + # Required for gathering metrics only. 
String. + 'tripinfo_keyword': None, + # SUMO tripinfo XML Schema file. Required for gathering metrics only. String. + 'tripinfo_xml_schema': None, + # Logging legel. Should be one of DEBUG, INFO, WARN, or ERROR. + 'log_level': 'WARN', + # Anything. User defined. + 'misc': None, +} #################################################################################################### class SUMOConnector(object): """ Handler of a SUMO simulation. """ + def __init__(self, config): """ Initialize SUMO and sets the beginning of the simulation. Param: - config: Dict. - { - 'sumo_cfg': SUMO configuration file. String. - 'sumo_params': Additional parameter for the SUMO command line. - List of strings. - 'end_of_sim': Simulation ending time, in seconds. Float. - 'update_freq': SUMO update frequency in number of traci.simulationStep() calls. - Integer. - 'tripinfo_xml_file': SUMO tripinfo file. Required for gathering metrics only. - String. - 'tripinfo_xml_schema': SUMO tripinfo XML Schema file. - Required for gathering metrics only. String. - 'misc': Anything. User defined. - } + config: Dict. See DEFAULT_CONFIG. """ self._config = config - sumo_parameters = ['sumo', '-c', config['sumo_cfg']] - if config['sumo_params']: - sumo_parameters.extend(config['sumo_params']) - traci.start(sumo_parameters) + # logging + level = logging.getLevelName(config['log_level']) + logger.setLevel(level) + + # TraCI Handler and SUMO simulation + self._sumo_label = '{}'.format(self._get_unique_id()) + self._sumo_output_prefix = '{}{}'.format(config['sumo_output'], self._sumo_label) + self._sumo_parameters = ['sumo', '-c', config['sumo_cfg']] + if config['sumo_params'] is not None: + self._sumo_parameters.extend(config['sumo_params']) + self._sumo_parameters.extend(['--output-prefix', self._sumo_output_prefix]) + logger.debug('SUMO command line: %s', str(self._sumo_parameters)) + if config['trace_file']: + traci.start(self._sumo_parameters, + traceFile='{}.tracefile.log'.format(self._sumo_output_prefix)) + else: + traci.start(self._sumo_parameters) self.traci_handler = traci + ## From now on, the call must always be to self.traci_handler + self._is_ongoing = True - self._start_time = traci.simulation.getTime() + self._start_time = self.traci_handler.simulation.getTime() self._sumo_steps = 0 self._manually_stopped = False @@ -66,7 +102,17 @@ def __init__(self, config): self._initialize_metrics() def __del__(self): - self.end_simulation() + try: + self.end_simulation() + except KeyError: + logger.warning('Simulation %s already closed.', self._sumo_label) + #TODO: delete files + + @staticmethod + def _get_unique_id(): + now = datetime.utcnow() + now_in_sec = (now - datetime(1970, 1, 1)).total_seconds() + return round(random() * now_in_sec) ################################################################################################ @@ -91,9 +137,10 @@ def _stopping_condition(self, current_step_counter, until_end): if self.traci_handler.simulation.getMinExpectedNumber() <= 0: # No entities left in the simulation. 
return True - if self.traci_handler.simulation.getTime() > self._config['end_of_sim']: - # the simulatio reach the predefined (from parameters) end - return True + if self._config['end_of_sim'] is not None: + if self.traci_handler.simulation.getTime() > self._config['end_of_sim']: + # the simulatio reach the predefined (from parameters) end + return True if current_step_counter == self._config['update_freq'] and not until_end: return True return False @@ -112,26 +159,21 @@ def step(self, until_end=False, agents=set()): """ ## Execute SUMO steps until the learning needs to happen current_step_counter = 0 - LOGGER.debug('============================================================================') + logger.debug('============================================================================') while not self._stopping_condition(current_step_counter, until_end): - LOGGER.debug('[%s] Current step counter: %d, Update frequency: %d', + logger.debug('[%s] Current step counter: %d, Update frequency: %d', str(until_end), current_step_counter, self._config['update_freq']) self.traci_handler.simulationStep() self._sumo_steps += 1 current_step_counter += 1 self._default_step_action(agents) - LOGGER.debug('============================================================================') - - ## Set if simulation has ended - self._is_ongoing = (not self._manually_stopped and - self.traci_handler.simulation.getMinExpectedNumber() > 0 and - self.traci_handler.simulation.getTime() <= self._config['end_of_sim']) - - if not self.is_ongoing_sim(): - LOGGER.debug('The SUMO simulation is done.') + logger.debug('============================================================================') # If the simulation has finished - return self.is_ongoing_sim() + if self.is_ongoing_sim(): + return True + logger.debug('The SUMO simulation is done.') + return False def fast_forward(self, time): """ @@ -139,9 +181,9 @@ def fast_forward(self, time): Param: time: Float, simulation time in seconds. """ - LOGGER.debug('Fast-forward from time %.2f', self.traci_handler.simulation.getTime()) + logger.debug('Fast-forward from time %.2f', self.traci_handler.simulation.getTime()) self.traci_handler.simulationStep(float(time)) - LOGGER.debug('Fast-forward to time %.2f', self.traci_handler.simulation.getTime()) + logger.debug('Fast-forward to time %.2f', self.traci_handler.simulation.getTime()) ################################################################################################ @@ -151,7 +193,16 @@ def get_sumo_steps(self): def is_ongoing_sim(self): """ Return True iff the SUMO simulation is still ongoing. """ - return self._is_ongoing and not self._manually_stopped + if self._manually_stopped: + return False + if self.traci_handler.simulation.getMinExpectedNumber() <= 0: + # No entities left in the simulation. + return False + if self._config['end_of_sim'] is not None: + if self.traci_handler.simulation.getTime() > self._config['end_of_sim']: + # the simulatio reach the predefined (from parameters) end + return False + return True def get_current_time(self): """ Returns the current simulation time, or None if the simulation is not ongoing. """ @@ -161,11 +212,11 @@ def get_current_time(self): def end_simulation(self): """ Forces the simulation to stop. """ - self._manually_stopped = True - try: + if self.is_ongoing_sim(): + logger.info('Closing TraCI %s', self._sumo_label) + self._manually_stopped = True self.traci_handler.close() - except KeyError: - # The 'default' connection was already closed. 
- pass + else: + logger.warning('TraCI %s is already closed.', self._sumo_label) ################################################################################################ diff --git a/rllibsumoutils/sumoutils.py b/rllibsumoutils/sumoutils.py index ecf5dbf..965f914 100644 --- a/rllibsumoutils/sumoutils.py +++ b/rllibsumoutils/sumoutils.py @@ -8,6 +8,7 @@ """ import collections +from copy import deepcopy import logging import os import sys @@ -16,7 +17,7 @@ from lxml import etree -from rllibsumoutils.sumoconnector import SUMOConnector +from rllibsumoutils.sumoconnector import SUMOConnector, DEFAULT_CONFIG # """ Import SUMO library """ if 'SUMO_HOME' in os.environ: @@ -26,7 +27,17 @@ else: sys.exit("please declare environment variable 'SUMO_HOME'") -LOGGER = logging.getLogger(__name__) +#################################################################################################### + +logging.basicConfig() +logger = logging.getLogger(__name__) + +#################################################################################################### + +def sumo_default_config(): + return deepcopy(DEFAULT_CONFIG) + +#################################################################################################### class SUMOUtils(SUMOConnector): """ A wrapper for the interaction with the SUMO simulation that adds functionalities """ @@ -45,9 +56,9 @@ def process_tripinfo_file(self): It requires 'tripinfo_xml_file' and 'tripinfo_xml_schema' configuration parametes set. """ - if 'tripinfo_xml_file' not in self._config: + if 'tripinfo_keyword' not in self._config: raise Exception( - 'Function process_tripinfo_file requires the parameter "tripinfo_xml_file" set.', + 'Function process_tripinfo_file requires the parameter "tripinfo_keyword" set.', self._config) if 'tripinfo_xml_schema' not in self._config: @@ -64,9 +75,10 @@ def process_tripinfo_file(self): schema = etree.XMLSchema(file=self._config['tripinfo_xml_schema']) parser = etree.XMLParser(schema=schema) - tree = etree.parse(self._config['tripinfo_xml_file'], parser) + tripinfo_file = '{}{}'.format(self._sumo_output_prefix, self._config['tripinfo_keyword']) + tree = etree.parse(tripinfo_file, parser) - LOGGER.debug('Processing %s tripinfo file.', self._config['tripinfo_xml_file']) + logger.info('Processing %s tripinfo file.', tripinfo_file) for element in tree.getroot(): if element.tag == 'tripinfo': self.tripinfo[element.attrib['id']] = dict(element.attrib) @@ -78,8 +90,8 @@ def process_tripinfo_file(self): self.personinfo[element.attrib['id']]['stages'] = stages else: raise Exception('Unrecognized element in the tripinfo file.') - LOGGER.debug('TRIPINFO: \n%s', pformat(self.tripinfo)) - LOGGER.debug('PERSONINFO: \n%s', pformat(self.personinfo)) + logger.debug('TRIPINFO: \n%s', pformat(self.tripinfo)) + logger.debug('PERSONINFO: \n%s', pformat(self.personinfo)) def get_timeloss(self, entity, default=float('NaN')): """ @@ -91,31 +103,31 @@ def get_timeloss(self, entity, default=float('NaN')): If the entity does not exist or does not have the value, it returns the default value. 
""" if entity in self.tripinfo: - LOGGER.debug('TRIPINFO for %s', entity) + logger.debug('TRIPINFO for %s', entity) if 'timeLoss' in self.tripinfo[entity]: - LOGGER.debug('timeLoss %s', self.tripinfo[entity]['timeLoss']) + logger.debug('timeLoss %s', self.tripinfo[entity]['timeLoss']) return float(self.tripinfo[entity]['timeLoss']) - LOGGER.debug('timeLoss not found.') + logger.debug('timeLoss not found.') return default elif entity in self.personinfo: - LOGGER.debug('PERSONINFO for %s', entity) - LOGGER.debug('%s', pformat(self.personinfo[entity])) + logger.debug('PERSONINFO for %s', entity) + logger.debug('%s', pformat(self.personinfo[entity])) time_loss, ts_found = 0.0, False for _, stage in self.personinfo[entity]['stages']: if 'timeLoss' in stage: - LOGGER.debug('timeLoss %s', stage['timeLoss']) + logger.debug('timeLoss %s', stage['timeLoss']) time_loss += float(stage['timeLoss']) ts_found = True if not ts_found: - LOGGER.debug('timeLoss not found.') + logger.debug('timeLoss not found.') return default if time_loss <= 0: - LOGGER.debug('ERROR: timeLoss is %.2f', time_loss) + logger.debug('ERROR: timeLoss is %.2f', time_loss) return default - LOGGER.debug('total timeLoss %.2f', time_loss) + logger.debug('total timeLoss %.2f', time_loss) return time_loss else: - LOGGER.debug('Entity %s not found.', entity) + logger.debug('Entity %s not found.', entity) return default def get_depart(self, entity, default=float('NaN')): @@ -128,20 +140,20 @@ def get_depart(self, entity, default=float('NaN')): If the entity does not exist or does not have the value, it returns the default value. """ if entity in self.tripinfo: - LOGGER.debug('TRIPINFO for %s', entity) + logger.debug('TRIPINFO for %s', entity) if 'depart' in self.tripinfo[entity]: - LOGGER.debug('depart %s', self.tripinfo[entity]['depart']) + logger.debug('depart %s', self.tripinfo[entity]['depart']) return float(self.tripinfo[entity]['depart']) - LOGGER.debug('depart not found.') + logger.debug('depart not found.') elif entity in self.personinfo: - LOGGER.debug('PERSONINFO for %s', entity) - LOGGER.debug('%s', pformat(self.personinfo[entity])) + logger.debug('PERSONINFO for %s', entity) + logger.debug('%s', pformat(self.personinfo[entity])) if 'depart' in self.personinfo[entity]: - LOGGER.debug('depart %s', self.personinfo[entity]['depart']) + logger.debug('depart %s', self.personinfo[entity]['depart']) return float(self.personinfo[entity]['depart']) - LOGGER.debug('depart not found.') + logger.debug('depart not found.') else: - LOGGER.debug('Entity %s not found.', entity) + logger.debug('Entity %s not found.', entity) return default def get_duration(self, entity, default=float('NaN')): @@ -154,14 +166,14 @@ def get_duration(self, entity, default=float('NaN')): If the entity does not exist or does not have the value, it returns the default value. 
""" if entity in self.tripinfo: - LOGGER.debug('TRIPINFO for %s', entity) + logger.debug('TRIPINFO for %s', entity) if 'duration' in self.tripinfo[entity]: - LOGGER.debug('duration %s', self.tripinfo[entity]['duration']) + logger.debug('duration %s', self.tripinfo[entity]['duration']) return float(self.tripinfo[entity]['duration']) - LOGGER.debug('duration not found.') + logger.debug('duration not found.') elif entity in self.personinfo: - LOGGER.debug('PERSONINFO for %s', entity) - LOGGER.debug('%s', pformat(self.personinfo[entity])) + logger.debug('PERSONINFO for %s', entity) + logger.debug('%s', pformat(self.personinfo[entity])) if 'depart' in self.personinfo[entity]: depart = float(self.personinfo[entity]['depart']) arrival = depart @@ -170,11 +182,11 @@ def get_duration(self, entity, default=float('NaN')): arrival = float(stage['arrival']) duration = arrival - depart if duration > 0: - LOGGER.debug('duration %d', duration) + logger.debug('duration %d', duration) return duration - LOGGER.debug('duration impossible to compute.') + logger.debug('duration impossible to compute.') else: - LOGGER.debug('Entity %s not found.', entity) + logger.debug('Entity %s not found.', entity) return default def get_arrival(self, entity, default=float('NaN')): @@ -187,30 +199,30 @@ def get_arrival(self, entity, default=float('NaN')): If the entity does not exist or does not have the value, it returns the default value. """ if entity in self.tripinfo: - LOGGER.debug('TRIPINFO for %s', entity) + logger.debug('TRIPINFO for %s', entity) if 'arrival' in self.tripinfo[entity]: - LOGGER.debug('arrival %s', self.tripinfo[entity]['arrival']) + logger.debug('arrival %s', self.tripinfo[entity]['arrival']) return float(self.tripinfo[entity]['arrival']) - LOGGER.debug('arrival not found.') + logger.debug('arrival not found.') return default elif entity in self.personinfo: - LOGGER.debug('PERSONINFO for %s', entity) + logger.debug('PERSONINFO for %s', entity) arrival, arrival_found = 0.0, False for _, stage in self.personinfo[entity]['stages']: if 'arrival' in stage: - LOGGER.debug('arrival %s', stage['arrival']) + logger.debug('arrival %s', stage['arrival']) arrival = float(stage['arrival']) arrival_found = True if not arrival_found: - LOGGER.debug('arrival not found.') + logger.debug('arrival not found.') return default if arrival <= 0: - LOGGER.debug('ERROR: arrival is %.2f', arrival) + logger.debug('ERROR: arrival is %.2f', arrival) return default - LOGGER.debug('total arrival %.2f', arrival) + logger.debug('total arrival %.2f', arrival) return arrival else: - LOGGER.debug('Entity %s not found.', entity) + logger.debug('Entity %s not found.', entity) return default def get_global_travel_time(self): From cce0d16878e175ff514d6c25a5355e7a5a01654b Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Tue, 28 Jul 2020 14:47:30 +0100 Subject: [PATCH 06/15] development files added to gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index b6e4761..3efbf71 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# Development +copy-to-devel-docker.sh + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] From c87d14d4070dd6024f72bd41795f38170a242f1a Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Tue, 28 Jul 2020 14:49:32 +0100 Subject: [PATCH 07/15] MAJOR CHANGE for speed up: import libsumo as traci --- example/marlenvironment.py | 7 +++++-- rllibsumoutils/sumoconnector.py | 3 +-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git 
a/example/marlenvironment.py b/example/marlenvironment.py index 54a4573..064d161 100644 --- a/example/marlenvironment.py +++ b/example/marlenvironment.py @@ -168,8 +168,11 @@ def get_observation(self, agent): if agent in self.simulation.veh_subscriptions: speed = round(self.simulation.veh_subscriptions[agent][tc.VAR_SPEED] * MS_TO_KMH) leader = self.simulation.veh_subscriptions[agent][tc.VAR_LEADER] - if leader: - distance = round(leader[1]) + if leader: ## compatible with traci + veh, dist = leader + if veh: + ## compatible with libsumo + distance = round(dist) ret = [speed, distance] print('Observation: {}'.format(ret)) return ret diff --git a/rllibsumoutils/sumoconnector.py b/rllibsumoutils/sumoconnector.py index 6a0a736..d492007 100644 --- a/rllibsumoutils/sumoconnector.py +++ b/rllibsumoutils/sumoconnector.py @@ -14,7 +14,7 @@ # """ Import SUMO library """ if 'SUMO_HOME' in os.environ: sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools')) - import traci + import libsumo as traci import traci.constants as tc else: raise Exception("Please declare environment variable 'SUMO_HOME'") @@ -169,4 +169,3 @@ def end_simulation(self): pass ################################################################################################ - \ No newline at end of file From 0b69fd43011b39f82f050f5d6ff2cd14e2365050 Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Tue, 28 Jul 2020 14:49:54 +0100 Subject: [PATCH 08/15] PPO policy_conf['multiagent'] fix --- example/train.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/example/train.py b/example/train.py index a923df2..a4b086a 100644 --- a/example/train.py +++ b/example/train.py @@ -106,11 +106,9 @@ def _main(): marl_env.get_obs_space(agent), marl_env.get_action_space(agent), agent_policy_params) - policy_conf['multiagent'] = { - 'policies': policies, - 'policy_mapping_fn': lambda agent_id: agent_id, - 'policies_to_train': ['ppo_policy'], - } + policy_conf['multiagent']['policies'] = policies + policy_conf['multiagent']['policy_mapping_fn'] = lambda agent_id: agent_id + policy_conf['multiagent']['policies_to_train'] = ['ppo_policy'] policy_conf['env_config'] = env_config trainer = ppo.PPOTrainer(env='test_env', config=policy_conf) From 36d5487d109bbe8ab08a1b001f0ff40183a749b4 Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Tue, 28 Jul 2020 16:24:16 +0100 Subject: [PATCH 09/15] specific ppo trainer --- example/{train.py => ppotrain.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename example/{train.py => ppotrain.py} (100%) diff --git a/example/train.py b/example/ppotrain.py similarity index 100% rename from example/train.py rename to example/ppotrain.py From 7d77df3bfe121fc53771a3915e2189bede87ed4c Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Thu, 30 Jul 2020 19:30:38 +0100 Subject: [PATCH 10/15] libsumo - logger - configurations - parallel environments - ppo & a3c trainers --- example/a3ctrain.py | 120 +++++++++++++++++++++++++++ example/marlenvironment.py | 128 +++++++++++++++++++---------- example/ppotrain.py | 94 +++++++++------------ rllibsumoutils/sumoconnector.py | 139 ++++++++++++++++++++++---------- rllibsumoutils/sumoutils.py | 98 ++++++++++++---------- 5 files changed, 392 insertions(+), 187 deletions(-) create mode 100644 example/a3ctrain.py diff --git a/example/a3ctrain.py b/example/a3ctrain.py new file mode 100644 index 0000000..15752e6 --- /dev/null +++ b/example/a3ctrain.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 + +""" Example Trainer for RLLIB + SUMO Utlis + + Author: Lara CODECA + 
+ This program and the accompanying materials are made available under the + terms of the Eclipse Public License 2.0 which is available at + http://www.eclipse.org/legal/epl-2.0. +""" + +from copy import deepcopy +import logging +import pathlib +from pprint import pformat +import random +import sys +import traceback + +import ray + +from ray.rllib.agents.a3c import a3c +from ray.tune.logger import pretty_print + +import marlenvironment + +logging.basicConfig(level=logging.WARN) +logger = logging.getLogger('a3ctrain') + +def _main(): + """ Training example """ + + # Initialize RAY. + ray.tune.registry.register_env('test_env', marlenvironment.env_creator) + ray.init(memory=52428800, object_store_memory=78643200) ## minimum values + + # Algorithm. + policy_class = a3c.A3CTFPolicy + + # https://github.com/ray-project/ray/blob/releases/0.8.6/rllib/agents/trainer.py#L44 + # https://github.com/ray-project/ray/blob/releases/0.8.6/rllib/agents/a3c/a3c.py#L14 + policy_conf = a3c.DEFAULT_CONFIG + policy_conf['batch_mode'] = 'complete_episodes' + policy_conf['collect_metrics_timeout'] = 86400 # 1 day timeout + policy_conf['ignore_worker_failures'] = False + policy_conf['log_level'] = 'WARN' + policy_conf['min_iter_time_s'] = 5 + policy_conf['monitor'] = True + policy_conf['no_done_at_end'] = True + policy_conf['num_envs_per_worker'] = 1 + policy_conf['num_workers'] = 2 + policy_conf['remote_env_batch_wait_ms'] = 1000 + policy_conf['remote_worker_envs'] = False + policy_conf['rollout_fragment_length'] = 1 + policy_conf['seed'] = 666 + policy_conf['sgd_minibatch_size'] = 1 + policy_conf['simple_optimizer'] = True + policy_conf['timesteps_per_iteration'] = 1000 + policy_conf['train_batch_size'] = 1 + policy_conf['use_gae'] = False + + # https://github.com/ray-project/ray/blob/releases/0.8.6/rllib/models/catalog.py#L37 + policy_conf['model']['use_lstm'] = True + + # Load default Scenario configuration for the LEARNING ENVIRONMENT + scenario_config = deepcopy(marlenvironment.DEFAULT_SCENARIO_CONFING) + scenario_config['seed'] = 42 + scenario_config['log_level'] = 'INFO' + scenario_config['sumo_config']['sumo_cfg'] = '{}/scenario/sumo.cfg.xml'.format( + pathlib.Path(__file__).parent.absolute()) + scenario_config['sumo_config']['sumo_params'] = ['--collision.action', 'warn'] + scenario_config['sumo_config']['end_of_sim'] = 3600 # [s] + scenario_config['sumo_config']['update_freq'] = 10 # number of traci.simulationStep() + # for each environment step. + scenario_config['sumo_config']['log_level'] = 'INFO' + logger.info('Scenario Configuration: \n %s', pformat(scenario_config)) + + # Associate the agents with their configuration. 
+ agent_init = { + 'agent_0': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING), + 'agent_1': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING), + } + logger.info('Agents Configuration: \n %s', pformat(agent_init)) + + ## MARL Environment Init + env_config = { + 'agent_init': agent_init, + 'scenario_config': scenario_config, + } + marl_env = marlenvironment.SUMOTestMultiAgentEnv(env_config) + + # Config for the A3C trainer from the MARLEnv + policies = {} + for agent in marl_env.get_agents(): + agent_policy_params = {} + policies[agent] = (policy_class, + marl_env.get_obs_space(agent), + marl_env.get_action_space(agent), + agent_policy_params) + policy_conf['multiagent']['policies'] = policies + policy_conf['multiagent']['policy_mapping_fn'] = lambda agent_id: agent_id + policy_conf['multiagent']['policies_to_train'] = ['a3c_policy'] + policy_conf['env_config'] = env_config + + logger.info('a3cA3C Configuration: \n %s', pformat(policy_conf)) + trainer = a3c.A3CTrainer(env='test_env', + config=policy_conf) + + # Single training iteration, just for testing. + try: + result = trainer.train() + print('Results: \n {}'.format(pretty_print(result))) + except Exception: + EXC_TYPE, EXC_VALUE, EXC_TRACEBACK = sys.exc_info() + traceback.print_exception(EXC_TYPE, EXC_VALUE, EXC_TRACEBACK, file=sys.stdout) + finally: + ray.shutdown() + +if __name__ == '__main__': + _main() diff --git a/example/marlenvironment.py b/example/marlenvironment.py index 064d161..9b6ba0f 100644 --- a/example/marlenvironment.py +++ b/example/marlenvironment.py @@ -8,6 +8,7 @@ """ import collections +import logging import os import sys from pprint import pformat @@ -17,7 +18,7 @@ import gym from ray.rllib.env import MultiAgentEnv -from rllibsumoutils.sumoutils import SUMOUtils +from rllibsumoutils.sumoutils import SUMOUtils, sumo_default_config # """ Import SUMO library """ if 'SUMO_HOME' in os.environ: @@ -27,15 +28,21 @@ else: sys.exit("please declare environment variable 'SUMO_HOME'") +#################################################################################################### + +logger = logging.getLogger(__name__) + +#################################################################################################### + def env_creator(config): """ Environment creator used in the environment registration. 
""" - print('[env_creator] Environment creation: SUMOTestMultiAgentEnv') + logger.info('Environment creation: SUMOTestMultiAgentEnv') return SUMOTestMultiAgentEnv(config) -MS_TO_KMH = 3.6 - #################################################################################################### +MS_TO_KMH = 3.6 + class SUMOSimulationWrapper(SUMOUtils): """ A wrapper for the interaction with the SUMO simulation """ @@ -63,19 +70,19 @@ def _default_step_action(self, agents): pass # get collisions collisions = self.traci_handler.simulation.getCollidingVehiclesIDList() - print('Collisions: {}'.format(collisions)) + logger.debug('Collisions: %s', pformat(collisions)) for veh in collisions: self.collisions[veh] += 1 # get subscriptions self.veh_subscriptions = self.traci_handler.vehicle.getAllSubscriptionResults() for veh, vals in self.veh_subscriptions.items(): - print('Subs:', veh, vals) + logger.debug('Subs: %s, %s', pformat(veh), pformat(vals)) running = set() for agent in agents: if agent in self.veh_subscriptions: running.add(agent) if len(running) == 0: - print('All the agent left the simulation..') + logger.info('All the agent left the simulation..') self.end_simulation() return True @@ -90,22 +97,21 @@ def __init__(self, agent, config): self.action_to_meaning = dict() for pos, action in enumerate(config['actions']): self.action_to_meaning[pos] = config['actions'][action] - print('CONFIG: agent "{}"'.format(self.agent_id)) - print('CONFIG: \n{}'.format(pformat(self.config))) - print('CONFIG: actions \n{}'.format(pformat(self.action_to_meaning))) + logger.debug('Agent "%s" configuration \n %s', self.agent_id, pformat(self.config)) def step(self, action, sumo_handler): """ Implements the logic of each specific action passed as input. """ - print('Agent {}: action {}'.format(self.agent_id, action)) + logger.debug('Agent %s: action %d', self.agent_id, action) # Subscriptions EXAMPLE: # {'agent_0': {64: 14.603468282230542, 104: None}, # 'agent_1': {64: 12.922797055918513, # 104: ('veh.19', 27.239870121802596)}} - print('Subscriptions: {}'.format(sumo_handler.veh_subscriptions[self.agent_id])) + logger.debug('Subscriptions: %s', pformat(sumo_handler.veh_subscriptions[self.agent_id])) previous_speed = sumo_handler.veh_subscriptions[self.agent_id][tc.VAR_SPEED] new_speed = previous_speed + self.action_to_meaning[action] - print('Before {}, After {}'.format(previous_speed, new_speed)) + logger.debug('Before %.2f', previous_speed) sumo_handler.traci_handler.vehicle.setSpeed(self.agent_id, new_speed) + logger.debug('After %.2f', new_speed) return def reset(self, sumo_handler): @@ -122,11 +128,33 @@ def reset(self, sumo_handler): self.agent_id, route, departLane='best', departSpeed='max') sumo_handler.traci_handler.vehicle.subscribeLeader(self.agent_id) sumo_handler.traci_handler.vehicle.subscribe(self.agent_id, varIDs=[tc.VAR_SPEED, ]) - print('Agent {} reset'.format(self.agent_id)) + logger.info('Agent %s reset done.', self.agent_id) return self.agent_id, self.config['start'] #################################################################################################### +DEFAULT_SCENARIO_CONFING = { + 'sumo_config': sumo_default_config(), + 'agent_rnd_order': True, + 'log_level': 'WARN', + 'seed': 42, + 'misc': { + 'max_distance': 5000, # [m] + } +} + +DEFAULT_AGENT_CONFING = { + 'origin': 'road', + 'destination': 'road', + 'start': 0, + 'actions': { # increase/decrease the speed of: + 'acc': 1.0, # [m/s] + 'none': 0.0, # [m/s] + 'dec': -1.0, # [m/s] + }, + 'max_speed': 130, # km/h +} + class 
SUMOTestMultiAgentEnv(MultiAgentEnv): """ A RLLIB environment for testing MARL environments with SUMO simulations. @@ -139,14 +167,26 @@ def __init__(self, config): self._config = config + # logging + level = logging.getLevelName(config['scenario_config']['log_level']) + logger.setLevel(level) + + # SUMO Connector self.simulation = None + + # Random number generator self.rndgen = RandomState(config['scenario_config']['seed']) + # Agent initialization self.agents_init_list = dict() self.agents = dict() for agent, agent_config in self._config['agent_init'].items(): self.agents[agent] = SUMOAgent(agent, agent_config) - self.resetted = False + + # Environment initialization + self.resetted = True + self.episodes = 0 + self.steps = 0 def seed(self, seed): """ Set the seed of a possible random number generator. """ @@ -156,6 +196,11 @@ def get_agents(self): """ Returns a list of the agents. """ return self.agents.keys() + def __del__(self): + logger.info('Environment destruction: SUMOTestMultiAgentEnv') + if self.simulation: + del self.simulation + ######################################### OBSERVATIONS ######################################### def get_observation(self, agent): @@ -164,7 +209,7 @@ def get_observation(self, agent): See http://sumo.sourceforge.net/pydoc/traci._simulation.html """ speed = 0 - distance = self._config['scenario_config']['misc']['max-distance'] + distance = self._config['scenario_config']['misc']['max_distance'] if agent in self.simulation.veh_subscriptions: speed = round(self.simulation.veh_subscriptions[agent][tc.VAR_SPEED] * MS_TO_KMH) leader = self.simulation.veh_subscriptions[agent][tc.VAR_LEADER] @@ -174,7 +219,7 @@ def get_observation(self, agent): ## compatible with libsumo distance = round(dist) ret = [speed, distance] - print('Observation: {}'.format(ret)) + logger.debug('Agent %s --> Obs: %s', agent, pformat(ret)) return ret def compute_observations(self, agents): @@ -182,19 +227,18 @@ def compute_observations(self, agents): obs = dict() for agent in agents: obs[agent] = self.get_observation(agent) - print('Observations: \n{}'.format(pformat(obs))) return obs ########################################### REWARDS ############################################ def get_reward(self, agent): """ Return the reward for a given agent. """ - speed = self.agents[agent].config['max-speed'] # if the agent is not in the subscriptions + speed = self.agents[agent].config['max_speed'] # if the agent is not in the subscriptions # and this function is called, the agent has # reached the end of the road if agent in self.simulation.veh_subscriptions: speed = round(self.simulation.veh_subscriptions[agent][tc.VAR_SPEED] * MS_TO_KMH) - print('Agent {} --> reward {}'.format(agent, speed)) + logger.debug('Agent %s --> Reward %d', agent, speed) return speed def compute_rewards(self, agents): @@ -209,18 +253,14 @@ def compute_rewards(self, agents): def reset(self): """ Resets the env and returns observations from ready agents. 
""" self.resetted = True + self.episodes += 1 + self.steps = 0 # Reset the SUMO simulation if self.simulation: del self.simulation - config = { - 'sumo_cfg': self._config['scenario_config']['sumo-cfg-file'], - 'sumo_params': self._config['scenario_config']['sumo-params'], - 'end_of_sim': self._config['scenario_config']['misc']['end-of-sim'], - 'update_freq': self._config['scenario_config']['misc']['algo-update-freq'], - } - self.simulation = SUMOSimulationWrapper(config) + self.simulation = SUMOSimulationWrapper(self._config['scenario_config']['sumo_config']) # Reset the agents waiting_agents = list() @@ -256,7 +296,9 @@ def step(self, action_dict): infos (dict): Optional info values for each agent id. """ self.resetted = False - print('============== SUMOTestMultiAgentEnv:step ==============') + self.steps += 1 + logger.debug('====> [SUMOTestMultiAgentEnv:step] Episode: %d - Step: %d <====', + self.episodes, self.steps) dones = {} dones['__all__'] = False @@ -264,22 +306,22 @@ def step(self, action_dict): # may need to be shuffled afterwards, but it # is a matter of consistency instead of using # whatever insertion order was used in the dict - if self._config['scenario_config']['agent-rnd-order']: + if self._config['scenario_config']['agent_rnd_order']: ## randomize the agent order to minimize SUMO's insertion queues impact - print('Shuffling the order of the agents.') + logger.debug('Shuffling the order of the agents.') self.rndgen.shuffle(shuffled_agents) # in-place shuffle # Take action for agent in shuffled_agents: self.agents[agent].step(action_dict[agent], self.simulation) - print('Before SUMO') + logger.debug('Before SUMO') ongoing_simulation = self.simulation.step(until_end=False, agents=set(action_dict.keys())) - print('After SUMO') + logger.debug('After SUMO') ## end of the episode if not ongoing_simulation: - print('Reached the end of the SUMO simulation.') + logger.info('Reached the end of the SUMO simulation.') dones['__all__'] = True obs, rewards, infos = {}, {}, {} @@ -290,7 +332,7 @@ def step(self, action_dict): # punish the agent and remove it from the simulation dones[agent] = True obs[agent] = [0, 0] - rewards[agent] = - self.agents[agent].config['max-speed'] + rewards[agent] = - self.agents[agent].config['max_speed'] # infos[agent] = 'Collision' self.simulation.traci_handler.remove(agent, reason=tc.REMOVE_VAPORIZED) else: @@ -299,11 +341,11 @@ def step(self, action_dict): rewards[agent] = self.get_reward(agent) # infos[agent] = '' - print('Observations: {}'.format(obs)) - print('Rewards: {}'.format(rewards)) - print('Dones: {}'.format(dones)) - print('Info: {}'.format(infos)) - print('========================================================') + logger.debug('Observations: %s', pformat(obs)) + logger.debug('Rewards: %s', pformat(rewards)) + logger.debug('Dones: %s', pformat(dones)) + logger.debug('Info: %s', pformat(infos)) + logger.debug('========================================================') return obs, rewards, dones, infos ################################## ACTIONS & OBSERATIONS SPACE ################################# @@ -322,11 +364,11 @@ def get_set_of_actions(self, agent): def get_obs_space_size(self, agent): """ Returns the size of the observation space. 
""" - return ((self.agents[agent].config['max-speed'] + 1) * - (self._config['scenario_config']['misc']['max-distance'] + 1)) + return ((self.agents[agent].config['max_speed'] + 1) * + (self._config['scenario_config']['misc']['max_distance'] + 1)) def get_obs_space(self, agent): """ Returns the observation space. """ return gym.spaces.MultiDiscrete( - [self.agents[agent].config['max-speed'] + 1, - self._config['scenario_config']['misc']['max-distance'] + 1]) + [self.agents[agent].config['max_speed'] + 1, + self._config['scenario_config']['misc']['max_distance'] + 1]) diff --git a/example/ppotrain.py b/example/ppotrain.py index a4b086a..4149eb1 100644 --- a/example/ppotrain.py +++ b/example/ppotrain.py @@ -10,7 +10,9 @@ """ from copy import deepcopy +import logging import pathlib +from pprint import pformat import random import sys import traceback @@ -22,70 +24,50 @@ import marlenvironment +logging.basicConfig(level=logging.WARN) +logger = logging.getLogger('ppotrain') + def _main(): """ Training example """ + # Initialize RAY. + ray.tune.registry.register_env('test_env', marlenvironment.env_creator) + ray.init(memory=52428800, object_store_memory=78643200) ## minimum values + # Algorithm. policy_class = ppo.PPOTFPolicy # https://github.com/ray-project/ray/blob/releases/0.8.3/rllib/agents/trainer.py#L41 # https://github.com/ray-project/ray/blob/releases/0.8.3/rllib/agents/ppo/ppo.py#L15 policy_conf = ppo.DEFAULT_CONFIG - policy_conf['log_level'] = 'INFO' + policy_conf['batch_mode'] = 'complete_episodes' + policy_conf['log_level'] = 'WARN' + policy_conf['min_iter_time_s'] = 5 policy_conf['num_workers'] = 2 - policy_conf['train_batch_size'] = 1 - policy_conf['sgd_minibatch_size'] = 1 policy_conf['rollout_fragment_length'] = 1 + policy_conf['seed'] = 42 + policy_conf['sgd_minibatch_size'] = 1 policy_conf['simple_optimizer'] = True - policy_conf['batch_mode'] = 'complete_episodes' - policy_params = {} + policy_conf['train_batch_size'] = 1 # Load default Scenario configuration for the LEARNING ENVIRONMENT - scenario_config = { - "agent-rnd-order": True, - "sumo-cfg-file": "{}/scenario/sumo.cfg.xml".format( - pathlib.Path(__file__).parent.absolute()), - "sumo-params": ['--collision.action', 'warn'], - "seed": 42, - "misc": { - "algo-update-freq": 10, # number of traci.simulationStep() each learning step. - "end-of-sim": 3600, # [s] - "max-distance": 5000, # [m] - } - } - - # Initialize RAY. - ray.tune.registry.register_env('test_env', marlenvironment.env_creator) - ray.init(memory=52428800, object_store_memory=78643200) ## minimum values + scenario_config = deepcopy(marlenvironment.DEFAULT_SCENARIO_CONFING) + scenario_config['seed'] = 42 + scenario_config['log_level'] = 'INFO' + scenario_config['sumo_config']['sumo_cfg'] = '{}/scenario/sumo.cfg.xml'.format( + pathlib.Path(__file__).parent.absolute()) + scenario_config['sumo_config']['sumo_params'] = ['--collision.action', 'warn'] + scenario_config['sumo_config']['end_of_sim'] = 3600 # [s] + scenario_config['sumo_config']['update_freq'] = 10 # number of traci.simulationStep() + # for each learning step. + scenario_config['sumo_config']['log_level'] = 'INFO' + logger.info('Scenario Configuration: \n %s', pformat(scenario_config)) # Associate the agents with their configuration. 
agent_init = { - "agent_0": { - "origin": "road", - "destination": "road", - "start": 0, - "actions": { # increase/decrease the speed of: - "acc": 1.0, # [m/s] - "none": 0.0, # [m/s] - "dec": -1.0, # [m/s] - }, - "max-speed": 130, # km/h - "seed": 75834444, - "init": [0, 0], - }, - "agent_1": { - "origin": "road", - "destination": "road", - "start": 0, - "actions": { - "acc": 1.0, - "none": 0.0, - "dec": -1.0, - }, - "max-speed": 130, - "seed": 44447583, - "init": [0, 0], - } + 'agent_0': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING), + 'agent_1': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING), } + logger.info('Agents Configuration: \n %s', pformat(agent_init)) ## MARL Environment Init env_config = { @@ -97,11 +79,7 @@ def _main(): # Config for the PPO trainer from the MARLEnv policies = {} for agent in marl_env.get_agents(): - agent_policy_params = deepcopy(policy_params) - from_val, to_val = agent_init[agent]['init'] - agent_policy_params['init'] = lambda: random.randint(from_val, to_val) - agent_policy_params['actions'] = marl_env.get_set_of_actions(agent) - agent_policy_params['seed'] = agent_init[agent]['seed'] + agent_policy_params = {} policies[agent] = (policy_class, marl_env.get_obs_space(agent), marl_env.get_action_space(agent), @@ -110,18 +88,20 @@ def _main(): policy_conf['multiagent']['policy_mapping_fn'] = lambda agent_id: agent_id policy_conf['multiagent']['policies_to_train'] = ['ppo_policy'] policy_conf['env_config'] = env_config + + logger.info('PPO Configuration: \n %s', pformat(policy_conf)) trainer = ppo.PPOTrainer(env='test_env', config=policy_conf) # Single training iteration, just for testing. - result = trainer.train() - print(pretty_print(result)) - -if __name__ == '__main__': try: - _main() + result = trainer.train() + print('Results: \n {}'.format(pretty_print(result))) except Exception: EXC_TYPE, EXC_VALUE, EXC_TRACEBACK = sys.exc_info() traceback.print_exception(EXC_TYPE, EXC_VALUE, EXC_TRACEBACK, file=sys.stdout) finally: ray.shutdown() + +if __name__ == '__main__': + _main() diff --git a/rllibsumoutils/sumoconnector.py b/rllibsumoutils/sumoconnector.py index d492007..4fa80a6 100644 --- a/rllibsumoutils/sumoconnector.py +++ b/rllibsumoutils/sumoconnector.py @@ -9,11 +9,15 @@ import logging import os +from random import random import sys +from datetime import datetime # """ Import SUMO library """ if 'SUMO_HOME' in os.environ: sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools')) + import sumolib + # import traci import libsumo as traci import traci.constants as tc else: @@ -21,41 +25,73 @@ #################################################################################################### -LOGGER = logging.getLogger(__name__) +logging.basicConfig() +logger = logging.getLogger(__name__) + +#################################################################################################### + +DEFAULT_CONFIG = { + # SUMO configuration file. Required. String. + 'sumo_cfg': None, + # Overides . + # Required when using multiple environments at the same time. String. + 'sumo_output': '', + # Additional parameter for the SUMO command line. + # It cannot contain --output-prefix. List of strings. + 'sumo_params': None, + # Enable TraCI trace file. Boolean. + 'trace_file': False, + # SUMO Simulation ending time, in seconds. Float. + 'end_of_sim': None, + # SUMO update frequency in number of traci.simulationStep() calls. Integer. + 'update_freq': 1, + # SUMO tripinfo file as defined in the sumo configuration file in . + # Required for gathering metrics only. 
String. + 'tripinfo_keyword': None, + # SUMO tripinfo XML Schema file. Required for gathering metrics only. String. + 'tripinfo_xml_schema': None, + # Logging legel. Should be one of DEBUG, INFO, WARN, or ERROR. + 'log_level': 'WARN', + # Anything. User defined. + 'misc': None, +} #################################################################################################### class SUMOConnector(object): """ Handler of a SUMO simulation. """ + def __init__(self, config): """ Initialize SUMO and sets the beginning of the simulation. Param: - config: Dict. - { - 'sumo_cfg': SUMO configuration file. String. - 'sumo_params': Additional parameter for the SUMO command line. - List of strings. - 'end_of_sim': Simulation ending time, in seconds. Float. - 'update_freq': SUMO update frequency in number of traci.simulationStep() calls. - Integer. - 'tripinfo_xml_file': SUMO tripinfo file. Required for gathering metrics only. - String. - 'tripinfo_xml_schema': SUMO tripinfo XML Schema file. - Required for gathering metrics only. String. - 'misc': Anything. User defined. - } + config: Dict. See DEFAULT_CONFIG. """ self._config = config - sumo_parameters = ['sumo', '-c', config['sumo_cfg']] - if config['sumo_params']: - sumo_parameters.extend(config['sumo_params']) - traci.start(sumo_parameters) + # logging + level = logging.getLevelName(config['log_level']) + logger.setLevel(level) + + # TraCI Handler and SUMO simulation + self._sumo_label = '{}'.format(self._get_unique_id()) + self._sumo_output_prefix = '{}{}'.format(config['sumo_output'], self._sumo_label) + self._sumo_parameters = ['sumo', '-c', config['sumo_cfg']] + if config['sumo_params'] is not None: + self._sumo_parameters.extend(config['sumo_params']) + self._sumo_parameters.extend(['--output-prefix', self._sumo_output_prefix]) + logger.debug('SUMO command line: %s', str(self._sumo_parameters)) + if config['trace_file']: + traci.start(self._sumo_parameters, + traceFile='{}.tracefile.log'.format(self._sumo_output_prefix)) + else: + traci.start(self._sumo_parameters) self.traci_handler = traci + ## From now on, the call must always be to self.traci_handler + self._is_ongoing = True - self._start_time = traci.simulation.getTime() + self._start_time = self.traci_handler.simulation.getTime() self._sumo_steps = 0 self._manually_stopped = False @@ -66,7 +102,17 @@ def __init__(self, config): self._initialize_metrics() def __del__(self): - self.end_simulation() + try: + self.end_simulation() + except KeyError: + logger.warning('Simulation %s already closed.', self._sumo_label) + #TODO: delete files + + @staticmethod + def _get_unique_id(): + now = datetime.utcnow() + now_in_sec = (now - datetime(1970, 1, 1)).total_seconds() + return round(random() * now_in_sec) ################################################################################################ @@ -91,9 +137,10 @@ def _stopping_condition(self, current_step_counter, until_end): if self.traci_handler.simulation.getMinExpectedNumber() <= 0: # No entities left in the simulation. 
return True - if self.traci_handler.simulation.getTime() > self._config['end_of_sim']: - # the simulatio reach the predefined (from parameters) end - return True + if self._config['end_of_sim'] is not None: + if self.traci_handler.simulation.getTime() > self._config['end_of_sim']: + # the simulatio reach the predefined (from parameters) end + return True if current_step_counter == self._config['update_freq'] and not until_end: return True return False @@ -112,26 +159,21 @@ def step(self, until_end=False, agents=set()): """ ## Execute SUMO steps until the learning needs to happen current_step_counter = 0 - LOGGER.debug('============================================================================') + logger.debug('============================================================================') while not self._stopping_condition(current_step_counter, until_end): - LOGGER.debug('[%s] Current step counter: %d, Update frequency: %d', + logger.debug('[%s] Current step counter: %d, Update frequency: %d', str(until_end), current_step_counter, self._config['update_freq']) self.traci_handler.simulationStep() self._sumo_steps += 1 current_step_counter += 1 self._default_step_action(agents) - LOGGER.debug('============================================================================') - - ## Set if simulation has ended - self._is_ongoing = (not self._manually_stopped and - self.traci_handler.simulation.getMinExpectedNumber() > 0 and - self.traci_handler.simulation.getTime() <= self._config['end_of_sim']) - - if not self.is_ongoing_sim(): - LOGGER.debug('The SUMO simulation is done.') + logger.debug('============================================================================') # If the simulation has finished - return self.is_ongoing_sim() + if self.is_ongoing_sim(): + return True + logger.debug('The SUMO simulation is done.') + return False def fast_forward(self, time): """ @@ -139,9 +181,9 @@ def fast_forward(self, time): Param: time: Float, simulation time in seconds. """ - LOGGER.debug('Fast-forward from time %.2f', self.traci_handler.simulation.getTime()) + logger.debug('Fast-forward from time %.2f', self.traci_handler.simulation.getTime()) self.traci_handler.simulationStep(float(time)) - LOGGER.debug('Fast-forward to time %.2f', self.traci_handler.simulation.getTime()) + logger.debug('Fast-forward to time %.2f', self.traci_handler.simulation.getTime()) ################################################################################################ @@ -151,7 +193,16 @@ def get_sumo_steps(self): def is_ongoing_sim(self): """ Return True iff the SUMO simulation is still ongoing. """ - return self._is_ongoing and not self._manually_stopped + if self._manually_stopped: + return False + if self.traci_handler.simulation.getMinExpectedNumber() <= 0: + # No entities left in the simulation. + return False + if self._config['end_of_sim'] is not None: + if self.traci_handler.simulation.getTime() > self._config['end_of_sim']: + # the simulatio reach the predefined (from parameters) end + return False + return True def get_current_time(self): """ Returns the current simulation time, or None if the simulation is not ongoing. """ @@ -161,11 +212,11 @@ def get_current_time(self): def end_simulation(self): """ Forces the simulation to stop. """ - self._manually_stopped = True - try: + if self.is_ongoing_sim(): + logger.info('Closing TraCI %s', self._sumo_label) + self._manually_stopped = True self.traci_handler.close() - except KeyError: - # The 'default' connection was already closed. 
- pass + else: + logger.warning('TraCI %s is already closed.', self._sumo_label) ################################################################################################ diff --git a/rllibsumoutils/sumoutils.py b/rllibsumoutils/sumoutils.py index ecf5dbf..965f914 100644 --- a/rllibsumoutils/sumoutils.py +++ b/rllibsumoutils/sumoutils.py @@ -8,6 +8,7 @@ """ import collections +from copy import deepcopy import logging import os import sys @@ -16,7 +17,7 @@ from lxml import etree -from rllibsumoutils.sumoconnector import SUMOConnector +from rllibsumoutils.sumoconnector import SUMOConnector, DEFAULT_CONFIG # """ Import SUMO library """ if 'SUMO_HOME' in os.environ: @@ -26,7 +27,17 @@ else: sys.exit("please declare environment variable 'SUMO_HOME'") -LOGGER = logging.getLogger(__name__) +#################################################################################################### + +logging.basicConfig() +logger = logging.getLogger(__name__) + +#################################################################################################### + +def sumo_default_config(): + return deepcopy(DEFAULT_CONFIG) + +#################################################################################################### class SUMOUtils(SUMOConnector): """ A wrapper for the interaction with the SUMO simulation that adds functionalities """ @@ -45,9 +56,9 @@ def process_tripinfo_file(self): It requires 'tripinfo_xml_file' and 'tripinfo_xml_schema' configuration parametes set. """ - if 'tripinfo_xml_file' not in self._config: + if 'tripinfo_keyword' not in self._config: raise Exception( - 'Function process_tripinfo_file requires the parameter "tripinfo_xml_file" set.', + 'Function process_tripinfo_file requires the parameter "tripinfo_keyword" set.', self._config) if 'tripinfo_xml_schema' not in self._config: @@ -64,9 +75,10 @@ def process_tripinfo_file(self): schema = etree.XMLSchema(file=self._config['tripinfo_xml_schema']) parser = etree.XMLParser(schema=schema) - tree = etree.parse(self._config['tripinfo_xml_file'], parser) + tripinfo_file = '{}{}'.format(self._sumo_output_prefix, self._config['tripinfo_keyword']) + tree = etree.parse(tripinfo_file, parser) - LOGGER.debug('Processing %s tripinfo file.', self._config['tripinfo_xml_file']) + logger.info('Processing %s tripinfo file.', tripinfo_file) for element in tree.getroot(): if element.tag == 'tripinfo': self.tripinfo[element.attrib['id']] = dict(element.attrib) @@ -78,8 +90,8 @@ def process_tripinfo_file(self): self.personinfo[element.attrib['id']]['stages'] = stages else: raise Exception('Unrecognized element in the tripinfo file.') - LOGGER.debug('TRIPINFO: \n%s', pformat(self.tripinfo)) - LOGGER.debug('PERSONINFO: \n%s', pformat(self.personinfo)) + logger.debug('TRIPINFO: \n%s', pformat(self.tripinfo)) + logger.debug('PERSONINFO: \n%s', pformat(self.personinfo)) def get_timeloss(self, entity, default=float('NaN')): """ @@ -91,31 +103,31 @@ def get_timeloss(self, entity, default=float('NaN')): If the entity does not exist or does not have the value, it returns the default value. 
""" if entity in self.tripinfo: - LOGGER.debug('TRIPINFO for %s', entity) + logger.debug('TRIPINFO for %s', entity) if 'timeLoss' in self.tripinfo[entity]: - LOGGER.debug('timeLoss %s', self.tripinfo[entity]['timeLoss']) + logger.debug('timeLoss %s', self.tripinfo[entity]['timeLoss']) return float(self.tripinfo[entity]['timeLoss']) - LOGGER.debug('timeLoss not found.') + logger.debug('timeLoss not found.') return default elif entity in self.personinfo: - LOGGER.debug('PERSONINFO for %s', entity) - LOGGER.debug('%s', pformat(self.personinfo[entity])) + logger.debug('PERSONINFO for %s', entity) + logger.debug('%s', pformat(self.personinfo[entity])) time_loss, ts_found = 0.0, False for _, stage in self.personinfo[entity]['stages']: if 'timeLoss' in stage: - LOGGER.debug('timeLoss %s', stage['timeLoss']) + logger.debug('timeLoss %s', stage['timeLoss']) time_loss += float(stage['timeLoss']) ts_found = True if not ts_found: - LOGGER.debug('timeLoss not found.') + logger.debug('timeLoss not found.') return default if time_loss <= 0: - LOGGER.debug('ERROR: timeLoss is %.2f', time_loss) + logger.debug('ERROR: timeLoss is %.2f', time_loss) return default - LOGGER.debug('total timeLoss %.2f', time_loss) + logger.debug('total timeLoss %.2f', time_loss) return time_loss else: - LOGGER.debug('Entity %s not found.', entity) + logger.debug('Entity %s not found.', entity) return default def get_depart(self, entity, default=float('NaN')): @@ -128,20 +140,20 @@ def get_depart(self, entity, default=float('NaN')): If the entity does not exist or does not have the value, it returns the default value. """ if entity in self.tripinfo: - LOGGER.debug('TRIPINFO for %s', entity) + logger.debug('TRIPINFO for %s', entity) if 'depart' in self.tripinfo[entity]: - LOGGER.debug('depart %s', self.tripinfo[entity]['depart']) + logger.debug('depart %s', self.tripinfo[entity]['depart']) return float(self.tripinfo[entity]['depart']) - LOGGER.debug('depart not found.') + logger.debug('depart not found.') elif entity in self.personinfo: - LOGGER.debug('PERSONINFO for %s', entity) - LOGGER.debug('%s', pformat(self.personinfo[entity])) + logger.debug('PERSONINFO for %s', entity) + logger.debug('%s', pformat(self.personinfo[entity])) if 'depart' in self.personinfo[entity]: - LOGGER.debug('depart %s', self.personinfo[entity]['depart']) + logger.debug('depart %s', self.personinfo[entity]['depart']) return float(self.personinfo[entity]['depart']) - LOGGER.debug('depart not found.') + logger.debug('depart not found.') else: - LOGGER.debug('Entity %s not found.', entity) + logger.debug('Entity %s not found.', entity) return default def get_duration(self, entity, default=float('NaN')): @@ -154,14 +166,14 @@ def get_duration(self, entity, default=float('NaN')): If the entity does not exist or does not have the value, it returns the default value. 
""" if entity in self.tripinfo: - LOGGER.debug('TRIPINFO for %s', entity) + logger.debug('TRIPINFO for %s', entity) if 'duration' in self.tripinfo[entity]: - LOGGER.debug('duration %s', self.tripinfo[entity]['duration']) + logger.debug('duration %s', self.tripinfo[entity]['duration']) return float(self.tripinfo[entity]['duration']) - LOGGER.debug('duration not found.') + logger.debug('duration not found.') elif entity in self.personinfo: - LOGGER.debug('PERSONINFO for %s', entity) - LOGGER.debug('%s', pformat(self.personinfo[entity])) + logger.debug('PERSONINFO for %s', entity) + logger.debug('%s', pformat(self.personinfo[entity])) if 'depart' in self.personinfo[entity]: depart = float(self.personinfo[entity]['depart']) arrival = depart @@ -170,11 +182,11 @@ def get_duration(self, entity, default=float('NaN')): arrival = float(stage['arrival']) duration = arrival - depart if duration > 0: - LOGGER.debug('duration %d', duration) + logger.debug('duration %d', duration) return duration - LOGGER.debug('duration impossible to compute.') + logger.debug('duration impossible to compute.') else: - LOGGER.debug('Entity %s not found.', entity) + logger.debug('Entity %s not found.', entity) return default def get_arrival(self, entity, default=float('NaN')): @@ -187,30 +199,30 @@ def get_arrival(self, entity, default=float('NaN')): If the entity does not exist or does not have the value, it returns the default value. """ if entity in self.tripinfo: - LOGGER.debug('TRIPINFO for %s', entity) + logger.debug('TRIPINFO for %s', entity) if 'arrival' in self.tripinfo[entity]: - LOGGER.debug('arrival %s', self.tripinfo[entity]['arrival']) + logger.debug('arrival %s', self.tripinfo[entity]['arrival']) return float(self.tripinfo[entity]['arrival']) - LOGGER.debug('arrival not found.') + logger.debug('arrival not found.') return default elif entity in self.personinfo: - LOGGER.debug('PERSONINFO for %s', entity) + logger.debug('PERSONINFO for %s', entity) arrival, arrival_found = 0.0, False for _, stage in self.personinfo[entity]['stages']: if 'arrival' in stage: - LOGGER.debug('arrival %s', stage['arrival']) + logger.debug('arrival %s', stage['arrival']) arrival = float(stage['arrival']) arrival_found = True if not arrival_found: - LOGGER.debug('arrival not found.') + logger.debug('arrival not found.') return default if arrival <= 0: - LOGGER.debug('ERROR: arrival is %.2f', arrival) + logger.debug('ERROR: arrival is %.2f', arrival) return default - LOGGER.debug('total arrival %.2f', arrival) + logger.debug('total arrival %.2f', arrival) return arrival else: - LOGGER.debug('Entity %s not found.', entity) + logger.debug('Entity %s not found.', entity) return default def get_global_travel_time(self): From 0b06aef84d707ea6cc6eeeecccb5d3d63ea86c8f Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Fri, 31 Jul 2020 07:45:44 +0100 Subject: [PATCH 11/15] Added SUMO pid check. --- rllibsumoutils/sumoconnector.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rllibsumoutils/sumoconnector.py b/rllibsumoutils/sumoconnector.py index 4fa80a6..d2e93fd 100644 --- a/rllibsumoutils/sumoconnector.py +++ b/rllibsumoutils/sumoconnector.py @@ -68,6 +68,8 @@ def __init__(self, config): Param: config: Dict. See DEFAULT_CONFIG. 
""" + logger.debug('Starting SUMOConnector in process %d.', os.getpid()) + self._config = config # logging @@ -102,6 +104,7 @@ def __init__(self, config): self._initialize_metrics() def __del__(self): + logger.debug('Deleting SUMOConnector in process %d.', os.getpid()) try: self.end_simulation() except KeyError: From c2ab029f70d739fa226c17d03046ae660234f9c3 Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Fri, 31 Jul 2020 10:34:55 +0100 Subject: [PATCH 12/15] active selection between traci and libsumo in the connector configuration --- example/a3ctrain.py | 1 + example/ppotrain.py | 1 + rllibsumoutils/sumoconnector.py | 18 ++++++++++++++---- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/example/a3ctrain.py b/example/a3ctrain.py index 15752e6..ae0e8b7 100644 --- a/example/a3ctrain.py +++ b/example/a3ctrain.py @@ -66,6 +66,7 @@ def _main(): scenario_config = deepcopy(marlenvironment.DEFAULT_SCENARIO_CONFING) scenario_config['seed'] = 42 scenario_config['log_level'] = 'INFO' + scenario_config['sumo_config']['sumo_connector'] = 'traci' scenario_config['sumo_config']['sumo_cfg'] = '{}/scenario/sumo.cfg.xml'.format( pathlib.Path(__file__).parent.absolute()) scenario_config['sumo_config']['sumo_params'] = ['--collision.action', 'warn'] diff --git a/example/ppotrain.py b/example/ppotrain.py index 4149eb1..6025bbb 100644 --- a/example/ppotrain.py +++ b/example/ppotrain.py @@ -53,6 +53,7 @@ def _main(): scenario_config = deepcopy(marlenvironment.DEFAULT_SCENARIO_CONFING) scenario_config['seed'] = 42 scenario_config['log_level'] = 'INFO' + scenario_config['sumo_config']['sumo_connector'] = 'libsumo' scenario_config['sumo_config']['sumo_cfg'] = '{}/scenario/sumo.cfg.xml'.format( pathlib.Path(__file__).parent.absolute()) scenario_config['sumo_config']['sumo_params'] = ['--collision.action', 'warn'] diff --git a/rllibsumoutils/sumoconnector.py b/rllibsumoutils/sumoconnector.py index d2e93fd..157a71f 100644 --- a/rllibsumoutils/sumoconnector.py +++ b/rllibsumoutils/sumoconnector.py @@ -17,8 +17,6 @@ if 'SUMO_HOME' in os.environ: sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools')) import sumolib - # import traci - import libsumo as traci import traci.constants as tc else: raise Exception("Please declare environment variable 'SUMO_HOME'") @@ -31,6 +29,8 @@ #################################################################################################### DEFAULT_CONFIG = { + # SUMO Connector. Default is libsumo. Possible strings: 'libsumo' or 'traci' + 'sumo_connector': 'libsumo', # SUMO configuration file. Required. String. 'sumo_cfg': None, # Overides . @@ -68,15 +68,25 @@ def __init__(self, config): Param: config: Dict. See DEFAULT_CONFIG. """ - logger.debug('Starting SUMOConnector in process %d.', os.getpid()) - self._config = config # logging level = logging.getLevelName(config['log_level']) logger.setLevel(level) + # libsumo vs TraCI selection + if config['sumo_connector'] == 'libsumo': + import libsumo as traci + elif config['sumo_connector'] == 'traci': + import traci + else: + error = 'ERROR: "{}" is not a valid option for "sumo_connector".'.format( + config['sumo_connector']) + error += ' The possible connectors are "traci" or "libsumo".' 
+ raise Exception(error) + # TraCI Handler and SUMO simulation + logger.debug('Starting SUMOConnector in process %d.', os.getpid()) self._sumo_label = '{}'.format(self._get_unique_id()) self._sumo_output_prefix = '{}{}'.format(config['sumo_output'], self._sumo_label) self._sumo_parameters = ['sumo', '-c', config['sumo_cfg']] From 1f9bf6ec856a869f763f0a120b04ae87b6fc5ac1 Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Fri, 31 Jul 2020 10:58:24 +0100 Subject: [PATCH 13/15] using the pid to discern the sumo connectors --- rllibsumoutils/sumoconnector.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/rllibsumoutils/sumoconnector.py b/rllibsumoutils/sumoconnector.py index 157a71f..5e017ce 100644 --- a/rllibsumoutils/sumoconnector.py +++ b/rllibsumoutils/sumoconnector.py @@ -9,11 +9,9 @@ import logging import os -from random import random import sys -from datetime import datetime -# """ Import SUMO library """ +# Attach $SUMO_HOME/tools to the path to import SUMO libraries if 'SUMO_HOME' in os.environ: sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools')) import sumolib @@ -87,7 +85,7 @@ def __init__(self, config): # TraCI Handler and SUMO simulation logger.debug('Starting SUMOConnector in process %d.', os.getpid()) - self._sumo_label = '{}'.format(self._get_unique_id()) + self._sumo_label = '{}'.format(os.getpid()) self._sumo_output_prefix = '{}{}'.format(config['sumo_output'], self._sumo_label) self._sumo_parameters = ['sumo', '-c', config['sumo_cfg']] if config['sumo_params'] is not None: @@ -119,13 +117,6 @@ def __del__(self): self.end_simulation() except KeyError: logger.warning('Simulation %s already closed.', self._sumo_label) - #TODO: delete files - - @staticmethod - def _get_unique_id(): - now = datetime.utcnow() - now_in_sec = (now - datetime(1970, 1, 1)).total_seconds() - return round(random() * now_in_sec) ################################################################################################ From 4dcf9307089d8db9505009c708fa825f8c60bca3 Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Fri, 31 Jul 2020 10:59:04 +0100 Subject: [PATCH 14/15] remove verbose and stats from the sumo simulation --- example/a3ctrain.py | 2 +- example/ppotrain.py | 3 ++- example/scenario/sumo.cfg.xml | 9 +++++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/example/a3ctrain.py b/example/a3ctrain.py index ae0e8b7..20ec601 100644 --- a/example/a3ctrain.py +++ b/example/a3ctrain.py @@ -66,7 +66,7 @@ def _main(): scenario_config = deepcopy(marlenvironment.DEFAULT_SCENARIO_CONFING) scenario_config['seed'] = 42 scenario_config['log_level'] = 'INFO' - scenario_config['sumo_config']['sumo_connector'] = 'traci' + scenario_config['sumo_config']['sumo_connector'] = 'libsumo' scenario_config['sumo_config']['sumo_cfg'] = '{}/scenario/sumo.cfg.xml'.format( pathlib.Path(__file__).parent.absolute()) scenario_config['sumo_config']['sumo_params'] = ['--collision.action', 'warn'] diff --git a/example/ppotrain.py b/example/ppotrain.py index 6025bbb..843cda0 100644 --- a/example/ppotrain.py +++ b/example/ppotrain.py @@ -53,10 +53,11 @@ def _main(): scenario_config = deepcopy(marlenvironment.DEFAULT_SCENARIO_CONFING) scenario_config['seed'] = 42 scenario_config['log_level'] = 'INFO' - scenario_config['sumo_config']['sumo_connector'] = 'libsumo' + scenario_config['sumo_config']['sumo_connector'] = 'traci' scenario_config['sumo_config']['sumo_cfg'] = '{}/scenario/sumo.cfg.xml'.format( pathlib.Path(__file__).parent.absolute()) 
scenario_config['sumo_config']['sumo_params'] = ['--collision.action', 'warn'] + scenario_config['sumo_config']['trace_file'] = True scenario_config['sumo_config']['end_of_sim'] = 3600 # [s] scenario_config['sumo_config']['update_freq'] = 10 # number of traci.simulationStep() # for each learning step. diff --git a/example/scenario/sumo.cfg.xml b/example/scenario/sumo.cfg.xml index 038d45d..b60dd70 100644 --- a/example/scenario/sumo.cfg.xml +++ b/example/scenario/sumo.cfg.xml @@ -12,4 +12,13 @@ + + + + + + + + + From ade8271f876af649e1ebfff54d4c1d01ca3989ee Mon Sep 17 00:00:00 2001 From: Lara CODECA Date: Fri, 31 Jul 2020 11:58:14 +0100 Subject: [PATCH 15/15] fix partial-merging errors --- rllibsumoutils/sumoconnector.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/rllibsumoutils/sumoconnector.py b/rllibsumoutils/sumoconnector.py index e0e941f..2d5f29f 100644 --- a/rllibsumoutils/sumoconnector.py +++ b/rllibsumoutils/sumoconnector.py @@ -17,11 +17,6 @@ if 'SUMO_HOME' in os.environ: sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools')) import sumolib -<<<<<<< HEAD -======= - # import traci - import libsumo as traci ->>>>>>> 44b8c6fb6787a0ee5790722dbd93012c71cabe2e import traci.constants as tc else: raise Exception("Please declare environment variable 'SUMO_HOME'") @@ -34,11 +29,8 @@ #################################################################################################### DEFAULT_CONFIG = { -<<<<<<< HEAD # SUMO Connector. Default is libsumo. Possible strings: 'libsumo' or 'traci' 'sumo_connector': 'libsumo', -======= ->>>>>>> 44b8c6fb6787a0ee5790722dbd93012c71cabe2e # SUMO configuration file. Required. String. 'sumo_cfg': None, # Overides . @@ -122,24 +114,11 @@ def __init__(self, config): self._initialize_metrics() def __del__(self): -<<<<<<< HEAD logger.debug('Deleting SUMOConnector in process %d.', os.getpid()) -======= ->>>>>>> 44b8c6fb6787a0ee5790722dbd93012c71cabe2e try: self.end_simulation() except KeyError: logger.warning('Simulation %s already closed.', self._sumo_label) -<<<<<<< HEAD -======= - #TODO: delete files - - @staticmethod - def _get_unique_id(): - now = datetime.utcnow() - now_in_sec = (now - datetime(1970, 1, 1)).total_seconds() - return round(random() * now_in_sec) ->>>>>>> 44b8c6fb6787a0ee5790722dbd93012c71cabe2e ################################################################################################
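
For quick reference, the sketch below shows how the configuration surface introduced by this patch series (DEFAULT_SCENARIO_CONFING, DEFAULT_AGENT_CONFING, sumo_default_config(), and the new 'sumo_connector' switch) might be assembled, mirroring what ppotrain.py and a3ctrain.py do. It is a minimal, illustrative sketch and is not part of the patches themselves: the scenario path and the override values are assumptions, and it presumes the example/ directory is importable.

#!/usr/bin/env python3
""" Minimal configuration sketch (illustrative only, not part of the patch series). """

from copy import deepcopy
import pathlib

import marlenvironment  # assumes example/ is on the Python path

# Start from the defaults added in marlenvironment.py and override only what
# the scenario needs, as ppotrain.py and a3ctrain.py do.
scenario_config = deepcopy(marlenvironment.DEFAULT_SCENARIO_CONFING)
scenario_config['seed'] = 42
scenario_config['log_level'] = 'INFO'
scenario_config['sumo_config']['sumo_connector'] = 'libsumo'  # or 'traci'
scenario_config['sumo_config']['sumo_cfg'] = '{}/scenario/sumo.cfg.xml'.format(
    pathlib.Path(__file__).parent.absolute())
scenario_config['sumo_config']['sumo_params'] = ['--collision.action', 'warn']
scenario_config['sumo_config']['end_of_sim'] = 3600  # [s]
scenario_config['sumo_config']['update_freq'] = 10   # simulationStep() calls per env step
scenario_config['sumo_config']['log_level'] = 'INFO'

# One default agent configuration per agent.
env_config = {
    'agent_init': {
        'agent_0': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING),
        'agent_1': deepcopy(marlenvironment.DEFAULT_AGENT_CONFING),
    },
    'scenario_config': scenario_config,
}

# Builds the MARL environment; SUMO itself is only started on reset().
env = marlenvironment.env_creator(env_config)

Per the connector configuration added in patch 12, 'libsumo' is the default and avoids the TraCI socket overhead, while 'traci' can still be selected per environment; setting 'trace_file' to True additionally writes a TraCI trace log next to the other prefixed SUMO outputs, which is useful when debugging a single run.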