-
Notifications
You must be signed in to change notification settings - Fork 1
/
HardDrivin.py
91 lines (72 loc) · 3.13 KB
/
HardDrivin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# HardDrivin.py
#
import time, threading, yaml
from threading import Timer
from CarControl import CarControl
from OSC import OSCServer
class HardDrivin:
    """Bridge incoming OSC messages to the car controller.

    On construction the class reads ``settings.yaml`` from the current
    working directory, creates a ``CarControl`` for the configured serial
    port and an ``OSCServer`` bound to the configured address, and registers
    handlers for the ``/print``, ``/movecar`` and ``/stopcar`` messages.
    """

    def __init__(self):
        """Load the settings and wire up the car controller and OSC server.

        Exits the process with status 1 (after printing the error) when
        ``settings.yaml`` cannot be opened or parsed.
        """
        try:
            # ``with`` guarantees the file is closed even if parsing fails.
            with open('settings.yaml') as f:
                # safe_load only builds plain YAML types. Bare yaml.load()
                # without a Loader can construct arbitrary Python objects and
                # is rejected by newer PyYAML releases.
                self.settings = yaml.safe_load(f)
        except Exception as e:
            print('e: %s' % e)
            print(repr(e))
            # Same effect as exit(1) without depending on the site-provided
            # ``exit`` builtin.
            raise SystemExit(1)

        # Receive commands from Pure Data via OSC
        car_control = self.settings['car_control']
        self.oscRecvAddress = (
            car_control['osc_recv_addr'],
            car_control['osc_recv_port'],
        )

        self.cars = CarControl(self.settings['serial_port'])

        self.oscServer = OSCServer(self.oscRecvAddress)

        # this registers a 'default' handler (for unmatched messages),
        # an '/error' handler, an '/info' handler.
        # And, if the client supports it, a '/subscribe' & '/unsubscribe' handler
        self.oscServer.addDefaultHandlers()

        # user defined handlers
        self.oscServer.addMsgHandler("/print", self.printing_handler)
        self.oscServer.addMsgHandler("/movecar", self.movecar_handler)
        self.oscServer.addMsgHandler("/stopcar", self.stopcar_handler)

    def movecar_handler(self, addr, tags, stuff, source):
        """Handle ``/movecar``: move a car in a direction for a duration.

        ``stuff`` must hold at least three items: car, direction and
        duration (logged as milliseconds). They are forwarded unchanged to
        ``self.cars.move``. ``addr``, ``tags`` and ``source`` are unused.
        """
        car, direction, duration = stuff[0], stuff[1], stuff[2]
        print("---")
        print("Moving car %s %s for %s ms" % (car, direction, duration))
        self.cars.move(car, direction, duration)

    def stopcar_handler(self, addr, tags, stuff, source):
        """Handle ``/stopcar``: stop the car named by ``stuff[0]``.

        ``addr``, ``tags`` and ``source`` are unused.
        """
        car = stuff[0]
        print("---")
        print("Stopping car %s" % car)
        self.cars.stop(car)

    # define a message-handler function for the server to call.
    def printing_handler(self, addr, tags, stuff, source):
        """Handle ``/print``: dump the sender, address, typetags and data."""
        # The module only imports OSCServer from OSC, so the bare name ``OSC``
        # is not defined here; import the helper explicitly.
        from OSC import getUrlStr

        print("---")
        print("received new osc msg from %s" % getUrlStr(source))
        print("with addr : %s" % addr)
        print("typetags %s" % tags)
        print("data %s" % stuff)
        print("---")

    def startOSCServer(self):
        """Serve OSC requests until the user presses Ctrl-C.

        ``serve_forever`` runs on a separate thread while the calling thread
        idles in a sleep loop. On ``KeyboardInterrupt`` the server is closed
        and the serving thread is joined before returning.
        """
        # Start OSCServer
        print("\nStarting OSCServer. Use ctrl-C to quit.")
        st = threading.Thread(target=self.oscServer.serve_forever)
        st.start()

        try:
            # Keep the calling thread alive so it can receive Ctrl-C.
            while True:
                time.sleep(5)
        except KeyboardInterrupt:
            print("\nClosing OSCServer.")
            self.oscServer.close()
            print("Waiting for Server-thread to finish")
            st.join()
            print("Done")
if __name__ == "__main__":
    # Start-up banner. Raw strings keep every backslash literal without
    # relying on unrecognised escape sequences (deprecated in Python 3); the
    # printed text is unchanged.
    banner = (
        r"",
        r" __ __ __ ___ _ _ _ ",
        r" / // /___ ________/ / / _ \____(_)_ __(_)___ ( )",
        r" / _ // _ `/ __/ _ / / // / __/ /| |/ / // _ \|/ ",
        r" /_//_/ \_,_/_/ \_,_/ /____/_/ /_/ |___/_//_//_/ ",
        r"",
        r" __ __ __ __ ___ __ __ ",
        r" / ` /\ |__) / ` / \ |\ | | |__) / \ | ",
        r" \__, /~~\ | \ \__, \__/ | \| | | \ \__/ |___ ",
        r"",
    )
    for line in banner:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(line)

    # Build the controller from settings.yaml and serve until Ctrl-C.
    hd = HardDrivin()
    hd.startOSCServer()