From 6709106b40b286c3758645ad7cea7ab5cb8c2b4e Mon Sep 17 00:00:00 2001 From: Din Date: Sun, 19 Jan 2025 23:53:54 +0500 Subject: [PATCH] parse span path as array --- app-server/src/ch/spans.rs | 4 ++- app-server/src/traces/consumer.rs | 2 +- app-server/src/traces/index.rs | 5 ++-- app-server/src/traces/spans.rs | 43 +++++++++++++++++++++---------- 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/app-server/src/ch/spans.rs b/app-server/src/ch/spans.rs index cd707c2a..99b083d6 100644 --- a/app-server/src/ch/spans.rs +++ b/app-server/src/ch/spans.rs @@ -89,7 +89,9 @@ impl CHSpan { trace_id: span.trace_id, provider: usage.provider_name.unwrap_or(String::from("")), user_id: span_attributes.user_id().unwrap_or(String::from("")), - path: span_attributes.path().unwrap_or(String::from("")), + path: span_attributes + .flat_path() + .unwrap_or(String::from("")), } } } diff --git a/app-server/src/traces/consumer.rs b/app-server/src/traces/consumer.rs index 112e14a9..4266a63c 100644 --- a/app-server/src/traces/consumer.rs +++ b/app-server/src/traces/consumer.rs @@ -244,7 +244,7 @@ async fn inner_process_queue_spans( let registered_label_classes = match get_registered_label_classes_for_path( &db.pool, rabbitmq_span_message.project_id, - &span.get_attributes().path().unwrap_or_default(), + &span.get_attributes().flat_path().unwrap_or_default(), ) .await { diff --git a/app-server/src/traces/index.rs b/app-server/src/traces/index.rs index 34656771..ccf6d2fa 100644 --- a/app-server/src/traces/index.rs +++ b/app-server/src/traces/index.rs @@ -1,7 +1,6 @@ //! This module indexes spans in a vector database for further semantic search. use anyhow::Result; -use serde_json::Value; use std::{collections::HashMap, sync::Arc}; use uuid::Uuid; @@ -82,8 +81,8 @@ fn create_datapoint(span: &Span, content: String, field_type: &str) -> Datapoint data.insert("trace_id".to_string(), span.trace_id.to_string()); data.insert("span_id".to_string(), span.span_id.to_string()); data.insert("type".to_string(), field_type.to_string()); - if let Some(Value::String(path)) = span.attributes.get(SPAN_PATH) { - data.insert("path".to_string(), path.to_owned()); + if let Some(v) = span.attributes.get(SPAN_PATH) { + data.insert("path".to_string(), v.to_string()); }; Datapoint { diff --git a/app-server/src/traces/spans.rs b/app-server/src/traces/spans.rs index fb4d0945..3dfde43d 100644 --- a/app-server/src/traces/spans.rs +++ b/app-server/src/traces/spans.rs @@ -165,10 +165,20 @@ impl SpanAttributes { } } - pub fn path(&self) -> Option { - self.attributes - .get(SPAN_PATH) - .and_then(|p| p.as_str().map(|s| s.to_string())) + pub fn path(&self) -> Option> { + match self.attributes.get(SPAN_PATH) { + Some(Value::Array(arr)) => Some( + arr.iter() + .map(|v| json_value_to_string(v.clone())) + .collect(), + ), + Some(Value::String(s)) => Some(vec![s.clone()]), + _ => None, + } + } + + pub fn flat_path(&self) -> Option { + self.path().map(|path| path.join(".")) } pub fn set_usage(&mut self, usage: &SpanUsage) { @@ -205,15 +215,20 @@ impl SpanAttributes { /// NOTE: Nested spans generated by Langchain auto-instrumentation may have the same path /// because we don't have a way to set the span name/path in the auto-instrumentation code. pub fn extend_span_path(&mut self, span_name: &str) { - if let Some(serde_json::Value::String(path)) = self.attributes.get(SPAN_PATH) { - if !(path.ends_with(&format!(".{span_name}")) || path == span_name) { - let span_path = format!("{}.{}", path, span_name); + if let Some(serde_json::Value::Array(path)) = self.attributes.get(SPAN_PATH) { + if path.len() > 0 + && !matches!(path.last().unwrap(), serde_json::Value::String(s) if s == span_name) + { + let mut new_path = path.clone(); + new_path.push(serde_json::Value::String(span_name.to_string())); self.attributes - .insert(SPAN_PATH.to_string(), Value::String(span_path)); + .insert(SPAN_PATH.to_string(), Value::Array(new_path)); } } else { - self.attributes - .insert(SPAN_PATH.to_string(), Value::String(span_name.to_string())); + self.attributes.insert( + SPAN_PATH.to_string(), + Value::Array(vec![serde_json::Value::String(span_name.to_string())]), + ); } } @@ -492,7 +507,7 @@ impl Span { messages: &HashMap, trace_id: Uuid, parent_span_id: Uuid, - parent_span_path: String, + parent_span_path: Vec, ) -> Vec { messages .iter() @@ -509,7 +524,9 @@ impl Span { // i.e. do not append span name. parent_span_path.clone() } else { - format!("{}.{}", parent_span_path, message.node_name) + let mut path = parent_span_path.clone(); + path.push(message.node_name.clone()); + path }; let input_values = message @@ -571,7 +588,7 @@ impl Span { } } -fn span_attributes_from_meta_log(meta_log: Option, span_path: String) -> Value { +fn span_attributes_from_meta_log(meta_log: Option, span_path: Vec) -> Value { let mut attributes = HashMap::new(); if let Some(MetaLog::LLM(llm_log)) = meta_log {