Skip to content

Commit

Permalink
refactor: simplify SQLite append
Browse files Browse the repository at this point in the history
  • Loading branch information
aljazerzen committed Feb 21, 2024
1 parent 312beff commit 64a3b7b
Showing 1 changed file with 99 additions and 155 deletions.
254 changes: 99 additions & 155 deletions connector_arrow/src/sqlite/append.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use std::sync::Arc;

use arrow::array::*;
use arrow::{datatypes::DataType, record_batch::RecordBatch};
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use itertools::zip_eq;
use itertools::Itertools;
use rusqlite::types::Value;
use rusqlite::{params_from_iter, Transaction};

use crate::impl_consume_unsupported;
use crate::types::{FixedSizeBinaryType, NullType};
use crate::util::transport;
use crate::util::transport::{Consume, ConsumeTy};
use crate::util::ArrayCellRef;
use crate::{api::Append, ConnectorError};

pub struct SQLiteAppender<'conn> {
Expand Down Expand Up @@ -68,163 +72,103 @@ fn insert_query(table_name: &str, cols: usize, rows: usize) -> String {

fn collect_args(batch: &RecordBatch, rows_range: std::ops::Range<usize>) -> Vec<Value> {
let mut res = Vec::with_capacity(rows_range.len() * batch.num_columns());
for row in rows_range {
for col_array in batch.columns() {
res.push(get_value(col_array, row).to_owned());

let schema = batch.schema();
let mut row = zip_eq(batch.columns(), schema.fields())
.map(|(array, field)| ArrayCellRef {
array,
field,
row_number: 0,
})
.collect_vec();

for row_number in rows_range {
for cell in &mut row {
cell.row_number = row_number;
transport::transport(cell.field, cell as &_, &mut res).unwrap();
}
}
res
}

fn get_value(array: &Arc<dyn Array>, row: usize) -> Value {
if array.is_null(row) {
return Value::Null;
// `collect_args` hands a `Vec<Value>` to `transport::transport` as the destination, so the
// vector must implement `Consume`. The impl body is empty; the per-type behavior comes from
// the `ConsumeTy<_>` impls for `Vec<Value>` in this file.
// NOTE(review): `Consume` is declared in `crate::util::transport`; presumably it is a
// supertrait bundle over `ConsumeTy` for every supported Arrow type — confirm there.
impl Consume for Vec<Value> {}

/// Generates an `impl ConsumeTy<$arrow_ty> for Vec<Value>`.
///
/// * `$arrow_ty` - the Arrow type whose `Native` value is consumed.
/// * `$variant`  - callable that wraps the (converted) value into a `Value`,
///   e.g. `Value::Integer`.
/// * `$convert`  - optional callable applied to the native value before wrapping;
///   when omitted, the value is passed through unchanged.
///
/// In the generated impl, `consume` pushes the wrapped value onto the vector and
/// `consume_null` pushes `Value::Null`.
macro_rules! impl_consume_ty {
    // Two-argument form: no conversion needed, so delegate with the identity function.
    ($arrow_ty:ty, $variant:expr) => {
        impl_consume_ty!($arrow_ty, $variant, std::convert::identity);
    };

    // Three-argument form: convert the native value, then wrap it.
    ($arrow_ty:ty, $variant:expr, $convert:expr) => {
        impl ConsumeTy<$arrow_ty> for Vec<Value> {
            fn consume(
                &mut self,
                _ty: &DataType,
                value: <$arrow_ty as crate::types::ArrowType>::Native,
            ) {
                let converted = ($convert)(value);
                self.push($variant(converted));
            }

            fn consume_null(&mut self) {
                self.push(Value::Null);
            }
        }
    };
}

impl ConsumeTy<NullType> for Vec<Value> {
fn consume(&mut self, _ty: &DataType, _value: ()) {
self.push(Value::Null);
}

match array.data_type() {
DataType::Null => Value::Null,
DataType::Boolean => Value::Integer(
array
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap()
.value(row) as i64,
),
DataType::Int8 => Value::Integer(
array
.as_any()
.downcast_ref::<Int8Array>()
.unwrap()
.value(row) as i64,
),
DataType::Int16 => Value::Integer(
array
.as_any()
.downcast_ref::<Int16Array>()
.unwrap()
.value(row) as i64,
),
DataType::Int32 => Value::Integer(
array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.value(row) as i64,
),
DataType::Int64 => Value::Integer(
array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(row),
),
DataType::UInt8 => Value::Integer(
array
.as_any()
.downcast_ref::<UInt8Array>()
.unwrap()
.value(row) as i64,
),
DataType::UInt16 => Value::Integer(
array
.as_any()
.downcast_ref::<UInt16Array>()
.unwrap()
.value(row) as i64,
),
DataType::UInt32 => Value::Integer(
array
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.value(row) as i64,
),
DataType::UInt64 => Value::Text(
array
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.value(row)
.to_string(),
),
DataType::Float16 => Value::Real(
array
.as_any()
.downcast_ref::<Float16Array>()
.unwrap()
.value(row)
.to_f64(),
),
DataType::Float32 => Value::Real(
array
.as_any()
.downcast_ref::<Float32Array>()
.unwrap()
.value(row) as f64,
),
DataType::Float64 => Value::Real(
array
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.value(row),
),
DataType::Timestamp(_, _) => unimplemented!(),
DataType::Date32 => unimplemented!(),
DataType::Date64 => unimplemented!(),
DataType::Time32(_) => unimplemented!(),
DataType::Time64(_) => unimplemented!(),
DataType::Duration(_) => unimplemented!(),
DataType::Interval(_) => unimplemented!(),
DataType::Binary => Value::Blob(
array
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.value(row)
.to_vec(),
),
DataType::FixedSizeBinary(_) => Value::Blob(
array
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.unwrap()
.value(row)
.to_vec(),
),
DataType::LargeBinary => Value::Blob(
array
.as_any()
.downcast_ref::<LargeBinaryArray>()
.unwrap()
.value(row)
.to_vec(),
),
DataType::Utf8 => Value::Text(
array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(row)
.to_string(),
),
DataType::LargeUtf8 => Value::Text(
array
.as_any()
.downcast_ref::<LargeStringArray>()
.unwrap()
.value(row)
.to_string(),
),
DataType::List(_) => unimplemented!(),
DataType::FixedSizeList(_, _) => unimplemented!(),
DataType::LargeList(_) => unimplemented!(),
DataType::Struct(_) => unimplemented!(),
DataType::Union(_, _) => unimplemented!(),
DataType::Dictionary(_, _) => unimplemented!(),
DataType::Decimal128(_, _) => unimplemented!(),
DataType::Decimal256(_, _) => unimplemented!(),
DataType::Map(_, _) => unimplemented!(),
DataType::RunEndEncoded(_, _) => unimplemented!(),
fn consume_null(&mut self) {
self.push(Value::Null);
}
}

// Mapping from Arrow types to `rusqlite` `Value` variants. Each line expands to an
// `impl ConsumeTy<T> for Vec<Value>`; the optional third argument converts the Arrow
// native value before it is wrapped in the variant.

// Booleans and integers that convert losslessly via `i64::from` become `Value::Integer`.
impl_consume_ty!(BooleanType, Value::Integer, i64::from);
impl_consume_ty!(Int8Type, Value::Integer, i64::from);
impl_consume_ty!(Int16Type, Value::Integer, i64::from);
impl_consume_ty!(Int32Type, Value::Integer, i64::from);
impl_consume_ty!(Int64Type, Value::Integer);
impl_consume_ty!(UInt8Type, Value::Integer, i64::from);
impl_consume_ty!(UInt16Type, Value::Integer, i64::from);
impl_consume_ty!(UInt32Type, Value::Integer, i64::from);
// UInt64 is written as decimal text instead of `Value::Integer`.
// NOTE(review): presumably because a u64 can exceed the i64 range — confirm.
impl_consume_ty!(UInt64Type, Value::Text, u64_to_string);
// Floating-point types are widened to f64 and stored as `Value::Real`.
impl_consume_ty!(Float16Type, Value::Real, f64::from);
impl_consume_ty!(Float32Type, Value::Real, f64::from);
impl_consume_ty!(Float64Type, Value::Real);
// Binary types are passed through unchanged as `Value::Blob`.
impl_consume_ty!(BinaryType, Value::Blob);
impl_consume_ty!(LargeBinaryType, Value::Blob);
impl_consume_ty!(FixedSizeBinaryType, Value::Blob);
// String types are passed through unchanged as `Value::Text`.
impl_consume_ty!(Utf8Type, Value::Text);
impl_consume_ty!(LargeUtf8Type, Value::Text);

// Timestamp, date, time, interval, duration and decimal types have no `impl_consume_ty!`
// mapping here; they are handed to `impl_consume_unsupported!` for `Vec<Value>` instead.
// NOTE(review): that macro is defined elsewhere in the crate (`crate::impl_consume_unsupported`);
// presumably it generates `ConsumeTy` impls that fail at runtime for these types — confirm.
impl_consume_unsupported!(
Vec<Value>,
(
TimestampSecondType,
TimestampMillisecondType,
TimestampMicrosecondType,
TimestampNanosecondType,
Date32Type,
Date64Type,
Time32SecondType,
Time32MillisecondType,
Time64MicrosecondType,
Time64NanosecondType,
IntervalYearMonthType,
IntervalDayTimeType,
IntervalMonthDayNanoType,
DurationSecondType,
DurationMillisecondType,
DurationMicrosecondType,
DurationNanosecondType,
Decimal128Type,
Decimal256Type,
)
);

/// Renders `u` as its decimal string representation.
///
/// Exists as a named, by-value function so it can be passed as the conversion
/// argument of `impl_consume_ty!` for `UInt64Type`.
fn u64_to_string(u: u64) -> String {
    u.to_string()
}

0 comments on commit 64a3b7b

Please sign in to comment.