diff --git a/plugins/filter_log_to_metrics/log_to_metrics.c b/plugins/filter_log_to_metrics/log_to_metrics.c index 0f1370166b8..87231f66392 100644 --- a/plugins/filter_log_to_metrics/log_to_metrics.c +++ b/plugins/filter_log_to_metrics/log_to_metrics.c @@ -450,6 +450,35 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values, return label_counter; } +/* Timer callback to inject metrics into the pipeline */ +static void cb_send_metric_chunk(struct flb_config *config, void *data) +{ + int ret; + struct log_to_metrics_ctx *ctx = data; + + /* Check that metric context is not empty */ + if (ctx->cmt == NULL || ctx->input_ins == NULL) { + return; + } + + if (ctx->new_data) { + ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, + strlen(ctx->tag), ctx->cmt); + if (ret != 0) { + flb_plg_error(ctx->ins, "could not append metrics"); + } + } + + /* Check if we are shutting down. If so, stop our timer */ + if (config->is_shutting_down) { + if(ctx->timer && ctx->timer->active) { + flb_plg_debug(ctx->ins, "Stopping callback timer"); + flb_sched_timer_cb_disable(ctx->timer); + } + } + ctx->new_data = FLB_FALSE; +} + static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) { @@ -462,6 +491,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, char metric_subsystem[MAX_METRIC_LENGTH]; char value_field[MAX_METRIC_LENGTH]; struct flb_input_instance *input_ins; + struct flb_sched *sched; int i; @@ -676,7 +706,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, } ret = flb_input_name_exists(ctx->emitter_name, config); - if (ret == FLB_TRUE) { + if (ret) { flb_plg_error(f_ins, "emitter_name '%s' already exists", ctx->emitter_name); flb_sds_destroy(ctx->emitter_name); @@ -732,6 +762,43 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, } ctx->input_ins = input_ins; + + if (ctx->flush_interval_sec <= 0) { + ctx->flush_interval_sec = strtol(DEFAULT_INTERVAL_SEC, NULL, 10); + } + if (ctx->flush_interval_nsec <= 0) { + ctx->flush_interval_nsec = strtol(DEFAULT_INTERVAL_NSEC, NULL, 10); + } + if (ctx->flush_interval_sec == 0 && ctx->flush_interval_nsec == 0) { + flb_plg_debug(ctx->ins, "Interval is set to 0, will not use timer and " + "send metrics immediately"); + ctx->timer_mode = FLB_FALSE; + return 0; + } + + /* Initialize timer for scheduled metric updates */ + sched = flb_sched_ctx_get(); + if(sched == 0) { + flb_plg_error(f_ins, "could not get scheduler context"); + log_to_metrics_destroy(ctx); + return -1; + } + /* Convert flush_interval_sec and flush_interval_nsec to milliseconds */ + ctx->timer_interval = (ctx->flush_interval_sec * 1000) + + (ctx->flush_interval_nsec / 1000000); + flb_plg_debug(ctx->ins, + "Creating metric timer with frequency %d ms", + ctx->timer_interval); + + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, + ctx->timer_interval, cb_send_metric_chunk, + ctx, &ctx->timer); + if (ret < 0) { + flb_plg_error(f_ins, "could not create timer callback"); + log_to_metrics_destroy(ctx); + return -1; + } + ctx->timer_mode = FLB_TRUE; return 0; } @@ -923,9 +990,17 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, return -1; } - ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt); - if (ret != 0) { - flb_plg_error(ctx->ins, "could not append metrics"); + if (!ctx->timer_mode) { + ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, + strlen(ctx->tag), ctx->cmt); + + if (ret != 0) { + flb_plg_error(ctx->ins, "could not append metrics. " + "Please consider to use flush_interval_sec and flush_interval_nsec"); + } + } + else { + ctx->new_data = FLB_TRUE; } /* Cleanup */ @@ -944,6 +1019,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, } } + if (ctx->discard_logs) { *out_buf = NULL; *out_size = 0; @@ -961,7 +1037,10 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, static int cb_log_to_metrics_exit(void *data, struct flb_config *config) { struct log_to_metrics_ctx *ctx = data; - + if(ctx->timer != NULL) { + flb_plg_debug(ctx->ins, "Destroying callback timer"); + flb_sched_timer_destroy(ctx->timer); + } return log_to_metrics_destroy(ctx); } @@ -1040,13 +1119,24 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_name), "Name of the emitter (advanced users)" }, - { FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_MEM_BUF_LIMIT_DEFAULT, 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_mem_buf_limit), "set a buffer limit to restrict memory usage of metrics emitter" }, - + { + FLB_CONFIG_MAP_INT, "flush_interval_sec", DEFAULT_INTERVAL_SEC, + 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, flush_interval_sec), + "Set the timer interval for metrics emission. If flush_interval_sec and " + "flush_interval_nsec are set to 0, the timer is disabled (default)." + }, + { + FLB_CONFIG_MAP_INT, "flush_interval_nsec", DEFAULT_INTERVAL_NSEC, + 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, flush_interval_nsec), + "Set the timer interval (subseconds) for metrics emission. " + "If flush_interval_sec and flush_interval_nsec are set to 0, the timer is disabled " + "(default). Final precision is milliseconds." + }, { FLB_CONFIG_MAP_BOOL, "discard_logs", "false", 0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, discard_logs), diff --git a/plugins/filter_log_to_metrics/log_to_metrics.h b/plugins/filter_log_to_metrics/log_to_metrics.h index ae5b822c554..b326ffca467 100644 --- a/plugins/filter_log_to_metrics/log_to_metrics.h +++ b/plugins/filter_log_to_metrics/log_to_metrics.h @@ -50,13 +50,13 @@ #define FLB_MEM_BUF_LIMIT_DEFAULT "10M" #define DEFAULT_LOG_TO_METRICS_NAMESPACE "log_metric" - +#define DEFAULT_INTERVAL_SEC "0" +#define DEFAULT_INTERVAL_NSEC "0" struct log_to_metrics_ctx { struct mk_list rules; struct flb_filter_instance *ins; struct cmt *cmt; - struct flb_input_instance *input_ins; char **label_keys; @@ -84,6 +84,12 @@ struct log_to_metrics_ctx { flb_sds_t tag; flb_sds_t emitter_name; size_t emitter_mem_buf_limit; + long flush_interval_sec; + long flush_interval_nsec; + int timer_interval; + int timer_mode; + struct flb_sched_timer *timer; + int new_data; }; struct grep_rule