From d673a85326b74c676f37ce308d841ee485480aad Mon Sep 17 00:00:00 2001 From: paomian Date: Fri, 20 Sep 2024 15:19:50 +0800 Subject: [PATCH 1/5] chore: otlp logs api --- Cargo.lock | 3 + Cargo.toml | 1 + src/frontend/src/instance/otlp.rs | 32 +- src/frontend/src/metrics.rs | 12 + src/pipeline/Cargo.toml | 1 + src/pipeline/src/etl.rs | 28 + .../transform/transformer/greptime/coerce.rs | 27 +- src/pipeline/src/etl/value.rs | 40 ++ src/servers/Cargo.toml | 1 + src/servers/src/error.rs | 10 +- src/servers/src/http.rs | 1 + src/servers/src/http/otlp.rs | 164 +++++- src/servers/src/otlp.rs | 1 + src/servers/src/otlp/logs.rs | 543 ++++++++++++++++++ src/servers/src/query_handler.rs | 15 +- 15 files changed, 849 insertions(+), 30 deletions(-) create mode 100644 src/servers/src/otlp/logs.rs diff --git a/Cargo.lock b/Cargo.lock index 991829a75813..ec42ce4e2e53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7281,6 +7281,7 @@ dependencies = [ "ordered-float 4.2.2", "percent-encoding", "rand", + "serde_json", "thiserror", ] @@ -7900,6 +7901,7 @@ dependencies = [ "futures", "greptime-proto", "itertools 0.10.5", + "jsonb", "lazy_static", "moka", "once_cell", @@ -10439,6 +10441,7 @@ dependencies = [ name = "servers" version = "0.9.3" dependencies = [ + "ahash 0.8.11", "aide", "api", "arrow", diff --git a/Cargo.toml b/Cargo.toml index 44b2cda1227f..b0e78249c3c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -137,6 +137,7 @@ opentelemetry-proto = { version = "0.5", features = [ "metrics", "trace", "with-serde", + "logs", ] } parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] } paste = "1.0" diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 09335af0804e..0e0419688e5b 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -17,18 +17,19 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::BoxedError; use common_telemetry::tracing; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; use servers::otlp::plugin::TraceParserRef; -use servers::query_handler::OpenTelemetryProtocolHandler; +use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineWay}; use session::context::QueryContextRef; use snafu::ResultExt; use crate::instance::Instance; -use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS}; +use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS}; #[async_trait] impl OpenTelemetryProtocolHandler for Instance { @@ -92,4 +93,31 @@ impl OpenTelemetryProtocolHandler for Instance { .map_err(BoxedError::new) .context(error::ExecuteGrpcQuerySnafu) } + + #[tracing::instrument(skip_all)] + async fn logs( + &self, + request: ExportLogsServiceRequest, + pipeline: PipelineWay, + table_name: String, + ctx: QueryContextRef, + ) -> ServerResult { + self.plugins + .get::() + .as_ref() + .check_permission(ctx.current_user(), PermissionReq::Otlp) + .context(AuthSnafu)?; + + let interceptor_ref = self + .plugins + .get::>(); + interceptor_ref.pre_execute(ctx.clone())?; + let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, 
table_name)?;
+        OTLP_LOGS_ROWS.inc_by(rows as u64);
+
+        self.handle_row_inserts(requests, ctx)
+            .await
+            .map_err(BoxedError::new)
+            .context(error::ExecuteGrpcQuerySnafu)
+    }
 }
diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs
index 33580c550e8b..a8bf4eb76eb5 100644
--- a/src/frontend/src/metrics.rs
+++ b/src/frontend/src/metrics.rs
@@ -41,16 +41,28 @@ lazy_static! {
         .with_label_values(&["insert"]);
     pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED
         .with_label_values(&["execute"]);
+
+    /// The number of OpenTelemetry metrics sent by the frontend node.
     pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!(
         "greptime_frontend_otlp_metrics_rows",
         "frontend otlp metrics rows"
     )
     .unwrap();
+
+    /// The number of OpenTelemetry traces sent by the frontend node.
     pub static ref OTLP_TRACES_ROWS: IntCounter = register_int_counter!(
         "greptime_frontend_otlp_traces_rows",
         "frontend otlp traces rows"
     )
     .unwrap();
+
+    /// The number of OpenTelemetry logs sent by the frontend node.
+    pub static ref OTLP_LOGS_ROWS: IntCounter = register_int_counter!(
+        "greptime_frontend_otlp_logs_rows",
+        "frontend otlp logs rows"
+    )
+    .unwrap();
+
+    /// The number of heartbeats sent by the frontend node.
     pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
         "greptime_frontend_heartbeat_send_count",
diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml
index 5b85b999feff..2402605f7fe8 100644
--- a/src/pipeline/Cargo.toml
+++ b/src/pipeline/Cargo.toml
@@ -40,6 +40,7 @@ enum_dispatch = "0.3"
 futures.workspace = true
 greptime-proto.workspace = true
 itertools.workspace = true
+jsonb.workspace = true
 lazy_static.workspace = true
 moka = { workspace = true, features = ["sync"] }
 once_cell.workspace = true
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index f29032e4f8a2..b8a367afc960 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -210,6 +210,34 @@ where
         self.transformer.transform_mut(val)
     }
 
+    pub fn prepare_pipeline_value(&self, val: Value, result: &mut [Value]) -> Result<()> {
+        match val {
+            Value::Map(map) => {
+                let mut search_from = 0;
+                for (payload_key, payload_value) in map.values.into_iter() {
+                    if search_from >= self.required_keys.len() {
+                        break;
+                    }
+
+                    if let Some(pos) = self.required_keys[search_from..]
+                        .iter()
+                        .position(|k| k == &payload_key)
+                    {
+                        result[search_from + pos] = payload_value;
+                        search_from += pos;
+                    }
+                }
+            }
+            Value::String(_) => {
+                result[0] = val;
+            }
+            _ => {
+                return PrepareValueMustBeObjectSnafu.fail();
+            }
+        }
+        Ok(())
+    }
+
     pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<()> {
         match val {
             serde_json::Value::Object(map) => {
diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
index 827613b02b60..489b2e147b6d 100644
--- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
+++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
@@ -187,8 +187,8 @@ pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<ValueData>> {
         Value::Timestamp(Timestamp::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))),
 
-        Value::Array(_) => unimplemented!("Array type not supported"),
-        Value::Map(_) => unimplemented!("Object type not supported"),
+        Value::Array(_) => coerce_nested_value(val, transform),
+        Value::Map(_) => coerce_nested_value(val, transform),
     }
 }
@@ -414,6 +414,29 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<ValueData>> {
     }
 }
 
+fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>, String> {
+    match &transform.type_ {
+        Value::Array(_) | Value::Map(_) => (),
+        t => {
+            return Err(format!(
+                "nested value type not supported {}",
+                t.to_str_type()
+            ))
+        }
+    }
+    match v {
+        Value::Map(_) => {
+            let data: jsonb::Value = v.into();
+            Ok(Some(ValueData::BinaryValue(data.to_vec())))
+        }
+        Value::Array(_) => {
+            let data: jsonb::Value = v.into();
+            Ok(Some(ValueData::BinaryValue(data.to_vec())))
+        }
+        _ => Err(format!("nested type not support {}", v.to_str_type())),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs
index 3adde2514b9e..034768a7024e 100644
--- a/src/pipeline/src/etl/value.rs
+++ b/src/pipeline/src/etl/value.rs
@@ -18,6 +18,7 @@ pub mod time;
 
 use ahash::{HashMap, HashMapExt};
 pub use array::Array;
+use jsonb::{Number as JsonbNumber, Object as JsonbObject, Value as JsonbValue};
 pub use map::Map;
 use snafu::{OptionExt, ResultExt};
 pub use time::Timestamp;
@@ -112,6 +113,7 @@ impl Value {
 
             "array" => Ok(Value::Array(Array::default())),
             "map" => Ok(Value::Map(Map::default())),
+            "json" => Ok(Value::Map(Map::default())),
 
             _ => ValueParseTypeSnafu { t }.fail(),
         }
@@ -331,3 +333,41 @@ impl TryFrom<&yaml_rust::Yaml> for Value {
         }
     }
 }
+
+impl<'a> From<&Value> for JsonbValue<'a> {
+    fn from(value: &Value) -> Self {
+        match value {
+            Value::Null => JsonbValue::Null,
+            Value::Boolean(v) => JsonbValue::Bool(*v),
+
+            Value::Int8(v) => JsonbValue::Number(JsonbNumber::Int64(*v as i64)),
+            Value::Int16(v) => JsonbValue::Number(JsonbNumber::Int64(*v as i64)),
+            Value::Int32(v) => JsonbValue::Number(JsonbNumber::Int64(*v as i64)),
+            Value::Int64(v) => JsonbValue::Number(JsonbNumber::Int64(*v)),
+
+            Value::Uint8(v) => JsonbValue::Number(JsonbNumber::UInt64(*v as u64)),
+            Value::Uint16(v) => JsonbValue::Number(JsonbNumber::UInt64(*v as u64)),
+            Value::Uint32(v) => JsonbValue::Number(JsonbNumber::UInt64(*v as u64)),
+            Value::Uint64(v) => JsonbValue::Number(JsonbNumber::UInt64(*v)),
+            Value::Float32(v) => JsonbValue::Number(JsonbNumber::Float64(*v as f64)),
+            Value::Float64(v) => JsonbValue::Number(JsonbNumber::Float64(*v)),
+            Value::String(v) => JsonbValue::String(v.clone().into()),
+            Value::Timestamp(v) => JsonbValue::String(v.to_string().into()),
+            Value::Array(arr) => {
+                let mut vals: Vec<JsonbValue> = Vec::with_capacity(arr.len());
+                for val in arr.iter() {
+                    vals.push(val.into());
+                }
+                JsonbValue::Array(vals)
+            }
+            Value::Map(obj) => {
+                let mut map = JsonbObject::new();
+                for (k, v) in obj.iter() {
+                    let val: JsonbValue = v.into();
+                    map.insert(k.to_string(), val);
+                }
+                JsonbValue::Object(map)
+            }
+        }
+    }
+}
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index b94fa17d44c0..6fad5644e573 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -14,6 +14,7 @@ testing = []
 workspace = true
 
 [dependencies]
+ahash = "0.8"
 aide = { version = "0.9", features = ["axum"] }
 api.workspace = true
 arrow.workspace = true
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 0fde3b527c84..2f616f18f954 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -533,6 +533,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("OpenTelemetry log error: {}", error))]
+    OpenTelemetryLog {
+        error: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -597,7 +604,8 @@ impl ErrorExt for Error {
             | MysqlValueConversion { .. }
             | ParseJson { .. }
             | UnsupportedContentType { .. }
-            | TimestampOverflow { .. } => StatusCode::InvalidArguments,
+            | TimestampOverflow { .. }
+            | OpenTelemetryLog { .. } => StatusCode::InvalidArguments,
 
             Catalog { source, .. } => source.status_code(),
             RowWriter { source, .. } => source.status_code(),
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index a2b72b548b1e..61b17a69fc05 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -866,6 +866,7 @@ impl HttpServer {
         Router::new()
             .route("/v1/metrics", routing::post(otlp::metrics))
             .route("/v1/traces", routing::post(otlp::traces))
+            .route("/v1/logs", routing::post(otlp::logs))
             .layer(
                 ServiceBuilder::new()
                     .layer(HandleErrorLayer::new(handle_error))
diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs
index 3efdaeec96d4..c82b0f5fe14f 100644
--- a/src/servers/src/http/otlp.rs
+++ b/src/servers/src/http/otlp.rs
@@ -12,35 +12,43 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use std::result::Result as StdResult;
 use std::sync::Arc;
 
-use axum::extract::State;
-use axum::http::header;
+use axum::extract::{FromRequestParts, State};
+use axum::http::header::HeaderValue;
+use axum::http::request::Parts;
+use axum::http::{header, StatusCode};
 use axum::response::IntoResponse;
-use axum::Extension;
+use axum::{async_trait, Extension};
 use bytes::Bytes;
 use common_telemetry::tracing;
+use opentelemetry_proto::tonic::collector::logs::v1::{
+    ExportLogsServiceRequest, ExportLogsServiceResponse,
+};
 use opentelemetry_proto::tonic::collector::metrics::v1::{
     ExportMetricsServiceRequest, ExportMetricsServiceResponse,
 };
 use opentelemetry_proto::tonic::collector::trace::v1::{
     ExportTraceServiceRequest, ExportTraceServiceResponse,
 };
+use pipeline::util::to_pipeline_version;
 use prost::Message;
 use session::context::{Channel, QueryContext};
 use snafu::prelude::*;
 
 use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
 use crate::error::{self, Result};
-use crate::query_handler::OpenTelemetryProtocolHandlerRef;
+use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineWay};
 
 #[axum_macros::debug_handler]
 #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))]
 pub async fn metrics(
     State(handler): State<OpenTelemetryProtocolHandlerRef>,
     Extension(mut query_ctx): Extension<QueryContext>,
     bytes: Bytes,
-) -> Result<OtlpMetricsResponse> {
+) -> Result<OtlpResponse<ExportMetricsServiceResponse>> {
     let db = query_ctx.get_db_string();
     query_ctx.set_channel(Channel::Otlp);
     let query_ctx = Arc::new(query_ctx);
@@ -53,7 +61,7 @@ pub async fn metrics(
     handler
         .metrics(request, query_ctx)
         .await
-        .map(|o| OtlpMetricsResponse {
+        .map(|o| OtlpResponse {
             resp_body: ExportMetricsServiceResponse {
                 partial_success: None,
             },
@@ -61,52 +69,160 @@
 
-pub struct OtlpMetricsResponse {
-    resp_body: ExportMetricsServiceResponse,
-    write_cost: usize,
+#[axum_macros::debug_handler]
+#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
+pub async fn traces(
+    State(handler): State<OpenTelemetryProtocolHandlerRef>,
+    Extension(mut query_ctx): Extension<QueryContext>,
+    bytes: Bytes,
+) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
+    let db = query_ctx.get_db_string();
+    query_ctx.set_channel(Channel::Otlp);
+    let query_ctx = Arc::new(query_ctx);
+    let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
+        .with_label_values(&[db.as_str()])
+        .start_timer();
+    let request =
+        ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
+    handler
+        .traces(request, query_ctx)
+        .await
+        .map(|o| OtlpResponse {
+            resp_body: ExportTraceServiceResponse {
+                partial_success: None,
+            },
+            write_cost: o.meta.cost,
+        })
 }
 
-impl IntoResponse for OtlpMetricsResponse {
-    fn into_response(self) -> axum::response::Response {
-        let mut header_map = write_cost_header_map(self.write_cost);
-        header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
+pub struct PipelineInfo {
+    pub pipeline_name: String,
+    pub pipeline_version: Option<String>,
+}
 
-        (header_map, self.resp_body.encode_to_vec()).into_response()
+fn pipeline_header_error(
+    header: &HeaderValue,
+) -> StdResult<String, (StatusCode, &'static str)> {
+    match header.to_str() {
+        Ok(s) => Ok(s.to_string()),
+        Err(_) => Err((
+            StatusCode::BAD_REQUEST,
+            "`X-Pipeline-Name` or `X-Pipeline-Version` header is not a valid string.",
+        )),
     }
 }
+
+#[async_trait]
+impl<S> FromRequestParts<S> for PipelineInfo
+where
+    S: Send + Sync,
+{
+    type Rejection = (StatusCode, &'static str);
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
+        let pipeline_name = parts.headers.get("X-Pipeline-Name");
+        let pipeline_version =
parts.headers.get("X-Pipeline-Version");
+        match (pipeline_name, pipeline_version) {
+            (Some(name), Some(version)) => Ok(PipelineInfo {
+                pipeline_name: pipeline_header_error(name)?,
+                pipeline_version: Some(pipeline_header_error(version)?),
+            }),
+            (None, _) => Err((
+                StatusCode::BAD_REQUEST,
+                "`X-Pipeline-Name` header is missing",
+            )),
+            (Some(name), None) => Ok(PipelineInfo {
+                pipeline_name: pipeline_header_error(name)?,
+                pipeline_version: None,
+            }),
+        }
+    }
+}
+
+pub struct TableInfo {
+    table_name: String,
+}
+
+#[async_trait]
+impl<S> FromRequestParts<S> for TableInfo
+where
+    S: Send + Sync,
+{
+    type Rejection = (StatusCode, &'static str);
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
+        let table_name = parts.headers.get("X-Table-Name");
+        match table_name {
+            Some(name) => Ok(TableInfo {
+                table_name: pipeline_header_error(name)?,
+            }),
+            None => Ok(TableInfo {
+                table_name: "opentelemetry_logs".to_string(),
+            }),
+        }
+    }
+}
+
 #[axum_macros::debug_handler]
-#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
-pub async fn traces(
+#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))]
+pub async fn logs(
     State(handler): State<OpenTelemetryProtocolHandlerRef>,
     Extension(mut query_ctx): Extension<QueryContext>,
+    pipeline_info: PipelineInfo,
+    table_info: TableInfo,
     bytes: Bytes,
-) -> Result<OtlpTracesResponse> {
+) -> Result<OtlpResponse<ExportLogsServiceResponse>> {
     let db = query_ctx.get_db_string();
     query_ctx.set_channel(Channel::Otlp);
     let query_ctx = Arc::new(query_ctx);
     let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
         .with_label_values(&[db.as_str()])
         .start_timer();
-    let request =
-        ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
+    let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
+
+    let pipeline_way;
+    if pipeline_info.pipeline_name == "identity" {
+        pipeline_way = PipelineWay::Identity;
+    } else {
+        let pipeline_version =
+            to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| {
+                error::InvalidParameterSnafu {
+                    reason: "X-Pipeline-Version".to_string(),
+                }
+                .build()
+            })?;
+        let pipeline = match handler
+            .get_pipeline(
+                &pipeline_info.pipeline_name,
+                pipeline_version,
+                query_ctx.clone(),
+            )
+            .await
+        {
+            Ok(p) => p,
+            Err(e) => {
+                return Err(e);
+            }
+        };
+        pipeline_way = PipelineWay::Custom(pipeline);
+    }
     handler
-        .traces(request, query_ctx)
+        .logs(request, pipeline_way, table_info.table_name, query_ctx)
         .await
-        .map(|o| OtlpTracesResponse {
-            resp_body: ExportTraceServiceResponse {
+        .map(|o| OtlpResponse {
+            resp_body: ExportLogsServiceResponse {
                 partial_success: None,
             },
             write_cost: o.meta.cost,
         })
 }
 
-pub struct OtlpTracesResponse {
-    resp_body: ExportTraceServiceResponse,
+pub struct OtlpResponse<T> {
+    resp_body: T,
     write_cost: usize,
 }
 
-impl IntoResponse for OtlpTracesResponse {
+impl<T: Message> IntoResponse for OtlpResponse<T> {
     fn into_response(self) -> axum::response::Response {
         let mut header_map = write_cost_header_map(self.write_cost);
         header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs
index 5b92d12cb629..cc1321ac7020 100644
--- a/src/servers/src/otlp.rs
+++ b/src/servers/src/otlp.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+pub mod logs;
 pub mod metrics;
 pub mod plugin;
 pub mod trace;
diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs
new file mode 100644
index 000000000000..8fc6bed16059
--- /dev/null
+++ b/src/servers/src/otlp/logs.rs
@@ -0,0 +1,543 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{BTreeMap, HashMap as StdHashMap};
+
+use ahash::{HashMap, HashMapExt};
+use api::v1::column_data_type_extension::TypeExt;
+use api::v1::value::ValueData;
+use api::v1::{
+    ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
+    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue,
+};
+use jsonb::{Number as JsonbNumber, Value as JsonbValue};
+use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
+use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
+use opentelemetry_proto::tonic::logs::v1::LogRecord;
+use pipeline::{Array, Map, Value as PipelineValue};
+
+use super::trace::attributes::OtlpAnyValue;
+use crate::error::{OpenTelemetryLogSnafu, Result};
+use crate::otlp::trace::span::bytes_to_hex_string;
+use crate::query_handler::PipelineWay;
+
+/// Normalize otlp instrumentation, metric and attribute names
+///
+///
+/// - since the names are case-insensitive, we transform them to lowercase for
+///   better sql usability
+/// - replace `.` and `-` with `_`
+fn normalize_otlp_name(name: &str) -> String {
+    name.to_lowercase().replace(|c| c == '.' || c == '-', "_")
+}
+
+/// Convert OpenTelemetry logs to GreptimeDB insert requests
+///
+/// See
+///
+/// for the data structure of OTLP logs.
+///
+/// Returns `RowInsertRequests` and the total number of rows to ingest
+pub fn to_grpc_insert_requests(
+    request: ExportLogsServiceRequest,
+    pipeline: PipelineWay,
+    table_name: String,
+) -> Result<(RowInsertRequests, usize)> {
+    match pipeline {
+        PipelineWay::Identity => {
+            let rows = parse_export_logs_service_request_to_rows(request);
+            let len = rows.rows.len();
+            let insert_request = RowInsertRequest {
+                rows: Some(rows),
+                table_name,
+            };
+            Ok((
+                RowInsertRequests {
+                    inserts: vec![insert_request],
+                },
+                len,
+            ))
+        }
+        PipelineWay::Custom(p) => {
+            let request = parse_export_logs_service_request(request);
+            let mut result = Vec::new();
+            let mut intermediate_state = p.init_intermediate_state();
+            for v in request {
+                p.prepare_pipeline_value(v, &mut intermediate_state)
+                    .map_err(|e| OpenTelemetryLogSnafu { error: e }.build())?;
+                let r = p
+                    .exec_mut(&mut intermediate_state)
+                    .map_err(|e| OpenTelemetryLogSnafu { error: e }.build())?;
+                result.push(r);
+            }
+            let len = result.len();
+            let rows = Rows {
+                schema: p.schemas().clone(),
+                rows: result,
+            };
+            let insert_request = RowInsertRequest {
+                rows: Some(rows),
+                table_name,
+            };
+            let insert_requests = RowInsertRequests {
+                inserts: vec![insert_request],
+            };
+            Ok((insert_requests, len))
+        }
+    }
+}
+
+fn scope_to_pipeline_value(
+    scope: Option<InstrumentationScope>,
+) -> (PipelineValue, PipelineValue, PipelineValue) {
+    scope
+        .map(|x| {
+            (
+                PipelineValue::Map(Map {
+                    values: key_value_to_map(x.attributes),
+                }),
+                PipelineValue::String(x.version),
+                PipelineValue::String(x.name),
+            )
+        })
+        .unwrap_or((
+            PipelineValue::Null,
+            PipelineValue::Null,
+            PipelineValue::Null,
+        ))
+}
+
+fn scope_to_jsonb(
+    scope: Option<InstrumentationScope>,
+) -> (JsonbValue<'static>, Option<String>, Option<String>) {
+    scope
+        .map(|x| {
+            (
+                key_value_to_jsonb(x.attributes),
+                Some(x.version),
+                Some(x.name),
+            )
+        })
+        .unwrap_or((JsonbValue::Null, None, None))
+}
+
+fn log_to_pipeline_value(
+    log: LogRecord,
+    resource_schema_url: PipelineValue,
+    resource_attr: PipelineValue,
+    scope_schema_url: PipelineValue,
+    scope_name: PipelineValue,
+    scope_version: PipelineValue,
+    scope_attrs: PipelineValue,
+) -> PipelineValue {
+    let log_attrs = PipelineValue::Map(Map {
+        values: key_value_to_map(log.attributes),
+    });
+    let mut map = HashMap::new();
+    map.insert(
+        "Timestamp".to_string(),
+        PipelineValue::Uint64(log.time_unix_nano),
+    );
+    map.insert(
+        "ObservedTimestamp".to_string(),
+        PipelineValue::Uint64(log.observed_time_unix_nano),
+    );
+
+    // needs to be converted to a string
+    map.insert(
+        "TraceId".to_string(),
+        PipelineValue::String(bytes_to_hex_string(&log.trace_id)),
+    );
+    map.insert(
+        "SpanId".to_string(),
+        PipelineValue::String(bytes_to_hex_string(&log.span_id)),
+    );
+    map.insert("TraceFlags".to_string(), PipelineValue::Uint32(log.flags));
+    map.insert(
+        "SeverityText".to_string(),
+        PipelineValue::String(log.severity_text),
+    );
+    map.insert(
+        "SeverityNumber".to_string(),
+        PipelineValue::Int32(log.severity_number),
+    );
+    // needs to be converted to a string
+    map.insert(
+        "Body".to_string(),
+        log.body
+            .as_ref()
+            .map(|x| PipelineValue::String(log_body_to_string(x)))
+            .unwrap_or(PipelineValue::Null),
+    );
+    map.insert("ResourceSchemaUrl".to_string(), resource_schema_url);
+
+    map.insert("ResourceAttributes".to_string(), resource_attr);
+    map.insert("ScopeSchemaUrl".to_string(), scope_schema_url);
+    map.insert("ScopeName".to_string(), scope_name);
+    map.insert("ScopeVersion".to_string(), scope_version);
+    map.insert("ScopeAttributes".to_string(), scope_attrs);
map.insert("LogAttributes".to_string(), log_attrs); + PipelineValue::Map(Map { values: map }) +} + +fn build_identity_schema() -> Vec { + [ + ( + "scope_name", + ColumnDataType::String, + SemanticType::Tag, + None, + None, + ), + ( + "scope_version", + ColumnDataType::String, + SemanticType::Field, + None, + None, + ), + ( + "scope_attributes", + ColumnDataType::Binary, + SemanticType::Field, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + None, + ), + ( + "scope_schemaUrl", + ColumnDataType::String, + SemanticType::Field, + None, + None, + ), + ( + "resource_schema_url", + ColumnDataType::String, + SemanticType::Field, + None, + None, + ), + ( + "resource_attributes", + ColumnDataType::Binary, + SemanticType::Field, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + None, + ), + ( + "log_attributes", + ColumnDataType::Binary, + SemanticType::Field, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + None, + ), + ( + "timestamp", + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + None, + None, + ), + ( + "observed_timestamp", + ColumnDataType::TimestampNanosecond, + SemanticType::Field, + None, + None, + ), + ( + "trace_id", + ColumnDataType::String, + SemanticType::Tag, + None, + None, + ), + ( + "span_id", + ColumnDataType::String, + SemanticType::Tag, + None, + None, + ), + ( + "trace_flags", + ColumnDataType::Uint32, + SemanticType::Field, + None, + None, + ), + ( + "severity_text", + ColumnDataType::String, + SemanticType::Field, + None, + None, + ), + ( + "severity_number", + ColumnDataType::Int32, + SemanticType::Field, + None, + None, + ), + ( + "body", + ColumnDataType::String, + SemanticType::Field, + None, + Some(ColumnOptions { + options: StdHashMap::from([( + "fulltext".to_string(), + r#"{"enable":true}"#.to_string(), + )]), + }), + ), + ] + .into_iter() + .map( + |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema { + column_name: field_name.to_string(), + datatype: column_type as i32, + semantic_type: semantic_type as i32, + datatype_extension, + options, + }, + ) + .collect::>() +} + +fn build_identity_row( + log: LogRecord, + resource_schema_url: String, + resource_attr: JsonbValue<'_>, + scope_schema_url: String, + scope_name: Option, + scope_version: Option, + scope_attrs: JsonbValue<'_>, +) -> Row { + let row = vec![ + GreptimeValue { + value_data: scope_name.map(ValueData::StringValue), + }, + GreptimeValue { + value_data: scope_version.map(ValueData::StringValue), + }, + GreptimeValue { + value_data: Some(ValueData::BinaryValue(scope_attrs.to_vec())), + }, + GreptimeValue { + value_data: Some(ValueData::StringValue(scope_schema_url)), + }, + GreptimeValue { + value_data: Some(ValueData::StringValue(resource_schema_url)), + }, + GreptimeValue { + value_data: Some(ValueData::BinaryValue(resource_attr.to_vec())), + }, + GreptimeValue { + value_data: Some(ValueData::BinaryValue( + key_value_to_jsonb(log.attributes).to_vec(), + )), + }, + GreptimeValue { + value_data: Some(ValueData::TimestampNanosecondValue( + log.time_unix_nano as i64, + )), + }, + GreptimeValue { + value_data: Some(ValueData::TimestampNanosecondValue( + log.observed_time_unix_nano as i64, + )), + }, + GreptimeValue { + value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))), + }, + GreptimeValue { + value_data: 
Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))), + }, + GreptimeValue { + value_data: Some(ValueData::U32Value(log.flags)), + }, + GreptimeValue { + value_data: Some(ValueData::StringValue(log.severity_text)), + }, + GreptimeValue { + value_data: Some(ValueData::I32Value(log.severity_number)), + }, + GreptimeValue { + value_data: log + .body + .as_ref() + .map(|x| ValueData::StringValue(log_body_to_string(x))), + }, + ]; + + Row { values: row } +} + +fn parse_export_logs_service_request_to_rows(request: ExportLogsServiceRequest) -> Rows { + let mut result = Vec::new(); + for r in request.resource_logs { + let resource_attr = r + .resource + .map(|x| key_value_to_jsonb(x.attributes)) + .unwrap_or(JsonbValue::Null); + let resource_schema_url = r.schema_url; + for scope_logs in r.scope_logs { + let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope); + let scope_schema_url = scope_logs.schema_url; + for log in scope_logs.log_records { + let value = build_identity_row( + log, + resource_schema_url.clone(), + resource_attr.clone(), + scope_schema_url.clone(), + scope_name.clone(), + scope_version.clone(), + scope_attrs.clone(), + ); + result.push(value); + } + } + } + Rows { + schema: build_identity_schema(), + rows: result, + } +} + +/// transform otlp logs request to pipeline value +/// https://opentelemetry.io/docs/concepts/signals/logs/ +fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec { + let mut result = Vec::new(); + for r in request.resource_logs { + let resource_attr = r + .resource + .map(|x| { + PipelineValue::Map(Map { + values: key_value_to_map(x.attributes), + }) + }) + .unwrap_or(PipelineValue::Null); + let resource_schema_url = PipelineValue::String(r.schema_url); + for scope_logs in r.scope_logs { + let (scope_attrs, scope_version, scope_name) = + scope_to_pipeline_value(scope_logs.scope); + let scope_schema_url = PipelineValue::String(scope_logs.schema_url); + for log in scope_logs.log_records { + let value = log_to_pipeline_value( + log, + resource_schema_url.clone(), + resource_attr.clone(), + scope_schema_url.clone(), + scope_name.clone(), + scope_version.clone(), + scope_attrs.clone(), + ); + result.push(value); + } + } + } + result +} + +// convert AnyValue to pipeline value +fn any_value_to_pipeline_value(value: any_value::Value) -> PipelineValue { + match value { + any_value::Value::StringValue(s) => PipelineValue::String(s), + any_value::Value::IntValue(i) => PipelineValue::Int64(i), + any_value::Value::DoubleValue(d) => PipelineValue::Float64(d), + any_value::Value::BoolValue(b) => PipelineValue::Boolean(b), + any_value::Value::ArrayValue(a) => { + let values = a + .values + .into_iter() + .map(|v| match v.value { + Some(value) => any_value_to_pipeline_value(value), + None => PipelineValue::Null, + }) + .collect(); + PipelineValue::Array(Array { values }) + } + any_value::Value::KvlistValue(kv) => { + let value = key_value_to_map(kv.values); + PipelineValue::Map(Map { values: value }) + } + any_value::Value::BytesValue(b) => PipelineValue::String(bytes_to_hex_string(&b)), + } +} + +// convert otlp keyValue vec to map +fn key_value_to_map(key_values: Vec) -> HashMap { + let mut map = HashMap::new(); + for kv in key_values { + let value = match kv.value { + Some(value) => match value.value { + Some(value) => any_value_to_pipeline_value(value), + None => PipelineValue::Null, + }, + None => PipelineValue::Null, + }; + map.insert(normalize_otlp_name(&kv.key), value); + } + map +} + +fn 
any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> {
+    match value {
+        any_value::Value::StringValue(s) => JsonbValue::String(s.into()),
+        any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)),
+        any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)),
+        any_value::Value::BoolValue(b) => JsonbValue::Bool(b),
+        any_value::Value::ArrayValue(a) => {
+            let values = a
+                .values
+                .into_iter()
+                .map(|v| match v.value {
+                    Some(value) => any_value_to_jsonb(value),
+                    None => JsonbValue::Null,
+                })
+                .collect();
+            JsonbValue::Array(values)
+        }
+        any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values),
+        any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()),
+    }
+}
+
+fn key_value_to_jsonb(key_values: Vec<KeyValue>) -> JsonbValue<'static> {
+    let mut map = BTreeMap::new();
+    for kv in key_values {
+        let value = match kv.value {
+            Some(value) => match value.value {
+                Some(value) => any_value_to_jsonb(value),
+                None => JsonbValue::Null,
+            },
+            None => JsonbValue::Null,
+        };
+        map.insert(normalize_otlp_name(&kv.key), value);
+    }
+    JsonbValue::Object(map)
+}
+
+fn log_body_to_string(body: &AnyValue) -> String {
+    let otlp_value = OtlpAnyValue::from(body);
+    otlp_value.to_string()
+}
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index 1fe64e652265..5415ee33d9ba 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -33,6 +33,7 @@ use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use common_query::Output;
 use headers::HeaderValue;
+use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
 use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
@@ -104,8 +105,12 @@ pub trait PromStoreProtocolHandler {
     async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
 }
 
+pub enum PipelineWay {
+    Identity,
+    Custom(Arc<Pipeline<GreptimeTransformer>>),
+}
 #[async_trait]
-pub trait OpenTelemetryProtocolHandler {
+pub trait OpenTelemetryProtocolHandler: LogHandler {
     /// Handling opentelemetry metrics request
     async fn metrics(
         &self,
@@ -119,6 +124,14 @@ pub trait OpenTelemetryProtocolHandler {
         request: ExportTraceServiceRequest,
         ctx: QueryContextRef,
     ) -> Result<Output>;
+
+    async fn logs(
+        &self,
+        request: ExportLogsServiceRequest,
+        pipeline: PipelineWay,
+        table_name: String,
+        ctx: QueryContextRef,
+    ) -> Result<Output>;
 }
 
 /// LogHandler is responsible for handling log related requests.
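A minimal client-side sketch of exercising the endpoint this first patch registers (not part of the series; it assumes a frontend listening on localhost:4000, the `reqwest` and `tokio` crates, and the `opentelemetry-proto`/`prost` versions pinned above; the table name `my_logs` is illustrative, and the second patch below makes the pipeline header optional):

    use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
    use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
    use prost::Message;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Build a one-record export request; prost-generated types implement
        // Default, so unset fields can be elided.
        let request = ExportLogsServiceRequest {
            resource_logs: vec![ResourceLogs {
                scope_logs: vec![ScopeLogs {
                    log_records: vec![LogRecord {
                        time_unix_nano: 1_581_452_773_000_009_875,
                        severity_text: "Info".to_string(),
                        ..Default::default()
                    }],
                    ..Default::default()
                }],
                ..Default::default()
            }],
        };

        // Protobuf-encode the body, which is exactly what the axum handler decodes.
        let resp = reqwest::Client::new()
            .post("http://localhost:4000/v1/otlp/v1/logs")
            .header("content-type", "application/x-protobuf")
            // In this first patch a missing X-Pipeline-Name is rejected;
            // "identity" selects the built-in identity pipeline.
            .header("X-Pipeline-Name", "identity")
            .header("X-Table-Name", "my_logs") // defaults to `opentelemetry_logs`
            .body(request.encode_to_vec())
            .send()
            .await?;
        assert!(resp.status().is_success());
        Ok(())
    }
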
From 1d0bca2296fce1ddd7bc22d1ac671f2e09834b2b Mon Sep 17 00:00:00 2001 From: paomian Date: Sun, 22 Sep 2024 17:11:05 +0800 Subject: [PATCH 2/5] feat: add API to write OpenTelemetry logs to GreptimeDB --- src/servers/src/http/otlp.rs | 29 ++++++++-------- src/servers/src/metrics.rs | 7 ++++ tests-integration/tests/http.rs | 59 +++++++++++++++++++++++++++++---- 3 files changed, 73 insertions(+), 22 deletions(-) diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index c82b0f5fe14f..53883cc5e705 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -96,7 +96,7 @@ pub async fn traces( } pub struct PipelineInfo { - pub pipeline_name: String, + pub pipeline_name: Option, pub pipeline_version: Option, } @@ -124,15 +124,15 @@ where let pipeline_version = parts.headers.get("X-Pipeline-Version"); match (pipeline_name, pipeline_version) { (Some(name), Some(version)) => Ok(PipelineInfo { - pipeline_name: pipeline_header_error(name)?, + pipeline_name: Some(pipeline_header_error(name)?), pipeline_version: Some(pipeline_header_error(version)?), }), - (None, _) => Err(( - StatusCode::BAD_REQUEST, - "`X-Pipeline-Name` header is missing", - )), + (None, _) => Ok(PipelineInfo { + pipeline_name: None, + pipeline_version: None, + }), (Some(name), None) => Ok(PipelineInfo { - pipeline_name: pipeline_header_error(name)?, + pipeline_name: Some(pipeline_header_error(name)?), pipeline_version: None, }), } @@ -175,15 +175,13 @@ pub async fn logs( let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); - let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED + let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; let pipeline_way; - if pipeline_info.pipeline_name == "identity" { - pipeline_way = PipelineWay::Identity; - } else { + if let Some(pipeline_name) = &pipeline_info.pipeline_name { let pipeline_version = to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| { error::InvalidParameterSnafu { @@ -192,11 +190,7 @@ pub async fn logs( .build() })?; let pipeline = match handler - .get_pipeline( - &pipeline_info.pipeline_name, - pipeline_version, - query_ctx.clone(), - ) + .get_pipeline(pipeline_name, pipeline_version, query_ctx.clone()) .await { Ok(p) => p, @@ -205,7 +199,10 @@ pub async fn logs( } }; pipeline_way = PipelineWay::Custom(pipeline); + } else { + pipeline_way = PipelineWay::Identity; } + handler .logs(request, pipeline_way, table_info.table_name, query_ctx) .await diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index cdf927536f04..ead86f3ad88b 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -141,6 +141,13 @@ lazy_static! 
{ &[METRIC_DB_LABEL] ) .unwrap(); + pub static ref METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED: HistogramVec = + register_histogram_vec!( + "greptime_servers_http_otlp_logs_elapsed", + "servers http otlp logs elapsed", + &[METRIC_DB_LABEL] + ) + .unwrap(); pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!( "greptime_servers_http_logs_ingestion_counter", "servers http logs ingestion counter", diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e11060fbbda5..cb34e8582e4c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -17,10 +17,11 @@ use std::io::Write; use api::prom_store::remote::WriteRequest; use auth::user_provider_from_option; -use axum::http::{HeaderName, StatusCode}; +use axum::http::{HeaderName, HeaderValue, StatusCode}; use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics; @@ -89,6 +90,7 @@ macro_rules! http_tests { test_otlp_metrics, test_otlp_traces, + test_otlp_logs, ); )* }; @@ -1429,7 +1431,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) { let client = TestClient::new(app); // write metrics data - let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), false).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), false).await; assert_eq!(StatusCode::OK, res.status()); // select metrics data @@ -1441,7 +1443,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // write metrics data with gzip - let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), true).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), true).await; assert_eq!(StatusCode::OK, res.status()); // select metrics data again @@ -1466,7 +1468,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { let client = TestClient::new(app); // write traces data - let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), false).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), false).await; assert_eq!(StatusCode::OK, res.status()); // select traces data @@ -1481,7 +1483,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // write metrics data with gzip - let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), true).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), true).await; assert_eq!(StatusCode::OK, res.status()); // select metrics data again @@ -1490,6 +1492,41 @@ pub async fn test_otlp_traces(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_otlp_logs(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await; + + let content = r#" 
+{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/resourceLogs","scopeLogs":[{"scope":{},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/scopeLogs","logRecords":[{"flags":1,"timeUnixNano":1581452773000009875,"observedTimeUnixNano":1581452773000009875,"severityNumber":9,"severityText":"Info","body":{"stringValue":"This is a log message"},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":1}}],"droppedAttributesCount":1,"traceId":[48,56,48,52,48,50,48,49,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48],"spanId":[48,49,48,50,48,52,48,56,48,48,48,48,48,48,48,48]},{"flags":1,"timeUnixNano":1581452773000000789,"observedTimeUnixNano":1581452773000000789,"severityNumber":9,"severityText":"Info","body":{"stringValue":"something happened"},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":[48],"spanId":[48]}]}]}]} +"#; + + let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap(); + let body = req.encode_to_vec(); + + // handshake + let client = TestClient::new(app); + + // write traces data + let res = send_req( + &client, + vec![( + HeaderName::from_static("x-table-name"), + HeaderValue::from_static("logs"), + )], + "/v1/otlp/v1/logs?db=public", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // TODO(qtang): we show convert jsonb to json string in http sql API + let expected = r#"[["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,8,16,0,0,3,16,0,0,4,16,0,0,3,99,117,115,116,111,109,101,114,101,110,118,97,99,109,101,100,101,118],1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"null"],["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,3,16,0,0,12,16,0,0,6,32,0,0,2,97,112,112,105,110,115,116,97,110,99,101,95,110,117,109,115,101,114,118,101,114,64,1],1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"null"]]"#; + validate_data(&client, "select * from logs;", expected).await; + + guard.remove_all().await; +} + async fn validate_data(client: &TestClient, sql: &str, expected: &str) { let res = client .get(format!("/v1/sql?sql={sql}").as_str()) @@ -1502,11 +1539,21 @@ async fn validate_data(client: &TestClient, sql: &str, expected: &str) { assert_eq!(v, expected); } -async fn send_req(client: &TestClient, path: &str, body: Vec, with_gzip: bool) -> TestResponse { +async fn send_req( + client: &TestClient, + headers: Vec<(HeaderName, HeaderValue)>, + path: &str, + body: Vec, + with_gzip: bool, +) -> TestResponse { let mut req = client .post(path) .header("content-type", "application/x-protobuf"); + for (k, v) in headers { + req = req.header(k, v); + } + let mut len = body.len(); if with_gzip { From 0f78d3535d4b65ece5cb0fd29167522f83c2a0a8 Mon Sep 17 00:00:00 2001 From: paomian Date: 
Mon, 23 Sep 2024 16:50:56 +0800 Subject: [PATCH 3/5] chore: fix test data schema error --- tests-integration/tests/http.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index cb34e8582e4c..a9ca8d0598d9 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1497,7 +1497,7 @@ pub async fn test_otlp_logs(store_type: StorageType) { let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await; let content = r#" -{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/resourceLogs","scopeLogs":[{"scope":{},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/scopeLogs","logRecords":[{"flags":1,"timeUnixNano":1581452773000009875,"observedTimeUnixNano":1581452773000009875,"severityNumber":9,"severityText":"Info","body":{"stringValue":"This is a log message"},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":1}}],"droppedAttributesCount":1,"traceId":[48,56,48,52,48,50,48,49,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48],"spanId":[48,49,48,50,48,52,48,56,48,48,48,48,48,48,48,48]},{"flags":1,"timeUnixNano":1581452773000000789,"observedTimeUnixNano":1581452773000000789,"severityNumber":9,"severityText":"Info","body":{"stringValue":"something happened"},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":[48],"spanId":[48]}]}]}]} +{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/resourceLogs","scopeLogs":[{"scope":{},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/scopeLogs","logRecords":[{"flags":1,"timeUnixNano":1581452773000009875,"observedTimeUnixNano":1581452773000009875,"severityNumber":9,"severityText":"Info","body":{"value":{"stringValue":"This is a log message"}},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":1}}],"droppedAttributesCount":1,"traceId":[48,56,48,52,48,50,48,49,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48],"spanId":[48,49,48,50,48,52,48,56,48,48,48,48,48,48,48,48]},{"flags":1,"timeUnixNano":1581452773000000789,"observedTimeUnixNano":1581452773000000789,"severityNumber":9,"severityText":"Info","body":{"value":{"stringValue":"something happened"}},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":[48],"spanId":[48]}]}]}]} "#; let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap(); @@ -1521,7 +1521,7 @@ pub async fn test_otlp_logs(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // TODO(qtang): we show convert jsonb to json string in http sql API - let expected = 
r#"[["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,8,16,0,0,3,16,0,0,4,16,0,0,3,99,117,115,116,111,109,101,114,101,110,118,97,99,109,101,100,101,118],1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"null"],["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,3,16,0,0,12,16,0,0,6,32,0,0,2,97,112,112,105,110,115,116,97,110,99,101,95,110,117,109,115,101,114,118,101,114,64,1],1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"null"]]"#; + let expected = r#"[{"records":{"schema":{"column_schemas":[{"name":"scope_name","data_type":"String"},{"name":"scope_version","data_type":"String"},{"name":"scope_attributes","data_type":"Json"},{"name":"scope_schemaUrl","data_type":"String"},{"name":"resource_schema_url","data_type":"String"},{"name":"resource_attributes","data_type":"Json"},{"name":"log_attributes","data_type":"Json"},{"name":"timestamp","data_type":"TimestampNanosecond"},{"name":"observed_timestamp","data_type":"TimestampNanosecond"},{"name":"trace_id","data_type":"String"},{"name":"span_id","data_type":"String"},{"name":"trace_flags","data_type":"UInt32"},{"name":"severity_text","data_type":"String"},{"name":"severity_number","data_type":"Int32"},{"name":"body","data_type":"String"}]},"rows":[["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,8,16,0,0,3,16,0,0,4,16,0,0,3,99,117,115,116,111,109,101,114,101,110,118,97,99,109,101,100,101,118],1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,3,16,0,0,12,16,0,0,6,32,0,0,2,97,112,112,105,110,115,116,97,110,99,101,95,110,117,109,115,101,114,118,101,114,64,1],1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#; validate_data(&client, "select * from logs;", expected).await; guard.remove_all().await; From b756f82d089b42004d6101f8f7f79b20a6dcaef8 Mon Sep 17 00:00:00 2001 From: paomian Date: Mon, 23 Sep 2024 17:22:36 +0800 Subject: [PATCH 4/5] chore: modify the underlying data structure of the pipeline value map type from hashmap to btremap to keep key order --- src/pipeline/src/etl.rs | 3 +++ src/pipeline/src/etl/processor/cmcd.rs | 7 ++++--- src/pipeline/src/etl/processor/regex.rs | 6 ++++-- src/pipeline/src/etl/value.rs | 7 ++++--- src/pipeline/src/etl/value/map.rs | 16 +++++++++++----- src/servers/src/otlp/logs.rs | 7 +++---- 6 files changed, 29 insertions(+), 17 deletions(-) diff --git 
a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index b8a367afc960..748493331c93 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -214,16 +214,19 @@ where match val { Value::Map(map) => { let mut search_from = 0; + // because of the key in the json map is ordered for (payload_key, payload_value) in map.values.into_iter() { if search_from >= self.required_keys.len() { break; } + // because of map key is ordered, required_keys is ordered too if let Some(pos) = self.required_keys[search_from..] .iter() .position(|k| k == &payload_key) { result[search_from + pos] = payload_value; + // next search from is always after the current key search_from += pos; } } diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs index f43186b94aa0..06cfeb7c6905 100644 --- a/src/pipeline/src/etl/processor/cmcd.rs +++ b/src/pipeline/src/etl/processor/cmcd.rs @@ -402,7 +402,8 @@ impl Processor for CmcdProcessor { #[cfg(test)] mod tests { - use ahash::HashMap; + use std::collections::BTreeMap; + use urlencoding::decode; use super::{CmcdProcessorBuilder, CMCD_KEYS}; @@ -563,14 +564,14 @@ mod tests { let values = vec .into_iter() .map(|(k, v)| (k.to_string(), v)) - .collect::>(); + .collect::>(); let expected = Map { values }; let actual = processor.parse(0, &decoded).unwrap(); let actual = actual .into_iter() .map(|(index, value)| (intermediate_keys[index].clone(), value)) - .collect::>(); + .collect::>(); let actual = Map { values: actual }; assert_eq!(actual, expected); } diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs index a74c19140c19..de25195f99ab 100644 --- a/src/pipeline/src/etl/processor/regex.rs +++ b/src/pipeline/src/etl/processor/regex.rs @@ -383,6 +383,8 @@ impl Processor for RegexProcessor { } #[cfg(test)] mod tests { + use std::collections::BTreeMap; + use ahash::{HashMap, HashMapExt}; use itertools::Itertools; @@ -475,14 +477,14 @@ ignore_missing: false"#; .map(|k| k.to_string()) .collect_vec(); let processor = builder.build(&intermediate_keys).unwrap(); - let mut result = HashMap::new(); + let mut result = BTreeMap::new(); for (index, pattern) in processor.patterns.iter().enumerate() { let r = processor .process(&breadcrumbs_str, pattern, (0, index)) .unwrap() .into_iter() .map(|(k, v)| (intermediate_keys[k].clone(), v)) - .collect::>(); + .collect::>(); result.extend(r); } let map = Map { values: result }; diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index 034768a7024e..f2617aff3ba7 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -16,7 +16,8 @@ pub mod array; pub mod map; pub mod time; -use ahash::{HashMap, HashMapExt}; +use std::collections::BTreeMap; + pub use array::Array; use jsonb::{Number as JsonbNumber, Object as JsonbObject, Value as JsonbValue}; pub use map::Map; @@ -289,7 +290,7 @@ impl TryFrom for Value { Ok(Value::Array(Array { values })) } serde_json::Value::Object(v) => { - let mut values = HashMap::with_capacity(v.len()); + let mut values = BTreeMap::new(); for (k, v) in v { values.insert(k, Value::try_from(v)?); } @@ -320,7 +321,7 @@ impl TryFrom<&yaml_rust::Yaml> for Value { Ok(Value::Array(Array { values })) } yaml_rust::Yaml::Hash(v) => { - let mut values = HashMap::new(); + let mut values = BTreeMap::new(); for (k, v) in v { let key = k .as_str() diff --git a/src/pipeline/src/etl/value/map.rs b/src/pipeline/src/etl/value/map.rs index b8b81da7563b..c9d9cd25f79c 100644 --- a/src/pipeline/src/etl/value/map.rs 
+++ b/src/pipeline/src/etl/value/map.rs @@ -12,19 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::{HashMap, HashMapExt}; +use std::collections::BTreeMap; + +use ahash::HashMap; use crate::etl::value::Value; #[derive(Debug, Clone, PartialEq)] pub struct Map { - pub values: HashMap, + pub values: BTreeMap, } impl Default for Map { fn default() -> Self { Self { - values: HashMap::with_capacity(30), + values: BTreeMap::default(), } } } @@ -47,12 +49,16 @@ impl Map { impl From> for Map { fn from(values: HashMap) -> Self { - Map { values } + let mut map = Map::default(); + for (k, v) in values.into_iter() { + map.insert(k, v); + } + map } } impl std::ops::Deref for Map { - type Target = HashMap; + type Target = BTreeMap; fn deref(&self) -> &Self::Target { &self.values diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index 8fc6bed16059..b9c653e51551 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -14,7 +14,6 @@ use std::collections::{BTreeMap, HashMap as StdHashMap}; -use ahash::{HashMap, HashMapExt}; use api::v1::column_data_type_extension::TypeExt; use api::v1::value::ValueData; use api::v1::{ @@ -144,7 +143,7 @@ fn log_to_pipeline_value( let log_attrs = PipelineValue::Map(Map { values: key_value_to_map(log.attributes), }); - let mut map = HashMap::new(); + let mut map = BTreeMap::new(); map.insert( "Timestamp".to_string(), PipelineValue::Uint64(log.time_unix_nano), @@ -485,8 +484,8 @@ fn any_value_to_pipeline_value(value: any_value::Value) -> PipelineValue { } // convert otlp keyValue vec to map -fn key_value_to_map(key_values: Vec) -> HashMap { - let mut map = HashMap::new(); +fn key_value_to_map(key_values: Vec) -> BTreeMap { + let mut map = BTreeMap::new(); for kv in key_values { let value = match kv.value { Some(value) => match value.value { From 58bf15f81fe99ab200b128a6cc1a83091eaa14a2 Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 26 Sep 2024 17:34:41 +0800 Subject: [PATCH 5/5] chore: fix by pr comment --- src/datatypes/src/value.rs | 25 ++++++++-- src/frontend/src/instance/otlp.rs | 5 +- src/pipeline/src/etl.rs | 5 ++ src/pipeline/src/etl/error.rs | 12 +++++ .../transform/transformer/greptime/coerce.rs | 18 ++++--- src/pipeline/src/etl/value/map.rs | 10 +--- src/pipeline/src/lib.rs | 2 +- src/servers/src/error.rs | 4 +- src/servers/src/http.rs | 12 ++++- src/servers/src/http/otlp.rs | 48 +++++++++++++------ src/servers/src/otlp/logs.rs | 34 ++----------- src/servers/src/query_handler.rs | 6 +-- tests-integration/tests/http.rs | 5 +- 13 files changed, 108 insertions(+), 78 deletions(-) diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index a8e59da51355..a02e6178ab0c 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -38,6 +38,7 @@ use snafu::{ensure, ResultExt}; use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu}; use crate::prelude::*; +use crate::schema::ColumnSchema; use crate::type_id::LogicalTypeId; use crate::types::{IntervalType, ListType}; use crate::vectors::ListVector; @@ -1237,10 +1238,16 @@ impl<'a> From>> for ValueRef<'a> { } } -impl<'a> TryFrom> for serde_json::Value { +pub struct ColumnPair<'a> { + pub value: ValueRef<'a>, + pub schema: &'a ColumnSchema, +} + +impl<'a> TryFrom> for serde_json::Value { type Error = serde_json::Error; - fn try_from(value: ValueRef<'a>) -> serde_json::Result { + fn try_from(value: ColumnPair<'a>) -> serde_json::Result { + 
From 58bf15f81fe99ab200b128a6cc1a83091eaa14a2 Mon Sep 17 00:00:00 2001
From: paomian
Date: Thu, 26 Sep 2024 17:34:41 +0800
Subject: [PATCH 5/5] chore: address PR review comments

---
 src/datatypes/src/value.rs                    | 25 ++++++++--
 src/frontend/src/instance/otlp.rs             |  5 +-
 src/pipeline/src/etl.rs                       |  5 ++
 src/pipeline/src/etl/error.rs                 | 12 +++++
 .../transform/transformer/greptime/coerce.rs  | 18 ++++---
 src/pipeline/src/etl/value/map.rs             | 10 +---
 src/pipeline/src/lib.rs                       |  2 +-
 src/servers/src/error.rs                      |  4 +-
 src/servers/src/http.rs                       | 12 ++++-
 src/servers/src/http/otlp.rs                  | 48 +++++++++++++------
 src/servers/src/otlp/logs.rs                  | 34 ++-----------
 src/servers/src/query_handler.rs              |  6 +--
 tests-integration/tests/http.rs               |  5 +-
 13 files changed, 108 insertions(+), 78 deletions(-)

diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs
index a8e59da51355..a02e6178ab0c 100644
--- a/src/datatypes/src/value.rs
+++ b/src/datatypes/src/value.rs
@@ -38,6 +38,7 @@ use snafu::{ensure, ResultExt};

 use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
 use crate::prelude::*;
+use crate::schema::ColumnSchema;
 use crate::type_id::LogicalTypeId;
 use crate::types::{IntervalType, ListType};
 use crate::vectors::ListVector;
@@ -1237,10 +1238,16 @@ impl<'a> From<Option<ValueRef<'a>>> for ValueRef<'a> {
     }
 }

-impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
+pub struct ColumnPair<'a> {
+    pub value: ValueRef<'a>,
+    pub schema: &'a ColumnSchema,
+}
+
+impl<'a> TryFrom<ColumnPair<'a>> for serde_json::Value {
     type Error = serde_json::Error;

-    fn try_from(value: ValueRef<'a>) -> serde_json::Result<serde_json::Value> {
+    fn try_from(value: ColumnPair<'a>) -> serde_json::Result<serde_json::Value> {
+        let ColumnPair { value, schema } = value;
         let json_value = match value {
             ValueRef::Null => serde_json::Value::Null,
             ValueRef::Boolean(v) => serde_json::Value::Bool(v),
@@ -1255,7 +1262,19 @@ impl<'a> TryFrom<ColumnPair<'a>> for serde_json::Value {
             ValueRef::Float32(v) => serde_json::Value::from(v.0),
             ValueRef::Float64(v) => serde_json::Value::from(v.0),
             ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()),
-            ValueRef::Binary(bytes) => serde_json::to_value(bytes)?,
+            ValueRef::Binary(bytes) => {
+                if let ConcreteDataType::Json(_) = schema.data_type {
+                    match jsonb::from_slice(bytes) {
+                        Ok(json) => json.into(),
+                        Err(e) => {
+                            error!(e; "Failed to parse jsonb");
+                            serde_json::Value::Null
+                        }
+                    }
+                } else {
+                    serde_json::to_value(bytes)?
+                }
+            }
             ValueRef::Date(v) => serde_json::Value::Number(v.val().into()),
             ValueRef::DateTime(v) => serde_json::Value::Number(v.val().into()),
             ValueRef::List(v) => serde_json::to_value(v)?,
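The new Binary arm renders jsonb-encoded columns as structured JSON in the HTTP output instead of a raw byte array. A rough standalone sketch of the same decision (binary_to_json and its boolean flag are invented for illustration; the real code matches schema.data_type against ConcreteDataType::Json, and the jsonb-to-serde_json conversion relies on the From impl the jsonb crate provides):

    fn binary_to_json(bytes: &[u8], is_json_column: bool) -> serde_json::Value {
        if is_json_column {
            match jsonb::from_slice(bytes) {
                Ok(json) => json.into(),
                // fall back to Null rather than failing the whole query
                Err(_) => serde_json::Value::Null,
            }
        } else {
            serde_json::to_value(bytes).unwrap_or(serde_json::Value::Null)
        }
    }

    fn main() {
        // Round-trip: encode a JSON document to jsonb bytes, then decode it
        // the way the HTTP output path would.
        let encoded = jsonb::parse_value(br#"{"app":"server"}"#).unwrap().to_vec();
        assert_eq!(binary_to_json(&encoded, true), serde_json::json!({"app": "server"}));
    }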
diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index 0e0419688e5b..30d6c6cb42d9 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -20,11 +20,12 @@ use common_telemetry::tracing;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
+use pipeline::PipelineWay;
 use servers::error::{self, AuthSnafu, Result as ServerResult};
 use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
 use servers::otlp;
 use servers::otlp::plugin::TraceParserRef;
-use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineWay};
+use servers::query_handler::OpenTelemetryProtocolHandler;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
@@ -113,10 +114,10 @@ impl OpenTelemetryProtocolHandler for Instance {
             .get::<OpenTelemetryProtocolInterceptorRef<ExportLogsServiceRequest>>();
         interceptor_ref.pre_execute(ctx.clone())?;
         let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
-        OTLP_LOGS_ROWS.inc_by(rows as u64);

         self.handle_row_inserts(requests, ctx)
             .await
+            .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)
     }
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index 748493331c93..bec06cab8bfb 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -317,6 +317,11 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str)
         .context(IntermediateKeyIndexSnafu { kind, key })
 }

+pub enum PipelineWay {
+    Identity,
+    Custom(std::sync::Arc<Pipeline<GreptimeTransformer>>),
+}
+
 #[cfg(test)]
 mod tests {
diff --git a/src/pipeline/src/etl/error.rs b/src/pipeline/src/etl/error.rs
index 3680053ba0d7..57d3d52ea7e9 100644
--- a/src/pipeline/src/etl/error.rs
+++ b/src/pipeline/src/etl/error.rs
@@ -438,6 +438,18 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+    #[snafu(display("cannot coerce nested type to {ty}"))]
+    CoerceNestedType {
+        ty: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+    #[snafu(display("cannot coerce {ty} to nested type"))]
+    CoerceTypeToNested {
+        ty: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
     #[snafu(display(
         "invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
index 489b2e147b6d..48356bc96e67 100644
--- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
+++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
@@ -20,7 +20,8 @@ use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
 use snafu::ResultExt;

 use crate::etl::error::{
-    CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
+    CoerceNestedTypeSnafu, CoerceStringToTypeSnafu, CoerceTypeToNestedSnafu,
+    CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
     CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
 };
 use crate::etl::transform::index::Index;
@@ -414,14 +415,14 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<ValueData>> {
 }

-fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>, String> {
+fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>> {
     match &transform.type_ {
         Value::Array(_) | Value::Map(_) => (),
         t => {
-            return Err(format!(
-                "nested value type not supported {}",
-                t.to_str_type()
-            ))
+            return CoerceNestedTypeSnafu {
+                ty: t.to_str_type(),
+            }
+            .fail();
         }
     }
     match v {
@@ -433,7 +434,10 @@ fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>> {
-        _ => Err(format!("nested type not support {}", v.to_str_type())),
+        _ => CoerceTypeToNestedSnafu {
+            ty: v.to_str_type(),
+        }
+        .fail(),
     }
 }
diff --git a/src/pipeline/src/etl/value/map.rs b/src/pipeline/src/etl/value/map.rs
index c9d9cd25f79c..7450fed20d70 100644
--- a/src/pipeline/src/etl/value/map.rs
+++ b/src/pipeline/src/etl/value/map.rs
@@ -18,19 +18,11 @@ use ahash::HashMap;

 use crate::etl::value::Value;

-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Default)]
 pub struct Map {
     pub values: BTreeMap<String, Value>,
 }

-impl Default for Map {
-    fn default() -> Self {
-        Self {
-            values: BTreeMap::default(),
-        }
-    }
-}
-
 impl Map {
     pub fn one(key: impl Into<String>, value: Value) -> Map {
         let mut map = Map::default();
diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs
index 8fc72c584484..a4d9767804d3 100644
--- a/src/pipeline/src/lib.rs
+++ b/src/pipeline/src/lib.rs
@@ -20,7 +20,7 @@ pub use etl::error::Result;
 pub use etl::processor::Processor;
 pub use etl::transform::{GreptimeTransformer, Transformer};
 pub use etl::value::{Array, Map, Value};
-pub use etl::{parse, Content, Pipeline};
+pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay};
 pub use manager::{
     error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
     PipelineVersion,
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 2f616f18f954..ebee059e1ddf 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -534,9 +534,9 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display("OpenTelemetry log error: {}", error))]
+    #[snafu(display("OpenTelemetry log error"))]
     OpenTelemetryLog {
-        error: String,
+        source: pipeline::etl_error::Error,
         #[snafu(implicit)]
         location: Location,
     },
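The error changes above replace String payloads with a typed snafu `source`, so the pipeline error travels up the chain intact instead of being flattened into a message. A compact sketch of that pattern (enum and variant names simplified from the ones in the diff, without the location fields):

    use snafu::{ResultExt, Snafu};

    #[derive(Debug, Snafu)]
    enum PipelineError {
        #[snafu(display("cannot coerce nested type to {ty}"))]
        CoerceNestedType { ty: String },
    }

    #[derive(Debug, Snafu)]
    enum ServerError {
        #[snafu(display("OpenTelemetry log error"))]
        OpenTelemetryLog { source: PipelineError },
    }

    fn run_pipeline() -> Result<(), PipelineError> {
        // context selectors are generated by the Snafu derive
        CoerceNestedTypeSnafu { ty: "string" }.fail()
    }

    fn main() {
        let err: ServerError = run_pipeline().context(OpenTelemetryLogSnafu).unwrap_err();
        // Both layers are preserved: the outer display plus the typed source.
        println!("{err}: {:?}", std::error::Error::source(&err));
    }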
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index 61b17a69fc05..12db3346e6d1 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -36,6 +36,7 @@ use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datatypes::data_type::DataType;
 use datatypes::schema::SchemaRef;
+use datatypes::value::ColumnPair;
 use event::{LogState, LogValidatorRef};
 use futures::FutureExt;
 use schemars::JsonSchema;
@@ -239,14 +240,21 @@ impl HttpRecordsOutput {
         } else {
             let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
             let mut rows = Vec::with_capacity(num_rows);
+            let schemas = schema.column_schemas();
             let num_cols = schema.column_schemas().len();
             rows.resize_with(num_rows, || Vec::with_capacity(num_cols));

             let mut finished_row_cursor = 0;
             for recordbatch in recordbatches {
-                for col in recordbatch.columns() {
+                for (col_idx, col) in recordbatch.columns().iter().enumerate() {
+                    // safety: `schemas` has exactly one entry per column in the record batch
+                    let schema = &schemas[col_idx];
                     for row_idx in 0..recordbatch.num_rows() {
-                        let value = Value::try_from(col.get_ref(row_idx)).context(ToJsonSnafu)?;
+                        let column_pair = ColumnPair {
+                            value: col.get_ref(row_idx),
+                            schema,
+                        };
+                        let value = Value::try_from(column_pair).context(ToJsonSnafu)?;
                         rows[row_idx + finished_row_cursor].push(value);
                     }
                 }
diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs
index 53883cc5e705..349bb2a3b9eb 100644
--- a/src/servers/src/http/otlp.rs
+++ b/src/servers/src/http/otlp.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use core::str;
 use std::result::Result as StdResult;
 use std::sync::Arc;

@@ -33,13 +34,18 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
     ExportTraceServiceRequest, ExportTraceServiceResponse,
 };
 use pipeline::util::to_pipeline_version;
+use pipeline::PipelineWay;
 use prost::Message;
 use session::context::{Channel, QueryContext};
 use snafu::prelude::*;

 use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
 use crate::error::{self, Result};
-use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineWay};
+use crate::query_handler::OpenTelemetryProtocolHandlerRef;
+
+const OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-pipeline-name";
+const OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-pipeline-version";
+const OTLP_GREPTIME_TABLE_NAME_HEADER_NAME: &str = "x-greptime-table-name";

 #[axum_macros::debug_handler]
 #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))]
@@ -102,12 +108,14 @@ pub struct PipelineInfo {

 fn pipeline_header_error(
     header: &HeaderValue,
-) -> StdResult<String, (StatusCode, &'static str)> {
-    match header.to_str() {
+    key: &str,
+) -> StdResult<String, (StatusCode, String)> {
+    let header_utf8 = str::from_utf8(header.as_bytes());
+    match header_utf8 {
         Ok(s) => Ok(s.to_string()),
         Err(_) => Err((
             StatusCode::BAD_REQUEST,
-            "`X-Pipeline-Name` or `X-Pipeline-Version` header is not string type.",
+            format!("`{}` header is not a valid UTF-8 string.", key),
         )),
     }
 }
@@ -117,22 +125,33 @@ impl<S> FromRequestParts<S> for PipelineInfo
 where
     S: Send + Sync,
 {
-    type Rejection = (StatusCode, &'static str);
+    type Rejection = (StatusCode, String);

     async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
-        let pipeline_name = parts.headers.get("X-Pipeline-Name");
-        let pipeline_version = parts.headers.get("X-Pipeline-Version");
+        let pipeline_name = parts.headers.get(OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME);
+        let pipeline_version = parts
+            .headers
+            .get(OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME);
         match (pipeline_name, pipeline_version) {
             (Some(name), Some(version)) => Ok(PipelineInfo {
-                pipeline_name: Some(pipeline_header_error(name)?),
-                pipeline_version: Some(pipeline_header_error(version)?),
+                pipeline_name: Some(pipeline_header_error(
+                    name,
+                    OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME,
+                )?),
+                pipeline_version: Some(pipeline_header_error(
+                    version,
+                    OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME,
+                )?),
             }),
             (None, _) => Ok(PipelineInfo {
                 pipeline_name: None,
                 pipeline_version: None,
             }),
             (Some(name), None) => Ok(PipelineInfo {
-                pipeline_name: Some(pipeline_header_error(name)?),
+                pipeline_name: Some(pipeline_header_error(
+                    name,
+                    OTLP_GREPTIME_PIPELINE_NAME_HEADER_NAME,
+                )?),
                 pipeline_version: None,
             }),
         }
@@ -148,13 +167,14 @@ impl<S> FromRequestParts<S> for TableInfo
 where
     S: Send + Sync,
 {
-    type Rejection = (StatusCode, &'static str);
+    type Rejection = (StatusCode, String);

     async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
-        let table_name = parts.headers.get("X-Table-Name");
+        let table_name = parts.headers.get(OTLP_GREPTIME_TABLE_NAME_HEADER_NAME);
+
         match table_name {
             Some(name) => Ok(TableInfo {
-                table_name: pipeline_header_error(name)?,
+                table_name: pipeline_header_error(name, OTLP_GREPTIME_TABLE_NAME_HEADER_NAME)?,
             }),
             None => Ok(TableInfo {
                 table_name: "opentelemetry_logs".to_string(),
@@ -185,7 +205,7 @@ pub async fn logs(
     let pipeline_version =
         to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| {
             error::InvalidParameterSnafu {
-                reason: "X-Pipeline-Version".to_string(),
+                reason: OTLP_GREPTIME_PIPELINE_VERSION_HEADER_NAME,
             }
             .build()
         })?;
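The extractors above normalize on x-greptime-* header names and reject non-UTF-8 values with a 400 plus a header-specific message. A condensed sketch of the same extraction logic (optional_header is an invented helper; axum re-exports the http types used here):

    use axum::http::{HeaderMap, StatusCode};

    fn optional_header(headers: &HeaderMap, key: &str) -> Result<Option<String>, (StatusCode, String)> {
        match headers.get(key) {
            None => Ok(None),
            Some(value) => match std::str::from_utf8(value.as_bytes()) {
                Ok(s) => Ok(Some(s.to_string())),
                Err(_) => Err((
                    StatusCode::BAD_REQUEST,
                    format!("`{key}` header is not a valid UTF-8 string."),
                )),
            },
        }
    }

Returning `(StatusCode, String)` works directly as an axum rejection, since status-plus-body tuples implement IntoResponse.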
diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs
index b9c653e51551..b8dd0bafe16d 100644
--- a/src/servers/src/otlp/logs.rs
+++ b/src/servers/src/otlp/logs.rs
@@ -24,12 +24,12 @@ use jsonb::{Number as JsonbNumber, Value as JsonbValue};
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
 use opentelemetry_proto::tonic::logs::v1::LogRecord;
-use pipeline::{Array, Map, Value as PipelineValue};
+use pipeline::{Array, Map, PipelineWay, Value as PipelineValue};
+use snafu::ResultExt;

 use super::trace::attributes::OtlpAnyValue;
 use crate::error::{OpenTelemetryLogSnafu, Result};
 use crate::otlp::trace::span::bytes_to_hex_string;
-use crate::query_handler::PipelineWay;

 /// Normalize otlp instrumentation, metric and attribute names
 ///
@@ -74,10 +74,10 @@ pub fn to_grpc_insert_requests(
             let mut intermediate_state = p.init_intermediate_state();
             for v in request {
                 p.prepare_pipeline_value(v, &mut intermediate_state)
-                    .map_err(|e| OpenTelemetryLogSnafu { error: e }.build())?;
+                    .context(OpenTelemetryLogSnafu)?;
                 let r = p
                     .exec_mut(&mut intermediate_state)
-                    .map_err(|e| OpenTelemetryLogSnafu { error: e }.build())?;
+                    .context(OpenTelemetryLogSnafu)?;
                 result.push(r);
             }
             let len = result.len();
@@ -215,20 +215,6 @@ fn build_identity_schema() -> Vec<ColumnSchema> {
             }),
             None,
         ),
-        (
-            "scope_schemaUrl",
-            ColumnDataType::String,
-            SemanticType::Field,
-            None,
-            None,
-        ),
-        (
-            "resource_schema_url",
-            ColumnDataType::String,
-            SemanticType::Field,
-            None,
-            None,
-        ),
         (
             "resource_attributes",
             ColumnDataType::Binary,
@@ -324,9 +310,7 @@ fn build_identity_schema() -> Vec<ColumnSchema> {

 fn build_identity_row(
     log: LogRecord,
-    resource_schema_url: String,
     resource_attr: JsonbValue<'_>,
-    scope_schema_url: String,
     scope_name: Option<String>,
     scope_version: Option<String>,
     scope_attrs: JsonbValue<'_>,
@@ -341,12 +325,6 @@ fn build_identity_row(
         GreptimeValue {
             value_data: Some(ValueData::BinaryValue(scope_attrs.to_vec())),
         },
-        GreptimeValue {
-            value_data: Some(ValueData::StringValue(scope_schema_url)),
-        },
-        GreptimeValue {
-            value_data: Some(ValueData::StringValue(resource_schema_url)),
-        },
         GreptimeValue {
             value_data: Some(ValueData::BinaryValue(resource_attr.to_vec())),
         },
@@ -398,16 +376,12 @@ fn parse_export_logs_service_request_to_rows(request: ExportLogsServiceRequest)
             .resource
             .map(|x| key_value_to_jsonb(x.attributes))
             .unwrap_or(JsonbValue::Null);
-        let resource_schema_url = r.schema_url;
         for scope_logs in r.scope_logs {
             let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
-            let scope_schema_url = scope_logs.schema_url;
             for log in scope_logs.log_records {
                 let value = build_identity_row(
                     log,
-                    resource_schema_url.clone(),
                     resource_attr.clone(),
-                    scope_schema_url.clone(),
                     scope_name.clone(),
                     scope_version.clone(),
                     scope_attrs.clone(),
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index 5415ee33d9ba..a1ad9997ba7b 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -36,7 +36,7 @@ use headers::HeaderValue;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
-use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
+use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
 use serde_json::Value;
 use session::context::QueryContextRef;
@@ -105,10 +105,6 @@ pub trait PromStoreProtocolHandler {
     async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
 }

-pub enum PipelineWay {
-    Identity,
-    Custom(Arc<Pipeline<GreptimeTransformer>>),
-}
-
 #[async_trait]
 pub trait OpenTelemetryProtocolHandler: LogHandler {
     /// Handling opentelemetry metrics request
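With PipelineWay now living in the pipeline crate, handlers only have to choose between the two ingestion modes. A simplified sketch of that choice (the String stand-in replaces the real Arc<Pipeline<GreptimeTransformer>> payload):

    enum PipelineWay {
        Identity,
        Custom(String), // stands in for Arc<Pipeline<GreptimeTransformer>>
    }

    // Mirrors the logs handler: a pipeline name in the request headers selects
    // a user-defined pipeline; otherwise the built-in identity mapping is used.
    fn select_pipeline(pipeline_name: Option<String>) -> PipelineWay {
        match pipeline_name {
            Some(name) => PipelineWay::Custom(name),
            None => PipelineWay::Identity,
        }
    }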
r#"[{"records":{"schema":{"column_schemas":[{"name":"scope_name","data_type":"String"},{"name":"scope_version","data_type":"String"},{"name":"scope_attributes","data_type":"Json"},{"name":"scope_schemaUrl","data_type":"String"},{"name":"resource_schema_url","data_type":"String"},{"name":"resource_attributes","data_type":"Json"},{"name":"log_attributes","data_type":"Json"},{"name":"timestamp","data_type":"TimestampNanosecond"},{"name":"observed_timestamp","data_type":"TimestampNanosecond"},{"name":"trace_id","data_type":"String"},{"name":"span_id","data_type":"String"},{"name":"trace_flags","data_type":"UInt32"},{"name":"severity_text","data_type":"String"},{"name":"severity_number","data_type":"Int32"},{"name":"body","data_type":"String"}]},"rows":[["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,8,16,0,0,3,16,0,0,4,16,0,0,3,99,117,115,116,111,109,101,114,101,110,118,97,99,109,101,100,101,118],1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,3,16,0,0,12,16,0,0,6,32,0,0,2,97,112,112,105,110,115,116,97,110,99,101,95,110,117,109,115,101,114,118,101,114,64,1],1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#; + let expected = r#"[["","",{},{"resource_attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",{},{"resource_attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#; validate_data(&client, "select * from logs;", expected).await; guard.remove_all().await;