forked from IQTLabs/SkyScan
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsbs1.py
175 lines (155 loc) · 5.49 KB
/
sbs1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
SBS-1 parser in python
The parser is a Python conversion of the JavaScript one in the node-sbs1
project by John Wiseman (github.com/wiseman/node-sbs1)
"""
from typing import *
from datetime import datetime
import logging
import re
try:
import dateutil.parser
except ImportError as e:
import sys
print("dateutil module not installed, try 'sudo pip install python-dateutil'")
sys.exit(1)
# Numeric message-type codes, numbered 1 through 8.
# NOTE(review): presumably these are the possible values of the
# "transmissionType" field produced by parse(); nothing in the visible code
# references them, so confirm against callers before relying on that.
ES_IDENT_AND_CATEGORY = 1
ES_SURFACE_POS = 2
ES_AIRBORNE_POS = 3
ES_AIRBORNE_VEL = 4
SURVEILLANCE_ALT = 5
SURVEILLANCE_ID = 6
AIR_TO_AIR = 7
ALL_CALL_REPLY = 8
def parse(
    msg: str,
) -> Optional[Dict[str, Union[str, int, float, bool, datetime, None]]]:
    """Parse one message from the feed output by dump1090 on port 30003.

    The message is stripped of surrounding whitespace and split on commas;
    each field is then converted by the module's ``__parse*`` helpers.

    Returns:
        None if ``msg`` is None or if the first field is not ``"MSG"``.
        Otherwise a dict with the following keys (the value is None when the
        field is missing, empty or cannot be converted):

            messageType      : str   (always "MSG")
            transmissionType : int
            sessionID        : str
            aircraftID       : str
            icao24           : str
            flightID         : str
            generatedDate    : datetime
            loggedDate       : datetime
            callsign         : str   (trailing whitespace removed)
            altitude         : float (meters, converted from feet)
            groundSpeed      : float (meters per second, converted from knots)
            track            : int
            lat              : float
            lon              : float
            verticalRate     : float (meters per second, converted from
                                      feet per minute)
            squawk           : int
            alert            : bool
            emergency        : bool
            spi              : bool
            onGround         : bool

    For a description of the attributes, please see
    github.com/wiseman/node-sbs1
    """
    if msg is None:
        return None

    sbs1 = {}
    parts = msg.strip().split(',')
    try:
        sbs1["messageType"] = __parseString(parts, 0)
        if sbs1["messageType"] != "MSG":
            return None
        sbs1["transmissionType"] = __parseInt(parts, 1)
        sbs1["sessionID"] = __parseString(parts, 2)
        sbs1["aircraftID"] = __parseString(parts, 3)
        sbs1["icao24"] = __parseString(parts, 4)
        sbs1["flightID"] = __parseString(parts, 5)
        sbs1["generatedDate"] = __parseDateTime(parts, 6, 7)
        sbs1["loggedDate"] = __parseDateTime(parts, 8, 9)
        sbs1["callsign"] = __parseString(parts, 10)
        if sbs1["callsign"]:
            sbs1["callsign"] = sbs1["callsign"].rstrip()
        sbs1["altitude"] = __parseInt(parts, 11)
        sbs1["groundSpeed"] = __parseInt(parts, 12)
        sbs1["track"] = __parseInt(parts, 13)
        sbs1["lat"] = __parseFloat(parts, 14)
        sbs1["lon"] = __parseFloat(parts, 15)
        sbs1["verticalRate"] = __parseInt(parts, 16)
        sbs1["squawk"] = __parseInt(parts, 17)
        sbs1["alert"] = __parseBool(parts, 18)
        sbs1["emergency"] = __parseBool(parts, 19)
        sbs1["spi"] = __parseBool(parts, 20)
        sbs1["onGround"] = __parseBool(parts, 21)

        # Convert ground speed from knots to meters per second.
        if sbs1["groundSpeed"] is not None:
            sbs1["groundSpeed"] = sbs1["groundSpeed"] * 0.514444
        # Convert altitude from feet to meters.
        if sbs1["altitude"] is not None:
            sbs1["altitude"] = sbs1["altitude"] * 0.3048
        # Vertical speed is in FEET PER MINUTE; convert to METERS PER SECOND.
        if sbs1["verticalRate"] is not None:
            sbs1["verticalRate"] = sbs1["verticalRate"] * 0.00508
    except IndexError:
        # Defensive guard: the field helpers already turn an out-of-range
        # index into None, so this is not expected to fire in practice.
        logging.error("Failed to init sbs1 message from '%s'", msg, exc_info=True)
        return None
    return sbs1
def __parseString(array: List, index: int):
    """Fetch the item stored at ``index`` in ``array``.

    Returns the item unchanged, or None when the item has length zero, when
    ``index`` is out of bounds, or when indexing / ``len()`` raises
    ValueError or TypeError."""
    try:
        item = array[index]
        return item if len(item) != 0 else None
    except (ValueError, TypeError, IndexError):
        return None
def __parseBool(array: List, index: int):
    """Interpret the item at ``index`` in ``array`` as a boolean flag.

    The item is converted with ``int()`` first; any non-zero integer yields
    True and zero yields False. Returns None when ``index`` is out of bounds
    or the integer conversion fails."""
    try:
        number = int(array[index])
    except (ValueError, TypeError, IndexError):
        return None
    return bool(number)
def __parseInt(array: List, index: int):
    """Parse int at given index in array.

    The first run of digit and hyphen characters found in the item is
    extracted and converted with ``int()``, so surrounding non-numeric text
    is ignored and a value such as "12.7" yields 12.

    Return int value or None if index is out of bounds, the item contains no
    digit/hyphen run, or type casting failed (for example "1-2")."""
    try:
        # Raw string: the same pattern as before, without the invalid
        # escape sequence that a plain string literal would trigger.
        match = re.search(r'[\-0-9]+', array[index])
        if match is None:
            return None
        return int(match.group())
    except (ValueError, TypeError, IndexError):
        return None
def __parseFloat(array: List, index: int):
    """Convert the item at ``index`` in ``array`` with ``float()``.

    Returns the resulting float, or None when ``index`` is out of bounds or
    the conversion raises ValueError or TypeError."""
    try:
        raw = array[index]
        return float(raw)
    except (ValueError, TypeError, IndexError):
        return None
def __parseDateTime(array: List, dateIndex: int, timeIndex: int):
    """Parse date and time at given indexes in array.

    The two items are read with ``__parseString``, joined with a single
    space and handed to ``dateutil.parser.parse``.

    Return datetime value or None if either index is out of bounds, either
    item is empty, or the combined text cannot be parsed."""
    date = __parseString(array, dateIndex)
    time = __parseString(array, timeIndex)
    # Return explicitly when a part is missing, so the result never depends
    # on a variable that was only assigned on the success path.
    if date is None or time is None:
        return None
    try:
        return dateutil.parser.parse("%s %s" % (date, time))
    except (ValueError, TypeError, OverflowError):
        # OverflowError is raised by dateutil for out-of-range date values;
        # treat it like any other unparseable timestamp.
        return None