From 926f6b954e5a85ae03d44a18940b2ae6b7421490 Mon Sep 17 00:00:00 2001 From: lk-iqt <112730501+lk-iqt@users.noreply.github.com> Date: Mon, 5 Feb 2024 16:11:49 -0500 Subject: [PATCH 1/5] Added exception handling --- utils/mavlink-api/mqtt-publisher.py | 53 +++++++++++++++++------------ 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/utils/mavlink-api/mqtt-publisher.py b/utils/mavlink-api/mqtt-publisher.py index d90217bf..55644272 100644 --- a/utils/mavlink-api/mqtt-publisher.py +++ b/utils/mavlink-api/mqtt-publisher.py @@ -24,12 +24,16 @@ def on_connect(client, userdata, flags, rc): def publish_data(client, target_name, data): topic = f"{MQTT_TOPIC}" try: - client.publish(topic, data) + rc = client.publish(topic, data) print(f"Published data to {topic}: {data}") - except Exception as err: + except ConnectionError as err: print("could not publish data: %s", err) + else: + if rc != 0: + print("could not publish data RC=%s", rc) + def fetch_and_publish(client, target_name, target_url): try: @@ -51,7 +55,7 @@ def fetch_and_publish(client, target_name, target_url): "vz": response["vz"], # meters/second } - except Exception as err: + except (httpx.HTTPError, json.JSONDecodeError, KeyError) as err: print("could not update with external GPS: %s", err) data = { "target_name": target_name, @@ -72,26 +76,31 @@ def fetch_and_publish(client, target_name, target_url): def main(): - try: - client = mqtt_client.Client() - client.on_connect = on_connect - client.connect(MQTT_BROKER, int(MQTT_PORT)) - print("Connected to MQTT Broker") - except Exception as err: - logging.error( - f"Could not connect to MQTT broker ({MQTT_BROKER}:{MQTT_PORT}): {err}" - ) - while True: - for target in URL_LIST: - if len(target) == 2: - print(f"Attepting to retrieve data from {target}") - target_name, target_url = target - fetch_and_publish(client, target_name, target_url) - else: - print("Invalid entry in URL_LIST. Each entry should be a 2-entry list.") - - time.sleep(QUERY_INTERVAL) + try: + client = mqtt_client.Client() + client.on_connect = on_connect + client.connect(MQTT_BROKER, int(MQTT_PORT)) + print("Connected to MQTT Broker") + except (ConnectionRefusedError, ConnectionError) as err: + logging.error( + f"Could not connect to MQTT broker ({MQTT_BROKER}:{MQTT_PORT}): {err}" + ) + time.sleep(5) + + while client.is_connected(): + for target in URL_LIST: + if len(target) == 2: + print(f"Attempting to retrieve data from {target}") + target_name, target_url = target + fetch_and_publish(client, target_name, target_url) + else: + print( + "Invalid entry in URL_LIST. Each entry should be a 2-entry list." + ) + + time.sleep(QUERY_INTERVAL) + connect_flag = client.is_connected() if __name__ == "__main__": From 1b89e7a3a2befb79f7fb8a31af443bf44c087c6b Mon Sep 17 00:00:00 2001 From: lk-iqt <112730501+lk-iqt@users.noreply.github.com> Date: Tue, 6 Feb 2024 10:05:04 -0500 Subject: [PATCH 2/5] docker networking bugs --- utils/mavlink-api/docker-compose.yaml | 4 ++-- utils/mavlink-api/mqtt-publisher.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/utils/mavlink-api/docker-compose.yaml b/utils/mavlink-api/docker-compose.yaml index 305ac954..5e7a7730 100644 --- a/utils/mavlink-api/docker-compose.yaml +++ b/utils/mavlink-api/docker-compose.yaml @@ -22,11 +22,11 @@ services: # networks: # - gamutrf # environment: - # - MQTT_IP=127.0.0.1 + # - MQTT_IP=mqtt # - MQTT_PORT=1883 # - MQTT_TOPIC=gamutrf/targets # - QUERY_INTERVAL=1 - # - URL_LIST=[["drone", "http://127.0.0.1:8888/gps-data"], ["controller", "http://127.0.0.1:8889/gps-data"]] # Example: [["target1", "https://example1.com"], ["target2", "https://example2.com"]] + # - URL_LIST=[["drone", "http://mavlink-api0:8888/gps-data"], ["controller", "http://mavlink-api1:8888/gps-data"]] # Example: [["target1", "https://example1.com"], ["target2", "https://example2.com"]] # restart: unless-stopped diff --git a/utils/mavlink-api/mqtt-publisher.py b/utils/mavlink-api/mqtt-publisher.py index 55644272..c769208b 100644 --- a/utils/mavlink-api/mqtt-publisher.py +++ b/utils/mavlink-api/mqtt-publisher.py @@ -13,7 +13,6 @@ os.environ.get("URL_LIST", '[["default_target", "http://127.0.0.1:8888/gps-data"]]') ) - def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker") @@ -56,7 +55,7 @@ def fetch_and_publish(client, target_name, target_url): } except (httpx.HTTPError, json.JSONDecodeError, KeyError) as err: - print("could not update with external GPS: %s", err) + print(f"Could not update with {target_name}:{err}") data = { "target_name": target_name, "gps_stale": None, From 46e0225d8215107ef8744a339a426b8e7f611107 Mon Sep 17 00:00:00 2001 From: lk-iqt <112730501+lk-iqt@users.noreply.github.com> Date: Tue, 6 Feb 2024 11:19:05 -0500 Subject: [PATCH 3/5] added logging --- utils/mavlink-api/docker-compose.yaml | 52 +++++++++++++++++++-------- utils/mavlink-api/mavlink-api.py | 6 +++- utils/mavlink-api/mqtt-publisher.py | 21 +++++------ 3 files changed, 54 insertions(+), 25 deletions(-) diff --git a/utils/mavlink-api/docker-compose.yaml b/utils/mavlink-api/docker-compose.yaml index 5e7a7730..f7116ba9 100644 --- a/utils/mavlink-api/docker-compose.yaml +++ b/utils/mavlink-api/docker-compose.yaml @@ -14,19 +14,43 @@ services: networks: - gamutrf restart: unless-stopped - #mqtt-publisher: - # image: iqtlabs/gamutrf-mavlink-mqtt-publisher:latest - # build: - # context: . - # dockerfile: Dockerfile.mqtt-publisher - # networks: - # - gamutrf - # environment: - # - MQTT_IP=mqtt - # - MQTT_PORT=1883 - # - MQTT_TOPIC=gamutrf/targets - # - QUERY_INTERVAL=1 - # - URL_LIST=[["drone", "http://mavlink-api0:8888/gps-data"], ["controller", "http://mavlink-api1:8888/gps-data"]] # Example: [["target1", "https://example1.com"], ["target2", "https://example2.com"]] - # restart: unless-stopped + mavlink-api-drone: + image: iqtlabs/gamutrf-mavlink-api:latest + build: + context: . + dockerfile: Dockerfile.mavlink-api + ports: + - "8889:8888" + devices: + - "/dev/ttyUSB0:/dev/tty.serial1" + networks: + - gamutrf + restart: unless-stopped + mavlink-api-controller: + image: iqtlabs/gamutrf-mavlink-api:latest + build: + context: . + dockerfile: Dockerfile.mavlink-api + ports: + - "8890:8888" + devices: + - "/dev/ttyUSB1:/dev/tty.serial1" + networks: + - gamutrf + restart: unless-stopped + mqtt-publisher: + image: iqtlabs/gamutrf-mavlink-mqtt-publisher:latest + build: + context: . + dockerfile: Dockerfile.mqtt-publisher + networks: + - gamutrf + environment: + - MQTT_IP=mqtt + - MQTT_PORT=1883 + - MQTT_TOPIC=gamutrf/targets + - QUERY_INTERVAL=1 + - URL_LIST=[["drone", "http://mavlink-api-drone:8888/gps-data"], ["controller", "http://mavlink-api-controller:8888/gps-data"]] # Example: [["target1", "https://example1.com"], ["target2", "https://example2.com"]] + restart: unless-stopped diff --git a/utils/mavlink-api/mavlink-api.py b/utils/mavlink-api/mavlink-api.py index 9afc1ac5..b4e53253 100644 --- a/utils/mavlink-api/mavlink-api.py +++ b/utils/mavlink-api/mavlink-api.py @@ -160,8 +160,12 @@ def get_latest_gps_fix_status(): @app.route("/gps-data", methods=["GET"]) def get_latest_gps_data(): - if mavlink_gps_handler.latest_GLOBAL_POSITION_INT_msg: + if ( + mavlink_gps_handler.latest_GLOBAL_POSITION_INT_msg + and mavlink_gps_handler.latest_GPS_RAW_INT_msg + ): mavlink_gps_handler.GLOBAL_POSITION_INT_parser() + mavlink_gps_handler.GPS_RAW_INT_parser() msg = mavlink_gps_handler.create_gps_json_payload() return jsonify(msg), 200 return jsonify({"error": "No GPS data available"}), 404 diff --git a/utils/mavlink-api/mqtt-publisher.py b/utils/mavlink-api/mqtt-publisher.py index c769208b..bb47c11e 100644 --- a/utils/mavlink-api/mqtt-publisher.py +++ b/utils/mavlink-api/mqtt-publisher.py @@ -13,25 +13,26 @@ os.environ.get("URL_LIST", '[["default_target", "http://127.0.0.1:8888/gps-data"]]') ) + def on_connect(client, userdata, flags, rc): if rc == 0: - print("Connected to MQTT Broker") + logging.info("Connected to MQTT Broker") else: - print(f"Failed to connect, return code {rc}") + logging.error(f"Failed to connect, return code {rc}") def publish_data(client, target_name, data): topic = f"{MQTT_TOPIC}" try: rc = client.publish(topic, data) - print(f"Published data to {topic}: {data}") + logging.info(f"Published data to {topic}: {data}") except ConnectionError as err: - print("could not publish data: %s", err) + logging.error("could not publish data: %s", err) else: if rc != 0: - print("could not publish data RC=%s", rc) + logging.warning("could not publish data RC=%s", rc) def fetch_and_publish(client, target_name, target_url): @@ -55,7 +56,7 @@ def fetch_and_publish(client, target_name, target_url): } except (httpx.HTTPError, json.JSONDecodeError, KeyError) as err: - print(f"Could not update with {target_name}:{err}") + logging.warning(f"Could not update with {target_name}:{err}") data = { "target_name": target_name, "gps_stale": None, @@ -80,7 +81,7 @@ def main(): client = mqtt_client.Client() client.on_connect = on_connect client.connect(MQTT_BROKER, int(MQTT_PORT)) - print("Connected to MQTT Broker") + logging.info("Connected to MQTT Broker") except (ConnectionRefusedError, ConnectionError) as err: logging.error( f"Could not connect to MQTT broker ({MQTT_BROKER}:{MQTT_PORT}): {err}" @@ -88,18 +89,18 @@ def main(): time.sleep(5) while client.is_connected(): + logging.info(f"Initializing with {URL_LIST}") for target in URL_LIST: if len(target) == 2: - print(f"Attempting to retrieve data from {target}") + logging.info(f"Attempting to retrieve data from {target}") target_name, target_url = target fetch_and_publish(client, target_name, target_url) else: - print( + logging.warning( "Invalid entry in URL_LIST. Each entry should be a 2-entry list." ) time.sleep(QUERY_INTERVAL) - connect_flag = client.is_connected() if __name__ == "__main__": From 89a6155ee97bf1b901e6660c21f402bcaf3bc73e Mon Sep 17 00:00:00 2001 From: lk-iqt <112730501+lk-iqt@users.noreply.github.com> Date: Tue, 6 Feb 2024 15:12:48 -0500 Subject: [PATCH 4/5] patches to make stable --- utils/mavlink-api/docker-compose.yaml | 78 +++++++++++++-------------- utils/mavlink-api/mqtt-publisher.py | 73 ++++++++++++++----------- 2 files changed, 81 insertions(+), 70 deletions(-) diff --git a/utils/mavlink-api/docker-compose.yaml b/utils/mavlink-api/docker-compose.yaml index f7116ba9..c072f609 100644 --- a/utils/mavlink-api/docker-compose.yaml +++ b/utils/mavlink-api/docker-compose.yaml @@ -14,43 +14,41 @@ services: networks: - gamutrf restart: unless-stopped - mavlink-api-drone: - image: iqtlabs/gamutrf-mavlink-api:latest - build: - context: . - dockerfile: Dockerfile.mavlink-api - ports: - - "8889:8888" - devices: - - "/dev/ttyUSB0:/dev/tty.serial1" - networks: - - gamutrf - restart: unless-stopped - mavlink-api-controller: - image: iqtlabs/gamutrf-mavlink-api:latest - build: - context: . - dockerfile: Dockerfile.mavlink-api - ports: - - "8890:8888" - devices: - - "/dev/ttyUSB1:/dev/tty.serial1" - networks: - - gamutrf - restart: unless-stopped - mqtt-publisher: - image: iqtlabs/gamutrf-mavlink-mqtt-publisher:latest - build: - context: . - dockerfile: Dockerfile.mqtt-publisher - networks: - - gamutrf - environment: - - MQTT_IP=mqtt - - MQTT_PORT=1883 - - MQTT_TOPIC=gamutrf/targets - - QUERY_INTERVAL=1 - - URL_LIST=[["drone", "http://mavlink-api-drone:8888/gps-data"], ["controller", "http://mavlink-api-controller:8888/gps-data"]] # Example: [["target1", "https://example1.com"], ["target2", "https://example2.com"]] - restart: unless-stopped - - + #mavlink-api-drone: + # image: iqtlabs/gamutrf-mavlink-api:latest + # build: + # context: . + # dockerfile: Dockerfile.mavlink-api + # ports: + # - "8889:8888" + # devices: + # - "/dev/ttyUSB0:/dev/tty.serial1" + # networks: + # - gamutrf + # restart: unless-stopped + #mavlink-api-controller: + # image: iqtlabs/gamutrf-mavlink-api:latest + # build: + # context: . + # dockerfile: Dockerfile.mavlink-api + # ports: + # - "8890:8888" + # devices: + # - "/dev/ttyUSB1:/dev/tty.serial1" + # networks: + # - gamutrf + # restart: unless-stopped + #mqtt-publisher: + # image: iqtlabs/gamutrf-mavlink-mqtt-publisher:latest + # build: + # context: . + # dockerfile: Dockerfile.mqtt-publisher + # networks: + # - gamutrf + # environment: + # - MQTT_IP=mqtt + # - MQTT_PORT=1883 + # - MQTT_TOPIC=gamutrf/targets + # - QUERY_INTERVAL=1 + # - URL_LIST=[["controller", "http://mavlink-api-controller:8888/gps-data"], ["drone", "http://mavlink-api-drone:8888/gps-data"]] # Example: [["target1", "https://example1.com"], ["target2", "https://example2.com"]] + # restart: unless-stopped \ No newline at end of file diff --git a/utils/mavlink-api/mqtt-publisher.py b/utils/mavlink-api/mqtt-publisher.py index bb47c11e..a0cd2d5b 100644 --- a/utils/mavlink-api/mqtt-publisher.py +++ b/utils/mavlink-api/mqtt-publisher.py @@ -13,50 +13,61 @@ os.environ.get("URL_LIST", '[["default_target", "http://127.0.0.1:8888/gps-data"]]') ) +mqtt_connect_flag = False + def on_connect(client, userdata, flags, rc): + global mqtt_connect_flag if rc == 0: - logging.info("Connected to MQTT Broker") + print("Connected to MQTT Broker") + mqtt_connect_flag = True else: - logging.error(f"Failed to connect, return code {rc}") + print(f"Failed to connect, return code {rc}") + + +def on_disconnect(client, userdata, flags, rc): + global mqtt_connect_flag + mqtt_connect_flag = False def publish_data(client, target_name, data): topic = f"{MQTT_TOPIC}" try: rc = client.publish(topic, data) - logging.info(f"Published data to {topic}: {data}") + print(f"Published data to {topic}: {data}") except ConnectionError as err: - logging.error("could not publish data: %s", err) + print("could not publish data: %s", err) else: if rc != 0: - logging.warning("could not publish data RC=%s", rc) + print(f"could not publish data RC={rc}") def fetch_and_publish(client, target_name, target_url): try: response = json.loads(httpx.get(f"{target_url}").text) - data = { - "target_name": target_name, - "gps_stale": response["gps_stale"], - "gps_fix_type": response["gps_fix_type"], - "time_boot_ms": response["time_boot_ms"], # mm - "time_usec": response["time_usec"], - "latitude": response["latitude"], # decimal degrees - "longitude": response["longitude"], # decimal degrees - "altitude": response["altitude"], # mm - "relative_alt": response["relative_alt"], # mm - "heading": response["heading"], # decimal degrees - "vx": response["vx"], # meters/second - "vy": response["vy"], # meters/second - "vz": response["vz"], # meters/second - } + if int(response["gps_fix_type"]) > 0: + data = { + "target_name": target_name, + "gps_stale": response["gps_stale"], + "gps_fix_type": response["gps_fix_type"], + "time_boot_ms": response["time_boot_ms"], # mm + "time_usec": response["time_usec"], + "latitude": response["latitude"], # decimal degrees + "longitude": response["longitude"], # decimal degrees + "altitude": response["altitude"], # mm + "relative_alt": response["relative_alt"], # mm + "heading": response["heading"], # decimal degrees + "vx": response["vx"], # meters/second + "vy": response["vy"], # meters/second + "vz": response["vz"], # meters/second + } + publish_data(client, target_name, json.dumps(data)) except (httpx.HTTPError, json.JSONDecodeError, KeyError) as err: - logging.warning(f"Could not update with {target_name}:{err}") + print(f"Could not update {target_name} with error:{err}") data = { "target_name": target_name, "gps_stale": None, @@ -72,31 +83,33 @@ def fetch_and_publish(client, target_name, target_url): "vy": None, # meters/second "vz": None, # meters/second } - publish_data(client, target_name, json.dumps(data)) def main(): + global mqtt_connect_flag while True: try: + print(f"Attempting to connect to MQTT {MQTT_BROKER}:{MQTT_PORT}") client = mqtt_client.Client() client.on_connect = on_connect + client.on_disconnect = on_disconnect client.connect(MQTT_BROKER, int(MQTT_PORT)) - logging.info("Connected to MQTT Broker") - except (ConnectionRefusedError, ConnectionError) as err: - logging.error( + # except (ConnectionRefusedError, ConnectionError) as err: + except Exception as err: + print( f"Could not connect to MQTT broker ({MQTT_BROKER}:{MQTT_PORT}): {err}" ) - time.sleep(5) + time.sleep(5) - while client.is_connected(): - logging.info(f"Initializing with {URL_LIST}") + while True: # Fix this to mqtt_connect_flag or client.is_connected() + print(f"Initializing with {URL_LIST}") for target in URL_LIST: if len(target) == 2: - logging.info(f"Attempting to retrieve data from {target}") + print(f"Attempting to retrieve data from {target}") target_name, target_url = target fetch_and_publish(client, target_name, target_url) else: - logging.warning( + print( "Invalid entry in URL_LIST. Each entry should be a 2-entry list." ) From 22d9cc068a990b93f5fd3cbccac2d9ff4a17ebc5 Mon Sep 17 00:00:00 2001 From: lk-iqt <112730501+lk-iqt@users.noreply.github.com> Date: Tue, 6 Feb 2024 15:17:19 -0500 Subject: [PATCH 5/5] Doc updates --- utils/mavlink-api/README.md | 9 +++++++++ .../{docker-compose.yaml => mavlink-api.yaml} | 0 2 files changed, 9 insertions(+) rename utils/mavlink-api/{docker-compose.yaml => mavlink-api.yaml} (100%) diff --git a/utils/mavlink-api/README.md b/utils/mavlink-api/README.md index 53d4775f..29db26c4 100644 --- a/utils/mavlink-api/README.md +++ b/utils/mavlink-api/README.md @@ -26,4 +26,13 @@ sudo udevadm control --reload-rules && sudo udevadm trigger Test again if it is working correctly: ```bash python3 utils/mavlink_serial_test.py /dev/pixhawk_serial +``` + +## Running with GamutRF + +To run with gamutRF add the `-f utils/mavlink-api/docker-compose.yml` and include mavlink-api for the gamutRF interface, and mavlink-api-controller, mavlink-api-drone, and mqtt-publisher for the controller API, drone API, and MQTT publisher respectively. + +Ex: +```bash +docker compose -f orchestrator.yml -f torchserve-cuda.yml -f utils/mavlink-api/mavlink-api.yaml -f geolocate.yml down mqtt mavlink-api-controller mavlink-api-drone geolocate mqtt-publisher ``` \ No newline at end of file diff --git a/utils/mavlink-api/docker-compose.yaml b/utils/mavlink-api/mavlink-api.yaml similarity index 100% rename from utils/mavlink-api/docker-compose.yaml rename to utils/mavlink-api/mavlink-api.yaml