Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lint pass and pytype support for mavlink-api. #992

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/ci-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ jobs:
- name: Code Quality - Black
run: |
poetry run black gamutrf --check
poetry run black utils --check
poetry run black torchserve --check
poetry run black augment --check
- name: Code Quality - Pylint
run: |
poetry run pylint --fail-under=6 gamutrf/
Expand All @@ -71,7 +74,9 @@ jobs:
PYTHONPATH: /usr/local/lib/python3.10/dist-packages:/usr/lib/python3/dist-packages
run: |
sudo pip3 install pytype=="$(grep -E "pytype = " pyproject.toml | grep -Eo "[0-9\.]+")" && \
pytype -k gamutrf/
sudo pip3 install -U pyserial && \
pytype -k gamutrf/ && \
pytype -k utils/mavlink-api
- name: Test with pytest
env:
PYTHONPATH: /usr/local/lib/python3.10/dist-packages:/usr/lib/python3/dist-packages
Expand Down
30 changes: 11 additions & 19 deletions utils/mavlink-api/mavlink-api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from flask import Flask, jsonify
from pymavlink import mavutil
import threading
import time
import serial
from flask import Flask, jsonify
from pymavlink import mavutil # pytype: disable=import-error

app = Flask(__name__)

Expand All @@ -16,6 +15,7 @@ def __init__(self):
self.latitude = None
self.longitude = None
self.altitude = None
self.heading = None
self.relative_alt = None
self.time_usec = None
self.vx = None
Expand All @@ -32,7 +32,7 @@ def __init__(self):
self.mavlink_thread.daemon = True
self.mavlink_thread.start()

def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict:
def GLOBAL_POSITION_INT_parser(self, data=None):
"""Selected required data from data received from Pixhawk, and
convert to required units.
Args:
Expand All @@ -41,7 +41,7 @@ def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict:
clean_data (dict): Required data to be published
"""
# If data supplied, else use last msg
if data == None:
if data is None:
data = self.latest_GLOBAL_POSITION_INT_msg.to_dict()

# Check for stale GPS
Expand All @@ -57,15 +57,12 @@ def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict:
self.vx = data["vx"] / 100.0 # meters/second
self.vy = data["vy"] / 100.0 # meters/second
self.vz = data["vz"] / 100.0 # meters/second
return

def GPS_RAW_INT_parser(self, data: dict = None) -> dict:
def GPS_RAW_INT_parser(self, data=None):
"""Selected required data from data received from Pixhawk, and
convert to required units.
Args:
data (dict): Data received from the Pixhawk device
Returns:
clean_data (dict): Required data to be published

GPS_FIX_TYPE
[Enum] Type of GPS fix
Expand All @@ -82,7 +79,7 @@ def GPS_RAW_INT_parser(self, data: dict = None) -> dict:
8 GPS_FIX_TYPE_PPP PPP, 3D position.
"""
# If data supplied, else use last msg
if data == None:
if data is None:
data = self.latest_GPS_RAW_INT_msg.to_dict()

# Check for stale GPS
Expand All @@ -92,8 +89,6 @@ def GPS_RAW_INT_parser(self, data: dict = None) -> dict:
self.time_usec = data["time_usec"] # UNIX Epoch time uSec
self.gps_fix_type = data["fix_type"]

return

def gps_stale_check(self):
# Check for stale GPS data
if (time.time() - self.latest_GPS_RAW_INT_timestamp > self.STALE_TIMEOUT) or (
Expand Down Expand Up @@ -160,8 +155,7 @@ def get_latest_gps_fix_status():
),
200,
)
else:
return jsonify({"error": "No GPS data available"}), 404
return jsonify({"error": "No GPS data available"}), 404


@app.route("/gps-data", methods=["GET"])
Expand All @@ -170,8 +164,7 @@ def get_latest_gps_data():
mavlink_gps_handler.GLOBAL_POSITION_INT_parser()
msg = mavlink_gps_handler.create_gps_json_payload()
return jsonify(msg), 200
else:
return jsonify({"error": "No GPS data available"}), 404
return jsonify({"error": "No GPS data available"}), 404


@app.route("/heading", methods=["GET"])
Expand All @@ -187,12 +180,11 @@ def get_latest_heading():
),
200,
)
else:
return jsonify({"error": "No heading data available"}), 404
return jsonify({"error": "No heading data available"}), 404


def main():
app.run(host="0.0.0.0", port=8888) # nosec
app.run(host="0.0.0.0", port=8888) # nosec


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion utils/mavlink-api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["lk-iqt <[email protected]>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.11"
python = ">=3.10,<3.13"
Flask = "^3.0.0"
pymavlink = "^2.4.40"
pyserial = "^3.5"
Expand Down
38 changes: 20 additions & 18 deletions utils/mavlink-api/utils/gps-test.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
import requests
import json
import logging
import os
import socket
import time
import csv
import time
from datetime import datetime

import gpsd
import httpx


def get_adafruit_gps():
try:
if gpsd.gpsd_stream is None:
gpsd.connect(host="127.0.0.1", port=2947)
packet = gpsd.get_current()
vals= {"timestamp": time.time(),
"position": packet.position(),
"altitude": packet.altitude(),
"gps_time": packet.get_time().timestamp(),
"map_url": packet.map_url(),
"heading": None,
"gps": "fix"}
vals = {
"timestamp": time.time(),
"position": packet.position(),
"altitude": packet.altitude(),
"gps_time": packet.get_time().timestamp(),
"map_url": packet.map_url(),
"heading": None,
"gps": "fix",
}
except (BrokenPipeError, gpsd.NoFixError, AttributeError) as err:
logging.error("could not update with GPS: %s", err)
vals = {
Expand All @@ -36,10 +33,11 @@ def get_adafruit_gps():
}
return vals


def get_pixhawk_gps():
try:
external_gps_msg = json.loads(httpx.get(f"http://127.0.0.1:8888/gps-data").text)

vals = {
"timestamp": time.time(),
"position": (
Expand All @@ -66,17 +64,21 @@ def get_pixhawk_gps():
}
return vals


def write_to_csv(filename, data):
with open(filename, 'a', newline='') as csvfile:
with open(filename, "a", newline="") as csvfile:
fieldnames = data.keys()
import csv

writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if csvfile.tell() == 0:
writer.writeheader()
writer.writerow(data)


# Define the file names for the CSV files
adafruit_csv_filename = 'adafruit_gps_data.csv'
pixhawk_csv_filename = 'pixhawk_gps_data.csv'
adafruit_csv_filename = "adafruit_gps_data.csv"
pixhawk_csv_filename = "pixhawk_gps_data.csv"

while True:
# Get data from Adafruit GPS
Expand All @@ -92,4 +94,4 @@ def write_to_csv(filename, data):
print("Pixhawk GPS data written to CSV")

# Wait for one minute before the next iteration
time.sleep(60)
time.sleep(60)
2 changes: 1 addition & 1 deletion utils/mavlink-api/utils/mavlink_serial_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#! /usr/bin/env python3

import sys
from pymavlink import mavutil
from pymavlink import mavutil # pytype: disable=import-error

if len(sys.argv) < 2:
print("Usage: python mavlink_serial_test.py <serial_port>")
Expand Down
Loading