From a9b3a121325d3ec605320651e8c9018d15f826c2 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 21 Oct 2024 21:45:58 -0600 Subject: [PATCH 01/26] parser: allow to register parser with flb_cf context Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_parser.h | 4 +++- src/flb_parser.c | 26 +++++++++++++------------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/include/fluent-bit/flb_parser.h b/include/fluent-bit/flb_parser.h index 3c92ca489ea..bdfacc7c035 100644 --- a/include/fluent-bit/flb_parser.h +++ b/include/fluent-bit/flb_parser.h @@ -75,7 +75,7 @@ enum { FLB_PARSER_TYPE_HEX, }; -static inline time_t flb_parser_tm2time(const struct flb_tm *src, +static inline time_t flb_parser_tm2time(const struct flb_tm *src, int use_system_timezone) { struct tm tmp; @@ -107,6 +107,8 @@ struct flb_parser *flb_parser_create(const char *name, const char *format, struct flb_config *config); int flb_parser_conf_file_stat(const char *file, struct flb_config *config); int flb_parser_conf_file(const char *file, struct flb_config *config); +int flb_parser_load_parser_definitions(const char *cfg, struct flb_cf *cf, + struct flb_config *config); void flb_parser_destroy(struct flb_parser *parser); struct flb_parser *flb_parser_get(const char *name, struct flb_config *config); int flb_parser_do(struct flb_parser *parser, const char *buf, size_t length, diff --git a/src/flb_parser.c b/src/flb_parser.c index 8df7f56dfb5..ed1229046a2 100644 --- a/src/flb_parser.c +++ b/src/flb_parser.c @@ -313,15 +313,15 @@ struct flb_parser *flb_parser_create(const char *name, const char *format, p->time_frac_secs = (tmp + 2); } - /* - * Fall back to the system timezone - * if there is no zone parsed from the log. + /* + * Fall back to the system timezone + * if there is no zone parsed from the log. */ p->time_system_timezone = time_system_timezone; - /* - * Optional fixed timezone offset, only applied if - * not falling back to system timezone. + /* + * Optional fixed timezone offset, only applied if + * not falling back to system timezone. */ if (!p->time_system_timezone && time_offset) { diff = 0; @@ -481,9 +481,9 @@ static flb_sds_t get_parser_key(struct flb_config *config, return val; } -/* Config file: read 'parser' definitions */ -static int parser_conf_file(const char *cfg, struct flb_cf *cf, - struct flb_config *config) +/* Load each parser definition set in 'struct flb_cf *cf' */ +int flb_parser_load_parser_definitions(const char *cfg, struct flb_cf *cf, + struct flb_config *config) { int i = 0; flb_sds_t name; @@ -541,7 +541,7 @@ static int parser_conf_file(const char *cfg, struct flb_cf *cf, name, cfg); goto fconf_early_error; } - + /* skip_empty_values */ skip_empty = FLB_TRUE; tmp_str = get_parser_key(config, cf, s, "skip_empty_values"); @@ -605,7 +605,7 @@ static int parser_conf_file(const char *cfg, struct flb_cf *cf, /* Create the parser context */ if (!flb_parser_create(name, format, regex, skip_empty, time_fmt, time_key, time_offset, time_keep, time_strict, - time_system_timezone, logfmt_no_bare_keys, types, types_len, + time_system_timezone, logfmt_no_bare_keys, types, types_len, decoders, config)) { goto fconf_error; } @@ -946,8 +946,8 @@ int flb_parser_conf_file(const char *file, struct flb_config *config) return -1; } - /* process 'parser' sections */ - ret = parser_conf_file(cfg, cf, config); + /* load the parser definitions */ + ret = flb_parser_load_parser_definitions(cfg, cf, config); if (ret == -1) { flb_cf_destroy(cf); return -1; From 14b2ded4d3791df8d95e4d158be398ff43aa1361 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 21 Oct 2024 21:46:51 -0600 Subject: [PATCH 02/26] config: allow parsers and multiline parsers in main config file for Yaml Signed-off-by: Eduardo Silva --- src/flb_config.c | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/src/flb_config.c b/src/flb_config.c index 747d855cf08..a141dcfc991 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -907,11 +907,21 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) /* Extra sanity checks */ if (strcasecmp(s->name, "parser") == 0 || strcasecmp(s->name, "multiline_parser") == 0) { - fprintf(stderr, - "Sections 'multiline_parser' and 'parser' are not valid in " - "the main configuration file. It belongs to \n" - "the 'parsers_file' configuration files.\n"); - return -1; + + /* + * Classic mode configuration don't allow parser or multiline_parser + * to be defined in the main configuration file. + */ + if (cf->format == FLB_CF_CLASSIC) { + fprintf(stderr, + "Sections 'multiline_parser' and 'parser' are not valid in " + "the main configuration file. It belongs to \n" + "the 'parsers_file' configuration files.\n"); + return -1; + } + else { + /* Yaml allow parsers definitions in any Yaml file, all good */ + } } } @@ -925,6 +935,11 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) } } + ret = flb_parser_load_parser_definitions("", cf, config); + if (ret == -1) { + return -1; + } + ret = configure_plugins_type(config, cf, FLB_CF_CUSTOM); if (ret == -1) { return -1; From 2200217dc40a9b5e8c087e7d1d27190df7b52f45 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 21 Oct 2024 21:48:20 -0600 Subject: [PATCH 03/26] config_format: cf_yaml: add support for 'parsers' section Signed-off-by: Eduardo Silva --- src/config_format/flb_cf_yaml.c | 142 +++++++++++++++++++++++++++++--- 1 file changed, 131 insertions(+), 11 deletions(-) diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index e09034393f3..341c0499a64 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -57,6 +57,7 @@ enum section { SECTION_FILTER, SECTION_OUTPUT, SECTION_PROCESSOR, + SECTION_PARSER, SECTION_OTHER, }; @@ -70,6 +71,7 @@ static char *section_names[] = { "filter", "output", "processor", + "parser", "other" }; @@ -130,6 +132,12 @@ enum state { STATE_INPUT_PROCESSORS, STATE_INPUT_PROCESSOR, + /* Parser */ + STATE_PARSER, /* parser section */ + STATE_PARSER_ENTRY, /* a parser definition */ + STATE_PARSER_KEY, /* reading a key inside a parser */ + STATE_PARSER_VALUE, /* reading a value inside a parser */ + /* environment variables */ STATE_ENV, @@ -152,17 +160,22 @@ struct parser_state { /* active section */ struct flb_cf_section *cf_section; + /* active group */ struct flb_cf_group *cf_group; /* key value */ flb_sds_t key; + /* section key/value list */ struct cfl_kvlist *keyvals; + /* pointer to current values in a list. */ struct cfl_array *values; + /* pointer to current variant */ struct cfl_variant *variant; + /* if the current variant is reading the key of a kvlist */ cfl_sds_t variant_kvlist_key; /* are we the owner of the values? */ @@ -252,6 +265,8 @@ static char *state_str(enum state val) return "processor"; case STATE_ENV: return "env"; + case STATE_PARSER: + return "parser"; case STATE_STOP: return "stop"; default: @@ -266,17 +281,20 @@ static int add_section_type(struct flb_cf *conf, struct parser_state *state) } if (state->section == SECTION_INPUT) { - state->cf_section = flb_cf_section_create(conf, "INPUT", 0); + state->cf_section = flb_cf_section_create(conf, "input", 0); } else if (state->section == SECTION_FILTER) { - state->cf_section = flb_cf_section_create(conf, "FILTER", 0); + state->cf_section = flb_cf_section_create(conf, "filter", 0); } else if (state->section == SECTION_OUTPUT) { - state->cf_section = flb_cf_section_create(conf, "OUTPUT", 0); + state->cf_section = flb_cf_section_create(conf, "output", 0); } else if (state->section == SECTION_CUSTOM) { state->cf_section = flb_cf_section_create(conf, "customs", 0); } + else if (state->section == SECTION_PARSER) { + state->cf_section = flb_cf_section_create(conf, "parser", 0); + } if (!state->cf_section) { return -1; @@ -571,21 +589,21 @@ static void print_current_properties(struct parser_state *state) struct cfl_variant *var; int idx; - flb_debug("%*s[%s] PROPERTIES:", state->level*2, "", section_names[state->section]); + flb_info("%*s[%s] PROPERTIES:", state->level*2, "", section_names[state->section]); cfl_list_foreach(head, &state->keyvals->list) { prop = cfl_list_entry(head, struct cfl_kvpair, _head); switch (prop->val->type) { case CFL_VARIANT_STRING: - flb_debug("%*s%s: %s", (state->level+2)*2, "", prop->key, prop->val->data.as_string); + flb_info("%*s%s: %s", (state->level+2)*2, "", prop->key, prop->val->data.as_string); break; case CFL_VARIANT_ARRAY: - flb_debug("%*s%s: [", (state->level+2)*2, "", prop->key); + flb_info("%*s%s: [", (state->level+2)*2, "", prop->key); for (idx = 0; idx < prop->val->data.as_array->entry_count; idx++) { var = cfl_array_fetch_by_index(prop->val->data.as_array, idx); - flb_debug("%*s%s", (state->level+3)*2, "", var->data.as_string); + flb_info("%*s%s", (state->level+3)*2, "", var->data.as_string); } - flb_debug("%*s]", (state->level+2)*2, ""); + flb_info("%*s]", (state->level+2)*2, ""); break; } } @@ -869,6 +887,100 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, break; /* end of 'includes' */ + /* Handle the 'parsers' section */ + case STATE_PARSER: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + /* Start of the parsers list */ + break; + + case YAML_MAPPING_START_EVENT: + /* we handle each parser definition as a new section */ + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add parsers section"); + return YAML_FAILURE; + } + + /* Start of an individual parser entry */ + state = state_push_withvals(ctx, state, STATE_PARSER_ENTRY); + if (!state) { + flb_error("Unable to allocate state for parser entry"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + /* End of the parsers list */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_PARSER_ENTRY: + switch (event->type) { + case YAML_SCALAR_EVENT: + /* Found a key within the parser entry */ + value = (char *) event->data.scalar.value; + state = state_push_key(ctx, STATE_PARSER_KEY, value); + if (!state) { + flb_error("Unable to allocate state for parser key"); + return YAML_FAILURE; + } + break; + + case YAML_MAPPING_END_EVENT: + /* End of an individual parser entry */ + print_current_properties(state); + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_PARSER_KEY: + switch (event->type) { + case YAML_SCALAR_EVENT: + /* Store the value for the previous key */ + value = (char *)event->data.scalar.value; + if (flb_cf_section_property_add(conf, state->cf_section->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) < 0) { + flb_error("unable to add property"); + return YAML_FAILURE; + } + + /* Return to the parser entry state */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + case STATE_PARSER_VALUE: + /* unused */ + break; + /* * 'customs' * -------- @@ -955,6 +1067,13 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } } + else if (strcasecmp(value, "parsers") == 0) { + state = state_push_section(ctx, STATE_PARSER, SECTION_PARSER); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } else if (strcasecmp(value, "pipeline") == 0) { state = state_push_section(ctx, STATE_PIPELINE, SECTION_PIPELINE); if (state == NULL) { @@ -985,7 +1104,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, else if (strcasecmp(value, "customs") == 0) { state = state_push_section(ctx, STATE_CUSTOM, SECTION_CUSTOM); - if (state == NULL) { + if (state == NULL) { flb_error("unable to allocate state"); return YAML_FAILURE; } @@ -2036,7 +2155,6 @@ static int state_create_section(struct flb_cf *conf, struct parser_state *state, } state->cf_section = flb_cf_section_create(conf, name, 0); - if (state->cf_section == NULL) { return -1; } @@ -2322,7 +2440,9 @@ struct flb_cf *flb_cf_yaml_create(struct flb_cf *conf, char *file_path, if (!conf) { return NULL; } - + flb_cf_set_origin_format(conf, FLB_CF_YAML); + } + else { flb_cf_set_origin_format(conf, FLB_CF_YAML); } From eb116bb5777a5f5674c96ef4be4e6ecb3eefa22a Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 21 Oct 2024 21:53:43 -0600 Subject: [PATCH 04/26] config_format: cf_yaml: move back properties print to debug mode Signed-off-by: Eduardo Silva --- src/config_format/flb_cf_yaml.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index 341c0499a64..92bf4e54b44 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -589,21 +589,21 @@ static void print_current_properties(struct parser_state *state) struct cfl_variant *var; int idx; - flb_info("%*s[%s] PROPERTIES:", state->level*2, "", section_names[state->section]); + flb_debug("%*s[%s] PROPERTIES:", state->level*2, "", section_names[state->section]); cfl_list_foreach(head, &state->keyvals->list) { prop = cfl_list_entry(head, struct cfl_kvpair, _head); switch (prop->val->type) { case CFL_VARIANT_STRING: - flb_info("%*s%s: %s", (state->level+2)*2, "", prop->key, prop->val->data.as_string); + flb_debug("%*s%s: %s", (state->level+2)*2, "", prop->key, prop->val->data.as_string); break; case CFL_VARIANT_ARRAY: - flb_info("%*s%s: [", (state->level+2)*2, "", prop->key); + flb_debug("%*s%s: [", (state->level+2)*2, "", prop->key); for (idx = 0; idx < prop->val->data.as_array->entry_count; idx++) { var = cfl_array_fetch_by_index(prop->val->data.as_array, idx); - flb_info("%*s%s", (state->level+3)*2, "", var->data.as_string); + flb_debug("%*s%s", (state->level+3)*2, "", var->data.as_string); } - flb_info("%*s]", (state->level+2)*2, ""); + flb_debug("%*s]", (state->level+2)*2, ""); break; } } From c24d47ee5f047f9b72304831bb85de4e599cefd5 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 23 Oct 2024 20:06:44 -0600 Subject: [PATCH 05/26] config: load multiline parsers definitions from config format Signed-off-by: Eduardo Silva --- src/flb_config.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/flb_config.c b/src/flb_config.c index a141dcfc991..da12e9053d4 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -940,6 +940,11 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) return -1; } + ret = flb_parser_load_multiline_parser_definitions("", cf, config); + if (ret == -1) { + return -1; + } + ret = configure_plugins_type(config, cf, FLB_CF_CUSTOM); if (ret == -1) { return -1; From fbd86aa5abc284b2ac1c4d5bf07978d2aa6d1d7b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 23 Oct 2024 20:07:28 -0600 Subject: [PATCH 06/26] parser: split logic to handle multiline rules Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_parser.h | 3 ++ src/flb_parser.c | 72 ++++++++++++++++++++++++++++----- 2 files changed, 66 insertions(+), 9 deletions(-) diff --git a/include/fluent-bit/flb_parser.h b/include/fluent-bit/flb_parser.h index bdfacc7c035..afc8159b100 100644 --- a/include/fluent-bit/flb_parser.h +++ b/include/fluent-bit/flb_parser.h @@ -109,6 +109,9 @@ int flb_parser_conf_file_stat(const char *file, struct flb_config *config); int flb_parser_conf_file(const char *file, struct flb_config *config); int flb_parser_load_parser_definitions(const char *cfg, struct flb_cf *cf, struct flb_config *config); +int flb_parser_load_multiline_parser_definitions(const char *cfg, struct flb_cf *cf, + struct flb_config *config); + void flb_parser_destroy(struct flb_parser *parser); struct flb_parser *flb_parser_get(const char *name, struct flb_config *config); int flb_parser_do(struct flb_parser *parser, const char *buf, size_t length, diff --git a/src/flb_parser.c b/src/flb_parser.c index ed1229046a2..149ab7f1b91 100644 --- a/src/flb_parser.c +++ b/src/flb_parser.c @@ -680,6 +680,17 @@ int flb_parser_load_parser_definitions(const char *cfg, struct flb_cf *cf, return -1; } +static int multiline_rule_create(struct flb_ml_parser *ml_parser, + char *from_state, + char *regex_pattern, + char *to_state) +{ + int ret; + + ret = flb_ml_rule_create(ml_parser, from_state, regex_pattern, to_state, NULL); + return ret; +} + static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, struct flb_cf_section *section, struct flb_config *config) @@ -692,7 +703,47 @@ static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, struct flb_slist_entry *from_state; struct flb_slist_entry *regex_pattern; struct flb_slist_entry *tmp; + struct mk_list *g_head; + struct flb_cf_group *group; + struct cfl_variant *var_state; + struct cfl_variant *var_regex; + struct cfl_variant *var_next_state; + + /* Check if we have groups (coming from Yaml style config */ + mk_list_foreach(g_head, §ion->groups) { + /* Every group is a rule */ + group = cfl_list_entry(g_head, struct flb_cf_group, _head); + + var_state = cfl_kvlist_fetch(group->properties, "state"); + if (!var_state || var_state->type != CFL_VARIANT_STRING) { + flb_error("[multiline parser: %s] invalid 'state' key", ml_parser->name); + return -1; + } + + var_regex = cfl_kvlist_fetch(group->properties, "regex"); + if (!var_regex || var_regex->type != CFL_VARIANT_STRING) { + flb_error("[multiline parser: %s] invalid 'regex' key", ml_parser->name); + return -1; + } + var_next_state = cfl_kvlist_fetch(group->properties, "next_state"); + if (!var_next_state || var_next_state->type != CFL_VARIANT_STRING) { + flb_error("[multiline parser: %s] invalid 'next_state' key", ml_parser->name); + return -1; + } + + ret = multiline_rule_create(ml_parser, + var_state->data.as_string, + var_regex->data.as_string, + var_next_state->data.as_string); + + if (ret == -1) { + flb_error("[multiline parser: %s] error creating rule", ml_parser->name); + return -1; + } + } + + /* Multiline rules set by a Fluent Bit classic mode config */ cfl_list_foreach(head, §ion->properties->list) { entry = cfl_list_entry(head, struct cfl_kvpair, _head); @@ -705,7 +756,7 @@ static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, ret = flb_slist_split_tokens(&list, entry->val->data.as_string, 3); if (ret == -1) { flb_error("[multiline parser: %s] invalid section on key '%s'", - ml_parser->name, entry->key); + ml_parser->name, entry->key); return -1; } @@ -734,11 +785,10 @@ static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, return -1; } - ret = flb_ml_rule_create(ml_parser, - from_state->str, - regex_pattern->str, - to_state, - NULL); + ret = multiline_rule_create(ml_parser, + from_state->str, + regex_pattern->str, + to_state); if (ret == -1) { flb_error("[multiline parser: %s] error creating rule", ml_parser->name); @@ -762,8 +812,8 @@ static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, /* config file: read 'multiline_parser' sections */ -static int multiline_parser_conf_file(const char *cfg, struct flb_cf *cf, - struct flb_config *config) +int flb_parser_load_multiline_parser_definitions(const char *cfg, struct flb_cf *cf, + struct flb_config *config) { int ret; int type; @@ -781,6 +831,10 @@ static int multiline_parser_conf_file(const char *cfg, struct flb_cf *cf, struct flb_cf_section *s; struct flb_ml_parser *ml_parser; + /* + * debug content of cf: flb_cf_dump(cf); + */ + /* read all 'multiline_parser' sections */ mk_list_foreach(head, &cf->multiline_parsers) { ml_parser = NULL; @@ -954,7 +1008,7 @@ int flb_parser_conf_file(const char *file, struct flb_config *config) } /* processs 'multiline_parser' sections */ - ret = multiline_parser_conf_file(cfg, cf, config); + ret = flb_parser_load_multiline_parser_definitions(cfg, cf, config); if (ret == -1) { flb_cf_destroy(cf); return -1; From 4bb67b211dc07d3357778dc669acda9c07d3621a Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 23 Oct 2024 20:08:14 -0600 Subject: [PATCH 07/26] config_format: cf_yaml: add support for multiline_parsers Signed-off-by: Eduardo Silva --- src/config_format/flb_cf_yaml.c | 199 ++++++++++++++++++++++++++++++-- 1 file changed, 191 insertions(+), 8 deletions(-) diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index 92bf4e54b44..9d95e81cb5a 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -58,6 +58,8 @@ enum section { SECTION_OUTPUT, SECTION_PROCESSOR, SECTION_PARSER, + SECTION_MULTILINE_PARSER, + SECTION_MULTILINE_PARSER_RULE, SECTION_OTHER, }; @@ -72,6 +74,8 @@ static char *section_names[] = { "output", "processor", "parser", + "multiline_parser", + "multiline_parser_rule", "other" }; @@ -138,6 +142,12 @@ enum state { STATE_PARSER_KEY, /* reading a key inside a parser */ STATE_PARSER_VALUE, /* reading a value inside a parser */ + /* Multiline Parser */ + STATE_MULTILINE_PARSER, /* multiline parser section */ + STATE_MULTILINE_PARSER_ENTRY, /* a multiline parser definition */ + STATE_MULTILINE_PARSER_VALUE, /* reading a value inside a multiline parser */ + STATE_MULTILINE_PARSER_RULE, /* reading a multiline parser rule */ + /* environment variables */ STATE_ENV, @@ -267,6 +277,8 @@ static char *state_str(enum state val) return "env"; case STATE_PARSER: return "parser"; + case STATE_MULTILINE_PARSER: + return "multiline-parser"; case STATE_STOP: return "stop"; default: @@ -295,6 +307,12 @@ static int add_section_type(struct flb_cf *conf, struct parser_state *state) else if (state->section == SECTION_PARSER) { state->cf_section = flb_cf_section_create(conf, "parser", 0); } + else if (state->section == SECTION_MULTILINE_PARSER) { + state->cf_section = flb_cf_section_create(conf, "multiline_parser", 0); + } + else { + state->cf_section = flb_cf_section_create(conf, "other", 0); + } if (!state->cf_section) { return -1; @@ -346,8 +364,8 @@ static char *state_get_last(struct local_ctx *ctx) return entry->str; } -static void yaml_error_event(struct local_ctx *ctx, struct parser_state *state, - yaml_event_t *event) +static void yaml_error_event_line(struct local_ctx *ctx, struct parser_state *state, + yaml_event_t *event, int line) { struct flb_slist_entry *entry; @@ -380,6 +398,9 @@ static void yaml_error_event(struct local_ctx *ctx, struct parser_state *state, event_type_str(event), event->type, state_str(state->state), state->state); } +#define yaml_error_event(ctx, state, event) \ + yaml_error_event_line(ctx, state, event, __LINE__) + static void yaml_error_definition(struct local_ctx *ctx, struct parser_state *state, yaml_event_t *event, char *value) { @@ -578,6 +599,7 @@ static int read_glob(struct flb_cf *conf, struct local_ctx *ctx, static void print_current_state(struct local_ctx *ctx, struct parser_state *state, yaml_event_t *event) { + /* note: change this to flb_info() for debugging purposes */ flb_debug("%*s%s->%s", state->level*2, "", state_str(state->state), event_type_str(event)); } @@ -589,6 +611,8 @@ static void print_current_properties(struct parser_state *state) struct cfl_variant *var; int idx; + /* note: change flb_debug with flb_info() for debugging purposes */ + flb_debug("%*s[%s] PROPERTIES:", state->level*2, "", section_names[state->section]); cfl_list_foreach(head, &state->keyvals->list) { @@ -971,7 +995,6 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } break; - default: yaml_error_event(ctx, state, event); return YAML_FAILURE; @@ -981,6 +1004,157 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, /* unused */ break; + + /* Handle the 'multiline_parsers' section */ + /* *****************************************/ + /* Handle the 'multiline_parsers' section */ + case STATE_MULTILINE_PARSER: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + /* Start of the multiline parsers list */ + break; + + case YAML_MAPPING_START_EVENT: + /* we handle each multiline parser definition as a new section */ + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add multiline parsers section"); + return YAML_FAILURE; + } + + /* Start of an individual multiline parser entry */ + state = state_push_withvals(ctx, state, STATE_MULTILINE_PARSER_ENTRY); + if (!state) { + flb_error("Unable to allocate state for multiline parser entry"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + /* End of the multiline parsers list */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + case STATE_MULTILINE_PARSER_ENTRY: + switch (event->type) { + case YAML_SCALAR_EVENT: + /* Found a key within the multiline parser entry */ + value = (char *) event->data.scalar.value; + + /* start of 'rules:' sequence */ + if (strcmp(value, "rules") == 0) { + state = state_push_withvals(ctx, state, STATE_MULTILINE_PARSER_RULE); + if (state == NULL) { + flb_error("Unable to allocate state for multiline parser rules"); + return YAML_FAILURE; + } + break; + } + + /* normal key value pair for the multiline parser */ + state = state_push_key(ctx, STATE_MULTILINE_PARSER_VALUE, value); + if (!state) { + flb_error("Unable to allocate state for multiline parser key"); + return YAML_FAILURE; + } + break; + + case YAML_MAPPING_END_EVENT: + /* End of an individual multiline parser entry */ + print_current_properties(state); + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + case STATE_MULTILINE_PARSER_VALUE: + switch (event->type) { + case YAML_SCALAR_EVENT: + /* Store the value for the previous key */ + value = (char *)event->data.scalar.value; + if (flb_cf_section_property_add(conf, state->cf_section->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) < 0) { + flb_error("unable to add property"); + return YAML_FAILURE; + } + + /* Return to the multiline parser entry state */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* + * Multiline Parser "Rules" + * ------------------------ + */ + case STATE_MULTILINE_PARSER_RULE: + switch(event->type) { + case YAML_SEQUENCE_START_EVENT: + break; + case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + case YAML_MAPPING_START_EVENT: + if (state_create_group(conf, state, "rule") == YAML_FAILURE) { + flb_error("unable to create group"); + return YAML_FAILURE; + } + + //state = state_push_withvals(ctx, state, STATE_GROUP_KEY); + state = state_push(ctx, STATE_GROUP_KEY); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + /* create group */ + state->values = flb_cf_section_property_add_list(conf, + state->cf_section->properties, + //state->key, flb_sds_len(state->key)); + "rules", 5); + + if (state->values == NULL) { + flb_error("no values"); + return YAML_FAILURE; + } + + break; + case YAML_MAPPING_END_EVENT: + return YAML_FAILURE; + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + }; + break; + + /* * 'customs' * -------- @@ -1074,6 +1248,13 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } } + else if (strcasecmp(value, "multiline_parsers") == 0) { + state = state_push_section(ctx, STATE_MULTILINE_PARSER, SECTION_MULTILINE_PARSER); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } else if (strcasecmp(value, "pipeline") == 0) { state = state_push_section(ctx, STATE_PIPELINE, SECTION_PIPELINE); if (state == NULL) { @@ -1804,11 +1985,13 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, /* This is also the end of the plugin values mapping. * So we pop an additional state off the stack. */ - state = state_pop(ctx); + if (state->state == STATE_PLUGIN_VAL) { + state = state_pop(ctx); - if (state == NULL) { - flb_error("no state left"); - return YAML_FAILURE; + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } } break; default: @@ -2169,7 +2352,7 @@ static int state_create_group(struct flb_cf *conf, struct parser_state *state, c } state->cf_group = flb_cf_group_create(conf, state->cf_section, - "processors", strlen("processors")); + name, strlen(name)); if (state->cf_group == NULL) { return -1; From 4fbcfde3bb67c68b7210430ac868d4805fb91473 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 23 Oct 2024 22:03:37 -0600 Subject: [PATCH 08/26] tests: internal: config_format: yaml: add parsers and multiline parsers Signed-off-by: Eduardo Silva --- tests/internal/config_format_yaml.c | 112 +++++++++++++++++- .../data/config_format/yaml/extra_parser.yaml | 16 +++ .../yaml/parsers_and_multiline_parsers.yaml | 42 +++++++ 3 files changed, 169 insertions(+), 1 deletion(-) create mode 100644 tests/internal/data/config_format/yaml/extra_parser.yaml create mode 100644 tests/internal/data/config_format/yaml/parsers_and_multiline_parsers.yaml diff --git a/tests/internal/config_format_yaml.c b/tests/internal/config_format_yaml.c index f34bdb273d4..9673e5a792b 100644 --- a/tests/internal/config_format_yaml.c +++ b/tests/internal/config_format_yaml.c @@ -20,6 +20,8 @@ #define FLB_000 FLB_TESTS_CONF_PATH "/fluent-bit.yaml" #define FLB_001 FLB_TESTS_CONF_PATH "/issue_7559.yaml" #define FLB_002 FLB_TESTS_CONF_PATH "/processors.yaml" +#define FLB_003 FLB_TESTS_CONF_PATH "/parsers_and_multiline_parsers.yaml" + #define FLB_000_WIN FLB_TESTS_CONF_PATH "\\fluent-bit-windows.yaml" #define FLB_BROKEN_PLUGIN_VARIANT FLB_TESTS_CONF_PATH "/broken_plugin_variant.yaml" @@ -289,7 +291,7 @@ static void test_parser_conf() /* Total number of inputs */ if(!TEST_CHECK(mk_list_size(&config->parsers) == cnt+1)) { - TEST_MSG("Section number error. Got=%d expect=%d", + TEST_MSG("Section number error. Got=%d expect=%d", mk_list_size(&config->parsers), cnt+1); } @@ -455,6 +457,113 @@ static void test_processors() flb_cf_destroy(cf); } +static void test_parsers_and_multiline_parsers() +{ + flb_sds_t str; + struct mk_list *head; + struct mk_list *rule_head; + struct flb_cf *cf; + struct flb_cf_section *s; + struct flb_cf_group *g; + struct cfl_variant *v; + struct cfl_variant *logs; + struct cfl_variant *tmp; + struct cfl_variant *record_modifier_filter; + struct cfl_variant *records; + struct cfl_variant *record; + int idx = 0; + + cf = flb_cf_yaml_create(NULL, FLB_003, NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of sections */ + TEST_CHECK(mk_list_size(&cf->sections) == 8); + + /* Check number sections per list */ + TEST_CHECK(mk_list_size(&cf->parsers) == 3); + TEST_CHECK(mk_list_size(&cf->multiline_parsers) == 2); + TEST_CHECK(mk_list_size(&cf->customs) == 0); + TEST_CHECK(mk_list_size(&cf->inputs) == 1); + TEST_CHECK(mk_list_size(&cf->filters) == 0); + TEST_CHECK(mk_list_size(&cf->outputs) == 1); + TEST_CHECK(mk_list_size(&cf->others) == 0); + + /* check parsers */ + idx = 0; + mk_list_foreach(head, &cf->parsers) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + switch (idx) { + case 0: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "json-2") == 0); + break; + case 1: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "json") == 0); + break; + + case 2: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "docker") == 0); + break; + } + idx++; + } + + /* check multiline parsers */ + idx = 0; + head = NULL; + mk_list_foreach(head, &cf->multiline_parsers) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + str = flb_cf_section_property_get_string(cf, s, "name"); + + switch (idx) { + case 0: + TEST_CHECK(strcmp(str, "exception_test-2") == 0); + break; + case 1: + TEST_CHECK(strcmp(str, "exception_test") == 0); + break; + }; + flb_sds_destroy(str); + + /* check rules (groups) */ + TEST_CHECK(mk_list_size(&s->groups) == 2); + + idx = 0; + mk_list_foreach(rule_head, &s->groups) { + g = mk_list_entry(rule_head, struct flb_cf_group, _head); + TEST_CHECK(strcmp(g->name, "rule") == 0); + + if (idx == 0) { + /* get initial state "start_state" */ + tmp = cfl_kvlist_fetch(g->properties, "state"); + TEST_CHECK(tmp != NULL); + + TEST_CHECK(tmp->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(tmp->data.as_string, "start_state") == 0); + } + else if (idx == 1) { + /* get initial state "start_state" */ + tmp = cfl_kvlist_fetch(g->properties, "state"); + TEST_CHECK(tmp != NULL); + + TEST_CHECK(tmp->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(tmp->data.as_string, "cont") == 0); + } + idx++; + } + } + + flb_cf_destroy(cf); +} + TEST_LIST = { { "basic" , test_basic}, { "customs section", test_customs_section}, @@ -464,5 +573,6 @@ TEST_LIST = { { "parsers file conf", test_parser_conf}, { "camel_case_key", test_camel_case_key}, { "processors", test_processors}, + { "parsers_and_multiline_parsers", test_parsers_and_multiline_parsers}, { 0 } }; diff --git a/tests/internal/data/config_format/yaml/extra_parser.yaml b/tests/internal/data/config_format/yaml/extra_parser.yaml new file mode 100644 index 00000000000..abbf29bd98c --- /dev/null +++ b/tests/internal/data/config_format/yaml/extra_parser.yaml @@ -0,0 +1,16 @@ +parsers: + - name: json-2 + format: json + +multiline_parsers: + - name: exception_test-2 + type: regex + flush_timeout: 1000 + rules: + - state: start_state + regex: "/(Dec \\d+ \\d+\\:\\d+\\:\\d+)(.*)/" + next_state: cont + + - state: cont + regex: "/^\\s+at.*/" + next_state: cont diff --git a/tests/internal/data/config_format/yaml/parsers_and_multiline_parsers.yaml b/tests/internal/data/config_format/yaml/parsers_and_multiline_parsers.yaml new file mode 100644 index 00000000000..637c93ea3b0 --- /dev/null +++ b/tests/internal/data/config_format/yaml/parsers_and_multiline_parsers.yaml @@ -0,0 +1,42 @@ +--- +service: + log_level: info + #parsers_file: parsers_multiline.conf + +includes: + - extra_parser.yaml + +parsers: + - name: json + format: json + + - name: docker + format: json + time_key: time + time_format: "%Y-%m-%dT%H:%M:%S.%L" + time_keep: true + +multiline_parsers: + - name: exception_test + type: regex + flush_timeout: 1000 + rules: + - state: start_state + regex: "/(Dec \\d+ \\d+\\:\\d+\\:\\d+)(.*)/" + next_state: cont + + - state: cont + regex: "/^\\s+at.*/" + next_state: cont + +pipeline: + inputs: + - name: tail + path: ../test_multiline.log + read_from_head: true + multiline.parser: multiline-regex-test + + outputs: + - name: stdout + match: '*' + format: json_lines From b4816953544ac7a4a90adf9191a338e524fa3a67 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 11:35:42 -0600 Subject: [PATCH 09/26] config_format: yaml: register stream_processor section Signed-off-by: Eduardo Silva --- include/fluent-bit/config_format/flb_cf.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/fluent-bit/config_format/flb_cf.h b/include/fluent-bit/config_format/flb_cf.h index 37d50594bb5..5c6f6448dfb 100644 --- a/include/fluent-bit/config_format/flb_cf.h +++ b/include/fluent-bit/config_format/flb_cf.h @@ -56,6 +56,7 @@ enum section_type { FLB_CF_SERVICE = 0, /* [SERVICE] */ FLB_CF_PARSER, /* [PARSER] */ FLB_CF_MULTILINE_PARSER, /* [MULTILINE_PARSER] */ + FLB_CF_STREAM_PROCESSOR, /* STREAM_PROCESSOR */ FLB_CF_CUSTOM, /* [CUSTOM] */ FLB_CF_INPUT, /* [INPUT] */ FLB_CF_FILTER, /* [FILTER] */ @@ -97,6 +98,9 @@ struct flb_cf { struct mk_list parsers; struct mk_list multiline_parsers; + /* stream processor: every entry is added as a task */ + struct mk_list stream_processors; + /* custom plugins */ struct mk_list customs; From 2109bcd8c43f6aa1417cda841a197eb948e890e0 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 11:37:04 -0600 Subject: [PATCH 10/26] config_format: add handlers for new stream_processor section Signed-off-by: Eduardo Silva --- src/config_format/flb_config_format.c | 33 ++++++++++++++++++--------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/src/config_format/flb_config_format.c b/src/config_format/flb_config_format.c index 2a6ab5f4601..bc572b56a49 100644 --- a/src/config_format/flb_config_format.c +++ b/src/config_format/flb_config_format.c @@ -119,6 +119,9 @@ struct flb_cf *flb_cf_create() mk_list_init(&ctx->parsers); mk_list_init(&ctx->multiline_parsers); + /* stream processors */ + mk_list_init(&ctx->stream_processors); + /* custom plugins */ mk_list_init(&ctx->customs); @@ -160,29 +163,32 @@ int flb_cf_set_origin_format(struct flb_cf *cf, int format) static enum section_type get_section_type(char *name, int len) { - if (strncasecmp(name, "SERVICE", len) == 0) { + if (strncasecmp(name, "service", len) == 0) { return FLB_CF_SERVICE; } - else if (strncasecmp(name, "PARSER", len) == 0) { + else if (strncasecmp(name, "parser", len) == 0) { return FLB_CF_PARSER; } - else if (strncasecmp(name, "MULTILINE_PARSER", len) == 0) { + else if (strncasecmp(name, "multiline_parser", len) == 0) { return FLB_CF_MULTILINE_PARSER; } - else if (strncasecmp(name, "CUSTOM", len) == 0 || - strncasecmp(name, "CUSTOMS", len) == 0) { + else if (strncasecmp(name, "stream_processor", len) == 0) { + return FLB_CF_STREAM_PROCESSOR; + } + else if (strncasecmp(name, "custom", len) == 0 || + strncasecmp(name, "customs", len) == 0) { return FLB_CF_CUSTOM; } - else if (strncasecmp(name, "INPUT", len) == 0 || - strncasecmp(name, "INPUTS", len) == 0) { + else if (strncasecmp(name, "input", len) == 0 || + strncasecmp(name, "inputs", len) == 0) { return FLB_CF_INPUT; } - else if (strncasecmp(name, "FILTER", len) == 0 || - strncasecmp(name, "FILTERS", len) == 0) { + else if (strncasecmp(name, "filter", len) == 0 || + strncasecmp(name, "filters", len) == 0) { return FLB_CF_FILTER; } - else if (strncasecmp(name, "OUTPUT", len) == 0 || - strncasecmp(name, "OUTPUTS", len) == 0) { + else if (strncasecmp(name, "output", len) == 0 || + strncasecmp(name, "outputs", len) == 0) { return FLB_CF_OUTPUT; } @@ -634,6 +640,9 @@ struct flb_cf_section *flb_cf_section_create(struct flb_cf *cf, char *name, int else if (type == FLB_CF_MULTILINE_PARSER) { mk_list_add(&s->_head_section, &cf->multiline_parsers); } + else if (type == FLB_CF_STREAM_PROCESSOR) { + mk_list_add(&s->_head_section, &cf->stream_processors); + } else if (type == FLB_CF_CUSTOM) { mk_list_add(&s->_head_section, &cf->customs); } @@ -728,6 +737,8 @@ static char *section_type_str(int type) return "PARSER"; case FLB_CF_MULTILINE_PARSER: return "MULTILINE_PARSER"; + case FLB_CF_STREAM_PROCESSOR: + return "STREAM_PROCESSOR"; case FLB_CF_CUSTOM: return "CUSTOM"; case FLB_CF_INPUT: From 6a3df4c2ce2fcb268c3aee0b64b61c7b9872c138 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 11:38:38 -0600 Subject: [PATCH 11/26] config_format: yaml: add support to parse stream_processor section Signed-off-by: Eduardo Silva --- src/config_format/flb_cf_yaml.c | 106 ++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index 9d95e81cb5a..abc881b64ca 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -60,6 +60,7 @@ enum section { SECTION_PARSER, SECTION_MULTILINE_PARSER, SECTION_MULTILINE_PARSER_RULE, + SECTION_STREAM_PROCESSOR, SECTION_OTHER, }; @@ -76,6 +77,7 @@ static char *section_names[] = { "parser", "multiline_parser", "multiline_parser_rule", + "stream_processor", "other" }; @@ -148,6 +150,11 @@ enum state { STATE_MULTILINE_PARSER_VALUE, /* reading a value inside a multiline parser */ STATE_MULTILINE_PARSER_RULE, /* reading a multiline parser rule */ + /* Stream Processor */ + STATE_STREAM_PROCESSOR, + STATE_STREAM_PROCESSOR_ENTRY, + STATE_STREAM_PROCESSOR_KEY, + /* environment variables */ STATE_ENV, @@ -279,6 +286,8 @@ static char *state_str(enum state val) return "parser"; case STATE_MULTILINE_PARSER: return "multiline-parser"; + case STATE_STREAM_PROCESSOR: + return "stream-processor"; case STATE_STOP: return "stop"; default: @@ -310,6 +319,9 @@ static int add_section_type(struct flb_cf *conf, struct parser_state *state) else if (state->section == SECTION_MULTILINE_PARSER) { state->cf_section = flb_cf_section_create(conf, "multiline_parser", 0); } + else if (state->section == SECTION_STREAM_PROCESSOR) { + state->cf_section = flb_cf_section_create(conf, "stream_processor", 0); + } else { state->cf_section = flb_cf_section_create(conf, "other", 0); } @@ -1155,6 +1167,93 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, break; + /* + * Stream Processor + * ---------------- + */ + case STATE_STREAM_PROCESSOR: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + break; + + case YAML_MAPPING_START_EVENT: + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add parsers section"); + return YAML_FAILURE; + } + + state = state_push_withvals(ctx, state, STATE_STREAM_PROCESSOR_ENTRY); + if (!state) { + flb_error("Unable to allocate state for stream processor entry"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_STREAM_PROCESSOR_ENTRY: + switch (event->type) { + case YAML_SCALAR_EVENT: + value = (char *)event->data.scalar.value; + + state = state_push_key(ctx, STATE_STREAM_PROCESSOR_KEY, value); + if (!state) { + flb_error("Unable to allocate state for stream processor key"); + return YAML_FAILURE; + } + break; + + case YAML_MAPPING_END_EVENT: + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_STREAM_PROCESSOR_KEY: + switch (event->type) { + case YAML_SCALAR_EVENT: + value = (char *)event->data.scalar.value; + + if (flb_cf_section_property_add(conf, state->cf_section->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) < 0) { + flb_error("Unable to add property"); + return YAML_FAILURE; + } + + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + /* * 'customs' * -------- @@ -1255,6 +1354,13 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } } + else if (strcasecmp(value, "stream_processor") == 0) { + state = state_push_section(ctx, STATE_STREAM_PROCESSOR, SECTION_STREAM_PROCESSOR); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } else if (strcasecmp(value, "pipeline") == 0) { state = state_push_section(ctx, STATE_PIPELINE, SECTION_PIPELINE); if (state == NULL) { From 35c9e4b6f0a34c3e1e04a2119bf60be2cfb6be70 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 11:39:12 -0600 Subject: [PATCH 12/26] sp: allow to read new sections set by Yaml Signed-off-by: Eduardo Silva --- src/stream_processor/flb_sp.c | 44 +++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/src/stream_processor/flb_sp.c b/src/stream_processor/flb_sp.c index 96aa508d9d1..5e5b601ca43 100644 --- a/src/stream_processor/flb_sp.c +++ b/src/stream_processor/flb_sp.c @@ -96,10 +96,19 @@ static int sp_config_file(struct flb_config *config, struct flb_sp *sp, return -1; } - /* Read all 'stream_task' sections */ + /* + * Note on reading the sections + * ---------------------------- + * Classic mode configuration looks for [STREAM_TASK], while the + * new Yaml parser expects the section names to be stream_processor. + * + * On Yaml mode, each pair of "name/exec" is set as an independent section, + * so the adjusted code below works for both type of files. + */ mk_list_foreach(head, &cf->sections) { section = mk_list_entry(head, struct flb_cf_section, _head); - if (strcasecmp(section->name, "stream_task") != 0) { + if (strcasecmp(section->name, "stream_task") != 0 && + strcasecmp(section->name, "stream_processor") != 0) { continue; } @@ -689,10 +698,14 @@ struct flb_sp *flb_sp_create(struct flb_config *config) int i = 0; int ret; char buf[32]; + char *task_name; + char *task_exec; struct mk_list *head; struct flb_sp *sp; struct flb_slist_entry *e; struct flb_sp_task *task; + struct cfl_variant *var; + struct flb_cf_section *section; /* Allocate context */ sp = flb_malloc(sizeof(struct flb_sp)); @@ -714,6 +727,33 @@ struct flb_sp *flb_sp_create(struct flb_config *config) } } + /* register stream processor tasks registered through Yaml config */ + mk_list_foreach(head, &config->cf_main->stream_processors) { + section = mk_list_entry(head, struct flb_cf_section, _head_section); + + /* task name */ + var = cfl_kvlist_fetch(section->properties, "name"); + if (!var || var->type != CFL_VARIANT_STRING) { + flb_error("[sp] missing 'name' property in stream_processor section"); + continue; + } + task_name = var->data.as_string; + + /* task exec/query */ + var = cfl_kvlist_fetch(section->properties, "exec"); + if (!var || var->type != CFL_VARIANT_STRING) { + flb_error("[sp] missing 'exec' property in stream_processor section"); + continue; + } + task_exec = var->data.as_string; + + /* create task */ + task = flb_sp_task_create(sp, task_name, task_exec); + if (!task) { + continue; + } + } + /* Lookup configuration file if any */ if (config->stream_processor_file) { ret = sp_config_file(config, sp, config->stream_processor_file); From f3520c09716c69a011d54b01dcd28977d02250e8 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 11:39:43 -0600 Subject: [PATCH 13/26] tests: internal: config_format: yaml: add unit tests for stream_processor Signed-off-by: Eduardo Silva --- tests/internal/config_format_yaml.c | 67 +++++++++++++++++++ .../config_format/yaml/stream_processor.yaml | 22 ++++++ 2 files changed, 89 insertions(+) create mode 100644 tests/internal/data/config_format/yaml/stream_processor.yaml diff --git a/tests/internal/config_format_yaml.c b/tests/internal/config_format_yaml.c index 9673e5a792b..f5568bbbbe2 100644 --- a/tests/internal/config_format_yaml.c +++ b/tests/internal/config_format_yaml.c @@ -21,6 +21,7 @@ #define FLB_001 FLB_TESTS_CONF_PATH "/issue_7559.yaml" #define FLB_002 FLB_TESTS_CONF_PATH "/processors.yaml" #define FLB_003 FLB_TESTS_CONF_PATH "/parsers_and_multiline_parsers.yaml" +#define FLB_004 FLB_TESTS_CONF_PATH "/stream_processor.yaml" #define FLB_000_WIN FLB_TESTS_CONF_PATH "\\fluent-bit-windows.yaml" #define FLB_BROKEN_PLUGIN_VARIANT FLB_TESTS_CONF_PATH "/broken_plugin_variant.yaml" @@ -564,6 +565,71 @@ static void test_parsers_and_multiline_parsers() flb_cf_destroy(cf); } +static void test_stream_processor() +{ + struct mk_list *head; + struct flb_cf *cf; + struct flb_cf_section *s; + struct flb_cf_group *g; + struct cfl_variant *v; + struct cfl_variant *tmp; + struct cfl_variant *record_modifier_filter; + struct cfl_variant *records; + struct cfl_variant *record; + int idx = 0; + + cf = flb_cf_yaml_create(NULL, FLB_004, NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of sections */ + TEST_CHECK(mk_list_size(&cf->sections) == 5); + + /* Check number sections per list */ + TEST_CHECK(mk_list_size(&cf->parsers) == 0); + TEST_CHECK(mk_list_size(&cf->multiline_parsers) == 0); + TEST_CHECK(mk_list_size(&cf->customs) == 0); + TEST_CHECK(mk_list_size(&cf->inputs) == 1); + TEST_CHECK(mk_list_size(&cf->filters) == 0); + TEST_CHECK(mk_list_size(&cf->outputs) == 1); + TEST_CHECK(mk_list_size(&cf->others) == 0); + + /* check others */ + idx = 0; + mk_list_foreach(head, &cf->stream_processors) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + + switch (idx) { + case 0: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "create_results") == 0); + + v = flb_cf_section_property_get(cf, s, "exec"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strncmp(v->data.as_string, "CREATE STREAM results", 21) == 0); + break; + case 1: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "select_results") == 0); + + v = flb_cf_section_property_get(cf, s, "exec"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strncmp(v->data.as_string, "SELECT * FROM", 13) == 0); + break; + }; + idx++; + + /* check groups */ + TEST_CHECK(mk_list_size(&s->groups) == 0); + } + + flb_cf_destroy(cf); +} + TEST_LIST = { { "basic" , test_basic}, { "customs section", test_customs_section}, @@ -574,5 +640,6 @@ TEST_LIST = { { "camel_case_key", test_camel_case_key}, { "processors", test_processors}, { "parsers_and_multiline_parsers", test_parsers_and_multiline_parsers}, + { "stream_processor", test_stream_processor}, { 0 } }; diff --git a/tests/internal/data/config_format/yaml/stream_processor.yaml b/tests/internal/data/config_format/yaml/stream_processor.yaml new file mode 100644 index 00000000000..f098cc6b027 --- /dev/null +++ b/tests/internal/data/config_format/yaml/stream_processor.yaml @@ -0,0 +1,22 @@ +--- + +stream_processor: + - name: create_results + exec: CREATE STREAM results WITH (tag='500_error') AS SELECT * FROM STREAM:tail.0 WHERE http_status=500; + + - name: select_results + exec: SELECT * FROM STREAM:results WHERE http_status=500; + +service: + log_level: info + +pipeline: + inputs: + - name: tail + path: test_multiline.log + read_from_head: true + + outputs: + - name: stdout + match: '*' + format: json_lines From 6d1a8a31cd97028892afb9eeacf227844cdc7537 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 15:43:11 -0600 Subject: [PATCH 14/26] config_format: register plugins as sections Signed-off-by: Eduardo Silva --- src/config_format/flb_config_format.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/config_format/flb_config_format.c b/src/config_format/flb_config_format.c index bc572b56a49..a9cc1b333dd 100644 --- a/src/config_format/flb_config_format.c +++ b/src/config_format/flb_config_format.c @@ -122,7 +122,10 @@ struct flb_cf *flb_cf_create() /* stream processors */ mk_list_init(&ctx->stream_processors); - /* custom plugins */ + /* external plugins (*.so) */ + mk_list_init(&ctx->plugins); + + /* 'custom' type plugins */ mk_list_init(&ctx->customs); /* pipeline */ @@ -175,6 +178,9 @@ static enum section_type get_section_type(char *name, int len) else if (strncasecmp(name, "stream_processor", len) == 0) { return FLB_CF_STREAM_PROCESSOR; } + else if (strncasecmp(name, "plugins", len) == 0) { + return FLB_CF_PLUGINS; + } else if (strncasecmp(name, "custom", len) == 0 || strncasecmp(name, "customs", len) == 0) { return FLB_CF_CUSTOM; @@ -643,6 +649,9 @@ struct flb_cf_section *flb_cf_section_create(struct flb_cf *cf, char *name, int else if (type == FLB_CF_STREAM_PROCESSOR) { mk_list_add(&s->_head_section, &cf->stream_processors); } + else if (type == FLB_CF_PLUGINS) { + mk_list_add(&s->_head_section, &cf->plugins); + } else if (type == FLB_CF_CUSTOM) { mk_list_add(&s->_head_section, &cf->customs); } @@ -739,6 +748,8 @@ static char *section_type_str(int type) return "MULTILINE_PARSER"; case FLB_CF_STREAM_PROCESSOR: return "STREAM_PROCESSOR"; + case FLB_CF_PLUGINS: + return "PLUGINS"; case FLB_CF_CUSTOM: return "CUSTOM"; case FLB_CF_INPUT: From e434d309cfda911813d4bd41ffe613257cab328b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 15:44:15 -0600 Subject: [PATCH 15/26] plugin: allow to load plugins by using config_format context Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_plugin.h | 4 ++++ src/flb_plugin.c | 28 ++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/include/fluent-bit/flb_plugin.h b/include/fluent-bit/flb_plugin.h index 44369c4a363..af78b1cde91 100644 --- a/include/fluent-bit/flb_plugin.h +++ b/include/fluent-bit/flb_plugin.h @@ -47,8 +47,12 @@ struct flb_plugins { struct flb_plugins *flb_plugin_create(); int flb_plugin_load(char *path, struct flb_plugins *ctx, struct flb_config *config); + int flb_plugin_load_router(char *path, struct flb_config *config); + int flb_plugin_load_config_file(const char *file, struct flb_config *config); +int flb_plugin_load_config_format(struct flb_cf *cf, struct flb_config *config); + void flb_plugin_destroy(struct flb_plugins *ctx); #endif diff --git a/src/flb_plugin.c b/src/flb_plugin.c index a26bd013576..fa36c8f0367 100644 --- a/src/flb_plugin.c +++ b/src/flb_plugin.c @@ -353,6 +353,33 @@ int flb_plugin_load_router(char *path, struct flb_config *config) return 0; } +int flb_plugin_load_config_format(struct flb_cf *cf, struct flb_config *config) +{ + int ret; + struct mk_list *head; + struct cfl_list *head_e; + struct flb_cf_section *section; + struct cfl_kvpair *entry; + + /* read all 'plugins' sections */ + mk_list_foreach(head, &cf->plugins) { + section = mk_list_entry(head, struct flb_cf_section, _head_section); + + cfl_list_foreach(head_e, §ion->properties->list) { + entry = cfl_list_entry(head_e, struct cfl_kvpair, _head); + + /* Load plugin with router function */ + ret = flb_plugin_load_router(entry->key, config); + if (ret == -1) { + flb_cf_destroy(cf); + return -1; + } + } + } + + return 0; +} + /* Load plugins from a configuration file */ int flb_plugin_load_config_file(const char *file, struct flb_config *config) { @@ -366,6 +393,7 @@ int flb_plugin_load_config_file(const char *file, struct flb_config *config) struct flb_cf_section *section; struct cfl_kvpair *entry; + printf("load from config file\n"); #ifndef FLB_HAVE_STATIC_CONF ret = stat(file, &st); if (ret == -1 && errno == ENOENT) { From e9cc39e85990bf1161ee916860d7a7ca58fd6462 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 15:44:52 -0600 Subject: [PATCH 16/26] config: load plugins if they are part of the main context Signed-off-by: Eduardo Silva --- src/flb_config.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/flb_config.c b/src/flb_config.c index da12e9053d4..32dc34b7e83 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -945,6 +945,11 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) return -1; } + ret = flb_plugin_load_config_format(cf, config); + if (ret == -1) { + return -1; + } + ret = configure_plugins_type(config, cf, FLB_CF_CUSTOM); if (ret == -1) { return -1; From 2b93767f694d5d5242312e80a1e870293c34be10 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 15:45:32 -0600 Subject: [PATCH 17/26] config_format: yaml: add support to parse 'plugins' section Signed-off-by: Eduardo Silva --- include/fluent-bit/config_format/flb_cf.h | 10 +++- src/config_format/flb_cf_yaml.c | 71 +++++++++++++++++++++-- 2 files changed, 72 insertions(+), 9 deletions(-) diff --git a/include/fluent-bit/config_format/flb_cf.h b/include/fluent-bit/config_format/flb_cf.h index 5c6f6448dfb..dd21dd25b85 100644 --- a/include/fluent-bit/config_format/flb_cf.h +++ b/include/fluent-bit/config_format/flb_cf.h @@ -55,8 +55,9 @@ enum cf_file_format { enum section_type { FLB_CF_SERVICE = 0, /* [SERVICE] */ FLB_CF_PARSER, /* [PARSER] */ - FLB_CF_MULTILINE_PARSER, /* [MULTILINE_PARSER] */ - FLB_CF_STREAM_PROCESSOR, /* STREAM_PROCESSOR */ + FLB_CF_MULTILINE_PARSER, /* multiline_parser */ + FLB_CF_STREAM_PROCESSOR, /* stream_processor */ + FLB_CF_PLUGINS, /* plugins */ FLB_CF_CUSTOM, /* [CUSTOM] */ FLB_CF_INPUT, /* [INPUT] */ FLB_CF_FILTER, /* [FILTER] */ @@ -101,7 +102,10 @@ struct flb_cf { /* stream processor: every entry is added as a task */ struct mk_list stream_processors; - /* custom plugins */ + /* external plugins (.so) */ + struct mk_list plugins; + + /* 'custom' type plugins */ struct mk_list customs; /* pipeline */ diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index abc881b64ca..b80a65ebb42 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -61,6 +61,7 @@ enum section { SECTION_MULTILINE_PARSER, SECTION_MULTILINE_PARSER_RULE, SECTION_STREAM_PROCESSOR, + SECTION_PLUGINS, SECTION_OTHER, }; @@ -155,6 +156,9 @@ enum state { STATE_STREAM_PROCESSOR_ENTRY, STATE_STREAM_PROCESSOR_KEY, + /* Plugins */ + STATE_PLUGINS, + /* environment variables */ STATE_ENV, @@ -288,6 +292,8 @@ static char *state_str(enum state val) return "multiline-parser"; case STATE_STREAM_PROCESSOR: return "stream-processor"; + case STATE_PLUGINS: + return "plugins"; case STATE_STOP: return "stop"; default: @@ -322,6 +328,9 @@ static int add_section_type(struct flb_cf *conf, struct parser_state *state) else if (state->section == SECTION_STREAM_PROCESSOR) { state->cf_section = flb_cf_section_create(conf, "stream_processor", 0); } + else if (state->section == SECTION_PLUGINS) { + state->cf_section = flb_cf_section_create(conf, "plugins", 0); + } else { state->cf_section = flb_cf_section_create(conf, "other", 0); } @@ -1017,9 +1026,10 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, break; - /* Handle the 'multiline_parsers' section */ - /* *****************************************/ - /* Handle the 'multiline_parsers' section */ + /* + * Handle the 'multiline_parsers' section + * -------------------------------------- + */ case STATE_MULTILINE_PARSER: switch (event->type) { case YAML_SEQUENCE_START_EVENT: @@ -1139,7 +1149,6 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } - //state = state_push_withvals(ctx, state, STATE_GROUP_KEY); state = state_push(ctx, STATE_GROUP_KEY); if (state == NULL) { flb_error("unable to allocate state"); @@ -1148,8 +1157,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, /* create group */ state->values = flb_cf_section_property_add_list(conf, state->cf_section->properties, - //state->key, flb_sds_len(state->key)); - "rules", 5); + "rules", 5); if (state->values == NULL) { flb_error("no values"); @@ -1254,6 +1262,50 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, } break; + /* + * Plugins: define a list of absolute paths for external plugins to load + * --------------------------------------------------------------------- + */ + case STATE_PLUGINS: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + /* create the section */ + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add parsers section"); + return YAML_FAILURE; + } + break; + + case YAML_SCALAR_EVENT: + /* Store the path as an entry in the plugins section */ + value = (char *)event->data.scalar.value; + + /* + * note that we pass an empty string as the real value since this is + * a list of items. + */ + if (flb_cf_section_property_add(conf, state->cf_section->properties, + value, strlen(value), "", 0) == NULL) { + flb_error("Unable to add plugin path"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + /* Pop back to the previous state */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + /* * 'customs' * -------- @@ -1361,6 +1413,13 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } } + else if (strcasecmp(value, "plugins") == 0) { + state = state_push_section(ctx, STATE_PLUGINS, SECTION_PLUGINS); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } else if (strcasecmp(value, "pipeline") == 0) { state = state_push_section(ctx, STATE_PIPELINE, SECTION_PIPELINE); if (state == NULL) { From 8328ed786a13a0f62ccdab08f1ea168958dd0e2a Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 15:45:52 -0600 Subject: [PATCH 18/26] tests: internal: config_format: yaml: add unit tests for plugins Signed-off-by: Eduardo Silva --- tests/internal/config_format_yaml.c | 67 ++++++++++++++++--- .../data/config_format/yaml/plugins.yaml | 16 +++++ 2 files changed, 72 insertions(+), 11 deletions(-) create mode 100644 tests/internal/data/config_format/yaml/plugins.yaml diff --git a/tests/internal/config_format_yaml.c b/tests/internal/config_format_yaml.c index f5568bbbbe2..d58ee0d4c72 100644 --- a/tests/internal/config_format_yaml.c +++ b/tests/internal/config_format_yaml.c @@ -22,6 +22,7 @@ #define FLB_002 FLB_TESTS_CONF_PATH "/processors.yaml" #define FLB_003 FLB_TESTS_CONF_PATH "/parsers_and_multiline_parsers.yaml" #define FLB_004 FLB_TESTS_CONF_PATH "/stream_processor.yaml" +#define FLB_005 FLB_TESTS_CONF_PATH "/plugins.yaml" #define FLB_000_WIN FLB_TESTS_CONF_PATH "\\fluent-bit-windows.yaml" #define FLB_BROKEN_PLUGIN_VARIANT FLB_TESTS_CONF_PATH "/broken_plugin_variant.yaml" @@ -460,6 +461,7 @@ static void test_processors() static void test_parsers_and_multiline_parsers() { + int idx = 0; flb_sds_t str; struct mk_list *head; struct mk_list *rule_head; @@ -467,12 +469,7 @@ static void test_parsers_and_multiline_parsers() struct flb_cf_section *s; struct flb_cf_group *g; struct cfl_variant *v; - struct cfl_variant *logs; struct cfl_variant *tmp; - struct cfl_variant *record_modifier_filter; - struct cfl_variant *records; - struct cfl_variant *record; - int idx = 0; cf = flb_cf_yaml_create(NULL, FLB_003, NULL, 0); TEST_CHECK(cf != NULL); @@ -567,16 +564,11 @@ static void test_parsers_and_multiline_parsers() static void test_stream_processor() { + int idx = 0; struct mk_list *head; struct flb_cf *cf; struct flb_cf_section *s; - struct flb_cf_group *g; struct cfl_variant *v; - struct cfl_variant *tmp; - struct cfl_variant *record_modifier_filter; - struct cfl_variant *records; - struct cfl_variant *record; - int idx = 0; cf = flb_cf_yaml_create(NULL, FLB_004, NULL, 0); TEST_CHECK(cf != NULL); @@ -630,6 +622,58 @@ static void test_stream_processor() flb_cf_destroy(cf); } +static void test_plugins() +{ + int idx = 0; + struct mk_list *head; + struct flb_cf *cf; + struct flb_cf_section *s; + + struct cfl_kvpair *path; + struct cfl_list *path_head; + + cf = flb_cf_yaml_create(NULL, FLB_005, NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of sections */ + TEST_CHECK(mk_list_size(&cf->sections) == 4); + + /* Check number sections per list */ + TEST_CHECK(mk_list_size(&cf->plugins) == 1); + TEST_CHECK(mk_list_size(&cf->parsers) == 0); + TEST_CHECK(mk_list_size(&cf->multiline_parsers) == 0); + TEST_CHECK(mk_list_size(&cf->customs) == 0); + TEST_CHECK(mk_list_size(&cf->inputs) == 1); + TEST_CHECK(mk_list_size(&cf->filters) == 0); + TEST_CHECK(mk_list_size(&cf->outputs) == 1); + TEST_CHECK(mk_list_size(&cf->others) == 0); + + + mk_list_foreach(head, &cf->plugins) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + + idx = 0; + cfl_list_foreach(path_head, &s->properties->list) { + path = cfl_list_entry(path_head, struct cfl_kvpair, _head); + + switch (idx) { + case 0: + TEST_CHECK(strcmp(path->key, "/path/to/out_gstdout.so") == 0); + break; + case 1: + TEST_CHECK(strcmp(path->key, "/path/to/out_fluent.so") == 0); + break; + }; + idx++; + } + } + + flb_cf_destroy(cf); +} + TEST_LIST = { { "basic" , test_basic}, { "customs section", test_customs_section}, @@ -641,5 +685,6 @@ TEST_LIST = { { "processors", test_processors}, { "parsers_and_multiline_parsers", test_parsers_and_multiline_parsers}, { "stream_processor", test_stream_processor}, + { "plugins", test_plugins}, { 0 } }; diff --git a/tests/internal/data/config_format/yaml/plugins.yaml b/tests/internal/data/config_format/yaml/plugins.yaml new file mode 100644 index 00000000000..79378f6e55b --- /dev/null +++ b/tests/internal/data/config_format/yaml/plugins.yaml @@ -0,0 +1,16 @@ +--- + +plugins: + - /path/to/out_gstdout.so + - /path/to/out_fluent.so + +service: + log_level: info + +pipeline: + inputs: + - name: random + + outputs: + - name: gstdout + match: '*' From 0a1536873836e16a2511ac5f081121141078be9f Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 15:48:17 -0600 Subject: [PATCH 19/26] sp: validate config_format context before iterate Signed-off-by: Eduardo Silva --- src/stream_processor/flb_sp.c | 44 ++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/src/stream_processor/flb_sp.c b/src/stream_processor/flb_sp.c index 5e5b601ca43..07a19abddc5 100644 --- a/src/stream_processor/flb_sp.c +++ b/src/stream_processor/flb_sp.c @@ -728,29 +728,31 @@ struct flb_sp *flb_sp_create(struct flb_config *config) } /* register stream processor tasks registered through Yaml config */ - mk_list_foreach(head, &config->cf_main->stream_processors) { - section = mk_list_entry(head, struct flb_cf_section, _head_section); - - /* task name */ - var = cfl_kvlist_fetch(section->properties, "name"); - if (!var || var->type != CFL_VARIANT_STRING) { - flb_error("[sp] missing 'name' property in stream_processor section"); - continue; - } - task_name = var->data.as_string; + if (config->cf_main) { + mk_list_foreach(head, &config->cf_main->stream_processors) { + section = mk_list_entry(head, struct flb_cf_section, _head_section); + + /* task name */ + var = cfl_kvlist_fetch(section->properties, "name"); + if (!var || var->type != CFL_VARIANT_STRING) { + flb_error("[sp] missing 'name' property in stream_processor section"); + continue; + } + task_name = var->data.as_string; - /* task exec/query */ - var = cfl_kvlist_fetch(section->properties, "exec"); - if (!var || var->type != CFL_VARIANT_STRING) { - flb_error("[sp] missing 'exec' property in stream_processor section"); - continue; - } - task_exec = var->data.as_string; + /* task exec/query */ + var = cfl_kvlist_fetch(section->properties, "exec"); + if (!var || var->type != CFL_VARIANT_STRING) { + flb_error("[sp] missing 'exec' property in stream_processor section"); + continue; + } + task_exec = var->data.as_string; - /* create task */ - task = flb_sp_task_create(sp, task_name, task_exec); - if (!task) { - continue; + /* create task */ + task = flb_sp_task_create(sp, task_name, task_exec); + if (!task) { + continue; + } } } From f4946fe5cebf2e5fa77d6105d01ce57013818e65 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 15:54:26 -0600 Subject: [PATCH 20/26] plugin: allow plugins_file to support Yaml Signed-off-by: Eduardo Silva --- src/flb_plugin.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/flb_plugin.c b/src/flb_plugin.c index fa36c8f0367..d55fced3287 100644 --- a/src/flb_plugin.c +++ b/src/flb_plugin.c @@ -393,7 +393,6 @@ int flb_plugin_load_config_file(const char *file, struct flb_config *config) struct flb_cf_section *section; struct cfl_kvpair *entry; - printf("load from config file\n"); #ifndef FLB_HAVE_STATIC_CONF ret = stat(file, &st); if (ret == -1 && errno == ENOENT) { @@ -423,7 +422,17 @@ int flb_plugin_load_config_file(const char *file, struct flb_config *config) return -1; } - /* read all 'plugins' sections */ + /* + * pass to the config_format loader also in case some Yaml have been included in + * the service section through the option 'plugins_file' + */ + ret = flb_plugin_load_config_format(cf, config); + if (ret == -1) { + flb_cf_destroy(cf); + return -1; + } + + /* (classic mode) read all 'plugins' sections */ mk_list_foreach(head, &cf->sections) { section = mk_list_entry(head, struct flb_cf_section, _head); if (strcasecmp(section->name, "plugins") != 0) { From 2381a1ffb02c8f935f193fe2e248de819993d266 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 22:14:29 -0600 Subject: [PATCH 21/26] config_format: register section upstream_servers Signed-off-by: Eduardo Silva --- src/config_format/flb_config_format.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/config_format/flb_config_format.c b/src/config_format/flb_config_format.c index a9cc1b333dd..92f92add099 100644 --- a/src/config_format/flb_config_format.c +++ b/src/config_format/flb_config_format.c @@ -125,6 +125,9 @@ struct flb_cf *flb_cf_create() /* external plugins (*.so) */ mk_list_init(&ctx->plugins); + /* upstream servers */ + mk_list_init(&ctx->upstream_servers); + /* 'custom' type plugins */ mk_list_init(&ctx->customs); @@ -181,6 +184,9 @@ static enum section_type get_section_type(char *name, int len) else if (strncasecmp(name, "plugins", len) == 0) { return FLB_CF_PLUGINS; } + else if (strncasecmp(name, "upstream_servers", len) == 0) { + return FLB_CF_UPSTREAM_SERVERS; + } else if (strncasecmp(name, "custom", len) == 0 || strncasecmp(name, "customs", len) == 0) { return FLB_CF_CUSTOM; @@ -652,6 +658,9 @@ struct flb_cf_section *flb_cf_section_create(struct flb_cf *cf, char *name, int else if (type == FLB_CF_PLUGINS) { mk_list_add(&s->_head_section, &cf->plugins); } + else if (type == FLB_CF_UPSTREAM_SERVERS) { + mk_list_add(&s->_head_section, &cf->upstream_servers); + } else if (type == FLB_CF_CUSTOM) { mk_list_add(&s->_head_section, &cf->customs); } @@ -750,6 +759,8 @@ static char *section_type_str(int type) return "STREAM_PROCESSOR"; case FLB_CF_PLUGINS: return "PLUGINS"; + case FLB_CF_UPSTREAM_SERVERS: + return "UPSTREAM_SERVERS"; case FLB_CF_CUSTOM: return "CUSTOM"; case FLB_CF_INPUT: From 714cc5711d7e7febb582046228601208cffde794 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 22:15:06 -0600 Subject: [PATCH 22/26] config_format: yaml: add support for upstream_servers Signed-off-by: Eduardo Silva --- include/fluent-bit/config_format/flb_cf.h | 4 + src/config_format/flb_cf_yaml.c | 191 ++++++++++++++++++++-- 2 files changed, 183 insertions(+), 12 deletions(-) diff --git a/include/fluent-bit/config_format/flb_cf.h b/include/fluent-bit/config_format/flb_cf.h index dd21dd25b85..b5ae20fa327 100644 --- a/include/fluent-bit/config_format/flb_cf.h +++ b/include/fluent-bit/config_format/flb_cf.h @@ -58,6 +58,7 @@ enum section_type { FLB_CF_MULTILINE_PARSER, /* multiline_parser */ FLB_CF_STREAM_PROCESSOR, /* stream_processor */ FLB_CF_PLUGINS, /* plugins */ + FLB_CF_UPSTREAM_SERVERS, /* upstream_servers */ FLB_CF_CUSTOM, /* [CUSTOM] */ FLB_CF_INPUT, /* [INPUT] */ FLB_CF_FILTER, /* [FILTER] */ @@ -105,6 +106,9 @@ struct flb_cf { /* external plugins (.so) */ struct mk_list plugins; + /* upstream servers */ + struct mk_list upstream_servers; + /* 'custom' type plugins */ struct mk_list customs; diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index b80a65ebb42..609f9b7aafd 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -62,6 +62,7 @@ enum section { SECTION_MULTILINE_PARSER_RULE, SECTION_STREAM_PROCESSOR, SECTION_PLUGINS, + SECTION_UPSTREAM_SERVERS, SECTION_OTHER, }; @@ -79,6 +80,8 @@ static char *section_names[] = { "multiline_parser", "multiline_parser_rule", "stream_processor", + "plugins", + "upstream_servers", "other" }; @@ -159,6 +162,14 @@ enum state { /* Plugins */ STATE_PLUGINS, + /* Upstream Servers */ + STATE_UPSTREAM_SERVERS, + STATE_UPSTREAM_SERVER, + STATE_UPSTREAM_SERVER_VALUE, + STATE_UPSTREAM_NODE_GROUP, + STATE_UPSTREAM_NODE, + STATE_UPSTREAM_NODE_VALUE, + /* environment variables */ STATE_ENV, @@ -294,6 +305,8 @@ static char *state_str(enum state val) return "stream-processor"; case STATE_PLUGINS: return "plugins"; + case STATE_UPSTREAM_SERVERS: + return "upstream-servers"; case STATE_STOP: return "stop"; default: @@ -331,6 +344,9 @@ static int add_section_type(struct flb_cf *conf, struct parser_state *state) else if (state->section == SECTION_PLUGINS) { state->cf_section = flb_cf_section_create(conf, "plugins", 0); } + else if (state->section == SECTION_UPSTREAM_SERVERS) { + state->cf_section = flb_cf_section_create(conf, "upstream_servers", 0); + } else { state->cf_section = flb_cf_section_create(conf, "other", 0); } @@ -422,6 +438,7 @@ static void yaml_error_event_line(struct local_ctx *ctx, struct parser_state *st #define yaml_error_event(ctx, state, event) \ yaml_error_event_line(ctx, state, event, __LINE__) + static void yaml_error_definition(struct local_ctx *ctx, struct parser_state *state, yaml_event_t *event, char *value) { @@ -1001,10 +1018,10 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, switch (event->type) { case YAML_SCALAR_EVENT: /* Store the value for the previous key */ - value = (char *)event->data.scalar.value; + value = (char *) event->data.scalar.value; if (flb_cf_section_property_add(conf, state->cf_section->properties, - state->key, flb_sds_len(state->key), - value, strlen(value)) < 0) { + state->key, flb_sds_len(state->key), + value, strlen(value)) < 0) { flb_error("unable to add property"); return YAML_FAILURE; } @@ -1107,10 +1124,10 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, switch (event->type) { case YAML_SCALAR_EVENT: /* Store the value for the previous key */ - value = (char *)event->data.scalar.value; + value = (char *) event->data.scalar.value; if (flb_cf_section_property_add(conf, state->cf_section->properties, - state->key, flb_sds_len(state->key), - value, strlen(value)) < 0) { + state->key, flb_sds_len(state->key), + value, strlen(value)) < 0) { flb_error("unable to add property"); return YAML_FAILURE; } @@ -1214,7 +1231,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, case STATE_STREAM_PROCESSOR_ENTRY: switch (event->type) { case YAML_SCALAR_EVENT: - value = (char *)event->data.scalar.value; + value = (char *) event->data.scalar.value; state = state_push_key(ctx, STATE_STREAM_PROCESSOR_KEY, value); if (!state) { @@ -1240,7 +1257,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, case STATE_STREAM_PROCESSOR_KEY: switch (event->type) { case YAML_SCALAR_EVENT: - value = (char *)event->data.scalar.value; + value = (char *) event->data.scalar.value; if (flb_cf_section_property_add(conf, state->cf_section->properties, state->key, flb_sds_len(state->key), @@ -1278,7 +1295,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, case YAML_SCALAR_EVENT: /* Store the path as an entry in the plugins section */ - value = (char *)event->data.scalar.value; + value = (char *) event->data.scalar.value; /* * note that we pass an empty string as the real value since this is @@ -1306,6 +1323,149 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, } break; + /* + * Upstream Servers + * ---------------- + */ + case STATE_UPSTREAM_SERVERS: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + break; + + case YAML_MAPPING_START_EVENT: + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add parsers section"); + return YAML_FAILURE; + } + + state = state_push_withvals(ctx, state, STATE_UPSTREAM_SERVER); + if (!state) { + flb_error("Unable to allocate state for upstream server"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* Handling individual upstream server */ + case STATE_UPSTREAM_SERVER: + switch (event->type) { + case YAML_SCALAR_EVENT: + value = (char *) event->data.scalar.value; + + if (strcmp(value, "nodes") == 0) { + state = state_push_withvals(ctx, state, STATE_UPSTREAM_NODE_GROUP); + if (!state) { + flb_error("Unable to allocate state for node group"); + return YAML_FAILURE; + } + break; + } + + /* normal key value pair for the upstream server */ + state = state_push_key(ctx, STATE_UPSTREAM_SERVER_VALUE, value); + if (!state) { + flb_error("Unable to allocate state for upstream server key"); + return YAML_FAILURE; + } + break; + + case YAML_MAPPING_END_EVENT: + state = state_pop(ctx); + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* Handling upstream server key-value pairs */ + case STATE_UPSTREAM_SERVER_VALUE: + if (event->type == YAML_SCALAR_EVENT) { + value = (char *) event->data.scalar.value; + if (flb_cf_section_property_add(conf, state->cf_section->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) == NULL) { + flb_error("Unable to add upstream server property"); + return YAML_FAILURE; + } + state = state_pop(ctx); + } + break; + + /* Handling node group */ + case STATE_UPSTREAM_NODE_GROUP: + switch(event->type) { + case YAML_SEQUENCE_START_EVENT: + break; + case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + case YAML_MAPPING_START_EVENT: + if (state_create_group(conf, state, "upstream_node") == YAML_FAILURE) { + flb_error("unable to create group"); + return YAML_FAILURE; + } + state = state_push(ctx, STATE_GROUP_KEY); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + break; + case YAML_MAPPING_END_EVENT: + return YAML_FAILURE; + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + }; + break; + + /* Handling individual node */ + case STATE_UPSTREAM_NODE: + switch (event->type) { + case YAML_SCALAR_EVENT: + value = (char *) event->data.scalar.value; + state = state_push_key(ctx, STATE_UPSTREAM_NODE_VALUE, value); + break; + + case YAML_MAPPING_END_EVENT: + state = state_pop(ctx); + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* Handling node key-value pairs */ + case STATE_UPSTREAM_NODE_VALUE: + if (event->type == YAML_SCALAR_EVENT) { + value = (char *) event->data.scalar.value; + if (flb_cf_section_property_add(conf, state->cf_group->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) == NULL) { + flb_error("Unable to add node property"); + return YAML_FAILURE; + } + state = state_pop(ctx); + } + break; + /* * 'customs' * -------- @@ -1343,7 +1503,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, case STATE_PIPELINE: switch (event->type) { case YAML_SCALAR_EVENT: - value = (char *)event->data.scalar.value; + value = (char *) event->data.scalar.value; if (strcasecmp(value, "inputs") == 0) { state = state_push_section(ctx, STATE_PLUGIN_INPUT, SECTION_INPUT); @@ -1383,7 +1543,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, case STATE_SECTION: switch (event->type) { case YAML_SCALAR_EVENT: - value = (char *)event->data.scalar.value; + value = (char *) event->data.scalar.value; if (strcasecmp(value, "env") == 0) { state = state_push_section(ctx, STATE_ENV, SECTION_ENV); @@ -1420,6 +1580,13 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } } + else if (strcasecmp(value, "upstream_servers") == 0) { + state = state_push_section(ctx, STATE_UPSTREAM_SERVERS, SECTION_UPSTREAM_SERVERS); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } else if (strcasecmp(value, "pipeline") == 0) { state = state_push_section(ctx, STATE_PIPELINE, SECTION_PIPELINE); if (state == NULL) { @@ -1931,7 +2098,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } - if (cfl_array_append_string(state->values, (char *)event->data.scalar.value) < 0) { + if (cfl_array_append_string(state->values, (char *) event->data.scalar.value) < 0) { flb_error("unable to add values to list"); return YAML_FAILURE; } From c6c8207b3767ffa7033cc87bf66956059400f38d Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 24 Oct 2024 22:15:41 -0600 Subject: [PATCH 23/26] upstream_ha: extend to read upstream servers configured by Yaml Signed-off-by: Eduardo Silva --- src/flb_upstream_ha.c | 133 ++++++++++++++++++++++++++++++------------ 1 file changed, 97 insertions(+), 36 deletions(-) diff --git a/src/flb_upstream_ha.c b/src/flb_upstream_ha.c index 62c6b3db956..951f0a16225 100644 --- a/src/flb_upstream_ha.c +++ b/src/flb_upstream_ha.c @@ -363,10 +363,13 @@ struct flb_upstream_ha *flb_upstream_ha_from_file(const char *file, char path[PATH_MAX + 1]; struct stat st; struct mk_list *head; + struct mk_list *g_head; struct flb_upstream_ha *ups; struct flb_upstream_node *node; struct flb_cf *cf = NULL; struct flb_cf_section *section; + struct flb_cf_group *group; + struct flb_cf_section *node_section; #ifndef FLB_HAVE_STATIC_CONF ret = stat(file, &st); @@ -394,49 +397,107 @@ struct flb_upstream_ha *flb_upstream_ha_from_file(const char *file, return NULL; } - /* 'upstream' sections are under enum section_type FLB_CF_OTHER */ - section = flb_cf_section_get_by_name(cf, "upstream"); - if (!section) { - flb_error("[upstream_ha] section name 'upstream' could not be found"); - flb_cf_destroy(cf); - return NULL; - } - - /* upstream name */ - tmp = flb_cf_section_property_get_string(cf, section, "name"); - if (!tmp) { - flb_error("[upstream_ha] missing name for upstream at %s", cfg); - flb_cf_destroy(cf); - return NULL; - } - - ups = flb_upstream_ha_create(tmp); - flb_sds_destroy(tmp); - if (!ups) { - flb_error("[upstream_ha] cannot create context"); - flb_cf_destroy(cf); - return NULL; - } + if (cf->format == FLB_CF_FLUENTBIT) { + /* 'upstream' sections are under enum section_type FLB_CF_OTHER */ + section = flb_cf_section_get_by_name(cf, "upstream"); + if (!section) { + flb_error("[upstream_ha] section name 'upstream' could not be found"); + flb_cf_destroy(cf); + return NULL; + } - /* 'node' sections */ - mk_list_foreach(head, &cf->sections) { - section = mk_list_entry(head, struct flb_cf_section, _head); - if (strcasecmp(section->name, "node") != 0) { - continue; + /* upstream name */ + tmp = flb_cf_section_property_get_string(cf, section, "name"); + if (!tmp) { + flb_error("[upstream_ha] missing name for upstream at %s", cfg); + flb_cf_destroy(cf); + return NULL; } - /* Read section info and create a Node context */ - node = create_node(c, cf, section, config); - if (!node) { - flb_error("[upstream_ha] cannot register node on upstream '%s'", - tmp); - flb_upstream_ha_destroy(ups); + ups = flb_upstream_ha_create(tmp); + flb_sds_destroy(tmp); + if (!ups) { + flb_error("[upstream_ha] cannot create context"); flb_cf_destroy(cf); return NULL; } - flb_upstream_ha_node_add(ups, node); - c++; + /* 'node' sections */ + mk_list_foreach(head, &cf->sections) { + section = mk_list_entry(head, struct flb_cf_section, _head); + if (strcasecmp(section->name, "node") != 0) { + continue; + } + + /* Read section info and create a Node context */ + node = create_node(c, cf, section, config); + if (!node) { + flb_error("[upstream_ha] cannot register node on upstream '%s'", + tmp); + flb_upstream_ha_destroy(ups); + flb_cf_destroy(cf); + return NULL; + } + + flb_upstream_ha_node_add(ups, node); + c++; + } + } + else if (cf->format == FLB_CF_YAML) { + mk_list_foreach(head, &cf->upstream_servers) { + section = mk_list_entry(head, struct flb_cf_section, _head_section); + + /* upstream name */ + tmp = flb_cf_section_property_get_string(cf, section, "name"); + if (!tmp) { + flb_error("[upstream_ha] missing name for upstream at %s", cfg); + flb_cf_destroy(cf); + return NULL; + } + + ups = flb_upstream_ha_create(tmp); + flb_sds_destroy(tmp); + if (!ups) { + flb_error("[upstream_ha] cannot create context"); + flb_cf_destroy(cf); + return NULL; + } + + /* iterate nodes (groups) */ + mk_list_foreach(g_head, §ion->groups) { + group = mk_list_entry(g_head, struct flb_cf_group, _head); + + /* + * create temporary node section: the node creation function needs a section, + * which is not the same as the group but similar: we just map the name and + * properties. + */ + node_section = flb_calloc(1, sizeof(struct flb_cf_section)); + if (!node_section) { + flb_errno(); + flb_upstream_ha_destroy(ups); + flb_cf_destroy(cf); + return NULL; + } + node_section->name = group->name; + node_section->properties = group->properties; + + /* Read section info and create a Node context */ + node = create_node(c, cf, node_section, config); + if (!node) { + flb_error("[upstream_ha] cannot register node on upstream '%s'", + tmp); + flb_upstream_ha_destroy(ups); + flb_cf_destroy(cf); + flb_free(node_section); + return NULL; + } + flb_free(node_section); + + flb_upstream_ha_node_add(ups, node); + c++; + } + } } if (c == 0) { From eda97c5600ce46dd6f57a767b43cf48a1c9f42ce Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 25 Oct 2024 14:20:14 -0600 Subject: [PATCH 24/26] tests: internal: config_format: yaml: add tests for upstream_servers Signed-off-by: Eduardo Silva --- tests/internal/config_format_yaml.c | 154 ++++++++++++++++++ .../data/config_format/yaml/upstream.yaml | 38 +++++ 2 files changed, 192 insertions(+) create mode 100644 tests/internal/data/config_format/yaml/upstream.yaml diff --git a/tests/internal/config_format_yaml.c b/tests/internal/config_format_yaml.c index d58ee0d4c72..c5054b77e7f 100644 --- a/tests/internal/config_format_yaml.c +++ b/tests/internal/config_format_yaml.c @@ -23,6 +23,7 @@ #define FLB_003 FLB_TESTS_CONF_PATH "/parsers_and_multiline_parsers.yaml" #define FLB_004 FLB_TESTS_CONF_PATH "/stream_processor.yaml" #define FLB_005 FLB_TESTS_CONF_PATH "/plugins.yaml" +#define FLB_006 FLB_TESTS_CONF_PATH "/upstream.yaml" #define FLB_000_WIN FLB_TESTS_CONF_PATH "\\fluent-bit-windows.yaml" #define FLB_BROKEN_PLUGIN_VARIANT FLB_TESTS_CONF_PATH "/broken_plugin_variant.yaml" @@ -674,6 +675,158 @@ static void test_plugins() flb_cf_destroy(cf); } +static void test_upstream_servers() +{ + int idx = 0; + int g_idx = 0; + struct mk_list *head; + struct mk_list *g_head; + struct flb_cf *cf; + struct flb_cf_section *s; + struct cfl_variant *v; + struct flb_cf_group *group; + + cf = flb_cf_yaml_create(NULL, FLB_006, NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of sections */ + TEST_CHECK(mk_list_size(&cf->sections) == 4); + + /* Check number sections per list */ + TEST_CHECK(mk_list_size(&cf->upstream_servers) == 2); + TEST_CHECK(mk_list_size(&cf->parsers) == 0); + TEST_CHECK(mk_list_size(&cf->multiline_parsers) == 0); + TEST_CHECK(mk_list_size(&cf->customs) == 0); + TEST_CHECK(mk_list_size(&cf->inputs) == 1); + TEST_CHECK(mk_list_size(&cf->filters) == 0); + TEST_CHECK(mk_list_size(&cf->outputs) == 1); + TEST_CHECK(mk_list_size(&cf->others) == 0); + + /* check upstream servers */ + idx = 0; + mk_list_foreach(head, &cf->upstream_servers) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + + switch (idx) { + case 0: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "forward-balancing") == 0); + + /* iterate node/groups */ + TEST_CHECK(mk_list_size(&s->groups) == 3); + + g_idx = 0; + mk_list_foreach(g_head, &s->groups) { + group = mk_list_entry(g_head, struct flb_cf_group, _head); + TEST_CHECK(group != NULL); + TEST_CHECK(strcmp(group->name, "upstream_node") == 0); + + switch (g_idx) { + case 0: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-1") == 0); + + v = cfl_kvlist_fetch(group->properties, "host"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "127.0.0.1") == 0); + + v = cfl_kvlist_fetch(group->properties, "port"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "43000") == 0); + break; + + case 1: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-2") == 0); + + v = cfl_kvlist_fetch(group->properties, "host"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "127.0.0.1") == 0); + + v = cfl_kvlist_fetch(group->properties, "port"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "44000") == 0); + break; + case 2: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-3") == 0); + break; + }; + g_idx++; + } + break; + case 1: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "forward-balancing-2") == 0); + + g_idx = 0; + mk_list_foreach(g_head, &s->groups) { + group = mk_list_entry(g_head, struct flb_cf_group, _head); + TEST_CHECK(group != NULL); + TEST_CHECK(strcmp(group->name, "upstream_node") == 0); + + switch (g_idx) { + case 0: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-A") == 0); + + v = cfl_kvlist_fetch(group->properties, "host"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "192.168.1.10") == 0); + + v = cfl_kvlist_fetch(group->properties, "port"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "50000") == 0); + + break; + case 1: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-B") == 0); + + v = cfl_kvlist_fetch(group->properties, "host"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "192.168.1.11") == 0); + + v = cfl_kvlist_fetch(group->properties, "port"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "51000") == 0); + break; + }; + g_idx++; + } + + break; + }; + idx++; + } + + flb_cf_destroy(cf); +} + TEST_LIST = { { "basic" , test_basic}, { "customs section", test_customs_section}, @@ -686,5 +839,6 @@ TEST_LIST = { { "parsers_and_multiline_parsers", test_parsers_and_multiline_parsers}, { "stream_processor", test_stream_processor}, { "plugins", test_plugins}, + { "upstream_servers", test_upstream_servers}, { 0 } }; diff --git a/tests/internal/data/config_format/yaml/upstream.yaml b/tests/internal/data/config_format/yaml/upstream.yaml new file mode 100644 index 00000000000..9dcf0ffb499 --- /dev/null +++ b/tests/internal/data/config_format/yaml/upstream.yaml @@ -0,0 +1,38 @@ +--- + +upstream_servers: + - name: forward-balancing + nodes: + - name: node-1 + host: 127.0.0.1 + port: 43000 + + - name: node-2 + host: 127.0.0.1 + port: 44000 + + - name: node-3 + host: 127.0.0.1 + port: 45000 + tls: true + tls_verify: false + shared_key: secret + + - name: forward-balancing-2 + nodes: + - name: node-A + host: 192.168.1.10 + port: 50000 + + - name: node-B + host: 192.168.1.11 + port: 51000 + +pipeline: + inputs: + - name: random + + outputs: + - name: stdout + match: '*' + From 7dcf1f2d1136d74a7ddfd0e0b6b3c4cf85eb9588 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sat, 26 Oct 2024 08:37:55 -0600 Subject: [PATCH 25/26] tests: internal: config_format: yaml: fix path for WIN32 Signed-off-by: Eduardo Silva --- tests/internal/config_format_yaml.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/internal/config_format_yaml.c b/tests/internal/config_format_yaml.c index c5054b77e7f..304dec9393c 100644 --- a/tests/internal/config_format_yaml.c +++ b/tests/internal/config_format_yaml.c @@ -20,7 +20,13 @@ #define FLB_000 FLB_TESTS_CONF_PATH "/fluent-bit.yaml" #define FLB_001 FLB_TESTS_CONF_PATH "/issue_7559.yaml" #define FLB_002 FLB_TESTS_CONF_PATH "/processors.yaml" + +#ifdef _WIN32 +#define FLB_003 FLB_TESTS_CONF_PATH "\\parsers_and_multiline_parsers.yaml" +#else #define FLB_003 FLB_TESTS_CONF_PATH "/parsers_and_multiline_parsers.yaml" +#endif + #define FLB_004 FLB_TESTS_CONF_PATH "/stream_processor.yaml" #define FLB_005 FLB_TESTS_CONF_PATH "/plugins.yaml" #define FLB_006 FLB_TESTS_CONF_PATH "/upstream.yaml" From ca31f519ab1d26578fd2b3aaae35102dc0daed4c Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sat, 26 Oct 2024 09:44:47 -0600 Subject: [PATCH 26/26] upstream_ha: check if Yaml is available Signed-off-by: Eduardo Silva --- src/flb_upstream_ha.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/flb_upstream_ha.c b/src/flb_upstream_ha.c index 951f0a16225..d01e55ad5f3 100644 --- a/src/flb_upstream_ha.c +++ b/src/flb_upstream_ha.c @@ -443,6 +443,7 @@ struct flb_upstream_ha *flb_upstream_ha_from_file(const char *file, c++; } } +#ifdef FLB_HAVE_LIBYAML else if (cf->format == FLB_CF_YAML) { mk_list_foreach(head, &cf->upstream_servers) { section = mk_list_entry(head, struct flb_cf_section, _head_section); @@ -499,6 +500,7 @@ struct flb_upstream_ha *flb_upstream_ha_from_file(const char *file, } } } +#endif if (c == 0) { flb_error("[upstream_ha] no nodes defined");