diff --git a/include/fluent-bit/config_format/flb_cf.h b/include/fluent-bit/config_format/flb_cf.h index 37d50594bb5..b5ae20fa327 100644 --- a/include/fluent-bit/config_format/flb_cf.h +++ b/include/fluent-bit/config_format/flb_cf.h @@ -55,7 +55,10 @@ enum cf_file_format { enum section_type { FLB_CF_SERVICE = 0, /* [SERVICE] */ FLB_CF_PARSER, /* [PARSER] */ - FLB_CF_MULTILINE_PARSER, /* [MULTILINE_PARSER] */ + FLB_CF_MULTILINE_PARSER, /* multiline_parser */ + FLB_CF_STREAM_PROCESSOR, /* stream_processor */ + FLB_CF_PLUGINS, /* plugins */ + FLB_CF_UPSTREAM_SERVERS, /* upstream_servers */ FLB_CF_CUSTOM, /* [CUSTOM] */ FLB_CF_INPUT, /* [INPUT] */ FLB_CF_FILTER, /* [FILTER] */ @@ -97,7 +100,16 @@ struct flb_cf { struct mk_list parsers; struct mk_list multiline_parsers; - /* custom plugins */ + /* stream processor: every entry is added as a task */ + struct mk_list stream_processors; + + /* external plugins (.so) */ + struct mk_list plugins; + + /* upstream servers */ + struct mk_list upstream_servers; + + /* 'custom' type plugins */ struct mk_list customs; /* pipeline */ diff --git a/include/fluent-bit/flb_parser.h b/include/fluent-bit/flb_parser.h index 3c92ca489ea..afc8159b100 100644 --- a/include/fluent-bit/flb_parser.h +++ b/include/fluent-bit/flb_parser.h @@ -75,7 +75,7 @@ enum { FLB_PARSER_TYPE_HEX, }; -static inline time_t flb_parser_tm2time(const struct flb_tm *src, +static inline time_t flb_parser_tm2time(const struct flb_tm *src, int use_system_timezone) { struct tm tmp; @@ -107,6 +107,11 @@ struct flb_parser *flb_parser_create(const char *name, const char *format, struct flb_config *config); int flb_parser_conf_file_stat(const char *file, struct flb_config *config); int flb_parser_conf_file(const char *file, struct flb_config *config); +int flb_parser_load_parser_definitions(const char *cfg, struct flb_cf *cf, + struct flb_config *config); +int flb_parser_load_multiline_parser_definitions(const char *cfg, struct flb_cf *cf, + struct flb_config *config); + void flb_parser_destroy(struct flb_parser *parser); struct flb_parser *flb_parser_get(const char *name, struct flb_config *config); int flb_parser_do(struct flb_parser *parser, const char *buf, size_t length, diff --git a/include/fluent-bit/flb_plugin.h b/include/fluent-bit/flb_plugin.h index 44369c4a363..af78b1cde91 100644 --- a/include/fluent-bit/flb_plugin.h +++ b/include/fluent-bit/flb_plugin.h @@ -47,8 +47,12 @@ struct flb_plugins { struct flb_plugins *flb_plugin_create(); int flb_plugin_load(char *path, struct flb_plugins *ctx, struct flb_config *config); + int flb_plugin_load_router(char *path, struct flb_config *config); + int flb_plugin_load_config_file(const char *file, struct flb_config *config); +int flb_plugin_load_config_format(struct flb_cf *cf, struct flb_config *config); + void flb_plugin_destroy(struct flb_plugins *ctx); #endif diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index e09034393f3..609f9b7aafd 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -57,6 +57,12 @@ enum section { SECTION_FILTER, SECTION_OUTPUT, SECTION_PROCESSOR, + SECTION_PARSER, + SECTION_MULTILINE_PARSER, + SECTION_MULTILINE_PARSER_RULE, + SECTION_STREAM_PROCESSOR, + SECTION_PLUGINS, + SECTION_UPSTREAM_SERVERS, SECTION_OTHER, }; @@ -70,6 +76,12 @@ static char *section_names[] = { "filter", "output", "processor", + "parser", + "multiline_parser", + "multiline_parser_rule", + "stream_processor", + "plugins", + "upstream_servers", "other" }; @@ -130,6 +142,34 @@ enum state { STATE_INPUT_PROCESSORS, STATE_INPUT_PROCESSOR, + /* Parser */ + STATE_PARSER, /* parser section */ + STATE_PARSER_ENTRY, /* a parser definition */ + STATE_PARSER_KEY, /* reading a key inside a parser */ + STATE_PARSER_VALUE, /* reading a value inside a parser */ + + /* Multiline Parser */ + STATE_MULTILINE_PARSER, /* multiline parser section */ + STATE_MULTILINE_PARSER_ENTRY, /* a multiline parser definition */ + STATE_MULTILINE_PARSER_VALUE, /* reading a value inside a multiline parser */ + STATE_MULTILINE_PARSER_RULE, /* reading a multiline parser rule */ + + /* Stream Processor */ + STATE_STREAM_PROCESSOR, + STATE_STREAM_PROCESSOR_ENTRY, + STATE_STREAM_PROCESSOR_KEY, + + /* Plugins */ + STATE_PLUGINS, + + /* Upstream Servers */ + STATE_UPSTREAM_SERVERS, + STATE_UPSTREAM_SERVER, + STATE_UPSTREAM_SERVER_VALUE, + STATE_UPSTREAM_NODE_GROUP, + STATE_UPSTREAM_NODE, + STATE_UPSTREAM_NODE_VALUE, + /* environment variables */ STATE_ENV, @@ -152,17 +192,22 @@ struct parser_state { /* active section */ struct flb_cf_section *cf_section; + /* active group */ struct flb_cf_group *cf_group; /* key value */ flb_sds_t key; + /* section key/value list */ struct cfl_kvlist *keyvals; + /* pointer to current values in a list. */ struct cfl_array *values; + /* pointer to current variant */ struct cfl_variant *variant; + /* if the current variant is reading the key of a kvlist */ cfl_sds_t variant_kvlist_key; /* are we the owner of the values? */ @@ -252,6 +297,16 @@ static char *state_str(enum state val) return "processor"; case STATE_ENV: return "env"; + case STATE_PARSER: + return "parser"; + case STATE_MULTILINE_PARSER: + return "multiline-parser"; + case STATE_STREAM_PROCESSOR: + return "stream-processor"; + case STATE_PLUGINS: + return "plugins"; + case STATE_UPSTREAM_SERVERS: + return "upstream-servers"; case STATE_STOP: return "stop"; default: @@ -266,17 +321,35 @@ static int add_section_type(struct flb_cf *conf, struct parser_state *state) } if (state->section == SECTION_INPUT) { - state->cf_section = flb_cf_section_create(conf, "INPUT", 0); + state->cf_section = flb_cf_section_create(conf, "input", 0); } else if (state->section == SECTION_FILTER) { - state->cf_section = flb_cf_section_create(conf, "FILTER", 0); + state->cf_section = flb_cf_section_create(conf, "filter", 0); } else if (state->section == SECTION_OUTPUT) { - state->cf_section = flb_cf_section_create(conf, "OUTPUT", 0); + state->cf_section = flb_cf_section_create(conf, "output", 0); } else if (state->section == SECTION_CUSTOM) { state->cf_section = flb_cf_section_create(conf, "customs", 0); } + else if (state->section == SECTION_PARSER) { + state->cf_section = flb_cf_section_create(conf, "parser", 0); + } + else if (state->section == SECTION_MULTILINE_PARSER) { + state->cf_section = flb_cf_section_create(conf, "multiline_parser", 0); + } + else if (state->section == SECTION_STREAM_PROCESSOR) { + state->cf_section = flb_cf_section_create(conf, "stream_processor", 0); + } + else if (state->section == SECTION_PLUGINS) { + state->cf_section = flb_cf_section_create(conf, "plugins", 0); + } + else if (state->section == SECTION_UPSTREAM_SERVERS) { + state->cf_section = flb_cf_section_create(conf, "upstream_servers", 0); + } + else { + state->cf_section = flb_cf_section_create(conf, "other", 0); + } if (!state->cf_section) { return -1; @@ -328,8 +401,8 @@ static char *state_get_last(struct local_ctx *ctx) return entry->str; } -static void yaml_error_event(struct local_ctx *ctx, struct parser_state *state, - yaml_event_t *event) +static void yaml_error_event_line(struct local_ctx *ctx, struct parser_state *state, + yaml_event_t *event, int line) { struct flb_slist_entry *entry; @@ -362,6 +435,10 @@ static void yaml_error_event(struct local_ctx *ctx, struct parser_state *state, event_type_str(event), event->type, state_str(state->state), state->state); } +#define yaml_error_event(ctx, state, event) \ + yaml_error_event_line(ctx, state, event, __LINE__) + + static void yaml_error_definition(struct local_ctx *ctx, struct parser_state *state, yaml_event_t *event, char *value) { @@ -560,6 +637,7 @@ static int read_glob(struct flb_cf *conf, struct local_ctx *ctx, static void print_current_state(struct local_ctx *ctx, struct parser_state *state, yaml_event_t *event) { + /* note: change this to flb_info() for debugging purposes */ flb_debug("%*s%s->%s", state->level*2, "", state_str(state->state), event_type_str(event)); } @@ -571,6 +649,8 @@ static void print_current_properties(struct parser_state *state) struct cfl_variant *var; int idx; + /* note: change flb_debug with flb_info() for debugging purposes */ + flb_debug("%*s[%s] PROPERTIES:", state->level*2, "", section_names[state->section]); cfl_list_foreach(head, &state->keyvals->list) { @@ -869,6 +949,523 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, break; /* end of 'includes' */ + /* Handle the 'parsers' section */ + case STATE_PARSER: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + /* Start of the parsers list */ + break; + + case YAML_MAPPING_START_EVENT: + /* we handle each parser definition as a new section */ + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add parsers section"); + return YAML_FAILURE; + } + + /* Start of an individual parser entry */ + state = state_push_withvals(ctx, state, STATE_PARSER_ENTRY); + if (!state) { + flb_error("Unable to allocate state for parser entry"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + /* End of the parsers list */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_PARSER_ENTRY: + switch (event->type) { + case YAML_SCALAR_EVENT: + /* Found a key within the parser entry */ + value = (char *) event->data.scalar.value; + state = state_push_key(ctx, STATE_PARSER_KEY, value); + if (!state) { + flb_error("Unable to allocate state for parser key"); + return YAML_FAILURE; + } + break; + + case YAML_MAPPING_END_EVENT: + /* End of an individual parser entry */ + print_current_properties(state); + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_PARSER_KEY: + switch (event->type) { + case YAML_SCALAR_EVENT: + /* Store the value for the previous key */ + value = (char *) event->data.scalar.value; + if (flb_cf_section_property_add(conf, state->cf_section->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) < 0) { + flb_error("unable to add property"); + return YAML_FAILURE; + } + + /* Return to the parser entry state */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + case STATE_PARSER_VALUE: + /* unused */ + break; + + + /* + * Handle the 'multiline_parsers' section + * -------------------------------------- + */ + case STATE_MULTILINE_PARSER: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + /* Start of the multiline parsers list */ + break; + + case YAML_MAPPING_START_EVENT: + /* we handle each multiline parser definition as a new section */ + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add multiline parsers section"); + return YAML_FAILURE; + } + + /* Start of an individual multiline parser entry */ + state = state_push_withvals(ctx, state, STATE_MULTILINE_PARSER_ENTRY); + if (!state) { + flb_error("Unable to allocate state for multiline parser entry"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + /* End of the multiline parsers list */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + case STATE_MULTILINE_PARSER_ENTRY: + switch (event->type) { + case YAML_SCALAR_EVENT: + /* Found a key within the multiline parser entry */ + value = (char *) event->data.scalar.value; + + /* start of 'rules:' sequence */ + if (strcmp(value, "rules") == 0) { + state = state_push_withvals(ctx, state, STATE_MULTILINE_PARSER_RULE); + if (state == NULL) { + flb_error("Unable to allocate state for multiline parser rules"); + return YAML_FAILURE; + } + break; + } + + /* normal key value pair for the multiline parser */ + state = state_push_key(ctx, STATE_MULTILINE_PARSER_VALUE, value); + if (!state) { + flb_error("Unable to allocate state for multiline parser key"); + return YAML_FAILURE; + } + break; + + case YAML_MAPPING_END_EVENT: + /* End of an individual multiline parser entry */ + print_current_properties(state); + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + case STATE_MULTILINE_PARSER_VALUE: + switch (event->type) { + case YAML_SCALAR_EVENT: + /* Store the value for the previous key */ + value = (char *) event->data.scalar.value; + if (flb_cf_section_property_add(conf, state->cf_section->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) < 0) { + flb_error("unable to add property"); + return YAML_FAILURE; + } + + /* Return to the multiline parser entry state */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* + * Multiline Parser "Rules" + * ------------------------ + */ + case STATE_MULTILINE_PARSER_RULE: + switch(event->type) { + case YAML_SEQUENCE_START_EVENT: + break; + case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + case YAML_MAPPING_START_EVENT: + if (state_create_group(conf, state, "rule") == YAML_FAILURE) { + flb_error("unable to create group"); + return YAML_FAILURE; + } + + state = state_push(ctx, STATE_GROUP_KEY); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + /* create group */ + state->values = flb_cf_section_property_add_list(conf, + state->cf_section->properties, + "rules", 5); + + if (state->values == NULL) { + flb_error("no values"); + return YAML_FAILURE; + } + + break; + case YAML_MAPPING_END_EVENT: + return YAML_FAILURE; + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + }; + break; + + + /* + * Stream Processor + * ---------------- + */ + case STATE_STREAM_PROCESSOR: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + break; + + case YAML_MAPPING_START_EVENT: + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add parsers section"); + return YAML_FAILURE; + } + + state = state_push_withvals(ctx, state, STATE_STREAM_PROCESSOR_ENTRY); + if (!state) { + flb_error("Unable to allocate state for stream processor entry"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_STREAM_PROCESSOR_ENTRY: + switch (event->type) { + case YAML_SCALAR_EVENT: + value = (char *) event->data.scalar.value; + + state = state_push_key(ctx, STATE_STREAM_PROCESSOR_KEY, value); + if (!state) { + flb_error("Unable to allocate state for stream processor key"); + return YAML_FAILURE; + } + break; + + case YAML_MAPPING_END_EVENT: + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + case STATE_STREAM_PROCESSOR_KEY: + switch (event->type) { + case YAML_SCALAR_EVENT: + value = (char *) event->data.scalar.value; + + if (flb_cf_section_property_add(conf, state->cf_section->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) < 0) { + flb_error("Unable to add property"); + return YAML_FAILURE; + } + + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* + * Plugins: define a list of absolute paths for external plugins to load + * --------------------------------------------------------------------- + */ + case STATE_PLUGINS: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + /* create the section */ + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add parsers section"); + return YAML_FAILURE; + } + break; + + case YAML_SCALAR_EVENT: + /* Store the path as an entry in the plugins section */ + value = (char *) event->data.scalar.value; + + /* + * note that we pass an empty string as the real value since this is + * a list of items. + */ + if (flb_cf_section_property_add(conf, state->cf_section->properties, + value, strlen(value), "", 0) == NULL) { + flb_error("Unable to add plugin path"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + /* Pop back to the previous state */ + state = state_pop(ctx); + if (!state) { + flb_error("No state left"); + return YAML_FAILURE; + } + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* + * Upstream Servers + * ---------------- + */ + case STATE_UPSTREAM_SERVERS: + switch (event->type) { + case YAML_SEQUENCE_START_EVENT: + break; + + case YAML_MAPPING_START_EVENT: + if (add_section_type(conf, state) == -1) { + flb_error("Unable to add parsers section"); + return YAML_FAILURE; + } + + state = state_push_withvals(ctx, state, STATE_UPSTREAM_SERVER); + if (!state) { + flb_error("Unable to allocate state for upstream server"); + return YAML_FAILURE; + } + break; + + case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* Handling individual upstream server */ + case STATE_UPSTREAM_SERVER: + switch (event->type) { + case YAML_SCALAR_EVENT: + value = (char *) event->data.scalar.value; + + if (strcmp(value, "nodes") == 0) { + state = state_push_withvals(ctx, state, STATE_UPSTREAM_NODE_GROUP); + if (!state) { + flb_error("Unable to allocate state for node group"); + return YAML_FAILURE; + } + break; + } + + /* normal key value pair for the upstream server */ + state = state_push_key(ctx, STATE_UPSTREAM_SERVER_VALUE, value); + if (!state) { + flb_error("Unable to allocate state for upstream server key"); + return YAML_FAILURE; + } + break; + + case YAML_MAPPING_END_EVENT: + state = state_pop(ctx); + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* Handling upstream server key-value pairs */ + case STATE_UPSTREAM_SERVER_VALUE: + if (event->type == YAML_SCALAR_EVENT) { + value = (char *) event->data.scalar.value; + if (flb_cf_section_property_add(conf, state->cf_section->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) == NULL) { + flb_error("Unable to add upstream server property"); + return YAML_FAILURE; + } + state = state_pop(ctx); + } + break; + + /* Handling node group */ + case STATE_UPSTREAM_NODE_GROUP: + switch(event->type) { + case YAML_SEQUENCE_START_EVENT: + break; + case YAML_SEQUENCE_END_EVENT: + state = state_pop(ctx); + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } + break; + case YAML_MAPPING_START_EVENT: + if (state_create_group(conf, state, "upstream_node") == YAML_FAILURE) { + flb_error("unable to create group"); + return YAML_FAILURE; + } + state = state_push(ctx, STATE_GROUP_KEY); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + break; + case YAML_MAPPING_END_EVENT: + return YAML_FAILURE; + break; + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + }; + break; + + /* Handling individual node */ + case STATE_UPSTREAM_NODE: + switch (event->type) { + case YAML_SCALAR_EVENT: + value = (char *) event->data.scalar.value; + state = state_push_key(ctx, STATE_UPSTREAM_NODE_VALUE, value); + break; + + case YAML_MAPPING_END_EVENT: + state = state_pop(ctx); + break; + + default: + yaml_error_event(ctx, state, event); + return YAML_FAILURE; + } + break; + + /* Handling node key-value pairs */ + case STATE_UPSTREAM_NODE_VALUE: + if (event->type == YAML_SCALAR_EVENT) { + value = (char *) event->data.scalar.value; + if (flb_cf_section_property_add(conf, state->cf_group->properties, + state->key, flb_sds_len(state->key), + value, strlen(value)) == NULL) { + flb_error("Unable to add node property"); + return YAML_FAILURE; + } + state = state_pop(ctx); + } + break; + /* * 'customs' * -------- @@ -906,7 +1503,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, case STATE_PIPELINE: switch (event->type) { case YAML_SCALAR_EVENT: - value = (char *)event->data.scalar.value; + value = (char *) event->data.scalar.value; if (strcasecmp(value, "inputs") == 0) { state = state_push_section(ctx, STATE_PLUGIN_INPUT, SECTION_INPUT); @@ -946,7 +1543,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, case STATE_SECTION: switch (event->type) { case YAML_SCALAR_EVENT: - value = (char *)event->data.scalar.value; + value = (char *) event->data.scalar.value; if (strcasecmp(value, "env") == 0) { state = state_push_section(ctx, STATE_ENV, SECTION_ENV); @@ -955,6 +1552,41 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } } + else if (strcasecmp(value, "parsers") == 0) { + state = state_push_section(ctx, STATE_PARSER, SECTION_PARSER); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } + else if (strcasecmp(value, "multiline_parsers") == 0) { + state = state_push_section(ctx, STATE_MULTILINE_PARSER, SECTION_MULTILINE_PARSER); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } + else if (strcasecmp(value, "stream_processor") == 0) { + state = state_push_section(ctx, STATE_STREAM_PROCESSOR, SECTION_STREAM_PROCESSOR); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } + else if (strcasecmp(value, "plugins") == 0) { + state = state_push_section(ctx, STATE_PLUGINS, SECTION_PLUGINS); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } + else if (strcasecmp(value, "upstream_servers") == 0) { + state = state_push_section(ctx, STATE_UPSTREAM_SERVERS, SECTION_UPSTREAM_SERVERS); + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + } else if (strcasecmp(value, "pipeline") == 0) { state = state_push_section(ctx, STATE_PIPELINE, SECTION_PIPELINE); if (state == NULL) { @@ -985,7 +1617,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, else if (strcasecmp(value, "customs") == 0) { state = state_push_section(ctx, STATE_CUSTOM, SECTION_CUSTOM); - if (state == NULL) { + if (state == NULL) { flb_error("unable to allocate state"); return YAML_FAILURE; } @@ -1466,7 +2098,7 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, return YAML_FAILURE; } - if (cfl_array_append_string(state->values, (char *)event->data.scalar.value) < 0) { + if (cfl_array_append_string(state->values, (char *) event->data.scalar.value) < 0) { flb_error("unable to add values to list"); return YAML_FAILURE; } @@ -1685,11 +2317,13 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, /* This is also the end of the plugin values mapping. * So we pop an additional state off the stack. */ - state = state_pop(ctx); + if (state->state == STATE_PLUGIN_VAL) { + state = state_pop(ctx); - if (state == NULL) { - flb_error("no state left"); - return YAML_FAILURE; + if (state == NULL) { + flb_error("no state left"); + return YAML_FAILURE; + } } break; default: @@ -2036,7 +2670,6 @@ static int state_create_section(struct flb_cf *conf, struct parser_state *state, } state->cf_section = flb_cf_section_create(conf, name, 0); - if (state->cf_section == NULL) { return -1; } @@ -2051,7 +2684,7 @@ static int state_create_group(struct flb_cf *conf, struct parser_state *state, c } state->cf_group = flb_cf_group_create(conf, state->cf_section, - "processors", strlen("processors")); + name, strlen(name)); if (state->cf_group == NULL) { return -1; @@ -2322,7 +2955,9 @@ struct flb_cf *flb_cf_yaml_create(struct flb_cf *conf, char *file_path, if (!conf) { return NULL; } - + flb_cf_set_origin_format(conf, FLB_CF_YAML); + } + else { flb_cf_set_origin_format(conf, FLB_CF_YAML); } diff --git a/src/config_format/flb_config_format.c b/src/config_format/flb_config_format.c index 2a6ab5f4601..92f92add099 100644 --- a/src/config_format/flb_config_format.c +++ b/src/config_format/flb_config_format.c @@ -119,7 +119,16 @@ struct flb_cf *flb_cf_create() mk_list_init(&ctx->parsers); mk_list_init(&ctx->multiline_parsers); - /* custom plugins */ + /* stream processors */ + mk_list_init(&ctx->stream_processors); + + /* external plugins (*.so) */ + mk_list_init(&ctx->plugins); + + /* upstream servers */ + mk_list_init(&ctx->upstream_servers); + + /* 'custom' type plugins */ mk_list_init(&ctx->customs); /* pipeline */ @@ -160,29 +169,38 @@ int flb_cf_set_origin_format(struct flb_cf *cf, int format) static enum section_type get_section_type(char *name, int len) { - if (strncasecmp(name, "SERVICE", len) == 0) { + if (strncasecmp(name, "service", len) == 0) { return FLB_CF_SERVICE; } - else if (strncasecmp(name, "PARSER", len) == 0) { + else if (strncasecmp(name, "parser", len) == 0) { return FLB_CF_PARSER; } - else if (strncasecmp(name, "MULTILINE_PARSER", len) == 0) { + else if (strncasecmp(name, "multiline_parser", len) == 0) { return FLB_CF_MULTILINE_PARSER; } - else if (strncasecmp(name, "CUSTOM", len) == 0 || - strncasecmp(name, "CUSTOMS", len) == 0) { + else if (strncasecmp(name, "stream_processor", len) == 0) { + return FLB_CF_STREAM_PROCESSOR; + } + else if (strncasecmp(name, "plugins", len) == 0) { + return FLB_CF_PLUGINS; + } + else if (strncasecmp(name, "upstream_servers", len) == 0) { + return FLB_CF_UPSTREAM_SERVERS; + } + else if (strncasecmp(name, "custom", len) == 0 || + strncasecmp(name, "customs", len) == 0) { return FLB_CF_CUSTOM; } - else if (strncasecmp(name, "INPUT", len) == 0 || - strncasecmp(name, "INPUTS", len) == 0) { + else if (strncasecmp(name, "input", len) == 0 || + strncasecmp(name, "inputs", len) == 0) { return FLB_CF_INPUT; } - else if (strncasecmp(name, "FILTER", len) == 0 || - strncasecmp(name, "FILTERS", len) == 0) { + else if (strncasecmp(name, "filter", len) == 0 || + strncasecmp(name, "filters", len) == 0) { return FLB_CF_FILTER; } - else if (strncasecmp(name, "OUTPUT", len) == 0 || - strncasecmp(name, "OUTPUTS", len) == 0) { + else if (strncasecmp(name, "output", len) == 0 || + strncasecmp(name, "outputs", len) == 0) { return FLB_CF_OUTPUT; } @@ -634,6 +652,15 @@ struct flb_cf_section *flb_cf_section_create(struct flb_cf *cf, char *name, int else if (type == FLB_CF_MULTILINE_PARSER) { mk_list_add(&s->_head_section, &cf->multiline_parsers); } + else if (type == FLB_CF_STREAM_PROCESSOR) { + mk_list_add(&s->_head_section, &cf->stream_processors); + } + else if (type == FLB_CF_PLUGINS) { + mk_list_add(&s->_head_section, &cf->plugins); + } + else if (type == FLB_CF_UPSTREAM_SERVERS) { + mk_list_add(&s->_head_section, &cf->upstream_servers); + } else if (type == FLB_CF_CUSTOM) { mk_list_add(&s->_head_section, &cf->customs); } @@ -728,6 +755,12 @@ static char *section_type_str(int type) return "PARSER"; case FLB_CF_MULTILINE_PARSER: return "MULTILINE_PARSER"; + case FLB_CF_STREAM_PROCESSOR: + return "STREAM_PROCESSOR"; + case FLB_CF_PLUGINS: + return "PLUGINS"; + case FLB_CF_UPSTREAM_SERVERS: + return "UPSTREAM_SERVERS"; case FLB_CF_CUSTOM: return "CUSTOM"; case FLB_CF_INPUT: diff --git a/src/flb_config.c b/src/flb_config.c index 747d855cf08..32dc34b7e83 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -907,11 +907,21 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) /* Extra sanity checks */ if (strcasecmp(s->name, "parser") == 0 || strcasecmp(s->name, "multiline_parser") == 0) { - fprintf(stderr, - "Sections 'multiline_parser' and 'parser' are not valid in " - "the main configuration file. It belongs to \n" - "the 'parsers_file' configuration files.\n"); - return -1; + + /* + * Classic mode configuration don't allow parser or multiline_parser + * to be defined in the main configuration file. + */ + if (cf->format == FLB_CF_CLASSIC) { + fprintf(stderr, + "Sections 'multiline_parser' and 'parser' are not valid in " + "the main configuration file. It belongs to \n" + "the 'parsers_file' configuration files.\n"); + return -1; + } + else { + /* Yaml allow parsers definitions in any Yaml file, all good */ + } } } @@ -925,6 +935,21 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) } } + ret = flb_parser_load_parser_definitions("", cf, config); + if (ret == -1) { + return -1; + } + + ret = flb_parser_load_multiline_parser_definitions("", cf, config); + if (ret == -1) { + return -1; + } + + ret = flb_plugin_load_config_format(cf, config); + if (ret == -1) { + return -1; + } + ret = configure_plugins_type(config, cf, FLB_CF_CUSTOM); if (ret == -1) { return -1; diff --git a/src/flb_parser.c b/src/flb_parser.c index 8df7f56dfb5..149ab7f1b91 100644 --- a/src/flb_parser.c +++ b/src/flb_parser.c @@ -313,15 +313,15 @@ struct flb_parser *flb_parser_create(const char *name, const char *format, p->time_frac_secs = (tmp + 2); } - /* - * Fall back to the system timezone - * if there is no zone parsed from the log. + /* + * Fall back to the system timezone + * if there is no zone parsed from the log. */ p->time_system_timezone = time_system_timezone; - /* - * Optional fixed timezone offset, only applied if - * not falling back to system timezone. + /* + * Optional fixed timezone offset, only applied if + * not falling back to system timezone. */ if (!p->time_system_timezone && time_offset) { diff = 0; @@ -481,9 +481,9 @@ static flb_sds_t get_parser_key(struct flb_config *config, return val; } -/* Config file: read 'parser' definitions */ -static int parser_conf_file(const char *cfg, struct flb_cf *cf, - struct flb_config *config) +/* Load each parser definition set in 'struct flb_cf *cf' */ +int flb_parser_load_parser_definitions(const char *cfg, struct flb_cf *cf, + struct flb_config *config) { int i = 0; flb_sds_t name; @@ -541,7 +541,7 @@ static int parser_conf_file(const char *cfg, struct flb_cf *cf, name, cfg); goto fconf_early_error; } - + /* skip_empty_values */ skip_empty = FLB_TRUE; tmp_str = get_parser_key(config, cf, s, "skip_empty_values"); @@ -605,7 +605,7 @@ static int parser_conf_file(const char *cfg, struct flb_cf *cf, /* Create the parser context */ if (!flb_parser_create(name, format, regex, skip_empty, time_fmt, time_key, time_offset, time_keep, time_strict, - time_system_timezone, logfmt_no_bare_keys, types, types_len, + time_system_timezone, logfmt_no_bare_keys, types, types_len, decoders, config)) { goto fconf_error; } @@ -680,6 +680,17 @@ static int parser_conf_file(const char *cfg, struct flb_cf *cf, return -1; } +static int multiline_rule_create(struct flb_ml_parser *ml_parser, + char *from_state, + char *regex_pattern, + char *to_state) +{ + int ret; + + ret = flb_ml_rule_create(ml_parser, from_state, regex_pattern, to_state, NULL); + return ret; +} + static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, struct flb_cf_section *section, struct flb_config *config) @@ -692,7 +703,47 @@ static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, struct flb_slist_entry *from_state; struct flb_slist_entry *regex_pattern; struct flb_slist_entry *tmp; + struct mk_list *g_head; + struct flb_cf_group *group; + struct cfl_variant *var_state; + struct cfl_variant *var_regex; + struct cfl_variant *var_next_state; + + /* Check if we have groups (coming from Yaml style config */ + mk_list_foreach(g_head, §ion->groups) { + /* Every group is a rule */ + group = cfl_list_entry(g_head, struct flb_cf_group, _head); + + var_state = cfl_kvlist_fetch(group->properties, "state"); + if (!var_state || var_state->type != CFL_VARIANT_STRING) { + flb_error("[multiline parser: %s] invalid 'state' key", ml_parser->name); + return -1; + } + + var_regex = cfl_kvlist_fetch(group->properties, "regex"); + if (!var_regex || var_regex->type != CFL_VARIANT_STRING) { + flb_error("[multiline parser: %s] invalid 'regex' key", ml_parser->name); + return -1; + } + var_next_state = cfl_kvlist_fetch(group->properties, "next_state"); + if (!var_next_state || var_next_state->type != CFL_VARIANT_STRING) { + flb_error("[multiline parser: %s] invalid 'next_state' key", ml_parser->name); + return -1; + } + + ret = multiline_rule_create(ml_parser, + var_state->data.as_string, + var_regex->data.as_string, + var_next_state->data.as_string); + + if (ret == -1) { + flb_error("[multiline parser: %s] error creating rule", ml_parser->name); + return -1; + } + } + + /* Multiline rules set by a Fluent Bit classic mode config */ cfl_list_foreach(head, §ion->properties->list) { entry = cfl_list_entry(head, struct cfl_kvpair, _head); @@ -705,7 +756,7 @@ static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, ret = flb_slist_split_tokens(&list, entry->val->data.as_string, 3); if (ret == -1) { flb_error("[multiline parser: %s] invalid section on key '%s'", - ml_parser->name, entry->key); + ml_parser->name, entry->key); return -1; } @@ -734,11 +785,10 @@ static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, return -1; } - ret = flb_ml_rule_create(ml_parser, - from_state->str, - regex_pattern->str, - to_state, - NULL); + ret = multiline_rule_create(ml_parser, + from_state->str, + regex_pattern->str, + to_state); if (ret == -1) { flb_error("[multiline parser: %s] error creating rule", ml_parser->name); @@ -762,8 +812,8 @@ static int multiline_load_regex_rules(struct flb_ml_parser *ml_parser, /* config file: read 'multiline_parser' sections */ -static int multiline_parser_conf_file(const char *cfg, struct flb_cf *cf, - struct flb_config *config) +int flb_parser_load_multiline_parser_definitions(const char *cfg, struct flb_cf *cf, + struct flb_config *config) { int ret; int type; @@ -781,6 +831,10 @@ static int multiline_parser_conf_file(const char *cfg, struct flb_cf *cf, struct flb_cf_section *s; struct flb_ml_parser *ml_parser; + /* + * debug content of cf: flb_cf_dump(cf); + */ + /* read all 'multiline_parser' sections */ mk_list_foreach(head, &cf->multiline_parsers) { ml_parser = NULL; @@ -946,15 +1000,15 @@ int flb_parser_conf_file(const char *file, struct flb_config *config) return -1; } - /* process 'parser' sections */ - ret = parser_conf_file(cfg, cf, config); + /* load the parser definitions */ + ret = flb_parser_load_parser_definitions(cfg, cf, config); if (ret == -1) { flb_cf_destroy(cf); return -1; } /* processs 'multiline_parser' sections */ - ret = multiline_parser_conf_file(cfg, cf, config); + ret = flb_parser_load_multiline_parser_definitions(cfg, cf, config); if (ret == -1) { flb_cf_destroy(cf); return -1; diff --git a/src/flb_plugin.c b/src/flb_plugin.c index a26bd013576..d55fced3287 100644 --- a/src/flb_plugin.c +++ b/src/flb_plugin.c @@ -353,6 +353,33 @@ int flb_plugin_load_router(char *path, struct flb_config *config) return 0; } +int flb_plugin_load_config_format(struct flb_cf *cf, struct flb_config *config) +{ + int ret; + struct mk_list *head; + struct cfl_list *head_e; + struct flb_cf_section *section; + struct cfl_kvpair *entry; + + /* read all 'plugins' sections */ + mk_list_foreach(head, &cf->plugins) { + section = mk_list_entry(head, struct flb_cf_section, _head_section); + + cfl_list_foreach(head_e, §ion->properties->list) { + entry = cfl_list_entry(head_e, struct cfl_kvpair, _head); + + /* Load plugin with router function */ + ret = flb_plugin_load_router(entry->key, config); + if (ret == -1) { + flb_cf_destroy(cf); + return -1; + } + } + } + + return 0; +} + /* Load plugins from a configuration file */ int flb_plugin_load_config_file(const char *file, struct flb_config *config) { @@ -395,7 +422,17 @@ int flb_plugin_load_config_file(const char *file, struct flb_config *config) return -1; } - /* read all 'plugins' sections */ + /* + * pass to the config_format loader also in case some Yaml have been included in + * the service section through the option 'plugins_file' + */ + ret = flb_plugin_load_config_format(cf, config); + if (ret == -1) { + flb_cf_destroy(cf); + return -1; + } + + /* (classic mode) read all 'plugins' sections */ mk_list_foreach(head, &cf->sections) { section = mk_list_entry(head, struct flb_cf_section, _head); if (strcasecmp(section->name, "plugins") != 0) { diff --git a/src/flb_upstream_ha.c b/src/flb_upstream_ha.c index 62c6b3db956..d01e55ad5f3 100644 --- a/src/flb_upstream_ha.c +++ b/src/flb_upstream_ha.c @@ -363,10 +363,13 @@ struct flb_upstream_ha *flb_upstream_ha_from_file(const char *file, char path[PATH_MAX + 1]; struct stat st; struct mk_list *head; + struct mk_list *g_head; struct flb_upstream_ha *ups; struct flb_upstream_node *node; struct flb_cf *cf = NULL; struct flb_cf_section *section; + struct flb_cf_group *group; + struct flb_cf_section *node_section; #ifndef FLB_HAVE_STATIC_CONF ret = stat(file, &st); @@ -394,50 +397,110 @@ struct flb_upstream_ha *flb_upstream_ha_from_file(const char *file, return NULL; } - /* 'upstream' sections are under enum section_type FLB_CF_OTHER */ - section = flb_cf_section_get_by_name(cf, "upstream"); - if (!section) { - flb_error("[upstream_ha] section name 'upstream' could not be found"); - flb_cf_destroy(cf); - return NULL; - } - - /* upstream name */ - tmp = flb_cf_section_property_get_string(cf, section, "name"); - if (!tmp) { - flb_error("[upstream_ha] missing name for upstream at %s", cfg); - flb_cf_destroy(cf); - return NULL; - } - - ups = flb_upstream_ha_create(tmp); - flb_sds_destroy(tmp); - if (!ups) { - flb_error("[upstream_ha] cannot create context"); - flb_cf_destroy(cf); - return NULL; - } + if (cf->format == FLB_CF_FLUENTBIT) { + /* 'upstream' sections are under enum section_type FLB_CF_OTHER */ + section = flb_cf_section_get_by_name(cf, "upstream"); + if (!section) { + flb_error("[upstream_ha] section name 'upstream' could not be found"); + flb_cf_destroy(cf); + return NULL; + } - /* 'node' sections */ - mk_list_foreach(head, &cf->sections) { - section = mk_list_entry(head, struct flb_cf_section, _head); - if (strcasecmp(section->name, "node") != 0) { - continue; + /* upstream name */ + tmp = flb_cf_section_property_get_string(cf, section, "name"); + if (!tmp) { + flb_error("[upstream_ha] missing name for upstream at %s", cfg); + flb_cf_destroy(cf); + return NULL; } - /* Read section info and create a Node context */ - node = create_node(c, cf, section, config); - if (!node) { - flb_error("[upstream_ha] cannot register node on upstream '%s'", - tmp); - flb_upstream_ha_destroy(ups); + ups = flb_upstream_ha_create(tmp); + flb_sds_destroy(tmp); + if (!ups) { + flb_error("[upstream_ha] cannot create context"); flb_cf_destroy(cf); return NULL; } - flb_upstream_ha_node_add(ups, node); - c++; + /* 'node' sections */ + mk_list_foreach(head, &cf->sections) { + section = mk_list_entry(head, struct flb_cf_section, _head); + if (strcasecmp(section->name, "node") != 0) { + continue; + } + + /* Read section info and create a Node context */ + node = create_node(c, cf, section, config); + if (!node) { + flb_error("[upstream_ha] cannot register node on upstream '%s'", + tmp); + flb_upstream_ha_destroy(ups); + flb_cf_destroy(cf); + return NULL; + } + + flb_upstream_ha_node_add(ups, node); + c++; + } } +#ifdef FLB_HAVE_LIBYAML + else if (cf->format == FLB_CF_YAML) { + mk_list_foreach(head, &cf->upstream_servers) { + section = mk_list_entry(head, struct flb_cf_section, _head_section); + + /* upstream name */ + tmp = flb_cf_section_property_get_string(cf, section, "name"); + if (!tmp) { + flb_error("[upstream_ha] missing name for upstream at %s", cfg); + flb_cf_destroy(cf); + return NULL; + } + + ups = flb_upstream_ha_create(tmp); + flb_sds_destroy(tmp); + if (!ups) { + flb_error("[upstream_ha] cannot create context"); + flb_cf_destroy(cf); + return NULL; + } + + /* iterate nodes (groups) */ + mk_list_foreach(g_head, §ion->groups) { + group = mk_list_entry(g_head, struct flb_cf_group, _head); + + /* + * create temporary node section: the node creation function needs a section, + * which is not the same as the group but similar: we just map the name and + * properties. + */ + node_section = flb_calloc(1, sizeof(struct flb_cf_section)); + if (!node_section) { + flb_errno(); + flb_upstream_ha_destroy(ups); + flb_cf_destroy(cf); + return NULL; + } + node_section->name = group->name; + node_section->properties = group->properties; + + /* Read section info and create a Node context */ + node = create_node(c, cf, node_section, config); + if (!node) { + flb_error("[upstream_ha] cannot register node on upstream '%s'", + tmp); + flb_upstream_ha_destroy(ups); + flb_cf_destroy(cf); + flb_free(node_section); + return NULL; + } + flb_free(node_section); + + flb_upstream_ha_node_add(ups, node); + c++; + } + } + } +#endif if (c == 0) { flb_error("[upstream_ha] no nodes defined"); diff --git a/src/stream_processor/flb_sp.c b/src/stream_processor/flb_sp.c index 96aa508d9d1..07a19abddc5 100644 --- a/src/stream_processor/flb_sp.c +++ b/src/stream_processor/flb_sp.c @@ -96,10 +96,19 @@ static int sp_config_file(struct flb_config *config, struct flb_sp *sp, return -1; } - /* Read all 'stream_task' sections */ + /* + * Note on reading the sections + * ---------------------------- + * Classic mode configuration looks for [STREAM_TASK], while the + * new Yaml parser expects the section names to be stream_processor. + * + * On Yaml mode, each pair of "name/exec" is set as an independent section, + * so the adjusted code below works for both type of files. + */ mk_list_foreach(head, &cf->sections) { section = mk_list_entry(head, struct flb_cf_section, _head); - if (strcasecmp(section->name, "stream_task") != 0) { + if (strcasecmp(section->name, "stream_task") != 0 && + strcasecmp(section->name, "stream_processor") != 0) { continue; } @@ -689,10 +698,14 @@ struct flb_sp *flb_sp_create(struct flb_config *config) int i = 0; int ret; char buf[32]; + char *task_name; + char *task_exec; struct mk_list *head; struct flb_sp *sp; struct flb_slist_entry *e; struct flb_sp_task *task; + struct cfl_variant *var; + struct flb_cf_section *section; /* Allocate context */ sp = flb_malloc(sizeof(struct flb_sp)); @@ -714,6 +727,35 @@ struct flb_sp *flb_sp_create(struct flb_config *config) } } + /* register stream processor tasks registered through Yaml config */ + if (config->cf_main) { + mk_list_foreach(head, &config->cf_main->stream_processors) { + section = mk_list_entry(head, struct flb_cf_section, _head_section); + + /* task name */ + var = cfl_kvlist_fetch(section->properties, "name"); + if (!var || var->type != CFL_VARIANT_STRING) { + flb_error("[sp] missing 'name' property in stream_processor section"); + continue; + } + task_name = var->data.as_string; + + /* task exec/query */ + var = cfl_kvlist_fetch(section->properties, "exec"); + if (!var || var->type != CFL_VARIANT_STRING) { + flb_error("[sp] missing 'exec' property in stream_processor section"); + continue; + } + task_exec = var->data.as_string; + + /* create task */ + task = flb_sp_task_create(sp, task_name, task_exec); + if (!task) { + continue; + } + } + } + /* Lookup configuration file if any */ if (config->stream_processor_file) { ret = sp_config_file(config, sp, config->stream_processor_file); diff --git a/tests/internal/config_format_yaml.c b/tests/internal/config_format_yaml.c index f34bdb273d4..304dec9393c 100644 --- a/tests/internal/config_format_yaml.c +++ b/tests/internal/config_format_yaml.c @@ -20,6 +20,17 @@ #define FLB_000 FLB_TESTS_CONF_PATH "/fluent-bit.yaml" #define FLB_001 FLB_TESTS_CONF_PATH "/issue_7559.yaml" #define FLB_002 FLB_TESTS_CONF_PATH "/processors.yaml" + +#ifdef _WIN32 +#define FLB_003 FLB_TESTS_CONF_PATH "\\parsers_and_multiline_parsers.yaml" +#else +#define FLB_003 FLB_TESTS_CONF_PATH "/parsers_and_multiline_parsers.yaml" +#endif + +#define FLB_004 FLB_TESTS_CONF_PATH "/stream_processor.yaml" +#define FLB_005 FLB_TESTS_CONF_PATH "/plugins.yaml" +#define FLB_006 FLB_TESTS_CONF_PATH "/upstream.yaml" + #define FLB_000_WIN FLB_TESTS_CONF_PATH "\\fluent-bit-windows.yaml" #define FLB_BROKEN_PLUGIN_VARIANT FLB_TESTS_CONF_PATH "/broken_plugin_variant.yaml" @@ -289,7 +300,7 @@ static void test_parser_conf() /* Total number of inputs */ if(!TEST_CHECK(mk_list_size(&config->parsers) == cnt+1)) { - TEST_MSG("Section number error. Got=%d expect=%d", + TEST_MSG("Section number error. Got=%d expect=%d", mk_list_size(&config->parsers), cnt+1); } @@ -455,6 +466,373 @@ static void test_processors() flb_cf_destroy(cf); } +static void test_parsers_and_multiline_parsers() +{ + int idx = 0; + flb_sds_t str; + struct mk_list *head; + struct mk_list *rule_head; + struct flb_cf *cf; + struct flb_cf_section *s; + struct flb_cf_group *g; + struct cfl_variant *v; + struct cfl_variant *tmp; + + cf = flb_cf_yaml_create(NULL, FLB_003, NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of sections */ + TEST_CHECK(mk_list_size(&cf->sections) == 8); + + /* Check number sections per list */ + TEST_CHECK(mk_list_size(&cf->parsers) == 3); + TEST_CHECK(mk_list_size(&cf->multiline_parsers) == 2); + TEST_CHECK(mk_list_size(&cf->customs) == 0); + TEST_CHECK(mk_list_size(&cf->inputs) == 1); + TEST_CHECK(mk_list_size(&cf->filters) == 0); + TEST_CHECK(mk_list_size(&cf->outputs) == 1); + TEST_CHECK(mk_list_size(&cf->others) == 0); + + /* check parsers */ + idx = 0; + mk_list_foreach(head, &cf->parsers) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + switch (idx) { + case 0: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "json-2") == 0); + break; + case 1: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "json") == 0); + break; + + case 2: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "docker") == 0); + break; + } + idx++; + } + + /* check multiline parsers */ + idx = 0; + head = NULL; + mk_list_foreach(head, &cf->multiline_parsers) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + str = flb_cf_section_property_get_string(cf, s, "name"); + + switch (idx) { + case 0: + TEST_CHECK(strcmp(str, "exception_test-2") == 0); + break; + case 1: + TEST_CHECK(strcmp(str, "exception_test") == 0); + break; + }; + flb_sds_destroy(str); + + /* check rules (groups) */ + TEST_CHECK(mk_list_size(&s->groups) == 2); + + idx = 0; + mk_list_foreach(rule_head, &s->groups) { + g = mk_list_entry(rule_head, struct flb_cf_group, _head); + TEST_CHECK(strcmp(g->name, "rule") == 0); + + if (idx == 0) { + /* get initial state "start_state" */ + tmp = cfl_kvlist_fetch(g->properties, "state"); + TEST_CHECK(tmp != NULL); + + TEST_CHECK(tmp->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(tmp->data.as_string, "start_state") == 0); + } + else if (idx == 1) { + /* get initial state "start_state" */ + tmp = cfl_kvlist_fetch(g->properties, "state"); + TEST_CHECK(tmp != NULL); + + TEST_CHECK(tmp->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(tmp->data.as_string, "cont") == 0); + } + idx++; + } + } + + flb_cf_destroy(cf); +} + +static void test_stream_processor() +{ + int idx = 0; + struct mk_list *head; + struct flb_cf *cf; + struct flb_cf_section *s; + struct cfl_variant *v; + + cf = flb_cf_yaml_create(NULL, FLB_004, NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of sections */ + TEST_CHECK(mk_list_size(&cf->sections) == 5); + + /* Check number sections per list */ + TEST_CHECK(mk_list_size(&cf->parsers) == 0); + TEST_CHECK(mk_list_size(&cf->multiline_parsers) == 0); + TEST_CHECK(mk_list_size(&cf->customs) == 0); + TEST_CHECK(mk_list_size(&cf->inputs) == 1); + TEST_CHECK(mk_list_size(&cf->filters) == 0); + TEST_CHECK(mk_list_size(&cf->outputs) == 1); + TEST_CHECK(mk_list_size(&cf->others) == 0); + + /* check others */ + idx = 0; + mk_list_foreach(head, &cf->stream_processors) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + + switch (idx) { + case 0: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "create_results") == 0); + + v = flb_cf_section_property_get(cf, s, "exec"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strncmp(v->data.as_string, "CREATE STREAM results", 21) == 0); + break; + case 1: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "select_results") == 0); + + v = flb_cf_section_property_get(cf, s, "exec"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strncmp(v->data.as_string, "SELECT * FROM", 13) == 0); + break; + }; + idx++; + + /* check groups */ + TEST_CHECK(mk_list_size(&s->groups) == 0); + } + + flb_cf_destroy(cf); +} + +static void test_plugins() +{ + int idx = 0; + struct mk_list *head; + struct flb_cf *cf; + struct flb_cf_section *s; + + struct cfl_kvpair *path; + struct cfl_list *path_head; + + cf = flb_cf_yaml_create(NULL, FLB_005, NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of sections */ + TEST_CHECK(mk_list_size(&cf->sections) == 4); + + /* Check number sections per list */ + TEST_CHECK(mk_list_size(&cf->plugins) == 1); + TEST_CHECK(mk_list_size(&cf->parsers) == 0); + TEST_CHECK(mk_list_size(&cf->multiline_parsers) == 0); + TEST_CHECK(mk_list_size(&cf->customs) == 0); + TEST_CHECK(mk_list_size(&cf->inputs) == 1); + TEST_CHECK(mk_list_size(&cf->filters) == 0); + TEST_CHECK(mk_list_size(&cf->outputs) == 1); + TEST_CHECK(mk_list_size(&cf->others) == 0); + + + mk_list_foreach(head, &cf->plugins) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + + idx = 0; + cfl_list_foreach(path_head, &s->properties->list) { + path = cfl_list_entry(path_head, struct cfl_kvpair, _head); + + switch (idx) { + case 0: + TEST_CHECK(strcmp(path->key, "/path/to/out_gstdout.so") == 0); + break; + case 1: + TEST_CHECK(strcmp(path->key, "/path/to/out_fluent.so") == 0); + break; + }; + idx++; + } + } + + flb_cf_destroy(cf); +} + +static void test_upstream_servers() +{ + int idx = 0; + int g_idx = 0; + struct mk_list *head; + struct mk_list *g_head; + struct flb_cf *cf; + struct flb_cf_section *s; + struct cfl_variant *v; + struct flb_cf_group *group; + + cf = flb_cf_yaml_create(NULL, FLB_006, NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + exit(EXIT_FAILURE); + } + + /* Total number of sections */ + TEST_CHECK(mk_list_size(&cf->sections) == 4); + + /* Check number sections per list */ + TEST_CHECK(mk_list_size(&cf->upstream_servers) == 2); + TEST_CHECK(mk_list_size(&cf->parsers) == 0); + TEST_CHECK(mk_list_size(&cf->multiline_parsers) == 0); + TEST_CHECK(mk_list_size(&cf->customs) == 0); + TEST_CHECK(mk_list_size(&cf->inputs) == 1); + TEST_CHECK(mk_list_size(&cf->filters) == 0); + TEST_CHECK(mk_list_size(&cf->outputs) == 1); + TEST_CHECK(mk_list_size(&cf->others) == 0); + + /* check upstream servers */ + idx = 0; + mk_list_foreach(head, &cf->upstream_servers) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + + switch (idx) { + case 0: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "forward-balancing") == 0); + + /* iterate node/groups */ + TEST_CHECK(mk_list_size(&s->groups) == 3); + + g_idx = 0; + mk_list_foreach(g_head, &s->groups) { + group = mk_list_entry(g_head, struct flb_cf_group, _head); + TEST_CHECK(group != NULL); + TEST_CHECK(strcmp(group->name, "upstream_node") == 0); + + switch (g_idx) { + case 0: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-1") == 0); + + v = cfl_kvlist_fetch(group->properties, "host"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "127.0.0.1") == 0); + + v = cfl_kvlist_fetch(group->properties, "port"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "43000") == 0); + break; + + case 1: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-2") == 0); + + v = cfl_kvlist_fetch(group->properties, "host"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "127.0.0.1") == 0); + + v = cfl_kvlist_fetch(group->properties, "port"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "44000") == 0); + break; + case 2: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-3") == 0); + break; + }; + g_idx++; + } + break; + case 1: + v = flb_cf_section_property_get(cf, s, "name"); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "forward-balancing-2") == 0); + + g_idx = 0; + mk_list_foreach(g_head, &s->groups) { + group = mk_list_entry(g_head, struct flb_cf_group, _head); + TEST_CHECK(group != NULL); + TEST_CHECK(strcmp(group->name, "upstream_node") == 0); + + switch (g_idx) { + case 0: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-A") == 0); + + v = cfl_kvlist_fetch(group->properties, "host"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "192.168.1.10") == 0); + + v = cfl_kvlist_fetch(group->properties, "port"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "50000") == 0); + + break; + case 1: + v = cfl_kvlist_fetch(group->properties, "name"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "node-B") == 0); + + v = cfl_kvlist_fetch(group->properties, "host"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "192.168.1.11") == 0); + + v = cfl_kvlist_fetch(group->properties, "port"); + TEST_CHECK(v != NULL); + TEST_CHECK(v->type == CFL_VARIANT_STRING); + TEST_CHECK(strcmp(v->data.as_string, "51000") == 0); + break; + }; + g_idx++; + } + + break; + }; + idx++; + } + + flb_cf_destroy(cf); +} + TEST_LIST = { { "basic" , test_basic}, { "customs section", test_customs_section}, @@ -464,5 +842,9 @@ TEST_LIST = { { "parsers file conf", test_parser_conf}, { "camel_case_key", test_camel_case_key}, { "processors", test_processors}, + { "parsers_and_multiline_parsers", test_parsers_and_multiline_parsers}, + { "stream_processor", test_stream_processor}, + { "plugins", test_plugins}, + { "upstream_servers", test_upstream_servers}, { 0 } }; diff --git a/tests/internal/data/config_format/yaml/extra_parser.yaml b/tests/internal/data/config_format/yaml/extra_parser.yaml new file mode 100644 index 00000000000..abbf29bd98c --- /dev/null +++ b/tests/internal/data/config_format/yaml/extra_parser.yaml @@ -0,0 +1,16 @@ +parsers: + - name: json-2 + format: json + +multiline_parsers: + - name: exception_test-2 + type: regex + flush_timeout: 1000 + rules: + - state: start_state + regex: "/(Dec \\d+ \\d+\\:\\d+\\:\\d+)(.*)/" + next_state: cont + + - state: cont + regex: "/^\\s+at.*/" + next_state: cont diff --git a/tests/internal/data/config_format/yaml/parsers_and_multiline_parsers.yaml b/tests/internal/data/config_format/yaml/parsers_and_multiline_parsers.yaml new file mode 100644 index 00000000000..637c93ea3b0 --- /dev/null +++ b/tests/internal/data/config_format/yaml/parsers_and_multiline_parsers.yaml @@ -0,0 +1,42 @@ +--- +service: + log_level: info + #parsers_file: parsers_multiline.conf + +includes: + - extra_parser.yaml + +parsers: + - name: json + format: json + + - name: docker + format: json + time_key: time + time_format: "%Y-%m-%dT%H:%M:%S.%L" + time_keep: true + +multiline_parsers: + - name: exception_test + type: regex + flush_timeout: 1000 + rules: + - state: start_state + regex: "/(Dec \\d+ \\d+\\:\\d+\\:\\d+)(.*)/" + next_state: cont + + - state: cont + regex: "/^\\s+at.*/" + next_state: cont + +pipeline: + inputs: + - name: tail + path: ../test_multiline.log + read_from_head: true + multiline.parser: multiline-regex-test + + outputs: + - name: stdout + match: '*' + format: json_lines diff --git a/tests/internal/data/config_format/yaml/plugins.yaml b/tests/internal/data/config_format/yaml/plugins.yaml new file mode 100644 index 00000000000..79378f6e55b --- /dev/null +++ b/tests/internal/data/config_format/yaml/plugins.yaml @@ -0,0 +1,16 @@ +--- + +plugins: + - /path/to/out_gstdout.so + - /path/to/out_fluent.so + +service: + log_level: info + +pipeline: + inputs: + - name: random + + outputs: + - name: gstdout + match: '*' diff --git a/tests/internal/data/config_format/yaml/stream_processor.yaml b/tests/internal/data/config_format/yaml/stream_processor.yaml new file mode 100644 index 00000000000..f098cc6b027 --- /dev/null +++ b/tests/internal/data/config_format/yaml/stream_processor.yaml @@ -0,0 +1,22 @@ +--- + +stream_processor: + - name: create_results + exec: CREATE STREAM results WITH (tag='500_error') AS SELECT * FROM STREAM:tail.0 WHERE http_status=500; + + - name: select_results + exec: SELECT * FROM STREAM:results WHERE http_status=500; + +service: + log_level: info + +pipeline: + inputs: + - name: tail + path: test_multiline.log + read_from_head: true + + outputs: + - name: stdout + match: '*' + format: json_lines diff --git a/tests/internal/data/config_format/yaml/upstream.yaml b/tests/internal/data/config_format/yaml/upstream.yaml new file mode 100644 index 00000000000..9dcf0ffb499 --- /dev/null +++ b/tests/internal/data/config_format/yaml/upstream.yaml @@ -0,0 +1,38 @@ +--- + +upstream_servers: + - name: forward-balancing + nodes: + - name: node-1 + host: 127.0.0.1 + port: 43000 + + - name: node-2 + host: 127.0.0.1 + port: 44000 + + - name: node-3 + host: 127.0.0.1 + port: 45000 + tls: true + tls_verify: false + shared_key: secret + + - name: forward-balancing-2 + nodes: + - name: node-A + host: 192.168.1.10 + port: 50000 + + - name: node-B + host: 192.168.1.11 + port: 51000 + +pipeline: + inputs: + - name: random + + outputs: + - name: stdout + match: '*' +