Skip to content

Commit

Permalink
Merge branch 'udp'
Browse files Browse the repository at this point in the history
  • Loading branch information
a20r committed Feb 14, 2018
2 parents 569d1a5 + 9657e09 commit 2ca50f9
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 190 deletions.
21 changes: 4 additions & 17 deletions launch/bar.launch
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,18 @@
<param name="name" value="bar"/>
<param name="host" value="$(arg host)"/>
<param name="port" value="$(arg port)"/>
<param name="private_key" value="PsFXjmWpszr6acSKL" />
<param name="private_key" value="abc" />
<param name="description" value="bar robot" />
<rosparam>
<!--<rosparam>
publishing:
- /filtered_state
types:
- geometry_msgs/PointStamped
trusted:
- foo
</rosparam>
- ".*"
</rosparam> -->
</node>

<node pkg="rostopic" type="rostopic" name="filtered_state_pub"
args="pub -sr 40 /filtered_state geometry_msgs/PointStamped '{header: {stamp: now, frame_id: base_link}, point: {x: 2.0, y: 1.0, z: 1.0}}'" />
<!--<node name="usb_cam" pkg="usb_cam" type="usb_cam_node" output="screen" >
<param name="video_device" value="/dev/video0" />
<param name="image_width" value="640" />
<param name="image_height" value="480" />
<param name="pixel_format" value="yuyv" />
<param name="camera_frame_id" value="usb_cam" />
<param name="io_method" value="mmap"/>
</node>-->
<!-- <node name="image_view" pkg="image_view" type="image_view" respawn="false" output="screen"> -->
<!-- <remap from="image" to="/qux/usb_cam/image_raw"/> -->
<!-- <param name="autosize" value="true" /> -->
<!-- </node> -->

</launch>
6 changes: 3 additions & 3 deletions launch/video_chat_demo.launch
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@
<launch>
<node pkg="canopy_client" type="client_node.py" name="raphael_client_node" output="screen">
<param name="name" value="raphael"/>
<param name="host" value="wallar.me"/>
<param name="host" value="wallarelvo-tower.csail.mit.edu"/>
<param name="port" value="8080"/>
<param name="private_key" value="MIICXAIBAAKBgQCTUQ49FJUCVvU5tIag" />
<param name="private_key" value="MIICXAIBAAKBgQCTUQ49FJUCVvU5tIag" />
<param name="description" value="raphael" />
<rosparam>
publishing:
- /usb_cam/image_raw/compressed
types:
- sensor_msgs/CompressedImage
trusted:
- "*"
- "raphael"
</rosparam>

</node>
Expand Down
52 changes: 29 additions & 23 deletions src/client_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import requests
import publishermanager as pm
from rospy_message_converter import message_converter as mc
from connection import Connection
from sender import Sender
from receiver import Receiver
import threading
import socket


NODE_NAME = "canopy_client"
Expand All @@ -25,9 +27,9 @@ def __init__(self, host, port, name, broadcasting, private_key,
self.port = port
self.name = name.replace(" ", "").replace("/", "")
self.use_local_time = use_local_time
self.conn = dict()
self.senders = dict()
self.receiver = None
self.descriptionConn = None
self.descriptionSender = None
self.subs = dict()
self.broadcasting = broadcasting
self.private_key = private_key
Expand All @@ -36,6 +38,7 @@ def __init__(self, host, port, name, broadcasting, private_key,
self.leaflets = leaflets
self.pub_man = pm.PublisherManager(use_local_time)
self.timer = threading.Timer(0.1, self.descriptionSend)
self.socket = None

def post_leaflet_urls(self):
if len(self.leaflets) > 0:
Expand All @@ -48,35 +51,38 @@ def post_leaflet_urls(self):
# Creates all connections and subscribers and starts them.
# Runs a loop that checks for received messages.
def run(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65535)
self.socket.setsockopt(socket.SOL_IP, 10, 0)
self.socket.connect((self.host, self.port))
self.socket.sendto("CONNECT:{}:{}".format(self.private_key, self.name), (self.host, self.port))
# self.socket.settimeout(1.0)
# while True:
# try:
# self.socket.sendto("CONNECT:{}:{}".format(self.private_key, self.name), (self.host, self.port))
# reply, addr = self.socket.recvfrom(64)
# if reply == "HANDSHAKE":
# print "[{}-canopy-client] Connected to server.".format(self.name)
# break
# except socket.timeout:
# print "[{}-canopy-client] Connection timed out. Retrying...".format(self.name)
# continue
self.socket.setblocking(1)
for topic, msg_type, trusted in self.broadcasting:
if topic[0] != "/":
topic = "/" + topic
self.create_subscriber(topic, msg_type, trusted)
if topic == "/receiving":
rospy.logerr("{}: topic name 'receiving' is reserved".format(
self.name))
continue
self.conn[topic] = Connection(host, port, "{}{}".format(
self.name, topic), private_key)
self.conn[topic].start()
self.receiver = Connection(host, port, "{}{}".format(
self.name, "/receiving"), private_key)
self.descriptionConn = Connection(host, port, "{}/description".format(
self.name), private_key)
self.senders[topic] = Sender(self.socket)
self.descriptionSender = Sender(self.socket)
self.receiver = Receiver(self.socket)
self.receiver.start()
self.descriptionConn.start()
self.timer.start()
#self.post_leaflet_urls()
self.post_leaflet_urls()
while not rospy.is_shutdown():
updates = self.receiver.updates()
for v in updates.values():
self.pub_man.publish(v)

for key, conn in self.conn.iteritems():
conn.stop()
self.receiver.stop()
self.timer.cancel()
self.descriptionConn.stop()

# Creates a subscriber for messages of msg_type published on topic.
def create_subscriber(self, topic, msg_type, trusted):
Expand Down Expand Up @@ -109,7 +115,7 @@ def callback(msg):
else:
msg = self.modify_stamped_message(msg)
data["Msg"] = mc.convert_ros_message_to_dictionary(msg)
self.conn[topic].send_message(data)
self.senders[topic].send_message(data)
return callback

def modify_child_frame_id(self, message):
Expand Down Expand Up @@ -156,7 +162,7 @@ def descriptionSend(self):
msg = dict()
msg["data"] = self.description
data["Msg"] = msg
self.descriptionConn.send_message(data)
self.descriptionSender.send_message(data)
self.timer = threading.Timer(0.1, self.descriptionSend)
self.timer.start()

Expand Down
147 changes: 0 additions & 147 deletions src/connection.py

This file was deleted.

48 changes: 48 additions & 0 deletions src/receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Defines the Connection class.

import zlib
import threading
import json
import copy
import struct
import rospy
import time
import socket

# Represents a threaded websocket connection to the server.
class Receiver(threading.Thread):

def __init__(self, socket):
super(Receiver, self).__init__()
self.socket = socket
self.values = dict()

# Starts the Tornado IOLoop and connects to the websocket.
# Called on thread start.
def run(self):
while True:
data = self.socket.recv(65565)
if data == "HANDSHAKE":
continue
self.process_message(data)

# Returns the formatted last received message.
def updates(self):
#payloads = copy.copy(self.values)
payloads = self.values
self.values = dict()
return payloads

# Callback for message receiving.
# Decompresses messages, converts to unicode,
# and converts from JSON to dictionary.
def process_message(self, payload):
try:
decompressed = zlib.decompress(payload)
size = struct.unpack('=I', decompressed[:4])
frmt = "%ds" % size[0]
unpacked = struct.unpack('=I' + frmt, decompressed)
data = json.loads(unpacked[1])
self.values[data["Topic"]] = data
except:
pass
Loading

0 comments on commit 2ca50f9

Please sign in to comment.