Skip to content

Commit

Permalink
systemd-cat-native negative timeout (netdata#18729)
Browse files Browse the repository at this point in the history
fixed negative timeout; added verbose mode
  • Loading branch information
ktsaou authored Oct 8, 2024
1 parent 02c06c0 commit ce4a911
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 29 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2401,7 +2401,10 @@ set(SYSTEMD_CAT_NATIVE_FILES src/libnetdata/log/systemd-cat-native.c
src/libnetdata/log/systemd-cat-native.h)

add_executable(systemd-cat-native ${SYSTEMD_CAT_NATIVE_FILES})
target_link_libraries(systemd-cat-native libnetdata)
target_link_libraries(systemd-cat-native
libnetdata
"$<$<BOOL:${CURL_FOUND}>:PkgConfig::CURL>"
)

install(TARGETS systemd-cat-native
COMPONENT netdata
Expand Down
70 changes: 43 additions & 27 deletions src/libnetdata/log/systemd-cat-native.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#include <machine/endian.h>
#endif

static inline void log_message_to_stderr(BUFFER *msg) {
bool verbose = false;

static inline void log_message_to_stderr(BUFFER *msg, const char *scope) {
CLEAN_BUFFER *tmp = buffer_create(0, NULL);

for(size_t i = 0; i < msg->len ;i++) {
Expand All @@ -24,13 +26,13 @@ static inline void log_message_to_stderr(BUFFER *msg) {
}
}

fprintf(stderr, "SENDING: %s\n", buffer_tostring(tmp));
fprintf(stderr, "SENDING %s: %s\n", scope, buffer_tostring(tmp));
}

static inline buffered_reader_ret_t get_next_line(struct buffered_reader *reader, BUFFER *line, int timeout_ms) {
while(true) {
if(unlikely(!buffered_reader_next_line(reader, line))) {
buffered_reader_ret_t ret = buffered_reader_read_timeout(reader, STDIN_FILENO, timeout_ms, false);
buffered_reader_ret_t ret = buffered_reader_read_timeout(reader, STDIN_FILENO, timeout_ms, verbose);
if(unlikely(ret != BUFFERED_READER_READ_OK))
return ret;

Expand Down Expand Up @@ -126,7 +128,7 @@ static inline void buffer_memcat_replacing_newlines(BUFFER *wb, const char *src,
// ----------------------------------------------------------------------------
// log to a systemd-journal-remote

#ifdef HAVE_CURL
#ifdef HAVE_LIBCURL
#include <curl/curl.h>

#ifndef HOST_NAME_MAX
Expand Down Expand Up @@ -203,8 +205,8 @@ static void journal_remote_complete_event(BUFFER *msg, usec_t *monotonic_ut) {

buffer_sprintf(msg,
""
"__REALTIME_TIMESTAMP=%llu\n"
"__MONOTONIC_TIMESTAMP=%llu\n"
"__REALTIME_TIMESTAMP=%"PRIu64"\n"
"__MONOTONIC_TIMESTAMP=%"PRIu64"\n"
"_MACHINE_ID=%s\n"
"_BOOT_ID=%s\n"
"_HOSTNAME=%s\n"
Expand All @@ -226,7 +228,8 @@ static void journal_remote_complete_event(BUFFER *msg, usec_t *monotonic_ut) {

static CURLcode journal_remote_send_buffer(CURL* curl, BUFFER *msg) {

// log_message_to_stderr(msg);
if(verbose)
log_message_to_stderr(msg, "REMOTE");

struct upload_data upload = {0};

Expand Down Expand Up @@ -260,8 +263,8 @@ static log_to_journal_remote_ret_t log_input_to_journal_remote(const char *url,

global_boot_id[0] = '\0';
char buffer[1024];
if(read_file(BOOT_ID_PATH, buffer, sizeof(buffer)) == 0) {
uuid_t uuid;
if(read_txt_file(BOOT_ID_PATH, buffer, sizeof(buffer)) == 0) {
nd_uuid_t uuid;
if(uuid_parse_flexi(buffer, uuid) == 0)
uuid_unparse_lower_compact(uuid, global_boot_id);
else
Expand All @@ -270,13 +273,13 @@ static log_to_journal_remote_ret_t log_input_to_journal_remote(const char *url,

if(global_boot_id[0] == '\0') {
fprintf(stderr, "WARNING: cannot read '%s'. Will generate a random _BOOT_ID.\n", BOOT_ID_PATH);
uuid_t uuid;
nd_uuid_t uuid;
uuid_generate_random(uuid);
uuid_unparse_lower_compact(uuid, global_boot_id);
}

if(read_file(MACHINE_ID_PATH, buffer, sizeof(buffer)) == 0) {
uuid_t uuid;
if(read_txt_file(MACHINE_ID_PATH, buffer, sizeof(buffer)) == 0) {
nd_uuid_t uuid;
if(uuid_parse_flexi(buffer, uuid) == 0)
uuid_unparse_lower_compact(uuid, global_machine_id);
else
Expand All @@ -285,13 +288,13 @@ static log_to_journal_remote_ret_t log_input_to_journal_remote(const char *url,

if(global_machine_id[0] == '\0') {
fprintf(stderr, "WARNING: cannot read '%s'. Will generate a random _MACHINE_ID.\n", MACHINE_ID_PATH);
uuid_t uuid;
nd_uuid_t uuid;
uuid_generate_random(uuid);
uuid_unparse_lower_compact(uuid, global_boot_id);
}

if(global_stream_id[0] == '\0') {
uuid_t uuid;
nd_uuid_t uuid;
uuid_generate_random(uuid);
uuid_unparse_lower_compact(uuid, global_stream_id);
}
Expand Down Expand Up @@ -456,10 +459,11 @@ static int help(void) {
"Usage:\n"
"\n"
" %s\n"
" [--verbose|-v]\n"
" [--newline=STRING]\n"
" [--log-as-netdata|-N]\n"
" [--namespace=NAMESPACE] [--socket=PATH]\n"
#ifdef HAVE_CURL
#ifdef HAVE_LIBCURL
" [--url=URL [--key=FILENAME] [--cert=FILENAME] [--trust=FILENAME|all]]\n"
#endif
"\n"
Expand Down Expand Up @@ -488,7 +492,7 @@ static int help(void) {
" the log destination. Only log fields defined by Netdata are accepted.\n"
" If the environment variables expected by Netdata are not found, it\n"
" falls back to stderr logging in logfmt format.\n"
#ifdef HAVE_CURL
#ifdef HAVE_LIBCURL
"\n"
" * Log to a systemd-journal-remote TCP socket, enabled with --url=URL\n"
"\n"
Expand Down Expand Up @@ -585,15 +589,16 @@ static int log_input_as_netdata(const char *newline, int timeout_ms) {
ND_LOG_STACK_PUSH(lgs);
lgs_reset(lgs);

ND_LOG_SOURCES source = NDLS_HEALTH;
ND_LOG_FIELD_PRIORITY priority = NDLP_INFO;
size_t fields_added = 0;
size_t messages_logged = 0;
ND_LOG_FIELD_PRIORITY priority = NDLP_INFO;

while(get_next_line(&reader, line, timeout_ms) == BUFFERED_READER_READ_OK) {
if(!line->len) {
// an empty line - we are done for this message

nd_log(NDLS_HEALTH, priority,
nd_log(source, priority,
"added %zu fields", // if the user supplied a MESSAGE, this will be ignored
fields_added);

Expand Down Expand Up @@ -625,7 +630,7 @@ static int log_input_as_netdata(const char *newline, int timeout_ms) {
struct log_stack_entry backup = lgs[NDF_MESSAGE];
lgs[NDF_MESSAGE] = ND_LOG_FIELD_TXT(NDF_MESSAGE, NULL);

nd_log(NDLS_COLLECTORS, NDLP_ERR,
nd_log(source, NDLP_ERR,
"Field '%.*s' is not a Netdata field. Ignoring it.",
(int)field_len, field);

Expand All @@ -636,7 +641,7 @@ static int log_input_as_netdata(const char *newline, int timeout_ms) {
struct log_stack_entry backup = lgs[NDF_MESSAGE];
lgs[NDF_MESSAGE] = ND_LOG_FIELD_TXT(NDF_MESSAGE, NULL);

nd_log(NDLS_COLLECTORS, NDLP_ERR,
nd_log(source, NDLP_ERR,
"Line does not contain an = sign; ignoring it: %s",
line->buffer);

Expand All @@ -648,7 +653,7 @@ static int log_input_as_netdata(const char *newline, int timeout_ms) {
}

if(fields_added) {
nd_log(NDLS_HEALTH, priority, "added %zu fields", fields_added);
nd_log(source, priority, "added %zu fields", fields_added);
messages_logged++;
}

Expand All @@ -659,7 +664,8 @@ static int log_input_as_netdata(const char *newline, int timeout_ms) {
// log to a local systemd-journald

static bool journal_local_send_buffer(int fd, BUFFER *msg) {
// log_message_to_stderr(msg);
if(verbose)
log_message_to_stderr(msg, "LOCAL");

bool ret = journal_direct_send(fd, msg->buffer, msg->len);
if (!ret)
Expand Down Expand Up @@ -720,19 +726,26 @@ static int log_input_to_journal(const char *socket, const char *namespace, const
}

cleanup:
if(verbose) {
if(failed_messages)
fprintf(stderr, "%zu messages failed to be logged\n", failed_messages);
if(!messages_logged)
fprintf(stderr, "No messages were logged!\n");
}

return !failed_messages && messages_logged ? 0 : 1;
}

int main(int argc, char *argv[]) {
clocks_init();
nd_log_initialize_for_external_plugins(argv[0]);

int timeout_ms = -1; // wait forever
int timeout_ms = 0; // wait forever
bool log_as_netdata = false;
const char *newline = NULL;
const char *namespace = NULL;
const char *socket = getenv("NETDATA_SYSTEMD_JOURNAL_PATH");
#ifdef HAVE_CURL
#ifdef HAVE_LIBCURL
const char *url = NULL;
const char *key = NULL;
const char *cert = NULL;
Expand All @@ -746,6 +759,9 @@ int main(int argc, char *argv[]) {
if(strcmp(k, "--help") == 0 || strcmp(k, "-h") == 0)
return help();

else if(strcmp(k, "--verbose") == 0 || strcmp(k, "-v") == 0)
verbose = true;

else if(strcmp(k, "--log-as-netdata") == 0 || strcmp(k, "-N") == 0)
log_as_netdata = true;

Expand All @@ -758,7 +774,7 @@ int main(int argc, char *argv[]) {
else if(strncmp(k, "--newline=", 10) == 0)
newline = &k[10];

#ifdef HAVE_CURL
#ifdef HAVE_LIBCURL
else if (strncmp(k, "--url=", 6) == 0)
url = &k[6];

Expand All @@ -780,7 +796,7 @@ int main(int argc, char *argv[]) {
}
}

#ifdef HAVE_CURL
#ifdef HAVE_LIBCURL
if(log_as_netdata && url) {
fprintf(stderr, "Cannot log to a systemd-journal-remote URL as Netdata. "
"Please either give --url or --log-as-netdata, not both.\n");
Expand All @@ -804,7 +820,7 @@ int main(int argc, char *argv[]) {
if(log_as_netdata)
return log_input_as_netdata(newline, timeout_ms);

#ifdef HAVE_CURL
#ifdef HAVE_LIBCURL
if(url) {
if(url && namespace && *namespace)
snprintfz(global_namespace, sizeof(global_namespace), "_NAMESPACE=%s\n", namespace);
Expand Down
2 changes: 1 addition & 1 deletion src/libnetdata/socket/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ inline int wait_on_socket_or_cancel_with_timeout(
.revents = 0,
};

bool forever = (timeout_ms == 0);
bool forever = (timeout_ms <= 0);

while (timeout_ms > 0 || forever) {
if(nd_thread_signaled_to_cancel()) {
Expand Down

0 comments on commit ce4a911

Please sign in to comment.