Skip to content

Commit

Permalink
Proxy http stream responses (#31)
Browse files Browse the repository at this point in the history
Signed-off-by: Tamal Saha <[email protected]>
  • Loading branch information
tamalsaha authored May 7, 2023
1 parent bdd4c26 commit 5d2e9eb
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 28 deletions.
69 changes: 57 additions & 12 deletions pkg/cmds/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func NewCmdRun() *cobra.Command {

var pool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
// https://docs.nats.io/reference/faq#is-there-a-message-size-limitation-in-nats
return bufio.NewWriterSize(nil, 8*1024) // 8 KB
},
}

Expand Down Expand Up @@ -211,24 +212,68 @@ func addSubscribers(nc *nats.Conn, names shared.SubjectNames) error {
}
}

buf := pool.Get().(*bytes.Buffer)
defer pool.Put(buf)
buf.Reset()
ncw := &natsWriter{
nc: nc,
subj: msg.Reply,
}

w := pool.Get().(*bufio.Writer)
defer pool.Put(w)
w.Reset(ncw)

respMsg := &nats.Msg{
Subject: msg.Reply,
err = resp.Write(w)
ncw.final = true
if err != nil {
_, _ = ncw.WriteError(err)
return
}
if err := resp.Write(buf); err != nil { // WriteProxy
respMsg.Data = []byte(err.Error())
if w.Buffered() > 0 {
if e2 := w.Flush(); e2 != nil {
klog.ErrorS(e2, "failed to flush buffer")
}
} else {
respMsg.Data = buf.Bytes()
if _, e2 := ncw.Write([]byte("\n")); e2 != nil {
klog.ErrorS(e2, "failed to close buffer")
}
}
})
return err
}

type natsWriter struct {
nc *nats.Conn
subj string
final bool
}

var _ io.Writer = &natsWriter{}

if err := msg.RespondMsg(respMsg); err != nil {
klog.ErrorS(err, "failed to respond to proxy request")
func (w *natsWriter) Write(data []byte) (int, error) {
h := nats.Header{}
if w.final {
h.Set(transport.HeaderKeyDone, "")
}
return len(data), w.nc.PublishMsg(&nats.Msg{
Subject: w.subj,
Data: data,
Header: h,
})
}

func (w *natsWriter) WriteError(err error) (int, error) {
h := nats.Header{}
if w.final {
if err == nil {
h.Set(transport.HeaderKeyDone, "")
} else {
h.Set(transport.HeaderKeyDone, err.Error())
}
}
return 0, w.nc.PublishMsg(&nats.Msg{
Subject: w.subj,
Data: nil,
Header: h,
})
return err
}

// k8s.io/client-go/transport/cache.go
Expand Down
58 changes: 42 additions & 16 deletions pkg/transport/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"sync"
Expand All @@ -36,6 +38,8 @@ import (
const (
natsConnectionTimeout = 350 * time.Millisecond
natsConnectionRetryInterval = 100 * time.Millisecond

HeaderKeyDone = "Done"
)

// NewConnection creates a new NATS connection
Expand Down Expand Up @@ -185,15 +189,11 @@ func (rt *NatsTransport) RoundTrip(r *http.Request) (*http.Response, error) {
return nil, err
}

resp, err := Proxy(rt.Conn, rt.Names, buf.Bytes(), timeout)
if err != nil {
return nil, err
}
return http.ReadResponse(bufio.NewReader(bytes.NewReader(resp)), r)
return Proxy(r, rt.Conn, rt.Names, buf.Bytes(), timeout)
}

// SEE: https://github.com/nats-io/nats.docs/blob/master/using-nats/developing-with-nats/sending/replyto.md#including-a-reply-subject
func Proxy(nc *nats.Conn, names shared.SubjectNames, data []byte, timeout time.Duration) ([]byte, error) {
func Proxy(req *http.Request, nc *nats.Conn, names shared.SubjectNames, data []byte, timeout time.Duration) (*http.Response, error) {
hubRespSub, edgeRespSub := names.ProxyResponseSubjects()

// Listen for a single response
Expand All @@ -209,16 +209,42 @@ func Proxy(nc *nats.Conn, names shared.SubjectNames, data []byte, timeout time.D
return nil, err
}

// Read the reply
msg, err := sub.NextMsg(timeout)
if err != nil {
return nil, err
}
r, w := io.Pipe()
go func() {
var e2 error

err = sub.Unsubscribe()
if err != nil {
return nil, err
}
defer func() {
if e2 != nil {
_ = w.CloseWithError(e2)
} else {
_ = w.Close()
}
_ = sub.Unsubscribe()
}()

for {
var msg *nats.Msg
msg, e2 = sub.NextMsg(timeout)
if e2 != nil {
if e2 == nats.ErrTimeout {
e2 = nil
continue // ignore ErrTimeout
}
break
}

_, e2 = w.Write(msg.Data)
if e2 != nil {
break
}
if results, ok := msg.Header[HeaderKeyDone]; ok {
if results[0] != "" {
e2 = errors.New(results[0])
}
break
}
}
}()

return msg.Data, nil
return http.ReadResponse(bufio.NewReader(r), req)
}

0 comments on commit 5d2e9eb

Please sign in to comment.