diff --git a/plugins/in_systemd/systemd.c b/plugins/in_systemd/systemd.c index 85f36ee990a..21e84b80c38 100644 --- a/plugins/in_systemd/systemd.c +++ b/plugins/in_systemd/systemd.c @@ -77,6 +77,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, int ret_j; int i; int len; + int key_len; int entries = 0; int skip_entries = 0; int rows = 0; @@ -100,6 +101,15 @@ static int in_systemd_collect(struct flb_input_instance *ins, const void *data; struct flb_systemd_config *ctx = in_context; struct flb_time tm; + struct cfl_kvlist *kvlist = NULL; + struct cfl_variant *cfl_val = NULL; + struct cfl_list *head; + struct cfl_kvpair *kvpair = NULL; + struct cfl_variant *v = NULL; + struct cfl_array *array = NULL; + struct cfl_variant *tmp_val = NULL; + flb_sds_t list_key = NULL; + flb_sds_t search_key = NULL; /* Restricted by mem_buf_limit */ if (flb_input_buf_paused(ins) == FLB_TRUE) { @@ -200,6 +210,13 @@ static int in_systemd_collect(struct flb_input_instance *ins, ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm); } + /* create an empty kvlist as the labels */ + kvlist = cfl_kvlist_create(); + if (!kvlist) { + flb_plg_error(ctx->ins, "error allocating kvlist"); + break; + } + /* Pack every field in the entry */ entries = 0; skip_entries = 0; @@ -218,11 +235,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, } len = (sep - key); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_length( - ctx->log_encoder, len); - } + key_len = len; if (ctx->lowercase == FLB_TRUE) { /* @@ -238,31 +251,139 @@ static int in_systemd_collect(struct flb_input_instance *ins, for (i = 0; i < len; i++) { buf[i] = tolower(key[i]); } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_body( - ctx->log_encoder, buf, len); - } + list_key = flb_sds_create_len(buf, key_len); } else { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_body( - ctx->log_encoder, (char *) key, len); - } + list_key = flb_sds_create_len(key, key_len); + } + + if (!list_key) { + continue; } + /* Check existence */ + cfl_val = NULL; + cfl_val = cfl_kvlist_fetch_s(kvlist, list_key, key_len); + val = sep + 1; len = length - (sep - key) - 1; - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string( - ctx->log_encoder, (char *) val, len); + /* Initialize variable for cfl_variant operations. */ + search_key = NULL; + tmp_val = NULL; + + /* Store cfl_kvlist format at first to detect duplicated keys */ + if (cfl_val) { + switch(cfl_val->type) { + case CFL_VARIANT_STRING: + tmp_val = cfl_variant_create_from_string(cfl_val->data.as_string); + if (!tmp_val) { + continue; + } + break; + case CFL_VARIANT_ARRAY: + /* Just a reference */ + tmp_val = cfl_val; + break; + default: + /* nop */ + break; + } + + switch(tmp_val->type) { + case CFL_VARIANT_STRING: + search_key = flb_sds_create_len(list_key, key_len); + if (search_key != NULL) { + cfl_kvlist_remove(kvlist, list_key); + } + flb_sds_destroy(search_key); + + array = cfl_array_create(8); + if (!array) { + cfl_variant_destroy(tmp_val); + continue; + } + if (cfl_array_resizable(array, CFL_TRUE) == -1) { + cfl_array_destroy(array); + cfl_variant_destroy(tmp_val); + continue; + } + + cfl_array_append_string_s(array, + tmp_val->data.as_string, + strlen(tmp_val->data.as_string), + CFL_FALSE); + cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE); + cfl_kvlist_insert_array_s(kvlist, list_key, key_len, array); + cfl_variant_destroy(tmp_val); + break; + case CFL_VARIANT_ARRAY: + /* Just appending the newly arrived field(s) */ + array = tmp_val->data.as_array; + cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE); + break; + default: + /* nop */ + break; + } + } + else { + cfl_kvlist_insert_string_s(kvlist, list_key, key_len, + (char *)val, strlen(val), CFL_FALSE); } + flb_sds_destroy(list_key); entries++; } rows++; + /* Interpret cfl_kvlist as logs type of events later. */ + cfl_list_foreach(head, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_length( + ctx->log_encoder, cfl_sds_len(kvpair->key)); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_body( + ctx->log_encoder, kvpair->key, cfl_sds_len(kvpair->key)); + } + + v = kvpair->val; + if (v->type == CFL_VARIANT_STRING) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, v->data.as_string, cfl_variant_size_get(v)); + } + } + else if (v->type == CFL_VARIANT_ARRAY) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_begin_array(ctx->log_encoder); + } + + array = v->data.as_array; + for (i = 0; i < array->entry_count; i++) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + if (array->entries[i]->type != CFL_VARIANT_STRING) { + continue; + } + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, array->entries[i]->data.as_string, + cfl_variant_size_get(array->entries[i])); + } + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_commit_array(ctx->log_encoder); + } + } + } + + if (kvlist) { + cfl_kvlist_destroy(kvlist); + } + if (skip_entries > 0) { flb_plg_error(ctx->ins, "Skip %d broken entries", skip_entries); }