diff --git a/filebeat/docs/inputs/input-filestream.asciidoc b/filebeat/docs/inputs/input-filestream.asciidoc
index 74b7514b91a..e845239d786 100644
--- a/filebeat/docs/inputs/input-filestream.asciidoc
+++ b/filebeat/docs/inputs/input-filestream.asciidoc
@@ -175,19 +175,20 @@ include::../inputs/input-filestream-reader-options.asciidoc[]
 
 This input exposes metrics under the <>.
 These metrics are exposed under the `/inputs` path. They can be used to
-observe the activity of the input.
+observe the activity of the input. Note that metrics from processors are not included.
 
 [options="header"]
 |=======
-| Metric                      | Description
-| `files_opened_total`        | Total number of files opened.
-| `files_closed_total`        | Total number of files closed.
-| `files_active`              | Number of files currently open (gauge).
-| `messages_read_total`       | Total number of messages read.
-| `bytes_processed_total`     | Total number of bytes processed.
-| `events_processed_total`    | Total number of events processed.
-| `processing_errors_total`   | Total number of processing errors.
-| `processing_time`           | Histogram of the elapsed time to process messages (expressed in nanoseconds).
+| Metric                       | Description
+| `files_opened_total`         | Total number of files opened.
+| `files_closed_total`         | Total number of files closed.
+| `files_active`               | Number of files currently open (gauge).
+| `messages_read_total`        | Total number of messages read.
+| `messages_truncated_total`   | Total number of messages truncated.
+| `bytes_processed_total`      | Total number of bytes processed.
+| `events_processed_total`     | Total number of events processed.
+| `processing_errors_total`    | Total number of processing errors.
+| `processing_time`            | Histogram of the elapsed time to process messages (expressed in nanoseconds).
 |=======
 
 Note:
diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go
index 80460d6b3b4..a4dfd2c15fe 100644
--- a/filebeat/input/filestream/environment_test.go
+++ b/filebeat/input/filestream/environment_test.go
@@ -345,7 +345,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
 func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expectedOffset int) {
 	entry, err := e.getRegistryState(key)
 	if err != nil {
-		e.t.Fatalf(err.Error())
+		e.t.Fatal(err.Error())
 	}
 
 	require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
@@ -416,12 +416,18 @@ func (e *inputTestingEnvironment) waitUntilEventCountCtx(ctx context.Context, co
 	select {
 	case <-ctx.Done():
 		logLines := map[string][]string{}
-		for _, e := range e.pipeline.GetAllEvents() {
-			flat := e.Fields.Flatten()
+		for _, evt := range e.pipeline.GetAllEvents() {
+			flat := evt.Fields.Flatten()
 			pathi, _ := flat.GetValue("log.file.path")
-			path := pathi.(string)
+			path, ok := pathi.(string)
+			if !ok {
+				e.t.Fatalf("waitUntilEventCountCtx: path is not a string: %v", pathi)
+			}
 			msgi, _ := flat.GetValue("message")
-			msg := msgi.(string)
+			msg, ok := msgi.(string)
+			if !ok {
+				e.t.Fatalf("waitUntilEventCountCtx: message is not a string: %v", msgi)
+			}
 			logLines[path] = append(logLines[path], msg)
 		}
 
@@ -468,7 +474,10 @@ func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
 		if len(events) == checkedEventCount {
 			e.t.Fatalf("not enough expected elements")
 		}
-		message := evt.Fields["message"].(string)
+		message, ok := evt.Fields["message"].(string)
+		if !ok {
+			e.t.Fatalf("message is not string %+v", evt.Fields["message"])
+		}
 		if message == events[checkedEventCount] {
 			foundEvents[checkedEventCount] = true
 		}
diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go
index c2efe2c50cd..0e2a66a2307 100644
--- a/filebeat/input/filestream/input.go
+++ b/filebeat/input/filestream/input.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"slices"
 	"time"
 
 	"golang.org/x/text/transform"
@@ -380,6 +381,15 @@ func (inp *filestream) readFromSource(
 
 		s.Offset += int64(message.Bytes) + int64(message.Offset)
 
+		flags, err := message.Fields.GetValue("log.flags")
+		if err == nil {
+			if flags, ok := flags.([]string); ok {
+				if slices.Contains(flags, "truncated") { //nolint:typecheck,nolintlint // linter fails to infer generics
+					metrics.MessagesTruncated.Add(1)
+				}
+			}
+		}
+
 		metrics.MessagesRead.Inc()
 		if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) {
 			continue
diff --git a/filebeat/input/filestream/internal/input-logfile/metrics.go b/filebeat/input/filestream/internal/input-logfile/metrics.go
index 194e72f614d..edc6e915934 100644
--- a/filebeat/input/filestream/internal/input-logfile/metrics.go
+++ b/filebeat/input/filestream/internal/input-logfile/metrics.go
@@ -29,14 +29,15 @@ import (
 type Metrics struct {
 	unregister func()
 
-	FilesOpened      *monitoring.Uint // Number of files that have been opened.
-	FilesClosed      *monitoring.Uint // Number of files closed.
-	FilesActive      *monitoring.Uint // Number of files currently open (gauge).
-	MessagesRead     *monitoring.Uint // Number of messages read.
-	BytesProcessed   *monitoring.Uint // Number of bytes processed.
-	EventsProcessed  *monitoring.Uint // Number of events processed.
-	ProcessingErrors *monitoring.Uint // Number of processing errors.
-	ProcessingTime   metrics.Sample   // Histogram of the elapsed time for processing an event.
+	FilesOpened       *monitoring.Uint // Number of files that have been opened.
+	FilesClosed       *monitoring.Uint // Number of files closed.
+	FilesActive       *monitoring.Uint // Number of files currently open (gauge).
+	MessagesRead      *monitoring.Uint // Number of messages read.
+	MessagesTruncated *monitoring.Uint // Number of messages truncated.
+	BytesProcessed    *monitoring.Uint // Number of bytes processed.
+	EventsProcessed   *monitoring.Uint // Number of events processed.
+	ProcessingErrors  *monitoring.Uint // Number of processing errors.
+	ProcessingTime    metrics.Sample   // Histogram of the elapsed time for processing an event.
 
 	// Those metrics use the same registry/keys as the log input uses
 	HarvesterStarted *monitoring.Int
@@ -65,15 +66,16 @@ func NewMetrics(id string) *Metrics {
 
 	reg, unreg := inputmon.NewInputRegistry("filestream", id, nil)
 	m := Metrics{
-		unregister:       unreg,
-		FilesOpened:      monitoring.NewUint(reg, "files_opened_total"),
-		FilesClosed:      monitoring.NewUint(reg, "files_closed_total"),
-		FilesActive:      monitoring.NewUint(reg, "files_active"),
-		MessagesRead:     monitoring.NewUint(reg, "messages_read_total"),
-		BytesProcessed:   monitoring.NewUint(reg, "bytes_processed_total"),
-		EventsProcessed:  monitoring.NewUint(reg, "events_processed_total"),
-		ProcessingErrors: monitoring.NewUint(reg, "processing_errors_total"),
-		ProcessingTime:   metrics.NewUniformSample(1024),
+		unregister:        unreg,
+		FilesOpened:       monitoring.NewUint(reg, "files_opened_total"),
+		FilesClosed:       monitoring.NewUint(reg, "files_closed_total"),
+		FilesActive:       monitoring.NewUint(reg, "files_active"),
+		MessagesRead:      monitoring.NewUint(reg, "messages_read_total"),
+		MessagesTruncated: monitoring.NewUint(reg, "messages_truncated_total"),
+		BytesProcessed:    monitoring.NewUint(reg, "bytes_processed_total"),
+		EventsProcessed:   monitoring.NewUint(reg, "events_processed_total"),
+		ProcessingErrors:  monitoring.NewUint(reg, "processing_errors_total"),
+		ProcessingTime:    metrics.NewUniformSample(1024),
 
 		HarvesterStarted: monitoring.NewInt(harvesterMetrics, "started"),
 		HarvesterClosed:  monitoring.NewInt(harvesterMetrics, "closed"),
diff --git a/filebeat/input/filestream/metrics_integration_test.go b/filebeat/input/filestream/metrics_integration_test.go
index b551b2321b7..a571b33e825 100644
--- a/filebeat/input/filestream/metrics_integration_test.go
+++ b/filebeat/input/filestream/metrics_integration_test.go
@@ -40,9 +40,22 @@ func TestFilestreamMetrics(t *testing.T) {
 		"close.on_state_change.inactive":         "2s",
 		"prospector.scanner.fingerprint.enabled": false,
 		"file_identity.native":                   map[string]any{},
+		"message_max_bytes":                      20,
+		"parsers": []map[string]interface{}{
+			{
+				"multiline": map[string]interface{}{
+					"type":      "pattern",
+					"pattern":   "^multiline",
+					"negate":    true,
+					"match":     "after",
+					"max_lines": 1,
+					"timeout":   "1s",
+				},
+			},
+		},
 	})
 
-	testlines := []byte("first line\nsecond line\nthird line\n")
+	testlines := []byte("first line\nsecond line\nthird line\nthis is a very long line exceeding message_max_bytes\nmultiline first line\nmultiline second line\n")
 	env.mustWriteToFile(testlogName, testlines)
 
 	ctx, cancelInput := context.WithCancel(context.Background())
@@ -53,13 +66,105 @@ func TestFilestreamMetrics(t *testing.T) {
 	env.waitUntilHarvesterIsDone()
 
 	checkMetrics(t, "fake-ID", expectedMetrics{
-		FilesOpened:      1,
-		FilesClosed:      1,
-		FilesActive:      0,
-		MessagesRead:     3,
-		BytesProcessed:   34,
-		EventsProcessed:  3,
-		ProcessingErrors: 0,
+		FilesOpened:       1,
+		FilesClosed:       1,
+		FilesActive:       0,
+		MessagesRead:      3,
+		MessagesTruncated: 2,
+		BytesProcessed:    130,
+		EventsProcessed:   3,
+		ProcessingErrors:  0,
+	})
+
+	cancelInput()
+	env.waitUntilInputStops()
+}
+
+func TestFilestreamMessageMaxBytesTruncatedMetric(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"id":                                     "fake-ID",
+		"paths":                                  []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":      "24h",
+		"close.on_state_change.check_interval":   "100ms",
+		"close.on_state_change.inactive":         "2s",
+		"prospector.scanner.fingerprint.enabled": false,
+		"file_identity.native":                   map[string]any{},
+		"message_max_bytes":                      20,
+	})
+
+	testlines := []byte("first line\nsecond line\nthird line\nthis is a long line exceeding message_max_bytes\n")
+	env.mustWriteToFile(testlogName, testlines)
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	env.waitUntilEventCount(4)
+	env.requireOffsetInRegistry(testlogName, "fake-ID", len(testlines))
+	env.waitUntilHarvesterIsDone()
+
+	checkMetrics(t, "fake-ID", expectedMetrics{
+		FilesOpened:       1,
+		FilesClosed:       1,
+		FilesActive:       0,
+		MessagesRead:      4,
+		MessagesTruncated: 1,
+		BytesProcessed:    82,
+		EventsProcessed:   4,
+		ProcessingErrors:  0,
+	})
+
+	cancelInput()
+	env.waitUntilInputStops()
+}
+
+func TestFilestreamMultilineMaxLinesTruncatedMetric(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"id":                                     "fake-ID",
+		"paths":                                  []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":      "24h",
+		"close.on_state_change.check_interval":   "100ms",
+		"close.on_state_change.inactive":         "2s",
+		"prospector.scanner.fingerprint.enabled": false,
+		"file_identity.native":                   map[string]any{},
+		"parsers": []map[string]interface{}{
+			{
+				"multiline": map[string]interface{}{
+					"type":      "pattern",
+					"pattern":   "^multiline",
+					"negate":    true,
+					"match":     "after",
+					"max_lines": 1,
+					"timeout":   "1s",
+				},
+			},
+		},
+	})
+
+	testlines := []byte("first line\nsecond line\nmultiline first line\nmultiline second line\n")
+	env.mustWriteToFile(testlogName, testlines)
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	env.waitUntilEventCount(3)
+	env.requireOffsetInRegistry(testlogName, "fake-ID", len(testlines))
+	env.waitUntilHarvesterIsDone()
+
+	checkMetrics(t, "fake-ID", expectedMetrics{
+		FilesOpened:       1,
+		FilesClosed:       1,
+		FilesActive:       0,
+		MessagesRead:      3,
+		MessagesTruncated: 1,
+		BytesProcessed:    66,
+		EventsProcessed:   3,
+		ProcessingErrors:  0,
 	})
 
 	cancelInput()
@@ -67,23 +172,26 @@ func TestFilestreamMetrics(t *testing.T) {
 }
 
 type expectedMetrics struct {
-	FilesOpened      uint64
-	FilesClosed      uint64
-	FilesActive      uint64
-	MessagesRead     uint64
-	BytesProcessed   uint64
-	EventsProcessed  uint64
-	ProcessingErrors uint64
+	FilesOpened       uint64
+	FilesClosed       uint64
+	FilesActive       uint64
+	MessagesRead      uint64
+	MessagesTruncated uint64
+	BytesProcessed    uint64
+	EventsProcessed   uint64
+	ProcessingErrors  uint64
 }
 
 func checkMetrics(t *testing.T, id string, expected expectedMetrics) {
-	reg := monitoring.GetNamespace("dataset").GetRegistry().Get(id).(*monitoring.Registry)
+	reg, ok := monitoring.GetNamespace("dataset").GetRegistry().Get(id).(*monitoring.Registry)
+	require.True(t, ok, "registry not found")
 
 	require.Equal(t, id, reg.Get("id").(*monitoring.String).Get(), "id")
 	require.Equal(t, "filestream", reg.Get("input").(*monitoring.String).Get(), "input")
 	require.Equal(t, expected.FilesOpened, reg.Get("files_opened_total").(*monitoring.Uint).Get(), "files_opened_total")
 	require.Equal(t, expected.FilesClosed, reg.Get("files_closed_total").(*monitoring.Uint).Get(), "files_closed_total")
 	require.Equal(t, expected.MessagesRead, reg.Get("messages_read_total").(*monitoring.Uint).Get(), "messages_read_total")
+	require.Equal(t, expected.MessagesTruncated, reg.Get("messages_truncated_total").(*monitoring.Uint).Get(), "messages_truncated_total")
 	require.Equal(t, expected.BytesProcessed, reg.Get("bytes_processed_total").(*monitoring.Uint).Get(), "bytes_processed_total")
 	require.Equal(t, expected.EventsProcessed, reg.Get("events_processed_total").(*monitoring.Uint).Get(), "events_processed_total")
 	require.Equal(t, expected.ProcessingErrors, reg.Get("processing_errors_total").(*monitoring.Uint).Get(), "processing_errors_total")
diff --git a/libbeat/reader/multiline/message_buffer.go b/libbeat/reader/multiline/message_buffer.go
index 506efc7599d..064dbc4fce7 100644
--- a/libbeat/reader/multiline/message_buffer.go
+++ b/libbeat/reader/multiline/message_buffer.go
@@ -120,11 +120,11 @@ func (b *messageBuffer) addLine(m reader.Message) {
 // finalize writes the existing content into the returned message and resets all reader variables.
 func (b *messageBuffer) finalize() reader.Message {
 	if b.truncated > 0 {
-		b.message.AddFlagsWithKey("log.flags", "truncated")
+		b.message.AddFlagsWithKey("log.flags", "truncated") //nolint:errcheck // It is safe to ignore the error.
 	}
 
 	if b.numLines > 1 {
-		b.message.AddFlagsWithKey("log.flags", "multiline")
+		b.message.AddFlagsWithKey("log.flags", "multiline") //nolint:errcheck // It is safe to ignore the error.
 	}
 
 	// Copy message from existing content