Skip to content

Commit

Permalink
Merge pull request #198 from plane-watch/191-jumping-squawk-altitude
Browse files Browse the repository at this point in the history
ingest-tap
  • Loading branch information
bluntelk authored Jun 18, 2023
2 parents 7da64b7 + 4644686 commit 0949178
Show file tree
Hide file tree
Showing 13 changed files with 732 additions and 19 deletions.
175 changes: 175 additions & 0 deletions cmd/ingest_tap/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package main

import (
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"os"
"os/signal"
"plane.watch/lib/logging"
"plane.watch/lib/nats_io"
"plane.watch/lib/randstr"
"plane.watch/lib/tracker/beast"
"sync"
"syscall"
"time"
)

const (
natsUrl = "nats"
apiKey = "api-key"
icao = "icao"
logFile = "file"
)

func main() {
app := cli.NewApp()
app.Name = "pw_ingest tap"
app.Description = "Ask all of the pw_ingest instances on the nats bus to feed matching data to this terminal.\n" +
"Matching beast/avr/sbs1 messages will be forwarded to you"

app.Commands = cli.Commands{
{
Name: "log",
Description: "Logs matching data to a file",
Flags: []cli.Flag{
&cli.StringFlag{
Name: icao,
Usage: "if specified, only frames from the plane with the specified ICAO will be sent",
},
&cli.StringFlag{
Name: apiKey,
Usage: "the plane.watch api UUID for the given feeder you want to investigate",
},
&cli.StringFlag{
Name: logFile,
Value: "captured-frames",
Usage: "[.beast|.avr|.sbs1] will be appended to file",
},
},
Action: logMatching,
},
}

app.Flags = []cli.Flag{
&cli.StringFlag{
Name: natsUrl,
Usage: "nats url",
Value: "nats://localhost:4222/",
},
}
logging.IncludeVerbosityFlags(app)
app.Before = func(c *cli.Context) error {
logging.SetLoggingLevel(c)

return nil
}

if err := app.Run(os.Args); nil != err {
log.Error().Err(err).Send()
}
}

func logMatching(c *cli.Context) error {
logging.ConfigureForCli()
fileHandles := make(map[string]*os.File)
exts := []string{"beast", "avr", "sbs1"}
for _, ext := range exts {
fileName := c.String(logFile) + "." + ext
fh, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0666)
if nil != err {
log.Error().Err(err).Str("filename", fileName).Send()
return err
}
fileHandles[ext] = fh
}

natsSvr, err := nats_io.NewServer(
nats_io.WithConnections(true, true),
nats_io.WithServer(c.String(natsUrl), "ingest-tap"),
)
if nil != err {
return err
}

tapSubject := "ingest-tap-" + randstr.RandString(20)
ch, err := natsSvr.Subscribe(tapSubject)

var wg sync.WaitGroup
wg.Add(1)

go handleStream(ch, fileHandles, &wg)

headers := map[string]string{
"action": "add",
"api-key": c.String(apiKey),
"icao": c.String(icao),
"subject": tapSubject,
}

response, errRq := natsSvr.Request("v1.pw-ingest.tap", []byte{}, headers, time.Second)
if nil != errRq {
log.Error().Err(err).Msg("Failed to request tap")
return err
}
log.Debug().Str("response", string(response)).Msg("request response")

chSignal := make(chan os.Signal, 1)
signal.Notify(chSignal, syscall.SIGINT, syscall.SIGTERM)
<-chSignal // wait for our cancel signal
log.Info().Msg("Shutting down")

// ask to step sending
headers["action"] = "remove"
response, errRq = natsSvr.Request("v1.pw-ingest.tap", []byte{}, headers, time.Second)
if nil != errRq {
log.Error().Err(err).Msg("Failed to stop tap")
return err
}
log.Debug().Str("response", string(response)).Msg("request response")

close(ch)
wg.Wait()

for ext, f := range fileHandles {
err = f.Close()
if err != nil {
log.Error().Err(err).Str("ext", ext).Msg("Failed to close file")
}
}

log.Info().Msg("Shut down complete")
return nil
}

func handleStream(ch chan *nats.Msg, fileHandles map[string]*os.File, wg *sync.WaitGroup) {
for msg := range ch {
switch msg.Header.Get("type") {
case "beast":
b, err := beast.NewFrame(msg.Data, false)
log.Info().
Str("ICAO", b.IcaoStr()).
Str("AVR", b.RawString()).
Str("tag", msg.Header.Get("tag")).
Send()

// TODO: Replace timestamp with our own
_, err = fileHandles["beast"].Write(msg.Data)
if err != nil {
log.Error().Err(err).Send()
}
case "avr":
_, err := fileHandles["avr"].Write(append(msg.Data, 10))
if err != nil {
log.Error().Err(err).Send()
}
case "sbs1":
_, err := fileHandles["sbs1"].Write(append(msg.Data, 10))
if err != nil {
log.Error().Err(err).Send()
}

}
}
wg.Done()
}
12 changes: 9 additions & 3 deletions cmd/plane.path/avr.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ var lastSeenMap sync.Map

// Handle ensures we have enough time between messages for a plane to have travelled the distance it says it did
// this is because we do not have the timestamp for when it was collected when processing AVR frames
func (fm *timeFiddler) Handle(f tracker.Frame) tracker.Frame {
switch f.(type) {
func (fm *timeFiddler) Handle(fe *tracker.FrameEvent) tracker.Frame {
if nil == fe {
return nil
}
f := fe.Frame()
if nil == f {
return nil
}
switch frame := f.(type) {
case *mode_s.Frame:
lastSeen, _ := lastSeenMap.LoadOrStore(f.Icao(), time.Now().Add(-24*time.Hour))
t := lastSeen.(time.Time)
frame := f.(*mode_s.Frame)
if 17 == frame.DownLinkType() {
switch frame.MessageTypeString() {
case mode_s.DF17FrameSurfacePos, mode_s.DF17FrameAirPositionGnss, mode_s.DF17FrameAirPositionBarometric:
Expand Down
13 changes: 11 additions & 2 deletions cmd/pw_ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
"plane.watch/lib/dedupe"
"plane.watch/lib/example_finder"
"plane.watch/lib/logging"
"plane.watch/lib/middleware"
"plane.watch/lib/monitoring"
"plane.watch/lib/nats_io"
"plane.watch/lib/setup"
"plane.watch/lib/sink"
"plane.watch/lib/tracker"
)

Expand Down Expand Up @@ -111,11 +114,17 @@ func commonSetup(c *cli.Context) (*tracker.Tracker, error) {
trk.AddMiddleware(dedupe.NewFilter(dedupe.WithDedupeCounter(prometheusOutputFrameDedupe)))
// trk.AddMiddleware(dedupe.NewFilterBTree(dedupe.WithDedupeCounterBTree(prometheusOutputFrameDedupe), dedupe.WithBtreeDegree(16)))
}
sink, err := setup.HandleSinkFlag(c, "pw_ingest")
sinkDest, err := setup.HandleSinkFlag(c, "pw_ingest")
if nil != err {
return nil, err
}
trk.SetSink(sink)
trk.SetSink(sinkDest)

if sinkType, ok := sinkDest.(*sink.Sink); ok {
if ns, ok := sinkType.Server().(*nats_io.Server); ok {
trk.AddMiddleware(middleware.NewIngestTap(ns))
}
}

producers, err := setup.HandleSourceFlags(c)
if nil != err {
Expand Down
3 changes: 2 additions & 1 deletion cmd/pw_ws_broker/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ func (cl *ClientList) performSearch(query string) ws_protocol.SearchResult {

// Airport Lookup, if we have a nats connection
if nil != cl.broker.natsRpc {
resp, err := cl.broker.natsRpc.Request(export.NatsApiSearchAirportV1, []byte(query), time.Second)
headers := map[string]string{}
resp, err := cl.broker.natsRpc.Request(export.NatsApiSearchAirportV1, []byte(query), headers, time.Second)
if nil != err {
log.Error().Err(err).Msg("Failed to search for airport")
} else {
Expand Down
6 changes: 5 additions & 1 deletion cmd/recorder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ func (fp *frameProcessor) String() string {
func (fp *frameProcessor) Listen() chan tracker.Event {
return fp.events
}
func (fp *frameProcessor) Handle(frame tracker.Frame) tracker.Frame {
func (fp *frameProcessor) Handle(fe *tracker.FrameEvent) tracker.Frame {
if nil == fe {
return nil
}
frame := fe.Frame()
if nil == frame {
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion lib/dedupe/dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ func NewFilter(opts ...Option) *Filter {
return &f
}

func (f *Filter) Handle(frame tracker.Frame) tracker.Frame {
func (f *Filter) Handle(fe *tracker.FrameEvent) tracker.Frame {
if nil == fe {
return nil
}
frame := fe.Frame()
if nil == frame {
return nil
}
Expand Down
16 changes: 10 additions & 6 deletions lib/dedupe/dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dedupe

import (
"github.com/rs/zerolog"
"plane.watch/lib/tracker"
"plane.watch/lib/tracker/beast"
"testing"
)
Expand All @@ -17,14 +18,15 @@ func TestFilter_Handle(t *testing.T) {
if nil != err {
t.Error(err)
}
fe := tracker.NewFrameEvent(&frame, nil)

resp := filter.Handle(&frame)
resp := filter.Handle(&fe)

if resp == nil {
t.Errorf("Expected the same frame back")
}

if nil != filter.Handle(&frame) {
if nil != filter.Handle(&fe) {
t.Errorf("Got a duplicated frame back")
}
}
Expand All @@ -33,10 +35,11 @@ func BenchmarkFilter_HandleDuplicates(b *testing.B) {
filter := NewFilter()

frame, _ := beast.NewFrame(beastModeSShort, false)
filter.Handle(&frame)
fe := tracker.NewFrameEvent(&frame, nil)
filter.Handle(&fe)

for n := 0; n < b.N; n++ {
if nil != filter.Handle(&frame) {
if nil != filter.Handle(&fe) {
b.Error("Should not have gotten a non empty response - duplicate handled incorrectly!?")
}
}
Expand All @@ -49,11 +52,12 @@ func BenchmarkFilter_HandleUnique(b *testing.B) {
for n := 0; n < b.N; n++ {
beastModeSTest := []byte{0x1a, 0x32, 0x22, 0x1b, 0x54, 0xf0, 0x81, 0x2b, 0x26, byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n), 0, 0, 0}
msg, _ := beast.NewFrame(beastModeSTest, false)
fe := tracker.NewFrameEvent(&msg, nil)

if nil == filter.Handle(&msg) {
if nil == filter.Handle(&fe) {
b.Fatal("Expected to insert new message")
}
if nil != filter.Handle(&msg) {
if nil != filter.Handle(&fe) {
b.Fatal("Failed duplicate insert")
}
}
Expand Down
6 changes: 5 additions & 1 deletion lib/example_finder/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ func (f *Filter) String() string {
return "Example Finder/Filter"
}

func (f *Filter) Handle(frame tracker.Frame) tracker.Frame {
func (f *Filter) Handle(fe *tracker.FrameEvent) tracker.Frame {
if nil == fe {
return nil
}
frame := fe.Frame()
if nil == frame {
return nil
}
Expand Down
Loading

0 comments on commit 0949178

Please sign in to comment.