diff --git a/Cargo.lock b/Cargo.lock
index 991829a75813..ec42ce4e2e53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7281,6 +7281,7 @@ dependencies = [
  "ordered-float 4.2.2",
  "percent-encoding",
  "rand",
+ "serde_json",
  "thiserror",
 ]
@@ -7900,6 +7901,7 @@ dependencies = [
  "futures",
  "greptime-proto",
  "itertools 0.10.5",
+ "jsonb",
  "lazy_static",
  "moka",
  "once_cell",
@@ -10439,6 +10441,7 @@ dependencies = [
 name = "servers"
 version = "0.9.3"
 dependencies = [
+ "ahash 0.8.11",
  "aide",
  "api",
  "arrow",
diff --git a/Cargo.toml b/Cargo.toml
index 44b2cda1227f..b0e78249c3c2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -137,6 +137,7 @@ opentelemetry-proto = { version = "0.5", features = [
     "metrics",
     "trace",
     "with-serde",
+    "logs",
 ] }
 parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
 paste = "1.0"
diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs
index a8e59da51355..a02e6178ab0c 100644
--- a/src/datatypes/src/value.rs
+++ b/src/datatypes/src/value.rs
@@ -38,6 +38,7 @@ use snafu::{ensure, ResultExt};
 use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
 use crate::prelude::*;
+use crate::schema::ColumnSchema;
 use crate::type_id::LogicalTypeId;
 use crate::types::{IntervalType, ListType};
 use crate::vectors::ListVector;
@@ -1237,10 +1238,16 @@ impl<'a> From<Option<ValueRef<'a>>> for ValueRef<'a> {
     }
 }
 
-impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
+pub struct ColumnPair<'a> {
+    pub value: ValueRef<'a>,
+    pub schema: &'a ColumnSchema,
+}
+
+impl<'a> TryFrom<ColumnPair<'a>> for serde_json::Value {
     type Error = serde_json::Error;
 
-    fn try_from(value: ValueRef<'a>) -> serde_json::Result<serde_json::Value> {
+    fn try_from(value: ColumnPair<'a>) -> serde_json::Result<serde_json::Value> {
+        let ColumnPair { value, schema } = value;
         let json_value = match value {
             ValueRef::Null => serde_json::Value::Null,
             ValueRef::Boolean(v) => serde_json::Value::Bool(v),
@@ -1255,7 +1262,19 @@ impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
             ValueRef::Float32(v) => serde_json::Value::from(v.0),
             ValueRef::Float64(v) => serde_json::Value::from(v.0),
             ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()),
-            ValueRef::Binary(bytes) => serde_json::to_value(bytes)?,
+            ValueRef::Binary(bytes) => {
+                if let ConcreteDataType::Json(_) = schema.data_type {
+                    match jsonb::from_slice(bytes) {
+                        Ok(json) => json.into(),
+                        Err(e) => {
+                            error!(e; "Failed to parse jsonb");
+                            serde_json::Value::Null
+                        }
+                    }
+                } else {
+                    serde_json::to_value(bytes)?
+                }
+            }
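Review note: this new `ColumnPair` arm is what lets JSON-typed binary columns surface as real JSON in HTTP query output. A minimal standalone sketch of the jsonb round-trip it relies on (not part of the patch; uses the same `jsonb::from_slice` / `into()` calls as the arm above):

```rust
// Encode a JSON document into jsonb bytes, then decode it the way the arm above does.
let encoded: Vec<u8> = jsonb::parse_value(br#"{"app":"server"}"#)
    .expect("valid json")
    .to_vec();
let decoded: serde_json::Value = jsonb::from_slice(&encoded)
    .expect("valid jsonb")
    .into(); // jsonb::Value -> serde_json::Value, as in `Ok(json) => json.into()`
assert_eq!(decoded["app"], "server");
```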
             ValueRef::Date(v) => serde_json::Value::Number(v.val().into()),
             ValueRef::DateTime(v) => serde_json::Value::Number(v.val().into()),
             ValueRef::List(v) => serde_json::to_value(v)?,
diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index 09335af0804e..30d6c6cb42d9 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -17,8 +17,10 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use client::Output;
 use common_error::ext::BoxedError;
 use common_telemetry::tracing;
+use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
+use pipeline::PipelineWay;
 use servers::error::{self, AuthSnafu, Result as ServerResult};
 use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
 use servers::otlp;
@@ -28,7 +30,7 @@ use session::context::QueryContextRef;
 use snafu::ResultExt;
 
 use crate::instance::Instance;
-use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
+use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
 
 #[async_trait]
 impl OpenTelemetryProtocolHandler for Instance {
@@ -92,4 +94,31 @@ impl OpenTelemetryProtocolHandler for Instance {
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)
     }
+
+    #[tracing::instrument(skip_all)]
+    async fn logs(
+        &self,
+        request: ExportLogsServiceRequest,
+        pipeline: PipelineWay,
+        table_name: String,
+        ctx: QueryContextRef,
+    ) -> ServerResult<Output> {
+        self.plugins
+            .get::<PermissionCheckerRef>()
+            .as_ref()
+            .check_permission(ctx.current_user(), PermissionReq::Otlp)
+            .context(AuthSnafu)?;
+
+        let interceptor_ref = self
+            .plugins
+            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
+        interceptor_ref.pre_execute(ctx.clone())?;
+        let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
+
+        self.handle_row_inserts(requests, ctx)
+            .await
+            .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
+            .map_err(BoxedError::new)
+            .context(error::ExecuteGrpcQuerySnafu)
+    }
 }
diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs
index 33580c550e8b..a8bf4eb76eb5 100644
--- a/src/frontend/src/metrics.rs
+++ b/src/frontend/src/metrics.rs
@@ -41,16 +41,28 @@ lazy_static! {
         .with_label_values(&["insert"]);
     pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED
         .with_label_values(&["execute"]);
+
+    /// The number of OpenTelemetry metrics rows sent by the frontend node.
     pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!(
         "greptime_frontend_otlp_metrics_rows",
         "frontend otlp metrics rows"
     )
     .unwrap();
+
+    /// The number of OpenTelemetry traces rows sent by the frontend node.
     pub static ref OTLP_TRACES_ROWS: IntCounter = register_int_counter!(
         "greptime_frontend_otlp_traces_rows",
         "frontend otlp traces rows"
    )
     .unwrap();
+
+    /// The number of OpenTelemetry logs rows sent by the frontend node.
+    pub static ref OTLP_LOGS_ROWS: IntCounter = register_int_counter!(
+        "greptime_frontend_otlp_logs_rows",
+        "frontend otlp logs rows"
+    )
+    .unwrap();
+    /// The number of heartbeats sent by the frontend node.
     pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
         "greptime_frontend_heartbeat_send_count",
diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml
index 5b85b999feff..2402605f7fe8 100644
--- a/src/pipeline/Cargo.toml
+++ b/src/pipeline/Cargo.toml
@@ -40,6 +40,7 @@ enum_dispatch = "0.3"
 futures.workspace = true
 greptime-proto.workspace = true
 itertools.workspace = true
+jsonb.workspace = true
 lazy_static.workspace = true
 moka = { workspace = true, features = ["sync"] }
 once_cell.workspace = true
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index f29032e4f8a2..bec06cab8bfb 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -210,6 +210,37 @@ where
         self.transformer.transform_mut(val)
     }
 
+    pub fn prepare_pipeline_value(&self, val: Value, result: &mut [Value]) -> Result<()> {
+        match val {
+            Value::Map(map) => {
+                let mut search_from = 0;
+                // the keys of the json map are ordered (it is a BTreeMap)
+                for (payload_key, payload_value) in map.values.into_iter() {
+                    if search_from >= self.required_keys.len() {
+                        break;
+                    }
+
+                    // because map keys are ordered and required_keys is ordered too,
+                    // a single forward scan is enough
+                    if let Some(pos) = self.required_keys[search_from..]
+                        .iter()
+                        .position(|k| k == &payload_key)
+                    {
+                        result[search_from + pos] = payload_value;
+                        // the next search starts at the current key; we never look behind it
+                        search_from += pos;
+                    }
+                }
+            }
+            Value::String(_) => {
+                result[0] = val;
+            }
+            _ => {
+                return PrepareValueMustBeObjectSnafu.fail();
+            }
+        }
+        Ok(())
+    }
+
     pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<()> {
         match val {
             serde_json::Value::Object(map) => {
@@ -286,6 +317,11 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str)
         .context(IntermediateKeyIndexSnafu { kind, key })
 }
 
+pub enum PipelineWay {
+    Identity,
+    Custom(std::sync::Arc<Pipeline<GreptimeTransformer>>),
+}
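Review note: `prepare_pipeline_value` relies on both sides being sorted (the payload map is a `BTreeMap` after this change), so key matching is one forward scan instead of a hash lookup per key. A standalone sketch of the scan, with hypothetical keys and values:

```rust
let required = ["app", "env", "host"];             // sorted, like required_keys
let payload = [("app", 1), ("env", 2), ("zz", 3)]; // BTreeMap iteration order
let mut result = [0i32; 3];
let mut search_from = 0;
for (key, value) in payload {
    if search_from >= required.len() {
        break;
    }
    if let Some(pos) = required[search_from..].iter().position(|k| *k == key) {
        result[search_from + pos] = value;
        search_from += pos; // never scan behind the last match again
    }
}
assert_eq!(result, [1, 2, 0]); // "zz" is not a required key and is skipped
```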
     #[snafu(display(
         "invalid resolution: '{resolution}'.
         Available resolutions: {valid_resolution}"
diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs
index f43186b94aa0..06cfeb7c6905 100644
--- a/src/pipeline/src/etl/processor/cmcd.rs
+++ b/src/pipeline/src/etl/processor/cmcd.rs
@@ -402,7 +402,8 @@ impl Processor for CmcdProcessor {
 
 #[cfg(test)]
 mod tests {
-    use ahash::HashMap;
+    use std::collections::BTreeMap;
+
     use urlencoding::decode;
 
     use super::{CmcdProcessorBuilder, CMCD_KEYS};
@@ -563,14 +564,14 @@ mod tests {
         let values = vec
             .into_iter()
             .map(|(k, v)| (k.to_string(), v))
-            .collect::<HashMap<String, Value>>();
+            .collect::<BTreeMap<String, Value>>();
         let expected = Map { values };
 
         let actual = processor.parse(0, &decoded).unwrap();
         let actual = actual
             .into_iter()
             .map(|(index, value)| (intermediate_keys[index].clone(), value))
-            .collect::<HashMap<String, Value>>();
+            .collect::<BTreeMap<String, Value>>();
         let actual = Map { values: actual };
         assert_eq!(actual, expected);
     }
diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs
index a74c19140c19..de25195f99ab 100644
--- a/src/pipeline/src/etl/processor/regex.rs
+++ b/src/pipeline/src/etl/processor/regex.rs
@@ -383,6 +383,8 @@ impl Processor for RegexProcessor {
 }
 #[cfg(test)]
 mod tests {
+    use std::collections::BTreeMap;
+
     use ahash::{HashMap, HashMapExt};
     use itertools::Itertools;
@@ -475,14 +477,14 @@ ignore_missing: false"#;
             .map(|k| k.to_string())
             .collect_vec();
         let processor = builder.build(&intermediate_keys).unwrap();
-        let mut result = HashMap::new();
+        let mut result = BTreeMap::new();
         for (index, pattern) in processor.patterns.iter().enumerate() {
             let r = processor
                 .process(&breadcrumbs_str, pattern, (0, index))
                 .unwrap()
                 .into_iter()
                 .map(|(k, v)| (intermediate_keys[k].clone(), v))
-                .collect::<HashMap<String, Value>>();
+                .collect::<BTreeMap<String, Value>>();
             result.extend(r);
         }
         let map = Map { values: result };
diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
index 827613b02b60..48356bc96e67 100644
--- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
+++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
@@ -20,7 +20,8 @@ use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
 use snafu::ResultExt;
 
 use crate::etl::error::{
-    CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
+    CoerceNestedTypeSnafu, CoerceStringToTypeSnafu, CoerceTypeToNestedSnafu,
+    CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
     CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
 };
 use crate::etl::transform::index::Index;
@@ -187,8 +188,8 @@ pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<ValueData>> {
         Value::Timestamp(Timestamp::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))),
-        Value::Array(_) => unimplemented!("Array type not supported"),
-        Value::Map(_) => unimplemented!("Object type not supported"),
+        Value::Array(_) => coerce_nested_value(val, transform),
+        Value::Map(_) => coerce_nested_value(val, transform),
     }
 }
@@ -414,6 +415,32 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<ValueData>> {
+fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>> {
+    match &transform.type_ {
+        Value::Array(_) | Value::Map(_) => (),
+        t => {
+            return CoerceNestedTypeSnafu {
+                ty: t.to_str_type(),
+            }
+            .fail();
+        }
+    }
+    match v {
+        Value::Map(_) => {
+            let data: jsonb::Value = v.into();
+            Ok(Some(ValueData::BinaryValue(data.to_vec())))
+        }
+        Value::Array(_) => {
+            let data: jsonb::Value = v.into();
+            Ok(Some(ValueData::BinaryValue(data.to_vec())))
+        }
+        _ => CoerceTypeToNestedSnafu {
+            ty: v.to_str_type(),
+        }
+        .fail(),
+    }
+}
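Review note: nested pipeline values are persisted as jsonb-encoded binary, which is what the `Json` column type stores. A rough sketch of what `coerce_nested_value` produces for a map, assuming the patch's `From<&Value> for jsonb::Value` impl (added in etl/value.rs below) is in scope:

```rust
// Sketch only: a (here empty) map value becomes a jsonb binary column payload.
let v = Value::Map(Map::default());
let data: jsonb::Value = (&v).into(); // the From<&Value> impl added by this patch
let column_payload = ValueData::BinaryValue(data.to_vec());
```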
} +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index 3adde2514b9e..f2617aff3ba7 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -16,8 +16,10 @@ pub mod array; pub mod map; pub mod time; -use ahash::{HashMap, HashMapExt}; +use std::collections::BTreeMap; + pub use array::Array; +use jsonb::{Number as JsonbNumber, Object as JsonbObject, Value as JsonbValue}; pub use map::Map; use snafu::{OptionExt, ResultExt}; pub use time::Timestamp; @@ -112,6 +114,7 @@ impl Value { "array" => Ok(Value::Array(Array::default())), "map" => Ok(Value::Map(Map::default())), + "json" => Ok(Value::Map(Map::default())), _ => ValueParseTypeSnafu { t }.fail(), } @@ -287,7 +290,7 @@ impl TryFrom for Value { Ok(Value::Array(Array { values })) } serde_json::Value::Object(v) => { - let mut values = HashMap::with_capacity(v.len()); + let mut values = BTreeMap::new(); for (k, v) in v { values.insert(k, Value::try_from(v)?); } @@ -318,7 +321,7 @@ impl TryFrom<&yaml_rust::Yaml> for Value { Ok(Value::Array(Array { values })) } yaml_rust::Yaml::Hash(v) => { - let mut values = HashMap::new(); + let mut values = BTreeMap::new(); for (k, v) in v { let key = k .as_str() @@ -331,3 +334,41 @@ impl TryFrom<&yaml_rust::Yaml> for Value { } } } + +impl<'a> From<&Value> for JsonbValue<'a> { + fn from(value: &Value) -> Self { + match value { + Value::Null => JsonbValue::Null, + Value::Boolean(v) => JsonbValue::Bool(*v), + + Value::Int8(v) => JsonbValue::Number(JsonbNumber::Int64(*v as i64)), + Value::Int16(v) => JsonbValue::Number(JsonbNumber::Int64(*v as i64)), + Value::Int32(v) => JsonbValue::Number(JsonbNumber::Int64(*v as i64)), + Value::Int64(v) => JsonbValue::Number(JsonbNumber::Int64(*v)), + + Value::Uint8(v) => JsonbValue::Number(JsonbNumber::UInt64(*v as u64)), + Value::Uint16(v) => JsonbValue::Number(JsonbNumber::UInt64(*v as u64)), + Value::Uint32(v) => JsonbValue::Number(JsonbNumber::UInt64(*v as u64)), + Value::Uint64(v) => JsonbValue::Number(JsonbNumber::UInt64(*v)), + Value::Float32(v) => JsonbValue::Number(JsonbNumber::Float64(*v as f64)), + Value::Float64(v) => JsonbValue::Number(JsonbNumber::Float64(*v)), + Value::String(v) => JsonbValue::String(v.clone().into()), + Value::Timestamp(v) => JsonbValue::String(v.to_string().into()), + Value::Array(arr) => { + let mut vals: Vec = Vec::with_capacity(arr.len()); + for val in arr.iter() { + vals.push(val.into()); + } + JsonbValue::Array(vals) + } + Value::Map(obj) => { + let mut map = JsonbObject::new(); + for (k, v) in obj.iter() { + let val: JsonbValue = v.into(); + map.insert(k.to_string(), val); + } + JsonbValue::Object(map) + } + } + } +} diff --git a/src/pipeline/src/etl/value/map.rs b/src/pipeline/src/etl/value/map.rs index b8b81da7563b..7450fed20d70 100644 --- a/src/pipeline/src/etl/value/map.rs +++ b/src/pipeline/src/etl/value/map.rs @@ -12,21 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
 
-use ahash::{HashMap, HashMapExt};
+use std::collections::BTreeMap;
+
+use ahash::HashMap;
 
 use crate::etl::value::Value;
 
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Default)]
 pub struct Map {
-    pub values: HashMap<String, Value>,
-}
-
-impl Default for Map {
-    fn default() -> Self {
-        Self {
-            values: HashMap::with_capacity(30),
-        }
-    }
+    pub values: BTreeMap<String, Value>,
 }
 
 impl Map {
@@ -47,12 +41,16 @@ impl Map {
 
 impl From<HashMap<String, Value>> for Map {
     fn from(values: HashMap<String, Value>) -> Self {
-        Map { values }
+        let mut map = Map::default();
+        for (k, v) in values.into_iter() {
+            map.insert(k, v);
+        }
+        map
     }
 }
 
 impl std::ops::Deref for Map {
-    type Target = HashMap<String, Value>;
+    type Target = BTreeMap<String, Value>;
 
     fn deref(&self) -> &Self::Target {
         &self.values
diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs
index 8fc72c584484..a4d9767804d3 100644
--- a/src/pipeline/src/lib.rs
+++ b/src/pipeline/src/lib.rs
@@ -20,7 +20,7 @@ pub use etl::error::Result;
 pub use etl::processor::Processor;
 pub use etl::transform::{GreptimeTransformer, Transformer};
 pub use etl::value::{Array, Map, Value};
-pub use etl::{parse, Content, Pipeline};
+pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay};
 pub use manager::{
     error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
     PipelineVersion,
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index b94fa17d44c0..6fad5644e573 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -14,6 +14,7 @@ testing = []
 workspace = true
 
 [dependencies]
+ahash = "0.8"
 aide = { version = "0.9", features = ["axum"] }
 api.workspace = true
 arrow.workspace = true
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 0fde3b527c84..ebee059e1ddf 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -533,6 +533,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("OpenTelemetry log error"))]
+    OpenTelemetryLog {
+        source: pipeline::etl_error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -597,7 +604,8 @@ impl ErrorExt for Error {
             | MysqlValueConversion { .. }
             | ParseJson { .. }
             | UnsupportedContentType { .. }
-            | TimestampOverflow { .. } => StatusCode::InvalidArguments,
+            | TimestampOverflow { .. }
+            | OpenTelemetryLog { .. } => StatusCode::InvalidArguments,
 
             Catalog { source, .. } => source.status_code(),
             RowWriter { source, .. } => source.status_code(),
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index a2b72b548b1e..12db3346e6d1 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -36,6 +36,7 @@ use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datatypes::data_type::DataType;
 use datatypes::schema::SchemaRef;
+use datatypes::value::ColumnPair;
 use event::{LogState, LogValidatorRef};
 use futures::FutureExt;
 use schemars::JsonSchema;
@@ -239,14 +240,21 @@ impl HttpRecordsOutput {
         } else {
             let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
             let mut rows = Vec::with_capacity(num_rows);
+            let schemas = schema.column_schemas();
             let num_cols = schema.column_schemas().len();
             rows.resize_with(num_rows, || Vec::with_capacity(num_cols));
 
             let mut finished_row_cursor = 0;
             for recordbatch in recordbatches {
-                for col in recordbatch.columns() {
+                for (col_idx, col) in recordbatch.columns().iter().enumerate() {
+                    // safety here: schemas length is equal to the number of columns in the recordbatch
+                    let schema = &schemas[col_idx];
                     for row_idx in 0..recordbatch.num_rows() {
-                        let value = Value::try_from(col.get_ref(row_idx)).context(ToJsonSnafu)?;
+                        let column_pair = ColumnPair {
+                            value: col.get_ref(row_idx),
+                            schema,
+                        };
+                        let value = Value::try_from(column_pair).context(ToJsonSnafu)?;
                         rows[row_idx + finished_row_cursor].push(value);
                     }
                 }
@@ -866,6 +874,7 @@ impl HttpServer {
         Router::new()
             .route("/v1/metrics", routing::post(otlp::metrics))
             .route("/v1/traces", routing::post(otlp::traces))
+            .route("/v1/logs", routing::post(otlp::logs))
             .layer(
                 ServiceBuilder::new()
                     .layer(HandleErrorLayer::new(handle_error))
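Review note: the new `/v1/otlp/v1/logs` route accepts the same protobuf body as the other OTLP endpoints, plus three optional headers defined in the handler below. A hypothetical client call, assuming `reqwest` and an encoded `ExportLogsServiceRequest` named `request`:

```rust
let resp = reqwest::Client::new()
    .post("http://127.0.0.1:4000/v1/otlp/v1/logs")
    .header("content-type", "application/x-protobuf")
    // optional: route through a named pipeline; identity transform when absent
    .header("x-greptime-pipeline-name", "my_pipeline")
    // optional: target table; defaults to "opentelemetry_logs"
    .header("x-greptime-table-name", "logs")
    .body(request.encode_to_vec())
    .send()
    .await?;
assert!(resp.status().is_success());
```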
diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs
index 3efdaeec96d4..349bb2a3b9eb 100644
--- a/src/servers/src/http/otlp.rs
+++ b/src/servers/src/http/otlp.rs
@@ -12,20 +12,29 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::str;
+use std::result::Result as StdResult;
 use std::sync::Arc;
 
-use axum::extract::State;
-use axum::http::header;
+use axum::extract::{FromRequestParts, State};
+use axum::http::header::HeaderValue;
+use axum::http::request::Parts;
+use axum::http::{header, StatusCode};
 use axum::response::IntoResponse;
-use axum::Extension;
+use axum::{async_trait, Extension};
 use bytes::Bytes;
 use common_telemetry::tracing;
+use opentelemetry_proto::tonic::collector::logs::v1::{
+    ExportLogsServiceRequest, ExportLogsServiceResponse,
+};
 use opentelemetry_proto::tonic::collector::metrics::v1::{
     ExportMetricsServiceRequest, ExportMetricsServiceResponse,
 };
 use opentelemetry_proto::tonic::collector::trace::v1::{
     ExportTraceServiceRequest, ExportTraceServiceResponse,
 };
+use pipeline::util::to_pipeline_version;
+use pipeline::PipelineWay;
 use prost::Message;
 use session::context::{Channel, QueryContext};
 use snafu::prelude::*;
@@ -34,13 +43,18 @@ use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
 use crate::error::{self, Result};
 use crate::query_handler::OpenTelemetryProtocolHandlerRef;
 
+const OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-pipeline-name";
+const OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-pipeline-version";
+const OTLP_GREPTIME_TABLE_NAME_HEADER_NAME: &str = "x-greptime-table-name";
+
 #[axum_macros::debug_handler]
 #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))]
 pub async fn metrics(
     State(handler): State<OpenTelemetryProtocolHandlerRef>,
     Extension(mut query_ctx): Extension<QueryContext>,
     bytes: Bytes,
-) -> Result<OtlpMetricsResponse> {
+) -> Result<OtlpResponse<ExportMetricsServiceResponse>> {
     let db = query_ctx.get_db_string();
     query_ctx.set_channel(Channel::Otlp);
     let query_ctx = Arc::new(query_ctx);
@@ -53,7 +67,7 @@ pub async fn metrics(
     handler
         .metrics(request, query_ctx)
         .await
-        .map(|o| OtlpMetricsResponse {
+        .map(|o| OtlpResponse {
             resp_body: ExportMetricsServiceResponse {
                 partial_success: None,
             },
             write_cost: o.meta.cost,
         })
 }
 
-pub struct OtlpMetricsResponse {
-    resp_body: ExportMetricsServiceResponse,
-    write_cost: usize,
-}
-
-impl IntoResponse for OtlpMetricsResponse {
-    fn into_response(self) -> axum::response::Response {
-        let mut header_map = write_cost_header_map(self.write_cost);
-        header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
-
-        (header_map, self.resp_body.encode_to_vec()).into_response()
-    }
-}
-
 #[axum_macros::debug_handler]
 #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
 pub async fn traces(
     State(handler): State<OpenTelemetryProtocolHandlerRef>,
     Extension(mut query_ctx): Extension<QueryContext>,
     bytes: Bytes,
-) -> Result<OtlpTracesResponse> {
+) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
     let db = query_ctx.get_db_string();
     query_ctx.set_channel(Channel::Otlp);
     let query_ctx = Arc::new(query_ctx);
@@ -93,7 +93,7 @@ pub async fn traces(
     handler
         .traces(request, query_ctx)
         .await
-        .map(|o| OtlpTracesResponse {
+        .map(|o| OtlpResponse {
             resp_body: ExportTraceServiceResponse {
                 partial_success: None,
             },
@@ -101,12 +101,145 @@ pub async fn traces(
 }
 
-pub struct OtlpTracesResponse {
-    resp_body: ExportTraceServiceResponse,
+pub struct PipelineInfo {
+    pub pipeline_name: Option<String>,
+    pub pipeline_version: Option<String>,
+}
+
+fn pipeline_header_error(
+    header: &HeaderValue,
+    key: &str,
+) -> StdResult<String, (StatusCode, String)> {
+    let header_utf8 = str::from_utf8(header.as_bytes());
+    match header_utf8 {
+        Ok(s) => Ok(s.to_string()),
+        Err(_) => Err((
+            StatusCode::BAD_REQUEST,
+            format!("`{}` header is not a valid UTF-8 string.", key),
+        )),
+    }
+}
+
+#[async_trait]
+impl<S> FromRequestParts<S> for PipelineInfo
+where
+    S: Send + Sync,
+{
+    type Rejection = (StatusCode, String);
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
+        let pipeline_name = parts.headers.get(OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME);
+        let pipeline_version = parts
+            .headers
+            .get(OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME);
+        match (pipeline_name, pipeline_version) {
+            (Some(name), Some(version)) => Ok(PipelineInfo {
+                pipeline_name: Some(pipeline_header_error(
+                    name,
+                    OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME,
+                )?),
+                pipeline_version: Some(pipeline_header_error(
+                    version,
+                    OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME,
+                )?),
+            }),
+            (None, _) => Ok(PipelineInfo {
+                pipeline_name: None,
+                pipeline_version: None,
+            }),
+            (Some(name), None) => Ok(PipelineInfo {
+                pipeline_name: Some(pipeline_header_error(
+                    name,
+                    OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME,
+                )?),
+                pipeline_version: None,
+            }),
+        }
+    }
+}
+
+pub struct TableInfo {
+    table_name: String,
+}
+
+#[async_trait]
+impl<S> FromRequestParts<S> for TableInfo
+where
+    S: Send + Sync,
+{
+    type Rejection = (StatusCode, String);
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
+        let table_name = parts.headers.get(OTLP_GREPTIME_TABLE_NAME_HEADER_NAME);
+
+        match table_name {
+            Some(name) => Ok(TableInfo {
+                table_name: pipeline_header_error(name, OTLP_GREPTIME_TABLE_NAME_HEADER_NAME)?,
+            }),
+            None => Ok(TableInfo {
+                table_name: "opentelemetry_logs".to_string(),
+            }),
+        }
+    }
+}
+
+#[axum_macros::debug_handler]
+#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))]
+pub async fn logs(
+    State(handler): State<OpenTelemetryProtocolHandlerRef>,
+    Extension(mut query_ctx): Extension<QueryContext>,
+    pipeline_info: PipelineInfo,
+    table_info: TableInfo,
+    bytes: Bytes,
+) -> Result<OtlpResponse<ExportLogsServiceResponse>> {
+    let db = query_ctx.get_db_string();
+    query_ctx.set_channel(Channel::Otlp);
+    let query_ctx = Arc::new(query_ctx);
+    let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED
+        .with_label_values(&[db.as_str()])
+        .start_timer();
+    let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
+
+    let pipeline_way = if let Some(pipeline_name) = &pipeline_info.pipeline_name {
+        let pipeline_version =
+            to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| {
+                error::InvalidParameterSnafu {
+                    reason: OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME,
+                }
+                .build()
+            })?;
+        let pipeline = handler
+            .get_pipeline(pipeline_name, pipeline_version, query_ctx.clone())
+            .await?;
+        PipelineWay::Custom(pipeline)
+    } else {
+        PipelineWay::Identity
+    };
+
+    handler
+        .logs(request, pipeline_way, table_info.table_name, query_ctx)
+        .await
+        .map(|o| OtlpResponse {
+            resp_body: ExportLogsServiceResponse {
+                partial_success: None,
+            },
+            write_cost: o.meta.cost,
+        })
+}
+
+pub struct OtlpResponse<T: Message> {
+    resp_body: T,
     write_cost: usize,
 }
 
-impl IntoResponse for OtlpTracesResponse {
+impl<T: Message> IntoResponse for OtlpResponse<T> {
     fn into_response(self) -> axum::response::Response {
         let mut header_map = write_cost_header_map(self.write_cost);
         header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
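Review note: a test sketch for the default-table behavior of the `TableInfo` extractor above, assuming it is made visible to a test module (`axum::extract::FromRequestParts` in scope):

```rust
#[tokio::test]
async fn table_info_defaults_when_header_absent() {
    // Build bare request parts with no x-greptime-table-name header.
    let (mut parts, _body) = axum::http::Request::builder()
        .uri("/v1/otlp/v1/logs")
        .body(())
        .unwrap()
        .into_parts();
    let info = TableInfo::from_request_parts(&mut parts, &()).await.unwrap();
    assert_eq!(info.table_name, "opentelemetry_logs");
}
```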
diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs
index cdf927536f04..ead86f3ad88b 100644
--- a/src/servers/src/metrics.rs
+++ b/src/servers/src/metrics.rs
@@ -141,6 +141,13 @@ lazy_static! {
         &[METRIC_DB_LABEL]
     )
     .unwrap();
+    pub static ref METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED: HistogramVec =
+        register_histogram_vec!(
+            "greptime_servers_http_otlp_logs_elapsed",
+            "servers http otlp logs elapsed",
+            &[METRIC_DB_LABEL]
+        )
+        .unwrap();
     pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
         "greptime_servers_http_logs_ingestion_counter",
         "servers http logs ingestion counter",
diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs
index 5b92d12cb629..cc1321ac7020 100644
--- a/src/servers/src/otlp.rs
+++ b/src/servers/src/otlp.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod logs;
 pub mod metrics;
 pub mod plugin;
 pub mod trace;
diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs
new file mode 100644
index 000000000000..b8dd0bafe16d
--- /dev/null
+++ b/src/servers/src/otlp/logs.rs
@@ -0,0 +1,516 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{BTreeMap, HashMap as StdHashMap};
+
+use api::v1::column_data_type_extension::TypeExt;
+use api::v1::value::ValueData;
+use api::v1::{
+    ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
+    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue,
+};
+use jsonb::{Number as JsonbNumber, Value as JsonbValue};
+use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
+use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
+use opentelemetry_proto::tonic::logs::v1::LogRecord;
+use pipeline::{Array, Map, PipelineWay, Value as PipelineValue};
+use snafu::ResultExt;
+
+use super::trace::attributes::OtlpAnyValue;
+use crate::error::{OpenTelemetryLogSnafu, Result};
+use crate::otlp::trace::span::bytes_to_hex_string;
+
+/// Normalize otlp instrumentation, metric and attribute names
+///
+/// - since the names are case-insensitive, we transform them to lowercase for
+///   better sql usability
+/// - replace `.` and `-` with `_`
+fn normalize_otlp_name(name: &str) -> String {
+    name.to_lowercase().replace(|c| c == '.' || c == '-', "_")
+}
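Review note: the normalization is lossy by design; attribute names that differ only in case or separator collapse to the same column name:

```rust
assert_eq!(normalize_otlp_name("Instance-Num"), "instance_num");
assert_eq!(normalize_otlp_name("resource.attr"), "resource_attr");
```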
+
+/// Convert OpenTelemetry logs to GreptimeDB insert requests
+///
+/// See
+///
+/// for the data structure of OTLP logs.
+///
+/// Returns `InsertRequests` and the total number of rows to ingest
+pub fn to_grpc_insert_requests(
+    request: ExportLogsServiceRequest,
+    pipeline: PipelineWay,
+    table_name: String,
+) -> Result<(RowInsertRequests, usize)> {
+    match pipeline {
+        PipelineWay::Identity => {
+            let rows = parse_export_logs_service_request_to_rows(request);
+            let len = rows.rows.len();
+            let insert_request = RowInsertRequest {
+                rows: Some(rows),
+                table_name,
+            };
+            Ok((
+                RowInsertRequests {
+                    inserts: vec![insert_request],
+                },
+                len,
+            ))
+        }
+        PipelineWay::Custom(p) => {
+            let request = parse_export_logs_service_request(request);
+            let mut result = Vec::new();
+            let mut intermediate_state = p.init_intermediate_state();
+            for v in request {
+                p.prepare_pipeline_value(v, &mut intermediate_state)
+                    .context(OpenTelemetryLogSnafu)?;
+                let r = p
+                    .exec_mut(&mut intermediate_state)
+                    .context(OpenTelemetryLogSnafu)?;
+                result.push(r);
+            }
+            let len = result.len();
+            let rows = Rows {
+                schema: p.schemas().clone(),
+                rows: result,
+            };
+            let insert_request = RowInsertRequest {
+                rows: Some(rows),
+                table_name,
+            };
+            let insert_requests = RowInsertRequests {
+                inserts: vec![insert_request],
+            };
+            Ok((insert_requests, len))
+        }
+    }
+}
+
+fn scope_to_pipeline_value(
+    scope: Option<InstrumentationScope>,
+) -> (PipelineValue, PipelineValue, PipelineValue) {
+    scope
+        .map(|x| {
+            (
+                PipelineValue::Map(Map {
+                    values: key_value_to_map(x.attributes),
+                }),
+                PipelineValue::String(x.version),
+                PipelineValue::String(x.name),
+            )
+        })
+        .unwrap_or((
+            PipelineValue::Null,
+            PipelineValue::Null,
+            PipelineValue::Null,
+        ))
+}
+
+fn scope_to_jsonb(
+    scope: Option<InstrumentationScope>,
+) -> (JsonbValue<'static>, Option<String>, Option<String>) {
+    scope
+        .map(|x| {
+            (
+                key_value_to_jsonb(x.attributes),
+                Some(x.version),
+                Some(x.name),
+            )
+        })
+        .unwrap_or((JsonbValue::Null, None, None))
+}
+
+fn log_to_pipeline_value(
+    log: LogRecord,
+    resource_schema_url: PipelineValue,
+    resource_attr: PipelineValue,
+    scope_schema_url: PipelineValue,
+    scope_name: PipelineValue,
+    scope_version: PipelineValue,
+    scope_attrs: PipelineValue,
+) -> PipelineValue {
+    let log_attrs = PipelineValue::Map(Map {
+        values: key_value_to_map(log.attributes),
+    });
+    let mut map = BTreeMap::new();
+    map.insert(
+        "Timestamp".to_string(),
+        PipelineValue::Uint64(log.time_unix_nano),
+    );
+    map.insert(
+        "ObservedTimestamp".to_string(),
+        PipelineValue::Uint64(log.observed_time_unix_nano),
+    );
+
+    // needs to be converted to a string
+    map.insert(
+        "TraceId".to_string(),
+        PipelineValue::String(bytes_to_hex_string(&log.trace_id)),
+    );
+    map.insert(
+        "SpanId".to_string(),
+        PipelineValue::String(bytes_to_hex_string(&log.span_id)),
+    );
+    map.insert("TraceFlags".to_string(), PipelineValue::Uint32(log.flags));
+    map.insert(
+        "SeverityText".to_string(),
+        PipelineValue::String(log.severity_text),
+    );
+    map.insert(
+        "SeverityNumber".to_string(),
+        PipelineValue::Int32(log.severity_number),
+    );
+    // needs to be converted to a string
+    map.insert(
+        "Body".to_string(),
+        log.body
+            .as_ref()
+            .map(|x| PipelineValue::String(log_body_to_string(x)))
+            .unwrap_or(PipelineValue::Null),
+    );
+    map.insert("ResourceSchemaUrl".to_string(), resource_schema_url);
+
+    map.insert("ResourceAttributes".to_string(), resource_attr);
+    map.insert("ScopeSchemaUrl".to_string(), scope_schema_url);
+    map.insert("ScopeName".to_string(), scope_name);
+    map.insert("ScopeVersion".to_string(), scope_version);
+    map.insert("ScopeAttributes".to_string(), scope_attrs);
+    map.insert("LogAttributes".to_string(), log_attrs);
+    PipelineValue::Map(Map { values: map })
+}
+
+fn build_identity_schema() -> Vec<ColumnSchema> {
+    [
+        (
+            "scope_name",
+            ColumnDataType::String,
+            SemanticType::Tag,
+            None,
+            None,
+        ),
+        (
+            "scope_version",
+            ColumnDataType::String,
+            SemanticType::Field,
+            None,
+            None,
+        ),
+        (
+            "scope_attributes",
+            ColumnDataType::Binary,
+            SemanticType::Field,
+            Some(ColumnDataTypeExtension {
+                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
+            }),
+            None,
+        ),
+        (
+            "resource_attributes",
+            ColumnDataType::Binary,
+            SemanticType::Field,
+            Some(ColumnDataTypeExtension {
+                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
+            }),
+            None,
+        ),
+        (
+            "log_attributes",
+            ColumnDataType::Binary,
+            SemanticType::Field,
+            Some(ColumnDataTypeExtension {
+                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
+            }),
+            None,
+        ),
+        (
+            "timestamp",
+            ColumnDataType::TimestampNanosecond,
+            SemanticType::Timestamp,
+            None,
+            None,
+        ),
+        (
+            "observed_timestamp",
+            ColumnDataType::TimestampNanosecond,
+            SemanticType::Field,
+            None,
+            None,
+        ),
+        (
+            "trace_id",
+            ColumnDataType::String,
+            SemanticType::Tag,
+            None,
+            None,
+        ),
+        (
+            "span_id",
+            ColumnDataType::String,
+            SemanticType::Tag,
+            None,
+            None,
+        ),
+        (
+            "trace_flags",
+            ColumnDataType::Uint32,
+            SemanticType::Field,
+            None,
+            None,
+        ),
+        (
+            "severity_text",
+            ColumnDataType::String,
+            SemanticType::Field,
+            None,
+            None,
+        ),
+        (
+            "severity_number",
+            ColumnDataType::Int32,
+            SemanticType::Field,
+            None,
+            None,
+        ),
+        (
+            "body",
+            ColumnDataType::String,
+            SemanticType::Field,
+            None,
+            Some(ColumnOptions {
+                options: StdHashMap::from([(
+                    "fulltext".to_string(),
+                    r#"{"enable":true}"#.to_string(),
+                )]),
+            }),
+        ),
+    ]
+    .into_iter()
+    .map(
+        |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema {
+            column_name: field_name.to_string(),
+            datatype: column_type as i32,
+            semantic_type: semantic_type as i32,
+            datatype_extension,
+            options,
+        },
+    )
+    .collect::<Vec<ColumnSchema>>()
+}
+
+fn build_identity_row(
+    log: LogRecord,
+    resource_attr: JsonbValue<'_>,
+    scope_name: Option<String>,
+    scope_version: Option<String>,
+    scope_attrs: JsonbValue<'_>,
+) -> Row {
+    let row = vec![
+        GreptimeValue {
+            value_data: scope_name.map(ValueData::StringValue),
+        },
+        GreptimeValue {
+            value_data: scope_version.map(ValueData::StringValue),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::BinaryValue(scope_attrs.to_vec())),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::BinaryValue(resource_attr.to_vec())),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::BinaryValue(
+                key_value_to_jsonb(log.attributes).to_vec(),
+            )),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::TimestampNanosecondValue(
+                log.time_unix_nano as i64,
+            )),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::TimestampNanosecondValue(
+                log.observed_time_unix_nano as i64,
+            )),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::U32Value(log.flags)),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::StringValue(log.severity_text)),
+        },
+        GreptimeValue {
+            value_data: Some(ValueData::I32Value(log.severity_number)),
+        },
+        GreptimeValue {
+            value_data: log
+                .body
+                .as_ref()
+                .map(|x| ValueData::StringValue(log_body_to_string(x))),
+        },
+    ];
+
+    Row { values: row }
+}
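Review note: the identity schema and row are correlated only by position. A cheap guard test one could add (sketch; `LogRecord` derives `Default` via prost):

```rust
#[test]
fn identity_schema_and_row_stay_in_sync() {
    let row = build_identity_row(
        LogRecord::default(),
        JsonbValue::Null,
        None,
        None,
        JsonbValue::Null,
    );
    assert_eq!(build_identity_schema().len(), row.values.len());
}
```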
+
+fn parse_export_logs_service_request_to_rows(request: ExportLogsServiceRequest) -> Rows {
+    let mut result = Vec::new();
+    for r in request.resource_logs {
+        let resource_attr = r
+            .resource
+            .map(|x| key_value_to_jsonb(x.attributes))
+            .unwrap_or(JsonbValue::Null);
+        for scope_logs in r.scope_logs {
+            let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
+            for log in scope_logs.log_records {
+                let value = build_identity_row(
+                    log,
+                    resource_attr.clone(),
+                    scope_name.clone(),
+                    scope_version.clone(),
+                    scope_attrs.clone(),
+                );
+                result.push(value);
+            }
+        }
+    }
+    Rows {
+        schema: build_identity_schema(),
+        rows: result,
+    }
+}
+
+/// transform otlp logs request to pipeline value
+/// https://opentelemetry.io/docs/concepts/signals/logs/
+fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec<PipelineValue> {
+    let mut result = Vec::new();
+    for r in request.resource_logs {
+        let resource_attr = r
+            .resource
+            .map(|x| {
+                PipelineValue::Map(Map {
+                    values: key_value_to_map(x.attributes),
+                })
+            })
+            .unwrap_or(PipelineValue::Null);
+        let resource_schema_url = PipelineValue::String(r.schema_url);
+        for scope_logs in r.scope_logs {
+            let (scope_attrs, scope_version, scope_name) =
+                scope_to_pipeline_value(scope_logs.scope);
+            let scope_schema_url = PipelineValue::String(scope_logs.schema_url);
+            for log in scope_logs.log_records {
+                let value = log_to_pipeline_value(
+                    log,
+                    resource_schema_url.clone(),
+                    resource_attr.clone(),
+                    scope_schema_url.clone(),
+                    scope_name.clone(),
+                    scope_version.clone(),
+                    scope_attrs.clone(),
+                );
+                result.push(value);
+            }
+        }
+    }
+    result
+}
+
+// convert AnyValue to pipeline value
+fn any_value_to_pipeline_value(value: any_value::Value) -> PipelineValue {
+    match value {
+        any_value::Value::StringValue(s) => PipelineValue::String(s),
+        any_value::Value::IntValue(i) => PipelineValue::Int64(i),
+        any_value::Value::DoubleValue(d) => PipelineValue::Float64(d),
+        any_value::Value::BoolValue(b) => PipelineValue::Boolean(b),
+        any_value::Value::ArrayValue(a) => {
+            let values = a
+                .values
+                .into_iter()
+                .map(|v| match v.value {
+                    Some(value) => any_value_to_pipeline_value(value),
+                    None => PipelineValue::Null,
+                })
+                .collect();
+            PipelineValue::Array(Array { values })
+        }
+        any_value::Value::KvlistValue(kv) => {
+            let value = key_value_to_map(kv.values);
+            PipelineValue::Map(Map { values: value })
+        }
+        any_value::Value::BytesValue(b) => PipelineValue::String(bytes_to_hex_string(&b)),
+    }
+}
+
+// convert otlp keyValue vec to map
+fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<String, PipelineValue> {
+    let mut map = BTreeMap::new();
+    for kv in key_values {
+        let value = match kv.value {
+            Some(value) => match value.value {
+                Some(value) => any_value_to_pipeline_value(value),
+                None => PipelineValue::Null,
+            },
+            None => PipelineValue::Null,
+        };
+        map.insert(normalize_otlp_name(&kv.key), value);
+    }
+    map
+}
+
+fn any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> {
+    match value {
+        any_value::Value::StringValue(s) => JsonbValue::String(s.into()),
+        any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)),
+        any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)),
+        any_value::Value::BoolValue(b) => JsonbValue::Bool(b),
+        any_value::Value::ArrayValue(a) => {
+            let values = a
+                .values
+                .into_iter()
+                .map(|v| match v.value {
+                    Some(value) => any_value_to_jsonb(value),
+                    None => JsonbValue::Null,
+                })
+                .collect();
+            JsonbValue::Array(values)
+        }
+        any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values),
+        any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()),
+    }
+}
+
+fn key_value_to_jsonb(key_values: Vec<KeyValue>) -> JsonbValue<'static> {
+    let mut map = BTreeMap::new();
+    for kv in key_values {
+        let value = match kv.value {
+            Some(value) => match value.value {
+                Some(value) => any_value_to_jsonb(value),
+                None => JsonbValue::Null,
+            },
+            None => JsonbValue::Null,
+        };
+        map.insert(normalize_otlp_name(&kv.key), value);
+    }
+    JsonbValue::Object(map)
+}
+
+fn log_body_to_string(body: &AnyValue) -> String {
+    let otlp_value = OtlpAnyValue::from(body);
+    otlp_value.to_string()
+}
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index 1fe64e652265..a1ad9997ba7b 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -33,9 +33,10 @@ use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use common_query::Output;
 use headers::HeaderValue;
+use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
-use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
+use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
 use serde_json::Value;
 use session::context::QueryContextRef;
@@ -105,7 +106,7 @@ pub trait PromStoreProtocolHandler {
 }
 
 #[async_trait]
-pub trait OpenTelemetryProtocolHandler {
+pub trait OpenTelemetryProtocolHandler: LogHandler {
     /// Handling opentelemetry metrics request
     async fn metrics(
         &self,
@@ -119,6 +120,14 @@ pub trait OpenTelemetryProtocolHandler {
         request: ExportTraceServiceRequest,
         ctx: QueryContextRef,
     ) -> Result<Output>;
+
+    async fn logs(
+        &self,
+        request: ExportLogsServiceRequest,
+        pipeline: PipelineWay,
+        table_name: String,
+        ctx: QueryContextRef,
+    ) -> Result<Output>;
 }
 
 /// LogHandler is responsible for handling log related requests.
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index e11060fbbda5..8d56d384c76e 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -17,10 +17,11 @@ use std::io::Write;
 
 use api::prom_store::remote::WriteRequest;
 use auth::user_provider_from_option;
-use axum::http::{HeaderName, StatusCode};
+use axum::http::{HeaderName, HeaderValue, StatusCode};
 use common_error::status_code::StatusCode as ErrorCode;
 use flate2::write::GzEncoder;
 use flate2::Compression;
+use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
 use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics;
@@ -89,6 +90,7 @@ macro_rules! http_tests {
             test_otlp_metrics,
             test_otlp_traces,
+            test_otlp_logs,
         );
     )*
     };
 }
@@ -1429,7 +1431,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) {
     let client = TestClient::new(app);
 
     // write metrics data
-    let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), false).await;
+    let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), false).await;
     assert_eq!(StatusCode::OK, res.status());
 
     // select metrics data
@@ -1441,7 +1443,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) {
     assert_eq!(res.status(), StatusCode::OK);
 
     // write metrics data with gzip
-    let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), true).await;
+    let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), true).await;
     assert_eq!(StatusCode::OK, res.status());
 
     // select metrics data again
@@ -1466,7 +1468,7 @@ pub async fn test_otlp_traces(store_type: StorageType) {
     let client = TestClient::new(app);
 
     // write traces data
-    let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), false).await;
+    let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), false).await;
     assert_eq!(StatusCode::OK, res.status());
 
     // select traces data
@@ -1481,7 +1483,7 @@ pub async fn test_otlp_traces(store_type: StorageType) {
     assert_eq!(res.status(), StatusCode::OK);
 
     // write metrics data with gzip
-    let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), true).await;
+    let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), true).await;
     assert_eq!(StatusCode::OK, res.status());
 
     // select metrics data again
@@ -1490,6 +1492,40 @@ pub async fn test_otlp_traces(store_type: StorageType) {
     guard.remove_all().await;
 }
 
+pub async fn test_otlp_logs(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_logs").await;
+
+    let content = r#"
+{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/resourceLogs","scopeLogs":[{"scope":{},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/scopeLogs","logRecords":[{"flags":1,"timeUnixNano":1581452773000009875,"observedTimeUnixNano":1581452773000009875,"severityNumber":9,"severityText":"Info","body":{"value":{"stringValue":"This is a log message"}},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":1}}],"droppedAttributesCount":1,"traceId":[48,56,48,52,48,50,48,49,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48],"spanId":[48,49,48,50,48,52,48,56,48,48,48,48,48,48,48,48]},{"flags":1,"timeUnixNano":1581452773000000789,"observedTimeUnixNano":1581452773000000789,"severityNumber":9,"severityText":"Info","body":{"value":{"stringValue":"something happened"}},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":[48],"spanId":[48]}]}]}]}
+"#;
+
+    let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap();
+    let body = req.encode_to_vec();
+
+    // handshake
+    let client = TestClient::new(app);
+
+    // write logs data
+    let res = send_req(
+        &client,
+        vec![(
+            HeaderName::from_static("x-greptime-table-name"),
+            HeaderValue::from_static("logs"),
+        )],
+        "/v1/otlp/v1/logs?db=public",
+        body.clone(),
+        false,
+    )
+    .await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    let expected = r#"[["","",{},{"resource_attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",{},{"resource_attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#;
+    validate_data(&client, "select * from logs;", expected).await;
+
+    guard.remove_all().await;
+}
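Review note: the expected literal above exercises the full read path as well: the `resource_attributes` and `log_attributes` columns come back as JSON objects (e.g. `{"customer":"acme","env":"dev"}`) rather than opaque binary, because the SQL layer now decodes `Json` columns through the `ColumnPair` conversion added in datatypes.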
r#"[["","",{},{"resource_attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",{},{"resource_attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#; + validate_data(&client, "select * from logs;", expected).await; + + guard.remove_all().await; +} + async fn validate_data(client: &TestClient, sql: &str, expected: &str) { let res = client .get(format!("/v1/sql?sql={sql}").as_str()) @@ -1502,11 +1538,21 @@ async fn validate_data(client: &TestClient, sql: &str, expected: &str) { assert_eq!(v, expected); } -async fn send_req(client: &TestClient, path: &str, body: Vec, with_gzip: bool) -> TestResponse { +async fn send_req( + client: &TestClient, + headers: Vec<(HeaderName, HeaderValue)>, + path: &str, + body: Vec, + with_gzip: bool, +) -> TestResponse { let mut req = client .post(path) .header("content-type", "application/x-protobuf"); + for (k, v) in headers { + req = req.header(k, v); + } + let mut len = body.len(); if with_gzip {