Skip to content

Commit

Permalink
out_calyptia: retry agent registration on flush callback (fluent#9656)
Browse files Browse the repository at this point in the history
* out_calyptia: retry registering agent on flush.

if register_retry_on_flush is set (default true), agent registration
is retried on each flush callback.

if set to false then registration will cause to abort the plugin
initialisation.

Signed-off-by: Jorge Niedbalski <[email protected]>

* custom_calyptia: cascade register_retry_on_flush variables.

Signed-off-by: Jorge Niedbalski <[email protected]>

---------

Signed-off-by: Jorge Niedbalski <[email protected]>
Co-authored-by: Jorge Niedbalski <[email protected]>
  • Loading branch information
niedbalski and Jorge Niedbalski authored Nov 28, 2024
1 parent 361b5b9 commit 4acffc2
Show file tree
Hide file tree
Showing 6 changed files with 492 additions and 111 deletions.
12 changes: 11 additions & 1 deletion plugins/custom_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config,
flb_output_set_property(cloud, "match", "_calyptia_cloud");
flb_output_set_property(cloud, "api_key", ctx->api_key);

if (ctx->register_retry_on_flush) {
flb_output_set_property(cloud, "register_retry_on_flush", "true");
} else {
flb_output_set_property(cloud, "register_retry_on_flush", "false");
}

if (ctx->store_path) {
flb_output_set_property(cloud, "store_path", ctx->store_path);
}
Expand Down Expand Up @@ -585,7 +591,11 @@ static struct flb_config_map config_map[] = {
"Pipeline ID for reporting to calyptia cloud."
},
#endif /* FLB_HAVE_CHUNK_TRACE */

{
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush),
"Retry agent registration on flush if failed on init."
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/custom_calyptia/calyptia.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct calyptia {
flb_sds_t fleet_max_http_buffer_size;
flb_sds_t fleet_interval_sec;
flb_sds_t fleet_interval_nsec;
bool register_retry_on_flush; /* retry registration on flush if failed */
};

int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet);
Expand Down
238 changes: 145 additions & 93 deletions plugins/out_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,24 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
int ret;
size_t b_sent;

if( !ctx || !c ) {
return FLB_ERROR;
}

/* Ensure agent_token is not empty when required */
if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) &&
!ctx->agent_token) {
flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type);
return FLB_ERROR;
}

/* append headers */
if (type == CALYPTIA_ACTION_REGISTER) {
// When registering a new agent api key is required
if (!ctx->api_key) {
flb_plg_error(ctx->ins, "api_key is missing");
return FLB_ERROR;
}
flb_http_add_header(c,
CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
Expand Down Expand Up @@ -721,6 +737,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
return NULL;
}

ctx->metrics_endpoint = flb_sds_create_size(256);
if (!ctx->metrics_endpoint) {
flb_free(ctx);
return NULL;
}

#ifdef FLB_HAVE_CHUNK_TRACE
ctx->trace_endpoint = flb_sds_create_size(256);
if (!ctx->trace_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
flb_free(ctx);
return NULL;
}
#endif

/* api_key */
if (!ctx->api_key) {
flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
Expand Down Expand Up @@ -771,12 +802,40 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
return ctx;
}

static int cb_calyptia_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
static int register_agent(struct flb_calyptia *ctx, struct flb_config *config)
{
int ret;

/* Try registration */
ret = api_agent_create(config, ctx);
if (ret != FLB_OK) {
flb_plg_warn(ctx->ins, "agent registration failed");
return FLB_ERROR;
}

/* Update endpoints */
flb_sds_len_set(ctx->metrics_endpoint, 0);
flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
ctx->agent_id);

#ifdef FLB_HAVE_CHUNK_TRACE
if (ctx->pipeline_id) {
flb_sds_len_set(ctx->trace_endpoint, 0);
flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
ctx->pipeline_id);
}
#endif

flb_plg_info(ctx->ins, "agent registration successful");
return FLB_OK;
}

static int cb_calyptia_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
{
struct flb_calyptia *ctx;
(void) data;
int ret;

/* create config context */
ctx = config_init(ins, config);
Expand All @@ -791,23 +850,12 @@ static int cb_calyptia_init(struct flb_output_instance *ins,
*/
flb_output_set_http_debug_callbacks(ins);

/* register/update agent */
ret = api_agent_create(config, ctx);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "agent registration failed");
ret = register_agent(ctx, config);
if (ret != FLB_OK && !ctx->register_retry_on_flush) {
flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false");
return -1;
}

/* metrics endpoint */
ctx->metrics_endpoint = flb_sds_create_size(256);
flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
ctx->agent_id);

#ifdef FLB_HAVE_CHUNK_TRACE
ctx->trace_endpoint = flb_sds_create_size(256);
flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
ctx->pipeline_id);
#endif /* FLB_HAVE_CHUNK_TRACE */
return 0;
}

Expand All @@ -830,29 +878,79 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes)
cmt_destroy(cmt);
}

static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *i_ins,
void *out_context,
struct flb_config *config)
static int cb_calyptia_exit(void *data, struct flb_config *config)
{
int ret = FLB_RETRY;
size_t off = 0;
size_t out_size = 0;
char *out_buf = NULL;
struct flb_calyptia *ctx = data;

if (!ctx) {
return 0;
}

if (ctx->u) {
flb_upstream_destroy(ctx->u);
}

if (ctx->agent_id) {
flb_sds_destroy(ctx->agent_id);
}

if (ctx->agent_token) {
flb_sds_destroy(ctx->agent_token);
}

if (ctx->env) {
flb_env_destroy(ctx->env);
}

if (ctx->metrics_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
}

/* used to create records for reporting traces to the cloud. */
#ifdef FLB_HAVE_CHUNK_TRACE
flb_sds_t json;
if (ctx->trace_endpoint) {
flb_sds_destroy(ctx->trace_endpoint);
}
#endif /* FLB_HAVE_CHUNK_TRACE */

if (ctx->fs) {
flb_fstore_destroy(ctx->fs);
}

flb_kv_release(&ctx->kv_labels);
flb_free(ctx);

return 0;
}

static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *i_ins,
void *out_context,
struct flb_config *config)
{
int ret;
size_t off = 0;
size_t out_size = 0;
char *out_buf = NULL;
struct flb_connection *u_conn;
struct flb_http_client *c = NULL;
struct flb_calyptia *ctx = out_context;
struct cmt *cmt;
flb_sds_t json;
(void) i_ins;
(void) config;

if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) {
flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true");
if (register_agent(ctx, config) != FLB_OK) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
}
else if (!ctx->agent_id || !ctx->agent_token) {
flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false");
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);
if (!u_conn) {
Expand Down Expand Up @@ -890,7 +988,7 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,

/* Compose HTTP Client request */
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint,
out_buf, out_size, NULL, 0, NULL, 0);
out_buf, out_size, NULL, 0, NULL, 0);
if (!c) {
if (out_buf != event_chunk->data) {
cmt_encode_msgpack_destroy(out_buf);
Expand All @@ -899,12 +997,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
/* perform request */
ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS);
if (ret == FLB_OK) {
flb_plg_debug(ctx->ins, "metrics delivered OK");
}
else if (ret == FLB_ERROR) {
else {
flb_plg_error(ctx->ins, "could not deliver metrics");
debug_payload(ctx, out_buf, out_size);
}
Expand All @@ -915,42 +1013,35 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) {
if (event_chunk->type & FLB_EVENT_TYPE_LOGS &&
event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) {
json = flb_pack_msgpack_to_json_format(event_chunk->data,
event_chunk->size,
FLB_PACK_JSON_FORMAT_STREAM,
FLB_PACK_JSON_DATE_DOUBLE,
NULL);
event_chunk->size,
FLB_PACK_JSON_FORMAT_STREAM,
FLB_PACK_JSON_DATE_DOUBLE,
NULL);
if (json == NULL) {
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
out_buf = (char *)json;
out_size = flb_sds_len(json);

if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
ctx->agent_id) == NULL) {
flb_upstream_conn_release(u_conn);
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint,
out_buf, out_size, NULL, 0, NULL, 0);
(char *) json, flb_sds_len(json),
NULL, 0, NULL, 0);

if (!c) {
flb_upstream_conn_release(u_conn);
flb_sds_destroy(json);
flb_sds_destroy(ctx->metrics_endpoint);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE);
if (ret == FLB_OK) {
flb_plg_debug(ctx->ins, "trace delivered OK");
}
else if (ret == FLB_ERROR) {
else {
flb_plg_error(ctx->ins, "could not deliver trace");
debug_payload(ctx, out_buf, out_size);
debug_payload(ctx, (char *) json, flb_sds_len(json));
}
flb_sds_destroy(json);
}
Expand All @@ -961,51 +1052,8 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
if (c) {
flb_http_client_destroy(c);
}
FLB_OUTPUT_RETURN(ret);
}

static int cb_calyptia_exit(void *data, struct flb_config *config)
{
struct flb_calyptia *ctx = data;

if (!ctx) {
return 0;
}

if (ctx->u) {
flb_upstream_destroy(ctx->u);
}

if (ctx->agent_id) {
flb_sds_destroy(ctx->agent_id);
}

if (ctx->agent_token) {
flb_sds_destroy(ctx->agent_token);
}

if (ctx->env) {
flb_env_destroy(ctx->env);
}

if (ctx->metrics_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (ctx->trace_endpoint) {
flb_sds_destroy(ctx->trace_endpoint);
}
#endif /* FLB_HAVE_CHUNK_TRACE */

if (ctx->fs) {
flb_fstore_destroy(ctx->fs);
}

flb_kv_release(&ctx->kv_labels);
flb_free(ctx);

return 0;
FLB_OUTPUT_RETURN(ret);
}

/* Configuration properties map */
Expand Down Expand Up @@ -1057,7 +1105,11 @@ static struct flb_config_map config_map[] = {
"Pipeline ID for calyptia core traces."
},
#endif

{
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush),
"Retry agent registration on flush if failed on init."
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/out_calyptia/calyptia.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ struct flb_calyptia {
flb_sds_t trace_endpoint;
flb_sds_t pipeline_id;
#endif /* FLB_HAVE_CHUNK_TRACE */
bool register_retry_on_flush; /* retry registration on flush if failed */
};

#endif
Loading

0 comments on commit 4acffc2

Please sign in to comment.