chore: add test for write json log api
paomian committed Sep 26, 2024
1 parent c7ad508 commit 2bfc070
Showing 4 changed files with 90 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/pipeline/src/etl/transform/transformer.rs
@@ -13,4 +13,4 @@
 // limitations under the License.
 
 pub mod greptime;
-pub use greptime::identify_pipeline;
+pub use greptime::identity_pipeline;
52 changes: 32 additions & 20 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
@@ -16,6 +16,7 @@ pub mod coerce;
 
 use std::collections::HashSet;
 
+use ahash::HashMap;
 use api::helper::proto_value_type;
 use api::v1::column_data_type_extension::TypeExt;
 use api::v1::value::ValueData;
@@ -201,44 +202,52 @@ impl Transformer for GreptimeTransformer {
     }
 }
 
+#[derive(Debug, Default)]
+struct SchemaInfo {
+    schema: Vec<ColumnSchema>,
+    index: HashMap<String, usize>,
+}
+
 fn resolve_schema(
     index: Option<usize>,
     value_data: ValueData,
     column_schema: ColumnSchema,
     row: &mut Vec<GreptimeValue>,
-    schema: &mut Vec<ColumnSchema>,
+    schema_info: &mut SchemaInfo,
 ) {
     if let Some(index) = index {
         let api_value = GreptimeValue {
             value_data: Some(value_data),
         };
         let value_column_data_type = proto_value_type(&api_value);
         // safety unwrap is fine here because index is always valid
-        let schema_column_data_type = schema.get(index).unwrap().datatype();
+        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
         if value_column_data_type.is_some_and(|t| t != schema_column_data_type) {
             row[index] = GreptimeValue { value_data: None };
         } else {
             row[index] = api_value;
         }
     } else {
-        schema.push(column_schema);
+        let key = column_schema.column_name.clone();
+        schema_info.schema.push(column_schema);
+        schema_info.index.insert(key, schema_info.schema.len() - 1);
         let api_value = GreptimeValue {
             value_data: Some(value_data),
        };
         row.push(api_value);
     }
 }
 
-fn json_value_to_row(schemas: &mut Vec<ColumnSchema>, map: Map<String, serde_json::Value>) -> Row {
-    let mut row: Vec<GreptimeValue> = Vec::with_capacity(schemas.len());
-    for _ in 0..schemas.len() {
+fn json_value_to_row(schema_info: &mut SchemaInfo, map: Map<String, serde_json::Value>) -> Row {
+    let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
+    for _ in 0..schema_info.schema.len() {
         row.push(GreptimeValue { value_data: None });
     }
     for (key, value) in map {
         if key == DEFAULT_GREPTIME_TIMESTAMP_COLUMN {
             continue;
         }
-        let index = schemas.iter().position(|x| x.column_name == key);
+        let index = schema_info.index.get(&key).copied();
         match value {
             serde_json::Value::Null => {
                 // do nothing
@@ -255,7 +264,7 @@ fn json_value_to_row(schemas: &mut Vec<ColumnSchema>, map: Map<String, serde_jso
                         options: None,
                     },
                     &mut row,
-                    schemas,
+                    schema_info,
                 );
             }
             serde_json::Value::Bool(b) => {
@@ -270,7 +279,7 @@ fn json_value_to_row(schemas: &mut Vec<ColumnSchema>, map: Map<String, serde_jso
                         options: None,
                     },
                     &mut row,
-                    schemas,
+                    schema_info,
                 );
             }
             serde_json::Value::Number(n) => {
@@ -287,7 +296,7 @@ fn json_value_to_row(schemas: &mut Vec<ColumnSchema>, map: Map<String, serde_jso
                             options: None,
                         },
                         &mut row,
-                        schemas,
+                        schema_info,
                     );
                 } else if n.is_u64() {
                     resolve_schema(
@@ -302,7 +311,7 @@ fn json_value_to_row(schemas: &mut Vec<ColumnSchema>, map: Map<String, serde_jso
                             options: None,
                         },
                         &mut row,
-                        schemas,
+                        schema_info,
                     );
                 } else if n.is_f64() {
                     resolve_schema(
@@ -317,7 +326,7 @@ fn json_value_to_row(schemas: &mut Vec<ColumnSchema>, map: Map<String, serde_jso
                             options: None,
                         },
                         &mut row,
-                        schemas,
+                        schema_info,
                     );
                 } else {
                     unreachable!("unexpected number type");
@@ -337,18 +346,18 @@ fn json_value_to_row(schemas: &mut Vec<ColumnSchema>, map: Map<String, serde_jso
                         options: None,
                     },
                     &mut row,
-                    schemas,
+                    schema_info,
                 );
             }
         }
     }
     Row { values: row }
 }
 
-pub fn identify_pipeline(array: Vec<serde_json::Value>) -> Result<Rows, String> {
+pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows, String> {
     let mut rows = Vec::with_capacity(array.len());
 
-    let mut schema = Vec::new();
+    let mut schema = SchemaInfo::default();
     for value in array {
         if let serde_json::Value::Object(map) = value {
             let row = json_value_to_row(&mut schema, map);
@@ -366,21 +375,24 @@ pub fn identify_pipeline(array: Vec<serde_json::Value>) -> Result<Rows, String>
     let ts = GreptimeValue {
         value_data: Some(ValueData::TimestampNanosecondValue(ns)),
     };
-    let column_count = schema.len();
+    let column_count = schema.schema.len();
     for row in rows.iter_mut() {
         let diff = column_count - row.values.len();
         for _ in 0..diff {
             row.values.push(GreptimeValue { value_data: None });
         }
         row.values.push(ts.clone());
     }
-    schema.push(greptime_timestamp_schema);
-    Ok(Rows { schema, rows })
+    schema.schema.push(greptime_timestamp_schema);
+    Ok(Rows {
+        schema: schema.schema,
+        rows,
+    })
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::identify_pipeline;
+    use crate::identity_pipeline;
 
     #[test]
     fn test_identify_pipeline() {
@@ -404,7 +416,7 @@ mod tests {
                 "gaga": "gaga"
             }),
         ];
-        let rows = identify_pipeline(array).unwrap();
+        let rows = identity_pipeline(array).unwrap();
         assert_eq!(rows.schema.len(), 8);
         assert_eq!(rows.rows.len(), 2);
         assert_eq!(8, rows.rows[0].values.len());
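
The core of this file's change: column lookup during row construction moves from a linear scan over Vec<ColumnSchema> to a HashMap index kept alongside the schema in the new SchemaInfo struct, so each incoming JSON key resolves in O(1) instead of O(n). A minimal standalone sketch of that add-or-lookup pattern, with simplified types (Column is a hypothetical stand-in for ColumnSchema, and std's HashMap stands in for the ahash one used in the diff):

use std::collections::HashMap;

// Hypothetical, simplified stand-in for the ColumnSchema in this diff.
#[derive(Debug, Clone)]
struct Column {
    name: String,
}

// Mirrors the SchemaInfo introduced above: the Vec keeps column order,
// the HashMap maps a column name to its position in that Vec.
#[derive(Debug, Default)]
struct SchemaInfo {
    schema: Vec<Column>,
    index: HashMap<String, usize>,
}

impl SchemaInfo {
    // The add-or-lookup step resolve_schema performs per JSON key:
    // reuse the recorded position, or append and index a new column.
    fn get_or_insert(&mut self, name: &str) -> usize {
        if let Some(&i) = self.index.get(name) {
            return i;
        }
        self.schema.push(Column { name: name.to_string() });
        let i = self.schema.len() - 1;
        self.index.insert(name.to_string(), i);
        i
    }
}

fn main() {
    let mut info = SchemaInfo::default();
    assert_eq!(info.get_or_insert("status"), 0);
    assert_eq!(info.get_or_insert("ip"), 1);
    assert_eq!(info.get_or_insert("status"), 0); // O(1) on repeat keys
    println!("{} columns", info.schema.len()); // 2 columns
}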
2 changes: 1 addition & 1 deletion src/pipeline/src/lib.rs
@@ -18,7 +18,7 @@ mod metrics;
 
 pub use etl::error::Result;
 pub use etl::processor::Processor;
-pub use etl::transform::transformer::identify_pipeline;
+pub use etl::transform::transformer::identity_pipeline;
 pub use etl::transform::{GreptimeTransformer, Transformer};
 pub use etl::value::{Array, Map, Value};
 pub use etl::{parse, Content, Pipeline};
56 changes: 56 additions & 0 deletions tests-integration/tests/http.rs
@@ -86,6 +86,7 @@ macro_rules! http_tests {
         test_pipeline_api,
         test_test_pipeline_api,
         test_plain_text_ingestion,
+        test_identify_pipeline,
 
         test_otlp_metrics,
         test_otlp_traces,
@@ -1157,6 +1158,61 @@ transform:
     guard.remove_all().await;
 }
 
+pub async fn test_identify_pipeline(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;
+
+    // handshake
+    let client = TestClient::new(app);
+    let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java"}
+{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","hasagei":"hasagei","dongdongdong":"guaguagua"}"#;
+    let res = client
+        .post("/v1/events/logs?db=public&table=logs")
+        .header("Content-Type", "application/json")
+        .body(body)
+        .send()
+        .await;
+
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let body: serde_json::Value = res.json().await;
+
+    assert!(body.get("execution_time_ms").unwrap().is_number());
+    assert_eq!(body["output"][0]["affectedrows"], 2);
+
+    let res = client.get("/v1/sql?sql=select * from logs").send().await;
+
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let line1_expected = r#"["10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei",null]"#;
+    let line2_expected = r#"["10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java",null,null,null]"#;
+    let res = client.get("/v1/sql?sql=select * from logs").send().await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let resp: serde_json::Value = res.json().await;
+    let result = resp["output"][0]["records"]["rows"].as_array().unwrap();
+    assert_eq!(result.len(), 2);
+    let mut line1 = result[0].as_array().unwrap().clone();
+    let mut line2 = result[1].as_array().unwrap().clone();
+    assert!(line1.last().unwrap().is_i64());
+    assert!(line2.last().unwrap().is_i64());
+    *line1.last_mut().unwrap() = serde_json::Value::Null;
+    *line2.last_mut().unwrap() = serde_json::Value::Null;
+
+    assert_eq!(
+        line1,
+        serde_json::from_str::<Vec<Value>>(line1_expected).unwrap()
+    );
+    assert_eq!(
+        line2,
+        serde_json::from_str::<Vec<Value>>(line2_expected).unwrap()
+    );
+
+    let expected = r#"[["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"],["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"]]"#;
+    validate_data(&client, "desc logs", expected).await;
+
+    guard.remove_all().await;
+}
+
 pub async fn test_test_pipeline_api(store_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;
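
What this new test exercises, as a minimal library-level sketch (assuming the crate is consumed as pipeline, per the re-export in src/pipeline/src/lib.rs above): identity_pipeline unions the key sets of all input objects into one schema, pads each row with nulls for the keys it lacks, then appends the greptime_timestamp column.

use serde_json::json;

fn main() {
    // Two log lines with overlapping but unequal key sets.
    let rows = pipeline::identity_pipeline(vec![
        json!({"status": "200", "ip": "10.200.0.1"}),
        json!({"status": "404", "extra": "field"}),
    ])
    .unwrap();

    // 3 distinct user keys + the auto-appended greptime_timestamp column.
    assert_eq!(rows.schema.len(), 4);
    // Every row is padded to the full column count.
    assert!(rows.rows.iter().all(|r| r.values.len() == 4));
}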
