feat: add API to write OpenTelemetry logs to GreptimeDB #4755

Open: wants to merge 5 commits into base branch main.

3 changes: 3 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -137,6 +137,7 @@ opentelemetry-proto = { version = "0.5", features = [
     "metrics",
     "trace",
     "with-serde",
+    "logs",
 ] }
 parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
 paste = "1.0"
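For orientation, the `logs` feature gates the protobuf types for OTLP logs that the rest of this PR relies on. A minimal sketch of constructing a logs export request with those generated types; all field values here are illustrative prost defaults, not something this PR prescribes:

use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};

// Build a minimal, valid logs export request; every omitted field falls
// back to its prost default, and this only compiles with "logs" enabled.
fn sample_request() -> ExportLogsServiceRequest {
    ExportLogsServiceRequest {
        resource_logs: vec![ResourceLogs {
            scope_logs: vec![ScopeLogs {
                log_records: vec![LogRecord::default()],
                ..Default::default()
            }],
            ..Default::default()
        }],
    }
}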
25 changes: 22 additions & 3 deletions src/datatypes/src/value.rs
@@ -38,6 +38,7 @@ use snafu::{ensure, ResultExt};
 
 use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
 use crate::prelude::*;
+use crate::schema::ColumnSchema;
 use crate::type_id::LogicalTypeId;
 use crate::types::{IntervalType, ListType};
 use crate::vectors::ListVector;
@@ -1237,10 +1238,16 @@ impl<'a> From<Option<ListValueRef<'a>>> for ValueRef<'a> {
     }
 }
 
-impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
+pub struct ColumnPair<'a> {
+    pub value: ValueRef<'a>,
+    pub schema: &'a ColumnSchema,
+}
+
+impl<'a> TryFrom<ColumnPair<'a>> for serde_json::Value {
     type Error = serde_json::Error;
 
-    fn try_from(value: ValueRef<'a>) -> serde_json::Result<serde_json::Value> {
+    fn try_from(value: ColumnPair<'a>) -> serde_json::Result<serde_json::Value> {
+        let ColumnPair { value, schema } = value;
         let json_value = match value {
             ValueRef::Null => serde_json::Value::Null,
             ValueRef::Boolean(v) => serde_json::Value::Bool(v),
@@ -1255,7 +1262,19 @@ impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
             ValueRef::Float32(v) => serde_json::Value::from(v.0),
             ValueRef::Float64(v) => serde_json::Value::from(v.0),
             ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()),
-            ValueRef::Binary(bytes) => serde_json::to_value(bytes)?,
+            ValueRef::Binary(bytes) => {
+                if let ConcreteDataType::Json(_) = schema.data_type {
+                    match jsonb::from_slice(bytes) {
+                        Ok(json) => json.into(),
+                        Err(e) => {
+                            error!(e; "Failed to parse jsonb");
+                            serde_json::Value::Null
+                        }
+                    }
+                } else {
+                    serde_json::to_value(bytes)?
+                }
+            }
             ValueRef::Date(v) => serde_json::Value::Number(v.val().into()),
             ValueRef::DateTime(v) => serde_json::Value::Number(v.val().into()),
             ValueRef::List(v) => serde_json::to_value(v)?,
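The new ColumnPair wrapper lets the conversion consult the column's declared type, so JSONB-encoded Binary cells render as structured JSON rather than a byte array. A minimal sketch of how a caller might use it; the crate paths and helper name are assumptions for illustration, and error handling is deliberately collapsed:

use datatypes::schema::ColumnSchema; // assumed crate paths
use datatypes::value::{ColumnPair, ValueRef};

// Hypothetical helper: render one cell as JSON. For a Binary cell in a
// Json-typed column, the conversion decodes the JSONB payload instead of
// serializing the raw bytes as an array of numbers.
fn cell_to_json(value: ValueRef<'_>, schema: &ColumnSchema) -> serde_json::Value {
    serde_json::Value::try_from(ColumnPair { value, schema })
        .unwrap_or(serde_json::Value::Null)
}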
31 changes: 30 additions & 1 deletion src/frontend/src/instance/otlp.rs
@@ -17,8 +17,10 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use client::Output;
 use common_error::ext::BoxedError;
 use common_telemetry::tracing;
+use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
+use pipeline::PipelineWay;
 use servers::error::{self, AuthSnafu, Result as ServerResult};
 use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
 use servers::otlp;
@@ -28,7 +30,7 @@ use session::context::QueryContextRef;
 use snafu::ResultExt;
 
 use crate::instance::Instance;
-use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
+use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
 
 #[async_trait]
 impl OpenTelemetryProtocolHandler for Instance {
@@ -92,4 +94,31 @@ impl OpenTelemetryProtocolHandler for Instance {
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)
     }
+
+    #[tracing::instrument(skip_all)]
+    async fn logs(
+        &self,
+        request: ExportLogsServiceRequest,
+        pipeline: PipelineWay,
+        table_name: String,
+        ctx: QueryContextRef,
+    ) -> ServerResult<Output> {
+        self.plugins
+            .get::<PermissionCheckerRef>()
+            .as_ref()
+            .check_permission(ctx.current_user(), PermissionReq::Otlp)
+            .context(AuthSnafu)?;
+
+        let interceptor_ref = self
+            .plugins
+            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
+        interceptor_ref.pre_execute(ctx.clone())?;
+        let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;
+
+        self.handle_row_inserts(requests, ctx)
+            .await
+            .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
+            .map_err(BoxedError::new)
+            .context(error::ExecuteGrpcQuerySnafu)
+    }
 }
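A rough sketch of how a server-side caller might drive the new handler. The trait path is an assumption, the table name is a placeholder, and the HTTP route that ultimately invokes this is outside this diff:

use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use pipeline::PipelineWay;
use servers::query_handler::OpenTelemetryProtocolHandler; // assumed trait path
use session::context::QueryContextRef;

// Hypothetical caller: forward an already-decoded OTLP logs request to the
// handler with the built-in identity pipeline. "opentelemetry_logs" is a
// placeholder; the real route would decide the target table.
async fn export_logs(
    handler: &impl OpenTelemetryProtocolHandler,
    request: ExportLogsServiceRequest,
    ctx: QueryContextRef,
) -> servers::error::Result<client::Output> {
    handler
        .logs(request, PipelineWay::Identity, "opentelemetry_logs".to_string(), ctx)
        .await
}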
12 changes: 12 additions & 0 deletions src/frontend/src/metrics.rs
@@ -41,16 +41,28 @@ lazy_static! {
         .with_label_values(&["insert"]);
     pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED
         .with_label_values(&["execute"]);
+
+    /// The number of OpenTelemetry metrics rows sent by the frontend node.
     pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!(
         "greptime_frontend_otlp_metrics_rows",
         "frontend otlp metrics rows"
     )
     .unwrap();
+
+    /// The number of OpenTelemetry traces rows sent by the frontend node.
     pub static ref OTLP_TRACES_ROWS: IntCounter = register_int_counter!(
         "greptime_frontend_otlp_traces_rows",
         "frontend otlp traces rows"
     )
     .unwrap();
+
+    /// The number of OpenTelemetry logs rows sent by the frontend node.
+    pub static ref OTLP_LOGS_ROWS: IntCounter = register_int_counter!(
+        "greptime_frontend_otlp_logs_rows",
+        "frontend otlp logs rows"
+    )
+    .unwrap();
 
+    /// The number of heartbeats sent by the frontend node.
     pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
         "greptime_frontend_heartbeat_send_count",
1 change: 1 addition & 0 deletions src/pipeline/Cargo.toml
@@ -40,6 +40,7 @@ enum_dispatch = "0.3"
 futures.workspace = true
 greptime-proto.workspace = true
 itertools.workspace = true
+jsonb.workspace = true
 lazy_static.workspace = true
 moka = { workspace = true, features = ["sync"] }
 once_cell.workspace = true
36 changes: 36 additions & 0 deletions src/pipeline/src/etl.rs
@@ -210,6 +210,37 @@
         self.transformer.transform_mut(val)
     }
 
+    pub fn prepare_pipeline_value(&self, val: Value, result: &mut [Value]) -> Result<()> {
+        match val {
+            Value::Map(map) => {
+                let mut search_from = 0;
+                // the keys in the json map are ordered
+                for (payload_key, payload_value) in map.values.into_iter() {
+                    if search_from >= self.required_keys.len() {
+                        break;
+                    }
+
+                    // since map keys are ordered, required_keys is ordered too
+                    if let Some(pos) = self.required_keys[search_from..]
+                        .iter()
+                        .position(|k| k == &payload_key)
+                    {
+                        result[search_from + pos] = payload_value;
+                        // advance the search start to the matched key
+                        search_from += pos;
+                    }
+                }
+            }
+            Value::String(_) => {
+                result[0] = val;
+            }
+            _ => {
+                return PrepareValueMustBeObjectSnafu.fail();
+            }
+        }
+        Ok(())
+    }
+
     pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<()> {
         match val {
             serde_json::Value::Object(map) => {
@@ -286,6 +317,11 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
         .context(IntermediateKeyIndexSnafu { kind, key })
 }
 
+pub enum PipelineWay {
+    Identity,
+    Custom(std::sync::Arc<Pipeline<crate::GreptimeTransformer>>),
+}
+
 #[cfg(test)]
 mod tests {
 
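To see what the ordered-key matching in prepare_pipeline_value buys, here is a self-contained sketch of the same strategy over plain standard-library types. The pipeline's own Value type is replaced by String purely for illustration; both the payload (BTreeMap iteration) and required_keys are sorted, so each lookup resumes where the previous one stopped instead of rescanning from the start:

use std::collections::BTreeMap;

// Fill one output row by walking two sorted key sequences in tandem.
fn fill_row(payload: BTreeMap<String, String>, required_keys: &[String]) -> Vec<Option<String>> {
    let mut row = vec![None; required_keys.len()];
    let mut search_from = 0;
    for (key, value) in payload {
        if search_from >= required_keys.len() {
            break;
        }
        if let Some(pos) = required_keys[search_from..].iter().position(|k| k == &key) {
            row[search_from + pos] = Some(value);
            search_from += pos;
        }
    }
    row
}

fn main() {
    let required = vec!["host".to_string(), "message".to_string(), "severity".to_string()];
    let payload = BTreeMap::from([
        ("host".to_string(), "web-1".to_string()),
        ("severity".to_string(), "INFO".to_string()),
    ]);
    // "message" stays None; "host" and "severity" land at their column indexes.
    assert_eq!(fill_row(payload, &required)[2].as_deref(), Some("INFO"));
}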
12 changes: 12 additions & 0 deletions src/pipeline/src/etl/error.rs
@@ -438,6 +438,18 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+    #[snafu(display("can not coerce nested type to {ty}"))]
+    CoerceNestedType {
+        ty: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+    #[snafu(display("can not coerce {ty} to nested type"))]
+    CoerceTypeToNested {
+        ty: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 
     #[snafu(display(
         "invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
7 changes: 4 additions & 3 deletions src/pipeline/src/etl/processor/cmcd.rs
@@ -402,7 +402,8 @@ impl Processor for CmcdProcessor {
 
 #[cfg(test)]
 mod tests {
-    use ahash::HashMap;
+    use std::collections::BTreeMap;
+
     use urlencoding::decode;
 
     use super::{CmcdProcessorBuilder, CMCD_KEYS};
@@ -563,14 +564,14 @@ mod tests {
         let values = vec
             .into_iter()
             .map(|(k, v)| (k.to_string(), v))
-            .collect::<HashMap<String, Value>>();
+            .collect::<BTreeMap<String, Value>>();
         let expected = Map { values };
 
         let actual = processor.parse(0, &decoded).unwrap();
         let actual = actual
             .into_iter()
             .map(|(index, value)| (intermediate_keys[index].clone(), value))
-            .collect::<BTreeMap<String, Value>>();
+            .collect::<BTreeMap<String, Value>>();
         let actual = Map { values: actual };
         assert_eq!(actual, expected);
     }
6 changes: 4 additions & 2 deletions src/pipeline/src/etl/processor/regex.rs
@@ -383,6 +383,8 @@ impl Processor for RegexProcessor {
 }
 #[cfg(test)]
 mod tests {
+    use std::collections::BTreeMap;
+
     use ahash::{HashMap, HashMapExt};
     use itertools::Itertools;
 
@@ -475,14 +477,14 @@ ignore_missing: false"#;
             .map(|k| k.to_string())
             .collect_vec();
         let processor = builder.build(&intermediate_keys).unwrap();
-        let mut result = HashMap::new();
+        let mut result = BTreeMap::new();
         for (index, pattern) in processor.patterns.iter().enumerate() {
            let r = processor
                 .process(&breadcrumbs_str, pattern, (0, index))
                 .unwrap()
                 .into_iter()
                 .map(|(k, v)| (intermediate_keys[k].clone(), v))
-                .collect::<HashMap<_, _>>();
+                .collect::<BTreeMap<_, _>>();
             result.extend(r);
         }
         let map = Map { values: result };
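These test updates in cmcd.rs and regex.rs follow from the pipeline's Map now keeping its entries in a BTreeMap, whose sorted iteration order is exactly what prepare_pipeline_value in etl.rs relies on. A small illustration of that ordering guarantee:

use std::collections::BTreeMap;

fn main() {
    // Insertion order does not matter: BTreeMap always iterates in sorted
    // key order, the property the pipeline's ordered-key matching needs.
    let mut m = BTreeMap::new();
    m.insert("severity", 3);
    m.insert("host", 1);
    m.insert("message", 2);
    let keys: Vec<_> = m.keys().copied().collect();
    assert_eq!(keys, ["host", "message", "severity"]);
}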
33 changes: 30 additions & 3 deletions src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
@@ -20,7 +20,8 @@ use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
 use snafu::ResultExt;
 
 use crate::etl::error::{
-    CoerceStringToTypeSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
+    CoerceNestedTypeSnafu, CoerceStringToTypeSnafu, CoerceTypeToNestedSnafu,
+    CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
     CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,
 };
 use crate::etl::transform::index::Index;
@@ -187,8 +188,8 @@ pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result<Option<
         }
         Value::Timestamp(Timestamp::Second(s)) => Ok(Some(ValueData::TimestampSecondValue(*s))),
 
-        Value::Array(_) => unimplemented!("Array type not supported"),
-        Value::Map(_) => unimplemented!("Object type not supported"),
+        Value::Array(_) => coerce_nested_value(val, transform),
+        Value::Map(_) => coerce_nested_value(val, transform),
     }
 }
 
@@ -414,6 +415,32 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result<Option<Value
     }
 }
 
+fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>> {
+    match &transform.type_ {
+        Value::Array(_) | Value::Map(_) => (),
+        t => {
+            return CoerceNestedTypeSnafu {
+                ty: t.to_str_type(),
+            }
+            .fail();
+        }
+    }
+    match v {
+        Value::Map(_) => {
+            let data: jsonb::Value = v.into();
+            Ok(Some(ValueData::BinaryValue(data.to_vec())))
+        }
+        Value::Array(_) => {
+            let data: jsonb::Value = v.into();
+            Ok(Some(ValueData::BinaryValue(data.to_vec())))
+        }
+        _ => CoerceTypeToNestedSnafu {
+            ty: v.to_str_type(),
+        }
+        .fail(),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
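The nested coercion above stores maps and arrays as binary JSONB, the same encoding value.rs decodes with jsonb::from_slice. A round-trip sketch under the assumption that the jsonb crate's parse_value helper is available alongside the from_slice and to_vec calls the patch itself uses:

fn main() {
    // Parse JSON text into a jsonb::Value (parse_value is assumed from the
    // jsonb crate), encode it to the binary form stored in BinaryValue,
    // then decode it back with from_slice, as value.rs does.
    let parsed = jsonb::parse_value(br#"{"service":"api","retries":[1,2,3]}"#)
        .expect("valid JSON text");
    let encoded: Vec<u8> = parsed.to_vec();
    let decoded = jsonb::from_slice(&encoded).expect("valid JSONB bytes");
    assert!(matches!(decoded, jsonb::Value::Object(_)));
}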