Skip to content

Commit

Permalink
Downgrade zstd compression to gzip if sending grpc payload
Browse files Browse the repository at this point in the history
Signed-off-by: Rob Skillington <[email protected]>
  • Loading branch information
robskillington committed Nov 26, 2024
1 parent a6a1cf2 commit 89acb65
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 9 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ int flb_http_bearer_auth(struct flb_http_client *c,
const char *token);
int flb_http_set_keepalive(struct flb_http_client *c);
int flb_http_set_content_encoding_gzip(struct flb_http_client *c);
int flb_http_set_content_encoding_zstd(struct flb_http_client *c);
int flb_http_set_callback_context(struct flb_http_client *c,
struct flb_callback *cb_ctx);

Expand Down
40 changes: 32 additions & 8 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
{
size_t final_body_len;
void *final_body;
const char *compression_algorithm;
int compressed;
int out_ret;
size_t b_sent;
Expand All @@ -63,6 +64,7 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
struct flb_config_map_val *mv;
struct flb_http_client *c;

compression_algorithm = NULL;
compressed = FLB_FALSE;

u_conn = flb_upstream_conn_get(ctx->u);
Expand All @@ -81,6 +83,7 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
&final_body, &final_body_len);

if (ret == 0) {
compression_algorithm = "gzip";
compressed = FLB_TRUE;
}
else {
Expand All @@ -94,6 +97,7 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
&final_body, &final_body_len);

if (ret == 0) {
compression_algorithm = "zstd";
compressed = FLB_TRUE;
}
else {
Expand Down Expand Up @@ -163,7 +167,12 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
}

if (compressed) {
flb_http_set_content_encoding_gzip(c);
if (strncasecmp(compression_algorithm, "gzip", 4) == 0) {
flb_http_set_content_encoding_gzip(c);
}
else if (strncasecmp(compression_algorithm, "zstd", 4) == 0) {
flb_http_set_content_encoding_zstd(c);
}
}

ret = flb_http_do(c, &b_sent);
Expand Down Expand Up @@ -256,7 +265,15 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
http_uri);
}

compression_algorithm = NULL;
if (ctx->compress_gzip == FLB_TRUE) {
compression_algorithm = "gzip";
}
else if (ctx->compress_zstd == FLB_TRUE) {
compression_algorithm = "zstd";
}
else {
compression_algorithm = NULL;
}

request = flb_http_client_request_builder(
&ctx->http_client,
Expand Down Expand Up @@ -293,6 +310,19 @@ int opentelemetry_post(struct opentelemetry_context *ctx,

grpc_body_length = cfl_sds_len(grpc_body);

if (compression_algorithm != NULL) {
// If compression enabled, ensure compression is gzip otherwise we
// need to fallback to gzip.
// Today grpc only supports gzip compression.
if (strncasecmp(compression_algorithm, "gzip", 4) != 0) {
// Only gzip is supported for gRPC, fall back to gzip.
flb_plg_debug(ctx->ins,
"grpc compression '%s' unsupported, using gzip",
compression_algorithm);
compression_algorithm = "gzip";
}
}

result = flb_http_request_set_parameters(request,
FLB_HTTP_CLIENT_ARGUMENT_URI(grpc_uri),
FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE(
Expand All @@ -310,12 +340,6 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
}
}
else {
if (ctx->compress_gzip == FLB_TRUE) {
compression_algorithm = "gzip";
} else if (ctx->compress_zstd == FLB_TRUE) {
compression_algorithm = "zstd";
}

result = flb_http_request_set_parameters(request,
FLB_HTTP_CLIENT_ARGUMENT_URI(http_uri),
FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE(
Expand Down
12 changes: 12 additions & 0 deletions src/flb_http_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,18 @@ int flb_http_set_content_encoding_gzip(struct flb_http_client *c)
return ret;
}

/* Adds a header specifying that the payload is compressed with zstd */
int flb_http_set_content_encoding_zstd(struct flb_http_client *c)
{
int ret;

ret = flb_http_add_header(c,
FLB_HTTP_HEADER_CONTENT_ENCODING,
sizeof(FLB_HTTP_HEADER_CONTENT_ENCODING) - 1,
"zstd", 4);
return ret;
}

int flb_http_set_callback_context(struct flb_http_client *c,
struct flb_callback *cb_ctx)
{
Expand Down
2 changes: 1 addition & 1 deletion src/flb_http_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ int flb_http_request_compress_body(
request->body,
cfl_sds_len(request->body));
}
else if (strncasecmp(content_encoding_header_value, "deflate", 4) == 0) {
else if (strncasecmp(content_encoding_header_value, "deflate", 7) == 0) {
result = compress_deflate(&output_buffer,
&output_size,
request->body,
Expand Down

0 comments on commit 89acb65

Please sign in to comment.