From 10ad5afdde0beaada44fcc7a4da5fa0e8fa74f63 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 22 Mar 2024 10:34:02 +0000 Subject: [PATCH] Use flatbuffers to serialize dtypes (#126) --- Cargo.lock | 23 +- vortex-array/src/serde/mod.rs | 18 +- vortex-schema/Cargo.toml | 7 +- vortex-schema/build.rs | 50 ++ vortex-schema/flatbuffers/schema.fbs | 94 ++++ vortex-schema/src/error.rs | 25 +- vortex-schema/src/lib.rs | 16 +- vortex-schema/src/serde.rs | 655 ++++++++++++++------------- 8 files changed, 532 insertions(+), 356 deletions(-) create mode 100644 vortex-schema/build.rs create mode 100644 vortex-schema/flatbuffers/schema.fbs diff --git a/Cargo.lock b/Cargo.lock index b8daebb110..137d10e169 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -552,6 +552,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "condtype" version = "1.3.0" @@ -809,6 +818,15 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "flatc" +version = "0.2.2+23.5.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4bba3050358223ece3d79720f9800139503b600e93b1d68493500a2d8c3721e" +dependencies = [ + "cmake", +] + [[package]] name = "flate2" version = "1.0.28" @@ -2652,10 +2670,11 @@ name = "vortex-schema" version = "0.1.0" dependencies = [ "arrow-schema", + "flatbuffers", + "flatc", "itertools 0.12.1", - "leb128", - "num_enum", "thiserror", + "walkdir", ] [[package]] diff --git a/vortex-array/src/serde/mod.rs b/vortex-array/src/serde/mod.rs index 68c811ff4f..77b12f0fcc 100644 --- a/vortex-array/src/serde/mod.rs +++ b/vortex-array/src/serde/mod.rs @@ -4,7 +4,7 @@ use std::io::{ErrorKind, Read, Write}; use arrow_buffer::buffer::{Buffer, MutableBuffer}; use vortex_schema::{ - DType, DTypeReader, DTypeWriter, IntWidth, Nullability, SchemaError, Signedness, + DType, FbDeserialize, FbSerialize, IntWidth, Nullability, SchemaError, Signedness, }; use crate::array::composite::find_extension_id; @@ -82,12 +82,10 @@ impl<'a> ReadCtx<'a> { #[inline] pub fn dtype(&mut self) -> VortexResult { - DTypeReader::new(self.r) - .read(find_extension_id) - .map_err(|e| match e { - SchemaError::InvalidArgument(s) => VortexError::InvalidArgument(s), - SchemaError::IOError(io_err) => io_err.0.into(), - }) + let dtype_bytes = self.read_slice()?; + DType::deserialize(&dtype_bytes, find_extension_id).map_err(|e| match e { + SchemaError::InvalidArgument(s) => VortexError::InvalidArgument(s), + }) } pub fn ptype(&mut self) -> VortexResult { @@ -182,10 +180,8 @@ impl<'a> WriteCtx<'a> { } pub fn dtype(&mut self, dtype: &DType) -> VortexResult<()> { - DTypeWriter::new(self.w).write(dtype).map_err(|e| match e { - SchemaError::InvalidArgument(s) => VortexError::InvalidArgument(s), - SchemaError::IOError(io_err) => io_err.0.into(), - }) + let (bytes, head) = dtype.serialize(); + self.write_slice(&bytes[head..]) } pub fn ptype(&mut self, ptype: PType) -> VortexResult<()> { diff --git a/vortex-schema/Cargo.toml b/vortex-schema/Cargo.toml index 61cfdb1ab4..5387537be5 100644 --- a/vortex-schema/Cargo.toml +++ b/vortex-schema/Cargo.toml @@ -18,9 +18,12 @@ path = "src/lib.rs" [dependencies] arrow-schema = "50.0.0" itertools = "0.12.1" -leb128 = "0.2.5" -num_enum = "0.7.2" thiserror = "1.0.58" +flatbuffers = "23.5.26" + +[build-dependencies] +flatc = "0.2.2" +walkdir = "2.4.0" [lints] workspace = true diff --git a/vortex-schema/build.rs b/vortex-schema/build.rs new file mode 100644 index 0000000000..cee1d21652 --- /dev/null +++ b/vortex-schema/build.rs @@ -0,0 +1,50 @@ +use std::env; +use std::ffi::OsStr; +use std::path::{Path, PathBuf}; +use std::process::Command; + +use flatc::flatc; +use walkdir::WalkDir; + +fn main() { + let flatbuffers_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap()) + .canonicalize() + .expect("Failed to canonicalize CARGO_MANIFEST_DIR") + .join("flatbuffers"); + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()) + .canonicalize() + .expect("Failed to canonicalize OUT_DIR"); + + let fbs_files = WalkDir::new(flatbuffers_dir) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.path().extension() == Some(OsStr::new("fbs"))) + .map(|e| { + rerun_if_changed(e.path()); + e.path().to_path_buf() + }) + .collect::>(); + + if !Command::new(flatc()) + .args(["--filename-suffix", ""]) + .arg("--rust") + .arg("-o") + .arg(out_dir.join("flatbuffers")) + .args(fbs_files) + .status() + .unwrap() + .success() + { + panic!("Failed to run flatc"); + } +} + +fn rerun_if_changed(path: &Path) { + println!( + "cargo:rerun-if-changed={}", + path.canonicalize() + .unwrap_or_else(|_| panic!("failed to canonicalize {}", path.to_str().unwrap())) + .to_str() + .unwrap() + ); +} diff --git a/vortex-schema/flatbuffers/schema.fbs b/vortex-schema/flatbuffers/schema.fbs new file mode 100644 index 0000000000..a4fc42865c --- /dev/null +++ b/vortex-schema/flatbuffers/schema.fbs @@ -0,0 +1,94 @@ +enum Nullability: byte { + NonNullable, + Nullable, +} + +enum Signedness: byte { + Signed, + Unsigned, + Unknown, +} + +enum IntWidth: byte { + Unknown, + _8, + _16, + _32, + _64, +} + +enum FloatWidth: byte { + Unknown, + _16, + _32, + _64, +} + +table Null {} + +table Bool { + nullability: Nullability; +} + +table Int { + width: IntWidth = Unknown; + signedness: Signedness = Unknown; + nullability: Nullability; +} + +table Decimal { + /// Total number of decimal digits + precision: ubyte; + + /// Number of digits after the decimal point "." + scale: byte; + nullability: Nullability; +} + +table Float { + width: FloatWidth = Unknown; + nullability: Nullability; +} + +table Utf8 { + nullability: Nullability; + +} + +table Binary { + nullability: Nullability; +} + +table Struct_ { + names: [string]; + fields: [DType]; +} + +table List { + element_type: DType; + nullability: Nullability; +} + +table Composite { + id: string; + nullability: Nullability; +} + +union Type { + Null, + Bool, + Int, + Decimal, + Float, + Utf8, + Binary, + Struct_, + List, + Composite, +} + +table DType { + type: Type; +} + +root_type DType; \ No newline at end of file diff --git a/vortex-schema/src/error.rs b/vortex-schema/src/error.rs index d404e5dc3b..3f0ab1df84 100644 --- a/vortex-schema/src/error.rs +++ b/vortex-schema/src/error.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use std::fmt::{Display, Formatter}; use std::ops::Deref; -use std::{env, fmt, io}; +use std::{env, fmt}; #[derive(Debug, PartialEq)] pub struct ErrString(Cow<'static, str>); @@ -43,29 +43,6 @@ impl Display for ErrString { pub enum SchemaError { #[error("{0}")] InvalidArgument(ErrString), - #[error("io error: {0:?}")] - IOError(IOError), } pub type SchemaResult = Result; - -macro_rules! wrapped_error { - ($E:ty, $e:ident) => { - #[derive(Debug)] - pub struct $e(pub $E); - - impl PartialEq for $e { - fn eq(&self, _other: &Self) -> bool { - false - } - } - - impl From<$E> for SchemaError { - fn from(err: $E) -> Self { - SchemaError::$e($e(err)) - } - } - }; -} - -wrapped_error!(io::Error, IOError); diff --git a/vortex-schema/src/lib.rs b/vortex-schema/src/lib.rs index 7a3494f440..9f19c605c8 100644 --- a/vortex-schema/src/lib.rs +++ b/vortex-schema/src/lib.rs @@ -1,10 +1,11 @@ +use std::fmt::{Display, Formatter}; + pub use dtype::*; pub use error::ErrString; pub use error::SchemaError; pub use error::SchemaResult; -pub use serde::DTypeReader; -pub use serde::DTypeWriter; -use std::fmt::{Display, Formatter}; +pub use serde::FbDeserialize; +pub use serde::FbSerialize; mod dtype; mod error; @@ -18,3 +19,12 @@ impl Display for CompositeID { write!(f, "{}", self.0) } } + +#[allow(unused_imports)] +#[allow(dead_code)] +#[allow(clippy::needless_lifetimes)] +#[allow(clippy::extra_unused_lifetimes)] +#[allow(non_camel_case_types)] +mod generated { + include!(concat!(env!("OUT_DIR"), "/flatbuffers/schema.rs")); +} diff --git a/vortex-schema/src/serde.rs b/vortex-schema/src/serde.rs index ea083d8b72..89910bbfd1 100644 --- a/vortex-schema/src/serde.rs +++ b/vortex-schema/src/serde.rs @@ -1,393 +1,420 @@ -use std::io::{Read, Write}; use std::sync::Arc; -use leb128::read::Error; -use num_enum::{IntoPrimitive, TryFromPrimitive}; - -use crate::dtype::DType::*; -use crate::dtype::{DType, FloatWidth, IntWidth, Nullability, Signedness}; -use crate::error::{SchemaError, SchemaResult}; -use crate::CompositeID; +use flatbuffers::{FlatBufferBuilder, WIPOffset}; + +use crate::generated::{ + root_as_dtype, Bool, BoolArgs, Composite, CompositeArgs, Int, IntArgs, List, ListArgs, Null, + NullArgs, Struct_, Struct_Args, Type, +}; +use crate::generated::{Binary, BinaryArgs, Signedness as FbSignedness}; +use crate::generated::{DType as FbDType, DTypeArgs}; +use crate::generated::{Decimal, DecimalArgs, FloatWidth as FbFloatWidth}; +use crate::generated::{Float, FloatArgs, IntWidth as FbIntWidth}; +use crate::generated::{Nullability as FbNullability, Utf8, Utf8Args}; +use crate::{ + CompositeID, DType, FloatWidth, IntWidth, Nullability, SchemaError, SchemaResult, Signedness, +}; + +pub trait FbSerialize<'a> { + type OffsetType; + + // Convert self to flatbuffer representation, returns written bytes and index of valid data + // If you want to serialize multiple objects you should prefer serialize_to_builder to reuse the allocated memory + fn serialize(&self) -> (Vec, usize) { + let mut fbb = FlatBufferBuilder::new(); + let wip_dtype = self.serialize_to_builder(&mut fbb); + fbb.finish_minimal(wip_dtype); + fbb.collapse() + } -pub struct DTypeReader<'a> { - reader: &'a mut dyn Read, + fn serialize_to_builder(&self, fbb: &mut FlatBufferBuilder<'a>) -> WIPOffset; } -impl<'a> DTypeReader<'a> { - pub fn new(reader: &'a mut dyn Read) -> Self { - Self { reader } - } - - fn read_nbytes(&mut self) -> SchemaResult<[u8; N]> { - let mut bytes: [u8; N] = [0; N]; - self.reader - .read_exact(&mut bytes) - .map_err(SchemaError::from)?; - Ok(bytes) - } +pub trait FbDeserialize<'a>: Sized { + type OffsetType; - fn read_usize(&mut self) -> SchemaResult { - leb128::read::unsigned(self.reader) - .map_err(|e| match e { - Error::IoError(io_err) => io_err.into(), - Error::Overflow => SchemaError::InvalidArgument("overflow".into()), - }) - .map(|u| u as usize) - } + fn deserialize(bytes: &[u8], find_id: fn(&str) -> Option) -> SchemaResult; - fn read_slice(&mut self) -> SchemaResult> { - let len = self.read_usize()?; - let mut slice = Vec::with_capacity(len); - self.reader - .take(len as u64) - .read_to_end(&mut slice) - .map_err(SchemaError::from)?; - Ok(slice) - } + fn convert_from_fb( + fb_type: Self::OffsetType, + find_id: fn(&str) -> Option, + ) -> SchemaResult; +} - pub fn read(&mut self, find_extension: fn(&str) -> Option) -> SchemaResult { - let dtype = DTypeTag::try_from(self.read_nbytes::<1>()?[0]) - .map_err(|_| SchemaError::InvalidArgument("Failed to parse dtype tag".into()))?; - match dtype { - DTypeTag::Null => Ok(Null), - DTypeTag::Bool => Ok(Bool(self.read_nullability()?)), - DTypeTag::Int => { - let nullability = self.read_nullability()?; - Ok(Int( - self.read_int_width()?, - self.read_signedness()?, - nullability, - )) - } - DTypeTag::Float => { - let nullability = self.read_nullability()?; - Ok(Float(self.read_float_width()?, nullability)) - } - DTypeTag::Utf8 => Ok(Utf8(self.read_nullability()?)), - DTypeTag::Binary => Ok(Binary(self.read_nullability()?)), - DTypeTag::Decimal => { - let nullability = self.read_nullability()?; - let precision_scale: [u8; 2] = self.read_nbytes()?; - Ok(Decimal( - precision_scale[0], - precision_scale[1] as i8, - nullability, - )) - } - DTypeTag::List => { - let nullability = self.read_nullability()?; - Ok(List(Box::new(self.read(find_extension)?), nullability)) - } - DTypeTag::Struct => { - let field_num = self.read_usize()?; - let mut names = Vec::with_capacity(field_num); - for _ in 0..field_num { - let name = unsafe { String::from_utf8_unchecked(self.read_slice()?) }; - names.push(Arc::new(name)); +impl<'a> FbSerialize<'a> for DType { + type OffsetType = FbDType<'a>; + + fn serialize_to_builder(&self, fbb: &mut FlatBufferBuilder<'a>) -> WIPOffset { + let (dtype_union, dtype_union_variant) = match self { + DType::Null => (Null::create(fbb, &NullArgs {}).as_union_value(), Type::Null), + DType::Bool(n) => ( + Bool::create( + fbb, + &BoolArgs { + nullability: n.into(), + }, + ) + .as_union_value(), + Type::Bool, + ), + DType::Int(w, s, n) => ( + Int::create( + fbb, + &IntArgs { + width: w.into(), + signedness: s.into(), + nullability: n.into(), + }, + ) + .as_union_value(), + Type::Int, + ), + DType::Decimal(p, s, n) => ( + Decimal::create( + fbb, + &DecimalArgs { + precision: *p, + scale: *s, + nullability: n.into(), + }, + ) + .as_union_value(), + Type::Decimal, + ), + DType::Float(w, n) => ( + Float::create( + fbb, + &FloatArgs { + width: w.into(), + nullability: n.into(), + }, + ) + .as_union_value(), + Type::Float, + ), + DType::Utf8(n) => ( + Utf8::create( + fbb, + &Utf8Args { + nullability: n.into(), + }, + ) + .as_union_value(), + Type::Utf8, + ), + DType::Binary(n) => ( + Binary::create( + fbb, + &BinaryArgs { + nullability: n.into(), + }, + ) + .as_union_value(), + Type::Binary, + ), + DType::Struct(ns, fs) => { + let name_offsets = ns + .iter() + .map(|n| fbb.create_string(n.as_ref())) + .collect::>(); + fbb.start_vector::>(ns.len()); + for name in name_offsets.iter().rev() { + fbb.push(name); } - - let mut fields = Vec::with_capacity(field_num); - for _ in 0..field_num { - fields.push(self.read(find_extension)?); + let names_vector = fbb.end_vector(ns.len()); + + let dtype_offsets = fs + .iter() + .map(|f| f.serialize_to_builder(fbb)) + .collect::>(); + fbb.start_vector::>(fs.len()); + for doff in dtype_offsets.iter().rev() { + fbb.push(doff); } - Ok(Struct(names, fields)) + let fields_vector = fbb.end_vector(fs.len()); + + ( + Struct_::create( + fbb, + &Struct_Args { + names: Some(names_vector), + fields: Some(fields_vector), + }, + ) + .as_union_value(), + Type::Struct_, + ) } - DTypeTag::Composite => { - let nullability = self.read_nullability()?; - let id = unsafe { String::from_utf8_unchecked(self.read_slice()?) }; - let extension_id = find_extension(id.as_str()).ok_or( - SchemaError::InvalidArgument("Failed to find extension".into()), - )?; - Ok(Composite(extension_id, nullability)) + DType::List(e, n) => { + let fb_dtype = e.as_ref().serialize_to_builder(fbb); + ( + List::create( + fbb, + &ListArgs { + element_type: Some(fb_dtype), + nullability: n.into(), + }, + ) + .as_union_value(), + Type::List, + ) } - } - } - - fn read_signedness(&mut self) -> SchemaResult { - SignednessTag::try_from(self.read_nbytes::<1>()?[0]) - .map_err(|_| SchemaError::InvalidArgument("Failed to parse signedness tag".into())) - .map(Signedness::from) - } - - fn read_nullability(&mut self) -> SchemaResult { - NullabilityTag::try_from(self.read_nbytes::<1>()?[0]) - .map_err(|_| SchemaError::InvalidArgument("Failed to parse nullability tag".into())) - .map(Nullability::from) - } - - fn read_int_width(&mut self) -> SchemaResult { - IntWidthTag::try_from(self.read_nbytes::<1>()?[0]) - .map_err(|_| SchemaError::InvalidArgument("Failed to parse int width tag".into())) - .map(IntWidth::from) - } - - fn read_float_width(&mut self) -> SchemaResult { - FloatWidthTag::try_from(self.read_nbytes::<1>()?[0]) - .map_err(|_| SchemaError::InvalidArgument("Failed to parse float width tag".into())) - .map(FloatWidth::from) + DType::Composite(id, n) => { + let id = fbb.create_string(id.0); + ( + Composite::create( + fbb, + &CompositeArgs { + id: Some(id), + nullability: n.into(), + }, + ) + .as_union_value(), + Type::Composite, + ) + } + }; + + FbDType::create( + fbb, + &DTypeArgs { + type_type: dtype_union_variant, + type_: Some(dtype_union), + }, + ) } } -pub struct DTypeWriter<'a> { - writer: &'a mut dyn Write, -} +impl<'a> FbDeserialize<'a> for DType { + type OffsetType = FbDType<'a>; -impl<'a> DTypeWriter<'a> { - pub fn new(writer: &'a mut dyn Write) -> Self { - Self { writer } + fn deserialize(bytes: &[u8], find_id: fn(&str) -> Option) -> SchemaResult { + root_as_dtype(bytes) + .map_err(|e| { + SchemaError::InvalidArgument(format!("Unable to read bytes as DType: {}", e).into()) + }) + .and_then(|d| Self::convert_from_fb(d, find_id)) } - pub fn write(&mut self, dtype: &DType) -> SchemaResult<()> { - self.write_fixed_slice([DTypeTag::from(dtype).into()])?; - match dtype { - Null => {} - Bool(n) => self.write_nullability(*n)?, - Int(w, s, n) => { - self.write_nullability(*n)?; - self.write_int_width(*w)?; - self.write_signedness(*s)? + fn convert_from_fb( + fb_type: Self::OffsetType, + find_id: fn(&str) -> Option, + ) -> SchemaResult { + match fb_type.type_type() { + Type::Null => Ok(DType::Null), + Type::Bool => Ok(DType::Bool( + fb_type.type__as_bool().unwrap().nullability().try_into()?, + )), + Type::Int => { + let fb_int = fb_type.type__as_int().unwrap(); + Ok(DType::Int( + fb_int.width().try_into()?, + fb_int.signedness().try_into()?, + fb_int.nullability().try_into()?, + )) } - Decimal(p, w, n) => { - self.write_nullability(*n)?; - self.write_fixed_slice([*p, *w as u8])? + Type::Float => { + let fb_float = fb_type.type__as_float().unwrap(); + Ok(DType::Float( + fb_float.width().try_into()?, + fb_float.nullability().try_into()?, + )) } - Float(w, n) => { - self.write_nullability(*n)?; - self.write_float_width(*w)? + Type::Decimal => { + let fb_decimal = fb_type.type__as_decimal().unwrap(); + Ok(DType::Decimal( + fb_decimal.precision(), + fb_decimal.scale(), + fb_decimal.nullability().try_into()?, + )) } - Utf8(n) => self.write_nullability(*n)?, - Binary(n) => self.write_nullability(*n)?, - Struct(ns, fs) => { - self.write_usize(ns.len())?; - for name in ns { - self.write_slice(name.as_bytes())?; - } - for field in fs { - self.write(field)? - } + Type::Binary => Ok(DType::Binary( + fb_type + .type__as_binary() + .unwrap() + .nullability() + .try_into()?, + )), + Type::Utf8 => Ok(DType::Utf8( + fb_type.type__as_utf_8().unwrap().nullability().try_into()?, + )), + Type::List => { + let fb_list = fb_type.type__as_list().unwrap(); + let element_dtype = + DType::convert_from_fb(fb_list.element_type().unwrap(), find_id)?; + Ok(DType::List( + Box::new(element_dtype), + fb_list.nullability().try_into()?, + )) } - List(e, n) => { - self.write_nullability(*n)?; - self.write(e.as_ref())? + Type::Struct_ => { + let fb_struct = fb_type.type__as_struct_().unwrap(); + let names = fb_struct + .names() + .unwrap() + .iter() + .map(|n| Arc::new(n.to_string())) + .collect::>(); + let fields: Vec = fb_struct + .fields() + .unwrap() + .iter() + .map(|f| DType::convert_from_fb(f, find_id)) + .collect::>>()?; + Ok(DType::Struct(names, fields)) } - Composite(id, n) => { - self.write_nullability(*n)?; - self.write_slice(id.0.as_bytes())?; + Type::Composite => { + let fb_composite = fb_type.type__as_composite().unwrap(); + let id = find_id(fb_composite.id().unwrap()).ok_or_else(|| { + SchemaError::InvalidArgument("Couldn't find composite id".into()) + })?; + Ok(DType::Composite(id, fb_composite.nullability().try_into()?)) } + _ => Err(SchemaError::InvalidArgument("Unknown DType variant".into())), } - - Ok(()) - } - - fn write_usize(&mut self, u: usize) -> SchemaResult<()> { - leb128::write::unsigned(self.writer, u as u64) - .map_err(|_| SchemaError::InvalidArgument("Failed to write leb128 usize".into())) - .map(|_| ()) - } - - fn write_fixed_slice(&mut self, slice: [u8; N]) -> SchemaResult<()> { - self.writer.write_all(&slice).map_err(|e| e.into()) - } - - fn write_slice(&mut self, slice: &[u8]) -> SchemaResult<()> { - self.write_usize(slice.len())?; - self.writer.write_all(slice).map_err(|e| e.into()) - } - - fn write_signedness(&mut self, signedness: Signedness) -> SchemaResult<()> { - self.write_fixed_slice([SignednessTag::from(signedness).into()]) - } - - fn write_nullability(&mut self, nullability: Nullability) -> SchemaResult<()> { - self.write_fixed_slice([NullabilityTag::from(nullability).into()]) - } - - fn write_int_width(&mut self, int_width: IntWidth) -> SchemaResult<()> { - self.write_fixed_slice([IntWidthTag::from(int_width).into()]) } - - fn write_float_width(&mut self, float_width: FloatWidth) -> SchemaResult<()> { - self.write_fixed_slice([FloatWidthTag::from(float_width).into()]) - } -} - -#[derive(IntoPrimitive, TryFromPrimitive)] -#[repr(u8)] -enum DTypeTag { - Null, - Bool, - Int, - Float, - Utf8, - Binary, - Decimal, - List, - Struct, - Composite, } -impl From<&DType> for DTypeTag { - fn from(value: &DType) -> Self { +impl From<&Nullability> for FbNullability { + fn from(value: &Nullability) -> Self { match value { - Null => DTypeTag::Null, - Bool(_) => DTypeTag::Bool, - Int(_, _, _) => DTypeTag::Int, - Float(_, _) => DTypeTag::Float, - Utf8(_) => DTypeTag::Utf8, - Binary(_) => DTypeTag::Binary, - Decimal(_, _, _) => DTypeTag::Decimal, - List(_, _) => DTypeTag::List, - Struct(_, _) => DTypeTag::Struct, - Composite(_, _) => DTypeTag::Composite, + Nullability::NonNullable => FbNullability::NonNullable, + Nullability::Nullable => FbNullability::Nullable, } } } -#[derive(IntoPrimitive, TryFromPrimitive)] -#[repr(u8)] -enum NullabilityTag { - Nullable, - NonNullable, -} +impl TryFrom for Nullability { + type Error = SchemaError; -impl From for NullabilityTag { - fn from(value: Nullability) -> Self { - use Nullability::*; + fn try_from(value: FbNullability) -> SchemaResult { match value { - NonNullable => NullabilityTag::NonNullable, - Nullable => NullabilityTag::Nullable, + FbNullability::NonNullable => Ok(Nullability::NonNullable), + FbNullability::Nullable => Ok(Nullability::Nullable), + _ => Err(SchemaError::InvalidArgument( + "Unknown nullability value".into(), + )), } } } -impl From for Nullability { - fn from(value: NullabilityTag) -> Self { - use Nullability::*; +impl From<&IntWidth> for FbIntWidth { + fn from(value: &IntWidth) -> Self { match value { - NullabilityTag::Nullable => Nullable, - NullabilityTag::NonNullable => NonNullable, + IntWidth::Unknown => FbIntWidth::Unknown, + IntWidth::_8 => FbIntWidth::_8, + IntWidth::_16 => FbIntWidth::_16, + IntWidth::_32 => FbIntWidth::_32, + IntWidth::_64 => FbIntWidth::_64, } } } -#[derive(IntoPrimitive, TryFromPrimitive)] -#[repr(u8)] -enum SignednessTag { - Unknown, - Unsigned, - Signed, -} +impl TryFrom for IntWidth { + type Error = SchemaError; -impl From for SignednessTag { - fn from(value: Signedness) -> Self { - use Signedness::*; + fn try_from(value: FbIntWidth) -> SchemaResult { match value { - Unknown => SignednessTag::Unknown, - Unsigned => SignednessTag::Unsigned, - Signed => SignednessTag::Signed, + FbIntWidth::Unknown => Ok(IntWidth::Unknown), + FbIntWidth::_8 => Ok(IntWidth::_8), + FbIntWidth::_16 => Ok(IntWidth::_16), + FbIntWidth::_32 => Ok(IntWidth::_32), + FbIntWidth::_64 => Ok(IntWidth::_64), + _ => Err(SchemaError::InvalidArgument( + "Unknown IntWidth value".into(), + )), } } } -impl From for Signedness { - fn from(value: SignednessTag) -> Self { - use Signedness::*; +impl From<&Signedness> for FbSignedness { + fn from(value: &Signedness) -> Self { match value { - SignednessTag::Unknown => Unknown, - SignednessTag::Unsigned => Unsigned, - SignednessTag::Signed => Signed, + Signedness::Unknown => FbSignedness::Unknown, + Signedness::Unsigned => FbSignedness::Unsigned, + Signedness::Signed => FbSignedness::Signed, } } } -#[derive(IntoPrimitive, TryFromPrimitive)] -#[repr(u8)] -enum FloatWidthTag { - Unknown, - _16, - _32, - _64, -} +impl TryFrom for Signedness { + type Error = SchemaError; -#[allow(clippy::just_underscores_and_digits)] -impl From for FloatWidthTag { - fn from(value: FloatWidth) -> Self { - use FloatWidth::*; + fn try_from(value: FbSignedness) -> SchemaResult { match value { - Unknown => FloatWidthTag::Unknown, - _16 => FloatWidthTag::_16, - _32 => FloatWidthTag::_32, - _64 => FloatWidthTag::_64, + FbSignedness::Unknown => Ok(Signedness::Unknown), + FbSignedness::Unsigned => Ok(Signedness::Unsigned), + FbSignedness::Signed => Ok(Signedness::Signed), + _ => Err(SchemaError::InvalidArgument( + "Unknown Signedness value".into(), + )), } } } -impl From for FloatWidth { - fn from(value: FloatWidthTag) -> Self { - use FloatWidth::*; +impl From<&FloatWidth> for FbFloatWidth { + fn from(value: &FloatWidth) -> Self { match value { - FloatWidthTag::Unknown => Unknown, - FloatWidthTag::_16 => _16, - FloatWidthTag::_32 => _32, - FloatWidthTag::_64 => _64, + FloatWidth::Unknown => FbFloatWidth::Unknown, + FloatWidth::_16 => FbFloatWidth::_16, + FloatWidth::_32 => FbFloatWidth::_32, + FloatWidth::_64 => FbFloatWidth::_64, } } } -#[derive(IntoPrimitive, TryFromPrimitive)] -#[repr(u8)] -enum IntWidthTag { - Unknown, - _8, - _16, - _32, - _64, -} +impl TryFrom for FloatWidth { + type Error = SchemaError; -#[allow(clippy::just_underscores_and_digits)] -impl From for IntWidthTag { - fn from(value: IntWidth) -> Self { - use IntWidth::*; + fn try_from(value: FbFloatWidth) -> SchemaResult { match value { - Unknown => IntWidthTag::Unknown, - _8 => IntWidthTag::_8, - _16 => IntWidthTag::_16, - _32 => IntWidthTag::_32, - _64 => IntWidthTag::_64, - } - } -} - -impl From for IntWidth { - fn from(value: IntWidthTag) -> Self { - use IntWidth::*; - match value { - IntWidthTag::Unknown => Unknown, - IntWidthTag::_8 => _8, - IntWidthTag::_16 => _16, - IntWidthTag::_32 => _32, - IntWidthTag::_64 => _64, + FbFloatWidth::Unknown => Ok(FloatWidth::Unknown), + FbFloatWidth::_16 => Ok(FloatWidth::_16), + FbFloatWidth::_32 => Ok(FloatWidth::_32), + FbFloatWidth::_64 => Ok(FloatWidth::_64), + _ => Err(SchemaError::InvalidArgument( + "Unknown IntWidth value".into(), + )), } } } #[cfg(test)] mod test { - use crate::dtype::DType::Int; - use crate::dtype::IntWidth::_64; - use crate::dtype::Nullability::NonNullable; - use crate::dtype::Signedness::Unsigned; - use crate::serde::{DTypeReader, DTypeWriter}; + use std::sync::Arc; + + use crate::{DType, FbDeserialize, FbSerialize, FloatWidth, IntWidth, Nullability, Signedness}; + + fn roundtrip_dtype(dtype: DType) { + let (bytes, head) = dtype.serialize(); + let deserialized = + DType::deserialize(&bytes[head..], |_| panic!("no composite ids")).unwrap(); + assert_eq!(dtype, deserialized); + } #[test] fn roundtrip() { - let mut buffer: Vec = Vec::new(); - let dtype = Int(_64, Unsigned, NonNullable); - DTypeWriter::new(&mut buffer).write(&dtype).unwrap(); - assert_eq!(buffer, [0x02, 0x01, 0x04, 0x01]); - let read_dtype = DTypeReader::new(&mut buffer.as_slice()) - .read(|_| panic!("no composite types")) - .unwrap(); - assert_eq!(dtype, read_dtype); + roundtrip_dtype(DType::Null); + roundtrip_dtype(DType::Bool(Nullability::NonNullable)); + roundtrip_dtype(DType::Int( + IntWidth::_64, + Signedness::Unsigned, + Nullability::NonNullable, + )); + roundtrip_dtype(DType::Decimal(18, 9, Nullability::NonNullable)); + roundtrip_dtype(DType::Float(FloatWidth::_64, Nullability::NonNullable)); + roundtrip_dtype(DType::Binary(Nullability::NonNullable)); + roundtrip_dtype(DType::Utf8(Nullability::NonNullable)); + roundtrip_dtype(DType::List( + Box::new(DType::Float(FloatWidth::_32, Nullability::Nullable)), + Nullability::NonNullable, + )); + roundtrip_dtype(DType::Struct( + vec![Arc::new("strings".into()), Arc::new("ints".into())], + vec![ + DType::Utf8(Nullability::NonNullable), + DType::Int(IntWidth::_16, Signedness::Unsigned, Nullability::Nullable), + ], + )) } }