Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/sllogformat] skip with warning for empty log lines #242

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sllogformatprocessor/batch_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"crypto/sha1"
"encoding/json"
"errors"
"fmt"

"go.uber.org/zap"
Expand Down Expand Up @@ -100,7 +101,9 @@ func (bl *batchLogs) addToBatch(ld plog.Logs) {
rl.ScopeLogs().RemoveIf(func(ils plog.ScopeLogs) bool {
ils.LogRecords().RemoveIf(func(lr plog.LogRecord) bool {
gen, req, err := bl.cfg.MatchProfile(bl.log, rl, ils, lr)
if err != nil {
if errors.Is(err, &NoPrintablesError{}) {
return true
} else if err != nil {
bl.log.Error("Failed to match profile",
zap.String("err", err.Error()))
bl.dumpLogRecord(rl, ils, lr)
Expand Down
74 changes: 48 additions & 26 deletions sllogformatprocessor/match_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package sllogformatprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/sllogformatprocessor"
import (
"encoding/json"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
Expand All @@ -27,6 +27,12 @@ import (
"go.uber.org/zap"
)

type NoPrintablesError struct{}

func (err *NoPrintablesError) Error() string {
return "log message has no printable characters"
}

type StreamTokenReq struct {
Stream string `json:"stream"`
Logbasename string `json:"logbasename"`
Expand Down Expand Up @@ -388,7 +394,8 @@ type ConfigResult struct {

func (c *Config) MatchProfile(log *zap.Logger, rl plog.ResourceLogs, ils plog.ScopeLogs, lr plog.LogRecord) (*ConfigResult, *StreamTokenReq, error) {
var id, ret string
for _, profile := range c.Profiles {
reasons := []string{}
for idx, profile := range c.Profiles {
req := newStreamTokenReq()
gen := ConfigResult{}
parser := Parser{
Expand All @@ -399,17 +406,19 @@ func (c *Config) MatchProfile(log *zap.Logger, rl plog.ResourceLogs, ils plog.Sc
}
id, gen.ServiceGroup = parser.EvalElem(profile.ServiceGroup)
if gen.ServiceGroup == "" {
continue
reasons = append(reasons, "service_group")
} else {
req.Ids[id] = gen.ServiceGroup
}
req.Ids[id] = gen.ServiceGroup
id, gen.Host = parser.EvalElem(profile.Host)
if gen.Host == "" {
continue
reasons = append(reasons, "host")
} else {
req.Ids[id] = gen.Host
}
req.Ids[id] = gen.Host
id, gen.Logbasename = parser.EvalElem(profile.Logbasename)
if gen.Logbasename == "" {
continue
reasons = append(reasons, "logbasename")
}
if lr.SeverityNumber() == plog.SeverityNumberUnspecified {
sevNum, ok := sevText2Num[lr.SeverityText()]
Expand All @@ -420,33 +429,43 @@ func (c *Config) MatchProfile(log *zap.Logger, rl plog.ResourceLogs, ils plog.Sc
if profile.Severity != nil {
_, sevText := parser.EvalElem(profile.Severity)
if sevText == "" {
continue
}
sevText = strings.ToUpper(sevText)
sevNum := plog.SeverityNumberUnspecified
sevNum, _ = sevTextMap[sevText]
if sevNum == plog.SeverityNumberUnspecified &&
len(sevText) == 3 {
// Interpret as HTTP status
switch sevText[0] {
case '1', '2':
sevNum = plog.SeverityNumberInfo
case '3':
sevNum = plog.SeverityNumberDebug
case '4', '5':
sevNum = plog.SeverityNumberError
reasons = append(reasons, "severity")
} else {
sevText = strings.ToUpper(sevText)
sevNum := plog.SeverityNumberUnspecified
sevNum, _ = sevTextMap[sevText]
if sevNum == plog.SeverityNumberUnspecified &&
len(sevText) == 3 {
// Interpret as HTTP status
switch sevText[0] {
case '1', '2':
sevNum = plog.SeverityNumberInfo
case '3':
sevNum = plog.SeverityNumberDebug
case '4', '5':
sevNum = plog.SeverityNumberError
}
}
lr.SetSeverityNumber(sevNum)
}
lr.SetSeverityNumber(sevNum)
}
req.Ids[id] = gen.Logbasename
req.Logbasename = gen.Logbasename
if gen.Logbasename != "" {
req.Ids[id] = gen.Logbasename
req.Logbasename = gen.Logbasename
}
for _, label := range profile.Labels {
id, ret = parser.EvalElem(label)
req.Cfgs[id] = ret
}
_, gen.Message = parser.EvalElem(profile.Message)
if gen.Message == "" {
if idx >= len(c.Profiles)-1 {
err_noprint := &NoPrintablesError{}
log.Warn("Failed to match profile",
zap.String("err", err_noprint.Error()))
return nil, nil, err_noprint
}
reasons = append(reasons, "message")
continue
}
// FORMAT MESSAGE
Expand Down Expand Up @@ -480,5 +499,8 @@ func (c *Config) MatchProfile(log *zap.Logger, rl plog.ResourceLogs, ils plog.Sc
gen.Format = profile.Format
return &gen, &req, nil
}
return nil, nil, errors.New("No matching profile for log record")
if len(reasons) > 0 {
return nil, nil, fmt.Errorf("no matching profile for log record, failed to find %s", strings.Join(reasons, ","))
}
return nil, nil, fmt.Errorf("No matching profile for log record")
}
197 changes: 197 additions & 0 deletions sllogformatprocessor/match_profile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package sllogformatprocessor

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

func TestMatchProfileSkipLogic(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

// Assert type assertion for *Config
config, ok := cfg.(*Config)
if !ok {
t.Fatalf("Expected *Config but got %T", cfg)
}

// Update the config with the profile information
config.Profiles = []ConfigProfile{
{
ServiceGroup: &ConfigAttribute{
Rename: "service_group",
Exp: &ConfigExpression{
Source: "lit:default-group",
},
},
Host: &ConfigAttribute{
Rename: "host",
Exp: &ConfigExpression{
Source: "lit:test-host",
},
},
Logbasename: &ConfigAttribute{
Rename: "logbasename",
Exp: &ConfigExpression{
Source: "lit:example-log",
},
},
Severity: &ConfigAttribute{
Exp: &ConfigExpression{
Source: "lit:INFO",
},
},
Message: &ConfigAttribute{
Exp: &ConfigExpression{
Source: "body", // Message comes from the log body
},
},
},
}

originalProfile := config.Profiles[0]

logger := zap.NewNop()
mockResourceLogs := plog.NewResourceLogs()
mockScopeLogs := mockResourceLogs.ScopeLogs().AppendEmpty()

// Table-driven test cases
testCases := []struct {
name string
ServiceGroup string
Host string
Logbasename string
Severity string
Message string
expectError bool
specificError error
}{
{
name: "All fields valid",
ServiceGroup: "default-group",
Host: "test-host",
Logbasename: "example-log",
Severity: "INFO",
Message: "valid log line",
expectError: false,
specificError: nil,
},
{
name: "Empty service_group",
ServiceGroup: "",
Host: "test-host",
Logbasename: "example-log",
Severity: "INFO",
Message: "valid log line",
expectError: false,
specificError: nil,
},
{
name: "Empty host",
ServiceGroup: "default-group",
Host: "",
Logbasename: "example-log",
Severity: "INFO",
Message: "valid log line",
expectError: false,
specificError: nil,
},
{
name: "Empty logbasename",
ServiceGroup: "default-group",
Host: "test-host",
Logbasename: "",
Severity: "INFO",
Message: "valid log line",
expectError: false,
specificError: nil,
},
{
name: "Empty severity",
ServiceGroup: "default-group",
Host: "test-host",
Logbasename: "example-log",
Severity: "",
Message: "valid log line",
expectError: false,
specificError: nil,
},
{
name: "Empty message",
ServiceGroup: "default-group",
Host: "test-host",
Logbasename: "example-log",
Severity: "INFO",
Message: "",
expectError: true,
specificError: &NoPrintablesError{},
},
{
name: "Log line with unprintable characters",
ServiceGroup: "default-group",
Host: "test-host",
Logbasename: "example-log",
Severity: "INFO",
Message: "\x00\x01\x02\x03",
expectError: true,
specificError: &NoPrintablesError{},
},
}

// Iterate through test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Modify the first profile based on the test case
if tc.ServiceGroup == "" {
config.Profiles[0].ServiceGroup = nil
}
if tc.Host == "" {
config.Profiles[0].Host = nil
}
if tc.Logbasename == "" {
config.Profiles[0].Logbasename = nil
}
if tc.Severity == "" {
config.Profiles[0].Severity = nil
}
if tc.Message == "" {
config.Profiles[0].Message = nil
}

// Create log record for the test case
createLogRecord(mockScopeLogs, tc.Message)

// Get the last log record (the one just created), and attempt to match a profile
logRecord := mockScopeLogs.LogRecords().At(mockScopeLogs.LogRecords().Len() - 1)
gen, req, err := config.MatchProfile(logger, mockResourceLogs, mockScopeLogs, logRecord)

// Validate the error based on test case expectations
if tc.expectError {
if tc.specificError != nil {
assert.ErrorAs(t, err, &tc.specificError, "expected specific error for log record")
} else {
assert.Error(t, err, "expected some error for log record")
}
assert.Empty(t, gen, "gen should be empty when log record is skipped")
assert.Empty(t, req, "req should be empty when log record is skipped")
} else {
assert.NoError(t, err, "expected no error for valid log record")
assert.NotEmpty(t, gen, "gen should not be empty for valid log record")
assert.NotEmpty(t, req, "req should not be empty for valid log record")
}

// Restore the original profile
config.Profiles[0] = originalProfile
})
}

}

// Creates a log record with specific attributes and log line
func createLogRecord(scopeLogs plog.ScopeLogs, logLine string) {
logRecord := scopeLogs.LogRecords().AppendEmpty()
logRecord.Body().SetStr(logLine)
}