Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_parser: fix reserve data and preserve key handling #9675

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 83 additions & 80 deletions plugins/filter_parser/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ static int cb_parser_init(struct flb_filter_instance *f_ins,
struct flb_config *config,
void *data)
{
(void) f_ins;
(void) config;
(void) data;

struct filter_parser_ctx *ctx = NULL;

/* Create context */
Expand All @@ -156,13 +152,12 @@ static int cb_parser_init(struct flb_filter_instance *f_ins,
}
ctx->ins = f_ins;

if ( configure(ctx, f_ins, config) < 0 ){
if (configure(ctx, f_ins, config) < 0) {
flb_free(ctx);
return -1;
}

flb_filter_set_context(f_ins, ctx);

return 0;
}

Expand All @@ -174,11 +169,9 @@ static int cb_parser_filter(const void *data, size_t bytes,
void *context,
struct flb_config *config)
{
int continue_parsing;
struct filter_parser_ctx *ctx = context;
struct flb_time tm;
msgpack_object *obj;

msgpack_object_kv *kv;
int i;
int ret = FLB_FILTER_NOTOUCH;
Expand All @@ -191,10 +184,8 @@ static int cb_parser_filter(const void *data, size_t bytes,
char *out_buf;
size_t out_size;
struct flb_time parsed_time;

msgpack_object_kv **append_arr = NULL;
size_t append_arr_len = 0;
int append_arr_i;
struct mk_list *head;
struct filter_parser *fp;
struct flb_log_event_encoder log_encoder;
Expand All @@ -209,99 +200,86 @@ static int cb_parser_filter(const void *data, size_t bytes,
ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);

if (ret != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event decoder initialization error : %d", ret);

flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret);
return FLB_FILTER_NOTOUCH;
}

ret = flb_log_event_encoder_init(&log_encoder,
FLB_LOG_EVENT_FORMAT_DEFAULT);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event encoder initialization error : %d", ret);

flb_plg_error(ctx->ins, "Log event encoder initialization error : %d", ret);
flb_log_event_decoder_destroy(&log_decoder);

return FLB_FILTER_NOTOUCH;
}

while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
out_buf = NULL;
append_arr_i = 0;

flb_time_copy(&tm, &log_event.timestamp);
obj = log_event.body;

if (obj->type == MSGPACK_OBJECT_MAP) {
map_num = obj->via.map.size;
if (ctx->reserve_data) {
append_arr_len = obj->via.map.size;
append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *));
/* Calculate initial array size based on configuration */
append_arr_len = (ctx->reserve_data ? map_num : 0);
if (ctx->preserve_key && !ctx->reserve_data) {
append_arr_len = 1; /* Space for preserved key */
}

if (append_arr_len > 0) {
append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *));
if (append_arr == NULL) {
flb_errno();

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);

return FLB_FILTER_NOTOUCH;
}
}

continue_parsing = FLB_TRUE;
for (i = 0; i < map_num && continue_parsing; i++) {
kv = &obj->via.map.ptr[i];
/* Initialize array */
if (ctx->reserve_data) {
append_arr[append_arr_i] = kv;
append_arr_i++;
for (i = 0; i < map_num; i++) {
append_arr[i] = &obj->via.map.ptr[i];
}
}
if ( msgpackobj2char(&kv->key, &key_str, &key_len) < 0 ) {
/* key is not string */
}

/* Process the target key */
for (i = 0; i < map_num; i++) {
kv = &obj->via.map.ptr[i];
if (msgpackobj2char(&kv->key, &key_str, &key_len) < 0) {
continue;
}

if (key_len == ctx->key_name_len &&
!strncmp(key_str, ctx->key_name, key_len)) {
if ( msgpackobj2char(&kv->val, &val_str, &val_len) < 0 ) {
/* val is not string */
if (msgpackobj2char(&kv->val, &val_str, &val_len) < 0) {
continue;
}

/* Lookup parser */
mk_list_foreach(head, &ctx->parsers) {
fp = mk_list_entry(head, struct filter_parser, _head);

/* Reset time */
flb_time_zero(&parsed_time);

parse_ret = flb_parser_do(fp->parser, val_str, val_len,
(void **) &out_buf, &out_size,
&parsed_time);
if (parse_ret >= 0) {
/*
* If the parser succeeded we need to check the
* status of the parsed time. If the time was
* parsed successfully 'parsed_time' will be
* different than zero, if so, override the time
* holder with the new value, otherwise keep the
* original.
*/
if (flb_time_to_nanosec(&parsed_time) != 0L) {
flb_time_copy(&tm, &parsed_time);
}

if (ctx->reserve_data) {
if (append_arr != NULL) {
if (!ctx->preserve_key) {
append_arr_i--;
append_arr_len--;
append_arr[append_arr_i] = NULL;
append_arr[i] = NULL;
}
else if (!ctx->reserve_data) {
/* Store only the key being preserved */
append_arr[0] = kv;
}
}
else {
continue_parsing = FLB_FALSE;
}
break;
}
Expand All @@ -322,27 +300,58 @@ static int cb_parser_filter(const void *data, size_t bytes,
&log_encoder, log_event.metadata);
}

if (out_buf != NULL) {
if (ctx->reserve_data) {
if (out_buf != NULL && parse_ret >= 0) {
if (append_arr != NULL && append_arr_len > 0) {
char *new_buf = NULL;
int new_size;
int ret;
ret = flb_msgpack_expand_map(out_buf, out_size,
append_arr, append_arr_len,
&new_buf, &new_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot expand map");

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);

return FLB_FILTER_NOTOUCH;
int new_size;
size_t valid_kv_count = 0;
msgpack_object_kv **valid_kv = NULL;

/* Count valid entries */
for (i = 0; i < append_arr_len; i++) {
if (append_arr[i] != NULL) {
valid_kv_count++;
}
}

flb_free(out_buf);
out_buf = new_buf;
out_size = new_size;
if (valid_kv_count > 0) {
valid_kv = flb_calloc(valid_kv_count, sizeof(msgpack_object_kv *));
if (!valid_kv) {
flb_errno();
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);
flb_free(out_buf);
return FLB_FILTER_NOTOUCH;
}

/* Fill valid entries */
valid_kv_count = 0;
for (i = 0; i < append_arr_len; i++) {
if (append_arr[i] != NULL) {
valid_kv[valid_kv_count++] = append_arr[i];
}
}

ret = flb_msgpack_expand_map(out_buf, out_size,
valid_kv, valid_kv_count,
&new_buf, &new_size);

flb_free(valid_kv);

if (ret == -1) {
flb_plg_error(ctx->ins, "cannot expand map");
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);
flb_free(out_buf);
return FLB_FILTER_NOTOUCH;
}

flb_free(out_buf);
out_buf = new_buf;
out_size = new_size;
}
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
Expand All @@ -355,7 +364,6 @@ static int cb_parser_filter(const void *data, size_t bytes,
ret = FLB_FILTER_MODIFIED;
}
else {
/* re-use original data*/
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_body_from_msgpack_object(
Expand All @@ -371,26 +379,22 @@ static int cb_parser_filter(const void *data, size_t bytes,
flb_plg_error(ctx->ins, "log event encoder error : %d", encoder_result);
}

flb_free(append_arr);
append_arr = NULL;
}
else {
continue;
if (append_arr != NULL) {
flb_free(append_arr);
append_arr = NULL;
}
}
}

if (log_encoder.output_length > 0) {
*ret_buf = log_encoder.output_buffer;
*ret_buf = log_encoder.output_buffer;
*ret_bytes = log_encoder.output_length;

ret = FLB_FILTER_MODIFIED;

flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
}
else {
flb_plg_error(ctx->ins,
"Log event encoder error : %d", ret);

flb_plg_error(ctx->ins, "Log event encoder error : %d", ret);
ret = FLB_FILTER_NOTOUCH;
}

Expand All @@ -400,7 +404,6 @@ static int cb_parser_filter(const void *data, size_t bytes,
return ret;
}


static int cb_parser_exit(void *data, struct flb_config *config)
{
struct filter_parser_ctx *ctx = data;
Expand Down
Loading
Loading