diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 2cb2ffe13..c823e801d 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -105,6 +105,43 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< Ok(()) } +// Handler for POST /v1/logs to ingest OTEL logs +// ingests events by extracting stream name from header +// creates if stream does not exist +pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result { + if let Some((_, stream_name)) = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + { + let stream_name = stream_name.to_str().unwrap().to_owned(); + create_stream_if_not_exists(&stream_name).await?; + + //flatten logs + if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) + { + let log_source: String = log_source.to_str().unwrap().to_owned(); + if log_source == LOG_SOURCE_OTEL { + let mut json = otel::flatten_otel_logs(&body); + for record in json.iter_mut() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(stream_name.to_string(), req.clone(), body).await?; + } + } else { + log::warn!("Unknown log source: {}", log_source); + return Err(PostError::CustomError("Unknown log source".to_string())); + } + } else { + return Err(PostError::CustomError( + "log source key header is missing".to_string(), + )); + } + } else { + return Err(PostError::Header(ParseHeaderError::MissingStreamName)); + } + Ok(HttpResponse::Ok().finish()) +} + async fn flatten_and_push_logs( req: HttpRequest, body: Bytes, @@ -116,7 +153,9 @@ async fn flatten_and_push_logs( let log_source: String = log_source.to_str().unwrap().to_owned(); match log_source.as_str() { LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body), - LOG_SOURCE_OTEL => json = otel::flatten_otel_logs(&body), + LOG_SOURCE_OTEL => { + json = otel::flatten_otel_logs(&body); + } _ => { log::warn!("Unknown log source: {}", log_source); push_logs(stream_name.to_string(), req.clone(), body).await?; diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 26ed25d8f..869d92b8c 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -139,16 +139,18 @@ impl ParseableServer for IngestServer { impl IngestServer { // configure the api routes fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option) { - config.service( - // Base path "{url}/api/v1" - web::scope(&base_path()) - .service(Server::get_ingest_factory()) - .service(Self::logstream_api()) - .service(Server::get_about_factory()) - .service(Self::analytics_factory()) - .service(Server::get_liveness_factory()) - .service(Server::get_readiness_factory()), - ); + config + .service( + // Base path "{url}/api/v1" + web::scope(&base_path()) + .service(Server::get_ingest_factory()) + .service(Self::logstream_api()) + .service(Server::get_about_factory()) + .service(Self::analytics_factory()) + .service(Server::get_liveness_factory()) + .service(Server::get_readiness_factory()), + ) + .service(Server::get_ingest_otel_factory()); } fn analytics_factory() -> Scope { diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index ad14750b4..70a226a72 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -151,6 +151,7 @@ impl Server { .service(Self::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()), ) + .service(Self::get_ingest_otel_factory()) .service(Self::get_generated()); } @@ -347,6 +348,17 @@ impl Server { .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) } + // /v1/logs endpoint to be used for OTEL log ingestion only + pub fn get_ingest_otel_factory() -> Resource { + web::resource("/v1/logs") + .route( + web::post() + .to(ingest::ingest_otel_logs) + .authorize_for_stream(Action::Ingest), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)) + } + // get the oauth webscope pub fn get_oauth_webscope(oidc_client: Option) -> Scope { let oauth = web::scope("/o") diff --git a/server/src/handlers/http/otel.rs b/server/src/handlers/http/otel.rs index 83a6404f8..24a5d3547 100644 --- a/server/src/handlers/http/otel.rs +++ b/server/src/handlers/http/otel.rs @@ -56,10 +56,18 @@ fn collect_json_from_any_value( if value.array_val.is_some() { let array_val = value.array_val.as_ref().unwrap(); let values = &array_val.values; - for value in values { - let value = &value.value; - value_json = collect_json_from_any_value(key, value.clone()); + let array_value_json = collect_json_from_any_value(key, value.clone()); + for key in array_value_json.keys() { + value_json.insert( + format!( + "{}_{}", + key.to_owned(), + value_to_string(array_value_json[key].to_owned()) + ), + array_value_json[key].to_owned(), + ); + } } } @@ -69,7 +77,22 @@ fn collect_json_from_any_value( let kv_list_val = value.kv_list_val.unwrap(); for key_value in kv_list_val.values { let value = key_value.value; - value_json = collect_json_from_values(&value, key); + if value.is_some() { + let value = value.unwrap(); + let key_value_json = collect_json_from_any_value(key, value); + + for key in key_value_json.keys() { + value_json.insert( + format!( + "{}_{}_{}", + key.to_owned(), + key_value.key, + value_to_string(key_value_json[key].to_owned()) + ), + key_value_json[key].to_owned(), + ); + } + } } } if value.bytes_val.is_some() { @@ -96,6 +119,14 @@ fn collect_json_from_values( value_json } +fn value_to_string(value: serde_json::Value) -> String { + match value.clone() { + e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), + Value::String(s) => s, + _ => "".to_string(), + } +} + pub fn flatten_otel_logs(body: &Bytes) -> Vec> { let mut vec_otel_json: Vec> = Vec::new(); let body_str = std::str::from_utf8(body).unwrap(); @@ -117,10 +148,12 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { } } } - if resource.dropped_attributes_count > 0 { + if resource.dropped_attributes_count.is_some() { otel_json.insert( "resource_dropped_attributes_count".to_string(), - Value::Number(serde_json::Number::from(resource.dropped_attributes_count)), + Value::Number(serde_json::Number::from( + resource.dropped_attributes_count.unwrap(), + )), ); } } @@ -128,16 +161,20 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { for scope_logs in record.scope_logs.iter() { for scope_log in scope_logs.iter() { for instrumentation_scope in scope_log.scope.iter() { - if !instrumentation_scope.name.is_empty() { + if instrumentation_scope.name.is_some() { otel_json.insert( "instrumentation_scope_name".to_string(), - Value::String(instrumentation_scope.name.to_string()), + Value::String( + instrumentation_scope.name.as_ref().unwrap().to_string(), + ), ); } - if !instrumentation_scope.version.is_empty() { + if instrumentation_scope.version.is_some() { otel_json.insert( "instrumentation_scope_version".to_string(), - Value::String(instrumentation_scope.version.to_string()), + Value::String( + instrumentation_scope.version.as_ref().unwrap().to_string(), + ), ); } let attributes = &instrumentation_scope.attributes; @@ -154,11 +191,11 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { } } } - if instrumentation_scope.dropped_attributes_count > 0 { + if instrumentation_scope.dropped_attributes_count.is_some() { otel_json.insert( "instrumentation_scope_dropped_attributes_count".to_string(), Value::Number(serde_json::Number::from( - instrumentation_scope.dropped_attributes_count, + instrumentation_scope.dropped_attributes_count.unwrap(), )), ); } @@ -166,25 +203,33 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { for log_record in scope_log.log_records.iter() { let mut log_record_json: BTreeMap = BTreeMap::new(); - if !log_record.time_unix_nano > 0 { + if log_record.time_unix_nano.is_some() { log_record_json.insert( "time_unix_nano".to_string(), - Value::String(log_record.time_unix_nano.to_string()), + Value::String( + log_record.time_unix_nano.as_ref().unwrap().to_string(), + ), ); } - if !log_record.observed_time_unix_nano > 0 { + if log_record.observed_time_unix_nano.is_some() { log_record_json.insert( "observed_time_unix_nano".to_string(), - Value::String(log_record.observed_time_unix_nano.to_string()), + Value::String( + log_record + .observed_time_unix_nano + .as_ref() + .unwrap() + .to_string(), + ), ); } - if log_record.severity_number > 0 { - let severity_number: i32 = log_record.severity_number; + if log_record.severity_number.is_some() { + let severity_number: i32 = log_record.severity_number.unwrap(); log_record_json.insert( "severity_number".to_string(), Value::Number(serde_json::Number::from(severity_number)), ); - if log_record.severity_text.is_empty() { + if log_record.severity_text.is_none() { log_record_json.insert( "severity_text".to_string(), Value::String( @@ -193,10 +238,12 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { ); } } - if !log_record.severity_text.is_empty() { + if log_record.severity_text.is_some() { log_record_json.insert( "severity_text".to_string(), - Value::String(log_record.severity_text.to_string()), + Value::String( + log_record.severity_text.as_ref().unwrap().to_string(), + ), ); } @@ -221,17 +268,17 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { } } - if log_record.dropped_attributes_count > 0 { + if log_record.dropped_attributes_count.is_some() { log_record_json.insert( "log_record_dropped_attributes_count".to_string(), Value::Number(serde_json::Number::from( - log_record.dropped_attributes_count, + log_record.dropped_attributes_count.unwrap(), )), ); } - if log_record.flags > 0 { - let flags: u32 = log_record.flags; + if log_record.flags.is_some() { + let flags: u32 = log_record.flags.unwrap(); log_record_json.insert( "flags_number".to_string(), Value::Number(serde_json::Number::from(flags)), @@ -242,17 +289,17 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { ); } - if !log_record.span_id.is_empty() { + if log_record.span_id.is_some() { log_record_json.insert( "span_id".to_string(), - Value::String(log_record.span_id.to_string()), + Value::String(log_record.span_id.as_ref().unwrap().to_string()), ); } - if !log_record.trace_id.is_empty() { + if log_record.trace_id.is_some() { log_record_json.insert( "trace_id".to_string(), - Value::String(log_record.trace_id.to_string()), + Value::String(log_record.trace_id.as_ref().unwrap().to_string()), ); } for key in log_record_json.keys() { @@ -261,18 +308,18 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { vec_otel_json.push(otel_json.clone()); } - if !scope_log.schema_url.is_empty() { + if scope_log.schema_url.is_some() { otel_json.insert( "scope_log_schema_url".to_string(), - Value::String(scope_log.schema_url.to_string()), + Value::String(scope_log.schema_url.as_ref().unwrap().to_string()), ); } } } - if !record.schema_url.is_empty() { + if record.schema_url.is_some() { otel_json.insert( "resource_schema_url".to_string(), - Value::String(record.schema_url.to_string()), + Value::String(record.schema_url.as_ref().unwrap().to_string()), ); } } diff --git a/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs b/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs index ca2ea99bc..65a23ac6b 100644 --- a/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs +++ b/server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs @@ -53,7 +53,7 @@ /// since oneof in AnyValue does not allow repeated fields. pub struct ArrayValue { /// Array of values. The array may be empty (contain 0 elements). - pub values: Vec, + pub values: Vec, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -83,13 +83,13 @@ /// such as the fully qualified name and version. pub struct InstrumentationScope { /// An empty instrumentation scope name means the name is unknown. - pub name: String, - pub version: String, + pub name: Option, + pub version: Option, /// Additional attributes that describe the scope. \[Optional\]. /// Attribute keys MUST be unique (it is not allowed to have more than one /// attribute with the same key). pub attributes: Option>, #[serde(rename = "droppedAttributesCount")] - pub dropped_attributes_count: u32, + pub dropped_attributes_count: Option, } \ No newline at end of file diff --git a/server/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs b/server/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs index 318f85fbf..dc63286e3 100644 --- a/server/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs +++ b/server/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs @@ -58,7 +58,7 @@ /// This schema_url applies to the data in the "resource" field. It does not apply /// to the data in the "scope_logs" field which have their own schema_url field. #[serde(rename = "schemaUrl")] - pub schema_url: String, + pub schema_url: Option, } #[derive(Serialize, Deserialize, Debug)] @@ -73,7 +73,7 @@ pub log_records: Vec, /// This schema_url applies to all logs in the "logs" field. #[serde(rename = "schemaUrl")] - pub schema_url: String, + pub schema_url: Option, } #[derive(Serialize, Deserialize, Debug)] @@ -84,7 +84,7 @@ /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. /// Value of 0 indicates unknown or missing timestamp. #[serde(rename = "timeUnixNano")] - pub time_unix_nano: u64, + pub time_unix_nano: Option, /// Time when the event was observed by the collection system. /// For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK) /// this timestamp is typically set at the generation time and is equal to Timestamp. @@ -101,15 +101,15 @@ /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. /// Value of 0 indicates unknown or missing timestamp. #[serde(rename = "observedTimeUnixNano")] - pub observed_time_unix_nano: u64, + pub observed_time_unix_nano: Option, /// Numerical value of the severity, normalized to values described in Log Data Model. /// \[Optional\]. #[serde(rename = "severityNumber")] - pub severity_number: i32, + pub severity_number: Option, /// The severity text (also known as log level). The original string representation as /// it is known at the source. \[Optional\]. #[serde(rename = "severityText")] - pub severity_text: String, + pub severity_text: Option, /// A value containing the body of the log record. Can be for example a human-readable /// string message (including multi-line) describing the event in a free form or it can /// be a structured data composed of arrays and maps of other values. \[Optional\]. @@ -119,13 +119,13 @@ /// attribute with the same key). pub attributes: Option>, #[serde(rename = "droppedAttributesCount")] - pub dropped_attributes_count: u32, + pub dropped_attributes_count: Option, /// Flags, a bit field. 8 least significant bits are the trace flags as /// defined in W3C Trace Context specification. 24 most significant bits are reserved /// and must be set to 0. Readers must not assume that 24 most significant bits /// will be zero and must correctly mask the bits when reading 8-bit trace flag (use /// flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). \[Optional\]. - pub flags: u32, + pub flags: Option, /// A unique identifier for a trace. All logs from the same trace share /// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR /// of length other than 16 bytes is considered invalid (empty string in OTLP/JSON @@ -138,7 +138,7 @@ /// - the field is not present, /// - the field contains an invalid value. #[serde(rename = "traceId")] - pub trace_id: String, + pub trace_id: Option, /// A unique identifier for a span within a trace, assigned when the span /// is created. The ID is an 8-byte array. An ID with all zeroes OR of length /// other than 8 bytes is considered invalid (empty string in OTLP/JSON @@ -152,7 +152,7 @@ /// - the field is not present, /// - the field contains an invalid value. #[serde(rename = "spanId")] - pub span_id: String, + pub span_id: Option, } /// Possible values for LogRecord.SeverityNumber. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] diff --git a/server/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs b/server/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs index 1d72275b0..51f86481a 100644 --- a/server/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs +++ b/server/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs @@ -33,6 +33,6 @@ /// no attributes were dropped. #[serde(rename = "droppedAttributesCount")] - pub dropped_attributes_count: u32, + pub dropped_attributes_count: Option, } \ No newline at end of file