diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index f3556851ac2..3e06781e7d0 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -354,7 +354,7 @@ fn json_value_to_row(schema_info: &mut SchemaInfo, map: Map) -> Result { +pub fn identity_pipeline(array: Vec) -> Result { let mut rows = Vec::with_capacity(array.len()); let mut schema = SchemaInfo::default(); diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 7a9d10320e0..84bd5e3b4b9 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -437,21 +437,19 @@ async fn ingest_logs_inner( for v in pipeline_data { pipeline .prepare(v, &mut intermediate_state) - .inspect_err(|reason| { + .inspect_err(|_| { METRIC_HTTP_LOGS_TRANSFORM_ELAPSED .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) .observe(transform_timer.elapsed().as_secs_f64()); - PipelineTransformSnafu { reason }.build() }) .context(PipelineTransformSnafu) .context(PipelineSnafu)?; let r = pipeline .exec_mut(&mut intermediate_state) - .inspect_err(|reason| { + .inspect_err(|_| { METRIC_HTTP_LOGS_TRANSFORM_ELAPSED .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) .observe(transform_timer.elapsed().as_secs_f64()); - PipelineTransformSnafu { reason }.build() }) .context(PipelineTransformSnafu) .context(PipelineSnafu)?; @@ -468,7 +466,7 @@ async fn ingest_logs_inner( schema: pipeline.schemas().clone(), }; } else { - let rows = pipeline::identify_pipeline(pipeline_data) + let rows = pipeline::identity_pipeline(pipeline_data) .context(PipelineTransformSnafu) .context(PipelineSnafu)?; transformed_data = rows;