Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_multiline: Add functionality to use as processor #9521

Merged
merged 1 commit into from
Nov 4, 2024

Conversation

drbugfinder-work
Copy link
Contributor

This PR adds the support to re-emit the processed multiline records back into the processor pipeline.

It is basically a copy of the rewrite_tag code:

static int ingest_inline(struct flb_rewrite_tag *ctx,
flb_sds_t out_tag,
const void *buf, size_t buf_size)
{
struct flb_input_instance *input_instance;
struct flb_processor_unit *processor_unit;
struct flb_processor *processor;
int result;
if (ctx->ins->parent_processor != NULL) {
processor_unit = (struct flb_processor_unit *) \
ctx->ins->parent_processor;
processor = (struct flb_processor *) processor_unit->parent;
input_instance = (struct flb_input_instance *) processor->data;
if (processor->source_plugin_type == FLB_PLUGIN_INPUT) {
result = flb_input_log_append_skip_processor_stages(
input_instance,
processor_unit->stage + 1,
out_tag, flb_sds_len(out_tag),
buf, buf_size);
if (result == 0) {
return FLB_TRUE;
}
}
}
return FLB_FALSE;
}


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • [N/A] Documentation required for this feature

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

This commit adds the support to re-emit the processed multiline records back into the processor pipeline

Signed-off-by: Richard Treu <[email protected]>
@drbugfinder-work
Copy link
Contributor Author

Valgrind output

root@fb-dev:~/fluent-bit-ml-processor/fluent-bit/build/bin# valgrind --leak-check=full ./fluent-bit -c fluent-bit.yaml
==871136== Memcheck, a memory error detector
==871136== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==871136== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==871136== Command: ./fluent-bit -c fluent-bit.yaml
==871136== 
Fluent Bit v3.2.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _           _____  __  
|  ___| |                | |   | ___ (_) |         |____ |/  | 
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __   / /`| | 
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \ | | 
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /.___/ /_| |_
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)___/

[2024/10/24 09:58:52] [ info] Configuration:
[2024/10/24 09:58:52] [ info]  flush time     | 1.000000 seconds
[2024/10/24 09:58:52] [ info]  grace          | 5 seconds
[2024/10/24 09:58:52] [ info]  daemon         | 0
[2024/10/24 09:58:52] [ info] ___________
[2024/10/24 09:58:52] [ info]  inputs:
[2024/10/24 09:58:52] [ info]      tail
[2024/10/24 09:58:52] [ info] ___________
[2024/10/24 09:58:52] [ info]  filters:
[2024/10/24 09:58:52] [ info] ___________
[2024/10/24 09:58:52] [ info]  outputs:
[2024/10/24 09:58:52] [ info] ___________
[2024/10/24 09:58:52] [ info]  collectors:
[2024/10/24 09:58:52] [ info] [fluent bit] version=3.2.0, commit=1e753b5f6c, pid=871136
[2024/10/24 09:58:52] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/10/24 09:58:52] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/10/24 09:58:52] [debug] [input:tail:tail.0] flb_tail_fs_inotify_init() initializing inotify tail input
[2024/10/24 09:58:52] [ info] [cmetrics] version=0.9.5
[2024/10/24 09:58:52] [debug] [input:tail:tail.0] inotify watch fd=34
[2024/10/24 09:58:52] [ info] [ctraces ] version=0.5.5
[2024/10/24 09:58:52] [debug] [input:tail:tail.0] scanning path test.log
[2024/10/24 09:58:52] [ info] [input:tail:tail.0] initializing
[2024/10/24 09:58:52] [debug] [input:tail:tail.0] file will be read in POSIX_FADV_DONTNEED mode test.log
[2024/10/24 09:58:52] [ info] [input:tail:tail.0] storage_strategy='memory' (memory only)
[2024/10/24 09:58:52] [debug] [input:tail:tail.0] inode=324427 with offset=0 appended as test.log
[2024/10/24 09:58:52] [ info] [input:tail:tail.0] thread instance initialized
[2024/10/24 09:58:52] [debug] [input:tail:tail.0] scan_glob add(): test.log, inode 324427
[2024/10/24 09:58:52] [debug] [tail:tail.0] created event channels: read=36 write=37
[2024/10/24 09:58:52] [debug] [input:tail:tail.0] 1 new files found on path 'test.log'
[2024/10/24 09:58:52] [ info] [input:emitter:ml_processor] initializing
[2024/10/24 09:58:52] [debug] [input:tail:tail.0] [thread init] initialization OK
[2024/10/24 09:58:52] [ info] [input:emitter:ml_processor] storage_strategy='memory' (memory only)
[2024/10/24 09:58:52] [debug] [emitter:ml_processor] created event channels: read=40 write=41
[2024/10/24 09:58:52] [ info] [sp] stream processor started
[0] tail.0: [1729756732.900717527, {}, {"log"=>"single line..."}]
[0] tail.0: [1729756732.922188592, {}, {"log"=>"Dec 14 06:41:08 Exception in thread "main" java.lang.RuntimeException: Something has gone wrong, aborting!
    at com.myproject.module.MyProject.badMethod(MyProject.java:22)
    at com.myproject.module.MyProject.oneMoreMethod(MyProject.java:18)
    at com.myproject.module.MyProject.anotherMethod(MyProject.java:14)
    at com.myproject.module.MyProject.someMethod(MyProject.java:10)
    at com.myproject.module.MyProject.main(MyProject.java:6)"}]
[0] tail.0: [1729756732.923384425, {}, {"log"=>"another line..."}]
[0] tail.0: [1729756732.923485649, {}, {"log"=>"panic: my panic

goroutine 4 [running]:
panic(0x45cb40, 0x47ad70)
  /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c
main.main.func1(0xc420024120)
  foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1
created by main.main
  foo.go:5 +0x58

goroutine 1 [chan receive]:
runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c
runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e
runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)
  /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4
runtime.chanrecv1(0xc420024120, 0x0)
  /usr/local/go/src/runtime/chan.[2024/10/24 09:58:52] [ info] [filter:multiline:multiline.0] created new multiline stream for tail.0_tail.0
go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b
main.main()
  foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef
runtime.main()
  /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1

goroutine 2 [force gc (idle)]:
runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c
runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e
runtime.forcegchelper()
  /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1
created by runtime.init.4
  /usr/local/go/src/runtime/proc.go:227 +0x35

goroutine 3 [GC sweep wait]:
runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c
[2024/10/24 09:58:52] [debug] [filter:multiline:multiline.0] Created new ML stream for tail.0_tail.0
runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e
runtime.bgsweep(0xc42001e150)
  /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1
created by runtime.gcenable
  /usr/local/go/src/runtime/mgc.go:216 +0x58"}]
[0] tail.0: [1729756732.928994930, {}, {"log"=>"one more line, no multiline"}]
[2024/10/24 09:58:53] [debug] [input:tail:tail.0] [static files] processed 3.0K
[2024/10/24 09:58:53] [debug] [input:tail:tail.0] inode=324427 file=test.log promote to TAIL_EVENT
[2024/10/24 09:58:53] [ info] [input:tail:tail.0] inotify_fs_add(): inode=324427 watch_fd=1 name=test.log
[2024/10/24 09:58:53] [debug] [input:tail:tail.0] [static files] processed 0b, done
[2024/10/24 09:58:53] [debug] [task] created task=0x51c4a30 id=0 without routes, dropping.
[2024/10/24 09:58:53] [debug] [task] destroy task=0x51c4a30 (task_id=0)
^C[2024/10/24 09:58:57] [engine] caught signal (SIGINT)
[2024/10/24 09:58:57] [ warn] [engine] service will shutdown in max 5 seconds
[2024/10/24 09:58:57] [debug] [input:tail:tail.0] thread pause instance
[2024/10/24 09:58:57] [ info] [input] pausing ml_processor
[2024/10/24 09:58:58] [ info] [engine] service has stopped (0 pending tasks)
[2024/10/24 09:58:58] [debug] [input:tail:tail.0] thread pause instance
[2024/10/24 09:58:58] [ info] [input] pausing ml_processor
[2024/10/24 09:58:58] [debug] [input:tail:tail.0] inode=324427 removing file name test.log
[2024/10/24 09:58:58] [ info] [input:tail:tail.0] inotify_fs_remove(): inode=324427 watch_fd=1
[2024/10/24 09:58:58] [debug] [input:tail:tail.0] thread exit instance
==871136== 
==871136== HEAP SUMMARY:
==871136==     in use at exit: 0 bytes in 0 blocks
==871136==   total heap usage: 2,998 allocs, 2,998 frees, 2,239,006 bytes allocated
==871136== 
==871136== All heap blocks were freed -- no leaks are possible
==871136== 
==871136== For lists of detected and suppressed errors, rerun with: -s
==871136== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
root@fb-dev:~/fluent-bit-ml-processor/fluent-bit/build/bin# 

@drbugfinder-work
Copy link
Contributor Author

Example configuration

root@fb-dev:~/fluent-bit-ml-processor/fluent-bit/build/bin# cat fluent-bit.yaml 
service:
  log_level: debug
  flush_interval: 1
  parsers_file: parsers_multiline.conf

pipeline:
  inputs:
    - name: tail
      path: test.log
      read_from_head: true
      threaded: true
      processors:
        logs:
          - name: multiline
            multiline.parser: go, multiline-regex-test
            multiline.key_content: log
            emitter_name: ml_processor
          - name: stdout
root@fb-dev:~/fluent-bit-ml-processor/fluent-bit/build/bin# cat parsers_multiline.conf 
[MULTILINE_PARSER]
    name          multiline-regex-test
    type          regex
    flush_timeout 1000
    #
    # Regex rules for multiline parsing
    # ---------------------------------
    #
    # configuration hints:
    #
    #  - first state always has the name: start_state
    #  - every field in the rule must be inside double quotes
    #
    # rules |   state name  | regex pattern                  | next state
    # ------|---------------|--------------------------------------------
    rule      "start_state"   "/([A-Za-z]+ \d+ \d+\:\d+\:\d+)(.*)/"  "cont"
    rule      "cont"          "/^\s+at.*/"                     "cont"
    

root@fb-dev:~/fluent-bit-ml-processor/fluent-bit/build/bin# cat test.log 
single line...
Dec 14 06:41:08 Exception in thread "main" java.lang.RuntimeException: Something has gone wrong, aborting!
    at com.myproject.module.MyProject.badMethod(MyProject.java:22)
    at com.myproject.module.MyProject.oneMoreMethod(MyProject.java:18)
    at com.myproject.module.MyProject.anotherMethod(MyProject.java:14)
    at com.myproject.module.MyProject.someMethod(MyProject.java:10)
    at com.myproject.module.MyProject.main(MyProject.java:6)
another line...
panic: my panic

goroutine 4 [running]:
panic(0x45cb40, 0x47ad70)
  /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c
main.main.func1(0xc420024120)
  foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1
created by main.main
  foo.go:5 +0x58

goroutine 1 [chan receive]:
runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c
runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e
runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)
  /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4
runtime.chanrecv1(0xc420024120, 0x0)
  /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b
main.main()
  foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef
runtime.main()
  /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1

goroutine 2 [force gc (idle)]:
runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c
runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e
runtime.forcegchelper()
  /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1
created by runtime.init.4
  /usr/local/go/src/runtime/proc.go:227 +0x35

goroutine 3 [GC sweep wait]:
runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c
runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e
runtime.bgsweep(0xc42001e150)
  /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1
created by runtime.gcenable
  /usr/local/go/src/runtime/mgc.go:216 +0x58
one more line, no multiline

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants