Skip to content

Commit

Permalink
filter_parser: fix reserve data and preserve key handling (fluent#9675)
Browse files Browse the repository at this point in the history
Handle properly the 4 combinations for Reserve_Data and Preserve_Key:
- Off/Off: keep only parsed fields
- Off/On: parsed fields + original key
- On/Off: all fields except target key
- On/On: keep all fields

Signed-off-by: Jorge Niedbalski <[email protected]>
Co-authored-by: Jorge Niedbalski <[email protected]>
Signed-off-by: AdheipSingh <[email protected]>
  • Loading branch information
2 people authored and AdheipSingh committed Dec 4, 2024
1 parent 250b3ec commit 9efde0a
Show file tree
Hide file tree
Showing 2 changed files with 478 additions and 148 deletions.
163 changes: 83 additions & 80 deletions plugins/filter_parser/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ static int cb_parser_init(struct flb_filter_instance *f_ins,
struct flb_config *config,
void *data)
{
(void) f_ins;
(void) config;
(void) data;

struct filter_parser_ctx *ctx = NULL;

/* Create context */
Expand All @@ -156,13 +152,12 @@ static int cb_parser_init(struct flb_filter_instance *f_ins,
}
ctx->ins = f_ins;

if ( configure(ctx, f_ins, config) < 0 ){
if (configure(ctx, f_ins, config) < 0) {
flb_free(ctx);
return -1;
}

flb_filter_set_context(f_ins, ctx);

return 0;
}

Expand All @@ -174,11 +169,9 @@ static int cb_parser_filter(const void *data, size_t bytes,
void *context,
struct flb_config *config)
{
int continue_parsing;
struct filter_parser_ctx *ctx = context;
struct flb_time tm;
msgpack_object *obj;

msgpack_object_kv *kv;
int i;
int ret = FLB_FILTER_NOTOUCH;
Expand All @@ -191,10 +184,8 @@ static int cb_parser_filter(const void *data, size_t bytes,
char *out_buf;
size_t out_size;
struct flb_time parsed_time;

msgpack_object_kv **append_arr = NULL;
size_t append_arr_len = 0;
int append_arr_i;
struct mk_list *head;
struct filter_parser *fp;
struct flb_log_event_encoder log_encoder;
Expand All @@ -209,99 +200,86 @@ static int cb_parser_filter(const void *data, size_t bytes,
ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);

if (ret != FLB_EVENT_DECODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event decoder initialization error : %d", ret);

flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret);
return FLB_FILTER_NOTOUCH;
}

ret = flb_log_event_encoder_init(&log_encoder,
FLB_LOG_EVENT_FORMAT_DEFAULT);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins,
"Log event encoder initialization error : %d", ret);

flb_plg_error(ctx->ins, "Log event encoder initialization error : %d", ret);
flb_log_event_decoder_destroy(&log_decoder);

return FLB_FILTER_NOTOUCH;
}

while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
out_buf = NULL;
append_arr_i = 0;

flb_time_copy(&tm, &log_event.timestamp);
obj = log_event.body;

if (obj->type == MSGPACK_OBJECT_MAP) {
map_num = obj->via.map.size;
if (ctx->reserve_data) {
append_arr_len = obj->via.map.size;
append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *));
/* Calculate initial array size based on configuration */
append_arr_len = (ctx->reserve_data ? map_num : 0);
if (ctx->preserve_key && !ctx->reserve_data) {
append_arr_len = 1; /* Space for preserved key */
}

if (append_arr_len > 0) {
append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *));
if (append_arr == NULL) {
flb_errno();

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);

return FLB_FILTER_NOTOUCH;
}
}

continue_parsing = FLB_TRUE;
for (i = 0; i < map_num && continue_parsing; i++) {
kv = &obj->via.map.ptr[i];
/* Initialize array */
if (ctx->reserve_data) {
append_arr[append_arr_i] = kv;
append_arr_i++;
for (i = 0; i < map_num; i++) {
append_arr[i] = &obj->via.map.ptr[i];
}
}
if ( msgpackobj2char(&kv->key, &key_str, &key_len) < 0 ) {
/* key is not string */
}

/* Process the target key */
for (i = 0; i < map_num; i++) {
kv = &obj->via.map.ptr[i];
if (msgpackobj2char(&kv->key, &key_str, &key_len) < 0) {
continue;
}

if (key_len == ctx->key_name_len &&
!strncmp(key_str, ctx->key_name, key_len)) {
if ( msgpackobj2char(&kv->val, &val_str, &val_len) < 0 ) {
/* val is not string */
if (msgpackobj2char(&kv->val, &val_str, &val_len) < 0) {
continue;
}

/* Lookup parser */
mk_list_foreach(head, &ctx->parsers) {
fp = mk_list_entry(head, struct filter_parser, _head);

/* Reset time */
flb_time_zero(&parsed_time);

parse_ret = flb_parser_do(fp->parser, val_str, val_len,
(void **) &out_buf, &out_size,
&parsed_time);
if (parse_ret >= 0) {
/*
* If the parser succeeded we need to check the
* status of the parsed time. If the time was
* parsed successfully 'parsed_time' will be
* different than zero, if so, override the time
* holder with the new value, otherwise keep the
* original.
*/
if (flb_time_to_nanosec(&parsed_time) != 0L) {
flb_time_copy(&tm, &parsed_time);
}

if (ctx->reserve_data) {
if (append_arr != NULL) {
if (!ctx->preserve_key) {
append_arr_i--;
append_arr_len--;
append_arr[append_arr_i] = NULL;
append_arr[i] = NULL;
}
else if (!ctx->reserve_data) {
/* Store only the key being preserved */
append_arr[0] = kv;
}
}
else {
continue_parsing = FLB_FALSE;
}
break;
}
Expand All @@ -322,27 +300,58 @@ static int cb_parser_filter(const void *data, size_t bytes,
&log_encoder, log_event.metadata);
}

if (out_buf != NULL) {
if (ctx->reserve_data) {
if (out_buf != NULL && parse_ret >= 0) {
if (append_arr != NULL && append_arr_len > 0) {
char *new_buf = NULL;
int new_size;
int ret;
ret = flb_msgpack_expand_map(out_buf, out_size,
append_arr, append_arr_len,
&new_buf, &new_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot expand map");

flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);

return FLB_FILTER_NOTOUCH;
int new_size;
size_t valid_kv_count = 0;
msgpack_object_kv **valid_kv = NULL;

/* Count valid entries */
for (i = 0; i < append_arr_len; i++) {
if (append_arr[i] != NULL) {
valid_kv_count++;
}
}

flb_free(out_buf);
out_buf = new_buf;
out_size = new_size;
if (valid_kv_count > 0) {
valid_kv = flb_calloc(valid_kv_count, sizeof(msgpack_object_kv *));
if (!valid_kv) {
flb_errno();
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);
flb_free(out_buf);
return FLB_FILTER_NOTOUCH;
}

/* Fill valid entries */
valid_kv_count = 0;
for (i = 0; i < append_arr_len; i++) {
if (append_arr[i] != NULL) {
valid_kv[valid_kv_count++] = append_arr[i];
}
}

ret = flb_msgpack_expand_map(out_buf, out_size,
valid_kv, valid_kv_count,
&new_buf, &new_size);

flb_free(valid_kv);

if (ret == -1) {
flb_plg_error(ctx->ins, "cannot expand map");
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);
flb_free(append_arr);
flb_free(out_buf);
return FLB_FILTER_NOTOUCH;
}

flb_free(out_buf);
out_buf = new_buf;
out_size = new_size;
}
}

if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
Expand All @@ -355,7 +364,6 @@ static int cb_parser_filter(const void *data, size_t bytes,
ret = FLB_FILTER_MODIFIED;
}
else {
/* re-use original data*/
if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
encoder_result = \
flb_log_event_encoder_set_body_from_msgpack_object(
Expand All @@ -371,26 +379,22 @@ static int cb_parser_filter(const void *data, size_t bytes,
flb_plg_error(ctx->ins, "log event encoder error : %d", encoder_result);
}

flb_free(append_arr);
append_arr = NULL;
}
else {
continue;
if (append_arr != NULL) {
flb_free(append_arr);
append_arr = NULL;
}
}
}

if (log_encoder.output_length > 0) {
*ret_buf = log_encoder.output_buffer;
*ret_buf = log_encoder.output_buffer;
*ret_bytes = log_encoder.output_length;

ret = FLB_FILTER_MODIFIED;

flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder);
}
else {
flb_plg_error(ctx->ins,
"Log event encoder error : %d", ret);

flb_plg_error(ctx->ins, "Log event encoder error : %d", ret);
ret = FLB_FILTER_NOTOUCH;
}

Expand All @@ -400,7 +404,6 @@ static int cb_parser_filter(const void *data, size_t bytes,
return ret;
}


static int cb_parser_exit(void *data, struct flb_config *config)
{
struct filter_parser_ctx *ctx = data;
Expand Down
Loading

0 comments on commit 9efde0a

Please sign in to comment.