Skip to content

Commit

Permalink
Merge pull request #992 from anarkiwi/lint
Browse files Browse the repository at this point in the history
lint pass and pytype support for mavlink-api.
  • Loading branch information
anarkiwi authored Nov 24, 2023
2 parents 0cabc77 + cdddf77 commit 8184657
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 40 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/ci-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ jobs:
- name: Code Quality - Black
run: |
poetry run black gamutrf --check
poetry run black utils --check
poetry run black torchserve --check
poetry run black augment --check
- name: Code Quality - Pylint
run: |
poetry run pylint --fail-under=6 gamutrf/
Expand All @@ -71,7 +74,9 @@ jobs:
PYTHONPATH: /usr/local/lib/python3.10/dist-packages:/usr/lib/python3/dist-packages
run: |
sudo pip3 install pytype=="$(grep -E "pytype = " pyproject.toml | grep -Eo "[0-9\.]+")" && \
pytype -k gamutrf/
sudo pip3 install -U pyserial && \
pytype -k gamutrf/ && \
pytype -k utils/mavlink-api
- name: Test with pytest
env:
PYTHONPATH: /usr/local/lib/python3.10/dist-packages:/usr/lib/python3/dist-packages
Expand Down
30 changes: 11 additions & 19 deletions utils/mavlink-api/mavlink-api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from flask import Flask, jsonify
from pymavlink import mavutil
import threading
import time
import serial
from flask import Flask, jsonify
from pymavlink import mavutil # pytype: disable=import-error

app = Flask(__name__)

Expand All @@ -16,6 +15,7 @@ def __init__(self):
self.latitude = None
self.longitude = None
self.altitude = None
self.heading = None
self.relative_alt = None
self.time_usec = None
self.vx = None
Expand All @@ -32,7 +32,7 @@ def __init__(self):
self.mavlink_thread.daemon = True
self.mavlink_thread.start()

def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict:
def GLOBAL_POSITION_INT_parser(self, data=None):
"""Selected required data from data received from Pixhawk, and
convert to required units.
Args:
Expand All @@ -41,7 +41,7 @@ def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict:
clean_data (dict): Required data to be published
"""
# If data supplied, else use last msg
if data == None:
if data is None:
data = self.latest_GLOBAL_POSITION_INT_msg.to_dict()

# Check for stale GPS
Expand All @@ -57,15 +57,12 @@ def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict:
self.vx = data["vx"] / 100.0 # meters/second
self.vy = data["vy"] / 100.0 # meters/second
self.vz = data["vz"] / 100.0 # meters/second
return

def GPS_RAW_INT_parser(self, data: dict = None) -> dict:
def GPS_RAW_INT_parser(self, data=None):
"""Selected required data from data received from Pixhawk, and
convert to required units.
Args:
data (dict): Data received from the Pixhawk device
Returns:
clean_data (dict): Required data to be published
GPS_FIX_TYPE
[Enum] Type of GPS fix
Expand All @@ -82,7 +79,7 @@ def GPS_RAW_INT_parser(self, data: dict = None) -> dict:
8 GPS_FIX_TYPE_PPP PPP, 3D position.
"""
# If data supplied, else use last msg
if data == None:
if data is None:
data = self.latest_GPS_RAW_INT_msg.to_dict()

# Check for stale GPS
Expand All @@ -92,8 +89,6 @@ def GPS_RAW_INT_parser(self, data: dict = None) -> dict:
self.time_usec = data["time_usec"] # UNIX Epoch time uSec
self.gps_fix_type = data["fix_type"]

return

def gps_stale_check(self):
# Check for stale GPS data
if (time.time() - self.latest_GPS_RAW_INT_timestamp > self.STALE_TIMEOUT) or (
Expand Down Expand Up @@ -160,8 +155,7 @@ def get_latest_gps_fix_status():
),
200,
)
else:
return jsonify({"error": "No GPS data available"}), 404
return jsonify({"error": "No GPS data available"}), 404


@app.route("/gps-data", methods=["GET"])
Expand All @@ -170,8 +164,7 @@ def get_latest_gps_data():
mavlink_gps_handler.GLOBAL_POSITION_INT_parser()
msg = mavlink_gps_handler.create_gps_json_payload()
return jsonify(msg), 200
else:
return jsonify({"error": "No GPS data available"}), 404
return jsonify({"error": "No GPS data available"}), 404


@app.route("/heading", methods=["GET"])
Expand All @@ -187,12 +180,11 @@ def get_latest_heading():
),
200,
)
else:
return jsonify({"error": "No heading data available"}), 404
return jsonify({"error": "No heading data available"}), 404


def main():
app.run(host="0.0.0.0", port=8888) # nosec
app.run(host="0.0.0.0", port=8888) # nosec


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion utils/mavlink-api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["lk-iqt <[email protected]>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.11"
python = ">=3.10,<3.13"
Flask = "^3.0.0"
pymavlink = "^2.4.40"
pyserial = "^3.5"
Expand Down
38 changes: 20 additions & 18 deletions utils/mavlink-api/utils/gps-test.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
import requests
import json
import logging
import os
import socket
import time
import csv
import time
from datetime import datetime

import gpsd
import httpx


def get_adafruit_gps():
try:
if gpsd.gpsd_stream is None:
gpsd.connect(host="127.0.0.1", port=2947)
packet = gpsd.get_current()
vals= {"timestamp": time.time(),
"position": packet.position(),
"altitude": packet.altitude(),
"gps_time": packet.get_time().timestamp(),
"map_url": packet.map_url(),
"heading": None,
"gps": "fix"}
vals = {
"timestamp": time.time(),
"position": packet.position(),
"altitude": packet.altitude(),
"gps_time": packet.get_time().timestamp(),
"map_url": packet.map_url(),
"heading": None,
"gps": "fix",
}
except (BrokenPipeError, gpsd.NoFixError, AttributeError) as err:
logging.error("could not update with GPS: %s", err)
vals = {
Expand All @@ -36,10 +33,11 @@ def get_adafruit_gps():
}
return vals


def get_pixhawk_gps():
try:
external_gps_msg = json.loads(httpx.get(f"http://127.0.0.1:8888/gps-data").text)

vals = {
"timestamp": time.time(),
"position": (
Expand All @@ -66,17 +64,21 @@ def get_pixhawk_gps():
}
return vals


def write_to_csv(filename, data):
with open(filename, 'a', newline='') as csvfile:
with open(filename, "a", newline="") as csvfile:
fieldnames = data.keys()
import csv

writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if csvfile.tell() == 0:
writer.writeheader()
writer.writerow(data)


# Define the file names for the CSV files
adafruit_csv_filename = 'adafruit_gps_data.csv'
pixhawk_csv_filename = 'pixhawk_gps_data.csv'
adafruit_csv_filename = "adafruit_gps_data.csv"
pixhawk_csv_filename = "pixhawk_gps_data.csv"

while True:
# Get data from Adafruit GPS
Expand All @@ -92,4 +94,4 @@ def write_to_csv(filename, data):
print("Pixhawk GPS data written to CSV")

# Wait for one minute before the next iteration
time.sleep(60)
time.sleep(60)
2 changes: 1 addition & 1 deletion utils/mavlink-api/utils/mavlink_serial_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#! /usr/bin/env python3

import sys
from pymavlink import mavutil
from pymavlink import mavutil # pytype: disable=import-error

if len(sys.argv) < 2:
print("Usage: python mavlink_serial_test.py <serial_port>")
Expand Down

0 comments on commit 8184657

Please sign in to comment.