From 0bea9144a2e4aeb8b8b66255401597330397f56c Mon Sep 17 00:00:00 2001 From: Stewart Webb Date: Mon, 15 Jul 2024 19:56:16 +1000 Subject: [PATCH 01/16] in_opentelemetry: Propogate tag in http2 metrics and trace handlers Signed-off-by: Stewart Webb --- plugins/in_opentelemetry/opentelemetry_prot.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index bfab3d02145..7b89902bf29 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -2242,7 +2242,7 @@ static int process_payload_metrics_ng(struct flb_opentelemetry *ctx, cfl_list_foreach(iterator, &decoded_contexts) { context = cfl_list_entry(iterator, struct cmt, _head); - result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + result = flb_input_metrics_append(ctx->ins, tag, cfl_sds_len(tag), context); if (result != 0) { flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result); @@ -2301,7 +2301,7 @@ static int process_payload_traces_proto_ng(struct flb_opentelemetry *ctx, } if (result == 0) { - result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context); + result = flb_input_trace_append(ctx->ins, tag, cfl_sds_len(tag), decoded_context); ctr_decode_opentelemetry_destroy(decoded_context); } else { From 0ba19ce306d56191185349a80c609d975b106921 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 22 Nov 2024 18:06:15 +0900 Subject: [PATCH 02/16] processor_labels: Process operations for output purposed contexts of metrics instead of the original one Signed-off-by: Hiroshi Hatake --- plugins/processor_labels/labels.c | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/plugins/processor_labels/labels.c b/plugins/processor_labels/labels.c index ee34e639f25..a0afd8314d4 100644 --- a/plugins/processor_labels/labels.c +++ b/plugins/processor_labels/labels.c @@ -1713,44 +1713,41 @@ static int cb_process_metrics(struct flb_processor_instance *processor_instance, return FLB_PROCESSOR_FAILURE; } - result = delete_labels(metrics_context, + result = cmt_cat(out_cmt, metrics_context); + if (result != 0) { + cmt_destroy(out_cmt); + + return FLB_PROCESSOR_FAILURE; + } + + result = delete_labels(out_cmt, &processor_context->delete_labels); if (result == FLB_PROCESSOR_SUCCESS) { - result = update_labels(metrics_context, + result = update_labels(out_cmt, &processor_context->update_labels); } if (result == FLB_PROCESSOR_SUCCESS) { - result = upsert_labels(metrics_context, + result = upsert_labels(out_cmt, &processor_context->upsert_labels); } if (result == FLB_PROCESSOR_SUCCESS) { - result = insert_labels(metrics_context, + result = insert_labels(out_cmt, &processor_context->insert_labels); } if (result == FLB_PROCESSOR_SUCCESS) { - result = hash_labels(metrics_context, + result = hash_labels(out_cmt, &processor_context->hash_labels); } - if (result == FLB_PROCESSOR_SUCCESS) { - result = cmt_cat(out_cmt, metrics_context); - if (result != 0) { - cmt_destroy(out_cmt); - - return FLB_PROCESSOR_FAILURE; - } - - *out_context = out_cmt; - } - if (result != FLB_PROCESSOR_SUCCESS) { return FLB_PROCESSOR_FAILURE; } + *out_context = out_cmt; return FLB_PROCESSOR_SUCCESS; } From 7cafc500d710f5216a89a39ab686aa9f91a417e9 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 18 Nov 2024 22:22:20 -0600 Subject: [PATCH 03/16] filter_lua: expose env variables in FLB_ENV Lua table Signed-off-by: Eduardo Silva --- plugins/filter_lua/lua.c | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/plugins/filter_lua/lua.c b/plugins/filter_lua/lua.c index 5186955b8ae..de7e6983188 100644 --- a/plugins/filter_lua/lua.c +++ b/plugins/filter_lua/lua.c @@ -27,8 +27,10 @@ #include #include #include +#include #include #include + #include #include "fluent-bit/flb_mem.h" @@ -76,6 +78,31 @@ static int cb_lua_pre_run(struct flb_filter_instance *f_ins, return ret; } +static int env_variables(struct flb_config *config, struct flb_luajit *lj) +{ + struct mk_list *list; + struct mk_list *head; + struct flb_env *env; + struct flb_hash_table_entry *entry; + + lua_newtable(lj->state); + + env = (struct flb_env *) config->env; + list = (struct mk_list *) &env->ht->entries; + mk_list_foreach(head, list) { + entry = mk_list_entry(head, struct flb_hash_table_entry, _head_parent); + if (entry->val_size <= 0) { + continue; + } + lua_pushlstring(lj->state, entry->key, entry->key_len); + lua_pushlstring(lj->state, entry->val, entry->val_size); + lua_settable(lj->state, -3); + } + + lua_setglobal(lj->state, "FLB_ENV"); + return 0; +} + static int cb_lua_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) @@ -101,6 +128,9 @@ static int cb_lua_init(struct flb_filter_instance *f_ins, } ctx->lua = lj; + /* register environment variables */ + env_variables(config, lj); + if (ctx->enable_flb_null) { flb_lua_enable_flb_null(lj->state); } From ff85301a46f8b88f33ae87985e0eb014675857fd Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski Date: Wed, 27 Nov 2024 13:13:09 +0100 Subject: [PATCH 04/16] custom_calyptia: added interval handling and tests --- plugins/custom_calyptia/calyptia.c | 159 ++++++++----------- plugins/custom_calyptia/calyptia.h | 59 +++++++ tests/runtime/CMakeLists.txt | 26 ++++ tests/runtime/custom_calyptia_input_test.c | 172 +++++++++++++++++++++ 4 files changed, 322 insertions(+), 94 deletions(-) create mode 100644 plugins/custom_calyptia/calyptia.h create mode 100644 tests/runtime/custom_calyptia_input_test.c diff --git a/plugins/custom_calyptia/calyptia.c b/plugins/custom_calyptia/calyptia.c index c3554157598..76f5868efdc 100644 --- a/plugins/custom_calyptia/calyptia.c +++ b/plugins/custom_calyptia/calyptia.c @@ -30,40 +30,7 @@ #include -struct calyptia { - /* config map options */ - flb_sds_t api_key; - flb_sds_t store_path; - flb_sds_t cloud_host; - flb_sds_t cloud_port; - flb_sds_t machine_id; - int machine_id_auto_configured; - -/* used for reporting chunk trace records. */ -#ifdef FLB_HAVE_CHUNK_TRACE - flb_sds_t pipeline_id; -#endif /* FLB_HAVE_CHUNK_TRACE */ - - int cloud_tls; - int cloud_tls_verify; - - /* config reader for 'add_label' */ - struct mk_list *add_labels; - - /* instances */ - struct flb_input_instance *i; - struct flb_output_instance *o; - struct flb_input_instance *fleet; - struct flb_custom_instance *ins; - - /* Fleet configuration */ - flb_sds_t fleet_id; /* fleet-id */ - flb_sds_t fleet_name; - flb_sds_t fleet_config_dir; /* fleet configuration directory */ - flb_sds_t fleet_max_http_buffer_size; - int fleet_interval_sec; - int fleet_interval_nsec; -}; +#include "calyptia.h" /* * Check if the key belongs to a sensitive data field, if so report it. We never @@ -232,6 +199,53 @@ flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx) return buf; } +int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet) +{ + if (!fleet) { + flb_plg_error(ctx->ins, "invalid fleet input instance"); + return -1; + } + + if (ctx->fleet_name) { + flb_input_set_property(fleet, "fleet_name", ctx->fleet_name); + } + + if (ctx->fleet_id) { + flb_input_set_property(fleet, "fleet_id", ctx->fleet_id); + } + + flb_input_set_property(fleet, "api_key", ctx->api_key); + flb_input_set_property(fleet, "host", ctx->cloud_host); + flb_input_set_property(fleet, "port", ctx->cloud_port); + + /* Set TLS properties */ + flb_input_set_property(fleet, "tls", ctx->cloud_tls == 1 ? "on" : "off"); + flb_input_set_property(fleet, "tls.verify", ctx->cloud_tls_verify == 1 ? "on" : "off"); + + /* Optional configurations */ + if (ctx->fleet_config_dir) { + flb_input_set_property(fleet, "config_dir", ctx->fleet_config_dir); + } + + if (ctx->fleet_max_http_buffer_size) { + flb_input_set_property(fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size); + } + + if (ctx->machine_id) { + flb_input_set_property(fleet, "machine_id", ctx->machine_id); + } + + if (ctx->fleet_interval_sec) { + flb_input_set_property(fleet, "interval_sec", ctx->fleet_interval_sec); + } + + if (ctx->fleet_interval_nsec) { + flb_input_set_property(fleet, "interval_nsec", ctx->fleet_interval_nsec); + } + + return 0; +} + static struct flb_output_instance *setup_cloud_output(struct flb_config *config, struct calyptia *ctx) { int ret; @@ -387,16 +401,14 @@ static flb_sds_t get_machine_id(struct calyptia *ctx) } static int cb_calyptia_init(struct flb_custom_instance *ins, - struct flb_config *config, - void *data) + struct flb_config *config, + void *data) { int ret; struct calyptia *ctx; - int is_fleet_mode; (void) data; ctx = flb_calloc(1, sizeof(struct calyptia)); - if (!ctx) { flb_errno(); return -1; @@ -405,7 +417,6 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, /* Load the config map */ ret = flb_custom_config_map_set(ins, (void *) ctx); - if (ret == -1) { flb_free(ctx); return -1; @@ -416,21 +427,17 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, /* If no machine_id has been provided via a configuration option get it from the local machine-id. */ if (!ctx->machine_id) { - /* machine id */ ctx->machine_id = get_machine_id(ctx); - if (ctx->machine_id == NULL) { flb_plg_error(ctx->ins, "unable to retrieve machine_id"); flb_free(ctx); return -1; } - ctx->machine_id_auto_configured = 1; } /* input collector */ ctx->i = flb_input_new(config, "fluentbit_metrics", NULL, FLB_TRUE); - if (!ctx->i) { flb_plg_error(ctx->ins, "could not load metrics collector"); flb_free(ctx); @@ -439,76 +446,40 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, flb_input_set_property(ctx->i, "tag", "_calyptia_cloud"); flb_input_set_property(ctx->i, "scrape_on_start", "true"); + // This scrape interval should be configurable. flb_input_set_property(ctx->i, "scrape_interval", "30"); - if (ctx->fleet_name || ctx->fleet_id) { - is_fleet_mode = FLB_TRUE; - } - else { - is_fleet_mode = FLB_FALSE; - } - - /* output cloud connector */ - if ((is_fleet_mode == FLB_TRUE && ctx->fleet_id != NULL) || - (is_fleet_mode == FLB_FALSE)) { + /* Setup cloud output if needed */ + if (ctx->fleet_id != NULL || !ctx->fleet_name) { ctx->o = setup_cloud_output(config, ctx); - if (ctx->o == NULL) { flb_free(ctx); return -1; } + /* Set fleet_id for output if present */ + if (ctx->fleet_id) { + flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); + } } + /* Setup fleet input if needed */ if (ctx->fleet_id || ctx->fleet_name) { - ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE); - + ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE); if (!ctx->fleet) { flb_plg_error(ctx->ins, "could not load Calyptia Fleet plugin"); return -1; } - if (ctx->fleet_name) { - flb_input_set_property(ctx->fleet, "fleet_name", ctx->fleet_name); - } - - if (ctx->fleet_id) { - flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); - flb_input_set_property(ctx->fleet, "fleet_id", ctx->fleet_id); - } - - flb_input_set_property(ctx->fleet, "api_key", ctx->api_key); - flb_input_set_property(ctx->fleet, "host", ctx->cloud_host); - flb_input_set_property(ctx->fleet, "port", ctx->cloud_port); - - if (ctx->cloud_tls == 1) { - flb_input_set_property(ctx->fleet, "tls", "on"); - } - else { - flb_input_set_property(ctx->fleet, "tls", "off"); - } - - if (ctx->cloud_tls_verify == 1) { - flb_input_set_property(ctx->fleet, "tls.verify", "on"); - } - else { - flb_input_set_property(ctx->fleet, "tls.verify", "off"); - } - - if (ctx->fleet_config_dir) { - flb_input_set_property(ctx->fleet, "config_dir", ctx->fleet_config_dir); - } - - if (ctx->fleet_max_http_buffer_size) { - flb_input_set_property(ctx->fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size); - } - if (ctx->machine_id) { - flb_input_set_property(ctx->fleet, "machine_id", ctx->machine_id); + ret = set_fleet_input_properties(ctx, ctx->fleet); + if (ret == -1) { + return -1; } } if (ctx->o) { flb_router_connect(ctx->i, ctx->o); } + flb_plg_info(ins, "custom initialized!"); return 0; } @@ -587,12 +558,12 @@ static struct flb_config_map config_map[] = { "Base path for the configuration directory." }, { - FLB_CONFIG_MAP_INT, "fleet.interval_sec", "-1", + FLB_CONFIG_MAP_STR, "fleet.interval_sec", "-1", 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_sec), "Set the collector interval" }, { - FLB_CONFIG_MAP_INT, "fleet.interval_nsec", "-1", + FLB_CONFIG_MAP_STR, "fleet.interval_nsec", "-1", 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_nsec), "Set the collector interval (nanoseconds)" }, diff --git a/plugins/custom_calyptia/calyptia.h b/plugins/custom_calyptia/calyptia.h new file mode 100644 index 00000000000..e1f4dd36770 --- /dev/null +++ b/plugins/custom_calyptia/calyptia.h @@ -0,0 +1,59 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_CALYPTIA_H +#define FLB_CALYPTIA_H + +struct calyptia { + /* config map options */ + flb_sds_t api_key; + flb_sds_t store_path; + flb_sds_t cloud_host; + flb_sds_t cloud_port; + flb_sds_t machine_id; + int machine_id_auto_configured; + +/* used for reporting chunk trace records. */ +#ifdef FLB_HAVE_CHUNK_TRACE + flb_sds_t pipeline_id; +#endif /* FLB_HAVE_CHUNK_TRACE */ + + int cloud_tls; + int cloud_tls_verify; + + /* config reader for 'add_label' */ + struct mk_list *add_labels; + + /* instances */ + struct flb_input_instance *i; + struct flb_output_instance *o; + struct flb_input_instance *fleet; + struct flb_custom_instance *ins; + + /* Fleet configuration */ + flb_sds_t fleet_id; /* fleet-id */ + flb_sds_t fleet_name; + flb_sds_t fleet_config_dir; /* fleet configuration directory */ + flb_sds_t fleet_max_http_buffer_size; + flb_sds_t fleet_interval_sec; + flb_sds_t fleet_interval_nsec; +}; + +int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet); +#endif /* FLB_CALYPTIA_H */ diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index ea1a1da3199..f355294ed9f 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -60,6 +60,32 @@ if(FLB_OUT_LIB) FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c") endif() +if (FLB_CUSTOM_CALYPTIA) + # Define common variables for calyptia tests + set(CALYPTIA_TEST_LINK_LIBS + fluent-bit-static + ${CMAKE_THREAD_LIBS_INIT} + ) + + # Add calyptia input properties test + set(TEST_TARGET "flb-rt-calyptia_input_properties") + add_executable(${TEST_TARGET} + "custom_calyptia_input_test.c" + "../../plugins/custom_calyptia/calyptia.c" + ) + + target_link_libraries(${TEST_TARGET} + ${CALYPTIA_TEST_LINK_LIBS} + ) + + add_test(NAME ${TEST_TARGET} + COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET} + WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build) + + set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime") + add_dependencies(${TEST_TARGET} fluent-bit-static) +endif() + if(FLB_IN_EBPF) # Define common variables set(EBPF_TEST_INCLUDE_DIRS diff --git a/tests/runtime/custom_calyptia_input_test.c b/tests/runtime/custom_calyptia_input_test.c new file mode 100644 index 00000000000..ffc774c83d9 --- /dev/null +++ b/tests/runtime/custom_calyptia_input_test.c @@ -0,0 +1,172 @@ +#include +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" +#include "../../plugins/custom_calyptia/calyptia.h" + +/* Test context structure */ +struct test_context { + struct calyptia *ctx; + struct flb_input_instance *fleet; + struct flb_config *config; +}; + +/* Initialize test context */ +static struct test_context *init_test_context() +{ + struct test_context *t_ctx = flb_calloc(1, sizeof(struct test_context)); + if (!t_ctx) { + return NULL; + } + + t_ctx->config = flb_config_init(); + if (!t_ctx->config) { + flb_free(t_ctx); + return NULL; + } + + t_ctx->ctx = flb_calloc(1, sizeof(struct calyptia)); + if (!t_ctx->ctx) { + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize plugin instance for logging */ + t_ctx->ctx->ins = flb_calloc(1, sizeof(struct flb_custom_instance)); + if (!t_ctx->ctx->ins) { + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize test values in ctx */ + t_ctx->ctx->api_key = flb_strdup("test_api_key"); + t_ctx->ctx->fleet_config_dir = flb_strdup("/test/config/dir"); + t_ctx->ctx->fleet_id = flb_strdup("test_fleet_id"); + t_ctx->ctx->fleet_name = flb_strdup("test_fleet"); + t_ctx->ctx->machine_id = flb_strdup("test_machine_id"); + t_ctx->ctx->fleet_max_http_buffer_size = flb_strdup("1024"); + t_ctx->ctx->fleet_interval_sec = flb_strdup("60"); + t_ctx->ctx->fleet_interval_nsec = flb_strdup("500000000"); + + t_ctx->fleet = flb_input_new(t_ctx->config, "calyptia_fleet", NULL, FLB_FALSE); + if (!t_ctx->fleet) { + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + return t_ctx; +} + +static void cleanup_test_context(struct test_context *t_ctx) +{ + if (!t_ctx) { + return; + } + + if (t_ctx->fleet) { + /* Input instance cleanup */ + flb_input_instance_destroy(t_ctx->fleet); + } + + if (t_ctx->ctx) { + if (t_ctx->ctx->api_key) flb_free(t_ctx->ctx->api_key); + if (t_ctx->ctx->fleet_config_dir) flb_free(t_ctx->ctx->fleet_config_dir); + if (t_ctx->ctx->fleet_id) flb_free(t_ctx->ctx->fleet_id); + if (t_ctx->ctx->fleet_name) flb_free(t_ctx->ctx->fleet_name); + if (t_ctx->ctx->machine_id) flb_free(t_ctx->ctx->machine_id); + if (t_ctx->ctx->fleet_max_http_buffer_size) flb_free(t_ctx->ctx->fleet_max_http_buffer_size); + if (t_ctx->ctx->fleet_interval_sec) flb_free(t_ctx->ctx->fleet_interval_sec); + if (t_ctx->ctx->fleet_interval_nsec) flb_free(t_ctx->ctx->fleet_interval_nsec); + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + } + + if (t_ctx->config) { + /* Destroy the config which will cleanup any remaining instances */ + flb_config_exit(t_ctx->config); + } + + flb_free(t_ctx); +} + +void test_set_fleet_input_properties() +{ + struct test_context *t_ctx = init_test_context(); + TEST_CHECK(t_ctx != NULL); + + /* Test setting properties */ + int ret = set_fleet_input_properties(t_ctx->ctx, t_ctx->fleet); + TEST_CHECK(ret == 0); + + /* Verify properties were set correctly */ + const char *value; + + /* Check api_key */ + value = flb_input_get_property("api_key", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("api_key expected=%s got=%s", t_ctx->ctx->api_key, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->api_key) == 0); + + /* Check config_dir */ + value = flb_input_get_property("config_dir", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("config_dir expected=%s got=%s", t_ctx->ctx->fleet_config_dir, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_config_dir) == 0); + + /* Check fleet_id */ + value = flb_input_get_property("fleet_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_id expected=%s got=%s", t_ctx->ctx->fleet_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_id) == 0); + + /* Check fleet_name */ + value = flb_input_get_property("fleet_name", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_name expected=%s got=%s", t_ctx->ctx->fleet_name, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_name) == 0); + + /* Check machine_id */ + value = flb_input_get_property("machine_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("machine_id expected=%s got=%s", t_ctx->ctx->machine_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->machine_id) == 0); + + /* Check max_http_buffer_size */ + value = flb_input_get_property("max_http_buffer_size", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("max_http_buffer_size expected=%s got=%s", t_ctx->ctx->fleet_max_http_buffer_size, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_max_http_buffer_size) == 0); + + // /* Check interval_sec */ + value = flb_input_get_property("interval_sec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_sec expected=%s got=%s", t_ctx->ctx->fleet_interval_sec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_sec) == 0); + + // /* Check interval_nsec */ + value = flb_input_get_property("interval_nsec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_nsec expected=%s got=%s", t_ctx->ctx->fleet_interval_nsec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_nsec) == 0); + + ret = set_fleet_input_properties(t_ctx->ctx, NULL); + TEST_CHECK(ret == -1); + + cleanup_test_context(t_ctx); +} + +/* Define test list */ +TEST_LIST = { + {"set_fleet_input_properties", test_set_fleet_input_properties}, + {NULL, NULL} +}; \ No newline at end of file From 6ba1121e017b8857c47f3f98197f9cef92509ee9 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski Date: Wed, 27 Nov 2024 13:13:24 +0100 Subject: [PATCH 05/16] in_calyptia_fleet: improved interval handling --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f61068ceede..28e3fc8719c 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -2233,7 +2233,6 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, ctx->collect_fd = -1; ctx->fleet_id_found = FLB_FALSE; - /* Load the config map */ ret = flb_input_config_map_set(in, (void *) ctx); if (ret == -1) { @@ -2278,14 +2277,16 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return -1; } + /* Log initial interval values */ + flb_plg_debug(ctx->ins, "initial collector interval: sec=%d nsec=%d", + ctx->interval_sec, ctx->interval_nsec); + if (ctx->interval_sec <= 0 && ctx->interval_nsec <= 0) { /* Illegal settings. Override them. */ ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC); - } - - if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) { - ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); + flb_plg_info(ctx->ins, "invalid interval settings, using defaults: sec=%d nsec=%d", + ctx->interval_sec, ctx->interval_nsec); } /* Set the context */ @@ -2328,6 +2329,8 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, } ctx->collect_fd = ret; + flb_plg_info(ctx->ins, "fleet collector initialized with interval: %d sec %d nsec", + ctx->interval_sec, ctx->interval_nsec); return 0; } From 0e0c50119834cd6ca412f006b7c44692c1119427 Mon Sep 17 00:00:00 2001 From: shuaichen Date: Wed, 27 Nov 2024 06:51:27 -0800 Subject: [PATCH 06/16] out_stackdriver bug fix: return cached token when current_timestamp is less than cached_expiration (#9652) * out_stackdriver: return cached token when current_timestamp is less than cached_expiration. Signed-off-by: shuaichen * stackdriver: revert log line change Signed-off-by: Braydon Kains <93549768+braydonk@users.noreply.github.com> --------- Signed-off-by: shuaichen Signed-off-by: Braydon Kains <93549768+braydonk@users.noreply.github.com> Co-authored-by: Braydon Kains <93549768+braydonk@users.noreply.github.com> --- plugins/out_stackdriver/stackdriver.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 903bffce98d..99ef657a0b4 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -357,6 +357,7 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx) int ret = 0; flb_sds_t output = NULL; time_t cached_expiration = 0; + time_t current_timestamp = 0; ret = pthread_mutex_trylock(&ctx->token_mutex); if (ret == EBUSY) { @@ -369,7 +370,9 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx) */ output = oauth2_cache_to_token(); cached_expiration = oauth2_cache_get_expiration(); - if (time(NULL) >= cached_expiration) { + current_timestamp = time(NULL); + + if (current_timestamp < cached_expiration) { return output; } else { /* From ce2d3711ee577e649ea293c2e0b16ad71e571fab Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 13 Sep 2024 20:27:21 +0900 Subject: [PATCH 07/16] http_client: Implement response testing framework Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_http_client.h | 82 ++++++++++++ src/flb_http_client.c | 178 +++++++++++++++++++++++---- 2 files changed, 238 insertions(+), 22 deletions(-) diff --git a/include/fluent-bit/flb_http_client.h b/include/fluent-bit/flb_http_client.h index 534538dd484..2573fba8ea6 100644 --- a/include/fluent-bit/flb_http_client.h +++ b/include/fluent-bit/flb_http_client.h @@ -147,6 +147,71 @@ struct flb_http_debug { int (*cb_debug_request_payload); }; +/* To make opaque struct */ +struct flb_http_client; + +/* + * Tests callbacks + * =============== + */ +struct flb_test_http_response { + /* + * Response Test Mode + * ==================== + * When the response test enable the test response mode, it needs to + * keep a reference of the context and other information: + * + * - rt_ctx : flb_http_client context + * + * - rt_status : HTTP response code + * + * - rt_in_callback: intermediary function to receive the results of + * the http response test function. + * + * - rt_data: opaque data type for rt_in_callback() + */ + + /* runtime library context */ + void *rt_ctx; + + /* HTTP status */ + int rt_status; + + /* optional response context */ + void *response_ctx; + + /* + * "response test callback": this function pointer is used by Fluent Bit + * http client testing mode to reference a test function that must retrieve the + * results of 'callback'. Consider this an intermediary function to + * transfer the results to the runtime test. + * + * This function is private and should not be set manually in the plugin + * code, it's set on src/flb_http_client.c . + */ + void (*rt_resp_callback) (void *, int, void *, size_t, void *); + + /* + * opaque data type passed by the runtime library to be used on + * rt_in_callback(). + */ + void *rt_data; + + /* + * Callback + * ========= + * "HTTP response callback": it references the plugin function that performs + * to validate HTTP response by HTTP client. This entry is mostly to + * expose the plugin local function. + */ + int (*callback) (/* plugin that ingested the records */ + struct flb_http_client *, + const void *, /* incoming response data */ + size_t, /* incoming response size */ + void **, /* output buffer */ + size_t *); /* output buffer size */ +}; + /* Set a request type */ struct flb_http_client { /* Upstream connection */ @@ -180,6 +245,10 @@ struct flb_http_client { /* Response */ struct flb_http_client_response resp; + /* Tests */ + int test_mode; + struct flb_test_http_response test_response; + /* Reference to Callback context */ void *cb_ctx; }; @@ -282,6 +351,13 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn, const char *host, int port, const char *proxy, int flags); +/* For fulfilling HTTP response testing (dummy client) */ +struct flb_http_client *flb_http_dummy_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags); + int flb_http_add_header(struct flb_http_client *c, const char *key, size_t key_len, const char *val, size_t val_len); @@ -297,6 +373,12 @@ int flb_http_set_keepalive(struct flb_http_client *c); int flb_http_set_content_encoding_gzip(struct flb_http_client *c); int flb_http_set_callback_context(struct flb_http_client *c, struct flb_callback *cb_ctx); +int flb_http_set_response_test(struct flb_http_client *c, char *test_name, + const void *data, size_t len, + int status, + void (*resp_callback) (void *, int, void *, size_t, void *), + void *resp_callback_data); +int flb_http_push_response(struct flb_http_client *c, const void *data, size_t len); int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed); int flb_http_do_request(struct flb_http_client *c, size_t *bytes); diff --git a/src/flb_http_client.c b/src/flb_http_client.c index b22926b7be2..54113fd4bcf 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -637,7 +637,7 @@ static int add_host_and_content_length(struct flb_http_client *c) return 0; } -struct flb_http_client *flb_http_client(struct flb_connection *u_conn, +struct flb_http_client *create_http_client(struct flb_connection *u_conn, int method, const char *uri, const char *body, size_t body_len, const char *host, int port, @@ -749,24 +749,81 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn, c->query_string = p; } - /* Is Upstream connection using keepalive mode ? */ - if (flb_stream_get_flag_status(&u_conn->upstream->base, FLB_IO_TCP_KA)) { - c->flags |= FLB_HTTP_KA; - } - /* Response */ c->resp.content_length = -1; c->resp.connection_close = -1; - if ((flags & FLB_HTTP_10) == 0) { - c->flags |= FLB_HTTP_11; - } - if (body && body_len > 0) { c->body_buf = body; c->body_len = body_len; } + /* 'Read' buffer size */ + c->resp.data = flb_malloc(FLB_HTTP_DATA_SIZE_MAX); + if (!c->resp.data) { + flb_errno(); + flb_http_client_destroy(c); + return NULL; + } + c->resp.data[0] = '\0'; + c->resp.data_len = 0; + c->resp.data_size = FLB_HTTP_DATA_SIZE_MAX; + c->resp.data_size_max = FLB_HTTP_DATA_SIZE_MAX; + + /* Tests */ + c->test_mode = FLB_FALSE; + c->test_response.callback = NULL; + + return c; +} + +struct flb_http_client *flb_http_dummy_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags) +{ + struct flb_http_client *c; + + c = create_http_client(u_conn, method, uri, + body, body_len, + host, port, + proxy, flags); + + if (!c) { + return NULL; + } + + return c; +} + +struct flb_http_client *flb_http_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags) +{ + int ret; + struct flb_http_client *c; + + c = create_http_client(u_conn, method, uri, + body, body_len, + host, port, + proxy, flags); + + if (!c) { + return NULL; + } + + /* Is Upstream connection using keepalive mode ? */ + if (flb_stream_get_flag_status(&u_conn->upstream->base, FLB_IO_TCP_KA)) { + c->flags |= FLB_HTTP_KA; + } + + if ((flags & FLB_HTTP_10) == 0) { + c->flags |= FLB_HTTP_11; + } + add_host_and_content_length(c); /* Check proxy data */ @@ -780,18 +837,6 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn, } } - /* 'Read' buffer size */ - c->resp.data = flb_malloc(FLB_HTTP_DATA_SIZE_MAX); - if (!c->resp.data) { - flb_errno(); - flb_http_client_destroy(c); - return NULL; - } - c->resp.data[0] = '\0'; - c->resp.data_len = 0; - c->resp.data_size = FLB_HTTP_DATA_SIZE_MAX; - c->resp.data_size_max = FLB_HTTP_DATA_SIZE_MAX; - return c; } @@ -1075,6 +1120,91 @@ int flb_http_set_callback_context(struct flb_http_client *c, return 0; } +int flb_http_set_response_test(struct flb_http_client *c, char *test_name, + const void *data, size_t len, + int status, + void (*resp_callback) (void *, int, void *, size_t, void *), + void *resp_callback_data) +{ + if (!c) { + return -1; + } + + /* + * Enabling a test, set the http_client instance in 'test' mode, so no real + * http request is invoked, only the desired implemented test. + */ + + /* Response test */ + if (strcmp(test_name, "response") == 0) { + c->test_mode = FLB_TRUE; + c->test_response.rt_ctx = c; + c->test_response.rt_status = status; + c->test_response.rt_resp_callback = resp_callback; + c->test_response.rt_data = resp_callback_data; + if (data != NULL && len > 0) { + c->resp.payload = (char *)data; + c->resp.payload_size = len; + c->resp.status = status; + } + } + else { + return -1; + } + + return 0; +} + +static int flb_http_run_response_test(struct flb_http_client *c, + const void *data, size_t len) +{ + int ret = 0; + void *out_buf = NULL; + size_t out_size = 0; + struct flb_test_http_response *htr; + + if (!c) { + return -1; + } + + htr = &c->test_response; + + /* Invoke the output plugin formatter test callback */ + ret = htr->callback(c, + data, len, + &out_buf, &out_size); + + /* Call the runtime test callback checker */ + if (htr->rt_resp_callback) { + htr->rt_resp_callback(htr->rt_ctx, + ret, + out_buf, out_size, + htr->rt_data); + } + else { + flb_free(out_buf); + } + + return 0; +} + +/* Push some response into the http client */ +static int flb_http_stub_response(struct flb_http_client *c) +{ + int ret = 0; + + if (!c) { + return -1; + } + + /* If http client's test_responses is registered, run the stub. */ + if (c->test_response.callback != NULL && c->resp.payload != NULL) { + ret = flb_http_run_response_test(c, c->resp.payload, c->resp.payload_size); + } + + return ret; +} + int flb_http_add_auth_header(struct flb_http_client *c, const char *user, const char *passwd, const char *header) { int ret; @@ -1367,6 +1497,10 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) { int ret; + if (c->test_mode == FLB_TRUE) { + return flb_http_stub_response(c); + } + ret = flb_http_do_request(c, bytes); if (ret != 0) { return ret; From fef9cb62b2ad535c97896807f0f8ae590464f795 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 17 Sep 2024 18:00:31 +0900 Subject: [PATCH 08/16] output: Add a capability to inject HTTP response testing environment Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_output.h | 62 +++++++++++++++++++++++++++++++++ src/flb_output.c | 1 + 2 files changed, 63 insertions(+) diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index b834886f0fa..c8b007806f2 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -168,6 +168,66 @@ struct flb_test_out_formatter { size_t *); /* output buffer size */ }; +struct flb_test_out_response { + /* + * Runtime Library Mode + * ==================== + * When the runtime library enable the test formatter mode, it needs to + * keep a reference of the context and other information: + * + * - rt_ctx : context created by flb_create() + * + * - rt_ffd : this plugin assigned 'integer' created by flb_output() + * + * - rt_step_calback: intermediary function to receive the results of + * the formatter plugin test function. + * + * - rt_data: opaque data type for rt_step_callback() + */ + + /* runtime library context */ + void *rt_ctx; + + /* runtime library: assigned plugin integer */ + int rt_ffd; + + /* + * "runtime step callback": this function pointer is used by Fluent Bit + * library mode to reference a test function that must retrieve the + * results of 'callback'. Consider this an intermediary function to + * transfer the results to the runtime test. + * + * This function is private and should not be set manually in the plugin + * code, it's set on src/flb_lib.c . + */ + void (*rt_out_response) (void *, int, int, void *, size_t, void *); + + /* + * opaque data type passed by the runtime library to be used on + * rt_step_test(). + */ + void *rt_data; + + /* optional context for flush callback */ + void *flush_ctx; + + /* + * Callback + * ========= + * "Formatter callback": it references the plugin function that performs + * data formatting (msgpack -> local data). This entry is mostly to + * expose the plugin local function. + */ + int (*callback) (/* Fluent Bit context */ + struct flb_config *, + void *, /* plugin instance context */ + int status, /* HTTP status code */ + const void *, /* respond msgpack data */ + size_t, /* respond msgpack size */ + void **, /* output buffer */ + size_t *); /* output buffer size */ +}; + struct flb_output_plugin { /* * a 'mask' to define what kind of data the plugin can manage: @@ -241,6 +301,7 @@ struct flb_output_plugin { /* Tests */ struct flb_test_out_formatter test_formatter; + struct flb_test_out_response test_response; /* Link to global list from flb_config->outputs */ struct mk_list _head; @@ -391,6 +452,7 @@ struct flb_output_instance { /* Tests */ struct flb_test_out_formatter test_formatter; + struct flb_test_out_response test_response; /* * Buffer counter: it counts the total of disk space (filesystem) used by buffers diff --git a/src/flb_output.c b/src/flb_output.c index 90593905960..288fc9dbb1f 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -729,6 +729,7 @@ struct flb_output_instance *flb_output_new(struct flb_config *config, /* Tests */ instance->test_formatter.callback = plugin->test_formatter.callback; + instance->test_response.callback = plugin->test_response.callback; return instance; From 69d148e36b64e137b8d8985a7c939e2a1ff2960a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 17 Sep 2024 18:02:42 +0900 Subject: [PATCH 09/16] lib: Implement injecting HTTP response mechanism Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_lib.h | 6 +++ src/flb_lib.c | 89 ++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index f96b06eff86..897ea0098ee 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -68,6 +68,9 @@ FLB_EXPORT int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void *test_ctx); FLB_EXPORT int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)); +FLB_EXPORT int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, + void (*out_response) (void *, int, int, void *, size_t, void *), + void *out_callback_data); FLB_EXPORT int flb_filter_set(flb_ctx_t *ctx, int ffd, ...); FLB_EXPORT int flb_service_set(flb_ctx_t *ctx, ...); @@ -84,6 +87,9 @@ FLB_EXPORT int flb_loop(flb_ctx_t *ctx); FLB_EXPORT int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len); FLB_EXPORT int flb_lib_config_file(flb_ctx_t *ctx, const char *path); +/* Emulate ingestions of HTTP responses for output plugins */ +FLB_EXPORT int flb_lib_response(flb_ctx_t *ctx, int ffd, int status, const void *data, size_t len); + /* library context handling */ FLB_EXPORT void flb_context_set(flb_ctx_t *ctx); FLB_EXPORT flb_ctx_t *flb_context_get(); diff --git a/src/flb_lib.c b/src/flb_lib.c index 1821a2e61ad..b9674e06824 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -347,6 +347,37 @@ int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) return 0; } +int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, + void (*out_response) (void *, int, int, void *, size_t, void *), + void *out_callback_data) +{ + struct flb_output_instance *o_ins; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + + /* + * Enabling a test, set the output instance in 'test' mode, so no real + * flush callback is invoked, only the desired implemented test. + */ + + /* Response test */ + if (strcmp(test_name, "response") == 0) { + o_ins->test_mode = FLB_TRUE; + o_ins->test_response.rt_ctx = ctx; + o_ins->test_response.rt_ffd = ffd; + o_ins->test_response.rt_out_response = out_response; + o_ins->test_response.rt_data = out_callback_data; + } + else { + return -1; + } + + return 0; +} + static inline int flb_config_map_property_check(char *plugin_name, struct mk_list *config_map, char *key, char *val) { struct flb_kv *kv; @@ -638,6 +669,41 @@ int flb_lib_free(void* data) } +static int flb_output_run_response(flb_ctx_t *ctx, struct flb_output_instance *o_ins, + int status, const void *data, size_t len) +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + struct flb_test_out_response *resp; + + if (!o_ins) { + return -1; + } + + resp = &o_ins->test_response; + + /* Invoke the input plugin formatter test callback */ + ret = resp->callback(ctx->config, + o_ins->context, + status, data, len, + &out_buf, &out_size); + + /* Call the runtime test callback checker */ + if (resp->rt_out_response) { + resp->rt_out_response(resp->rt_ctx, + resp->rt_ffd, + ret, + out_buf, out_size, + resp->rt_data); + } + else { + flb_free(out_buf); + } + + return 0; +} + /* Push some data into the Engine */ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len) { @@ -662,6 +728,29 @@ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len) return ret; } +/* Emulate some data from the response */ +int flb_lib_response(flb_ctx_t *ctx, int ffd, int status, const void *data, size_t len) +{ + int ret; + struct flb_output_instance *o_ins; + + if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) { + flb_error("[lib] cannot push data, engine is not running"); + return -1; + } + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + + /* If input's test_formatter is registered, priorize to run it. */ + if (o_ins->test_response.callback != NULL) { + ret = flb_output_run_response(ctx, o_ins, status, data, len); + } + return ret; +} + static void flb_lib_worker(void *data) { int ret; From 1eb324a6407ee8d1a8c3d0ccd6c0460508766fb6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 18 Sep 2024 14:06:59 +0900 Subject: [PATCH 10/16] out_es: tests: Add HTTP response testing Signed-off-by: Hiroshi Hatake --- plugins/out_es/es.c | 73 ++++++++++++++ tests/runtime/data/es/json_es.h | 34 +++++++ tests/runtime/out_elasticsearch.c | 161 ++++++++++++++++++++++++++++++ 3 files changed, 268 insertions(+) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index b0773a40991..c896b1ed258 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -985,6 +985,78 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } +static int elasticsearch_response_test(struct flb_config *config, + void *plugin_context, + int status, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + int ret = 0; + struct flb_elasticsearch *ctx = plugin_context; + struct flb_connection *u_conn; + struct flb_http_client *c; + size_t b_sent; + + /* Not retrieve upstream connection */ + u_conn = NULL; + + /* Compose HTTP Client request (dummy client) */ + c = flb_http_dummy_client(u_conn, FLB_HTTP_POST, ctx->uri, + NULL, 0, NULL, 0, NULL, 0); + + flb_http_buffer_size(c, ctx->buffer_size); + + /* Just stubbing the HTTP responses */ + flb_http_set_response_test(c, "response", data, bytes, status, NULL, NULL); + + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + goto error; + } + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + goto error; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + if (c->resp.status != 200 && c->resp.status != 201) { + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", + c->resp.status, ctx->uri, c->resp.payload); + } + else { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", + c->resp.status, ctx->uri); + } + goto error; + } + + if (c->resp.payload_size > 0) { + /* + * Elasticsearch payload should be JSON, we convert it to msgpack + * and lookup the 'error' field. + */ + ret = elasticsearch_error_check(ctx, c); + } + else { + goto error; + } + } + + /* Cleanup */ + flb_http_client_destroy(c); + + return ret; + +error: + /* Cleanup */ + flb_http_client_destroy(c); + + return -2; +} + static int cb_es_exit(void *data, struct flb_config *config) { struct flb_elasticsearch *ctx = data; @@ -1231,6 +1303,7 @@ struct flb_output_plugin out_es_plugin = { /* Test */ .test_formatter.callback = elasticsearch_format, + .test_response.callback = elasticsearch_response_test, /* Plugin flags */ .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, diff --git a/tests/runtime/data/es/json_es.h b/tests/runtime/data/es/json_es.h index 40f8ab1cac4..91348ec47db 100755 --- a/tests/runtime/data/es/json_es.h +++ b/tests/runtime/data/es/json_es.h @@ -15,3 +15,37 @@ #define JSON_DOTS \ "[1448403340," \ "{\".le.vel\":\"error\", \".fo.o\":[{\".o.k\": [{\".b.ar\": \"baz\"}]}]}]" + +#define JSON_RESPONSE_SUCCESSES "{\"errors\":false,\"took\":0,\"items\":[" \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dcfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":6,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dsfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"d8fJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"eMfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":9,\"_primary_term\":1,\"status\":201}}]}" + +#define JSON_RESPONSE_SUCCESSES_SIZE 783 + +#define JSON_RESPONSE_PARTIALLY_SUCCESS "{\"errors\":true,\"took\":316737025,\"items\":" \ + "[{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"hxELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iBELapEB_XqxG5Ydupgb\",\"status\":400," \ + "\"error\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iBELapEB_XqxG5Ydupgb'. " \ + "Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \ + "\"caused_by\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \ + "Use the index API request parameters.\"}}}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iRELapEB_XqxG5Ydupgb\",\"status\":400," \ + "\"error\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iRELapEB_XqxG5Ydupgb'. " \ + "Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \ + "\"caused_by\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \ + "Use the index API request parameters.\"}}}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"ihELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}]}" + +#define JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE 1322 diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 9efe7610a95..eac72cbf321 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -799,6 +799,164 @@ void flb_test_logstash_prefix_separator() flb_destroy(ctx); } +static void cb_check_response_success(void *ctx, int ffd, + int res_ret, void *res_data, + size_t res_size, void *data) +{ + TEST_CHECK(res_ret == 1); +} + +void flb_test_response_success() +{ + int ret; + char *response = "{\"took\":1,\"errors\":false,\"items\":[]}"; + int size = 37; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "write_operation", "create", + NULL); + + /* Enable test mode */ + ret = flb_output_set_http_test(ctx, out_ffd, "response", + cb_check_response_success, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_response(ctx, out_ffd, 200, response, size); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_response_successes() +{ + int ret; + char *response = JSON_RESPONSE_SUCCESSES; + int size = JSON_RESPONSE_SUCCESSES_SIZE; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "write_operation", "create", + NULL); + + /* Enable test mode */ + ret = flb_output_set_http_test(ctx, out_ffd, "response", + cb_check_response_success, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_response(ctx, out_ffd, 200, response, size); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +static void cb_check_response_partially_success(void *ctx, int ffd, + int res_ret, void *res_data, + size_t res_size, void *data) +{ + int composed_ret = 0; + composed_ret |= (1 << 0); + composed_ret |= (1 << 7); + + TEST_CHECK(res_ret == composed_ret); + /* Check whether contains a success flag or not */ + TEST_CHECK((res_ret & (1 << 0))); +} + +void flb_test_response_partially_success() +{ + int ret; + char *response = JSON_RESPONSE_PARTIALLY_SUCCESS; + int size = JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "write_operation", "create", + NULL); + + /* Enable test mode */ + ret = flb_output_set_http_test(ctx, out_ffd, "response", + cb_check_response_partially_success, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_response(ctx, out_ffd, 200, response, size); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"long_index" , flb_test_long_index }, @@ -814,5 +972,8 @@ TEST_LIST = { {"replace_dots" , flb_test_replace_dots }, {"id_key" , flb_test_id_key }, {"logstash_prefix_separator" , flb_test_logstash_prefix_separator }, + {"response_success" , flb_test_response_success }, + {"response_successes", flb_test_response_successes }, + {"response_partially_success" , flb_test_response_partially_success }, {NULL, NULL} }; From ff584f5c7fa3e4fcd0df4e3bbca8650428270f6f Mon Sep 17 00:00:00 2001 From: Ankur Patel Date: Tue, 26 Nov 2024 18:43:42 +0530 Subject: [PATCH 11/16] in_http: use 'tag_key' option when json array is received When a json array is received by http input, it doesn't use the 'tag_key' option and always sets the tag to 'http.N'. So this fixes the bug and also adds test cases to test for both json object and json array. Signed-off-by: Ankur Patel --- plugins/in_http/http_prot.c | 5 +++++ tests/runtime/in_http.c | 17 ++++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index 175fe4461e1..4e2aa6a761c 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -843,6 +843,11 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ { record = obj->via.array.ptr[i]; + tag_from_record = NULL; + if (ctx->tag_key) { + tag_from_record = tag_key(ctx, &record); + } + if (tag_from_record) { ret = process_pack_record(ctx, &tm, tag_from_record, &record); flb_sds_destroy(tag_from_record); diff --git a/tests/runtime/in_http.c b/tests/runtime/in_http.c index d4d88faadf8..66ddaea5230 100644 --- a/tests/runtime/in_http.c +++ b/tests/runtime/in_http.c @@ -588,7 +588,7 @@ void flb_test_http_failure_400_bad_disk_write() test_ctx_destroy(ctx); } -void flb_test_http_tag_key() +void test_http_tag_key(char *input) { struct flb_lib_out_cb cb_data; struct test_ctx *ctx; @@ -597,7 +597,7 @@ void flb_test_http_tag_key() int num; size_t b_sent; - char *buf = "{\"test\":\"msg\", \"tag\":\"new_tag\"}"; + char *buf = input; clear_output_num(); @@ -661,12 +661,23 @@ void flb_test_http_tag_key() test_ctx_destroy(ctx); } +void flb_test_http_tag_key_with_map_input() +{ + test_http_tag_key("{\"tag\":\"new_tag\",\"test\":\"msg\"}"); +} + +void flb_test_http_tag_key_with_array_input() +{ + test_http_tag_key("[{\"tag\":\"new_tag\",\"test\":\"msg\"}]"); +} + TEST_LIST = { {"http", flb_test_http}, {"successful_response_code_200", flb_test_http_successful_response_code_200}, {"successful_response_code_204", flb_test_http_successful_response_code_204}, {"failure_response_code_400_bad_json", flb_test_http_failure_400_bad_json}, {"failure_response_code_400_bad_disk_write", flb_test_http_failure_400_bad_disk_write}, - {"tag_key", flb_test_http_tag_key}, + {"tag_key_with_map_input", flb_test_http_tag_key_with_map_input}, + {"tag_key_with_array_input", flb_test_http_tag_key_with_array_input}, {NULL, NULL} }; From 6654681a4713f7aedae2edb71f14ffb5daba3116 Mon Sep 17 00:00:00 2001 From: jomillerOpen Date: Thu, 19 Sep 2024 17:39:57 -0400 Subject: [PATCH 12/16] network: Update struct type for sock_addr This allows for expanded storage to handle IPV6 addresses. Signed-off-by: jomillerOpen --- src/flb_network.c | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/flb_network.c b/src/flb_network.c index 8f8ca33f602..11efbb7d60b 100644 --- a/src/flb_network.c +++ b/src/flb_network.c @@ -1807,10 +1807,16 @@ int flb_net_bind_udp(flb_sockfd_t fd, const struct sockaddr *addr, flb_sockfd_t flb_net_accept(flb_sockfd_t server_fd) { flb_sockfd_t remote_fd; - struct sockaddr sock_addr; - socklen_t socket_size = sizeof(struct sockaddr); - - // return accept(server_fd, &sock_addr, &socket_size); + struct sockaddr_storage sock_addr = { 0 }; + socklen_t socket_size = sizeof(sock_addr); + + /* + * sock_addr used to be a sockaddr struct, but this was too + * small of a structure to handle IPV6 addresses (#9053). + * This would cause accept() to not accept the connection (with no error), + * and a loop would occur continually trying to accept the connection. + * The sockaddr_storage can handle both IPV4 and IPV6. + */ #ifdef FLB_HAVE_ACCEPT4 remote_fd = accept4(server_fd, &sock_addr, &socket_size, From a59c8679248b26bec81ed653306996d93b6e711c Mon Sep 17 00:00:00 2001 From: jomillerOpen Date: Wed, 2 Oct 2024 17:16:56 -0400 Subject: [PATCH 13/16] network: Update struct type being passed into accept Changed (based on an example) the struct declaration being passed to accept to fix the CI pipeline. Signed-off-by: jomillerOpen --- src/flb_network.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flb_network.c b/src/flb_network.c index 11efbb7d60b..e53030ba742 100644 --- a/src/flb_network.c +++ b/src/flb_network.c @@ -1819,10 +1819,10 @@ flb_sockfd_t flb_net_accept(flb_sockfd_t server_fd) */ #ifdef FLB_HAVE_ACCEPT4 - remote_fd = accept4(server_fd, &sock_addr, &socket_size, + remote_fd = accept4(server_fd, (struct sockaddr*)&sock_addr, &socket_size, SOCK_NONBLOCK | SOCK_CLOEXEC); #else - remote_fd = accept(server_fd, &sock_addr, &socket_size); + remote_fd = accept(server_fd, (struct sockaddr*)&sock_addr, &socket_size); flb_net_socket_nonblocking(remote_fd); #endif From 361b5b9b575a070192ca9f22e9d9b4268c4d321a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 28 Nov 2024 16:47:28 +0200 Subject: [PATCH 14/16] release: update to 3.2.3 (#9665) Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: edsiper <369718+edsiper@users.noreply.github.com> --- CMakeLists.txt | 2 +- dockerfiles/Dockerfile | 2 +- fluent-bit-3.2.2.bb => fluent-bit-3.2.3.bb | 2 +- snap/snapcraft.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename fluent-bit-3.2.2.bb => fluent-bit-3.2.3.bb (99%) diff --git a/CMakeLists.txt b/CMakeLists.txt index e52e3e55832..de5b5973974 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ set(CMAKE_POLICY_DEFAULT_CMP0069 NEW) # Fluent Bit Version set(FLB_VERSION_MAJOR 3) set(FLB_VERSION_MINOR 2) -set(FLB_VERSION_PATCH 2) +set(FLB_VERSION_PATCH 3) set(FLB_VERSION_STR "${FLB_VERSION_MAJOR}.${FLB_VERSION_MINOR}.${FLB_VERSION_PATCH}") set(CMAKE_POSITION_INDEPENDENT_CODE ON) diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index 0570871cd66..d0fc7fd3845 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -13,7 +13,7 @@ # docker buildx build --platform "linux/amd64,linux/arm64,linux/arm/v7,linux/s390x" -f ./dockerfiles/Dockerfile.multiarch --build-arg FLB_TARBALL=https://github.com/fluent/fluent-bit/archive/v1.8.11.tar.gz ./dockerfiles/ # Set this to the current release version: it gets done so as part of the release. -ARG RELEASE_VERSION=3.2.2 +ARG RELEASE_VERSION=3.2.3 # For multi-arch builds - assumption is running on an AMD64 host FROM multiarch/qemu-user-static:x86_64-arm AS qemu-arm32 diff --git a/fluent-bit-3.2.2.bb b/fluent-bit-3.2.3.bb similarity index 99% rename from fluent-bit-3.2.2.bb rename to fluent-bit-3.2.3.bb index 58b3d1f0a22..9f64a38cc17 100644 --- a/fluent-bit-3.2.2.bb +++ b/fluent-bit-3.2.3.bb @@ -16,7 +16,7 @@ LIC_FILES_CHKSUM = "file://LICENSE;md5=2ee41112a44fe7014dce33e26468ba93" SECTION = "net" PR = "r0" -PV = "3.2.2" +PV = "3.2.3" SRCREV = "v${PV}" SRC_URI = "git://github.com/fluent/fluent-bit.git;nobranch=1" diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index fa0f27fd896..584dd52571e 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -1,6 +1,6 @@ name: fluent-bit base: core18 -version: '3.2.2' +version: '3.2.3' summary: High performance logs and stream processor description: | Fluent Bit is a high performance log processor and stream processor for Linux. From 4acffc2ce8bc486b9c0e407b17850e5a9a1e36d9 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski Date: Thu, 28 Nov 2024 17:24:58 +0100 Subject: [PATCH 15/16] out_calyptia: retry agent registration on flush callback (#9656) * out_calyptia: retry registering agent on flush. if register_retry_on_flush is set (default true), agent registration is retried on each flush callback. if set to false then registration will cause to abort the plugin initialisation. Signed-off-by: Jorge Niedbalski * custom_calyptia: cascade register_retry_on_flush variables. Signed-off-by: Jorge Niedbalski --------- Signed-off-by: Jorge Niedbalski Co-authored-by: Jorge Niedbalski --- plugins/custom_calyptia/calyptia.c | 12 +- plugins/custom_calyptia/calyptia.h | 1 + plugins/out_calyptia/calyptia.c | 238 +++++++------ plugins/out_calyptia/calyptia.h | 1 + tests/runtime/CMakeLists.txt | 38 ++- .../custom_calyptia_registration_retry_test.c | 313 ++++++++++++++++++ 6 files changed, 492 insertions(+), 111 deletions(-) create mode 100644 tests/runtime/custom_calyptia_registration_retry_test.c diff --git a/plugins/custom_calyptia/calyptia.c b/plugins/custom_calyptia/calyptia.c index 76f5868efdc..bfbb42f4767 100644 --- a/plugins/custom_calyptia/calyptia.c +++ b/plugins/custom_calyptia/calyptia.c @@ -293,6 +293,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config, flb_output_set_property(cloud, "match", "_calyptia_cloud"); flb_output_set_property(cloud, "api_key", ctx->api_key); + if (ctx->register_retry_on_flush) { + flb_output_set_property(cloud, "register_retry_on_flush", "true"); + } else { + flb_output_set_property(cloud, "register_retry_on_flush", "false"); + } + if (ctx->store_path) { flb_output_set_property(cloud, "store_path", ctx->store_path); } @@ -585,7 +591,11 @@ static struct flb_config_map config_map[] = { "Pipeline ID for reporting to calyptia cloud." }, #endif /* FLB_HAVE_CHUNK_TRACE */ - + { + FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true", + 0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush), + "Retry agent registration on flush if failed on init." + }, /* EOF */ {0} }; diff --git a/plugins/custom_calyptia/calyptia.h b/plugins/custom_calyptia/calyptia.h index e1f4dd36770..b4313f51182 100644 --- a/plugins/custom_calyptia/calyptia.h +++ b/plugins/custom_calyptia/calyptia.h @@ -53,6 +53,7 @@ struct calyptia { flb_sds_t fleet_max_http_buffer_size; flb_sds_t fleet_interval_sec; flb_sds_t fleet_interval_nsec; + bool register_retry_on_flush; /* retry registration on flush if failed */ }; int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet); diff --git a/plugins/out_calyptia/calyptia.c b/plugins/out_calyptia/calyptia.c index aa760e2fa5e..c16fd31d6ee 100644 --- a/plugins/out_calyptia/calyptia.c +++ b/plugins/out_calyptia/calyptia.c @@ -322,8 +322,24 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c, int ret; size_t b_sent; + if( !ctx || !c ) { + return FLB_ERROR; + } + + /* Ensure agent_token is not empty when required */ + if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) && + !ctx->agent_token) { + flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type); + return FLB_ERROR; + } + /* append headers */ if (type == CALYPTIA_ACTION_REGISTER) { + // When registering a new agent api key is required + if (!ctx->api_key) { + flb_plg_error(ctx->ins, "api_key is missing"); + return FLB_ERROR; + } flb_http_add_header(c, CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1, CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1); @@ -721,6 +737,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, return NULL; } + ctx->metrics_endpoint = flb_sds_create_size(256); + if (!ctx->metrics_endpoint) { + flb_free(ctx); + return NULL; + } + +#ifdef FLB_HAVE_CHUNK_TRACE + ctx->trace_endpoint = flb_sds_create_size(256); + if (!ctx->trace_endpoint) { + flb_sds_destroy(ctx->metrics_endpoint); + flb_free(ctx); + return NULL; + } +#endif + /* api_key */ if (!ctx->api_key) { flb_plg_error(ctx->ins, "configuration 'api_key' is missing"); @@ -771,12 +802,40 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, return ctx; } -static int cb_calyptia_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) +static int register_agent(struct flb_calyptia *ctx, struct flb_config *config) { int ret; + + /* Try registration */ + ret = api_agent_create(config, ctx); + if (ret != FLB_OK) { + flb_plg_warn(ctx->ins, "agent registration failed"); + return FLB_ERROR; + } + + /* Update endpoints */ + flb_sds_len_set(ctx->metrics_endpoint, 0); + flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, + ctx->agent_id); + +#ifdef FLB_HAVE_CHUNK_TRACE + if (ctx->pipeline_id) { + flb_sds_len_set(ctx->trace_endpoint, 0); + flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE, + ctx->pipeline_id); + } +#endif + + flb_plg_info(ctx->ins, "agent registration successful"); + return FLB_OK; +} + +static int cb_calyptia_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ struct flb_calyptia *ctx; (void) data; + int ret; /* create config context */ ctx = config_init(ins, config); @@ -791,23 +850,12 @@ static int cb_calyptia_init(struct flb_output_instance *ins, */ flb_output_set_http_debug_callbacks(ins); - /* register/update agent */ - ret = api_agent_create(config, ctx); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "agent registration failed"); + ret = register_agent(ctx, config); + if (ret != FLB_OK && !ctx->register_retry_on_flush) { + flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false"); return -1; } - /* metrics endpoint */ - ctx->metrics_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id); - -#ifdef FLB_HAVE_CHUNK_TRACE - ctx->trace_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE, - ctx->pipeline_id); -#endif /* FLB_HAVE_CHUNK_TRACE */ return 0; } @@ -830,29 +878,79 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes) cmt_destroy(cmt); } -static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) +static int cb_calyptia_exit(void *data, struct flb_config *config) { - int ret = FLB_RETRY; - size_t off = 0; - size_t out_size = 0; - char *out_buf = NULL; + struct flb_calyptia *ctx = data; + + if (!ctx) { + return 0; + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + if (ctx->agent_id) { + flb_sds_destroy(ctx->agent_id); + } + + if (ctx->agent_token) { + flb_sds_destroy(ctx->agent_token); + } + + if (ctx->env) { + flb_env_destroy(ctx->env); + } + + if (ctx->metrics_endpoint) { + flb_sds_destroy(ctx->metrics_endpoint); + } -/* used to create records for reporting traces to the cloud. */ #ifdef FLB_HAVE_CHUNK_TRACE - flb_sds_t json; + if (ctx->trace_endpoint) { + flb_sds_destroy(ctx->trace_endpoint); + } #endif /* FLB_HAVE_CHUNK_TRACE */ + if (ctx->fs) { + flb_fstore_destroy(ctx->fs); + } + + flb_kv_release(&ctx->kv_labels); + flb_free(ctx); + + return 0; +} + +static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + size_t off = 0; + size_t out_size = 0; + char *out_buf = NULL; struct flb_connection *u_conn; struct flb_http_client *c = NULL; struct flb_calyptia *ctx = out_context; struct cmt *cmt; + flb_sds_t json; (void) i_ins; (void) config; + if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) { + flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true"); + if (register_agent(ctx, config) != FLB_OK) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + else if (!ctx->agent_id || !ctx->agent_token) { + flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false"); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { @@ -890,7 +988,7 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, /* Compose HTTP Client request */ c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint, - out_buf, out_size, NULL, 0, NULL, 0); + out_buf, out_size, NULL, 0, NULL, 0); if (!c) { if (out_buf != event_chunk->data) { cmt_encode_msgpack_destroy(out_buf); @@ -899,12 +997,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ + /* perform request */ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS); if (ret == FLB_OK) { flb_plg_debug(ctx->ins, "metrics delivered OK"); } - else if (ret == FLB_ERROR) { + else { flb_plg_error(ctx->ins, "could not deliver metrics"); debug_payload(ctx, out_buf, out_size); } @@ -915,42 +1013,35 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, } #ifdef FLB_HAVE_CHUNK_TRACE - if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) { + if (event_chunk->type & FLB_EVENT_TYPE_LOGS && + event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) { json = flb_pack_msgpack_to_json_format(event_chunk->data, - event_chunk->size, - FLB_PACK_JSON_FORMAT_STREAM, - FLB_PACK_JSON_DATE_DOUBLE, - NULL); + event_chunk->size, + FLB_PACK_JSON_FORMAT_STREAM, + FLB_PACK_JSON_DATE_DOUBLE, + NULL); if (json == NULL) { flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_RETRY); } - out_buf = (char *)json; - out_size = flb_sds_len(json); - if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id) == NULL) { - flb_upstream_conn_release(u_conn); - flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); - } c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint, - out_buf, out_size, NULL, 0, NULL, 0); + (char *) json, flb_sds_len(json), + NULL, 0, NULL, 0); + if (!c) { flb_upstream_conn_release(u_conn); flb_sds_destroy(json); - flb_sds_destroy(ctx->metrics_endpoint); FLB_OUTPUT_RETURN(FLB_RETRY); } - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE); if (ret == FLB_OK) { flb_plg_debug(ctx->ins, "trace delivered OK"); } - else if (ret == FLB_ERROR) { + else { flb_plg_error(ctx->ins, "could not deliver trace"); - debug_payload(ctx, out_buf, out_size); + debug_payload(ctx, (char *) json, flb_sds_len(json)); } flb_sds_destroy(json); } @@ -961,51 +1052,8 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, if (c) { flb_http_client_destroy(c); } - FLB_OUTPUT_RETURN(ret); -} - -static int cb_calyptia_exit(void *data, struct flb_config *config) -{ - struct flb_calyptia *ctx = data; - - if (!ctx) { - return 0; - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - - if (ctx->agent_id) { - flb_sds_destroy(ctx->agent_id); - } - - if (ctx->agent_token) { - flb_sds_destroy(ctx->agent_token); - } - - if (ctx->env) { - flb_env_destroy(ctx->env); - } - - if (ctx->metrics_endpoint) { - flb_sds_destroy(ctx->metrics_endpoint); - } - -#ifdef FLB_HAVE_CHUNK_TRACE - if (ctx->trace_endpoint) { - flb_sds_destroy(ctx->trace_endpoint); - } -#endif /* FLB_HAVE_CHUNK_TRACE */ - - if (ctx->fs) { - flb_fstore_destroy(ctx->fs); - } - - flb_kv_release(&ctx->kv_labels); - flb_free(ctx); - return 0; + FLB_OUTPUT_RETURN(ret); } /* Configuration properties map */ @@ -1057,7 +1105,11 @@ static struct flb_config_map config_map[] = { "Pipeline ID for calyptia core traces." }, #endif - + { + FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true", + 0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush), + "Retry agent registration on flush if failed on init." + }, /* EOF */ {0} }; diff --git a/plugins/out_calyptia/calyptia.h b/plugins/out_calyptia/calyptia.h index ee37d8778dc..0532e4a2b01 100644 --- a/plugins/out_calyptia/calyptia.h +++ b/plugins/out_calyptia/calyptia.h @@ -80,6 +80,7 @@ struct flb_calyptia { flb_sds_t trace_endpoint; flb_sds_t pipeline_id; #endif /* FLB_HAVE_CHUNK_TRACE */ + bool register_retry_on_flush; /* retry registration on flush if failed */ }; #endif diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index f355294ed9f..09dce13c15e 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -61,29 +61,37 @@ if(FLB_OUT_LIB) endif() if (FLB_CUSTOM_CALYPTIA) - # Define common variables for calyptia tests set(CALYPTIA_TEST_LINK_LIBS fluent-bit-static ${CMAKE_THREAD_LIBS_INIT} ) - # Add calyptia input properties test - set(TEST_TARGET "flb-rt-calyptia_input_properties") - add_executable(${TEST_TARGET} + set(CALYPTIA_TESTS + "custom_calyptia_test.c" + "custom_calyptia_registration_retry_test.c" "custom_calyptia_input_test.c" - "../../plugins/custom_calyptia/calyptia.c" ) - target_link_libraries(${TEST_TARGET} - ${CALYPTIA_TEST_LINK_LIBS} - ) + foreach(TEST_SOURCE ${CALYPTIA_TESTS}) + get_filename_component(TEST_NAME ${TEST_SOURCE} NAME_WE) - add_test(NAME ${TEST_TARGET} - COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET} - WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build) + set(TEST_TARGET "flb-rt-${TEST_NAME}") + add_executable(${TEST_TARGET} + ${TEST_SOURCE} + "../../plugins/custom_calyptia/calyptia.c" + ) + + target_link_libraries(${TEST_TARGET} + ${CALYPTIA_TEST_LINK_LIBS} + ) - set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime") - add_dependencies(${TEST_TARGET} fluent-bit-static) + add_test(NAME ${TEST_TARGET} + COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET} + WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build) + + set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime") + add_dependencies(${TEST_TARGET} fluent-bit-static) + endforeach() endif() if(FLB_IN_EBPF) @@ -222,10 +230,6 @@ if(FLB_IN_LIB) endif() -if (FLB_CUSTOM_CALYPTIA) - FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c") -endif() - if (FLB_PROCESSOR_METRICS_SELECTOR) FLB_RT_TEST(FLB_PROCESSOR_METRICS_SELECTOR "processor_metrics_selector.c") endif() diff --git a/tests/runtime/custom_calyptia_registration_retry_test.c b/tests/runtime/custom_calyptia_registration_retry_test.c new file mode 100644 index 00000000000..db3ea101e25 --- /dev/null +++ b/tests/runtime/custom_calyptia_registration_retry_test.c @@ -0,0 +1,313 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include +#include +#include +#include + +#include "flb_tests_runtime.h" + +#define MOCK_SERVER_HOST "127.0.0.1" +#define MOCK_SERVER_PORT 9876 + +static int registration_count = 0; + +static void mock_server_cb_empty_token(mk_request_t *request, void *data) +{ + registration_count++; + if (registration_count == 1) { + /* Use a local buffer with correct size */ + const char *response = "{\"id\":\"test-id\"}"; + size_t response_len = strlen(response); // Ensure size is accurate + + mk_http_status(request, 200); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "application/json", sizeof("application/json") - 1); + mk_http_send(request, response, response_len, NULL); // Use response_len + } else { + mk_http_status(request, 500); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "text/plain", sizeof("text/plain") - 1); + mk_http_send(request, "Internal Server Error", sizeof("Internal Server Error") - 1, NULL); + } + mk_http_done(request); +} + +static void mock_server_cb(mk_request_t *request, void *data) +{ + registration_count++; + mk_http_status(request, 500); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "text/plain", sizeof("text/plain") - 1); + mk_http_send(request, "Internal Server Error", sizeof("Internal Server Error") - 1, NULL); + mk_http_done(request); +} + +/* Test function */ +void test_calyptia_register_retry() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "true"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration attempt should have failed */ + TEST_CHECK(registration_count == 1); + + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + TEST_CHECK(registration_count > 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +static void test_calyptia_register_retry_empty_token() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "false"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration should be successful but with an empty token */ + TEST_CHECK(registration_count == 1); + + /* Push some data to trigger flush */ + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + + /* Verify the plugin fails due to empty token */ + TEST_CHECK(registration_count == 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +static void test_calyptia_register_retry_empty_token_retry_true() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "true"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration should be successful but with an empty token */ + TEST_CHECK(registration_count == 1); + + /* Push some data to trigger flush */ + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + + /* Verify the plugin fails due to empty token */ + TEST_CHECK(registration_count > 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +TEST_LIST = { + {"register_retry", test_calyptia_register_retry}, + {"register_retry_empty_token", test_calyptia_register_retry_empty_token}, + {"register_retry_empty_token_retry_true", test_calyptia_register_retry_empty_token_retry_true}, + {NULL, NULL} +}; \ No newline at end of file From d573777550073b36bf5c9896cd09572f925f4d06 Mon Sep 17 00:00:00 2001 From: Rama Malladi <98832537+RamaMalladiAWS@users.noreply.github.com> Date: Sun, 1 Dec 2024 10:25:24 -0600 Subject: [PATCH 16/16] build: cmake: fix UNICODE-escaped characters on aarch64 (#8851) Signed-off-by: Rama Malladi --- .github/workflows/unit-tests.yaml | 2 +- CMakeLists.txt | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index abd82c05472..3d4a7564ce8 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -127,7 +127,7 @@ jobs: config: - name: "Aarch64 actuated testing" flb_option: "-DFLB_WITHOUT_flb-it-network=1 -DFLB_WITHOUT_flb-it-fstore=1" - omit_option: "-DFLB_WITHOUT_flb-it-utils=1 -DFLB_WITHOUT_flb-it-pack=1" + omit_option: "" global_option: "-DFLB_BACKTRACE=Off -DFLB_SHARED_LIB=Off -DFLB_DEBUG=On -DFLB_ALL=On -DFLB_EXAMPLES=Off" unit_test_option: "-DFLB_TESTS_INTERNAL=On" compiler: gcc diff --git a/CMakeLists.txt b/CMakeLists.txt index de5b5973974..08ecfb7ad94 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,10 @@ endif() if(CMAKE_SYSTEM_NAME MATCHES "Linux") set(FLB_SYSTEM_LINUX On) add_definitions(-DFLB_SYSTEM_LINUX) + if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(arm64|aarch64)") + set(FLB_LINUX_ON_AARCH64 On) + add_definitions(-DFLB_LINUX_ON_AARCH64) + endif() endif() # Update CFLAGS @@ -317,6 +321,12 @@ if (FLB_SYSTEM_LINUX) include(cmake/s390x.cmake) endif () +# Enable signed char support on Linux AARCH64 if specified +if (FLB_LINUX_ON_AARCH64) + set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsigned-char") + message(STATUS "Enabling signed char") +endif() + # Extract Git commit information for debug output. # Note that this is only set when cmake is run, the intent here is to use in CI for verification of releases so is acceptable. # For a better solution see https://jonathanhamberg.com/post/cmake-embedding-git-hash/ but this is simple and easy.