From bfcb181cdf0735f8c439b56664179c2141da7d15 Mon Sep 17 00:00:00 2001 From: Phil Howard Date: Fri, 4 Oct 2024 15:41:00 +0100 Subject: [PATCH] PPP: Add send_text_message for #14. Also a quick cleanup of the lte module code. --- micropython/modules_py/lte.py | 61 +++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/micropython/modules_py/lte.py b/micropython/modules_py/lte.py index 5340650..435b2e2 100644 --- a/micropython/modules_py/lte.py +++ b/micropython/modules_py/lte.py @@ -18,8 +18,8 @@ class CellularError(Exception): - def __init__(self, message=None): - self.message = "CellularError: " + message + def __init__(self, message=None): + self.message = f"CellularError: {message}" class LTE(): @@ -30,13 +30,13 @@ def __init__(self, apn, uart=None, reset_pin=None, netlight_pin=None, netlight_l DEFAULT_UART_ID, tx=Pin(DEFAULT_PIN_TX, Pin.OUT), rx=Pin(DEFAULT_PIN_RX, Pin.OUT)) - + # Set PPP timeouts and rxbuf self._uart.init( timeout=DEFAULT_UART_TIMEOUT, timeout_char=DEFAULT_UART_TIMEOUT_CHAR, rxbuf=DEFAULT_UART_RXBUF) - + if not skip_reset: self._reset.value(0) time.sleep(1.0) @@ -46,7 +46,7 @@ def __init__(self, apn, uart=None, reset_pin=None, netlight_pin=None, netlight_l self._led = netlight_led self._netlight = netlight_pin or Pin(DEFAULT_PIN_NETLIGHT, Pin.IN) self._netlight.irq(self._netlight_irq) - + def _netlight_irq(self, pin): self._led.value(pin.value()) @@ -60,23 +60,23 @@ def iccid(self): try: return self._send_at_command("AT+CICCID", 1) except CellularError: - return None + return None def status(self): lte_status = self._send_at_command("AT+CEREG?", 1) gsm_status = self._send_at_command("AT+CGREG?", 1) return lte_status, gsm_status - + def signal_quality(self): try: response = self._send_at_command("AT+CSQ", 1) quality = int(response.split(":")[1].split(",")[0]) - db = -113 + (2 * quality) # conversion as per AT command set datasheet + db = -113 + (2 * quality) # conversion as per AT command set datasheet return db except CellularError: pass return None - + def stop_ppp(self): self._ppp.disconnect() self._send_at_command(f"AT+IPR={DEFAULT_UART_STARTUP_BAUD}") @@ -106,11 +106,9 @@ def start_ppp(self, baudrate=DEFAULT_UART_BAUD, connect=True): # print(e) # Force PPP to use modem's default settings... - #time.sleep(2.0) self._flush_uart() self._uart.write("ATD*99#\r") self._uart.flush() - #time.sleep(2.0) self._ppp = PPP(self._uart) self._ppp.connect() @@ -122,16 +120,16 @@ def start_ppp(self, baudrate=DEFAULT_UART_BAUD, connect=True): def connect(self, timeout=60): print(" - setting up cellular uart") # connect to and flush the uart - # consume any unsolicited messages first, we don't need those + # consume any unsolicited messages first, we don't need those self._flush_uart() print(" - waiting for cellular module to be ready") # wait for the cellular module to respond to AT commands - self._wait_ready() + self._wait_ready() - self._send_at_command("ATE0") # disable local echo - self._send_at_command(f"AT+CGDCONT=1,\"IP\",\"{self._apn}\"") # set apn and activate pdp context + self._send_at_command("ATE0") # disable local echo + self._send_at_command(f"AT+CGDCONT=1,\"IP\",\"{self._apn}\"") # set apn and activate pdp context # wait for roaming lte connection to be established giveup = time.time() + timeout @@ -140,10 +138,10 @@ def connect(self, timeout=60): status = self._send_at_command("AT+CEREG?", 1) time.sleep(0.25) if time.time() > giveup: - raise CellularError("timed out getting network registration") + raise CellularError("timed out getting network registration") # disable server and client certification validation - self._send_at_command("AT+CSSLCFG=\"authmode\",0,0") + self._send_at_command("AT+CSSLCFG=\"authmode\",0,0") self._send_at_command("AT+CSSLCFG=\"enableSNI\",0,1") print(f" - SIM ICCID is {self.iccid()}") @@ -152,13 +150,14 @@ def _wait_ready(self, poll_time=0.25, timeout=10): giveup = time.time() + timeout while time.time() <= giveup: try: + # if __send_at_command doesn't throw an exception then we're good! self._send_at_command("AT") - return # if __send_at_command doesn't throw an exception then we're good! + return except CellularError as e: print(e) time.sleep(poll_time) - raise CellularError("timed out waiting for AT response") + raise CellularError("timed out waiting for AT response") def _flush_uart(self): self._uart.flush() @@ -168,28 +167,22 @@ def _flush_uart(self): time.sleep(0.25) def _send_at_command(self, command, result_lines=0, timeout=5.0): - # consume any unsolicited messages first, we don't need those + # consume any unsolicited messages first, we don't need those self._flush_uart() self._uart.write(command + "\r") - #print(f" - tx: {command}") self._uart.flush() status, data = self._read_result(result_lines, timeout=timeout) print(" -", command, status, data) - if status == "TIMEOUT": - #print.error(" !", command, status, data) - raise CellularError(f"cellular module timed out for command {command}") - if status not in ["OK", "DOWNLOAD"]: - #print(" !", command, status, data) raise CellularError(f"non 'OK' or 'DOWNLOAD' result for command {command}") if result_lines == 1: return data[0] if result_lines > 1: - return data + return data return None def _read_result(self, result_lines, timeout=1.0): @@ -199,7 +192,7 @@ def _read_result(self, result_lines, timeout=1.0): timeout *= 1000 while len(result) < result_lines or status is None: if (time.ticks_ms() - start) > timeout: - return "TIMEOUT", [] + raise CellularError("cellular module timed out") line = self._uart.readline() @@ -213,3 +206,15 @@ def _read_result(self, result_lines, timeout=1.0): return status, result + def send_text_message(self, recipient, body, timeout=5.0): + self._uart.write("AT+CMGS=\"") + self._uart.write(recipient.encode("ascii")) + self._uart.write(b"\"\r") + time.sleep(0.5) + self._uart.write(body.encode("ascii")) + self._uart.write(b"\x1a\r") + self._uart.flush() + + status, _ = self._read_result(1, timeout=timeout) + + return status == "OK"