-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
110 lines (88 loc) · 3.85 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from logpy.LogPy import Logger
import threading
# Cameras
from vision.camera import Camera
# Task running
from utils.python_subtask import PythonSubtask
# Sensors
from communication.communication import Communication
from communication.rpi_broker.ahrs import AHRS
from communication.rpi_broker.depth_sensor import DepthSensor
from communication.rpi_broker.hydrophones import Hydrophones
from communication.rpi_broker.distance import DistanceSensor
# Control
from communication.rpi_broker.movements import Movements
from communication.rpi_broker.torpedoes import Torpedoes
from communication.rpi_broker.manipulator import Manipulator
#Task sceduller
from tasks.tasks_scheduler import TaskSchedululer
from tasks.erl_task_scheduler import TaskSchedululer as ErlTaskScheduler
#CHECK MODE
# TODO - replace with checkoing if is conection to simulation or real rpi
from definitions import MAINDEF, CAMERAS
#simulation
if MAINDEF.MODE == "SIMULATION":
from communication.unity_driver import UnityDriver
class Main():
'''
Creates object of all sensor types, packs their references into
a list. Creates Communication thread.
'''
def __init__(self):
'''
Creates and stores references of all slave objects.
'''
mode = MAINDEF.MODE
self.logger = Logger(filename='main', title="Main")
# Simulation initialisation
self.unity_driver = None
if mode == "SIMULATION":
self.unity_driver = UnityDriver()
# cameras creation
PythonSubtask.run("vision/camera_server.py")
self.front_cam1 = Camera("front", mode, self.unity_driver)
self.bottom_camera = Camera("bottom", mode, self.unity_driver)
self.arm_camera = Camera("arm", mode, self.unity_driver)
self.bumper_cam_right = Camera("bumper_right", mode, self.unity_driver)
self.bumper_cam_left = Camera("bumper_left", mode, self.unity_driver)
self.logger.log("Cameras created")
self.cameras = {'arm_camera': self.arm_camera,
'bottom_camera': self.bottom_camera,
'front_cam1': self.front_cam1,
'bumper_cam_right': self.bumper_cam_right,
'bumper_cam_left': self.bumper_cam_left}
#communication
self.communication = Communication()
self.rpi_reference = self.communication.rpi_reference
self.logger.log("communication was established")
# sensors
self.ahrs = AHRS(self.rpi_reference)
self.depth_sensor = DepthSensor(self.rpi_reference)
self.distance_sensor = DistanceSensor(self.rpi_reference)
self.hydrophones = Hydrophones(self.rpi_reference)
self.logger.log("sensors created")
self.sensors = {'ahrs': self.ahrs,
'depth': self.depth_sensor,
'distance': self.distance_sensor,
'hydrophones': self.hydrophones}
#control
self.movements = Movements(self.rpi_reference)
self.torpedoes = Torpedoes(self.rpi_reference)
self.manipulator = Manipulator(self.rpi_reference)
self.logger.log("control objects created")
self.control = {'movements': self.movements,
'torpedoes': self.torpedoes,
'manipulator': self.manipulator}
# task sheduler
if mode == "ROV3":
self.task_scheduler = ErlTaskScheduler(self.control, self.sensors, self.cameras, self.logger)
self.logger.log("ERL task scheduler created")
else:
self.task_scheduler = TaskSchedululer(self.control, self.sensors, self.cameras, self.logger)
self.logger.log("Robosub task scheduler created")
def run(self):
self.logger.log("main thread is running")
self.task_scheduler.run()
if __name__ == "__main__":
main = Main()
main.run()