Skip to content

Commit

Permalink
parse span path as array (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinmukhamedm authored Jan 19, 2025
1 parent 0505983 commit 587138c
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 18 deletions.
4 changes: 3 additions & 1 deletion app-server/src/ch/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ impl CHSpan {
trace_id: span.trace_id,
provider: usage.provider_name.unwrap_or(String::from("<null>")),
user_id: span_attributes.user_id().unwrap_or(String::from("<null>")),
path: span_attributes.path().unwrap_or(String::from("<null>")),
path: span_attributes
.flat_path()
.unwrap_or(String::from("<null>")),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion app-server/src/traces/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async fn inner_process_queue_spans<T: Storage + ?Sized>(
let registered_label_classes = match get_registered_label_classes_for_path(
&db.pool,
rabbitmq_span_message.project_id,
&span.get_attributes().path().unwrap_or_default(),
&span.get_attributes().flat_path().unwrap_or_default(),
)
.await
{
Expand Down
5 changes: 2 additions & 3 deletions app-server/src/traces/index.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! This module indexes spans in a vector database for further semantic search.
use anyhow::Result;
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;

Expand Down Expand Up @@ -82,8 +81,8 @@ fn create_datapoint(span: &Span, content: String, field_type: &str) -> Datapoint
data.insert("trace_id".to_string(), span.trace_id.to_string());
data.insert("span_id".to_string(), span.span_id.to_string());
data.insert("type".to_string(), field_type.to_string());
if let Some(Value::String(path)) = span.attributes.get(SPAN_PATH) {
data.insert("path".to_string(), path.to_owned());
if let Some(v) = span.attributes.get(SPAN_PATH) {
data.insert("path".to_string(), v.to_string());
};

Datapoint {
Expand Down
43 changes: 30 additions & 13 deletions app-server/src/traces/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,20 @@ impl SpanAttributes {
}
}

pub fn path(&self) -> Option<String> {
self.attributes
.get(SPAN_PATH)
.and_then(|p| p.as_str().map(|s| s.to_string()))
pub fn path(&self) -> Option<Vec<String>> {
match self.attributes.get(SPAN_PATH) {
Some(Value::Array(arr)) => Some(
arr.iter()
.map(|v| json_value_to_string(v.clone()))
.collect(),
),
Some(Value::String(s)) => Some(vec![s.clone()]),
_ => None,
}
}

pub fn flat_path(&self) -> Option<String> {
self.path().map(|path| path.join("."))
}

pub fn set_usage(&mut self, usage: &SpanUsage) {
Expand Down Expand Up @@ -205,15 +215,20 @@ impl SpanAttributes {
/// NOTE: Nested spans generated by Langchain auto-instrumentation may have the same path
/// because we don't have a way to set the span name/path in the auto-instrumentation code.
pub fn extend_span_path(&mut self, span_name: &str) {
if let Some(serde_json::Value::String(path)) = self.attributes.get(SPAN_PATH) {
if !(path.ends_with(&format!(".{span_name}")) || path == span_name) {
let span_path = format!("{}.{}", path, span_name);
if let Some(serde_json::Value::Array(path)) = self.attributes.get(SPAN_PATH) {
if path.len() > 0
&& !matches!(path.last().unwrap(), serde_json::Value::String(s) if s == span_name)
{
let mut new_path = path.clone();
new_path.push(serde_json::Value::String(span_name.to_string()));
self.attributes
.insert(SPAN_PATH.to_string(), Value::String(span_path));
.insert(SPAN_PATH.to_string(), Value::Array(new_path));
}
} else {
self.attributes
.insert(SPAN_PATH.to_string(), Value::String(span_name.to_string()));
self.attributes.insert(
SPAN_PATH.to_string(),
Value::Array(vec![serde_json::Value::String(span_name.to_string())]),
);
}
}

Expand Down Expand Up @@ -492,7 +507,7 @@ impl Span {
messages: &HashMap<Uuid, Message>,
trace_id: Uuid,
parent_span_id: Uuid,
parent_span_path: String,
parent_span_path: Vec<String>,
) -> Vec<Self> {
messages
.iter()
Expand All @@ -509,7 +524,9 @@ impl Span {
// i.e. do not append span name.
parent_span_path.clone()
} else {
format!("{}.{}", parent_span_path, message.node_name)
let mut path = parent_span_path.clone();
path.push(message.node_name.clone());
path
};

let input_values = message
Expand Down Expand Up @@ -571,7 +588,7 @@ impl Span {
}
}

fn span_attributes_from_meta_log(meta_log: Option<MetaLog>, span_path: String) -> Value {
fn span_attributes_from_meta_log(meta_log: Option<MetaLog>, span_path: Vec<String>) -> Value {
let mut attributes = HashMap::new();

if let Some(MetaLog::LLM(llm_log)) = meta_log {
Expand Down

0 comments on commit 587138c

Please sign in to comment.