Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasm: filter_wasm: Pass through msgpack format records into wasm filters #8431

Merged
merged 5 commits into from
Mar 14, 2024

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Jan 29, 2024

Currently, wasm filter uses JSON encoding to pass records and its associated metadata.
There is a opportunity to improve the capability to handle msgpack format on Wasm ones because reducing encoding/decoding overheads.
This PR intends to pass through msgpack format of records into wasm filter mechanism.
When using msgpack format on wasm filter, it needn't encode from JSON to msgpack.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
[SERVICE]
    Flush        1
    Daemon       Off
    Log_Level    info
    HTTP_Server  On
    HTTP_Listen  0.0.0.0
    HTTP_Port    2020
    Hot_Reload   On
    Grace 5

[INPUT]
    Name dummy
    Tag dummy.locals
    Dummy {"message":"dummy","wasm_float1":1.0,"wasm_float2":100.0,"wasm_int1":1,"wasm_int2":100}

[FILTER]
    Name wasm
    match dummy.*
    event_format msgpack
    WASM_Path filter_rust_msgpack.aot
    # WASM_Path filter_rust_msgpack.wasm
    Function_Name rust_filter_msgpack
    # Function_Name filter_no_effect
    accessible_paths .,/path/to/Gitrepo/fluent-bit

[OUTPUT]
    Name  stdout
    Match *

  • Debug log output from testing the change
Fluent Bit v3.0.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

____________________
< Fluent Bit v2.2.2 >
 -------------------
          \
           \
            \          __---__
                    _-       /--______
               __--( /     \ )XXXXXXXXXXX\v.
             .-XXX(   O   O  )XXXXXXXXXXXXXXX-
            /XXX(       U     )        XXXXXXX\
          /XXXXX(              )--_  XXXXXXXXXXX\
         /XXXXX/ (      O     )   XXXXXX   \XXXXX\
         XXXXX/   /            XXXXXX   \__ \XXXXX
         XXXXXX__/          XXXXXX         \__---->
 ---___  XXX__/          XXXXXX      \__         /
   \-  --__/   ___/\  XXXXXX            /  ___--/=
    \-\    ___/    XXXXXX              '--- XXXXXX
       \-\/XXX\ XXXXXX                      /XXXXX
         \XXXXXXXXX   \                    /XXXXX/
          \XXXXXX      >                 _/XXXXX/
            \XXXXX--__/              __-- XXXX/
             -XXXXXXXX---------------  XXXXXX-
                \XXXXXXXXXXXXXXXXXXXXXXXXXX/
                  ""VXXXXXXXXXXXXXXXXXXV""

[2024/01/29 20:37:06] [ info] Configuration:
[2024/01/29 20:37:06] [ info]  flush time     | 1.000000 seconds
[2024/01/29 20:37:06] [ info]  grace          | 5 seconds
[2024/01/29 20:37:06] [ info]  daemon         | 0
[2024/01/29 20:37:06] [ info] ___________
[2024/01/29 20:37:06] [ info]  inputs:
[2024/01/29 20:37:06] [ info]      dummy
[2024/01/29 20:37:06] [ info] ___________
[2024/01/29 20:37:06] [ info]  filters:
[2024/01/29 20:37:06] [ info]      wasm.0
[2024/01/29 20:37:06] [ info] ___________
[2024/01/29 20:37:06] [ info]  outputs:
[2024/01/29 20:37:06] [ info]      stdout.0
[2024/01/29 20:37:06] [ info] ___________
[2024/01/29 20:37:06] [ info]  collectors:
[2024/01/29 20:37:06] [ info] [fluent bit] version=3.0.0, commit=7a95d4b243, pid=2747427
[2024/01/29 20:37:06] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/01/29 20:37:06] [ info] [storage] ver=1.1.6, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/01/29 20:37:06] [ info] [cmetrics] version=0.6.6
[2024/01/29 20:37:06] [ info] [ctraces ] version=0.4.0
[2024/01/29 20:37:06] [ info] [input:dummy:dummy.0] initializing
[2024/01/29 20:37:06] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/01/29 20:37:06] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2024/01/29 20:37:06] [debug] [stdout:stdout.0] created event channels: read=23 write=24
[2024/01/29 20:37:06] [ info] [output:stdout:stdout.0] worker #0 started
[2024/01/29 20:37:06] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2024/01/29 20:37:06] [ info] [sp] stream processor started
[2024/01/29 20:37:06] [debug] [input chunk] update output instances with new chunk size diff=199, records=1, input=dummy.0
[2024/01/29 20:37:07] [debug] [task] created task=0x7fcdd401f3d0 id=0 OK
[2024/01/29 20:37:07] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] dummy.locals: [[1706528226.476400669, {}], {"message"=>"dummy", "time"=>"2024-01-29T11:37:06.476400669 +0000", "tag"=>"dummy.locals", "original"=>{"message"=>"dummy", "platform"=>"wasm", "wasm_float1"=>"1", "wasm_float2"=>"100", "wasm_int1"=>"1", "wasm_int2"=>"100"}, "lang"=>"Rust"}]
[2024/01/29 20:37:07] [debug] [out flush] cb_destroy coro_id=0
[2024/01/29 20:37:07] [debug] [input chunk] update output instances with new chunk size diff=199, records=1, input=dummy.0
[2024/01/29 20:37:07] [debug] [task] destroy task=0x7fcdd401f3d0 (task_id=0)
[2024/01/29 20:37:08] [debug] [task] created task=0x7fcdd40395b0 id=0 OK
[2024/01/29 20:37:08] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] dummy.locals: [[1706528227.476473623, {}], {"message"=>"dummy", "time"=>"2024-01-29T11:37:07.476473623 +0000", "tag"=>"dummy.locals", "original"=>{"message"=>"dummy", "platform"=>"wasm", "wasm_float1"=>"1", "wasm_float2"=>"100", "wasm_int1"=>"1", "wasm_int2"=>"100"}, "lang"=>"Rust"}]
[2024/01/29 20:37:08] [debug] [out flush] cb_destroy coro_id=1
[2024/01/29 20:37:08] [debug] [input chunk] update output instances with new chunk size diff=199, records=1, input=dummy.0
[2024/01/29 20:37:08] [debug] [task] destroy task=0x7fcdd40395b0 (task_id=0)
^C[2024/01/29 20:37:08] [engine] caught signal (SIGINT)
[2024/01/29 20:37:08] [debug] [task] created task=0x7fcdd403b6a0 id=0 OK
[2024/01/29 20:37:08] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2024/01/29 20:37:08] [ warn] [engine] service will shutdown in max 5 seconds
[2024/01/29 20:37:08] [ info] [input] pausing dummy.0
[0] dummy.locals: [[1706528228.476343453, {}], {"message"=>"dummy", "time"=>"2024-01-29T11:37:08.476343453 +0000", "tag"=>"dummy.locals", "original"=>{"message"=>"dummy", "platform"=>"wasm", "wasm_float1"=>"1", "wasm_float2"=>"100", "wasm_int1"=>"1", "wasm_int2"=>"100"}, "lang"=>"Rust"}]
[2024/01/29 20:37:08] [debug] [out flush] cb_destroy coro_id=2
[2024/01/29 20:37:08] [debug] [task] destroy task=0x7fcdd403b6a0 (task_id=0)
[2024/01/29 20:37:09] [ info] [engine] service has stopped (0 pending tasks)
[2024/01/29 20:37:09] [ info] [input] pausing dummy.0
[2024/01/29 20:37:09] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2024/01/29 20:37:09] [ info] [output:stdout:stdout.0] thread worker #0 stopped
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

fluent/fluent-bit-docs#1293

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@cosmo0920 cosmo0920 force-pushed the cosmo0920-msgpack-packed-records-wasm-micro-runtime branch from 6242a25 to bf49ab5 Compare January 30, 2024 05:47
@cosmo0920 cosmo0920 force-pushed the cosmo0920-msgpack-packed-records-wasm-micro-runtime branch from bf49ab5 to 243ed22 Compare January 31, 2024 08:10
@cosmo0920 cosmo0920 force-pushed the cosmo0920-msgpack-packed-records-wasm-micro-runtime branch from 243ed22 to afc3669 Compare January 31, 2024 08:12
@cosmo0920 cosmo0920 marked this pull request as ready for review January 31, 2024 09:33
@edsiper edsiper added this to the Fluent Bit v3.0.0 milestone Mar 14, 2024
@edsiper edsiper merged commit 47f4287 into master Mar 14, 2024
46 checks passed
@edsiper edsiper deleted the cosmo0920-msgpack-packed-records-wasm-micro-runtime branch March 14, 2024 21:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants