From 70287ab03a08b7c8405694bd2570ea5b2fec07dd Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Mon, 29 Nov 2021 02:49:10 +0700 Subject: [PATCH 01/16] Support decompress gzip request body by detecting header 'Content-Encoding: gzip' --- src/route/handlers.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/route/handlers.go b/src/route/handlers.go index 108b5ff..04c6892 100644 --- a/src/route/handlers.go +++ b/src/route/handlers.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "strings" + "compress/gzip" "github.com/apache/pulsar-client-go/pulsar" "github.com/gorilla/mux" @@ -74,7 +75,18 @@ func StatusPage(w http.ResponseWriter, r *http.Request) { // ReceiveHandler - the message receiver handler func ReceiveHandler(w http.ResponseWriter, r *http.Request) { - b, err := ioutil.ReadAll(r.Body) + var b []byte + var err error + if r.Header.Get("Content-Encoding") == "gzip" { + g, gerr := gzip.NewReader(r.Body) + if gerr != nil { + util.ResponseErrorJSON(gerr, w, http.StatusInternalServerError) + return + } + b, err = ioutil.ReadAll(g) + } else { + b, err = ioutil.ReadAll(r.Body) + } defer r.Body.Close() if err != nil { util.ResponseErrorJSON(err, w, http.StatusInternalServerError) From 43ee8354195917c250cf48021961413c1d602740 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Mon, 9 May 2022 22:19:14 +0700 Subject: [PATCH 02/16] Support GET param ?includeHeaders=true to generate rich message format that contains both HTTP headers and body as a single message --- src/route/handlers.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/route/handlers.go b/src/route/handlers.go index 6c863bf..f624684 100644 --- a/src/route/handlers.go +++ b/src/route/handlers.go @@ -73,6 +73,12 @@ func StatusPage(w http.ResponseWriter, r *http.Request) { return } +// Full message structure that contains all request information +type InfoRichMessage struct { + Headers http.Header `json:"headers"` + Body string `json:"body"` +} + // ReceiveHandler - the message receiver handler func ReceiveHandler(w http.ResponseWriter, r *http.Request) { var b []byte @@ -97,6 +103,21 @@ func ReceiveHandler(w http.ResponseWriter, r *http.Request) { util.ResponseErrorJSON(err, w, http.StatusUnauthorized) return } + + // Include headers information into the message payload if url has includeHeaders=true + includeHeaders, isInfoRichMessage := r.URL.Query()["includeHeaders"] + + var infoRichMessage *InfoRichMessage + + if isInfoRichMessage && includeHeaders[0] != "false" { + infoRichMessage = new(InfoRichMessage) + infoRichMessage.Headers = r.Header + infoRichMessage.Body = string(b) + } + + if infoRichMessage != nil { + b, _ = json.Marshal(infoRichMessage) + } topicFN, err2 := GetTopicFnFromRoute(mux.Vars(r)) if topic == "" && err2 != nil { From c2e3604de105161d58943d38f951e5055cea3d2f Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Tue, 21 Jun 2022 11:07:34 +0700 Subject: [PATCH 03/16] Implement pprof so we can profile the performance while daemon is running to optimize it --- src/route/router.go | 2 +- src/route/routes.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/route/router.go b/src/route/router.go index 8ae106b..f9211db 100644 --- a/src/route/router.go +++ b/src/route/router.go @@ -36,7 +36,7 @@ func NewRouter(mode *string) *mux.Router { // GetEffectiveRoutes gets effective routes func GetEffectiveRoutes(mode *string) Routes { - return append(PrometheusRoute, getRoutes(mode)...) + return append(PprofRoute, append(PrometheusRoute, getRoutes(mode)...)...) } func getRoutes(mode *string) Routes { diff --git a/src/route/routes.go b/src/route/routes.go index b09cc1c..ef1b79d 100644 --- a/src/route/routes.go +++ b/src/route/routes.go @@ -2,6 +2,7 @@ package route import ( "net/http" + "net/http/pprof" "github.com/gorilla/mux" "github.com/kafkaesque-io/pulsar-beam/src/middleware" @@ -42,6 +43,44 @@ var PrometheusRoute = Routes{ }, } +var PprofRoute = Routes{ + Route{ + "Pprof Index", + http.MethodGet, + "/debug/pprof/", + pprof.Index, + middleware.NoAuth, + }, + Route{ + "Pprof Cmdline", + http.MethodGet, + "/debug/pprof/cmdline", + pprof.Cmdline, + middleware.NoAuth, + }, + Route{ + "Pprof Profile", + http.MethodGet, + "/debug/pprof/profile", + pprof.Profile, + middleware.NoAuth, + }, + Route{ + "Pprof Symbol", + http.MethodGet, + "/debug/pprof/symbol", + pprof.Symbol, + middleware.NoAuth, + }, + Route{ + "Pprof Trace", + http.MethodGet, + "/debug/pprof/trace", + pprof.Trace, + middleware.NoAuth, + }, +} + // ReceiverRoutes definition var ReceiverRoutes = Routes{ Route{ From b747439904d4bb499bc7eb8304620a17461ebbbe Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Sat, 2 Jul 2022 01:45:30 +0700 Subject: [PATCH 04/16] Implement missed pprof endpoints --- src/route/router.go | 14 ++++++++++++-- src/route/routes.go | 39 --------------------------------------- 2 files changed, 12 insertions(+), 41 deletions(-) diff --git a/src/route/router.go b/src/route/router.go index f9211db..847b664 100644 --- a/src/route/router.go +++ b/src/route/router.go @@ -2,6 +2,7 @@ package route import ( "net/http" + "net/http/pprof" "github.com/gorilla/mux" @@ -25,8 +26,17 @@ func NewRouter(mode *string) *mux.Router { Path(route.Pattern). Name(route.Name). Handler(route.AuthFunc(handler)) - } + + router.Handle("/debug/pprof", http.HandlerFunc(pprof.Index)) + router.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + router.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + router.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + router.Handle("/debug/pprof/heap", pprof.Handler("heap")) + router.Handle("/debug/pprof/block", pprof.Handler("block")) + router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) + router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) + // TODO rate limit can be added per route basis router.Use(middleware.LimitRate) @@ -36,7 +46,7 @@ func NewRouter(mode *string) *mux.Router { // GetEffectiveRoutes gets effective routes func GetEffectiveRoutes(mode *string) Routes { - return append(PprofRoute, append(PrometheusRoute, getRoutes(mode)...)...) + return append(PrometheusRoute, getRoutes(mode)...) } func getRoutes(mode *string) Routes { diff --git a/src/route/routes.go b/src/route/routes.go index ef1b79d..b09cc1c 100644 --- a/src/route/routes.go +++ b/src/route/routes.go @@ -2,7 +2,6 @@ package route import ( "net/http" - "net/http/pprof" "github.com/gorilla/mux" "github.com/kafkaesque-io/pulsar-beam/src/middleware" @@ -43,44 +42,6 @@ var PrometheusRoute = Routes{ }, } -var PprofRoute = Routes{ - Route{ - "Pprof Index", - http.MethodGet, - "/debug/pprof/", - pprof.Index, - middleware.NoAuth, - }, - Route{ - "Pprof Cmdline", - http.MethodGet, - "/debug/pprof/cmdline", - pprof.Cmdline, - middleware.NoAuth, - }, - Route{ - "Pprof Profile", - http.MethodGet, - "/debug/pprof/profile", - pprof.Profile, - middleware.NoAuth, - }, - Route{ - "Pprof Symbol", - http.MethodGet, - "/debug/pprof/symbol", - pprof.Symbol, - middleware.NoAuth, - }, - Route{ - "Pprof Trace", - http.MethodGet, - "/debug/pprof/trace", - pprof.Trace, - middleware.NoAuth, - }, -} - // ReceiverRoutes definition var ReceiverRoutes = Routes{ Route{ From 191e65210387b8a98ddad6714e3601f26c34bf2b Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Thu, 7 Jul 2022 16:08:56 +0700 Subject: [PATCH 05/16] Leverage docker cache to speed up futher build by caching the dependencies --- Dockerfile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Dockerfile b/Dockerfile index dee6803..d3a9405 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,6 +12,11 @@ RUN apk --no-cache add build-base git RUN go install github.com/google/gops@latest WORKDIR /root/ + +# Cache dependencies download +ADD go.mod go.sum ./ +RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get + ADD . /root RUN cd /root/src && go build -o pulsar-beam From 1ada8fa2f851bd86a38cd1f9da50b6bec5a906f7 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Thu, 7 Jul 2022 16:09:29 +0700 Subject: [PATCH 06/16] This log is duplicated with the same log in NewPulsarClient --- src/pulsardriver/pulsar-client.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/pulsardriver/pulsar-client.go b/src/pulsardriver/pulsar-client.go index cb85632..708af70 100644 --- a/src/pulsardriver/pulsar-client.go +++ b/src/pulsardriver/pulsar-client.go @@ -67,9 +67,6 @@ func (c *PulsarClient) GetClient(url, tokenStr string) (pulsar.Client, error) { log.Errorf("failed instantiate pulsar client %v", err) return nil, fmt.Errorf("Could not instantiate Pulsar client: %v", err) } - if log.GetLevel() == log.DebugLevel { - log.Debugf("pulsar client url %s\n token %s", url, tokenStr) - } c.client = driver return driver, nil From 9d42172f32e32b69b4dbe819d151f67a7a03a0a6 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Thu, 7 Jul 2022 16:09:44 +0700 Subject: [PATCH 07/16] Add more debug log to trace the resource leaking problem --- src/pulsardriver/pulsar-client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pulsardriver/pulsar-client.go b/src/pulsardriver/pulsar-client.go index 708af70..3b6bf32 100644 --- a/src/pulsardriver/pulsar-client.go +++ b/src/pulsardriver/pulsar-client.go @@ -29,6 +29,7 @@ func GetPulsarClient(pulsarURL, pulsarToken string, reset bool) (pulsar.Client, clientSync.Lock() driver, ok := ClientCache[key] if !ok { + log.Debugf("Creating new pulsar client cache %s\n token %s", pulsarURL, pulsarToken) driver = &PulsarClient{} driver.createdAt = time.Now() driver.pulsarURL = pulsarURL From ce0a8b20df1ac1d498c872bbcd12615498580570 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Thu, 7 Jul 2022 16:10:18 +0700 Subject: [PATCH 08/16] Allow number type in configuration --- src/util/config.go | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/src/util/config.go b/src/util/config.go index 2eddf6e..62c9419 100644 --- a/src/util/config.go +++ b/src/util/config.go @@ -8,6 +8,7 @@ import ( "os" "reflect" "strings" + "strconv" "unicode" @@ -140,14 +141,35 @@ func ReadConfigFile(configFile string) { for i := 0; i < fields.NumField(); i++ { field := fields.Field(i).Name f := st.FieldByName(field) - - if f.Kind() == reflect.String { - envV := os.Getenv(field) - if len(envV) > 0 && f.IsValid() && f.CanSet() { - f.SetString(strings.TrimSuffix(envV, "\n")) // ensure no \n at the end of line that was introduced by loading k8s secrete file - } - os.Setenv(field, f.String()) - } + envV := os.Getenv(field) + + switch f.Kind() { + case reflect.String: + if len(envV) > 0 && f.IsValid() && f.CanSet() { + f.SetString(strings.TrimSuffix(envV, "\n")) // ensure no \n at the end of line that was introduced by loading k8s secrete file + } + os.Setenv(field, f.String()) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + if len(envV) > 0 && f.IsValid() && f.CanSet() { + intVal, err := strconv.ParseInt(envV, 10, 64) + if err != nil { + panic(err) + } else { + f.SetInt(intVal) + } + } + os.Setenv(field, strconv.FormatInt(f.Int(), 10)) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + if len(envV) > 0 && f.IsValid() && f.CanSet() { + uintVal, err := strconv.ParseUint(envV, 10, 64) + if err != nil { + panic(err) + } else { + f.SetUint(uintVal) + } + } + os.Setenv(field, strconv.FormatUint(f.Uint(), 10)) + } } clusterStr := AssignString(Config.PulsarClusters, "") From 5a0f0828a4a9f4753137bfb345de8913d7880859 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Thu, 7 Jul 2022 16:11:07 +0700 Subject: [PATCH 09/16] Add WorkerPoolSize configuration which is to allow limit the number of concurrency of receiver --- src/util/config.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/util/config.go b/src/util/config.go index 62c9419..f960e19 100644 --- a/src/util/config.go +++ b/src/util/config.go @@ -82,6 +82,8 @@ type Configuration struct { // HTTPAuthImpl specifies the jwt authen and authorization algorithm, `noauth` to skip JWT authentication HTTPAuthImpl string `json:"HTTPAuthImpl"` + + WorkerPoolSize int `json:"WorkerPoolSize"` } var ( @@ -107,6 +109,10 @@ func Init() { ReadConfigFile(configFile) log.SetLevel(logLevel(Config.LogLevel)) + + if Config.WorkerPoolSize <= 0 { + Config.WorkerPoolSize = 4 + } log.Warnf("Configuration built from file - %s", configFile) JWTAuth = icrypto.NewRSAKeyPair(Config.PulsarPrivateKey, Config.PulsarPublicKey) From aebdadee63f22bbf2d47b1e35df34d9c60de1f51 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Thu, 7 Jul 2022 16:13:22 +0700 Subject: [PATCH 10/16] Fork predefined goroutine as worker and reuse memory buffer to reduce pressure on GC which reduce a lot of CPU usage and memory usage when receive hugh amount number of request --- src/route/handlers.go | 158 +++++++++++++++++++++++++++--------------- 1 file changed, 102 insertions(+), 56 deletions(-) diff --git a/src/route/handlers.go b/src/route/handlers.go index f624684..83927f0 100644 --- a/src/route/handlers.go +++ b/src/route/handlers.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "net/url" "strings" @@ -26,9 +25,27 @@ var singleDb db.Db const subDelimiter = "-" +// 5MB + 1 byte buffer (default Pulsar message size limit is 5MB https://pulsar.apache.org/docs/concepts-messaging/) +const workerBufferSize = 5242881 + +var workerPool chan func(buffer []byte) + // Init initializes database func Init() { singleDb = db.NewDbWithPanic(util.GetConfig().PbDbType) + + log.Infof("Start worker pool with size = %d", util.GetConfig().WorkerPoolSize) + workerPool = make(chan func(buffer []byte), util.GetConfig().WorkerPoolSize) + + // Start a number of goroutine as worker pool + for i := 0; i < util.GetConfig().WorkerPoolSize; i++ { + go func() { + var buffer [workerBufferSize]byte + for f := range workerPool { + f(buffer[:]) + } + }() + } } // TokenServerResponse is the json object for token server response @@ -73,68 +90,97 @@ func StatusPage(w http.ResponseWriter, r *http.Request) { return } -// Full message structure that contains all request information -type InfoRichMessage struct { - Headers http.Header `json:"headers"` - Body string `json:"body"` -} - // ReceiveHandler - the message receiver handler func ReceiveHandler(w http.ResponseWriter, r *http.Request) { - var b []byte - var err error - if r.Header.Get("Content-Encoding") == "gzip" { - g, gerr := gzip.NewReader(r.Body) - if gerr != nil { - util.ResponseErrorJSON(gerr, w, http.StatusInternalServerError) + done := make(chan bool) + workerPool <- func(buffer []byte) { + var b []byte = buffer[:0] + var err error + var bufferSize int = 0 + + defer r.Body.Close() + defer func() { done <- true }() + + // Include headers information into the message payload if url has includeHeaders=true + includeHeaders, isInfoRichMessage := r.URL.Query()["includeHeaders"] + + if isInfoRichMessage && includeHeaders[0] != "false" { + for name, values := range r.Header { + b = append(append(append(append(b, name...), ": "...), values[0]...), "\r\n"...) + } + b = append(b, "\r\n\r\n"...) + bufferSize = len(b) + } + + if r.Header.Get("Content-Encoding") == "gzip" { + g, gerr := gzip.NewReader(r.Body) + + if gerr != nil { + util.ResponseErrorJSON(gerr, w, http.StatusInternalServerError) + return + } + + defer g.Close() + + var n int + for { + n, err = g.Read(buffer[bufferSize:]) + bufferSize += n + if err == io.EOF { + break + } else if err != nil { + util.ResponseErrorJSON(err, w, http.StatusInternalServerError) + return + } else if bufferSize >= workerBufferSize { + util.ResponseErrorJSON(errors.New("Buffer overflow"), w, http.StatusInternalServerError) + return + } + } + } else { + var n int + for { + n, err = r.Body.Read(buffer[bufferSize:]) + bufferSize += n + if err == io.EOF { + break + } else if err != nil { + util.ResponseErrorJSON(err, w, http.StatusInternalServerError) + return + } else if bufferSize >= workerBufferSize { + util.ResponseErrorJSON(errors.New("Buffer overflow"), w, http.StatusInternalServerError) + return + } + } + } + + b = buffer[:bufferSize] + log.Debugf("Message buffer (size = %d): %s", bufferSize, b); + + token, topic, pulsarURL, err := util.ReceiverHeader(util.AllowedPulsarURLs, &r.Header) + if err != nil { + util.ResponseErrorJSON(err, w, http.StatusUnauthorized) return } - b, err = ioutil.ReadAll(g) - } else { - b, err = ioutil.ReadAll(r.Body) - } - defer r.Body.Close() - if err != nil { - util.ResponseErrorJSON(err, w, http.StatusInternalServerError) - return - } - token, topic, pulsarURL, err := util.ReceiverHeader(util.AllowedPulsarURLs, &r.Header) - if err != nil { - util.ResponseErrorJSON(err, w, http.StatusUnauthorized) - return - } - - // Include headers information into the message payload if url has includeHeaders=true - includeHeaders, isInfoRichMessage := r.URL.Query()["includeHeaders"] - - var infoRichMessage *InfoRichMessage - - if isInfoRichMessage && includeHeaders[0] != "false" { - infoRichMessage = new(InfoRichMessage) - infoRichMessage.Headers = r.Header - infoRichMessage.Body = string(b) - } - - if infoRichMessage != nil { - b, _ = json.Marshal(infoRichMessage) - } - - topicFN, err2 := GetTopicFnFromRoute(mux.Vars(r)) - if topic == "" && err2 != nil { - // only read topic from routes - util.ResponseErrorJSON(err2, w, http.StatusUnprocessableEntity) - return - } - topicFN = util.AssignString(topic, topicFN) // header topicFn overwrites topic specified in the routes - log.Infof("topicFN %s pulsarURL %s", topicFN, pulsarURL) + + topicFN, err2 := GetTopicFnFromRoute(mux.Vars(r)) + if topic == "" && err2 != nil { + // only read topic from routes + util.ResponseErrorJSON(err2, w, http.StatusUnprocessableEntity) + return + } + topicFN = util.AssignString(topic, topicFN) // header topicFn overwrites topic specified in the routes + log.Infof("topicFN %s pulsarURL %s", topicFN, pulsarURL) - pulsarAsync := r.URL.Query().Get("mode") == "async" - err = pulsardriver.SendToPulsar(pulsarURL, token, topicFN, b, pulsarAsync) - if err != nil { - util.ResponseErrorJSON(err, w, http.StatusServiceUnavailable) + pulsarAsync := r.URL.Query().Get("mode") == "async" + err = pulsardriver.SendToPulsar(pulsarURL, token, topicFN, b, pulsarAsync) + if err != nil { + util.ResponseErrorJSON(err, w, http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) return } - w.WriteHeader(http.StatusOK) + <-done return } From e5c06f225a9300f881a4fb4f05105b1bd3456ea4 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Thu, 7 Jul 2022 17:50:04 +0700 Subject: [PATCH 11/16] Add more description about WorkPoolSize config --- src/util/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/util/config.go b/src/util/config.go index f960e19..9f8f3b2 100644 --- a/src/util/config.go +++ b/src/util/config.go @@ -83,6 +83,7 @@ type Configuration struct { // HTTPAuthImpl specifies the jwt authen and authorization algorithm, `noauth` to skip JWT authentication HTTPAuthImpl string `json:"HTTPAuthImpl"` + // Limit concurency of receiver. Every worker will need to allocate a buffer memory (default is 5MB) WorkerPoolSize int `json:"WorkerPoolSize"` } From 8ff3d1b335c4e8e1082686881257cbcaa63634d3 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Thu, 7 Jul 2022 17:51:01 +0700 Subject: [PATCH 12/16] Allow to define an empty string config via environment variable, which is very useful to allow user to disable a config --- src/util/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/util/config.go b/src/util/config.go index 9f8f3b2..a9e69af 100644 --- a/src/util/config.go +++ b/src/util/config.go @@ -148,11 +148,11 @@ func ReadConfigFile(configFile string) { for i := 0; i < fields.NumField(); i++ { field := fields.Field(i).Name f := st.FieldByName(field) - envV := os.Getenv(field) + envV, envPresent := os.LookupEnv(field) switch f.Kind() { case reflect.String: - if len(envV) > 0 && f.IsValid() && f.CanSet() { + if envPresent && f.IsValid() && f.CanSet() { f.SetString(strings.TrimSuffix(envV, "\n")) // ensure no \n at the end of line that was introduced by loading k8s secrete file } os.Setenv(field, f.String()) From 1c8c37200f5e001816f6700b4cd32b3f4e72bf49 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Thu, 7 Jul 2022 17:53:45 +0700 Subject: [PATCH 13/16] Allow configurable HTTP header that use to pass Pulsar token. This commit prevent confiction when using Authorization header for both JWT and Pulsar token and may be other authorization purpose. Without this commit, using Authorization header as Pulsar token may cause memory leak because of token change per every request cause pulsar-beam to create a new pulsar client every time. With configurable Authorization, user can also use ?includeHeader to get Authorization header to validate a payload without casuing pulsar-beam memory leak --- src/util/config.go | 14 ++++++++++---- src/util/util.go | 5 ++++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/util/config.go b/src/util/config.go index a9e69af..0eb814a 100644 --- a/src/util/config.go +++ b/src/util/config.go @@ -85,6 +85,9 @@ type Configuration struct { // Limit concurency of receiver. Every worker will need to allocate a buffer memory (default is 5MB) WorkerPoolSize int `json:"WorkerPoolSize"` + + // Name of the HTTP header to use for Pulsar token to authorize pulsar client, set tp empty to disable pulsar token authorization + PulsarTokenHeaderName string `json:"PulsarTokenHeaderName"` } var ( @@ -107,13 +110,16 @@ var ( // Init initializes configuration func Init() { configFile := AssignString(os.Getenv("PULSAR_BEAM_CONFIG"), DefaultConfigFile) + + // Default config + Config.WorkerPoolSize = 4 + Config.PulsarTokenHeaderName = "Authorization" + ReadConfigFile(configFile) + + fmt.Printf("PulsarTokenHeaderName %s", Config.PulsarTokenHeaderName) log.SetLevel(logLevel(Config.LogLevel)) - - if Config.WorkerPoolSize <= 0 { - Config.WorkerPoolSize = 4 - } log.Warnf("Configuration built from file - %s", configFile) JWTAuth = icrypto.NewRSAKeyPair(Config.PulsarPrivateKey, Config.PulsarPublicKey) diff --git a/src/util/util.go b/src/util/util.go index 1093f19..8de3968 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -59,7 +59,10 @@ func ResponseErrorJSON(e error, w http.ResponseWriter, statusCode int) { // ReceiverHeader parses headers for Pulsar required configuration func ReceiverHeader(allowedClusters []string, h *http.Header) (token, topicFN, pulsarURL string, err error) { - token = strings.TrimSpace(strings.Replace(h.Get("Authorization"), "Bearer", "", 1)) + token = "" + if GetConfig().PulsarTokenHeaderName != "" { + token = strings.TrimSpace(strings.Replace(h.Get(GetConfig().PulsarTokenHeaderName), "Bearer", "", 1)) + } topicFN = h.Get("TopicFn") pulsarURL = h.Get("PulsarUrl") if len(allowedClusters) > 1 || (len(allowedClusters) == 1 && allowedClusters[0] != "") { From 8a042201428c920ae45d048a767969571bfbf9e3 Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Fri, 8 Jul 2022 16:01:30 +0700 Subject: [PATCH 14/16] Allow to specify ?includeRequestLine=true in the url to include HTTP Request Line in the payload to send to Pulsar, which is useful in case of Pulsar consumer would like to detect the url or method that webhook client is using --- src/route/handlers.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/route/handlers.go b/src/route/handlers.go index 83927f0..4500c7a 100644 --- a/src/route/handlers.go +++ b/src/route/handlers.go @@ -100,11 +100,19 @@ func ReceiveHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() defer func() { done <- true }() + + // Include request line (GET /uri HTTP/1.1) into the message payload if url has includeRequestLine=true + includeRequestLine, isIncludeRequestLine := r.URL.Query()["includeRequestLine"] + + if isIncludeRequestLine && includeRequestLine[0] != "false" { + b = append(append(append(append(append(append(b, r.Method...), " "...), r.RequestURI...), " "...), r.Proto...), "\r\n"...) + bufferSize = len(b) + } // Include headers information into the message payload if url has includeHeaders=true - includeHeaders, isInfoRichMessage := r.URL.Query()["includeHeaders"] + includeHeaders, isIncludeHeaders := r.URL.Query()["includeHeaders"] - if isInfoRichMessage && includeHeaders[0] != "false" { + if isIncludeHeaders && includeHeaders[0] != "false" { for name, values := range r.Header { b = append(append(append(append(b, name...), ": "...), values[0]...), "\r\n"...) } From 4246a28c3d31dc3992550e5a82202a8d0463b08d Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Wed, 13 Jul 2022 14:02:27 +0700 Subject: [PATCH 15/16] Fix bug that missing header body delimiter if there is only ?includeRequestLine --- src/route/handlers.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/route/handlers.go b/src/route/handlers.go index 4500c7a..be04d58 100644 --- a/src/route/handlers.go +++ b/src/route/handlers.go @@ -106,7 +106,6 @@ func ReceiveHandler(w http.ResponseWriter, r *http.Request) { if isIncludeRequestLine && includeRequestLine[0] != "false" { b = append(append(append(append(append(append(b, r.Method...), " "...), r.RequestURI...), " "...), r.Proto...), "\r\n"...) - bufferSize = len(b) } // Include headers information into the message payload if url has includeHeaders=true @@ -116,9 +115,13 @@ func ReceiveHandler(w http.ResponseWriter, r *http.Request) { for name, values := range r.Header { b = append(append(append(append(b, name...), ": "...), values[0]...), "\r\n"...) } - b = append(b, "\r\n\r\n"...) - bufferSize = len(b) } + + // Append header delimiter (\r\n\r\n) and adjust the buffer size + if isIncludeRequestLine || isIncludeHeaders { + b = append(b, "\r\n\r\n"...) + bufferSize = len(b) + } if r.Header.Get("Content-Encoding") == "gzip" { g, gerr := gzip.NewReader(r.Body) From 5f067cc91c31fdbf3d87500aa6dfab5b4c1b71ad Mon Sep 17 00:00:00 2001 From: "truong.hua" Date: Wed, 13 Jul 2022 14:03:58 +0700 Subject: [PATCH 16/16] Add an notice about memory leak if using default Pulsar Beam configuration with a reverse proxy engine in front of or using other Authorization method. This commit also add a short brief to let people know how to detect memory leak and resolve it. Further default configuration of Pulsar Beam should prevent this case from happening. --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 10a1d2a..c67caa6 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,13 @@ Pulsar Beam requires the same public and private keys to generate and verify JWT To disable JWT authentication, set the paramater `HTTPAuthImpl` in the config file or env variable to `noauth`. +Notice: Pulsar Beam create one client connection per pulsar url per token, so using other authorization on top of Pulsar Beam may cause memory leak due to creating of a lot of pulsar client. In order to use other authorization like reverse proxy (like nginx) on top of Pulsar Beam, please disable Pulsar authorization by setting `PulsarTokenHeaderName` to empty string (default is "Authorization"). If you would like to keep both authorization of reverse proxy and Pulsar, please change `PulsarTokenHeaderName` to another header name that is different than "Authorization" or not using by reverse proxy. + +How to know that you are under memory leak? + +- Option 1: Use `gops` to check running go routine and if you are having a lot of routine that doing ping/pong with Pulsar brokers. +- Option 2: We are using Pulsar Beam as http webhook receiver at https://doopage.com and we are able to handle few millions of request every day with CPU stable at <3% (8vCPU AWS) and Memory <40MB (`WorkerPoolSize` = 16). If you are using more resources than us, please try to set `PulsarTokenHeaderName` to empty string to check whether the problem is resolved. + ### Sink source If a webhook's response contains a body and three headers including `Authorization` for Pulsar JWT, `TopicFn` for a topic fully qualified name, and `PulsarUrl`, the beam server will send the body as a new message to the Pulsar's topic specified as in TopicFn and PulsarUrl.