From 64a3b7b288572a75a174f2eaf47a1e4462e82313 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alja=C5=BE=20Mur=20Er=C5=BEen?= Date: Wed, 21 Feb 2024 13:37:09 +0100 Subject: [PATCH] refactor: simplify SQLite append --- connector_arrow/src/sqlite/append.rs | 254 +++++++++++---------------- 1 file changed, 99 insertions(+), 155 deletions(-) diff --git a/connector_arrow/src/sqlite/append.rs b/connector_arrow/src/sqlite/append.rs index ed0bd24..42f44cc 100644 --- a/connector_arrow/src/sqlite/append.rs +++ b/connector_arrow/src/sqlite/append.rs @@ -1,11 +1,15 @@ -use std::sync::Arc; - -use arrow::array::*; -use arrow::{datatypes::DataType, record_batch::RecordBatch}; +use arrow::datatypes::*; +use arrow::record_batch::RecordBatch; +use itertools::zip_eq; use itertools::Itertools; use rusqlite::types::Value; use rusqlite::{params_from_iter, Transaction}; +use crate::impl_consume_unsupported; +use crate::types::{FixedSizeBinaryType, NullType}; +use crate::util::transport; +use crate::util::transport::{Consume, ConsumeTy}; +use crate::util::ArrayCellRef; use crate::{api::Append, ConnectorError}; pub struct SQLiteAppender<'conn> { @@ -68,163 +72,103 @@ fn insert_query(table_name: &str, cols: usize, rows: usize) -> String { fn collect_args(batch: &RecordBatch, rows_range: std::ops::Range) -> Vec { let mut res = Vec::with_capacity(rows_range.len() * batch.num_columns()); - for row in rows_range { - for col_array in batch.columns() { - res.push(get_value(col_array, row).to_owned()); + + let schema = batch.schema(); + let mut row = zip_eq(batch.columns(), schema.fields()) + .map(|(array, field)| ArrayCellRef { + array, + field, + row_number: 0, + }) + .collect_vec(); + + for row_number in rows_range { + for cell in &mut row { + cell.row_number = row_number; + transport::transport(cell.field, cell as &_, &mut res).unwrap(); } } res } -fn get_value(array: &Arc, row: usize) -> Value { - if array.is_null(row) { - return Value::Null; +impl Consume for Vec {} + +macro_rules! impl_consume_ty { + ($ArrTy: ty, $value_kind: expr) => { + impl_consume_ty!($ArrTy, $value_kind, std::convert::identity); + }; + + ($ArrTy: ty, $value_kind: expr, $conversion: expr) => { + impl ConsumeTy<$ArrTy> for Vec { + fn consume( + &mut self, + _ty: &DataType, + value: <$ArrTy as crate::types::ArrowType>::Native, + ) { + let value: Value = $value_kind(($conversion)(value)); + self.push(value); + } + + fn consume_null(&mut self) { + self.push(Value::Null); + } + } + }; +} + +impl ConsumeTy for Vec { + fn consume(&mut self, _ty: &DataType, _value: ()) { + self.push(Value::Null); } - match array.data_type() { - DataType::Null => Value::Null, - DataType::Boolean => Value::Integer( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) as i64, - ), - DataType::Int8 => Value::Integer( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) as i64, - ), - DataType::Int16 => Value::Integer( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) as i64, - ), - DataType::Int32 => Value::Integer( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) as i64, - ), - DataType::Int64 => Value::Integer( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row), - ), - DataType::UInt8 => Value::Integer( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) as i64, - ), - DataType::UInt16 => Value::Integer( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) as i64, - ), - DataType::UInt32 => Value::Integer( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) as i64, - ), - DataType::UInt64 => Value::Text( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) - .to_string(), - ), - DataType::Float16 => Value::Real( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) - .to_f64(), - ), - DataType::Float32 => Value::Real( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) as f64, - ), - DataType::Float64 => Value::Real( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row), - ), - DataType::Timestamp(_, _) => unimplemented!(), - DataType::Date32 => unimplemented!(), - DataType::Date64 => unimplemented!(), - DataType::Time32(_) => unimplemented!(), - DataType::Time64(_) => unimplemented!(), - DataType::Duration(_) => unimplemented!(), - DataType::Interval(_) => unimplemented!(), - DataType::Binary => Value::Blob( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) - .to_vec(), - ), - DataType::FixedSizeBinary(_) => Value::Blob( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) - .to_vec(), - ), - DataType::LargeBinary => Value::Blob( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) - .to_vec(), - ), - DataType::Utf8 => Value::Text( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) - .to_string(), - ), - DataType::LargeUtf8 => Value::Text( - array - .as_any() - .downcast_ref::() - .unwrap() - .value(row) - .to_string(), - ), - DataType::List(_) => unimplemented!(), - DataType::FixedSizeList(_, _) => unimplemented!(), - DataType::LargeList(_) => unimplemented!(), - DataType::Struct(_) => unimplemented!(), - DataType::Union(_, _) => unimplemented!(), - DataType::Dictionary(_, _) => unimplemented!(), - DataType::Decimal128(_, _) => unimplemented!(), - DataType::Decimal256(_, _) => unimplemented!(), - DataType::Map(_, _) => unimplemented!(), - DataType::RunEndEncoded(_, _) => unimplemented!(), + fn consume_null(&mut self) { + self.push(Value::Null); } } + +impl_consume_ty!(BooleanType, Value::Integer, i64::from); +impl_consume_ty!(Int8Type, Value::Integer, i64::from); +impl_consume_ty!(Int16Type, Value::Integer, i64::from); +impl_consume_ty!(Int32Type, Value::Integer, i64::from); +impl_consume_ty!(Int64Type, Value::Integer); +impl_consume_ty!(UInt8Type, Value::Integer, i64::from); +impl_consume_ty!(UInt16Type, Value::Integer, i64::from); +impl_consume_ty!(UInt32Type, Value::Integer, i64::from); +impl_consume_ty!(UInt64Type, Value::Text, u64_to_string); +impl_consume_ty!(Float16Type, Value::Real, f64::from); +impl_consume_ty!(Float32Type, Value::Real, f64::from); +impl_consume_ty!(Float64Type, Value::Real); +impl_consume_ty!(BinaryType, Value::Blob); +impl_consume_ty!(LargeBinaryType, Value::Blob); +impl_consume_ty!(FixedSizeBinaryType, Value::Blob); +impl_consume_ty!(Utf8Type, Value::Text); +impl_consume_ty!(LargeUtf8Type, Value::Text); + +impl_consume_unsupported!( + Vec, + ( + TimestampSecondType, + TimestampMillisecondType, + TimestampMicrosecondType, + TimestampNanosecondType, + Date32Type, + Date64Type, + Time32SecondType, + Time32MillisecondType, + Time64MicrosecondType, + Time64NanosecondType, + IntervalYearMonthType, + IntervalDayTimeType, + IntervalMonthDayNanoType, + DurationSecondType, + DurationMillisecondType, + DurationMicrosecondType, + DurationNanosecondType, + Decimal128Type, + Decimal256Type, + ) +); + +fn u64_to_string(u: u64) -> String { + u64::to_string(&u) +}