These commits mostly resolve the memory leak problem and optimize memory usage to handle millions of requests #81

Open
wants to merge 17 commits into base: master

17 commits
70287ab
Support decompress gzip request body by detecting header 'Content-Enc…
truong-hua Nov 28, 2021
ff94fc8
Merge branch 'kafkaesque-io:master' into master
truong-hua May 4, 2022
43ee835
Support GET param ?includeHeaders=true to generate rich message forma…
truong-hua May 9, 2022
c2e3604
Implement pprof so we can profile the performance while daemon is run…
truong-hua Jun 21, 2022
b747439
Implement missed pprof endpoints
truong-hua Jul 1, 2022
191e652
Leverage docker cache to speed up futher build by caching the depende…
truong-hua Jul 7, 2022
1ada8fa
This log is duplicated with the same log in NewPulsarClient
truong-hua Jul 7, 2022
9d42172
Add more debug log to trace the resource leaking problem
truong-hua Jul 7, 2022
ce0a8b2
Allow number type in configuration
truong-hua Jul 7, 2022
5a0f082
Add WorkerPoolSize configuration which is to allow limit the number o…
truong-hua Jul 7, 2022
aebdade
Fork predefined goroutine as worker and reuse memory buffer to reduce…
truong-hua Jul 7, 2022
e5c06f2
Add more description about WorkPoolSize config
truong-hua Jul 7, 2022
8ff3d1b
Allow to define an empty string config via environment variable, whic…
truong-hua Jul 7, 2022
1c8c372
Allow configurable HTTP header that use to pass Pulsar token. This co…
truong-hua Jul 7, 2022
8a04220
Allow to specify ?includeRequestLine=true in the url to include HTTP …
truong-hua Jul 8, 2022
4246a28
Fix bug that missing header body delimiter if there is only ?includeR…
truong-hua Jul 13, 2022
5f067cc
Add an notice about memory leak if using default Pulsar Beam configur…
truong-hua Jul 13, 2022
5 changes: 5 additions & 0 deletions Dockerfile
@@ -12,6 +12,11 @@ RUN apk --no-cache add build-base git
RUN go install github.com/google/gops@latest

WORKDIR /root/

# Cache dependencies download
ADD go.mod go.sum ./
RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get

ADD . /root
RUN cd /root/src && go build -o pulsar-beam

7 changes: 7 additions & 0 deletions README.md
@@ -91,6 +91,13 @@ Pulsar Beam requires the same public and private keys to generate and verify JWT

To disable JWT authentication, set the parameter `HTTPAuthImpl` in the config file or env variable to `noauth`.

Notice: Pulsar Beam creates one client connection per Pulsar URL per token, so placing another authorization layer on top of Pulsar Beam may cause a memory leak by creating a large number of Pulsar clients. To use another authorization layer, such as a reverse proxy (for example nginx), in front of Pulsar Beam, disable Pulsar authorization by setting `PulsarTokenHeaderName` to an empty string (the default is "Authorization"). To keep both reverse-proxy and Pulsar authorization, change `PulsarTokenHeaderName` to a header name other than "Authorization" that the reverse proxy does not use.

How can you tell whether you are affected by this memory leak?

- Option 1: Use `gops` to inspect the running goroutines and check whether a large number of them are doing ping/pong with Pulsar brokers.
- Option 2: We use Pulsar Beam as an HTTP webhook receiver at https://doopage.com and handle a few million requests every day with CPU stable below 3% (8 vCPU on AWS) and memory below 40MB (`WorkerPoolSize` = 16). If you are using noticeably more resources than that, try setting `PulsarTokenHeaderName` to an empty string and check whether the problem is resolved.
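
The growth described in the notice can be illustrated with a minimal sketch. It assumes only what the notice states, namely that one client is kept per Pulsar URL per token; `fakeClient`, `clientCache`, and `getClient` are hypothetical stand-ins and not Pulsar Beam's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeClient stands in for a real Pulsar client, which would hold
// connections and background goroutines for as long as it stays cached.
type fakeClient struct {
	url, token string
}

// clientCache keeps one client per (url, token) pair (assumed key layout).
type clientCache struct {
	mu      sync.Mutex
	clients map[string]*fakeClient
}

func newClientCache() *clientCache {
	return &clientCache{clients: make(map[string]*fakeClient)}
}

// getClient returns the cached client for the pair, creating it if needed.
func (c *clientCache) getClient(url, token string) *fakeClient {
	key := url + "|" + token
	c.mu.Lock()
	defer c.mu.Unlock()
	if cl, ok := c.clients[key]; ok {
		return cl
	}
	cl := &fakeClient{url: url, token: token}
	c.clients[key] = cl
	return cl
}

// size reports how many clients are currently cached.
func (c *clientCache) size() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.clients)
}

func main() {
	// A proxy that forwards a different Authorization value per caller
	// produces a new cache entry for every distinct value.
	perCaller := newClientCache()
	for i := 0; i < 1000; i++ {
		perCaller.getClient("pulsar://broker:6650", fmt.Sprintf("token-%d", i))
	}
	fmt.Println("entries with per-caller tokens:", perCaller.size())

	// With the token header disabled, every request maps to one entry.
	noToken := newClientCache()
	for i := 0; i < 1000; i++ {
		noToken.getClient("pulsar://broker:6650", "")
	}
	fmt.Println("entries with empty token:", noToken.size())
}
```

In this sketch the cache never evicts, so the number of entries tracks the number of distinct header values seen, which is the situation the notice warns about.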

### Sink source

If a webhook's response contains a body and three headers including `Authorization` for Pulsar JWT, `TopicFn` for a topic fully qualified name, and `PulsarUrl`, the beam server will send the body as a new message to the Pulsar's topic specified as in TopicFn and PulsarUrl.
4 changes: 1 addition & 3 deletions src/pulsardriver/pulsar-client.go
@@ -29,6 +29,7 @@ func GetPulsarClient(pulsarURL, pulsarToken string, reset bool) (pulsar.Client,
clientSync.Lock()
driver, ok := ClientCache[key]
if !ok {
log.Debugf("Creating new pulsar client cache %s\n token %s", pulsarURL, pulsarToken)
driver = &PulsarClient{}
driver.createdAt = time.Now()
driver.pulsarURL = pulsarURL
@@ -67,9 +68,6 @@ func (c *PulsarClient) GetClient(url, tokenStr string) (pulsar.Client, error) {
log.Errorf("failed instantiate pulsar client %v", err)
return nil, fmt.Errorf("Could not instantiate Pulsar client: %v", err)
}
if log.GetLevel() == log.DebugLevel {
log.Debugf("pulsar client url %s\n token %s", url, tokenStr)
}

c.client = driver
return driver, nil
148 changes: 113 additions & 35 deletions src/route/handlers.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
@@ -26,9 +25,27 @@ var singleDb db.Db

const subDelimiter = "-"

// 5MB + 1 byte buffer (default Pulsar message size limit is 5MB https://pulsar.apache.org/docs/concepts-messaging/)
const workerBufferSize = 5242881

var workerPool chan func(buffer []byte)

// Init initializes database
func Init() {
singleDb = db.NewDbWithPanic(util.GetConfig().PbDbType)

log.Infof("Start worker pool with size = %d", util.GetConfig().WorkerPoolSize)
workerPool = make(chan func(buffer []byte), util.GetConfig().WorkerPoolSize)

// Start a fixed number of goroutines that act as the worker pool
for i := 0; i < util.GetConfig().WorkerPoolSize; i++ {
go func() {
var buffer [workerBufferSize]byte
for f := range workerPool {
f(buffer[:])
}
}()
}
}

// TokenServerResponse is the json object for token server response
@@ -75,45 +92,106 @@ func StatusPage(w http.ResponseWriter, r *http.Request) {

// ReceiveHandler - the message receiver handler
func ReceiveHandler(w http.ResponseWriter, r *http.Request) {
var b []byte
var err error
if r.Header.Get("Content-Encoding") == "gzip" {
g, gerr := gzip.NewReader(r.Body)
if gerr != nil {
util.ResponseErrorJSON(gerr, w, http.StatusInternalServerError)
done := make(chan bool)
workerPool <- func(buffer []byte) {
var b []byte = buffer[:0]
var err error
var bufferSize int = 0

defer r.Body.Close()
defer func() { done <- true }()

// Include request line (GET /uri HTTP/1.1) into the message payload if url has includeRequestLine=true
includeRequestLine, isIncludeRequestLine := r.URL.Query()["includeRequestLine"]

if isIncludeRequestLine && includeRequestLine[0] != "false" {
b = append(append(append(append(append(append(b, r.Method...), " "...), r.RequestURI...), " "...), r.Proto...), "\r\n"...)
}

// Include headers information into the message payload if url has includeHeaders=true
includeHeaders, isIncludeHeaders := r.URL.Query()["includeHeaders"]

if isIncludeHeaders && includeHeaders[0] != "false" {
for name, values := range r.Header {
b = append(append(append(append(b, name...), ": "...), values[0]...), "\r\n"...)
}
}

// Append header delimiter (\r\n\r\n) and adjust the buffer size
if isIncludeRequestLine || isIncludeHeaders {
b = append(b, "\r\n\r\n"...)
bufferSize = len(b)
}

if r.Header.Get("Content-Encoding") == "gzip" {
g, gerr := gzip.NewReader(r.Body)

if gerr != nil {
util.ResponseErrorJSON(gerr, w, http.StatusInternalServerError)
return
}

defer g.Close()

var n int
for {
n, err = g.Read(buffer[bufferSize:])
bufferSize += n
if err == io.EOF {
break
} else if err != nil {
util.ResponseErrorJSON(err, w, http.StatusInternalServerError)
return
} else if bufferSize >= workerBufferSize {
util.ResponseErrorJSON(errors.New("Buffer overflow"), w, http.StatusInternalServerError)
return
}
}
} else {
var n int
for {
n, err = r.Body.Read(buffer[bufferSize:])
bufferSize += n
if err == io.EOF {
break
} else if err != nil {
util.ResponseErrorJSON(err, w, http.StatusInternalServerError)
return
} else if bufferSize >= workerBufferSize {
util.ResponseErrorJSON(errors.New("Buffer overflow"), w, http.StatusInternalServerError)
return
}
}
}

b = buffer[:bufferSize]
log.Debugf("Message buffer (size = %d): %s", bufferSize, b)

token, topic, pulsarURL, err := util.ReceiverHeader(util.AllowedPulsarURLs, &r.Header)
if err != nil {
util.ResponseErrorJSON(err, w, http.StatusUnauthorized)
return
}
b, err = ioutil.ReadAll(g)
} else {
b, err = ioutil.ReadAll(r.Body)
}
defer r.Body.Close()
if err != nil {
util.ResponseErrorJSON(err, w, http.StatusInternalServerError)
return
}
token, topic, pulsarURL, err := util.ReceiverHeader(util.AllowedPulsarURLs, &r.Header)
if err != nil {
util.ResponseErrorJSON(err, w, http.StatusUnauthorized)
return
}

topicFN, err2 := GetTopicFnFromRoute(mux.Vars(r))
if topic == "" && err2 != nil {
// only read topic from routes
util.ResponseErrorJSON(err2, w, http.StatusUnprocessableEntity)
return
}
topicFN = util.AssignString(topic, topicFN) // header topicFn overwrites topic specified in the routes
log.Infof("topicFN %s pulsarURL %s", topicFN, pulsarURL)

topicFN, err2 := GetTopicFnFromRoute(mux.Vars(r))
if topic == "" && err2 != nil {
// only read topic from routes
util.ResponseErrorJSON(err2, w, http.StatusUnprocessableEntity)
return
}
topicFN = util.AssignString(topic, topicFN) // header topicFn overwrites topic specified in the routes
log.Infof("topicFN %s pulsarURL %s", topicFN, pulsarURL)

pulsarAsync := r.URL.Query().Get("mode") == "async"
err = pulsardriver.SendToPulsar(pulsarURL, token, topicFN, b, pulsarAsync)
if err != nil {
util.ResponseErrorJSON(err, w, http.StatusServiceUnavailable)
pulsarAsync := r.URL.Query().Get("mode") == "async"
err = pulsardriver.SendToPulsar(pulsarURL, token, topicFN, b, pulsarAsync)
if err != nil {
util.ResponseErrorJSON(err, w, http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusOK)
<-done
return
}
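
The worker-pool pattern used by `Init` and `ReceiveHandler` above can be reduced to a small standalone sketch: a fixed number of goroutines each own one buffer for their whole lifetime, and callers submit closures that receive that buffer. The names `startPool`, `process`, `result`, and the small `bufSize` are assumptions made for illustration; the PR itself uses a 5MB + 1 byte buffer and an HTTP handler as the caller.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// bufSize is a hypothetical, deliberately small buffer size for the demo.
const bufSize = 1024

// result carries a task's outcome back to the caller.
type result struct {
	n   int
	err error
}

// startPool launches n workers. Each worker allocates one buffer up front
// and reuses it for every task it receives.
func startPool(n int) chan func(buf []byte) {
	tasks := make(chan func(buf []byte), n)
	for i := 0; i < n; i++ {
		go func() {
			buf := make([]byte, bufSize)
			for task := range tasks {
				task(buf)
			}
		}()
	}
	return tasks
}

// process hands payload to a worker, which copies it into its buffer.
// It blocks until the worker has finished, like the handler in the PR.
func process(tasks chan func(buf []byte), payload string) (int, error) {
	// Buffered so the worker never blocks when reporting the result.
	done := make(chan result, 1)
	tasks <- func(buf []byte) {
		if len(payload) > len(buf) {
			done <- result{0, errors.New("payload exceeds worker buffer")}
			return
		}
		done <- result{copy(buf, payload), nil}
	}
	r := <-done
	return r.n, r.err
}

func main() {
	tasks := startPool(4)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			n, err := process(tasks, fmt.Sprintf("message-%d", i))
			fmt.Println(n, err)
		}(i)
	}
	wg.Wait()
}
```

With this shape, peak buffer memory is bounded by the pool size times the buffer size regardless of how many callers are waiting, which appears to be the trade-off the `WorkerPoolSize` setting exposes.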

12 changes: 11 additions & 1 deletion src/route/router.go
@@ -2,6 +2,7 @@ package route

import (
"net/http"
"net/http/pprof"

"github.com/gorilla/mux"

@@ -25,8 +26,17 @@ func NewRouter(mode *string) *mux.Router {
Path(route.Pattern).
Name(route.Name).
Handler(route.AuthFunc(handler))

}

router.Handle("/debug/pprof", http.HandlerFunc(pprof.Index))
router.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
router.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
router.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
router.Handle("/debug/pprof/heap", pprof.Handler("heap"))
router.Handle("/debug/pprof/block", pprof.Handler("block"))
router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))

// TODO rate limit can be added per route basis
router.Use(middleware.LimitRate)
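
For comparison, the same profiling endpoints can be exposed with only the standard library. This is a sketch, not the PR's router: it swaps gorilla/mux for `http.ServeMux`, and `newDebugMux` and `statusOf` are hypothetical helpers. It relies on `pprof.Index` serving named profiles (heap, goroutine, and so on) for any path under `/debug/pprof/`.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/http/pprof"
)

// newDebugMux registers the pprof handlers on a standard-library mux.
func newDebugMux() *http.ServeMux {
	mux := http.NewServeMux()
	// The trailing slash makes this a subtree pattern, so pprof.Index also
	// answers /debug/pprof/heap, /debug/pprof/goroutine, and similar paths.
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	return mux
}

// statusOf issues an in-process GET and returns the HTTP status code.
func statusOf(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	mux := newDebugMux()
	fmt.Println("index:", statusOf(mux, "/debug/pprof/"))
	fmt.Println("goroutine:", statusOf(mux, "/debug/pprof/goroutine?debug=1"))
}
```

Once a server exposes these routes, a profile can be fetched with `go tool pprof` pointed at the corresponding URL.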

51 changes: 43 additions & 8 deletions src/util/config.go
@@ -8,6 +8,7 @@ import (
"os"
"reflect"
"strings"
"strconv"

"unicode"

@@ -81,6 +82,12 @@ type Configuration struct {

// HTTPAuthImpl specifies the jwt authen and authorization algorithm, `noauth` to skip JWT authentication
HTTPAuthImpl string `json:"HTTPAuthImpl"`

// WorkerPoolSize limits the concurrency of the receiver. Every worker allocates its own memory buffer (5MB)
WorkerPoolSize int `json:"WorkerPoolSize"`

// Name of the HTTP header used to pass the Pulsar token that authorizes the Pulsar client; set to empty to disable Pulsar token authorization
PulsarTokenHeaderName string `json:"PulsarTokenHeaderName"`
}

var (
@@ -103,7 +110,14 @@ var (
// Init initializes configuration
func Init() {
configFile := AssignString(os.Getenv("PULSAR_BEAM_CONFIG"), DefaultConfigFile)

// Default config
Config.WorkerPoolSize = 4
Config.PulsarTokenHeaderName = "Authorization"

ReadConfigFile(configFile)

fmt.Printf("PulsarTokenHeaderName %s\n", Config.PulsarTokenHeaderName)

log.SetLevel(logLevel(Config.LogLevel))

@@ -140,14 +154,35 @@ func ReadConfigFile(configFile string) {
for i := 0; i < fields.NumField(); i++ {
field := fields.Field(i).Name
f := st.FieldByName(field)

if f.Kind() == reflect.String {
envV := os.Getenv(field)
if len(envV) > 0 && f.IsValid() && f.CanSet() {
f.SetString(strings.TrimSuffix(envV, "\n")) // ensure no \n at the end of line that was introduced by loading k8s secret file
}
os.Setenv(field, f.String())
}
envV, envPresent := os.LookupEnv(field)

switch f.Kind() {
case reflect.String:
if envPresent && f.IsValid() && f.CanSet() {
f.SetString(strings.TrimSuffix(envV, "\n")) // ensure no \n at the end of line that was introduced by loading k8s secret file
}
os.Setenv(field, f.String())
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
if len(envV) > 0 && f.IsValid() && f.CanSet() {
intVal, err := strconv.ParseInt(envV, 10, 64)
if err != nil {
panic(err)
} else {
f.SetInt(intVal)
}
}
os.Setenv(field, strconv.FormatInt(f.Int(), 10))
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
if len(envV) > 0 && f.IsValid() && f.CanSet() {
uintVal, err := strconv.ParseUint(envV, 10, 64)
if err != nil {
panic(err)
} else {
f.SetUint(uintVal)
}
}
os.Setenv(field, strconv.FormatUint(f.Uint(), 10))
}
}

clusterStr := AssignString(Config.PulsarClusters, "")
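
The switch from `os.Getenv` to `os.LookupEnv` in this hunk is what lets an environment variable set to the empty string override a non-empty default. A minimal sketch of that distinction follows; `lookupFunc`, `stringSetting`, and `intSetting` are hypothetical helpers, and unlike the hunk above the integer helper returns an error instead of panicking.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// lookupFunc matches the signature of os.LookupEnv so the helpers can be
// exercised without touching the real process environment.
type lookupFunc func(name string) (string, bool)

// stringSetting returns the variable's value whenever it is set, even if
// it is set to the empty string; otherwise it returns def.
func stringSetting(lookup lookupFunc, name, def string) string {
	if v, ok := lookup(name); ok {
		return strings.TrimSuffix(v, "\n")
	}
	return def
}

// intSetting parses the variable as a base-10 integer when it is set and
// non-empty; otherwise it returns def.
func intSetting(lookup lookupFunc, name string, def int) (int, error) {
	v, ok := lookup(name)
	if !ok || v == "" {
		return def, nil
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return def, fmt.Errorf("invalid %s=%q: %w", name, v, err)
	}
	return n, nil
}

func main() {
	header := stringSetting(os.LookupEnv, "PulsarTokenHeaderName", "Authorization")
	workers, err := intSetting(os.LookupEnv, "WorkerPoolSize", 4)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("header=%q workers=%d\n", header, workers)
}
```

Running it with `PulsarTokenHeaderName=` set in the environment yields an empty header name, whereas leaving the variable unset keeps the default.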
5 changes: 4 additions & 1 deletion src/util/util.go
@@ -59,7 +59,10 @@ func ResponseErrorJSON(e error, w http.ResponseWriter, statusCode int) {

// ReceiverHeader parses headers for Pulsar required configuration
func ReceiverHeader(allowedClusters []string, h *http.Header) (token, topicFN, pulsarURL string, err error) {
token = strings.TrimSpace(strings.Replace(h.Get("Authorization"), "Bearer", "", 1))
token = ""
if GetConfig().PulsarTokenHeaderName != "" {
token = strings.TrimSpace(strings.Replace(h.Get(GetConfig().PulsarTokenHeaderName), "Bearer", "", 1))
}
topicFN = h.Get("TopicFn")
pulsarURL = h.Get("PulsarUrl")
if len(allowedClusters) > 1 || (len(allowedClusters) == 1 && allowedClusters[0] != "") {