Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loki_out: add stuctured_metadata_map_keys #9530

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 143 additions & 5 deletions plugins/out_loki/loki.c
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx)
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
}
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_map_keys_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
Expand Down Expand Up @@ -416,6 +423,94 @@ static void pack_kv(struct flb_loki *ctx,
}
}

/*
* Similar to pack_kv above, except will only use msgpack_objects of type
* MSGPACK_OBJECT_MAP, and will iterate over the keys adding each entry as a
* separate item. Non-string map values are serialised to JSON, as Loki requires
* all values to be strings.
*/
static void pack_maps(struct flb_loki *ctx,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This essentially tries to follow the flow of the proceeding pack_kv function.

Do you need a more detailed spec of what exactly this is supposed to be doing?

It should implement the proposal descibed in the FR #9463

msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map,
struct flb_mp_map_header *mh,
struct mk_list *list)
{
struct mk_list *head;
struct flb_loki_kv *kv;

msgpack_object *start_key;
msgpack_object *out_key;
msgpack_object *out_val;

msgpack_object_map accessed_map;
uint32_t accessed_map_index;
msgpack_object_kv accessed_map_kv;

char *accessed_map_val_json;

mk_list_foreach(head, list) {
/* get the flb_loki_kv for this iteration of the loop */
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* record accessor key/value pair */
if (kv->ra_key != NULL && kv->ra_val == NULL) {

/* try to get the value for the record accessor */
if (flb_ra_get_kv_pair(kv->ra_key, *map, &start_key, &out_key, &out_val)
== MSGPACK_UNPACK_CONTINUE) {

/*
* we require the value to be a map, or it doesn't make sense as
* this is adding a map's key / values
*/
if (out_val->type != MSGPACK_OBJECT_MAP || out_val->via.map.size <= 0) {
flb_plg_debug(ctx->ins, "No valid map data found for key %s",
kv->ra_key->pattern);
}
else {
accessed_map = out_val->via.map;

/* for each entry in the accessed map... */
for (accessed_map_index = 0; accessed_map_index < accessed_map.size;
accessed_map_index++) {

/* get the entry */
accessed_map_kv = accessed_map.ptr[accessed_map_index];

/* Pack the key and value */
flb_mp_map_header_append(mh);

pack_label_key(mp_pck, (char*) accessed_map_kv.key.via.str.ptr,
accessed_map_kv.key.via.str.size);
/*
* Does this need optimising? For example, to handle
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can anyone answer this question? Should I optimise this somehow?

* bool as non-JSON?
*/
if (accessed_map_kv.val.type == MSGPACK_OBJECT_STR) {
msgpack_pack_str_with_body(mp_pck,
accessed_map_kv.val.via.str.ptr,
accessed_map_kv.val.via.str.size);
}
/*
* convert value to JSON, as Loki expects a string value
*/
else {
accessed_map_val_json = flb_msgpack_to_json_str(1024,
&accessed_map_kv.val);
if (accessed_map_val_json) {
msgpack_pack_str_with_body(mp_pck, accessed_map_val_json,
strlen(accessed_map_val_json));
flb_free(accessed_map_val_json);
}
}
}
}
}
}
}
}

static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
Expand All @@ -424,7 +519,17 @@ static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
struct flb_mp_map_header mh;
/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
if (ctx->structured_metadata_map_keys) {
pack_maps(ctx, mp_pck, tag, tag_len, map, &mh,
&ctx->structured_metadata_map_keys_list);
}
/*
* explicit structured_metadata entries override
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this works as expected.

The intention is, if we try to add an entry to the structured_metadata with the same key twice (first from the new structured_metadata_map_keys, then from the existing structured_metadata), then the second entry should "win" and overrwite the first.

I am assuming msgpack works this way. But maybe I actually need to do some kind of explicit check, and only add a new entry where there is not already one for a given key?

* structured_metadata_map_keys entries
* */
if (ctx->structured_metadata) {
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
}
flb_mp_map_header_end(&mh);
return 0;
}
Expand Down Expand Up @@ -788,6 +893,7 @@ static int parse_labels(struct flb_loki *ctx)

flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);

if (ctx->structured_metadata) {
ret = parse_kv(ctx, ctx->structured_metadata, &ctx->structured_metadata_list, &ra_used);
Expand All @@ -796,6 +902,28 @@ static int parse_labels(struct flb_loki *ctx)
}
}

/* Append structured metadata map keys set in the configuration */
if (ctx->structured_metadata_map_keys) {
mk_list_foreach(head, ctx->structured_metadata_map_keys) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);
if (entry->str[0] != '$') {
flb_plg_error(ctx->ins,
"invalid structured metadata map key, the name must start "
"with '$'");
return -1;
}

ret = flb_loki_kv_append(ctx, &ctx->structured_metadata_map_keys_list,
entry->str, NULL);
if (ret == -1) {
return -1;
}
else if (ret > 0) {
ra_used++;
}
}
}

if (ctx->labels) {
ret = parse_kv(ctx, ctx->labels, &ctx->labels_list, &ra_used);
if (ret == -1) {
Expand Down Expand Up @@ -971,6 +1099,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
ctx->ins = ins;
flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);

/* Register context with plugin instance */
flb_output_set_context(ins, ctx);
Expand Down Expand Up @@ -1539,12 +1668,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
ctx->structured_metadata_map_keys ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL);
}
}
Expand Down Expand Up @@ -1575,12 +1705,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
msgpack_pack_str_body(&mp_pck, "values", 6);
msgpack_pack_array(&mp_pck, 1);

msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
ctx->structured_metadata_map_keys ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body);
}
}
Expand Down Expand Up @@ -1905,6 +2036,13 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata),
"optional structured metadata fields for API requests."
},

{
FLB_CONFIG_MAP_CLIST, "structured_metadata_map_keys", NULL,
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata_map_keys),
"optional structured metadata fields, as derived dynamically from configured maps "
"keys, for API requests."
},

{
FLB_CONFIG_MAP_BOOL, "auto_kubernetes_labels", "false",
Expand Down
2 changes: 2 additions & 0 deletions plugins/out_loki/loki.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct flb_loki {
struct mk_list *labels;
struct mk_list *label_keys;
struct mk_list *structured_metadata;
struct mk_list *structured_metadata_map_keys;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this config property name okay structured_metadata_map_keys?

struct mk_list *remove_keys;

flb_sds_t label_map_path;
Expand All @@ -91,6 +92,7 @@ struct flb_loki {
struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */
struct mk_list labels_list; /* list of flb_loki_kv nodes */
struct mk_list structured_metadata_list; /* list of flb_loki_kv nodes */
struct mk_list structured_metadata_map_keys_list; /* list of flb_loki_kv nodes */
struct mk_list remove_keys_derived; /* remove_keys with label RAs */
struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */
struct flb_record_accessor *ra_tenant_id_key; /* dynamic tenant id key */
Expand Down
Loading
Loading