diff --git a/plugins/in_systemd/systemd.c b/plugins/in_systemd/systemd.c index 85f36ee990a..08ba75d210e 100644 --- a/plugins/in_systemd/systemd.c +++ b/plugins/in_systemd/systemd.c @@ -70,13 +70,202 @@ static int tag_compose(const char *tag, const char *unit_name, return 0; } +static int append_enumerate_data(struct flb_systemd_config *ctx, struct cfl_kvlist *kvlist) +{ + int i; + int ret = FLB_EVENT_ENCODER_SUCCESS; + struct cfl_list *head; + struct cfl_kvpair *kvpair = NULL; + struct cfl_variant *v = NULL; + struct cfl_array *array = NULL; + + /* Interpret cfl_kvlist as logs type of events later. */ + cfl_list_foreach(head, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_length( + ctx->log_encoder, cfl_sds_len(kvpair->key)); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string_body( + ctx->log_encoder, kvpair->key, cfl_sds_len(kvpair->key)); + } + + v = kvpair->val; + if (v->type == CFL_VARIANT_STRING) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, v->data.as_string, cfl_variant_size_get(v)); + } + } + else if (v->type == CFL_VARIANT_ARRAY) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_begin_array(ctx->log_encoder); + } + + array = v->data.as_array; + for (i = 0; i < array->entry_count; i++) { + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + if (array->entries[i]->type != CFL_VARIANT_STRING) { + continue; + } + ret = flb_log_event_encoder_append_body_string( + ctx->log_encoder, array->entries[i]->data.as_string, + cfl_variant_size_get(array->entries[i])); + } + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_body_commit_array(ctx->log_encoder); + } + } + } + + return ret; +} + +static int systemd_enumerate_data_store(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *format_context, + const void *data, size_t data_size) +{ + int i; + int len; + int key_len; + size_t length = data_size; + const char *sep; + const char *key; + const char *val; + char *buf = NULL; + struct cfl_kvlist *kvlist = format_context; + struct flb_systemd_config *ctx = plugin_context; + struct cfl_variant *cfl_val = NULL; + struct cfl_array *array = NULL; + struct cfl_variant *tmp_val = NULL; + flb_sds_t list_key = NULL; + flb_sds_t search_key = NULL; + + key = (const char *) data; + sep = strchr(key, '='); + if (sep == NULL) { + return -2; + } + + len = (sep - key); + key_len = len; + + if (ctx->lowercase == FLB_TRUE) { + /* + * Ensure buf to have enough space for the key because the libsystemd + * might return larger data than the threshold. + */ + if (buf == NULL) { + buf = flb_sds_create_len(NULL, ctx->threshold); + } + if (flb_sds_alloc(buf) < len) { + buf = flb_sds_increase(buf, len - flb_sds_alloc(buf)); + } + for (i = 0; i < len; i++) { + buf[i] = tolower(key[i]); + } + list_key = flb_sds_create_len(buf, key_len); + } + else { + list_key = flb_sds_create_len(key, key_len); + } + + if (!list_key) { + return -1; + } + + /* Check existence */ + cfl_val = NULL; + cfl_val = cfl_kvlist_fetch_s(kvlist, list_key, key_len); + + val = sep + 1; + len = length - (sep - key) - 1; + + /* Initialize variable for cfl_variant operations. */ + search_key = NULL; + tmp_val = NULL; + + /* Store cfl_kvlist format at first to detect duplicated keys */ + if (cfl_val) { + switch(cfl_val->type) { + case CFL_VARIANT_STRING: + tmp_val = cfl_variant_create_from_string(cfl_val->data.as_string); + if (!tmp_val) { + return -1; + } + break; + case CFL_VARIANT_ARRAY: + /* Just a reference */ + tmp_val = cfl_val; + break; + default: + /* nop */ + break; + } + + switch(tmp_val->type) { + case CFL_VARIANT_STRING: + search_key = flb_sds_create_len(list_key, key_len); + if (search_key != NULL) { + cfl_kvlist_remove(kvlist, search_key); + } + flb_sds_destroy(search_key); + + array = cfl_array_create(8); + if (!array) { + cfl_variant_destroy(tmp_val); + goto error; + } + if (cfl_array_resizable(array, CFL_TRUE) == -1) { + cfl_array_destroy(array); + cfl_variant_destroy(tmp_val); + goto error; + } + + cfl_array_append_string_s(array, + tmp_val->data.as_string, + strlen(tmp_val->data.as_string), + CFL_FALSE); + cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE); + cfl_kvlist_insert_array_s(kvlist, list_key, key_len, array); + cfl_variant_destroy(tmp_val); + break; + case CFL_VARIANT_ARRAY: + /* Just appending the newly arrived field(s) */ + array = tmp_val->data.as_array; + cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE); + break; + default: + /* nop */ + break; + } + } + else { + cfl_kvlist_insert_string_s(kvlist, list_key, key_len, + (char *)val, strlen(val), CFL_FALSE); + } + + flb_sds_destroy(list_key); + + return 0; + +error: + flb_sds_destroy(list_key); + + return -1; +} + static int in_systemd_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { int ret; int ret_j; - int i; - int len; int entries = 0; int skip_entries = 0; int rows = 0; @@ -84,10 +273,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, long nsec; uint64_t usec; size_t length; - size_t threshold; - const char *sep; const char *key; - const char *val; char *buf = NULL; #ifdef FLB_HAVE_SQLDB char *cursor = NULL; @@ -100,6 +286,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, const void *data; struct flb_systemd_config *ctx = in_context; struct flb_time tm; + struct cfl_kvlist *kvlist = NULL; /* Restricted by mem_buf_limit */ if (flb_input_buf_paused(ins) == FLB_TRUE) { @@ -123,7 +310,7 @@ static int in_systemd_collect(struct flb_input_instance *ins, } if (ctx->lowercase == FLB_TRUE) { - ret = sd_journal_get_data_threshold(ctx->j, &threshold); + ret = sd_journal_get_data_threshold(ctx->j, &ctx->threshold); if (ret != 0) { flb_plg_error(ctx->ins, "error setting up systemd data. " @@ -200,6 +387,13 @@ static int in_systemd_collect(struct flb_input_instance *ins, ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm); } + /* create an empty kvlist as the labels */ + kvlist = cfl_kvlist_create(); + if (!kvlist) { + flb_plg_error(ctx->ins, "error allocating kvlist"); + break; + } + /* Pack every field in the entry */ entries = 0; skip_entries = 0; @@ -211,58 +405,28 @@ static int in_systemd_collect(struct flb_input_instance *ins, length--; } - sep = strchr(key, '='); - if (sep == NULL) { + ret = systemd_enumerate_data_store(config, ctx->ins, + (void *)ctx, (void *)kvlist, + key, length); + if (ret == -2) { skip_entries++; continue; } - - len = (sep - key); - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_length( - ctx->log_encoder, len); - } - - if (ctx->lowercase == FLB_TRUE) { - /* - * Ensure buf to have enough space for the key because the libsystemd - * might return larger data than the threshold. - */ - if (buf == NULL) { - buf = flb_sds_create_len(NULL, threshold); - } - if (flb_sds_alloc(buf) < len) { - buf = flb_sds_increase(buf, len - flb_sds_alloc(buf)); - } - for (i = 0; i < len; i++) { - buf[i] = tolower(key[i]); - } - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_body( - ctx->log_encoder, buf, len); - } - } - else { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string_body( - ctx->log_encoder, (char *) key, len); - } - } - - val = sep + 1; - len = length - (sep - key) - 1; - - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_string( - ctx->log_encoder, (char *) val, len); + else if (ret == -1) { + continue; } entries++; } rows++; + /* Interpret cfl_kvlist as logs type of events later. */ + ret = append_enumerate_data(ctx, kvlist); + + if (kvlist) { + cfl_kvlist_destroy(kvlist); + } + if (skip_entries > 0) { flb_plg_error(ctx->ins, "Skip %d broken entries", skip_entries); } @@ -483,6 +647,78 @@ static int in_systemd_exit(void *data, struct flb_config *config) return 0; } +static int cb_systemd_format_test(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + int ret; + struct flb_systemd_config *ctx = plugin_context; + struct flb_time tm; + struct cfl_list *head = NULL; + struct cfl_list *kvs = NULL; + struct cfl_split_entry *cur = NULL; + struct cfl_kvlist *kvlist = NULL; + const char *keys; + + ret = flb_log_event_encoder_begin_record(ctx->log_encoder); + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm); + } + + /* create an empty kvlist as the labels */ + kvlist = cfl_kvlist_create(); + if (!kvlist) { + flb_plg_error(ctx->ins, "error allocating kvlist"); + return -1; + } + + keys = (const char *) data; + kvs = cfl_utils_split(keys, '\n', -1 ); + if (kvs == NULL) { + goto split_error; + } + + cfl_list_foreach(head, kvs) { + cur = cfl_list_entry(head, struct cfl_split_entry, _head); + ret = systemd_enumerate_data_store(config, ctx->ins, + (void *)ctx, (void *)kvlist, + cur->value, cur->len); + + if (ret == -2 || ret == -1) { + continue; + } + } + + /* Interpret cfl_kvlist as logs type of events later. */ + ret = append_enumerate_data(ctx, kvlist); + + if (kvlist) { + cfl_kvlist_destroy(kvlist); + } + + if (kvs != NULL) { + cfl_utils_split_free(kvs); + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = flb_log_event_encoder_commit_record(ctx->log_encoder); + } + + *out_data = ctx->log_encoder->output_buffer; + *out_size = ctx->log_encoder->output_length; + + return 0; + +split_error: + *out_data = NULL; + *out_size = 0; + + return -1; +} + static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "path", (char *)NULL, @@ -551,5 +787,9 @@ struct flb_input_plugin in_systemd_plugin = { .cb_resume = in_systemd_resume, .cb_exit = in_systemd_exit, .config_map = config_map, + + /* for testing */ + .test_formatter.callback = cb_systemd_format_test, + .flags = 0 }; diff --git a/plugins/in_systemd/systemd_config.h b/plugins/in_systemd/systemd_config.h index 83e14856b4d..af789b7ea3b 100644 --- a/plugins/in_systemd/systemd_config.h +++ b/plugins/in_systemd/systemd_config.h @@ -63,6 +63,7 @@ struct flb_systemd_config { int dynamic_tag; int max_fields; /* max number of fields per record */ int max_entries; /* max number of records per iteration */ + size_t threshold; /* threshold for retriveing journal */ #ifdef FLB_HAVE_SQLDB flb_sds_t db_path;