diff --git a/include/fluent-bit/flb_chunk_trace.h b/include/fluent-bit/flb_chunk_trace.h
index bd2a9501c07..57ba1f85ee7 100644
--- a/include/fluent-bit/flb_chunk_trace.h
+++ b/include/fluent-bit/flb_chunk_trace.h
@@ -67,16 +67,28 @@ struct flb_chunk_trace_limit {
     int count;
 };
 
-struct flb_chunk_trace_context {
+struct flb_chunk_pipeline_context {
+    flb_ctx_t *flb;
+    flb_sds_t output_name;
+    pthread_t thread;
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+    int state; /* start/stop handshake with the pipeline thread, guarded by lock */
+    struct mk_list *props;
+    void *data;
     void *input;
     void *output;
+};
+
+struct flb_chunk_trace_context {
+    void *input;
     int trace_count;
     struct flb_chunk_trace_limit limit;
     flb_sds_t trace_prefix;
     int to_destroy;
     int chunks;
-    flb_ctx_t *flb;
-    struct cio_ctx *cio;
+
+    struct flb_chunk_pipeline_context pipeline;
 };
 
 struct flb_chunk_trace {
@@ -94,6 +106,7 @@ int flb_chunk_trace_input(struct flb_chunk_trace *trace);
 void flb_chunk_trace_do_input(struct flb_input_chunk *trace);
 int flb_chunk_trace_pre_output(struct flb_chunk_trace *trace);
 int flb_chunk_trace_filter(struct flb_chunk_trace *trace, void *pfilter, struct flb_time *, struct flb_time *, char *buf, size_t buf_size);
+int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_instance *output, int ret);
 void flb_chunk_trace_free(struct flb_chunk_trace *trace);
 int flb_chunk_trace_context_set_limit(void *input, int, int);
 int flb_chunk_trace_context_hit_limit(void *input);
diff --git a/include/fluent-bit/flb_event.h b/include/fluent-bit/flb_event.h
index 6217f0776a8..e80a58c38f8 100644
--- a/include/fluent-bit/flb_event.h
+++ b/include/fluent-bit/flb_event.h
@@ -42,6 +42,9 @@ struct flb_event_chunk {
     void *data; /* event content */
     size_t size; /* size of event */
     size_t total_events; /* total number of serialized events */
+#ifdef FLB_HAVE_CHUNK_TRACE
+    struct flb_chunk_trace *trace;
+#endif
 };
 
 struct flb_event_chunk *flb_event_chunk_create(int type,
diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h
index d69de4f9192..5978c285679 100644
--- a/include/fluent-bit/flb_output.h
+++ b/include/fluent-bit/flb_output.h
@@ -64,6 +64,13 @@
 #include
 #endif
 
+#ifdef FLB_HAVE_CHUNK_TRACE
+/* include prototype directly to avoid cyclical include ... */
+struct flb_chunk_trace;
+struct flb_output_instance;
+int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_instance *output, int ret);
+#endif
+
 /* Output plugin masks */
 #define FLB_OUTPUT_NET 32 /* output address may set host and port */
 #define FLB_OUTPUT_PLUGIN_CORE 0
@@ -940,7 +947,15 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {
 
     flb_task_release_lock(task);
 
+#ifdef FLB_HAVE_CHUNK_TRACE
+    if (task->event_chunk) {
+        if (task->event_chunk->trace) {
+            flb_chunk_trace_output(task->event_chunk->trace, o_ins, ret);
+        }
+    }
+#endif
+
     if (out_flush->processed_event_chunk) {
         if (task->event_chunk->data != out_flush->processed_event_chunk->data) {
             flb_free(out_flush->processed_event_chunk->data);
         }
diff --git a/src/flb_chunk_trace.c b/src/flb_chunk_trace.c
index adacb73f21a..92ce96bd1ec 100644
--- a/src/flb_chunk_trace.c
+++ b/src/flb_chunk_trace.c
@@ -98,28 +98,69 @@ static struct flb_output_instance *find_calyptia_output_instance(struct flb_conf
     return NULL;
 }
 
-static void trace_chunk_context_destroy(struct flb_chunk_trace_context *ctxt)
+/* handshake states kept in flb_chunk_pipeline_context.state (guarded by lock) */
+#define TRACE_PIPELINE_PENDING  0
+#define TRACE_PIPELINE_RUNNING  1
+#define TRACE_PIPELINE_STOPPING 2
+#define TRACE_PIPELINE_FAILED   3
+
+/*
+ * stop the trace pipeline, join its thread and release everything that
+ * trace_pipeline_init() acquired.
+ */
+static void trace_pipeline_stop(struct flb_chunk_pipeline_context *pipeline)
+{
+    flb_trace("stop the pipeline");
+    flb_stop(pipeline->flb);
+
+    /* record the request under the lock so the wakeup cannot be lost */
+    flb_trace("signaling pipeline thread to stop");
+    pthread_mutex_lock(&pipeline->lock);
+    pipeline->state = TRACE_PIPELINE_STOPPING;
+    pthread_cond_signal(&pipeline->cond);
+    pthread_mutex_unlock(&pipeline->lock);
+
+    flb_trace("joining pipeline thread...");
+    pthread_join(pipeline->thread, NULL);
+
+    flb_destroy(pipeline->flb);
+    flb_sds_destroy(pipeline->output_name);
+    pthread_cond_destroy(&pipeline->cond);
+    pthread_mutex_destroy(&pipeline->lock);
+}
+
+/* pause every input of the trace pipeline */
+static void trace_pipeline_shutdown(struct flb_chunk_pipeline_context *pipeline)
+{
+    flb_input_pause_all(pipeline->flb->config);
+}
+
+/* poll up to 5 times, 10ms apart, for the running tasks to finish */
+static void trace_pipeline_wait(struct flb_chunk_pipeline_context *pipeline)
 {
     int i;
 
+    /* waiting for all tasks to end is key to safely stopping and destroying */
+    /* the fluent-bit pipeline. */
+    for (i = 0; i < 5 && flb_task_running_count(pipeline->flb->config) > 0; i++) {
+        usleep(10 * 1000);
+    }
+}
 
+static void trace_chunk_context_destroy(struct flb_chunk_trace_context *ctxt)
+{
     if (flb_chunk_trace_has_chunks(ctxt) == FLB_TRUE) {
         flb_chunk_trace_set_destroy(ctxt);
-        flb_input_pause_all(ctxt->flb->config);
+        trace_pipeline_shutdown(&ctxt->pipeline);
         return;
     }
-    
+
     /* pause all inputs, then destroy the input storage. */
-    flb_input_pause_all(ctxt->flb->config);
-    /* waiting for all tasks to end is key to safely stopping and destroying */
-    /* the fluent-bit pipeline. */
-    for (i = 0; i < 5 && flb_task_running_count(ctxt->flb->config) > 0; i++) {
-        usleep(10 * 1000);
-    }
+    trace_pipeline_shutdown(&ctxt->pipeline);
+    trace_pipeline_wait(&ctxt->pipeline);
 
     flb_sds_destroy(ctxt->trace_prefix);
-    flb_stop(ctxt->flb);
-    flb_destroy(ctxt->flb);
+    trace_pipeline_stop(&ctxt->pipeline);
     flb_free(ctxt);
 }
 
@@ -134,44 +175,32 @@ void flb_chunk_trace_context_destroy(void *input)
     pthread_mutex_unlock(&in->chunk_trace_lock);
 }
 
-struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input,
-                                                            const char *output_name,
-                                                            const char *trace_prefix,
-                                                            void *data, struct mk_list *props)
+/*
+ * body of the pipeline thread: builds and starts the trace pipeline, reports
+ * the outcome through ctx->state and then sleeps until asked to stop.
+ */
+static void *trace_chunk_pipeline_thread(void *arg)
 {
-    struct flb_input_instance *in = (struct flb_input_instance *)trace_input;
-    struct flb_config *config = in->config;
+    int ret;
+    struct flb_chunk_pipeline_context *ctx = (struct flb_chunk_pipeline_context *)arg;
     struct flb_input_instance *input = NULL;
     struct flb_output_instance *output = NULL;
-    struct flb_output_instance *calyptia = NULL;
-    struct flb_chunk_trace_context *ctx = NULL;
     struct mk_list *head = NULL;
     struct flb_kv *prop = NULL;
-    int ret;
-
-    if (config->enable_chunk_trace == FLB_FALSE) {
-        flb_warn("[chunk trace] enable chunk tracing via the configuration or "
-                 " command line to be able to activate tracing.");
-        return NULL;
-    }
-
-    pthread_mutex_lock(&in->chunk_trace_lock);
-
-    if (in->chunk_trace_ctxt) {
-        trace_chunk_context_destroy(in->chunk_trace_ctxt);
-    }
 
-    ctx = flb_calloc(1, sizeof(struct flb_chunk_trace_context));
-    if (ctx == NULL) {
-        flb_errno();
-        pthread_mutex_unlock(&in->chunk_trace_lock);
-        return NULL;
-    }
+    flb_trace("[pipeline_thread]: waiting for start lock");
+    pthread_mutex_lock(&ctx->lock);
+    flb_trace("[pipeline_thread]: waited for start lock");
+
+    /* trace_pipeline_start() gave up before we ran: nothing to build */
+    if (ctx->state != TRACE_PIPELINE_PENDING) {
+        goto error_lock;
+    }
 
     ctx->flb = flb_create();
     if (ctx->flb == NULL) {
         flb_errno();
-        goto error_ctxt;
+        goto error_lock;
     }
 
     flb_service_set(ctx->flb, "flush", "1", "grace", "1", NULL);
@@ -181,6 +210,7 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input,
         flb_error("could not load trace emitter");
         goto error_flb;
     }
+    input->is_threaded = FLB_TRUE;
 
     ret = flb_input_set_property(input, "alias", "trace-emitter");
     if (ret != 0) {
@@ -194,27 +224,14 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input,
         goto error_input;
     }
 
-    output = flb_output_new(ctx->flb->config, output_name, data, 1);
+    output = flb_output_new(ctx->flb->config, ctx->output_name, ctx->data, 1);
     if (output == NULL) {
         flb_error("could not create trace output");
         goto error_input;
     }
-
-    /* special handling for the calyptia plugin so we can copy the API */
-    /* key and other configuration properties. */
-    if (strcmp(output_name, "calyptia") == 0) {
-        calyptia = find_calyptia_output_instance(config);
-        if (calyptia == NULL) {
-            flb_error("unable to find calyptia output instance");
-            goto error_output;
-        }
-        mk_list_foreach(head, &calyptia->properties) {
-            prop = mk_list_entry(head, struct flb_kv, _head);
-            flb_output_set_property(output, prop->key, prop->val);
-        }
-    }
-    else if (props != NULL) {
-        mk_list_foreach(head, props) {
+
+    if (ctx->props != NULL) {
+        mk_list_foreach(head, ctx->props) {
             prop = mk_list_entry(head, struct flb_kv, _head);
             flb_output_set_property(output, prop->key, prop->val);
         }
@@ -228,29 +245,224 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input,
 
     ctx->output = (void *)output;
     ctx->input = (void *)input;
-    ctx->trace_prefix = flb_sds_create(trace_prefix);
 
-    flb_start_trace(ctx->flb);
+    flb_trace("[pipeline_thread]: start pipeline in thread");
+    if (flb_start(ctx->flb) != 0) {
+        flb_error("[pipeline_thread]: unable to start pipeline");
+        goto error_output;
+    }
 
-    in->chunk_trace_ctxt = ctx;
-    pthread_mutex_unlock(&in->chunk_trace_lock);
-    return ctx;
+    /*
+     * report that the pipeline is up: trace_pipeline_start() is parked on the
+     * condition variable and re-checks ctx->state once the lock is released.
+     */
+    ctx->state = TRACE_PIPELINE_RUNNING;
+    ret = pthread_cond_signal(&ctx->cond);
+    if (ret != 0) {
+        errno = ret;
+        flb_errno();
+        flb_error("[pipeline_thread]: unable to signal start of pipeline");
+        goto error_start;
+    }
+
+    /*
+     * sleep until trace_pipeline_stop() asks us to exit. The lock is only
+     * released inside pthread_cond_wait() and the state is re-checked after
+     * every wakeup, so neither a spurious wakeup nor an early stop is missed.
+     */
+    flb_trace("[pipeline_thread]: wait for exit of pipeline thread");
+    while (ctx->state == TRACE_PIPELINE_RUNNING) {
+        ret = pthread_cond_wait(&ctx->cond, &ctx->lock);
+        if (ret != 0) {
+            errno = ret;
+            flb_errno();
+            flb_error("[pipeline_thread]: unable to wait for exit");
+            break;
+        }
+    }
+
+    /* the pipeline itself is torn down by trace_pipeline_stop() */
+    pthread_mutex_unlock(&ctx->lock);
+    flb_trace("[pipeline_thread]: exit trace pipeline thread");
+    return NULL;
+
+error_start:
+    flb_stop(ctx->flb);
 
 error_output:
     flb_output_instance_destroy(output);
 error_input:
-    if (ctx->cio) {
-        cio_destroy(ctx->cio);
-    }
     flb_input_instance_destroy(input);
 error_flb:
     flb_destroy(ctx->flb);
-error_ctxt:
-    flb_free(ctx);
-    pthread_mutex_unlock(&in->chunk_trace_lock);
+    ctx->flb = NULL;
+    ctx->input = NULL;
+    ctx->output = NULL;
+error_lock:
+    /* wake trace_pipeline_start() so it can report the failure */
+    ctx->state = TRACE_PIPELINE_FAILED;
+    pthread_cond_signal(&ctx->cond);
+    pthread_mutex_unlock(&ctx->lock);
+    flb_trace("[pipeline_thread]: error: exit trace pipeline thread.");
     return NULL;
 }
 
+/* spawn the pipeline thread and block until it reports whether it started */
+static int trace_pipeline_start(struct flb_chunk_pipeline_context *pipeline)
+{
+    int rc;
+    int state;
+
+    flb_trace("start pipeline thread");
+
+    /*
+     * take the lock before creating the thread: the thread blocks on it until
+     * we are parked in pthread_cond_wait(), so it cannot report its status
+     * before we are listening.
+     */
+    rc = pthread_mutex_lock(&pipeline->lock);
+    if (rc != 0) {
+        /* pthread functions return the error code instead of setting errno */
+        errno = rc;
+        flb_errno();
+        return FLB_FALSE;
+    }
+
+    pipeline->state = TRACE_PIPELINE_PENDING;
+    rc = pthread_create(&pipeline->thread, NULL, trace_chunk_pipeline_thread,
+                        (void *)pipeline);
+    if (rc != 0) {
+        errno = rc;
+        flb_errno();
+        pthread_mutex_unlock(&pipeline->lock);
+        return FLB_FALSE;
+    }
+
+    flb_trace("waiting for pipeline to start");
+    while (pipeline->state == TRACE_PIPELINE_PENDING) {
+        rc = pthread_cond_wait(&pipeline->cond, &pipeline->lock);
+        if (rc != 0) {
+            errno = rc;
+            flb_errno();
+            /* a failed wait never released the lock: tell the thread to bail out */
+            pipeline->state = TRACE_PIPELINE_STOPPING;
+        }
+    }
+    state = pipeline->state;
+    pthread_mutex_unlock(&pipeline->lock);
+
+    if (state != TRACE_PIPELINE_RUNNING) {
+        /* a thread that did not reach the running state exits on its own */
+        pthread_join(pipeline->thread, NULL);
+        flb_error("unable to start chunk trace pipeline thread");
+        return FLB_FALSE;
+    }
+
+    flb_trace("pipeline thread has started");
+    return FLB_TRUE;
+}
+
+/*
+ * fill in the pipeline context and start the pipeline thread. Returns
+ * FLB_TRUE on success; on failure everything acquired here is released.
+ */
+static int trace_pipeline_init(struct flb_chunk_pipeline_context *pipeline,
+                               struct flb_config *config, const char *output_name,
+                               void *data, struct mk_list *props)
+{
+    struct flb_output_instance *calyptia = NULL;
+
+    pipeline->data = data;
+    pipeline->props = props;
+    pipeline->output_name = flb_sds_create(output_name);
+    if (pipeline->output_name == NULL) {
+        flb_errno();
+        return FLB_FALSE;
+    }
+
+    /* the calyptia output reuses the properties of the configured instance */
+    if (strcmp(pipeline->output_name, "calyptia") == 0) {
+        calyptia = find_calyptia_output_instance(config);
+        if (calyptia == NULL) {
+            flb_error("unable to find calyptia output instance");
+            goto error_name;
+        }
+        pipeline->props = &calyptia->properties;
+    }
+
+    if (pthread_mutex_init(&pipeline->lock, NULL) != 0) {
+        flb_error("unable to initialize chunk trace pipeline lock");
+        goto error_name;
+    }
+    if (pthread_cond_init(&pipeline->cond, NULL) != 0) {
+        flb_error("unable to initialize chunk trace pipeline condition");
+        goto error_lock;
+    }
+
+    if (trace_pipeline_start(pipeline) == FLB_FALSE) {
+        goto error_cond;
+    }
+    return FLB_TRUE;
+
+error_cond:
+    pthread_cond_destroy(&pipeline->cond);
+error_lock:
+    pthread_mutex_destroy(&pipeline->lock);
+error_name:
+    flb_sds_destroy(pipeline->output_name);
+    pipeline->output_name = NULL;
+    return FLB_FALSE;
+}
+
+/*
+ * create the trace context for an input instance, replacing any previous one.
+ * Returns NULL when tracing is disabled or the trace pipeline cannot start.
+ */
+struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input,
+                                                            const char *output_name,
+                                                            const char *trace_prefix,
+                                                            void *data, struct mk_list *props)
+{
+    struct flb_input_instance *in = (struct flb_input_instance *)trace_input;
+    struct flb_config *config = in->config;
+    struct flb_chunk_trace_context *ctx = NULL;
+
+    if (config->enable_chunk_trace == FLB_FALSE) {
+        flb_warn("[chunk trace] enable chunk tracing via the configuration or "
+                 " command line to be able to activate tracing.");
+        return NULL;
+    }
+
+    pthread_mutex_lock(&in->chunk_trace_lock);
+
+    if (in->chunk_trace_ctxt) {
+        trace_chunk_context_destroy(in->chunk_trace_ctxt);
+        /* never leave a possibly freed context reachable if we fail below */
+        in->chunk_trace_ctxt = NULL;
+    }
+
+    ctx = flb_calloc(1, sizeof(struct flb_chunk_trace_context));
+    if (ctx == NULL) {
+        flb_errno();
+        pthread_mutex_unlock(&in->chunk_trace_lock);
+        return NULL;
+    }
+
+    if (trace_pipeline_init(&ctx->pipeline, config, output_name, data, props) == FLB_FALSE) {
+        flb_error("unable to initialize chunk trace pipeline");
+        flb_free(ctx);
+        pthread_mutex_unlock(&in->chunk_trace_lock);
+        return NULL;
+    }
+
+    ctx->input = ctx->pipeline.input;
+    ctx->trace_prefix = flb_sds_create(trace_prefix);
+
+    in->chunk_trace_ctxt = ctx;
+    pthread_mutex_unlock(&in->chunk_trace_lock);
+    return ctx;
+}
+
 struct flb_chunk_trace *flb_chunk_trace_new(struct flb_input_chunk *chunk)
 {
     struct flb_chunk_trace *trace = NULL;
@@ -327,7 +539,7 @@ int flb_chunk_trace_context_set_limit(void *input, int limit_type, int limit_arg
         ctxt->limit.type = FLB_CHUNK_TRACE_LIMIT_TIME;
         ctxt->limit.seconds_started = tm.tm.tv_sec;
         ctxt->limit.seconds = limit_arg;
-        
+
         pthread_mutex_unlock(&in->chunk_trace_lock);
         return 0;
     case FLB_CHUNK_TRACE_LIMIT_COUNT:
@@ -381,10 +593,10 @@ void flb_chunk_trace_do_input(struct flb_input_chunk *ic)
     pthread_mutex_lock(&ic->in->chunk_trace_lock);
     if (ic->in->chunk_trace_ctxt == NULL) {
         pthread_mutex_unlock(&ic->in->chunk_trace_lock);
-        return; 
+        return;
     }
     pthread_mutex_unlock(&ic->in->chunk_trace_lock);
-    
+
     if (ic->trace == NULL) {
         ic->trace = flb_chunk_trace_new(ic);
     }
@@ -423,12 +635,12 @@ int flb_chunk_trace_input(struct flb_chunk_trace *trace)
     msgpack_unpacked_init(&result);
 
     cio_chunk_get_content(trace->ic->chunk, &buf, &buf_size);
-    
+
     msgpack_pack_array(&mp_pck, 2);
     flb_pack_time_now(&mp_pck);
     if (input->alias != NULL) {
         msgpack_pack_map(&mp_pck, 7);
-    } 
+    }
     else {
         msgpack_pack_map(&mp_pck, 6);
     }
@@ -483,8 +695,9 @@ int flb_chunk_trace_input(struct flb_chunk_trace *trace)
     flb_time_append_to_msgpack(&tm, &mp_pck, FLB_TIME_ETFMT_INT);
     msgpack_pack_str_with_body(&mp_pck, "end_time", strlen("end_time"));
     flb_time_append_to_msgpack(&tm_end, &mp_pck, FLB_TIME_ETFMT_INT);
-    in_emitter_add_record(tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size,
-                          trace->ctxt->input);
+    flb_input_log_append(trace->ctxt->input,
+                         tag, flb_sds_len(tag),
+                         mp_sbuf.data, mp_sbuf.size);
 sbuffer_error:
     flb_sds_destroy(tag);
     msgpack_unpacked_destroy(&result);
@@ -523,7 +736,7 @@ int flb_chunk_trace_pre_output(struct flb_chunk_trace *trace)
     flb_pack_time_now(&mp_pck);
     if (input->alias != NULL) {
         msgpack_pack_map(&mp_pck, 7);
-    } 
+    }
     else {
         msgpack_pack_map(&mp_pck, 6);
     }
@@ -577,8 +790,9 @@ int flb_chunk_trace_pre_output(struct flb_chunk_trace *trace)
     flb_time_append_to_msgpack(&tm, &mp_pck, FLB_TIME_ETFMT_INT);
     msgpack_pack_str_with_body(&mp_pck, "end_time", strlen("end_time"));
     flb_time_append_to_msgpack(&tm_end, &mp_pck, FLB_TIME_ETFMT_INT);
-    in_emitter_add_record(tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size,
-                          trace->ctxt->input);
+    flb_input_log_append(trace->ctxt->input,
+                         tag, flb_sds_len(tag),
+                         mp_sbuf.data, mp_sbuf.size);
 sbuffer_error:
     flb_sds_destroy(tag);
     msgpack_unpacked_destroy(&result);
@@ -613,7 +827,7 @@ int flb_chunk_trace_filter(struct flb_chunk_trace *tracer, void *pfilter, struct
         msgpack_pack_map(&mp_pck, 6);
     }
     else {
-        msgpack_pack_map(&mp_pck, 7);    
+        msgpack_pack_map(&mp_pck, 7);
     }
 
     msgpack_pack_str_with_body(&mp_pck, "type", strlen("type"));
@@ -632,13 +846,13 @@ int flb_chunk_trace_filter(struct flb_chunk_trace *tracer, void *pfilter, struct
 
     msgpack_pack_str_with_body(&mp_pck, "trace_id", strlen("trace_id"));
     msgpack_pack_str_with_body(&mp_pck, tracer->trace_id, strlen(tracer->trace_id));
-    
+
     msgpack_pack_str_with_body(&mp_pck, "plugin_instance", strlen("plugin_instance"));
     rc = msgpack_pack_str_with_body(&mp_pck, filter->name, strlen(filter->name));
     if (rc == -1) {
         goto sbuffer_error;
     }
-    
+
     if (filter->alias != NULL) {
         msgpack_pack_str_with_body(&mp_pck, "plugin_alias", strlen("plugin_alias"));
         msgpack_pack_str_with_body(&mp_pck, filter->alias, strlen(filter->alias));
@@ -677,9 +891,10 @@ int flb_chunk_trace_filter(struct flb_chunk_trace *tracer, void *pfilter, struct
         } while (rc == MSGPACK_UNPACK_SUCCESS && off < buf_size);
     }
 
-    in_emitter_add_record(tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size,
-                          tracer->ctxt->input);
-    
+    flb_input_log_append(tracer->ctxt->input,
+                         tag, flb_sds_len(tag),
+                         mp_sbuf.data, mp_sbuf.size);
+
     rc = 0;
 
 unpack_error:
@@ -690,3 +905,68 @@ int flb_chunk_trace_filter(struct flb_chunk_trace *tracer, void *pfilter, struct
     flb_sds_destroy(tag);
     return rc;
 }
+
+/*
+ * emit a trace record with the result (ret) of flushing a traced chunk
+ * through an output plugin. Returns what flb_input_log_append() returns, or
+ * -1 when the tag for the record could not be allocated.
+ */
+int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_instance *output, int ret)
+{
+    msgpack_packer mp_pck;
+    msgpack_sbuffer mp_sbuf;
+    struct flb_time tm;
+    struct flb_time tm_end;
+    int rc;
+    flb_sds_t tag = flb_sds_create("trace");
+
+    if (tag == NULL) {
+        return -1;
+    }
+
+    /* initialize start and end time */
+    flb_time_get(&tm);
+    flb_time_get(&tm_end);
+
+    msgpack_sbuffer_init(&mp_sbuf);
+    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
+
+    msgpack_pack_array(&mp_pck, 2);
+    flb_pack_time_now(&mp_pck);
+    if (output->alias != NULL) {
+        msgpack_pack_map(&mp_pck, 7);
+    }
+    else {
+        msgpack_pack_map(&mp_pck, 6);
+    }
+
+    msgpack_pack_str_with_body(&mp_pck, "type", strlen("type"));
+    msgpack_pack_int(&mp_pck, FLB_CHUNK_TRACE_TYPE_OUTPUT);
+
+    msgpack_pack_str_with_body(&mp_pck, "trace_id", strlen("trace_id"));
+    msgpack_pack_str_with_body(&mp_pck, trace->trace_id, strlen(trace->trace_id));
+
+    msgpack_pack_str_with_body(&mp_pck, "plugin_instance", strlen("plugin_instance"));
+    msgpack_pack_str_with_body(&mp_pck, output->name, strlen(output->name));
+
+    if (output->alias != NULL) {
+        msgpack_pack_str_with_body(&mp_pck, "plugin_alias", strlen("plugin_alias"));
+        msgpack_pack_str_with_body(&mp_pck, output->alias, strlen(output->alias));
+    }
+
+    msgpack_pack_str_with_body(&mp_pck, "return", strlen("return"));
+    msgpack_pack_int(&mp_pck, ret);
+
+    msgpack_pack_str_with_body(&mp_pck, "start_time", strlen("start_time"));
+    flb_time_append_to_msgpack(&tm, &mp_pck, FLB_TIME_ETFMT_INT);
+    msgpack_pack_str_with_body(&mp_pck, "end_time", strlen("end_time"));
+    flb_time_append_to_msgpack(&tm_end, &mp_pck, FLB_TIME_ETFMT_INT);
+
+    rc = flb_input_log_append(trace->ctxt->input,
+                              tag, flb_sds_len(tag),
+                              mp_sbuf.data, mp_sbuf.size);
+
+    flb_sds_destroy(tag);
+    msgpack_sbuffer_destroy(&mp_sbuf);
+    return rc;
+}
diff --git a/src/flb_event.c b/src/flb_event.c
index d18bbe32a1e..fef7f475322 100644
--- a/src/flb_event.c
+++ b/src/flb_event.c
@@ -45,6 +45,10 @@ struct flb_event_chunk *flb_event_chunk_create(int type,
         return NULL;
     }
 
+#ifdef FLB_HAVE_CHUNK_TRACE
+    evc->trace = NULL;
+#endif
+
     evc->type = type;
     evc->data = buf_data;
     evc->size = buf_size;
diff --git a/src/flb_task.c b/src/flb_task.c
index bc9ddc6344f..ed2ca5cb7e3 100644
--- a/src/flb_task.c
+++ b/src/flb_task.c
@@ -376,6 +376,14 @@ struct flb_task *flb_task_create(uint64_t ref_id,
         *err = FLB_TRUE;
         return NULL;
     }
+
+#ifdef FLB_HAVE_CHUNK_TRACE
+    if (ic->trace) {
+        flb_debug("add trace to task");
+        evc->trace = ic->trace;
+    }
+#endif
+
     task->event_chunk = evc;
     task_ic = (struct flb_input_chunk *) ic;
     task_ic->task = task;