Skip to content

Commit

Permalink
custom_calyptia: cascade register_retry_on_flush variables.
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Niedbalski <[email protected]>
  • Loading branch information
Jorge Niedbalski committed Nov 28, 2024
1 parent 6cb6a04 commit 84d00f4
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 24 deletions.
12 changes: 11 additions & 1 deletion plugins/custom_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config,
flb_output_set_property(cloud, "match", "_calyptia_cloud");
flb_output_set_property(cloud, "api_key", ctx->api_key);

if (ctx->register_retry_on_flush) {
flb_output_set_property(cloud, "register_retry_on_flush", "true");
} else {
flb_output_set_property(cloud, "register_retry_on_flush", "false");
}

if (ctx->store_path) {
flb_output_set_property(cloud, "store_path", ctx->store_path);
}
Expand Down Expand Up @@ -585,7 +591,11 @@ static struct flb_config_map config_map[] = {
"Pipeline ID for reporting to calyptia cloud."
},
#endif /* FLB_HAVE_CHUNK_TRACE */

{
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush),
"Retry agent registration on flush if failed on init."
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/custom_calyptia/calyptia.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct calyptia {
flb_sds_t fleet_max_http_buffer_size;
flb_sds_t fleet_interval_sec;
flb_sds_t fleet_interval_nsec;
bool register_retry_on_flush; /* retry registration on flush if failed */
};

int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet);
Expand Down
29 changes: 23 additions & 6 deletions plugins/out_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
return NULL;
}

ctx->metrics_endpoint = flb_sds_create_size(256);
if (!ctx->metrics_endpoint) {
flb_free(ctx);
return NULL;
}

#ifdef FLB_HAVE_CHUNK_TRACE
ctx->trace_endpoint = flb_sds_create_size(256);
if (!ctx->trace_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
flb_free(ctx);
return NULL;
}
#endif

/* api_key */
if (!ctx->api_key) {
flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
Expand Down Expand Up @@ -905,6 +920,7 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
struct flb_http_client *c = NULL;
struct flb_calyptia *ctx = out_context;
struct cmt *cmt;
flb_sds_t json;
(void) i_ins;
(void) config;

Expand Down Expand Up @@ -981,12 +997,13 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) {
flb_sds_t json = flb_pack_msgpack_to_json_format(event_chunk->data,
event_chunk->size,
FLB_PACK_JSON_FORMAT_STREAM,
FLB_PACK_JSON_DATE_DOUBLE,
NULL);
if (event_chunk->type & FLB_EVENT_TYPE_LOGS &&
event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) {
json = flb_pack_msgpack_to_json_format(event_chunk->data,
event_chunk->size,
FLB_PACK_JSON_FORMAT_STREAM,
FLB_PACK_JSON_DATE_DOUBLE,
NULL);
if (json == NULL) {
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_RETRY);
Expand Down
38 changes: 21 additions & 17 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,37 @@ if(FLB_OUT_LIB)
endif()

if (FLB_CUSTOM_CALYPTIA)
# Define common variables for calyptia tests
set(CALYPTIA_TEST_LINK_LIBS
fluent-bit-static
${CMAKE_THREAD_LIBS_INIT}
)

# Add calyptia input properties test
set(TEST_TARGET "flb-rt-calyptia_input_properties")
add_executable(${TEST_TARGET}
set(CALYPTIA_TESTS
"custom_calyptia_test.c"
"custom_calyptia_registration_retry_test.c"
"custom_calyptia_input_test.c"
"../../plugins/custom_calyptia/calyptia.c"
)

target_link_libraries(${TEST_TARGET}
${CALYPTIA_TEST_LINK_LIBS}
)
foreach(TEST_SOURCE ${CALYPTIA_TESTS})
get_filename_component(TEST_NAME ${TEST_SOURCE} NAME_WE)

add_test(NAME ${TEST_TARGET}
COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET}
WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build)
set(TEST_TARGET "flb-rt-${TEST_NAME}")
add_executable(${TEST_TARGET}
${TEST_SOURCE}
"../../plugins/custom_calyptia/calyptia.c"
)

target_link_libraries(${TEST_TARGET}
${CALYPTIA_TEST_LINK_LIBS}
)

set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime")
add_dependencies(${TEST_TARGET} fluent-bit-static)
add_test(NAME ${TEST_TARGET}
COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET}
WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build)

set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime")
add_dependencies(${TEST_TARGET} fluent-bit-static)
endforeach()
endif()

if(FLB_IN_EBPF)
Expand Down Expand Up @@ -222,10 +230,6 @@ if(FLB_IN_LIB)

endif()

if (FLB_CUSTOM_CALYPTIA)
FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c")
endif()

if (FLB_PROCESSOR_METRICS_SELECTOR)
FLB_RT_TEST(FLB_PROCESSOR_METRICS_SELECTOR "processor_metrics_selector.c")
endif()
Expand Down
117 changes: 117 additions & 0 deletions tests/runtime/custom_calyptia_registration_retry_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#include <fluent-bit.h>
#include <fluent-bit/flb_custom.h>
#include <monkey/mk_core.h>
#include <monkey/mk_lib.h>
#include <fluent-bit/flb_time.h>
#include <stdlib.h>
#include <string.h>

#include "flb_tests_runtime.h"

#define MOCK_SERVER_HOST "127.0.0.1"
#define MOCK_SERVER_PORT 9876

static int registration_count = 0;

static void mock_server_cb(mk_request_t *request, void *data)
{
registration_count++;
mk_http_status(request, 500);
mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1,
"text/plain", sizeof("text/plain") - 1);
mk_http_send(request, "Internal Server Error", sizeof("Internal Server Error") - 1, NULL);
mk_http_done(request);
}

/* Test function */
void test_calyptia_register_retry()
{
flb_ctx_t *ctx;
int ret;
int in_ffd;
mk_ctx_t *mock_ctx;
int vid;
char tmp[256];
struct flb_custom_instance *calyptia;

/* Reset registration count */
registration_count = 0;

/* Init mock server */
mock_ctx = mk_create();
TEST_CHECK(mock_ctx != NULL);

/* Compose listen address */
snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT);
ret = mk_config_set(mock_ctx, "Listen", tmp, NULL);
TEST_CHECK(ret == 0);

vid = mk_vhost_create(mock_ctx, NULL);
TEST_CHECK(vid >= 0);

ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb, NULL);
TEST_CHECK(ret == 0);

ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb, NULL);
TEST_CHECK(ret == 0);

ret = mk_start(mock_ctx);
TEST_CHECK(ret == 0);

flb_time_msleep(500); // Allow the mock server to initialize

/* Init Fluent Bit context */
ctx = flb_create();
TEST_CHECK(ctx != NULL);

ret = flb_service_set(ctx,
"Log_Level", "debug",
NULL);
TEST_CHECK(ret == 0);

/* Create dummy input */
in_ffd = flb_input(ctx, (char *)"dummy", NULL);
TEST_CHECK(in_ffd >= 0);

/* Create custom Calyptia plugin */
calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL);
TEST_CHECK(calyptia != NULL);

/* Set custom plugin properties */
flb_custom_set_property(calyptia, "api_key", "test-key");
flb_custom_set_property(calyptia, "log_level", "debug");
flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id");
flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST);
flb_custom_set_property(calyptia, "calyptia_port", "9876");
flb_custom_set_property(calyptia, "register_retry_on_flush", "true");
flb_custom_set_property(calyptia, "calyptia_tls", "off");
flb_custom_set_property(calyptia, "calyptia_tls.verify", "off");

/* Start the engine */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* First registration attempt should have failed */
TEST_CHECK(registration_count == 1);

flb_time_msleep(1000);
flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13);

/* Wait for processing */
flb_time_msleep(10000);
TEST_CHECK(registration_count > 1);

/* Cleanup */
flb_stop(ctx);
flb_destroy(ctx);
mk_stop(mock_ctx);
mk_destroy(mock_ctx);
}

/* Test list */
TEST_LIST = {
{"register_retry", test_calyptia_register_retry},
{NULL, NULL}
};

0 comments on commit 84d00f4

Please sign in to comment.