-
Notifications
You must be signed in to change notification settings - Fork 0
/
track2point.py
executable file
·134 lines (127 loc) · 5.24 KB
/
track2point.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/bin/python3
import sys
import json
import time
import pointer
import tracker
import multiprocessing.connection
"""Usage
1. `python3 track2point.py` load built-in parameters, dump them to config.json
and track in cli-only mode
2. `python3 track2point.py xxx.json` load configuration file and track in cli-only mode
3. `python3 track2point.py xxx.json socket` load configuration file and serve
images to mouseGUI; `socket` is the TCP port number to listen on (localhost)
4. `python3 track2point.py xxx.json disp` load configuration file and display
in integrated window mode
v0.1 first version by hf
"""
# Result codes; the main loops compare them with the first value returned by
# finder.track_mouse().
#   FAIL - lost camera connection
#   MOVE - apply track
#   PARK - park pointer
#   LOST - lost mouse
#   IGNO - ignore tracking result
FAIL, MOVE, PARK, LOST, IGNO = range(5)

# Built-in configuration values, used by init_config() when the script is
# started without a configuration file.
cage_id, cam_idx = 1, 0

# Region of interest handed to the tracker.
roi_x0, roi_x1 = 0, 600
roi_y0, roi_y1 = 0, 600

# Pointer (gimbal) settings; only used when gimbal_enable is true.
gimbal_enable = False
gimbal_path = "/dev/ttyUSB0"
gimbal_pos_x0, gimbal_pos_y0 = 75, -5
gimbal_half_width = 370 / 2
gimbal_height_in_mm = 210
yaw_bias = 0

# Tracker tuning value and output directory.
erode1 = 5
log_dir = "/dev/null"
# TODO: change to unix domain socket for better performance
def init_config():
    """Build the run-time configuration from the command line.

    Without arguments the built-in default values are used and also written
    to ``config.json`` in the current directory.  Otherwise ``sys.argv[1]``
    is read as a JSON configuration file and, when exactly two arguments are
    given, ``sys.argv[2]`` is returned as the mode/port selector.

    Returns:
        A ``(port, config)`` tuple.  ``port`` is the second command-line
        argument as a string, or ``""`` when it was not supplied (cli-only
        mode).  ``config`` is the configuration dictionary.

    Raises:
        SystemExit: if the configuration file cannot be read, is not valid
            JSON, or does not contain a JSON object.
    """
    port = ""  # empty string means cli-only; otherwise taken from argv
    if len(sys.argv) <= 1:
        print("Using default configuration and dump to file, config.json")
        config = {
            'cage_id': cage_id,
            'cam_idx': cam_idx,
            'roi_x0': roi_x0,
            'roi_y0': roi_y0,
            'roi_x1': roi_x1,
            'roi_y1': roi_y1,
            'gimbal_enable': gimbal_enable,
            'gimbal_path': gimbal_path,
            'gimbal_pos_x0': gimbal_pos_x0,
            'gimbal_pos_y0': gimbal_pos_y0,
            'gimbal_half_width': gimbal_half_width,
            'gimbal_height_in_mm': gimbal_height_in_mm,
            'log_dir': log_dir,
            'yaw_bias': yaw_bias,
            # Use the module-level default rather than a repeated literal.
            'erode1': erode1,
        }
        with open('config.json', 'w') as config_file:
            json.dump(config, config_file, indent=4)
        return port, config

    config_path = sys.argv[1]
    print("Loading configuration from file")
    try:
        with open(config_path, 'r') as file:
            config = json.load(file)
    except (OSError, ValueError) as err:
        # json.JSONDecodeError is a ValueError.  Exit with a clear message
        # instead of returning None, which the caller cannot unpack.
        sys.exit("Cannot load configuration from %s: %s" % (config_path, err))
    if not isinstance(config, dict):
        sys.exit("Configuration file %s must contain a JSON object" % config_path)
    # TODO: Check individual values and required keys

    if len(sys.argv) == 3:  # get port number by passed argument
        port = sys.argv[2]
    return port, config
def _parse_port(port):
    """Return *port* as an int, or exit with a usage message if it is not numeric."""
    try:
        return int(port)
    except ValueError:
        sys.exit("Unknown mode %r: expected 'disp' or a port number" % port)


def _run_cli(finder, light):
    """Track forever without any display; *light* is None when the gimbal is disabled."""
    print("Entering cli-only mode")
    try:
        while True:
            ret, pos, image = finder.track_mouse()
            if ret == MOVE and light is not None:
                light.point2mouse(pos[0], pos[1])
                print("Found at", pos[0], pos[1])
    finally:
        # Release the tracker on any exit (e.g. Ctrl-C), like the other modes.
        finder.close()


def _run_display(finder, light):
    """Track and show every frame in an OpenCV window until ESC is pressed."""
    print("Entering cli with integrated mode")
    # Imported lazily so the other modes do not need OpenCV in this script.
    import cv2 as cv
    try:
        while True:
            ret, pos, image = finder.track_mouse()
            print(ret)
            if ret != FAIL:
                cv.imshow('Mouse Tracker', image)
            if ret == MOVE and light is not None:
                light.point2mouse(pos[0], pos[1])
            if cv.waitKey(1) == 27:  # 27 is the ESC key code
                break
            time.sleep(0.1)
    finally:
        finder.close()
        cv.destroyAllWindows()


def _serve_gui(finder, light, port_number):
    """Track continuously and answer mouseGUI commands on localhost:*port_number*.

    Messages are indexed as sequences whose first item is the command name:
    'live' (send the current image), 'close' (shut down), 'pause' and
    'resume' (disconnect / reconnect the gimbal).  The tracker is closed and
    the gimbal parked when the loop ends, whether by 'close', a lost
    connection or an exception.
    """
    print("Entering mouseGUI model")
    address = ('localhost', port_number)
    # NOTE(review): the authkey is hard-coded and must match the client;
    # Connection.recv() unpickles whatever an authenticated peer sends.
    with multiprocessing.connection.Listener(address, authkey=b'cancer') as tracker_server:
        with tracker_server.accept() as receiver:
            try:
                while True:
                    ret, pos, image = finder.track_mouse()
                    if light is not None:
                        if ret == MOVE:
                            light.point2mouse(pos[0], pos[1])
                        elif ret == PARK:
                            light.park_gimbal()
                    # Wait at most 3 ms for a command so tracking keeps running.
                    if not receiver.poll(timeout=0.003):
                        continue
                    message = receiver.recv()
                    command = message[0]
                    if command == 'live':
                        # No frame is sent while the camera connection is lost.
                        if ret != FAIL:
                            receiver.send(image)
                    elif command == 'close':
                        break
                    elif command == 'pause':
                        print("stop gimbal, but keep camera open")
                        if light is not None:
                            light.disconnect()
                    elif command == 'resume':
                        print("resume gimbal")
                        if light is not None:
                            light.connect()
                    else:
                        print("I don't know your command:", command)
            except (EOFError, ConnectionError):
                # The GUI closed its end or the socket broke: stop serving.
                print("Lost connection to mouseGUI, shutting down")
            finally:
                finder.close()
                if light is not None:
                    light.park_gimbal()


if __name__ == '__main__':
    port, config = init_config()

    # Validate the mode argument before any hardware is opened.
    port_number = None
    if port not in ("", "disp"):
        port_number = _parse_port(port)

    roi = [config[key] for key in ('roi_y0', 'roi_y1', 'roi_x0', 'roi_x1')]
    finder = tracker.MouseTracker(config['cam_idx'], roi)
    finder.set_data_dir("%s/cage_%d" % (config['log_dir'], config['cage_id']))
    finder.set_erode1(config['erode1'])

    # light stays None when the gimbal is disabled; the mode loops check it.
    light = None
    if config['gimbal_enable']:
        light = pointer.Pointer(config['gimbal_path'])
        light.set_pointer(config['gimbal_pos_x0'], config['gimbal_pos_y0'],
                          config['gimbal_half_width'],
                          config['gimbal_height_in_mm'], config['yaw_bias'])

    if port == "":  # TODO: add integrated interface mode
        _run_cli(finder, light)
    elif port == "disp":
        _run_display(finder, light)
    else:  # send image to mouseGUI
        _serve_gui(finder, light, port_number)