Skip to content

Commit

Permalink
in_opentelemetry: json logs: process spanID, traceID and observedTime…
Browse files Browse the repository at this point in the history
…UnixNano

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Dec 14, 2024
1 parent e6e1213 commit 7f59bcf
Showing 1 changed file with 71 additions and 8 deletions.
79 changes: 71 additions & 8 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -1225,22 +1225,25 @@ static int json_payload_append_converted_value(
return result;
}

static int process_json_payload_log_records_entry(
struct flb_opentelemetry *ctx,
struct flb_log_event_encoder *encoder,
msgpack_object *log_records_object)
static int process_json_payload_log_records_entry(struct flb_opentelemetry *ctx,
struct flb_log_event_encoder *encoder,
msgpack_object *log_records_object)
{
msgpack_object_map *log_records_entry;
int result;
int body_type;
char timestamp_str[32];
msgpack_object_map *log_records_entry;
msgpack_object *timestamp_object;
uint64_t timestamp_uint64;
msgpack_object *metadata_object;
msgpack_object *body_object;
int body_type;
struct flb_time timestamp;
int result;
msgpack_object *observed_time_unix_nano = NULL;
msgpack_object *severity_number = NULL;
msgpack_object *severity_text = NULL;
msgpack_object *trace_id = NULL;
msgpack_object *span_id = NULL;
struct flb_time timestamp;


if (log_records_object->type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected logRecords entry type");
Expand Down Expand Up @@ -1300,6 +1303,15 @@ static int process_json_payload_log_records_entry(
flb_time_from_uint64(&timestamp, timestamp_uint64);
}

/* observedTimeUnixNano (yes, we do it again) */
result = find_map_entry_by_key(log_records_entry, "observedTimeUnixNano", 0, FLB_TRUE);
if (result == -1) {
result = find_map_entry_by_key(log_records_entry, "observed_time_unix_nano", 0, FLB_TRUE);
}
else if (result >= 0) {
observed_time_unix_nano = &log_records_entry->ptr[result].val;
}

/* severityNumber */
result = find_map_entry_by_key(log_records_entry, "severityNumber", 0, FLB_TRUE);
if (result == -1) {
Expand Down Expand Up @@ -1334,6 +1346,24 @@ static int process_json_payload_log_records_entry(
metadata_object = &log_records_entry->ptr[result].val;
}

/* traceId */
result = find_map_entry_by_key(log_records_entry, "traceId", 0, FLB_TRUE);
if (result == -1) {
result = find_map_entry_by_key(log_records_entry, "trace_id", 0, FLB_TRUE);
}
if (result >= 0) {
trace_id = &log_records_entry->ptr[result].val;
}

/* spanId */
result = find_map_entry_by_key(log_records_entry, "spanId", 0, FLB_TRUE);
if (result == -1) {
result = find_map_entry_by_key(log_records_entry, "span_id", 0, FLB_TRUE);
}
if (result >= 0) {
span_id = &log_records_entry->ptr[result].val;
}

result = find_map_entry_by_key(log_records_entry, "body", 0, FLB_TRUE);

if (result == -1) {
Expand Down Expand Up @@ -1363,6 +1393,27 @@ static int process_json_payload_log_records_entry(
flb_log_event_encoder_append_string(encoder, FLB_LOG_EVENT_METADATA, ctx->logs_metadata_key, flb_sds_len(ctx->logs_metadata_key));
flb_log_event_encoder_begin_map(encoder, FLB_LOG_EVENT_METADATA);

if (observed_time_unix_nano != NULL && observed_time_unix_nano->type == MSGPACK_OBJECT_STR) {
memset(timestamp_str, 0, sizeof(timestamp_str));

if (timestamp_object->via.str.size < sizeof(timestamp_str)) {
strncpy(timestamp_str,
timestamp_object->via.str.ptr,
timestamp_object->via.str.size);
}
else {
strncpy(timestamp_str,
timestamp_object->via.str.ptr,
sizeof(timestamp_str) - 1);
}

timestamp_uint64 = strtoul(timestamp_str, NULL, 10);

flb_log_event_encoder_append_metadata_values(encoder,
FLB_LOG_EVENT_STRING_VALUE("observed_timestamp", 18),
FLB_LOG_EVENT_INT64_VALUE(timestamp_uint64));
}

if (severity_number != NULL) {
flb_log_event_encoder_append_metadata_values(encoder,
FLB_LOG_EVENT_STRING_VALUE("severity_number", 15),
Expand All @@ -1380,6 +1431,18 @@ static int process_json_payload_log_records_entry(
result = json_payload_append_converted_kvlist(encoder, FLB_LOG_EVENT_METADATA, metadata_object);
}

if (trace_id != NULL && (trace_id->type == MSGPACK_OBJECT_STR || trace_id->type == MSGPACK_OBJECT_BIN)) {
flb_log_event_encoder_append_metadata_values(encoder,
FLB_LOG_EVENT_STRING_VALUE("trace_id", 8),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(trace_id));
}

if (span_id != NULL && (span_id->type == MSGPACK_OBJECT_STR || span_id->type == MSGPACK_OBJECT_BIN)) {
flb_log_event_encoder_append_metadata_values(encoder,
FLB_LOG_EVENT_STRING_VALUE("span_id", 7),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(span_id));
}

flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_METADATA);

}
Expand Down

0 comments on commit 7f59bcf

Please sign in to comment.