Skip to content

Commit

Permalink
Client acknowledgement
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelchang committed Feb 7, 2016
1 parent 0393e29 commit 9eadc78
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
23 changes: 9 additions & 14 deletions launch/example.launch
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
<?xml version="1.0"?>
<launch>
<node pkg="jammi" type="server_node.py" name="jammi_server" output="screen">
<param name="host" value="localhost"/>
<param name="port" value="9000"/>
</node>

<node pkg="jammi" type="client_node.py" name="foo_client_node" output="screen">
<param name="name" value="foo"/>
<param name="host" value="localhost"/>
<param name="port" value="9000"/>
<param name="host" value="wallar.me"/>
<param name="port" value="8080"/>
</node>

<node pkg="jammi" type="client_node.py" name="bar_client_node" output="screen">
<param name="name" value="bar"/>
<param name="host" value="localhost"/>
<param name="port" value="9000"/>
<param name="host" value="wallar.me"/>
<param name="port" value="8080"/>
</node>

<node pkg="jammi" type="client_node.py" name="baz_client_node" output="screen">
<param name="name" value="baz"/>
<param name="host" value="localhost"/>
<param name="port" value="9000"/>
<param name="host" value="wallar.me"/>
<param name="port" value="8080"/>
</node>

<node pkg="jammi" type="client_node.py" name="qux_client_node" output="screen">
<param name="name" value="qux"/>
<param name="host" value="localhost"/>
<param name="port" value="9000"/>
<param name="host" value="wallar.me"/>
<param name="port" value="8080"/>
<rosparam>
publishing:
- /state
Expand All @@ -35,7 +30,7 @@
- geometry_msgs/Point
- geometry_msgs/Point
trusted:
- foo bar baz
- "*"
- foo bar
</rosparam>

Expand Down
2 changes: 1 addition & 1 deletion src/client/client_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def create_subscriber(self, topic, msg_type, trusted):
mod = __import__(namespace + ".msg")
msg_cls = getattr(mod.msg, msg_name)
cb = self.create_callback(topic, msg_type, trusted)
self.subs[topic] = rospy.Subscriber(topic, msg_cls, cb)
self.subs[topic] = rospy.Subscriber(topic, msg_cls, cb, None, 1)
return self

def create_callback(self, topic, msg_type, trusted):
Expand Down
37 changes: 28 additions & 9 deletions src/client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,52 @@
from twisted.internet import reactor
from autobahn.twisted.websocket import WebSocketClientProtocol
from autobahn.twisted.websocket import WebSocketClientFactory

import rospy

class MMClient(WebSocketClientProtocol):

client = None
updates = dict()
acknowledged = True
timer = threading.Timer

def onConnect(self, reponse):
MMClient.client = self
MMClient.acknowledged = True
MMClient.timer = threading.Timer

def onMessage(self, payload, is_binary):
if not is_binary:
data = json.loads(payload)
MMClient.updates[data["topic"]] = data
else:
decompressed = zlib.decompress(payload)
size = struct.unpack('=I', decompressed[:4])
frmt = "%ds" % size[0]
unpacked = struct.unpack('=I' + frmt, decompressed)
data = json.loads(unpacked[1])
MMClient.updates[data["topic"]] = data
if len(payload) == 1:
MMClient.acknowledged = True
MMClient.timer.cancel()
else:
decompressed = zlib.decompress(payload)
size = struct.unpack('=I', decompressed[:4])
frmt = "%ds" % size[0]
unpacked = struct.unpack('=I' + frmt, decompressed)
data = json.loads(unpacked[1])
MMClient.updates[data["topic"]] = data

def onClose(self, wasClean, code, reason):
rospy.logwarn("WebSocket connection closed: {0}".format(reason))

@staticmethod
def timeout():
MMClient.acknowledged = True

@staticmethod
def send_message(payload, is_binary):
if not MMClient.client is None:
MMClient.client.sendMessage(payload, is_binary)

rospy.loginfo(MMClient.acknowledged)
if MMClient.acknowledged:
MMClient.acknowledged = False
MMClient.client.sendMessage(payload, is_binary)
MMClient.timer = threading.Timer(1, MMClient.timeout)
MMClient.timer.start()

class Connection(threading.Thread):
def __init__(self, host, port, name):
Expand Down

0 comments on commit 9eadc78

Please sign in to comment.