Skip to content

Commit

Permalink
feat: mysql append, schema get, schema edit
Browse files Browse the repository at this point in the history
  • Loading branch information
aljazerzen committed Mar 26, 2024
1 parent 670dbe2 commit 6953078
Show file tree
Hide file tree
Showing 15 changed files with 519 additions and 90 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ without need for dynamic linking of C libraries.

## Support matrix

| | SQLite | DuckDB | PostgreSQL | MySQL | Microsoft SQL Server |
| RDBMS | SQLite | DuckDB | PostgreSQL | MySQL | Microsoft SQL Server |
| --- | --- | --- | --- | --- | --- |
| feature | `src_sqlite` | `src_duckdb` | `src_postgres` | `src_mysql` | `src_tiberius` |
| dependency | [rusqlite](https://crates.io/crates/rusqlite) | [duckdb](https://crates.io/crates/duckdb) | [postgres](https://crates.io/crates/postgres) | [mysql](https://crates.io/crates/mysql) | [tiberius](https://crates.io/crates/tiberius) |
Expand All @@ -39,19 +39,19 @@ without need for dynamic linking of C libraries.
| schema get | x | x | x | | |
| schema edit | x | x | x | | |
| append | x | x | x | | |
| roundtrip: null & bool | x | x | x | | |
| roundtrip: int | x | x | x | | |
| roundtrip: uint | x | x | x | | |
| roundtrip: float | x | x | x | | |
| roundtrip: null & bool | x | x | x | x | |
| roundtrip: int | x | x | x | x | |
| roundtrip: uint | x | x | x | x | |
| roundtrip: float | x | x | x | x | |
| roundtrip: decimal | x | | x | | |
| roundtrip: timestamp | x | x | x | | |
| roundtrip: date | x | | x | | |
| roundtrip: time | x | | x | | |
| roundtrip: duration | x | | x | | |
| roundtrip: interval | | | | | |
| roundtrip: utf8 | x | x | x | | |
| roundtrip: binary | x | x | x | | |
| roundtrip: empty | | x | x | | |
| roundtrip: utf8 | x | x | x | x | |
| roundtrip: binary | x | x | x | x | |
| roundtrip: empty | | x | x | x | |
| containers | | | | | |
| binary fallback | x | | x | | |

Expand Down
172 changes: 172 additions & 0 deletions connector_arrow/src/mysql/append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use itertools::{zip_eq, Itertools};
use mysql::prelude::Queryable;
use mysql::Value;

use crate::api::Append;
use crate::types::{FixedSizeBinaryType, NullType};
use crate::util::escape::escaped_ident_bt;
use crate::util::transport::{self, Consume, ConsumeTy};
use crate::util::ArrayCellRef;
use crate::{impl_consume_unsupported, ConnectorError};

pub struct MySQLAppender<'conn, C: Queryable> {
table: String,
client: &'conn mut C,
}

impl<'conn, C: Queryable> MySQLAppender<'conn, C> {
pub fn new(client: &'conn mut C, table_name: &str) -> Result<Self, ConnectorError> {
client.query_drop("START TRANSACTION;")?;
Ok(Self {
table: table_name.to_owned(),
client,
})
}
}

impl<'conn, C: Queryable> Append<'conn> for MySQLAppender<'conn, C> {
fn append(&mut self, batch: RecordBatch) -> Result<(), ConnectorError> {
// TODO: 30 is a guess, we need benchmarking to find the optimum value
const BATCH_SIZE: usize = 30;

let last_batch_size = batch.num_rows() % BATCH_SIZE;

let batch_query = insert_query(&self.table, batch.num_columns(), BATCH_SIZE);
for batch_number in 0..(batch.num_rows() / BATCH_SIZE) {
let rows_range = (batch_number * BATCH_SIZE)..((batch_number + 1) * BATCH_SIZE);

let params: Vec<Value> = collect_args(&batch, rows_range);
self.client.exec_iter(&batch_query, params)?;
}

if last_batch_size > 0 {
let rows_range = (batch.num_rows() - last_batch_size)..batch.num_rows();

let last_query = insert_query(&self.table, batch.num_columns(), last_batch_size);
let params: Vec<Value> = collect_args(&batch, rows_range);
self.client.exec_iter(&last_query, params)?;
}

Ok(())
}

fn finish(self) -> Result<(), ConnectorError> {
self.client.query_drop("COMMIT;")?;
Ok(())
}
}

fn insert_query(table_name: &str, cols: usize, rows: usize) -> String {
let values = (0..rows)
.map(|_| {
let row = (0..cols).map(|_| "?").join(",");
format!("({row})")
})
.join(",");

format!(
"INSERT INTO {} VALUES {values}",
escaped_ident_bt(table_name)
)
}

fn collect_args(batch: &RecordBatch, rows_range: std::ops::Range<usize>) -> Vec<Value> {
let mut res = Vec::with_capacity(rows_range.len() * batch.num_columns());

let schema = batch.schema();
let mut row = zip_eq(batch.columns(), schema.fields())
.map(|(array, field)| ArrayCellRef {
array,
field,
row_number: 0,
})
.collect_vec();

for row_number in rows_range {
for cell in &mut row {
cell.row_number = row_number;
transport::transport(cell.field, cell as &_, &mut res).unwrap();
}
}
res
}

impl Consume for Vec<Value> {}

macro_rules! impl_consume_ty {
($ArrTy: ty, $value_kind: expr) => {
impl_consume_ty!($ArrTy, $value_kind, std::convert::identity);
};

($ArrTy: ty, $value_kind: expr, $conversion: expr) => {
impl ConsumeTy<$ArrTy> for Vec<Value> {
fn consume(
&mut self,
_ty: &DataType,
value: <$ArrTy as crate::types::ArrowType>::Native,
) {
let value: Value = $value_kind(($conversion)(value));
self.push(value);
}

fn consume_null(&mut self) {
self.push(Value::NULL);
}
}
};
}

impl_consume_ty!(BooleanType, Value::Int, i64::from);
impl_consume_ty!(Int8Type, Value::Int, i64::from);
impl_consume_ty!(Int16Type, Value::Int, i64::from);
impl_consume_ty!(Int32Type, Value::Int, i64::from);
impl_consume_ty!(Int64Type, Value::Int);
impl_consume_ty!(UInt8Type, Value::UInt, u64::from);
impl_consume_ty!(UInt16Type, Value::UInt, u64::from);
impl_consume_ty!(UInt32Type, Value::UInt, u64::from);
impl_consume_ty!(UInt64Type, Value::UInt);
impl_consume_ty!(Float16Type, Value::Float, f32::from);
impl_consume_ty!(Float32Type, Value::Float);
impl_consume_ty!(Float64Type, Value::Double);
impl_consume_ty!(Utf8Type, Value::Bytes, String::into_bytes);
impl_consume_ty!(BinaryType, Value::Bytes);
impl_consume_ty!(LargeBinaryType, Value::Bytes);
impl_consume_ty!(FixedSizeBinaryType, Value::Bytes);

impl ConsumeTy<NullType> for Vec<Value> {
fn consume(&mut self, _ty: &DataType, _value: ()) {
self.push(Value::NULL);
}

fn consume_null(&mut self) {
self.push(Value::NULL);
}
}

impl_consume_unsupported!(
Vec<Value>,
(
TimestampSecondType,
TimestampMillisecondType,
TimestampMicrosecondType,
TimestampNanosecondType,
Date32Type,
Date64Type,
Time32SecondType,
Time32MillisecondType,
Time64MicrosecondType,
Time64NanosecondType,
DurationSecondType,
DurationMillisecondType,
DurationMicrosecondType,
DurationNanosecondType,
IntervalDayTimeType,
IntervalMonthDayNanoType,
IntervalYearMonthType,
Decimal128Type,
Decimal256Type,
LargeUtf8Type,
)
);
89 changes: 68 additions & 21 deletions connector_arrow/src/mysql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod append;
mod query;
mod schema;
mod types;

use arrow::datatypes::*;
use mysql::prelude::*;

use crate::api::{unimplemented, Connector};
use crate::api::Connector;
use crate::ConnectorError;

pub struct MySQLConnection<C: Queryable> {
Expand All @@ -24,7 +26,7 @@ impl<C: Queryable> MySQLConnection<C> {
impl<C: Queryable> Connector for MySQLConnection<C> {
type Stmt<'conn> = query::MySQLStatement<'conn, C> where Self: 'conn;

type Append<'conn> = unimplemented::Appender where Self: 'conn;
type Append<'conn> = append::MySQLAppender<'conn, C> where Self: 'conn;

fn query<'a>(&'a mut self, query: &str) -> Result<Self::Stmt<'a>, ConnectorError> {
let stmt = self.conn.prep(query)?;
Expand All @@ -34,37 +36,82 @@ impl<C: Queryable> Connector for MySQLConnection<C> {
})
}

fn append<'a>(&'a mut self, _table_name: &str) -> Result<Self::Append<'a>, ConnectorError> {
Ok(unimplemented::Appender {})
fn append<'a>(&'a mut self, table_name: &str) -> Result<Self::Append<'a>, ConnectorError> {
append::MySQLAppender::new(&mut self.conn, table_name)
}

fn type_db_into_arrow(ty: &str) -> Option<DataType> {
Some(match ty {
"null" => DataType::Null,
dbg!(ty);

"tinyint" | "bool" | "boolean" => DataType::Int8,
"smallint" => DataType::Int16,
"integer" | "int" => DataType::Int32,
"bigint" => DataType::Int64,
let (ty, unsigned) = ty
.strip_suffix(" unsigned")
.map(|p| (p, true))
.unwrap_or((ty, false));

"tinyint unsigned" => DataType::UInt8,
"smallint unsigned" => DataType::UInt16,
"integer unsigned" | "int unsigned" => DataType::UInt32,
"bigint unsigned" => DataType::UInt64,
// strip size suffix and anything following it
let ty = if let Some(open_parent) = ty.find('(') {
&ty[0..open_parent]
} else {
ty
};

"real" | "float4" => DataType::Float32,
"double" | "float8" => DataType::Float64,
Some(match (ty, unsigned) {
("null", _) => DataType::Null,

"bytea" => DataType::Binary,
"bit" | "tiny_blob" | "medium_blob" | "long_blob" | "blob" => DataType::Binary,
("tinyint" | "bool" | "boolean", false) => DataType::Int8,
("smallint", false) => DataType::Int16,
("integer" | "int", false) => DataType::Int32,
("bigint", false) => DataType::Int64,

"varchar" | "var_string" | "string" => DataType::Utf8,
("tinyint", true) => DataType::UInt8,
("smallint", true) => DataType::UInt16,
("integer" | "int", true) => DataType::UInt32,
("bigint", true) => DataType::UInt64,

("real" | "float" | "float4", _) => DataType::Float32,
("double" | "float8", _) => DataType::Float64,

("bit" | "tinyblob" | "mediumblob" | "longblob" | "blob" | "binary", _) => {
DataType::Binary
}

("tinytext" | "mediumtext" | "longtext" | "text" | "varchar", _) => DataType::Utf8,

_ => return None,
})
}

fn type_arrow_into_db(_ty: &DataType) -> Option<String> {
None
fn type_arrow_into_db(ty: &DataType) -> Option<String> {
Some(
match ty {
DataType::Null => "tinyint",
DataType::Boolean => "tinyint",
DataType::Int8 => "tinyint",
DataType::Int16 => "smallint",
DataType::Int32 => "integer",
DataType::Int64 => "bigint",
DataType::UInt8 => "tinyint unsigned",
DataType::UInt16 => "smallint unsigned",
DataType::UInt32 => "integer unsigned",
DataType::UInt64 => "bigint unsigned",
DataType::Float16 => "float",
DataType::Float32 => "float",
DataType::Float64 => "double",

DataType::Binary => "longblob",
DataType::FixedSizeBinary(1) => "binary",
DataType::FixedSizeBinary(2) => "blob",
DataType::FixedSizeBinary(3) => "mediumblob",
DataType::FixedSizeBinary(4) => "longblob",
DataType::FixedSizeBinary(_) => return None,
DataType::LargeBinary => return None,

DataType::Utf8 => "longtext",
DataType::LargeUtf8 => return None,

_ => return None,
}
.to_string(),
)
}
}
8 changes: 7 additions & 1 deletion connector_arrow/src/mysql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl<'a> util::CellReader<'a> for MySQLCellReader {
cell: self.cell,
};
self.cell += 1;

Some(r)
}
}
Expand All @@ -113,7 +114,12 @@ macro_rules! impl_produce_ty {
Ok(self.row.take(self.cell).unwrap())
}
fn produce_opt(self) -> Result<Option<<$t as ArrowType>::Native>, ConnectorError> {
Ok(self.row.take(self.cell))
let res = self.row.take_opt(self.cell).unwrap();
match res {
Ok(v) => Ok(Some(v)),
Err(mysql::FromValueError(mysql::Value::NULL)) => Ok(None),
Err(mysql::FromValueError(v)) => Err(ConnectorError::from(mysql::Error::FromValueError(v)))
}
}
}
)+
Expand Down
Loading

0 comments on commit 6953078

Please sign in to comment.