diff --git a/plugins/out_loki/loki.c b/plugins/out_loki/loki.c index 9722ebc3170..dab858c2e97 100644 --- a/plugins/out_loki/loki.c +++ b/plugins/out_loki/loki.c @@ -298,6 +298,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx) mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_list) { kv = mk_list_entry(head, struct flb_loki_kv, _head); + /* unlink and destroy */ + mk_list_del(&kv->_head); + flb_loki_kv_destroy(kv); + } + mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_map_keys_list) { + kv = mk_list_entry(head, struct flb_loki_kv, _head); + /* unlink and destroy */ mk_list_del(&kv->_head); flb_loki_kv_destroy(kv); @@ -416,6 +423,94 @@ static void pack_kv(struct flb_loki *ctx, } } +/* + * Similar to pack_kv above, except will only use msgpack_objects of type + * MSGPACK_OBJECT_MAP, and will iterate over the keys adding each entry as a + * separate item. Non-string map values are serialised to JSON, as Loki requires + * all values to be strings. +*/ +static void pack_maps(struct flb_loki *ctx, + msgpack_packer *mp_pck, + char *tag, int tag_len, + msgpack_object *map, + struct flb_mp_map_header *mh, + struct mk_list *list) +{ + struct mk_list *head; + struct flb_loki_kv *kv; + + msgpack_object *start_key; + msgpack_object *out_key; + msgpack_object *out_val; + + msgpack_object_map accessed_map; + uint32_t accessed_map_index; + msgpack_object_kv accessed_map_kv; + + char *accessed_map_val_json; + + mk_list_foreach(head, list) { + /* get the flb_loki_kv for this iteration of the loop */ + kv = mk_list_entry(head, struct flb_loki_kv, _head); + + /* record accessor key/value pair */ + if (kv->ra_key != NULL && kv->ra_val == NULL) { + + /* try to get the value for the record accessor */ + if (flb_ra_get_kv_pair(kv->ra_key, *map, &start_key, &out_key, &out_val) + == MSGPACK_UNPACK_CONTINUE) { + + /* + * we require the value to be a map, or it doesn't make sense as + * this is adding a map's key / values + */ + if (out_val->type != MSGPACK_OBJECT_MAP || out_val->via.map.size <= 0) { + flb_plg_debug(ctx->ins, "No valid map data found for key %s", + kv->ra_key->pattern); + } + else { + accessed_map = out_val->via.map; + + /* for each entry in the accessed map... */ + for (accessed_map_index = 0; accessed_map_index < accessed_map.size; + accessed_map_index++) { + + /* get the entry */ + accessed_map_kv = accessed_map.ptr[accessed_map_index]; + + /* Pack the key and value */ + flb_mp_map_header_append(mh); + + pack_label_key(mp_pck, (char*) accessed_map_kv.key.via.str.ptr, + accessed_map_kv.key.via.str.size); + /* + * Does this need optimising? For example, to handle + * bool as non-JSON? + */ + if (accessed_map_kv.val.type == MSGPACK_OBJECT_STR) { + msgpack_pack_str_with_body(mp_pck, + accessed_map_kv.val.via.str.ptr, + accessed_map_kv.val.via.str.size); + } + /* + * convert value to JSON, as Loki expects a string value + */ + else { + accessed_map_val_json = flb_msgpack_to_json_str(1024, + &accessed_map_kv.val); + if (accessed_map_val_json) { + msgpack_pack_str_with_body(mp_pck, accessed_map_val_json, + strlen(accessed_map_val_json)); + flb_free(accessed_map_val_json); + } + } + } + } + } + } + } +} + static flb_sds_t pack_structured_metadata(struct flb_loki *ctx, msgpack_packer *mp_pck, char *tag, int tag_len, @@ -424,7 +519,17 @@ static flb_sds_t pack_structured_metadata(struct flb_loki *ctx, struct flb_mp_map_header mh; /* Initialize dynamic map header */ flb_mp_map_header_init(&mh, mp_pck); - pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list); + if (ctx->structured_metadata_map_keys) { + pack_maps(ctx, mp_pck, tag, tag_len, map, &mh, + &ctx->structured_metadata_map_keys_list); + } + /* + * explicit structured_metadata entries override + * structured_metadata_map_keys entries + * */ + if (ctx->structured_metadata) { + pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list); + } flb_mp_map_header_end(&mh); return 0; } @@ -788,6 +893,7 @@ static int parse_labels(struct flb_loki *ctx) flb_loki_kv_init(&ctx->labels_list); flb_loki_kv_init(&ctx->structured_metadata_list); + flb_loki_kv_init(&ctx->structured_metadata_map_keys_list); if (ctx->structured_metadata) { ret = parse_kv(ctx, ctx->structured_metadata, &ctx->structured_metadata_list, &ra_used); @@ -796,6 +902,28 @@ static int parse_labels(struct flb_loki *ctx) } } + /* Append structured metadata map keys set in the configuration */ + if (ctx->structured_metadata_map_keys) { + mk_list_foreach(head, ctx->structured_metadata_map_keys) { + entry = mk_list_entry(head, struct flb_slist_entry, _head); + if (entry->str[0] != '$') { + flb_plg_error(ctx->ins, + "invalid structured metadata map key, the name must start " + "with '$'"); + return -1; + } + + ret = flb_loki_kv_append(ctx, &ctx->structured_metadata_map_keys_list, + entry->str, NULL); + if (ret == -1) { + return -1; + } + else if (ret > 0) { + ra_used++; + } + } + } + if (ctx->labels) { ret = parse_kv(ctx, ctx->labels, &ctx->labels_list, &ra_used); if (ret == -1) { @@ -971,6 +1099,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins, ctx->ins = ins; flb_loki_kv_init(&ctx->labels_list); flb_loki_kv_init(&ctx->structured_metadata_list); + flb_loki_kv_init(&ctx->structured_metadata_map_keys_list); /* Register context with plugin instance */ flb_output_set_context(ins, ctx); @@ -1539,12 +1668,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2); + msgpack_pack_array(&mp_pck, ctx->structured_metadata || + ctx->structured_metadata_map_keys ? 3 : 2); /* Append the timestamp */ pack_timestamp(&mp_pck, &log_event.timestamp); pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id); - if (ctx->structured_metadata) { + if (ctx->structured_metadata || ctx->structured_metadata_map_keys) { pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL); } } @@ -1575,12 +1705,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, msgpack_pack_str_body(&mp_pck, "values", 6); msgpack_pack_array(&mp_pck, 1); - msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2); + msgpack_pack_array(&mp_pck, ctx->structured_metadata || + ctx->structured_metadata_map_keys ? 3 : 2); /* Append the timestamp */ pack_timestamp(&mp_pck, &log_event.timestamp); pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id); - if (ctx->structured_metadata) { + if (ctx->structured_metadata || ctx->structured_metadata_map_keys) { pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body); } } @@ -1905,6 +2036,13 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata), "optional structured metadata fields for API requests." }, + + { + FLB_CONFIG_MAP_CLIST, "structured_metadata_map_keys", NULL, + 0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata_map_keys), + "optional structured metadata fields, as derived dynamically from configured maps " + "keys, for API requests." + }, { FLB_CONFIG_MAP_BOOL, "auto_kubernetes_labels", "false", diff --git a/plugins/out_loki/loki.h b/plugins/out_loki/loki.h index ee5c56c83b4..69b539f9991 100644 --- a/plugins/out_loki/loki.h +++ b/plugins/out_loki/loki.h @@ -78,6 +78,7 @@ struct flb_loki { struct mk_list *labels; struct mk_list *label_keys; struct mk_list *structured_metadata; + struct mk_list *structured_metadata_map_keys; struct mk_list *remove_keys; flb_sds_t label_map_path; @@ -91,6 +92,7 @@ struct flb_loki { struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */ struct mk_list labels_list; /* list of flb_loki_kv nodes */ struct mk_list structured_metadata_list; /* list of flb_loki_kv nodes */ + struct mk_list structured_metadata_map_keys_list; /* list of flb_loki_kv nodes */ struct mk_list remove_keys_derived; /* remove_keys with label RAs */ struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */ struct flb_record_accessor *ra_tenant_id_key; /* dynamic tenant id key */ diff --git a/tests/runtime/out_loki.c b/tests/runtime/out_loki.c index 7cef907b5de..10a814c407c 100644 --- a/tests/runtime/out_loki.c +++ b/tests/runtime/out_loki.c @@ -83,7 +83,7 @@ void flb_test_basic() in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); - /* Elasticsearch output */ + /* Loki output */ out_ffd = flb_output(ctx, (char *) "loki", NULL); flb_output_set(ctx, out_ffd, "match", "test", @@ -141,7 +141,7 @@ void flb_test_labels() in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); - /* Elasticsearch output */ + /* Loki output */ out_ffd = flb_output(ctx, (char *) "loki", NULL); flb_output_set(ctx, out_ffd, "match", "test", @@ -201,7 +201,7 @@ void flb_test_label_keys() in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); - /* Elasticsearch output */ + /* Loki output */ out_ffd = flb_output(ctx, (char *) "loki", NULL); flb_output_set(ctx, out_ffd, "match", "test", @@ -260,7 +260,7 @@ void flb_test_line_format() in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); - /* Elasticsearch output */ + /* Loki output */ out_ffd = flb_output(ctx, (char *) "loki", NULL); flb_output_set(ctx, out_ffd, "match", "test", @@ -498,7 +498,7 @@ void flb_test_remove_map() in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); - /* Elasticsearch output */ + /* Loki output */ out_ffd = flb_output(ctx, (char *) "loki", NULL); flb_output_set(ctx, out_ffd, "match", "test", @@ -558,7 +558,7 @@ void flb_test_labels_ra() in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); - /* Elasticsearch output */ + /* Loki output */ out_ffd = flb_output(ctx, (char *) "loki", NULL); flb_output_set(ctx, out_ffd, "match", "test", @@ -620,7 +620,7 @@ void flb_test_remove_keys() in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); - /* Elasticsearch output */ + /* Loki output */ out_ffd = flb_output(ctx, (char *) "loki", NULL); flb_output_set(ctx, out_ffd, "match", "test", @@ -692,7 +692,7 @@ void flb_test_label_map_path() in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); - /* Elasticsearch output */ + /* Loki output */ out_ffd = flb_output(ctx, (char *) "loki", NULL); flb_output_set(ctx, out_ffd, "match", "test", @@ -739,11 +739,12 @@ static void cb_check_float_value(void *ctx, int ffd, flb_sds_destroy(out_js); } + #define JSON_FLOAT "[12345678, {\"float\":1.3}]" void flb_test_float_value() { int ret; - int size = sizeof(JSON_FLOAT) - 1; + size_t size = sizeof(JSON_FLOAT) - 1; flb_ctx_t *ctx; int in_ffd; int out_ffd; @@ -758,7 +759,7 @@ void flb_test_float_value() in_ffd = flb_input(ctx, (char *) "lib", NULL); flb_input_set(ctx, in_ffd, "tag", "test", NULL); - /* Elasticsearch output */ + /* Loki output */ out_ffd = flb_output(ctx, (char *) "loki", NULL); flb_output_set(ctx, out_ffd, "match", "test", @@ -783,6 +784,180 @@ void flb_test_float_value() flb_destroy(ctx); } +static void cb_check_structured_metadata_value(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out_js = res_data; + if (!TEST_CHECK(out_js != NULL)) { + TEST_MSG("out_js is NULL"); + return; + } + + char *index_line = (char *) data; + + p = strstr(out_js, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Expecting %s but Given:%s", index_line, out_js); + } + + flb_sds_destroy(out_js); +} + +#define JSON_MAP "[12345678, {\"log\": \"This is an interesting log message!\", " \ + "\"map1\": {\"key1\": \"value1\", \"key2\": \"value2\", \"key_nested_object_1\": " \ + "{\"sub_key1\": \"sub_value1\", \"sub_key2\": false}}, \"map2\": {\"key4\": " \ + "\"value1\", \"key5\": false}, \"map3\": {\"key1\": \"map3_value1\", \"key2\": " \ + "\"map3_value2\"}}]" +void flb_test_structured_metadata_map_params(char *remove_keys, + char *structured_metadata, + char *structured_metadata_map_keys, + char *input_log_json, + char *expected_sub_str) +{ + int ret; + size_t size = strlen(input_log_json); + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Loki output */ + out_ffd = flb_output(ctx, (char *) "loki", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "line_format", "key_value", + "remove_keys", remove_keys, + "drop_single_key", "on", + "labels", "service_name=my_service_name", + "structured_metadata", structured_metadata, + "structured_metadata_map_keys", structured_metadata_map_keys, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_structured_metadata_value, + expected_sub_str, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) input_log_json, size); + TEST_CHECK(ret >= 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_structured_metadata_map_single_map() { + flb_test_structured_metadata_map_params( + "map1, map2, map3", + "", + "$map1", + JSON_MAP, + "{\"key1\":\"value1\",\"key2\":\"value2\"," + "\"key_nested_object_1\":\"{\\\"sub_key1\\\":\\\"sub_value1\\\"," + "\\\"sub_key2\\\":false}\"}"); +} + +void flb_test_structured_metadata_map_two_maps() { + flb_test_structured_metadata_map_params( + "map1, map2, map3", + "", + "$map1,$map2", + JSON_MAP, + "{\"key1\":\"value1\",\"key2\":\"value2\"," + "\"key_nested_object_1\":\"{\\\"sub_key1\\\":\\\"sub_value1\\\"," + "\\\"sub_key2\\\":false}\",\"key4\":\"value1\",\"key5\":\"false\"}"); +} + +void flb_test_structured_metadata_map_sub_map() { + flb_test_structured_metadata_map_params( + "map1, map2, map3", + "", + "$map1['key_nested_object_1']", + JSON_MAP, + "\"This is an interesting log message!\",{\"sub_key1\":\"sub_value1\"," + "\"sub_key2\":\"false\"}"); +} + +void flb_test_structured_metadata_map_both_with_non_map_value() { + flb_test_structured_metadata_map_params( + "map1, map2, map3", + "$map2", + "$map1,$map2", + JSON_MAP, + "{\"key1\":\"value1\",\"key2\":\"value2\"," + "\"key_nested_object_1\":\"{\\\"sub_key1\\\":\\\"sub_value1\\\"," + "\\\"sub_key2\\\":false}\",\"key4\":\"value1\",\"key5\":\"false\"," + "\"map2\":\"{\\\"key4\\\":\\\"value1\\\",\\\"key5\\\":false}\"}"); +} + +/* key1 is overridden by the explicit value given to structured_metadata */ +void flb_test_structured_metadata_map_value_explicit_override_map_key() { + flb_test_structured_metadata_map_params( + "map1, map2, map3", + "key1=value_explicit", + "$map1,$map2", + JSON_MAP, + "{\"key2\":\"value2\"," + "\"key_nested_object_1\":\"{\\\"sub_key1\\\":\\\"sub_value1\\\"," + "\\\"sub_key2\\\":false}\",\"key4\":\"value1\",\"key5\":\"false\"," + "\"key1\":\"value_explicit\"}"); +} + +void flb_test_structured_metadata_explicit_only_no_map() { + flb_test_structured_metadata_map_params( + "map1, map2, map3", + "key1=value_explicit", + "", + JSON_MAP, + "[\"12345678000000000\"," + "\"This is an interesting log message!\",{\"key1\":\"value_explicit\"}]"); +} + +void flb_test_structured_metadata_explicit_only_map() { + flb_test_structured_metadata_map_params( + "map1, map2, map3", + "$map2", + "", + JSON_MAP, + "{\"map2\":\"{\\\"key4\\\":\\\"value1\\\",\\\"key5\\\":false}\"}"); +} + +void flb_test_structured_metadata_map_and_explicit() { + flb_test_structured_metadata_map_params( + "map1, map2, map3", + "key_explicit=value_explicit", + "$map1", + JSON_MAP, + "[\"12345678000000000\",\"This is an interesting log message!\"," + "{\"key1\":\"value1\",\"key2\":\"value2\"," + "\"key_nested_object_1\":\"{\\\"sub_key1\\\":\\\"sub_value1\\\"," + "\\\"sub_key2\\\":false}\",\"key_explicit\":\"value_explicit\"}]"); +} + +void flb_test_structured_metadata_map_single_missing_map() { + flb_test_structured_metadata_map_params( + "map1, map2, map3", + "", + "$missing_map", + JSON_MAP, + "[\"12345678000000000\",\"This is an interesting log message!\",{}]"); +} /* Test list */ TEST_LIST = { @@ -798,5 +973,23 @@ TEST_LIST = { {"drop_single_key_raw" , flb_test_drop_single_key_raw }, {"label_map_path" , flb_test_label_map_path}, {"float_value" , flb_test_float_value}, + {"structured_metadata_map_single_map", + flb_test_structured_metadata_map_single_map}, + {"structured_metadata_map_two_maps", + flb_test_structured_metadata_map_two_maps}, + {"structured_metadata_map_sub_map", + flb_test_structured_metadata_map_sub_map}, + {"structured_metadata_map_both_with_non_map_value", + flb_test_structured_metadata_map_both_with_non_map_value}, + {"structured_metadata_map_value_explicit_override_map_key", + flb_test_structured_metadata_map_value_explicit_override_map_key}, + {"structured_metadata_explicit_only_no_map", + flb_test_structured_metadata_explicit_only_no_map}, + {"structured_metadata_explicit_only_map", + flb_test_structured_metadata_explicit_only_map}, + {"structured_metadata_map_and_explicit", + flb_test_structured_metadata_map_and_explicit}, + {"structured_metadata_map_single_missing_map", + flb_test_structured_metadata_map_single_missing_map}, {NULL, NULL} };