Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chunk_trace: use a threaded pipeline. #8097

Merged
merged 6 commits into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions include/fluent-bit/flb_chunk_trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,27 @@ struct flb_chunk_trace_limit {
int count;
};

struct flb_chunk_trace_context {
struct flb_chunk_pipeline_context {
flb_ctx_t *flb;
flb_sds_t output_name;
pthread_t thread;
pthread_mutex_t lock;
pthread_cond_t cond;
struct mk_list *props;
void *data;
void *input;
void *output;
};

struct flb_chunk_trace_context {
void *input;
int trace_count;
struct flb_chunk_trace_limit limit;
flb_sds_t trace_prefix;
int to_destroy;
int chunks;
flb_ctx_t *flb;
struct cio_ctx *cio;

struct flb_chunk_pipeline_context pipeline;
};

struct flb_chunk_trace {
Expand All @@ -94,6 +105,7 @@ int flb_chunk_trace_input(struct flb_chunk_trace *trace);
void flb_chunk_trace_do_input(struct flb_input_chunk *trace);
int flb_chunk_trace_pre_output(struct flb_chunk_trace *trace);
int flb_chunk_trace_filter(struct flb_chunk_trace *trace, void *pfilter, struct flb_time *, struct flb_time *, char *buf, size_t buf_size);
int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_instance *output, int ret);
void flb_chunk_trace_free(struct flb_chunk_trace *trace);
int flb_chunk_trace_context_set_limit(void *input, int, int);
int flb_chunk_trace_context_hit_limit(void *input);
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ struct flb_event_chunk {
void *data; /* event content */
size_t size; /* size of event */
size_t total_events; /* total number of serialized events */
#ifdef FLB_HAVE_CHUNK_TRACE
struct flb_chunk_trace *trace;
#endif
};

struct flb_event_chunk *flb_event_chunk_create(int type,
Expand Down
14 changes: 14 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
#include <fluent-bit/flb_regex.h>
#endif

#ifdef FLB_HAVE_CHUNK_TRACE
/* include prototype directly to avoid cyclical include ... */
int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_instance *output, int ret);
#endif

/* Output plugin masks */
#define FLB_OUTPUT_NET 32 /* output address may set host and port */
#define FLB_OUTPUT_PLUGIN_CORE 0
Expand Down Expand Up @@ -940,7 +945,16 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {

flb_task_release_lock(task);

#ifdef FLB_HAVE_CHUNK_TRACE
if (task->event_chunk) {
if (task->event_chunk->trace) {
flb_chunk_trace_output(task->event_chunk->trace, o_ins, ret);
}
}
#endif

if (out_flush->processed_event_chunk) {

if (task->event_chunk->data != out_flush->processed_event_chunk->data) {
flb_free(out_flush->processed_event_chunk->data);
}
Expand Down
Loading
Loading