Skip to content

Commit

Permalink
filebeat/inputs/filestream: add metric for messages truncated (#41667)
Browse files Browse the repository at this point in the history
* filebeat: log when multiline reader truncates messages

While investigating an SDH, I noticed that although we add the truncated label
to log fields, there is no feedback explaining why the messages were
truncated.

This PR adds a log statement to provide insight whenever multiline messages
are truncated.

(cherry picked from commit 7806f1a)

# Conflicts:
#	filebeat/input/filestream/metrics_integration_test.go
  • Loading branch information
mauri870 authored and mergify[bot] committed Dec 31, 2024
1 parent 94b5691 commit 728e22b
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 51 deletions.
21 changes: 11 additions & 10 deletions filebeat/docs/inputs/input-filestream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,20 @@ include::../inputs/input-filestream-reader-options.asciidoc[]

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
These metrics are exposed under the `/inputs` path. They can be used to
observe the activity of the input.
observe the activity of the input. Note that metrics from processors are not included.

[options="header"]
|=======
| Metric | Description
| `files_opened_total` | Total number of files opened.
| `files_closed_total` | Total number of files closed.
| `files_active` | Number of files currently open (gauge).
| `messages_read_total` | Total number of messages read.
| `bytes_processed_total` | Total number of bytes processed.
| `events_processed_total` | Total number of events processed.
| `processing_errors_total` | Total number of processing errors.
| `processing_time` | Histogram of the elapsed time to process messages (expressed in nanoseconds).
| Metric | Description
| `files_opened_total` | Total number of files opened.
| `files_closed_total` | Total number of files closed.
| `files_active` | Number of files currently open (gauge).
| `messages_read_total` | Total number of messages read.
| `messages_truncated_total` | Total number of messages truncated.
| `bytes_processed_total` | Total number of bytes processed.
| `events_processed_total` | Total number of events processed.
| `processing_errors_total` | Total number of processing errors.
| `processing_time` | Histogram of the elapsed time to process messages (expressed in nanoseconds).
|=======

Note:
Expand Down
21 changes: 15 additions & 6 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expectedOffset int) {
entry, err := e.getRegistryState(key)
if err != nil {
e.t.Fatalf(err.Error())
e.t.Fatal(err.Error())
}

require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
Expand Down Expand Up @@ -415,12 +415,18 @@ func (e *inputTestingEnvironment) waitUntilEventCountCtx(ctx context.Context, co
select {
case <-ctx.Done():
logLines := map[string][]string{}
for _, e := range e.pipeline.GetAllEvents() {
flat := e.Fields.Flatten()
for _, evt := range e.pipeline.GetAllEvents() {
flat := evt.Fields.Flatten()
pathi, _ := flat.GetValue("log.file.path")
path := pathi.(string)
path, ok := pathi.(string)
if !ok {
e.t.Fatalf("waitUntilEventCountCtx: path is not a string: %v", pathi)
}
msgi, _ := flat.GetValue("message")
msg := msgi.(string)
msg, ok := msgi.(string)
if !ok {
e.t.Fatalf("waitUntilEventCountCtx: message is not a string: %v", msgi)
}
logLines[path] = append(logLines[path], msg)
}

Expand Down Expand Up @@ -462,7 +468,10 @@ func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
if len(events) == checkedEventCount {
e.t.Fatalf("not enough expected elements")
}
message := evt.Fields["message"].(string)
message, ok := evt.Fields["message"].(string)
if !ok {
e.t.Fatalf("message is not string %+v", evt.Fields["message"])
}
if message == events[checkedEventCount] {
foundEvents[checkedEventCount] = true
}
Expand Down
10 changes: 10 additions & 0 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"os"
"slices"
"time"

"golang.org/x/text/transform"
Expand Down Expand Up @@ -380,6 +381,15 @@ func (inp *filestream) readFromSource(

s.Offset += int64(message.Bytes) + int64(message.Offset)

flags, err := message.Fields.GetValue("log.flags")
if err == nil {
if flags, ok := flags.([]string); ok {
if slices.Contains(flags, "truncated") { //nolint:typecheck,nolintlint // linter fails to infer generics
metrics.MessagesTruncated.Add(1)
}
}
}

metrics.MessagesRead.Inc()
if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) {
continue
Expand Down
36 changes: 19 additions & 17 deletions filebeat/input/filestream/internal/input-logfile/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ import (
type Metrics struct {
unregister func()

FilesOpened *monitoring.Uint // Number of files that have been opened.
FilesClosed *monitoring.Uint // Number of files closed.
FilesActive *monitoring.Uint // Number of files currently open (gauge).
MessagesRead *monitoring.Uint // Number of messages read.
BytesProcessed *monitoring.Uint // Number of bytes processed.
EventsProcessed *monitoring.Uint // Number of events processed.
ProcessingErrors *monitoring.Uint // Number of processing errors.
ProcessingTime metrics.Sample // Histogram of the elapsed time for processing an event.
FilesOpened *monitoring.Uint // Number of files that have been opened.
FilesClosed *monitoring.Uint // Number of files closed.
FilesActive *monitoring.Uint // Number of files currently open (gauge).
MessagesRead *monitoring.Uint // Number of messages read.
MessagesTruncated *monitoring.Uint // Number of messages truncated.
BytesProcessed *monitoring.Uint // Number of bytes processed.
EventsProcessed *monitoring.Uint // Number of events processed.
ProcessingErrors *monitoring.Uint // Number of processing errors.
ProcessingTime metrics.Sample // Histogram of the elapsed time for processing an event.

// Those metrics use the same registry/keys as the log input uses
HarvesterStarted *monitoring.Int
Expand Down Expand Up @@ -65,15 +66,16 @@ func NewMetrics(id string) *Metrics {

reg, unreg := inputmon.NewInputRegistry("filestream", id, nil)
m := Metrics{
unregister: unreg,
FilesOpened: monitoring.NewUint(reg, "files_opened_total"),
FilesClosed: monitoring.NewUint(reg, "files_closed_total"),
FilesActive: monitoring.NewUint(reg, "files_active"),
MessagesRead: monitoring.NewUint(reg, "messages_read_total"),
BytesProcessed: monitoring.NewUint(reg, "bytes_processed_total"),
EventsProcessed: monitoring.NewUint(reg, "events_processed_total"),
ProcessingErrors: monitoring.NewUint(reg, "processing_errors_total"),
ProcessingTime: metrics.NewUniformSample(1024),
unregister: unreg,
FilesOpened: monitoring.NewUint(reg, "files_opened_total"),
FilesClosed: monitoring.NewUint(reg, "files_closed_total"),
FilesActive: monitoring.NewUint(reg, "files_active"),
MessagesRead: monitoring.NewUint(reg, "messages_read_total"),
MessagesTruncated: monitoring.NewUint(reg, "messages_truncated_total"),
BytesProcessed: monitoring.NewUint(reg, "bytes_processed_total"),
EventsProcessed: monitoring.NewUint(reg, "events_processed_total"),
ProcessingErrors: monitoring.NewUint(reg, "processing_errors_total"),
ProcessingTime: metrics.NewUniformSample(1024),

HarvesterStarted: monitoring.NewInt(harvesterMetrics, "started"),
HarvesterClosed: monitoring.NewInt(harvesterMetrics, "closed"),
Expand Down
150 changes: 134 additions & 16 deletions filebeat/input/filestream/metrics_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,37 @@ func TestFilestreamMetrics(t *testing.T) {

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
<<<<<<< HEAD

Check failure on line 36 in filebeat/input/filestream/metrics_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected operand, found '<<' (typecheck)
"id": "fake-ID",
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.on_state_change.inactive": "2s",
=======
"id": "fake-ID",
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.on_state_change.inactive": "2s",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
"message_max_bytes": 20,
"parsers": []map[string]interface{}{
{
"multiline": map[string]interface{}{
"type": "pattern",
"pattern": "^multiline",
"negate": true,
"match": "after",
"max_lines": 1,
"timeout": "1s",
},
},
},
>>>>>>> 7806f1a2c (filebeat/inputs/filestream: add metric for messages truncated (#41667))

Check failure on line 63 in filebeat/input/filestream/metrics_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

missing ',' in composite literal (typecheck)
})

testlines := []byte("first line\nsecond line\nthird line\n")
testlines := []byte("first line\nsecond line\nthird line\nthis is a very long line exceeding message_max_bytes\nmultiline first line\nmultiline second line\n")
env.mustWriteToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
Expand All @@ -51,37 +74,132 @@ func TestFilestreamMetrics(t *testing.T) {
env.waitUntilHarvesterIsDone()

checkMetrics(t, "fake-ID", expectedMetrics{
FilesOpened: 1,
FilesClosed: 1,
FilesActive: 0,
MessagesRead: 3,
BytesProcessed: 34,
EventsProcessed: 3,
ProcessingErrors: 0,
FilesOpened: 1,
FilesClosed: 1,
FilesActive: 0,
MessagesRead: 3,
MessagesTruncated: 2,
BytesProcessed: 130,
EventsProcessed: 3,
ProcessingErrors: 0,
})

cancelInput()
env.waitUntilInputStops()
}

// TestFilestreamMessageMaxBytesTruncatedMetric verifies that the
// messages_truncated_total metric is incremented when a single line is
// cut short by the message_max_bytes limit.
func TestFilestreamMessageMaxBytesTruncatedMetric(t *testing.T) {
	testEnv := newInputTestingEnvironment(t)

	const inputID = "fake-ID"
	logFile := "test.log"
	input := testEnv.mustCreateInput(map[string]interface{}{
		"id":                                     inputID,
		"paths":                                  []string{testEnv.abspath(logFile)},
		"prospector.scanner.check_interval":      "24h",
		"close.on_state_change.check_interval":   "100ms",
		"close.on_state_change.inactive":         "2s",
		"prospector.scanner.fingerprint.enabled": false,
		"file_identity.native":                   map[string]any{},
		"message_max_bytes":                      20,
	})

	// The last line exceeds message_max_bytes (20), so the reader truncates it.
	content := []byte("first line\nsecond line\nthird line\nthis is a long line exceeding message_max_bytes\n")
	testEnv.mustWriteToFile(logFile, content)

	ctx, cancel := context.WithCancel(context.Background())
	testEnv.startInput(ctx, input)

	// Wait for all four events, then make sure the harvester consumed the
	// whole file before inspecting the metrics.
	testEnv.waitUntilEventCount(4)
	testEnv.requireOffsetInRegistry(logFile, inputID, len(content))
	testEnv.waitUntilHarvesterIsDone()

	checkMetrics(t, inputID, expectedMetrics{
		FilesOpened:       1,
		FilesClosed:       1,
		FilesActive:       0,
		MessagesRead:      4,
		MessagesTruncated: 1,
		BytesProcessed:    82,
		EventsProcessed:   4,
		ProcessingErrors:  0,
	})

	cancel()
	testEnv.waitUntilInputStops()
}

// TestFilestreamMultilineMaxLinesTruncatedMetric verifies that the
// messages_truncated_total metric is incremented when the multiline
// parser drops lines beyond its max_lines limit.
func TestFilestreamMultilineMaxLinesTruncatedMetric(t *testing.T) {
	testEnv := newInputTestingEnvironment(t)

	const inputID = "fake-ID"
	logFile := "test.log"
	input := testEnv.mustCreateInput(map[string]interface{}{
		"id":                                     inputID,
		"paths":                                  []string{testEnv.abspath(logFile)},
		"prospector.scanner.check_interval":      "24h",
		"close.on_state_change.check_interval":   "100ms",
		"close.on_state_change.inactive":         "2s",
		"prospector.scanner.fingerprint.enabled": false,
		"file_identity.native":                   map[string]any{},
		"parsers": []map[string]interface{}{
			{
				// max_lines: 1 forces any multi-line group to be truncated.
				"multiline": map[string]interface{}{
					"type":      "pattern",
					"pattern":   "^multiline",
					"negate":    true,
					"match":     "after",
					"max_lines": 1,
					"timeout":   "1s",
				},
			},
		},
	})

	content := []byte("first line\nsecond line\nmultiline first line\nmultiline second line\n")
	testEnv.mustWriteToFile(logFile, content)

	ctx, cancel := context.WithCancel(context.Background())
	testEnv.startInput(ctx, input)

	// Wait for the three resulting events, then make sure the harvester
	// consumed the whole file before inspecting the metrics.
	testEnv.waitUntilEventCount(3)
	testEnv.requireOffsetInRegistry(logFile, inputID, len(content))
	testEnv.waitUntilHarvesterIsDone()

	checkMetrics(t, inputID, expectedMetrics{
		FilesOpened:       1,
		FilesClosed:       1,
		FilesActive:       0,
		MessagesRead:      3,
		MessagesTruncated: 1,
		BytesProcessed:    66,
		EventsProcessed:   3,
		ProcessingErrors:  0,
	})

	cancel()
	testEnv.waitUntilInputStops()
}

// expectedMetrics holds the metric values a filestream metrics test
// expects to observe on the input's monitoring registry. Field names
// mirror the metric keys registered by the input (see checkMetrics).
type expectedMetrics struct {
	FilesOpened       uint64 // files_opened_total
	FilesClosed       uint64 // files_closed_total
	FilesActive       uint64 // files_active (gauge)
	MessagesRead      uint64 // messages_read_total
	MessagesTruncated uint64 // messages_truncated_total
	BytesProcessed    uint64 // bytes_processed_total
	EventsProcessed   uint64 // events_processed_total
	ProcessingErrors  uint64 // processing_errors_total
}

Check failure on line 191 in filebeat/input/filestream/metrics_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

missing ',' before newline in composite literal (typecheck)

func checkMetrics(t *testing.T, id string, expected expectedMetrics) {

Check failure on line 193 in filebeat/input/filestream/metrics_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected '(', found checkMetrics (typecheck)
reg := monitoring.GetNamespace("dataset").GetRegistry().Get(id).(*monitoring.Registry)
reg, ok := monitoring.GetNamespace("dataset").GetRegistry().Get(id).(*monitoring.Registry)
require.True(t, ok, "registry not found")

require.Equal(t, id, reg.Get("id").(*monitoring.String).Get(), "id")
require.Equal(t, "filestream", reg.Get("input").(*monitoring.String).Get(), "input")
require.Equal(t, expected.FilesOpened, reg.Get("files_opened_total").(*monitoring.Uint).Get(), "files_opened_total")
require.Equal(t, expected.FilesClosed, reg.Get("files_closed_total").(*monitoring.Uint).Get(), "files_closed_total")
require.Equal(t, expected.MessagesRead, reg.Get("messages_read_total").(*monitoring.Uint).Get(), "messages_read_total")
require.Equal(t, expected.MessagesTruncated, reg.Get("messages_truncated_total").(*monitoring.Uint).Get(), "messages_truncated_total")
require.Equal(t, expected.BytesProcessed, reg.Get("bytes_processed_total").(*monitoring.Uint).Get(), "bytes_processed_total")
require.Equal(t, expected.EventsProcessed, reg.Get("events_processed_total").(*monitoring.Uint).Get(), "events_processed_total")
require.Equal(t, expected.ProcessingErrors, reg.Get("processing_errors_total").(*monitoring.Uint).Get(), "processing_errors_total")
Expand Down
4 changes: 2 additions & 2 deletions libbeat/reader/multiline/message_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ func (b *messageBuffer) addLine(m reader.Message) {
// finalize writes the existing content into the returned message and resets all reader variables.
func (b *messageBuffer) finalize() reader.Message {
if b.truncated > 0 {
b.message.AddFlagsWithKey("log.flags", "truncated")
b.message.AddFlagsWithKey("log.flags", "truncated") //nolint:errcheck // It is safe to ignore the error.
}

if b.numLines > 1 {
b.message.AddFlagsWithKey("log.flags", "multiline")
b.message.AddFlagsWithKey("log.flags", "multiline") //nolint:errcheck // It is safe to ignore the error.
}

// Copy message from existing content
Expand Down

0 comments on commit 728e22b

Please sign in to comment.