Skip to content

Commit

Permalink
out_cloudwatch_logs: remove sequence tokens from API calls (#7973)
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Fala <[email protected]>
  • Loading branch information
matthewfala authored and leonardo-albertovich committed Nov 3, 2023
1 parent 4b00028 commit 26a4833
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 144 deletions.
114 changes: 11 additions & 103 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@
#include "cloudwatch_api.h"

#define ERR_CODE_ALREADY_EXISTS "ResourceAlreadyExistsException"
#define ERR_CODE_INVALID_SEQUENCE_TOKEN "InvalidSequenceTokenException"
#define ERR_CODE_NOT_FOUND "ResourceNotFoundException"
#define ERR_CODE_DATA_ALREADY_ACCEPTED "DataAlreadyAcceptedException"

#define AMZN_REQUEST_ID_HEADER "x-amzn-RequestId"

Expand Down Expand Up @@ -229,23 +227,6 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
goto error;
}

if (stream->sequence_token) {
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"sequenceToken\":\"", 17)) {
goto error;
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
stream->sequence_token, 0)) {
goto error;
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\",", 2)) {
goto error;
}
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"logEvents\":[", 13)) {
goto error;
Expand Down Expand Up @@ -493,9 +474,6 @@ void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf) {
if (buf->current_stream != NULL) {
buf->data_size += strlen(buf->current_stream->name);
buf->data_size += strlen(buf->current_stream->group);
if (buf->current_stream->sequence_token) {
buf->data_size += strlen(buf->current_stream->sequence_token);
}
}
}

Expand Down Expand Up @@ -1153,7 +1131,6 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream
struct flb_aws_client *cw_client;
flb_sds_t body;
flb_sds_t tmp;
flb_sds_t error;

flb_plg_info(ctx->ins, "Setting retention policy on log group %s to %dd", stream->group, ctx->log_retention_days);

Expand Down Expand Up @@ -1196,17 +1173,9 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream

/* Check error */
if (c->resp.payload_size > 0) {
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
if (error != NULL) {
/* some other error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutRetentionPolicy", ctx->ins);
flb_sds_destroy(error);
}
else {
/* error can not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
}
/* some error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutRetentionPolicy", ctx->ins);
}
}

Expand Down Expand Up @@ -1287,8 +1256,8 @@ int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream)
flb_sds_destroy(error);
}
else {
/* error can not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
/* error can not be parsed, print raw response */
flb_plg_warn(ctx->ins, "Raw response: %s", c->resp.payload);
}
}
}
Expand Down Expand Up @@ -1402,8 +1371,8 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
flb_sds_destroy(error);
}
else {
/* error can not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
/* error can not be parsed, print raw response */
flb_plg_warn(ctx->ins, "Raw response: %s", c->resp.payload);
}
}
}
Expand All @@ -1417,8 +1386,7 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
}

/*
* Returns -1 on failure, 0 on success, and 1 for a sequence token error,
* which means the caller can retry.
* Returns -1 on failure, 0 on success
*/
int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct log_stream *stream, size_t payload_size)
Expand All @@ -1427,7 +1395,6 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct flb_http_client *c = NULL;
struct flb_aws_client *cw_client;
flb_sds_t tmp;
flb_sds_t error;
int num_headers = 1;
int retry = FLB_TRUE;

Expand Down Expand Up @@ -1460,8 +1427,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
if (c->resp.data == NULL || c->resp.data_len == 0 || strstr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) {
/* code was 200, but response is invalid, treat as failure */
if (c->resp.data != NULL) {
flb_plg_debug(ctx->ins, "Could not find sequence token in "
"response: response body is empty: full data: `%.*s`", c->resp.data_len, c->resp.data);
flb_plg_debug(ctx->ins, "Invalid response: full data: `%.*s`", c->resp.data_len, c->resp.data);
}
flb_http_client_destroy(c);

Expand All @@ -1474,73 +1440,15 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
AMZN_REQUEST_ID_HEADER);
return -1;
}


/* success */
if (c->resp.payload_size > 0) {
flb_plg_debug(ctx->ins, "Sent events to %s", stream->name);
tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
"nextSequenceToken");
if (tmp) {
if (stream->sequence_token != NULL) {
flb_sds_destroy(stream->sequence_token);
}
stream->sequence_token = tmp;

flb_http_client_destroy(c);
return 0;
}
else {
flb_plg_error(ctx->ins, "Could not find sequence token in "
"response: %s", c->resp.payload);
}
}

flb_http_client_destroy(c);
return 0;
}

/* Check error */
if (c->resp.payload_size > 0) {
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
if (error != NULL) {
if (strcmp(error, ERR_CODE_INVALID_SEQUENCE_TOKEN) == 0) {
/*
* This case will happen when we do not know the correct
* sequence token; we can find it in the error response
* and retry.
*/
flb_plg_debug(ctx->ins, "Sequence token was invalid, "
"will retry");
tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
"expectedSequenceToken");
if (tmp) {
if (stream->sequence_token != NULL) {
flb_sds_destroy(stream->sequence_token);
}
stream->sequence_token = tmp;
flb_sds_destroy(error);
flb_http_client_destroy(c);
/* tell the caller to retry */
return 1;
}
} else if (strcmp(error, ERR_CODE_DATA_ALREADY_ACCEPTED) == 0) {
/* not sure what causes this but it counts as success */
flb_plg_info(ctx->ins, "Got %s, a previous retry must have succeeded asychronously", ERR_CODE_DATA_ALREADY_ACCEPTED);
flb_sds_destroy(error);
flb_http_client_destroy(c);
/* success */
return 0;
}
/* some other error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutLogEvents", ctx->ins);
flb_sds_destroy(error);
}
else {
/* error could not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
}
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutLogEvents", ctx->ins);
}
}

Expand Down
68 changes: 32 additions & 36 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
const char *tmp;
char *session_name = NULL;
struct flb_cloudwatch *ctx = NULL;
struct cw_flush *buf = NULL;
int ret;
flb_sds_t tmp_sds = NULL;
(void) config;
Expand Down Expand Up @@ -348,50 +347,53 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
flb_output_upstream_set(upstream, ctx->ins);
ctx->cw_client->host = ctx->endpoint;

/* alloc the payload/processing buffer */
/* Export context */
flb_output_set_context(ins, ctx);

return 0;

error:
flb_free(session_name);
flb_plg_error(ctx->ins, "Initialization failed");
flb_cloudwatch_ctx_destroy(ctx);
return -1;
}

struct cw_flush *new_buffer()
{
struct cw_flush *buf;

buf = flb_calloc(1, sizeof(struct cw_flush));
if (!buf) {
flb_errno();
goto error;
return NULL;
}

buf->out_buf = flb_malloc(PUT_LOG_EVENTS_PAYLOAD_SIZE);
if (!buf->out_buf) {
flb_errno();
cw_flush_destroy(buf);
goto error;
return NULL;
}
buf->out_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;

buf->tmp_buf = flb_malloc(sizeof(char) * PUT_LOG_EVENTS_PAYLOAD_SIZE);
if (!buf->tmp_buf) {
flb_errno();
cw_flush_destroy(buf);
goto error;
return NULL;
}
buf->tmp_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;

buf->events = flb_malloc(sizeof(struct cw_event) * MAX_EVENTS_PER_PUT);
if (!buf->events) {
flb_errno();
cw_flush_destroy(buf);
goto error;
return NULL;
}
buf->events_capacity = MAX_EVENTS_PER_PUT;

ctx->buf = buf;


/* Export context */
flb_output_set_context(ins, ctx);

return 0;

error:
flb_free(session_name);
flb_plg_error(ctx->ins, "Initialization failed");
flb_cloudwatch_ctx_destroy(ctx);
return -1;
return buf;
}

static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
Expand All @@ -405,15 +407,21 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
(void) i_ins;
(void) config;

event_count = process_and_send(ctx, i_ins->p->name, ctx->buf, event_chunk->tag,
event_chunk->data, event_chunk->size);
struct cw_flush *buf;

buf = new_buffer();
if (!buf) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}

event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size);
if (event_count < 0) {
flb_plg_error(ctx->ins, "Failed to send events");
cw_flush_destroy(buf);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

// TODO: this msg is innaccurate if events are skipped
flb_plg_debug(ctx->ins, "Sent %d events to CloudWatch", event_count);
cw_flush_destroy(buf);

FLB_OUTPUT_RETURN(FLB_OK);
}
Expand All @@ -429,10 +437,6 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx)
flb_aws_provider_destroy(ctx->base_aws_provider);
}

if (ctx->buf) {
cw_flush_destroy(ctx->buf);
}

if (ctx->aws_provider) {
flb_aws_provider_destroy(ctx->aws_provider);
}
Expand Down Expand Up @@ -496,9 +500,6 @@ void log_stream_destroy(struct log_stream *stream)
if (stream->name) {
flb_sds_destroy(stream->name);
}
if (stream->sequence_token) {
flb_sds_destroy(stream->sequence_token);
}
if (stream->group) {
flb_sds_destroy(stream->group);
}
Expand Down Expand Up @@ -657,12 +658,7 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = {
.cb_init = cb_cloudwatch_init,
.cb_flush = cb_cloudwatch_flush,
.cb_exit = cb_cloudwatch_exit,

/*
* Allow cloudwatch to use async network stack synchronously by opting into
* FLB_OUTPUT_SYNCHRONOUS synchronous task scheduler
*/
.flags = FLB_OUTPUT_SYNCHRONOUS,
.flags = 0,
.workers = 1,

/* Configuration */
Expand Down
8 changes: 3 additions & 5 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ struct cw_event {
struct log_stream {
flb_sds_t name;
flb_sds_t group;
flb_sds_t sequence_token;

/*
* log streams in CloudWatch do not expire; but our internal representations
* of them are periodically cleaned up if they have been unused for too long
Expand All @@ -87,8 +87,6 @@ struct log_stream {
struct mk_list _head;
};

void log_stream_destroy(struct log_stream *stream);

struct flb_cloudwatch {
/*
* TLS instances can not be re-used. So we have one for:
Expand Down Expand Up @@ -138,8 +136,6 @@ struct flb_cloudwatch {
/* stores log streams we're putting to */
struct mk_list streams;

/* buffers for data processing and request payload */
struct cw_flush *buf;
/* The namespace to use for the metric */
flb_sds_t metric_namespace;

Expand All @@ -155,4 +151,6 @@ struct flb_cloudwatch {

void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx);

void log_stream_destroy(struct log_stream *stream);

#endif
2 changes: 2 additions & 0 deletions src/aws/flb_aws_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ void flb_aws_print_error(char *response, size_t response_len,

error = flb_json_get_val(response, response_len, "__type");
if (!error) {
/* error can not be parsed, print raw response */
flb_plg_warn(ins, "Raw response: %s", response);
return;
}

Expand Down

0 comments on commit 26a4833

Please sign in to comment.