Skip to content

Commit

Permalink
Update scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
MysterAitch committed Sep 26, 2023
1 parent 84254db commit 14633da
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 46 deletions.
39 changes: 24 additions & 15 deletions adsbflightalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# for example squawk = 7700, flight = blah, etc
import json


def parseJSONfile(aircraft_json_path, filters):
"""
Returns a dict of dicts of aircraft matching the filter, with a
Expand All @@ -15,47 +16,55 @@ def parseJSONfile(aircraft_json_path, filters):

current_alert_count = 0
inside_box_count = 0
alerted_aircraft_list = {} # dict to store any aircraft matching the filter

# dict to store any aircraft matching the filter
alerted_aircraft_list = []

with open(aircraft_json_path + 'aircraft.json', 'r') as datafile:
live_data = json.load(datafile) # load live aircraft.json file
live_data = json.load(datafile) # load live aircraft.json file

for tracked_aircraft in live_data['aircraft']:

# fix any missing data in mode S returns
if 'flight' not in tracked_aircraft: # Fix blank callsign
if 'flight' not in tracked_aircraft: # Fix blank callsign
tracked_aircraft['flight'] = "NONE"
if 'squawk' not in tracked_aircraft: # Fix blank squawk
if 'squawk' not in tracked_aircraft: # Fix blank squawk
tracked_aircraft['squawk'] = "XXXX"

for filter in filters: # iterate through the filters for each plane in tbe json
for filter in filters: # iterate through the filters for each plane in tbe json
filter_type = filter[0]
filter_text = filter[1]

if filter_type != "area": # non-area filters
if filter_type != "area": # non-area filters
if filter_type in tracked_aircraft:
if filter_text in tracked_aircraft[filter_type]:
current_alert_count += 1
tracked_aircraft['matched'] = filter_type
alerted_aircraft_list[current_alert_count] = tracked_aircraft # add aircraft to alerted dict
else: # area filter
# add aircraft to alerted array
alerted_aircraft_list.append(tracked_aircraft)
else: # area filter
if 'lat' in tracked_aircraft:
if isInsideBox(filter_text, tracked_aircraft):
inside_box_count += 1
current_alert_count += 1
tracked_aircraft['matched'] = filter_type
alerted_aircraft_list[current_alert_count] = tracked_aircraft # add aircraft to inside-the-box dict
# add aircraft to inside-the-box dict
alerted_aircraft_list.append(tracked_aircraft)

# todo: add support for more than one area filter?
if (inside_box_count > 0):
print(inside_box_count, "aircraft in user-defined box")

if (current_alert_count > 0):
alerted_aircraft_list[0] = current_alert_count # set alerted planes count in dict. currently planes
# are counted twice if they are caught by two filters in the same pass
return alerted_aircraft_list
else:
return 0

return alerted_aircraft_list

#if (current_alert_count > 0):
# alerted_aircraft_list[0] = current_alert_count # set alerted planes count in dict. currently planes
#
# return alerted_aircraft_list
#else:
# return 0


def isInsideBox(coords_box, tracked_aircraft):
"""
Expand Down
132 changes: 101 additions & 31 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,105 @@
# adsb-flightalert 0.5 - example script
# parse a running dump1090-whatever install's aircraft.json for particular criteria in a key
# for example squawk = 7700, flight = blah, etc
import json

import adsbflightalert
import time
import smtplib
import socket

def logAlerts(alerted_aircraft): # send to stdout
# add time/date to log? if running as service there is no need
# if not then it would be useful. can i detect which somehow?
num_of_alerts = alerted_aircraft[0]

while num_of_alerts > 0:
print(alerted_aircraft[num_of_alerts]['matched'], "alert for - Hex:", alerted_aircraft[num_of_alerts]['hex'], "Squawk:", alerted_aircraft[num_of_alerts]['squawk'], "Callsign:", alerted_aircraft[num_of_alerts]['flight'])
num_of_alerts -= 1
## Matches get logged into JSON files in the following format:
# {
# "flight": "SKW4879 ",
# "hex": "a65598",
# "squawk": "7500",
# "link": "https://adsb.oarc.uk/?icao=a65598",
# "alert_detected": "2023-09-26T23:06:28",
# "alert_last_seen": "2023-09-26T23:06:28",
# "alert_detection_stopped": "",
# "alert_status": "active",
# "last_seen": "2023-09-26T23:07:12"
# }
#
# {
# "flight": "SKW4879 ",
# "hex": "a65598",
# "squawk": "7500",
# "link": "https://adsb.oarc.uk/?icao=a65598",
# "alert_detected": "2023-09-26T23:06:28",
# "alert_last_seen": "2023-09-26T23:10:50",
# "alert_detection_stopped": "2023-09-26T23:10:56",
# "alert_status": "inactive",
# "last_seen": "2023-09-26T23:07:12"
# }


def logAlerts(alerted_aircraft):
print('Incoming/current aircraft count to be logged/updated: ' + str(len(alerted_aircraft)))

## File paths.
current_emergencies_file_path = "./emergencies.json"
historical_emergencies_file_path = "./emergencies_history.json"

## Use consistent timestamp for all entries made during this run.
detection_time = time.strftime("%Y-%m-%dT%H:%M:%S")

## Read in the current and historical emergencies files.
with open(current_emergencies_file_path, 'r') as current_emergencies_file:
current_emergencies_json = json.load(current_emergencies_file)
with open(historical_emergencies_file_path, 'r') as historical_emergencies_file:
historical_emergencies_json = json.load(historical_emergencies_file)

## Iterate through the list of incoming/current alerts.
for value in alerted_aircraft:
## Check to see if there is already an entry for this hex code which is active.
## If found, update the last seen time to now.
is_new = True
for entry in current_emergencies_json:
if (entry['hex'] == value['hex']) and (entry['alert_status'] == "active"):
entry['alert_last_seen'] = detection_time
is_new = False
## NB: Deliberately not breaking here in case there are multiple entries with the same hex (shouldn't be)

## If the hex isn't in the current list of matches, add it.
if (is_new):
new_entry = dict()
new_entry['flight'] = value['flight']
new_entry['hex'] = value['hex']
new_entry['squawk'] = value['squawk']
new_entry['link'] = "https://adsb.oarc.uk/?icao=" + value['hex']
new_entry['alert_detected'] = detection_time
new_entry['alert_last_seen'] = detection_time
new_entry['alert_detection_stopped'] = ""
new_entry['alert_status'] = "active"
current_emergencies_json.append(new_entry)

## We have now processed all incoming/current alerts.
## Any remaining active alerts which were not in the current list need to be marked as inactive.
active_entries = [entry for entry in current_emergencies_json if entry['alert_status'] == "active"]
for entry in active_entries:
matching_entries_by_hex = [value for value in alerted_aircraft if value['hex'] == entry['hex']]
if (len(matching_entries_by_hex) == 0):
## No match found, so this alert is no longer active.
entry['alert_status'] = "inactive"
entry['alert_detection_stopped'] = detection_time

## Any inactive alerts within the current emergencies json need to be moved across to the historical file.
newly_inactive_entries = [entry for entry in current_emergencies_json if entry['alert_status'] == "inactive"]
for entry in newly_inactive_entries:
historical_emergencies_json.append(entry)

## All inactive alerts have now been added to the historical file.
## Remove them from the list of current emergencies.
current_emergencies_json = [entry for entry in current_emergencies_json if entry['alert_status'] != "inactive"]

## Write the files back out
with open(current_emergencies_file_path, 'w') as current_emergencies_file:
current_emergencies_file.write(json.dumps(current_emergencies_json))
with open(historical_emergencies_file_path, 'w') as historical_emergencies_file:
historical_emergencies_file.write(json.dumps(historical_emergencies_json))


def main():
global alert
Expand All @@ -26,39 +112,23 @@ def main():

if current_time > start_time + check_delay:
scan_result = adsbflightalert.parseJSONfile(aircraft_json_path, filters)

if (scan_result != 0): # this is where you put your alert code!
alert = True
if not already_alerting:
# fire off notificiations here
print("Triggering alert state")
already_alerting = True
logAlerts(scan_result) # display the alerted flights
else: # do something here to update a notification?
# you could check alerted hex codes against a list, for example,
# so you don't alert planes more than neccesary, only showing new planes,
# or skip all alert actions entirely if already in alert mode
print("Continuing current alert state")
else: # code for no alert state here, such as cancelling a notification if one exists
alert = False
if already_alerting: # check if this needs cancelling
print("Canceling alert state")
already_alerting = False
# unlight an LED, change a display, etc
logAlerts(scan_result)
start_time = current_time

time.sleep(1)

alert = False # alert state
already_alerting = False # have we triggered an alert already?
check_delay = 60 # how often to check in seconds

aircraft_json_path = "/run/dump1090-fa/" # path to aircraft.json file
alert = False # alert state
already_alerting = False # have we triggered an alert already?
check_delay = 5 # how often to check in seconds

aircraft_json_path = "/run/dump1090-fa/" # path to aircraft.json file
# aircraft_json_path = "./" # path to aircraft.json file
filters = [
("squawk", "7500"),
("squawk", "7600"),
("squawk", "7700"),
# ("area", [(55.5, -4.0), (56.5, -3.3)]) # not used in example, but this is how you do an area check - a list of two tuplea
# ("area", [(55.5, -4.0), (56.5, -3.3)]) # not used in example, but this is how you do an area check - a list of two tuplea
]

print("==========")
Expand Down

0 comments on commit 14633da

Please sign in to comment.