Skip to content

Commit

Permalink
Server reimplementation in Go
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelchang committed Feb 15, 2016
1 parent e33e7fe commit 1fb3976
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 21 deletions.
8 changes: 4 additions & 4 deletions launch/example.launch
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@
<launch>
<node pkg="roscloud" type="client_node.py" name="foo_client_node" output="screen">
<param name="name" value="foo"/>
<param name="host" value="wallar.me"/>
<param name="host" value="localhost"/>
<param name="port" value="9000"/>
</node>

<node pkg="roscloud" type="client_node.py" name="bar_client_node" output="screen">
<param name="name" value="bar"/>
<param name="host" value="wallar.me"/>
<param name="host" value="localhost"/>
<param name="port" value="9000"/>
</node>

<node pkg="roscloud" type="client_node.py" name="baz_client_node" output="screen">
<param name="name" value="baz"/>
<param name="host" value="wallar.me"/>
<param name="host" value="localhost"/>
<param name="port" value="9000"/>
</node>

<node pkg="roscloud" type="client_node.py" name="qux_client_node" output="screen">
<param name="name" value="qux"/>
<param name="host" value="wallar.me"/>
<param name="host" value="localhost"/>
<param name="port" value="9000"/>
<rosparam>
publishing:
Expand Down
36 changes: 25 additions & 11 deletions src/client_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,34 @@ def __init__(self, host, port, name, broadcasting):
self.host = host
self.port = port
self.name = name
self.conn = Connection(host, port, name)
self.conn.start()
self.conn = dict()
self.receiver = None
self.subs = dict()
self.broadcasting = broadcasting
self.pub_man = pm.PublisherManager()

def run(self):
for topic, msg_type, trusted in self.broadcasting:
self.create_subscriber(topic, msg_type, trusted)
if topic == "/receiving":
rospy.logerror("{}: topic name 'receiving' is reserved".format(
self.name))
continue
self.conn[topic] = Connection(host, port, "{}{}".format(
self.name, topic))
self.conn[topic].start()
self.receiver = Connection(host, port, "{}{}".format(
self.name, "/receiving"))
self.receiver.start()
while not rospy.is_shutdown():
updates = self.conn.updates()
#for key, conn in self.conn.iteritems():
# updates = conn.updates()
updates = self.receiver.updates()
for v in updates.values():
self.pub_man.publish(v)
self.conn.stop()
for key, conn in self.conn.iteritems():
conn.stop()
self.receiver.stop()

def create_subscriber(self, topic, msg_type, trusted):
namespace, msg_name = msg_type.split("/")
Expand All @@ -42,13 +56,13 @@ def create_subscriber(self, topic, msg_type, trusted):
def create_callback(self, topic, msg_type, trusted):
def callback(msg):
data = dict()
data["to"] = trusted.split(' ')
data["from"] = self.name
data["topic"] = "/{}{}".format(self.name, topic)
data["type"] = msg_type
data["stamp"] = time.time()
data["msg"] = mc.convert_ros_message_to_dictionary(msg)
self.conn.send_message(data)
data["To"] = trusted.split(' ')
data["From"] = self.name
data["Topic"] = "/{}{}".format(self.name, topic)
data["Type"] = msg_type
data["Stamp"] = time.time()
data["Msg"] = mc.convert_ros_message_to_dictionary(msg)
self.conn[topic].send_message(data)
return callback


Expand Down
19 changes: 15 additions & 4 deletions src/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,36 @@
from std_msgs.msg import Float32

class Connection(threading.Thread):

def __init__(self, host, port, name):
super(Connection, self).__init__()
self.host = host
self.port = port
self.name = name
self.url = "ws://{}:{}/{}".format(host, port, name)
self.ioloop = tornado.ioloop.IOLoop.current()
self.ioloop = None
self.connection = None
self.values = dict()
self.acknowledged = True
self.timer = threading.Timer
self.freqPub = rospy.Publisher("/{}/ping".format(name), Float32, queue_size=0)

def run(self):
while self.ioloop is None:
self.ioloop = tornado.ioloop.IOLoop()
tornado.websocket.websocket_connect(
self.url,
self.ioloop,
callback = self.on_connected,
on_message_callback = self.on_message)
self.ioloop.start()

def stop(self):
def shutdown(self):
self.ioloop.stop()

def stop(self):
self.ioloop.add_callback(self.shutdown)

def send_message_cb(self, data):
payload = json.dumps(data)
frmt = "%ds" % len(payload)
Expand All @@ -45,7 +51,6 @@ def send_message_cb(self, data):
binary = struct.pack('=I' + frmt, binLen, payload)
compressed = zlib.compress(binary)
if not self.connection is None:
# rospy.loginfo(self.acknowledged)
if self.acknowledged:
self.acknowledged = False
if binLen > 1000:
Expand All @@ -65,6 +70,12 @@ def updates(self):
def on_connected(self, res):
try:
self.connection = res.result()
while self.connection is None:
tornado.websocket.websocket_connect(
self.url,
self.ioloop,
callback = self.on_connected,
on_message_callback = self.on_message)
except Exception, e:
print "Failed to connect: {}".format(e)
tornado.websocket.websocket_connect(
Expand All @@ -84,7 +95,7 @@ def on_message(self, payload):
frmt = "%ds" % size[0]
unpacked = struct.unpack('=I' + frmt, decompressed)
data = json.loads(unpacked[1])
self.values[data["topic"]] = data
self.values[data["Topic"]] = data

def timeout(self):
self.acknowledged = True
Expand Down
4 changes: 2 additions & 2 deletions src/publishermanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def create_msg(self, msg_dict, msg_type):
return msg, msg_cls

def publish(self, data):
msg, msg_cls = self.create_msg(data["msg"], data["type"])
topic = data["topic"]
msg, msg_cls = self.create_msg(data["Msg"], data["Type"])
topic = data["Topic"]
if not topic in self.pubs.keys():
self.pubs[topic] = rospy.Publisher(topic, msg_cls, queue_size=2)
self.pubs[topic].publish(msg)

0 comments on commit 1fb3976

Please sign in to comment.