From c7028c14da8f4bc617a067906eaa9367cdddd79f Mon Sep 17 00:00:00 2001 From: ouyangca <51312196+ouyangca@users.noreply.github.com> Date: Wed, 30 Oct 2024 18:39:59 -0700 Subject: [PATCH] Add files via upload --- backend_arduino.ino | 46 +++++++++++++++++++++++ serialread.py | 20 ++++++++++ ui.py | 89 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+) create mode 100644 backend_arduino.ino create mode 100644 serialread.py create mode 100644 ui.py diff --git a/backend_arduino.ino b/backend_arduino.ino new file mode 100644 index 00000000..09c7431d --- /dev/null +++ b/backend_arduino.ino @@ -0,0 +1,46 @@ +// #include + +// void setup() { +// Serial.begin(9600); +// } + +// void loop() { +// Serial.println("Hello, World!"); +// delay(1000); +// } + +const int LED { 17 }; + +// add these +const char S_OK { 0xaa }; +const char S_ERR { 0xff }; + +void on_receive(void* event_handler_arg, esp_event_base_t event_base, int32_t event_id, void* event_data) { + // read one byte + char state { Serial.read() }; + // guard byte is valid LED state + if (!(state == LOW || state == HIGH)) { + // invalid byte received + // report error + Serial.write(S_ERR); + return; + } + // update LED with valid state + digitalWrite(LED, state); + Serial.write(S_OK); +} + + + +void setup() { + pinMode(LED, OUTPUT); + + // register "on_receive" as callback for RX event + Serial.onEvent(ARDUINO_HW_CDC_RX_EVENT, on_receive); + Serial.begin(9600); +} + +void loop() { + // Serial.println("Hello, World!"); + // delay(1000); +} diff --git a/serialread.py b/serialread.py new file mode 100644 index 00000000..8d6a90ce --- /dev/null +++ b/serialread.py @@ -0,0 +1,20 @@ +from serial import Serial, SerialException + +# with Serial('/dev/cu.usbmodem14101', 9600) as ser: +# while True: +# print(ser.readline().decode()) + +with Serial('/dev/cu.usbmodem14301', 9600) as ser: + ser.write(bytes([0x0])) + input() # keep port open to see the LED turn on + + +# with Serial('/dev/cu.usbmodem14301', 9600) as ser: +# ser.write(bytes([0x1])) +# assert ser.read() == bytes([0xaa]) + +# ser.write(bytes([0x0])) +# assert ser.read() == bytes([0xaa]) + +# ser.write(bytes([0x2])) +# assert ser.read() == bytes([0xff]) \ No newline at end of file diff --git a/ui.py b/ui.py new file mode 100644 index 00000000..c872bccb --- /dev/null +++ b/ui.py @@ -0,0 +1,89 @@ +import tkinter as tk +import tkinter.ttk as ttk +from tkinter.messagebox import showerror +from serial import Serial, SerialException +from serial.tools.list_ports import comports + +from threading import Thread, Lock # we'll use Lock later ;) +def detached_callback(f): + return lambda *args, **kwargs: Thread(target=f, args=args, kwargs=kwargs).start() + + +S_OK: int = 0xaa +S_ERR: int = 0xff + +class LockedSerial(Serial): + _lock: Lock = Lock() + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + def read(self, size=1) -> bytes: + with self._lock: + return super().read(size) + def write(self, b: bytes, /) -> int | None: + with self._lock: + super().write(b) + def close(self): + with self._lock: + super().close() + + +class App(tk.Tk): + def __init__(self): + super().__init__() + + self.title("LED Blinker") + self.led = tk.BooleanVar() + self.port = tk.StringVar() # add this + + ttk.Checkbutton(self, text='Toggle LED', variable=self.led, command=self.update_led).pack() + ttk.Button(self, text='Send Invalid', command=self.send_invalid).pack() + ttk.Button(self, text='Disconnect',command=self.disconnect, default='active').pack() + SerialPortal(self) # and this + + def connect(self): + self.ser = LockedSerial(self.port.get()) + + def disconnect(self): + self.ser.close() + SerialPortal(self) # display portal to reconnect + + def write(self, b: bytes): + try: + self.ser.write(b) + if int.from_bytes(self.ser.read(), 'big') == S_ERR: + showerror('Device Error', 'The device reported an invalid command.') + except SerialException: + showerror('Serial Error', 'Write failed.') + + @detached_callback + def update_led(self): + self.write(bytes([self.led.get()])) + def send_invalid(self): + self.write(bytes([0x10])) + + def __enter__(self): + return self + def __exit__(self, *_): + self.disconnect() + + + + + +class SerialPortal(tk.Toplevel): + def __init__(self, parent: App): + super().__init__(parent) + self.parent = parent + self.parent.withdraw() # hide App until connected + ttk.OptionMenu(self, parent.port, '', *[d.device for d in comports()]).pack() + ttk.Button(self, text='Connect', command=self.connect, default='active').pack() + def connect(self): + self.parent.connect() + self.destroy() + self.parent.deiconify() # reveal App + + + +if __name__ == '__main__': + with App() as app: + app.mainloop()