Skip to content

Commit

Permalink
Merge pull request aws#104 from davidnewhall/dn2_sanitize
Browse files Browse the repository at this point in the history
Remove invalid characters in log stream and log group names.
  • Loading branch information
PettitWesley authored Oct 13, 2020
2 parents 8343c5e + 4721c76 commit e9d76ee
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 59 deletions.
64 changes: 49 additions & 15 deletions cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
fluentbit "github.com/fluent/fluent-bit-go/output"
jsoniter "github.com/json-iterator/go"
"github.com/sirupsen/logrus"
"github.com/valyala/bytebufferpool"
"github.com/valyala/fasttemplate"
)

const (
Expand All @@ -41,6 +43,7 @@ const (
maximumBytesPerEvent = 1024 * 256 //256KB
maximumTimeSpanPerPut = time.Hour * 24
truncatedSuffix = "[Truncated...]"
maxGroupStreamLength = 512
)

const (
Expand Down Expand Up @@ -91,11 +94,16 @@ func (stream *logStream) updateExpiration() {
stream.expiration = time.Now().Add(logStreamInactivityTimeout)
}

type fastTemplate struct {
String string
*fasttemplate.Template
}

// OutputPlugin is the CloudWatch Logs Fluent Bit output plugin
type OutputPlugin struct {
logGroupName string
logGroupName *fastTemplate
logStreamPrefix string
logStreamName string
logStreamName *fastTemplate
logKey string
client LogsClient
streams map[string]*logStream
Expand All @@ -106,6 +114,7 @@ type OutputPlugin struct {
logGroupTags map[string]*string
logGroupRetention int64
autoCreateGroup bool
bufferPool bytebufferpool.Pool
}

// OutputPluginConfig is the input information used by NewOutputPlugin to create a new OutputPlugin
Expand Down Expand Up @@ -155,15 +164,24 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) {
logrus.Errorf("[cloudwatch %d] timeout threshold reached: Failed to send logs for %s\n", config.PluginInstanceID, d.String())
logrus.Fatalf("[cloudwatch %d] Quitting Fluent Bit", config.PluginInstanceID) // exit the plugin and kill Fluent Bit
})
if err != nil {
return nil, err
}

logGroupTemplate, err := newTemplate(config.LogGroupName)
if err != nil {
return nil, err
}

logStreamTemplate, err := newTemplate(config.LogStreamName)
if err != nil {
return nil, err
}

return &OutputPlugin{
logGroupName: config.LogGroupName,
logGroupName: logGroupTemplate,
logStreamName: logStreamTemplate,
logStreamPrefix: config.LogStreamPrefix,
logStreamName: config.LogStreamName,
logKey: config.LogKey,
client: client,
timer: timer,
Expand Down Expand Up @@ -392,29 +410,44 @@ func (output *OutputPlugin) describeLogStreams(e *Event, nextToken *string) (*cl
func (output *OutputPlugin) setGroupStreamNames(e *Event) {
// This happens here to avoid running Split more than once per log Event.
logTagSplit := strings.SplitN(e.Tag, ".", 10)
s := &sanitizer{sanitize: sanitizeGroup, buf: output.bufferPool.Get()}

var err error
if e.group, err = parseDataMapTags(e, logTagSplit, output.logGroupName); err != nil {
logrus.Errorf("[cloudwatch %d] parsing template: '%s': %v", output.PluginInstanceID, output.logGroupName, err)
if _, err := parseDataMapTags(e, logTagSplit, output.logGroupName, s); err != nil {
logrus.Errorf("[cloudwatch %d] parsing log_group_name template: %v", output.PluginInstanceID, err)
}

if e.group = s.buf.String(); len(e.group) == 0 {
e.group = output.logGroupName.String
}

if e.group == "" {
e.group = output.logGroupName
if len(e.group) > maxGroupStreamLength {
e.group = e.group[:maxGroupStreamLength]
}

s.buf.Reset()

if output.logStreamPrefix != "" {
e.stream = output.logStreamPrefix + e.Tag
output.bufferPool.Put(s.buf)

return
}

if e.stream, err = parseDataMapTags(e, logTagSplit, output.logStreamName); err != nil {
// If a user gets this error, they need to fix their log_stream_name template to make it go away. Simple.
logrus.Errorf("[cloudwatch %d] parsing template: '%s': %v", output.PluginInstanceID, output.logStreamName, err)
s.sanitize = sanitizeStream

if _, err := parseDataMapTags(e, logTagSplit, output.logStreamName, s); err != nil {
logrus.Errorf("[cloudwatch %d] parsing log_stream_name template: %v", output.PluginInstanceID, err)
}

if e.stream = s.buf.String(); len(e.stream) == 0 {
e.stream = output.logStreamName.String
}

if e.stream == "" {
e.stream = output.logStreamName
if len(e.stream) > maxGroupStreamLength {
e.stream = e.stream[:maxGroupStreamLength]
}

output.bufferPool.Put(s.buf)
}

func (output *OutputPlugin) createStream(e *Event) (*logStream, error) {
Expand Down Expand Up @@ -521,7 +554,8 @@ func (output *OutputPlugin) processRecord(e *Event) ([]byte, error) {
data = append(data, []byte("\n")...)

if (len(data) + perEventBytes) > maximumBytesPerEvent {
logrus.Warnf("[cloudwatch %d] Found record with %d bytes, truncating to 256KB, logGroup=%s, stream=%s\n", output.PluginInstanceID, len(data)+perEventBytes, output.logGroupName, output.logStreamName)
logrus.Warnf("[cloudwatch %d] Found record with %d bytes, truncating to 256KB, logGroup=%s, stream=%s\n",
output.PluginInstanceID, len(data)+perEventBytes, e.group, e.stream)
data = data[:(maximumBytesPerEvent - len(truncatedSuffix) - perEventBytes)]
data = append(data, []byte(truncatedSuffix)...)
}
Expand Down
70 changes: 48 additions & 22 deletions cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ const (
testSequenceToken = "sequence-token"
)

// helper function to make a log stream/log group name template from a string.
func testTemplate(template string) *fastTemplate {
t, _ := newTemplate(template)
return t
}

func TestAddEvent(t *testing.T) {
ctrl := gomock.NewController(t)
mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl)
Expand All @@ -51,7 +57,7 @@ func TestAddEvent(t *testing.T) {
}).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand All @@ -77,7 +83,7 @@ func TestTruncateLargeLogEvent(t *testing.T) {
}).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand Down Expand Up @@ -114,7 +120,7 @@ func TestAddEventCreateLogGroup(t *testing.T) {
)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand Down Expand Up @@ -169,7 +175,7 @@ func TestAddEventExistingStream(t *testing.T) {
)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand Down Expand Up @@ -220,7 +226,7 @@ func TestAddEventExistingStreamNotFound(t *testing.T) {
)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand All @@ -242,7 +248,7 @@ func TestAddEventEmptyRecord(t *testing.T) {
mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand Down Expand Up @@ -278,7 +284,7 @@ func TestAddEventAndFlush(t *testing.T) {
)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand All @@ -300,7 +306,7 @@ func TestPutLogEvents(t *testing.T) {
mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand All @@ -327,30 +333,50 @@ func TestSetGroupStreamNames(t *testing.T) {
e := &Event{Tag: "syslog.0", Record: record}

// Test against non-template name.
output := OutputPlugin{logStreamName: "/aws/ecs/test-stream-name"}
output := OutputPlugin{
logStreamName: testTemplate("/aws/ecs/test-stream-name"),
logGroupName: testTemplate(""),
}
output.setGroupStreamNames(e)
assert.Equal(t, output.logStreamName, e.stream,
assert.Equal(t, "/aws/ecs/test-stream-name", e.stream,
"The provided stream name must be returned exactly, without modifications.")
// Test against a simple log stream prefix.
output = OutputPlugin{logStreamPrefix: "/aws/ecs/test-stream-prefix/"}
output.logStreamPrefix = "/aws/ecs/test-stream-prefix/"
output.setGroupStreamNames(e)
assert.Equal(t, output.logStreamPrefix+"syslog.0", e.stream,
"The provided stream prefix must be prefixed to the provided tag name.")
// Test replacing items from template variables.
output = OutputPlugin{logStreamName: "/aws/ecs/$(tag[0])/$(tag[1])/$(details['region'])/$(details['az'])/$(ident)"}
output.logStreamPrefix = ""
output.logStreamName = testTemplate("/aws/ecs/$(tag[0])/$(tag[1])/$(details['region'])/$(details['az'])/$(ident)")
output.setGroupStreamNames(e)
assert.Equal(t, "/aws/ecs/syslog/0/us-west-2/a/cron", e.stream,
"The stream name template was not correctly parsed.")
// Test bad template } missing. Just prints an error and returns the input value.
output = OutputPlugin{logStreamName: "/aws/ecs/$(tag"}
output.setGroupStreamNames(e)
assert.Equal(t, "/aws/ecs/$(tag", e.stream,
"The provided stream name must match when parsing fails.")
// Test another bad template ] missing.
output = OutputPlugin{logStreamName: "/aws/ecs/$(details['region')"}
output.logStreamName = testTemplate("/aws/ecs/$(details['region')")
output.setGroupStreamNames(e)
assert.Equal(t, "/aws/ecs/$(details['region')", e.stream,
assert.Equal(t, "/aws/ecs/['region'", e.stream,
"The provided stream name must match when parsing fails.")

// Test that log stream and log group names get truncated to the maximum allowed.
b := make([]byte, maxGroupStreamLength*2)
for i := range b { // make a string twice the max
b[i] = '_'
}

ident := string(b)
assert.True(t, len(ident) > maxGroupStreamLength, "test string creation failed")

e.Record = map[interface{}]interface{}{"ident": ident} // set the long string into our record.
output.logStreamName = testTemplate("/aws/ecs/$(ident)")
output.logGroupName = testTemplate("/aws/ecs/$(ident)")

output.setGroupStreamNames(e)
assert.Equal(t, maxGroupStreamLength, len(e.stream), "the stream name should be truncated to the maximum size")
assert.Equal(t, maxGroupStreamLength, len(e.group), "the group name should be truncated to the maximum size")
assert.Equal(t, "/aws/ecs/"+string(b[:maxGroupStreamLength-len("/aws/ecs/")]),
e.stream, "the stream name was incorrectly truncated")
assert.Equal(t, "/aws/ecs/"+string(b[:maxGroupStreamLength-len("/aws/ecs/")]),
e.group, "the group name was incorrectly truncated")
}

func TestAddEventAndFlushDataAlreadyAcceptedException(t *testing.T) {
Expand All @@ -369,7 +395,7 @@ func TestAddEventAndFlushDataAlreadyAcceptedException(t *testing.T) {
)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand Down Expand Up @@ -409,7 +435,7 @@ func TestAddEventAndFlushDataInvalidSequenceTokenException(t *testing.T) {
)

output := OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand Down Expand Up @@ -525,7 +551,7 @@ func setupLimitTestOutput(t *testing.T, times int) OutputPlugin {
)

return OutputPlugin{
logGroupName: testLogGroup,
logGroupName: testTemplate(testLogGroup),
logStreamPrefix: testLogStreamPrefix,
client: mockCloudWatch,
timer: setupTimeout(),
Expand Down
Loading

0 comments on commit e9d76ee

Please sign in to comment.