From 8f6f12c14f301eb4cdada30bb75731ac6630509e Mon Sep 17 00:00:00 2001 From: shaimoria Date: Wed, 1 Jul 2020 12:43:40 +0300 Subject: [PATCH 1/8] set go mod --- go.mod | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 go.mod diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f3647af --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/PayU/fluentBit-sqs-plugin + +go 1.14 From 122c2054ec0a398e2fabaafd99100d396abadae5 Mon Sep 17 00:00:00 2001 From: shaimoria Date: Fri, 3 Jul 2020 15:14:05 +0300 Subject: [PATCH 2/8] testing now --- out_sqs.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/out_sqs.go b/out_sqs.go index e7545cb..0c6d639 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -47,6 +47,8 @@ func FLBPluginInit(plugin unsafe.Pointer) int { writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID)) writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute)) + writeInfoLog("Im here on testing!!") + if queueURL == "" { writeErrorLog(errors.New("QueueUrl configuration key is mandatory")) return output.FLB_ERROR @@ -252,5 +254,10 @@ func writeErrorLog(err error) { fmt.Printf("[%s][error][sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err) } +func writeDebugLog(message string) { + currentTime := time.Now() + fmt.Printf("[%s][debug][sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) +} + func main() { } From c3321b81406750a3ac343b21aeefc0dc1a459e2e Mon Sep 17 00:00:00 2001 From: shaimoria Date: Fri, 3 Jul 2020 15:44:46 +0300 Subject: [PATCH 3/8] during work --- out_sqs.go | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/out_sqs.go b/out_sqs.go index 0c6d639..42feaed 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -15,6 +15,8 @@ import ( ) import ( "encoding/json" + "net/http" + "net/url" "strings" ) @@ -29,25 +31,30 @@ type sqsConfig struct { queueMessageGroupID string mySQS *sqs.SQS pluginTagAttribute string + proxyURL string } //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { + writeDebugLog("starting FLBPluginRegister") return output.FLBPluginRegister(def, "sqs", "aws sqs output plugin") } //export FLBPluginInit func FLBPluginInit(plugin unsafe.Pointer) int { + writeDebugLog("starting FLBPluginInit") + queueURL := output.FLBPluginConfigKey(plugin, "QueueUrl") queueRegion := output.FLBPluginConfigKey(plugin, "QueueRegion") queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId") pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute") + proxyURL := output.FLBPluginConfigKey(plugin, "ProxyURL") + writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL)) writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion)) writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID)) writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute)) - - writeInfoLog("Im here on testing!!") + writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL)) if queueURL == "" { writeErrorLog(errors.New("QueueUrl configuration key is mandatory")) @@ -70,25 +77,41 @@ func FLBPluginInit(plugin unsafe.Pointer) int { awsCredentials := credentials.NewEnvCredentials() var myAWSSession *session.Session var sessionError error + var awsConfig *aws.Config // Retrieve the credentials value _, credError := awsCredentials.Get() if credError != nil { writeInfoLog("unable to find aws credentials from environment variables..using credentials chain") - myAWSSession, sessionError = session.NewSession(&aws.Config{ + awsConfig = &aws.Config{ Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), - }) + } } else { - // environment variables credentials was found writeInfoLog("environment variables credentials where found") - myAWSSession, sessionError = session.NewSession(&aws.Config{ + awsConfig = &aws.Config{ Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), Credentials: awsCredentials, - }) + } + } + + // if proxy + if proxyURL != "" { + writeInfoLog("set http client struct on aws configuration since proxy url has been found") + awsConfig.HTTPClient = &http.Client{ + Transport: &http.Transport{ + Proxy: func(*http.Request) (*url.URL, error) { + return url.Parse(proxyURL) // Or your own implementation that decides a proxy based on the URL in the request + }, + }, + } + + writeInfoLog(awsConfig) } + // create the session + myAWSSession, sessionError = session.NewSession(awsConfig) if sessionError != nil { writeErrorLog(sessionError) return output.FLB_ERROR From c34cf69c4b6d23b0f3720c79e99a20d118537963 Mon Sep 17 00:00:00 2001 From: shaimoria Date: Fri, 3 Jul 2020 16:05:57 +0300 Subject: [PATCH 4/8] testing --- out_sqs.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/out_sqs.go b/out_sqs.go index 42feaed..2138039 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -36,14 +36,11 @@ type sqsConfig struct { //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { - writeDebugLog("starting FLBPluginRegister") return output.FLBPluginRegister(def, "sqs", "aws sqs output plugin") } //export FLBPluginInit func FLBPluginInit(plugin unsafe.Pointer) int { - writeDebugLog("starting FLBPluginInit") - queueURL := output.FLBPluginConfigKey(plugin, "QueueUrl") queueRegion := output.FLBPluginConfigKey(plugin, "QueueRegion") queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId") @@ -106,8 +103,6 @@ func FLBPluginInit(plugin unsafe.Pointer) int { }, }, } - - writeInfoLog(awsConfig) } // create the session @@ -277,10 +272,5 @@ func writeErrorLog(err error) { fmt.Printf("[%s][error][sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err) } -func writeDebugLog(message string) { - currentTime := time.Now() - fmt.Printf("[%s][debug][sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) -} - func main() { } From b39b89df37e6291d993d25bbd55fc5b8ad5c27b2 Mon Sep 17 00:00:00 2001 From: shaimoria Date: Sat, 4 Jul 2020 12:40:55 +0300 Subject: [PATCH 5/8] update logs format --- out_sqs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/out_sqs.go b/out_sqs.go index 2138039..b68d7cb 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -264,12 +264,12 @@ func createRecordString(timestamp time.Time, tag string, record map[interface{}] func writeInfoLog(message string) { currentTime := time.Now() - fmt.Printf("[%s][info][sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) + fmt.Printf("[%s] [info] [sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) } func writeErrorLog(err error) { currentTime := time.Now() - fmt.Printf("[%s][error][sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err) + fmt.Printf("[%s] [error] [sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err) } func main() { From 9a0ebfa65427f3b0cd83f9d429a25e0a4e2e8251 Mon Sep 17 00:00:00 2001 From: shaimoria Date: Sat, 4 Jul 2020 13:27:41 +0300 Subject: [PATCH 6/8] improve logs --- out_sqs.go | 50 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/out_sqs.go b/out_sqs.go index b68d7cb..62f1763 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -6,6 +6,7 @@ import ( "fmt" "time" "unsafe" + "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -20,6 +21,12 @@ import ( "strings" ) +// integer representation for this plugin log level +// 0 - debug +// 1 - info +// 2 - error +var sqsOutLogLevel int + // MessageCounter is used for count the current SQS Batch messages var MessageCounter int = 0 @@ -36,6 +43,7 @@ type sqsConfig struct { //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { + setLogLevel() return output.FLBPluginRegister(def, "sqs", "aws sqs output plugin") } @@ -149,6 +157,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int break } + writeDebugLog(fmt.Sprintf("got new record from input. record length is: %d", len(record))) + if len(record) == 0 { writeInfoLog("got empty record from input. skipping it") continue @@ -170,7 +180,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int recordString, err := createRecordString(timeStamp, tagStr, record) if err != nil { - fmt.Printf("%v\n", err) + writeErrorLog(err) // DO NOT RETURN HERE becase one message has an error when json is // generated, but a retry would fetch ALL messages again. instead an // error should be printed to console @@ -179,6 +189,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int MessageCounter++ + writeDebugLog(fmt.Sprintf("record string: %s", recordString)) + writeDebugLog(fmt.Sprintf("message counter: %d", MessageCounter)) + sqsRecord = &sqs.SendMessageBatchRequestEntry{ Id: aws.String(fmt.Sprintf("MessageNumber-%d", MessageCounter)), MessageBody: aws.String(recordString), @@ -210,7 +223,6 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int SqsRecords = nil MessageCounter = 0 } - } return output.FLB_OK @@ -262,14 +274,40 @@ func createRecordString(timestamp time.Time, tag string, record map[interface{}] return string(js), nil } +func writeDebugLog(message string) { + if sqsOutLogLevel == 0 { + currentTime := time.Now() + fmt.Printf("[%s] [ debug] [sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) + } +} + func writeInfoLog(message string) { - currentTime := time.Now() - fmt.Printf("[%s] [info] [sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) + if sqsOutLogLevel <= 1 { + currentTime := time.Now() + fmt.Printf("[%s] [ info] [sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) + } } func writeErrorLog(err error) { - currentTime := time.Now() - fmt.Printf("[%s] [error] [sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err) + if sqsOutLogLevel <= 2 { + currentTime := time.Now() + fmt.Printf("[%s] [ error] [sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err) + } +} + +func setLogLevel() { + logEnv := os.Getenv("SQS_OUT_LOG_LEVEL") + + switch strings.ToLower(logEnv) { + case "debug": + sqsOutLogLevel = 0 + case "info": + sqsOutLogLevel = 1 + case "error": + sqsOutLogLevel = 2 + default: + sqsOutLogLevel = 1 // info + } } func main() { From 2061024fb1c1deb17c69ee73297492d4ff3176f7 Mon Sep 17 00:00:00 2001 From: shaimoria Date: Sat, 4 Jul 2020 13:41:55 +0300 Subject: [PATCH 7/8] update logic --- out_sqs.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/out_sqs.go b/out_sqs.go index 62f1763..5ea1991 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -215,13 +215,13 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int if MessageCounter == 10 { err := sendBatchToSqs(sqsConf, SqsRecords) + SqsRecords = nil + MessageCounter = 0 + if err != nil { writeErrorLog(err) return output.FLB_ERROR } - - SqsRecords = nil - MessageCounter = 0 } } From 843e5143f17135643338c5a77de78fdcf43fd107 Mon Sep 17 00:00:00 2001 From: shaimoria Date: Sat, 4 Jul 2020 13:49:07 +0300 Subject: [PATCH 8/8] update readme --- README.md | 15 +++++++++------ out_sqs.go | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 02d4bc5..75c176a 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,13 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS. ## Configuration Parameters -| Configuration Key Name | Description | Mandatory | -| ---------------------- | ------------------------------------- | --------- | -| QueueUrl | the queue url in your aws account | yes | -| QueueRegion | the queue region in your aws account | yes | -| PluginTagAttribute | attribute name of the message tag | no | -| QueueMessageGroupId | the group id required for fifo queues | fifo-only | +| Configuration Key Name | Description | Mandatory | +| ---------------------- | -------------------------------------------------------- | --------- | +| QueueUrl | the queue url in your aws account | yes | +| QueueRegion | the queue region in your aws account | yes | +| PluginTagAttribute | attribute name of the message tag | no | +| QueueMessageGroupId | the group id required for fifo queues | fifo-only | +| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no | ```conf [SERVICE] @@ -73,3 +74,5 @@ More information about the usage and installation of golang plugins can be found 2) Shared credentials file. 3) If your application is running on an Amazon EC2 instance, IAM role for Amazon EC2. The IAM role should have full access to your SQS and in addition, it should add the following KMS permissions: `kms:GenerateDataKey*, kms:Get*, kms:Decrypt*` + +- The plugin uses specific environment variable for log level: `SQS_OUT_LOG_LEVEL`. Supported values are: `debug`, `info` or `error` diff --git a/out_sqs.go b/out_sqs.go index 5ea1991..33d9081 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -53,7 +53,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { queueRegion := output.FLBPluginConfigKey(plugin, "QueueRegion") queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId") pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute") - proxyURL := output.FLBPluginConfigKey(plugin, "ProxyURL") + proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl") writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL)) writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion))