diff --git a/launch/example.launch b/launch/example.launch index 7fbaac8..c495445 100644 --- a/launch/example.launch +++ b/launch/example.launch @@ -2,25 +2,25 @@ - + - + - + - + publishing: diff --git a/src/client_node.py b/src/client_node.py index c35b255..10a600f 100755 --- a/src/client_node.py +++ b/src/client_node.py @@ -16,8 +16,8 @@ def __init__(self, host, port, name, broadcasting): self.host = host self.port = port self.name = name - self.conn = Connection(host, port, name) - self.conn.start() + self.conn = dict() + self.receiver = None self.subs = dict() self.broadcasting = broadcasting self.pub_man = pm.PublisherManager() @@ -25,11 +25,25 @@ def __init__(self, host, port, name, broadcasting): def run(self): for topic, msg_type, trusted in self.broadcasting: self.create_subscriber(topic, msg_type, trusted) + if topic == "/receiving": + rospy.logerror("{}: topic name 'receiving' is reserved".format( + self.name)) + continue + self.conn[topic] = Connection(host, port, "{}{}".format( + self.name, topic)) + self.conn[topic].start() + self.receiver = Connection(host, port, "{}{}".format( + self.name, "/receiving")) + self.receiver.start() while not rospy.is_shutdown(): - updates = self.conn.updates() + #for key, conn in self.conn.iteritems(): + # updates = conn.updates() + updates = self.receiver.updates() for v in updates.values(): self.pub_man.publish(v) - self.conn.stop() + for key, conn in self.conn.iteritems(): + conn.stop() + self.receiver.stop() def create_subscriber(self, topic, msg_type, trusted): namespace, msg_name = msg_type.split("/") @@ -42,13 +56,13 @@ def create_subscriber(self, topic, msg_type, trusted): def create_callback(self, topic, msg_type, trusted): def callback(msg): data = dict() - data["to"] = trusted.split(' ') - data["from"] = self.name - data["topic"] = "/{}{}".format(self.name, topic) - data["type"] = msg_type - data["stamp"] = time.time() - data["msg"] = mc.convert_ros_message_to_dictionary(msg) - self.conn.send_message(data) + data["To"] = trusted.split(' ') + data["From"] = self.name + data["Topic"] = "/{}{}".format(self.name, topic) + data["Type"] = msg_type + data["Stamp"] = time.time() + data["Msg"] = mc.convert_ros_message_to_dictionary(msg) + self.conn[topic].send_message(data) return callback diff --git a/src/connection.py b/src/connection.py index 5ee68ca..4a5ab59 100644 --- a/src/connection.py +++ b/src/connection.py @@ -13,13 +13,14 @@ from std_msgs.msg import Float32 class Connection(threading.Thread): + def __init__(self, host, port, name): super(Connection, self).__init__() self.host = host self.port = port self.name = name self.url = "ws://{}:{}/{}".format(host, port, name) - self.ioloop = tornado.ioloop.IOLoop.current() + self.ioloop = None self.connection = None self.values = dict() self.acknowledged = True @@ -27,6 +28,8 @@ def __init__(self, host, port, name): self.freqPub = rospy.Publisher("/{}/ping".format(name), Float32, queue_size=0) def run(self): + while self.ioloop is None: + self.ioloop = tornado.ioloop.IOLoop() tornado.websocket.websocket_connect( self.url, self.ioloop, @@ -34,9 +37,12 @@ def run(self): on_message_callback = self.on_message) self.ioloop.start() - def stop(self): + def shutdown(self): self.ioloop.stop() + def stop(self): + self.ioloop.add_callback(self.shutdown) + def send_message_cb(self, data): payload = json.dumps(data) frmt = "%ds" % len(payload) @@ -45,7 +51,6 @@ def send_message_cb(self, data): binary = struct.pack('=I' + frmt, binLen, payload) compressed = zlib.compress(binary) if not self.connection is None: - # rospy.loginfo(self.acknowledged) if self.acknowledged: self.acknowledged = False if binLen > 1000: @@ -65,6 +70,12 @@ def updates(self): def on_connected(self, res): try: self.connection = res.result() + while self.connection is None: + tornado.websocket.websocket_connect( + self.url, + self.ioloop, + callback = self.on_connected, + on_message_callback = self.on_message) except Exception, e: print "Failed to connect: {}".format(e) tornado.websocket.websocket_connect( @@ -84,7 +95,7 @@ def on_message(self, payload): frmt = "%ds" % size[0] unpacked = struct.unpack('=I' + frmt, decompressed) data = json.loads(unpacked[1]) - self.values[data["topic"]] = data + self.values[data["Topic"]] = data def timeout(self): self.acknowledged = True diff --git a/src/publishermanager.py b/src/publishermanager.py index b8beeb8..e856038 100644 --- a/src/publishermanager.py +++ b/src/publishermanager.py @@ -17,8 +17,8 @@ def create_msg(self, msg_dict, msg_type): return msg, msg_cls def publish(self, data): - msg, msg_cls = self.create_msg(data["msg"], data["type"]) - topic = data["topic"] + msg, msg_cls = self.create_msg(data["Msg"], data["Type"]) + topic = data["Topic"] if not topic in self.pubs.keys(): self.pubs[topic] = rospy.Publisher(topic, msg_cls, queue_size=2) self.pubs[topic].publish(msg)