From 21acd96fc7b1ac6461bf06dcb79c6fa869ff9916 Mon Sep 17 00:00:00 2001 From: Marat Abrarov Date: Fri, 7 Jul 2023 01:06:52 +0300 Subject: [PATCH] pipeline: outputs: es: tests for upstream node configuration properties Signed-off-by: Marat Abrarov --- plugins/out_es/es.c | 2 +- plugins/out_es/es.h | 4 + tests/runtime/out_elasticsearch.c | 390 +++++++++++++++++++++++++++++- 3 files changed, 393 insertions(+), 3 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index d87ed7800ab..9b8cd1b7e3d 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -642,7 +642,7 @@ static int cb_es_init(struct flb_output_instance *ins, return 0; } -static struct flb_elasticsearch_config *flb_elasticsearch_target( +struct flb_elasticsearch_config *flb_elasticsearch_target( struct flb_elasticsearch *ctx, struct flb_upstream_node **node) { struct flb_elasticsearch_config *ec; diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 34546212cf2..e4e8c46d59f 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -22,6 +22,7 @@ #include #include +#include #define FLB_ES_DEFAULT_HOST "127.0.0.1" #define FLB_ES_DEFAULT_PORT 9200 @@ -173,4 +174,7 @@ struct flb_elasticsearch { struct flb_output_instance *ins; }; +struct flb_elasticsearch_config *flb_elasticsearch_target( + struct flb_elasticsearch *ctx, struct flb_upstream_node **node); + #endif diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 914e363ae8e..3c9c88eb9f1 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -1,5 +1,9 @@ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +#include +#include +#include + #include #include #include "flb_tests_runtime.h" @@ -12,16 +16,101 @@ * and to know how to extract that structure from plugin context. */ #include "../../plugins/out_es/es.h" -#include "../../plugins/out_es/es_conf.h" + +static const char * const es_upstream_section_property_prefix = " "; +static const char * const es_upstream_section_value_prefix = " "; + +static const char *create_upstream_conf_file(const char *first_property, ...) +{ + char *upstream_conf_filename; + FILE *upstream_conf_file; + int ret; + const char *arg; + int arg_idx; + va_list args; + + upstream_conf_filename = tmpnam(NULL); + if (upstream_conf_filename == NULL) { + return NULL; + } + + upstream_conf_file = fopen(upstream_conf_filename, "w"); + if (upstream_conf_file == NULL) { + return NULL; + } + + ret = fprintf(upstream_conf_file, "%s\n%s%s%s%s\n%s\n%s%s%s%s\n%s%s%s%s\n%s%s%s%s\n", + "[UPSTREAM]", + es_upstream_section_property_prefix, + "name", es_upstream_section_value_prefix, "es-balancing", + "[NODE]", + es_upstream_section_property_prefix, + "name", es_upstream_section_value_prefix, "node1", + es_upstream_section_property_prefix, + "host", es_upstream_section_value_prefix, FLB_ES_DEFAULT_HOST, + es_upstream_section_property_prefix, + "port", es_upstream_section_value_prefix, "9200"); + if (ret < 0) { + fclose(upstream_conf_file); + remove(upstream_conf_filename); + return NULL; + } + + arg = first_property; + arg_idx = 0; + va_start(args, first_property); + while (arg != NULL) { + if (arg_idx == 0) { + ret = fprintf(upstream_conf_file, "%s%s", + es_upstream_section_property_prefix, arg); + } + else { + if (strlen(arg) > 0) { + ret = fprintf(upstream_conf_file, "%s%s\n", + es_upstream_section_value_prefix, arg); + } + else { + ret = fprintf(upstream_conf_file, "\n"); + } + } + if (ret < 0) { + va_end(args); + fclose(upstream_conf_file); + remove(upstream_conf_filename); + return NULL; + } + arg = va_arg(args, const char *); + arg_idx ^= 1; + } + va_end(args); + + if (arg_idx != 0) { + ret = fprintf(upstream_conf_file, "\n"); + if (ret < 0) { + fclose(upstream_conf_file); + remove(upstream_conf_filename); + return NULL; + } + } + + ret = fclose(upstream_conf_file); + if (ret != 0) { + remove(upstream_conf_filename); + return NULL; + } + + return upstream_conf_filename; +} static void *cb_flush_context(struct flb_config *config, struct flb_input_instance *ins, void *plugin_context, void *flush_ctx) { + struct flb_upstream_node *node; struct flb_elasticsearch *ctx = plugin_context; (void) config; (void) ins; (void) flush_ctx; - return flb_es_upstream_conf(ctx, NULL); + return flb_elasticsearch_target(ctx, &node); } static void cb_check_write_op_index(void *ctx, int ffd, @@ -829,6 +918,298 @@ void flb_test_logstash_prefix_separator() flb_destroy(ctx); } +void flb_test_upstream_write_operation() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *upstream_conf_filename; + + /* Override write_operation to index at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("write_operation", "index", NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "Write_Operation", "Upsert", + "Generate_Id", "True", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_write_op_index, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); +} + +void flb_test_upstream_index_type() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *upstream_conf_filename; + + /* Override default index and type at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("index", "index_test", + "type", "type_test", + NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_index_type, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); +} + +void flb_test_upstream_logstash_format() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *upstream_conf_filename; + + /* Specify Logstash format at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("logstash_format", "on", + "logstash_prefix", "prefix", + "logstash_prefix_separator", "SEP", + "logstash_dateformat", "%Y-%m-%d", + NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Use configuration different from upstream node configuration */ + flb_output_set(ctx, out_ffd, + "logstash_format", "off", + "logstash_prefix", "logstash", + "logstash_prefix_separator", "-" + "logstash_dateformat", "%Y.%m.%d", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_logstash_prefix_separator, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); +} + +void flb_test_upstream_replace_dots() +{ + int ret; + int size = sizeof(JSON_DOTS) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *upstream_conf_filename; + + /* Specify Logstash format at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("replace_dots", "on", + NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Use configuration different from upstream node configuration */ + flb_output_set(ctx, out_ffd, + "replace_dots", "off", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_replace_dots, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_DOTS, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); +} + +void flb_test_upstream_id_key() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *upstream_conf_filename; + + /* Specify Logstash format at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("id_key", "key_2", NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_id_key, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); +} + static void cb_check_response_success(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) @@ -1005,5 +1386,10 @@ TEST_LIST = { {"response_success" , flb_test_response_success }, {"response_successes", flb_test_response_successes }, {"response_partially_success" , flb_test_response_partially_success }, + {"upstream_write_operation" , flb_test_upstream_write_operation }, + {"upstream_index_type" , flb_test_upstream_index_type }, + {"upstream_logstash_format" , flb_test_upstream_logstash_format }, + {"upstream_replace_dots" , flb_test_upstream_replace_dots }, + {"upstream_id_key" , flb_test_upstream_id_key }, {NULL, NULL} };