Skip to content

Commit

Permalink
Merge pull request #219 from plane-watch/more-ingest-tap-tui
Browse files Browse the repository at this point in the history
More ingest tap tui
  • Loading branch information
bluntelk authored Oct 24, 2023
2 parents 0c29a9f + dfa92f7 commit 0fbff25
Show file tree
Hide file tree
Showing 21 changed files with 1,376 additions and 132 deletions.
119 changes: 48 additions & 71 deletions cmd/ingest_tap/main.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,34 @@
package main

import (
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"os"
"os/signal"
"plane.watch/lib/logging"
"plane.watch/lib/nats_io"
"plane.watch/lib/randstr"
"plane.watch/lib/tracker/beast"
"sync"
"syscall"
"time"
)

const (
natsUrl = "nats"
apiKey = "api-key"
icao = "icao"
logFile = "file"
natsURL = "nats"
websocketURL = "websocket-url"
feederAPIKey = "api-key"
icao = "icao"
logFile = "file"
)

func main() {
app := cli.NewApp()
app.Name = "pw_ingest tap"
app.Name = "pw_ingest natsTap"
app.Description = "Ask all of the pw_ingest instances on the nats bus to feed matching data to this terminal.\n" +
"Matching beast/avr/sbs1 messages will be forwarded to you"

app.Commands = cli.Commands{
{
Name: "log",
Description: "Logs matching data to a file",
Name: "log",
Usage: "Logs matching data to a file",
Flags: []cli.Flag{
&cli.StringFlag{
Name: icao,
Usage: "if specified, only frames from the plane with the specified ICAO will be sent",
},
&cli.StringFlag{
Name: apiKey,
Usage: "the plane.watch api UUID for the given feeder you want to investigate",
},
&cli.StringFlag{
Name: logFile,
Value: "captured-frames",
Expand All @@ -49,14 +37,32 @@ func main() {
},
Action: logMatching,
},
{
Name: "tui",
Usage: "Shows an Interactive Text User Interface",
Action: runTui,
},
}

app.Flags = []cli.Flag{
&cli.StringFlag{
Name: natsUrl,
Name: natsURL,
Usage: "nats url",
Value: "nats://localhost:4222/",
},
&cli.StringFlag{
Name: websocketURL,
Usage: "Plane Watch WebSocket Api URL",
Value: "https://localhost/planes",
},
&cli.StringFlag{
Name: icao,
Usage: "if specified, only frames from the plane with the specified ICAO will be sent",
},
&cli.StringFlag{
Name: feederAPIKey,
Usage: "the plane.watch api UUID for the given feeder you want to investigate",
},
}
logging.IncludeVerbosityFlags(app)
app.Before = func(c *cli.Context) error {
Expand All @@ -72,6 +78,13 @@ func main() {

func logMatching(c *cli.Context) error {
logging.ConfigureForCli()

tapper := NewPlaneWatchTapper(WithLogger(log.Logger))
if err := tapper.Connect(c.String(natsURL), ""); err != nil {
return err
}
defer tapper.Disconnect()

fileHandles := make(map[string]*os.File)
exts := []string{"beast", "avr", "sbs1"}
for _, ext := range exts {
Expand All @@ -84,55 +97,17 @@ func logMatching(c *cli.Context) error {
fileHandles[ext] = fh
}

natsSvr, err := nats_io.NewServer(
nats_io.WithConnections(true, true),
nats_io.WithServer(c.String(natsUrl), "ingest-tap"),
)
if nil != err {
if err := tapper.IncomingDataTap(c.String(icao), c.String(feederAPIKey), handleStream(fileHandles)); err != nil {
return err
}

tapSubject := "ingest-tap-" + randstr.RandString(20)
ch, err := natsSvr.Subscribe(tapSubject)

var wg sync.WaitGroup
wg.Add(1)

go handleStream(ch, fileHandles, &wg)

headers := map[string]string{
"action": "add",
"api-key": c.String(apiKey),
"icao": c.String(icao),
"subject": tapSubject,
}

response, errRq := natsSvr.Request("v1.pw-ingest.tap", []byte{}, headers, time.Second)
if nil != errRq {
log.Error().Err(err).Msg("Failed to request tap")
return err
}
log.Debug().Str("response", string(response)).Msg("request response")

chSignal := make(chan os.Signal, 1)
signal.Notify(chSignal, syscall.SIGINT, syscall.SIGTERM)
<-chSignal // wait for our cancel signal
log.Info().Msg("Shutting down")

// ask to step sending
headers["action"] = "remove"
response, errRq = natsSvr.Request("v1.pw-ingest.tap", []byte{}, headers, time.Second)
if nil != errRq {
log.Error().Err(err).Msg("Failed to stop tap")
return err
}
log.Debug().Str("response", string(response)).Msg("request response")

close(ch)
wg.Wait()

for ext, f := range fileHandles {
err = f.Close()
err := f.Close()
if err != nil {
log.Error().Err(err).Str("ext", ext).Msg("Failed to close file")
}
Expand All @@ -142,34 +117,36 @@ func logMatching(c *cli.Context) error {
return nil
}

func handleStream(ch chan *nats.Msg, fileHandles map[string]*os.File, wg *sync.WaitGroup) {
for msg := range ch {
switch msg.Header.Get("type") {
func handleStream(fileHandles map[string]*os.File) IngestTapHandler {
return func(frameType, tag string, msgData []byte) {
switch frameType {
case "beast":
b, err := beast.NewFrame(msg.Data, false)
b, err := beast.NewFrame(msgData, false)
if nil != err {
log.Error().Err(err)
return
}
log.Info().
Str("ICAO", b.IcaoStr()).
Str("AVR", b.RawString()).
Str("tag", msg.Header.Get("tag")).
Str("tag", tag).
Send()

// TODO: Replace timestamp with our own
_, err = fileHandles["beast"].Write(msg.Data)
_, err = fileHandles["beast"].Write(msgData)
if err != nil {
log.Error().Err(err).Send()
}
case "avr":
_, err := fileHandles["avr"].Write(append(msg.Data, 10))
_, err := fileHandles["avr"].Write(append(msgData, 10)) // new line append
if err != nil {
log.Error().Err(err).Send()
}
case "sbs1":
_, err := fileHandles["sbs1"].Write(append(msg.Data, 10))
_, err := fileHandles["sbs1"].Write(append(msgData, 10))
if err != nil {
log.Error().Err(err).Send()
}

}
}
wg.Done()
}
Loading

0 comments on commit 0fbff25

Please sign in to comment.