Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_parseable: Plugin for sending logs to Parseable #9622

Open
wants to merge 81 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
54041d2
bitbake: bump to v3.2.1
edsiper Nov 17, 2024
abe21b5
working http requests
AdheipSingh Nov 17, 2024
7d24323
hacked code
AdheipSingh Nov 18, 2024
c5eba8e
add namespace_name filter
AdheipSingh Nov 18, 2024
378393b
use stream dynamically
AdheipSingh Nov 20, 2024
50a833e
update dockerfile
AdheipSingh Nov 20, 2024
b82245d
add debug
AdheipSingh Nov 20, 2024
c36f54c
add headers debug
AdheipSingh Nov 20, 2024
0f9d637
add fix char size
AdheipSingh Nov 20, 2024
d271775
mutate body debug
AdheipSingh Nov 20, 2024
e7cfff0
add p_stream config and dynamic namespace action
AdheipSingh Nov 20, 2024
f0923c6
clean up log
AdheipSingh Nov 20, 2024
43d2943
exclude namesapces support
AdheipSingh Dec 4, 2024
733afb1
fix build
AdheipSingh Dec 4, 2024
a678706
remove example from cmakelists.txt
AdheipSingh Dec 4, 2024
0d7f070
debug leak
AdheipSingh Dec 4, 2024
4e7864d
release: update to 3.2.2 (#9610)
github-actions[bot] Nov 18, 2024
718e714
signv4: added missing length calculation
leonardo-albertovich Nov 18, 2024
ee31a65
http_client: added missing exit path
leonardo-albertovich Nov 18, 2024
070f7c8
http_common: added missing initializers
leonardo-albertovich Nov 18, 2024
7d15c9e
out_opentelemetry: added missing result checks and fixed leaks
leonardo-albertovich Nov 18, 2024
1eed94f
http_client: fixed potential memory corruption
leonardo-albertovich Nov 18, 2024
1a6dd29
http_client: added missing exit path
leonardo-albertovich Nov 18, 2024
2e1da5d
http_common: moved initializers to correct a bug introduced in PR 9608
leonardo-albertovich Nov 19, 2024
81bc86a
ci: fixed script running tests to match requirements of test_flb_util…
mabrarov Nov 22, 2024
b16801e
in_forward: Plug a resource leak on exception (CID 508064)
cosmo0920 Nov 19, 2024
86a496b
node_exporter_metrics: Use real_path for complaining on glob error
cosmo0920 Nov 19, 2024
5b00e18
plugin: Plug a use-after-free issue (CID 514582)
cosmo0920 Nov 25, 2024
9d3f158
build: made FLB_DEV detectable from code
leonardo-albertovich Nov 25, 2024
5393e1f
tls: openssl: added support for SSLKEYLOGFILE on DEV builds
leonardo-albertovich Nov 25, 2024
3e34356
out_opentelemetry: decoupled HTTP/2 and gRPC
leonardo-albertovich Nov 25, 2024
e00931c
http_client: added per client temporary buffer
leonardo-albertovich Nov 25, 2024
d430d56
http_client_http2: improved protocol compliance
leonardo-albertovich Nov 25, 2024
cbad521
http_common: added guards to prevent leaks
leonardo-albertovich Nov 25, 2024
1890898
http_client: added per session temporary buffer
leonardo-albertovich Nov 25, 2024
ab27ba5
http_client_http2: added missing header
leonardo-albertovich Nov 25, 2024
4491bce
http_common: added pre-generated authority field
leonardo-albertovich Nov 25, 2024
37a1912
out_azure_kusto : fix multiple files tail issue and timeout issue (#8…
tanmaya-panda1 Nov 26, 2024
eec1b70
in_opentelemetry: Propogate tag in http2 metrics and trace handlers
nuclearpidgeon Jul 15, 2024
d62ca38
processor_labels: Process operations for output purposed contexts of …
cosmo0920 Nov 22, 2024
b483230
filter_lua: expose env variables in FLB_ENV Lua table
edsiper Nov 19, 2024
a95e92a
custom_calyptia: added interval handling and tests
Nov 27, 2024
6ecb32f
in_calyptia_fleet: improved interval handling
Nov 27, 2024
0e75a94
out_stackdriver bug fix: return cached token when current_timestamp i…
shuaich Nov 27, 2024
1a7aa21
http_client: Implement response testing framework
cosmo0920 Sep 13, 2024
4bbd07d
output: Add a capability to inject HTTP response testing environment
cosmo0920 Sep 17, 2024
b5ed7fa
lib: Implement injecting HTTP response mechanism
cosmo0920 Sep 17, 2024
ce227ca
out_es: tests: Add HTTP response testing
cosmo0920 Sep 18, 2024
17e308c
in_http: use 'tag_key' option when json array is received
imankurpatel000 Nov 26, 2024
fd27cd9
network: Update struct type for sock_addr
jomillerOpen Sep 19, 2024
59dd67b
network: Update struct type being passed into accept
jomillerOpen Oct 2, 2024
2adc537
release: update to 3.2.3 (#9665)
github-actions[bot] Nov 28, 2024
97102e5
out_calyptia: retry agent registration on flush callback (#9656)
niedbalski Nov 28, 2024
250b3ec
build: cmake: fix UNICODE-escaped characters on aarch64 (#8851)
RamaMalladiAWS Dec 1, 2024
9efde0a
filter_parser: fix reserve data and preserve key handling (#9675)
niedbalski Dec 3, 2024
c713bda
out_prometheus_remote_write: Fix a typo (#9674)
baonq-me Dec 3, 2024
6bbb7b4
wasm: Plug a resource leak on exception (CID 508177) (#9615)
cosmo0920 Dec 4, 2024
2d34b29
update exclude namespace logic
AdheipSingh Dec 4, 2024
8b96eb3
update debug
AdheipSingh Dec 4, 2024
cc8fb1c
fix cm
AdheipSingh Dec 4, 2024
b90cbb6
debug inline check
AdheipSingh Dec 4, 2024
ee60adc
debug exclude
AdheipSingh Dec 4, 2024
faa0cf7
add debug statements
AdheipSingh Dec 4, 2024
e95413d
print config
AdheipSingh Dec 4, 2024
7facebb
add debug to exclude namespaces list
AdheipSingh Dec 4, 2024
1ed552d
debug exclude namespaces
AdheipSingh Dec 4, 2024
e3d11b1
update logic
AdheipSingh Dec 4, 2024
b998276
debug print out comparison
AdheipSingh Dec 4, 2024
4c69573
revert debug change
AdheipSingh Dec 4, 2024
e92898f
Merge branch 'fluent:master' into master
AdheipSingh Dec 4, 2024
ecacc1d
remove P_ in config
AdheipSingh Dec 4, 2024
84dda2f
Merge branch 'master' of github.com:AdheipSingh/fluent-bit
AdheipSingh Dec 4, 2024
648e878
revert prof_info changes
AdheipSingh Dec 4, 2024
30a7432
remove fluent-bit tgz
AdheipSingh Dec 4, 2024
f13ab68
debug port and server hostname
AdheipSingh Dec 4, 2024
9c1725b
add debug ctx
AdheipSingh Dec 4, 2024
2018767
hardcode port
AdheipSingh Dec 4, 2024
98ed0a5
revert debug
AdheipSingh Dec 4, 2024
2c9ba03
debug ports
AdheipSingh Dec 4, 2024
a7c5961
tweak configurations
AdheipSingh Dec 5, 2024
6d1f0a2
Merge branch 'master' into master
AdheipSingh Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dockerfiles/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ RUN cmake -DFLB_RELEASE=On \
-DFLB_IN_SYSTEMD=On \
-DFLB_OUT_KAFKA=On \
-DFLB_OUT_PGSQL=On \
-DFLB_OUT_PARSEABLE=On \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this what should be used in the cmake config? It's already defaulting to on

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feel free to suggest the best practice here. I tried to build without it but was not able to run the plugin.

-DFLB_NIGHTLY_BUILD="$FLB_NIGHTLY_BUILD" \
-DFLB_LOG_NO_CONTROL_CHARS=On \
-DFLB_CHUNK_TRACE="$FLB_CHUNK_TRACE" \
Expand Down
3 changes: 3 additions & 0 deletions lib/cprofiles/include/cprofiles/cprof_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
#ifndef CPROF_HAVE_GMTIME_R
#define CPROF_HAVE_GMTIME_R
#endif
#ifndef CPROF_HAVE_CLOCK_GET_TIME
#define CPROF_HAVE_CLOCK_GET_TIME
#endif
#ifndef CPROF_HAVE_CFL
#define CPROF_HAVE_CFL
#endif
Expand Down
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ REGISTER_OUT_PLUGIN("out_nrlogs")
REGISTER_OUT_PLUGIN("out_null")
REGISTER_OUT_PLUGIN("out_opensearch")
REGISTER_OUT_PLUGIN("out_oracle_log_analytics")
REGISTER_OUT_PLUGIN("out_parseable")

if (NOT CMAKE_SYSTEM_NAME MATCHES "Windows")
REGISTER_OUT_PLUGIN("out_plot")
Expand Down
4 changes: 4 additions & 0 deletions plugins/out_parseable/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
set(src
parseable.c)

FLB_PLUGIN(out_parseable "${src}" "")
306 changes: 306 additions & 0 deletions plugins/out_parseable/parseable.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_slist.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_config_map.h>
#include <fluent-bit/flb_metrics.h>

#include <msgpack.h>
#include "parseable.h"

static int cb_parseable_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
{
int ret;
struct flb_out_parseable *ctx = NULL;
(void) ins;
(void) config;
(void) data;

ctx = flb_calloc(1, sizeof(struct flb_out_parseable));
if (!ctx) {
flb_errno();
return -1;
}
ctx->ins = ins;

/* Read in config values */
ret = flb_output_config_map_set(ins, (void *) ctx);
if (ret == -1) {
flb_free(ctx);
return -1;
}

flb_plg_info(ctx->ins, "Configured port: %d", ctx->server_port);

ctx->upstream = flb_upstream_create(config,
ctx->server_host,
ctx->server_port,
FLB_IO_TCP,
NULL);

if (!ctx->upstream) {
flb_free(ctx);
return -1;
}

/* Export context */
flb_output_set_context(ins, ctx);

return 0;
}

static void cb_parseable_flush(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *i_ins,
void *out_context,
struct flb_config *config)
{
msgpack_unpacked result;
size_t off = 0;
struct flb_out_parseable *ctx = out_context;
(void) config;
struct flb_time tmp;
msgpack_object *p;
msgpack_sbuffer sbuf;
msgpack_packer pk;
struct flb_http_client *client;
struct flb_connection *u_conn;
flb_sds_t body;
flb_sds_t x_p_stream_value = NULL;
int ret;
size_t b_sent;

msgpack_unpacked_init(&result);
while (msgpack_unpack_next(&result,
event_chunk->data,
event_chunk->size, &off) == MSGPACK_UNPACK_SUCCESS) {
flb_time_pop_from_msgpack(&tmp, &result, &p);

/* Only operate if log is map type */
if (p->type != MSGPACK_OBJECT_MAP) {
continue;
}

/* Initialize the packer and buffer for serialization/packing */
msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);

/* Pack original key-value pairs */
msgpack_pack_map(&pk, p->via.map.size + 1);
for (int i = 0; i < p->via.map.size; i++) {
msgpack_pack_object(&pk, p->via.map.ptr[i].key);
msgpack_pack_object(&pk, p->via.map.ptr[i].val);
}

/* Append one more key-value pair */
msgpack_pack_str_with_body(&pk, "source", 6);
msgpack_pack_str_with_body(&pk, "fluent bit parseable plugin", 25);

/* Convert from msgpack to JSON */
body = flb_msgpack_raw_to_json_sds(sbuf.data, sbuf.size);

/* Free up buffer as we don't need it anymore */
msgpack_sbuffer_destroy(&sbuf);

/* Determine the value of the X-P-Stream header */
if (ctx->stream && strcmp(ctx->stream, "$NAMESPACE") == 0) {
/* Extract namespace_name from the body */
flb_sds_t body_copy = flb_sds_create(body);
if (body_copy == NULL) {
flb_plg_error(ctx->ins, "Failed to create a copy of the body");
flb_sds_destroy(body);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}


flb_sds_t namespace_name = flb_sds_create_size(256); // Dynamic string
if (body_copy != NULL) {
char *namespace_name_value = strstr(body_copy, "\"namespace_name\":\"");
if (namespace_name_value != NULL) {
namespace_name_value += strlen("\"namespace_name\":\"");
char *end_quote = strchr(namespace_name_value, '\"');
if (end_quote != NULL) {
*end_quote = '\0'; // Null-terminate the extracted value
namespace_name = flb_sds_printf(&namespace_name, "%s", namespace_name_value);

// Debug: Print the extracted namespace name
flb_plg_info(ctx->ins, "Extracted namespace_name: %s", namespace_name);


struct mk_list *head;
struct flb_slist_entry *entry;

if (ctx->exclude_namespaces) {
mk_list_foreach(head, ctx->exclude_namespaces) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);
flb_plg_info(ctx->ins, "Checking against exclude namespace: %s", entry->str);
// flb_plg_info(ctx->ins, "namespace_name: %s %d", namespace_name,flb_sds_len(namespace_name));
if (flb_sds_cmp(entry->str, namespace_name, flb_sds_len(namespace_name)) == 0) {
flb_plg_info(ctx->ins, "Skipping excluded namespace: %s", namespace_name);
// Cleanup
flb_sds_destroy(namespace_name);
flb_sds_destroy(body);
flb_sds_destroy(body_copy);
msgpack_unpacked_destroy(&result);

// Skip sending the HTTP request
FLB_OUTPUT_RETURN(FLB_OK);
}
}
}
}
} else {
// Debug: Could not find the namespace_name in body_copy
flb_plg_info(ctx->ins, "namespace_name not found in body_copy.");
}
} else {
// Debug: body_copy is NULL
flb_plg_info(ctx->ins, "body_copy is NULL.");
}

flb_sds_destroy(body_copy);

if (!namespace_name || flb_sds_len(namespace_name) == 0) {
flb_plg_error(ctx->ins, "Failed to extract namespace_name from the body");
flb_sds_destroy(body);
flb_sds_destroy(namespace_name);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* Determine the value of the X-P-Stream header */
x_p_stream_value = namespace_name; // Use the namespace name for the header
}
else if (ctx->stream) {
/* Use the user-specified stream directly */
x_p_stream_value = flb_sds_create(ctx->stream);
if (!x_p_stream_value) {
flb_plg_error(ctx->ins, "Failed to set X-P-Stream header to the specified stream: %s", ctx->stream);
flb_sds_destroy(body);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}
}
else {
flb_plg_error(ctx->ins, "Stream is not set. Cannot determine the value for X-P-Stream.");
flb_sds_destroy(body);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

flb_plg_info(ctx->ins, "Creating upstream with server: %s, port: %d", ctx->server_host, ctx->server_port);

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->upstream);
if (!u_conn) {
flb_plg_error(ctx->ins, "connection initialization error");
flb_sds_destroy(body);
flb_sds_destroy(x_p_stream_value);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* Compose HTTP Client request */
client = flb_http_client(u_conn,
FLB_HTTP_POST, "/api/v1/ingest",
body, flb_sds_len(body),
ctx->server_host, ctx->server_port,
NULL, 0);

if (!client) {
flb_plg_error(ctx->ins, "could not create HTTP client");
flb_sds_destroy(body);
flb_sds_destroy(x_p_stream_value);
flb_upstream_conn_release(u_conn);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* Add HTTP headers */
flb_http_add_header(client, "Content-Type", 12, "application/json", 16);
flb_http_add_header(client, "X-P-Stream", 10, x_p_stream_value, flb_sds_len(x_p_stream_value));
flb_http_basic_auth(client, ctx->username, ctx->password);

/* Perform request */
ret = flb_http_do(client, &b_sent);
flb_plg_info(ctx->ins, "HTTP request http_do=%i, HTTP Status: %i",
ret, client->resp.status);

/* Clean up resources */
flb_sds_destroy(body);
flb_sds_destroy(x_p_stream_value);
flb_http_client_destroy(client);
flb_upstream_conn_release(u_conn);
}
msgpack_unpacked_destroy(&result);

FLB_OUTPUT_RETURN(FLB_OK);
}

static int cb_parseable_exit(void *data, struct flb_config *config)
{
struct flb_out_parseable *ctx = data;

if (!ctx) {
return 0;
}

flb_slist_destroy(&ctx->exclude_namespaces);
/* Free up resources */
if (ctx->upstream) {
flb_upstream_destroy(ctx->upstream);
}
flb_free(ctx);
return 0;
}

/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "server_host", NULL,
0, FLB_TRUE, offsetof(struct flb_out_parseable, server_host),
"The host of the server to send logs to."
},
{
FLB_CONFIG_MAP_STR, "username", NULL,
0, FLB_TRUE, offsetof(struct flb_out_parseable, username),
"The parseable server username."
},
{
FLB_CONFIG_MAP_STR, "password", NULL,
0, FLB_TRUE, offsetof(struct flb_out_parseable, password),
"The parseable server password."
},
{
FLB_CONFIG_MAP_STR, "stream", NULL,
0, FLB_TRUE, offsetof(struct flb_out_parseable, stream),
"The stream name to send logs to. Using $NAMESPACE will dynamically create a namespace."
},
{
FLB_CONFIG_MAP_INT, "server_port", NULL,
0, FLB_TRUE, offsetof(struct flb_out_parseable, server_port),
"The port on the host to send logs to."
},
{
FLB_CONFIG_MAP_CLIST, "Exclude_Namespaces", NULL,
0, FLB_TRUE, offsetof(struct flb_out_parseable, exclude_namespaces),
"A space-separated list of Kubernetes namespaces to exclude from log forwarding."
},
/* EOF */
{0}
};

/* Plugin registration */
struct flb_output_plugin out_parseable_plugin = {
.name = "parseable",
.description = "Sends events to a HTTP server",
.cb_init = cb_parseable_init,
.cb_flush = cb_parseable_flush,
.cb_exit = cb_parseable_exit,
.flags = 0,
.event_type = FLB_OUTPUT_LOGS,
.config_map = config_map
};
18 changes: 18 additions & 0 deletions plugins/out_parseable/parseable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef FLB_OUT_PARSEABLE_H
#define FLB_OUT_PARSEABLE_H

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_sds.h>

struct flb_out_parseable {
flb_sds_t server_host;
int server_port;
flb_sds_t username;
flb_sds_t password;
flb_sds_t stream;
struct mk_list *exclude_namespaces; // Use mk_list for namespace exclusion
struct flb_upstream *upstream;
struct flb_output_instance *ins;
};

#endif