-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
executable file
·218 lines (180 loc) · 8.49 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#! /usr/bin/env python
"""A script that starts the robot or the main station."""
from threading import Lock
import netifaces
import queue
import sys
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from typing import Tuple
import cv2
from design.base_station.main_station import MainStation
from design.decision_making.brain import Brain
from design.decision_making.constants import RotationStrategyType, TranslationStrategyType
from design.decision_making.movement_strategy import MovementStrategy
from design.interfacing.hardware_controllers import (AntennaController,
LightsController,
PenController,
WheelsController)
from design.interfacing.interfacing_controller import InterfacingController
from design.interfacing.pen_driver import PenDriver
from design.interfacing.stm32_driver import Stm32Driver
from design.telemetry.commands import CommandHandler
from design.telemetry.selectors import (ClientSelectorFactory,
ServerSelectorFactory)
from design.utils.execution_logger import ExecutionLogger
from design.vision.camera import Camera, CameraSettings
from design.vision.drawing_zone_detector import DrawingZoneDetector
from design.vision.obstacles_detector import ObstaclesDetector
from design.vision.onboard_vision import OnboardVision
from design.vision.robot_detector import RobotDetector
from design.vision.vertices import HighFrequencyFilter, VerticesFinder
from design.vision.world_vision import WorldVision
CAMERA_SETTINGS_FILE = 'config/camera_optimized_values.json'
# Back up the reference to the exceptionhook
sys._excepthook = sys.excepthook
def my_exception_hook(exctype, value, traceback):
# Print the error and traceback
print(exctype, value, traceback)
# Call the normal Exception hook after
sys._excepthook(exctype, value, traceback)
sys.exit(1)
# Set the exception hook to our wrapping function
sys.excepthook = my_exception_hook
def parse_arguments():
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter,
prog='main.py',
description='Start the main station or the robot.')
subparser = parser.add_subparsers(title='Subcommands',
description='Dispatch arguments to the '
'corresponding entity.')
parent_parser = _create_parent_parser()
main_station_parser = _create_main_station_parser(subparser, parent_parser)
robot_parser = _create_robot_parser(subparser, parent_parser)
return parser.parse_args()
def _create_parent_parser():
parent_parser = ArgumentParser(add_help=False)
networking_group = parent_parser.add_argument_group('networking arguments')
networking_group.add_argument('-p',
'--ports',
nargs=2,
type=int,
required=True,
metavar=('READ_PORT', 'WRITE_PORT'),
help='The ports to bind to')
vision_group = parent_parser.add_argument_group('vision arguments')
vision_group.add_argument('-c',
'--camera_port',
default=cv2.CAP_ANY,
type=int,
metavar='CAMERA_PORT',
help='The port of the camera to connect to')
return parent_parser
def _create_main_station_parser(subparser, parent_parser):
main_station_parser = subparser.add_parser(
'main_station',
parents=[parent_parser],
formatter_class=ArgumentDefaultsHelpFormatter
)
main_station_parser.add_argument('-n',
'--table-number',
type=int,
choices=tuple(range(1, 7)),
default=1,
metavar='TABLE_NUMBER',
help='The table\'s number')
main_station_parser.add_argument('-a',
'--address',
default='127.0.0.1',
type=str,
metavar='HOST_ADDRESS',
dest='host',
help='The host address on which to bind '
'the socket')
main_station_parser.set_defaults(function=start_main_station)
return main_station_parser
def _create_robot_parser(subparser, parent_parser):
robot_parser = subparser.add_parser(
'robot',
parents=[parent_parser],
formatter_class=ArgumentDefaultsHelpFormatter
)
robot_parser.add_argument('-r',
'--approximation-ratio',
default=0.008,
type=float,
metavar='RATIO',
dest='approximation_ratio',
help='The approximation ratio used for '
'`approxPolyDP`')
robot_parser.set_defaults(function=start_robot)
return robot_parser
def start_main_station(arguments):
command_handler = create_command_handler(arguments.host,
arguments.ports,
ClientSelectorFactory)
obstacles_detector = ObstaclesDetector()
robot_detector = RobotDetector()
drawing_zone_detector = DrawingZoneDetector()
with Camera(arguments.camera_port, CameraSettings(width=1600, height=1200), True) as camera:
world_vision = WorldVision(arguments.table_number,
obstacles_detector,
drawing_zone_detector,
robot_detector,
camera)
app = MainStation(sys.argv, command_handler, world_vision)
sys.exit(app.exec_())
def start_robot(arguments):
logger = ExecutionLogger()
command_handler = create_command_handler(
netifaces.ifaddresses('wlp4s0')[2][0]['addr'],
arguments.ports,
ServerSelectorFactory
)
onboard_vision = create_onboard_vision(arguments.camera_port,
arguments.approximation_ratio)
translation_lock = Lock()
rotation_lock = Lock()
interfacing_controller = create_interfacing_controller(logger, translation_lock, rotation_lock)
movement_strategy = create_movement_strategy()
brain = Brain(
command_handler,
interfacing_controller,
logger,
onboard_vision,
movement_strategy,
translation_lock,
rotation_lock
)
brain.main()
def create_interfacing_controller(logger, translation_lock, rotation_lock) -> InterfacingController:
microcontroller_driver = Stm32Driver()
prehensor_driver = PenDriver()
signal_strength_lock = Lock()
signal_data_lock = Lock()
return InterfacingController(
WheelsController(microcontroller_driver, translation_lock, rotation_lock, logger),
AntennaController(microcontroller_driver, signal_strength_lock, signal_data_lock, logger),
PenController(prehensor_driver, logger),
LightsController(microcontroller_driver, logger)
)
def create_movement_strategy() -> MovementStrategy:
return MovementStrategy(
TranslationStrategyType.BASIC_WHEEL_SERVOING,
RotationStrategyType.BASIC_WHEEL_SERVOING
)
def create_command_handler(host: str,
ports: Tuple[int, int],
selector_factory) -> CommandHandler:
consumer = queue.LifoQueue()
producer = queue.LifoQueue()
selector_factory = selector_factory(host, *ports)
selector = selector_factory.create_selector(consumer, producer)
return CommandHandler(selector, consumer, producer)
def create_onboard_vision(camera_port: int,
approximation_ratio: float) -> OnboardVision:
camera = Camera(camera_port, CameraSettings())
vertices_finder = VerticesFinder(HighFrequencyFilter(), approximation_ratio)
return OnboardVision(vertices_finder, camera)
if __name__ == '__main__':
arguments = parse_arguments()
arguments.function(arguments)