Skip to content

Commit

Permalink
Merge pull request aws#98 from zhonghui12/truncate-largelog
Browse files Browse the repository at this point in the history
Truncate Large Log Events
  • Loading branch information
zhonghui12 authored Sep 25, 2020
2 parents c9b1b5b + 5f922a4 commit 34502d1
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 15 deletions.
27 changes: 12 additions & 15 deletions cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"sort"
"strings"
"time"
"unicode/utf8"

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
"github.com/aws/aws-sdk-go/aws"
Expand All @@ -39,7 +38,9 @@ const (
perEventBytes = 26
maximumBytesPerPut = 1048576
maximumLogEventsPerPut = 10000
maximumBytesPerEvent = 1024 * 256 //256KB
maximumTimeSpanPerPut = time.Hour * 24
truncatedSuffix = "[Truncated...]"
)

const (
Expand Down Expand Up @@ -509,6 +510,15 @@ func (output *OutputPlugin) processRecord(e *Event) ([]byte, error) {
return nil, err
}

// append newline
data = append(data, []byte("\n")...)

if (len(data) + perEventBytes) > maximumBytesPerEvent {
logrus.Warnf("[cloudwatch %d] Found record with %d bytes, truncating to 256KB, logGroup=%s, stream=%s\n", output.PluginInstanceID, len(data)+perEventBytes, output.logGroupName, output.logStreamName)
data = data[:(maximumBytesPerEvent - len(truncatedSuffix) - perEventBytes)]
data = append(data, []byte(truncatedSuffix)...)
}

return data, nil
}

Expand Down Expand Up @@ -603,21 +613,8 @@ func (output *OutputPlugin) processRejectedEventsInfo(response *cloudwatchlogs.P
}
}

// effectiveLen reports how many bytes line occupies once it has been
// normalized to valid UTF-8. Every byte that is not part of a well-formed
// UTF-8 sequence is counted as the replacement character U+FFFD
// (utf8.RuneError), which encodes to three bytes; well-formed sequences are
// counted at their encoded width.
func effectiveLen(line string) int {
	total := 0
	for pos := 0; pos < len(line); {
		// DecodeRuneInString yields (utf8.RuneError, 1) for a malformed byte,
		// so the cursor advances by one while three bytes are counted.
		r, width := utf8.DecodeRuneInString(line[pos:])
		total += utf8.RuneLen(r)
		pos += width
	}
	return total
}

// cloudwatchLen returns the size accounted for a single event string: as
// written, effectiveLen(event) plus the fixed perEventBytes overhead.
//
// NOTE(review): the body holds two consecutive return statements, so the
// second one (len(event) + perEventBytes) is unreachable as written. This
// looks like the removed and added lines of a diff rendered together —
// confirm which expression is intended and delete the other.
func cloudwatchLen(event string) int {
return effectiveLen(event) + perEventBytes
return len(event) + perEventBytes
}

func (stream *logStream) logBatchSpan(timestamp time.Time) time.Duration {
Expand Down
33 changes: 33 additions & 0 deletions cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,39 @@ func TestAddEvent(t *testing.T) {
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to FLB_OK")
}

// TestTruncateLargeLogEvent feeds the plugin a record whose single value is
// already larger than 256KB and checks that AddEvent accepts it and that
// processRecord returns exactly 256*1024-26 bytes ending in truncatedSuffix.
func TestTruncateLargeLogEvent(t *testing.T) {
	ctrl := gomock.NewController(t)
	mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl)

	mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) {
		assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match")
		assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match")
	}).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil)

	output := OutputPlugin{
		logGroupName:    testLogGroup,
		logStreamPrefix: testLogStreamPrefix,
		client:          mockCloudWatch,
		timer:           setupTimeout(),
		streams:         make(map[string]*logStream),
		groups:          map[string]struct{}{testLogGroup: {}},
	}

	// The raw value alone is 100 bytes over 256KB, so the processed record is
	// expected to exceed the per-event limit and be truncated.
	record := map[interface{}]interface{}{
		"somekey": make([]byte, 256*1024+100),
	}

	retCode := output.AddEvent(&Event{TS: time.Now(), Tag: testTag, Record: record})
	assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")

	actualData, err := output.processRecord(&Event{TS: time.Now(), Tag: testTag, Record: record})
	if err != nil {
		// The returned data is meaningless on error, so stop here instead of
		// logging and asserting on it.
		t.Fatalf("processRecord returned an unexpected error: %v", err)
	}

	// Only inspect the tail once the length is known to be correct, so the
	// slice expression below cannot go out of range.
	if assert.Len(t, actualData, 256*1024-26, "Expected length is 256*1024-26") {
		tail := string(actualData[len(actualData)-len(truncatedSuffix):])
		assert.Equal(t, tail, truncatedSuffix, "Expected truncated data to end with the truncation suffix")
	}
}

func TestAddEventCreateLogGroup(t *testing.T) {
ctrl := gomock.NewController(t)
mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl)
Expand Down

0 comments on commit 34502d1

Please sign in to comment.