Skip to content

Commit

Permalink
added server and client files for cobot mqtt control
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Sep 20, 2024
1 parent 11bfa59 commit 9d86712
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 0 deletions.
147 changes: 147 additions & 0 deletions scripts/cobot280pi/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import paho.mqtt.client as paho
import time
import json
import threading
import matplotlib.pyplot as plt
from queue import Queue
import numpy as np
from PIL import Image

response_queues = {}

def on_connect(client, userdata, flags, rc, properties=None):
print("Connection recieved")

# with this callback you can see if your publish was successful
def on_publish(client, userdata, mid, properties=None):
print("mid: " + str(mid))

def on_message(client, userdata, msg):
if msg.topic not in response_queues:
response_queues[msg.topic] = Queue()
response_queues[msg.topic].put(msg)

class CobotMQTTClient:

def __init__(
self,
hive_mq_username: str,
hive_mq_password: str,
hive_mq_cloud: str,
port: int = 8883
):
self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
self.client.on_connect = on_connect
# client.on_subscribe = on_subscribe
self.client.on_message = on_message
self.client.on_publish = on_publish

self.client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS)
self.client.username_pw_set(hive_mq_username, hive_mq_password)
self.client.connect(hive_mq_cloud, port)

self.client.subscribe("response/query/angles", qos=2)
self.client.subscribe("response/query/coords", qos=2)
self.client.subscribe("response/query/camera", qos=2)
self.client.subscribe("response/control/angles", qos=2)
self.client.subscribe("response/control/coords", qos=2)
self.client.subscribe("response/control/gripper", qos=2)

self.client_loop_thread = threading.Thread(target=self.client.loop_forever)
self.client_loop_thread.start()

def wait_until_response_recieved(self, response_topic: str):
while True:
if response_topic not in response_queues:
time.sleep(5)
print("Waiting for response...")
continue

item = response_queues[response_topic].get()
return json.loads(item.payload)

def send_angles(
self,
angle_list: list[float] = [0.0] * 6,
speed: int = 50
):
assert(type(angle_list) == list)
assert(type(speed) == int)

payload = json.dumps({"args": {"angles": angle_list, "speed": speed}})
self.client.publish("control/angles", payload=payload, qos=2)
response = self.wait_until_response_recieved("response/control/angles")
if not response["success"]:
print(f"error sending angles: \n{response['error_msg']}")
return
print("angles sent successfully")


def send_coords(
self,
coord_list: list[float] = [0.0] * 6,
speed: int = 50
):
assert(type(coord_list) == list)
assert(type(speed) == int)

payload = json.dumps({"args": {"coords": coord_list, "speed": speed}})
self.client.publish("control/coords", payload=payload, qos=2)
response = self.wait_until_response_recieved("response/control/coords")
if not response["success"]:
print(f"error sending coords: \n{response['error_msg']}")
return
print("coords sent successfully")

def send_gripper_value(
self,
value: int = 100,
speed: int = 50
):
assert(type(value) == int)
assert(type(speed) == int)

payload = json.dumps({"args": {"value": value, "speed": speed}})
self.client.publish("control/gripper", payload=payload, qos=2)
response = self.wait_until_response_recieved("response/control/gripper")
if not response["success"]:
print(f"error sending gripper value: \n{response['error_msg']}")
return
print("gripper value sent successfully")

def get_angles(self):
payload = json.dumps({"args": {}})
self.client.publish("query/angles", payload=payload, qos=2)
response = self.wait_until_response_recieved("response/query/angles")
if not response["success"]:
print("Error getting angles")
return None
array = np.array(response["angles"])
return array

def get_coords(self):
payload = json.dumps({"args": {}})
self.client.publish("query/coords", payload=payload, qos=2)
response = self.wait_until_response_recieved("response/query/coords")
if not response["success"]:
print("Error getting coords")
return None
array = np.array(response["coords"])
return array

def get_camera(self):
payload = json.dumps({"args": {}})
self.client.publish("query/camera", payload=payload, qos=2)
response = self.wait_until_response_recieved("response/query/camera")
if not response["success"]:
print(f"could not get image with error: \n{response['error_msg']}")
return None
array = np.array(response["image_pixels"])
array_shape = tuple(response["image_shape"])
array = array.reshape(array_shape)[:, :, ::-1]
return array


if __name__ == "__main__":
from my_secrets import HIVEMQ_HOST, HIVEMQ_PASSWORD, HIVEMQ_USERNAME
client = CobotMQTTClient(HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST)
41 changes: 41 additions & 0 deletions scripts/cobot280pi/environment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: paho-mqtt
channels:
- defaults
dependencies:
- _libgcc_mutex=0.1=main
- _openmp_mutex=5.1=1_gnu
- bzip2=1.0.8=h5eee18b_6
- ca-certificates=2024.7.2=h06a4308_0
- ld_impl_linux-64=2.38=h1181459_1
- libffi=3.4.4=h6a678d5_1
- libgcc-ng=11.2.0=h1234567_1
- libgomp=11.2.0=h1234567_1
- libstdcxx-ng=11.2.0=h1234567_1
- libuuid=1.41.5=h5eee18b_0
- ncurses=6.4=h6a678d5_0
- openssl=3.0.15=h5eee18b_0
- pip=24.2=py311h06a4308_0
- python=3.11.9=h955ad1f_0
- readline=8.2=h5eee18b_0
- setuptools=72.1.0=py311h06a4308_0
- sqlite=3.45.3=h5eee18b_0
- tk=8.6.14=h39e8969_0
- tzdata=2024a=h04d1e81_0
- wheel=0.44.0=py311h06a4308_0
- xz=5.4.6=h5eee18b_1
- zlib=1.2.13=h5eee18b_1
- pip:
- contourpy==1.3.0
- cycler==0.12.1
- fonttools==4.53.1
- kiwisolver==1.4.7
- matplotlib==3.9.2
- numpy==2.1.1
- opencv-python==4.10.0.84
- packaging==24.1
- paho-mqtt==1.5.1
- pillow==10.4.0
- pyparsing==3.1.4
- python-dateutil==2.9.0.post0
- six==1.16.0
prefix: /root/miniconda3/envs/paho-mqtt
174 changes: 174 additions & 0 deletions scripts/cobot280pi/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import threading
import time
import queue
import json
import paho.mqtt.client as paho
from my_secrets import *
import cv2
import numpy as np
from utils import setup_logger, truncate_string
from pymycobot.mycobot import MyCobot

logger = setup_logger()

class CobotMQTTServer:
def __init__(self, client):
self.client = client
self.mc = MyCobot("/dev/ttyAMA0", 1000000)
logger.info("Cobot controller initialized")

self.message_queue = queue.Queue()
self.thread = threading.Thread(target=self.queue_thread)
self.thread.daemon = True
self.thread.start()
logger.info("Listening for commands...")

def queue_thread(self):
while True:
item = self.message_queue.get()
if "response" in item.topic:
continue

logger.debug(f"recieved task: {item.topic}, {truncate_string(item.payload)}")
self.handle_message(item)
self.message_queue.task_done()

def push_command(self, message):
self.message_queue.put(message)

def handle_message(self, message):
response_topic = "response/" + message.topic
response_payload = {}

try:
payload = json.loads(message.payload)
except Exception:
response_payload["success"] = False
response_payload["error_msg"] = f"could not parse payload: {truncate_string(message.payload)}"
logger.critical(response_payload["error_msg"])
self.client.publish(response_topic, payload=json.dumps(response_payload), qos=2)
return

if message.topic == 'control/angles':
status = self.handle_control_angle(payload)
elif message.topic == 'control/coords':
status = self.handle_control_coord(payload)
elif message.topic == "control/gripper":
status = self.handle_control_gripper(payload)
elif message.topic == 'query/angles':
status = self.handle_query_angle(payload)
elif message.topic == 'query/coords':
status = self.handle_query_coord(payload)
elif message.topic == 'query/camera':
status = self.handle_query_camera(payload)
else:
response_payload["success"] = False
response_payload["error_msg"] = f"unknown topic {message.topic}"
logger.critical(response_payload["error_msg"])
self.client.publish(response_topic, payload=json.dumps(response_payload), qos=2)
return

self.client.publish(response_topic, payload=json.dumps(status), qos=2)

def handle_control_gripper(self, payload):
args = payload["args"]
try:
self.mc.set_gripper_value(**args)
time.sleep(3)
return {"success": True}
except Exception as e:
logger.critical("control gripper error: " + truncate_string(str(e)))
return {"success": False, "error_msg": str(e)}

def handle_control_angle(self, payload):
args = payload["args"]
try:
self.mc.send_angles(**args)
time.sleep(3)
return {"success": True}
except Exception as e:
logger.critical("control angle error: " + truncate_string(str(e)))
return {"success": False, "error_msg": str(e)}

def handle_control_coord(self, payload):
args = payload["args"]
try:
self.mc.send_coords(**args)
time.sleep(3)
return {"success": True}
except Exception as e:
logger.critical("control coords error: " + truncate_string(str(e)))
return {"success": False, "error_msg": str(e)}

def handle_query_angle(self, payload):
args = payload["args"]
try:
angles = self.mc.get_angles()
time.sleep(3)
if angles is None or len(angles) < 6:
raise Exception("could not read angle")
return {"success": True, "angles": angles}
except Exception as e:
error_msg = str(e)
logger.critical(error_msg)
return {"success": False, "error_msg": truncate_string(error_msg)}

def handle_query_coord(self, payload):
args = payload["args"]
try:
coords = self.mc.get_coords()
time.sleep(3)
if coords is None or len(coords) < 6:
raise Exception("could not read coord")
return {"success": True, "coords": coords}
except Exception as e:
error_msg = str(e)
logger.critical(error_msg)
return {"success": False, "error_msg": truncate_string(error_msg)}

# TODO: fix bug here
def handle_query_camera(self, payload):
args = payload["args"]
try:
webcam = cv2.VideoCapture(0)
_, frame = webcam.read()
frame_arr = np.array(frame)
pixel_list = frame_arr.astype(int).flatten().tolist()
webcam.release()
return {
"success": True,
"image_pixels": pixel_list,
"image_shape": list(frame_arr.shape),
"image_type": frame_arr.dtype.name
}
except Exception as e:
logger.critical(f"error in camera query: {truncate_string(str(e))}")
return {"success": False, "error_msg": str(e)}



def on_connect(client, userdata, flags, rc, properties=None):
print("CONNACK received with code %s." % rc)

def on_publish(client, userdata, mid, properties=None):
print("mid: " + str(mid))

def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
handler.push_command(msg)


if __name__ == "__main__":
client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish

client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS)
client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD)
client.connect(HIVEMQ_HOST, 8883)

client.subscribe("#", qos=2)

handler = CobotMQTTServer(client)
client.loop_forever()
23 changes: 23 additions & 0 deletions scripts/cobot280pi/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging
import sys

def setup_logger(logfile_name: str = "mqttcobot.log"):
logger = logging.getLogger('logger')
logger.setLevel(logging.NOTSET)

console_handler = logging.StreamHandler(sys.stdout)
file_handler = logging.FileHandler(logfile_name)

formatter = logging.Formatter('[%(levelname)s - %(asctime)s]: %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger

def truncate_string(s, max_length = 50):
if len(s) <= max_length:
return s
half = (max_length - 3) // 2
return f"{s[:half]}...{s[-half:]}"

0 comments on commit 9d86712

Please sign in to comment.