Skip to content

Commit

Permalink
trace: add new command line options for enabling tracing at startup. (#…
Browse files Browse the repository at this point in the history
…6726)

Add new command line options to enable tracing from the start:

  * --trace: define a complete trace pipeline in a single line.
  * --trace-input: define the input to trace.
  * --trace-output: define the trace output.
  * --trace-output-property: define a trace output property.

Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan authored Jul 21, 2023
1 parent eaa0bb5 commit 824ba3d
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 5 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ FLB_EXPORT double flb_time_now();

/* start stop the engine */
FLB_EXPORT int flb_start(flb_ctx_t *ctx);
FLB_EXPORT int flb_start_trace(flb_ctx_t *ctx);
FLB_EXPORT int flb_stop(flb_ctx_t *ctx);
FLB_EXPORT int flb_loop(flb_ctx_t *ctx);

Expand Down
2 changes: 1 addition & 1 deletion src/flb_chunk_trace.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ struct flb_chunk_trace_context *flb_chunk_trace_context_new(void *trace_input,
ctx->input = (void *)input;
ctx->trace_prefix = flb_sds_create(trace_prefix);

flb_start(ctx->flb);
flb_start_trace(ctx->flb);

in->chunk_trace_ctxt = ctx;
pthread_mutex_unlock(&in->chunk_trace_lock);
Expand Down
24 changes: 21 additions & 3 deletions src/flb_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,7 @@ double flb_time_now()
return flb_time_to_double(&t);
}

/* Start the engine */
int flb_start(flb_ctx_t *ctx)
int static do_start(flb_ctx_t *ctx)
{
int fd;
int bytes;
Expand All @@ -669,7 +668,6 @@ int flb_start(flb_ctx_t *ctx)
flb_debug("[lib] context set: %p", ctx);

/* set context as the last active one */
flb_context_set(ctx);

/* spawn worker thread */
config = ctx->config;
Expand Down Expand Up @@ -715,6 +713,26 @@ int flb_start(flb_ctx_t *ctx)
return 0;
}

/* Start the engine */
int flb_start(flb_ctx_t *ctx)
{
int ret;

ret = do_start(ctx);
if (ret == 0) {
/* set context as the last active one */
flb_context_set(ctx);
}

return ret;
}

/* Start the engine without setting the global context */
int flb_start_trace(flb_ctx_t *ctx)
{
return do_start(ctx);
}

int flb_loop(flb_ctx_t *ctx)
{
while (ctx->status == FLB_LIB_OK) {
Expand Down
231 changes: 231 additions & 0 deletions src/fluent-bit.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ volatile sig_atomic_t flb_bin_restarting = FLB_RELOAD_IDLE;
struct flb_stacktrace flb_st;
#endif

#ifdef FLB_HAVE_CHUNK_TRACE

#include <fluent-bit/flb_chunk_trace.h>

#define FLB_LONG_TRACE (1024 + 1)
#define FLB_LONG_TRACE_INPUT (1024 + 2)
#define FLB_LONG_TRACE_OUTPUT (1024 + 3)
#define FLB_LONG_TRACE_OUTPUT_PROPERTY (1024 + 4)

#endif

#define FLB_HELP_TEXT 0
#define FLB_HELP_JSON 1

Expand Down Expand Up @@ -138,6 +149,10 @@ static void flb_help(int rc, struct flb_config *config)
#endif
#ifdef FLB_HAVE_CHUNK_TRACE
print_opt("-Z, --enable-chunk-trace", "enable chunk tracing. activating it requires using the HTTP Server API.");
print_opt("--trace-input", "input to start tracing on startup.");
print_opt("--trace-output", "output to use for tracing on startup.");
print_opt("--trace-output-property", "set a property for output tracing on startup.");
print_opt("--trace", "setup a trace pipeline on startup. Uses a single line, ie: \"input=dummy.0 output=stdout output.format='json'\"");
#endif
print_opt("-w, --workdir", "set the working directory");
#ifdef FLB_HAVE_HTTP_SERVER
Expand Down Expand Up @@ -726,6 +741,173 @@ static struct flb_cf *service_configure(struct flb_cf *cf,
return cf;
}

#ifdef FLB_HAVE_CHUNK_TRACE
static struct flb_input_instance *find_input(flb_ctx_t *ctx, const char *name)
{
struct mk_list *head;
struct flb_input_instance *in;


mk_list_foreach(head, &ctx->config->inputs) {
in = mk_list_entry(head, struct flb_input_instance, _head);
if (strcmp(name, in->name) == 0) {
return in;
}
if (in->alias) {
if (strcmp(name, in->alias) == 0) {
return in;
}
}
}
return NULL;
}

static int enable_trace_input(flb_ctx_t *ctx, const char *name, const char *prefix, const char *output_name, struct mk_list *props)
{
struct flb_input_instance *in;


in = find_input(ctx, name);
if (in == NULL) {
return FLB_ERROR;
}

flb_chunk_trace_context_new(in, output_name, prefix, NULL, props);
return (in->chunk_trace_ctxt == NULL ? FLB_ERROR : FLB_OK);
}

static int disable_trace_input(flb_ctx_t *ctx, const char *name)
{
struct flb_input_instance *in;


in = find_input(ctx, name);
if (in == NULL) {
return FLB_ERROR;
}

if (in->chunk_trace_ctxt != NULL) {
flb_chunk_trace_context_destroy(in);
}
return FLB_OK;
}

static int set_trace_property(struct mk_list *props, char *kv)
{
int len;
int sep;
char *key;
char *value;

len = strlen(kv);
sep = mk_string_char_search(kv, '=', len);
if (sep == -1) {
return -1;
}

key = mk_string_copy_substr(kv, 0, sep);
value = kv + sep + 1;

if (!key) {
return -1;
}

flb_kv_item_create_len(props,
(char *)key, strlen(key),
(char *)value, strlen(value));

mk_mem_free(key);
return 0;
}

static int parse_trace_pipeline_prop(flb_ctx_t *ctx, const char *kv, char **key, char **value)
{
int len;
int sep;

len = strlen(kv);
sep = mk_string_char_search(kv, '=', len);
if (sep == -1) {
return FLB_ERROR;
}

*key = mk_string_copy_substr(kv, 0, sep);
if (!key) {
return FLB_ERROR;
}

*value = flb_strdup(kv + sep + 1);
return FLB_OK;
}

static int parse_trace_pipeline(flb_ctx_t *ctx, const char *pipeline, char **trace_input, char **trace_output, struct mk_list **props)
{
struct mk_list *parts = NULL;
struct mk_list *cur;
struct flb_split_entry *part;
char *key;
char *value;
const char *propname;
const char *propval;


parts = flb_utils_split(pipeline, (int)' ', 0);
if (parts == NULL) {
return FLB_ERROR;
}

mk_list_foreach(cur, parts) {
key = NULL;
value = NULL;
part = mk_list_entry(cur, struct flb_split_entry, _head);
if (parse_trace_pipeline_prop(ctx, part->value, &key, &value) == FLB_ERROR) {
return FLB_ERROR;
}
if (strcmp(key, "input") == 0) {
if (*trace_input != NULL) {
flb_free(*trace_input);
}
*trace_input = flb_strdup(value);
}
else if (strcmp(key, "output") == 0) {
if (*trace_output != NULL) {
flb_free(*trace_output);
}
*trace_output = flb_strdup(value);
}
else if (strncmp(key, "output.", strlen("output.")) == 0) {
propname = mk_string_copy_substr(key, strlen("output."), strlen(key));
if (propname == NULL) {
return FLB_ERROR;
}

propval = flb_strdup(value);
if (propval == NULL) {
return FLB_ERROR;
}

if (*props == NULL) {
*props = flb_calloc(1, sizeof(struct mk_list));
flb_kv_init(*props);
}

flb_kv_item_create_len(*props,
(char *)propname, strlen(propname),
(char *)propval, strlen(propval));
}
if (key != NULL) {
mk_mem_free(key);
}
if (value != NULL) {
flb_free(value);
}
}

flb_utils_split_free(parts);
return FLB_OK;
}
#endif

int flb_main(int argc, char **argv)
{
int opt;
Expand Down Expand Up @@ -762,6 +944,12 @@ int flb_main(int argc, char **argv)
flb_stacktrace_init(argv[0], &flb_st);
#endif

#ifdef FLB_HAVE_CHUNK_TRACE
char *trace_input = NULL;
char *trace_output = flb_strdup("stdout");
struct mk_list *trace_props = NULL;
#endif

/* Setup long-options */
static const struct option long_opts[] = {
{ "storage_path", required_argument, NULL, 'b' },
Expand Down Expand Up @@ -804,6 +992,10 @@ int flb_main(int argc, char **argv)
{ "enable-hot-reload", no_argument, NULL, 'Y' },
#ifdef FLB_HAVE_CHUNK_TRACE
{ "enable-chunk-trace", no_argument, NULL, 'Z' },
{ "trace", required_argument, NULL, FLB_LONG_TRACE },
{ "trace-input", required_argument, NULL, FLB_LONG_TRACE_INPUT },
{ "trace-output", required_argument, NULL, FLB_LONG_TRACE_OUTPUT },
{ "trace-output-property", required_argument, NULL, FLB_LONG_TRACE_OUTPUT_PROPERTY },
#endif
{ "disable-thread-safety-on-hot-reload", no_argument, NULL, 'W' },
{ NULL, 0, NULL, 0 }
Expand Down Expand Up @@ -998,6 +1190,28 @@ int flb_main(int argc, char **argv)
case 'Z':
flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_ENABLE_CHUNK_TRACE, 0, "on", 0);
break;
case FLB_LONG_TRACE:
parse_trace_pipeline(ctx, optarg, &trace_input, &trace_output, &trace_props);
break;
case FLB_LONG_TRACE_INPUT:
if (trace_input != NULL) {
flb_free(trace_input);
}
trace_input = flb_strdup(optarg);
break;
case FLB_LONG_TRACE_OUTPUT:
if (trace_output != NULL) {
flb_free(trace_output);
}
trace_output = flb_strdup(optarg);
break;
case FLB_LONG_TRACE_OUTPUT_PROPERTY:
if (trace_props == NULL) {
trace_props = flb_calloc(1, sizeof(struct mk_list));
flb_kv_init(trace_props);
}
set_trace_property(trace_props, optarg);
break;
#endif /* FLB_HAVE_CHUNK_TRACE */
default:
flb_help(EXIT_FAILURE, config);
Expand Down Expand Up @@ -1113,6 +1327,12 @@ int flb_main(int argc, char **argv)
*/
ctx = flb_context_get();

#ifdef FLB_HAVE_CHUNK_TRACE
if (trace_input != NULL) {
enable_trace_input(ctx, trace_input, NULL /* prefix ... */, trace_output, trace_props);
}
#endif

while (ctx->status == FLB_LIB_OK && exit_signal == 0) {
sleep(1);

Expand All @@ -1130,6 +1350,17 @@ int flb_main(int argc, char **argv)
if (cf_opts != NULL) {
flb_cf_destroy(cf_opts);
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (trace_input != NULL) {
disable_trace_input(ctx, trace_input);
flb_free(trace_input);
}
if (trace_output) {
flb_free(trace_output);
}
#endif

flb_stop(ctx);
flb_destroy(ctx);

Expand Down
2 changes: 1 addition & 1 deletion src/http_server/api/v1/trace.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include <msgpack.h>


struct flb_input_instance *find_input(struct flb_hs *hs, const char *name)
static struct flb_input_instance *find_input(struct flb_hs *hs, const char *name)
{
struct mk_list *head;
struct flb_input_instance *in;
Expand Down

0 comments on commit 824ba3d

Please sign in to comment.