From 7c4c98b067598382eff6676fd018087e26c99f24 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 27 Aug 2024 18:14:48 +0900 Subject: [PATCH] in_systemd: Process enumerated data as cfl_kvlist(s) at first This is because systemctl's -o json-pretty or -o json converts duplicated keys' values as array(s). To avoid generating the duplicated key(s) does not resolve this issue. Instead, we need to store as cfl_kvlist at first to detect duplicated keys on enumerated data in journal storage. Then, we also need to generate as msgpack's array format when the duplicated key(s) were detected and translated as array format when storing as cfl_kvlist(s). Signed-off-by: Hiroshi Hatake --- plugins/in_systemd/systemd.c | 153 +++++++++++++++++++++++++++++++---- 1 file changed, 136 insertions(+), 17 deletions(-) diff --git a/plugins/in_systemd/systemd.c b/plugins/in_systemd/systemd.c index 85f36ee990a..4c03121e603 100644 --- a/plugins/in_systemd/systemd.c +++ b/plugins/in_systemd/systemd.c @@ -77,6 +77,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, int ret_j; int i; int len; + int key_len; int entries = 0; int skip_entries = 0; int rows = 0; @@ -100,6 +101,15 @@ static int in_systemd_collect(struct flb_input_instance *ins, const void *data; struct flb_systemd_config *ctx = in_context; struct flb_time tm; + struct cfl_kvlist *kvlist = NULL; + struct cfl_variant *cfl_val = NULL; + struct cfl_list *head; + struct cfl_kvpair *kvpair = NULL; + struct cfl_variant *v = NULL; + struct cfl_array *array = NULL; + struct cfl_variant *tmp_val = NULL; + flb_sds_t list_key = NULL; + flb_sds_t search_key = NULL; /* Restricted by mem_buf_limit */ if (flb_input_buf_paused(ins) == FLB_TRUE) { @@ -200,6 +210,13 @@ static int in_systemd_collect(struct flb_input_instance *ins, ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm); } + /* create an empty kvlist as the labels */ + kvlist = cfl_kvlist_create(); + if (!kvlist) { + flb_plg_error(ctx->ins, "error allocating kvlist"); + break; + } + /* Pack every field in the entry */ entries = 0; skip_entries = 0; @@ -218,11 +235,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, } len = (sep - key); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_length( - ctx->log_encoder, len); - } + key_len = len; if (ctx->lowercase == FLB_TRUE) { /* @@ -238,31 +251,137 @@ static int in_systemd_collect(struct flb_input_instance *ins, for (i = 0; i < len; i++) { buf[i] = tolower(key[i]); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_body( - ctx->log_encoder, buf, len); - } + list_key = flb_sds_create_len(buf, key_len); } else { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_body( - ctx->log_encoder, (char *) key, len); - } + list_key = flb_sds_create_len(key, key_len); + } + + if (!list_key) { + continue; } + /* Check existence */ + cfl_val = NULL; + cfl_val = cfl_kvlist_fetch_s(kvlist, list_key, key_len); + val = sep + 1; len = length - (sep - key) - 1; - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string( - ctx->log_encoder, (char *) val, len); + /* Initialize variable for cfl_variant operations. */ + search_key = NULL; + tmp_val = NULL; + + /* Store cfl_kvlist format at first to detect duplicated keys */ + if (cfl_val) { + switch(cfl_val->type) { + case CFL_VARIANT_STRING: + tmp_val = cfl_variant_create_from_string(cfl_val->data.as_string); + if (!tmp_val) { + continue; + } + break; + case CFL_VARIANT_ARRAY: + /* Just a reference */ + tmp_val = cfl_val; + break; + default: + /* nop */ + } + + switch(tmp_val->type) { + case CFL_VARIANT_STRING: + search_key = flb_sds_create_len(list_key, key_len); + if (search_key != NULL) { + cfl_kvlist_remove(kvlist, list_key); + } + flb_sds_destroy(search_key); + + array = cfl_array_create(8); + if (!array) { + cfl_variant_destroy(tmp_val); + continue; + } + if (cfl_array_resizable(array, CFL_TRUE) == -1) { + cfl_array_destroy(array); + cfl_variant_destroy(tmp_val); + continue; + } + + cfl_array_append_string_s(array, + tmp_val->data.as_string, + strlen(tmp_val->data.as_string), + CFL_FALSE); + cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_TRUE); + cfl_kvlist_insert_array_s(kvlist, list_key, key_len, array); + cfl_variant_destroy(tmp_val); + break; + case CFL_VARIANT_ARRAY: + /* Just appending the newly arrived field(s) */ + array = tmp_val->data.as_array; + cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_TRUE); + break; + default: + /* nop */ + } + } + else { + cfl_kvlist_insert_string_s(kvlist, list_key, key_len, + (char *)val, strlen(val), CFL_FALSE); } + flb_sds_destroy(list_key); entries++; } rows++; + /* Interpret cfl_kvlist as logs type of events later. */ + cfl_list_foreach(head, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_length( + ctx->log_encoder, cfl_sds_len(kvpair->key)); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_body( + ctx->log_encoder, kvpair->key, cfl_sds_len(kvpair->key)); + } + + v = kvpair->val; + if (v->type == CFL_VARIANT_STRING) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, v->data.as_string, cfl_variant_size_get(v)); + } + } + else if (v->type == CFL_VARIANT_ARRAY) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_begin_array(ctx->log_encoder); + } + + array = v->data.as_array; + for (i = 0; i < array->entry_count; i++) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + if (array->entries[i]->type != CFL_VARIANT_STRING) { + continue; + } + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, array->entries[i]->data.as_string, + cfl_variant_size_get(array->entries[i])); + } + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_commit_array(ctx->log_encoder); + } + } + } + + if (kvlist) { + cfl_kvlist_destroy(kvlist); + } + if (skip_entries > 0) { flb_plg_error(ctx->ins, "Skip %d broken entries", skip_entries); }