From 9d86712cd886f8e683c05eaff668e68736303727 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 20 Sep 2024 18:40:57 -0400 Subject: [PATCH 01/13] added server and client files for cobot mqtt control --- scripts/cobot280pi/client.py | 147 ++++++++++++++++++++++++ scripts/cobot280pi/environment.yml | 41 +++++++ scripts/cobot280pi/server.py | 174 +++++++++++++++++++++++++++++ scripts/cobot280pi/utils.py | 23 ++++ 4 files changed, 385 insertions(+) create mode 100644 scripts/cobot280pi/client.py create mode 100644 scripts/cobot280pi/environment.yml create mode 100644 scripts/cobot280pi/server.py create mode 100644 scripts/cobot280pi/utils.py diff --git a/scripts/cobot280pi/client.py b/scripts/cobot280pi/client.py new file mode 100644 index 0000000..f28509f --- /dev/null +++ b/scripts/cobot280pi/client.py @@ -0,0 +1,147 @@ +import paho.mqtt.client as paho +import time +import json +import threading +import matplotlib.pyplot as plt +from queue import Queue +import numpy as np +from PIL import Image + +response_queues = {} + +def on_connect(client, userdata, flags, rc, properties=None): + print("Connection recieved") + +# with this callback you can see if your publish was successful +def on_publish(client, userdata, mid, properties=None): + print("mid: " + str(mid)) + +def on_message(client, userdata, msg): + if msg.topic not in response_queues: + response_queues[msg.topic] = Queue() + response_queues[msg.topic].put(msg) + +class CobotMQTTClient: + + def __init__( + self, + hive_mq_username: str, + hive_mq_password: str, + hive_mq_cloud: str, + port: int = 8883 + ): + self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) + self.client.on_connect = on_connect + # client.on_subscribe = on_subscribe + self.client.on_message = on_message + self.client.on_publish = on_publish + + self.client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) + self.client.username_pw_set(hive_mq_username, hive_mq_password) + self.client.connect(hive_mq_cloud, port) + + self.client.subscribe("response/query/angles", qos=2) + self.client.subscribe("response/query/coords", qos=2) + self.client.subscribe("response/query/camera", qos=2) + self.client.subscribe("response/control/angles", qos=2) + self.client.subscribe("response/control/coords", qos=2) + self.client.subscribe("response/control/gripper", qos=2) + + self.client_loop_thread = threading.Thread(target=self.client.loop_forever) + self.client_loop_thread.start() + + def wait_until_response_recieved(self, response_topic: str): + while True: + if response_topic not in response_queues: + time.sleep(5) + print("Waiting for response...") + continue + + item = response_queues[response_topic].get() + return json.loads(item.payload) + + def send_angles( + self, + angle_list: list[float] = [0.0] * 6, + speed: int = 50 + ): + assert(type(angle_list) == list) + assert(type(speed) == int) + + payload = json.dumps({"args": {"angles": angle_list, "speed": speed}}) + self.client.publish("control/angles", payload=payload, qos=2) + response = self.wait_until_response_recieved("response/control/angles") + if not response["success"]: + print(f"error sending angles: \n{response['error_msg']}") + return + print("angles sent successfully") + + + def send_coords( + self, + coord_list: list[float] = [0.0] * 6, + speed: int = 50 + ): + assert(type(coord_list) == list) + assert(type(speed) == int) + + payload = json.dumps({"args": {"coords": coord_list, "speed": speed}}) + self.client.publish("control/coords", payload=payload, qos=2) + response = self.wait_until_response_recieved("response/control/coords") + if not response["success"]: + print(f"error sending coords: \n{response['error_msg']}") + return + print("coords sent successfully") + + def send_gripper_value( + self, + value: int = 100, + speed: int = 50 + ): + assert(type(value) == int) + assert(type(speed) == int) + + payload = json.dumps({"args": {"value": value, "speed": speed}}) + self.client.publish("control/gripper", payload=payload, qos=2) + response = self.wait_until_response_recieved("response/control/gripper") + if not response["success"]: + print(f"error sending gripper value: \n{response['error_msg']}") + return + print("gripper value sent successfully") + + def get_angles(self): + payload = json.dumps({"args": {}}) + self.client.publish("query/angles", payload=payload, qos=2) + response = self.wait_until_response_recieved("response/query/angles") + if not response["success"]: + print("Error getting angles") + return None + array = np.array(response["angles"]) + return array + + def get_coords(self): + payload = json.dumps({"args": {}}) + self.client.publish("query/coords", payload=payload, qos=2) + response = self.wait_until_response_recieved("response/query/coords") + if not response["success"]: + print("Error getting coords") + return None + array = np.array(response["coords"]) + return array + + def get_camera(self): + payload = json.dumps({"args": {}}) + self.client.publish("query/camera", payload=payload, qos=2) + response = self.wait_until_response_recieved("response/query/camera") + if not response["success"]: + print(f"could not get image with error: \n{response['error_msg']}") + return None + array = np.array(response["image_pixels"]) + array_shape = tuple(response["image_shape"]) + array = array.reshape(array_shape)[:, :, ::-1] + return array + + +if __name__ == "__main__": + from my_secrets import HIVEMQ_HOST, HIVEMQ_PASSWORD, HIVEMQ_USERNAME + client = CobotMQTTClient(HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST) \ No newline at end of file diff --git a/scripts/cobot280pi/environment.yml b/scripts/cobot280pi/environment.yml new file mode 100644 index 0000000..2327581 --- /dev/null +++ b/scripts/cobot280pi/environment.yml @@ -0,0 +1,41 @@ +name: paho-mqtt +channels: + - defaults +dependencies: + - _libgcc_mutex=0.1=main + - _openmp_mutex=5.1=1_gnu + - bzip2=1.0.8=h5eee18b_6 + - ca-certificates=2024.7.2=h06a4308_0 + - ld_impl_linux-64=2.38=h1181459_1 + - libffi=3.4.4=h6a678d5_1 + - libgcc-ng=11.2.0=h1234567_1 + - libgomp=11.2.0=h1234567_1 + - libstdcxx-ng=11.2.0=h1234567_1 + - libuuid=1.41.5=h5eee18b_0 + - ncurses=6.4=h6a678d5_0 + - openssl=3.0.15=h5eee18b_0 + - pip=24.2=py311h06a4308_0 + - python=3.11.9=h955ad1f_0 + - readline=8.2=h5eee18b_0 + - setuptools=72.1.0=py311h06a4308_0 + - sqlite=3.45.3=h5eee18b_0 + - tk=8.6.14=h39e8969_0 + - tzdata=2024a=h04d1e81_0 + - wheel=0.44.0=py311h06a4308_0 + - xz=5.4.6=h5eee18b_1 + - zlib=1.2.13=h5eee18b_1 + - pip: + - contourpy==1.3.0 + - cycler==0.12.1 + - fonttools==4.53.1 + - kiwisolver==1.4.7 + - matplotlib==3.9.2 + - numpy==2.1.1 + - opencv-python==4.10.0.84 + - packaging==24.1 + - paho-mqtt==1.5.1 + - pillow==10.4.0 + - pyparsing==3.1.4 + - python-dateutil==2.9.0.post0 + - six==1.16.0 +prefix: /root/miniconda3/envs/paho-mqtt diff --git a/scripts/cobot280pi/server.py b/scripts/cobot280pi/server.py new file mode 100644 index 0000000..4079761 --- /dev/null +++ b/scripts/cobot280pi/server.py @@ -0,0 +1,174 @@ +import threading +import time +import queue +import json +import paho.mqtt.client as paho +from my_secrets import * +import cv2 +import numpy as np +from utils import setup_logger, truncate_string +from pymycobot.mycobot import MyCobot + +logger = setup_logger() + +class CobotMQTTServer: + def __init__(self, client): + self.client = client + self.mc = MyCobot("/dev/ttyAMA0", 1000000) + logger.info("Cobot controller initialized") + + self.message_queue = queue.Queue() + self.thread = threading.Thread(target=self.queue_thread) + self.thread.daemon = True + self.thread.start() + logger.info("Listening for commands...") + + def queue_thread(self): + while True: + item = self.message_queue.get() + if "response" in item.topic: + continue + + logger.debug(f"recieved task: {item.topic}, {truncate_string(item.payload)}") + self.handle_message(item) + self.message_queue.task_done() + + def push_command(self, message): + self.message_queue.put(message) + + def handle_message(self, message): + response_topic = "response/" + message.topic + response_payload = {} + + try: + payload = json.loads(message.payload) + except Exception: + response_payload["success"] = False + response_payload["error_msg"] = f"could not parse payload: {truncate_string(message.payload)}" + logger.critical(response_payload["error_msg"]) + self.client.publish(response_topic, payload=json.dumps(response_payload), qos=2) + return + + if message.topic == 'control/angles': + status = self.handle_control_angle(payload) + elif message.topic == 'control/coords': + status = self.handle_control_coord(payload) + elif message.topic == "control/gripper": + status = self.handle_control_gripper(payload) + elif message.topic == 'query/angles': + status = self.handle_query_angle(payload) + elif message.topic == 'query/coords': + status = self.handle_query_coord(payload) + elif message.topic == 'query/camera': + status = self.handle_query_camera(payload) + else: + response_payload["success"] = False + response_payload["error_msg"] = f"unknown topic {message.topic}" + logger.critical(response_payload["error_msg"]) + self.client.publish(response_topic, payload=json.dumps(response_payload), qos=2) + return + + self.client.publish(response_topic, payload=json.dumps(status), qos=2) + + def handle_control_gripper(self, payload): + args = payload["args"] + try: + self.mc.set_gripper_value(**args) + time.sleep(3) + return {"success": True} + except Exception as e: + logger.critical("control gripper error: " + truncate_string(str(e))) + return {"success": False, "error_msg": str(e)} + + def handle_control_angle(self, payload): + args = payload["args"] + try: + self.mc.send_angles(**args) + time.sleep(3) + return {"success": True} + except Exception as e: + logger.critical("control angle error: " + truncate_string(str(e))) + return {"success": False, "error_msg": str(e)} + + def handle_control_coord(self, payload): + args = payload["args"] + try: + self.mc.send_coords(**args) + time.sleep(3) + return {"success": True} + except Exception as e: + logger.critical("control coords error: " + truncate_string(str(e))) + return {"success": False, "error_msg": str(e)} + + def handle_query_angle(self, payload): + args = payload["args"] + try: + angles = self.mc.get_angles() + time.sleep(3) + if angles is None or len(angles) < 6: + raise Exception("could not read angle") + return {"success": True, "angles": angles} + except Exception as e: + error_msg = str(e) + logger.critical(error_msg) + return {"success": False, "error_msg": truncate_string(error_msg)} + + def handle_query_coord(self, payload): + args = payload["args"] + try: + coords = self.mc.get_coords() + time.sleep(3) + if coords is None or len(coords) < 6: + raise Exception("could not read coord") + return {"success": True, "coords": coords} + except Exception as e: + error_msg = str(e) + logger.critical(error_msg) + return {"success": False, "error_msg": truncate_string(error_msg)} + + # TODO: fix bug here + def handle_query_camera(self, payload): + args = payload["args"] + try: + webcam = cv2.VideoCapture(0) + _, frame = webcam.read() + frame_arr = np.array(frame) + pixel_list = frame_arr.astype(int).flatten().tolist() + webcam.release() + return { + "success": True, + "image_pixels": pixel_list, + "image_shape": list(frame_arr.shape), + "image_type": frame_arr.dtype.name + } + except Exception as e: + logger.critical(f"error in camera query: {truncate_string(str(e))}") + return {"success": False, "error_msg": str(e)} + + + +def on_connect(client, userdata, flags, rc, properties=None): + print("CONNACK received with code %s." % rc) + +def on_publish(client, userdata, mid, properties=None): + print("mid: " + str(mid)) + +def on_message(client, userdata, msg): + print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload)) + handler.push_command(msg) + + +if __name__ == "__main__": + client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) + client.on_connect = on_connect + client.on_message = on_message + client.on_publish = on_publish + + client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) + client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD) + client.connect(HIVEMQ_HOST, 8883) + + client.subscribe("#", qos=2) + + handler = CobotMQTTServer(client) + client.loop_forever() \ No newline at end of file diff --git a/scripts/cobot280pi/utils.py b/scripts/cobot280pi/utils.py new file mode 100644 index 0000000..f4947fe --- /dev/null +++ b/scripts/cobot280pi/utils.py @@ -0,0 +1,23 @@ +import logging +import sys + +def setup_logger(logfile_name: str = "mqttcobot.log"): + logger = logging.getLogger('logger') + logger.setLevel(logging.NOTSET) + + console_handler = logging.StreamHandler(sys.stdout) + file_handler = logging.FileHandler(logfile_name) + + formatter = logging.Formatter('[%(levelname)s - %(asctime)s]: %(message)s') + console_handler.setFormatter(formatter) + file_handler.setFormatter(formatter) + + logger.addHandler(console_handler) + logger.addHandler(file_handler) + return logger + +def truncate_string(s, max_length = 50): + if len(s) <= max_length: + return s + half = (max_length - 3) // 2 + return f"{s[:half]}...{s[-half:]}" From ed2c478fb6830a6039bb1a38d30c4340a66924ba Mon Sep 17 00:00:00 2001 From: root Date: Mon, 23 Sep 2024 01:11:27 -0400 Subject: [PATCH 02/13] fixed image sending, endpoint schema and logging --- scripts/cobot280pi/.gitignore | 2 + scripts/cobot280pi/README.md | 0 scripts/cobot280pi/client.py | 94 ++++++++++++++++------------------- scripts/cobot280pi/server.py | 68 +++++++++++++++---------- 4 files changed, 85 insertions(+), 79 deletions(-) create mode 100644 scripts/cobot280pi/.gitignore create mode 100644 scripts/cobot280pi/README.md diff --git a/scripts/cobot280pi/.gitignore b/scripts/cobot280pi/.gitignore new file mode 100644 index 0000000..cac4054 --- /dev/null +++ b/scripts/cobot280pi/.gitignore @@ -0,0 +1,2 @@ +my_secrets.py +*.jpg \ No newline at end of file diff --git a/scripts/cobot280pi/README.md b/scripts/cobot280pi/README.md new file mode 100644 index 0000000..e69de29 diff --git a/scripts/cobot280pi/client.py b/scripts/cobot280pi/client.py index f28509f..b3885dd 100644 --- a/scripts/cobot280pi/client.py +++ b/scripts/cobot280pi/client.py @@ -6,6 +6,8 @@ from queue import Queue import numpy as np from PIL import Image +import io +import base64 response_queues = {} @@ -28,11 +30,11 @@ def __init__( hive_mq_username: str, hive_mq_password: str, hive_mq_cloud: str, + cobot_id: str, port: int = 8883 ): self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) self.client.on_connect = on_connect - # client.on_subscribe = on_subscribe self.client.on_message = on_message self.client.on_publish = on_publish @@ -40,12 +42,13 @@ def __init__( self.client.username_pw_set(hive_mq_username, hive_mq_password) self.client.connect(hive_mq_cloud, port) - self.client.subscribe("response/query/angles", qos=2) - self.client.subscribe("response/query/coords", qos=2) - self.client.subscribe("response/query/camera", qos=2) - self.client.subscribe("response/control/angles", qos=2) - self.client.subscribe("response/control/coords", qos=2) - self.client.subscribe("response/control/gripper", qos=2) + self.base_endpoint = f"cobot280pi/{cobot_id}/" + self.client.subscribe(self.base_endpoint + "response/query/angles", qos=2) + self.client.subscribe(self.base_endpoint + "response/query/coords", qos=2) + self.client.subscribe(self.base_endpoint + "response/query/camera", qos=2) + self.client.subscribe(self.base_endpoint + "response/control/angles", qos=2) + self.client.subscribe(self.base_endpoint + "response/control/coords", qos=2) + self.client.subscribe(self.base_endpoint + "response/control/gripper", qos=2) self.client_loop_thread = threading.Thread(target=self.client.loop_forever) self.client_loop_thread.start() @@ -69,13 +72,9 @@ def send_angles( assert(type(speed) == int) payload = json.dumps({"args": {"angles": angle_list, "speed": speed}}) - self.client.publish("control/angles", payload=payload, qos=2) - response = self.wait_until_response_recieved("response/control/angles") - if not response["success"]: - print(f"error sending angles: \n{response['error_msg']}") - return - print("angles sent successfully") - + self.client.publish(self.base_endpoint + "control/angles", payload=payload, qos=2) + response = self.wait_until_response_recieved(self.base_endpoint + "response/control/angles") + return response def send_coords( self, @@ -86,12 +85,9 @@ def send_coords( assert(type(speed) == int) payload = json.dumps({"args": {"coords": coord_list, "speed": speed}}) - self.client.publish("control/coords", payload=payload, qos=2) - response = self.wait_until_response_recieved("response/control/coords") - if not response["success"]: - print(f"error sending coords: \n{response['error_msg']}") - return - print("coords sent successfully") + self.client.publish(self.base_endpoint + "control/coords", payload=payload, qos=2) + response = self.wait_until_response_recieved(self.base_endpoint + "response/control/coords") + return response def send_gripper_value( self, @@ -101,47 +97,41 @@ def send_gripper_value( assert(type(value) == int) assert(type(speed) == int) - payload = json.dumps({"args": {"value": value, "speed": speed}}) - self.client.publish("control/gripper", payload=payload, qos=2) - response = self.wait_until_response_recieved("response/control/gripper") - if not response["success"]: - print(f"error sending gripper value: \n{response['error_msg']}") - return - print("gripper value sent successfully") + payload = json.dumps({"args": {"gripper_value": value, "speed": speed}}) + self.client.publish(self.base_endpoint + "control/gripper", payload=payload, qos=2) + response = self.wait_until_response_recieved(self.base_endpoint + "response/control/gripper") + return response def get_angles(self): payload = json.dumps({"args": {}}) - self.client.publish("query/angles", payload=payload, qos=2) - response = self.wait_until_response_recieved("response/query/angles") - if not response["success"]: - print("Error getting angles") - return None - array = np.array(response["angles"]) - return array + self.client.publish(self.base_endpoint + "query/angles", payload=payload, qos=2) + response = self.wait_until_response_recieved(self.base_endpoint + "response/query/angles") + return response def get_coords(self): payload = json.dumps({"args": {}}) - self.client.publish("query/coords", payload=payload, qos=2) - response = self.wait_until_response_recieved("response/query/coords") - if not response["success"]: - print("Error getting coords") - return None - array = np.array(response["coords"]) - return array + self.client.publish(self.base_endpoint + "query/coords", payload=payload, qos=2) + response = self.wait_until_response_recieved(self.base_endpoint + "response/query/coords") + return response - def get_camera(self): + def get_camera(self, save_path=None): payload = json.dumps({"args": {}}) - self.client.publish("query/camera", payload=payload, qos=2) - response = self.wait_until_response_recieved("response/query/camera") + self.client.publish(self.base_endpoint + "query/camera", payload=payload, qos=2) + response = self.wait_until_response_recieved(self.base_endpoint + "response/query/camera") if not response["success"]: - print(f"could not get image with error: \n{response['error_msg']}") - return None - array = np.array(response["image_pixels"]) - array_shape = tuple(response["image_shape"]) - array = array.reshape(array_shape)[:, :, ::-1] - return array + return response + + b64_bytes = base64.b64decode(response["img_bytes"]) + img_bytes = io.BytesIO(b64_bytes) + img = Image.open(img_bytes) + + response["image"] = img + if save_path is not None: + img.save(save_path) + + return response if __name__ == "__main__": - from my_secrets import HIVEMQ_HOST, HIVEMQ_PASSWORD, HIVEMQ_USERNAME - client = CobotMQTTClient(HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST) \ No newline at end of file + from my_secrets import HIVEMQ_HOST, HIVEMQ_PASSWORD, HIVEMQ_USERNAME, COBOT_ID + client = CobotMQTTClient(HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST, COBOT_ID) \ No newline at end of file diff --git a/scripts/cobot280pi/server.py b/scripts/cobot280pi/server.py index 4079761..bf3ab2e 100644 --- a/scripts/cobot280pi/server.py +++ b/scripts/cobot280pi/server.py @@ -1,19 +1,23 @@ import threading +import io import time import queue import json import paho.mqtt.client as paho from my_secrets import * +import base64 import cv2 import numpy as np +from PIL import Image from utils import setup_logger, truncate_string from pymycobot.mycobot import MyCobot logger = setup_logger() class CobotMQTTServer: - def __init__(self, client): + def __init__(self, client, base_endpoint): self.client = client + self.base_endpoint = base_endpoint self.mc = MyCobot("/dev/ttyAMA0", 1000000) logger.info("Cobot controller initialized") @@ -26,18 +30,16 @@ def __init__(self, client): def queue_thread(self): while True: item = self.message_queue.get() - if "response" in item.topic: - continue - - logger.debug(f"recieved task: {item.topic}, {truncate_string(item.payload)}") self.handle_message(item) self.message_queue.task_done() + def push_command(self, message): self.message_queue.put(message) def handle_message(self, message): - response_topic = "response/" + message.topic + split_topic = message.topic.split("/") + response_topic = "/".join(split_topic[:2] + ["response"] + split_topic[2:]) response_payload = {} try: @@ -49,26 +51,31 @@ def handle_message(self, message): self.client.publish(response_topic, payload=json.dumps(response_payload), qos=2) return - if message.topic == 'control/angles': + logger.info(f"Processing task with parameters: \n\ttopic: {message.topic}\n\tjson_payload: {payload}") + + if message.topic == self.base_endpoint + 'control/angles': status = self.handle_control_angle(payload) - elif message.topic == 'control/coords': + elif message.topic == self.base_endpoint + 'control/coords': status = self.handle_control_coord(payload) - elif message.topic == "control/gripper": + elif message.topic == self.base_endpoint + "control/gripper": status = self.handle_control_gripper(payload) - elif message.topic == 'query/angles': + elif message.topic == self.base_endpoint + 'query/angles': status = self.handle_query_angle(payload) - elif message.topic == 'query/coords': + elif message.topic == self.base_endpoint + 'query/coords': status = self.handle_query_coord(payload) - elif message.topic == 'query/camera': + elif message.topic == self.base_endpoint + 'query/camera': status = self.handle_query_camera(payload) else: response_payload["success"] = False response_payload["error_msg"] = f"unknown topic {message.topic}" logger.critical(response_payload["error_msg"]) self.client.publish(response_topic, payload=json.dumps(response_payload), qos=2) + return - self.client.publish(response_topic, payload=json.dumps(status), qos=2) + response_payload = json.dumps(status) + self.client.publish(response_topic, payload=response_payload, qos=2) + logger.info(f"attempted publish on: \n\ttopic: {response_topic}\n\tpayload: {truncate_string(response_payload)}") def handle_control_gripper(self, payload): args = payload["args"] @@ -126,20 +133,24 @@ def handle_query_coord(self, payload): logger.critical(error_msg) return {"success": False, "error_msg": truncate_string(error_msg)} - # TODO: fix bug here - def handle_query_camera(self, payload): + def handle_query_camera(self, payload, quality=100): args = payload["args"] try: webcam = cv2.VideoCapture(0) _, frame = webcam.read() - frame_arr = np.array(frame) - pixel_list = frame_arr.astype(int).flatten().tolist() webcam.release() + + img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + compressed_bytes = io.BytesIO() + img.save(compressed_bytes, format="JPEG", quality=quality) + compressed_bytes.seek(0) + byte_str = compressed_bytes.read() + + byte_str_base64 = base64.b64encode(byte_str).decode('utf-8') + return { "success": True, - "image_pixels": pixel_list, - "image_shape": list(frame_arr.shape), - "image_type": frame_arr.dtype.name + "img_bytes": byte_str_base64, } except Exception as e: logger.critical(f"error in camera query: {truncate_string(str(e))}") @@ -148,15 +159,17 @@ def handle_query_camera(self, payload): def on_connect(client, userdata, flags, rc, properties=None): - print("CONNACK received with code %s." % rc) + logger.info("Connection received with code %s." % rc) def on_publish(client, userdata, mid, properties=None): - print("mid: " + str(mid)) + logger.info("Successful publish.") def on_message(client, userdata, msg): - print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload)) - handler.push_command(msg) - + if "response" not in msg.topic: + logger.info(f"Recieved message with: \n\ttopic: {msg.topic}\n\tqos: {msg.qos}\n\tpayload: {truncate_string(msg.payload)}") + handler.push_command(msg) + else: + logger.info(f"recieved response on {msg.topic}") if __name__ == "__main__": client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) @@ -168,7 +181,8 @@ def on_message(client, userdata, msg): client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD) client.connect(HIVEMQ_HOST, 8883) - client.subscribe("#", qos=2) + base_endpoint = f"cobot280pi/{COBOT_ID}/" + client.subscribe(base_endpoint + "#", qos=2) - handler = CobotMQTTServer(client) + handler = CobotMQTTServer(client, base_endpoint) client.loop_forever() \ No newline at end of file From 4a35ca91705e65a68d46ba62f3d571535ef786e0 Mon Sep 17 00:00:00 2001 From: Gursimar Singh Date: Thu, 26 Sep 2024 12:42:44 -0400 Subject: [PATCH 03/13] remove threading from client, resutrcture endpoints, added get_gripper_value --- scripts/cobot280pi/client.py | 137 ------------- scripts/cobot280pi/server.py | 188 ------------------ .../ac_training_lab}/cobot280pi/.gitignore | 0 .../ac_training_lab}/cobot280pi/README.md | 0 src/ac_training_lab/cobot280pi/client.py | 110 ++++++++++ .../cobot280pi/environment.yml | 12 +- src/ac_training_lab/cobot280pi/server.py | 178 +++++++++++++++++ .../ac_training_lab}/cobot280pi/utils.py | 0 8 files changed, 298 insertions(+), 327 deletions(-) delete mode 100644 scripts/cobot280pi/client.py delete mode 100644 scripts/cobot280pi/server.py rename {scripts => src/ac_training_lab}/cobot280pi/.gitignore (100%) rename {scripts => src/ac_training_lab}/cobot280pi/README.md (100%) create mode 100644 src/ac_training_lab/cobot280pi/client.py rename {scripts => src/ac_training_lab}/cobot280pi/environment.yml (80%) create mode 100644 src/ac_training_lab/cobot280pi/server.py rename {scripts => src/ac_training_lab}/cobot280pi/utils.py (100%) diff --git a/scripts/cobot280pi/client.py b/scripts/cobot280pi/client.py deleted file mode 100644 index b3885dd..0000000 --- a/scripts/cobot280pi/client.py +++ /dev/null @@ -1,137 +0,0 @@ -import paho.mqtt.client as paho -import time -import json -import threading -import matplotlib.pyplot as plt -from queue import Queue -import numpy as np -from PIL import Image -import io -import base64 - -response_queues = {} - -def on_connect(client, userdata, flags, rc, properties=None): - print("Connection recieved") - -# with this callback you can see if your publish was successful -def on_publish(client, userdata, mid, properties=None): - print("mid: " + str(mid)) - -def on_message(client, userdata, msg): - if msg.topic not in response_queues: - response_queues[msg.topic] = Queue() - response_queues[msg.topic].put(msg) - -class CobotMQTTClient: - - def __init__( - self, - hive_mq_username: str, - hive_mq_password: str, - hive_mq_cloud: str, - cobot_id: str, - port: int = 8883 - ): - self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) - self.client.on_connect = on_connect - self.client.on_message = on_message - self.client.on_publish = on_publish - - self.client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) - self.client.username_pw_set(hive_mq_username, hive_mq_password) - self.client.connect(hive_mq_cloud, port) - - self.base_endpoint = f"cobot280pi/{cobot_id}/" - self.client.subscribe(self.base_endpoint + "response/query/angles", qos=2) - self.client.subscribe(self.base_endpoint + "response/query/coords", qos=2) - self.client.subscribe(self.base_endpoint + "response/query/camera", qos=2) - self.client.subscribe(self.base_endpoint + "response/control/angles", qos=2) - self.client.subscribe(self.base_endpoint + "response/control/coords", qos=2) - self.client.subscribe(self.base_endpoint + "response/control/gripper", qos=2) - - self.client_loop_thread = threading.Thread(target=self.client.loop_forever) - self.client_loop_thread.start() - - def wait_until_response_recieved(self, response_topic: str): - while True: - if response_topic not in response_queues: - time.sleep(5) - print("Waiting for response...") - continue - - item = response_queues[response_topic].get() - return json.loads(item.payload) - - def send_angles( - self, - angle_list: list[float] = [0.0] * 6, - speed: int = 50 - ): - assert(type(angle_list) == list) - assert(type(speed) == int) - - payload = json.dumps({"args": {"angles": angle_list, "speed": speed}}) - self.client.publish(self.base_endpoint + "control/angles", payload=payload, qos=2) - response = self.wait_until_response_recieved(self.base_endpoint + "response/control/angles") - return response - - def send_coords( - self, - coord_list: list[float] = [0.0] * 6, - speed: int = 50 - ): - assert(type(coord_list) == list) - assert(type(speed) == int) - - payload = json.dumps({"args": {"coords": coord_list, "speed": speed}}) - self.client.publish(self.base_endpoint + "control/coords", payload=payload, qos=2) - response = self.wait_until_response_recieved(self.base_endpoint + "response/control/coords") - return response - - def send_gripper_value( - self, - value: int = 100, - speed: int = 50 - ): - assert(type(value) == int) - assert(type(speed) == int) - - payload = json.dumps({"args": {"gripper_value": value, "speed": speed}}) - self.client.publish(self.base_endpoint + "control/gripper", payload=payload, qos=2) - response = self.wait_until_response_recieved(self.base_endpoint + "response/control/gripper") - return response - - def get_angles(self): - payload = json.dumps({"args": {}}) - self.client.publish(self.base_endpoint + "query/angles", payload=payload, qos=2) - response = self.wait_until_response_recieved(self.base_endpoint + "response/query/angles") - return response - - def get_coords(self): - payload = json.dumps({"args": {}}) - self.client.publish(self.base_endpoint + "query/coords", payload=payload, qos=2) - response = self.wait_until_response_recieved(self.base_endpoint + "response/query/coords") - return response - - def get_camera(self, save_path=None): - payload = json.dumps({"args": {}}) - self.client.publish(self.base_endpoint + "query/camera", payload=payload, qos=2) - response = self.wait_until_response_recieved(self.base_endpoint + "response/query/camera") - if not response["success"]: - return response - - b64_bytes = base64.b64decode(response["img_bytes"]) - img_bytes = io.BytesIO(b64_bytes) - img = Image.open(img_bytes) - - response["image"] = img - if save_path is not None: - img.save(save_path) - - return response - - -if __name__ == "__main__": - from my_secrets import HIVEMQ_HOST, HIVEMQ_PASSWORD, HIVEMQ_USERNAME, COBOT_ID - client = CobotMQTTClient(HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST, COBOT_ID) \ No newline at end of file diff --git a/scripts/cobot280pi/server.py b/scripts/cobot280pi/server.py deleted file mode 100644 index bf3ab2e..0000000 --- a/scripts/cobot280pi/server.py +++ /dev/null @@ -1,188 +0,0 @@ -import threading -import io -import time -import queue -import json -import paho.mqtt.client as paho -from my_secrets import * -import base64 -import cv2 -import numpy as np -from PIL import Image -from utils import setup_logger, truncate_string -from pymycobot.mycobot import MyCobot - -logger = setup_logger() - -class CobotMQTTServer: - def __init__(self, client, base_endpoint): - self.client = client - self.base_endpoint = base_endpoint - self.mc = MyCobot("/dev/ttyAMA0", 1000000) - logger.info("Cobot controller initialized") - - self.message_queue = queue.Queue() - self.thread = threading.Thread(target=self.queue_thread) - self.thread.daemon = True - self.thread.start() - logger.info("Listening for commands...") - - def queue_thread(self): - while True: - item = self.message_queue.get() - self.handle_message(item) - self.message_queue.task_done() - - - def push_command(self, message): - self.message_queue.put(message) - - def handle_message(self, message): - split_topic = message.topic.split("/") - response_topic = "/".join(split_topic[:2] + ["response"] + split_topic[2:]) - response_payload = {} - - try: - payload = json.loads(message.payload) - except Exception: - response_payload["success"] = False - response_payload["error_msg"] = f"could not parse payload: {truncate_string(message.payload)}" - logger.critical(response_payload["error_msg"]) - self.client.publish(response_topic, payload=json.dumps(response_payload), qos=2) - return - - logger.info(f"Processing task with parameters: \n\ttopic: {message.topic}\n\tjson_payload: {payload}") - - if message.topic == self.base_endpoint + 'control/angles': - status = self.handle_control_angle(payload) - elif message.topic == self.base_endpoint + 'control/coords': - status = self.handle_control_coord(payload) - elif message.topic == self.base_endpoint + "control/gripper": - status = self.handle_control_gripper(payload) - elif message.topic == self.base_endpoint + 'query/angles': - status = self.handle_query_angle(payload) - elif message.topic == self.base_endpoint + 'query/coords': - status = self.handle_query_coord(payload) - elif message.topic == self.base_endpoint + 'query/camera': - status = self.handle_query_camera(payload) - else: - response_payload["success"] = False - response_payload["error_msg"] = f"unknown topic {message.topic}" - logger.critical(response_payload["error_msg"]) - self.client.publish(response_topic, payload=json.dumps(response_payload), qos=2) - - return - - response_payload = json.dumps(status) - self.client.publish(response_topic, payload=response_payload, qos=2) - logger.info(f"attempted publish on: \n\ttopic: {response_topic}\n\tpayload: {truncate_string(response_payload)}") - - def handle_control_gripper(self, payload): - args = payload["args"] - try: - self.mc.set_gripper_value(**args) - time.sleep(3) - return {"success": True} - except Exception as e: - logger.critical("control gripper error: " + truncate_string(str(e))) - return {"success": False, "error_msg": str(e)} - - def handle_control_angle(self, payload): - args = payload["args"] - try: - self.mc.send_angles(**args) - time.sleep(3) - return {"success": True} - except Exception as e: - logger.critical("control angle error: " + truncate_string(str(e))) - return {"success": False, "error_msg": str(e)} - - def handle_control_coord(self, payload): - args = payload["args"] - try: - self.mc.send_coords(**args) - time.sleep(3) - return {"success": True} - except Exception as e: - logger.critical("control coords error: " + truncate_string(str(e))) - return {"success": False, "error_msg": str(e)} - - def handle_query_angle(self, payload): - args = payload["args"] - try: - angles = self.mc.get_angles() - time.sleep(3) - if angles is None or len(angles) < 6: - raise Exception("could not read angle") - return {"success": True, "angles": angles} - except Exception as e: - error_msg = str(e) - logger.critical(error_msg) - return {"success": False, "error_msg": truncate_string(error_msg)} - - def handle_query_coord(self, payload): - args = payload["args"] - try: - coords = self.mc.get_coords() - time.sleep(3) - if coords is None or len(coords) < 6: - raise Exception("could not read coord") - return {"success": True, "coords": coords} - except Exception as e: - error_msg = str(e) - logger.critical(error_msg) - return {"success": False, "error_msg": truncate_string(error_msg)} - - def handle_query_camera(self, payload, quality=100): - args = payload["args"] - try: - webcam = cv2.VideoCapture(0) - _, frame = webcam.read() - webcam.release() - - img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) - compressed_bytes = io.BytesIO() - img.save(compressed_bytes, format="JPEG", quality=quality) - compressed_bytes.seek(0) - byte_str = compressed_bytes.read() - - byte_str_base64 = base64.b64encode(byte_str).decode('utf-8') - - return { - "success": True, - "img_bytes": byte_str_base64, - } - except Exception as e: - logger.critical(f"error in camera query: {truncate_string(str(e))}") - return {"success": False, "error_msg": str(e)} - - - -def on_connect(client, userdata, flags, rc, properties=None): - logger.info("Connection received with code %s." % rc) - -def on_publish(client, userdata, mid, properties=None): - logger.info("Successful publish.") - -def on_message(client, userdata, msg): - if "response" not in msg.topic: - logger.info(f"Recieved message with: \n\ttopic: {msg.topic}\n\tqos: {msg.qos}\n\tpayload: {truncate_string(msg.payload)}") - handler.push_command(msg) - else: - logger.info(f"recieved response on {msg.topic}") - -if __name__ == "__main__": - client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) - client.on_connect = on_connect - client.on_message = on_message - client.on_publish = on_publish - - client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) - client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD) - client.connect(HIVEMQ_HOST, 8883) - - base_endpoint = f"cobot280pi/{COBOT_ID}/" - client.subscribe(base_endpoint + "#", qos=2) - - handler = CobotMQTTServer(client, base_endpoint) - client.loop_forever() \ No newline at end of file diff --git a/scripts/cobot280pi/.gitignore b/src/ac_training_lab/cobot280pi/.gitignore similarity index 100% rename from scripts/cobot280pi/.gitignore rename to src/ac_training_lab/cobot280pi/.gitignore diff --git a/scripts/cobot280pi/README.md b/src/ac_training_lab/cobot280pi/README.md similarity index 100% rename from scripts/cobot280pi/README.md rename to src/ac_training_lab/cobot280pi/README.md diff --git a/src/ac_training_lab/cobot280pi/client.py b/src/ac_training_lab/cobot280pi/client.py new file mode 100644 index 0000000..225de28 --- /dev/null +++ b/src/ac_training_lab/cobot280pi/client.py @@ -0,0 +1,110 @@ +import paho.mqtt.client as paho +import json, io, base64 +from queue import Queue +from PIL import Image + + +class CobotController: + + def __init__( + self, + hive_mq_username: str, + hive_mq_password: str, + hive_mq_cloud: str, + port: int, + cobot_id: str, + response_endpoint: str + ): + self.publish_endpoint = cobot_id + self.response_endpoint = response_endpoint + self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) + self.client.tls_set() + self.client.username_pw_set(hive_mq_username, hive_mq_password) + self.client.connect(hive_mq_cloud, port) + + response_queue = Queue() + + def on_message(client, userdata, msg): + payload_dict = json.loads(msg.payload) + response_queue.put(payload_dict) + + self.response_queue = response_queue + + def on_connect(client, userdata, flags, rc, properties=None): + print("Connection recieved") + + self.client.on_connect = on_connect + self.client.on_message = on_message + self.client.subscribe(self.response_endpoint, qos=2) + self.client.loop_start() + + def handle_publish_and_response(self, payload): + self.client.publish(self.publish_endpoint, payload=payload, qos=2) + return self.response_queue.get(block=True) + + def send_angles( + self, + angle_list: list[float] = [0.0] * 6, + speed: int = 50 + ): + payload = json.dumps({"command": "control/angles", + "args": {"angles": angle_list, "speed": speed}}) + return self.handle_publish_and_response(payload) + + def send_coords( + self, + coord_list: list[float] = [0.0] * 6, + speed: int = 50 + ): + payload = json.dumps({"command": "control/coords", + "args": {"coords": coord_list, "speed": speed}}) + return self.handle_publish_and_response(payload) + + def send_gripper_value( + self, + value: int = 100, + speed: int = 50 + ): + payload = json.dumps({"command": "control/gripper", + "args": {"gripper_value": value, "speed": speed}}) + return self.handle_publish_and_response(payload) + + def get_angles(self): + payload = json.dumps({"command": "query/angles", "args": {}}) + return self.handle_publish_and_response(payload) + + def get_coords(self): + payload = json.dumps({"command": "query/coords", "args": {}}) + return self.handle_publish_and_response(payload) + + def get_gripper_value(self): + payload = json.dumps({"command": "query/gripper", "args": {}}) + return self.handle_publish_and_response(payload) + + def get_camera(self, quality=100, save_path=None): + payload = json.dumps({"command": "query/camera", "args": {"quality": quality}}) + response = self.handle_publish_and_response(payload) + if not response["success"]: + return response + + b64_bytes = base64.b64decode(response["image"]) + img_bytes = io.BytesIO(b64_bytes) + img = Image.open(img_bytes) + + response["image"] = img + if save_path is not None: + img.save(save_path) + + return response + +if __name__ == "__main__": + from my_secrets import * + + cobot = CobotController( + HIVEMQ_USERNAME, + HIVEMQ_PASSWORD, + HIVEMQ_HOST, + DEVICE_PORT, + DEVICE_ENDPOINT, + RESPONSE_ENDPOINT + ) \ No newline at end of file diff --git a/scripts/cobot280pi/environment.yml b/src/ac_training_lab/cobot280pi/environment.yml similarity index 80% rename from scripts/cobot280pi/environment.yml rename to src/ac_training_lab/cobot280pi/environment.yml index 2327581..c7d5e0f 100644 --- a/scripts/cobot280pi/environment.yml +++ b/src/ac_training_lab/cobot280pi/environment.yml @@ -1,4 +1,4 @@ -name: paho-mqtt +name: mqtt channels: - defaults dependencies: @@ -25,17 +25,25 @@ dependencies: - xz=5.4.6=h5eee18b_1 - zlib=1.2.13=h5eee18b_1 - pip: + - autopep8==2.3.1 - contourpy==1.3.0 - cycler==0.12.1 - fonttools==4.53.1 - kiwisolver==1.4.7 - matplotlib==3.9.2 + - msgpack==1.0.8 - numpy==2.1.1 - opencv-python==4.10.0.84 - packaging==24.1 - paho-mqtt==1.5.1 - pillow==10.4.0 + - pycodestyle==2.12.1 + - pymycobot==3.5.3 - pyparsing==3.1.4 + - pyserial==3.5 + - python-can==4.4.2 - python-dateutil==2.9.0.post0 - six==1.16.0 -prefix: /root/miniconda3/envs/paho-mqtt + - typing-extensions==4.12.2 + - wrapt==1.16.0 +prefix: /home/gursi26/miniconda3/envs/mqtt diff --git a/src/ac_training_lab/cobot280pi/server.py b/src/ac_training_lab/cobot280pi/server.py new file mode 100644 index 0000000..df38d85 --- /dev/null +++ b/src/ac_training_lab/cobot280pi/server.py @@ -0,0 +1,178 @@ +from my_secrets import * +from utils import * + +import cv2 +import time, json, base64, io, sys, argparse +from PIL import Image +from queue import Queue +import paho.mqtt.client as paho +from pymycobot.mycobot import MyCobot + +# cli args +parser = argparse.ArgumentParser() +parser.add_argument("--debug", "-d", action="store_true", help="runs in debug mode") + +# Cobot action functions +def handle_control_gripper(args, cobot): + logger.info(f"running command control/gripper with {args}") + try: + cobot.set_gripper_value(**args) + return {"success": True} + except Exception as e: + logger.critical(f"control gripper error: {str(e)}") + return {"success": False, "error_msg": str(e)} + +def handle_control_angles(args, cobot): + logger.info(f"running command control/angle with {args}") + try: + cobot.send_angles(**args) + return {"success": True} + except Exception as e: + logger.critical(f"control angle error: {str(e)}") + return {"success": False, "error_msg": str(e)} + +def handle_control_coords(args, cobot): + logger.info(f"running command control/coord with {args}") + try: + cobot.send_coords(**args) + return {"success": True} + except Exception as e: + logger.critical(f"control coords error: {str(e)}") + return {"success": False, "error_msg": str(e)} + +def handle_query_angles(args, cobot): + logger.info(f"running command query/angle with {args}") + try: + angles = cobot.get_angles() + if angles is None or len(angles) < 6: + raise Exception("could not read angle") + return {"success": True, "angles": angles} + except Exception as e: + logger.critical(f"query angle error: {str(e)}") + return {"success": False, "error_msg": str(e)} + +def handle_query_coords(args, cobot): + logger.info(f"running command query/coord with {args}") + try: + coords = cobot.get_coords() + if coords is None or len(coords) < 6: + raise Exception("could not read coord") + return {"success": True, "coords": coords} + except Exception as e: + logger.critical(f"query coord error: {str(e)}") + return {"success": False, "error_msg": str(e)} + +def handle_query_gripper(args, cobot): + logger.info(f"running command query/coord with {args}") + try: + gripper_pos = cobot.get_gripper_value() + return {"success": True, "position": gripper_pos} + except Exception as e: + logger.critical(f"query gripper error: {str(e)}") + return {"success": False, "error_msg": str(e)} + +def handle_query_camera(args): + logger.info(f"running command query/camera with {args}") + try: + webcam = cv2.VideoCapture(0) + _, frame = webcam.read() + webcam.release() + + img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + compressed_bytes = io.BytesIO() + img.save(compressed_bytes, format="JPEG", quality=args["quality"]) + compressed_bytes.seek(0) + byte_str = base64.b64encode(compressed_bytes.read()).decode("utf-8") + return {"success": True, "image": byte_str} + except Exception as e: + logger.critical(f"query camera error: {str(e)}") + return {"success": False, "error_msg": str(e)} + + +# MQTT Functions +def on_connect(client, userdata, flags, rc, properties=None): + logger.info("Connection received with code %s." % rc) + +def on_publish(client, userdata, mid, properties=None): + logger.info("Successful publish.") + +def handle_message(msg, cobot): + # Parse payload to json dict + try: + payload_dict = json.loads(msg.payload) + except Exception as e: + return { + "success": False, + "error": str(e) + } + + if "command" not in payload_dict: + return { + "success": False, + "error": "'command' key should be in payload" + } + + # Match to a command function + cmd = payload_dict["command"] + if cmd == "control/angles": + return handle_control_angles(payload_dict["args"], cobot) + elif cmd == "control/coords": + return handle_control_coords(payload_dict["args"], cobot) + elif cmd == "control/gripper": + return handle_control_gripper(payload_dict["args"], cobot) + elif cmd == "query/angles": + return handle_query_angles(payload_dict["args"], cobot) + elif cmd == "query/coords": + return handle_query_coords(payload_dict["args"], cobot) + elif cmd == "query/gripper": + return handle_query_gripper(payload_dict["args"], cobot) + elif cmd == "query/camera": + return handle_query_camera(payload_dict["args"]) + else: + return { + "success": False, + "error": "invalid command" + } + +if __name__ == "__main__": + logger = setup_logger() + task_queue = Queue() + args = parser.parse_args() + + try: + cobot = MyCobot("/dev/ttyAMA0", 1000000) + logger.info("Cobot object initialized...") + except Exception as e: + logger.critical(f"could not initialize cobot with error {str(e)}") + if not args.debug: + logger.info("exiting...") + sys.exit(1) + else: + cobot = None + + def on_message(client, userdata, msg): + logger.info( + f"Recieved message with: \n\ttopic: {msg.topic}\n\tqos: {msg.qos}\n\tpayload: {msg.payload}") + task_queue.put(msg) + + client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) + client.on_connect = on_connect + client.on_message = on_message + client.on_publish = on_publish + + client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) + client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD) + client.connect(HIVEMQ_HOST, DEVICE_PORT) + client.subscribe(DEVICE_ENDPOINT, qos=2) + client.loop_start() + logger.info("Ready for tasks...") + + while True: + msg = task_queue.get() # blocks if queue is empty + response_dict = handle_message(msg, cobot) + client.publish( + RESPONSE_ENDPOINT, + qos=2, + payload=json.dumps(response_dict) + ) + time.sleep(3) diff --git a/scripts/cobot280pi/utils.py b/src/ac_training_lab/cobot280pi/utils.py similarity index 100% rename from scripts/cobot280pi/utils.py rename to src/ac_training_lab/cobot280pi/utils.py From 0367f3acbfc84297b96593d6bb3b18f7cdff6c4a Mon Sep 17 00:00:00 2001 From: Gursimar Singh Date: Fri, 27 Sep 2024 14:06:14 -0400 Subject: [PATCH 04/13] added a dummy cobot file for easy testing without cobot --- src/ac_training_lab/cobot280pi/dummy_cobot.py | 33 ++++++++++++ src/ac_training_lab/cobot280pi/server.py | 53 ++++++++++++------- src/ac_training_lab/cobot280pi/utils.py | 9 ++-- 3 files changed, 74 insertions(+), 21 deletions(-) create mode 100644 src/ac_training_lab/cobot280pi/dummy_cobot.py diff --git a/src/ac_training_lab/cobot280pi/dummy_cobot.py b/src/ac_training_lab/cobot280pi/dummy_cobot.py new file mode 100644 index 0000000..f85c576 --- /dev/null +++ b/src/ac_training_lab/cobot280pi/dummy_cobot.py @@ -0,0 +1,33 @@ +from utils import setup_logger +from PIL import Image + +# A dummy class for easier testing without physically having the cobot +class DummyCobot: + + def __init__(self): + self.logger = setup_logger() + + def set_gripper_value(self, **kwargs): + self.logger.info(f"tried to set gripper value with args {kwargs}") + + def send_angles(self, **kwargs): + self.logger.info(f"tried to send angles with args {kwargs}") + + def send_coords(self, **kwargs): + self.logger.info(f"tried to send coords with args {kwargs}") + + def get_angles(self, **kwargs): + self.logger.info(f"tried to get angles with args {kwargs}") + return [0, 0, 0, 0, 0, 0] + + def get_coords(self, **kwargs): + self.logger.info(f"tried to get coords with args {kwargs}") + return [0, 0, 0, 0, 0, 0] + + def get_gripper_value(self, **kwargs): + self.logger.info(f"tried to get gripper value with args {kwargs}") + return 0 + + def get_camera(self, **kwargs): + self.logger.info(f"tried to get camera with args {kwargs}") + return Image.new('RGB', (1920, 1080), color='black') \ No newline at end of file diff --git a/src/ac_training_lab/cobot280pi/server.py b/src/ac_training_lab/cobot280pi/server.py index df38d85..b3363f4 100644 --- a/src/ac_training_lab/cobot280pi/server.py +++ b/src/ac_training_lab/cobot280pi/server.py @@ -2,15 +2,22 @@ from utils import * import cv2 -import time, json, base64, io, sys, argparse +import time +import json +import base64 +import io +import sys +import argparse from PIL import Image from queue import Queue import paho.mqtt.client as paho from pymycobot.mycobot import MyCobot + # cli args parser = argparse.ArgumentParser() parser.add_argument("--debug", "-d", action="store_true", help="runs in debug mode") +cliargs = parser.parse_args() # Cobot action functions def handle_control_gripper(args, cobot): @@ -22,6 +29,7 @@ def handle_control_gripper(args, cobot): logger.critical(f"control gripper error: {str(e)}") return {"success": False, "error_msg": str(e)} + def handle_control_angles(args, cobot): logger.info(f"running command control/angle with {args}") try: @@ -31,6 +39,7 @@ def handle_control_angles(args, cobot): logger.critical(f"control angle error: {str(e)}") return {"success": False, "error_msg": str(e)} + def handle_control_coords(args, cobot): logger.info(f"running command control/coord with {args}") try: @@ -40,6 +49,7 @@ def handle_control_coords(args, cobot): logger.critical(f"control coords error: {str(e)}") return {"success": False, "error_msg": str(e)} + def handle_query_angles(args, cobot): logger.info(f"running command query/angle with {args}") try: @@ -51,6 +61,7 @@ def handle_query_angles(args, cobot): logger.critical(f"query angle error: {str(e)}") return {"success": False, "error_msg": str(e)} + def handle_query_coords(args, cobot): logger.info(f"running command query/coord with {args}") try: @@ -62,6 +73,7 @@ def handle_query_coords(args, cobot): logger.critical(f"query coord error: {str(e)}") return {"success": False, "error_msg": str(e)} + def handle_query_gripper(args, cobot): logger.info(f"running command query/coord with {args}") try: @@ -71,14 +83,17 @@ def handle_query_gripper(args, cobot): logger.critical(f"query gripper error: {str(e)}") return {"success": False, "error_msg": str(e)} + def handle_query_camera(args): logger.info(f"running command query/camera with {args}") try: - webcam = cv2.VideoCapture(0) - _, frame = webcam.read() - webcam.release() - - img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + if not cliargs.debug: + webcam = cv2.VideoCapture(0) + _, frame = webcam.read() + webcam.release() + img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + else: + img = cobot.get_camera(**args) compressed_bytes = io.BytesIO() img.save(compressed_bytes, format="JPEG", quality=args["quality"]) compressed_bytes.seek(0) @@ -93,9 +108,11 @@ def handle_query_camera(args): def on_connect(client, userdata, flags, rc, properties=None): logger.info("Connection received with code %s." % rc) + def on_publish(client, userdata, mid, properties=None): logger.info("Successful publish.") + def handle_message(msg, cobot): # Parse payload to json dict try: @@ -134,21 +151,21 @@ def handle_message(msg, cobot): "error": "invalid command" } + if __name__ == "__main__": - logger = setup_logger() task_queue = Queue() - args = parser.parse_args() + logger = setup_logger() - try: - cobot = MyCobot("/dev/ttyAMA0", 1000000) - logger.info("Cobot object initialized...") - except Exception as e: - logger.critical(f"could not initialize cobot with error {str(e)}") - if not args.debug: - logger.info("exiting...") + if not cliargs.debug: + try: + cobot = MyCobot("/dev/ttyAMA0", 1000000) + logger.info("Cobot object initialized...") + except Exception as e: + logger.critical(f"could not initialize cobot with error {str(e)}") sys.exit(1) - else: - cobot = None + else: + from dummy_cobot import DummyCobot + cobot = DummyCobot() def on_message(client, userdata, msg): logger.info( @@ -175,4 +192,4 @@ def on_message(client, userdata, msg): qos=2, payload=json.dumps(response_dict) ) - time.sleep(3) + time.sleep(3) \ No newline at end of file diff --git a/src/ac_training_lab/cobot280pi/utils.py b/src/ac_training_lab/cobot280pi/utils.py index f4947fe..f5a75ee 100644 --- a/src/ac_training_lab/cobot280pi/utils.py +++ b/src/ac_training_lab/cobot280pi/utils.py @@ -3,7 +3,7 @@ def setup_logger(logfile_name: str = "mqttcobot.log"): logger = logging.getLogger('logger') - logger.setLevel(logging.NOTSET) + logger.setLevel(logging.INFO) console_handler = logging.StreamHandler(sys.stdout) file_handler = logging.FileHandler(logfile_name) @@ -12,10 +12,13 @@ def setup_logger(logfile_name: str = "mqttcobot.log"): console_handler.setFormatter(formatter) file_handler.setFormatter(formatter) - logger.addHandler(console_handler) - logger.addHandler(file_handler) + if not logger.hasHandlers(): + logger.addHandler(console_handler) + logger.addHandler(file_handler) + logger.propagate = False return logger + def truncate_string(s, max_length = 50): if len(s) <= max_length: return s From 8aa795910cec710047506cc4d367abec5951c784 Mon Sep 17 00:00:00 2001 From: Gursimar Singh Date: Wed, 2 Oct 2024 16:11:39 -0400 Subject: [PATCH 05/13] slight refactor in client and server --- src/ac_training_lab/cobot280pi/client.py | 8 +- src/ac_training_lab/cobot280pi/server.py | 284 ++++++++++++----------- 2 files changed, 146 insertions(+), 146 deletions(-) diff --git a/src/ac_training_lab/cobot280pi/client.py b/src/ac_training_lab/cobot280pi/client.py index 225de28..6eb740e 100644 --- a/src/ac_training_lab/cobot280pi/client.py +++ b/src/ac_training_lab/cobot280pi/client.py @@ -13,10 +13,9 @@ def __init__( hive_mq_cloud: str, port: int, cobot_id: str, - response_endpoint: str ): self.publish_endpoint = cobot_id - self.response_endpoint = response_endpoint + self.response_endpoint = cobot_id + "/response" self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) self.client.tls_set() self.client.username_pw_set(hive_mq_username, hive_mq_password) @@ -104,7 +103,6 @@ def get_camera(self, quality=100, save_path=None): HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST, - DEVICE_PORT, - DEVICE_ENDPOINT, - RESPONSE_ENDPOINT + PORT, + DEVICE_ID ) \ No newline at end of file diff --git a/src/ac_training_lab/cobot280pi/server.py b/src/ac_training_lab/cobot280pi/server.py index b3363f4..37456d2 100644 --- a/src/ac_training_lab/cobot280pi/server.py +++ b/src/ac_training_lab/cobot280pi/server.py @@ -21,175 +21,177 @@ # Cobot action functions def handle_control_gripper(args, cobot): - logger.info(f"running command control/gripper with {args}") - try: - cobot.set_gripper_value(**args) - return {"success": True} - except Exception as e: - logger.critical(f"control gripper error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command control/gripper with {args}") + try: + cobot.set_gripper_value(**args) + return {"success": True} + except Exception as e: + logger.critical(f"control gripper error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_control_angles(args, cobot): - logger.info(f"running command control/angle with {args}") - try: - cobot.send_angles(**args) - return {"success": True} - except Exception as e: - logger.critical(f"control angle error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command control/angle with {args}") + try: + cobot.send_angles(**args) + return {"success": True} + except Exception as e: + logger.critical(f"control angle error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_control_coords(args, cobot): - logger.info(f"running command control/coord with {args}") - try: - cobot.send_coords(**args) - return {"success": True} - except Exception as e: - logger.critical(f"control coords error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command control/coord with {args}") + try: + cobot.send_coords(**args) + return {"success": True} + except Exception as e: + logger.critical(f"control coords error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_query_angles(args, cobot): - logger.info(f"running command query/angle with {args}") - try: - angles = cobot.get_angles() - if angles is None or len(angles) < 6: - raise Exception("could not read angle") - return {"success": True, "angles": angles} - except Exception as e: - logger.critical(f"query angle error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command query/angle with {args}") + try: + angles = cobot.get_angles() + if angles is None or len(angles) < 6: + raise Exception("could not read angle") + return {"success": True, "angles": angles} + except Exception as e: + logger.critical(f"query angle error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_query_coords(args, cobot): - logger.info(f"running command query/coord with {args}") - try: - coords = cobot.get_coords() - if coords is None or len(coords) < 6: - raise Exception("could not read coord") - return {"success": True, "coords": coords} - except Exception as e: - logger.critical(f"query coord error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command query/coord with {args}") + try: + coords = cobot.get_coords() + if coords is None or len(coords) < 6: + raise Exception("could not read coord") + return {"success": True, "coords": coords} + except Exception as e: + logger.critical(f"query coord error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_query_gripper(args, cobot): - logger.info(f"running command query/coord with {args}") - try: - gripper_pos = cobot.get_gripper_value() - return {"success": True, "position": gripper_pos} - except Exception as e: - logger.critical(f"query gripper error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command query/coord with {args}") + try: + gripper_pos = cobot.get_gripper_value() + return {"success": True, "position": gripper_pos} + except Exception as e: + logger.critical(f"query gripper error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_query_camera(args): - logger.info(f"running command query/camera with {args}") - try: - if not cliargs.debug: - webcam = cv2.VideoCapture(0) - _, frame = webcam.read() - webcam.release() - img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) - else: - img = cobot.get_camera(**args) - compressed_bytes = io.BytesIO() - img.save(compressed_bytes, format="JPEG", quality=args["quality"]) - compressed_bytes.seek(0) - byte_str = base64.b64encode(compressed_bytes.read()).decode("utf-8") - return {"success": True, "image": byte_str} - except Exception as e: - logger.critical(f"query camera error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command query/camera with {args}") + try: + if not cliargs.debug: + webcam = cv2.VideoCapture(0) + _, frame = webcam.read() + webcam.release() + img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + else: + img = cobot.get_camera(**args) + compressed_bytes = io.BytesIO() + img.save(compressed_bytes, format="JPEG", quality=args["quality"]) + compressed_bytes.seek(0) + byte_str = base64.b64encode(compressed_bytes.read()).decode("utf-8") + return {"success": True, "image": byte_str} + except Exception as e: + logger.critical(f"query camera error: {str(e)}") + return {"success": False, "error_msg": str(e)} # MQTT Functions def on_connect(client, userdata, flags, rc, properties=None): - logger.info("Connection received with code %s." % rc) + logger.info("Connection received with code %s." % rc) def on_publish(client, userdata, mid, properties=None): - logger.info("Successful publish.") + logger.info("Successful publish.") def handle_message(msg, cobot): - # Parse payload to json dict - try: - payload_dict = json.loads(msg.payload) - except Exception as e: - return { - "success": False, - "error": str(e) - } - - if "command" not in payload_dict: - return { - "success": False, - "error": "'command' key should be in payload" - } - - # Match to a command function - cmd = payload_dict["command"] - if cmd == "control/angles": - return handle_control_angles(payload_dict["args"], cobot) - elif cmd == "control/coords": - return handle_control_coords(payload_dict["args"], cobot) - elif cmd == "control/gripper": - return handle_control_gripper(payload_dict["args"], cobot) - elif cmd == "query/angles": - return handle_query_angles(payload_dict["args"], cobot) - elif cmd == "query/coords": - return handle_query_coords(payload_dict["args"], cobot) - elif cmd == "query/gripper": - return handle_query_gripper(payload_dict["args"], cobot) - elif cmd == "query/camera": - return handle_query_camera(payload_dict["args"]) - else: - return { - "success": False, - "error": "invalid command" - } + # Parse payload to json dict + try: + payload_dict = json.loads(msg.payload) + except Exception as e: + return { + "success": False, + "error": str(e) + } + + if "command" not in payload_dict: + return { + "success": False, + "error": "'command' key should be in payload" + } + + # Match to a command function + cmd = payload_dict["command"] + if cmd == "control/angles": + return handle_control_angles(payload_dict["args"], cobot) + elif cmd == "control/coords": + return handle_control_coords(payload_dict["args"], cobot) + elif cmd == "control/gripper": + return handle_control_gripper(payload_dict["args"], cobot) + elif cmd == "query/angles": + return handle_query_angles(payload_dict["args"], cobot) + elif cmd == "query/coords": + return handle_query_coords(payload_dict["args"], cobot) + elif cmd == "query/gripper": + return handle_query_gripper(payload_dict["args"], cobot) + elif cmd == "query/camera": + return handle_query_camera(payload_dict["args"]) + else: + return { + "success": False, + "error": "invalid command" + } if __name__ == "__main__": - task_queue = Queue() - logger = setup_logger() - - if not cliargs.debug: - try: - cobot = MyCobot("/dev/ttyAMA0", 1000000) - logger.info("Cobot object initialized...") - except Exception as e: - logger.critical(f"could not initialize cobot with error {str(e)}") - sys.exit(1) - else: - from dummy_cobot import DummyCobot - cobot = DummyCobot() - - def on_message(client, userdata, msg): - logger.info( - f"Recieved message with: \n\ttopic: {msg.topic}\n\tqos: {msg.qos}\n\tpayload: {msg.payload}") - task_queue.put(msg) - - client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) - client.on_connect = on_connect - client.on_message = on_message - client.on_publish = on_publish - - client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) - client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD) - client.connect(HIVEMQ_HOST, DEVICE_PORT) - client.subscribe(DEVICE_ENDPOINT, qos=2) - client.loop_start() - logger.info("Ready for tasks...") - - while True: - msg = task_queue.get() # blocks if queue is empty - response_dict = handle_message(msg, cobot) - client.publish( - RESPONSE_ENDPOINT, - qos=2, - payload=json.dumps(response_dict) - ) - time.sleep(3) \ No newline at end of file + task_queue = Queue() + logger = setup_logger() + + if not cliargs.debug: + try: + cobot = MyCobot("/dev/ttyAMA0", 1000000) + logger.info("Cobot object initialized...") + except Exception as e: + logger.critical(f"could not initialize cobot with error {str(e)}") + sys.exit(1) + else: + from dummy_cobot import DummyCobot + cobot = DummyCobot() + + def on_message(client, userdata, msg): + logger.info( + f"Recieved message with: \n\ttopic: {msg.topic}\n\tqos: {msg.qos}\n\tpayload: {msg.payload}") + task_queue.put(msg) + + client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) + client.on_connect = on_connect + client.on_message = on_message + client.on_publish = on_publish + + client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) + client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD) + client.connect(HIVEMQ_HOST, PORT) + client.subscribe(DEVICE_ID, qos=2) + client.loop_start() + logger.info("Ready for tasks...") + + response_endpoint = DEVICE_ID + "/response" + + while True: + msg = task_queue.get() # blocks if queue is empty + response_dict = handle_message(msg, cobot) + client.publish( + response_endpoint, + qos=2, + payload=json.dumps(response_dict) + ) + time.sleep(3) \ No newline at end of file From 68fae4ccee563db8e31fb12c244c606df1f71ec6 Mon Sep 17 00:00:00 2001 From: gursi26 Date: Thu, 10 Oct 2024 16:54:39 -0400 Subject: [PATCH 06/13] added all --- src/ac_training_lab/cobot280pi/.gitignore | 3 +- src/ac_training_lab/cobot280pi/demo.py | 5 ++ .../cobot280pi/environment.yml | 49 ------------------- .../cobot280pi/requirements.txt | 4 ++ src/ac_training_lab/cobot280pi/server.py | 11 ++--- 5 files changed, 16 insertions(+), 56 deletions(-) create mode 100644 src/ac_training_lab/cobot280pi/demo.py delete mode 100644 src/ac_training_lab/cobot280pi/environment.yml create mode 100644 src/ac_training_lab/cobot280pi/requirements.txt diff --git a/src/ac_training_lab/cobot280pi/.gitignore b/src/ac_training_lab/cobot280pi/.gitignore index cac4054..b612d44 100644 --- a/src/ac_training_lab/cobot280pi/.gitignore +++ b/src/ac_training_lab/cobot280pi/.gitignore @@ -1,2 +1,3 @@ my_secrets.py -*.jpg \ No newline at end of file +*.jpg +.stfolder/ diff --git a/src/ac_training_lab/cobot280pi/demo.py b/src/ac_training_lab/cobot280pi/demo.py new file mode 100644 index 0000000..31ee3ad --- /dev/null +++ b/src/ac_training_lab/cobot280pi/demo.py @@ -0,0 +1,5 @@ +from pymycobot.mycobot import MyCobot + +cobot = MyCobot("/dev/ttyAMA0", 1000000) + + diff --git a/src/ac_training_lab/cobot280pi/environment.yml b/src/ac_training_lab/cobot280pi/environment.yml deleted file mode 100644 index c7d5e0f..0000000 --- a/src/ac_training_lab/cobot280pi/environment.yml +++ /dev/null @@ -1,49 +0,0 @@ -name: mqtt -channels: - - defaults -dependencies: - - _libgcc_mutex=0.1=main - - _openmp_mutex=5.1=1_gnu - - bzip2=1.0.8=h5eee18b_6 - - ca-certificates=2024.7.2=h06a4308_0 - - ld_impl_linux-64=2.38=h1181459_1 - - libffi=3.4.4=h6a678d5_1 - - libgcc-ng=11.2.0=h1234567_1 - - libgomp=11.2.0=h1234567_1 - - libstdcxx-ng=11.2.0=h1234567_1 - - libuuid=1.41.5=h5eee18b_0 - - ncurses=6.4=h6a678d5_0 - - openssl=3.0.15=h5eee18b_0 - - pip=24.2=py311h06a4308_0 - - python=3.11.9=h955ad1f_0 - - readline=8.2=h5eee18b_0 - - setuptools=72.1.0=py311h06a4308_0 - - sqlite=3.45.3=h5eee18b_0 - - tk=8.6.14=h39e8969_0 - - tzdata=2024a=h04d1e81_0 - - wheel=0.44.0=py311h06a4308_0 - - xz=5.4.6=h5eee18b_1 - - zlib=1.2.13=h5eee18b_1 - - pip: - - autopep8==2.3.1 - - contourpy==1.3.0 - - cycler==0.12.1 - - fonttools==4.53.1 - - kiwisolver==1.4.7 - - matplotlib==3.9.2 - - msgpack==1.0.8 - - numpy==2.1.1 - - opencv-python==4.10.0.84 - - packaging==24.1 - - paho-mqtt==1.5.1 - - pillow==10.4.0 - - pycodestyle==2.12.1 - - pymycobot==3.5.3 - - pyparsing==3.1.4 - - pyserial==3.5 - - python-can==4.4.2 - - python-dateutil==2.9.0.post0 - - six==1.16.0 - - typing-extensions==4.12.2 - - wrapt==1.16.0 -prefix: /home/gursi26/miniconda3/envs/mqtt diff --git a/src/ac_training_lab/cobot280pi/requirements.txt b/src/ac_training_lab/cobot280pi/requirements.txt new file mode 100644 index 0000000..bdc8875 --- /dev/null +++ b/src/ac_training_lab/cobot280pi/requirements.txt @@ -0,0 +1,4 @@ +paho-mqtt==2.1.0 +pillow==10.4.0 +setuptools==75.1.0 +wheel==0.44.0 diff --git a/src/ac_training_lab/cobot280pi/server.py b/src/ac_training_lab/cobot280pi/server.py index 37456d2..e5bf405 100644 --- a/src/ac_training_lab/cobot280pi/server.py +++ b/src/ac_training_lab/cobot280pi/server.py @@ -179,19 +179,18 @@ def on_message(client, userdata, msg): client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD) - client.connect(HIVEMQ_HOST, PORT) - client.subscribe(DEVICE_ID, qos=2) + client.connect(HIVEMQ_HOST, DEVICE_PORT) + client.subscribe(DEVICE_ENDPOINT, qos=2) client.loop_start() logger.info("Ready for tasks...") - response_endpoint = DEVICE_ID + "/response" - while True: msg = task_queue.get() # blocks if queue is empty response_dict = handle_message(msg, cobot) - client.publish( - response_endpoint, + pub_handle = client.publish( + RESPONSE_ENDPOINT, qos=2, payload=json.dumps(response_dict) ) + pub_handle.wait_for_publish() time.sleep(3) \ No newline at end of file From 5c95a91e4fce7ee9cf51ac630866e9c58fa7db14 Mon Sep 17 00:00:00 2001 From: gursi26 Date: Thu, 10 Oct 2024 16:59:01 -0400 Subject: [PATCH 07/13] updated demo --- src/ac_training_lab/cobot280pi/demo.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ac_training_lab/cobot280pi/demo.py b/src/ac_training_lab/cobot280pi/demo.py index 31ee3ad..dd743ab 100644 --- a/src/ac_training_lab/cobot280pi/demo.py +++ b/src/ac_training_lab/cobot280pi/demo.py @@ -2,4 +2,6 @@ cobot = MyCobot("/dev/ttyAMA0", 1000000) +def rise(): + cobot.send_angles([0, 0, 0, 0, 0, 0], 100) From 3509a0032a287935be78888064992acf31775304 Mon Sep 17 00:00:00 2001 From: gursi26 Date: Thu, 10 Oct 2024 17:03:44 -0400 Subject: [PATCH 08/13] added another demo function --- src/ac_training_lab/cobot280pi/demo.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ac_training_lab/cobot280pi/demo.py b/src/ac_training_lab/cobot280pi/demo.py index dd743ab..67752c3 100644 --- a/src/ac_training_lab/cobot280pi/demo.py +++ b/src/ac_training_lab/cobot280pi/demo.py @@ -5,3 +5,5 @@ def rise(): cobot.send_angles([0, 0, 0, 0, 0, 0], 100) +def kill(): + cobot.release_all_servos() From b0c5f98601f80ddfc6601f421195ab9be0f3c5d6 Mon Sep 17 00:00:00 2001 From: gursi26 Date: Fri, 8 Nov 2024 16:05:18 -0500 Subject: [PATCH 09/13] small changes, remove * imports --- src/ac_training_lab/cobot280pi/client.py | 4 ++-- src/ac_training_lab/cobot280pi/server.py | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/ac_training_lab/cobot280pi/client.py b/src/ac_training_lab/cobot280pi/client.py index 6eb740e..995db4f 100644 --- a/src/ac_training_lab/cobot280pi/client.py +++ b/src/ac_training_lab/cobot280pi/client.py @@ -103,6 +103,6 @@ def get_camera(self, quality=100, save_path=None): HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST, - PORT, - DEVICE_ID + DEVICE_PORT, + DEVICE_ENDPOINT ) \ No newline at end of file diff --git a/src/ac_training_lab/cobot280pi/server.py b/src/ac_training_lab/cobot280pi/server.py index e5bf405..0b5f606 100644 --- a/src/ac_training_lab/cobot280pi/server.py +++ b/src/ac_training_lab/cobot280pi/server.py @@ -1,6 +1,5 @@ -from my_secrets import * -from utils import * - +from my_secrets import DEVICE_ENDPOINT, HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST, DEVICE_PORT +from utils import setup_logger import cv2 import time import json @@ -13,7 +12,6 @@ import paho.mqtt.client as paho from pymycobot.mycobot import MyCobot - # cli args parser = argparse.ArgumentParser() parser.add_argument("--debug", "-d", action="store_true", help="runs in debug mode") @@ -188,7 +186,7 @@ def on_message(client, userdata, msg): msg = task_queue.get() # blocks if queue is empty response_dict = handle_message(msg, cobot) pub_handle = client.publish( - RESPONSE_ENDPOINT, + DEVICE_ENDPOINT + "/response", qos=2, payload=json.dumps(response_dict) ) From d459a0d136aa9266dd66cfac8ac056ef8d60172f Mon Sep 17 00:00:00 2001 From: gursi26 Date: Fri, 8 Nov 2024 16:10:58 -0500 Subject: [PATCH 10/13] added all, fixed formatting --- src/ac_training_lab/cobot280pi/server.py | 298 +++++++++++------------ 1 file changed, 149 insertions(+), 149 deletions(-) diff --git a/src/ac_training_lab/cobot280pi/server.py b/src/ac_training_lab/cobot280pi/server.py index 0b5f606..7d3e39d 100644 --- a/src/ac_training_lab/cobot280pi/server.py +++ b/src/ac_training_lab/cobot280pi/server.py @@ -1,194 +1,194 @@ -from my_secrets import DEVICE_ENDPOINT, HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST, DEVICE_PORT -from utils import setup_logger -import cv2 -import time -import json +import argparse import base64 import io +import json import sys -import argparse -from PIL import Image +import time from queue import Queue + +import cv2 import paho.mqtt.client as paho +from my_secrets import ( + DEVICE_ENDPOINT, + DEVICE_PORT, + HIVEMQ_HOST, + HIVEMQ_PASSWORD, + HIVEMQ_USERNAME, +) +from PIL import Image from pymycobot.mycobot import MyCobot +from utils import setup_logger # cli args parser = argparse.ArgumentParser() parser.add_argument("--debug", "-d", action="store_true", help="runs in debug mode") cliargs = parser.parse_args() + # Cobot action functions def handle_control_gripper(args, cobot): - logger.info(f"running command control/gripper with {args}") - try: - cobot.set_gripper_value(**args) - return {"success": True} - except Exception as e: - logger.critical(f"control gripper error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command control/gripper with {args}") + try: + cobot.set_gripper_value(**args) + return {"success": True} + except Exception as e: + logger.critical(f"control gripper error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_control_angles(args, cobot): - logger.info(f"running command control/angle with {args}") - try: - cobot.send_angles(**args) - return {"success": True} - except Exception as e: - logger.critical(f"control angle error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command control/angle with {args}") + try: + cobot.send_angles(**args) + return {"success": True} + except Exception as e: + logger.critical(f"control angle error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_control_coords(args, cobot): - logger.info(f"running command control/coord with {args}") - try: - cobot.send_coords(**args) - return {"success": True} - except Exception as e: - logger.critical(f"control coords error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command control/coord with {args}") + try: + cobot.send_coords(**args) + return {"success": True} + except Exception as e: + logger.critical(f"control coords error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_query_angles(args, cobot): - logger.info(f"running command query/angle with {args}") - try: - angles = cobot.get_angles() - if angles is None or len(angles) < 6: - raise Exception("could not read angle") - return {"success": True, "angles": angles} - except Exception as e: - logger.critical(f"query angle error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command query/angle with {args}") + try: + angles = cobot.get_angles() + if angles is None or len(angles) < 6: + raise Exception("could not read angle") + return {"success": True, "angles": angles} + except Exception as e: + logger.critical(f"query angle error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_query_coords(args, cobot): - logger.info(f"running command query/coord with {args}") - try: - coords = cobot.get_coords() - if coords is None or len(coords) < 6: - raise Exception("could not read coord") - return {"success": True, "coords": coords} - except Exception as e: - logger.critical(f"query coord error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command query/coord with {args}") + try: + coords = cobot.get_coords() + if coords is None or len(coords) < 6: + raise Exception("could not read coord") + return {"success": True, "coords": coords} + except Exception as e: + logger.critical(f"query coord error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_query_gripper(args, cobot): - logger.info(f"running command query/coord with {args}") - try: - gripper_pos = cobot.get_gripper_value() - return {"success": True, "position": gripper_pos} - except Exception as e: - logger.critical(f"query gripper error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command query/coord with {args}") + try: + gripper_pos = cobot.get_gripper_value() + return {"success": True, "position": gripper_pos} + except Exception as e: + logger.critical(f"query gripper error: {str(e)}") + return {"success": False, "error_msg": str(e)} def handle_query_camera(args): - logger.info(f"running command query/camera with {args}") - try: - if not cliargs.debug: - webcam = cv2.VideoCapture(0) - _, frame = webcam.read() - webcam.release() - img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) - else: - img = cobot.get_camera(**args) - compressed_bytes = io.BytesIO() - img.save(compressed_bytes, format="JPEG", quality=args["quality"]) - compressed_bytes.seek(0) - byte_str = base64.b64encode(compressed_bytes.read()).decode("utf-8") - return {"success": True, "image": byte_str} - except Exception as e: - logger.critical(f"query camera error: {str(e)}") - return {"success": False, "error_msg": str(e)} + logger.info(f"running command query/camera with {args}") + try: + if not cliargs.debug: + webcam = cv2.VideoCapture(0) + _, frame = webcam.read() + webcam.release() + img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + else: + img = cobot.get_camera(**args) + compressed_bytes = io.BytesIO() + img.save(compressed_bytes, format="JPEG", quality=args["quality"]) + compressed_bytes.seek(0) + byte_str = base64.b64encode(compressed_bytes.read()).decode("utf-8") + return {"success": True, "image": byte_str} + except Exception as e: + logger.critical(f"query camera error: {str(e)}") + return {"success": False, "error_msg": str(e)} # MQTT Functions def on_connect(client, userdata, flags, rc, properties=None): - logger.info("Connection received with code %s." % rc) + logger.info("Connection received with code %s." % rc) def on_publish(client, userdata, mid, properties=None): - logger.info("Successful publish.") + logger.info("Successful publish.") def handle_message(msg, cobot): - # Parse payload to json dict - try: - payload_dict = json.loads(msg.payload) - except Exception as e: - return { - "success": False, - "error": str(e) - } - - if "command" not in payload_dict: - return { - "success": False, - "error": "'command' key should be in payload" - } - - # Match to a command function - cmd = payload_dict["command"] - if cmd == "control/angles": - return handle_control_angles(payload_dict["args"], cobot) - elif cmd == "control/coords": - return handle_control_coords(payload_dict["args"], cobot) - elif cmd == "control/gripper": - return handle_control_gripper(payload_dict["args"], cobot) - elif cmd == "query/angles": - return handle_query_angles(payload_dict["args"], cobot) - elif cmd == "query/coords": - return handle_query_coords(payload_dict["args"], cobot) - elif cmd == "query/gripper": - return handle_query_gripper(payload_dict["args"], cobot) - elif cmd == "query/camera": - return handle_query_camera(payload_dict["args"]) - else: - return { - "success": False, - "error": "invalid command" - } + # Parse payload to json dict + try: + payload_dict = json.loads(msg.payload) + except Exception as e: + return {"success": False, "error": str(e)} + + if "command" not in payload_dict: + return {"success": False, "error": "'command' key should be in payload"} + + # Match to a command function + cmd = payload_dict["command"] + if cmd == "control/angles": + return handle_control_angles(payload_dict["args"], cobot) + elif cmd == "control/coords": + return handle_control_coords(payload_dict["args"], cobot) + elif cmd == "control/gripper": + return handle_control_gripper(payload_dict["args"], cobot) + elif cmd == "query/angles": + return handle_query_angles(payload_dict["args"], cobot) + elif cmd == "query/coords": + return handle_query_coords(payload_dict["args"], cobot) + elif cmd == "query/gripper": + return handle_query_gripper(payload_dict["args"], cobot) + elif cmd == "query/camera": + return handle_query_camera(payload_dict["args"]) + else: + return {"success": False, "error": "invalid command"} if __name__ == "__main__": - task_queue = Queue() - logger = setup_logger() - - if not cliargs.debug: - try: - cobot = MyCobot("/dev/ttyAMA0", 1000000) - logger.info("Cobot object initialized...") - except Exception as e: - logger.critical(f"could not initialize cobot with error {str(e)}") - sys.exit(1) - else: - from dummy_cobot import DummyCobot - cobot = DummyCobot() - - def on_message(client, userdata, msg): - logger.info( - f"Recieved message with: \n\ttopic: {msg.topic}\n\tqos: {msg.qos}\n\tpayload: {msg.payload}") - task_queue.put(msg) - - client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) - client.on_connect = on_connect - client.on_message = on_message - client.on_publish = on_publish - - client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) - client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD) - client.connect(HIVEMQ_HOST, DEVICE_PORT) - client.subscribe(DEVICE_ENDPOINT, qos=2) - client.loop_start() - logger.info("Ready for tasks...") - - while True: - msg = task_queue.get() # blocks if queue is empty - response_dict = handle_message(msg, cobot) - pub_handle = client.publish( - DEVICE_ENDPOINT + "/response", - qos=2, - payload=json.dumps(response_dict) - ) - pub_handle.wait_for_publish() - time.sleep(3) \ No newline at end of file + task_queue = Queue() + logger = setup_logger() + + if not cliargs.debug: + try: + cobot = MyCobot("/dev/ttyAMA0", 1000000) + logger.info("Cobot object initialized...") + except Exception as e: + logger.critical(f"could not initialize cobot with error {str(e)}") + sys.exit(1) + else: + from dummy_cobot import DummyCobot + + cobot = DummyCobot() + + def on_message(client, userdata, msg): + logger.info( + f"Recieved message with: \n\ttopic: {msg.topic}\ + \n\tqos: {msg.qos}\n\tpayload: {msg.payload}" + ) + task_queue.put(msg) + + client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) + client.on_connect = on_connect + client.on_message = on_message + client.on_publish = on_publish + + client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS) + client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD) + client.connect(HIVEMQ_HOST, DEVICE_PORT) + client.subscribe(DEVICE_ENDPOINT, qos=2) + client.loop_start() + logger.info("Ready for tasks...") + + while True: + msg = task_queue.get() # blocks if queue is empty + response_dict = handle_message(msg, cobot) + pub_handle = client.publish( + DEVICE_ENDPOINT + "/response", qos=2, payload=json.dumps(response_dict) + ) + pub_handle.wait_for_publish() + time.sleep(3) From 78be223d5b01a05b0ebe754f721d57ba27b606b1 Mon Sep 17 00:00:00 2001 From: gursi26 Date: Fri, 8 Nov 2024 16:16:16 -0500 Subject: [PATCH 11/13] pls work --- src/ac_training_lab/cobot280pi/utils.py | 32 ++++++++++--------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/src/ac_training_lab/cobot280pi/utils.py b/src/ac_training_lab/cobot280pi/utils.py index f5a75ee..502fe30 100644 --- a/src/ac_training_lab/cobot280pi/utils.py +++ b/src/ac_training_lab/cobot280pi/utils.py @@ -1,26 +1,20 @@ import logging import sys -def setup_logger(logfile_name: str = "mqttcobot.log"): - logger = logging.getLogger('logger') - logger.setLevel(logging.INFO) - - console_handler = logging.StreamHandler(sys.stdout) - file_handler = logging.FileHandler(logfile_name) - formatter = logging.Formatter('[%(levelname)s - %(asctime)s]: %(message)s') - console_handler.setFormatter(formatter) - file_handler.setFormatter(formatter) +def setup_logger(logfile_name: str = "mqttcobot.log"): + logger = logging.getLogger("logger") + logger.setLevel(logging.INFO) - if not logger.hasHandlers(): - logger.addHandler(console_handler) - logger.addHandler(file_handler) - logger.propagate = False - return logger + console_handler = logging.StreamHandler(sys.stdout) + file_handler = logging.FileHandler(logfile_name) + formatter = logging.Formatter("[%(levelname)s - %(asctime)s]: %(message)s") + console_handler.setFormatter(formatter) + file_handler.setFormatter(formatter) -def truncate_string(s, max_length = 50): - if len(s) <= max_length: - return s - half = (max_length - 3) // 2 - return f"{s[:half]}...{s[-half:]}" + if not logger.hasHandlers(): + logger.addHandler(console_handler) + logger.addHandler(file_handler) + logger.propagate = False + return logger From 7dd44a1ece431aae1cdc07de7c1652765a9864a6 Mon Sep 17 00:00:00 2001 From: gursi26 Date: Fri, 8 Nov 2024 16:18:28 -0500 Subject: [PATCH 12/13] added newliens, work now? --- src/ac_training_lab/cobot280pi/client.py | 60 ++++++++----------- src/ac_training_lab/cobot280pi/dummy_cobot.py | 7 ++- 2 files changed, 30 insertions(+), 37 deletions(-) diff --git a/src/ac_training_lab/cobot280pi/client.py b/src/ac_training_lab/cobot280pi/client.py index 995db4f..b0febd2 100644 --- a/src/ac_training_lab/cobot280pi/client.py +++ b/src/ac_training_lab/cobot280pi/client.py @@ -1,6 +1,9 @@ -import paho.mqtt.client as paho -import json, io, base64 +import base64 +import io +import json from queue import Queue + +import paho.mqtt.client as paho from PIL import Image @@ -41,31 +44,31 @@ def handle_publish_and_response(self, payload): self.client.publish(self.publish_endpoint, payload=payload, qos=2) return self.response_queue.get(block=True) - def send_angles( - self, - angle_list: list[float] = [0.0] * 6, - speed: int = 50 - ): - payload = json.dumps({"command": "control/angles", - "args": {"angles": angle_list, "speed": speed}}) + def send_angles(self, angle_list: list[float] = [0.0] * 6, speed: int = 50): + payload = json.dumps( + { + "command": "control/angles", + "args": {"angles": angle_list, "speed": speed}, + } + ) return self.handle_publish_and_response(payload) - def send_coords( - self, - coord_list: list[float] = [0.0] * 6, - speed: int = 50 - ): - payload = json.dumps({"command": "control/coords", - "args": {"coords": coord_list, "speed": speed}}) + def send_coords(self, coord_list: list[float] = [0.0] * 6, speed: int = 50): + payload = json.dumps( + { + "command": "control/coords", + "args": {"coords": coord_list, "speed": speed}, + } + ) return self.handle_publish_and_response(payload) - def send_gripper_value( - self, - value: int = 100, - speed: int = 50 - ): - payload = json.dumps({"command": "control/gripper", - "args": {"gripper_value": value, "speed": speed}}) + def send_gripper_value(self, value: int = 100, speed: int = 50): + payload = json.dumps( + { + "command": "control/gripper", + "args": {"gripper_value": value, "speed": speed}, + } + ) return self.handle_publish_and_response(payload) def get_angles(self): @@ -95,14 +98,3 @@ def get_camera(self, quality=100, save_path=None): img.save(save_path) return response - -if __name__ == "__main__": - from my_secrets import * - - cobot = CobotController( - HIVEMQ_USERNAME, - HIVEMQ_PASSWORD, - HIVEMQ_HOST, - DEVICE_PORT, - DEVICE_ENDPOINT - ) \ No newline at end of file diff --git a/src/ac_training_lab/cobot280pi/dummy_cobot.py b/src/ac_training_lab/cobot280pi/dummy_cobot.py index f85c576..f9ca37d 100644 --- a/src/ac_training_lab/cobot280pi/dummy_cobot.py +++ b/src/ac_training_lab/cobot280pi/dummy_cobot.py @@ -1,5 +1,6 @@ -from utils import setup_logger from PIL import Image +from utils import setup_logger + # A dummy class for easier testing without physically having the cobot class DummyCobot: @@ -22,7 +23,7 @@ def get_angles(self, **kwargs): def get_coords(self, **kwargs): self.logger.info(f"tried to get coords with args {kwargs}") - return [0, 0, 0, 0, 0, 0] + return [0, 0, 0, 0, 0, 0] def get_gripper_value(self, **kwargs): self.logger.info(f"tried to get gripper value with args {kwargs}") @@ -30,4 +31,4 @@ def get_gripper_value(self, **kwargs): def get_camera(self, **kwargs): self.logger.info(f"tried to get camera with args {kwargs}") - return Image.new('RGB', (1920, 1080), color='black') \ No newline at end of file + return Image.new("RGB", (1920, 1080), color="black") From 9acb41778ee6755d9fef5bca2a9148d675715629 Mon Sep 17 00:00:00 2001 From: gursi26 Date: Fri, 8 Nov 2024 16:20:05 -0500 Subject: [PATCH 13/13] ok, now i think its good --- src/ac_training_lab/cobot280pi/demo.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ac_training_lab/cobot280pi/demo.py b/src/ac_training_lab/cobot280pi/demo.py index 67752c3..8c77f18 100644 --- a/src/ac_training_lab/cobot280pi/demo.py +++ b/src/ac_training_lab/cobot280pi/demo.py @@ -2,8 +2,10 @@ cobot = MyCobot("/dev/ttyAMA0", 1000000) + def rise(): - cobot.send_angles([0, 0, 0, 0, 0, 0], 100) + cobot.send_angles([0, 0, 0, 0, 0, 0], 100) + def kill(): - cobot.release_all_servos() + cobot.release_all_servos()