diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 426c5db..7338154 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -18,7 +18,6 @@ import ( "sort" "strings" "time" - "unicode/utf8" "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/aws-sdk-go/aws" @@ -39,7 +38,9 @@ const ( perEventBytes = 26 maximumBytesPerPut = 1048576 maximumLogEventsPerPut = 10000 + maximumBytesPerEvent = 1024 * 256 //256KB maximumTimeSpanPerPut = time.Hour * 24 + truncatedSuffix = "[Truncated...]" ) const ( @@ -509,6 +510,15 @@ func (output *OutputPlugin) processRecord(e *Event) ([]byte, error) { return nil, err } + // append newline + data = append(data, []byte("\n")...) + + if (len(data) + perEventBytes) > maximumBytesPerEvent { + logrus.Warnf("[cloudwatch %d] Found record with %d bytes, truncating to 256KB, logGroup=%s, stream=%s\n", output.PluginInstanceID, len(data)+perEventBytes, output.logGroupName, output.logStreamName) + data = data[:(maximumBytesPerEvent - len(truncatedSuffix) - perEventBytes)] + data = append(data, []byte(truncatedSuffix)...) + } + return data, nil } @@ -603,21 +613,8 @@ func (output *OutputPlugin) processRejectedEventsInfo(response *cloudwatchlogs.P } } -// effectiveLen counts the effective number of bytes in the string, after -// UTF-8 normalization. UTF-8 normalization includes replacing bytes that do -// not constitute valid UTF-8 encoded Unicode codepoints with the Unicode -// replacement codepoint U+FFFD (a 3-byte UTF-8 sequence, represented in Go as -// utf8.RuneError) -func effectiveLen(line string) int { - effectiveBytes := 0 - for _, rune := range line { - effectiveBytes += utf8.RuneLen(rune) - } - return effectiveBytes -} - func cloudwatchLen(event string) int { - return effectiveLen(event) + perEventBytes + return len(event) + perEventBytes } func (stream *logStream) logBatchSpan(timestamp time.Time) time.Duration { diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go index 63fa97f..28c2de5 100644 --- a/cloudwatch/cloudwatch_test.go +++ b/cloudwatch/cloudwatch_test.go @@ -67,6 +67,39 @@ func TestAddEvent(t *testing.T) { assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to FLB_OK") } +func TestTruncateLargeLogEvent(t *testing.T) { + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) + + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) + + output := OutputPlugin{ + logGroupName: testLogGroup, + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + } + + record := map[interface{}]interface{}{ + "somekey": make([]byte, 256*1024+100), + } + + retCode := output.AddEvent(&Event{TS: time.Now(), Tag: testTag, Record: record}) + actualData, err := output.processRecord(&Event{TS: time.Now(), Tag: testTag, Record: record}) + + if err != nil { + logrus.Debugf("[cloudwatch %d] Failed to process record: %v\n", output.PluginInstanceID, record) + } + + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") + assert.Len(t, actualData, 256*1024-26, "Expected length is 256*1024-26") +} + func TestAddEventCreateLogGroup(t *testing.T) { ctrl := gomock.NewController(t) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl)