diff --git a/cmd/pw_ingest/main.go b/cmd/pw_ingest/main.go index 3f30f99c..95f4ade0 100644 --- a/cmd/pw_ingest/main.go +++ b/cmd/pw_ingest/main.go @@ -104,7 +104,10 @@ func main() { func commonSetup(c *cli.Context) (*tracker.Tracker, error) { monitoring.RunWebServer(c) - // let's parse our URL forms + producers, err := setup.HandleSourceFlags(c) + if nil != err { + return nil, err + } trackerOpts := make([]tracker.Option, 0) trackerOpts = append(trackerOpts, tracker.WithPrometheusCounters(prometheusGaugeCurrentPlanes, prometheusCounterFramesDecoded)) @@ -126,10 +129,6 @@ func commonSetup(c *cli.Context) (*tracker.Tracker, error) { } } - producers, err := setup.HandleSourceFlags(c) - if nil != err { - return nil, err - } for _, p := range producers { trk.AddProducer(p) } @@ -138,9 +137,9 @@ func commonSetup(c *cli.Context) (*tracker.Tracker, error) { } func runSimple(c *cli.Context) error { - defer func() { - recover() - }() + //defer func() { + // recover() + //}() logging.ConfigureForCli() trk, err := commonSetup(c) diff --git a/cmd/website_decode/main.go b/cmd/website_decode/main.go index ad0ac6cc..4b63f588 100644 --- a/cmd/website_decode/main.go +++ b/cmd/website_decode/main.go @@ -136,7 +136,8 @@ func runHttpServer(c *cli.Context) error { _, _ = fmt.Fprintln(w, "Not an AVR Frame", html.EscapeString(err.Error())) return } - pt.GetPlane(frame.Icao()).HandleModeSFrame(frame, refLat, refLon) + source := tracker.FrameSource{RefLat: refLat, RefLon: refLon} + pt.GetPlane(frame.Icao()).HandleModeSFrame(frame, &source) icaoList[frame.Icao()] = frame.Icao() frame.Describe(w) } diff --git a/lib/dedupe/btree_test.go b/lib/dedupe/btree_test.go index 6f93886a..5f207946 100644 --- a/lib/dedupe/btree_test.go +++ b/lib/dedupe/btree_test.go @@ -13,7 +13,7 @@ func TestBTreeSweep1(t *testing.T) { frame, _ := beast.NewFrame(beastModeSShort, false) - if nil == filter.Handle(&frame) { + if nil == filter.Handle(frame) { t.Errorf("Expected to add a frame") } @@ -33,13 +33,13 @@ func TestFilterBTree_Handle(t *testing.T) { frame, _ := beast.NewFrame(beastModeSShort, false) - resp := filter.Handle(&frame) + resp := filter.Handle(frame) if resp == nil { t.Errorf("Expected the same frame back") } - if nil != filter.Handle(&frame) { + if nil != filter.Handle(frame) { t.Errorf("Got a duplicated frame back") } } @@ -109,10 +109,10 @@ func BenchmarkFilterBTree_HandleDuplicates(b *testing.B) { filter := NewFilterBTree(WithSweeperInterval(0), WithDedupeMaxAge(time.Minute)) frame, _ := beast.NewFrame(beastModeSShort, false) - filter.Handle(&frame) + filter.Handle(frame) for n := 0; n < b.N; n++ { - if nil != filter.Handle(&frame) { + if nil != filter.Handle(frame) { b.Error("Should not have gotten a non empty response - duplicate handled incorrectly!?") } } @@ -126,12 +126,13 @@ func BenchmarkFilterBTree_HandleUnique(b *testing.B) { for n := 0; n < b.N; n++ { beastModeSTest := []byte{0x1a, 0x32, 0x22, 0x1b, 0x54, 0xf0, 0x81, 0x2b, 0x26, byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n), 0, 0, byte(degree)} msg, _ := beast.NewFrame(beastModeSTest, false) - if nil == filter.Handle(&msg) { + if nil == filter.Handle(msg) { b.Fatalf("Expected to insert new message %0X", beastModeSTest) } - if nil != filter.Handle(&msg) { + if nil != filter.Handle(msg) { b.Fatalf("Failed duplicate insert of %0X", beastModeSTest) } + beast.Release(msg) } if filter.btree.Len() != b.N { b.Errorf("Did not get the same number of items as tested. expected %d, got %d", b.N, filter.btree.Len()) diff --git a/lib/dedupe/common_test.go b/lib/dedupe/common_test.go index d2ec94ca..6288ed8d 100644 --- a/lib/dedupe/common_test.go +++ b/lib/dedupe/common_test.go @@ -3,9 +3,9 @@ package dedupe import "plane.watch/lib/tracker/beast" func makeBeastMessages(iterMax int) []*beast.Frame { - //max := 0x00FFFFFF - max := iterMax * iterMax * iterMax - messages := make([]*beast.Frame, 0, max) + //maxVal := 0x00FFFFFF + maxVal := iterMax * iterMax * iterMax + messages := make([]*beast.Frame, 0, maxVal) // setup our test data template := make([]byte, len(beastModeSShort)) @@ -22,7 +22,7 @@ func makeBeastMessages(iterMax int) []*beast.Frame { shrt[14] = byte(y) shrt[15] = byte(z) frame, _ := beast.NewFrame(shrt, false) - messages = append(messages, &frame) + messages = append(messages, frame) } } } diff --git a/lib/dedupe/dedupe_test.go b/lib/dedupe/dedupe_test.go index ebdadd86..e9708637 100644 --- a/lib/dedupe/dedupe_test.go +++ b/lib/dedupe/dedupe_test.go @@ -18,7 +18,7 @@ func TestFilter_Handle(t *testing.T) { if nil != err { t.Error(err) } - fe := tracker.NewFrameEvent(&frame, nil) + fe := tracker.NewFrameEvent(frame, nil) resp := filter.Handle(&fe) @@ -35,7 +35,7 @@ func BenchmarkFilter_HandleDuplicates(b *testing.B) { filter := NewFilter() frame, _ := beast.NewFrame(beastModeSShort, false) - fe := tracker.NewFrameEvent(&frame, nil) + fe := tracker.NewFrameEvent(frame, nil) filter.Handle(&fe) for n := 0; n < b.N; n++ { @@ -52,7 +52,7 @@ func BenchmarkFilter_HandleUnique(b *testing.B) { for n := 0; n < b.N; n++ { beastModeSTest := []byte{0x1a, 0x32, 0x22, 0x1b, 0x54, 0xf0, 0x81, 0x2b, 0x26, byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n), 0, 0, 0} msg, _ := beast.NewFrame(beastModeSTest, false) - fe := tracker.NewFrameEvent(&msg, nil) + fe := tracker.NewFrameEvent(msg, nil) if nil == filter.Handle(&fe) { b.Fatal("Expected to insert new message") @@ -60,6 +60,7 @@ func BenchmarkFilter_HandleUnique(b *testing.B) { if nil != filter.Handle(&fe) { b.Fatal("Failed duplicate insert") } + beast.Release(msg) } if int(filter.list.Len()) != b.N { diff --git a/lib/dedupe/forgetfulmap/map.go b/lib/dedupe/forgetfulmap/map.go index 51d0873e..e075c90a 100644 --- a/lib/dedupe/forgetfulmap/map.go +++ b/lib/dedupe/forgetfulmap/map.go @@ -20,6 +20,8 @@ type ( forgettable ForgettableFunc itemCounter prometheus.Gauge + + useSyncPool bool } // a generic wrapper for things that can be lost @@ -31,11 +33,27 @@ type ( Option func(*ForgetfulSyncMap) ) +var ( + marbleBag *sync.Pool +) + +func init() { + marbleBag = &sync.Pool{ + New: func() any { + return &marble{ + added: time.Time{}, + value: nil, + } + }, + } +} + func NewForgetfulSyncMap(opts ...Option) *ForgetfulSyncMap { f := &ForgetfulSyncMap{ lookup: &sync.Map{}, sweepInterval: 10 * time.Second, oldAfter: 60 * time.Second, + useSyncPool: true, } for _, opt := range opts { opt(f) @@ -51,6 +69,12 @@ func NewForgetfulSyncMap(opts ...Option) *ForgetfulSyncMap { return f } +func UseMemSyncPool(use bool) Option { + return func(syncMap *ForgetfulSyncMap) { + syncMap.useSyncPool = use + } +} + func WithPrometheusCounters(numItems prometheus.Gauge) Option { return func(syncMap *ForgetfulSyncMap) { syncMap.itemCounter = numItems @@ -159,12 +183,12 @@ func (f *ForgetfulSyncMap) AddKey(key interface{}) { return } if kb, ok := key.([]byte); ok { - if 0 == len(kb) { + if len(kb) == 0 { return } } if ks, ok := key.(string); ok { - if "" == ks { + if ks == "" { return } } @@ -172,7 +196,7 @@ func (f *ForgetfulSyncMap) AddKey(key interface{}) { } func (f *ForgetfulSyncMap) AddKeyStr(key string) { // avoid storing empty things - if "" == key { + if key == "" { return } f.Store(key, nil) @@ -188,22 +212,30 @@ func (f *ForgetfulSyncMap) Load(key any) (any, bool) { return t.value, retBool } return nil, false - } else { - return retVal, retBool } + return retVal, retBool } // Store remembers an item func (f *ForgetfulSyncMap) Store(key, value interface{}) { - f.lookup.Store(key, &marble{ - added: time.Now(), - value: value, - }) + var m *marble + if f.useSyncPool { + m = marbleBag.Get().(*marble) + } else { + m = &marble{} + } + m.added = time.Now() + m.value = value + f.lookup.Store(key, m) } // Delete Removes an item from the list func (f *ForgetfulSyncMap) Delete(key interface{}) { - f.lookup.Delete(key) + m, ok := f.lookup.Load(key) + if ok { + marbleBag.Put(m) + f.lookup.Delete(key) + } } // Len returns a count of the number of items in the list @@ -219,11 +251,11 @@ func (f *ForgetfulSyncMap) Len() (entries int32) { // Range Iterates over the underlying sync.Map and calls the user function once per item func (f *ForgetfulSyncMap) Range(rangeFunc func(key, value interface{}) bool) { f.lookup.Range(func(key, value interface{}) bool { - if m, ok := value.(*marble); ok { + m, ok := value.(*marble) + if ok { return rangeFunc(key, m.value) - } else { - return rangeFunc(key, value) } + return rangeFunc(key, value) }) } diff --git a/lib/dedupe/forgetfulmap/map_test.go b/lib/dedupe/forgetfulmap/map_test.go index 4f6fa0a6..b5b51302 100644 --- a/lib/dedupe/forgetfulmap/map_test.go +++ b/lib/dedupe/forgetfulmap/map_test.go @@ -104,7 +104,7 @@ func TestForgetfulSyncMap_DontSweepNewPlane(t *testing.T) { t.Error("Test plane not added.") } - //this shouldn't sweep our new plane. + // this shouldn't sweep our new plane. testMap.sweep() if testMap.Len() != 1 { @@ -212,7 +212,7 @@ func TestForgetfulSyncMap_Range(t *testing.T) { item := testItem{value: "item 222"} testMap.Store("test", item) - if 1 != testMap.Len() { + if testMap.Len() != 1 { t.Error("Failed to store test item") } @@ -226,7 +226,7 @@ func TestForgetfulSyncMap_Range(t *testing.T) { if !tOk { t.Error("Failed to get our test item out unmolested") } - if "item 222" != typedLoadedItem.value { + if typedLoadedItem.value != "item 222" { t.Errorf("item came out changed?! - %+v", item) } @@ -237,14 +237,14 @@ func TestForgetfulSyncMap_Range(t *testing.T) { if !tOk { t.Error("Failed to get our test item out unmolested") } - if "item 222" != typedLoadedItem.value { + if typedLoadedItem.value != "item 222" { t.Errorf("item came out changed?! - %+v", item) } return true }) - if 1 != counter { + if counter != 1 { t.Error("Failed to range correctly through the map") } } @@ -268,12 +268,12 @@ func TestForgetfulSyncMap_SweepWithCustomExpiryFunc(t *testing.T) { item2 := testItem{value: "item 222", remove: true} testMap.Store("item2", item2) - if 2 != testMap.Len() { + if testMap.Len() != 2 { t.Error("Failed to store test items") } testMap.sweep() - if 1 != testMap.Len() { + if testMap.Len() != 1 { t.Error("Failed to remove our test2 item on sweep") } @@ -283,3 +283,45 @@ func TestForgetfulSyncMap_SweepWithCustomExpiryFunc(t *testing.T) { t.Error("Failed load our item1") } } + +func BenchmarkForgetfulSyncMap(b *testing.B) { + m := NewForgetfulSyncMap( + UseMemSyncPool(false), + WithSweepInterval(time.Second*300), + WithOldAgeAfterSeconds(300), + WithForgettableAction(func(key, value any, added time.Time) bool { + return false + }), + ) + + var i, j int64 + for n := 0; n < b.N; n++ { + for j = 0; j < 100; j++ { + m.AddKey((i * 100) + j) + } + for j = 0; j < 100; j++ { + m.Delete((i * 100) + j) + } + } +} + +func BenchmarkForgetfulSyncMapPool(b *testing.B) { + m := NewForgetfulSyncMap( + UseMemSyncPool(true), + WithSweepInterval(time.Second*300), + WithOldAgeAfterSeconds(300), + WithForgettableAction(func(key, value any, added time.Time) bool { + return false + }), + ) + + var i, j int64 + for n := 0; n < b.N; n++ { + for j = 0; j < 100; j++ { + m.AddKey((i * 100) + j) + } + for j = 0; j < 100; j++ { + m.Delete((i * 100) + j) + } + } +} diff --git a/lib/logging/logging.go b/lib/logging/logging.go index 85cc8332..90f80832 100644 --- a/lib/logging/logging.go +++ b/lib/logging/logging.go @@ -102,6 +102,16 @@ func StopProfiling(c *cli.Context) error { pprof.StopCPUProfile() println("To analyze the profile, use this cmd") println("go tool pprof -http=:7777", fileName) + + f, err := os.Create("mem-" + fileName) + if nil != err { + panic(err) + } + err = pprof.WriteHeapProfile(f) + if nil != err { + panic(err) + } + println("go tool pprof -http=:7777", "mem-"+fileName) } return nil } diff --git a/lib/producer/beast.go b/lib/producer/beast.go index 6610de17..dd5a25af 100644 --- a/lib/producer/beast.go +++ b/lib/producer/beast.go @@ -12,16 +12,14 @@ const tokenBufLen = 50 func (p *Producer) beastScanner(scan *bufio.Scanner) error { lastTimeStamp := time.Duration(0) + // make our best lib allocate out of a sync.Pool + beast.UsePoolAllocator = true for scan.Scan() { msg := bytes.Clone(scan.Bytes()) frame, err := beast.NewFrame(msg, false) if nil != err { continue } - //frame := beast.NewFrame(msg, false) - //if nil == frame { - // continue - //} if p.beastDelay { currentTs := frame.BeastTicksNs() if lastTimeStamp > 0 && lastTimeStamp < currentTs { @@ -29,7 +27,7 @@ func (p *Producer) beastScanner(scan *bufio.Scanner) error { } lastTimeStamp = currentTs } - p.addFrame(&frame, &p.FrameSource) + p.addFrame(frame, &p.FrameSource) if nil != p.stats.beast { p.stats.beast.Inc() @@ -46,7 +44,7 @@ func ScanBeast(data []byte, atEOF bool) (int, []byte, error) { // skip until we get our first 0x1A (message start) i := bytes.IndexByte(data, 0x1A) - if -1 == i || len(data) < i+11 { + if i == -1 || len(data) < i+11 { // we do not even have the smallest message, let's get some more data return 0, nil, nil } @@ -78,7 +76,7 @@ func ScanBeast(data []byte, atEOF bool) (int, []byte, error) { return i + 1, nil, nil } bufLen := len(data) - i - //println("type", data[i+1], "input len", bufLen, "msg len",msgLen) + // println("type", data[i+1], "input len", bufLen, "msg len",msgLen) if bufLen >= tokenBufLen { // we have enough in our buffer // account for double escapes diff --git a/lib/producer/beast_test.go b/lib/producer/beast_test.go index 62f5a8c8..c75338f2 100644 --- a/lib/producer/beast_test.go +++ b/lib/producer/beast_test.go @@ -141,7 +141,7 @@ func TestScanBeast(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - scanner := ScanBeast() + scanner := ScanBeast gotAdvance, gotToken, err := scanner(tt.args.data, tt.args.atEOF) if (err != nil) != tt.wantErr { t.Errorf("ScanBeast() error = %v, wantErr %v", err, tt.wantErr) @@ -203,7 +203,7 @@ func Test_producer_beastScanner(t *testing.T) { } func TestScanBeastUnique(t *testing.T) { - scanner := ScanBeast() + scanner := ScanBeast buf := append(append(beastModeSShort, beastModeSLong...), emptyBuf...) @@ -252,7 +252,7 @@ func TestScanBeastBufferWorks(t *testing.T) { short := bytes.Repeat(beastModeSShort, tokenBufSize) long := bytes.Repeat(beastModeSLong, tokenBufSize) scanner := bufio.NewScanner(bytes.NewReader(append(append(short, long...), emptyBuf...))) - scanner.Split(ScanBeast()) + scanner.Split(ScanBeast) idx := 0 tokens := make([][]byte, 2*tokenBufSize) @@ -292,7 +292,7 @@ func BenchmarkScanBeast(b *testing.B) { for n := 0; n < b.N; n++ { _, _ = f.Seek(0, 0) scanner := bufio.NewScanner(f) - scanner.Split(ScanBeast()) + scanner.Split(ScanBeast) for scanner.Scan() { } } @@ -308,7 +308,7 @@ func BenchmarkBeastDecode(b *testing.B) { for n := 0; n < b.N; n++ { _, _ = f.Seek(0, 0) scanner := bufio.NewScanner(f) - scanner.Split(ScanBeast()) + scanner.Split(ScanBeast) for scanner.Scan() { msg := scanner.Bytes() frame, err := beast.NewFrame(msg, false) diff --git a/lib/producer/common.go b/lib/producer/common.go index e57e7407..47ced6f6 100644 --- a/lib/producer/common.go +++ b/lib/producer/common.go @@ -41,7 +41,9 @@ type ( splitter bufio.SplitFunc beastDelay, keepAliveRepeater bool - run func() + run func() + running bool + runningLock sync.Mutex stats struct { avr, beast, sbs1 prometheus.Counter @@ -63,6 +65,7 @@ func New(opts ...Option) *Producer { Tag: "", RefLat: nil, RefLon: nil, + VelocityCheck: true, }, out: make(chan tracker.FrameEvent, 100), cmdChan: make(chan int), @@ -174,6 +177,7 @@ func WithOriginName(name string) Option { func WithFiles(filePaths []string) Option { return func(p *Producer) { + p.FrameSource.VelocityCheck = p.beastDelay p.run = func() { p.readFiles(filePaths, func(reader io.Reader, fileName string) error { scanner := bufio.NewScanner(reader) @@ -213,6 +217,10 @@ func WithPrometheusCounters(avr, beast, sbs1 prometheus.Counter) Option { } } +func (p *Producer) Source() *tracker.FrameSource { + return &p.FrameSource +} + func (p *Producer) readFromScanner(scan *bufio.Scanner) error { scan.Split(p.splitter) @@ -232,8 +240,8 @@ func (p *Producer) readFromScanner(scan *bufio.Scanner) error { func WithReferenceLatLon(lat, lon float64) Option { return func(p *Producer) { p.log.Debug().Float64("lat", lat).Float64("lon", lon).Msg("With Reference Lat/Lon") - p.RefLat = &lat - p.RefLon = &lon + p.FrameSource.RefLat = &lat + p.FrameSource.RefLon = &lon } } func WithKeepAliveRepeater() Option { @@ -243,11 +251,16 @@ func WithKeepAliveRepeater() Option { } func (p *Producer) String() string { - return p.Name + return p.FrameSource.Name } func (p *Producer) Listen() chan tracker.FrameEvent { - go p.run() + p.runningLock.Lock() + defer p.runningLock.Unlock() + if !p.running { + go p.run() + p.running = true + } return p.out } @@ -261,15 +274,15 @@ func (p *Producer) addFrame(f tracker.Frame, s *tracker.FrameSource) { } func (p *Producer) addDebug(sfmt string, v ...interface{}) { - p.log.Debug().Str("section", p.Name).Msgf(sfmt, v...) + p.log.Debug().Str("section", p.FrameSource.Name).Msgf(sfmt, v...) } func (p *Producer) addInfo(sfmt string, v ...interface{}) { - p.log.Info().Str("section", p.Name).Msgf(sfmt, v...) + p.log.Info().Str("section", p.FrameSource.Name).Msgf(sfmt, v...) } func (p *Producer) addError(err error) { - p.log.Error().Str("section", p.Name).Err(err).Send() + p.log.Error().Str("section", p.FrameSource.Name).Err(err).Send() } func (p *Producer) HealthCheck() bool { @@ -297,7 +310,11 @@ func (p *Producer) AddEvent(e tracker.FrameEvent) { } func (p *Producer) Cleanup() { - defer func() { recover() }() + defer func() { + if r := recover(); nil != r { + p.log.Error().Interface("recover", r).Msg("Cleanup() had a panic") + } + }() close(p.out) } @@ -397,6 +414,7 @@ func (p *Producer) fetcher(host, port string, read func(net.Conn) error) { } p.addDebug("Done with Producer %s", p) p.Cleanup() + p.addDebug("cleanup is done %s", p) }() go func() { @@ -407,7 +425,9 @@ func (p *Producer) fetcher(host, port string, read func(net.Conn) error) { wLock.Lock() working = false if nil != conn { - _ = conn.Close() + if err := conn.Close(); err != nil { + p.log.Error().Err(err).Msg("Err when closing socket") + } } wLock.Unlock() return diff --git a/lib/tracker/beast/main.go b/lib/tracker/beast/main.go index 866cd9c7..68937a7b 100644 --- a/lib/tracker/beast/main.go +++ b/lib/tracker/beast/main.go @@ -5,31 +5,69 @@ import ( "fmt" "math" "plane.watch/lib/tracker/mode_s" + "sync" "time" ) type ( + // Frame represents our Beast frame and is used to decode into AVR Frame struct { raw []byte - msgType byte mlatTimestamp []byte - signalLevel byte body []byte + msgType byte + signalLevel byte bodyString string isRadarCape bool hasDecoded bool + isPool bool decodedModeS mode_s.Frame } ) -//var msgLenLookup = map[byte]int{ +var ( + // UsePoolAllocator when set to true will allocate Frame objects out of a sync.Pool. you will need to free them + // by calling beast.Release() + UsePoolAllocator = false + beastPool sync.Pool + + magicTimestampMLAT = []byte{0xFF, 0x00, 0x4D, 0x4C, 0x41, 0x54} + ErrBadBeastFrame = errors.New("bad beast frame") +) + +func init() { + beastPool = sync.Pool{ + New: func() any { + return &Frame{ + raw: make([]byte, 0, 30), + msgType: 0, + mlatTimestamp: make([]byte, 0, 6), + signalLevel: 0, + body: make([]byte, 0, 14), + bodyString: " ", // 28 chars to fit 112bit squitters + isRadarCape: false, + hasDecoded: false, + decodedModeS: mode_s.Frame{}, + } + }, + } +} + +func Release(frame *Frame) { + if UsePoolAllocator { + beastPool.Put(frame) + } +} + +// var msgLenLookup = map[byte]int{ // 0x31: 2, // 0x32: 7, // 0x33: 14, // 0x34: 2, -//} +// } +// Icao returns the airframes ICAO code as an int func (f *Frame) Icao() uint32 { if nil == f { return 0 @@ -40,6 +78,7 @@ func (f *Frame) Icao() uint32 { return f.decodedModeS.Icao() } +// IcaoStr returns the airframes ICAO code as a readable string func (f *Frame) IcaoStr() string { if nil == f { return "" @@ -50,6 +89,7 @@ func (f *Frame) IcaoStr() string { return f.decodedModeS.IcaoStr() } +// Decode is used to turn our beast msg into our mode_s.Frame representation func (f *Frame) Decode() error { if nil == f { return errors.New("nil frame") @@ -73,6 +113,7 @@ func (f *Frame) TimeStamp() time.Time { return time.Now() } +// Raw gives us back our raw beast message func (f *Frame) Raw() []byte { if nil == f { return []byte{} @@ -80,12 +121,14 @@ func (f *Frame) Raw() []byte { return f.raw } -var magicTimestampMLAT = []byte{0xFF, 0x00, 0x4D, 0x4C, 0x41, 0x54} - -var ErrBadBeastFrame = errors.New("bad beast frame") - -func NewFrame(rawBytes []byte, isRadarCape bool) (Frame, error) { - var f Frame +func NewFrame(rawBytes []byte, isRadarCape bool) (*Frame, error) { + if UsePoolAllocator { + return newFrameInto(beastPool.Get().(*Frame), rawBytes, isRadarCape) + } else { + return newFrameInto(&Frame{}, rawBytes, isRadarCape) + } +} +func newFrameInto(f *Frame, rawBytes []byte, isRadarCape bool) (*Frame, error) { if len(rawBytes) <= 8 { return f, ErrBadBeastFrame } @@ -143,7 +186,7 @@ func (f *Frame) BeastTicksNs() time.Duration { var t uint64 inc := 40 for i := 0; i < 6; i++ { - t = t | uint64(f.mlatTimestamp[i])< 95 || lon < -180 || lon > 180 { return fmt.Errorf("cannot add invalid coordinates {%0.6f, %0.6f}", lat, lon) } @@ -756,11 +756,12 @@ func (p *Plane) addLatLong(lat, lon float64, ts time.Time) (warn error) { var durationTravelled float64 numHistoryItems := len(p.locationHistory) // determine speed? - if numHistoryItems > 0 && p.location.latitude != 0 && p.location.longitude != 0 { + doVelocityCheck := velocityCheck && numHistoryItems > 0 && p.location.latitude != 0 && p.location.longitude != 0 + if doVelocityCheck { referenceTime := p.locationHistory[numHistoryItems-1].cprDecodedTs if !referenceTime.IsZero() && referenceTime.Before(ts) { durationTravelled = float64(ts.Sub(referenceTime)) / float64(time.Second) - if 0.0 == durationTravelled { + if durationTravelled == 0.0 { durationTravelled = 1 } acceptableMaxDistance := (1 + durationTravelled) * 686 // mach2 in metres/second seems fast enough... @@ -785,19 +786,19 @@ func (p *Plane) addLatLong(lat, lon float64, ts time.Time) (warn error) { Floats64("This Lat/Lon", []float64{lat, lon}). Msg("A Frame Too Far") - var lastTs int64 + var lastTS int64 p.recentFrames.Range(func(f *mode_s.Frame) bool { - if 0 == lastTs { - lastTs = f.TimeStamp().UnixNano() + if 0 == lastTS { + lastTS = f.TimeStamp().UnixNano() } p.tracker.log.Error(). Str("ICAO", f.IcaoStr()). Time("received", f.TimeStamp()). Int64("unix nano", f.TimeStamp().UnixNano()). Str("Frame", f.RawString()). - Int64("Time Diff ms", (lastTs-f.TimeStamp().UnixNano())/1e6). + Int64("Time Diff ms", (lastTS-f.TimeStamp().UnixNano())/1e6). Msg("Frames Leading to Broken Track") - lastTs = f.TimeStamp().UnixNano() + lastTS = f.TimeStamp().UnixNano() return true }) return @@ -814,13 +815,13 @@ func (p *Plane) addLatLong(lat, lon float64, ts time.Time) (warn error) { p.location.cprDecodedTs = ts needsLookup := true - if "" != p.location.gridTileLocation { - if tile_grid.InGridLocation(lat, lon, p.location.gridTileLocation) { + if !p.location.HasTileGrid() { + if tile_grid.InGridLocation(lat, lon, p.location.TileGrid()) { needsLookup = false } } if needsLookup { - p.location.gridTileLocation = tile_grid.LookupTile(lat, lon) + p.location.SetTileGrid(tile_grid.LookupTile(lat, lon)) } p.locationHistory = append(p.locationHistory, p.location.Copy()) return @@ -848,7 +849,7 @@ func (p *Plane) setCprOddLocation(lat, lon float64, t time.Time) error { } // decodeCpr decodes the CPR Even and Odd frames and gets our Plane position -func (p *Plane) decodeCpr(refLat, refLon float64, ts time.Time) error { +func (p *Plane) decodeCpr(refLat, refLon float64, velocityCheck bool) error { p.cprLocation.refLat = refLat p.cprLocation.refLon = refLon loc, err := p.cprLocation.decode(p.OnGround()) @@ -856,7 +857,7 @@ func (p *Plane) decodeCpr(refLat, refLon float64, ts time.Time) error { return err } - return p.addLatLong(loc.latitude, loc.longitude, loc.cprDecodedTs) + return p.addLatLong(loc.latitude, loc.longitude, loc.cprDecodedTs, velocityCheck) } // LocationHistory returns the track history of the Plane @@ -920,8 +921,8 @@ func (hi headingInfo) getCompassLabel(heading float64) string { // Copy let's us duplicate a plane location func (pl *PlaneLocation) Copy() *PlaneLocation { - pl.rwlock.RLock() - defer pl.rwlock.RUnlock() + pl.mu.Lock() + defer pl.mu.Unlock() return &PlaneLocation{ latitude: pl.latitude, longitude: pl.longitude, @@ -948,14 +949,32 @@ func (pl *PlaneLocation) Copy() *PlaneLocation { // Lat returns the Locations current LAT func (pl *PlaneLocation) Lat() float64 { - pl.rwlock.RLock() - defer pl.rwlock.RUnlock() + pl.mu.Lock() + defer pl.mu.Unlock() return pl.latitude } // Lon returns the Locations current LON func (pl *PlaneLocation) Lon() float64 { - pl.rwlock.RLock() - defer pl.rwlock.RUnlock() + pl.mu.Lock() + defer pl.mu.Unlock() return pl.longitude } + +func (pl *PlaneLocation) HasTileGrid() bool { + pl.mu.Lock() + defer pl.mu.Unlock() + return pl.gridTileLocation != "" +} + +func (pl *PlaneLocation) SetTileGrid(tile string) { + pl.mu.Lock() + defer pl.mu.Unlock() + pl.gridTileLocation = tile +} + +func (pl *PlaneLocation) TileGrid() string { + pl.mu.Lock() + defer pl.mu.Unlock() + return pl.gridTileLocation +} diff --git a/lib/tracker/plane_test.go b/lib/tracker/plane_test.go index 24d99630..3baa92fc 100644 --- a/lib/tracker/plane_test.go +++ b/lib/tracker/plane_test.go @@ -111,7 +111,7 @@ func TestGetPlane(t *testing.T) { t.Errorf("latitude Calculation was incorrect: expected 10.2162144547802, got %0.13f", location.latitude) } - _ = plane.addLatLong(location.latitude, location.longitude, time.Now()) + _ = plane.addLatLong(location.latitude, location.longitude, time.Now(), true) } func Test_headingInfo_getCompassLabel(t *testing.T) { diff --git a/lib/tracker/tracker.go b/lib/tracker/tracker.go index b28cff14..bc119b35 100644 --- a/lib/tracker/tracker.go +++ b/lib/tracker/tracker.go @@ -28,16 +28,13 @@ type ( middlewares []Middleware sink Sink - producerWaiter sync.WaitGroup - middlewareWaiter sync.WaitGroup - - decodeWorkerCount int - decodingQueue chan *FrameEvent - decodingQueueDepth int + producerWaiter sync.WaitGroup + middlewareWaiter sync.WaitGroup + eventsWaiter sync.WaitGroup decodingQueueWaiter sync.WaitGroup - finishDone bool - eventsWaiter sync.WaitGroup + decodeWorkerCount int + finishDone bool startTime time.Time @@ -70,14 +67,13 @@ func (d dummySink) HealthCheck() bool { // NewTracker creates a new tracker with which we can populate with plane tracking data func NewTracker(opts ...Option) *Tracker { t := &Tracker{ - producers: []Producer{}, - middlewares: []Middleware{}, - decodeWorkerCount: 5, - pruneTick: 10 * time.Second, - pruneAfter: 5 * time.Minute, - decodingQueueDepth: 1000, - sink: dummySink{}, - startTime: time.Now(), + producers: []Producer{}, + middlewares: []Middleware{}, + decodeWorkerCount: 5, + pruneTick: 10 * time.Second, + pruneAfter: 5 * time.Minute, + sink: dummySink{}, + startTime: time.Now(), log: log.With().Str("Section", "Tracker").Logger(), } @@ -86,8 +82,6 @@ func NewTracker(opts ...Option) *Tracker { opt(t) } - t.decodingQueue = make(chan *FrameEvent, t.decodingQueueDepth) - t.planeList = forgetfulmap.NewForgetfulSyncMap( forgetfulmap.WithSweepInterval(t.pruneTick), forgetfulmap.WithPreEvictionAction(func(key, value interface{}) { @@ -123,11 +117,6 @@ func NewTracker(opts ...Option) *Tracker { }), ) - t.decodingQueueWaiter.Add(t.decodeWorkerCount) - for i := 0; i < t.decodeWorkerCount; i++ { - go t.decodeQueue() - } - return t } @@ -161,27 +150,37 @@ func (t *Tracker) EachPlane(pi PlaneIterator) { }) } -func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, refLat, refLon *float64) { +func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, source *FrameSource) { if nil == frame { return } icao := frame.Icao() - if 0 == icao { + if icao == 0 { return } var planeFormat string var hasChanged bool + var refLat, refLon *float64 + checkVelocity := true + + if source != nil { + refLat = source.RefLat + refLon = source.RefLon + checkVelocity = source.VelocityCheck + } p.setLastSeen(frame.TimeStamp()) p.incMsgCount() - p.addFrame(frame) + isDebug := p.tracker.log.Debug().Enabled() + + if isDebug { + p.addFrame(frame) + } debugMessage := func(sfmt string, a ...interface{}) { - //if zerolog.GlobalLevel() <= zerolog.DebugLevel { - if p.tracker.log.Debug().Enabled() { + if isDebug { planeFormat = fmt.Sprintf("DF%02d - \033[0;97mPlane (\033[38;5;118m%s %-8s\033[0;97m)", frame.DownLinkType(), p.IcaoIdentifierStr(), p.FlightNumber()) p.tracker.log.Debug().Msgf(planeFormat+sfmt, a...) - } } @@ -201,8 +200,8 @@ func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, refLat, refLon *float64) { // if there is no tile location for this plane and we have a refLat/refLon - let's assume it is in the same tile // as the receiver. This will be "fixed" for aircraft sending lat/lon within a few frames if it is different. // this means that all the aircraft that do not send locations, will at least have a chance of showing up. - if "" == p.GridTileLocation() && nil != refLat && nil != refLon { - p.location.gridTileLocation = tile_grid.LookupTile(*refLat, *refLon) + if p.GridTileLocation() == "" && nil != refLat && nil != refLon { + p.location.SetTileGrid(tile_grid.LookupTile(*refLat, *refLon)) } // determine what to do with our given frame @@ -244,7 +243,7 @@ func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, refLat, refLon *float64) { } hasChanged = p.setFlightStatus(frame.FlightStatus(), frame.FlightStatusString(), frame.TimeStamp()) || hasChanged - if 5 == frame.DownLinkType() { // || 21 == frame.DownLinkType() + if frame.DownLinkType() == 5 { // || 21 == frame.DownLinkType() hasChanged = p.setSquawkIdentity(frame.SquawkIdentity(), frame.TimeStamp()) || hasChanged } @@ -260,10 +259,7 @@ func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, refLat, refLon *float64) { } case 17, 18, 19: // ADS-B - //if debug { - // frame.Describe(os.Stdout) - //} - // i am using the text version because it is easier to program with. + // I am using the text version because it is easier to program with. // if performance is an issue, change over to byte comparing messageType := frame.MessageTypeString() switch messageType { @@ -294,7 +290,7 @@ func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, refLat, refLon *float64) { } else { _ = p.setCprOddLocation(float64(frame.Latitude()), float64(frame.Longitude()), frame.TimeStamp()) } - if err := p.decodeCprFilledRefLatLon(refLat, refLon, frame.TimeStamp()); nil != err { + if err := p.decodeCprFilledRefLatLon(refLat, refLon, checkVelocity); nil != err { debugMessage("%s", err) } else { hasChanged = true @@ -318,7 +314,7 @@ func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, refLat, refLon *float64) { altitude, _ := frame.Altitude() hasChanged = p.setAltitude(altitude, frame.AltitudeUnits(), frame.TimeStamp()) || hasChanged - if err := p.decodeCpr(0, 0, frame.TimeStamp()); nil != err { + if err := p.decodeCpr(0, 0, checkVelocity); nil != err { debugMessage("%s", err) } else { hasChanged = true @@ -355,19 +351,19 @@ func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, refLat, refLon *float64) { debugMessage(" has %s and is travelling at %0.2f knots\033[0m", headingStr, p.Velocity()) } - case mode_s.DF17FrameTestMessage: //, "Test Message": + case mode_s.DF17FrameTestMessage: debugMessage("\033[2m Ignoring: DF%d %s\033[0m", frame.DownLinkType(), messageType) - case mode_s.DF17FrameTestMessageSquawk: //, "Test Message": + case mode_s.DF17FrameTestMessageSquawk: { if frame.SquawkIdentity() > 0 { hasChanged = p.setSquawkIdentity(frame.SquawkIdentity(), frame.TimeStamp()) || hasChanged } } - case mode_s.DF17FrameSurfaceSystemStatus: //, "Surface System status": + case mode_s.DF17FrameSurfaceSystemStatus: hasChanged = p.setGroundStatus(true, frame.TimeStamp()) || hasChanged debugMessage("\033[2m Ignoring: DF%d %s\033[0m", frame.DownLinkType(), messageType) - case mode_s.DF17FrameEmergencyPriority: //, "Extended Squitter Aircraft status (Emergency)": + case mode_s.DF17FrameEmergencyPriority: { debugMessage("\033[2m %s\033[0m", messageType) if frame.Alert() { @@ -376,15 +372,15 @@ func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, refLat, refLon *float64) { } hasChanged = p.setSquawkIdentity(frame.SquawkIdentity(), frame.TimeStamp()) || hasChanged } - case mode_s.DF17FrameTcasRA: //, "Extended Squitter Aircraft status (1090ES TCAS RA)": + case mode_s.DF17FrameTcasRA: { debugMessage("\033[2m Ignoring: DF%d %s\033[0m", frame.DownLinkType(), messageType) } - case mode_s.DF17FrameTargetStateStatus: //, "Target State and status Message": + case mode_s.DF17FrameTargetStateStatus: { debugMessage("\033[2m Ignoring: DF%d %s\033[0m", frame.DownLinkType(), messageType) } - case mode_s.DF17FrameAircraftOperational: //, "Aircraft Operational status Message": + case mode_s.DF17FrameAircraftOperational: { if frame.VerticalStatusValid() { hasChanged = p.setGroundStatus(frame.MustOnGround(), frame.TimeStamp()) || hasChanged @@ -409,10 +405,10 @@ func (p *Plane) HandleModeSFrame(frame *mode_s.Frame, refLat, refLon *float64) { } } - if "" == p.location.gridTileLocation && nil != refLat && nil != refLon { + if p.location.HasTileGrid() && nil != refLat && nil != refLon { // do not have a grid tile for this plane, let's assume it is in same tile as the receiver - p.location.gridTileLocation = tile_grid.LookupTile(*refLat, *refLon) - hasChanged = p.location.gridTileLocation != "" || hasChanged + p.location.SetTileGrid(tile_grid.LookupTile(*refLat, *refLon)) + hasChanged = p.location.TileGrid() != "" || hasChanged } if hasChanged { @@ -425,7 +421,7 @@ func (p *Plane) HandleSbs1Frame(frame *sbs1.Frame) { p.setLastSeen(frame.TimeStamp()) p.incMsgCount() if frame.HasPosition { - if err := p.addLatLong(frame.Lat, frame.Lon, frame.Received); nil != err { + if err := p.addLatLong(frame.Lat, frame.Lon, frame.Received, true); nil != err { p.tracker.log.Warn().Err(err).Send() } diff --git a/lib/tracker/tracker_test.go b/lib/tracker/tracker_test.go index 548b1d9e..cd2c828c 100644 --- a/lib/tracker/tracker_test.go +++ b/lib/tracker/tracker_test.go @@ -176,7 +176,7 @@ func performTrackingTest(frames []string, t *testing.T) *Tracker { if nil != err { t.Errorf("%s", err) } - trk.GetPlane(frame.Icao()).HandleModeSFrame(frame, nil, nil) + trk.GetPlane(frame.Icao()).HandleModeSFrame(frame, nil) } return trk } @@ -226,7 +226,7 @@ func TestTrackingLocationHistory(t *testing.T) { return } plane := trk.GetPlane(frame.Icao()) - plane.HandleModeSFrame(frame, nil, nil) + plane.HandleModeSFrame(frame, nil) numHistory := len(plane.locationHistory) if tt.numLocations != numHistory { t.Errorf("Expected plane to have %d history items, actually has %d", tt.numLocations, numHistory) @@ -245,7 +245,7 @@ func TestTrackingLocationHistory(t *testing.T) { func TestPlane_HasLocation(t *testing.T) { trk := NewTracker() p := trk.GetPlane(0x010101) - err := p.addLatLong(0.01, 0.02, time.Now()) + err := p.addLatLong(0.01, 0.02, time.Now(), true) if nil != err { t.Errorf("Got error when adding lat/lon: %s", err) } @@ -398,9 +398,10 @@ func TestFarApartLocationUpdatesFail(t *testing.T) { } return f } + // more than 10s apart frames := []*mode_s.Frame{ md(mode_s.DecodeString("8D4CC54C58D3012E5A42EC86E201", time.Unix(1654054750, 540447277))), - md(mode_s.DecodeString("8D4CC54C58D304E49BF688F07265", time.Unix(1654054754, 563149779))), + md(mode_s.DecodeString("8D4CC54C58D304E49BF688F07265", time.Unix(1654054764, 563149779))), md(mode_s.DecodeString("8D4CC54C58D3012D1E44DD9DB4C3", time.Unix(1654054761, 392075155))), md(mode_s.DecodeString("8D4CC54C58D304DFD3FE0680A0AE", time.Unix(1654054797, 461184199))), } @@ -408,7 +409,7 @@ func TestFarApartLocationUpdatesFail(t *testing.T) { // make sure our frame timestamps are correct expectedUnixNano := []int64{ 1654054750540447277, - 1654054754563149779, + 1654054764563149779, 1654054761392075155, 1654054797461184199, } @@ -417,12 +418,12 @@ func TestFarApartLocationUpdatesFail(t *testing.T) { t.Errorf("Incorrect unix timestamp for frame %d. Expected %d != %d", i, expectedUnixNano[i], frames[i].TimeStamp().UnixNano()) } } - + source := &FrameSource{VelocityCheck: true} tkr := NewTracker() p := tkr.GetPlane(0x4CC54C) for i := 0; i < 4; i++ { - p.HandleModeSFrame(frames[i], nil, nil) + p.HandleModeSFrame(frames[i], source) if p.location.hasLatLon { t.Error("Should not have decoded lat/lon") @@ -450,7 +451,7 @@ func TestBadLocationUpdateRejected(t *testing.T) { p := tkr.GetPlane(0x4CA813) for i := 0; i < 4; i++ { - p.HandleModeSFrame(frames[i], nil, nil) + p.HandleModeSFrame(frames[i], nil) } if !p.location.hasLatLon { t.Error("Should have decoded lat/lon") @@ -460,7 +461,7 @@ func TestBadLocationUpdateRejected(t *testing.T) { // "Lat": 89.90261271848516, // "Lon": -86.77276611328125, - if 53.290813898636124 != p.location.latitude { + if p.location.latitude != 53.290813898636124 { t.Error("Wrong Latitude") } @@ -480,6 +481,10 @@ type testProducer struct { source *FrameSource } +func (tp *testProducer) Source() *FrameSource { + return tp.source +} + func newTestProducer() *testProducer { messages := map[string][]byte{ "DF00_MT00_ST00": {0x1A, 0x32, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xE1, 0x98, 0x38, 0x5F, 0x1A, 0x9D}, @@ -529,7 +534,7 @@ func newTestProducer() *testProducer { } for k := range messages { frame, _ := beast.NewFrame(messages[k], false) - tp.frames = append(tp.frames, frame) + tp.frames = append(tp.frames, *frame) } return &tp } @@ -565,15 +570,9 @@ func (tp *testProducer) addMsg() { tp.idx++ } -func withDecodeQueueDepth(num int) Option { - return func(t *Tracker) { - t.decodingQueueDepth = num - } -} - func BenchmarkTracker_AddFrame(b *testing.B) { b.StopTimer() - tracker := NewTracker(WithDecodeWorkerCount(1), withDecodeQueueDepth(1)) + tracker := NewTracker(WithDecodeWorkerCount(1)) tp := newTestProducer() tracker.AddProducer(tp)