diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index f3e901b6b7f..c870f76e4b9 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -100,6 +100,7 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_FLOWCOUNTER "out_flowcounter.c") FLB_RT_TEST(FLB_OUT_FORWARD "out_forward.c") FLB_RT_TEST(FLB_OUT_HTTP "out_http.c") + FLB_RT_TEST(FLB_OUT_KAFKA "out_kafka.c") FLB_RT_TEST(FLB_OUT_LIB "out_lib.c") FLB_RT_TEST(FLB_OUT_LOKI "out_loki.c") FLB_RT_TEST(FLB_OUT_NULL "out_null.c") diff --git a/tests/runtime/out_kafka.c b/tests/runtime/out_kafka.c new file mode 100644 index 00000000000..4c5c852bc44 --- /dev/null +++ b/tests/runtime/out_kafka.c @@ -0,0 +1,49 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include "flb_tests_runtime.h" + +/* Test data */ +#include "data/td/json_td.h" + + +void flb_test_raw_format() +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Kafka output */ + out_ffd = flb_output(ctx, (char *) "kafka", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + /* Switch to raw mode and select a key */ + flb_output_set(ctx, out_ffd, "format", "raw", NULL); + flb_output_set(ctx, out_ffd, "raw_log_key", "key_0", NULL); + flb_output_set(ctx, out_ffd, "topics", "test", NULL); + flb_output_set(ctx, out_ffd, "brokers", "127.0.0.1:111", NULL); + flb_output_set(ctx, out_ffd, "queue_full_retries", "1", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + flb_lib_push(ctx, in_ffd, (char *) JSON_TD, (int) sizeof(JSON_TD) - 1); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +TEST_LIST = { + { "raw_format", flb_test_raw_format }, + { NULL, NULL }, +};