Skip to content

Commit

Permalink
new udp protocol works
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelchang committed Feb 11, 2018
1 parent 5fb5aa8 commit d210666
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 162 deletions.
48 changes: 25 additions & 23 deletions src/client_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import requests
import publishermanager as pm
from rospy_message_converter import message_converter as mc
from connection import Connection
from sender import Sender
from receiver import Receiver
import threading
import socket


NODE_NAME = "canopy_client"
Expand All @@ -25,9 +27,9 @@ def __init__(self, host, port, name, broadcasting, private_key,
self.port = port
self.name = name.replace(" ", "").replace("/", "")
self.use_local_time = use_local_time
self.conn = dict()
self.senders = dict()
self.receiver = None
self.descriptionConn = None
self.descriptionSender = None
self.subs = dict()
self.broadcasting = broadcasting
self.private_key = private_key
Expand All @@ -36,6 +38,7 @@ def __init__(self, host, port, name, broadcasting, private_key,
self.leaflets = leaflets
self.pub_man = pm.PublisherManager(use_local_time)
self.timer = threading.Timer(0.1, self.descriptionSend)
self.socket = None

def post_leaflet_urls(self):
if len(self.leaflets) > 0:
Expand All @@ -48,36 +51,35 @@ def post_leaflet_urls(self):
# Creates all connections and subscribers and starts them.
# Runs a loop that checks for received messages.
def run(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.connect((self.host, self.port))
self.socket.settimeout(1.0)
while True:
try:
self.socket.sendto("CONNECT:{}:{}".format(self.private_key, self.name), (self.host, self.port))
reply, addr = self.socket.recvfrom(64)
if reply == "HANDSHAKE":
print "[{}-canopy-client] Connected to server.".format(self.name)
break
except socket.timeout:
print "[{}-canopy-client] Connection timed out. Retrying...".format(self.name)
continue
self.socket.setblocking(1)
for topic, msg_type, trusted in self.broadcasting:
if topic[0] != "/":
topic = "/" + topic
self.create_subscriber(topic, msg_type, trusted)
if topic == "/receiving":
rospy.logerr("{}: topic name 'receiving' is reserved".format(
self.name))
continue
self.conn[topic] = Connection(host, port, "{}{}".format(
self.name, topic), private_key)
self.conn[topic].start()
self.receiver = Connection(host, port, "{}{}".format(
self.name, "/receiving"), private_key)
self.descriptionConn = Connection(host, port, "{}/description".format(
self.name), private_key)
self.senders[topic] = Sender(self.socket)
self.descriptionSender = Sender(self.socket)
self.receiver = Receiver(self.socket)
self.receiver.start()
self.descriptionConn.start()
self.timer.start()
self.post_leaflet_urls()
while not rospy.is_shutdown():
#for key, conn in self.conn.iteritems():
# updates = conn.updates()
updates = self.receiver.updates()
for v in updates.values():
self.pub_man.publish(v)
for key, conn in self.conn.iteritems():
conn.stop()
self.receiver.stop()
self.timer.cancel()
self.descriptionConn.stop()

# Creates a subscriber for messages of msg_type published on topic.
def create_subscriber(self, topic, msg_type, trusted):
Expand Down Expand Up @@ -110,7 +112,7 @@ def callback(msg):
else:
msg = self.modify_stamped_message(msg)
data["Msg"] = mc.convert_ros_message_to_dictionary(msg)
self.conn[topic].send_message(data)
self.senders[topic].send_message(data)
return callback

def modify_child_frame_id(self, message):
Expand Down Expand Up @@ -157,7 +159,7 @@ def descriptionSend(self):
msg = dict()
msg["data"] = self.description
data["Msg"] = msg
self.descriptionConn.send_message(data)
self.descriptionSender.send_message(data)
self.timer = threading.Timer(0.1, self.descriptionSend)
self.timer.start()

Expand Down
139 changes: 0 additions & 139 deletions src/connection.py

This file was deleted.

47 changes: 47 additions & 0 deletions src/receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Defines the Connection class.

import zlib
import threading
import json
import copy
import struct
import rospy
import time
import socket

# Represents a threaded websocket connection to the server.
class Receiver(threading.Thread):

def __init__(self, socket):
super(Receiver, self).__init__()
self.socket = socket
self.values = dict()

# Starts the Tornado IOLoop and connects to the websocket.
# Called on thread start.
def run(self):
while True:
data = self.socket.recv(65565)
if data == "HANDSHAKE":
continue
self.process_message(data)

# Returns the formatted last received message.
def updates(self):
payloads = copy.copy(self.values)
self.values = dict()
return payloads

# Callback for message receiving.
# Decompresses messages, converts to unicode,
# and converts from JSON to dictionary.
def process_message(self, payload):
try:
decompressed = zlib.decompress(payload)
size = struct.unpack('=I', decompressed[:4])
frmt = "%ds" % size[0]
unpacked = struct.unpack('=I' + frmt, decompressed)
data = json.loads(unpacked[1])
self.values[data["Topic"]] = data
except:
pass
43 changes: 43 additions & 0 deletions src/sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Defines the Connection class.

import zlib
import threading
import json
import copy
import struct
import rospy
import time
import socket

# Represents a threaded websocket connection to the server.
class Sender():
lock = threading.Lock()

def __init__(self, socket):
self.socket = socket
self.data = None
self.worker = None
self.values = dict()

# Starts the Tornado IOLoop and connects to the websocket.
# Called on thread start.
def run(self):
self.send_message_cb(self.data)

# Formats data dictionary as JSON, converts to binary,
# compresses using zlib, and sends to the server.
def send_message_cb(self, data):
payload = json.dumps(data)
frmt = "%ds" % len(payload)
binary = struct.pack(frmt, payload)
binLen = len(binary)
binary = struct.pack('=I' + frmt, binLen, payload)
compressed = zlib.compress(binary)
with Sender.lock:
self.socket.sendall(compressed)

# Creates callback to send message in IOLoop.
def send_message(self, data):
self.data = data
self.worker = threading.Thread(target = self.run)
self.worker.start()

0 comments on commit d210666

Please sign in to comment.