diff --git a/src/client_node.py b/src/client_node.py index f5b19ed..f3be8e0 100755 --- a/src/client_node.py +++ b/src/client_node.py @@ -6,8 +6,10 @@ import requests import publishermanager as pm from rospy_message_converter import message_converter as mc -from connection import Connection +from sender import Sender +from receiver import Receiver import threading +import socket NODE_NAME = "canopy_client" @@ -25,9 +27,9 @@ def __init__(self, host, port, name, broadcasting, private_key, self.port = port self.name = name.replace(" ", "").replace("/", "") self.use_local_time = use_local_time - self.conn = dict() + self.senders = dict() self.receiver = None - self.descriptionConn = None + self.descriptionSender = None self.subs = dict() self.broadcasting = broadcasting self.private_key = private_key @@ -36,6 +38,7 @@ def __init__(self, host, port, name, broadcasting, private_key, self.leaflets = leaflets self.pub_man = pm.PublisherManager(use_local_time) self.timer = threading.Timer(0.1, self.descriptionSend) + self.socket = None def post_leaflet_urls(self): if len(self.leaflets) > 0: @@ -48,36 +51,35 @@ def post_leaflet_urls(self): # Creates all connections and subscribers and starts them. # Runs a loop that checks for received messages. def run(self): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket.connect((self.host, self.port)) + self.socket.settimeout(1.0) + while True: + try: + self.socket.sendto("CONNECT:{}:{}".format(self.private_key, self.name), (self.host, self.port)) + reply, addr = self.socket.recvfrom(64) + if reply == "HANDSHAKE": + print "[{}-canopy-client] Connected to server.".format(self.name) + break + except socket.timeout: + print "[{}-canopy-client] Connection timed out. Retrying...".format(self.name) + continue + self.socket.setblocking(1) for topic, msg_type, trusted in self.broadcasting: if topic[0] != "/": topic = "/" + topic self.create_subscriber(topic, msg_type, trusted) - if topic == "/receiving": - rospy.logerr("{}: topic name 'receiving' is reserved".format( - self.name)) - continue - self.conn[topic] = Connection(host, port, "{}{}".format( - self.name, topic), private_key) - self.conn[topic].start() - self.receiver = Connection(host, port, "{}{}".format( - self.name, "/receiving"), private_key) - self.descriptionConn = Connection(host, port, "{}/description".format( - self.name), private_key) + self.senders[topic] = Sender(self.socket) + self.descriptionSender = Sender(self.socket) + self.receiver = Receiver(self.socket) self.receiver.start() - self.descriptionConn.start() self.timer.start() self.post_leaflet_urls() while not rospy.is_shutdown(): - #for key, conn in self.conn.iteritems(): - # updates = conn.updates() updates = self.receiver.updates() for v in updates.values(): self.pub_man.publish(v) - for key, conn in self.conn.iteritems(): - conn.stop() - self.receiver.stop() self.timer.cancel() - self.descriptionConn.stop() # Creates a subscriber for messages of msg_type published on topic. def create_subscriber(self, topic, msg_type, trusted): @@ -110,7 +112,7 @@ def callback(msg): else: msg = self.modify_stamped_message(msg) data["Msg"] = mc.convert_ros_message_to_dictionary(msg) - self.conn[topic].send_message(data) + self.senders[topic].send_message(data) return callback def modify_child_frame_id(self, message): @@ -157,7 +159,7 @@ def descriptionSend(self): msg = dict() msg["data"] = self.description data["Msg"] = msg - self.descriptionConn.send_message(data) + self.descriptionSender.send_message(data) self.timer = threading.Timer(0.1, self.descriptionSend) self.timer.start() diff --git a/src/connection.py b/src/connection.py deleted file mode 100644 index 94040ad..0000000 --- a/src/connection.py +++ /dev/null @@ -1,139 +0,0 @@ -# Defines the Connection class. - -import zlib -import threading -import json -import copy -import struct -import rospy -import time -import tornado.web -import tornado.websocket -import tornado.httpserver -import tornado.ioloop -from std_msgs.msg import Float32 - -# Represents a threaded websocket connection to the server. -class Connection(threading.Thread): - - def __init__(self, host, port, name, private_key): - super(Connection, self).__init__() - self.host = host - self.port = port - self.name = name - self.url = "ws://{}:{}/{}/{}".format(host, port, private_key, name) - self.ioloop = None - self.connection = None - self.values = dict() - self.acknowledged = True - self.timer = threading.Timer - - # Starts the Tornado IOLoop and connects to the websocket. - # Called on thread start. - def run(self): - while self.ioloop is None: - self.ioloop = tornado.ioloop.IOLoop() - tornado.websocket.websocket_connect( - self.url, - self.ioloop, - callback = self.on_connected, - on_message_callback = self.on_message) - self.ioloop.start() - - # Stops the IOLoop - def shutdown(self): - self.ioloop.stop() - - # Called on thread abortion - def stop(self): - self.ioloop.add_callback(self.shutdown) - - # Formats data dictionary as JSON, converts to binary, - # compresses using zlib, and sends to the server. - def send_message_cb(self, data): - payload = json.dumps(data) - frmt = "%ds" % len(payload) - binary = struct.pack(frmt, payload) - binLen = len(binary) - binary = struct.pack('=I' + frmt, binLen, payload) - compressed = zlib.compress(binary) - if not self.connection is None: - if self.acknowledged: - self.acknowledged = False - self.connection.write_message(compressed, True) - self.timer = threading.Timer(1, self.timeout) - self.timer.start() - - # Creates callback to send message in IOLoop. - def send_message(self, data): - if not self.ioloop is None: - self.ioloop.add_callback(self.send_message_cb, data) - - # Returns the formatted last received message. - def updates(self): - payloads = copy.copy(self.values) - self.values = dict() - return payloads - - # Callback for websocket connection. Retries if connection fails. - def on_connected(self, res): - try: - self.connection = res.result() - while self.connection is None: - tornado.websocket.websocket_connect( - self.url, - self.ioloop, - callback = self.on_connected, - on_message_callback = self.on_message) - except Exception, e: - print "Failed to connect: {}".format(e) - tornado.websocket.websocket_connect( - self.url, - self.ioloop, - callback = self.on_connected, - on_message_callback = self.on_message) - - # Callback for message receiving. - # Detects between websocket close, acknowledge packet, or message packet. - # Decompresses messages, converts to unicode, - # and converts from JSON to dictionary. - def on_message(self, payload): - if payload is None: - self.connection = None - print "Server connection closed. Reconnecting..." - tornado.websocket.websocket_connect( - self.url, - self.ioloop, - callback = self.on_connected, - on_message_callback = self.on_message) - if len(payload) == 1: - self.acknowledged = True - try: - self.timer.cancel() - except: - pass - elif len(payload) == 2: - self.connection.write_message(chr(1), True) - else: - decompressed = zlib.decompress(payload) - size = struct.unpack('=I', decompressed[:4]) - frmt = "%ds" % size[0] - unpacked = struct.unpack('=I' + frmt, decompressed) - data = json.loads(unpacked[1]) - self.values[data["Topic"]] = data - self.connection.write_message(chr(1), True) - - def on_close(self): - self.connection = None - print "Server connection closed. Reconnecting..." - tornado.websocket.websocket_connect( - self.url, - self.ioloop, - callback = self.on_connected, - on_message_callback = self.on_message) - - - # Timeout for acknowledge packet. - def timeout(self): - self.acknowledged = True - diff --git a/src/receiver.py b/src/receiver.py new file mode 100644 index 0000000..5b5e5d0 --- /dev/null +++ b/src/receiver.py @@ -0,0 +1,47 @@ +# Defines the Connection class. + +import zlib +import threading +import json +import copy +import struct +import rospy +import time +import socket + +# Represents a threaded websocket connection to the server. +class Receiver(threading.Thread): + + def __init__(self, socket): + super(Receiver, self).__init__() + self.socket = socket + self.values = dict() + + # Starts the Tornado IOLoop and connects to the websocket. + # Called on thread start. + def run(self): + while True: + data = self.socket.recv(65565) + if data == "HANDSHAKE": + continue + self.process_message(data) + + # Returns the formatted last received message. + def updates(self): + payloads = copy.copy(self.values) + self.values = dict() + return payloads + + # Callback for message receiving. + # Decompresses messages, converts to unicode, + # and converts from JSON to dictionary. + def process_message(self, payload): + try: + decompressed = zlib.decompress(payload) + size = struct.unpack('=I', decompressed[:4]) + frmt = "%ds" % size[0] + unpacked = struct.unpack('=I' + frmt, decompressed) + data = json.loads(unpacked[1]) + self.values[data["Topic"]] = data + except: + pass diff --git a/src/sender.py b/src/sender.py new file mode 100644 index 0000000..5fdb9b4 --- /dev/null +++ b/src/sender.py @@ -0,0 +1,43 @@ +# Defines the Connection class. + +import zlib +import threading +import json +import copy +import struct +import rospy +import time +import socket + +# Represents a threaded websocket connection to the server. +class Sender(): + lock = threading.Lock() + + def __init__(self, socket): + self.socket = socket + self.data = None + self.worker = None + self.values = dict() + + # Starts the Tornado IOLoop and connects to the websocket. + # Called on thread start. + def run(self): + self.send_message_cb(self.data) + + # Formats data dictionary as JSON, converts to binary, + # compresses using zlib, and sends to the server. + def send_message_cb(self, data): + payload = json.dumps(data) + frmt = "%ds" % len(payload) + binary = struct.pack(frmt, payload) + binLen = len(binary) + binary = struct.pack('=I' + frmt, binLen, payload) + compressed = zlib.compress(binary) + with Sender.lock: + self.socket.sendall(compressed) + + # Creates callback to send message in IOLoop. + def send_message(self, data): + self.data = data + self.worker = threading.Thread(target = self.run) + self.worker.start()