Skip to content

Commit

Permalink
feat: allow otel compatibility changes (#803)
Browse files Browse the repository at this point in the history
This PR ensures we're compatible with OTEL:

- allow /v1/logs endpoint for otel log ingestion
- make all fields optional to allow ingestion
  • Loading branch information
nikhilsinhaparseable authored Jun 7, 2024
1 parent 1b6f992 commit ad39f57
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 59 deletions.
41 changes: 40 additions & 1 deletion server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,43 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
Ok(())
}

// Handler for POST /v1/logs to ingest OTEL logs.
// Extracts the target stream name from the stream-name header, creates the
// stream if it does not exist, then flattens each OTEL record and pushes it
// individually. Returns 200 OK once every record has been ingested.
pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
    if let Some((_, stream_name)) = req
        .headers()
        .iter()
        .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
    {
        // Header values are not guaranteed to be valid visible ASCII;
        // reject bad bytes with an error instead of panicking.
        let stream_name = stream_name
            .to_str()
            .map_err(|_| PostError::CustomError("invalid stream name header value".to_string()))?
            .to_owned();
        create_stream_if_not_exists(&stream_name).await?;

        // Only the OTEL log source is accepted on this endpoint.
        if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
        {
            let log_source = log_source
                .to_str()
                .map_err(|_| {
                    PostError::CustomError("invalid log source header value".to_string())
                })?;
            if log_source == LOG_SOURCE_OTEL {
                // Flatten the OTEL payload into one JSON map per record and
                // push each record as its own event.
                let mut json = otel::flatten_otel_logs(&body);
                for record in json.iter_mut() {
                    let body: Bytes = serde_json::to_vec(record)
                        .map_err(|e| PostError::CustomError(e.to_string()))?
                        .into();
                    push_logs(stream_name.clone(), req.clone(), body).await?;
                }
            } else {
                log::warn!("Unknown log source: {}", log_source);
                return Err(PostError::CustomError("Unknown log source".to_string()));
            }
        } else {
            return Err(PostError::CustomError(
                "log source key header is missing".to_string(),
            ));
        }
    } else {
        return Err(PostError::Header(ParseHeaderError::MissingStreamName));
    }
    Ok(HttpResponse::Ok().finish())
}

async fn flatten_and_push_logs(
req: HttpRequest,
body: Bytes,
Expand All @@ -116,7 +153,9 @@ async fn flatten_and_push_logs(
let log_source: String = log_source.to_str().unwrap().to_owned();
match log_source.as_str() {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
LOG_SOURCE_OTEL => json = otel::flatten_otel_logs(&body),
LOG_SOURCE_OTEL => {
json = otel::flatten_otel_logs(&body);
}
_ => {
log::warn!("Unknown log source: {}", log_source);
push_logs(stream_name.to_string(), req.clone(), body).await?;
Expand Down
22 changes: 12 additions & 10 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,18 @@ impl ParseableServer for IngestServer {
impl IngestServer {
// configure the api routes
fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option<OpenIdClient>) {
config.service(
// Base path "{url}/api/v1"
web::scope(&base_path())
.service(Server::get_ingest_factory())
.service(Self::logstream_api())
.service(Server::get_about_factory())
.service(Self::analytics_factory())
.service(Server::get_liveness_factory())
.service(Server::get_readiness_factory()),
);
config
.service(
// Base path "{url}/api/v1"
web::scope(&base_path())
.service(Server::get_ingest_factory())
.service(Self::logstream_api())
.service(Server::get_about_factory())
.service(Self::analytics_factory())
.service(Server::get_liveness_factory())
.service(Server::get_readiness_factory()),
)
.service(Server::get_ingest_otel_factory());
}

fn analytics_factory() -> Scope {
Expand Down
12 changes: 12 additions & 0 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl Server {
.service(Self::get_oauth_webscope(oidc_client))
.service(Self::get_user_role_webscope()),
)
.service(Self::get_ingest_otel_factory())
.service(Self::get_generated());
}

Expand Down Expand Up @@ -347,6 +348,17 @@ impl Server {
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE))
}

// /v1/logs endpoint to be used for OTEL log ingestion only
pub fn get_ingest_otel_factory() -> Resource {
web::resource("/v1/logs")
.route(
web::post()
.to(ingest::ingest_otel_logs)
.authorize_for_stream(Action::Ingest),
)
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE))
}

// get the oauth webscope
pub fn get_oauth_webscope(oidc_client: Option<OpenIdClient>) -> Scope {
let oauth = web::scope("/o")
Expand Down
113 changes: 80 additions & 33 deletions server/src/handlers/http/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,18 @@ fn collect_json_from_any_value(
if value.array_val.is_some() {
let array_val = value.array_val.as_ref().unwrap();
let values = &array_val.values;

for value in values {
let value = &value.value;
value_json = collect_json_from_any_value(key, value.clone());
let array_value_json = collect_json_from_any_value(key, value.clone());
for key in array_value_json.keys() {
value_json.insert(
format!(
"{}_{}",
key.to_owned(),
value_to_string(array_value_json[key].to_owned())
),
array_value_json[key].to_owned(),
);
}
}
}

Expand All @@ -69,7 +77,22 @@ fn collect_json_from_any_value(
let kv_list_val = value.kv_list_val.unwrap();
for key_value in kv_list_val.values {
let value = key_value.value;
value_json = collect_json_from_values(&value, key);
if value.is_some() {
let value = value.unwrap();
let key_value_json = collect_json_from_any_value(key, value);

for key in key_value_json.keys() {
value_json.insert(
format!(
"{}_{}_{}",
key.to_owned(),
key_value.key,
value_to_string(key_value_json[key].to_owned())
),
key_value_json[key].to_owned(),
);
}
}
}
}
if value.bytes_val.is_some() {
Expand All @@ -96,6 +119,14 @@ fn collect_json_from_values(
value_json
}

// Render a JSON scalar as a plain string: numbers and booleans via their
// Display form, strings as-is (without surrounding quotes), and every other
// variant (null/array/object) as the empty string.
// Fix: the parameter is already owned, so the previous `value.clone()`
// before matching was a redundant allocation; match on `value` directly.
fn value_to_string(value: serde_json::Value) -> String {
    match value {
        Value::Number(n) => n.to_string(),
        Value::Bool(b) => b.to_string(),
        Value::String(s) => s,
        _ => String::new(),
    }
}

pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
let mut vec_otel_json: Vec<BTreeMap<String, Value>> = Vec::new();
let body_str = std::str::from_utf8(body).unwrap();
Expand All @@ -117,27 +148,33 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
}
}
}
if resource.dropped_attributes_count > 0 {
if resource.dropped_attributes_count.is_some() {
otel_json.insert(
"resource_dropped_attributes_count".to_string(),
Value::Number(serde_json::Number::from(resource.dropped_attributes_count)),
Value::Number(serde_json::Number::from(
resource.dropped_attributes_count.unwrap(),
)),
);
}
}

for scope_logs in record.scope_logs.iter() {
for scope_log in scope_logs.iter() {
for instrumentation_scope in scope_log.scope.iter() {
if !instrumentation_scope.name.is_empty() {
if instrumentation_scope.name.is_some() {
otel_json.insert(
"instrumentation_scope_name".to_string(),
Value::String(instrumentation_scope.name.to_string()),
Value::String(
instrumentation_scope.name.as_ref().unwrap().to_string(),
),
);
}
if !instrumentation_scope.version.is_empty() {
if instrumentation_scope.version.is_some() {
otel_json.insert(
"instrumentation_scope_version".to_string(),
Value::String(instrumentation_scope.version.to_string()),
Value::String(
instrumentation_scope.version.as_ref().unwrap().to_string(),
),
);
}
let attributes = &instrumentation_scope.attributes;
Expand All @@ -154,37 +191,45 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
}
}
}
if instrumentation_scope.dropped_attributes_count > 0 {
if instrumentation_scope.dropped_attributes_count.is_some() {
otel_json.insert(
"instrumentation_scope_dropped_attributes_count".to_string(),
Value::Number(serde_json::Number::from(
instrumentation_scope.dropped_attributes_count,
instrumentation_scope.dropped_attributes_count.unwrap(),
)),
);
}
}

for log_record in scope_log.log_records.iter() {
let mut log_record_json: BTreeMap<String, Value> = BTreeMap::new();
if !log_record.time_unix_nano > 0 {
if log_record.time_unix_nano.is_some() {
log_record_json.insert(
"time_unix_nano".to_string(),
Value::String(log_record.time_unix_nano.to_string()),
Value::String(
log_record.time_unix_nano.as_ref().unwrap().to_string(),
),
);
}
if !log_record.observed_time_unix_nano > 0 {
if log_record.observed_time_unix_nano.is_some() {
log_record_json.insert(
"observed_time_unix_nano".to_string(),
Value::String(log_record.observed_time_unix_nano.to_string()),
Value::String(
log_record
.observed_time_unix_nano
.as_ref()
.unwrap()
.to_string(),
),
);
}
if log_record.severity_number > 0 {
let severity_number: i32 = log_record.severity_number;
if log_record.severity_number.is_some() {
let severity_number: i32 = log_record.severity_number.unwrap();
log_record_json.insert(
"severity_number".to_string(),
Value::Number(serde_json::Number::from(severity_number)),
);
if log_record.severity_text.is_empty() {
if log_record.severity_text.is_none() {
log_record_json.insert(
"severity_text".to_string(),
Value::String(
Expand All @@ -193,10 +238,12 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
);
}
}
if !log_record.severity_text.is_empty() {
if log_record.severity_text.is_some() {
log_record_json.insert(
"severity_text".to_string(),
Value::String(log_record.severity_text.to_string()),
Value::String(
log_record.severity_text.as_ref().unwrap().to_string(),
),
);
}

Expand All @@ -221,17 +268,17 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
}
}

if log_record.dropped_attributes_count > 0 {
if log_record.dropped_attributes_count.is_some() {
log_record_json.insert(
"log_record_dropped_attributes_count".to_string(),
Value::Number(serde_json::Number::from(
log_record.dropped_attributes_count,
log_record.dropped_attributes_count.unwrap(),
)),
);
}

if log_record.flags > 0 {
let flags: u32 = log_record.flags;
if log_record.flags.is_some() {
let flags: u32 = log_record.flags.unwrap();
log_record_json.insert(
"flags_number".to_string(),
Value::Number(serde_json::Number::from(flags)),
Expand All @@ -242,17 +289,17 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
);
}

if !log_record.span_id.is_empty() {
if log_record.span_id.is_some() {
log_record_json.insert(
"span_id".to_string(),
Value::String(log_record.span_id.to_string()),
Value::String(log_record.span_id.as_ref().unwrap().to_string()),
);
}

if !log_record.trace_id.is_empty() {
if log_record.trace_id.is_some() {
log_record_json.insert(
"trace_id".to_string(),
Value::String(log_record.trace_id.to_string()),
Value::String(log_record.trace_id.as_ref().unwrap().to_string()),
);
}
for key in log_record_json.keys() {
Expand All @@ -261,18 +308,18 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
vec_otel_json.push(otel_json.clone());
}

if !scope_log.schema_url.is_empty() {
if scope_log.schema_url.is_some() {
otel_json.insert(
"scope_log_schema_url".to_string(),
Value::String(scope_log.schema_url.to_string()),
Value::String(scope_log.schema_url.as_ref().unwrap().to_string()),
);
}
}
}
if !record.schema_url.is_empty() {
if record.schema_url.is_some() {
otel_json.insert(
"resource_schema_url".to_string(),
Value::String(record.schema_url.to_string()),
Value::String(record.schema_url.as_ref().unwrap().to_string()),
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
/// since oneof in AnyValue does not allow repeated fields.
pub struct ArrayValue {
/// Array of values. The array may be empty (contain 0 elements).
pub values: Vec<AnyValue>,
pub values: Vec<Value>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -83,13 +83,13 @@
/// such as the fully qualified name and version.
pub struct InstrumentationScope {
/// An empty instrumentation scope name means the name is unknown.
pub name: String,
pub version: String,
pub name: Option<String>,
pub version: Option<String>,
/// Additional attributes that describe the scope. \[Optional\].
/// Attribute keys MUST be unique (it is not allowed to have more than one
/// attribute with the same key).
pub attributes: Option<Vec<KeyValue>>,
#[serde(rename = "droppedAttributesCount")]
pub dropped_attributes_count: u32,
pub dropped_attributes_count: Option<u32>,
}

Loading

0 comments on commit ad39f57

Please sign in to comment.