You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I tried to use "AT+INVOKE=1,0,0" to capture images from a python script similar to how it is done on https://sensecraft.seeed.cc/ai/. I get error in the base64 encoded image. I believe it comes from a bug in el_base64_encode() in the second part there is a mixup of i and j. I think it should be
for (j = 0; j < i + 1; j++) *out++ = constants::BASE64_CHARS_TABLE[char_array_4[j]];
Adding null terminating at the end of the function is probably good too but not sure if it creates any issues without it.
*out = '\0';
Environment
Environment you use when bug appears:
Grove Vision AI with latest available firmware connected to Ubuntu running a python script.
Compiler Version
Compiler Commands
SSCMA-Micro Version
Code you run
The detailed error
Additional context
The python script
import serial
import time
import threading
import base64
import io
from PIL import Image, ImageTk
import queue
import json
import tkinter as tk
from tkinter import ttk
import argparse
class SSCMAClient:
def __init__(self, port, baudrate=921600, timeout=1):
"""
Initialize the serial connection and start the listener thread.
"""
try:
self.ser = serial.Serial(
port=port,
baudrate=baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=timeout
)
time.sleep(2) # Wait for the connection to initialize
print(f"Connected to {port} at {baudrate} baud.")
except serial.SerialException as e:
print(f"Failed to connect to {port}: {e}")
raise e
self.lock = threading.Lock()
self.listening = True
self.listener_thread = threading.Thread(target=self.listen, daemon=True)
self.listener_thread.start()
self.buffer = ""
# Queues for thread-safe communication
self.image_queue = queue.Queue()
self.bbox_queue = queue.Queue()
def send_command(self, command):
"""
Send an AT command to the device.
"""
full_command = f"{command}\r\n" # Ensure commands are terminated with CRLF
print(f"Sending command: {full_command.strip()}")
with self.lock:
try:
self.ser.flushInput() # Flush input buffer to remove any residual data
self.ser.flushOutput() # Flush output buffer
self.buffer = ""
self.ser.write(full_command.encode())
print(f"Command '{command}' sent successfully.")
except serial.SerialException as e:
print(f"Failed to send command '{command}': {e}")
def listen(self):
"""
Listen for incoming data from the device.
"""
self.buffer = ""
decoder = json.JSONDecoder()
while self.listening:
try:
if self.ser.in_waiting:
data = self.ser.read(self.ser.in_waiting).decode(errors='ignore')
print(f"Raw data received: {data.strip()}")
self.buffer += data
while self.buffer:
self.buffer = self.buffer.lstrip() # Remove leading whitespace
if not self.buffer:
break
if self.buffer.strip().endswith('"is_ready": 1}}'):
self.buffer = ""
print("End of startup sequence")
break
if not self.buffer.strip().startswith('{'):
print("####### Ignore not complete data" + self.buffer)
self.buffer = ""
break
try:
obj, index = decoder.raw_decode(self.buffer)
self.handle_json(obj)
self.buffer = self.buffer[index:]
except json.JSONDecodeError as je:
# If no complete JSON object can be decoded, wait for more data
print(f"----> JSON EX:{je}")
print("!!!!!" + self.buffer + "!!!!!!!!")
break
time.sleep(0.1)
except serial.SerialException as e:
print(f"Serial exception: {e}")
self.listening = False
except Exception as e:
print(f"Unexpected error in listen thread: {e}")
self.listening = False
def handle_json(self, json_data):
"""
Handle a parsed JSON object.
"""
print(f"Handling JSON data: {json_data}")
msg_type = json_data.get("type")
name = json_data.get("name")
if msg_type == 1 and name == "INVOKE":
data = json_data.get("data", {})
image_b64 = data.get("image")
boxes = data.get("boxes", [])
if image_b64:
image = self.decode_image(image_b64)
if image:
self.image_queue.put(image)
if boxes:
self.bbox_queue.put(boxes)
elif msg_type == 0 and name == "MODEL":
print(f"Model Information: {json_data.get('data')}")
elif msg_type == 0 and name == "INVOKE":
print(f"Invoke Information: {json_data.get('data')}")
else:
print(f"Unrecognized or unsupported message: {json_data}")
def decode_image(self, img_data):
"""
Decode base64 image data to a PIL Image.
"""
try:
# Remove any whitespace or newlines within the Base64 string
img_data_clean = ''.join(img_data.split())
img_bytes = base64.b64decode(img_data_clean)
image = Image.open(io.BytesIO(img_bytes))
print("Image decoded successfully.")
return image
except Exception as e:
print(f"Failed to decode image: {e}")
return None
def parse_bboxes(self, bbox_data):
"""
Parse bounding boxes from the received data.
Expected format: "x1,y1,x2,y2; x1,y1,x2,y2; ..."
"""
try:
bboxes = []
boxes = bbox_data.split(';')
for box in boxes:
coords = box.strip().split(',')
if len(coords) == 4:
x1, y1, x2, y2 = map(int, coords)
bboxes.append((x1, y1, x2, y2))
print(f"Parsed bounding boxes: {bboxes}")
return bboxes
except Exception as e:
print(f"Failed to parse bounding boxes: {e}")
return []
def close(self):
"""
Close the serial connection and stop listening.
"""
self.listening = False
self.listener_thread.join()
self.ser.close()
print("Serial connection closed.")
class SSCMAApp:
def __init__(self, root, client):
self.root = root
self.client = client
self.root.title("Grove Vision AI v2 Client")
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
# Create UI elements
self.create_widgets()
# Initialize image and bounding boxes
self.current_image = None
self.bounding_boxes = []
# Start polling queues
self.poll_queues()
def create_widgets(self):
# Create a frame for the image
self.image_frame = ttk.Frame(self.root)
self.image_frame.pack(padx=10, pady=10)
# Canvas to display image and bounding boxes
self.canvas = tk.Canvas(self.image_frame, width=640, height=480, bg='grey')
self.canvas.pack()
# Create a frame for buttons
self.button_frame = ttk.Frame(self.root)
self.button_frame.pack(pady=10)
# Refresh button
self.refresh_button = ttk.Button(
self.button_frame, text="Refresh Image", command=self.refresh_image
)
self.refresh_button.pack()
def refresh_image(self):
"""
Send commands to request image and bounding boxes from the device.
"""
# Send the AT+INVOKE command to get image and bounding boxes
self.client.send_command("AT+INVOKE=1,0,0")
def poll_queues(self):
"""
Periodically check the image and bbox queues for new data.
"""
# Check for new images
try:
while True:
image = self.client.image_queue.get_nowait()
self.display_image(image)
except queue.Empty:
pass
# Check for new bounding boxes
try:
while True:
bboxes = self.client.bbox_queue.get_nowait()
self.display_bboxes(bboxes)
except queue.Empty:
pass
# Schedule the next poll
self.root.after(100, self.poll_queues)
def display_image(self, image):
"""
Display the image on the canvas.
"""
try:
self.current_image = image.copy()
# Define desired max size
max_width, max_height = 640, 480
img_width, img_height = self.current_image.size
scale = min(max_width / img_width, max_height / img_height, 1)
new_size = (int(img_width * scale), int(img_height * scale))
self.current_image = self.current_image.resize(new_size, Image.ANTIALIAS)
self.photo = ImageTk.PhotoImage(self.current_image)
self.canvas.config(width=new_size[0], height=new_size[1])
self.canvas.create_image(0, 0, anchor=tk.NW, image=self.photo)
print("Image displayed and scaled.")
except:
print("Failed to show image!")
pass
def display_bboxes(self, bboxes):
"""
Draw bounding boxes on the canvas.
"""
self.bounding_boxes = bboxes
self.canvas.delete("bbox") # Remove existing bounding boxes
if self.current_image:
for bbox in bboxes:
x1, y1, x2, y2 = bbox
self.canvas.create_rectangle(
x1, y1, x2, y2, outline="red", width=2, tags="bbox"
)
print("Bounding boxes drawn.")
def on_closing(self):
"""
Handle the window closing event.
"""
print("Closing application...")
self.client.close()
self.root.destroy()
def main():
parser = argparse.ArgumentParser(description="Grove Vision AI v2 Python Client with Tkinter GUI")
parser.add_argument(
'--port', type=str, required=True,
help='Serial port (e.g., COM3 or /dev/ttyUSB0)'
)
args = parser.parse_args()
# Initialize the serial client
try:
client = SSCMAClient(port=args.port)
except serial.SerialException:
return
# Initialize Tkinter
root = tk.Tk()
app = SSCMAApp(root, client)
# Start the Tkinter main loop
try:
root.mainloop()
except KeyboardInterrupt:
print("Exiting...")
finally:
client.close()
if __name__ == "__main__":
main()
The text was updated successfully, but these errors were encountered:
Describe the bug
I tried to use "AT+INVOKE=1,0,0" to capture images from a python script similar to how it is done on https://sensecraft.seeed.cc/ai/. I get error in the base64 encoded image. I believe it comes from a bug in el_base64_encode() in the second part there is a mixup of i and j. I think it should be
for (j = 0; j < i + 1; j++) *out++ = constants::BASE64_CHARS_TABLE[char_array_4[j]];
Adding null terminating at the end of the function is probably good too but not sure if it creates any issues without it.
*out = '\0';
Environment
Environment you use when bug appears:
Grove Vision AI with latest available firmware connected to Ubuntu running a python script.
Additional context
The python script
The text was updated successfully, but these errors were encountered: