diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index c7d2c27..bef105a 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -31,6 +31,8 @@ import ( fluentbit "github.com/fluent/fluent-bit-go/output" jsoniter "github.com/json-iterator/go" "github.com/sirupsen/logrus" + "github.com/valyala/bytebufferpool" + "github.com/valyala/fasttemplate" ) const ( @@ -41,6 +43,7 @@ const ( maximumBytesPerEvent = 1024 * 256 //256KB maximumTimeSpanPerPut = time.Hour * 24 truncatedSuffix = "[Truncated...]" + maxGroupStreamLength = 512 ) const ( @@ -91,11 +94,16 @@ func (stream *logStream) updateExpiration() { stream.expiration = time.Now().Add(logStreamInactivityTimeout) } +type fastTemplate struct { + String string + *fasttemplate.Template +} + // OutputPlugin is the CloudWatch Logs Fluent Bit output plugin type OutputPlugin struct { - logGroupName string + logGroupName *fastTemplate logStreamPrefix string - logStreamName string + logStreamName *fastTemplate logKey string client LogsClient streams map[string]*logStream @@ -106,6 +114,7 @@ type OutputPlugin struct { logGroupTags map[string]*string logGroupRetention int64 autoCreateGroup bool + bufferPool bytebufferpool.Pool } // OutputPluginConfig is the input information used by NewOutputPlugin to create a new OutputPlugin @@ -155,15 +164,24 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { logrus.Errorf("[cloudwatch %d] timeout threshold reached: Failed to send logs for %s\n", config.PluginInstanceID, d.String()) logrus.Fatalf("[cloudwatch %d] Quitting Fluent Bit", config.PluginInstanceID) // exit the plugin and kill Fluent Bit }) + if err != nil { + return nil, err + } + + logGroupTemplate, err := newTemplate(config.LogGroupName) + if err != nil { + return nil, err + } + logStreamTemplate, err := newTemplate(config.LogStreamName) if err != nil { return nil, err } return &OutputPlugin{ - logGroupName: config.LogGroupName, + logGroupName: logGroupTemplate, + logStreamName: logStreamTemplate, logStreamPrefix: config.LogStreamPrefix, - logStreamName: config.LogStreamName, logKey: config.LogKey, client: client, timer: timer, @@ -392,29 +410,44 @@ func (output *OutputPlugin) describeLogStreams(e *Event, nextToken *string) (*cl func (output *OutputPlugin) setGroupStreamNames(e *Event) { // This happens here to avoid running Split more than once per log Event. logTagSplit := strings.SplitN(e.Tag, ".", 10) + s := &sanitizer{sanitize: sanitizeGroup, buf: output.bufferPool.Get()} - var err error - if e.group, err = parseDataMapTags(e, logTagSplit, output.logGroupName); err != nil { - logrus.Errorf("[cloudwatch %d] parsing template: '%s': %v", output.PluginInstanceID, output.logGroupName, err) + if _, err := parseDataMapTags(e, logTagSplit, output.logGroupName, s); err != nil { + logrus.Errorf("[cloudwatch %d] parsing log_group_name template: %v", output.PluginInstanceID, err) + } + + if e.group = s.buf.String(); len(e.group) == 0 { + e.group = output.logGroupName.String } - if e.group == "" { - e.group = output.logGroupName + if len(e.group) > maxGroupStreamLength { + e.group = e.group[:maxGroupStreamLength] } + s.buf.Reset() + if output.logStreamPrefix != "" { e.stream = output.logStreamPrefix + e.Tag + output.bufferPool.Put(s.buf) + return } - if e.stream, err = parseDataMapTags(e, logTagSplit, output.logStreamName); err != nil { - // If a user gets this error, they need to fix their log_stream_name template to make it go away. Simple. - logrus.Errorf("[cloudwatch %d] parsing template: '%s': %v", output.PluginInstanceID, output.logStreamName, err) + s.sanitize = sanitizeStream + + if _, err := parseDataMapTags(e, logTagSplit, output.logStreamName, s); err != nil { + logrus.Errorf("[cloudwatch %d] parsing log_stream_name template: %v", output.PluginInstanceID, err) + } + + if e.stream = s.buf.String(); len(e.stream) == 0 { + e.stream = output.logStreamName.String } - if e.stream == "" { - e.stream = output.logStreamName + if len(e.stream) > maxGroupStreamLength { + e.stream = e.stream[:maxGroupStreamLength] } + + output.bufferPool.Put(s.buf) } func (output *OutputPlugin) createStream(e *Event) (*logStream, error) { @@ -521,7 +554,8 @@ func (output *OutputPlugin) processRecord(e *Event) ([]byte, error) { data = append(data, []byte("\n")...) if (len(data) + perEventBytes) > maximumBytesPerEvent { - logrus.Warnf("[cloudwatch %d] Found record with %d bytes, truncating to 256KB, logGroup=%s, stream=%s\n", output.PluginInstanceID, len(data)+perEventBytes, output.logGroupName, output.logStreamName) + logrus.Warnf("[cloudwatch %d] Found record with %d bytes, truncating to 256KB, logGroup=%s, stream=%s\n", + output.PluginInstanceID, len(data)+perEventBytes, e.group, e.stream) data = data[:(maximumBytesPerEvent - len(truncatedSuffix) - perEventBytes)] data = append(data, []byte(truncatedSuffix)...) } diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go index ca1fc55..0ef82b6 100644 --- a/cloudwatch/cloudwatch_test.go +++ b/cloudwatch/cloudwatch_test.go @@ -41,6 +41,12 @@ const ( testSequenceToken = "sequence-token" ) +// helper function to make a log stream/log group name template from a string. +func testTemplate(template string) *fastTemplate { + t, _ := newTemplate(template) + return t +} + func TestAddEvent(t *testing.T) { ctrl := gomock.NewController(t) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) @@ -51,7 +57,7 @@ func TestAddEvent(t *testing.T) { }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -77,7 +83,7 @@ func TestTruncateLargeLogEvent(t *testing.T) { }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -114,7 +120,7 @@ func TestAddEventCreateLogGroup(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -169,7 +175,7 @@ func TestAddEventExistingStream(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -220,7 +226,7 @@ func TestAddEventExistingStreamNotFound(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -242,7 +248,7 @@ func TestAddEventEmptyRecord(t *testing.T) { mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -278,7 +284,7 @@ func TestAddEventAndFlush(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -300,7 +306,7 @@ func TestPutLogEvents(t *testing.T) { mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -327,30 +333,50 @@ func TestSetGroupStreamNames(t *testing.T) { e := &Event{Tag: "syslog.0", Record: record} // Test against non-template name. - output := OutputPlugin{logStreamName: "/aws/ecs/test-stream-name"} + output := OutputPlugin{ + logStreamName: testTemplate("/aws/ecs/test-stream-name"), + logGroupName: testTemplate(""), + } output.setGroupStreamNames(e) - assert.Equal(t, output.logStreamName, e.stream, + assert.Equal(t, "/aws/ecs/test-stream-name", e.stream, "The provided stream name must be returned exactly, without modifications.") // Test against a simple log stream prefix. - output = OutputPlugin{logStreamPrefix: "/aws/ecs/test-stream-prefix/"} + output.logStreamPrefix = "/aws/ecs/test-stream-prefix/" output.setGroupStreamNames(e) assert.Equal(t, output.logStreamPrefix+"syslog.0", e.stream, "The provided stream prefix must be prefixed to the provided tag name.") // Test replacing items from template variables. - output = OutputPlugin{logStreamName: "/aws/ecs/$(tag[0])/$(tag[1])/$(details['region'])/$(details['az'])/$(ident)"} + output.logStreamPrefix = "" + output.logStreamName = testTemplate("/aws/ecs/$(tag[0])/$(tag[1])/$(details['region'])/$(details['az'])/$(ident)") output.setGroupStreamNames(e) assert.Equal(t, "/aws/ecs/syslog/0/us-west-2/a/cron", e.stream, "The stream name template was not correctly parsed.") - // Test bad template } missing. Just prints an error and returns the input value. - output = OutputPlugin{logStreamName: "/aws/ecs/$(tag"} - output.setGroupStreamNames(e) - assert.Equal(t, "/aws/ecs/$(tag", e.stream, - "The provided stream name must match when parsing fails.") // Test another bad template ] missing. - output = OutputPlugin{logStreamName: "/aws/ecs/$(details['region')"} + output.logStreamName = testTemplate("/aws/ecs/$(details['region')") output.setGroupStreamNames(e) - assert.Equal(t, "/aws/ecs/$(details['region')", e.stream, + assert.Equal(t, "/aws/ecs/['region'", e.stream, "The provided stream name must match when parsing fails.") + + // Test that log stream and log group names get truncated to the maximum allowed. + b := make([]byte, maxGroupStreamLength*2) + for i := range b { // make a string twice the max + b[i] = '_' + } + + ident := string(b) + assert.True(t, len(ident) > maxGroupStreamLength, "test string creation failed") + + e.Record = map[interface{}]interface{}{"ident": ident} // set the long string into our record. + output.logStreamName = testTemplate("/aws/ecs/$(ident)") + output.logGroupName = testTemplate("/aws/ecs/$(ident)") + + output.setGroupStreamNames(e) + assert.Equal(t, maxGroupStreamLength, len(e.stream), "the stream name should be truncated to the maximum size") + assert.Equal(t, maxGroupStreamLength, len(e.group), "the group name should be truncated to the maximum size") + assert.Equal(t, "/aws/ecs/"+string(b[:maxGroupStreamLength-len("/aws/ecs/")]), + e.stream, "the stream name was incorrectly truncated") + assert.Equal(t, "/aws/ecs/"+string(b[:maxGroupStreamLength-len("/aws/ecs/")]), + e.group, "the group name was incorrectly truncated") } func TestAddEventAndFlushDataAlreadyAcceptedException(t *testing.T) { @@ -369,7 +395,7 @@ func TestAddEventAndFlushDataAlreadyAcceptedException(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -409,7 +435,7 @@ func TestAddEventAndFlushDataInvalidSequenceTokenException(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), @@ -525,7 +551,7 @@ func setupLimitTestOutput(t *testing.T, times int) OutputPlugin { ) return OutputPlugin{ - logGroupName: testLogGroup, + logGroupName: testTemplate(testLogGroup), logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, timer: setupTimeout(), diff --git a/cloudwatch/helpers.go b/cloudwatch/helpers.go index 358c0d8..9992e82 100644 --- a/cloudwatch/helpers.go +++ b/cloudwatch/helpers.go @@ -5,9 +5,16 @@ import ( "strconv" "strings" + "github.com/valyala/bytebufferpool" "github.com/valyala/fasttemplate" ) +// newTemplate is the only place you'll find the template start and end tags. +func newTemplate(template string) (*fastTemplate, error) { + t, err := fasttemplate.NewTemplate(template, "$(", ")") + return &fastTemplate{Template: t, String: template}, err +} + // tagKeysToMap converts a raw string into a go map. // This is used by input data to create AWS tags applied to newly-created log groups. // @@ -45,13 +52,8 @@ func tagKeysToMap(tags string) map[string]*string { // example keys := "['level1']['level2']['level3']" // This is called by parseDataMapTags any time a nested value is found in a log Event. // This procedure checks if any of the nested values match variable identifiers in the logStream or logGroups. -func parseKeysTemplate(data map[interface{}]interface{}, keys string) (string, error) { - t, err := fasttemplate.NewTemplate(keys, "['", "']") - if err != nil { - return "", err - } - - return t.ExecuteFuncStringWithErr(func(w io.Writer, tag string) (int, error) { +func parseKeysTemplate(data map[interface{}]interface{}, keys string, w io.Writer) (int64, error) { + return fasttemplate.ExecuteFunc(keys, "['", "']", w, func(w io.Writer, tag string) (int, error) { switch val := data[tag].(type) { case []byte: return w.Write(val) @@ -70,13 +72,8 @@ func parseKeysTemplate(data map[interface{}]interface{}, keys string) (string, e // from an interface{} map (expected to contain strings or more interface{} maps). // This runs once for every log line. // Used to fill in any template variables that may exist in the logStream or logGroup names. -func parseDataMapTags(e *Event, logTags []string, template string) (string, error) { - t, err := fasttemplate.NewTemplate(template, "$(", ")") - if err != nil { - return "", err - } - - return t.ExecuteFuncStringWithErr(func(w io.Writer, tag string) (int, error) { +func parseDataMapTags(e *Event, logTags []string, t *fastTemplate, w io.Writer) (int64, error) { + return t.ExecuteFunc(w, func(w io.Writer, tag string) (int, error) { v := strings.Index(tag, "[") if v == -1 { v = len(tag) @@ -101,12 +98,9 @@ func parseDataMapTags(e *Event, logTags []string, template string) (string, erro case string: return w.Write([]byte(val)) case map[interface{}]interface{}: - keyVal, err := parseKeysTemplate(val, tag[v:]) - if err != nil { - return 0, err - } + i, err := parseKeysTemplate(val, tag[v:], w) - return w.Write([]byte(keyVal)) + return int(i), err case []byte: // we should never land here because the interface{} map should have already been converted to strings. return w.Write(val) @@ -115,3 +109,47 @@ func parseDataMapTags(e *Event, logTags []string, template string) (string, erro } }) } + +// sanitizer implements io.Writer for fasttemplate usage. +// Instead of just writing bytes to a buffer, sanitize them first. +type sanitizer struct { + sanitize func(b []byte) []byte + buf *bytebufferpool.ByteBuffer +} + +// Write completes the io.Writer implementation. +func (s *sanitizer) Write(b []byte) (int, error) { + return s.buf.Write(s.sanitize(b)) +} + +// sanitizeGroup removes special characters from the log group names bytes. +// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-loggroup.html +func sanitizeGroup(b []byte) []byte { + for i, r := range b { + // 45-47 = / . - + // 48-57 = 0-9 + // 65-90 = A-Z + // 95 = _ + // 97-122 = a-z + if r == 95 || (r > 44 && r < 58) || + (r > 64 && r < 91) || (r > 96 && r < 123) { + continue + } + + b[i] = '.' + } + + return b +} + +// sanitizeStream removes : and * from the log stream bytes. +// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-logstream.html +func sanitizeStream(b []byte) []byte { + for i, r := range b { + if r == '*' || r == ':' { + b[i] = '.' + } + } + + return b +} diff --git a/cloudwatch/helpers_test.go b/cloudwatch/helpers_test.go index 3566a37..71348ae 100644 --- a/cloudwatch/helpers_test.go +++ b/cloudwatch/helpers_test.go @@ -4,9 +4,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/valyala/bytebufferpool" ) func TestTagKeysToMap(t *testing.T) { + t.Parallel() + // Testable values. Purposely "messed up" - they should all parse out OK. values := " key1 =value , key2=value2, key3= value3 ,key4=, key5 = v5,,key7==value7," + " k8, k9,key1=value1,space key = space value" @@ -20,6 +23,8 @@ func TestTagKeysToMap(t *testing.T) { } func TestParseDataMapTags(t *testing.T) { + t.Parallel() + template := "$(missing).$(tag).$(pam['item2']['subitem2']['more']).$(pam['item']).$(pam['item2'])." + "$(pam['item2']['subitem'])-$(pam['item2']['subitem55'])-$(pam['item2']['subitem2']['more'])-$(tag[1])-$(tag[6])" data := map[interface{}]interface{}{ @@ -29,8 +34,46 @@ func TestParseDataMapTags(t *testing.T) { "subitem2": map[interface{}]interface{}{"more": "final"}}, }, } - s, err := parseDataMapTags(&Event{Record: data, Tag: "syslog.0"}, []string{"syslog", "0"}, template) + + s := &sanitizer{buf: bytebufferpool.Get(), sanitize: sanitizeGroup} + defer bytebufferpool.Put(s.buf) + + _, err := parseDataMapTags(&Event{Record: data, Tag: "syslog.0"}, []string{"syslog", "0"}, testTemplate(template), s) assert.Nil(t, err) - assert.Equal(t, "missing.syslog.0.final.soup..SubIt3m-subitem55-final-0-tag6", s, "Rendered string is incorrect.") + assert.Equal(t, "missing.syslog.0.final.soup..SubIt3m-subitem55-final-0-tag6", s.buf.String(), "Rendered string is incorrect.") +} + +func TestSanitizeGroup(t *testing.T) { + t.Parallel() + + tests := map[string]string{ // "send": "expect", + "this.is.a.log.group.name": "this.is.a.log.group.name", + "1234567890abcdefghijklmnopqrstuvwxyz": "1234567890abcdefghijklmnopqrstuvwxyz", + "ABCDEFGHIJKLMNOPQRSTUVWXYZ": "ABCDEFGHIJKLMNOPQRSTUVWXYZ", + `!@#$%^&*()_+}{][=-';":/.?>,<~"']}`: ".........._......-..../..........", + "": "", + } + + for send, expect := range tests { + actual := sanitizeGroup([]byte(send)) + assert.Equal(t, expect, string(actual), "the wrong characters were modified in sanitizeGroup") + } +} + +func TestSanitizeStream(t *testing.T) { + t.Parallel() + + tests := map[string]string{ // "send": "expect", + "this.is.a.log.group.name": "this.is.a.log.group.name", + "1234567890abcdefghijklmnopqrstuvwxyz": "1234567890abcdefghijklmnopqrstuvwxyz", + "ABCDEFGHIJKLMNOPQRSTUVWXYZ": "ABCDEFGHIJKLMNOPQRSTUVWXYZ", + `!@#$%^&*()_+}{][=-';":/.?>,<~"']}`: `!@#$%^&.()_+}{][=-';"./.?>,<~"']}`, + "": "", + } + + for send, expect := range tests { + actual := sanitizeStream([]byte(send)) + assert.Equal(t, expect, string(actual), "the wrong characters were modified in sanitizeStream") + } } diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index e7b7d50..34ded20 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -69,7 +69,7 @@ func getConfiguration(ctx unsafe.Pointer, pluginID int) cloudwatch.OutputPluginC config.PluginInstanceID = pluginID config.LogGroupName = output.FLBPluginConfigKey(ctx, "log_group_name") - logrus.Infof("[cloudwatch %d] plugin parameter log_group = '%s'", pluginID, config.LogGroupName) + logrus.Infof("[cloudwatch %d] plugin parameter log_group_name = '%s'", pluginID, config.LogGroupName) config.LogStreamPrefix = output.FLBPluginConfigKey(ctx, "log_stream_prefix") logrus.Infof("[cloudwatch %d] plugin parameter log_stream_prefix = '%s'", pluginID, config.LogStreamPrefix) diff --git a/go.mod b/go.mod index a4b8cbd..2263255 100644 --- a/go.mod +++ b/go.mod @@ -10,5 +10,6 @@ require ( github.com/json-iterator/go v1.1.10 github.com/sirupsen/logrus v1.6.0 github.com/stretchr/testify v1.6.1 + github.com/valyala/bytebufferpool v1.0.0 github.com/valyala/fasttemplate v1.2.1 )