diff --git a/plugins/out_forward/forward.c b/plugins/out_forward/forward.c index 885b420effe..8cc2ca2cc76 100644 --- a/plugins/out_forward/forward.c +++ b/plugins/out_forward/forward.c @@ -839,6 +839,13 @@ static int config_set_properties(struct flb_upstream_node *node, fc->send_options = flb_utils_bool(tmp); } + /* add_option -> extra_options: if the user has defined 'add_option' + * we need to enable the 'send_options' flag + */ + if (fc->extra_options && mk_list_size(fc->extra_options) > 0) { + fc->send_options = FLB_TRUE; + } + /* require ack response (implies send_options) */ tmp = config_get_property("require_ack_response", node, ctx); if (tmp) { @@ -1785,7 +1792,14 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_BOOL, "fluentd_compat", "false", 0, FLB_TRUE, offsetof(struct flb_forward_config, fluentd_compat), - "Send cmetrics and ctreaces with Fluentd compatible format" + "Send metrics and traces with Fluentd compatible format" + }, + + { + FLB_CONFIG_MAP_SLIST_2, "add_option", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_forward_config, extra_options), + "Set an extra Forward protocol option. This is an advance feature, use it only for " + "very specific use-cases." }, /* EOF */ diff --git a/plugins/out_forward/forward.h b/plugins/out_forward/forward.h index f0abcfcd9b2..8e77e6e111d 100644 --- a/plugins/out_forward/forward.h +++ b/plugins/out_forward/forward.h @@ -73,6 +73,10 @@ struct flb_forward_config { int time_as_integer; /* Use backward compatible timestamp ? */ int fluentd_compat; /* Use Fluentd compatible payload for * metrics and ctraces */ + + /* add extra options to the Forward payload (advanced) */ + struct mk_list *extra_options; + int fwd_retain_metadata; /* Do not drop metadata in forward mode */ /* config */ diff --git a/plugins/out_forward/forward_format.c b/plugins/out_forward/forward_format.c index 85a9c5d3f09..48dedd862d6 100644 --- a/plugins/out_forward/forward_format.c +++ b/plugins/out_forward/forward_format.c @@ -93,7 +93,11 @@ static int append_options(struct flb_forward *ctx, char *chunk = NULL; uint8_t checksum[64]; int result; + struct mk_list *head; + struct flb_config_map_val *mv; struct flb_mp_map_header mh; + struct flb_slist_entry *eopt_key; + struct flb_slist_entry *eopt_val; /* options is map, use the dynamic map type */ flb_mp_map_header_init(&mh, mp_pck); @@ -152,6 +156,20 @@ static int append_options(struct flb_forward *ctx, msgpack_pack_str_body(mp_pck, "fluent_signal", 13); msgpack_pack_int64(mp_pck, event_type); + /* process 'extra_option(s)' */ + if (fc->extra_options) { + flb_config_map_foreach(head, mv, fc->extra_options) { + eopt_key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + eopt_val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); + + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, flb_sds_len(eopt_key->str)); + msgpack_pack_str_body(mp_pck, eopt_key->str, flb_sds_len(eopt_key->str)); + msgpack_pack_str(mp_pck, flb_sds_len(eopt_val->str)); + msgpack_pack_str_body(mp_pck, eopt_val->str, flb_sds_len(eopt_val->str)); + } + } + if (metadata != NULL && metadata->type == MSGPACK_OBJECT_MAP && metadata->via.map.size > 0) {