diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index abd82c05472..3d4a7564ce8 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -127,7 +127,7 @@ jobs: config: - name: "Aarch64 actuated testing" flb_option: "-DFLB_WITHOUT_flb-it-network=1 -DFLB_WITHOUT_flb-it-fstore=1" - omit_option: "-DFLB_WITHOUT_flb-it-utils=1 -DFLB_WITHOUT_flb-it-pack=1" + omit_option: "" global_option: "-DFLB_BACKTRACE=Off -DFLB_SHARED_LIB=Off -DFLB_DEBUG=On -DFLB_ALL=On -DFLB_EXAMPLES=Off" unit_test_option: "-DFLB_TESTS_INTERNAL=On" compiler: gcc diff --git a/CMakeLists.txt b/CMakeLists.txt index 4d37ccb7ae2..08ecfb7ad94 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ set(CMAKE_POLICY_DEFAULT_CMP0069 NEW) # Fluent Bit Version set(FLB_VERSION_MAJOR 3) set(FLB_VERSION_MINOR 2) -set(FLB_VERSION_PATCH 1) +set(FLB_VERSION_PATCH 3) set(FLB_VERSION_STR "${FLB_VERSION_MAJOR}.${FLB_VERSION_MINOR}.${FLB_VERSION_PATCH}") set(CMAKE_POSITION_INDEPENDENT_CODE ON) @@ -30,6 +30,10 @@ endif() if(CMAKE_SYSTEM_NAME MATCHES "Linux") set(FLB_SYSTEM_LINUX On) add_definitions(-DFLB_SYSTEM_LINUX) + if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(arm64|aarch64)") + set(FLB_LINUX_ON_AARCH64 On) + add_definitions(-DFLB_LINUX_ON_AARCH64) + endif() endif() # Update CFLAGS @@ -256,6 +260,8 @@ if(FLB_ALL) endif() if(FLB_DEV) + FLB_DEFINITION(FLB_HAVE_DEV) + set(FLB_DEBUG On) set(FLB_TRACE On) set(FLB_CHUNK_TRACE On) @@ -315,6 +321,12 @@ if (FLB_SYSTEM_LINUX) include(cmake/s390x.cmake) endif () +# Enable signed char support on Linux AARCH64 if specified +if (FLB_LINUX_ON_AARCH64) + set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsigned-char") + message(STATUS "Enabling signed char") +endif() + # Extract Git commit information for debug output. # Note that this is only set when cmake is run, the intent here is to use in CI for verification of releases so is acceptable. # For a better solution see https://jonathanhamberg.com/post/cmake-embedding-git-hash/ but this is simple and easy. diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index 74319cbdcd3..9a774a27664 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -13,7 +13,7 @@ # docker buildx build --platform "linux/amd64,linux/arm64,linux/arm/v7,linux/s390x" -f ./dockerfiles/Dockerfile.multiarch --build-arg FLB_TARBALL=https://github.com/fluent/fluent-bit/archive/v1.8.11.tar.gz ./dockerfiles/ # Set this to the current release version: it gets done so as part of the release. -ARG RELEASE_VERSION=3.2.0 +ARG RELEASE_VERSION=3.2.3 # For multi-arch builds - assumption is running on an AMD64 host FROM multiarch/qemu-user-static:x86_64-arm AS qemu-arm32 diff --git a/fluent-bit-3.2.1.bb b/fluent-bit-3.2.3.bb similarity index 99% rename from fluent-bit-3.2.1.bb rename to fluent-bit-3.2.3.bb index 17d7149ede8..9f64a38cc17 100644 --- a/fluent-bit-3.2.1.bb +++ b/fluent-bit-3.2.3.bb @@ -16,7 +16,7 @@ LIC_FILES_CHKSUM = "file://LICENSE;md5=2ee41112a44fe7014dce33e26468ba93" SECTION = "net" PR = "r0" -PV = "3.2.1" +PV = "3.2.3" SRCREV = "v${PV}" SRC_URI = "git://github.com/fluent/fluent-bit.git;nobranch=1" diff --git a/include/fluent-bit/flb_http_client.h b/include/fluent-bit/flb_http_client.h index 243ed253da8..2573fba8ea6 100644 --- a/include/fluent-bit/flb_http_client.h +++ b/include/fluent-bit/flb_http_client.h @@ -28,6 +28,8 @@ #include #include +#define HTTP_CLIENT_TEMPORARY_BUFFER_SIZE (1024 * 64) + #define HTTP_CLIENT_SUCCESS 0 #define HTTP_CLIENT_PROVIDER_ERROR -1 @@ -145,6 +147,71 @@ struct flb_http_debug { int (*cb_debug_request_payload); }; +/* To make opaque struct */ +struct flb_http_client; + +/* + * Tests callbacks + * =============== + */ +struct flb_test_http_response { + /* + * Response Test Mode + * ==================== + * When the response test enable the test response mode, it needs to + * keep a reference of the context and other information: + * + * - rt_ctx : flb_http_client context + * + * - rt_status : HTTP response code + * + * - rt_in_callback: intermediary function to receive the results of + * the http response test function. + * + * - rt_data: opaque data type for rt_in_callback() + */ + + /* runtime library context */ + void *rt_ctx; + + /* HTTP status */ + int rt_status; + + /* optional response context */ + void *response_ctx; + + /* + * "response test callback": this function pointer is used by Fluent Bit + * http client testing mode to reference a test function that must retrieve the + * results of 'callback'. Consider this an intermediary function to + * transfer the results to the runtime test. + * + * This function is private and should not be set manually in the plugin + * code, it's set on src/flb_http_client.c . + */ + void (*rt_resp_callback) (void *, int, void *, size_t, void *); + + /* + * opaque data type passed by the runtime library to be used on + * rt_in_callback(). + */ + void *rt_data; + + /* + * Callback + * ========= + * "HTTP response callback": it references the plugin function that performs + * to validate HTTP response by HTTP client. This entry is mostly to + * expose the plugin local function. + */ + int (*callback) (/* plugin that ingested the records */ + struct flb_http_client *, + const void *, /* incoming response data */ + size_t, /* incoming response size */ + void **, /* output buffer */ + size_t *); /* output buffer size */ +}; + /* Set a request type */ struct flb_http_client { /* Upstream connection */ @@ -178,6 +245,10 @@ struct flb_http_client { /* Response */ struct flb_http_client_response resp; + /* Tests */ + int test_mode; + struct flb_test_http_response test_response; + /* Reference to Callback context */ void *cb_ctx; }; @@ -188,6 +259,7 @@ struct flb_http_client_ng { uint16_t port; uint64_t flags; int protocol_version; + cfl_sds_t temporary_buffer; int releasable; void *user_data; @@ -279,6 +351,13 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn, const char *host, int port, const char *proxy, int flags); +/* For fulfilling HTTP response testing (dummy client) */ +struct flb_http_client *flb_http_dummy_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags); + int flb_http_add_header(struct flb_http_client *c, const char *key, size_t key_len, const char *val, size_t val_len); @@ -294,6 +373,12 @@ int flb_http_set_keepalive(struct flb_http_client *c); int flb_http_set_content_encoding_gzip(struct flb_http_client *c); int flb_http_set_callback_context(struct flb_http_client *c, struct flb_callback *cb_ctx); +int flb_http_set_response_test(struct flb_http_client *c, char *test_name, + const void *data, size_t len, + int status, + void (*resp_callback) (void *, int, void *, size_t, void *), + void *resp_callback_data); +int flb_http_push_response(struct flb_http_client *c, const void *data, size_t len); int flb_http_get_response_data(struct flb_http_client *c, size_t bytes_consumed); int flb_http_do_request(struct flb_http_client *c, size_t *bytes); diff --git a/include/fluent-bit/flb_http_client_http2.h b/include/fluent-bit/flb_http_client_http2.h index dbf3b85a22b..eb6932aa569 100644 --- a/include/fluent-bit/flb_http_client_http2.h +++ b/include/fluent-bit/flb_http_client_http2.h @@ -22,6 +22,7 @@ #include #include +#include struct flb_http_client_session; diff --git a/include/fluent-bit/flb_http_common.h b/include/fluent-bit/flb_http_common.h index 9d4c593dcdd..f7f0d7df90d 100644 --- a/include/fluent-bit/flb_http_common.h +++ b/include/fluent-bit/flb_http_common.h @@ -88,6 +88,7 @@ struct flb_http_server_session; struct flb_http_request { int protocol_version; + cfl_sds_t authority; int method; cfl_sds_t path; cfl_sds_t host; diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index f96b06eff86..897ea0098ee 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -68,6 +68,9 @@ FLB_EXPORT int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void *test_ctx); FLB_EXPORT int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)); +FLB_EXPORT int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, + void (*out_response) (void *, int, int, void *, size_t, void *), + void *out_callback_data); FLB_EXPORT int flb_filter_set(flb_ctx_t *ctx, int ffd, ...); FLB_EXPORT int flb_service_set(flb_ctx_t *ctx, ...); @@ -84,6 +87,9 @@ FLB_EXPORT int flb_loop(flb_ctx_t *ctx); FLB_EXPORT int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len); FLB_EXPORT int flb_lib_config_file(flb_ctx_t *ctx, const char *path); +/* Emulate ingestions of HTTP responses for output plugins */ +FLB_EXPORT int flb_lib_response(flb_ctx_t *ctx, int ffd, int status, const void *data, size_t len); + /* library context handling */ FLB_EXPORT void flb_context_set(flb_ctx_t *ctx); FLB_EXPORT flb_ctx_t *flb_context_get(); diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index b834886f0fa..c8b007806f2 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -168,6 +168,66 @@ struct flb_test_out_formatter { size_t *); /* output buffer size */ }; +struct flb_test_out_response { + /* + * Runtime Library Mode + * ==================== + * When the runtime library enable the test formatter mode, it needs to + * keep a reference of the context and other information: + * + * - rt_ctx : context created by flb_create() + * + * - rt_ffd : this plugin assigned 'integer' created by flb_output() + * + * - rt_step_calback: intermediary function to receive the results of + * the formatter plugin test function. + * + * - rt_data: opaque data type for rt_step_callback() + */ + + /* runtime library context */ + void *rt_ctx; + + /* runtime library: assigned plugin integer */ + int rt_ffd; + + /* + * "runtime step callback": this function pointer is used by Fluent Bit + * library mode to reference a test function that must retrieve the + * results of 'callback'. Consider this an intermediary function to + * transfer the results to the runtime test. + * + * This function is private and should not be set manually in the plugin + * code, it's set on src/flb_lib.c . + */ + void (*rt_out_response) (void *, int, int, void *, size_t, void *); + + /* + * opaque data type passed by the runtime library to be used on + * rt_step_test(). + */ + void *rt_data; + + /* optional context for flush callback */ + void *flush_ctx; + + /* + * Callback + * ========= + * "Formatter callback": it references the plugin function that performs + * data formatting (msgpack -> local data). This entry is mostly to + * expose the plugin local function. + */ + int (*callback) (/* Fluent Bit context */ + struct flb_config *, + void *, /* plugin instance context */ + int status, /* HTTP status code */ + const void *, /* respond msgpack data */ + size_t, /* respond msgpack size */ + void **, /* output buffer */ + size_t *); /* output buffer size */ +}; + struct flb_output_plugin { /* * a 'mask' to define what kind of data the plugin can manage: @@ -241,6 +301,7 @@ struct flb_output_plugin { /* Tests */ struct flb_test_out_formatter test_formatter; + struct flb_test_out_response test_response; /* Link to global list from flb_config->outputs */ struct mk_list _head; @@ -391,6 +452,7 @@ struct flb_output_instance { /* Tests */ struct flb_test_out_formatter test_formatter; + struct flb_test_out_response test_response; /* * Buffer counter: it counts the total of disk space (filesystem) used by buffers diff --git a/plugins/custom_calyptia/calyptia.c b/plugins/custom_calyptia/calyptia.c index c3554157598..bfbb42f4767 100644 --- a/plugins/custom_calyptia/calyptia.c +++ b/plugins/custom_calyptia/calyptia.c @@ -30,40 +30,7 @@ #include -struct calyptia { - /* config map options */ - flb_sds_t api_key; - flb_sds_t store_path; - flb_sds_t cloud_host; - flb_sds_t cloud_port; - flb_sds_t machine_id; - int machine_id_auto_configured; - -/* used for reporting chunk trace records. */ -#ifdef FLB_HAVE_CHUNK_TRACE - flb_sds_t pipeline_id; -#endif /* FLB_HAVE_CHUNK_TRACE */ - - int cloud_tls; - int cloud_tls_verify; - - /* config reader for 'add_label' */ - struct mk_list *add_labels; - - /* instances */ - struct flb_input_instance *i; - struct flb_output_instance *o; - struct flb_input_instance *fleet; - struct flb_custom_instance *ins; - - /* Fleet configuration */ - flb_sds_t fleet_id; /* fleet-id */ - flb_sds_t fleet_name; - flb_sds_t fleet_config_dir; /* fleet configuration directory */ - flb_sds_t fleet_max_http_buffer_size; - int fleet_interval_sec; - int fleet_interval_nsec; -}; +#include "calyptia.h" /* * Check if the key belongs to a sensitive data field, if so report it. We never @@ -232,6 +199,53 @@ flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx) return buf; } +int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet) +{ + if (!fleet) { + flb_plg_error(ctx->ins, "invalid fleet input instance"); + return -1; + } + + if (ctx->fleet_name) { + flb_input_set_property(fleet, "fleet_name", ctx->fleet_name); + } + + if (ctx->fleet_id) { + flb_input_set_property(fleet, "fleet_id", ctx->fleet_id); + } + + flb_input_set_property(fleet, "api_key", ctx->api_key); + flb_input_set_property(fleet, "host", ctx->cloud_host); + flb_input_set_property(fleet, "port", ctx->cloud_port); + + /* Set TLS properties */ + flb_input_set_property(fleet, "tls", ctx->cloud_tls == 1 ? "on" : "off"); + flb_input_set_property(fleet, "tls.verify", ctx->cloud_tls_verify == 1 ? "on" : "off"); + + /* Optional configurations */ + if (ctx->fleet_config_dir) { + flb_input_set_property(fleet, "config_dir", ctx->fleet_config_dir); + } + + if (ctx->fleet_max_http_buffer_size) { + flb_input_set_property(fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size); + } + + if (ctx->machine_id) { + flb_input_set_property(fleet, "machine_id", ctx->machine_id); + } + + if (ctx->fleet_interval_sec) { + flb_input_set_property(fleet, "interval_sec", ctx->fleet_interval_sec); + } + + if (ctx->fleet_interval_nsec) { + flb_input_set_property(fleet, "interval_nsec", ctx->fleet_interval_nsec); + } + + return 0; +} + static struct flb_output_instance *setup_cloud_output(struct flb_config *config, struct calyptia *ctx) { int ret; @@ -279,6 +293,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config, flb_output_set_property(cloud, "match", "_calyptia_cloud"); flb_output_set_property(cloud, "api_key", ctx->api_key); + if (ctx->register_retry_on_flush) { + flb_output_set_property(cloud, "register_retry_on_flush", "true"); + } else { + flb_output_set_property(cloud, "register_retry_on_flush", "false"); + } + if (ctx->store_path) { flb_output_set_property(cloud, "store_path", ctx->store_path); } @@ -387,16 +407,14 @@ static flb_sds_t get_machine_id(struct calyptia *ctx) } static int cb_calyptia_init(struct flb_custom_instance *ins, - struct flb_config *config, - void *data) + struct flb_config *config, + void *data) { int ret; struct calyptia *ctx; - int is_fleet_mode; (void) data; ctx = flb_calloc(1, sizeof(struct calyptia)); - if (!ctx) { flb_errno(); return -1; @@ -405,7 +423,6 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, /* Load the config map */ ret = flb_custom_config_map_set(ins, (void *) ctx); - if (ret == -1) { flb_free(ctx); return -1; @@ -416,21 +433,17 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, /* If no machine_id has been provided via a configuration option get it from the local machine-id. */ if (!ctx->machine_id) { - /* machine id */ ctx->machine_id = get_machine_id(ctx); - if (ctx->machine_id == NULL) { flb_plg_error(ctx->ins, "unable to retrieve machine_id"); flb_free(ctx); return -1; } - ctx->machine_id_auto_configured = 1; } /* input collector */ ctx->i = flb_input_new(config, "fluentbit_metrics", NULL, FLB_TRUE); - if (!ctx->i) { flb_plg_error(ctx->ins, "could not load metrics collector"); flb_free(ctx); @@ -439,76 +452,40 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, flb_input_set_property(ctx->i, "tag", "_calyptia_cloud"); flb_input_set_property(ctx->i, "scrape_on_start", "true"); + // This scrape interval should be configurable. flb_input_set_property(ctx->i, "scrape_interval", "30"); - if (ctx->fleet_name || ctx->fleet_id) { - is_fleet_mode = FLB_TRUE; - } - else { - is_fleet_mode = FLB_FALSE; - } - - /* output cloud connector */ - if ((is_fleet_mode == FLB_TRUE && ctx->fleet_id != NULL) || - (is_fleet_mode == FLB_FALSE)) { + /* Setup cloud output if needed */ + if (ctx->fleet_id != NULL || !ctx->fleet_name) { ctx->o = setup_cloud_output(config, ctx); - if (ctx->o == NULL) { flb_free(ctx); return -1; } + /* Set fleet_id for output if present */ + if (ctx->fleet_id) { + flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); + } } + /* Setup fleet input if needed */ if (ctx->fleet_id || ctx->fleet_name) { - ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE); - + ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE); if (!ctx->fleet) { flb_plg_error(ctx->ins, "could not load Calyptia Fleet plugin"); return -1; } - if (ctx->fleet_name) { - flb_input_set_property(ctx->fleet, "fleet_name", ctx->fleet_name); - } - - if (ctx->fleet_id) { - flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id); - flb_input_set_property(ctx->fleet, "fleet_id", ctx->fleet_id); - } - - flb_input_set_property(ctx->fleet, "api_key", ctx->api_key); - flb_input_set_property(ctx->fleet, "host", ctx->cloud_host); - flb_input_set_property(ctx->fleet, "port", ctx->cloud_port); - - if (ctx->cloud_tls == 1) { - flb_input_set_property(ctx->fleet, "tls", "on"); - } - else { - flb_input_set_property(ctx->fleet, "tls", "off"); - } - - if (ctx->cloud_tls_verify == 1) { - flb_input_set_property(ctx->fleet, "tls.verify", "on"); - } - else { - flb_input_set_property(ctx->fleet, "tls.verify", "off"); - } - - if (ctx->fleet_config_dir) { - flb_input_set_property(ctx->fleet, "config_dir", ctx->fleet_config_dir); - } - - if (ctx->fleet_max_http_buffer_size) { - flb_input_set_property(ctx->fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size); - } - if (ctx->machine_id) { - flb_input_set_property(ctx->fleet, "machine_id", ctx->machine_id); + ret = set_fleet_input_properties(ctx, ctx->fleet); + if (ret == -1) { + return -1; } } if (ctx->o) { flb_router_connect(ctx->i, ctx->o); } + flb_plg_info(ins, "custom initialized!"); return 0; } @@ -587,12 +564,12 @@ static struct flb_config_map config_map[] = { "Base path for the configuration directory." }, { - FLB_CONFIG_MAP_INT, "fleet.interval_sec", "-1", + FLB_CONFIG_MAP_STR, "fleet.interval_sec", "-1", 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_sec), "Set the collector interval" }, { - FLB_CONFIG_MAP_INT, "fleet.interval_nsec", "-1", + FLB_CONFIG_MAP_STR, "fleet.interval_nsec", "-1", 0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_nsec), "Set the collector interval (nanoseconds)" }, @@ -614,7 +591,11 @@ static struct flb_config_map config_map[] = { "Pipeline ID for reporting to calyptia cloud." }, #endif /* FLB_HAVE_CHUNK_TRACE */ - + { + FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true", + 0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush), + "Retry agent registration on flush if failed on init." + }, /* EOF */ {0} }; diff --git a/plugins/custom_calyptia/calyptia.h b/plugins/custom_calyptia/calyptia.h new file mode 100644 index 00000000000..b4313f51182 --- /dev/null +++ b/plugins/custom_calyptia/calyptia.h @@ -0,0 +1,60 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_CALYPTIA_H +#define FLB_CALYPTIA_H + +struct calyptia { + /* config map options */ + flb_sds_t api_key; + flb_sds_t store_path; + flb_sds_t cloud_host; + flb_sds_t cloud_port; + flb_sds_t machine_id; + int machine_id_auto_configured; + +/* used for reporting chunk trace records. */ +#ifdef FLB_HAVE_CHUNK_TRACE + flb_sds_t pipeline_id; +#endif /* FLB_HAVE_CHUNK_TRACE */ + + int cloud_tls; + int cloud_tls_verify; + + /* config reader for 'add_label' */ + struct mk_list *add_labels; + + /* instances */ + struct flb_input_instance *i; + struct flb_output_instance *o; + struct flb_input_instance *fleet; + struct flb_custom_instance *ins; + + /* Fleet configuration */ + flb_sds_t fleet_id; /* fleet-id */ + flb_sds_t fleet_name; + flb_sds_t fleet_config_dir; /* fleet configuration directory */ + flb_sds_t fleet_max_http_buffer_size; + flb_sds_t fleet_interval_sec; + flb_sds_t fleet_interval_nsec; + bool register_retry_on_flush; /* retry registration on flush if failed */ +}; + +int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet); +#endif /* FLB_CALYPTIA_H */ diff --git a/plugins/filter_lua/lua.c b/plugins/filter_lua/lua.c index 5186955b8ae..de7e6983188 100644 --- a/plugins/filter_lua/lua.c +++ b/plugins/filter_lua/lua.c @@ -27,8 +27,10 @@ #include #include #include +#include #include #include + #include #include "fluent-bit/flb_mem.h" @@ -76,6 +78,31 @@ static int cb_lua_pre_run(struct flb_filter_instance *f_ins, return ret; } +static int env_variables(struct flb_config *config, struct flb_luajit *lj) +{ + struct mk_list *list; + struct mk_list *head; + struct flb_env *env; + struct flb_hash_table_entry *entry; + + lua_newtable(lj->state); + + env = (struct flb_env *) config->env; + list = (struct mk_list *) &env->ht->entries; + mk_list_foreach(head, list) { + entry = mk_list_entry(head, struct flb_hash_table_entry, _head_parent); + if (entry->val_size <= 0) { + continue; + } + lua_pushlstring(lj->state, entry->key, entry->key_len); + lua_pushlstring(lj->state, entry->val, entry->val_size); + lua_settable(lj->state, -3); + } + + lua_setglobal(lj->state, "FLB_ENV"); + return 0; +} + static int cb_lua_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) @@ -101,6 +128,9 @@ static int cb_lua_init(struct flb_filter_instance *f_ins, } ctx->lua = lj; + /* register environment variables */ + env_variables(config, lj); + if (ctx->enable_flb_null) { flb_lua_enable_flb_null(lj->state); } diff --git a/plugins/filter_parser/filter_parser.c b/plugins/filter_parser/filter_parser.c index 11bf71f7e53..64a504ef905 100644 --- a/plugins/filter_parser/filter_parser.c +++ b/plugins/filter_parser/filter_parser.c @@ -142,10 +142,6 @@ static int cb_parser_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) { - (void) f_ins; - (void) config; - (void) data; - struct filter_parser_ctx *ctx = NULL; /* Create context */ @@ -156,13 +152,12 @@ static int cb_parser_init(struct flb_filter_instance *f_ins, } ctx->ins = f_ins; - if ( configure(ctx, f_ins, config) < 0 ){ + if (configure(ctx, f_ins, config) < 0) { flb_free(ctx); return -1; } flb_filter_set_context(f_ins, ctx); - return 0; } @@ -174,11 +169,9 @@ static int cb_parser_filter(const void *data, size_t bytes, void *context, struct flb_config *config) { - int continue_parsing; struct filter_parser_ctx *ctx = context; struct flb_time tm; msgpack_object *obj; - msgpack_object_kv *kv; int i; int ret = FLB_FILTER_NOTOUCH; @@ -191,10 +184,8 @@ static int cb_parser_filter(const void *data, size_t bytes, char *out_buf; size_t out_size; struct flb_time parsed_time; - msgpack_object_kv **append_arr = NULL; size_t append_arr_len = 0; - int append_arr_i; struct mk_list *head; struct filter_parser *fp; struct flb_log_event_encoder log_encoder; @@ -209,9 +200,7 @@ static int cb_parser_filter(const void *data, size_t bytes, ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - + flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); return FLB_FILTER_NOTOUCH; } @@ -219,11 +208,8 @@ static int cb_parser_filter(const void *data, size_t bytes, FLB_LOG_EVENT_FORMAT_DEFAULT); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event encoder initialization error : %d", ret); - + flb_plg_error(ctx->ins, "Log event encoder initialization error : %d", ret); flb_log_event_decoder_destroy(&log_decoder); - return FLB_FILTER_NOTOUCH; } @@ -231,77 +217,69 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { out_buf = NULL; - append_arr_i = 0; flb_time_copy(&tm, &log_event.timestamp); obj = log_event.body; if (obj->type == MSGPACK_OBJECT_MAP) { map_num = obj->via.map.size; - if (ctx->reserve_data) { - append_arr_len = obj->via.map.size; - append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *)); + /* Calculate initial array size based on configuration */ + append_arr_len = (ctx->reserve_data ? map_num : 0); + if (ctx->preserve_key && !ctx->reserve_data) { + append_arr_len = 1; /* Space for preserved key */ + } + if (append_arr_len > 0) { + append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *)); if (append_arr == NULL) { flb_errno(); - flb_log_event_decoder_destroy(&log_decoder); flb_log_event_encoder_destroy(&log_encoder); - return FLB_FILTER_NOTOUCH; } - } - continue_parsing = FLB_TRUE; - for (i = 0; i < map_num && continue_parsing; i++) { - kv = &obj->via.map.ptr[i]; + /* Initialize array */ if (ctx->reserve_data) { - append_arr[append_arr_i] = kv; - append_arr_i++; + for (i = 0; i < map_num; i++) { + append_arr[i] = &obj->via.map.ptr[i]; + } } - if ( msgpackobj2char(&kv->key, &key_str, &key_len) < 0 ) { - /* key is not string */ + } + + /* Process the target key */ + for (i = 0; i < map_num; i++) { + kv = &obj->via.map.ptr[i]; + if (msgpackobj2char(&kv->key, &key_str, &key_len) < 0) { continue; } + if (key_len == ctx->key_name_len && !strncmp(key_str, ctx->key_name, key_len)) { - if ( msgpackobj2char(&kv->val, &val_str, &val_len) < 0 ) { - /* val is not string */ + if (msgpackobj2char(&kv->val, &val_str, &val_len) < 0) { continue; } /* Lookup parser */ mk_list_foreach(head, &ctx->parsers) { fp = mk_list_entry(head, struct filter_parser, _head); - - /* Reset time */ flb_time_zero(&parsed_time); parse_ret = flb_parser_do(fp->parser, val_str, val_len, (void **) &out_buf, &out_size, &parsed_time); if (parse_ret >= 0) { - /* - * If the parser succeeded we need to check the - * status of the parsed time. If the time was - * parsed successfully 'parsed_time' will be - * different than zero, if so, override the time - * holder with the new value, otherwise keep the - * original. - */ if (flb_time_to_nanosec(&parsed_time) != 0L) { flb_time_copy(&tm, &parsed_time); } - if (ctx->reserve_data) { + if (append_arr != NULL) { if (!ctx->preserve_key) { - append_arr_i--; - append_arr_len--; - append_arr[append_arr_i] = NULL; + append_arr[i] = NULL; + } + else if (!ctx->reserve_data) { + /* Store only the key being preserved */ + append_arr[0] = kv; } - } - else { - continue_parsing = FLB_FALSE; } break; } @@ -322,27 +300,58 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_encoder, log_event.metadata); } - if (out_buf != NULL) { - if (ctx->reserve_data) { + if (out_buf != NULL && parse_ret >= 0) { + if (append_arr != NULL && append_arr_len > 0) { char *new_buf = NULL; - int new_size; - int ret; - ret = flb_msgpack_expand_map(out_buf, out_size, - append_arr, append_arr_len, - &new_buf, &new_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "cannot expand map"); - - flb_log_event_decoder_destroy(&log_decoder); - flb_log_event_encoder_destroy(&log_encoder); - flb_free(append_arr); - - return FLB_FILTER_NOTOUCH; + int new_size; + size_t valid_kv_count = 0; + msgpack_object_kv **valid_kv = NULL; + + /* Count valid entries */ + for (i = 0; i < append_arr_len; i++) { + if (append_arr[i] != NULL) { + valid_kv_count++; + } } - flb_free(out_buf); - out_buf = new_buf; - out_size = new_size; + if (valid_kv_count > 0) { + valid_kv = flb_calloc(valid_kv_count, sizeof(msgpack_object_kv *)); + if (!valid_kv) { + flb_errno(); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + flb_free(append_arr); + flb_free(out_buf); + return FLB_FILTER_NOTOUCH; + } + + /* Fill valid entries */ + valid_kv_count = 0; + for (i = 0; i < append_arr_len; i++) { + if (append_arr[i] != NULL) { + valid_kv[valid_kv_count++] = append_arr[i]; + } + } + + ret = flb_msgpack_expand_map(out_buf, out_size, + valid_kv, valid_kv_count, + &new_buf, &new_size); + + flb_free(valid_kv); + + if (ret == -1) { + flb_plg_error(ctx->ins, "cannot expand map"); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + flb_free(append_arr); + flb_free(out_buf); + return FLB_FILTER_NOTOUCH; + } + + flb_free(out_buf); + out_buf = new_buf; + out_size = new_size; + } } if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { @@ -355,7 +364,6 @@ static int cb_parser_filter(const void *data, size_t bytes, ret = FLB_FILTER_MODIFIED; } else { - /* re-use original data*/ if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { encoder_result = \ flb_log_event_encoder_set_body_from_msgpack_object( @@ -371,26 +379,22 @@ static int cb_parser_filter(const void *data, size_t bytes, flb_plg_error(ctx->ins, "log event encoder error : %d", encoder_result); } - flb_free(append_arr); - append_arr = NULL; - } - else { - continue; + if (append_arr != NULL) { + flb_free(append_arr); + append_arr = NULL; + } } } if (log_encoder.output_length > 0) { - *ret_buf = log_encoder.output_buffer; + *ret_buf = log_encoder.output_buffer; *ret_bytes = log_encoder.output_length; ret = FLB_FILTER_MODIFIED; - flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); } else { - flb_plg_error(ctx->ins, - "Log event encoder error : %d", ret); - + flb_plg_error(ctx->ins, "Log event encoder error : %d", ret); ret = FLB_FILTER_NOTOUCH; } @@ -400,7 +404,6 @@ static int cb_parser_filter(const void *data, size_t bytes, return ret; } - static int cb_parser_exit(void *data, struct flb_config *config) { struct filter_parser_ctx *ctx = data; diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f61068ceede..28e3fc8719c 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -2233,7 +2233,6 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, ctx->collect_fd = -1; ctx->fleet_id_found = FLB_FALSE; - /* Load the config map */ ret = flb_input_config_map_set(in, (void *) ctx); if (ret == -1) { @@ -2278,14 +2277,16 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return -1; } + /* Log initial interval values */ + flb_plg_debug(ctx->ins, "initial collector interval: sec=%d nsec=%d", + ctx->interval_sec, ctx->interval_nsec); + if (ctx->interval_sec <= 0 && ctx->interval_nsec <= 0) { /* Illegal settings. Override them. */ ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC); - } - - if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) { - ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC); + flb_plg_info(ctx->ins, "invalid interval settings, using defaults: sec=%d nsec=%d", + ctx->interval_sec, ctx->interval_nsec); } /* Set the context */ @@ -2328,6 +2329,8 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, } ctx->collect_fd = ret; + flb_plg_info(ctx->ins, "fleet collector initialized with interval: %d sec %d nsec", + ctx->interval_sec, ctx->interval_nsec); return 0; } diff --git a/plugins/in_forward/fw_prot.c b/plugins/in_forward/fw_prot.c index a118da599ce..a3c0ee05464 100644 --- a/plugins/in_forward/fw_prot.c +++ b/plugins/in_forward/fw_prot.c @@ -629,6 +629,7 @@ static int check_ping(struct flb_input_instance *ins, flb_free(serverside); flb_free(hostname); flb_free(shared_key_salt); + flb_free(shared_key_digest); msgpack_unpacked_destroy(&result); return -1; } diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index 175fe4461e1..4e2aa6a761c 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -843,6 +843,11 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ { record = obj->via.array.ptr[i]; + tag_from_record = NULL; + if (ctx->tag_key) { + tag_from_record = tag_key(ctx, &record); + } + if (tag_from_record) { ret = process_pack_record(ctx, &tm, tag_from_record, &record); flb_sds_destroy(tag_from_record); diff --git a/plugins/in_node_exporter_metrics/ne_utils.c b/plugins/in_node_exporter_metrics/ne_utils.c index e54a2e862b8..7206f60bbd7 100644 --- a/plugins/in_node_exporter_metrics/ne_utils.c +++ b/plugins/in_node_exporter_metrics/ne_utils.c @@ -302,7 +302,7 @@ int ne_utils_path_scan(struct flb_ne *ctx, const char *mount, const char *path, flb_plg_error(ctx->ins, "no memory space available"); return -1; case GLOB_ABORTED: - flb_plg_error(ctx->ins, "read error, check permissions: %s", path); + flb_plg_error(ctx->ins, "read error, check permissions: %s", real_path); return -1;; case GLOB_NOMATCH: ret = stat(path, &st); diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index bfab3d02145..7b89902bf29 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -2242,7 +2242,7 @@ static int process_payload_metrics_ng(struct flb_opentelemetry *ctx, cfl_list_foreach(iterator, &decoded_contexts) { context = cfl_list_entry(iterator, struct cmt, _head); - result = flb_input_metrics_append(ctx->ins, NULL, 0, context); + result = flb_input_metrics_append(ctx->ins, tag, cfl_sds_len(tag), context); if (result != 0) { flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result); @@ -2301,7 +2301,7 @@ static int process_payload_traces_proto_ng(struct flb_opentelemetry *ctx, } if (result == 0) { - result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context); + result = flb_input_trace_append(ctx->ins, tag, cfl_sds_len(tag), decoded_context); ctr_decode_opentelemetry_destroy(decoded_context); } else { diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 58ac559dbdc..4b79d80c4bb 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -22,8 +22,9 @@ #include #include #include -#include #include +#include +#include #include "azure_kusto.h" #include "azure_kusto_conf.h" @@ -126,11 +127,21 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs struct flb_http_client *c; flb_sds_t resp = NULL; + flb_plg_debug(ctx->ins, "before getting upstream connection"); + + flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:"); + flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha); + flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha); + flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time); + + ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ; + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (u_conn) { token = get_azure_kusto_token(ctx); + flb_plg_debug(ctx->ins, "after get azure kusto token"); if (token) { /* Compose request body */ @@ -152,6 +163,9 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs flb_http_add_header(c, "Accept", 6, "application/json", 16); flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); + flb_http_add_header(c, "x-ms-app", 8, "Fluent-Bit", 10); + flb_http_add_header(c, "x-ms-user", 9, "Fluent-Bit", 10); flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10); /* Send HTTP request */ @@ -231,6 +245,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi */ pthread_mutex_init(&ctx->token_mutex, NULL); pthread_mutex_init(&ctx->resources_mutex, NULL); + pthread_mutex_init(&ctx->blob_mutex, NULL); /* * Create upstream context for Kusto Ingestion endpoint @@ -250,6 +265,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi } flb_output_upstream_set(ctx->u, ins); + flb_plg_debug(ctx->ins, "azure kusto init completed"); + return 0; } @@ -367,16 +384,21 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, size_t json_size; size_t tag_len; struct flb_azure_kusto *ctx = out_context; + int is_compressed = FLB_FALSE; (void)i_ins; (void)config; + void *final_payload = NULL; + size_t final_payload_size = 0; + flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); tag_len = flb_sds_len(event_chunk->tag); /* Load or refresh ingestion resources */ ret = azure_kusto_load_ingestion_resources(ctx, config); + flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot load ingestion resources"); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -385,12 +407,32 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Reformat msgpack to JSON payload */ ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data, event_chunk->size, (void **)&json, &json_size); + flb_plg_trace(ctx->ins, "format: ret=%d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot reformat data into json"); FLB_OUTPUT_RETURN(FLB_RETRY); } - ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size); + /* Map buffer */ + final_payload = json; + final_payload_size = json_size; + if (ctx->compression_enabled == FLB_TRUE) { + ret = flb_gzip_compress((void *) json, json_size, + &final_payload, &final_payload_size); + if (ret != 0) { + flb_plg_error(ctx->ins, + "cannot gzip payload"); + flb_sds_destroy(json); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + else { + is_compressed = FLB_TRUE; + /* JSON buffer will be cleared at cleanup: */ + } + } + flb_plg_trace(ctx->ins, "payload size before compression %zu & after compression %zu ", json_size ,final_payload_size); + ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); + flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); if (ret != 0) { flb_plg_error(ctx->ins, "cannot perform queued ingestion"); flb_sds_destroy(json); @@ -400,6 +442,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, /* Cleanup */ flb_sds_destroy(json); + /* release compressed payload */ + if (is_compressed == FLB_TRUE) { + flb_free(final_payload); + } /* Done */ FLB_OUTPUT_RETURN(FLB_OK); } @@ -417,6 +463,10 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) ctx->u = NULL; } + pthread_mutex_destroy(&ctx->resources_mutex); + pthread_mutex_destroy(&ctx->token_mutex); + pthread_mutex_destroy(&ctx->blob_mutex); + flb_azure_kusto_conf_destroy(ctx); return 0; @@ -462,7 +512,19 @@ static struct flb_config_map config_map[] = { offsetof(struct flb_azure_kusto, time_key), "The key name of the time. If 'include_time_key' is false, " "This property is ignored"}, - /* EOF */ + {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout), + "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds." + "The default is 60 seconds."}, + {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, compression_enabled), + "Enable HTTP payload compression (gzip)." + "The default is true."}, + {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, + offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), + "Set the azure kusto ingestion resources refresh interval" + "The default is 3600 seconds."}, + /* EOF */ {0}}; struct flb_output_plugin out_azure_kusto_plugin = { diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index b752e087e65..9e3eb7b3182 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -49,7 +49,10 @@ #define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri" #define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas" -#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600 +#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC "3600" + +#define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60" + struct flb_azure_kusto_resources { struct flb_upstream_ha *blob_ha; @@ -70,6 +73,13 @@ struct flb_azure_kusto { flb_sds_t table_name; flb_sds_t ingestion_mapping_reference; + int ingestion_endpoint_connect_timeout; + + /* compress payload */ + int compression_enabled; + + int ingestion_resources_refresh_interval; + /* records configuration */ flb_sds_t log_key; int include_tag_key; @@ -94,6 +104,8 @@ struct flb_azure_kusto { /* mutex for loading reosurces */ pthread_mutex_t resources_mutex; + pthread_mutex_t blob_mutex; + /* Upstream connection to the backend server */ struct flb_upstream *u; diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index ec4c23c8d16..d25f11b15c3 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -161,8 +161,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi { jsmn_parser parser; jsmntok_t *t; - jsmntok_t *tokens; - int tok_size = 100; + jsmntok_t *tokens = NULL; int ret = -1; int i; int blob_count = 0; @@ -200,10 +199,18 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi } jsmn_init(&parser); - tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size); + + /* Dynamically allocate memory for tokens based on response length */ + tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response))); + + if (!tokens) { + flb_errno(); /* Log the error using flb_errno() */ + flb_plg_error(ctx->ins, "failed to allocate memory for tokens"); + return -1; + } if (tokens) { - ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size); + ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, flb_sds_len(response)); if (ret > 0) { /* skip all tokens until we reach "Rows" */ @@ -417,6 +424,24 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, return identity_token; } + + +/** + * This method returns random integers from range -600 to +600 which needs to be added + * to the kusto ingestion resources refresh interval to even out the spikes + * in kusto DM for .get ingestion resources upon expiry + * */ +int azure_kusto_generate_random_integer() { + /* Seed the random number generator */ + int pid = getpid(); + unsigned long address = (unsigned long)&address; + unsigned int seed = pid ^ (address & 0xFFFFFFFF) * time(0); + srand(seed); + /* Generate a random integer in the range [-600, 600] */ + int random_integer = rand() % 1201 - 600; + return random_integer; +} + int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_config *config) { @@ -427,22 +452,20 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, struct flb_upstream_ha *queue_ha = NULL; time_t now; - if (pthread_mutex_lock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); - return -1; - } + int generated_random_integer = azure_kusto_generate_random_integer(); + flb_plg_debug(ctx->ins, "generated random integer is %d", generated_random_integer); now = time(NULL); /* check if we have all resources and they are not stale */ if (ctx->resources->blob_ha && ctx->resources->queue_ha && ctx->resources->identity_token && - now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC) { + now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval + generated_random_integer) { flb_plg_debug(ctx->ins, "resources are already loaded and are not stale"); ret = 0; } else { - flb_plg_info(ctx->ins, "loading kusto ingestion resourcs"); + flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ctx->ingestion_resources_refresh_interval + generated_random_integer); response = execute_ingest_csl_command(ctx, ".get ingestion resources"); if (response) { @@ -452,9 +475,19 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha"); if (blob_ha) { + + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return -1; + } ret = parse_storage_resources(ctx, config, response, blob_ha, queue_ha); + if (pthread_mutex_unlock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + if (ret == 0) { flb_sds_destroy(response); response = NULL; @@ -463,31 +496,30 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, execute_ingest_csl_command(ctx, ".get kusto identity token"); if (response) { + if (pthread_mutex_lock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error locking mutex"); + return -1; + } identity_token = parse_ingestion_identity_token(ctx, response); if (identity_token) { - ret = flb_azure_kusto_resources_clear(ctx->resources); - - if (ret != -1) { ctx->resources->blob_ha = blob_ha; ctx->resources->queue_ha = queue_ha; ctx->resources->identity_token = identity_token; ctx->resources->load_time = now; ret = 0; - } - else { - flb_plg_error( - ctx->ins, - "error destroying previous ingestion resources"); - } } else { flb_plg_error(ctx->ins, "error parsing ingestion identity token"); ret = -1; } + if (pthread_mutex_unlock(&ctx->resources_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } } else { flb_plg_error(ctx->ins, "error getting kusto identity token"); @@ -526,11 +558,6 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx, } } - if (pthread_mutex_unlock(&ctx->resources_mutex)) { - flb_plg_error(ctx->ins, "error unlocking mutex"); - return -1; - } - return ret; } diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 3fe8e2921a0..495aa94cb83 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -92,6 +93,14 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, size_t blob_uri_size; char *blob_sas; size_t blob_sas_size; + const char *extension; + + if (ctx->compression_enabled) { + extension = ".multijson.gz"; + } + else { + extension = ".multijson"; + } ret = flb_hash_table_get(u_node->ht, AZURE_KUSTO_RESOURCE_UPSTREAM_URI, 3, (void **)&blob_uri, &blob_uri_size); @@ -109,11 +118,11 @@ static flb_sds_t azure_kusto_create_blob_uri(struct flb_azure_kusto *ctx, /* uri will be https:////.multijson? */ uri = flb_sds_create_size(flb_sds_len(u_node->host) + blob_uri_size + blob_sas_size + - flb_sds_len(blob_id) + 21); + flb_sds_len(blob_id) + 11 + strlen(extension)); if (uri) { - flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s.multijson?%s", - u_node->host, blob_uri, blob_id, blob_sas); + flb_sds_snprintf(&uri, flb_sds_alloc(uri), "https://%s%s/%s%s?%s", + u_node->host, blob_uri, blob_id, extension ,blob_sas); flb_plg_debug(ctx->ins, "created blob uri %s", uri); } else { @@ -137,6 +146,8 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t char tmp[64]; int len; + struct timespec ts; + now = time(NULL); gmtime_r(&now, &tm); len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); @@ -147,11 +158,26 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t return NULL; } + + flb_plg_debug(ctx->ins,"inside blob after upstream ha node get"); + u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; + u_conn = flb_upstream_conn_get(u_node->u); if (u_conn) { + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error locking blob mutex"); + return NULL; + } + + flb_plg_debug(ctx->ins,"inside blob before create blob uri"); uri = azure_kusto_create_blob_uri(ctx, u_node, blob_id); + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking blob mutex"); + return NULL; + } + if (uri) { flb_plg_debug(ctx->ins, "uploading payload to blob uri: %s", uri); c = flb_http_client(u_conn, FLB_HTTP_PUT, uri, payload, payload_size, NULL, 0, @@ -163,6 +189,10 @@ static flb_sds_t azure_kusto_create_blob(struct flb_azure_kusto *ctx, flb_sds_t flb_http_add_header(c, "x-ms-blob-type", 14, "BlockBlob", 9); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); + ret = flb_http_do(c, &resp_size); flb_plg_debug(ctx->ins, @@ -222,24 +252,37 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t size_t b64_len; size_t message_len; + + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error locking blob mutex"); + return NULL; + } + uuid = generate_uuid(); if (uuid) { message = flb_sds_create(NULL); + flb_plg_debug(ctx->ins,"uuid :: %s",uuid); + flb_plg_debug(ctx->ins,"blob uri :: %s",blob_uri); + flb_plg_debug(ctx->ins,"payload size :: %lu",payload_size); + flb_plg_debug(ctx->ins,"database_name :: %s",ctx->database_name); + flb_plg_debug(ctx->ins,"table name :: %s",ctx->table_name); + if (message) { message_len = - flb_sds_snprintf(&message, 0, - "{\"Id\": \"%s\", \"BlobPath\": \"%s\", " - "\"RawDataSize\": %lu, \"DatabaseName\": " - "\"%s\", \"TableName\": \"%s\"," - "\"AdditionalProperties\": { \"format\": \"multijson\", " - "\"authorizationContext\": " - "\"%s\", \"jsonMappingReference\": \"%s\" }}%c", - uuid, blob_uri, payload_size, ctx->database_name, - ctx->table_name, ctx->resources->identity_token, - ctx->ingestion_mapping_reference == NULL - ? "" - : ctx->ingestion_mapping_reference, 0); + flb_sds_snprintf(&message, 0, + "{\"Id\": \"%s\", \"BlobPath\": \"%s\", " + "\"RawDataSize\": %lu, \"DatabaseName\": " + "\"%s\", \"TableName\": \"%s\", " + "\"ClientVersionForTracing\": \"Kusto.Fluent-Bit:%s\", " + "\"ApplicationForTracing\": \"%s\", " + "\"AdditionalProperties\": { \"format\": \"multijson\", " + "\"authorizationContext\": \"%s\", " + "\"jsonMappingReference\": \"%s\" }}%c", + uuid, blob_uri, payload_size, ctx->database_name, + ctx->table_name, FLB_VERSION_STR, "Kusto.Fluent-Bit", + ctx->resources->identity_token, + ctx->ingestion_mapping_reference == NULL ? "" : ctx->ingestion_mapping_reference, 0); if (message_len != -1) { flb_plg_debug(ctx->ins, "created ingestion message:\n%s", message); @@ -281,6 +324,12 @@ static flb_sds_t create_ingestion_message(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_error(ctx->ins, "error generating unique ingestion UUID"); } + + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking blob mutex"); + return NULL; + } + return message; } @@ -347,12 +396,23 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t flb_plg_error(ctx->ins, "error getting queue upstream"); return -1; } + + u_node->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout; u_conn = flb_upstream_conn_get(u_node->u); if (u_conn) { + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } uri = azure_kusto_create_queue_uri(ctx, u_node); + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + if (uri) { payload = create_ingestion_message(ctx, blob_uri, payload_size); @@ -366,6 +426,9 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t 20); flb_http_add_header(c, "x-ms-date", 9, tmp, len); flb_http_add_header(c, "x-ms-version", 12, "2019-12-12", 10); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); ret = flb_http_do(c, &resp_size); flb_plg_debug(ctx->ins, @@ -466,9 +529,21 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, flb_sds_t blob_id; flb_sds_t blob_uri; + + if (pthread_mutex_lock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + /* flb________ */ blob_id = azure_kusto_create_blob_id(ctx, tag, tag_len); + + if (pthread_mutex_unlock(&ctx->blob_mutex)) { + flb_plg_error(ctx->ins, "error unlocking mutex"); + return -1; + } + if (blob_id) { blob_uri = azure_kusto_create_blob(ctx, blob_id, payload, payload_size); diff --git a/plugins/out_calyptia/calyptia.c b/plugins/out_calyptia/calyptia.c index aa760e2fa5e..c16fd31d6ee 100644 --- a/plugins/out_calyptia/calyptia.c +++ b/plugins/out_calyptia/calyptia.c @@ -322,8 +322,24 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c, int ret; size_t b_sent; + if( !ctx || !c ) { + return FLB_ERROR; + } + + /* Ensure agent_token is not empty when required */ + if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) && + !ctx->agent_token) { + flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type); + return FLB_ERROR; + } + /* append headers */ if (type == CALYPTIA_ACTION_REGISTER) { + // When registering a new agent api key is required + if (!ctx->api_key) { + flb_plg_error(ctx->ins, "api_key is missing"); + return FLB_ERROR; + } flb_http_add_header(c, CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1, CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1); @@ -721,6 +737,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, return NULL; } + ctx->metrics_endpoint = flb_sds_create_size(256); + if (!ctx->metrics_endpoint) { + flb_free(ctx); + return NULL; + } + +#ifdef FLB_HAVE_CHUNK_TRACE + ctx->trace_endpoint = flb_sds_create_size(256); + if (!ctx->trace_endpoint) { + flb_sds_destroy(ctx->metrics_endpoint); + flb_free(ctx); + return NULL; + } +#endif + /* api_key */ if (!ctx->api_key) { flb_plg_error(ctx->ins, "configuration 'api_key' is missing"); @@ -771,12 +802,40 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, return ctx; } -static int cb_calyptia_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) +static int register_agent(struct flb_calyptia *ctx, struct flb_config *config) { int ret; + + /* Try registration */ + ret = api_agent_create(config, ctx); + if (ret != FLB_OK) { + flb_plg_warn(ctx->ins, "agent registration failed"); + return FLB_ERROR; + } + + /* Update endpoints */ + flb_sds_len_set(ctx->metrics_endpoint, 0); + flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, + ctx->agent_id); + +#ifdef FLB_HAVE_CHUNK_TRACE + if (ctx->pipeline_id) { + flb_sds_len_set(ctx->trace_endpoint, 0); + flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE, + ctx->pipeline_id); + } +#endif + + flb_plg_info(ctx->ins, "agent registration successful"); + return FLB_OK; +} + +static int cb_calyptia_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ struct flb_calyptia *ctx; (void) data; + int ret; /* create config context */ ctx = config_init(ins, config); @@ -791,23 +850,12 @@ static int cb_calyptia_init(struct flb_output_instance *ins, */ flb_output_set_http_debug_callbacks(ins); - /* register/update agent */ - ret = api_agent_create(config, ctx); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "agent registration failed"); + ret = register_agent(ctx, config); + if (ret != FLB_OK && !ctx->register_retry_on_flush) { + flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false"); return -1; } - /* metrics endpoint */ - ctx->metrics_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id); - -#ifdef FLB_HAVE_CHUNK_TRACE - ctx->trace_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE, - ctx->pipeline_id); -#endif /* FLB_HAVE_CHUNK_TRACE */ return 0; } @@ -830,29 +878,79 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes) cmt_destroy(cmt); } -static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) +static int cb_calyptia_exit(void *data, struct flb_config *config) { - int ret = FLB_RETRY; - size_t off = 0; - size_t out_size = 0; - char *out_buf = NULL; + struct flb_calyptia *ctx = data; + + if (!ctx) { + return 0; + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + if (ctx->agent_id) { + flb_sds_destroy(ctx->agent_id); + } + + if (ctx->agent_token) { + flb_sds_destroy(ctx->agent_token); + } + + if (ctx->env) { + flb_env_destroy(ctx->env); + } + + if (ctx->metrics_endpoint) { + flb_sds_destroy(ctx->metrics_endpoint); + } -/* used to create records for reporting traces to the cloud. */ #ifdef FLB_HAVE_CHUNK_TRACE - flb_sds_t json; + if (ctx->trace_endpoint) { + flb_sds_destroy(ctx->trace_endpoint); + } #endif /* FLB_HAVE_CHUNK_TRACE */ + if (ctx->fs) { + flb_fstore_destroy(ctx->fs); + } + + flb_kv_release(&ctx->kv_labels); + flb_free(ctx); + + return 0; +} + +static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + size_t off = 0; + size_t out_size = 0; + char *out_buf = NULL; struct flb_connection *u_conn; struct flb_http_client *c = NULL; struct flb_calyptia *ctx = out_context; struct cmt *cmt; + flb_sds_t json; (void) i_ins; (void) config; + if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) { + flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true"); + if (register_agent(ctx, config) != FLB_OK) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + else if (!ctx->agent_id || !ctx->agent_token) { + flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false"); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { @@ -890,7 +988,7 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, /* Compose HTTP Client request */ c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint, - out_buf, out_size, NULL, 0, NULL, 0); + out_buf, out_size, NULL, 0, NULL, 0); if (!c) { if (out_buf != event_chunk->data) { cmt_encode_msgpack_destroy(out_buf); @@ -899,12 +997,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ + /* perform request */ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS); if (ret == FLB_OK) { flb_plg_debug(ctx->ins, "metrics delivered OK"); } - else if (ret == FLB_ERROR) { + else { flb_plg_error(ctx->ins, "could not deliver metrics"); debug_payload(ctx, out_buf, out_size); } @@ -915,42 +1013,35 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, } #ifdef FLB_HAVE_CHUNK_TRACE - if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) { + if (event_chunk->type & FLB_EVENT_TYPE_LOGS && + event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) { json = flb_pack_msgpack_to_json_format(event_chunk->data, - event_chunk->size, - FLB_PACK_JSON_FORMAT_STREAM, - FLB_PACK_JSON_DATE_DOUBLE, - NULL); + event_chunk->size, + FLB_PACK_JSON_FORMAT_STREAM, + FLB_PACK_JSON_DATE_DOUBLE, + NULL); if (json == NULL) { flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_RETRY); } - out_buf = (char *)json; - out_size = flb_sds_len(json); - if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id) == NULL) { - flb_upstream_conn_release(u_conn); - flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); - } c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint, - out_buf, out_size, NULL, 0, NULL, 0); + (char *) json, flb_sds_len(json), + NULL, 0, NULL, 0); + if (!c) { flb_upstream_conn_release(u_conn); flb_sds_destroy(json); - flb_sds_destroy(ctx->metrics_endpoint); FLB_OUTPUT_RETURN(FLB_RETRY); } - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE); if (ret == FLB_OK) { flb_plg_debug(ctx->ins, "trace delivered OK"); } - else if (ret == FLB_ERROR) { + else { flb_plg_error(ctx->ins, "could not deliver trace"); - debug_payload(ctx, out_buf, out_size); + debug_payload(ctx, (char *) json, flb_sds_len(json)); } flb_sds_destroy(json); } @@ -961,51 +1052,8 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, if (c) { flb_http_client_destroy(c); } - FLB_OUTPUT_RETURN(ret); -} - -static int cb_calyptia_exit(void *data, struct flb_config *config) -{ - struct flb_calyptia *ctx = data; - - if (!ctx) { - return 0; - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - - if (ctx->agent_id) { - flb_sds_destroy(ctx->agent_id); - } - - if (ctx->agent_token) { - flb_sds_destroy(ctx->agent_token); - } - - if (ctx->env) { - flb_env_destroy(ctx->env); - } - - if (ctx->metrics_endpoint) { - flb_sds_destroy(ctx->metrics_endpoint); - } - -#ifdef FLB_HAVE_CHUNK_TRACE - if (ctx->trace_endpoint) { - flb_sds_destroy(ctx->trace_endpoint); - } -#endif /* FLB_HAVE_CHUNK_TRACE */ - - if (ctx->fs) { - flb_fstore_destroy(ctx->fs); - } - - flb_kv_release(&ctx->kv_labels); - flb_free(ctx); - return 0; + FLB_OUTPUT_RETURN(ret); } /* Configuration properties map */ @@ -1057,7 +1105,11 @@ static struct flb_config_map config_map[] = { "Pipeline ID for calyptia core traces." }, #endif - + { + FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true", + 0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush), + "Retry agent registration on flush if failed on init." + }, /* EOF */ {0} }; diff --git a/plugins/out_calyptia/calyptia.h b/plugins/out_calyptia/calyptia.h index ee37d8778dc..0532e4a2b01 100644 --- a/plugins/out_calyptia/calyptia.h +++ b/plugins/out_calyptia/calyptia.h @@ -80,6 +80,7 @@ struct flb_calyptia { flb_sds_t trace_endpoint; flb_sds_t pipeline_id; #endif /* FLB_HAVE_CHUNK_TRACE */ + bool register_retry_on_flush; /* retry registration on flush if failed */ }; #endif diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index b0773a40991..c896b1ed258 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -985,6 +985,78 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } +static int elasticsearch_response_test(struct flb_config *config, + void *plugin_context, + int status, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + int ret = 0; + struct flb_elasticsearch *ctx = plugin_context; + struct flb_connection *u_conn; + struct flb_http_client *c; + size_t b_sent; + + /* Not retrieve upstream connection */ + u_conn = NULL; + + /* Compose HTTP Client request (dummy client) */ + c = flb_http_dummy_client(u_conn, FLB_HTTP_POST, ctx->uri, + NULL, 0, NULL, 0, NULL, 0); + + flb_http_buffer_size(c, ctx->buffer_size); + + /* Just stubbing the HTTP responses */ + flb_http_set_response_test(c, "response", data, bytes, status, NULL, NULL); + + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + goto error; + } + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + goto error; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + if (c->resp.status != 200 && c->resp.status != 201) { + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", + c->resp.status, ctx->uri, c->resp.payload); + } + else { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", + c->resp.status, ctx->uri); + } + goto error; + } + + if (c->resp.payload_size > 0) { + /* + * Elasticsearch payload should be JSON, we convert it to msgpack + * and lookup the 'error' field. + */ + ret = elasticsearch_error_check(ctx, c); + } + else { + goto error; + } + } + + /* Cleanup */ + flb_http_client_destroy(c); + + return ret; + +error: + /* Cleanup */ + flb_http_client_destroy(c); + + return -2; +} + static int cb_es_exit(void *data, struct flb_config *config) { struct flb_elasticsearch *ctx = data; @@ -1231,6 +1303,7 @@ struct flb_output_plugin out_es_plugin = { /* Test */ .test_formatter.callback = elasticsearch_format, + .test_response.callback = elasticsearch_response_test, /* Plugin flags */ .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index b79d78adafd..ae614a9c575 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -227,6 +227,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx, const char *compression_algorithm; uint32_t wire_message_length; size_t grpc_body_length; + cfl_sds_t sds_result; cfl_sds_t grpc_body; struct flb_http_response *response; struct flb_http_request *request; @@ -257,23 +258,46 @@ int opentelemetry_post(struct opentelemetry_context *ctx, return FLB_RETRY; } - if (request->protocol_version == HTTP_PROTOCOL_VERSION_20) { + if (request->protocol_version == HTTP_PROTOCOL_VERSION_20 && + ctx->enable_grpc_flag) { grpc_body = cfl_sds_create_size(body_len + 5); if (grpc_body == NULL) { + flb_http_client_request_destroy(request, FLB_TRUE); + return FLB_RETRY; } wire_message_length = (uint32_t) body_len; - cfl_sds_cat(grpc_body, "\x00----", 5); + sds_result = cfl_sds_cat(grpc_body, "\x00----", 5); + + if (sds_result == NULL) { + flb_http_client_request_destroy(request, FLB_TRUE); + + cfl_sds_destroy(grpc_body); + + return FLB_RETRY; + } + + grpc_body = sds_result; ((uint8_t *) grpc_body)[1] = (wire_message_length & 0xFF000000) >> 24; ((uint8_t *) grpc_body)[2] = (wire_message_length & 0x00FF0000) >> 16; ((uint8_t *) grpc_body)[3] = (wire_message_length & 0x0000FF00) >> 8; ((uint8_t *) grpc_body)[4] = (wire_message_length & 0x000000FF) >> 0; - cfl_sds_cat(grpc_body, body, body_len); + sds_result = cfl_sds_cat(grpc_body, body, body_len); + + if (sds_result == NULL) { + flb_http_client_request_destroy(request, FLB_TRUE); + + cfl_sds_destroy(grpc_body); + + return FLB_RETRY; + } + + grpc_body = sds_result; grpc_body_length = cfl_sds_len(grpc_body); @@ -353,17 +377,24 @@ int opentelemetry_post(struct opentelemetry_context *ctx, * - 205: Reset content * */ + if (response->status < 200 || response->status > 205) { if (ctx->log_response_payload && response->body != NULL && cfl_sds_len(response->body) > 0) { - flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s", - ctx->host, ctx->port, - response->status, response->body); + flb_plg_error(ctx->ins, + "%s:%i, HTTP status=%i\n%s", + ctx->host, + ctx->port, + response->status, + response->body); } else { - flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i", - ctx->host, ctx->port, response->status); + flb_plg_error(ctx->ins, + "%s:%i, HTTP status=%i", + ctx->host, + ctx->port, + response->status); } out_ret = FLB_RETRY; @@ -676,6 +707,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_http2), "Enable, disable or force HTTP/2 usage. Accepted values : on, off, force" }, + { + FLB_CONFIG_MAP_BOOL, "grpc", "off", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_grpc_flag), + "Enable, disable or force gRPC usage. Accepted values : on, off, auto" + }, { FLB_CONFIG_MAP_STR, "proxy", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index cb25e6da055..1041409e0ca 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -46,6 +46,7 @@ struct opentelemetry_body_key { struct opentelemetry_context { int enable_http2_flag; char *enable_http2; + int enable_grpc_flag; /* HTTP Auth */ char *http_user; diff --git a/plugins/out_prometheus_remote_write/remote_write.c b/plugins/out_prometheus_remote_write/remote_write.c index 7d3dc64ba40..85cfe9840cb 100644 --- a/plugins/out_prometheus_remote_write/remote_write.c +++ b/plugins/out_prometheus_remote_write/remote_write.c @@ -446,7 +446,7 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_BOOL, "log_response_payload", "true", 0, FLB_TRUE, offsetof(struct prometheus_remote_write_context, log_response_payload), - "Specify if the response paylod should be logged or not" + "Specify if the response payload should be logged or not" }, /* EOF */ {0} diff --git a/plugins/out_prometheus_remote_write/remote_write.h b/plugins/out_prometheus_remote_write/remote_write.h index f24d53b88d7..678c64022be 100644 --- a/plugins/out_prometheus_remote_write/remote_write.h +++ b/plugins/out_prometheus_remote_write/remote_write.h @@ -60,7 +60,7 @@ struct prometheus_remote_write_context { const char *compression; - /* Log the response paylod */ + /* Log the response payload */ int log_response_payload; /* config reader for 'add_label' */ diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 903bffce98d..99ef657a0b4 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -357,6 +357,7 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx) int ret = 0; flb_sds_t output = NULL; time_t cached_expiration = 0; + time_t current_timestamp = 0; ret = pthread_mutex_trylock(&ctx->token_mutex); if (ret == EBUSY) { @@ -369,7 +370,9 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx) */ output = oauth2_cache_to_token(); cached_expiration = oauth2_cache_get_expiration(); - if (time(NULL) >= cached_expiration) { + current_timestamp = time(NULL); + + if (current_timestamp < cached_expiration) { return output; } else { /* diff --git a/plugins/processor_labels/labels.c b/plugins/processor_labels/labels.c index ee34e639f25..a0afd8314d4 100644 --- a/plugins/processor_labels/labels.c +++ b/plugins/processor_labels/labels.c @@ -1713,44 +1713,41 @@ static int cb_process_metrics(struct flb_processor_instance *processor_instance, return FLB_PROCESSOR_FAILURE; } - result = delete_labels(metrics_context, + result = cmt_cat(out_cmt, metrics_context); + if (result != 0) { + cmt_destroy(out_cmt); + + return FLB_PROCESSOR_FAILURE; + } + + result = delete_labels(out_cmt, &processor_context->delete_labels); if (result == FLB_PROCESSOR_SUCCESS) { - result = update_labels(metrics_context, + result = update_labels(out_cmt, &processor_context->update_labels); } if (result == FLB_PROCESSOR_SUCCESS) { - result = upsert_labels(metrics_context, + result = upsert_labels(out_cmt, &processor_context->upsert_labels); } if (result == FLB_PROCESSOR_SUCCESS) { - result = insert_labels(metrics_context, + result = insert_labels(out_cmt, &processor_context->insert_labels); } if (result == FLB_PROCESSOR_SUCCESS) { - result = hash_labels(metrics_context, + result = hash_labels(out_cmt, &processor_context->hash_labels); } - if (result == FLB_PROCESSOR_SUCCESS) { - result = cmt_cat(out_cmt, metrics_context); - if (result != 0) { - cmt_destroy(out_cmt); - - return FLB_PROCESSOR_FAILURE; - } - - *out_context = out_cmt; - } - if (result != FLB_PROCESSOR_SUCCESS) { return FLB_PROCESSOR_FAILURE; } + *out_context = out_cmt; return FLB_PROCESSOR_SUCCESS; } diff --git a/run_code_analysis.sh b/run_code_analysis.sh index 22adc47f7ef..ecd6fdd4bd5 100755 --- a/run_code_analysis.sh +++ b/run_code_analysis.sh @@ -31,11 +31,20 @@ elif [[ ! -f "$SOURCE_DIR"/CMakeLists.txt ]]; then exit 1 fi +machine_id_file="$(mktemp)" +< /dev/urandom tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1 > "${machine_id_file}" + +exit_code=0 # Run the action we want on it but using an in-container build directory to prevent various permissions errors and files locally "$CONTAINER_RUNTIME" run --rm -t -w "/tmp/source" -v "${SOURCE_DIR}:/source:ro" \ + -v "${machine_id_file}:/etc/machine-id:ro" \ -e INPUT_PRESET="$TEST_PRESET" \ -e INPUT_DEPENDENCIES_DEBIAN="$ADDITIONAL_DEPS" \ -e INPUT_CMAKEFLAGS="$FLB_CMAKE_OPTIONS $SKIP" \ -e INPUT_PRE_COMMAND="cp -R /source /tmp" \ -e INPUT_WORKING-DIRECTORY="/tmp/source" \ - lpenz/ghaction-cmake:0.19 + lpenz/ghaction-cmake:0.19 \ + || exit_code=$? + +rm -f "${machine_id_file}" +exit "${exit_code}" diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index d76e4901c9e..584dd52571e 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -1,6 +1,6 @@ name: fluent-bit base: core18 -version: '3.2.1' +version: '3.2.3' summary: High performance logs and stream processor description: | Fluent Bit is a high performance log processor and stream processor for Linux. diff --git a/src/flb_http_client.c b/src/flb_http_client.c index 80a643678ee..54113fd4bcf 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -637,7 +637,7 @@ static int add_host_and_content_length(struct flb_http_client *c) return 0; } -struct flb_http_client *flb_http_client(struct flb_connection *u_conn, +struct flb_http_client *create_http_client(struct flb_connection *u_conn, int method, const char *uri, const char *body, size_t body_len, const char *host, int port, @@ -749,24 +749,81 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn, c->query_string = p; } - /* Is Upstream connection using keepalive mode ? */ - if (flb_stream_get_flag_status(&u_conn->upstream->base, FLB_IO_TCP_KA)) { - c->flags |= FLB_HTTP_KA; - } - /* Response */ c->resp.content_length = -1; c->resp.connection_close = -1; - if ((flags & FLB_HTTP_10) == 0) { - c->flags |= FLB_HTTP_11; - } - if (body && body_len > 0) { c->body_buf = body; c->body_len = body_len; } + /* 'Read' buffer size */ + c->resp.data = flb_malloc(FLB_HTTP_DATA_SIZE_MAX); + if (!c->resp.data) { + flb_errno(); + flb_http_client_destroy(c); + return NULL; + } + c->resp.data[0] = '\0'; + c->resp.data_len = 0; + c->resp.data_size = FLB_HTTP_DATA_SIZE_MAX; + c->resp.data_size_max = FLB_HTTP_DATA_SIZE_MAX; + + /* Tests */ + c->test_mode = FLB_FALSE; + c->test_response.callback = NULL; + + return c; +} + +struct flb_http_client *flb_http_dummy_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags) +{ + struct flb_http_client *c; + + c = create_http_client(u_conn, method, uri, + body, body_len, + host, port, + proxy, flags); + + if (!c) { + return NULL; + } + + return c; +} + +struct flb_http_client *flb_http_client(struct flb_connection *u_conn, + int method, const char *uri, + const char *body, size_t body_len, + const char *host, int port, + const char *proxy, int flags) +{ + int ret; + struct flb_http_client *c; + + c = create_http_client(u_conn, method, uri, + body, body_len, + host, port, + proxy, flags); + + if (!c) { + return NULL; + } + + /* Is Upstream connection using keepalive mode ? */ + if (flb_stream_get_flag_status(&u_conn->upstream->base, FLB_IO_TCP_KA)) { + c->flags |= FLB_HTTP_KA; + } + + if ((flags & FLB_HTTP_10) == 0) { + c->flags |= FLB_HTTP_11; + } + add_host_and_content_length(c); /* Check proxy data */ @@ -780,18 +837,6 @@ struct flb_http_client *flb_http_client(struct flb_connection *u_conn, } } - /* 'Read' buffer size */ - c->resp.data = flb_malloc(FLB_HTTP_DATA_SIZE_MAX); - if (!c->resp.data) { - flb_errno(); - flb_http_client_destroy(c); - return NULL; - } - c->resp.data[0] = '\0'; - c->resp.data_len = 0; - c->resp.data_size = FLB_HTTP_DATA_SIZE_MAX; - c->resp.data_size_max = FLB_HTTP_DATA_SIZE_MAX; - return c; } @@ -1075,6 +1120,91 @@ int flb_http_set_callback_context(struct flb_http_client *c, return 0; } +int flb_http_set_response_test(struct flb_http_client *c, char *test_name, + const void *data, size_t len, + int status, + void (*resp_callback) (void *, int, void *, size_t, void *), + void *resp_callback_data) +{ + if (!c) { + return -1; + } + + /* + * Enabling a test, set the http_client instance in 'test' mode, so no real + * http request is invoked, only the desired implemented test. + */ + + /* Response test */ + if (strcmp(test_name, "response") == 0) { + c->test_mode = FLB_TRUE; + c->test_response.rt_ctx = c; + c->test_response.rt_status = status; + c->test_response.rt_resp_callback = resp_callback; + c->test_response.rt_data = resp_callback_data; + if (data != NULL && len > 0) { + c->resp.payload = (char *)data; + c->resp.payload_size = len; + c->resp.status = status; + } + } + else { + return -1; + } + + return 0; +} + +static int flb_http_run_response_test(struct flb_http_client *c, + const void *data, size_t len) +{ + int ret = 0; + void *out_buf = NULL; + size_t out_size = 0; + struct flb_test_http_response *htr; + + if (!c) { + return -1; + } + + htr = &c->test_response; + + /* Invoke the output plugin formatter test callback */ + ret = htr->callback(c, + data, len, + &out_buf, &out_size); + + /* Call the runtime test callback checker */ + if (htr->rt_resp_callback) { + htr->rt_resp_callback(htr->rt_ctx, + ret, + out_buf, out_size, + htr->rt_data); + } + else { + flb_free(out_buf); + } + + return 0; +} + +/* Push some response into the http client */ +static int flb_http_stub_response(struct flb_http_client *c) +{ + int ret = 0; + + if (!c) { + return -1; + } + + /* If http client's test_responses is registered, run the stub. */ + if (c->test_response.callback != NULL && c->resp.payload != NULL) { + ret = flb_http_run_response_test(c, c->resp.payload, c->resp.payload_size); + } + + return ret; +} + int flb_http_add_auth_header(struct flb_http_client *c, const char *user, const char *passwd, const char *header) { int ret; @@ -1367,6 +1497,10 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes) { int ret; + if (c->test_mode == FLB_TRUE) { + return flb_http_stub_response(c); + } + ret = flb_http_do_request(c, bytes); if (ret != 0) { return ret; @@ -1510,6 +1644,12 @@ int flb_http_client_ng_init(struct flb_http_client_ng *client, { memset(client, 0, sizeof(struct flb_http_client_ng)); + client->temporary_buffer = cfl_sds_create_size(HTTP_CLIENT_TEMPORARY_BUFFER_SIZE); + + if (client->temporary_buffer == NULL) { + return -1; + } + client->protocol_version = protocol_version; client->upstream_ha = upstream_ha; client->upstream = upstream; @@ -1583,6 +1723,12 @@ void flb_http_client_ng_destroy(struct flb_http_client_ng *client) FLB_LOCK_INFINITE_RETRY_LIMIT, FLB_LOCK_DEFAULT_RETRY_DELAY); + if (client->temporary_buffer != NULL) { + cfl_sds_destroy(client->temporary_buffer); + + client->temporary_buffer = NULL; + } + cfl_list_foreach_safe(iterator, iterator_backup, &client->sessions) { @@ -1701,6 +1847,7 @@ struct flb_http_client_session *flb_http_client_session_begin(struct flb_http_cl int protocol_version; struct flb_upstream_node *upstream_node; struct flb_connection *connection; + struct flb_upstream *upstream; struct flb_http_client_session *session; const char *alpn; @@ -1711,11 +1858,15 @@ struct flb_http_client_session *flb_http_client_session_begin(struct flb_http_cl return NULL; } + upstream = upstream_node->u; + connection = flb_upstream_conn_get(upstream_node->u); } else { upstream_node = NULL; + upstream = client->upstream; + connection = flb_upstream_conn_get(client->upstream); } @@ -1747,6 +1898,10 @@ struct flb_http_client_session *flb_http_client_session_begin(struct flb_http_cl protocol_version = HTTP_PROTOCOL_VERSION_11; } + if (protocol_version == HTTP_PROTOCOL_VERSION_20) { + flb_stream_disable_keepalive(&upstream->base); + } + session = flb_http_client_session_create(client, protocol_version, connection); if (session == NULL) { @@ -1932,20 +2087,20 @@ struct flb_http_response *flb_http_client_request_execute(struct flb_http_reques static int flb_http_client_session_read(struct flb_http_client_session *session) { - unsigned char input_buffer[1024 * 65]; ssize_t result; result = flb_io_net_read(session->connection, - (void *) &input_buffer, - sizeof(input_buffer)); + (void *) session->parent->temporary_buffer, + cfl_sds_avail(session->parent->temporary_buffer)); if (result <= 0) { return -1; } - result = (ssize_t) flb_http_client_session_ingest(session, - input_buffer, - result); + result = (ssize_t) flb_http_client_session_ingest( + session, + (unsigned char *) session->parent->temporary_buffer, + result); if (result < 0) { return -2; @@ -2204,6 +2359,11 @@ int flb_http_request_set_authorization(struct flb_http_request *request, return -1; } } + else { + va_end(arguments); + + return -1; + } va_end(arguments); diff --git a/src/flb_http_client_http1.c b/src/flb_http_client_http1.c index e09950c70fb..399ccc48c5e 100644 --- a/src/flb_http_client_http1.c +++ b/src/flb_http_client_http1.c @@ -479,6 +479,8 @@ int flb_http1_request_commit(struct flb_http_request *request) return -7; } + request_buffer = sds_result; + if (request->body != NULL) { sds_result = cfl_sds_cat(request_buffer, request->body, @@ -526,6 +528,9 @@ static int compose_request_line(cfl_sds_t *output_buffer, else if (request->protocol_version == HTTP_PROTOCOL_VERSION_09) { protocol_version_string = ""; } + else { + return -1; + } method_name = flb_http_get_method_string_from_id(request->method); diff --git a/src/flb_http_client_http2.c b/src/flb_http_client_http2.c index d2d074cc23e..de22b62d151 100644 --- a/src/flb_http_client_http2.c +++ b/src/flb_http_client_http2.c @@ -19,6 +19,7 @@ #define _GNU_SOURCE #include +#include #include #include @@ -285,27 +286,24 @@ static int http2_data_chunk_recv_callback(nghttp2_session *inner_session, return -1; } - memcpy(stream->response.body, data, len); + cfl_sds_set_len(stream->response.body, 0); - cfl_sds_set_len(stream->response.body, len); - - stream->response.body_read_offset = len; + stream->response.body_read_offset = 0; } - else { - resized_buffer = cfl_sds_cat(stream->response.body, - (const char *) data, - len); - if (resized_buffer == NULL) { - stream->status = HTTP_STREAM_STATUS_ERROR; + resized_buffer = cfl_sds_cat(stream->response.body, + (const char *) data, + len); - return -1; - } + if (resized_buffer == NULL) { + stream->status = HTTP_STREAM_STATUS_ERROR; - stream->response.body = resized_buffer; - stream->response.body_read_offset += len; + return -1; } + stream->response.body = resized_buffer; + stream->response.body_read_offset += len; + if (stream->status == HTTP_STREAM_STATUS_RECEIVING_DATA) { if (stream->response.content_length >= stream->response.body_read_offset) { @@ -387,7 +385,7 @@ static ssize_t http2_data_source_read_callback(nghttp2_session *session, int flb_http2_client_session_init(struct flb_http2_client_session *session) { - nghttp2_settings_entry session_settings[1]; + nghttp2_settings_entry session_settings[3]; nghttp2_session_callbacks *callbacks; int result; @@ -422,10 +420,17 @@ int flb_http2_client_session_init(struct flb_http2_client_session *session) session_settings[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS; session_settings[0].value = 1; + session_settings[1].settings_id = NGHTTP2_SETTINGS_MAX_FRAME_SIZE; + session_settings[1].value = cfl_sds_alloc(session->parent->parent->temporary_buffer); + + session_settings[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; + session_settings[2].value = 0; + + result = nghttp2_submit_settings(session->inner_session, NGHTTP2_FLAG_NONE, session_settings, - 1); + 3); if (result != 0) { return -3; @@ -480,6 +485,7 @@ int flb_http2_request_begin(struct flb_http_request *request) int flb_http2_request_commit(struct flb_http_request *request) { struct flb_http_client_session *parent_session; + cfl_sds_t sds_result; struct flb_http2_client_session *session; struct flb_http_stream *stream; int result; @@ -517,10 +523,10 @@ int flb_http2_request_commit(struct flb_http_request *request) } if (parent_session->connection->tls_session != NULL) { - scheme_as_text = "HTTPS"; + scheme_as_text = "https"; } else { - scheme_as_text = "HTTP"; + scheme_as_text = "http"; } switch (request->method) { @@ -554,6 +560,22 @@ int flb_http2_request_commit(struct flb_http_request *request) return -1; } + if (request->authority == NULL) { + request->authority = cfl_sds_create(request->host); + + if (request->authority == NULL) { + return -1; + } + + sds_result = cfl_sds_printf(&request->authority, + ":%u", + request->port); + + if (sds_result == NULL) { + return -1; + } + } + header_count = request->headers->total_count + 7; headers = flb_calloc(header_count, sizeof(nghttp2_nv)); @@ -580,8 +602,8 @@ int flb_http2_request_commit(struct flb_http_request *request) headers[header_index].name = (uint8_t *) ":authority"; headers[header_index].namelen = strlen(":authority"); - headers[header_index].value = (uint8_t *) request->host; - headers[header_index].valuelen = strlen(request->host); + headers[header_index].value = (uint8_t *) request->authority; + headers[header_index].valuelen = strlen(request->authority); header_index++; diff --git a/src/flb_http_common.c b/src/flb_http_common.c index fdfdd88041c..a5c65b50ffd 100644 --- a/src/flb_http_common.c +++ b/src/flb_http_common.c @@ -150,6 +150,10 @@ struct flb_http_request *flb_http_request_create() void flb_http_request_destroy(struct flb_http_request *request) { + if (request->authority != NULL) { + cfl_sds_destroy(request->authority); + } + if (request->path != NULL) { cfl_sds_destroy(request->path); } @@ -519,6 +523,15 @@ int flb_http_request_set_url(struct flb_http_request *request, return -1; } + start_of_authorization = NULL; + start_of_query_string = NULL; + start_of_authority = NULL; + start_of_username = NULL; + start_of_password = NULL; + start_of_port = NULL; + start_of_host = NULL; + start_of_path = NULL; + start_of_authority = strstr(local_url, "://"); if (start_of_authority == NULL) { @@ -645,6 +658,12 @@ int flb_http_request_set_url(struct flb_http_request *request, int flb_http_request_set_uri(struct flb_http_request *request, char *uri) { + if (request->path != NULL) { + cfl_sds_destroy(request->path); + + request->path = NULL; + } + request->path = cfl_sds_create(uri); if (request->path == NULL) { @@ -657,6 +676,12 @@ int flb_http_request_set_uri(struct flb_http_request *request, int flb_http_request_set_query_string(struct flb_http_request *request, char *query_string) { + if (request->query_string != NULL) { + cfl_sds_destroy(request->query_string); + + request->query_string = NULL; + } + request->query_string = cfl_sds_create(query_string); if (request->query_string == NULL) { @@ -669,6 +694,12 @@ int flb_http_request_set_query_string(struct flb_http_request *request, int flb_http_request_set_content_type(struct flb_http_request *request, char *content_type) { + if (request->content_type != NULL) { + cfl_sds_destroy(request->content_type); + + request->content_type = NULL; + } + request->content_type = cfl_sds_create(content_type); if (request->content_type == NULL) { @@ -681,6 +712,12 @@ int flb_http_request_set_content_type(struct flb_http_request *request, int flb_http_request_set_user_agent(struct flb_http_request *request, char *user_agent) { + if (request->user_agent != NULL) { + cfl_sds_destroy(request->user_agent); + + request->user_agent = NULL; + } + request->user_agent = cfl_sds_create(user_agent); if (request->user_agent == NULL) { diff --git a/src/flb_lib.c b/src/flb_lib.c index 1821a2e61ad..b9674e06824 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -347,6 +347,37 @@ int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) return 0; } +int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, + void (*out_response) (void *, int, int, void *, size_t, void *), + void *out_callback_data) +{ + struct flb_output_instance *o_ins; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + + /* + * Enabling a test, set the output instance in 'test' mode, so no real + * flush callback is invoked, only the desired implemented test. + */ + + /* Response test */ + if (strcmp(test_name, "response") == 0) { + o_ins->test_mode = FLB_TRUE; + o_ins->test_response.rt_ctx = ctx; + o_ins->test_response.rt_ffd = ffd; + o_ins->test_response.rt_out_response = out_response; + o_ins->test_response.rt_data = out_callback_data; + } + else { + return -1; + } + + return 0; +} + static inline int flb_config_map_property_check(char *plugin_name, struct mk_list *config_map, char *key, char *val) { struct flb_kv *kv; @@ -638,6 +669,41 @@ int flb_lib_free(void* data) } +static int flb_output_run_response(flb_ctx_t *ctx, struct flb_output_instance *o_ins, + int status, const void *data, size_t len) +{ + int ret; + void *out_buf = NULL; + size_t out_size = 0; + struct flb_test_out_response *resp; + + if (!o_ins) { + return -1; + } + + resp = &o_ins->test_response; + + /* Invoke the input plugin formatter test callback */ + ret = resp->callback(ctx->config, + o_ins->context, + status, data, len, + &out_buf, &out_size); + + /* Call the runtime test callback checker */ + if (resp->rt_out_response) { + resp->rt_out_response(resp->rt_ctx, + resp->rt_ffd, + ret, + out_buf, out_size, + resp->rt_data); + } + else { + flb_free(out_buf); + } + + return 0; +} + /* Push some data into the Engine */ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len) { @@ -662,6 +728,29 @@ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len) return ret; } +/* Emulate some data from the response */ +int flb_lib_response(flb_ctx_t *ctx, int ffd, int status, const void *data, size_t len) +{ + int ret; + struct flb_output_instance *o_ins; + + if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) { + flb_error("[lib] cannot push data, engine is not running"); + return -1; + } + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + + /* If input's test_formatter is registered, priorize to run it. */ + if (o_ins->test_response.callback != NULL) { + ret = flb_output_run_response(ctx, o_ins, status, data, len); + } + return ret; +} + static void flb_lib_worker(void *data) { int ret; diff --git a/src/flb_network.c b/src/flb_network.c index 8f8ca33f602..e53030ba742 100644 --- a/src/flb_network.c +++ b/src/flb_network.c @@ -1807,16 +1807,22 @@ int flb_net_bind_udp(flb_sockfd_t fd, const struct sockaddr *addr, flb_sockfd_t flb_net_accept(flb_sockfd_t server_fd) { flb_sockfd_t remote_fd; - struct sockaddr sock_addr; - socklen_t socket_size = sizeof(struct sockaddr); - - // return accept(server_fd, &sock_addr, &socket_size); + struct sockaddr_storage sock_addr = { 0 }; + socklen_t socket_size = sizeof(sock_addr); + + /* + * sock_addr used to be a sockaddr struct, but this was too + * small of a structure to handle IPV6 addresses (#9053). + * This would cause accept() to not accept the connection (with no error), + * and a loop would occur continually trying to accept the connection. + * The sockaddr_storage can handle both IPV4 and IPV6. + */ #ifdef FLB_HAVE_ACCEPT4 - remote_fd = accept4(server_fd, &sock_addr, &socket_size, + remote_fd = accept4(server_fd, (struct sockaddr*)&sock_addr, &socket_size, SOCK_NONBLOCK | SOCK_CLOEXEC); #else - remote_fd = accept(server_fd, &sock_addr, &socket_size); + remote_fd = accept(server_fd, (struct sockaddr*)&sock_addr, &socket_size); flb_net_socket_nonblocking(remote_fd); #endif diff --git a/src/flb_output.c b/src/flb_output.c index 90593905960..288fc9dbb1f 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -729,6 +729,7 @@ struct flb_output_instance *flb_output_new(struct flb_config *config, /* Tests */ instance->test_formatter.callback = plugin->test_formatter.callback; + instance->test_response.callback = plugin->test_response.callback; return instance; diff --git a/src/flb_plugin.c b/src/flb_plugin.c index d55fced3287..7e259f67711 100644 --- a/src/flb_plugin.c +++ b/src/flb_plugin.c @@ -428,7 +428,6 @@ int flb_plugin_load_config_file(const char *file, struct flb_config *config) */ ret = flb_plugin_load_config_format(cf, config); if (ret == -1) { - flb_cf_destroy(cf); return -1; } diff --git a/src/flb_signv4_ng.c b/src/flb_signv4_ng.c index 701ebea9a65..400a04a47ea 100644 --- a/src/flb_signv4_ng.c +++ b/src/flb_signv4_ng.c @@ -588,6 +588,7 @@ static flb_sds_t flb_signv4_ng_canonical_request(struct flb_http_request *reques } else { tmp = (char *) request->path; + len = strlen(request->path); } /* Do URI encoding (rfc3986) */ diff --git a/src/tls/openssl.c b/src/tls/openssl.c index f6d54460f36..fca99c3542b 100644 --- a/src/tls/openssl.c +++ b/src/tls/openssl.c @@ -17,7 +17,11 @@ * limitations under the License. */ +#include +#include + #include +#include #include #include #include @@ -76,6 +80,7 @@ static int tls_init(void) SSL_load_error_strings(); SSL_library_init(); #endif + return 0; } @@ -134,10 +139,15 @@ static void tls_context_destroy(void *ctx_backend) struct tls_context *ctx = ctx_backend; pthread_mutex_lock(&ctx->mutex); + SSL_CTX_free(ctx->ctx); + if (ctx->alpn != NULL) { flb_free(ctx->alpn); + + ctx->alpn = NULL; } + pthread_mutex_unlock(&ctx->mutex); flb_free(ctx); @@ -438,7 +448,7 @@ static int macos_load_system_certificates(struct tls_context *ctx) } CFRelease(certs); - flb_debug("[tls] finished loading keychain certificates, total loaded: %d", loaded_cert_count); + flb_debug("[tls] finished loading keychain certificates, total loaded: %lu", loaded_cert_count); return 0; } #endif @@ -448,6 +458,9 @@ static int load_system_certificates(struct tls_context *ctx) int ret; const char *ca_file = FLB_DEFAULT_SEARCH_CA_BUNDLE; + (void) ret; + (void) ca_file; + /* For Windows use specific API to read the certs store */ #ifdef _MSC_VER return windows_load_system_certificates(ctx); @@ -467,6 +480,33 @@ static int load_system_certificates(struct tls_context *ctx) #endif } +#ifdef FLB_HAVE_DEV +/* This is not thread safe */ +static void ssl_key_logger(const SSL *ssl, const char *line) +{ + char *key_log_filename; + FILE *key_log_file; + + key_log_filename = getenv("SSLKEYLOGFILE"); + + if (key_log_filename == NULL) { + return; + } + + key_log_file = fopen(key_log_filename, "a"); + + if (key_log_file == NULL) { + return; + } + + setvbuf(key_log_file, NULL, 0, _IOLBF); + + fprintf(key_log_file, "%s\n", line); + + fclose(key_log_file); +} +#endif + static void *tls_context_create(int verify, int debug, int mode, @@ -481,6 +521,7 @@ static void *tls_context_create(int verify, SSL_CTX *ssl_ctx; struct tls_context *ctx; char err_buf[256]; + char *key_log_filename; /* * Init library ? based in the documentation on OpenSSL >= 1.1.0 is not longer @@ -523,6 +564,16 @@ static void *tls_context_create(int verify, flb_errno(); return NULL; } + +#ifdef FLB_HAVE_DEV + key_log_filename = getenv("SSLKEYLOGFILE"); + + if (key_log_filename != NULL) { + SSL_CTX_set_keylog_callback(ssl_ctx, ssl_key_logger); + } +#endif + + ctx->ctx = ssl_ctx; ctx->mode = mode; ctx->alpn = NULL; diff --git a/src/wasm/flb_wasm.c b/src/wasm/flb_wasm.c index a71ff85f4cf..a6810e2530e 100644 --- a/src/wasm/flb_wasm.c +++ b/src/wasm/flb_wasm.c @@ -168,6 +168,8 @@ struct flb_wasm *flb_wasm_instantiate(struct flb_config *config, const char *was if (!wasm_runtime_full_init(&wasm_args)) { flb_error("Init runtime environment failed."); + flb_free(fw); + return NULL; } diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index ea1a1da3199..09dce13c15e 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -60,6 +60,40 @@ if(FLB_OUT_LIB) FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c") endif() +if (FLB_CUSTOM_CALYPTIA) + set(CALYPTIA_TEST_LINK_LIBS + fluent-bit-static + ${CMAKE_THREAD_LIBS_INIT} + ) + + set(CALYPTIA_TESTS + "custom_calyptia_test.c" + "custom_calyptia_registration_retry_test.c" + "custom_calyptia_input_test.c" + ) + + foreach(TEST_SOURCE ${CALYPTIA_TESTS}) + get_filename_component(TEST_NAME ${TEST_SOURCE} NAME_WE) + + set(TEST_TARGET "flb-rt-${TEST_NAME}") + add_executable(${TEST_TARGET} + ${TEST_SOURCE} + "../../plugins/custom_calyptia/calyptia.c" + ) + + target_link_libraries(${TEST_TARGET} + ${CALYPTIA_TEST_LINK_LIBS} + ) + + add_test(NAME ${TEST_TARGET} + COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET} + WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build) + + set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime") + add_dependencies(${TEST_TARGET} fluent-bit-static) + endforeach() +endif() + if(FLB_IN_EBPF) # Define common variables set(EBPF_TEST_INCLUDE_DIRS @@ -196,10 +230,6 @@ if(FLB_IN_LIB) endif() -if (FLB_CUSTOM_CALYPTIA) - FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c") -endif() - if (FLB_PROCESSOR_METRICS_SELECTOR) FLB_RT_TEST(FLB_PROCESSOR_METRICS_SELECTOR "processor_metrics_selector.c") endif() diff --git a/tests/runtime/custom_calyptia_input_test.c b/tests/runtime/custom_calyptia_input_test.c new file mode 100644 index 00000000000..ffc774c83d9 --- /dev/null +++ b/tests/runtime/custom_calyptia_input_test.c @@ -0,0 +1,172 @@ +#include +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" +#include "../../plugins/custom_calyptia/calyptia.h" + +/* Test context structure */ +struct test_context { + struct calyptia *ctx; + struct flb_input_instance *fleet; + struct flb_config *config; +}; + +/* Initialize test context */ +static struct test_context *init_test_context() +{ + struct test_context *t_ctx = flb_calloc(1, sizeof(struct test_context)); + if (!t_ctx) { + return NULL; + } + + t_ctx->config = flb_config_init(); + if (!t_ctx->config) { + flb_free(t_ctx); + return NULL; + } + + t_ctx->ctx = flb_calloc(1, sizeof(struct calyptia)); + if (!t_ctx->ctx) { + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize plugin instance for logging */ + t_ctx->ctx->ins = flb_calloc(1, sizeof(struct flb_custom_instance)); + if (!t_ctx->ctx->ins) { + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize test values in ctx */ + t_ctx->ctx->api_key = flb_strdup("test_api_key"); + t_ctx->ctx->fleet_config_dir = flb_strdup("/test/config/dir"); + t_ctx->ctx->fleet_id = flb_strdup("test_fleet_id"); + t_ctx->ctx->fleet_name = flb_strdup("test_fleet"); + t_ctx->ctx->machine_id = flb_strdup("test_machine_id"); + t_ctx->ctx->fleet_max_http_buffer_size = flb_strdup("1024"); + t_ctx->ctx->fleet_interval_sec = flb_strdup("60"); + t_ctx->ctx->fleet_interval_nsec = flb_strdup("500000000"); + + t_ctx->fleet = flb_input_new(t_ctx->config, "calyptia_fleet", NULL, FLB_FALSE); + if (!t_ctx->fleet) { + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + return t_ctx; +} + +static void cleanup_test_context(struct test_context *t_ctx) +{ + if (!t_ctx) { + return; + } + + if (t_ctx->fleet) { + /* Input instance cleanup */ + flb_input_instance_destroy(t_ctx->fleet); + } + + if (t_ctx->ctx) { + if (t_ctx->ctx->api_key) flb_free(t_ctx->ctx->api_key); + if (t_ctx->ctx->fleet_config_dir) flb_free(t_ctx->ctx->fleet_config_dir); + if (t_ctx->ctx->fleet_id) flb_free(t_ctx->ctx->fleet_id); + if (t_ctx->ctx->fleet_name) flb_free(t_ctx->ctx->fleet_name); + if (t_ctx->ctx->machine_id) flb_free(t_ctx->ctx->machine_id); + if (t_ctx->ctx->fleet_max_http_buffer_size) flb_free(t_ctx->ctx->fleet_max_http_buffer_size); + if (t_ctx->ctx->fleet_interval_sec) flb_free(t_ctx->ctx->fleet_interval_sec); + if (t_ctx->ctx->fleet_interval_nsec) flb_free(t_ctx->ctx->fleet_interval_nsec); + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + } + + if (t_ctx->config) { + /* Destroy the config which will cleanup any remaining instances */ + flb_config_exit(t_ctx->config); + } + + flb_free(t_ctx); +} + +void test_set_fleet_input_properties() +{ + struct test_context *t_ctx = init_test_context(); + TEST_CHECK(t_ctx != NULL); + + /* Test setting properties */ + int ret = set_fleet_input_properties(t_ctx->ctx, t_ctx->fleet); + TEST_CHECK(ret == 0); + + /* Verify properties were set correctly */ + const char *value; + + /* Check api_key */ + value = flb_input_get_property("api_key", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("api_key expected=%s got=%s", t_ctx->ctx->api_key, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->api_key) == 0); + + /* Check config_dir */ + value = flb_input_get_property("config_dir", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("config_dir expected=%s got=%s", t_ctx->ctx->fleet_config_dir, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_config_dir) == 0); + + /* Check fleet_id */ + value = flb_input_get_property("fleet_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_id expected=%s got=%s", t_ctx->ctx->fleet_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_id) == 0); + + /* Check fleet_name */ + value = flb_input_get_property("fleet_name", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_name expected=%s got=%s", t_ctx->ctx->fleet_name, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_name) == 0); + + /* Check machine_id */ + value = flb_input_get_property("machine_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("machine_id expected=%s got=%s", t_ctx->ctx->machine_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->machine_id) == 0); + + /* Check max_http_buffer_size */ + value = flb_input_get_property("max_http_buffer_size", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("max_http_buffer_size expected=%s got=%s", t_ctx->ctx->fleet_max_http_buffer_size, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_max_http_buffer_size) == 0); + + // /* Check interval_sec */ + value = flb_input_get_property("interval_sec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_sec expected=%s got=%s", t_ctx->ctx->fleet_interval_sec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_sec) == 0); + + // /* Check interval_nsec */ + value = flb_input_get_property("interval_nsec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_nsec expected=%s got=%s", t_ctx->ctx->fleet_interval_nsec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_nsec) == 0); + + ret = set_fleet_input_properties(t_ctx->ctx, NULL); + TEST_CHECK(ret == -1); + + cleanup_test_context(t_ctx); +} + +/* Define test list */ +TEST_LIST = { + {"set_fleet_input_properties", test_set_fleet_input_properties}, + {NULL, NULL} +}; \ No newline at end of file diff --git a/tests/runtime/custom_calyptia_registration_retry_test.c b/tests/runtime/custom_calyptia_registration_retry_test.c new file mode 100644 index 00000000000..db3ea101e25 --- /dev/null +++ b/tests/runtime/custom_calyptia_registration_retry_test.c @@ -0,0 +1,313 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include +#include +#include +#include + +#include "flb_tests_runtime.h" + +#define MOCK_SERVER_HOST "127.0.0.1" +#define MOCK_SERVER_PORT 9876 + +static int registration_count = 0; + +static void mock_server_cb_empty_token(mk_request_t *request, void *data) +{ + registration_count++; + if (registration_count == 1) { + /* Use a local buffer with correct size */ + const char *response = "{\"id\":\"test-id\"}"; + size_t response_len = strlen(response); // Ensure size is accurate + + mk_http_status(request, 200); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "application/json", sizeof("application/json") - 1); + mk_http_send(request, response, response_len, NULL); // Use response_len + } else { + mk_http_status(request, 500); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "text/plain", sizeof("text/plain") - 1); + mk_http_send(request, "Internal Server Error", sizeof("Internal Server Error") - 1, NULL); + } + mk_http_done(request); +} + +static void mock_server_cb(mk_request_t *request, void *data) +{ + registration_count++; + mk_http_status(request, 500); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "text/plain", sizeof("text/plain") - 1); + mk_http_send(request, "Internal Server Error", sizeof("Internal Server Error") - 1, NULL); + mk_http_done(request); +} + +/* Test function */ +void test_calyptia_register_retry() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "true"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration attempt should have failed */ + TEST_CHECK(registration_count == 1); + + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + TEST_CHECK(registration_count > 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +static void test_calyptia_register_retry_empty_token() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "false"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration should be successful but with an empty token */ + TEST_CHECK(registration_count == 1); + + /* Push some data to trigger flush */ + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + + /* Verify the plugin fails due to empty token */ + TEST_CHECK(registration_count == 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +static void test_calyptia_register_retry_empty_token_retry_true() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "true"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration should be successful but with an empty token */ + TEST_CHECK(registration_count == 1); + + /* Push some data to trigger flush */ + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + + /* Verify the plugin fails due to empty token */ + TEST_CHECK(registration_count > 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +TEST_LIST = { + {"register_retry", test_calyptia_register_retry}, + {"register_retry_empty_token", test_calyptia_register_retry_empty_token}, + {"register_retry_empty_token_retry_true", test_calyptia_register_retry_empty_token_retry_true}, + {NULL, NULL} +}; \ No newline at end of file diff --git a/tests/runtime/data/es/json_es.h b/tests/runtime/data/es/json_es.h index 40f8ab1cac4..91348ec47db 100755 --- a/tests/runtime/data/es/json_es.h +++ b/tests/runtime/data/es/json_es.h @@ -15,3 +15,37 @@ #define JSON_DOTS \ "[1448403340," \ "{\".le.vel\":\"error\", \".fo.o\":[{\".o.k\": [{\".b.ar\": \"baz\"}]}]}]" + +#define JSON_RESPONSE_SUCCESSES "{\"errors\":false,\"took\":0,\"items\":[" \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dcfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":6,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dsfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"d8fJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"eMfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":9,\"_primary_term\":1,\"status\":201}}]}" + +#define JSON_RESPONSE_SUCCESSES_SIZE 783 + +#define JSON_RESPONSE_PARTIALLY_SUCCESS "{\"errors\":true,\"took\":316737025,\"items\":" \ + "[{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"hxELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iBELapEB_XqxG5Ydupgb\",\"status\":400," \ + "\"error\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iBELapEB_XqxG5Ydupgb'. " \ + "Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \ + "\"caused_by\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \ + "Use the index API request parameters.\"}}}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iRELapEB_XqxG5Ydupgb\",\"status\":400," \ + "\"error\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iRELapEB_XqxG5Ydupgb'. " \ + "Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \ + "\"caused_by\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \ + "Use the index API request parameters.\"}}}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"ihELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}]}" + +#define JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE 1322 diff --git a/tests/runtime/filter_parser.c b/tests/runtime/filter_parser.c index 8f25fec0e6e..24368afc69d 100644 --- a/tests/runtime/filter_parser.c +++ b/tests/runtime/filter_parser.c @@ -493,32 +493,61 @@ void flb_test_filter_parser_handle_time_key_with_time_zone() } void test_parser_timestamp_timezone(char *tz, - char *time_fmt, - char *timestamp, - char *expected_epoch, - int use_system_timezone) + char *time_fmt, + char *timestamp, + char *expected_epoch, + int use_system_timezone) { int ret; int bytes; char *output, *original_tz = NULL; + char *saved_tz = NULL; char p[256]; - char expected[12]; + char expected[256]; flb_ctx_t *ctx; int in_ffd; int out_ffd; int filter_ffd; struct flb_parser *parser; + struct flb_lib_out_cb *cb; - struct flb_lib_out_cb cb; - cb.cb = callback_test; - cb.data = NULL; + /* Allocate and initialize callback */ + cb = flb_malloc(sizeof(struct flb_lib_out_cb)); + if (!cb) { + flb_errno(); + return; + } + cb->cb = callback_test; + cb->data = NULL; clear_output(); + /* Save current TZ if exists */ + original_tz = getenv("TZ"); + if (original_tz) { + saved_tz = strdup(original_tz); + if (!saved_tz) { + flb_free(cb); + return; + } + } + + /* Set new timezone if provided */ + if (tz) { + ret = setenv("TZ", tz, 1); + TEST_CHECK(ret == 0); + tzset(); /* Make sure timezone changes take effect */ + } + ctx = flb_create(); + TEST_CHECK(ctx != NULL); /* Configure service */ - flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace", "1", "Log_Level", "debug", NULL); + flb_service_set(ctx, + "Flush", FLUSH_INTERVAL, + "Grace", "1", + "Log_Level", "debug", + NULL); /* Input */ in_ffd = flb_input(ctx, (char *) "lib", NULL); @@ -528,21 +557,20 @@ void test_parser_timestamp_timezone(char *tz, NULL); /* Parser */ - parser = flb_parser_create("timestamp", // name - "regex", // format - "^(?