Skip to content

Commit

Permalink
Merge pull request #7 from PayU/unable-proxy-usage
Browse files Browse the repository at this point in the history
Unable proxy usage
  • Loading branch information
shyimo authored Jul 4, 2020
2 parents 071bc39 + 843e514 commit d316da7
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 20 deletions.
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS.

## Configuration Parameters

| Configuration Key Name | Description | Mandatory |
| ---------------------- | ------------------------------------- | --------- |
| QueueUrl | the queue url in your aws account | yes |
| QueueRegion | the queue region in your aws account | yes |
| PluginTagAttribute | attribute name of the message tag | no |
| QueueMessageGroupId | the group id required for fifo queues | fifo-only |
| Configuration Key Name | Description | Mandatory |
| ---------------------- | -------------------------------------------------------- | --------- |
| QueueUrl | the queue url in your aws account | yes |
| QueueRegion | the queue region in your aws account | yes |
| PluginTagAttribute | attribute name of the message tag | no |
| QueueMessageGroupId | the group id required for fifo queues | fifo-only |
| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no |

```conf
[SERVICE]
Expand Down Expand Up @@ -73,3 +74,5 @@ More information about the usage and installation of golang plugins can be found
2) Shared credentials file.

3) If your application is running on an Amazon EC2 instance, IAM role for Amazon EC2. The IAM role should have full access to your SQS and in addition, it should add the following KMS permissions: `kms:GenerateDataKey*, kms:Get*, kms:Decrypt*`

- The plugin uses specific environment variable for log level: `SQS_OUT_LOG_LEVEL`. Supported values are: `debug`, `info` or `error`
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/PayU/fluentBit-sqs-plugin

go 1.14
86 changes: 72 additions & 14 deletions out_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"
"unsafe"
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
Expand All @@ -15,9 +16,17 @@ import (
)
import (
"encoding/json"
"net/http"
"net/url"
"strings"
)

// integer representation for this plugin log level
// 0 - debug
// 1 - info
// 2 - error
var sqsOutLogLevel int

// MessageCounter is used for count the current SQS Batch messages
var MessageCounter int = 0

Expand All @@ -29,10 +38,12 @@ type sqsConfig struct {
queueMessageGroupID string
mySQS *sqs.SQS
pluginTagAttribute string
proxyURL string
}

//export FLBPluginRegister
func FLBPluginRegister(def unsafe.Pointer) int {
setLogLevel()
return output.FLBPluginRegister(def, "sqs", "aws sqs output plugin")
}

Expand All @@ -42,10 +53,13 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
queueRegion := output.FLBPluginConfigKey(plugin, "QueueRegion")
queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId")
pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute")
proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl")

writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL))
writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion))
writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID))
writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute))
writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL))

if queueURL == "" {
writeErrorLog(errors.New("QueueUrl configuration key is mandatory"))
Expand All @@ -68,25 +82,39 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
awsCredentials := credentials.NewEnvCredentials()
var myAWSSession *session.Session
var sessionError error
var awsConfig *aws.Config

// Retrieve the credentials value
_, credError := awsCredentials.Get()
if credError != nil {
writeInfoLog("unable to find aws credentials from environment variables..using credentials chain")
myAWSSession, sessionError = session.NewSession(&aws.Config{
awsConfig = &aws.Config{
Region: aws.String(queueRegion),
CredentialsChainVerboseErrors: aws.Bool(true),
})
}
} else {
// environment variables credentials was found
writeInfoLog("environment variables credentials where found")
myAWSSession, sessionError = session.NewSession(&aws.Config{
awsConfig = &aws.Config{
Region: aws.String(queueRegion),
CredentialsChainVerboseErrors: aws.Bool(true),
Credentials: awsCredentials,
})
}
}

// if proxy
if proxyURL != "" {
writeInfoLog("set http client struct on aws configuration since proxy url has been found")
awsConfig.HTTPClient = &http.Client{
Transport: &http.Transport{
Proxy: func(*http.Request) (*url.URL, error) {
return url.Parse(proxyURL) // Or your own implementation that decides a proxy based on the URL in the request
},
},
}
}

// create the session
myAWSSession, sessionError = session.NewSession(awsConfig)
if sessionError != nil {
writeErrorLog(sessionError)
return output.FLB_ERROR
Expand Down Expand Up @@ -129,6 +157,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
break
}

writeDebugLog(fmt.Sprintf("got new record from input. record length is: %d", len(record)))

if len(record) == 0 {
writeInfoLog("got empty record from input. skipping it")
continue
Expand All @@ -150,7 +180,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
recordString, err := createRecordString(timeStamp, tagStr, record)

if err != nil {
fmt.Printf("%v\n", err)
writeErrorLog(err)
// DO NOT RETURN HERE becase one message has an error when json is
// generated, but a retry would fetch ALL messages again. instead an
// error should be printed to console
Expand All @@ -159,6 +189,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int

MessageCounter++

writeDebugLog(fmt.Sprintf("record string: %s", recordString))
writeDebugLog(fmt.Sprintf("message counter: %d", MessageCounter))

sqsRecord = &sqs.SendMessageBatchRequestEntry{
Id: aws.String(fmt.Sprintf("MessageNumber-%d", MessageCounter)),
MessageBody: aws.String(recordString),
Expand All @@ -182,15 +215,14 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
if MessageCounter == 10 {
err := sendBatchToSqs(sqsConf, SqsRecords)

SqsRecords = nil
MessageCounter = 0

if err != nil {
writeErrorLog(err)
return output.FLB_ERROR
}

SqsRecords = nil
MessageCounter = 0
}

}

return output.FLB_OK
Expand Down Expand Up @@ -242,14 +274,40 @@ func createRecordString(timestamp time.Time, tag string, record map[interface{}]
return string(js), nil
}

func writeDebugLog(message string) {
if sqsOutLogLevel == 0 {
currentTime := time.Now()
fmt.Printf("[%s] [ debug] [sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message)
}
}

func writeInfoLog(message string) {
currentTime := time.Now()
fmt.Printf("[%s][info][sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message)
if sqsOutLogLevel <= 1 {
currentTime := time.Now()
fmt.Printf("[%s] [ info] [sqs-out] %s\n", currentTime.Format("2006.01.02 15:04:05"), message)
}
}

func writeErrorLog(err error) {
currentTime := time.Now()
fmt.Printf("[%s][error][sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err)
if sqsOutLogLevel <= 2 {
currentTime := time.Now()
fmt.Printf("[%s] [ error] [sqs-out] %v\n", currentTime.Format("2006.01.02 15:04:05"), err)
}
}

func setLogLevel() {
logEnv := os.Getenv("SQS_OUT_LOG_LEVEL")

switch strings.ToLower(logEnv) {
case "debug":
sqsOutLogLevel = 0
case "info":
sqsOutLogLevel = 1
case "error":
sqsOutLogLevel = 2
default:
sqsOutLogLevel = 1 // info
}
}

func main() {
Expand Down

0 comments on commit d316da7

Please sign in to comment.