Skip to content

Commit

Permalink
Formatting and Dockerfile sha hash
Browse files Browse the repository at this point in the history
  • Loading branch information
lk-iqt committed Nov 1, 2023
1 parent 5e2639d commit a568579
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 56 deletions.
2 changes: 1 addition & 1 deletion utils/mavlink-api/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.11-slim
FROM python:3.11-slim@sha256:1591aa8c01b5b37ab31dbe5662c5bdcf40c2f1bce4ef1c1fd24802dae3d01052

WORKDIR /app
ENV PYTHONPATH=${PYTHONPATH}:${PWD}
Expand Down
125 changes: 74 additions & 51 deletions utils/mavlink-api/mavlink-api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import serial

app = Flask(__name__)


class MAVLINKGPSHandler:
def __init__(self):

self.STALE_TIMEOUT = 30 #seconds
self.STALE_TIMEOUT = 30 # seconds

self.gps_stale = None
self.gps_fix_type = None
Expand All @@ -26,7 +27,7 @@ def __init__(self):
self.latest_GLOBAL_POSITION_INT_timestamp = None
self.latest_GPS_RAW_INT_msg = None
self.latest_GPS_RAW_INT_timestamp = None

self.mavlink_thread = threading.Thread(target=self.handle_mavlink_messages)
self.mavlink_thread.daemon = True
self.mavlink_thread.start()
Expand All @@ -39,24 +40,24 @@ def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict:
Returns:
clean_data (dict): Required data to be published
"""
#If data supplied, else use last msg
# If data supplied, else use last msg
if data == None:
data = self.latest_GLOBAL_POSITION_INT_msg.to_dict()

#Check for stale GPS
# Check for stale GPS
self.gps_stale_check()

#Parse msg data
self.time_boot_ms = data["time_boot_ms"] #mm
self.latitude = data["lat"]/10000000 # decimal degrees
self.longitude = data["lon"]/10000000 # decimal degrees
self.altitude = data["alt"] #mm
self.relative_alt = data["relative_alt"] #mm
self.heading = data["hdg"]/100.0 # decimal degrees
self.vx = data["vx"]/100.0 # meters/second
self.vy = data["vy"]/100.0 # meters/second
self.vz = data["vz"]/100.0 # meters/second
return
# Parse msg data
self.time_boot_ms = data["time_boot_ms"] # mm
self.latitude = data["lat"] / 10000000 # decimal degrees
self.longitude = data["lon"] / 10000000 # decimal degrees
self.altitude = data["alt"] # mm
self.relative_alt = data["relative_alt"] # mm
self.heading = data["hdg"] / 100.0 # decimal degrees
self.vx = data["vx"] / 100.0 # meters/second
self.vy = data["vy"] / 100.0 # meters/second
self.vz = data["vz"] / 100.0 # meters/second
return

def GPS_RAW_INT_parser(self, data: dict = None) -> dict:
"""Selected required data from data received from Pixhawk, and
Expand All @@ -65,7 +66,7 @@ def GPS_RAW_INT_parser(self, data: dict = None) -> dict:
data (dict): Data received from the Pixhawk device
Returns:
clean_data (dict): Required data to be published
GPS_FIX_TYPE
[Enum] Type of GPS fix
Expand All @@ -80,43 +81,43 @@ def GPS_RAW_INT_parser(self, data: dict = None) -> dict:
7 GPS_FIX_TYPE_STATIC Static fixed, typically used for base stations
8 GPS_FIX_TYPE_PPP PPP, 3D position.
"""
#If data supplied, else use last msg
# If data supplied, else use last msg
if data == None:
data = self.latest_GPS_RAW_INT_msg.to_dict()

#Check for stale GPS
# Check for stale GPS
self.gps_stale_check()

#Update fix type
self.time_usec = data["time_usec"] #UNIX Epoch time uSec
self.gps_fix_type=data["fix_type"]
# Update fix type
self.time_usec = data["time_usec"] # UNIX Epoch time uSec
self.gps_fix_type = data["fix_type"]

return

def gps_stale_check(self):
#Check for stale GPS data
if (time.time() - self.latest_GPS_RAW_INT_timestamp > self.STALE_TIMEOUT) or \
(time.time() - self.latest_GLOBAL_POSITION_INT_timestamp > self.STALE_TIMEOUT):
# Check for stale GPS data
if (time.time() - self.latest_GPS_RAW_INT_timestamp > self.STALE_TIMEOUT) or (
time.time() - self.latest_GLOBAL_POSITION_INT_timestamp > self.STALE_TIMEOUT
):
self.gps_stale = True
else:
self.gps_stale = False


def create_gps_json_payload(self):
#Check for stale GPS
# Check for stale GPS
self.gps_stale_check()

#Create payload dict for json
payload={}
# Create payload dict for json
payload = {}

payload["gps_stale"] = self.gps_stale
payload["gps_fix_type"] = self.gps_fix_type
payload["time_boot_ms"] = self.time_boot_ms #mm
payload["time_boot_ms"] = self.time_boot_ms # mm
payload["time_usec"] = self.time_usec
payload["latitude"] = self.latitude # decimal degrees
payload["longitude"] = self.longitude # decimal degrees
payload["altitude"] = self.altitude #mm
payload["relative_alt"] = self.relative_alt #mm
payload["altitude"] = self.altitude # mm
payload["relative_alt"] = self.relative_alt # mm
payload["heading"] = self.heading # decimal degrees
payload["vx"] = self.vx # meters/second
payload["vy"] = self.vy # meters/second
Expand All @@ -130,47 +131,69 @@ def handle_mavlink_messages(self):
mavlink_connection = mavutil.mavlink_connection("/dev/tty.serial1", 57600)

while True:
#msg = mavlink_connection.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
# msg = mavlink_connection.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
msg = mavlink_connection.recv_match(blocking=True)
if msg.get_type() == 'GLOBAL_POSITION_INT':
if msg.get_type() == "GLOBAL_POSITION_INT":
self.latest_GLOBAL_POSITION_INT_msg = msg
self.latest_GLOBAL_POSITION_INT_timestamp=time.time()
elif msg.get_type() == 'GPS_RAW_INT':
self.latest_GLOBAL_POSITION_INT_timestamp = time.time()
elif msg.get_type() == "GPS_RAW_INT":
self.latest_GPS_RAW_INT_msg = msg
self.latest_GPS_RAW_INT_timestamp=time.time()
self.latest_GPS_RAW_INT_timestamp = time.time()
msg


# Store the latest GPS data handler
mavlink_gps_handler = MAVLINKGPSHandler()


# Routes for latest GPS Data
@app.route('/gps-fix-status', methods=['GET'])
@app.route("/gps-fix-status", methods=["GET"])
def get_latest_gps_fix_status():
if mavlink_gps_handler.latest_GPS_RAW_INT_msg:
mavlink_gps_handler.GPS_RAW_INT_parser()
return jsonify({"fix_type":mavlink_gps_handler.gps_fix_type,"gps_stale":mavlink_gps_handler.gps_stale}), 200
return (
jsonify(
{
"fix_type": mavlink_gps_handler.gps_fix_type,
"gps_stale": mavlink_gps_handler.gps_stale,
}
),
200,
)
else:
return jsonify({'error': 'No GPS data available'}), 404

@app.route('/gps-data', methods=['GET'])
return jsonify({"error": "No GPS data available"}), 404


@app.route("/gps-data", methods=["GET"])
def get_latest_gps_data():
if mavlink_gps_handler.latest_GLOBAL_POSITION_INT_msg:
mavlink_gps_handler.GLOBAL_POSITION_INT_parser()
msg=mavlink_gps_handler.create_gps_json_payload()
msg = mavlink_gps_handler.create_gps_json_payload()
return jsonify(msg), 200
else:
return jsonify({'error': 'No GPS data available'}), 404

@app.route('/heading', methods=['GET'])
return jsonify({"error": "No GPS data available"}), 404


@app.route("/heading", methods=["GET"])
def get_latest_heading():
if mavlink_gps_handler.latest_GLOBAL_POSITION_INT_msg:
mavlink_gps_handler.GLOBAL_POSITION_INT_parser()
return jsonify({"heading":mavlink_gps_handler.heading,"gps_stale":mavlink_gps_handler.gps_stale}), 200
return (
jsonify(
{
"heading": mavlink_gps_handler.heading,
"gps_stale": mavlink_gps_handler.gps_stale,
}
),
200,
)
else:
return jsonify({'error': 'No heading data available'}), 404
return jsonify({"error": "No heading data available"}), 404


def main():
app.run(host='0.0.0.0', port=8888)
app.run(host="0.0.0.0", port=8888)


if __name__ == '__main__':
if __name__ == "__main__":
main()
8 changes: 4 additions & 4 deletions utils/mavlink-api/utils/mavlink_serial_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
try:
mav = mavutil.mavlink_connection(serial_port)
print("Connected to MAVLink at", serial_port)

# You can perform further operations here

# For example, you can continuously read messages
while True:
message = mav.recv_match(blocking=True)
if message is not None:
print("Received:", message)

# You can add more logic here based on the received messages

except Exception as e:
print("Error:", str(e))

0 comments on commit a568579

Please sign in to comment.