From ad6541539461db433657c594b19178d616ad56d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alja=C5=BE=20Mur=20Er=C5=BEen?= Date: Wed, 21 Feb 2024 14:17:15 +0100 Subject: [PATCH] feat: postgres uint6 --- connector_arrow/src/postgres/append.rs | 11 +++++++++-- connector_arrow/src/postgres/mod.rs | 2 +- connector_arrow/src/postgres/types.rs | 2 +- connector_arrow/tests/it/spec.rs | 8 ++++---- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/connector_arrow/src/postgres/append.rs b/connector_arrow/src/postgres/append.rs index b1936a4..12d1e05 100644 --- a/connector_arrow/src/postgres/append.rs +++ b/connector_arrow/src/postgres/append.rs @@ -185,7 +185,6 @@ impl_consume_ty!(Int64Type, postgres_proto::int8_to_sql); impl_consume_ty!(UInt8Type, postgres_proto::int2_to_sql, i16::from); impl_consume_ty!(UInt16Type, postgres_proto::int4_to_sql, i32::from); impl_consume_ty!(UInt32Type, postgres_proto::int8_to_sql, i64::from); -// impl_consume_ty!(UInt64Type, ); impl_consume_ty!(Float16Type, postgres_proto::float4_to_sql, f32::from); impl_consume_ty!(Float32Type, postgres_proto::float4_to_sql); impl_consume_ty!(Float64Type, postgres_proto::float8_to_sql); @@ -212,6 +211,15 @@ impl_consume_ref_ty!(FixedSizeBinaryType, postgres_proto::bytea_to_sql); impl_consume_ref_ty!(Utf8Type, postgres_proto::text_to_sql); impl_consume_ref_ty!(LargeUtf8Type, postgres_proto::text_to_sql); +impl ConsumeTy for BytesMut { + fn consume(&mut self, _ty: &DataType, value: u64) { + // this is inefficient, we'd need a special u64_to_sql function + super::decimal::i128_to_sql(value as i128, 0, self) + } + + fn consume_null(&mut self) {} +} + impl ConsumeTy for BytesMut { fn consume(&mut self, ty: &DataType, value: i128) { let DataType::Decimal128(_, scale) = ty else { @@ -239,7 +247,6 @@ impl ConsumeTy for BytesMut { impl_consume_unsupported!( BytesMut, ( - UInt64Type, TimestampSecondType, TimestampMillisecondType, TimestampNanosecondType, diff --git a/connector_arrow/src/postgres/mod.rs b/connector_arrow/src/postgres/mod.rs index 06fcce6..42175eb 100644 --- a/connector_arrow/src/postgres/mod.rs +++ b/connector_arrow/src/postgres/mod.rs @@ -114,7 +114,7 @@ where DataType::UInt8 => Some(DataType::Int16), DataType::UInt16 => Some(DataType::Int32), DataType::UInt32 => Some(DataType::Int64), - DataType::UInt64 => Some(DataType::Decimal128(20, 0)), + DataType::UInt64 => Some(DataType::Utf8), DataType::Float16 => Some(DataType::Float32), DataType::Utf8 => Some(DataType::LargeUtf8), DataType::Decimal128(_, _) => Some(DataType::Utf8), diff --git a/connector_arrow/src/postgres/types.rs b/connector_arrow/src/postgres/types.rs index ec31939..1ad2d4b 100644 --- a/connector_arrow/src/postgres/types.rs +++ b/connector_arrow/src/postgres/types.rs @@ -63,7 +63,7 @@ pub(crate) fn arrow_ty_to_pg(data_type: &ArrowType) -> String { ArrowType::UInt8 => "INT2".into(), ArrowType::UInt16 => "INT4".into(), ArrowType::UInt32 => "INT8".into(), - // ArrowType::UInt64 => "DECIMAL(20, 0)".into(), + ArrowType::UInt64 => "NUMERIC(20, 0)".into(), ArrowType::Float16 => "FLOAT4".into(), ArrowType::Float32 => "FLOAT4".into(), ArrowType::Float64 => "FLOAT8".into(), diff --git a/connector_arrow/tests/it/spec.rs b/connector_arrow/tests/it/spec.rs index 22cf1a4..57edba0 100644 --- a/connector_arrow/tests/it/spec.rs +++ b/connector_arrow/tests/it/spec.rs @@ -44,7 +44,7 @@ pub fn all_types() -> Vec { DataType::UInt8, DataType::UInt16, DataType::UInt32, - // DataType::UInt64, + DataType::UInt64, DataType::Float16, DataType::Float32, DataType::Float64, @@ -108,7 +108,7 @@ pub fn uint() -> Vec { DataType::UInt8, DataType::UInt16, DataType::UInt32, - // DataType::UInt64, + DataType::UInt64, ], &[false, true], &VALUE_GEN_PROCESS_ALL, @@ -127,8 +127,8 @@ pub fn decimal() -> Vec { domains_to_batch_spec( &[ DataType::Decimal128(15, 4), - // DataType::Decimal128(Decimal128Type::MAX_PRECISION, 0), - // DataType::Decimal128(Decimal128Type::MAX_PRECISION, Decimal128Type::MAX_SCALE), + DataType::Decimal128(Decimal128Type::MAX_PRECISION, 0), + DataType::Decimal128(Decimal128Type::MAX_PRECISION, Decimal128Type::MAX_SCALE), DataType::Decimal256(45, 12), DataType::Decimal256(Decimal256Type::MAX_PRECISION, 0), DataType::Decimal256(Decimal256Type::MAX_PRECISION, Decimal256Type::MAX_SCALE),