Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update scripts #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions adsbflightalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# for example squawk = 7700, flight = blah, etc
import json


def parseJSONfile(aircraft_json_path, filters):
"""
Returns a dict of dicts of aircraft matching the filter, with a
Expand All @@ -15,47 +16,55 @@ def parseJSONfile(aircraft_json_path, filters):

current_alert_count = 0
inside_box_count = 0
alerted_aircraft_list = {} # dict to store any aircraft matching the filter

# dict to store any aircraft matching the filter
alerted_aircraft_list = []

with open(aircraft_json_path + 'aircraft.json', 'r') as datafile:
live_data = json.load(datafile) # load live aircraft.json file
live_data = json.load(datafile) # load live aircraft.json file

for tracked_aircraft in live_data['aircraft']:

# fix any missing data in mode S returns
if 'flight' not in tracked_aircraft: # Fix blank callsign
if 'flight' not in tracked_aircraft: # Fix blank callsign
tracked_aircraft['flight'] = "NONE"
if 'squawk' not in tracked_aircraft: # Fix blank squawk
if 'squawk' not in tracked_aircraft: # Fix blank squawk
tracked_aircraft['squawk'] = "XXXX"

for filter in filters: # iterate through the filters for each plane in tbe json
for filter in filters: # iterate through the filters for each plane in tbe json
filter_type = filter[0]
filter_text = filter[1]

if filter_type != "area": # non-area filters
if filter_type != "area": # non-area filters
if filter_type in tracked_aircraft:
if filter_text in tracked_aircraft[filter_type]:
current_alert_count += 1
tracked_aircraft['matched'] = filter_type
alerted_aircraft_list[current_alert_count] = tracked_aircraft # add aircraft to alerted dict
else: # area filter
# add aircraft to alerted array
alerted_aircraft_list.append(tracked_aircraft)
else: # area filter
if 'lat' in tracked_aircraft:
if isInsideBox(filter_text, tracked_aircraft):
inside_box_count += 1
current_alert_count += 1
tracked_aircraft['matched'] = filter_type
alerted_aircraft_list[current_alert_count] = tracked_aircraft # add aircraft to inside-the-box dict
# add aircraft to inside-the-box dict
alerted_aircraft_list.append(tracked_aircraft)

# todo: add support for more than one area filter?
if (inside_box_count > 0):
print(inside_box_count, "aircraft in user-defined box")

if (current_alert_count > 0):
alerted_aircraft_list[0] = current_alert_count # set alerted planes count in dict. currently planes
# are counted twice if they are caught by two filters in the same pass
return alerted_aircraft_list
else:
return 0

return alerted_aircraft_list

#if (current_alert_count > 0):
# alerted_aircraft_list[0] = current_alert_count # set alerted planes count in dict. currently planes
#
# return alerted_aircraft_list
#else:
# return 0


def isInsideBox(coords_box, tracked_aircraft):
"""
Expand Down
132 changes: 101 additions & 31 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,105 @@
# adsb-flightalert 0.5 - example script
# parse a running dump1090-whatever install's aircraft.json for particular criteria in a key
# for example squawk = 7700, flight = blah, etc
import json

import adsbflightalert
import time
import smtplib
import socket

def logAlerts(alerted_aircraft): # send to stdout
# add time/date to log? if running as service there is no need
# if not then it would be useful. can i detect which somehow?
num_of_alerts = alerted_aircraft[0]

while num_of_alerts > 0:
print(alerted_aircraft[num_of_alerts]['matched'], "alert for - Hex:", alerted_aircraft[num_of_alerts]['hex'], "Squawk:", alerted_aircraft[num_of_alerts]['squawk'], "Callsign:", alerted_aircraft[num_of_alerts]['flight'])
num_of_alerts -= 1
## Matches get logged into JSON files in the following format:
# {
# "flight": "SKW4879 ",
# "hex": "a65598",
# "squawk": "7500",
# "link": "https://adsb.oarc.uk/?icao=a65598",
# "alert_detected": "2023-09-26T23:06:28",
# "alert_last_seen": "2023-09-26T23:06:28",
# "alert_detection_stopped": "",
# "alert_status": "active",
# "last_seen": "2023-09-26T23:07:12"
# }
#
# {
# "flight": "SKW4879 ",
# "hex": "a65598",
# "squawk": "7500",
# "link": "https://adsb.oarc.uk/?icao=a65598",
# "alert_detected": "2023-09-26T23:06:28",
# "alert_last_seen": "2023-09-26T23:10:50",
# "alert_detection_stopped": "2023-09-26T23:10:56",
# "alert_status": "inactive",
# "last_seen": "2023-09-26T23:07:12"
# }


def logAlerts(alerted_aircraft):
print('Incoming/current aircraft count to be logged/updated: ' + str(len(alerted_aircraft)))

## File paths.
current_emergencies_file_path = "./emergencies.json"
historical_emergencies_file_path = "./emergencies_history.json"

## Use consistent timestamp for all entries made during this run.
detection_time = time.strftime("%Y-%m-%dT%H:%M:%S")

## Read in the current and historical emergencies files.
with open(current_emergencies_file_path, 'r') as current_emergencies_file:
current_emergencies_json = json.load(current_emergencies_file)
with open(historical_emergencies_file_path, 'r') as historical_emergencies_file:
historical_emergencies_json = json.load(historical_emergencies_file)

## Iterate through the list of incoming/current alerts.
for value in alerted_aircraft:
## Check to see if there is already an entry for this hex code which is active.
## If found, update the last seen time to now.
is_new = True
for entry in current_emergencies_json:
if (entry['hex'] == value['hex']) and (entry['alert_status'] == "active"):
entry['alert_last_seen'] = detection_time
is_new = False
## NB: Deliberately not breaking here in case there are multiple entries with the same hex (shouldn't be)

## If the hex isn't in the current list of matches, add it.
if (is_new):
new_entry = dict()
new_entry['flight'] = value['flight']
new_entry['hex'] = value['hex']
new_entry['squawk'] = value['squawk']
new_entry['link'] = "https://adsb.oarc.uk/?icao=" + value['hex']
new_entry['alert_detected'] = detection_time
new_entry['alert_last_seen'] = detection_time
new_entry['alert_detection_stopped'] = ""
new_entry['alert_status'] = "active"
current_emergencies_json.append(new_entry)

## We have now processed all incoming/current alerts.
## Any remaining active alerts which were not in the current list need to be marked as inactive.
active_entries = [entry for entry in current_emergencies_json if entry['alert_status'] == "active"]
for entry in active_entries:
matching_entries_by_hex = [value for value in alerted_aircraft if value['hex'] == entry['hex']]
if (len(matching_entries_by_hex) == 0):
## No match found, so this alert is no longer active.
entry['alert_status'] = "inactive"
entry['alert_detection_stopped'] = detection_time

## Any inactive alerts within the current emergencies json need to be moved across to the historical file.
newly_inactive_entries = [entry for entry in current_emergencies_json if entry['alert_status'] == "inactive"]
for entry in newly_inactive_entries:
historical_emergencies_json.append(entry)

## All inactive alerts have now been added to the historical file.
## Remove them from the list of current emergencies.
current_emergencies_json = [entry for entry in current_emergencies_json if entry['alert_status'] != "inactive"]

## Write the files back out
with open(current_emergencies_file_path, 'w') as current_emergencies_file:
current_emergencies_file.write(json.dumps(current_emergencies_json))
with open(historical_emergencies_file_path, 'w') as historical_emergencies_file:
historical_emergencies_file.write(json.dumps(historical_emergencies_json))


def main():
global alert
Expand All @@ -26,39 +112,23 @@ def main():

if current_time > start_time + check_delay:
scan_result = adsbflightalert.parseJSONfile(aircraft_json_path, filters)

if (scan_result != 0): # this is where you put your alert code!
alert = True
if not already_alerting:
# fire off notificiations here
print("Triggering alert state")
already_alerting = True
logAlerts(scan_result) # display the alerted flights
else: # do something here to update a notification?
# you could check alerted hex codes against a list, for example,
# so you don't alert planes more than neccesary, only showing new planes,
# or skip all alert actions entirely if already in alert mode
print("Continuing current alert state")
else: # code for no alert state here, such as cancelling a notification if one exists
alert = False
if already_alerting: # check if this needs cancelling
print("Canceling alert state")
already_alerting = False
# unlight an LED, change a display, etc
logAlerts(scan_result)
start_time = current_time

time.sleep(1)

alert = False # alert state
already_alerting = False # have we triggered an alert already?
check_delay = 60 # how often to check in seconds

aircraft_json_path = "/run/dump1090-fa/" # path to aircraft.json file
alert = False # alert state
already_alerting = False # have we triggered an alert already?
check_delay = 5 # how often to check in seconds

aircraft_json_path = "/run/dump1090-fa/" # path to aircraft.json file
# aircraft_json_path = "./" # path to aircraft.json file
filters = [
("squawk", "7500"),
("squawk", "7600"),
("squawk", "7700"),
# ("area", [(55.5, -4.0), (56.5, -3.3)]) # not used in example, but this is how you do an area check - a list of two tuplea
# ("area", [(55.5, -4.0), (56.5, -3.3)]) # not used in example, but this is how you do an area check - a list of two tuplea
]

print("==========")
Expand Down