Skip to content

Commit

Permalink
Refactor zstd compress/uncompress into more modular reuseable methods
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed Nov 24, 2024
1 parent 59f25f7 commit 4129eea
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 106 deletions.
28 changes: 28 additions & 0 deletions include/fluent-bit/flb_zstd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_ZSTD_H
#define FLB_ZSTD_H

int flb_zstd_compress(void *in_data, size_t in_len,
void **out_data, size_t *out_len);
int flb_zstd_uncompress(void *in_data, size_t in_len,
void **out_data, size_t *out_size);

#endif
34 changes: 8 additions & 26 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_snappy.h>
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_log_event_encoder.h>

Expand All @@ -39,8 +40,6 @@
#include "opentelemetry.h"
#include "http_conn.h"

#include <zstd/lib/zstd.h>

#define HTTP_CONTENT_JSON 0

static int json_payload_append_converted_value(
Expand Down Expand Up @@ -1659,36 +1658,19 @@ int uncompress_zstd(char **output_buffer,
char *input_buffer,
size_t input_size)
{
// NB(rob): output_buffer and output_size are never initialized
// so we need to estimate compressed size and then alloc
// the output buffer.
// Caller needs to free output_buffer after call to this function.
size_t max_decompress_size = (size_t)ZSTD_getFrameContentSize(
(void*)input_buffer, input_size);
if (ZSTD_isError(max_decompress_size) != 0) {
flb_error("[opentelemetry] zstd decompression failed estimate buffer");
*output_buffer = (char *)input_buffer;
*output_size = input_size;
return -1;
}

void *out_buf = flb_malloc(max_decompress_size);
int ret;

size_t ret;
ret = flb_zstd_uncompress(input_buffer,
input_size,
(void **)output_buffer,
output_size);

ret = ZSTD_decompress(out_buf,
max_decompress_size,
(void *)input_buffer,
input_size);
if (ZSTD_isError(ret) != 0) {
if (ret != 0) {
flb_error("[opentelemetry] zstd decompression failed");
*output_buffer = (char *)input_buffer;
*output_size = input_size;

return -1;
}

*output_buffer = (char *)out_buf;
*output_size = ret;
return 1;
}

Expand Down
39 changes: 10 additions & 29 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include <cmetrics/cmetrics.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_zstd.h>
#include <cmetrics/cmt_encode_opentelemetry.h>

#include <ctraces/ctraces.h>
Expand All @@ -44,8 +45,6 @@ extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text);
#include "opentelemetry_conf.h"
#include "opentelemetry_utils.h"

#include <zstd/lib/zstd.h>

int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
const void *body, size_t body_len,
const char *tag, int tag_len,
Expand Down Expand Up @@ -85,40 +84,22 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
compressed = FLB_TRUE;
}
else {
final_body = (void *) body;
final_body_len = body_len;
flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression");
}
}
else if (ctx->compress_zstd) {
// NB(rob): final_body and final_body_len are never initialized
// so we need to estimate compressed size and then alloc
// the output buffer.
// Caller needs to free output_buffer after call to this function.
size_t max_compress_size = (size_t)ZSTD_compressBound(body_len);
if (ZSTD_isError(max_compress_size) != 0) {
flb_error("zstd compression failed estimate buffer");
final_body = (void *) body;
final_body_len = body_len;
return -1;
}

void *out_buf = flb_malloc(max_compress_size);
ret = flb_zstd_compress((void *) body, body_len,
&final_body, &final_body_len);

size_t compress_ret = ZSTD_compress(out_buf,
max_compress_size,
body,
body_len,
ZSTD_CLEVEL_DEFAULT);
if (ZSTD_isError(compress_ret) != 0) {
final_body = (void *) body;
final_body_len = body_len;
ret = -1;
flb_plg_error(ctx->ins, "cannot zstd compress payload, disabling compression");
if (ret == 0) {
compressed = FLB_TRUE;
}
else {
final_body = out_buf;
final_body_len = compress_ret;
compressed = FLB_TRUE;
ret = 0;
final_body = (void *) body;
final_body_len = body_len;
flb_plg_error(ctx->ins, "cannot zstd payload, disabling compression");
}
}
else {
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ set(src
flb_plugin.c
flb_gzip.c
flb_snappy.c
flb_zstd.c
flb_compression.c
flb_http_common.c
flb_http_client_http1.c
Expand Down
66 changes: 15 additions & 51 deletions src/flb_http_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
#include <fluent-bit/flb_signv4_ng.h>
#include <fluent-bit/flb_snappy.h>
#include <fluent-bit/flb_gzip.h>

#include <zstd/lib/zstd.h>
#include <fluent-bit/flb_zstd.h>

/* PRIVATE */

Expand Down Expand Up @@ -1468,36 +1467,19 @@ int uncompress_zstd(char **output_buffer,
char *input_buffer,
size_t input_size)
{
// NB(rob): output_buffer and output_size are never initialized
// so we need to estimate compressed size and then alloc
// the output buffer.
// Caller needs to free output_buffer after call to this function.
size_t max_decompress_size = (size_t)ZSTD_getFrameContentSize(
(void*)input_buffer, input_size);
if (ZSTD_isError(max_decompress_size) != 0) {
flb_error("[opentelemetry] zstd decompression failed estimate buffer: error_no=%zu", max_decompress_size);
*output_buffer = (char *)input_buffer;
*output_size = input_size;
return -1;
}
int ret;

void *out_buf = flb_malloc(max_decompress_size);
ret = flb_zstd_uncompress(input_buffer,
input_size,
output_buffer,
output_size);

size_t ret;
if (ret != 0) {
flb_error("zstd decompression failed");

ret = ZSTD_decompress(out_buf,
max_decompress_size,
(void *)input_buffer,
input_size);
if (ZSTD_isError(ret) != 0) {
flb_error("[opentelemetry] zstd decompression failed");
*output_buffer = (char *)input_buffer;
*output_size = input_size;
return -1;
}

*output_buffer = (char *)out_buf;
*output_size = ret;
return 1;
}

Expand Down Expand Up @@ -1570,36 +1552,18 @@ int compress_zstd(char **output_buffer,
char *input_buffer,
size_t input_size)
{
// NB(rob): output_buffer and output_size are never initialized
// so we need to estimate compressed size and then alloc
// the output buffer.
// Caller needs to free output_buffer after call to this function.
size_t max_compress_size = (size_t)ZSTD_compressBound(input_size);
if (ZSTD_isError(max_compress_size) != 0) {
flb_error("zstd compression failed estimate buffer");
*output_buffer = (char *)input_buffer;
*output_size = input_size;
return -1;
}

void *out_buf = flb_malloc(max_compress_size);
int ret;

size_t ret;
ret = flb_zstd_compress((void *) input_buffer,
input_size,
(void **) output_buffer,
output_size);

ret = ZSTD_compress(out_buf,
max_compress_size,
(void *)input_buffer,
input_size,
ZSTD_CLEVEL_DEFAULT);
if (ZSTD_isError(ret) != 0) {
flb_error("zstd compression failed: error_no=%zu", ret);
*output_buffer = (char *)input_buffer;
*output_size = input_size;
if (ret == -1) {
flb_error("http client zstd compression failed");
return -1;
}

*output_buffer = (char *)out_buf;
*output_size = ret;
return 1;
}

Expand Down
87 changes: 87 additions & 0 deletions src/flb_zstd.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>

#include <zstd/lib/zstd.h>

int flb_zstd_compress(void *in_data, size_t in_len,
void **out_data, size_t *out_len)
{
// NB(rob): out_data and out_len are never initialized
// so we need to estimate compressed size and then alloc
// the output buffer.
size_t max_compress_size = (size_t)ZSTD_compressBound(in_len);
if (ZSTD_isError(max_compress_size) != 0) {
size_t err = max_compress_size;
flb_error("zstd compression failed estimatation: error_no=%zu", err);
return -1;
}

void *out_buf = flb_malloc(max_compress_size);

size_t ret = ZSTD_compress(out_buf,
max_compress_size,
(void *)in_data,
in_len,
ZSTD_CLEVEL_DEFAULT);
if (ZSTD_isError(ret) != 0) {
flb_free(out_buf);
size_t err = ret;
flb_error("zstd compression failed: error_no=%zu", err);
return -1;
}

*out_data = out_buf;
*out_len = ret;
return 0;
}

int flb_zstd_uncompress(void *in_data, size_t in_len,
void **out_data, size_t *out_len)
{
// NB(rob): out_data and out_size are never initialized
// so we need to estimate compressed size and then alloc
// the output buffer.
size_t max_decompress_size = (size_t)ZSTD_getFrameContentSize(in_data,
in_len);
if (ZSTD_isError(max_decompress_size) != 0) {
size_t err = max_decompress_size;
flb_error("zstd decompression failed estimatation: error_no=%zu", err);
return -1;
}

void *out_buf = flb_malloc(max_decompress_size);

size_t ret = ZSTD_decompress(out_buf,
max_decompress_size,
(void *)in_data,
in_len);
if (ZSTD_isError(ret) != 0) {
flb_free(out_buf);
size_t err = ret;
flb_error("zstd decompression failed: error_no=%zu", err);
return -1;
}

*out_data = out_buf;
*out_len = ret;
return 0;
}

0 comments on commit 4129eea

Please sign in to comment.