
out_kafka: Introduce raw_log_key to write a single value to kafka #8655

Merged (3 commits, Nov 9, 2024)

Conversation

zecke
Contributor

@zecke zecke commented Apr 1, 2024

Implement a new option called raw_log_key that allows writing a single value to Kafka. Combined with message_key, this makes it possible to put one field into the Kafka message key and send another field as the payload. Make this work as an additional format.


Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@zecke
Contributor Author

zecke commented Apr 1, 2024

Example config used for testing this:

[INPUT]
    Name  random

[FILTER]
    Name type_converter
    Match *
    uint_key rand_value rand_value string

[FILTER]
    Name modify
    Match *
    Add message Foo123

[OUTPUT]
    Name        kafka
    Match       *
    Brokers     127.0.0.1:9092
    Topics      test-foo
    Format      raw
    raw_log_key rand_value
    message_key message

Using kcat to read from Kafka:

$ kcat -C -b localhost -t test-foo
% Reached end of topic test-foo [0] at offset 470
12174776462032818091
% Reached end of topic test-foo [0] at offset 471
3397822772916645746
% Reached end of topic test-foo [0] at offset 472
3284960532694686745
% Reached end of topic test-foo [0] at offset 473
18034299562221828230
% Reached end of topic test-foo [0] at offset 474
2331075259719154332

Debug log output:

./bin/fluent-bit -c ~/source/sre/fb_kafka.ini -vv
Fluent Bit v3.0.1
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

___________.__                        __    __________.__  __          ________  
\_   _____/|  |  __ __   ____   _____/  |_  \______   \__|/  |_  ___  _\_____  \ 
 |    __)  |  | |  |  \_/ __ \ /    \   __\  |    |  _/  \   __\ \  \/ / _(__  < 
 |     \   |  |_|  |  /\  ___/|   |  \  |    |    |   \  ||  |    \   / /       \
 \___  /   |____/____/  \___  >___|  /__|    |______  /__||__|     \_/ /______  /
     \/                     \/     \/               \/                        \/ 

[2024/04/01 19:55:52] [ info] Configuration:
[2024/04/01 19:55:52] [ info]  flush time     | 1.000000 seconds
[2024/04/01 19:55:52] [ info]  grace          | 5 seconds
[2024/04/01 19:55:52] [ info]  daemon         | 0
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info]  inputs:
[2024/04/01 19:55:52] [ info]      random
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info]  filters:
[2024/04/01 19:55:52] [ info]      type_converter.0
[2024/04/01 19:55:52] [ info]      modify.1
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info]  outputs:
[2024/04/01 19:55:52] [ info]      kafka.0
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info]  collectors:
[2024/04/01 19:55:52] [ info] [fluent bit] version=3.0.1, commit=cf3e53ce03, pid=81063
[2024/04/01 19:55:52] [debug] [engine] coroutine stack size: 36864 bytes (36.0K)
[2024/04/01 19:55:52] [ info] [storage] ver=1.5.1, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/04/01 19:55:52] [ info] [cmetrics] version=0.7.1
[2024/04/01 19:55:52] [ info] [ctraces ] version=0.4.0
[2024/04/01 19:55:52] [ info] [input:random:random.0] initializing
[2024/04/01 19:55:52] [ info] [input:random:random.0] storage_strategy='memory' (memory only)
[2024/04/01 19:55:52] [debug] [random:random.0] created event channels: read=21 write=22
[2024/04/01 19:55:52] [debug] [input:random:random.0] interval_sec=1 interval_nsec=0
[2024/04/01 19:55:52] [debug] [filter:modify:modify.1] Initialized modify filter with 0 conditions and 1 rules
[2024/04/01 19:55:52] [debug] [kafka:kafka.0] created event channels: read=23 write=24
[2024/04/01 19:55:52] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:9092' topics='test-foo'
[2024/04/01 19:55:52] [ info] [sp] stream processor started
[2024/04/01 19:55:53] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:53] [debug] [input chunk] update output instances with new chunk size diff=89, records=1, input=random.0
[2024/04/01 19:55:54] [trace] [task 0x600000dbc000] created (id=0)
[2024/04/01 19:55:54] [debug] [task] created task=0x600000dbc000 id=0 OK
[2024/04/01 19:55:54] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:54] [debug] [input chunk] update output instances with new chunk size diff=89, records=1, input=random.0
{"rand_value"[2024/04/01 19:55:54] [debug] in produce_message

=>17896051301435224995, "rand_value"=>"17896051301435224995", "message"=>"Foo123"}[2024/04/01 19:55:54] [debug] [output:kafka:kafka.0] enqueued message (20 bytes) for topic 'test-foo'
[2024/04/01 19:55:54] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:54] [debug] [out flush] cb_destroy coro_id=0
[2024/04/01 19:55:54] [trace] [coro] destroy coroutine=0x600003aa0040 data=0x600003aa0058
[2024/04/01 19:55:54] [debug] [task] destroy task=0x600000dbc000 (task_id=0)
[2024/04/01 19:55:55] [trace] [task 0x600000da0000] created (id=0)
[2024/04/01 19:55:55] [debug] [task] created task=0x600000da0000 id=0 OK
[2024/04/01 19:55:55] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:55] [debug] [input chunk] update output instances with new chunk size diff=88, records=1, input=random.0
{"rand_value[2024/04/01 19:55:55] [debug] in produce_message

"=>15726046629086841672, "rand_value"=>"15726046629086841672", "message"=>"Foo123"}[2024/04/01 19:55:55] [debug] [output:kafka:kafka.0] enqueued message (20 bytes) for topic 'test-foo'
[2024/04/01 19:55:55] [debug] [output:kafka:kafka.0] message delivered (20 bytes, partition 0)
[2024/04/01 19:55:55] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:55] [debug] [out flush] cb_destroy coro_id=1
[2024/04/01 19:55:55] [trace] [coro] destroy coroutine=0x600003aa4040 data=0x600003aa4058
[2024/04/01 19:55:55] [debug] [task] destroy task=0x600000da0000 (task_id=0)
[2024/04/01 19:55:56] [trace] [task 0x600000da4000] created (id=0)
[2024/04/01 19:55:56] [debug] [task] created task=0x600000da4000 id=0 OK
[2024/04/01 19:55:56] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:56] [debug] [input chunk] update output instances with new chunk size diff=87, records=1, input=random.0
{"rand_value"=>1477704631817544547, "rand_value"=>"1477704631817544547", "message[2024/04/01 19:55:56] [debug] in produce_message

"=>"Foo123"}[2024/04/01 19:55:56] [debug] [output:kafka:kafka.0] enqueued message (19 bytes) for topic 'test-foo'
[2024/04/01 19:55:56] [debug] [output:kafka:kafka.0] message delivered (20 bytes, partition 0)
[2024/04/01 19:55:56] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:56] [debug] [out flush] cb_destroy coro_id=2
[2024/04/01 19:55:56] [trace] [coro] destroy coroutine=0x600003aa8040 data=0x600003aa8058
[2024/04/01 19:55:56] [debug] [task] destroy task=0x600000da4000 (task_id=0)
[2024/04/01 19:55:57] [trace] [task 0x600000da4000] created (id=0)
[2024/04/01 19:55:57] [debug] [task] created task=0x600000da4000 id=0 OK
[2024/04/01 19:55:57] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:57] [debug] [input chunk] update output instances with new chunk size diff=88, records=1, input=random.0
{"rand_value"=>567065323002200912[2024/04/01 19:55:57] [debug] in produce_message

, "rand_value"=>"567065323002200912", "message"=>"Foo123"}[2024/04/01 19:55:57] [debug] [output:kafka:kafka.0] enqueued message (18 bytes) for topic 'test-foo'
[2024/04/01 19:55:57] [debug] [output:kafka:kafka.0] message delivered (19 bytes, partition 0)
[2024/04/01 19:55:57] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:57] [debug] [out flush] cb_destroy coro_id=3
[2024/04/01 19:55:57] [trace] [coro] destroy coroutine=0x600003aa8040 data=0x600003aa8058
[2024/04/01 19:55:57] [debug] [task] destroy task=0x600000da4000 (task_id=0)
[2024/04/01 19:55:58] [trace] [task 0x600000dbc000] created (id=0)
[2024/04/01 19:55:58] [debug] [task] created task=0x600000dbc000 id=0 OK
[2024/04/01 19:55:58] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:58] [debug] [input chunk] update output instances with new chunk size diff=88, records=1, input=random.0
{"rand_value"[2024/04/01 19:55:58] [debug] in produce_message

=>3899690440799581283, "rand_value"=>"3899690440799581283", "message"=>"Foo123"}[2024/04/01 19:55:58] [debug] [output:kafka:kafka.0] enqueued message (19 bytes) for topic 'test-foo'
[2024/04/01 19:55:58] [debug] [output:kafka:kafka.0] message delivered (18 bytes, partition 0)
[2024/04/01 19:55:58] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:58] [debug] [out flush] cb_destroy coro_id=4
[2024/04/01 19:55:58] [trace] [coro] destroy coroutine=0x600003aa0040 data=0x600003aa0058
[2024/04/01 19:55:58] [debug] [task] destroy task=0x600000dbc000 (task_id=0)
^C[2024/04/01 19:55:58] [engine] caught signal (SIGINT)
[2024/04/01 19:55:58] [ info] [input] pausing random.0
[2024/04/01 19:55:58] [debug] [output:kafka:kafka.0] message delivered (19 bytes, partition 0)

@zecke
Contributor Author

zecke commented Apr 1, 2024

Fixed an unrelated issue found by valgrind (and added a dedicated commit). The reachable bits belong to global state created by curl (and then two dependencies down...)

==4403== 
==4403== HEAP SUMMARY:
==4403==     in use at exit: 192 bytes in 12 blocks
==4403==   total heap usage: 7,620 allocs, 7,608 frees, 5,391,102 bytes allocated
==4403== 
==4403== 48 bytes in 6 blocks are still reachable in loss record 1 of 2
==4403==    at 0x48850C8: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-arm64-linux.so)
==4403==    by 0x576F83F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x5770E2F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x57D7C5B: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x576F73F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x57708DB: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x576C443: gcry_control (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x50F2BC3: libssh2_init (in /usr/lib/aarch64-linux-gnu/libssh2.so.1.0.1)
==4403==    by 0x4DDF8CF: ??? (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403==    by 0x4D8D1FB: curl_global_init (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403==    by 0x104FF87: rd_http_global_init (rdhttp.c:443)
==4403==    by 0xEC4177: rd_kafka_global_init0 (rdkafka.c:160)
==4403== 
==4403== 144 bytes in 6 blocks are still reachable in loss record 2 of 2
==4403==    at 0x48850C8: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-arm64-linux.so)
==4403==    by 0x576F83F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x5770E2F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x57D7C4F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x576F73F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x57708DB: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x576C443: gcry_control (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x50F2BC3: libssh2_init (in /usr/lib/aarch64-linux-gnu/libssh2.so.1.0.1)
==4403==    by 0x4DDF8CF: ??? (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403==    by 0x4D8D1FB: curl_global_init (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403==    by 0x104FF87: rd_http_global_init (rdhttp.c:443)
==4403==    by 0xEC4177: rd_kafka_global_init0 (rdkafka.c:160)
==4403== 
==4403== LEAK SUMMARY:
==4403==    definitely lost: 0 bytes in 0 blocks
==4403==    indirectly lost: 0 bytes in 0 blocks
==4403==      possibly lost: 0 bytes in 0 blocks
==4403==    still reachable: 192 bytes in 12 blocks
==4403==         suppressed: 0 bytes in 0 blocks

@zecke
Contributor Author

zecke commented Jun 15, 2024

I have rebased this against the master branch.

@agup006
Member

agup006 commented Jun 15, 2024

@cosmo0920 would you have some time to take a look?

@patrick-stephens patrick-stephens added the ok-package-test Run PR packaging tests label Jun 17, 2024
@patrick-stephens
Contributor

@zecke can you link the docs PR for this as well? It's a new parameter, I believe, so it should be documented here (ideally with an example): https://github.com/fluent/fluent-bit-docs/blob/master/pipeline/outputs/kafka.md

I see you've ticked the backport option as well, that typically means also linking a PR to the branch for the backport version.

@zecke
Contributor Author

zecke commented Jun 19, 2024

@zecke can you link the docs PR for this as well? It's a new parameter I believe is what you're saying so should be documented here (ideally with an example): https://github.com/fluent/fluent-bit-docs/blob/master/pipeline/outputs/kafka.md

I opened fluent/fluent-bit-docs#1397. Not sure how to link these two together.

I see you've ticked the backport option as well, that typically means also linking a PR to the branch for the backport version.

I have unticked the box. Getting this into any future release would be great.

Contributor

@cosmo0920 cosmo0920 left a comment


Looks good. Thank you!

plugins/out_kafka/kafka_config.c
@zecke
Contributor Author

zecke commented Jul 8, 2024

The doc PR has been merged (fluent/fluent-bit-docs#1397)

@cosmo0920 cosmo0920 added this to the Fluent Bit v3.1.7 milestone Aug 26, 2024
if (ctx->raw_log_key && !raw_key && val.type == MSGPACK_OBJECT_STR) {
    if (key.via.str.size == ctx->raw_log_key_len &&
        strncmp(key.via.str.ptr, ctx->raw_log_key, ctx->raw_log_key_len) == 0) {
        raw_key = (char *) val.via.str.ptr;
Member


This routine is inside a loop; if I am not wrong, the values might be set for one message, but if the next message does not match, they will keep the value from the previous one (cc: @cosmo0920)

Contributor


Ohh, I've overlooked one. Thanks for catching this.

Contributor Author


How is this different from the message_key handling a few lines above? What is the lifetime of val.via.str.ptr? Shouldn't it be valid until the msgpack object is freed?

Contributor


Normally, we preserve a temporary reference as heap memory with flb_sds_create_len.
In this case, the memory referenced by val.via.str.ptr can be invalidated later in the loop.
So, we need to create a heap copy with:

    raw_key = flb_sds_create_len(val.via.str.ptr, val.via.str.size);

and in the finalizing function:

    flb_sds_destroy(raw_key);

Plus, raw_key should then be of type flb_sds_t.

Contributor Author


Took a copy of that memory and freed it on the exit paths. flb_sds_destroy(NULL) seems legit (like free(NULL)).

Allow writing the value of a single key instead of the entire message
to kafka. This makes it possible to use one part of the message as the
message_key and another part as the payload. This is similar to other
output plugins.

The value of the key must be a string and the format must be set to raw.

Signed-off-by: Holger Hans Peter Freyther <[email protected]>
Create a simple test for the out_kafka plugin that creates the
context and tears it down.

Signed-off-by: Holger Hans Peter Freyther <[email protected]>
The value is owned by the properties of the output, so we don't
need to free it.

==57818== Invalid free() / delete / delete[] / realloc()
==57818==    at 0x4887B40: free (vg_replace_malloc.c:872)
==57818==    by 0x8EE5F3: flb_free (flb_mem.h:127)
==57818==    by 0x8F27A7: flb_out_kafka_destroy (kafka_config.c:243)
==57818==    by 0x8EE3F7: cb_kafka_exit (kafka.c:558)
==57818==    by 0x4D6D5B: flb_output_exit (flb_output.c:474)
==57818==    by 0x4FF65F: flb_engine_shutdown (flb_engine.c:1119)
==57818==    by 0x4FF333: flb_engine_start (flb_engine.c:1017)
==57818==    by 0x49F20B: flb_lib_worker (flb_lib.c:674)
==57818==    by 0x4F1EE57: start_thread (pthread_create.c:442)
==57818==    by 0x4F87F9B: thread_start (clone.S:79)
==57818==  Address 0x5d22850 is 16 bytes inside a block of size 24 alloc'd
==57818==    at 0x48850C8: malloc (vg_replace_malloc.c:381)
==57818==    by 0x4AD267: flb_malloc (flb_mem.h:80)
==57818==    by 0x4AD4C3: sds_alloc (flb_sds.c:41)
==57818==    by 0x4AD60F: flb_sds_create_size (flb_sds.c:93)
==57818==    by 0x5B0D3F: flb_env_var_translate (flb_env.c:180)
==57818==    by 0x4D76BF: flb_output_set_property (flb_output.c:753)
==57818==    by 0x4E869F: configure_plugins_type (flb_config.c:833)
==57818==    by 0x4E8BFF: flb_config_load_config_format (flb_config.c:941)
==57818==    by 0x489843: service_configure (fluent-bit.c:765)
==57818==    by 0x48A95B: flb_main (fluent-bit.c:1298)
==57818==    by 0x48AD47: main (fluent-bit.c:1456)

Signed-off-by: Holger Hans Peter Freyther <[email protected]>
Labels
docs-required ok-package-test Run PR packaging tests
6 participants