-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
frogmasta
authored and
frogmasta
committed
Oct 12, 2023
0 parents
commit 8764199
Showing
6 changed files
with
288 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
__pycache__ |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
""" | ||
Simple demonstration of Mavlink communication using Pymavlink. | ||
Gives a simple example of a Python thread which sends heartbeats to the server. | ||
NOTE: Launch AFTER `server.py` | ||
October 12th 2023 | ||
Eric Roth | ||
""" | ||
|
||
import os | ||
import time | ||
from threading import Thread | ||
|
||
from pymavlink import mavutil | ||
import pymavlink.dialects.v20.ardupilotmega as dialect | ||
|
||
import keyboard_handler | ||
|
||
# Set Mavlink2 as our default | ||
os.environ['MAVLINK20'] = '1' | ||
|
||
# File constants | ||
HEARTBEAT_INTERVAL = 1 | ||
|
||
def heartbeat(client: mavutil.mavfile): | ||
""" | ||
Sends 1 [s] spaced heartbeats to the vehicle | ||
""" | ||
mav: dialect.MAVLink = client.mav | ||
last_beat = 0 | ||
|
||
# 1 [s] loop | ||
while True: | ||
loop_time = time.perf_counter() | ||
|
||
# Check to make sure it's actually time to send | ||
if loop_time - last_beat >= HEARTBEAT_INTERVAL: | ||
mav.heartbeat_send(dialect.MAV_TYPE_SUBMARINE, dialect.MAV_AUTOPILOT_INVALID, 0, 0, 0) | ||
last_beat = time.perf_counter() | ||
|
||
# Sleep until next heartbeat | ||
time.sleep(HEARTBEAT_INTERVAL - (last_beat - loop_time)) | ||
|
||
def get_new_messages(client: mavutil.mavfile): | ||
""" | ||
Clears out the message buffer and returns a list of messages | ||
""" | ||
msgs = [] | ||
while True: | ||
msg: dialect.MAVLink_message = client.recv_msg() | ||
|
||
if msg is None: | ||
break | ||
|
||
msgs.append(msg) | ||
|
||
return msgs | ||
|
||
def main(): | ||
print("Establishing connection to the server...") | ||
|
||
# Connect to the server on localhost port 14550 | ||
client: mavutil.mavfile = mavutil.mavlink_connection('tcp:127.0.0.1:14550', dialect="ardupilotmega", autoreconnect=True) | ||
|
||
print("Connected!") | ||
|
||
# Set up the heartbeat thread | ||
heartbeat_thread = Thread(target=heartbeat, daemon=True, args=(client,)) | ||
heartbeat_thread.start() | ||
|
||
# Set up the keyboard handler | ||
keyboard_handler.initialize_keyboard() | ||
|
||
# Client loop | ||
print("Press [`] to quit the client!") | ||
while True: | ||
# Check any incoming messages | ||
new_msgs = get_new_messages(client) | ||
|
||
# Print any new messages to the terminal | ||
for msg in new_msgs: | ||
msg: dialect.MAVLink_message # Type hinting | ||
print(f"Received message of type {msg.get_type()}...") | ||
print(msg) | ||
|
||
# Check any keyboard input | ||
quit = keyboard_handler.get_quit() | ||
if quit: | ||
break | ||
|
||
# Shut down the keyboard | ||
keyboard_handler.destroy_keyboard() | ||
|
||
# No need to join the heartbeat thread as we declared it a daemon thread earlier (i.e. it stops running when the main thread stops) | ||
|
||
print("Shutting down...") | ||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
""" | ||
Handles I/O for quitting the server/client programs. | ||
Uses basic synchronization techniques to demonstrate how they work in Python. Not required because of the Global Interpretor Lock (GIL). | ||
On the use of `blocking=True`, we are using the spinlock paradigm. Look into the linux kernel if you're interested. | ||
Also uses `global` variables which is BAD practice! Use a class to encapsulate all the state. | ||
October 12th 2023 | ||
Eric Roth | ||
""" | ||
|
||
import threading | ||
from threading import Thread | ||
|
||
from sshkeyboard import listen_keyboard, stop_listening | ||
|
||
|
||
# Global state | ||
quit_thread: Thread = None | ||
quit_lock = threading.Lock() | ||
quit = 0 | ||
|
||
def get_quit(): | ||
""" | ||
Allows other files to access the quit variable (w/ necessary synchronization) | ||
""" | ||
global quit | ||
|
||
quit_lock.acquire(blocking=True) | ||
ret = quit | ||
quit_lock.release() | ||
return ret | ||
|
||
def on_key_press(key): | ||
""" | ||
Handler for pressing a key on the keyboard | ||
""" | ||
global quit | ||
|
||
# Check if `enter` character is pressed | ||
if key == '`': | ||
quit_lock.acquire(blocking=True) | ||
quit = 1 | ||
quit_lock.release() | ||
|
||
def on_key_release(key): | ||
""" | ||
Handler for releasing a key on the keyboard | ||
""" | ||
return | ||
|
||
def initialize_keyboard(): | ||
""" | ||
Initializes the keyboard thread | ||
""" | ||
global quit_thread | ||
|
||
quit_thread = Thread(target=listen_keyboard, daemon=True, kwargs={"on_press": on_key_press, "on_release": on_key_release, "sleep": 0.01}) | ||
quit_thread.start() | ||
|
||
def destroy_keyboard(): | ||
""" | ||
Cleans up the keyboard | ||
""" | ||
global quit_thread | ||
|
||
stop_listening() | ||
quit_thread.join() | ||
|
||
if __name__ == "__main__": | ||
""" | ||
Sample program which demonstartes use of the keyboard handler. | ||
""" | ||
initialize_keyboard() | ||
|
||
while True: | ||
q = get_quit() | ||
if q: | ||
break | ||
|
||
destroy_keyboard() | ||
print("Quit the program!") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
pymavlink | ||
sshkeyboard |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
""" | ||
Simple demonstration of Mavlink communication using Pymavlink. | ||
NOTE: Launch BEFORE `client.py` | ||
October 12th 2023 | ||
Eric Roth | ||
""" | ||
|
||
import os | ||
import time | ||
from threading import Thread | ||
|
||
from pymavlink import mavutil | ||
import pymavlink.dialects.v20.ardupilotmega as dialect | ||
|
||
import keyboard_handler | ||
|
||
# Set Mavlink2 as our default | ||
os.environ['MAVLINK20'] = '1' | ||
|
||
# File constants | ||
HEARTBEAT_INTERVAL = 1 | ||
|
||
def heartbeat(server: mavutil.mavfile): | ||
""" | ||
Sends 1 [s] spaced heartbeats to the vehicle | ||
""" | ||
mav: dialect.MAVLink = server.mav | ||
last_beat = 0 | ||
|
||
# 1 [s] loop | ||
while True: | ||
loop_time = time.perf_counter() | ||
|
||
# Check to make sure it's actually time to send | ||
if loop_time - last_beat >= HEARTBEAT_INTERVAL: | ||
mav.heartbeat_send(dialect.MAV_TYPE_GCS, dialect.MAV_AUTOPILOT_INVALID, 0, 0, 0) | ||
last_beat = time.perf_counter() | ||
|
||
# Sleep until next heartbeat | ||
time.sleep(HEARTBEAT_INTERVAL - (last_beat - loop_time)) | ||
|
||
def get_new_messages(client: mavutil.mavfile): | ||
""" | ||
Clears out the message buffer and returns a list of messages | ||
""" | ||
msgs = [] | ||
while True: | ||
msg: dialect.MAVLink_message = client.recv_msg() | ||
|
||
if msg is None: | ||
break | ||
|
||
msgs.append(msg) | ||
|
||
return msgs | ||
|
||
def main(): | ||
print("Opening up server connection...") | ||
|
||
# Listen for incoming UDP connection on localhost port 14550 | ||
server: mavutil.mavfile = mavutil.mavlink_connection('tcpin:127.0.0.1:14550', dialect="ardupilotmega", autoreconnect=True) | ||
|
||
# Confirm the connection is valid | ||
server.wait_heartbeat() | ||
|
||
# Print out vehicle information | ||
print("Heartbeat from system (system %u component %u)" % (server.target_system, server.target_component)) | ||
|
||
# Set up the heartbeat thread | ||
heartbeat_thread = Thread(target=heartbeat, daemon=True, args=(server,)) | ||
heartbeat_thread.start() | ||
|
||
# Initialize the keyboard handler | ||
keyboard_handler.initialize_keyboard() | ||
|
||
# Server loop | ||
print("Press [`] to quit the client!") | ||
while True: | ||
# Check any incoming messages | ||
new_msgs = get_new_messages(server) | ||
|
||
# Print any new messages to the terminal | ||
for msg in new_msgs: | ||
msg: dialect.MAVLink_message # Type hinting | ||
print(f"Received message of type {msg.get_type()}...") | ||
print(msg) | ||
|
||
# Check | ||
quit = keyboard_handler.get_quit() | ||
if quit: | ||
break | ||
|
||
# Destroy the keyboard | ||
keyboard_handler.destroy_keyboard() | ||
|
||
# No need to join the heartbeat thread as we declared it a daemon thread earlier (i.e. it stops running when the main thread stops) | ||
|
||
print("Shutting down...") | ||
|
||
if __name__ == "__main__": | ||
main() |