From 670dbe220889c4f43fa9df0df9340eec66be6c90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alja=C5=BE=20Mur=20Er=C5=BEen?= Date: Thu, 21 Mar 2024 19:58:38 +0100 Subject: [PATCH] refactor: mysql --- connector_arrow/src/mysql/mod.rs | 174 +---------------------------- connector_arrow/src/mysql/query.rs | 170 ++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+), 169 deletions(-) create mode 100644 connector_arrow/src/mysql/query.rs diff --git a/connector_arrow/src/mysql/mod.rs b/connector_arrow/src/mysql/mod.rs index 43f0d1a..1eac4da 100644 --- a/connector_arrow/src/mysql/mod.rs +++ b/connector_arrow/src/mysql/mod.rs @@ -1,14 +1,10 @@ +mod query; mod types; -use arrow::{datatypes::*, record_batch::RecordBatch}; +use arrow::datatypes::*; use mysql::prelude::*; -use pac_cell::PacCell; -use crate::api::{unimplemented, Connector, ResultReader, Statement}; -use crate::impl_produce_unsupported; -use crate::types::{ArrowType, FixedSizeBinaryType, NullType}; -use crate::util::transport::ProduceTy; -use crate::util::{self, transport::Produce}; +use crate::api::{unimplemented, Connector}; use crate::ConnectorError; pub struct MySQLConnection { @@ -25,19 +21,14 @@ impl MySQLConnection { } } -pub struct MySQLStatement<'conn, C: Queryable> { - stmt: mysql::Statement, - conn: &'conn mut C, -} - impl Connector for MySQLConnection { - type Stmt<'conn> = MySQLStatement<'conn, C> where Self: 'conn; + type Stmt<'conn> = query::MySQLStatement<'conn, C> where Self: 'conn; type Append<'conn> = unimplemented::Appender where Self: 'conn; fn query<'a>(&'a mut self, query: &str) -> Result, ConnectorError> { let stmt = self.conn.prep(query)?; - Ok(MySQLStatement { + Ok(query::MySQLStatement { conn: &mut self.conn, stmt, }) @@ -77,158 +68,3 @@ impl Connector for MySQLConnection { None } } - -impl<'conn, C: Queryable> Statement<'conn> for MySQLStatement<'conn, C> { - type Reader<'stmt> = MySQLQueryResult<'stmt> - where - Self: 'stmt; - - fn start<'p, I>(&mut self, _params: I) -> Result, ConnectorError> - where - I: IntoIterator, - { - // TODO: params - - let query_result = self.conn.exec_iter(&self.stmt, ())?; - - // PacCell is needed so we can return query_result and result_set that mutably borrows query result. - let pac = PacCell::try_new(query_result, |qr| -> Result<_, ConnectorError> { - let result_set = qr.iter().ok_or(ConnectorError::NoResultSets)?; - let schema = types::get_result_schema(&result_set)?; - Ok(MySQLResultReader { result_set, schema }) - })?; - Ok(MySQLQueryResult(pac)) - } -} - -pub struct MySQLQueryResult<'stmt>( - PacCell< - mysql::QueryResult<'stmt, 'stmt, 'stmt, mysql::Binary>, // parent - MySQLResultReader<'stmt>, // child - >, -); - -impl<'stmt> ResultReader<'stmt> for MySQLQueryResult<'stmt> { - fn get_schema(&mut self) -> Result { - Ok(self.0.with_mut(|x| x.schema.clone())) - } -} - -impl<'stmt> Iterator for MySQLQueryResult<'stmt> { - type Item = Result; - - fn next(&mut self) -> Option { - self.0.with_mut(|reader| { - let schema = reader.schema.clone(); - util::next_batch_from_rows(&schema, reader, 1024).transpose() - }) - } -} - -struct MySQLResultReader<'stmt> { - result_set: mysql::ResultSet<'stmt, 'stmt, 'stmt, 'stmt, mysql::Binary>, - schema: SchemaRef, -} - -impl<'s> util::RowsReader<'s> for MySQLResultReader<'s> { - type CellReader<'row> = MySQLCellReader - where - Self: 'row; - - fn next_row(&mut self) -> Result>, ConnectorError> { - let row = self.result_set.next().transpose()?; - Ok(row.map(|row| MySQLCellReader { row, cell: 0 })) - } -} - -struct MySQLCellReader { - row: mysql::Row, - cell: usize, -} - -impl<'a> util::CellReader<'a> for MySQLCellReader { - type CellRef<'cell> = MySQLCellRef<'cell> - where - Self: 'cell; - - fn next_cell(&mut self) -> Option> { - let r = MySQLCellRef { - row: &mut self.row, - cell: self.cell, - }; - self.cell += 1; - Some(r) - } -} - -#[derive(Debug)] -struct MySQLCellRef<'a> { - row: &'a mut mysql::Row, - cell: usize, -} - -impl<'r> Produce<'r> for MySQLCellRef<'r> {} - -macro_rules! impl_produce_ty { - ($p: ty, ($($t: ty,)+)) => { - $( - impl<'r> ProduceTy<'r, $t> for $p { - fn produce(self) -> Result<<$t as ArrowType>::Native, ConnectorError> { - Ok(self.row.take(self.cell).unwrap()) - } - fn produce_opt(self) -> Result::Native>, ConnectorError> { - Ok(self.row.take(self.cell)) - } - } - )+ - }; -} - -impl_produce_ty!( - MySQLCellRef<'r>, - ( - BooleanType, - Int8Type, - Int16Type, - Int32Type, - Int64Type, - UInt8Type, - UInt16Type, - UInt32Type, - UInt64Type, - Float32Type, - Float64Type, - Utf8Type, - BinaryType, - ) -); - -impl_produce_unsupported!( - MySQLCellRef<'r>, - ( - NullType, - Float16Type, - TimestampSecondType, - TimestampMillisecondType, - TimestampMicrosecondType, - TimestampNanosecondType, - Date32Type, - Date64Type, - Time32SecondType, - Time32MillisecondType, - Time64MicrosecondType, - Time64NanosecondType, - IntervalYearMonthType, - IntervalDayTimeType, - IntervalMonthDayNanoType, - DurationSecondType, - DurationMillisecondType, - DurationMicrosecondType, - DurationNanosecondType, - LargeUtf8Type, - LargeBinaryType, - FixedSizeBinaryType, - Decimal128Type, - Decimal256Type, - ) -); diff --git a/connector_arrow/src/mysql/query.rs b/connector_arrow/src/mysql/query.rs new file mode 100644 index 0000000..6370e79 --- /dev/null +++ b/connector_arrow/src/mysql/query.rs @@ -0,0 +1,170 @@ +use arrow::{datatypes::*, record_batch::RecordBatch}; +use mysql::prelude::*; +use pac_cell::PacCell; + +use crate::api::{ResultReader, Statement}; +use crate::impl_produce_unsupported; +use crate::types::{ArrowType, FixedSizeBinaryType, NullType}; +use crate::util::transport::ProduceTy; +use crate::util::{self, transport::Produce}; +use crate::ConnectorError; + +pub struct MySQLStatement<'conn, C: Queryable> { + pub(super) stmt: mysql::Statement, + pub(super) conn: &'conn mut C, +} + +impl<'conn, C: Queryable> Statement<'conn> for MySQLStatement<'conn, C> { + type Reader<'stmt> = MySQLQueryResult<'stmt> + where + Self: 'stmt; + + fn start<'p, I>(&mut self, _params: I) -> Result, ConnectorError> + where + I: IntoIterator, + { + // TODO: params + + let query_result = self.conn.exec_iter(&self.stmt, ())?; + + // PacCell is needed so we can return query_result and result_set that mutably borrows query result. + let pac = PacCell::try_new(query_result, |qr| -> Result<_, ConnectorError> { + let result_set = qr.iter().ok_or(ConnectorError::NoResultSets)?; + let schema = super::types::get_result_schema(&result_set)?; + Ok(MySQLResultReader { result_set, schema }) + })?; + Ok(MySQLQueryResult(pac)) + } +} + +pub struct MySQLQueryResult<'stmt>( + PacCell< + mysql::QueryResult<'stmt, 'stmt, 'stmt, mysql::Binary>, // parent + MySQLResultReader<'stmt>, // child + >, +); + +impl<'stmt> ResultReader<'stmt> for MySQLQueryResult<'stmt> { + fn get_schema(&mut self) -> Result { + Ok(self.0.with_mut(|x| x.schema.clone())) + } +} + +impl<'stmt> Iterator for MySQLQueryResult<'stmt> { + type Item = Result; + + fn next(&mut self) -> Option { + self.0.with_mut(|reader| { + let schema = reader.schema.clone(); + util::next_batch_from_rows(&schema, reader, 1024).transpose() + }) + } +} + +struct MySQLResultReader<'stmt> { + result_set: mysql::ResultSet<'stmt, 'stmt, 'stmt, 'stmt, mysql::Binary>, + schema: SchemaRef, +} + +impl<'s> util::RowsReader<'s> for MySQLResultReader<'s> { + type CellReader<'row> = MySQLCellReader + where + Self: 'row; + + fn next_row(&mut self) -> Result>, ConnectorError> { + let row = self.result_set.next().transpose()?; + Ok(row.map(|row| MySQLCellReader { row, cell: 0 })) + } +} + +struct MySQLCellReader { + row: mysql::Row, + cell: usize, +} + +impl<'a> util::CellReader<'a> for MySQLCellReader { + type CellRef<'cell> = MySQLCellRef<'cell> + where + Self: 'cell; + + fn next_cell(&mut self) -> Option> { + let r = MySQLCellRef { + row: &mut self.row, + cell: self.cell, + }; + self.cell += 1; + Some(r) + } +} + +#[derive(Debug)] +struct MySQLCellRef<'a> { + row: &'a mut mysql::Row, + cell: usize, +} + +impl<'r> Produce<'r> for MySQLCellRef<'r> {} + +macro_rules! impl_produce_ty { + ($p: ty, ($($t: ty,)+)) => { + $( + impl<'r> ProduceTy<'r, $t> for $p { + fn produce(self) -> Result<<$t as ArrowType>::Native, ConnectorError> { + Ok(self.row.take(self.cell).unwrap()) + } + fn produce_opt(self) -> Result::Native>, ConnectorError> { + Ok(self.row.take(self.cell)) + } + } + )+ + }; +} + +impl_produce_ty!( + MySQLCellRef<'r>, + ( + BooleanType, + Int8Type, + Int16Type, + Int32Type, + Int64Type, + UInt8Type, + UInt16Type, + UInt32Type, + UInt64Type, + Float32Type, + Float64Type, + Utf8Type, + BinaryType, + ) +); + +impl_produce_unsupported!( + MySQLCellRef<'r>, + ( + NullType, + Float16Type, + TimestampSecondType, + TimestampMillisecondType, + TimestampMicrosecondType, + TimestampNanosecondType, + Date32Type, + Date64Type, + Time32SecondType, + Time32MillisecondType, + Time64MicrosecondType, + Time64NanosecondType, + IntervalYearMonthType, + IntervalDayTimeType, + IntervalMonthDayNanoType, + DurationSecondType, + DurationMillisecondType, + DurationMicrosecondType, + DurationNanosecondType, + LargeUtf8Type, + LargeBinaryType, + FixedSizeBinaryType, + Decimal128Type, + Decimal256Type, + ) +);