From 4acff0e479eb87c4cd7f257ad417000533c0cf6b Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 26 Oct 2023 13:16:26 -0300 Subject: [PATCH] flb_chunk_trace: abstract out the pipeline thread into its own API. Signed-off-by: Phillip Whelan --- include/fluent-bit/flb_chunk_trace.h | 23 ++- src/flb_chunk_trace.c | 262 +++++++++++++++++++-------- 2 files changed, 202 insertions(+), 83 deletions(-) diff --git a/include/fluent-bit/flb_chunk_trace.h b/include/fluent-bit/flb_chunk_trace.h index bb948a7b57f..57ba1f85ee7 100644 --- a/include/fluent-bit/flb_chunk_trace.h +++ b/include/fluent-bit/flb_chunk_trace.h @@ -67,22 +67,27 @@ struct flb_chunk_trace_limit { int count; }; -struct flb_chunk_trace_context { +struct flb_chunk_pipeline_context { + flb_ctx_t *flb; + flb_sds_t output_name; + pthread_t thread; + pthread_mutex_t lock; + pthread_cond_t cond; + struct mk_list *props; + void *data; void *input; void *output; +}; + +struct flb_chunk_trace_context { + void *input; int trace_count; struct flb_chunk_trace_limit limit; flb_sds_t trace_prefix; int to_destroy; int chunks; - flb_ctx_t *flb; - struct cio_ctx *cio; - pthread_t thread; - pthread_cond_t wait; - pthread_mutex_t lock; - flb_sds_t output_name; - struct mk_list *props; - void *data; + + struct flb_chunk_pipeline_context pipeline; }; struct flb_chunk_trace { diff --git a/src/flb_chunk_trace.c b/src/flb_chunk_trace.c index 2eebe4d92a4..92ce96bd1ec 100644 --- a/src/flb_chunk_trace.c +++ b/src/flb_chunk_trace.c @@ -98,36 +98,54 @@ static struct flb_output_instance *find_calyptia_output_instance(struct flb_conf return NULL; } -static void trace_chunk_context_destroy(struct flb_chunk_trace_context *ctxt) +static void trace_pipeline_stop(struct flb_chunk_pipeline_context *pipeline) +{ + flb_sds_destroy(pipeline->output_name); + + flb_trace("stop the pipeline"); + flb_stop(pipeline->flb); + + flb_trace("signaling pipeline thread to stop"); + pthread_mutex_lock(&pipeline->lock); + pthread_cond_signal(&pipeline->cond); + pthread_mutex_unlock(&pipeline->lock); + + flb_trace("joining pipeline thread..."); + + pthread_join(pipeline->thread, NULL); + flb_destroy(pipeline->flb); +} + +static void trace_pipeline_shutdown(struct flb_chunk_pipeline_context *pipeline) +{ + flb_input_pause_all(pipeline->flb->config); +} + +static void trace_pipeline_wait(struct flb_chunk_pipeline_context *pipeline) { int i; + /* waiting for all tasks to end is key to safely stopping and destroying */ + /* the fluent-bit pipeline. */ + for (i = 0; i < 5 && flb_task_running_count(pipeline->flb->config) > 0; i++) { + usleep(10 * 1000); + } +} +static void trace_chunk_context_destroy(struct flb_chunk_trace_context *ctxt) +{ if (flb_chunk_trace_has_chunks(ctxt) == FLB_TRUE) { flb_chunk_trace_set_destroy(ctxt); - flb_input_pause_all(ctxt->flb->config); + trace_pipeline_shutdown(&ctxt->pipeline); return; } - + /* pause all inputs, then destroy the input storage. */ - flb_input_pause_all(ctxt->flb->config); - /* waiting for all tasks to end is key to safely stopping and destroying */ - /* the fluent-bit pipeline. */ - for (i = 0; i < 5 && flb_task_running_count(ctxt->flb->config) > 0; i++) { - usleep(10 * 1000); - } + trace_pipeline_shutdown(&ctxt->pipeline); + trace_pipeline_wait(&ctxt->pipeline); flb_sds_destroy(ctxt->trace_prefix); - flb_sds_destroy(ctxt->output_name); - flb_trace("stop the pipeline"); - flb_stop(ctxt->flb); - flb_trace("signaling pipeline thread to stop"); - pthread_mutex_lock(&ctxt->lock); - pthread_cond_signal(&ctxt->wait); - pthread_mutex_unlock(&ctxt->lock); - flb_trace("joining pipeline thread..."); - pthread_join(ctxt->thread, NULL); - flb_destroy(ctxt->flb); + trace_pipeline_stop(&ctxt->pipeline); flb_free(ctxt); } @@ -142,10 +160,10 @@ void flb_chunk_trace_context_destroy(void *input) pthread_mutex_unlock(&in->chunk_trace_lock); } -static void *pipeline_thread(void *arg) +static void *trace_chunk_pipeline_thread(void *arg) { int ret; - struct flb_chunk_trace_context *ctx = (struct flb_chunk_trace_context *)arg; + struct flb_chunk_pipeline_context *ctx = (struct flb_chunk_pipeline_context *)arg; struct flb_input_instance *input = NULL; struct flb_output_instance *output = NULL; struct mk_list *head = NULL; @@ -158,7 +176,7 @@ static void *pipeline_thread(void *arg) ctx->flb = flb_create(); if (ctx->flb == NULL) { flb_errno(); - return NULL; + goto error_lock; } flb_service_set(ctx->flb, "flush", "1", "grace", "1", NULL); @@ -187,7 +205,7 @@ static void *pipeline_thread(void *arg) flb_error("could not create trace output"); goto error_input; } - + if (ctx->props != NULL) { mk_list_foreach(head, ctx->props) { prop = mk_list_entry(head, struct flb_kv, _head); @@ -205,29 +223,151 @@ static void *pipeline_thread(void *arg) ctx->input = (void *)input; flb_trace("[pipeline_thead]: start pipeline in thread"); - flb_start(ctx->flb); - pthread_cond_signal(&ctx->wait); - pthread_mutex_unlock(&ctx->lock); + + if (flb_start(ctx->flb) != 0) { + flb_error("[pipeline_thead]: unable to start pipeline"); + goto error_output; + } + + /* signal that we have finally started and we can begin waiting to exit.*/ + if (pthread_cond_signal(&ctx->cond) != 0) { + errno = ret; + flb_errno(); + flb_error("[pipeline_thead]: unable to signal start of pipeline"); + goto error_start; + } + + if (pthread_mutex_unlock(&ctx->lock) != 0) { + errno = ret; + flb_errno(); + flb_error("[pipeline_thead]: unable to unlock mutex at start of pipeline"); + goto error_start; + } + + /* simply wait here until the trace context is destroyed. */ flb_trace("[pipeline_thead]: wait for exit of pipeline thread"); - pthread_mutex_lock(&ctx->lock); - pthread_cond_wait(&ctx->wait, &ctx->lock); + + if ((ret = pthread_mutex_lock(&ctx->lock)) != 0) { + errno = ret; + flb_errno(); + flb_error("[pipeline_thread]: unable to lock when waiting"); + goto error_start; + } + + if ((ret = pthread_cond_wait(&ctx->cond, &ctx->lock)) != 0) { + errno = ret; + flb_errno(); + flb_error("[pipeline_thread]: unable to wait for exit"); + goto error_start; + } + pthread_mutex_unlock(&ctx->lock); - flb_trace("[pipeline_thead]: exit trace pipeline thread."); + flb_trace("[pipeline_thead]: exit trace pipeline thread"); return NULL; + +error_start: + flb_stop(ctx->flb); error_output: flb_output_instance_destroy(output); error_input: - if (ctx->cio) { - cio_destroy(ctx->cio); - } flb_input_instance_destroy(input); error_flb: flb_destroy(ctx->flb); +error_lock: + pthread_mutex_unlock(&ctx->lock); flb_trace("[pipeline_thead]: error: exit trace pipeline thread."); return NULL; } +static int trace_pipeline_start(struct flb_chunk_pipeline_context *pipeline) +{ + int rc; + + flb_trace("start pipeline thread"); + /* we open the lock before starting the pipeline thread so it will + * wait for us to wait for it... + */ + if (pthread_mutex_lock(&pipeline->lock) != 0) { + flb_errno(); + return FLB_FALSE; + } + + errno = 0; + rc = pthread_create(&pipeline->thread, NULL, trace_chunk_pipeline_thread, + (void *)pipeline); + + if (rc != 0) { + + /* store the return value in errno if it is zero since . */ + if (errno == 0) { + errno = rc; + } + + flb_errno(); + return FLB_FALSE; + } + + flb_trace("waiting for pipeline to start"); + rc = pthread_cond_wait(&pipeline->cond, &pipeline->lock); + + if (rc != 0) { + + /* store the return value in errno if it is zero since . */ + if (errno == 0) { + errno = rc; + } + + flb_errno(); + return FLB_FALSE; + } + + rc = pthread_mutex_unlock(&pipeline->lock); + + if (rc != 0) { + + /* store the return value in errno if it is zero since . */ + if (errno == 0) { + errno = rc; + } + + flb_errno(); + return FLB_FALSE; + } + + flb_trace("pipeline thread has started"); + + return FLB_TRUE; +} + +static int trace_pipeline_init(struct flb_chunk_pipeline_context *pipeline, + struct flb_config *config, const char *output_name, + void *data, struct mk_list *props) +{ + struct flb_output_instance *calyptia = NULL; + + pipeline->data = data; + pipeline->output_name = flb_sds_create(output_name); + + if (strcmp(pipeline->output_name, "calyptia") == 0) { + calyptia = find_calyptia_output_instance(config); + if (calyptia == NULL) { + flb_error("unable to find calyptia output instance"); + flb_sds_destroy(pipeline->output_name); + return FLB_FALSE; + } + pipeline->props = &calyptia->properties; + } + else if (props != NULL) { + pipeline->props = props; + } + + pthread_mutex_init(&pipeline->lock, NULL); + pthread_cond_init(&pipeline->cond, NULL); + + return trace_pipeline_start(pipeline); +} + struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input, const char *output_name, const char *trace_prefix, @@ -235,7 +375,6 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input, { struct flb_input_instance *in = (struct flb_input_instance *)trace_input; struct flb_config *config = in->config; - struct flb_output_instance *calyptia = NULL; struct flb_chunk_trace_context *ctx = NULL; if (config->enable_chunk_trace == FLB_FALSE) { @@ -257,44 +396,19 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input, return NULL; } - ctx->output_name = flb_sds_create(output_name); - ctx->data = data; - - /* special handling for the calyptia plugin so we can copy the API */ - /* key and other configuration properties. */ - if (strcmp(ctx->output_name, "calyptia") == 0) { - calyptia = find_calyptia_output_instance(config); - if (calyptia == NULL) { - flb_error("unable to find calyptia output instance"); - goto error_ctxt; - } - ctx->props = &calyptia->properties; - } - else if (props != NULL) { - ctx->props = props; + if (trace_pipeline_init(&ctx->pipeline, config, output_name, data, props) == FLB_FALSE) { + flb_error("unable to initialize chunk trace pipeline"); + flb_free(ctx); + pthread_mutex_unlock(&in->chunk_trace_lock); + return NULL; } - pthread_mutex_init(&ctx->lock, NULL); - pthread_cond_init(&ctx->wait, NULL); - - flb_trace("wait for pipeline to start"); - pthread_mutex_lock(&ctx->lock); - flb_trace("waiting for pipeline to start"); - pthread_create(&ctx->thread, NULL, pipeline_thread, ctx); - pthread_cond_wait(&ctx->wait, &ctx->lock); - pthread_mutex_unlock(&ctx->lock); - flb_trace("waited for pipeline to start"); - + ctx->input = ctx->pipeline.input; ctx->trace_prefix = flb_sds_create(trace_prefix); in->chunk_trace_ctxt = ctx; pthread_mutex_unlock(&in->chunk_trace_lock); return ctx; - -error_ctxt: - flb_free(ctx); - pthread_mutex_unlock(&in->chunk_trace_lock); - return NULL; } struct flb_chunk_trace *flb_chunk_trace_new(struct flb_input_chunk *chunk) @@ -373,7 +487,7 @@ int flb_chunk_trace_context_set_limit(void *input, int limit_type, int limit_arg ctxt->limit.type = FLB_CHUNK_TRACE_LIMIT_TIME; ctxt->limit.seconds_started = tm.tm.tv_sec; ctxt->limit.seconds = limit_arg; - + pthread_mutex_unlock(&in->chunk_trace_lock); return 0; case FLB_CHUNK_TRACE_LIMIT_COUNT: @@ -430,7 +544,7 @@ void flb_chunk_trace_do_input(struct flb_input_chunk *ic) return; } pthread_mutex_unlock(&ic->in->chunk_trace_lock); - + if (ic->trace == NULL) { ic->trace = flb_chunk_trace_new(ic); } @@ -469,12 +583,12 @@ int flb_chunk_trace_input(struct flb_chunk_trace *trace) msgpack_unpacked_init(&result); cio_chunk_get_content(trace->ic->chunk, &buf, &buf_size); - + msgpack_pack_array(&mp_pck, 2); flb_pack_time_now(&mp_pck); if (input->alias != NULL) { msgpack_pack_map(&mp_pck, 7); - } + } else { msgpack_pack_map(&mp_pck, 6); } @@ -572,7 +686,7 @@ int flb_chunk_trace_pre_output(struct flb_chunk_trace *trace) flb_pack_time_now(&mp_pck); if (input->alias != NULL) { msgpack_pack_map(&mp_pck, 7); - } + } else { msgpack_pack_map(&mp_pck, 6); } @@ -684,13 +798,13 @@ int flb_chunk_trace_filter(struct flb_chunk_trace *tracer, void *pfilter, struct msgpack_pack_str_with_body(&mp_pck, "trace_id", strlen("trace_id")); msgpack_pack_str_with_body(&mp_pck, tracer->trace_id, strlen(tracer->trace_id)); - + msgpack_pack_str_with_body(&mp_pck, "plugin_instance", strlen("plugin_instance")); rc = msgpack_pack_str_with_body(&mp_pck, filter->name, strlen(filter->name)); if (rc == -1) { goto sbuffer_error; } - + if (filter->alias != NULL) { msgpack_pack_str_with_body(&mp_pck, "plugin_alias", strlen("plugin_alias")); msgpack_pack_str_with_body(&mp_pck, filter->alias, strlen(filter->alias)); @@ -734,7 +848,7 @@ int flb_chunk_trace_filter(struct flb_chunk_trace *tracer, void *pfilter, struct mp_sbuf.data, mp_sbuf.size); // in_emitter_add_record(tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size, // tracer->ctxt->input); - + rc = 0; unpack_error: @@ -764,12 +878,12 @@ int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_inst msgpack_sbuffer_init(&mp_sbuf); msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); msgpack_unpacked_init(&result); - + msgpack_pack_array(&mp_pck, 2); flb_pack_time_now(&mp_pck); if (output->alias != NULL) { msgpack_pack_map(&mp_pck, 7); - } + } else { msgpack_pack_map(&mp_pck, 6); }