From 6790b26303628bf65051676437ba1701ec34f15d Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 27 Aug 2024 18:14:48 +0900 Subject: [PATCH] in_systemd: Process enumerated data as cfl_kvlist(s) at first This is because systemctl's -o json-pretty or -o json converts duplicated keys' values as array(s). To avoid generating the duplicated key(s) does not resolve this issue. Instead, we need to store as cfl_kvlist at first to detect duplicated keys on enumerated data in journal storage. Then, we also need to generate as msgpack's array format when the duplicated key(s) were detected and translated as array format when storing as cfl_kvlist(s). Signed-off-by: Hiroshi Hatake --- plugins/in_systemd/systemd.c | 338 ++++++++++++++++++++++++---- plugins/in_systemd/systemd_config.h | 1 + 2 files changed, 290 insertions(+), 49 deletions(-) diff --git a/plugins/in_systemd/systemd.c b/plugins/in_systemd/systemd.c index 85f36ee990a..08ba75d210e 100644 --- a/plugins/in_systemd/systemd.c +++ b/plugins/in_systemd/systemd.c @@ -70,13 +70,202 @@ static int tag_compose(const char *tag, const char *unit_name, return 0; } +static int append_enumerate_data(struct flb_systemd_config *ctx, struct cfl_kvlist *kvlist) +{ + int i; + int ret = FLB_EVENT_ENCODER_SUCCESS; + struct cfl_list *head; + struct cfl_kvpair *kvpair = NULL; + struct cfl_variant *v = NULL; + struct cfl_array *array = NULL; + + /* Interpret cfl_kvlist as logs type of events later. */ + cfl_list_foreach(head, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_length( + ctx->log_encoder, cfl_sds_len(kvpair->key)); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_body( + ctx->log_encoder, kvpair->key, cfl_sds_len(kvpair->key)); + } + + v = kvpair->val; + if (v->type == CFL_VARIANT_STRING) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, v->data.as_string, cfl_variant_size_get(v)); + } + } + else if (v->type == CFL_VARIANT_ARRAY) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_begin_array(ctx->log_encoder); + } + + array = v->data.as_array; + for (i = 0; i < array->entry_count; i++) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + if (array->entries[i]->type != CFL_VARIANT_STRING) { + continue; + } + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, array->entries[i]->data.as_string, + cfl_variant_size_get(array->entries[i])); + } + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_commit_array(ctx->log_encoder); + } + } + } + + return ret; +} + +static int systemd_enumerate_data_store(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *format_context, + const void *data, size_t data_size) +{ + int i; + int len; + int key_len; + size_t length = data_size; + const char *sep; + const char *key; + const char *val; + char *buf = NULL; + struct cfl_kvlist *kvlist = format_context; + struct flb_systemd_config *ctx = plugin_context; + struct cfl_variant *cfl_val = NULL; + struct cfl_array *array = NULL; + struct cfl_variant *tmp_val = NULL; + flb_sds_t list_key = NULL; + flb_sds_t search_key = NULL; + + key = (const char *) data; + sep = strchr(key, '='); + if (sep == NULL) { + return -2; + } + + len = (sep - key); + key_len = len; + + if (ctx->lowercase == FLB_TRUE) { + /* + * Ensure buf to have enough space for the key because the libsystemd + * might return larger data than the threshold. + */ + if (buf == NULL) { + buf = flb_sds_create_len(NULL, ctx->threshold); + } + if (flb_sds_alloc(buf) < len) { + buf = flb_sds_increase(buf, len - flb_sds_alloc(buf)); + } + for (i = 0; i < len; i++) { + buf[i] = tolower(key[i]); + } + list_key = flb_sds_create_len(buf, key_len); + } + else { + list_key = flb_sds_create_len(key, key_len); + } + + if (!list_key) { + return -1; + } + + /* Check existence */ + cfl_val = NULL; + cfl_val = cfl_kvlist_fetch_s(kvlist, list_key, key_len); + + val = sep + 1; + len = length - (sep - key) - 1; + + /* Initialize variable for cfl_variant operations. */ + search_key = NULL; + tmp_val = NULL; + + /* Store cfl_kvlist format at first to detect duplicated keys */ + if (cfl_val) { + switch(cfl_val->type) { + case CFL_VARIANT_STRING: + tmp_val = cfl_variant_create_from_string(cfl_val->data.as_string); + if (!tmp_val) { + return -1; + } + break; + case CFL_VARIANT_ARRAY: + /* Just a reference */ + tmp_val = cfl_val; + break; + default: + /* nop */ + break; + } + + switch(tmp_val->type) { + case CFL_VARIANT_STRING: + search_key = flb_sds_create_len(list_key, key_len); + if (search_key != NULL) { + cfl_kvlist_remove(kvlist, search_key); + } + flb_sds_destroy(search_key); + + array = cfl_array_create(8); + if (!array) { + cfl_variant_destroy(tmp_val); + goto error; + } + if (cfl_array_resizable(array, CFL_TRUE) == -1) { + cfl_array_destroy(array); + cfl_variant_destroy(tmp_val); + goto error; + } + + cfl_array_append_string_s(array, + tmp_val->data.as_string, + strlen(tmp_val->data.as_string), + CFL_FALSE); + cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE); + cfl_kvlist_insert_array_s(kvlist, list_key, key_len, array); + cfl_variant_destroy(tmp_val); + break; + case CFL_VARIANT_ARRAY: + /* Just appending the newly arrived field(s) */ + array = tmp_val->data.as_array; + cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE); + break; + default: + /* nop */ + break; + } + } + else { + cfl_kvlist_insert_string_s(kvlist, list_key, key_len, + (char *)val, strlen(val), CFL_FALSE); + } + + flb_sds_destroy(list_key); + + return 0; + +error: + flb_sds_destroy(list_key); + + return -1; +} + static int in_systemd_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { int ret; int ret_j; - int i; - int len; int entries = 0; int skip_entries = 0; int rows = 0; @@ -84,10 +273,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, long nsec; uint64_t usec; size_t length; - size_t threshold; - const char *sep; const char *key; - const char *val; char *buf = NULL; #ifdef FLB_HAVE_SQLDB char *cursor = NULL; @@ -100,6 +286,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, const void *data; struct flb_systemd_config *ctx = in_context; struct flb_time tm; + struct cfl_kvlist *kvlist = NULL; /* Restricted by mem_buf_limit */ if (flb_input_buf_paused(ins) == FLB_TRUE) { @@ -123,7 +310,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, } if (ctx->lowercase == FLB_TRUE) { - ret = sd_journal_get_data_threshold(ctx->j, &threshold); + ret = sd_journal_get_data_threshold(ctx->j, &ctx->threshold); if (ret != 0) { flb_plg_error(ctx->ins, "error setting up systemd data. " @@ -200,6 +387,13 @@ static int in_systemd_collect(struct flb_input_instance *ins, ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm); } + /* create an empty kvlist as the labels */ + kvlist = cfl_kvlist_create(); + if (!kvlist) { + flb_plg_error(ctx->ins, "error allocating kvlist"); + break; + } + /* Pack every field in the entry */ entries = 0; skip_entries = 0; @@ -211,58 +405,28 @@ static int in_systemd_collect(struct flb_input_instance *ins, length--; } - sep = strchr(key, '='); - if (sep == NULL) { + ret = systemd_enumerate_data_store(config, ctx->ins, + (void *)ctx, (void *)kvlist, + key, length); + if (ret == -2) { skip_entries++; continue; } - - len = (sep - key); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_length( - ctx->log_encoder, len); - } - - if (ctx->lowercase == FLB_TRUE) { - /* - * Ensure buf to have enough space for the key because the libsystemd - * might return larger data than the threshold. - */ - if (buf == NULL) { - buf = flb_sds_create_len(NULL, threshold); - } - if (flb_sds_alloc(buf) < len) { - buf = flb_sds_increase(buf, len - flb_sds_alloc(buf)); - } - for (i = 0; i < len; i++) { - buf[i] = tolower(key[i]); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_body( - ctx->log_encoder, buf, len); - } - } - else { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_body( - ctx->log_encoder, (char *) key, len); - } - } - - val = sep + 1; - len = length - (sep - key) - 1; - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string( - ctx->log_encoder, (char *) val, len); + else if (ret == -1) { + continue; } entries++; } rows++; + /* Interpret cfl_kvlist as logs type of events later. */ + ret = append_enumerate_data(ctx, kvlist); + + if (kvlist) { + cfl_kvlist_destroy(kvlist); + } + if (skip_entries > 0) { flb_plg_error(ctx->ins, "Skip %d broken entries", skip_entries); } @@ -483,6 +647,78 @@ static int in_systemd_exit(void *data, struct flb_config *config) return 0; } +static int cb_systemd_format_test(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + int ret; + struct flb_systemd_config *ctx = plugin_context; + struct flb_time tm; + struct cfl_list *head = NULL; + struct cfl_list *kvs = NULL; + struct cfl_split_entry *cur = NULL; + struct cfl_kvlist *kvlist = NULL; + const char *keys; + + ret = flb_log_event_encoder_begin_record(ctx->log_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm); + } + + /* create an empty kvlist as the labels */ + kvlist = cfl_kvlist_create(); + if (!kvlist) { + flb_plg_error(ctx->ins, "error allocating kvlist"); + return -1; + } + + keys = (const char *) data; + kvs = cfl_utils_split(keys, '\n', -1 ); + if (kvs == NULL) { + goto split_error; + } + + cfl_list_foreach(head, kvs) { + cur = cfl_list_entry(head, struct cfl_split_entry, _head); + ret = systemd_enumerate_data_store(config, ctx->ins, + (void *)ctx, (void *)kvlist, + cur->value, cur->len); + + if (ret == -2 || ret == -1) { + continue; + } + } + + /* Interpret cfl_kvlist as logs type of events later. */ + ret = append_enumerate_data(ctx, kvlist); + + if (kvlist) { + cfl_kvlist_destroy(kvlist); + } + + if (kvs != NULL) { + cfl_utils_split_free(kvs); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(ctx->log_encoder); + } + + *out_data = ctx->log_encoder->output_buffer; + *out_size = ctx->log_encoder->output_length; + + return 0; + +split_error: + *out_data = NULL; + *out_size = 0; + + return -1; +} + static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "path", (char *)NULL, @@ -551,5 +787,9 @@ struct flb_input_plugin in_systemd_plugin = { .cb_resume = in_systemd_resume, .cb_exit = in_systemd_exit, .config_map = config_map, + + /* for testing */ + .test_formatter.callback = cb_systemd_format_test, + .flags = 0 }; diff --git a/plugins/in_systemd/systemd_config.h b/plugins/in_systemd/systemd_config.h index 83e14856b4d..af789b7ea3b 100644 --- a/plugins/in_systemd/systemd_config.h +++ b/plugins/in_systemd/systemd_config.h @@ -63,6 +63,7 @@ struct flb_systemd_config { int dynamic_tag; int max_fields; /* max number of fields per record */ int max_entries; /* max number of records per iteration */ + size_t threshold; /* threshold for retriveing journal */ #ifdef FLB_HAVE_SQLDB flb_sds_t db_path;