-
Notifications
You must be signed in to change notification settings - Fork 2
/
bitty.py
307 lines (257 loc) · 8.13 KB
/
bitty.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
#! /usr/bin/python
#
# bitty.py: serial-to-network multiplexer for Arduino
#
# This is a serial port proxy. It makes a USB serial port available over the network
# for connection via telnet, nc, or any other TCP terminal client.
#
# There are some nice features for Arduino users. You can unplug one Arduino and
# plug in another and bitty will find it. And the network port and baud rate are
# adjustable.
#
# Requires:
# python 2.4 or 2.5
# pyserial from http://pyserial.wiki.sourceforge.net/pySerial
# tested with pyserial 2.4 on OS X and Fedora 4
#
# To use: first, start the serial-to-net proxy and leave it running:
# $ python bitty.py [options]
#
# and then, in another terminal window, connect to it with your favorite telnet or nc program:
# $ nc localhost 8080
# $ telnet localhost 8080
# $ telnet arduino.bitlash.net 8080
#
# plug in Arduino and you should connect up automatically.
# Type 'logout' when done for a clean disconnect.
#
#
# LICENSE
#
# Copyright 2010-2012 by Bill Roy
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
###############################################################################
# Module metadata; __version__ is sent to each network client in the greeting.
__version__ = '1.0'
__copyright__ = 'Copyright 2012 by Bill Roy'
import serial, time, socket, commands, traceback, os, sys
# serial port config
# usbdevice: device path to open; None means openSerialPort() auto-detects one.
# Overridden by the -u command line option.
usbdevice = None
# baud: serial line speed; overridden by the -b command line option.
baud = 57600
# network config
# port: TCP port the proxy listens on; overridden by the -p command line option.
port = 8080
# options
# kbdpassthru: set by -k; starts the keyboard pump thread and echoes serial
# data to stdout.
kbdpassthru = False
# showflow: set by -d; prints the data read from the serial port and network.
showflow = False
# change below here at your own risk
# Shared state, assigned by the functions below and read across threads:
#   serialport     - the open serial.Serial object (set by openSerialPort)
#   netsocket      - the listening socket (set by netPumpTask)
#   netclient      - the currently connected client socket, or None
#   clientaddress  - address of the current client as returned by accept()
serialport = None
netsocket = None
netclient = None
clientaddress = None
# Net pump thread, and queued interface
import Queue
# netq carries data headed for the serial port (from the network and, with -k,
# the keyboard); serialq carries data read from the serial port.  Both are
# drained by the main loop at the bottom of the file.
netq = Queue.Queue()
serialq = Queue.Queue()
def closeSerialPort():
    """Close the shared serial port if it is currently open.

    Logs the port name locally and, when a network client is connected,
    sends it a "Closing <port>" line first.  Does nothing when no port is
    open.
    """
    global serialport
    if not (serialport and serialport.isOpen()):
        return

    print("%s %s" % ("Closing: ", serialport.portstr))
    try:
        if netclient:
            for piece in ("Closing ", serialport.portstr, "\r\n"):
                netclient.send(piece)
    except:
        # Telling the client is best effort only; the port must still be
        # closed even if the client has gone away.
        pass
    serialport.close()
def openSerialPort():
    """(Re)open the serial port and report progress locally and to the client.

    Any port that is already open is closed first.  The device is the
    module-level ``usbdevice`` when set; otherwise the first device matching
    ``/dev/tty.usbserial*`` (in sorted order) is used.

    Returns:
        True if the port was opened, False if no device was found or the
        open failed.

    Raises:
        Whatever ``netclient.send`` raises if the client connection is broken
        while progress messages are being sent; the port-open failure itself
        is reported through the False return value.
    """
    global serialport
    # Local import: only this function needs it, and it keeps the file's
    # top-level import block untouched.
    import glob

    closeSerialPort()

    # serial port autoconfig
    device = usbdevice      # command line overrides auto select
    if not device:
        # Match device nodes directly instead of parsing `ls` output: an
        # empty listing cannot raise IndexError, an error message cannot be
        # mistaken for a path, and several attached devices no longer
        # produce a multi-line "device name" that can never be opened.
        # On the XO/Fedora the pattern to use is '/dev/ttyUSB*'.
        candidates = sorted(glob.glob('/dev/tty.usbserial*'))
        if candidates:
            device = candidates[0]

    if not device:
        print("Waiting for device...")
        return False

    print("%s %s %s %s" % ("Connecting to", device, baud, "..."))
    if netclient:
        netclient.send('Connecting to ')
        netclient.send(device)
        netclient.send('... ')

    try:
        # Two stop bits (stopbits=serial.STOPBITS_TWO) helps
        # paste-to-terminal not lose characters, but is not enabled here.
        serialport = serial.Serial(device, baud, timeout=0)
    except Exception:
        print('Failed to open port')
        if netclient:
            netclient.send("failed.\r\n")
        return False

    # Reported outside the try block so that a failure to notify the client
    # is not misreported as a failure to open the port.
    print("%s %s" % ('Opened port: ', serialport.portstr))
    if netclient:
        netclient.send("connected.\r\n")
    return True
# thread to read and queue serial input
# assumes opening the serial port is handled elsewhere
def serialPumpTask(usbdevice, baud):
    """Thread body: read from the open serial port and queue the data.

    Runs forever.  Data read from the module-level ``serialport`` is put on
    ``serialq``.  While no port is open the loop idles in one-second steps;
    opening the port is handled elsewhere.  Neither parameter is read by the
    body; they only satisfy the thread start call.
    """
    while True:
        if not (serialport and serialport.isOpen()):
            time.sleep(1.0)
            continue

        try:
            chunk = serialport.read(1024)
            if not chunk:
                # The port is opened with timeout=0, so an empty read just
                # means nothing has arrived yet.
                time.sleep(0.1)
            else:
                if showflow:
                    print("%s %s" % ("SER:", chunk))
                serialq.put(chunk)
        except:
            print("Exception reading serial port")
            traceback.print_exc()
            closeSerialPort()
def kbdPumpTask(d1,d2):
    """Thread body: forward local keyboard input to the serial port queue.

    Runs forever, reading stdin one byte at a time and putting each byte on
    ``netq`` so it is written to the serial port like network input.  The
    two parameters are unused placeholders for the thread start call.
    """
    while True:
        try:
            key = os.read(sys.stdin.fileno(), 1)
            if key:
                netq.put(key)
            else:
                # An empty read means end-of-file on stdin.  Once stdin is
                # closed or redirected from an empty source every read
                # returns immediately, so pause instead of spinning and
                # flooding netq with empty strings.
                time.sleep(0.1)
        except Exception:
            print("Exception in keyboard handler")
            traceback.print_exc()
# send network data to the serial port
def handleNetworkInput(data):
    """Write ``data`` to the serial port, one character at a time.

    Does nothing when no serial port is open.  On any error the traceback is
    printed and the serial port is closed.
    """
    global serialport
    try:
        if not (serialport and serialport.isOpen()):
            return
        # Characters are written singly with a 50 ms pause after each one
        # rather than in a single write() call -- presumably pacing so the
        # attached device is not overrun; confirm before batching.
        for ch in data:
            serialport.write(ch)
            time.sleep(0.05)
    except:
        print("Exception writing serial port")
        traceback.print_exc()
        closeSerialPort()
# send serial port data to the network socket
def handleSerialInput(data):
    """Forward ``data`` read from the serial port to the network client.

    When keyboard passthru is enabled the data is also written to stdout.
    On any error the traceback is printed and the serial port is closed.
    """
    global netclient
    try:
        # Snapshot the global: the net pump thread can reset netclient to
        # None between the test and the call.
        client = netclient
        if client:
            # sendall() keeps writing until the whole buffer is out; plain
            # send() may transmit only part of it and silently drop the rest.
            client.sendall(data)
        if kbdpassthru:
            sys.stdout.write(data)
            sys.stdout.flush()
    except Exception:
        print("Exception writing network port")
        traceback.print_exc()
        closeSerialPort()
# thread to read and queue network input
def _dropNetClient():
    """Close the current network client socket, if any, and forget it."""
    global netclient
    client = netclient
    netclient = None
    if client:
        try:
            client.close()
        except Exception:
            # The socket may already be dead; there is nothing more to do.
            pass


def netPumpTask(port, dummy):
    """Thread body: accept one network client at a time and queue its input.

    Listens on ``port`` on all interfaces.  For each client it sends a
    greeting, (re)opens the serial port, then puts everything the client
    sends on ``netq`` until the client sends data beginning with 'logout',
    closes the connection, or an error occurs.  It then goes back to waiting
    for the next client.  ``dummy`` is unused.
    """
    global netsocket, netclient, clientaddress

    netsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    netsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    netsocket.bind(('', port))
    netsocket.listen(1)     # backlog of one pending connection

    while True:
        try:
            print("%s %s %s %s %s %s" % ("Listening for network connection on",
                socket.getfqdn(), ':', port, '/', socket.gethostname()))
            netclient, clientaddress = netsocket.accept()
            print("%s %s" % ("Connection from", clientaddress))
            netclient.send("g'day from bitty ")
            netclient.send(__version__)
            netclient.send(" -- type 'logout' to disconnect")
            netclient.send("\r\n")

            # Serve this client until it goes away; the serial port is
            # reopened whenever it gets closed underneath us.
            while netclient:
                while not openSerialPort():
                    netclient.send("Waiting for device...\r\n")
                    time.sleep(2)

                while serialport.isOpen():
                    data = netclient.recv(10240)
                    if not data:
                        # On a blocking socket an empty recv() means the
                        # peer closed the connection.  Treat it as a
                        # disconnect; otherwise this loop would poll the
                        # dead socket forever and never accept a new client.
                        print("%s %s" % ("Client disconnected:", clientaddress))
                        _dropNetClient()
                        closeSerialPort()
                        break

                    if showflow:
                        print("%s %s" % ("NET:", data))

                    if data.startswith('logout'):
                        netclient.send("Disconnected.\r\n")
                        _dropNetClient()
                        closeSerialPort()
                        time.sleep(1.0)     # pause to allow disconnect
                        break

                    netq.put(data)
        except Exception:
            print("Exception in net pump")
            traceback.print_exc()
            # Close the port first so the client can still be told about it,
            # then release the client socket instead of leaking it.
            closeSerialPort()
            _dropNetClient()
def parseOptions():
    """Parse the command line and update the module-level settings.

    Recognised options:
        -p/--port               network port to listen on
        -u/--usbdevice          serial device to open instead of auto-detecting
        -b/--baud               serial baud rate
        -k/--keyboard-passthru  enable keyboard passthru
        -d/--debug              print the data flowing in both directions

    Only options actually given on the command line change the
    corresponding global (``port``, ``usbdevice``, ``baud``, ``kbdpassthru``,
    ``showflow``); the others keep their current values.
    """
    global port, baud, usbdevice, showflow, kbdpassthru
    from optparse import OptionParser

    usage = "usage: %prog [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-p", "--port",
        dest="port", type='int',
        help="network connection port [8080]")
    parser.add_option("-u", "--usbdevice",
        dest="usbdevice",
        help="name of USB serial port device for serial connection [/dev/tty.usbserial*]")
    parser.add_option("-b", "--baud",
        dest="baud", type='int',
        help="baud rate for port specified by -u [57600]")
    parser.add_option("-k", "--keyboard-passthru",
        action="store_true", dest="kbdpassthru",
        help="forward local keyboard input to the device and echo device output to stdout [off]")
    parser.add_option("-d", "--debug",
        action="store_true", dest="debug",
        help="print serial and network traffic as it flows [off]")
    (options, args) = parser.parse_args()

    if options.port:
        port = options.port
        print("%s %s" % ("Listening for connections on port:", port))

    if options.baud:
        baud = options.baud
        print("%s %s" % ("Serial port baud rate set to:", baud))

    if options.usbdevice:
        usbdevice = options.usbdevice
        print("%s %s" % ("Using USB serial device: ", usbdevice))

    if options.debug:
        showflow = True

    if options.kbdpassthru:
        kbdpassthru = True
if __name__ == '__main__':
    # Script entry point: read the command line, start the pump threads, then
    # shuttle queued data between the network and the serial port forever.
    parseOptions()

    import thread
    net_pump_thread = thread.start_new_thread(netPumpTask, (port, 0))
    serial_pump_thread = thread.start_new_thread(serialPumpTask, (usbdevice, baud))
    if kbdpassthru:
        kbd_pump_thread = thread.start_new_thread(kbdPumpTask, (0,0))

    # Each queue is paired with the handler that consumes it; network input
    # is drained before serial input on every pass.
    _pumps = ((netq, handleNetworkInput), (serialq, handleSerialInput))
    while True:
        for _queue, _handler in _pumps:
            while not _queue.empty():
                _handler(_queue.get())
                _queue.task_done()
        time.sleep(0.1)