diff --git a/plugins/out_parseable/parseable.c b/plugins/out_parseable/parseable.c index 4d87b2fb429..336c31e4b0b 100644 --- a/plugins/out_parseable/parseable.c +++ b/plugins/out_parseable/parseable.c @@ -67,6 +67,7 @@ static void cb_parseable_flush(struct flb_event_chunk *event_chunk, struct flb_http_client *client; struct flb_connection *u_conn; flb_sds_t body; + flb_sds_t x_p_stream_value = NULL; int ret; size_t b_sent; @@ -85,62 +86,83 @@ static void cb_parseable_flush(struct flb_event_chunk *event_chunk, msgpack_sbuffer_init(&sbuf); msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); - /* - * Pack an empty map of original size + 1 because we are appending one - * additional keyval pair later - */ + /* Pack original key-value pairs */ msgpack_pack_map(&pk, p->via.map.size + 1); - - /* Pack the original keyval pairs in first */ for (int i = 0; i < p->via.map.size; i++) { msgpack_pack_object(&pk, p->via.map.ptr[i].key); msgpack_pack_object(&pk, p->via.map.ptr[i].val); } - /* Append one more keyval pair to this log */ + /* Append one more key-value pair */ msgpack_pack_str_with_body(&pk, "source", 6); msgpack_pack_str_with_body(&pk, "fluent bit parseable plugin", 25); - /* Convert from msgpack serialization to JSON serialization for sending through HTTP */ + /* Convert from msgpack to JSON */ body = flb_msgpack_raw_to_json_sds(sbuf.data, sbuf.size); flb_plg_info(ctx->ins, "Body content: %s", body); /* Free up buffer as we don't need it anymore */ msgpack_sbuffer_destroy(&sbuf); - /* Retrieve the namespace_header value from the body (assuming body is a JSON object) */ - // Assuming you have a way to parse JSON or MsgPack (like searching by key) - // For simplicity, let's assume "namespace_header" is a key in the JSON object - /* Convert from msgpack serialization to JSON serialization for sending through HTTP */ - /* Copy the body to another variable to avoid mutating the original */ - flb_sds_t body_copy = flb_sds_create(body); - if (body_copy == NULL) { - flb_plg_error(ctx->ins, "Failed to create a copy of the body"); - flb_sds_destroy(body); - return NULL; // Handle the error appropriately - } - /* Retrieve the namespace_name value from the body copy */ - flb_sds_t namespace_name = flb_sds_create_size(256); // Dynamic string - if (body_copy != NULL) { - // Search for the "namespace_name" field in the JSON string - char *namespace_name_value = strstr(body_copy, "\"namespace_name\":\""); - if (namespace_name_value != NULL) { - namespace_name_value += strlen("\"namespace_name\":\""); - char *end_quote = strchr(namespace_name_value, '\"'); - if (end_quote != NULL) { - *end_quote = '\0'; // Null-terminate the extracted value - namespace_name = flb_sds_printf(&namespace_name, "%s", namespace_name_value); - flb_plg_info(ctx->ins, "Namespace name extracted value: %s", namespace_name_value); + /* Determine the value of the X-P-Stream header */ + if (ctx->p_stream && strcmp(ctx->p_stream, "$NAMESPACE") == 0) { + /* Extract namespace_name from the body */ + flb_sds_t body_copy = flb_sds_create(body); + if (body_copy == NULL) { + flb_plg_error(ctx->ins, "Failed to create a copy of the body"); + flb_sds_destroy(body); + msgpack_unpacked_destroy(&result); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + flb_sds_t namespace_name = flb_sds_create_size(256); // Dynamic string + if (body_copy != NULL) { + char *namespace_name_value = strstr(body_copy, "\"namespace_name\":\""); + if (namespace_name_value != NULL) { + namespace_name_value += strlen("\"namespace_name\":\""); + char *end_quote = strchr(namespace_name_value, '\"'); + if (end_quote != NULL) { + *end_quote = '\0'; // Null-terminate the extracted value + namespace_name = flb_sds_printf(&namespace_name, "%s", namespace_name_value); + flb_plg_info(ctx->ins, "Namespace name extracted value: %s", namespace_name_value); + } } } - } + flb_sds_destroy(body_copy); + + if (!namespace_name || flb_sds_len(namespace_name) == 0) { + flb_plg_error(ctx->ins, "Failed to extract namespace_name from the body"); + flb_sds_destroy(body); + flb_sds_destroy(namespace_name); + msgpack_unpacked_destroy(&result); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + x_p_stream_value = namespace_name; + } + else if (ctx->p_stream) { + /* Use the user-specified stream directly */ + x_p_stream_value = flb_sds_create(ctx->p_stream); + if (!x_p_stream_value) { + flb_plg_error(ctx->ins, "Failed to set X-P-Stream header to the specified stream: %s", ctx->p_stream); + flb_sds_destroy(body); + msgpack_unpacked_destroy(&result); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + } + else { + flb_plg_error(ctx->ins, "P_Stream is not set. Cannot determine the value for X-P-Stream."); + flb_sds_destroy(body); + msgpack_unpacked_destroy(&result); + FLB_OUTPUT_RETURN(FLB_ERROR); + } /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->upstream); if (!u_conn) { flb_plg_error(ctx->ins, "connection initialization error"); flb_sds_destroy(body); + flb_sds_destroy(x_p_stream_value); msgpack_unpacked_destroy(&result); FLB_OUTPUT_RETURN(FLB_ERROR); } @@ -155,55 +177,34 @@ static void cb_parseable_flush(struct flb_event_chunk *event_chunk, if (!client) { flb_plg_error(ctx->ins, "could not create HTTP client"); flb_sds_destroy(body); + flb_sds_destroy(x_p_stream_value); flb_upstream_conn_release(u_conn); msgpack_unpacked_destroy(&result); FLB_OUTPUT_RETURN(FLB_ERROR); } - // Handle cases where namespace_name is empty - if (flb_sds_len(namespace_name) == 0) { - namespace_name = flb_sds_cat(namespace_name, "default-stream", 13); - flb_plg_info(ctx->ins, "Namespace name not found, using default: %s", namespace_name); - } - - flb_plg_info(ctx->ins, "Namespace name to be passed is: %s", namespace_name); - - // Add namespace_name to the HTTP header + /* Add HTTP headers */ flb_http_add_header(client, "Content-Type", 12, "application/json", 16); - flb_plg_info(ctx->ins, "Adding Header: Content-Type: application/json"); - - flb_http_add_header(client, "X-P-Stream", 10, namespace_name, flb_sds_len(namespace_name)); - flb_plg_info(ctx->ins, "Adding Header: X-P-Stream: %s", namespace_name); - - flb_http_basic_auth(client, "admin", "admin"); - flb_plg_info(ctx->ins, "Adding Header: Authorization: Basic "); - + flb_http_add_header(client, "X-P-Stream", 10, x_p_stream_value, flb_sds_len(x_p_stream_value)); + flb_http_basic_auth(client, ctx->p_username, ctx->p_password); /* Perform request */ ret = flb_http_do(client, &b_sent); flb_plg_info(ctx->ins, "HTTP request http_do=%i, HTTP Status: %i", ret, client->resp.status); - - /* Log the HTTP request details */ - flb_plg_info(ctx->ins, "HTTP Request:"); - flb_plg_info(ctx->ins, " Method: POST"); - flb_plg_info(ctx->ins, " URL: %s", client->uri); - flb_plg_info(ctx->ins, " Host: %s:%d", ctx->p_server, ctx->p_port); - flb_plg_info(ctx->ins, " Payload: %s", body ? body : "(null)"); - - - /* Free up resources */ + + /* Clean up resources */ flb_sds_destroy(body); + flb_sds_destroy(x_p_stream_value); flb_http_client_destroy(client); flb_upstream_conn_release(u_conn); - flb_sds_destroy(namespace_name); - } msgpack_unpacked_destroy(&result); FLB_OUTPUT_RETURN(FLB_OK); } + static int cb_parseable_exit(void *data, struct flb_config *config) { struct flb_out_parseable *ctx = data; @@ -227,6 +228,21 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_out_parseable, p_server), "The host of the server to send logs to." }, + { + FLB_CONFIG_MAP_STR, "P_Username", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_parseable, p_username), + "The parseable server username." + }, + { + FLB_CONFIG_MAP_STR, "P_Password", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_parseable, p_password), + "The parseable server password." + }, + { + FLB_CONFIG_MAP_STR, "P_Stream", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_parseable, p_stream), + "The stream name to send logs to. using $NAMESPACE will dynamically create namespace." + }, { FLB_CONFIG_MAP_INT, "P_Port", 0, 0, FLB_TRUE, offsetof(struct flb_out_parseable, p_port), diff --git a/plugins/out_parseable/parseable.h b/plugins/out_parseable/parseable.h index 4392b089005..745849c3926 100644 --- a/plugins/out_parseable/parseable.h +++ b/plugins/out_parseable/parseable.h @@ -7,6 +7,9 @@ struct flb_out_parseable { flb_sds_t p_server; int p_port; + flb_sds_t p_username; + flb_sds_t p_password; + flb_sds_t p_stream; struct flb_upstream *upstream; struct flb_output_instance *ins; };