Skip to content

Commit

Permalink
in_systemd: Process enumerated data as cfl_kvlist(s) at first
Browse files Browse the repository at this point in the history
This is because systemctl's -o json-pretty or -o json converts
duplicated keys' values as array(s).
To avoid generating the duplicated key(s) does not resolve this issue.
Instead, we need to store as cfl_kvlist at first to detect duplicated
keys on enumerated data in journal storage.
Then, we also need to generate as msgpack's array format when the
duplicated key(s) were detected and translated as array format when
storing as cfl_kvlist(s).

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Aug 28, 2024
1 parent e37b56b commit 347c77b
Showing 1 changed file with 138 additions and 17 deletions.
155 changes: 138 additions & 17 deletions plugins/in_systemd/systemd.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ static int in_systemd_collect(struct flb_input_instance *ins,
int ret_j;
int i;
int len;
int key_len;
int entries = 0;
int skip_entries = 0;
int rows = 0;
Expand All @@ -100,6 +101,15 @@ static int in_systemd_collect(struct flb_input_instance *ins,
const void *data;
struct flb_systemd_config *ctx = in_context;
struct flb_time tm;
struct cfl_kvlist *kvlist = NULL;
struct cfl_variant *cfl_val = NULL;
struct cfl_list *head;
struct cfl_kvpair *kvpair = NULL;
struct cfl_variant *v = NULL;
struct cfl_array *array = NULL;
struct cfl_variant *tmp_val = NULL;
flb_sds_t list_key = NULL;
flb_sds_t search_key = NULL;

/* Restricted by mem_buf_limit */
if (flb_input_buf_paused(ins) == FLB_TRUE) {
Expand Down Expand Up @@ -200,6 +210,13 @@ static int in_systemd_collect(struct flb_input_instance *ins,
ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm);
}

/* create an empty kvlist as the labels */
kvlist = cfl_kvlist_create();
if (!kvlist) {
flb_plg_error(ctx->ins, "error allocating kvlist");
break;
}

/* Pack every field in the entry */
entries = 0;
skip_entries = 0;
Expand All @@ -218,11 +235,7 @@ static int in_systemd_collect(struct flb_input_instance *ins,
}

len = (sep - key);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string_length(
ctx->log_encoder, len);
}
key_len = len;

if (ctx->lowercase == FLB_TRUE) {
/*
Expand All @@ -238,31 +251,139 @@ static int in_systemd_collect(struct flb_input_instance *ins,
for (i = 0; i < len; i++) {
buf[i] = tolower(key[i]);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string_body(
ctx->log_encoder, buf, len);
}
list_key = flb_sds_create_len(buf, key_len);
}
else {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string_body(
ctx->log_encoder, (char *) key, len);
}
list_key = flb_sds_create_len(key, key_len);
}

if (!list_key) {
continue;
}

/* Check existence */
cfl_val = NULL;
cfl_val = cfl_kvlist_fetch_s(kvlist, list_key, key_len);

val = sep + 1;
len = length - (sep - key) - 1;

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string(
ctx->log_encoder, (char *) val, len);
/* Initialize variable for cfl_variant operations. */
search_key = NULL;
tmp_val = NULL;

/* Store cfl_kvlist format at first to detect duplicated keys */
if (cfl_val) {
switch(cfl_val->type) {
case CFL_VARIANT_STRING:
tmp_val = cfl_variant_create_from_string(cfl_val->data.as_string);
if (!tmp_val) {
continue;
}
break;
case CFL_VARIANT_ARRAY:
/* Just a reference */
tmp_val = cfl_val;
break;
default:
/* nop */
break;
}

switch(tmp_val->type) {
case CFL_VARIANT_STRING:
search_key = flb_sds_create_len(list_key, key_len);
if (search_key != NULL) {
cfl_kvlist_remove(kvlist, list_key);
}
flb_sds_destroy(search_key);

array = cfl_array_create(8);
if (!array) {
cfl_variant_destroy(tmp_val);
continue;
}
if (cfl_array_resizable(array, CFL_TRUE) == -1) {
cfl_array_destroy(array);
cfl_variant_destroy(tmp_val);
continue;
}

cfl_array_append_string_s(array,
tmp_val->data.as_string,
strlen(tmp_val->data.as_string),
CFL_FALSE);
cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE);
cfl_kvlist_insert_array_s(kvlist, list_key, key_len, array);
cfl_variant_destroy(tmp_val);
break;
case CFL_VARIANT_ARRAY:
/* Just appending the newly arrived field(s) */
array = tmp_val->data.as_array;
cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE);
break;
default:
/* nop */
break;
}
}
else {
cfl_kvlist_insert_string_s(kvlist, list_key, key_len,
(char *)val, strlen(val), CFL_FALSE);
}
flb_sds_destroy(list_key);

entries++;
}
rows++;

/* Interpret cfl_kvlist as logs type of events later. */
cfl_list_foreach(head, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string_length(
ctx->log_encoder, cfl_sds_len(kvpair->key));
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string_body(
ctx->log_encoder, kvpair->key, cfl_sds_len(kvpair->key));
}

v = kvpair->val;
if (v->type == CFL_VARIANT_STRING) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string(
ctx->log_encoder, v->data.as_string, cfl_variant_size_get(v));
}
}
else if (v->type == CFL_VARIANT_ARRAY) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_body_begin_array(ctx->log_encoder);
}

array = v->data.as_array;
for (i = 0; i < array->entry_count; i++) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
if (array->entries[i]->type != CFL_VARIANT_STRING) {
continue;
}
ret = flb_log_event_encoder_append_body_string(
ctx->log_encoder, array->entries[i]->data.as_string,
cfl_variant_size_get(array->entries[i]));
}
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_body_commit_array(ctx->log_encoder);
}
}
}

if (kvlist) {
cfl_kvlist_destroy(kvlist);
}

if (skip_entries > 0) {
flb_plg_error(ctx->ins, "Skip %d broken entries", skip_entries);
}
Expand Down

0 comments on commit 347c77b

Please sign in to comment.