Skip to content

Commit

Permalink
out_stackdriver: Add partialSuccess: true
Browse files Browse the repository at this point in the history
Add partialSuccess: true to all logs sent to Google Cloud Logging API

Signed-off-by: avilevy <[email protected]>

Add `partialSuccess: true` to all logs sent to Google Cloud Logging API

Signed-off-by: avilevy <[email protected]>
  • Loading branch information
avilevy18 committed Oct 4, 2023
1 parent 92e0435 commit 5980688
Showing 1 changed file with 96 additions and 1 deletion.
97 changes: 96 additions & 1 deletion plugins/out_stackdriver/stackdriver.c
Original file line number Diff line number Diff line change
Expand Up @@ -1769,7 +1769,12 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
* {"resource": {"type": "...", "labels": {...},
* "entries": []
*/
msgpack_pack_map(&mp_pck, 2);
msgpack_pack_map(&mp_pck, 3);

/* Set partialSuccess to true */
msgpack_pack_str(&mp_pck, 14);
msgpack_pack_str_body(&mp_pck, "partialSuccess", 14);
msgpack_pack_true(&mp_pck);

msgpack_pack_str(&mp_pck, 8);
msgpack_pack_str_body(&mp_pck, "resource", 8);
Expand Down Expand Up @@ -2510,6 +2515,91 @@ static void update_retry_metric(struct flb_stackdriver *ctx,
}
#endif

static int extract_map_from_msgpack(msgpack_object *root, char *name, int size, msgpack_object_type object_type ,msgpack_object *val){
int i;
msgpack_object key;
for (i = 0; i < root->via.map.size; i++) {
key = root->via.map.ptr[i].key;
if (key.type != MSGPACK_OBJECT_STR) {
continue;
}
if(key.via.str.size == size && strncmp(key.via.str.ptr, name, size) == 0){
*val = root->via.map.ptr[i].val;
if (val->type != object_type) {
return -1;
}
return 0;
}
}
return -1;
}

static int parse_partial_success_response(struct flb_http_client *c, struct flb_stackdriver *ctx, int *partial_failure){
int ret;
int root_type;
int i;
char *buffer;
size_t size;
size_t off = 0;
msgpack_unpacked result;
msgpack_object root;
msgpack_object error_map;
msgpack_object details_arr;
msgpack_object logEntryErrors_map;
ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
&buffer, &size, &root_type, NULL);
msgpack_unpacked_init(&result);
ret = msgpack_unpack_next(&result, buffer, size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
flb_plg_error(ctx->ins, "Cannot unpack %s response\n%s", c->resp.payload);
flb_free(buffer);
msgpack_unpacked_destroy(&result);
return -1;
}

root = result.data;
if (root.type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "%s response parsing failed, msgpack_type=%i", root.type);
flb_free(buffer);
msgpack_unpacked_destroy(&result);
return -1;
}

ret = extract_map_from_msgpack(&root, "error", 5, MSGPACK_OBJECT_MAP ,&error_map);
if (ret == -1) {
flb_plg_error(ctx->ins, "%s response parsing failed, could not find key: \"error\"");
flb_free(buffer);
msgpack_unpacked_destroy(&result);
return -1;
}
ret = extract_map_from_msgpack(&error_map, "details", 7, MSGPACK_OBJECT_ARRAY ,&details_arr);
if (ret == -1) {
flb_plg_error(ctx->ins, "%s response parsing failed, could not find key: \"details\"");
flb_free(buffer);
msgpack_unpacked_destroy(&result);
return -1;
}
for (i = 0; i < details_arr.via.array.size; i++) {
logEntryErrors_map = details_arr.via.array.ptr[i];
if (logEntryErrors_map.type != MSGPACK_OBJECT_MAP) {
continue;
}
ret = extract_map_from_msgpack(&logEntryErrors_map, "logEntryErrors", 14, MSGPACK_OBJECT_MAP ,&logEntryErrors_map);
if (ret == -1){
flb_plg_error(ctx->ins, "%s response parsing failed, could not find key: \"logEntryErrors\"");
flb_free(buffer);
msgpack_unpacked_destroy(&result);
return -1;
}
break;
}

*partial_failure = logEntryErrors_map.via.map.size;
flb_free(buffer);
msgpack_unpacked_destroy(&result);
return 0;
}

static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *i_ins,
Expand All @@ -2520,6 +2610,7 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
(void) config;
int ret;
int ret_code = FLB_RETRY;
int partial_failure;
size_t b_sent;
flb_sds_t token;
flb_sds_t payload_buf;
Expand Down Expand Up @@ -2637,6 +2728,10 @@ static void cb_stackdriver_flush(struct flb_event_chunk *event_chunk,
}
else if (c->resp.status >= 400 && c->resp.status < 500) {
ret_code = FLB_ERROR;
if(c->resp.status == 400){
ret = parse_partial_success_response(c, ctx, &partial_failure);
cmt_counter_add(ctx->cmt_failed_requests, ts, (double)partial_failure, 1, (char *[]) {name});
}
flb_plg_warn(ctx->ins, "error\n%s",
c->resp.payload);
}
Expand Down

0 comments on commit 5980688

Please sign in to comment.