Skip to content

Commit

Permalink
feat: implemented tail command
Browse files Browse the repository at this point in the history
  • Loading branch information
chetan committed Nov 3, 2021
1 parent b0de06c commit 43e6ea3
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 28 deletions.
24 changes: 24 additions & 0 deletions bin/vproxy/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,30 @@ vproxy connect hello.local:8888 -- vproxy hello
},
},
},
{
Name: "tail",
Aliases: []string{"stream", "attach"},
Usage: "Stream logs for given vhost",
Action: tailLogs,
Before: loadClientConfig,
UsageText: `vproxy tail [command options] <hostname>`,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "host",
Value: "127.0.0.1",
Usage: "Daemon host IP",
},
&cli.IntFlag{
Name: "http",
Value: 80,
Usage: "Daemon HTTP port",
},
&cli.BoolFlag{
Name: "no-follow",
Usage: "Get the most recent logs and exit",
},
},
},
{
Name: "list",
Aliases: []string{"l"},
Expand Down
22 changes: 19 additions & 3 deletions bin/vproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ func printVersion(c *cli.Context) error {
}

func startClient(c *cli.Context) error {
host := c.String("host")
httpPort := c.Int("http")

// collect and validate binds
args := c.Args().Slice()
Expand Down Expand Up @@ -67,7 +65,7 @@ func startClient(c *cli.Context) error {
}
}

client := &vproxy.Client{Addr: fmt.Sprintf("%s:%d", host, httpPort)}
client := createClient(c)
if !client.IsDaemonRunning() {
return fmt.Errorf("daemon not running on localhost")
}
Expand All @@ -76,6 +74,24 @@ func startClient(c *cli.Context) error {
return nil
}

func createClient(c *cli.Context) *vproxy.Client {
host := c.String("host")
httpPort := c.Int("http")
return &vproxy.Client{Addr: fmt.Sprintf("%s:%d", host, httpPort)}
}

func tailLogs(c *cli.Context) error {
if !c.Args().Present() {
return fmt.Errorf("missing hostname")
}

hostname := c.Args().First()
client := createClient(c)
client.Attach(hostname)

return nil
}

func validateBinding(bind string) error {
if bind == "" || !reBinding.MatchString(bind) {
return fmt.Errorf("invalid binding: '%s' (expected format 'host:port', e.g., 'app.local.com:7000')", bind)
Expand Down
27 changes: 22 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (c *Client) AddBindings(binds []string, detach bool, args []string) {
os.Exit(1)
}

c.runCommand(args)

c.wg = &sync.WaitGroup{}
for _, bind := range binds {
c.wg.Add(1)
Expand Down Expand Up @@ -83,12 +85,20 @@ func (c *Client) addBinding(bind string, detach bool) {
if detach {
res.Body.Close()
} else {
streamLogs(res)
c.Attach(s[0])
}
}

func attach(addr string, bind string) {

func (c *Client) Attach(hostname string) {
data := url.Values{}
data.Add("host", hostname)
res, err := http.DefaultClient.PostForm(c.uri("/clients/stream"), data)
if err != nil {
stopCommand(c.cmd)
log.Fatalf("error registering client: %s\n", err)
}
fmt.Printf("[*] streaming logs for %s\n", hostname)
streamLogs(res)
}

func streamLogs(res *http.Response) {
Expand All @@ -97,8 +107,15 @@ func streamLogs(res *http.Response) {
for {
line, err := r.ReadString('\n')
if err != nil {
fmt.Printf("error reading from daemon: %s\n", err)
fmt.Println("exiting")
if line != "" && strings.Contains(line, "error") {
fmt.Println(line)
os.Exit(1)
} else if err.Error() == "EOF" {
fmt.Println("[*] daemon connection closed")
} else {
fmt.Printf("error reading from daemon: %s\n", err)
fmt.Println("exiting")
}
os.Exit(0)
}
if strings.HasPrefix(line, "[*] ") {
Expand Down
43 changes: 23 additions & 20 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func (d *Daemon) Run() {

d.loggedHandler.HandleFunc("/_vproxy/hello", d.hello)
d.loggedHandler.HandleFunc("/_vproxy/clients", d.listClients)
d.loggedHandler.HandleFunc("/_vproxy", d.registerVhost)
d.loggedHandler.HandleFunc("/_vproxy/clients/add", d.registerVhost)
d.loggedHandler.HandleFunc("/_vproxy/clients/stream", d.streamLogs)
d.wg.Add(1) // ensure we don't exit immediately

if d.enableHTTP() {
Expand Down Expand Up @@ -145,10 +146,7 @@ func (d *Daemon) startHTTP() {
defer null.Close()

server := &http.Server{Handler: d.loggedHandler, ErrorLog: nullLogger}
err = server.Serve(d.httpListener)
// if err != nil {
// fmt.Printf("[*] error: http server exited with error: %s\n", err)
// }
server.Serve(d.httpListener)
d.wg.Done()
}

Expand All @@ -170,10 +168,7 @@ func (d *Daemon) startTLS() {
// ErrorLog: nullLogger,
}

err = server.ServeTLS(d.httpsListener, "", "")
// if err != nil {
// fmt.Printf("[*] error: tls server exited with error: %s\n", err)
// }
server.ServeTLS(d.httpsListener, "", "")
d.wg.Done()
}

Expand All @@ -185,28 +180,36 @@ func (d *Daemon) restartTLS() {
go d.startTLS()
}

// registerVhost handler
// registerVhost handler creates and starts a new vhost reverse proxy
func (d *Daemon) registerVhost(w http.ResponseWriter, r *http.Request) {
binding := r.PostFormValue("binding")
d.addVhost(binding, w)

// defer func() {
// // Remove this client when this handler exits
// fmt.Printf("[*] removing vhost: %s -> %d\n", vhost.Host, vhost.Port)
// d.loggedHandler.RemoveVhost(vhost.Host)
// d.restartTLS()
// }()
}

// streamLogs for a given hostname back to the caller. Runs forever until client
// disconnects.
func (d *Daemon) streamLogs(w http.ResponseWriter, r *http.Request) {
rw := w.(*LogRecord).ResponseWriter
flusher, ok := rw.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}

binding := r.PostFormValue("binding")
logChan, vhost := d.addVhost(binding, w)
if vhost == nil {
hostname := r.PostFormValue("host")
logChan := d.loggedHandler.VhostLogListeners[hostname]
if logChan == nil {
fmt.Fprintf(w, "[*] error: host '%s' not found", hostname)
return
}

defer func() {
// Remove this client when this handler exits
fmt.Printf("[*] removing vhost: %s -> %d\n", vhost.Host, vhost.Port)
d.loggedHandler.RemoveVhost(vhost.Host)
d.restartTLS()
}()

// runs forever until connection closes
d.relayLogsUntilClose(flusher, logChan, rw, w)
}
Expand Down

0 comments on commit 43e6ea3

Please sign in to comment.