-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathrun_simulation.py
141 lines (117 loc) · 4.83 KB
/
run_simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import logging
import os
import pprint
from pathlib import Path
from shutil import rmtree
from typing import List, Optional, Union
import hydra
import pandas as pd
import pytorch_lightning as pl
from nuplan.common.utils.s3_utils import is_s3_path
from nuplan.planning.script.builders.simulation_builder import build_simulations
from nuplan.planning.script.builders.simulation_callback_builder import (
build_callbacks_worker,
build_simulation_callbacks,
)
from nuplan.planning.script.utils import (
run_runners,
set_default_path,
set_up_common_builder,
)
from nuplan.planning.simulation.planner.abstract_planner import AbstractPlanner
from omegaconf import DictConfig, OmegaConf
# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# If set, use the env. variable to overwrite the default dataset and experiment paths
set_default_path()

# If set, use the env. variable NUPLAN_HYDRA_CONFIG_PATH to overwrite the Hydra config
# location; otherwise fall back to "config/simulation".
# NOTE(review): CONFIG_PATH is not referenced anywhere else in the visible code -- the
# hydra.main decorator on main() hard-codes "./config". Confirm whether that is intended.
CONFIG_PATH = os.getenv("NUPLAN_HYDRA_CONFIG_PATH", "config/simulation")
def print_simulation_results(file=None):
    """
    Pretty-print the aggregated "final_score" record of a simulation run.
    :param file: Optional path to an aggregated-metric parquet file. When omitted, the
        ``*.parquet`` file with the largest ``st_ctime`` inside ``<cwd>/aggregator_metric``
        is used.
    :return: None. The record is printed to stdout; if no parquet file or no
        "final_score" row can be found, a warning is logged and nothing is printed.
    """
    log = logging.getLogger(__name__)

    if file is None:
        root = Path(os.getcwd()) / "aggregator_metric"
        candidates = list(root.glob("*.parquet"))
        if not candidates:
            # max() over an empty list would raise an opaque ValueError and abort the
            # caller even though the simulation itself may have finished fine.
            log.warning("No aggregated metric parquet found in %s; nothing to print.", root)
            return
        file = max(candidates, key=lambda item: item.stat().st_ctime)

    df = pd.read_parquet(file)
    records = df[df["scenario"] == "final_score"].to_dict(orient="records")
    if not records:
        # Indexing [0] on an empty list would raise IndexError.
        log.warning("No 'final_score' row found in %s; nothing to print.", file)
        return

    pprint.PrettyPrinter(indent=4).pprint(records[0])
def run_simulation(
    cfg: DictConfig,
    planners: Optional[Union[AbstractPlanner, List[AbstractPlanner]]] = None,
) -> None:
    """
    Build the simulations described by the config and run them. Serves as the helper behind main so that the
    planner can come either from the config or from the caller.
    :param cfg: Configuration that is used to run the experiment.
        Already contains the changes merged from the experiment's config to default config.
    :param planners: Pre-built planner(s) to run in simulation. Can either be a single planner or list of planners.
        When given, any "planner" entry in cfg is removed (cfg is modified in place).
    """
    build_stage = "building_simulation"
    run_stage = "running_simulation"

    # Seed everything up front with the configured seed
    pl.seed_everything(cfg.seed, workers=True)

    common_builder = set_up_common_builder(cfg=cfg, profiler_name=build_stage)

    # Callbacks share one worker pool, which is also handed to the simulation builder below
    callback_pool = build_callbacks_worker(cfg)
    simulation_callbacks = build_simulation_callbacks(
        cfg=cfg,
        output_dir=common_builder.output_dir,
        worker=callback_pool,
    )

    # A pre-built planner takes precedence: drop the config entry so that only one planner specification remains.
    if planners:
        if "planner" in cfg.keys():
            logger.info("Using pre-instantiated planner. Ignoring planner in config")
            # Struct mode is switched off only for the duration of the pop
            OmegaConf.set_struct(cfg, False)
            cfg.pop("planner")
            OmegaConf.set_struct(cfg, True)

    # The builder receives a list (or None), so wrap a lone planner
    pre_built = [planners] if isinstance(planners, AbstractPlanner) else planners

    simulation_runners = build_simulations(
        cfg=cfg,
        callbacks=simulation_callbacks,
        worker=common_builder.worker,
        pre_built_planners=pre_built,
        callbacks_worker=callback_pool,
    )

    # Close out the construction profile, if profiling is enabled
    if common_builder.profiler:
        common_builder.profiler.save_profiler(build_stage)

    logger.info("Running simulation...")
    run_runners(
        runners=simulation_runners,
        common_builder=common_builder,
        cfg=cfg,
        profiler_name=run_stage,
    )
    logger.info("Finished running simulation!")
def clean_up_s3_artifacts() -> None:
    """
    Remove the local 's3:' directory tree that contains the current working directory, if there is one.
    This happens because some minor write-to-s3 functionality isn't yet implemented.
    """
    # Lingering artifacts get written locally to a 's3:' directory. Hydra changes
    # the working directory to a subdirectory of this, so we search the working
    # path for it.
    s3_marker = "s3:"

    # partition() splits at the first occurrence of the marker; `found` is empty
    # when the working path does not contain it.
    prefix, found, _ = os.getcwd().partition(s3_marker)
    if not found:
        return

    # Everything up to and including the marker is the local 's3:' directory.
    rmtree(prefix + found)
@hydra.main(config_path="./config", config_name="default_simulation")
def main(cfg: DictConfig) -> None:
    """
    Hydra entry point: run the simulation with the planner taken from the config, clean up local s3 artifacts
    when the output directory is an s3 path, then print the aggregated results.
    :param cfg: Configuration that is used to run the experiment.
        Already contains the changes merged from the experiment's config to default config.
    """
    not_set_msg = "Simulation_log_main_path must not be set when running simulation."
    assert cfg.simulation_log_main_path is None, not_set_msg

    # No pre-built planners here, so run_simulation uses the config's planner
    run_simulation(cfg=cfg)

    output_dir = Path(cfg.output_dir)
    if is_s3_path(output_dir):
        clean_up_s3_artifacts()

    print_simulation_results()
# Run the Hydra-decorated entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()