Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add json write #4744

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum_dispatch = "0.3"
futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
jsonb.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["sync"] }
once_cell.workspace = true
Expand Down
9 changes: 9 additions & 0 deletions src/pipeline/src/etl/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,15 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Column type mismatch. column: {column}, original: {original}, now: {now}"))]
IdentifyPipelineColumnTypeMismatch {
column: String,
original: String,
now: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down
1 change: 1 addition & 0 deletions src/pipeline/src/etl/transform/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
// limitations under the License.

pub mod greptime;
pub use greptime::identity_pipeline;
306 changes: 304 additions & 2 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ pub mod coerce;

use std::collections::HashSet;

use ahash::HashMap;
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use serde_json::{Map, Number};

use crate::etl::error::{
Result, TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
IdentifyPipelineColumnTypeMismatchSnafu, Result, TransformColumnNameMustBeUniqueSnafu,
TransformEmptySnafu, TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
};
use crate::etl::field::{InputFieldInfo, OneInputOneOutputField};
use crate::etl::transform::index::Index;
Expand Down Expand Up @@ -120,6 +126,7 @@ impl Transformer for GreptimeTransformer {
if let Some(idx) = transform.index {
if idx == Index::Time {
match transform.real_fields.len() {
//safety unwrap is fine here because we have checked the length of real_fields
1 => timestamp_columns
.push(transform.real_fields.first().unwrap().input_name()),
_ => {
Expand Down Expand Up @@ -194,3 +201,298 @@ impl Transformer for GreptimeTransformer {
&mut self.transforms
}
}

/// This is used to record the current state schema information and a sequential cache of field names.
/// As you traverse the user input JSON, this will change.
/// It will record a superset of all user input schemas.
#[derive(Debug, Default)]
struct SchemaInfo {
/// schema info
schema: Vec<ColumnSchema>,
/// index of the column name
index: HashMap<String, usize>,
paomian marked this conversation as resolved.
Show resolved Hide resolved
}

fn resolve_schema(
index: Option<usize>,
value_data: ValueData,
column_schema: ColumnSchema,
row: &mut Vec<GreptimeValue>,
schema_info: &mut SchemaInfo,
) -> Result<()> {
if let Some(index) = index {
let api_value = GreptimeValue {
value_data: Some(value_data),
};
let value_column_data_type = proto_value_type(&api_value);
// safety unwrap is fine here because index is always valid
let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
if value_column_data_type.is_some_and(|t| t != schema_column_data_type) {
IdentifyPipelineColumnTypeMismatchSnafu {
column: column_schema.column_name,
original: schema_column_data_type.as_str_name(),
now: value_column_data_type
.map(|t| t.as_str_name())
.unwrap_or_else(|| "null"),
}
.fail()
} else {
row[index] = api_value;
Ok(())
}
} else {
let key = column_schema.column_name.clone();
schema_info.schema.push(column_schema);
schema_info.index.insert(key, schema_info.schema.len() - 1);
let api_value = GreptimeValue {
value_data: Some(value_data),
};
row.push(api_value);
Ok(())
}
}

fn resolve_number_schema(
n: Number,
column_name: String,
index: Option<usize>,
row: &mut Vec<GreptimeValue>,
schema_info: &mut SchemaInfo,
) -> Result<()> {
let (value, datatype, semantic_type) = if n.is_i64() {
(
ValueData::I64Value(n.as_i64().unwrap()),
ColumnDataType::Int64 as i32,
SemanticType::Field as i32,
)
} else if n.is_u64() {
(
ValueData::U64Value(n.as_u64().unwrap()),
ColumnDataType::Uint64 as i32,
SemanticType::Field as i32,
)
} else if n.is_f64() {
(
ValueData::F64Value(n.as_f64().unwrap()),
ColumnDataType::Float64 as i32,
SemanticType::Field as i32,
)
} else {
unreachable!("unexpected number type");
};
resolve_schema(
index,
value,
ColumnSchema {
column_name,
datatype,
semantic_type,
datatype_extension: None,
options: None,
},
row,
schema_info,
)
}

fn json_value_to_row(
schema_info: &mut SchemaInfo,
map: Map<String, serde_json::Value>,
) -> Result<Row> {
let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
for _ in 0..schema_info.schema.len() {
row.push(GreptimeValue { value_data: None });
}
for (column_name, value) in map {
if column_name == DEFAULT_GREPTIME_TIMESTAMP_COLUMN {
continue;
}
let index = schema_info.index.get(&column_name).copied();
match value {
serde_json::Value::Null => {
// do nothing
}
serde_json::Value::String(s) => {
resolve_schema(
index,
ValueData::StringValue(s),
ColumnSchema {
column_name,
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
},
&mut row,
schema_info,
)?;
}
serde_json::Value::Bool(b) => {
resolve_schema(
index,
ValueData::BoolValue(b),
ColumnSchema {
column_name,
datatype: ColumnDataType::Boolean as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
},
&mut row,
schema_info,
)?;
}
serde_json::Value::Number(n) => {
resolve_number_schema(n, column_name, index, &mut row, schema_info)?;
}
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
resolve_schema(
index,
ValueData::BinaryValue(jsonb::Value::from(value).to_vec()),
ColumnSchema {
column_name,
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
options: None,
},
&mut row,
schema_info,
)?;
}
}
}
Ok(Row { values: row })
}

pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
let mut rows = Vec::with_capacity(array.len());

let mut schema = SchemaInfo::default();
for value in array {
if let serde_json::Value::Object(map) = value {
let row = json_value_to_row(&mut schema, map)?;
rows.push(row);
}
}
let greptime_timestamp_schema = ColumnSchema {
column_name: DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
datatype: ColumnDataType::TimestampNanosecond as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
options: None,
};
let ns = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
let ts = GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(ns)),
};
let column_count = schema.schema.len();
for row in rows.iter_mut() {
let diff = column_count - row.values.len();
for _ in 0..diff {
row.values.push(GreptimeValue { value_data: None });
}
row.values.push(ts.clone());
}
schema.schema.push(greptime_timestamp_schema);
Ok(Rows {
schema: schema.schema,
rows,
})
}

#[cfg(test)]
mod tests {
use crate::identity_pipeline;

#[test]
fn test_identify_pipeline() {
{
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": "88.5",
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
"Column type mismatch. column: score, original: FLOAT64, now: STRING".to_string(),
);
}
{
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": 88,
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
"Column type mismatch. column: score, original: FLOAT64, now: INT64".to_string(),
);
}
{
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": 88.5,
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
assert!(rows.is_ok());
let rows = rows.unwrap();
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
assert_eq!(8, rows.rows[0].values.len());
assert_eq!(8, rows.rows[1].values.len());
}
}
}
1 change: 1 addition & 0 deletions src/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod metrics;

pub use etl::error::Result;
pub use etl::processor::Processor;
pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{parse, Content, Pipeline};
Expand Down
Loading