diff --git a/cmd/ingest_tap/main.go b/cmd/ingest_tap/main.go index 9f4e63ef..d1f2815b 100644 --- a/cmd/ingest_tap/main.go +++ b/cmd/ingest_tap/main.go @@ -1,46 +1,34 @@ package main import ( - "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" "os" "os/signal" "plane.watch/lib/logging" - "plane.watch/lib/nats_io" - "plane.watch/lib/randstr" "plane.watch/lib/tracker/beast" - "sync" "syscall" - "time" ) const ( - natsUrl = "nats" - apiKey = "api-key" - icao = "icao" - logFile = "file" + natsURL = "nats" + websocketURL = "websocket-url" + feederAPIKey = "api-key" + icao = "icao" + logFile = "file" ) func main() { app := cli.NewApp() - app.Name = "pw_ingest tap" + app.Name = "pw_ingest natsTap" app.Description = "Ask all of the pw_ingest instances on the nats bus to feed matching data to this terminal.\n" + "Matching beast/avr/sbs1 messages will be forwarded to you" app.Commands = cli.Commands{ { - Name: "log", - Description: "Logs matching data to a file", + Name: "log", + Usage: "Logs matching data to a file", Flags: []cli.Flag{ - &cli.StringFlag{ - Name: icao, - Usage: "if specified, only frames from the plane with the specified ICAO will be sent", - }, - &cli.StringFlag{ - Name: apiKey, - Usage: "the plane.watch api UUID for the given feeder you want to investigate", - }, &cli.StringFlag{ Name: logFile, Value: "captured-frames", @@ -49,14 +37,32 @@ func main() { }, Action: logMatching, }, + { + Name: "tui", + Usage: "Shows an Interactive Text User Interface", + Action: runTui, + }, } app.Flags = []cli.Flag{ &cli.StringFlag{ - Name: natsUrl, + Name: natsURL, Usage: "nats url", Value: "nats://localhost:4222/", }, + &cli.StringFlag{ + Name: websocketURL, + Usage: "Plane Watch WebSocket Api URL", + Value: "https://localhost/planes", + }, + &cli.StringFlag{ + Name: icao, + Usage: "if specified, only frames from the plane with the specified ICAO will be sent", + }, + &cli.StringFlag{ + Name: feederAPIKey, + Usage: "the plane.watch api UUID for the given feeder you want to investigate", + }, } logging.IncludeVerbosityFlags(app) app.Before = func(c *cli.Context) error { @@ -72,6 +78,13 @@ func main() { func logMatching(c *cli.Context) error { logging.ConfigureForCli() + + tapper := NewPlaneWatchTapper(WithLogger(log.Logger)) + if err := tapper.Connect(c.String(natsURL), ""); err != nil { + return err + } + defer tapper.Disconnect() + fileHandles := make(map[string]*os.File) exts := []string{"beast", "avr", "sbs1"} for _, ext := range exts { @@ -84,55 +97,17 @@ func logMatching(c *cli.Context) error { fileHandles[ext] = fh } - natsSvr, err := nats_io.NewServer( - nats_io.WithConnections(true, true), - nats_io.WithServer(c.String(natsUrl), "ingest-tap"), - ) - if nil != err { + if err := tapper.IncomingDataTap(c.String(icao), c.String(feederAPIKey), handleStream(fileHandles)); err != nil { return err } - tapSubject := "ingest-tap-" + randstr.RandString(20) - ch, err := natsSvr.Subscribe(tapSubject) - - var wg sync.WaitGroup - wg.Add(1) - - go handleStream(ch, fileHandles, &wg) - - headers := map[string]string{ - "action": "add", - "api-key": c.String(apiKey), - "icao": c.String(icao), - "subject": tapSubject, - } - - response, errRq := natsSvr.Request("v1.pw-ingest.tap", []byte{}, headers, time.Second) - if nil != errRq { - log.Error().Err(err).Msg("Failed to request tap") - return err - } - log.Debug().Str("response", string(response)).Msg("request response") - chSignal := make(chan os.Signal, 1) signal.Notify(chSignal, syscall.SIGINT, syscall.SIGTERM) <-chSignal // wait for our cancel signal log.Info().Msg("Shutting down") - // ask to step sending - headers["action"] = "remove" - response, errRq = natsSvr.Request("v1.pw-ingest.tap", []byte{}, headers, time.Second) - if nil != errRq { - log.Error().Err(err).Msg("Failed to stop tap") - return err - } - log.Debug().Str("response", string(response)).Msg("request response") - - close(ch) - wg.Wait() - for ext, f := range fileHandles { - err = f.Close() + err := f.Close() if err != nil { log.Error().Err(err).Str("ext", ext).Msg("Failed to close file") } @@ -142,34 +117,36 @@ func logMatching(c *cli.Context) error { return nil } -func handleStream(ch chan *nats.Msg, fileHandles map[string]*os.File, wg *sync.WaitGroup) { - for msg := range ch { - switch msg.Header.Get("type") { +func handleStream(fileHandles map[string]*os.File) IngestTapHandler { + return func(frameType, tag string, msgData []byte) { + switch frameType { case "beast": - b, err := beast.NewFrame(msg.Data, false) + b, err := beast.NewFrame(msgData, false) + if nil != err { + log.Error().Err(err) + return + } log.Info(). Str("ICAO", b.IcaoStr()). Str("AVR", b.RawString()). - Str("tag", msg.Header.Get("tag")). + Str("tag", tag). Send() // TODO: Replace timestamp with our own - _, err = fileHandles["beast"].Write(msg.Data) + _, err = fileHandles["beast"].Write(msgData) if err != nil { log.Error().Err(err).Send() } case "avr": - _, err := fileHandles["avr"].Write(append(msg.Data, 10)) + _, err := fileHandles["avr"].Write(append(msgData, 10)) // new line append if err != nil { log.Error().Err(err).Send() } case "sbs1": - _, err := fileHandles["sbs1"].Write(append(msg.Data, 10)) + _, err := fileHandles["sbs1"].Write(append(msgData, 10)) if err != nil { log.Error().Err(err).Send() } - } } - wg.Done() } diff --git a/cmd/ingest_tap/model.go b/cmd/ingest_tap/model.go new file mode 100644 index 00000000..b2900e14 --- /dev/null +++ b/cmd/ingest_tap/model.go @@ -0,0 +1,291 @@ +package main + +import ( + "github.com/charmbracelet/bubbles/help" + "github.com/charmbracelet/bubbles/table" + "github.com/charmbracelet/bubbles/viewport" + "github.com/charmbracelet/lipgloss" + "github.com/rs/zerolog" + "plane.watch/lib/export" + "plane.watch/lib/tracker/beast" + "plane.watch/lib/tracker/mode_s" + "plane.watch/lib/tracker/sbs1" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +type ( + planesSource int + + sourceInfo struct { + mu sync.Mutex + frameCount uint64 + planes map[string]*export.PlaneLocation + frames map[string]uint64 + icaos []string + } + + model struct { + logger zerolog.Logger + + startTime time.Time + + tapper *PlaneWatchTapper + + focusIcaoList []string + + help help.Model + + statsTable table.Model + selectedTable table.Model + planesTable table.Model + source planesSource + selectedIcao string + selectedCallSign string + + logView viewport.Model + logViewReady bool + + width, height int + tickCount uint64 + tickDuration time.Duration + + heading lipgloss.Style + + logs *strings.Builder + + incomingMutex sync.Mutex + + feederSources map[string]int + incomingIcaoFrames map[uint32]int + numIncomingBeast uint64 + numIncomingAvr uint64 + numIncomingSbs1 uint64 + + afterIngest sourceInfo + afterEnrichment sourceInfo + afterRouterLow sourceInfo + afterRouterHigh sourceInfo + finalLow sourceInfo + finalHigh sourceInfo + } + + timerTick time.Time +) + +func initialModel(natsURL, wsURL string) (*model, error) { + logs := &strings.Builder{} + logger := zerolog.New(zerolog.ConsoleWriter{Out: logs, TimeFormat: time.UnixDate}).With().Timestamp().Logger() + + m := &model{ + logger: logger.With().Str("app", "model").Logger(), + startTime: time.Now(), + tapper: NewPlaneWatchTapper(WithLogger(logger)), + help: help.New(), + tickDuration: time.Millisecond * 16, + focusIcaoList: make([]string, 0), + source: planesSourceWSLow, + logs: logs, + + feederSources: make(map[string]int), + incomingIcaoFrames: make(map[uint32]int), + } + if err := m.tapper.Connect(natsURL, wsURL); err != nil { + return nil, err + } + m.buildTables() + m.configureStyles() + + m.afterIngest.init() + m.afterEnrichment.init() + m.afterRouterLow.init() + m.afterRouterHigh.init() + m.finalLow.init() + m.finalHigh.init() + m.logger.Debug().Msg("Startup Init Complete") + + return m, nil +} + +func (m *model) configureStyles() { + m.heading = func() lipgloss.Style { + b := lipgloss.RoundedBorder() + b.Right = "├" + return lipgloss.NewStyle().BorderStyle(b).Padding(0, 1) + }() + + s := table.DefaultStyles() + s.Header = s.Header. + BorderStyle(lipgloss.NormalBorder()). + BorderForeground(lipgloss.Color("240")). + BorderBottom(true). + Bold(false) + s.Selected = s.Selected. + Foreground(lipgloss.Color("229")). + Background(lipgloss.Color("57")). + Bold(false) + m.statsTable.SetStyles(s) + m.selectedTable.SetStyles(s) + m.planesTable.SetStyles(s) +} + +func (m *model) buildTables() { + m.statsTable = table.New( + table.WithColumns([]table.Column{ + {Title: "Receivers", Width: 11}, + {Title: "Planes", Width: 10}, + + {Title: "Beast", Width: 10}, + {Title: "Avr", Width: 5}, + {Title: "Sbs1", Width: 5}, + + {Title: "Frames", Width: 10}, + + {Title: "Enriched", Width: 10}, + + {Title: "Routed Low", Width: 10}, + {Title: "Routed High", Width: 11}, + + {Title: "WS Low", Width: 10}, + {Title: "WS High", Width: 10}, + }), + table.WithRows([]table.Row{ + {"0", "0", "0", "0", "0", "0", "0", "0", "0", "0"}, + }), + table.WithHeight(2), + table.WithFocused(false), + ) + m.selectedTable = table.New( + table.WithColumns([]table.Column{ + {Title: "Source", Width: 20}, + {Title: "# Updates", Width: 10}, + {Title: "Squawk", Width: 9}, + {Title: "Lat", Width: 10}, + {Title: "Lon", Width: 10}, + {Title: "Altitude", Width: 10}, + {Title: "Vert Rate", Width: 10}, + {Title: "Heading", Width: 10}, + }), + table.WithRows(m.defaultSelectedTableRows()), + table.WithHeight(8), + table.WithFocused(false), + ) + m.planesTable = table.New( + table.WithColumns([]table.Column{ + {Title: "ICAO", Width: 6}, + {Title: "Receivers", Width: 9}, + {Title: "CallSign", Width: 9}, + {Title: "Squawk", Width: 9}, + {Title: "Lat", Width: 10}, + {Title: "Lon", Width: 10}, + {Title: "Altitude", Width: 10}, + {Title: "Vert Rate", Width: 10}, + {Title: "Heading", Width: 10}, + }), + table.WithHeight(10), + table.WithRows([]table.Row{}), + table.WithFocused(true), + ) +} + +func (m *model) defaultSelectedTableRows() []table.Row { + return []table.Row{ + {planesSourceIncoming.String(), "", "", "", "", "", "", ""}, + {planesSourceIngest.String(), "", "", "", "", "", "", ""}, + {planesSourceEnriched.String(), "", "", "", "", "", "", ""}, + {planesSourceRoutedLow.String(), "", "", "", "", "", "", ""}, + {planesSourceRoutedHigh.String(), "", "", "", "", "", "", ""}, + {planesSourceWSLow.String(), "", "", "", "", "", "", ""}, + {planesSourceWSHigh.String(), "", "", "", "", "", "", ""}, + } +} + +func (m *model) handleIncomingData(frameType, tag string, data []byte) { + m.incomingMutex.Lock() + defer m.incomingMutex.Unlock() + + m.feederSources[tag]++ + + switch frameType { + case "beast": + frame, err := beast.NewFrame(data, false) + if nil != err { + return + } + m.incomingIcaoFrames[frame.Icao()]++ + m.numIncomingBeast++ + case "avr": + frame, err := mode_s.DecodeString(string(data), m.startTime) + if nil == err { + return + } + m.incomingIcaoFrames[frame.Icao()]++ + m.numIncomingAvr++ + case "sbs1": + frame := sbs1.NewFrame(string(data)) + m.incomingIcaoFrames[frame.Icao()]++ + m.numIncomingSbs1++ + } +} + +func (si *sourceInfo) init() { + si.planes = make(map[string]*export.PlaneLocation) + si.frames = make(map[string]uint64) +} + +func (si *sourceInfo) update(loc *export.PlaneLocation) { + si.mu.Lock() + defer si.mu.Unlock() + if _, ok := si.frames[loc.Icao]; !ok { + si.icaos = append(si.icaos, loc.Icao) + sort.Strings(si.icaos) + } + si.frameCount++ + si.planes[loc.Icao] = loc + si.frames[loc.Icao]++ +} + +func (si *sourceInfo) numFrames() string { + si.mu.Lock() + defer si.mu.Unlock() + return strconv.FormatUint(si.frameCount, 10) +} + +func (si *sourceInfo) numFramesFor(icao string) string { + si.mu.Lock() + defer si.mu.Unlock() + return strconv.FormatUint(si.frames[icao], 10) +} + +func (si *sourceInfo) getLoc(icao string) *export.PlaneLocation { + si.mu.Lock() + defer si.mu.Unlock() + p := si.planes[icao] + if nil == p { + return nil + } + return p +} + +func (ps planesSource) String() string { + switch ps { + case planesSourceIncoming: + return "Incoming Data" + case planesSourceIngest: + return "After Ingest" + case planesSourceEnriched: + return "After Enrichment" + case planesSourceRoutedLow: + return "After Routing - Low" + case planesSourceRoutedHigh: + return "After Routing - High" + case planesSourceWSLow: + return "Websocket - Low" + case planesSourceWSHigh: + return "Websocket - High" + } + return "Unknown" +} diff --git a/cmd/ingest_tap/planewatch.go b/cmd/ingest_tap/planewatch.go new file mode 100644 index 00000000..35c2296b --- /dev/null +++ b/cmd/ingest_tap/planewatch.go @@ -0,0 +1,288 @@ +package main + +import ( + "encoding/json" + "fmt" + "github.com/nats-io/nats.go" + "github.com/rs/zerolog" + "plane.watch/lib/export" + "plane.watch/lib/middleware" + "plane.watch/lib/nats_io" + "plane.watch/lib/randstr" + "plane.watch/lib/sink" + "plane.watch/lib/ws_protocol" + "sync" + "time" +) + +const ( + tapActionAdd = "add" + tapActionRemove = "remove" +) + +type ( + tapHeaders map[string]string + + PlaneWatchTapper struct { + natsSvr *nats_io.Server + wsLow, wsHigh *ws_protocol.WsClient + + exitChannels map[string]chan bool + taps []tapHeaders + + logger zerolog.Logger + + queueLocations string + queueLocationsEnriched string + queueLocationsEnrichedLow string + queueLocationsEnrichedHigh string + + wsHandlersLow []*wsFilterFunc + wsHandlersHigh []*wsFilterFunc + mu sync.Mutex + } + + wsFilterFunc struct { + icao, feederTag string + speed string + filter wsHandler + } + + IngestTapHandler func(frameType, tag string, data []byte) + wsHandler func(location *export.PlaneLocation) + + Option func(*PlaneWatchTapper) +) + +func NewPlaneWatchTapper(opts ...Option) *PlaneWatchTapper { + pw := &PlaneWatchTapper{ + exitChannels: make(map[string]chan bool), + taps: make([]tapHeaders, 0), + logger: zerolog.Logger{}, + queueLocations: sink.QueueLocationUpdates, + queueLocationsEnriched: "location-updates-enriched", + queueLocationsEnrichedLow: "location-updates-enriched-reduced", + queueLocationsEnrichedHigh: "location-updates-enriched-merged", + + wsHandlersLow: make([]*wsFilterFunc, 0), + wsHandlersHigh: make([]*wsFilterFunc, 0), + } + + for _, opt := range opts { + opt(pw) + } + + return pw +} + +func WithLogger(logger zerolog.Logger) Option { + return func(tapper *PlaneWatchTapper) { + tapper.logger = logger + } +} + +func (pw *PlaneWatchTapper) Connect(natsServer, wsServer string) error { + var err error + pw.natsSvr, err = nats_io.NewServer( + nats_io.WithConnections(true, true), + nats_io.WithServer(natsServer, "ingest-nats-tap"), + ) + if nil != err { + return err + } + + pw.wsLow = ws_protocol.NewClient( + ws_protocol.WithSourceURL(wsServer), + ws_protocol.WithResponseHandler(pw.wsHandler(&pw.wsHandlersLow)), + ws_protocol.WithLogger(pw.logger.With().Str("ws", "low").Logger()), + ) + if err = pw.wsLow.Connect(); err != nil { + return err + } + pw.wsHigh = ws_protocol.NewClient( + ws_protocol.WithSourceURL(wsServer), + ws_protocol.WithResponseHandler(pw.wsHandler(&pw.wsHandlersHigh)), + ws_protocol.WithLogger(pw.logger.With().Str("ws", "high").Logger()), + ) + return pw.wsHigh.Connect() +} + +func (pw *PlaneWatchTapper) Disconnect() { + // request things stop sending + for _, tapName := range pw.taps { + pw.RemoveIncomingTap(tapName) + } + + // drains the incoming queues before closing all the things + pw.natsSvr.Close() + if err := pw.wsLow.Disconnect(); err != nil { + pw.logger.Error().Err(err).Msg("Did not disconnect from websocket, low") + } + if err := pw.wsHigh.Disconnect(); err != nil { + pw.logger.Error().Err(err).Msg("Did not disconnect from websocket, high") + } + + // simple sequential disconnect + for _, exitChan := range pw.exitChannels { + exitChan <- true + } +} + +func (pw *PlaneWatchTapper) IncomingDataTap(icao, feederKey string, writer IngestTapHandler) error { + tapSubject := "ingest-natsTap-" + randstr.RandString(20) + ch, err := pw.natsSvr.Subscribe(tapSubject) + if err != nil { + return fmt.Errorf("failed to open Nats tap: %w", err) + } + pw.exitChannels[tapSubject] = make(chan bool) + + go func(ch chan *nats.Msg, exitChan chan bool) { + for { + select { + case msg := <-ch: + writer(msg.Header.Get("type"), msg.Header.Get("tag"), msg.Data) + case <-exitChan: + return + } + } + }(ch, pw.exitChannels[tapSubject]) + + headers := tapHeaders{ + "action": tapActionAdd, + "api-key": feederKey, // the ingest feeder we wish to target + "icao": icao, // the aircraft we wish to look at + "subject": tapSubject, + } + + response, errRq := pw.natsSvr.Request(middleware.NatsAPIv1PwIngestTap, []byte{}, headers, time.Second) + if nil != errRq { + pw.logger.Error().Err(err).Msg("Failed to request natsTap, are there any pw_ingest's?") + return errRq + } + pw.logger.Debug().Str("response", string(response)).Msg("request response") + pw.taps = append(pw.taps, headers) + + return nil +} + +func (pw *PlaneWatchTapper) RemoveIncomingTap(tap tapHeaders) { + tap["action"] = tapActionRemove + response, errRq := pw.natsSvr.Request(middleware.NatsAPIv1PwIngestTap, []byte{}, tap, time.Second) + if nil != errRq { + pw.logger.Error().Err(errRq).Msg("Failed to stop natsTap") + return + } + pw.logger.Debug().Str("response", string(response)).Msg("request response") +} + +func (pw *PlaneWatchTapper) natsTap(icao, feederKey string, subject string, callback func(*export.PlaneLocation)) error { + listenCh, err := pw.natsSvr.Subscribe(subject) + if nil != err { + return err + } + tapSubject := subject + "-" + randstr.RandString(20) + + pw.exitChannels[tapSubject] = make(chan bool) + + go func(ch chan *nats.Msg, exitChan chan bool) { + for { + select { + case msg := <-ch: + var planeLocation export.PlaneLocation + err = json.Unmarshal(msg.Data, &planeLocation) + if nil != err { + pw.logger.Error().Err(err) + continue + } + if icao != "" && icao != planeLocation.Icao { + continue + } + if feederKey != "" && feederKey != planeLocation.SourceTag { + continue + } + callback(&planeLocation) + case <-exitChan: + return + } + } + }(listenCh, pw.exitChannels[tapSubject]) + + return nil +} + +func (pw *PlaneWatchTapper) AfterIngestTap(icao, feederKey string, callback func(*export.PlaneLocation)) error { + return pw.natsTap(icao, feederKey, pw.queueLocations, callback) +} + +func (pw *PlaneWatchTapper) AfterEnrichmentTap(icao, feederKey string, callback func(*export.PlaneLocation)) error { + return pw.natsTap(icao, feederKey, pw.queueLocationsEnriched, callback) +} + +func (pw *PlaneWatchTapper) AfterRouterLowTap(icao, feederKey string, callback func(*export.PlaneLocation)) error { + return pw.natsTap(icao, feederKey, pw.queueLocationsEnrichedLow, callback) +} + +func (pw *PlaneWatchTapper) AfterRouterHighTap(icao, feederKey string, callback func(*export.PlaneLocation)) error { + return pw.natsTap(icao, feederKey, pw.queueLocationsEnrichedHigh, callback) +} + +func (pw *PlaneWatchTapper) WebSocketTapLow(icao, feederKey string, callback func(*export.PlaneLocation)) error { + pw.addWsFilterFunc(icao, feederKey, "low", callback) + return pw.wsLow.SubscribeAllLow() +} +func (pw *PlaneWatchTapper) WebSocketTapHigh(icao, feederKey string, callback func(*export.PlaneLocation)) error { + pw.addWsFilterFunc(icao, feederKey, "high", callback) + return pw.wsHigh.SubscribeAllHigh() +} + +func (pw *PlaneWatchTapper) addWsFilterFunc(icao, feederKey, speed string, filter wsHandler) { + pw.mu.Lock() + defer pw.mu.Unlock() + if speed == "low" { + pw.wsHandlersLow = append(pw.wsHandlersLow, &wsFilterFunc{ + icao: icao, + feederTag: feederKey, + speed: speed, + filter: filter, + }) + } else { + pw.wsHandlersHigh = append(pw.wsHandlersHigh, &wsFilterFunc{ + icao: icao, + feederTag: feederKey, + filter: filter, + }) + } +} + +// wsHandler runs through all the handlers we have and calls maybeSend for each plane location +func (pw *PlaneWatchTapper) wsHandler(handlers *[]*wsFilterFunc) func(r *ws_protocol.WsResponse) { + return func(r *ws_protocol.WsResponse) { + pw.mu.Lock() + defer pw.mu.Unlock() + for _, ff := range *handlers { + pw.maybeSend(ff, r.Location) + if r.Locations != nil { + for _, loc := range r.Locations { + pw.maybeSend(ff, loc) + } + } + } + } +} + +// maybeSend takes into account the filter and calls the user provided handler if it matches +func (pw *PlaneWatchTapper) maybeSend(ff *wsFilterFunc, loc *export.PlaneLocation) { + if nil == loc { + return + } + if ff.icao != "" && ff.icao != loc.Icao { + return + } + if ff.feederTag != "" { + if _, ok := loc.SourceTags[ff.feederTag]; !ok { + return + } + } + + ff.filter(loc) +} diff --git a/cmd/ingest_tap/tui.go b/cmd/ingest_tap/tui.go new file mode 100644 index 00000000..5e7e74a7 --- /dev/null +++ b/cmd/ingest_tap/tui.go @@ -0,0 +1,325 @@ +package main + +import ( + "github.com/charmbracelet/bubbles/key" + "github.com/charmbracelet/bubbles/table" + "github.com/charmbracelet/bubbles/viewport" + tea "github.com/charmbracelet/bubbletea" + "github.com/charmbracelet/lipgloss" + "github.com/urfave/cli/v2" + "math" + "plane.watch/lib/export" + "strconv" + "time" +) + +const ( + planesSourceIncoming planesSource = iota + planesSourceIngest + planesSourceEnriched + planesSourceRoutedLow + planesSourceRoutedHigh + planesSourceWSLow + planesSourceWSHigh +) + +type keyMap map[string]key.Binding + +var keyBindings = keyMap{ + "Up": key.NewBinding(key.WithKeys("up"), key.WithHelp("↑", "Move up in the aircraft list")), + "Down": key.NewBinding(key.WithKeys("up"), key.WithHelp("↓", "Move up in the aircraft list")), + "PageUp": key.NewBinding(key.WithKeys("up"), key.WithHelp("PgUp", "Move a page up in the aircraft list")), + "PageDown": key.NewBinding(key.WithKeys("up"), key.WithHelp("PgDn", "Move a page down in the aircraft list")), + + "Source": key.NewBinding(key.WithKeys("s"), key.WithHelp("s", "Switch Plane List Data Source")), + "Select": key.NewBinding(key.WithKeys(tea.KeyEnter.String()), key.WithHelp(tea.KeyEnter.String(), "Select a plane")), + + "Quit": key.NewBinding(key.WithKeys("q", "ctrl+c"), key.WithHelp("q/ctrl+c", "Exit")), + + "Help": key.NewBinding(key.WithKeys("h", "?"), key.WithHelp("h/?", "Show Help")), +} + +func runTui(c *cli.Context) error { + m, err := initialModel(c.String(natsURL), c.String(websocketURL)) + if err != nil { + return err + } + defer m.tapper.Disconnect() + + filterIcao := c.String(icao) + filterFeeder := c.String(feederAPIKey) + if err = m.tapper.IncomingDataTap(filterIcao, filterFeeder, m.handleIncomingData); err != nil { + return err + } + + if err = m.tapper.AfterIngestTap(filterIcao, filterFeeder, m.afterIngest.update); err != nil { + return err + } + if err = m.tapper.AfterEnrichmentTap(filterIcao, filterFeeder, m.afterEnrichment.update); err != nil { + return err + } + if err = m.tapper.AfterRouterLowTap(filterIcao, filterFeeder, m.afterRouterLow.update); err != nil { + return err + } + if err = m.tapper.AfterRouterHighTap(filterIcao, filterFeeder, m.afterRouterHigh.update); err != nil { + return err + } + if err = m.tapper.WebSocketTapLow(filterIcao, filterFeeder, m.finalLow.update); err != nil { + return err + } + if err = m.tapper.WebSocketTapHigh(filterIcao, filterFeeder, m.finalHigh.update); err != nil { + return err + } + + if _, err = tea.NewProgram(m, tea.WithAltScreen()).Run(); nil != err { + return err + } + return nil +} + +func (m *model) Init() tea.Cmd { + return m.tickCmd() +} + +func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + switch tMsg := msg.(type) { + case tea.KeyMsg: + switch { + case key.Matches(tMsg, keyBindings["Quit"]): + return m, tea.Quit + case key.Matches(tMsg, keyBindings["Source"]): + if m.source == planesSourceWSHigh { + m.source = planesSourceIngest + } else { + m.source++ + } + case key.Matches(tMsg, keyBindings["Select"]): + m.selectedIcao = m.planesTable.SelectedRow()[0] + m.selectedCallSign = m.planesTable.SelectedRow()[2] + m.logger.Info(). + Str("icao", m.selectedIcao). + Str("callsign", m.selectedCallSign). + Msg("Selecting Aircraft") + case key.Matches(tMsg, keyBindings["Help"]): + m.help.ShowAll = !m.help.ShowAll + } + case tea.WindowSizeMsg: + m.width = tMsg.Width + m.height = tMsg.Height + m.handleWindowSizing() + + return m, nil + case timerTick: + m.tickCount++ + m.updateIncomingStats() + m.updateAircraftTable() + + m.updateSelectedAircraftTable() + return m, m.tickCmd() + } + var cmd1, cmd2, cmd3 tea.Cmd + m.statsTable, cmd1 = m.statsTable.Update(msg) + m.selectedTable, cmd2 = m.selectedTable.Update(msg) + m.planesTable, cmd3 = m.planesTable.Update(msg) + + return m, tea.Batch(cmd1, cmd2, cmd3) +} + +func (m *model) handleWindowSizing() { + m.statsTable.SetWidth(m.width) + m.selectedTable.SetWidth(m.width) + m.planesTable.SetWidth(m.width) + m.help.Width = m.width + + headingHeight := lipgloss.Height(m.heading.Render("test")) + statsTableHeight := 3 + selectedTableHeight := 9 + + planeViewTop := statsTableHeight + selectedTableHeight + (headingHeight * 3) + + planesViewHeight := 20 + if planesViewHeight+planeViewTop > m.height { + planesViewHeight = min(0, m.height-planeViewTop) + } + m.planesTable.SetHeight(planesViewHeight) + + logViewTop := statsTableHeight + selectedTableHeight + planesViewHeight + (headingHeight * 4) + logViewHeight := 15 + if logViewHeight+logViewTop > m.height { + logViewHeight = min(0, m.height-logViewTop) + } + + if !m.logViewReady { + // configure log viewport + m.logViewReady = true + m.logView = viewport.New(m.width, logViewHeight) + m.logView.YPosition = logViewTop + } else { + m.logView.Width = m.width + m.logView.Height = logViewHeight + } +} + +func (m *model) updateIncomingStats() { + m.incomingMutex.Lock() + defer m.incomingMutex.Unlock() + + m.statsTable.SetRows([]table.Row{ + { + // Number of feeders + strconv.Itoa(len(m.feederSources)), + // Number of Planes + strconv.Itoa(len(m.incomingIcaoFrames)), + + // source frame type counts + strconv.FormatUint(m.numIncomingBeast, 10), + strconv.FormatUint(m.numIncomingAvr, 10), + strconv.FormatUint(m.numIncomingSbs1, 10), + + // Ingest parsed out frames + m.afterIngest.numFrames(), + + // enriched frames + m.afterEnrichment.numFrames(), + + // routed low + m.afterRouterLow.numFrames(), + // routed high + m.afterRouterHigh.numFrames(), + + // websocket low + m.finalLow.numFrames(), + m.finalHigh.numFrames(), + }, + }) +} + +func (m *model) updateSelectedAircraftTable() { + if m.selectedIcao == "" { + m.selectedTable.SetRows(m.defaultSelectedTableRows()) + return + } + + m.selectedTable.SetRows([]table.Row{ + m.selectedTableIncomingRow(), + m.selectedTableRow(planesSourceIngest, &m.afterIngest), + m.selectedTableRow(planesSourceEnriched, &m.afterEnrichment), + m.selectedTableRow(planesSourceRoutedLow, &m.afterRouterLow), + m.selectedTableRow(planesSourceRoutedHigh, &m.afterRouterHigh), + m.selectedTableRow(planesSourceWSLow, &m.finalLow), + m.selectedTableRow(planesSourceWSHigh, &m.finalHigh), + }) +} + +func (m *model) selectedTableIncomingRow() table.Row { + m.incomingMutex.Lock() + defer m.incomingMutex.Unlock() + icaoInt, err := strconv.ParseUint(m.selectedIcao, 16, 32) + row := m.defaultSelectedTableRows()[0] + if nil != err { + return row + } + row[1] = strconv.Itoa(m.incomingIcaoFrames[uint32(icaoInt)]) + return row +} +func (m *model) selectedTableRow(source planesSource, data *sourceInfo) table.Row { + loc := data.getLoc(m.selectedIcao) + return table.Row{ + source.String(), + data.numFramesFor(m.selectedIcao), + loc.SquawkStr(), + loc.LatStr(), + loc.LonStr(), + loc.AltitudeStr(), + loc.VerticalRateStr(), + loc.HeadingStr(), + } +} + +func (m *model) currentSourceData() *sourceInfo { + switch m.source { + case planesSourceIngest: + return &m.afterIngest + case planesSourceEnriched: + return &m.afterEnrichment + case planesSourceRoutedLow: + return &m.afterRouterLow + case planesSourceRoutedHigh: + return &m.afterRouterHigh + case planesSourceWSLow: + return &m.finalLow + case planesSourceWSHigh: + return &m.finalHigh + } + return nil +} + +func (m *model) updateAircraftTable() { + data := m.currentSourceData() + + if nil == data { + m.planesTable.SetRows([]table.Row{}) + return + } + data.mu.Lock() + defer data.mu.Unlock() + + rows := make([]table.Row, 0, len(data.planes)) + var p *export.PlaneLocation + for _, icaoStr := range data.icaos { + p = data.planes[icaoStr] + rows = append(rows, table.Row{ + icaoStr, + strconv.Itoa(len(p.SourceTags)), + p.CallSignStr(), + p.SquawkStr(), + p.LatStr(), + p.LonStr(), + p.AltitudeStr(), + p.VerticalRateStr(), + p.HeadingStr(), + }) + } + m.planesTable.SetRows(rows) +} + +func (m *model) View() string { + if m.logViewReady && m.logView.Height > 0 { + m.logView.SetContent(m.logs.String()) + m.logView.SetYOffset(math.MaxInt) + } + + view := m.heading.Render("Received Data Stats") + "\n" + + m.statsTable.View() + "\n" + + m.heading.Render("Selected Aircraft "+m.selectedIcao+" "+m.selectedCallSign) + "\n" + + m.selectedTable.View() + "\n" + + view += m.heading.Render("All Aircraft - Source: "+m.source.String()) + "\n" + view += m.planesTable.View() + "\n" + + if m.logViewReady && m.logView.Height > 0 { + view += m.heading.Render("Logs") + "\n" + view += m.logView.View() + "\n" + } + + view += m.help.View(keyBindings) + return view +} + +func (m *model) tickCmd() tea.Cmd { + return tea.Tick(m.tickDuration, func(t time.Time) tea.Msg { + return timerTick(t) + }) +} + +func (k keyMap) ShortHelp() []key.Binding { + return []key.Binding{k["Help"], k["Quit"]} +} + +func (k keyMap) FullHelp() [][]key.Binding { + return [][]key.Binding{ + {k["Up"], k["Down"], k["PgUp"], k["PgDn"]}, + {k["Source"], k["Select"]}, + {k["Help"], k["Quit"]}, + } +} diff --git a/cmd/plane.path/avr.go b/cmd/plane.path/avr.go index 0131c7fe..4be33a5b 100644 --- a/cmd/plane.path/avr.go +++ b/cmd/plane.path/avr.go @@ -11,14 +11,22 @@ import ( "time" ) -type timeFiddler struct { +type TimeFiddler struct { } -func NewTimeFiddler() *timeFiddler { - return &timeFiddler{} +func (fm *TimeFiddler) HealthCheckName() string { + return "Time Fiddler" +} + +func (fm *TimeFiddler) HealthCheck() bool { + return true +} + +func NewTimeFiddler() *TimeFiddler { + return &TimeFiddler{} } -func (fm *timeFiddler) String() string { +func (fm *TimeFiddler) String() string { return "Time Fiddler" } @@ -42,7 +50,7 @@ var lastSeenMap sync.Map // Handle ensures we have enough time between messages for a plane to have travelled the distance it says it did // this is because we do not have the timestamp for when it was collected when processing AVR frames -func (fm *timeFiddler) Handle(fe *tracker.FrameEvent) tracker.Frame { +func (fm *TimeFiddler) Handle(fe *tracker.FrameEvent) tracker.Frame { if nil == fe { return nil } diff --git a/cmd/pw_ws_broker/web.go b/cmd/pw_ws_broker/web.go index e1651524..fc6d93b3 100644 --- a/cmd/pw_ws_broker/web.go +++ b/cmd/pw_ws_broker/web.go @@ -608,7 +608,7 @@ func (c *WsClient) planeProtocolHandler(ctx context.Context, conn *websocket.Con } case planeMsg := <-c.outChan: // if we have a subscription to this planes tile or all tiles - //log.Debug().Str("tile", planeMsg.tile).Str("highlow", planeMsg.highLow).Msg("info") + // log.Debug().Str("tile", planeMsg.tile).Str("highlow", planeMsg.highLow).Msg("info") tileSub, tileOk := subs[planeMsg.tile] allSub, allOk := subs["all"+planeMsg.highLow] if (tileSub && tileOk) || (allSub && allOk) { @@ -647,7 +647,6 @@ func (c *WsClient) planeProtocolHandler(ctx context.Context, conn *websocket.Con // tell prometheus we are no longer caring about the tiles for k := range subs { prometheusSubscriptions.WithLabelValues(k).Dec() - } return err } diff --git a/cmd/recorder/main.go b/cmd/recorder/main.go index 7fb9aa5f..3cb2e7f8 100644 --- a/cmd/recorder/main.go +++ b/cmd/recorder/main.go @@ -30,6 +30,14 @@ type ( } ) +func (fp *frameProcessor) HealthCheckName() string { + return "Frame Processor" +} + +func (fp *frameProcessor) HealthCheck() bool { + return true +} + func main() { app := cli.NewApp() app.Name = "Frame Recorder" diff --git a/go.mod b/go.mod index 2946d8be..af91d70c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module plane.watch -go 1.19 +go 1.21 require ( github.com/bwmarrin/discordgo v0.27.1 @@ -19,12 +19,16 @@ require ( require ( github.com/ClickHouse/clickhouse-go/v2 v2.14.3 + github.com/charmbracelet/bubbles v0.16.1 + github.com/charmbracelet/bubbletea v0.24.2 + github.com/charmbracelet/lipgloss v0.9.1 github.com/google/btree v1.1.2 github.com/google/uuid v1.3.1 github.com/jmoiron/sqlx v1.3.5 github.com/json-iterator/go v1.1.12 github.com/lib/pq v1.10.9 github.com/paulmach/orb v0.10.0 + github.com/pkg/errors v0.9.1 github.com/simukti/sqldb-logger v0.0.0-20230108155151-646c1a075551 github.com/simukti/sqldb-logger/logadapter/zerologadapter v0.0.0-20230108155151-646c1a075551 ) @@ -32,8 +36,10 @@ require ( require ( github.com/ClickHouse/ch-go v0.58.2 // indirect github.com/andybalholm/brotli v1.0.6 // indirect + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gin-gonic/gin v1.9.0 // indirect github.com/go-faster/city v1.0.1 // indirect @@ -42,20 +48,25 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/klauspost/compress v1.17.1 // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-localereader v0.0.1 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/muesli/ansi v0.0.0-20211018074035-2e021307bc4b // indirect + github.com/muesli/cancelreader v0.2.2 // indirect + github.com/muesli/reflow v0.3.0 // indirect + github.com/muesli/termenv v0.15.2 // indirect github.com/nats-io/nats-server/v2 v2.7.4 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect @@ -75,7 +86,9 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect + golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.13.0 // indirect + golang.org/x/term v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.31.0 // indirect diff --git a/go.sum b/go.sum index 50a22b42..a0f92e86 100644 --- a/go.sum +++ b/go.sum @@ -44,15 +44,25 @@ github.com/ClickHouse/clickhouse-go/v2 v2.14.3 h1:s9SuU3PfJrfJ4SDbVRo6XM2ZWlr7ef github.com/ClickHouse/clickhouse-go/v2 v2.14.3/go.mod h1:qdw8IMGH4Y+PedKlf9QEhFO1ATTSFhh4exQRVIa3y2A= github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI= github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bwmarrin/discordgo v0.27.1 h1:ib9AIc/dom1E/fSIulrBwnez0CToJE113ZGt4HoliGY= github.com/bwmarrin/discordgo v0.27.1/go.mod h1:NJZpH+1AfhIcyQsPeuBKsUtYrRnjkyu0kIVMCHkZtRY= github.com/bytedance/sonic v1.8.0 h1:ea0Xadu+sHlu7x5O3gKhRpQ1IKiMrSiHttPF0ybECuA= +github.com/bytedance/sonic v1.8.0/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/charmbracelet/bubbles v0.16.1 h1:6uzpAAaT9ZqKssntbvZMlksWHruQLNxg49H5WdeuYSY= +github.com/charmbracelet/bubbles v0.16.1/go.mod h1:2QCp9LFlEsBQMvIYERr7Ww2H2bA7xen1idUDIzm/+Xc= +github.com/charmbracelet/bubbletea v0.24.2 h1:uaQIKx9Ai6Gdh5zpTbGiWpytMU+CfsPp06RaW2cx/SY= +github.com/charmbracelet/bubbletea v0.24.2/go.mod h1:XdrNrV4J8GiyshTtx3DNuYkR1FDaJmO3l2nejekbsgg= +github.com/charmbracelet/lipgloss v0.9.1 h1:PNyd3jvaJbg4jRHKWXnCj1akQm4rh8dbEzN1p/u1KWg= +github.com/charmbracelet/lipgloss v0.9.1/go.mod h1:1mPmG4cxScwUQALAAnacHaigiiHB9Pmr+v1VEawJl6I= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= +github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -60,12 +70,15 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 h1:q2hJAaP1k2wIvVRd/hEHD7lacgqrCPS+k8g1MndzfWY= +github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -73,6 +86,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= +github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= @@ -90,10 +104,13 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2 github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= github.com/go-playground/validator/v10 v10.11.2 h1:q3SHpufmypg+erIExEKUmsgmhDTyhcJ38oeKGACXohU= +github.com/go-playground/validator/v10 v10.11.2/go.mod h1:NieE624vt4SCTJtD87arVLvdmjPAeV8BQlHtMnw9D7s= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= @@ -103,6 +120,7 @@ github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6Wezm github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= +github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -150,6 +168,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -195,19 +214,25 @@ github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kpawlik/geojson v0.0.0-20171201195549-1a4f120c6b41 h1:hmhQApZRZctFnYW02XZBC9jKR31R7c4QbXbqBtQGZeg= github.com/kpawlik/geojson v0.0.0-20171201195549-1a4f120c6b41/go.mod h1:qKaqI0FuzSzLI6RtBrhVXqM5wjknuGYU2e8KVG+Z1MM= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= +github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -216,7 +241,10 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4= +github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= @@ -224,6 +252,7 @@ github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -233,7 +262,16 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/muesli/ansi v0.0.0-20211018074035-2e021307bc4b h1:1XF24mVaiu7u+CFywTdcDo2ie1pzzhwjt6RHqzpMU34= +github.com/muesli/ansi v0.0.0-20211018074035-2e021307bc4b/go.mod h1:fQuZ0gauxyBcmsdE3ZT4NasjaRdxmbCS0jRHsrWu3Ho= +github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= +github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/muesli/reflow v0.3.0 h1:IFsN6K9NfGtjeggFP+68I4chLZV2yIKsXJFNZ+eWh6s= +github.com/muesli/reflow v0.3.0/go.mod h1:pbwTDkVPibjO2kyvBQRBxTWEEGDGq0FlB1BIKtnHY/8= +github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= +github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= +github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= github.com/nats-io/nats-server/v2 v2.7.4 h1:c+BZJ3rGzUKCBIM4IXO8uNT2u1vajGbD1kPA6wqCEaM= github.com/nats-io/nats-server/v2 v2.7.4/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc= github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= @@ -256,6 +294,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -265,11 +304,13 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= @@ -314,10 +355,12 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/ugorji/go/codec v1.2.9 h1:rmenucSohSTiyL09Y+l2OCk+FrMxGMzho2+tjr5ticU= +github.com/ugorji/go/codec v1.2.9/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= @@ -345,6 +388,7 @@ go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmY go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -426,6 +470,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -449,6 +494,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -489,12 +536,15 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -510,6 +560,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -662,6 +713,7 @@ google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/lib/dedupe/dedupe.go b/lib/dedupe/dedupe.go index f623e9e8..633e54d9 100644 --- a/lib/dedupe/dedupe.go +++ b/lib/dedupe/dedupe.go @@ -2,6 +2,7 @@ package dedupe import ( "github.com/prometheus/client_golang/prometheus" + "github.com/rs/zerolog/log" "plane.watch/lib/dedupe/forgetfulmap" "plane.watch/lib/tracker" "plane.watch/lib/tracker/beast" @@ -42,6 +43,18 @@ func NewFilter(opts ...Option) *Filter { return &f } +func (f *Filter) HealthCheckName() string { + return "Dedupe Filter" +} + +func (f *Filter) HealthCheck() bool { + log.Info(). + Str("what", "Dedupe Middleware"). + Int32("Num Entries", f.list.Len()). + Msg("Health Check") + + return true +} func (f *Filter) Handle(fe *tracker.FrameEvent) tracker.Frame { if nil == fe { diff --git a/lib/dedupe/forgetfulmap/map.go b/lib/dedupe/forgetfulmap/map.go index 5ec2663a..51d0873e 100644 --- a/lib/dedupe/forgetfulmap/map.go +++ b/lib/dedupe/forgetfulmap/map.go @@ -114,7 +114,7 @@ func (f *ForgetfulSyncMap) sweep() { f.lookup.Range(func(key, value interface{}) bool { m, ok := value.(*marble) if !ok { - // not entirely sure how a non marble object got in, but whatever + // not entirely sure how a non-marble object got in, but whatever return true } @@ -183,11 +183,11 @@ func (f *ForgetfulSyncMap) Load(key any) (any, bool) { retVal, retBool := f.lookup.Load(key) if retBool { - if t, tok := retVal.(*marble); tok { + t, tok := retVal.(*marble) + if tok { return t.value, retBool - } else { - return nil, false } + return nil, false } else { return retVal, retBool } @@ -224,7 +224,6 @@ func (f *ForgetfulSyncMap) Range(rangeFunc func(key, value interface{}) bool) { } else { return rangeFunc(key, value) } - }) } diff --git a/lib/example_finder/filter.go b/lib/example_finder/filter.go index 8f3a064e..1d7e453c 100644 --- a/lib/example_finder/filter.go +++ b/lib/example_finder/filter.go @@ -79,6 +79,12 @@ func NewFilter(opts ...Option) *Filter { func (f *Filter) String() string { return "Example Finder/Filter" } +func (f *Filter) HealthCheckName() string { + return "Example Finder/Filter" +} +func (f *Filter) HealthCheck() bool { + return true +} func (f *Filter) Handle(fe *tracker.FrameEvent) tracker.Frame { if nil == fe { diff --git a/lib/export/types.go b/lib/export/types.go index ba83741c..fe990eed 100644 --- a/lib/export/types.go +++ b/lib/export/types.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math" + "strconv" "sync" "time" @@ -333,7 +334,7 @@ func IsLocationPossible(prev, next PlaneLocation) bool { deltaBearing := prev.Heading - bearing absDeltaBearing := math.Abs(math.Mod(deltaBearing+180, 360) - 180) - if absDeltaBearing < 90 { //don't make this less than ~45 degrees, otherwise it'll be inaccurate due to possible wind. + if absDeltaBearing < 90 { // don't make this less than ~45 degrees, otherwise it'll be inaccurate due to possible wind. if log.Trace().Enabled() { log.Trace().Str("CallSign", unPtr(next.CallSign)). Str("Next", fmt.Sprintf("(%f,%f)", next.Lat, next.Lon)). @@ -362,3 +363,73 @@ func IsLocationPossible(prev, next PlaneLocation) bool { } return false } + +func (pl *PlaneLocation) CallSignStr() string { + if nil == pl { + return "" + } + if nil == pl.CallSign { + return "" + } + return *pl.CallSign +} + +func (pl *PlaneLocation) SquawkStr() string { + if nil == pl { + return "" + } + if pl.Squawk == "0" { + return "" + } + return pl.Squawk +} + +func (pl *PlaneLocation) LatStr() string { + if nil == pl { + return "" + } + if !pl.HasLocation { + return "" + } + return strconv.FormatFloat(pl.Lat, 'f', 4, 64) +} + +func (pl *PlaneLocation) LonStr() string { + if nil == pl { + return "" + } + if !pl.HasLocation { + return "" + } + return strconv.FormatFloat(pl.Lon, 'f', 4, 64) +} + +func (pl *PlaneLocation) AltitudeStr() string { + if nil == pl { + return "" + } + if !pl.HasAltitude { + return "" + } + return strconv.Itoa(pl.Altitude) + " " + pl.AltitudeUnits +} + +func (pl *PlaneLocation) VerticalRateStr() string { + if nil == pl { + return "" + } + if !pl.HasVerticalRate { + return "" + } + return strconv.Itoa(pl.VerticalRate) +} + +func (pl *PlaneLocation) HeadingStr() string { + if nil == pl { + return "" + } + if !pl.HasHeading { + return "" + } + return strconv.FormatFloat(pl.Heading, 'f', 1, 64) +} diff --git a/lib/logging/logging.go b/lib/logging/logging.go index 4de44c83..85cc8332 100644 --- a/lib/logging/logging.go +++ b/lib/logging/logging.go @@ -13,7 +13,7 @@ const ( VeryVerbose = "very-verbose" Debug = "debug" Quiet = "quiet" - CpuProfile = "cpu-profile" + CPUProfile = "cpu-profile" ) func IncludeVerbosityFlags(app *cli.App) { @@ -33,7 +33,7 @@ func IncludeVerbosityFlags(app *cli.App) { EnvVars: []string{"QUIET"}, }, &cli.StringFlag{ - Name: CpuProfile, + Name: CPUProfile, Usage: "Specifying this parameter causes a CPU Profile to be generated", }, ) @@ -59,8 +59,8 @@ func SetLoggingLevel(c *cli.Context) { c.Bool(Debug), c.Bool(Quiet), ) - if "" != c.String(CpuProfile) { - ConfigureForProfiling(c.String(CpuProfile)) + if c.String(CPUProfile) != "" { + ConfigureForProfiling(c.String(CPUProfile)) } } @@ -75,7 +75,7 @@ func SetVerboseOrQuiet(trace, verbose, quiet bool) { if quiet { zerolog.SetGlobalLevel(zerolog.ErrorLevel) } - //log.Info().Str("log-level", zerolog.GlobalLevel().String()).Msg("Logging Set") + // log.Info().Str("log-level", zerolog.GlobalLevel().String()).Msg("Logging Set") } func cliWriter() zerolog.ConsoleWriter { @@ -98,7 +98,7 @@ func ConfigureForProfiling(outFile string) { } func StopProfiling(c *cli.Context) error { - if fileName := c.String(CpuProfile); "" != fileName { + if fileName := c.String(CPUProfile); fileName != "" { pprof.StopCPUProfile() println("To analyze the profile, use this cmd") println("go tool pprof -http=:7777", fileName) diff --git a/lib/middleware/ingest_tap.go b/lib/middleware/ingest_tap.go index fb86f269..442c2c4f 100644 --- a/lib/middleware/ingest_tap.go +++ b/lib/middleware/ingest_tap.go @@ -13,6 +13,8 @@ import ( "sync" ) +const NatsAPIv1PwIngestTap = "v1.pw-ingest.tap" //nolint:gosec + type ( IngestTap struct { head, tail *condition @@ -45,7 +47,7 @@ func NewIngestTap(natsServer *nats_io.Server) tracker.Middleware { } var err error tap.natsQueue = "pw-ingest-tap-" + randstr.RandString(20) - tap.sub, err = tap.natsServer.SubscribeReply("v1.pw-ingest.tap", tap.natsQueue, tap.requestHandler) + tap.sub, err = tap.natsServer.SubscribeReply(NatsAPIv1PwIngestTap, tap.natsQueue, tap.requestHandler) if err != nil { return nil } @@ -91,7 +93,7 @@ func (tap *IngestTap) requestHandler(msg *nats.Msg) { var err error var uIcao uint64 - if "" != icao { + if icao != "" { uIcao, err = strconv.ParseUint(icao, 16, 32) if nil != err { log.Error().Err(err).Str("icao", icao).Msg("Failed to convert ICAO string into a uint") @@ -117,6 +119,14 @@ func (tap *IngestTap) String() string { return "Ingest Tap" } +func (tap *IngestTap) HealthCheckName() string { + return "Ingest Tap" +} + +func (tap *IngestTap) HealthCheck() bool { + return true +} + func (tap *IngestTap) Handle(frame *tracker.FrameEvent) tracker.Frame { if tap.head != nil { tap.queue <- frame @@ -190,17 +200,17 @@ func (c *condition) match(fe *tracker.FrameEvent) bool { if nil == fe { return false } - isMatchApiKey := true - if "" != c.apiKey { + isMatchAPIKey := true + if c.apiKey != "" { source := fe.Source() if nil == source { return false } - isMatchApiKey = source.Tag == c.apiKey + isMatchAPIKey = source.Tag == c.apiKey } isMatchIcao := true - if 0 != c.icao { + if c.icao != 0 { frame := fe.Frame() if nil == frame { return false @@ -208,5 +218,5 @@ func (c *condition) match(fe *tracker.FrameEvent) bool { isMatchIcao = frame.Icao() == c.icao } - return isMatchApiKey && isMatchIcao + return isMatchAPIKey && isMatchIcao } diff --git a/lib/nats_io/nats.go b/lib/nats_io/nats.go index 26eee849..fff3083b 100644 --- a/lib/nats_io/nats.go +++ b/lib/nats_io/nats.go @@ -47,9 +47,9 @@ func WithConnections(incoming, outgoing bool) Option { } } -func WithServer(serverUrl, connectionName string) Option { +func WithServer(serverURL, connectionName string) Option { return func(server *Server) { - server.SetUrl(serverUrl) + server.SetURL(serverURL) server.connectionName = connectionName } } @@ -70,16 +70,16 @@ func NewServer(opts ...Option) (*Server, error) { return n, nil } -func (n *Server) SetUrl(serverUrl string) { - serverUrlParts, err := url.Parse(serverUrl) +func (n *Server) SetURL(serverURL string) { + serverURLParts, err := url.Parse(serverURL) if nil == err { - if "" == serverUrlParts.Port() { - serverUrlParts.Host = net.JoinHostPort(serverUrlParts.Hostname(), "4222") + if serverURLParts.Port() == "" { + serverURLParts.Host = net.JoinHostPort(serverURLParts.Hostname(), "4222") } } else { log.Error().Err(err).Msg("invalid url") } - n.url = serverUrlParts.String() + n.url = serverURLParts.String() } func (n *Server) DroppedCounter(counter prometheus.Counter) { n.droppedMessageCounter = counter @@ -101,7 +101,7 @@ func (n *Server) NatsErrHandler(conn *nats.Conn, sub *nats.Subscription, err err } l.Send() - if nil != n.droppedMessageCounter && err == nats.ErrSlowConsumer { + if nil != n.droppedMessageCounter && errors.Is(err, nats.ErrSlowConsumer) { n.droppedMessageCounter.Inc() } } @@ -116,7 +116,11 @@ func (n *Server) Connect() error { nats.Name(n.connectionName+"+incoming"), ) if nil != err { - n.log.Error().Err(err).Str("dir", "incoming").Msg("Unable to connect to NATS server") + n.log.Error(). + Err(err). + Str("dir", "incoming"). + Str("url", n.url). + Msg("Unable to connect to NATS server") return err } } @@ -142,7 +146,7 @@ func (n *Server) Publish(queue string, msg []byte) error { } err := n.outgoing.Publish(queue, msg) if nil != err { - if nats.ErrInvalidConnection == err || nats.ErrConnectionClosed == err || nats.ErrConnectionDraining == err { + if errors.Is(err, nats.ErrInvalidConnection) || errors.Is(err, nats.ErrConnectionClosed) || errors.Is(err, nats.ErrConnectionDraining) { n.log.Error().Err(err).Msg("Connection not in a valid state") } } diff --git a/lib/tracker/input.go b/lib/tracker/input.go index fa432af7..e9c09a7d 100644 --- a/lib/tracker/input.go +++ b/lib/tracker/input.go @@ -59,6 +59,7 @@ type ( // Middleware has a chance to modify a frame before we send it to the plane Tracker Middleware interface { fmt.Stringer + monitoring.HealthCheck Handle(*FrameEvent) Frame } ) @@ -125,6 +126,7 @@ func (t *Tracker) AddMiddleware(m Middleware) { if nil == m { return } + monitoring.AddHealthCheck(m) t.log.Debug().Str("name", m.String()).Msg("Adding middleware") t.middlewares = append(t.middlewares, m) @@ -156,16 +158,30 @@ func (t *Tracker) StopOnCancel() { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) isStopping := false + exitChan := make(chan bool, 3) for { - sig := <-ch - log.Info().Str("Signal", sig.String()).Msg("Received Interrupt, stopping") - if !isStopping { - isStopping = true - t.Stop() - log.Info().Msg("Done Stopping") - } else { - log.Info().Str("Signal", sig.String()).Msg("Second Interrupt, forcing exit") - os.Exit(1) + select { + case sig := <-ch: + log.Info().Str("Signal", sig.String()).Msg("Received Interrupt, stopping") + if !isStopping { + isStopping = true + go func() { + t.Stop() + exitChan <- true + log.Info().Msg("Done Stopping") + }() + go func() { + time.Sleep(time.Second * 5) + exitChan <- true + log.Info().Msg("Timeout after 5 seconds, force stopping") + os.Exit(1) + }() + } else { + log.Info().Str("Signal", sig.String()).Msg("Second Interrupt, forcing exit") + os.Exit(1) + } + case <-exitChan: + return } } } diff --git a/lib/tracker/mode_s/common.go b/lib/tracker/mode_s/common.go index fee18eda..90d43c11 100644 --- a/lib/tracker/mode_s/common.go +++ b/lib/tracker/mode_s/common.go @@ -1,16 +1,16 @@ package mode_s -//var format string = "%20s = %13s" +// var format string = "%20s = %13s" // decode an AC12 altitude field func decodeAC12Field(AC12Field int32) int32 { qBit := (AC12Field & 0x10) == 0x10 var n int32 - //log.Printf(format, "0x10", strconv.FormatInt(int64(0x10), 2)) - //log.Printf(format, "AC12", strconv.FormatInt(int64(AC12Field), 2)) + // log.Printf(format, "0x10", strconv.FormatInt(int64(0x10), 2)) + // log.Printf(format, "AC12", strconv.FormatInt(int64(AC12Field), 2)) if qBit { - //log.Printf(format, "Q Bit Set", strconv.FormatInt(int64(AC12Field), 2)) + // log.Printf(format, "Q Bit Set", strconv.FormatInt(int64(AC12Field), 2)) /// N is the 11 bit integer resulting from the removal of bit Q at bit 4 n = ((AC12Field & 0x0FE0) >> 1) | (AC12Field & 0x000F) // The final altitude is the resulting number multiplied by 25, minus 1000. @@ -19,7 +19,7 @@ func decodeAC12Field(AC12Field int32) int32 { } else { // Make N a 13 bit Gillham coded altitude by inserting M=0 at bit 6 n = ((AC12Field & 0x0FC0) << 1) | (AC12Field & 0x003F) - //log.Printf(format, "Q Bit Clear", strconv.FormatInt(int64(n), 2)) + // log.Printf(format, "Q Bit Clear", strconv.FormatInt(int64(n), 2)) n = modeAToModeC(decodeID13Field(n)) if n < -12 { n = 0 @@ -50,7 +50,7 @@ func gillhamToAltitude(i16GillhamValue int32) int32 { func decodeID13Field(ID13Field int32) int32 { var hexGillham int32 - //log.Printf(format, "Decoding ID13 Field", strconv.FormatInt(int64(ID13Field), 2)) + // log.Printf(format, "Decoding ID13 Field", strconv.FormatInt(int64(ID13Field), 2)) if 0 < (ID13Field & 0x1000) { hexGillham |= 0x0010 diff --git a/lib/tracker/mode_s/decode.go b/lib/tracker/mode_s/decode.go index 6e7a9477..816425ec 100644 --- a/lib/tracker/mode_s/decode.go +++ b/lib/tracker/mode_s/decode.go @@ -117,37 +117,37 @@ func (f *Frame) parse() error { f.decodeDownLinkRequest() f.decodeUtilityMessage() err = f.decode13bitAltitudeCode() - case 5: //DF_5 + case 5: // DF_5 f.decodeICAO() f.decodeFlightStatus() f.decodeDownLinkRequest() f.decodeUtilityMessage() f.decodeSquawkIdentity(2, 3) // gillham encoded squawk - case 11: //DF_11 + case 11: // DF_11 f.decodeICAO() f.decodeCapability() - case 16: //DF_16 + case 16: // DF_16 f.decodeICAO() f.decodeVerticalStatus() err = f.decode13bitAltitudeCode() f.decodeReplyInformation() f.decodeSensitivityLevel() - case 17: //DF_17 + case 17: // DF_17 f.decodeICAO() f.decodeCapability() f.decodeAdsb() - case 18: //DF_18 + case 18: // DF_18 f.decodeCapability() // control field - if 0 == f.ca { + if f.ca == 0 { f.decodeICAO() f.decodeAdsb() } - case 20: //DF_20 + case 20: // DF_20 f.decodeICAO() f.decodeFlightStatus() _ = f.decode13bitAltitudeCode() err = f.decodeCommB() - case 21: //DF_21 + case 21: // DF_21 f.decodeICAO() f.decodeFlightStatus() f.decodeSquawkIdentity(2, 3) // gillham encoded squawk diff --git a/lib/ws_protocol/client.go b/lib/ws_protocol/client.go new file mode 100644 index 00000000..3c4a9a6b --- /dev/null +++ b/lib/ws_protocol/client.go @@ -0,0 +1,161 @@ +package ws_protocol + +import ( + "context" + "crypto/tls" + "encoding/json" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "net/http" + "nhooyr.io/websocket" + "time" +) + +const ( + ProdSourceURL = "https://plane.watch/planes" +) + +type ( + WsClient struct { + sourceURL string + conn *websocket.Conn + + responseHandlers []ResponseHandler + + logger zerolog.Logger + } + + Option func(client *WsClient) + ResponseHandler func(response *WsResponse) +) + +var ( + ErrUnexpectedProtocol = errors.New("Unexpected WS Protocol") +) + +func WithSourceURL(sourceURL string) Option { + return func(client *WsClient) { + client.sourceURL = sourceURL + } +} + +func WithResponseHandler(f ResponseHandler) Option { + return func(client *WsClient) { + client.responseHandlers = append(client.responseHandlers, f) + } +} + +func WithLogger(logger zerolog.Logger) Option { + return func(client *WsClient) { + client.logger = logger + } +} + +func NewClient(opts ...Option) *WsClient { + c := &WsClient{ + sourceURL: ProdSourceURL, + responseHandlers: make([]ResponseHandler, 0), + } + + for _, opt := range opts { + opt(c) + } + + return c +} + +func (c *WsClient) Connect() error { + var err error + customTransport := http.DefaultTransport.(*http.Transport).Clone() + customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + httpClient := &http.Client{Transport: customTransport} + cfg := &websocket.DialOptions{ + HTTPClient: httpClient, + Subprotocols: []string{WsProtocolPlanes}, + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + c.conn, _, err = websocket.Dial(ctx, c.sourceURL, cfg) + + if nil != err { + return errors.Wrap(err, "failed to connect to Plane.Watch websocket") + } + + if c.conn.Subprotocol() != WsProtocolPlanes { + return ErrUnexpectedProtocol + } + + c.conn.SetReadLimit(1_048_576) // 1MiB + + go c.reader() + return nil +} + +func (c *WsClient) Disconnect() error { + return c.conn.Close(websocket.StatusNormalClosure, "Going away") +} + +func (c *WsClient) reader() { + for { + mType, msg, err := c.conn.Read(context.Background()) + if mType != websocket.MessageText { + // c.logger.Error().Str("body", string(msg)).Msg("Incorrect Protocol, expected text, got binary") + continue + } + if nil != err { + if errors.Is(err, websocket.CloseError{}) { + return + } + c.logger.Error().Err(err).Send() + continue + } + + r := &WsResponse{} + err = json.Unmarshal(msg, r) + if nil != err { + c.logger.Error().Err(err).Send() + continue + } + + for _, f := range c.responseHandlers { + f(r) + } + } +} + +func (c *WsClient) writeRequest(rq *WsRequest) error { + rqJSON, err := json.Marshal(rq) + if err != nil { + return errors.Wrap(err, "failed to subscribe to tile") + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + return c.conn.Write(ctx, websocket.MessageText, rqJSON) +} + +func (c *WsClient) Subscribe(gridTile string) error { + rq := WsRequest{ + Type: RequestTypeSubscribe, + GridTile: gridTile, + } + + return c.writeRequest(&rq) +} + +func (c *WsClient) SubscribeAllLow() error { + rq := WsRequest{ + Type: RequestTypeSubscribe, + GridTile: GridTileAllLow, + } + + return c.writeRequest(&rq) +} + +func (c *WsClient) SubscribeAllHigh() error { + rq := WsRequest{ + Type: RequestTypeSubscribe, + GridTile: GridTileAllHigh, + } + + return c.writeRequest(&rq) +} diff --git a/lib/ws_protocol/protocol.go b/lib/ws_protocol/protocol.go index 3db32544..a09f67f8 100644 --- a/lib/ws_protocol/protocol.go +++ b/lib/ws_protocol/protocol.go @@ -25,6 +25,9 @@ const ( ResponseTypePlaneLocations = "plane-location-list" ResponseTypePlaneLocHistory = "plane-location-history" ResponseTypeSearchResults = "search-results" + + GridTileAllLow = "all_low" + GridTileAllHigh = "all_high" ) type (