filebeat/inputs/filestream: add metric for messages truncated (#41667)
* filebeat: log when multiline reader truncates messages

While investigating an SDH, I noticed that although we add the `truncated` label
to the log fields, there is no feedback explaining why the messages were
truncated.

This PR adds a log statement to provide insight whenever multiline messages
are truncated.
mauri870 authored Dec 31, 2024
1 parent 111a480 commit 7806f1a
Showing 6 changed files with 181 additions and 51 deletions.
21 changes: 11 additions & 10 deletions filebeat/docs/inputs/input-filestream.asciidoc
@@ -175,19 +175,20 @@ include::../inputs/input-filestream-reader-options.asciidoc[]
 
 This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
 These metrics are exposed under the `/inputs` path. They can be used to
-observe the activity of the input.
+observe the activity of the input. Note that metrics from processors are not included.
 
 [options="header"]
 |=======
-| Metric | Description
-| `files_opened_total` | Total number of files opened.
-| `files_closed_total` | Total number of files closed.
-| `files_active` | Number of files currently open (gauge).
-| `messages_read_total` | Total number of messages read.
-| `bytes_processed_total` | Total number of bytes processed.
-| `events_processed_total` | Total number of events processed.
-| `processing_errors_total` | Total number of processing errors.
-| `processing_time` | Histogram of the elapsed time to process messages (expressed in nanoseconds).
+| Metric | Description
+| `files_opened_total` | Total number of files opened.
+| `files_closed_total` | Total number of files closed.
+| `files_active` | Number of files currently open (gauge).
+| `messages_read_total` | Total number of messages read.
+| `messages_truncated_total` | Total number of messages truncated.
+| `bytes_processed_total` | Total number of bytes processed.
+| `events_processed_total` | Total number of events processed.
+| `processing_errors_total` | Total number of processing errors.
+| `processing_time` | Histogram of the elapsed time to process messages (expressed in nanoseconds).
 |=======
 
 Note:
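For a quick way to watch the new counter, the `/inputs` path on the monitoring endpoint can be polled directly. A minimal sketch, assuming Filebeat runs with `http.enabled: true` on the default `localhost:5066` and that the path returns a JSON array of per-input metric objects (both are assumptions, not guaranteed by this commit):

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Assumption: the monitoring endpoint is enabled at its default address.
	resp, err := http.Get("http://localhost:5066/inputs/")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Assumption: the body is a JSON array with one object per input.
	var inputs []map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&inputs); err != nil {
		log.Fatal(err)
	}
	for _, in := range inputs {
		if in["input"] == "filestream" {
			fmt.Printf("%v: messages_truncated_total=%v\n", in["id"], in["messages_truncated_total"])
		}
	}
}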
21 changes: 15 additions & 6 deletions filebeat/input/filestream/environment_test.go
@@ -345,7 +345,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
 func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expectedOffset int) {
 	entry, err := e.getRegistryState(key)
 	if err != nil {
-		e.t.Fatalf(err.Error())
+		e.t.Fatal(err.Error())
 	}
 
 	require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
@@ -416,12 +416,18 @@ func (e *inputTestingEnvironment) waitUntilEventCountCtx(ctx context.Context, co
 	select {
 	case <-ctx.Done():
 		logLines := map[string][]string{}
-		for _, e := range e.pipeline.GetAllEvents() {
-			flat := e.Fields.Flatten()
+		for _, evt := range e.pipeline.GetAllEvents() {
+			flat := evt.Fields.Flatten()
 			pathi, _ := flat.GetValue("log.file.path")
-			path := pathi.(string)
+			path, ok := pathi.(string)
+			if !ok {
+				e.t.Fatalf("waitUntilEventCountCtx: path is not a string: %v", pathi)
+			}
 			msgi, _ := flat.GetValue("message")
-			msg := msgi.(string)
+			msg, ok := msgi.(string)
+			if !ok {
+				e.t.Fatalf("waitUntilEventCountCtx: message is not a string: %v", msgi)
+			}
 			logLines[path] = append(logLines[path], msg)
 		}
 
@@ -468,7 +474,10 @@ func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
 			if len(events) == checkedEventCount {
 				e.t.Fatalf("not enough expected elements")
 			}
-			message := evt.Fields["message"].(string)
+			message, ok := evt.Fields["message"].(string)
+			if !ok {
+				e.t.Fatalf("message is not string %+v", evt.Fields["message"])
+			}
 			if message == events[checkedEventCount] {
 				foundEvents[checkedEventCount] = true
 			}
10 changes: 10 additions & 0 deletions filebeat/input/filestream/input.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"slices"
 	"time"
 
 	"golang.org/x/text/transform"
@@ -380,6 +381,15 @@ func (inp *filestream) readFromSource(
 
 		s.Offset += int64(message.Bytes) + int64(message.Offset)
 
+		flags, err := message.Fields.GetValue("log.flags")
+		if err == nil {
+			if flags, ok := flags.([]string); ok {
+				if slices.Contains(flags, "truncated") { //nolint:typecheck,nolintlint // linter fails to infer generics
+					metrics.MessagesTruncated.Add(1)
+				}
+			}
+		}
+
 		metrics.MessagesRead.Inc()
 		if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) {
 			continue
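The new block above counts a message only when its `log.flags` value is a `[]string` containing `truncated`. A self-contained sketch of that check, with a plain map standing in for the message's field structure (illustrative only):

package main

import (
	"fmt"
	"slices"
)

func main() {
	// Stand-in for message.Fields; the real value is set by the multiline reader.
	fields := map[string]any{"log.flags": []string{"multiline", "truncated"}}

	var messagesTruncated uint64
	if flags, ok := fields["log.flags"].([]string); ok {
		if slices.Contains(flags, "truncated") {
			messagesTruncated++ // the input calls metrics.MessagesTruncated.Add(1) here
		}
	}
	fmt.Println("messages_truncated_total:", messagesTruncated)
}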
36 changes: 19 additions & 17 deletions filebeat/input/filestream/internal/input-logfile/metrics.go
@@ -29,14 +29,15 @@ import (
 type Metrics struct {
 	unregister func()
 
-	FilesOpened      *monitoring.Uint // Number of files that have been opened.
-	FilesClosed      *monitoring.Uint // Number of files closed.
-	FilesActive      *monitoring.Uint // Number of files currently open (gauge).
-	MessagesRead     *monitoring.Uint // Number of messages read.
-	BytesProcessed   *monitoring.Uint // Number of bytes processed.
-	EventsProcessed  *monitoring.Uint // Number of events processed.
-	ProcessingErrors *monitoring.Uint // Number of processing errors.
-	ProcessingTime   metrics.Sample   // Histogram of the elapsed time for processing an event.
+	FilesOpened       *monitoring.Uint // Number of files that have been opened.
+	FilesClosed       *monitoring.Uint // Number of files closed.
+	FilesActive       *monitoring.Uint // Number of files currently open (gauge).
+	MessagesRead      *monitoring.Uint // Number of messages read.
+	MessagesTruncated *monitoring.Uint // Number of messages truncated.
+	BytesProcessed    *monitoring.Uint // Number of bytes processed.
+	EventsProcessed   *monitoring.Uint // Number of events processed.
+	ProcessingErrors  *monitoring.Uint // Number of processing errors.
+	ProcessingTime    metrics.Sample   // Histogram of the elapsed time for processing an event.
 
 	// Those metrics use the same registry/keys as the log input uses
 	HarvesterStarted *monitoring.Int
@@ -65,15 +66,16 @@ func NewMetrics(id string) *Metrics {
 
 	reg, unreg := inputmon.NewInputRegistry("filestream", id, nil)
 	m := Metrics{
-		unregister:       unreg,
-		FilesOpened:      monitoring.NewUint(reg, "files_opened_total"),
-		FilesClosed:      monitoring.NewUint(reg, "files_closed_total"),
-		FilesActive:      monitoring.NewUint(reg, "files_active"),
-		MessagesRead:     monitoring.NewUint(reg, "messages_read_total"),
-		BytesProcessed:   monitoring.NewUint(reg, "bytes_processed_total"),
-		EventsProcessed:  monitoring.NewUint(reg, "events_processed_total"),
-		ProcessingErrors: monitoring.NewUint(reg, "processing_errors_total"),
-		ProcessingTime:   metrics.NewUniformSample(1024),
+		unregister:        unreg,
+		FilesOpened:       monitoring.NewUint(reg, "files_opened_total"),
+		FilesClosed:       monitoring.NewUint(reg, "files_closed_total"),
+		FilesActive:       monitoring.NewUint(reg, "files_active"),
+		MessagesRead:      monitoring.NewUint(reg, "messages_read_total"),
+		MessagesTruncated: monitoring.NewUint(reg, "messages_truncated_total"),
+		BytesProcessed:    monitoring.NewUint(reg, "bytes_processed_total"),
+		EventsProcessed:   monitoring.NewUint(reg, "events_processed_total"),
+		ProcessingErrors:  monitoring.NewUint(reg, "processing_errors_total"),
+		ProcessingTime:    metrics.NewUniformSample(1024),
 
 		HarvesterStarted: monitoring.NewInt(harvesterMetrics, "started"),
 		HarvesterClosed:  monitoring.NewInt(harvesterMetrics, "closed"),
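Outside the input, the same counter type can be exercised directly against libbeat's `monitoring` package. A short sketch, with `monitoring.NewRegistry` assumed as a plain registry constructor in place of `inputmon.NewInputRegistry`:

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/monitoring"
)

func main() {
	// A private registry standing in for the per-input one.
	reg := monitoring.NewRegistry()
	truncated := monitoring.NewUint(reg, "messages_truncated_total")

	truncated.Add(1) // one call per truncated message
	fmt.Println(truncated.Get()) // 1
}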
140 changes: 124 additions & 16 deletions filebeat/input/filestream/metrics_integration_test.go
@@ -40,9 +40,22 @@ func TestFilestreamMetrics(t *testing.T) {
 		"close.on_state_change.inactive":         "2s",
 		"prospector.scanner.fingerprint.enabled": false,
 		"file_identity.native":                   map[string]any{},
+		"message_max_bytes":                      20,
+		"parsers": []map[string]interface{}{
+			{
+				"multiline": map[string]interface{}{
+					"type":      "pattern",
+					"pattern":   "^multiline",
+					"negate":    true,
+					"match":     "after",
+					"max_lines": 1,
+					"timeout":   "1s",
+				},
+			},
+		},
 	})
 
-	testlines := []byte("first line\nsecond line\nthird line\n")
+	testlines := []byte("first line\nsecond line\nthird line\nthis is a very long line exceeding message_max_bytes\nmultiline first line\nmultiline second line\n")
 	env.mustWriteToFile(testlogName, testlines)
 
 	ctx, cancelInput := context.WithCancel(context.Background())
@@ -53,37 +66,132 @@ func TestFilestreamMetrics(t *testing.T) {
 	env.waitUntilHarvesterIsDone()
 
 	checkMetrics(t, "fake-ID", expectedMetrics{
-		FilesOpened:      1,
-		FilesClosed:      1,
-		FilesActive:      0,
-		MessagesRead:     3,
-		BytesProcessed:   34,
-		EventsProcessed:  3,
-		ProcessingErrors: 0,
+		FilesOpened:       1,
+		FilesClosed:       1,
+		FilesActive:       0,
+		MessagesRead:      3,
+		MessagesTruncated: 2,
+		BytesProcessed:    130,
+		EventsProcessed:   3,
+		ProcessingErrors:  0,
 	})
 
 	cancelInput()
 	env.waitUntilInputStops()
 }
 
+func TestFilestreamMessageMaxBytesTruncatedMetric(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"id":                                     "fake-ID",
+		"paths":                                  []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":      "24h",
+		"close.on_state_change.check_interval":   "100ms",
+		"close.on_state_change.inactive":         "2s",
+		"prospector.scanner.fingerprint.enabled": false,
+		"file_identity.native":                   map[string]any{},
+		"message_max_bytes":                      20,
+	})
+
+	testlines := []byte("first line\nsecond line\nthird line\nthis is a long line exceeding message_max_bytes\n")
+	env.mustWriteToFile(testlogName, testlines)
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	env.waitUntilEventCount(4)
+	env.requireOffsetInRegistry(testlogName, "fake-ID", len(testlines))
+	env.waitUntilHarvesterIsDone()
+
+	checkMetrics(t, "fake-ID", expectedMetrics{
+		FilesOpened:       1,
+		FilesClosed:       1,
+		FilesActive:       0,
+		MessagesRead:      4,
+		MessagesTruncated: 1,
+		BytesProcessed:    82,
+		EventsProcessed:   4,
+		ProcessingErrors:  0,
+	})
+
+	cancelInput()
+	env.waitUntilInputStops()
+}
+
+func TestFilestreamMultilineMaxLinesTruncatedMetric(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"id":                                     "fake-ID",
+		"paths":                                  []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":      "24h",
+		"close.on_state_change.check_interval":   "100ms",
+		"close.on_state_change.inactive":         "2s",
+		"prospector.scanner.fingerprint.enabled": false,
+		"file_identity.native":                   map[string]any{},
+		"parsers": []map[string]interface{}{
+			{
+				"multiline": map[string]interface{}{
+					"type":      "pattern",
+					"pattern":   "^multiline",
+					"negate":    true,
+					"match":     "after",
+					"max_lines": 1,
+					"timeout":   "1s",
+				},
+			},
+		},
+	})
+
+	testlines := []byte("first line\nsecond line\nmultiline first line\nmultiline second line\n")
+	env.mustWriteToFile(testlogName, testlines)
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	env.waitUntilEventCount(3)
+	env.requireOffsetInRegistry(testlogName, "fake-ID", len(testlines))
+	env.waitUntilHarvesterIsDone()
+
+	checkMetrics(t, "fake-ID", expectedMetrics{
+		FilesOpened:       1,
+		FilesClosed:       1,
+		FilesActive:       0,
+		MessagesRead:      3,
+		MessagesTruncated: 1,
+		BytesProcessed:    66,
+		EventsProcessed:   3,
+		ProcessingErrors:  0,
+	})
+
+	cancelInput()
+	env.waitUntilInputStops()
+}
+
 type expectedMetrics struct {
-	FilesOpened      uint64
-	FilesClosed      uint64
-	FilesActive      uint64
-	MessagesRead     uint64
-	BytesProcessed   uint64
-	EventsProcessed  uint64
-	ProcessingErrors uint64
+	FilesOpened       uint64
+	FilesClosed       uint64
+	FilesActive       uint64
+	MessagesRead      uint64
+	MessagesTruncated uint64
+	BytesProcessed    uint64
+	EventsProcessed   uint64
+	ProcessingErrors  uint64
 }
 
 func checkMetrics(t *testing.T, id string, expected expectedMetrics) {
-	reg := monitoring.GetNamespace("dataset").GetRegistry().Get(id).(*monitoring.Registry)
+	reg, ok := monitoring.GetNamespace("dataset").GetRegistry().Get(id).(*monitoring.Registry)
+	require.True(t, ok, "registry not found")
 
 	require.Equal(t, id, reg.Get("id").(*monitoring.String).Get(), "id")
 	require.Equal(t, "filestream", reg.Get("input").(*monitoring.String).Get(), "input")
 	require.Equal(t, expected.FilesOpened, reg.Get("files_opened_total").(*monitoring.Uint).Get(), "files_opened_total")
 	require.Equal(t, expected.FilesClosed, reg.Get("files_closed_total").(*monitoring.Uint).Get(), "files_closed_total")
 	require.Equal(t, expected.MessagesRead, reg.Get("messages_read_total").(*monitoring.Uint).Get(), "messages_read_total")
+	require.Equal(t, expected.MessagesTruncated, reg.Get("messages_truncated_total").(*monitoring.Uint).Get(), "messages_truncated_total")
 	require.Equal(t, expected.BytesProcessed, reg.Get("bytes_processed_total").(*monitoring.Uint).Get(), "bytes_processed_total")
 	require.Equal(t, expected.EventsProcessed, reg.Get("events_processed_total").(*monitoring.Uint).Get(), "events_processed_total")
 	require.Equal(t, expected.ProcessingErrors, reg.Get("processing_errors_total").(*monitoring.Uint).Get(), "processing_errors_total")
4 changes: 2 additions & 2 deletions libbeat/reader/multiline/message_buffer.go
@@ -120,11 +120,11 @@ func (b *messageBuffer) addLine(m reader.Message) {
 // finalize writes the existing content into the returned message and resets all reader variables.
 func (b *messageBuffer) finalize() reader.Message {
 	if b.truncated > 0 {
-		b.message.AddFlagsWithKey("log.flags", "truncated")
+		b.message.AddFlagsWithKey("log.flags", "truncated") //nolint:errcheck // It is safe to ignore the error.
 	}
 
 	if b.numLines > 1 {
-		b.message.AddFlagsWithKey("log.flags", "multiline")
+		b.message.AddFlagsWithKey("log.flags", "multiline") //nolint:errcheck // It is safe to ignore the error.
 	}
 
 	// Copy message from existing content
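`AddFlagsWithKey` appends a flag to a string-array field under the given key, which is what the input's `log.flags` check later picks up. A minimal sketch against the beats `reader` and `mapstr` packages (module paths and the printed output are assumptions inferred from the files above):

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/reader"
	"github.com/elastic/elastic-agent-libs/mapstr"
)

func main() {
	msg := reader.Message{Fields: mapstr.M{}}
	// Errors are ignored here just as in finalize() above.
	_ = msg.AddFlagsWithKey("log.flags", "truncated")
	_ = msg.AddFlagsWithKey("log.flags", "multiline")
	fmt.Println(msg.Fields) // expected: {"log.flags":["truncated","multiline"]}
}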
