Skip to content

Commit

Permalink
Use flatbuffers to serialize dtypes (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 authored Mar 22, 2024
1 parent 0199f4d commit 10ad5af
Show file tree
Hide file tree
Showing 8 changed files with 532 additions and 356 deletions.
23 changes: 21 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 7 additions & 11 deletions vortex-array/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io::{ErrorKind, Read, Write};
use arrow_buffer::buffer::{Buffer, MutableBuffer};

use vortex_schema::{
DType, DTypeReader, DTypeWriter, IntWidth, Nullability, SchemaError, Signedness,
DType, FbDeserialize, FbSerialize, IntWidth, Nullability, SchemaError, Signedness,
};

use crate::array::composite::find_extension_id;
Expand Down Expand Up @@ -82,12 +82,10 @@ impl<'a> ReadCtx<'a> {

#[inline]
pub fn dtype(&mut self) -> VortexResult<DType> {
DTypeReader::new(self.r)
.read(find_extension_id)
.map_err(|e| match e {
SchemaError::InvalidArgument(s) => VortexError::InvalidArgument(s),
SchemaError::IOError(io_err) => io_err.0.into(),
})
let dtype_bytes = self.read_slice()?;
DType::deserialize(&dtype_bytes, find_extension_id).map_err(|e| match e {
SchemaError::InvalidArgument(s) => VortexError::InvalidArgument(s),
})
}

pub fn ptype(&mut self) -> VortexResult<PType> {
Expand Down Expand Up @@ -182,10 +180,8 @@ impl<'a> WriteCtx<'a> {
}

pub fn dtype(&mut self, dtype: &DType) -> VortexResult<()> {
DTypeWriter::new(self.w).write(dtype).map_err(|e| match e {
SchemaError::InvalidArgument(s) => VortexError::InvalidArgument(s),
SchemaError::IOError(io_err) => io_err.0.into(),
})
let (bytes, head) = dtype.serialize();
self.write_slice(&bytes[head..])
}

pub fn ptype(&mut self, ptype: PType) -> VortexResult<()> {
Expand Down
7 changes: 5 additions & 2 deletions vortex-schema/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ path = "src/lib.rs"
[dependencies]
arrow-schema = "50.0.0"
itertools = "0.12.1"
leb128 = "0.2.5"
num_enum = "0.7.2"
thiserror = "1.0.58"
flatbuffers = "23.5.26"

[build-dependencies]
flatc = "0.2.2"
walkdir = "2.4.0"

[lints]
workspace = true
50 changes: 50 additions & 0 deletions vortex-schema/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::env;
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;

use flatc::flatc;
use walkdir::WalkDir;

/// Build-script entry point: runs `flatc --rust` over every `.fbs` file found
/// under `$CARGO_MANIFEST_DIR/flatbuffers`, writing the generated sources to
/// `$OUT_DIR/flatbuffers`.
///
/// # Panics
/// Panics if the cargo-provided environment variables are missing, if either
/// directory cannot be canonicalized, if `flatc` cannot be spawned, or if
/// `flatc` exits unsuccessfully.
fn main() {
    let flatbuffers_dir = PathBuf::from(
        env::var_os("CARGO_MANIFEST_DIR").expect("cargo sets CARGO_MANIFEST_DIR for build scripts"),
    )
    .canonicalize()
    .expect("Failed to canonicalize CARGO_MANIFEST_DIR")
    .join("flatbuffers");
    let out_dir =
        PathBuf::from(env::var_os("OUT_DIR").expect("cargo sets OUT_DIR for build scripts"))
            .canonicalize()
            .expect("Failed to canonicalize OUT_DIR");

    // Watch the schema directory itself, not only the files that exist today:
    // otherwise adding a brand-new `.fbs` file would never re-trigger this
    // script, because cargo only re-runs it for the paths printed below.
    println!("cargo:rerun-if-changed={}", flatbuffers_dir.display());

    // Entries that cannot be read while walking are skipped (best effort).
    let fbs_files = WalkDir::new(&flatbuffers_dir)
        .into_iter()
        .filter_map(Result::ok)
        .filter(|entry| entry.path().extension() == Some(OsStr::new("fbs")))
        .map(|entry| entry.path().to_path_buf())
        .collect::<Vec<_>>();

    for schema in &fbs_files {
        rerun_if_changed(schema);
    }

    let status = Command::new(flatc())
        // An empty suffix overrides flatc's default generated-file suffix.
        .args(["--filename-suffix", ""])
        .arg("--rust")
        .arg("-o")
        .arg(out_dir.join("flatbuffers"))
        .args(&fbs_files)
        .status()
        .expect("Failed to spawn flatc");

    if !status.success() {
        panic!("Failed to run flatc: {}", status);
    }
}

/// Prints a `cargo:rerun-if-changed` directive for the canonical form of
/// `path`, so cargo re-runs the build script when that file changes.
///
/// # Panics
/// Panics if `path` cannot be canonicalized (for example because it does not
/// exist) or if the canonical path is not valid UTF-8.
fn rerun_if_changed(path: &Path) {
    // `display()` is used in the messages because it cannot fail; calling
    // `to_str().unwrap()` there would panic on a non-UTF-8 path and hide the
    // real cause of the failure.
    let canonical = path
        .canonicalize()
        .unwrap_or_else(|_| panic!("failed to canonicalize {}", path.display()));
    let printable = canonical
        .to_str()
        .unwrap_or_else(|| panic!("path {} is not valid UTF-8", canonical.display()));

    println!("cargo:rerun-if-changed={}", printable);
}
94 changes: 94 additions & 0 deletions vortex-schema/flatbuffers/schema.fbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// FlatBuffers schema for serialized dtypes. The root of a buffer is the
// `DType` table declared at the bottom of this file.

// Whether a type admits nulls. `NonNullable` is listed first, so it is the
// zero value and therefore what a `nullability` field holds when unset.
enum Nullability: byte {
NonNullable,
Nullable,
}

// Integer signedness. `Unknown` is listed last (not the zero value), which is
// why `Int.signedness` below names it explicitly as its default.
enum Signedness: byte {
Signed,
Unsigned,
Unknown,
}

// Integer bit width; `Unknown` is the zero value.
enum IntWidth: byte {
Unknown,
_8,
_16,
_32,
_64,
}

// Floating-point bit width; `Unknown` is the zero value.
enum FloatWidth: byte {
Unknown,
_16,
_32,
_64,
}

// One table per logical type follows. Each table carries only the parameters
// of that type; `Null` has none.
table Null {}

table Bool {
nullability: Nullability;
}

table Int {
width: IntWidth = Unknown;
signedness: Signedness = Unknown;
nullability: Nullability;
}

table Decimal {
/// Total number of decimal digits
precision: ubyte;

/// Number of digits after the decimal point "."
scale: byte;
nullability: Nullability;
}

table Float {
width: FloatWidth = Unknown;
nullability: Nullability;
}

table Utf8 {
nullability: Nullability;

}

table Binary {
nullability: Nullability;
}

// Struct type, stored as two vectors. Unlike the other parameterized tables it
// has no `nullability` field.
// NOTE(review): presumably `names[i]` is the name of `fields[i]` and both
// vectors have the same length -- confirm against the (de)serializer.
// NOTE(review): the trailing underscore presumably avoids a clash with the
// `struct` keyword -- confirm.
table Struct_ {
names: [string];
fields: [DType];
}

// List type; `element_type` is a nested `DType`, so the schema is recursive.
table List {
element_type: DType;
nullability: Nullability;
}

// Type identified only by a string `id`.
// NOTE(review): presumably an extension/composite type id resolved by the
// reader -- confirm against the deserializer.
table Composite {
id: string;
nullability: Nullability;
}

// Tagged union over all of the type tables above.
union Type {
Null,
Bool,
Int,
Decimal,
Float,
Utf8,
Binary,
Struct_,
List,
Composite,
}

// Table wrapper around the `Type` union. This is the type used for nested
// dtypes (`Struct_.fields`, `List.element_type`) and as the buffer root.
table DType {
type: Type;
}

root_type DType;
25 changes: 1 addition & 24 deletions vortex-schema/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::{env, fmt, io};
use std::{env, fmt};

#[derive(Debug, PartialEq)]
pub struct ErrString(Cow<'static, str>);
Expand Down Expand Up @@ -43,29 +43,6 @@ impl Display for ErrString {
pub enum SchemaError {
#[error("{0}")]
InvalidArgument(ErrString),
#[error("io error: {0:?}")]
IOError(IOError),
}

pub type SchemaResult<T> = Result<T, SchemaError>;

macro_rules! wrapped_error {
($E:ty, $e:ident) => {
#[derive(Debug)]
pub struct $e(pub $E);

impl PartialEq for $e {
fn eq(&self, _other: &Self) -> bool {
false
}
}

impl From<$E> for SchemaError {
fn from(err: $E) -> Self {
SchemaError::$e($e(err))
}
}
};
}

wrapped_error!(io::Error, IOError);
16 changes: 13 additions & 3 deletions vortex-schema/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::fmt::{Display, Formatter};

pub use dtype::*;
pub use error::ErrString;
pub use error::SchemaError;
pub use error::SchemaResult;
pub use serde::DTypeReader;
pub use serde::DTypeWriter;
use std::fmt::{Display, Formatter};
pub use serde::FbDeserialize;
pub use serde::FbSerialize;

mod dtype;
mod error;
Expand All @@ -18,3 +19,12 @@ impl Display for CompositeID {
write!(f, "{}", self.0)
}
}

// Pulls in `schema.rs` from the build output directory (presumably the Rust
// code the build script generates from the FlatBuffers schema). The file is
// not hand-written, so these lints are silenced for the whole module instead
// of being fixed in place.
#[allow(
    unused_imports,
    dead_code,
    clippy::needless_lifetimes,
    clippy::extra_unused_lifetimes,
    non_camel_case_types
)]
mod generated {
    include!(concat!(env!("OUT_DIR"), "/flatbuffers/schema.rs"));
}
Loading

0 comments on commit 10ad5af

Please sign in to comment.