Skip to content

Commit

Permalink
ZSTD and GZIP/DEFLATE streaming support (netdata#16268)
Browse files Browse the repository at this point in the history
* move compression header to compression.h

* prototype with zstd compression

* updated capabilities

* no need for resetting compression

* left-over reset function

* use ZSTD_compressStream() instead of ZSTD_compressStream2() for backwards compatibility

* remove call to LZ4_decoderRingBufferSize()

* debug signature failures

* fix the buffers of lz4

* fix decoding of zstd

* detect compression based on initialization; prefer ZSTD over LZ4

* allow both lz4 and zstd

* initialize zstd streams

* define missing ZSTD_CLEVEL_DEFAULT

* log zero compressed size

* debug log

* flush compression buffer

* add sender compression statistics

* removed debugging messages

* do not fail if zstd is not available

* cleanup and buildinfo

* fix max message size, use zstd level 1, add compressio ratio reporting

* use compression level 1

* fix ratio title

* better compression error logs

* for backwards compatibility use buffers of COMPRESSION_MAX_CHUNK

* switch to default compression level

* additional streaming error conditions detection

* do not expose compression stats when compression is not enabled

* test for the right lz4 functions

* moved lz4 and zstd to their own files

* add gzip streaming compression

* gzip error handling

* added unittest for streaming compression

* eliminate a copy of the uncompressed data during zstd compression

* eliminate not needed zstd allocations

* cleanup

* decode gzip with Z_SYNC_FLUSH

* set the decoding gzip algorithm

* user configuration for compression levels and compression algorithms order

* fix exclusion of not preferred compressions

* remove now obsolete compression define, since gzip is always available

* rename compression algorithms order in stream.conf

* move common checks in compression.c

* cleanup

* backwards compatible error checking
  • Loading branch information
ktsaou authored Oct 27, 2023
1 parent 89978b5 commit cd584e0
Show file tree
Hide file tree
Showing 20 changed files with 1,344 additions and 405 deletions.
8 changes: 8 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,13 @@ API_PLUGIN_FILES = \
STREAMING_PLUGIN_FILES = \
streaming/rrdpush.c \
streaming/compression.c \
streaming/compression.h \
streaming/compression_gzip.c \
streaming/compression_gzip.h \
streaming/compression_lz4.c \
streaming/compression_lz4.h \
streaming/compression_zstd.c \
streaming/compression_zstd.h \
streaming/sender.c \
streaming/receiver.c \
streaming/replication.h \
Expand Down Expand Up @@ -1143,6 +1150,7 @@ NETDATA_COMMON_LIBS = \
$(OPTIONAL_MQTT_LIBS) \
$(OPTIONAL_UV_LIBS) \
$(OPTIONAL_LZ4_LIBS) \
$(OPTIONAL_ZSTD_LIBS) \
$(OPTIONAL_DATACHANNEL_LIBS) \
libjudy.a \
$(OPTIONAL_SSL_LIBS) \
Expand Down
19 changes: 16 additions & 3 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -555,16 +555,28 @@ OPTIONAL_UV_LIBS="${UV_LIBS}"

AC_CHECK_LIB(
[lz4],
[LZ4_initStream],
[LZ4_createStream],
[LZ4_LIBS_FAST="-llz4"]
)

AC_CHECK_LIB(
[lz4],
[LZ4_compress_default],
[LZ4_compress_fast_continue],
[LZ4_LIBS="-llz4"]
)

# -----------------------------------------------------------------------------
# zstd

AC_CHECK_LIB([zstd], [ZSTD_createCStream, ZSTD_compressStream, ZSTD_decompressStream, ZSTD_createDStream],
[LIBZSTD_FOUND=yes],
[LIBZSTD_FOUND=no])

if test "x$LIBZSTD_FOUND" = "xyes"; then
AC_DEFINE([ENABLE_ZSTD], [1], [libzstd usability])
OPTIONAL_ZSTD_LIBS="-lzstd"
fi

# -----------------------------------------------------------------------------
# zlib

Expand Down Expand Up @@ -702,7 +714,7 @@ if test "${enable_lz4}" != "no"; then
AC_TRY_LINK(
[ #include <lz4.h> ],
[
LZ4_stream_t* stream = LZ4_initStream(NULL, 0);
LZ4_stream_t* stream = LZ4_createStream();
],
[ enable_lz4="yes"],
[ enable_lz4="no" ]
Expand Down Expand Up @@ -1900,6 +1912,7 @@ AC_SUBST([OPTIONAL_MATH_LIBS])
AC_SUBST([OPTIONAL_DATACHANNEL_LIBS])
AC_SUBST([OPTIONAL_UV_LIBS])
AC_SUBST([OPTIONAL_LZ4_LIBS])
AC_SUBST([OPTIONAL_ZSTD_LIBS])
AC_SUBST([OPTIONAL_SSL_LIBS])
AC_SUBST([OPTIONAL_JSONC_LIBS])
AC_SUBST([OPTIONAL_YAML_LIBS])
Expand Down
50 changes: 46 additions & 4 deletions daemon/buildinfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ typedef enum __attribute__((packed)) {
BIB_FEATURE_CLOUD,
BIB_FEATURE_HEALTH,
BIB_FEATURE_STREAMING,
BIB_FEATURE_BACKFILLING,
BIB_FEATURE_REPLICATION,
BIB_FEATURE_STREAMING_COMPRESSION,
BIB_FEATURE_CONTEXTS,
Expand All @@ -66,6 +67,7 @@ typedef enum __attribute__((packed)) {
BIB_CONNECTIVITY_NATIVE_HTTPS,
BIB_CONNECTIVITY_TLS_HOST_VERIFY,
BIB_LIB_LZ4,
BIB_LIB_ZSTD,
BIB_LIB_ZLIB,
BIB_LIB_JUDY,
BIB_LIB_DLIB,
Expand Down Expand Up @@ -484,6 +486,14 @@ static struct {
.json = "streaming",
.value = NULL,
},
[BIB_FEATURE_BACKFILLING] = {
.category = BIC_FEATURE,
.type = BIT_BOOLEAN,
.analytics = NULL,
.print = "Back-filling (of higher database tiers)",
.json = "back-filling",
.value = NULL,
},
[BIB_FEATURE_REPLICATION] = {
.category = BIC_FEATURE,
.type = BIT_BOOLEAN,
Expand All @@ -498,7 +508,7 @@ static struct {
.analytics = "Stream Compression",
.print = "Streaming and Replication Compression",
.json = "stream-compression",
.value = "none",
.value = NULL,
},
[BIB_FEATURE_CONTEXTS] = {
.category = BIC_FEATURE,
Expand Down Expand Up @@ -628,6 +638,14 @@ static struct {
.json = "lz4",
.value = NULL,
},
[BIB_LIB_ZSTD] = {
.category = BIC_LIBS,
.type = BIT_BOOLEAN,
.analytics = NULL,
.print = "ZSTD (fast, lossless compression algorithm)",
.json = "zstd",
.value = NULL,
},
[BIB_LIB_ZLIB] = {
.category = BIC_LIBS,
.type = BIT_BOOLEAN,
Expand Down Expand Up @@ -1029,6 +1047,23 @@ static void build_info_set_value(BUILD_INFO_SLOT slot, const char *value) {
BUILD_INFO[slot].value = value;
}

static void build_info_append_value(BUILD_INFO_SLOT slot, const char *value) {
size_t size = BUILD_INFO[slot].value ? strlen(BUILD_INFO[slot].value) + 1 : 0;
size += strlen(value);
char buf[size + 1];

if(BUILD_INFO[slot].value) {
strcpy(buf, BUILD_INFO[slot].value);
strcat(buf, " ");
strcat(buf, value);
}
else
strcpy(buf, value);

freez((void *)BUILD_INFO[slot].value);
BUILD_INFO[slot].value = strdupz(buf);
}

static void build_info_set_value_strdupz(BUILD_INFO_SLOT slot, const char *value) {
if(!value) value = "";
build_info_set_value(slot, strdupz(value));
Expand Down Expand Up @@ -1075,14 +1110,18 @@ __attribute__((constructor)) void initialize_build_info(void) {

build_info_set_status(BIB_FEATURE_HEALTH, true);
build_info_set_status(BIB_FEATURE_STREAMING, true);
build_info_set_status(BIB_FEATURE_BACKFILLING, true);
build_info_set_status(BIB_FEATURE_REPLICATION, true);

#ifdef ENABLE_RRDPUSH_COMPRESSION
build_info_set_status(BIB_FEATURE_STREAMING_COMPRESSION, true);
#ifdef ENABLE_LZ4
build_info_set_value(BIB_FEATURE_STREAMING_COMPRESSION, "lz4");

#ifdef ENABLE_ZSTD
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "zstd");
#endif
#ifdef ENABLE_LZ4
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "lz4");
#endif
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "gzip");

build_info_set_status(BIB_FEATURE_CONTEXTS, true);
build_info_set_status(BIB_FEATURE_TIERING, true);
Expand Down Expand Up @@ -1117,6 +1156,9 @@ __attribute__((constructor)) void initialize_build_info(void) {
#ifdef ENABLE_LZ4
build_info_set_status(BIB_LIB_LZ4, true);
#endif
#ifdef ENABLE_ZSTD
build_info_set_status(BIB_LIB_ZSTD, true);
#endif

build_info_set_status(BIB_LIB_ZLIB, true);

Expand Down
7 changes: 7 additions & 0 deletions daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ int julytest(void);
int pluginsd_parser_unittest(void);
void replication_initialize(void);
void bearer_tokens_init(void);
int unittest_rrdpush_compressions(void);

int main(int argc, char **argv) {
// initialize the system clocks
Expand Down Expand Up @@ -1550,6 +1551,10 @@ int main(int argc, char **argv) {
unittest_running = true;
return pluginsd_parser_unittest();
}
else if(strcmp(optarg, "rrdpush_compressions_test") == 0) {
unittest_running = true;
return unittest_rrdpush_compressions();
}
else if(strncmp(optarg, createdataset_string, strlen(createdataset_string)) == 0) {
optarg += strlen(createdataset_string);
unsigned history_seconds = strtoul(optarg, NULL, 0);
Expand Down Expand Up @@ -1901,6 +1906,8 @@ int main(int argc, char **argv) {
netdata_log_info("Netdata agent version \""VERSION"\" is starting");

ieee754_doubles = is_system_ieee754_double();
if(!ieee754_doubles)
globally_disabled_capabilities |= STREAM_CAP_IEEE754;

aral_judy_init();

Expand Down
15 changes: 4 additions & 11 deletions database/rrdhost.c
Original file line number Diff line number Diff line change
Expand Up @@ -1145,13 +1145,10 @@ static void rrdhost_streaming_sender_structures_init(RRDHOST *host)
host->sender->rrdpush_sender_pipe[PIPE_READ] = -1;
host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1;
host->sender->rrdpush_sender_socket = -1;
host->sender->disabled_capabilities = STREAM_CAP_NONE;

#ifdef ENABLE_RRDPUSH_COMPRESSION
if(default_rrdpush_compression_enabled)
host->sender->flags |= SENDER_FLAG_COMPRESSION;
else
host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
#endif
if(!default_rrdpush_compression_enabled)
host->sender->disabled_capabilities |= STREAM_CAP_COMPRESSIONS_AVAILABLE;

spinlock_init(&host->sender->spinlock);
replication_init_sender(host->sender);
Expand All @@ -1167,9 +1164,7 @@ static void rrdhost_streaming_sender_structures_free(RRDHOST *host)
rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, true); // stop a possibly running thread
cbuffer_free(host->sender->buffer);

#ifdef ENABLE_RRDPUSH_COMPRESSION
rrdpush_compressor_destroy(&host->sender->compressor);
#endif

replication_cleanup_sender(host->sender);

Expand Down Expand Up @@ -1885,9 +1880,7 @@ void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) {
else
s->stream.status = RRDHOST_STREAM_STATUS_ONLINE;

#ifdef ENABLE_RRDPUSH_COMPRESSION
s->stream.compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor.initialized);
#endif
s->stream.compression = host->sender->compressor.initialized;
}
else {
s->stream.status = RRDHOST_STREAM_STATUS_OFFLINE;
Expand Down
11 changes: 4 additions & 7 deletions libnetdata/libnetdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ extern "C" {
#include <config.h>
#endif

#ifdef ENABLE_LZ4
#define ENABLE_RRDPUSH_COMPRESSION 1
#endif

#ifdef ENABLE_OPENSSL
#define ENABLE_HTTPS 1
#endif
Expand Down Expand Up @@ -681,9 +677,10 @@ static inline BITMAPX *bitmapX_create(uint32_t bits) {
#define bitmap1024_get_bit(ptr, idx) bitmapX_get_bit((BITMAPX *)ptr, idx)
#define bitmap1024_set_bit(ptr, idx, value) bitmapX_set_bit((BITMAPX *)ptr, idx, value)


#define COMPRESSION_MAX_MSG_SIZE 0x4000
#define PLUGINSD_LINE_MAX (COMPRESSION_MAX_MSG_SIZE - 1024)
#define COMPRESSION_MAX_CHUNK 0x4000
#define COMPRESSION_MAX_OVERHEAD 128
#define COMPRESSION_MAX_MSG_SIZE (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD - 1)
#define PLUGINSD_LINE_MAX (COMPRESSION_MAX_MSG_SIZE - 768)
int pluginsd_isspace(char c);
int config_isspace(char c);
int group_by_label_isspace(char c);
Expand Down
Loading

0 comments on commit cd584e0

Please sign in to comment.