From ff7caacc4ea8a6290654f255c16167da1bb8a9c6 Mon Sep 17 00:00:00 2001 From: Alexander Wallar Date: Sat, 13 Feb 2016 19:29:50 -0500 Subject: [PATCH] Tornado multithreading attempts --- src/server/connection.py | 35 +++++++++++++++++++++++++++++++---- src/server/server_node.py | 11 ++++++----- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/server/connection.py b/src/server/connection.py index e7257e8..2969392 100644 --- a/src/server/connection.py +++ b/src/server/connection.py @@ -5,12 +5,17 @@ import time import common import struct +import multiprocessing from std_msgs.msg import Float32 import tornado.web import tornado.websocket import tornado.httpserver import tornado.ioloop +from tornado import gen +from concurrent.futures import ProcessPoolExecutor +def call_process(message): + MMServerProtocol.instance.process_message(message) class MMServerProtocol(tornado.websocket.WebSocketHandler): lat_pubs = dict() @@ -21,9 +26,14 @@ def open(self, name): MMServerProtocol.lat_pubs[name] = rospy.Publisher("/jammi/" + name + "/latency", Float32, queue_size=2) print "Connected to: {}".format(name) + self.ioloop = tornado.ioloop.IOLoop.instance() + MMServerProtocol.instance = self + self.pool = multiprocessing.Pool(multiprocessing.cpu_count()) + def send_to_client(self, to, message): + to.write_message(message, True) - def on_message(self, message): + def process_message(self, message, callback): received_time = time.time() decompressed = zlib.decompress(message) size = struct.unpack('=I', decompressed[:4]) @@ -31,17 +41,34 @@ def on_message(self, message): unpacked = struct.unpack('=I' + frmt, decompressed) msg = json.loads(unpacked[1]) acknowledge = struct.pack('=b', 0) - self.write_message(acknowledge, True) + self.ioloop.add_callback( + self.send_to_client, + common.get_client(msg["from"]), acknowledge) latency = Float32() latency.data = received_time - msg["stamp"] MMServerProtocol.lat_pubs[msg["from"]].publish(latency) if msg["to"][0] == "*": for name in common.clients.keys(): if name != msg["from"]: - common.get_client(name).write_message(message, True) + self.ioloop.add_callback( + self.send_to_client, + common.get_client(name), message) else: for name in msg["to"]: - common.get_client(name).write_message(message, True) + self.ioloop.add_callback( + self.send_to_client, + common.get_client(name), message) + return callback(message) + + @tornado.gen.coroutine + def on_message(self, message): + print len(message) + result = yield tornado.gen.Task(self.process_message, message) + return + pool = ProcessPoolExecutor() + fut = pool.submit(call_process, message) + ret = yield fut + pool.shutdown() def on_close(self): common.remove_client(self.name_of_client) diff --git a/src/server/server_node.py b/src/server/server_node.py index 663caed..60674b9 100755 --- a/src/server/server_node.py +++ b/src/server/server_node.py @@ -11,22 +11,23 @@ NODE_NAME = "jammi_server" -settings = {'debug': True} +settings = {'debug': False} app = tornado.web.Application([ (r'/(.*)', connection.MMServerProtocol), ], **settings) def sig_handler(sig, frame): - tornado.ioloop.IOLoop.instance().add_callback(shutdown) + tornado.ioloop.IOLoop.current().add_callback(shutdown) def shutdown(): - tornado.ioloop.IOLoop.instance().stop() + tornado.ioloop.IOLoop.current().stop() def run_server(host, port): url = "ws://{}:{}".format(host, port) http_server = tornado.httpserver.HTTPServer(app) - http_server.listen(port) - tornado.ioloop.IOLoop.instance().start() + http_server.bind(port) + http_server.start(0) + tornado.ioloop.IOLoop.current().start() if __name__ == "__main__": rospy.init_node(NODE_NAME, anonymous=False)