From 1490882d4fefa49427eb4230e4eb20bb0916e940 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 5 Sep 2023 14:28:57 -0500 Subject: [PATCH] out_forward: add new 'add_option' configuration property Forward protocol supports metadata through the 'options' feature. This information is appended to the whole payload and normally is handled only by the Forward plugin implementation. This plugin extends Fluent Bit Forward configuration where now is possible to set arbitrary options key/value pairs (strings) for very advanced use cases. Usage: [OUTPUT] name forward match * add_option key1 val1 add_option key2 val2 Signed-off-by: Eduardo Silva --- plugins/out_forward/forward.c | 16 +++++++++++++++- plugins/out_forward/forward.h | 4 ++++ plugins/out_forward/forward_format.c | 18 ++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/plugins/out_forward/forward.c b/plugins/out_forward/forward.c index 885b420effe..8cc2ca2cc76 100644 --- a/plugins/out_forward/forward.c +++ b/plugins/out_forward/forward.c @@ -839,6 +839,13 @@ static int config_set_properties(struct flb_upstream_node *node, fc->send_options = flb_utils_bool(tmp); } + /* add_option -> extra_options: if the user has defined 'add_option' + * we need to enable the 'send_options' flag + */ + if (fc->extra_options && mk_list_size(fc->extra_options) > 0) { + fc->send_options = FLB_TRUE; + } + /* require ack response (implies send_options) */ tmp = config_get_property("require_ack_response", node, ctx); if (tmp) { @@ -1785,7 +1792,14 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_BOOL, "fluentd_compat", "false", 0, FLB_TRUE, offsetof(struct flb_forward_config, fluentd_compat), - "Send cmetrics and ctreaces with Fluentd compatible format" + "Send metrics and traces with Fluentd compatible format" + }, + + { + FLB_CONFIG_MAP_SLIST_2, "add_option", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_forward_config, extra_options), + "Set an extra Forward protocol option. This is an advance feature, use it only for " + "very specific use-cases." }, /* EOF */ diff --git a/plugins/out_forward/forward.h b/plugins/out_forward/forward.h index f0abcfcd9b2..8e77e6e111d 100644 --- a/plugins/out_forward/forward.h +++ b/plugins/out_forward/forward.h @@ -73,6 +73,10 @@ struct flb_forward_config { int time_as_integer; /* Use backward compatible timestamp ? */ int fluentd_compat; /* Use Fluentd compatible payload for * metrics and ctraces */ + + /* add extra options to the Forward payload (advanced) */ + struct mk_list *extra_options; + int fwd_retain_metadata; /* Do not drop metadata in forward mode */ /* config */ diff --git a/plugins/out_forward/forward_format.c b/plugins/out_forward/forward_format.c index 85a9c5d3f09..48dedd862d6 100644 --- a/plugins/out_forward/forward_format.c +++ b/plugins/out_forward/forward_format.c @@ -93,7 +93,11 @@ static int append_options(struct flb_forward *ctx, char *chunk = NULL; uint8_t checksum[64]; int result; + struct mk_list *head; + struct flb_config_map_val *mv; struct flb_mp_map_header mh; + struct flb_slist_entry *eopt_key; + struct flb_slist_entry *eopt_val; /* options is map, use the dynamic map type */ flb_mp_map_header_init(&mh, mp_pck); @@ -152,6 +156,20 @@ static int append_options(struct flb_forward *ctx, msgpack_pack_str_body(mp_pck, "fluent_signal", 13); msgpack_pack_int64(mp_pck, event_type); + /* process 'extra_option(s)' */ + if (fc->extra_options) { + flb_config_map_foreach(head, mv, fc->extra_options) { + eopt_key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + eopt_val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); + + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, flb_sds_len(eopt_key->str)); + msgpack_pack_str_body(mp_pck, eopt_key->str, flb_sds_len(eopt_key->str)); + msgpack_pack_str(mp_pck, flb_sds_len(eopt_val->str)); + msgpack_pack_str_body(mp_pck, eopt_val->str, flb_sds_len(eopt_val->str)); + } + } + if (metadata != NULL && metadata->type == MSGPACK_OBJECT_MAP && metadata->via.map.size > 0) {