Skip to content

Commit

Permalink
Tornado multithreading attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Wallar committed Feb 14, 2016
1 parent 4b048ae commit ff7caac
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
35 changes: 31 additions & 4 deletions src/server/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
import time
import common
import struct
import multiprocessing
from std_msgs.msg import Float32
import tornado.web
import tornado.websocket
import tornado.httpserver
import tornado.ioloop
from tornado import gen
from concurrent.futures import ProcessPoolExecutor

def call_process(message):
MMServerProtocol.instance.process_message(message)

class MMServerProtocol(tornado.websocket.WebSocketHandler):
lat_pubs = dict()
Expand All @@ -21,27 +26,49 @@ def open(self, name):
MMServerProtocol.lat_pubs[name] = rospy.Publisher("/jammi/" + name
+ "/latency", Float32, queue_size=2)
print "Connected to: {}".format(name)
self.ioloop = tornado.ioloop.IOLoop.instance()
MMServerProtocol.instance = self
self.pool = multiprocessing.Pool(multiprocessing.cpu_count())

def send_to_client(self, to, message):
to.write_message(message, True)

def on_message(self, message):
def process_message(self, message, callback):
received_time = time.time()
decompressed = zlib.decompress(message)
size = struct.unpack('=I', decompressed[:4])
frmt = "%ds" % size[0]
unpacked = struct.unpack('=I' + frmt, decompressed)
msg = json.loads(unpacked[1])
acknowledge = struct.pack('=b', 0)
self.write_message(acknowledge, True)
self.ioloop.add_callback(
self.send_to_client,
common.get_client(msg["from"]), acknowledge)
latency = Float32()
latency.data = received_time - msg["stamp"]
MMServerProtocol.lat_pubs[msg["from"]].publish(latency)
if msg["to"][0] == "*":
for name in common.clients.keys():
if name != msg["from"]:
common.get_client(name).write_message(message, True)
self.ioloop.add_callback(
self.send_to_client,
common.get_client(name), message)
else:
for name in msg["to"]:
common.get_client(name).write_message(message, True)
self.ioloop.add_callback(
self.send_to_client,
common.get_client(name), message)
return callback(message)

@tornado.gen.coroutine
def on_message(self, message):
print len(message)
result = yield tornado.gen.Task(self.process_message, message)
return
pool = ProcessPoolExecutor()
fut = pool.submit(call_process, message)
ret = yield fut
pool.shutdown()

def on_close(self):
common.remove_client(self.name_of_client)
11 changes: 6 additions & 5 deletions src/server/server_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,23 @@

NODE_NAME = "jammi_server"

settings = {'debug': True}
settings = {'debug': False}
app = tornado.web.Application([
(r'/(.*)', connection.MMServerProtocol),
], **settings)

def sig_handler(sig, frame):
tornado.ioloop.IOLoop.instance().add_callback(shutdown)
tornado.ioloop.IOLoop.current().add_callback(shutdown)

def shutdown():
tornado.ioloop.IOLoop.instance().stop()
tornado.ioloop.IOLoop.current().stop()

def run_server(host, port):
url = "ws://{}:{}".format(host, port)
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(port)
tornado.ioloop.IOLoop.instance().start()
http_server.bind(port)
http_server.start(0)
tornado.ioloop.IOLoop.current().start()

if __name__ == "__main__":
rospy.init_node(NODE_NAME, anonymous=False)
Expand Down

0 comments on commit ff7caac

Please sign in to comment.