From 05a4ebb80f2ed55bdf0b657c826c6bbe90748387 Mon Sep 17 00:00:00 2001 From: wallarelvo Date: Sun, 31 Jan 2016 20:31:13 +0800 Subject: [PATCH] Transport layer is wokring better now --- src/client/__init__.py | 5 -- src/client/heart.py | 91 ------------------------ src/client/nameserver.py | 146 --------------------------------------- src/client/ws.py | 68 ++++++++++++++++++ src/server/ws.py | 13 ++-- tests/ws_test.py | 4 +- 6 files changed, 78 insertions(+), 249 deletions(-) delete mode 100644 src/client/heart.py delete mode 100644 src/client/nameserver.py create mode 100644 src/client/ws.py diff --git a/src/client/__init__.py b/src/client/__init__.py index 108e783..e69de29 100644 --- a/src/client/__init__.py +++ b/src/client/__init__.py @@ -1,5 +0,0 @@ - -__all__ = ["Heart", "NameServer"] - -from heart import Heart -from nameserver import NameServer diff --git a/src/client/heart.py b/src/client/heart.py deleted file mode 100644 index 269a98a..0000000 --- a/src/client/heart.py +++ /dev/null @@ -1,91 +0,0 @@ - -import socket -import json -import zlib -import rospy -import threading - - -class Heart(object): - """ - A heartbeat class that can be used to send messages over UDP to a group - with a given rate - """ - - def __init__(self, host, port, rate): - """ - Initializes a multicast heart object - - Parameters - ---------- - host: string - The host of the multicast group - - port: integer - The port of the multicast group - - rate: integer - The rate per second that the messages would be sent over UDP - """ - - self.host = host - self.port = port - self.rate = rate - self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.data = dict() - self.running = False - - def set_data(self, data): - """ - Sets the data being sent over UDP - - Parameters - ---------- - data: dict - A dictionary to update the current data with - """ - - self.data = data - return self - - def beat(self): - """ - Compresses and sends the formatted data over UDP - - Notes - ----- - This is not to be called independently but instead gets called by - the timed thread in `start` - """ - json_str = json.dumps(self.data) - zip_str = zlib.compress(json_str) - self.sock.sendto(zip_str, (self.host, self.port)) - return self - - def kill(self): - """ - Kills the heartbeat thread - """ - self.running = False - return self - - def start(self): - """ - Starts the hearbeat thread - - Notes - ----- - If the thread has already started, this will throw a RuntimeWarning - """ - if self.running: - raise RuntimeWarning("Heart already running") - else: - def __thread(): - rate = rospy.Rate(self.rate) - while self.running: - self.beat() - rate.sleep() - self.running = True - self.thread = threading.Thread(target=__thread) - self.thread.daemon = True - self.thread.start() diff --git a/src/client/nameserver.py b/src/client/nameserver.py deleted file mode 100644 index ea00499..0000000 --- a/src/client/nameserver.py +++ /dev/null @@ -1,146 +0,0 @@ - -import socket -import zlib -import time -import json -import threading - - -class NameServer(object): - """ - A name server class that is used to aggregate masters that are - sending heartbeat messages over the network. This class is used - for the discovery of other masters - """ - - def __init__(self, host, port, interval): - """ - Initializes a NameServer - - Parameters - ---------- - host: string - The host of the multicast group - - port: integer - The port of the multicast group - - interval: number - The amount of time that can elapse between heartbeat messages - before the remote master is labeled "dead" - """ - - self.host = host - self.port = port - self.interval = interval - self.masters = dict() - self.times = dict() - self.new_connections = list() - self.running = False - self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sock.bind((host, port)) - - def kill(self): - """ - Kills the name server thread - """ - - self.running = False - return self - - def get_alive(self): - """ - Returns the names of masters that are still labeled as "alive" - - Returns - ------- - alive: list of strings - A list of master names that are still alive - """ - - alive = list() - ctime = time.time() - for name, t in self.times.iteritems(): - if ctime - t < self.interval: - alive.append(name) - return alive - - def get_data(self, name): - """ - Gets the data associated with a given master name - - Parameters - ---------- - name: string - The name of the master - - Returns - ------- - A: dict - The associated data of the master - """ - - return self.masters[name] - - def get_time(self, name): - """ - Gets the last time that the master with name, `name`, sent a hearbeat - message - - Parameters - ---------- - name: string - The name of the master - - Returns - ------- - A: float - The last time the master as sent a heartbeat message - """ - - return self.times[name] - - def discover(self): - """ - Discover new masters - - Returns - ------- - nc: list of strings - The new masters on the network - - Notes - ----- - This method resets the `new_connections` variable to an empty list - """ - - nc = self.new_connections[:] - self.new_connections = list() - return nc - - def start(self): - """ - Starts the name server thread - - Notes - ----- - If the name server is already running, a RuntimeWarning will be - thrown - """ - - if self.running: - raise RuntimeWarning("NameServer already running") - else: - def __thread(): - while self.running: - data_zip, _ = self.sock.recvfrom(1024) - data_str = zlib.decompress(data_zip) - data = json.loads(data_str) - if not data["name"] in self.masters: - self.new_connections.append(data["name"]) - self.masters[data["name"]] = data - self.times[data["name"]] = time.time() - self.running = True - self.thread = threading.Thread(target=__thread) - self.thread.daemon = True - self.thread.start() diff --git a/src/client/ws.py b/src/client/ws.py new file mode 100644 index 0000000..5c1c870 --- /dev/null +++ b/src/client/ws.py @@ -0,0 +1,68 @@ + +import threading +import string +import sys +from twisted.internet import reactor +from autobahn.twisted.websocket import WebSocketClientProtocol +from autobahn.twisted.websocket import WebSocketClientFactory + + +test_msg = """ +{"to": "$to", + "from": "$fr", + "topic": "/foo/state", + "type": "geometry_msgs/Point", + "msg": "{'x': 0, 'y': 0, 'z': 0}", + "stamp": $t} +""" + + +class MMClient(WebSocketClientProtocol): + + client = None + + def onConnect(self, reponse): + MMClient.client = self + + def onMessage(self, payload, is_binary): + if not is_binary: + print payload + + @staticmethod + def send_message(payload): + if not MMClient.client is None: + MMClient.client.sendMessage(payload) + + +class Connection(threading.Thread): + def __init__(self, host, port, name): + super(Connection, self).__init__() + self.host = host + self.port = port + self.name = name + self.url = "ws://{}:{}/{}".format(host, port, name) + self.factory = WebSocketClientFactory(self.url, debug=True) + + def run(self): + self.factory.protocol = MMClient + reactor.connectTCP(self.host, self.port, self.factory) + reactor.run(installSignalHandlers=0) + + def stop(self): + reactor.stop() + + def send_message(self, payload): + return MMClient.send_message(payload) + + +if __name__ == "__main__": + import time + conn = Connection("localhost", 9000, sys.argv[1]) + conn.daemon = True + conn.start() + msg_tmp = string.Template(test_msg) + while True: + msg = msg_tmp.substitute( + fr=sys.argv[1], to=sys.argv[2], t=time.time()) + conn.send_message(msg) + time.sleep(0.1) diff --git a/src/server/ws.py b/src/server/ws.py index 3d33956..75db62f 100644 --- a/src/server/ws.py +++ b/src/server/ws.py @@ -16,12 +16,15 @@ def onOpen(self): def onMessage(self, payload, is_binary): if not is_binary: - msg = json.loads(payload) - if msg["to"] == "*": - for name in common.clients.keys(): + try: + msg = json.loads(payload) + if msg["to"] == "*": + for name in common.clients.keys(): + common.get_client(msg["to"]).sendMessage(payload) + else: common.get_client(msg["to"]).sendMessage(payload) - else: - common.get_client(msg["to"]).sendMessage(payload) + except KeyError: + pass def onClose(self, was_clean, code, reason): common.remove_client(self.name_of_client) diff --git a/tests/ws_test.py b/tests/ws_test.py index 949fdea..92646d7 100644 --- a/tests/ws_test.py +++ b/tests/ws_test.py @@ -63,9 +63,9 @@ def create_websocket(): def test_ws_server(): log.startLogging(sys.stdout) - create_websocket() + # create_websocket() factory = WebSocketServerFactory("ws://localhost:9000", debug=True) factory.protocol = server.MMServerProtocol reactor.listenTCP(9000, factory) - task.LoopingCall(print_conns).start(1.0) + # task.LoopingCall(print_conns).start(1.0) reactor.run()