diff --git a/README.md b/README.md index 02d4bc5..75c176a 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,13 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS. ## Configuration Parameters -| Configuration Key Name | Description | Mandatory | -| ---------------------- | ------------------------------------- | --------- | -| QueueUrl | the queue url in your aws account | yes | -| QueueRegion | the queue region in your aws account | yes | -| PluginTagAttribute | attribute name of the message tag | no | -| QueueMessageGroupId | the group id required for fifo queues | fifo-only | +| Configuration Key Name | Description | Mandatory | +| ---------------------- | -------------------------------------------------------- | --------- | +| QueueUrl | the queue url in your aws account | yes | +| QueueRegion | the queue region in your aws account | yes | +| PluginTagAttribute | attribute name of the message tag | no | +| QueueMessageGroupId | the group id required for fifo queues | fifo-only | +| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no | ```conf [SERVICE] @@ -73,3 +74,5 @@ More information about the usage and installation of golang plugins can be found 2) Shared credentials file. 3) If your application is running on an Amazon EC2 instance, IAM role for Amazon EC2. The IAM role should have full access to your SQS and in addition, it should add the following KMS permissions: `kms:GenerateDataKey*, kms:Get*, kms:Decrypt*` + +- The plugin uses specific environment variable for log level: `SQS_OUT_LOG_LEVEL`. Supported values are: `debug`, `info` or `error` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f3647af --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/PayU/fluentBit-sqs-plugin + +go 1.14 diff --git a/out_sqs.go b/out_sqs.go index e7545cb..33d9081 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -6,6 +6,7 @@ import ( "fmt" "time" "unsafe" + "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -15,9 +16,17 @@ import ( ) import ( "encoding/json" + "net/http" + "net/url" "strings" ) +// integer representation for this plugin log level +// 0 - debug +// 1 - info +// 2 - error +var sqsOutLogLevel int + // MessageCounter is used for count the current SQS Batch messages var MessageCounter int = 0 @@ -29,10 +38,12 @@ type sqsConfig struct { queueMessageGroupID string mySQS *sqs.SQS pluginTagAttribute string + proxyURL string } //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { + setLogLevel() return output.FLBPluginRegister(def, "sqs", "aws sqs output plugin") } @@ -42,10 +53,13 @@ func FLBPluginInit(plugin unsafe.Pointer) int { queueRegion := output.FLBPluginConfigKey(plugin, "QueueRegion") queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId") pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute") + proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl") + writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL)) writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion)) writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID)) writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute)) + writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL)) if queueURL == "" { writeErrorLog(errors.New("QueueUrl configuration key is mandatory")) @@ -68,25 +82,39 @@ func FLBPluginInit(plugin unsafe.Pointer) int { awsCredentials := credentials.NewEnvCredentials() var myAWSSession *session.Session var sessionError error + var awsConfig *aws.Config // Retrieve the credentials value _, credError := awsCredentials.Get() if credError != nil { writeInfoLog("unable to find aws credentials from environment variables..using credentials chain") - myAWSSession, sessionError = session.NewSession(&aws.Config{ + awsConfig = &aws.Config{ Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), - }) + } } else { - // environment variables credentials was found writeInfoLog("environment variables credentials where found") - myAWSSession, sessionError = session.NewSession(&aws.Config{ + awsConfig = &aws.Config{ Region: aws.String(queueRegion), CredentialsChainVerboseErrors: aws.Bool(true), Credentials: awsCredentials, - }) + } } + // if proxy + if proxyURL != "" { + writeInfoLog("set http client struct on aws configuration since proxy url has been found") + awsConfig.HTTPClient = &http.Client{ + Transport: &http.Transport{ + Proxy: func(*http.Request) (*url.URL, error) { + return url.Parse(proxyURL) // Or your own implementation that decides a proxy based on the URL in the request + }, + }, + } + } + + // create the session + myAWSSession, sessionError = session.NewSession(awsConfig) if sessionError != nil { writeErrorLog(sessionError) return output.FLB_ERROR @@ -129,6 +157,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int break } + writeDebugLog(fmt.Sprintf("got new record from input. record length is: %d", len(record))) + if len(record) == 0 { writeInfoLog("got empty record from input. skipping it") continue @@ -150,7 +180,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int recordString, err := createRecordString(timeStamp, tagStr, record) if err != nil { - fmt.Printf("%v\n", err) + writeErrorLog(err) // DO NOT RETURN HERE becase one message has an error when json is // generated, but a retry would fetch ALL messages again. instead an // error should be printed to console @@ -159,6 +189,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int MessageCounter++ + writeDebugLog(fmt.Sprintf("record string: %s", recordString)) + writeDebugLog(fmt.Sprintf("message counter: %d", MessageCounter)) + sqsRecord = &sqs.SendMessageBatchRequestEntry{ Id: aws.String(fmt.Sprintf("MessageNumber-%d", MessageCounter)), MessageBody: aws.String(recordString), @@ -182,15 +215,14 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int if MessageCounter == 10 { err := sendBatchToSqs(sqsConf, SqsRecords) + SqsRecords = nil + MessageCounter = 0 + if err != nil { writeErrorLog(err) return output.FLB_ERROR } - - SqsRecords = nil - MessageCounter = 0 } - } return output.FLB_OK @@ -242,14 +274,40 @@ func createRecordString(timestamp time.Time, tag string, record map[interface{}] return string(js), nil } +func writeDebugLog(message string) { + if sqsOutLogLevel == 0 { + currentTime := time.Now() + fmt.Printf("[%s] [ debug] [sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) + } +} + func writeInfoLog(message string) { - currentTime := time.Now() - fmt.Printf("[%s][info][sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) + if sqsOutLogLevel <= 1 { + currentTime := time.Now() + fmt.Printf("[%s] [ info] [sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message) + } } func writeErrorLog(err error) { - currentTime := time.Now() - fmt.Printf("[%s][error][sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err) + if sqsOutLogLevel <= 2 { + currentTime := time.Now() + fmt.Printf("[%s] [ error] [sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err) + } +} + +func setLogLevel() { + logEnv := os.Getenv("SQS_OUT_LOG_LEVEL") + + switch strings.ToLower(logEnv) { + case "debug": + sqsOutLogLevel = 0 + case "info": + sqsOutLogLevel = 1 + case "error": + sqsOutLogLevel = 2 + default: + sqsOutLogLevel = 1 // info + } } func main() {