From 315df411709426af27000e3b519d4c98f978605d Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 11 Jul 2024 13:52:33 +0100 Subject: [PATCH 01/47] Add vortex file format --- Cargo.lock | 69 ++--- Cargo.toml | 8 +- bench-vortex/Cargo.toml | 6 +- bench-vortex/src/data_downloads.rs | 4 +- bench-vortex/src/reader.rs | 10 +- bench-vortex/src/taxi_data.rs | 2 +- vortex-array/src/array/struct_/mod.rs | 9 +- vortex-array/src/array/varbin/compute/mod.rs | 8 +- vortex-array/src/array/varbin/mod.rs | 7 +- vortex-array/src/array/varbin/stats.rs | 5 +- vortex-array/src/array/varbinview/compute.rs | 3 +- vortex-ipc/src/io/read.rs | 88 ------- vortex-scalar/src/utf8.rs | 18 +- {vortex-ipc => vortex-serde}/Cargo.toml | 6 +- {vortex-ipc => vortex-serde}/README.md | 0 .../benches/ipc_array_reader_take.rs | 6 +- .../benches/ipc_take.rs | 6 +- {vortex-ipc => vortex-serde}/build.rs | 0 vortex-serde/flatbuffers/footer.fbs | 25 ++ .../flatbuffers/message.fbs | 2 +- .../src/chunked_reader/mod.rs | 18 +- .../src/chunked_reader/take_rows.rs | 51 ++-- vortex-serde/src/file/column_metadata.rs | 130 ++++++++++ vortex-serde/src/file/file_metadata.rs | 10 + vortex-serde/src/file/file_reader.rs | 49 ++++ vortex-serde/src/file/file_writer.rs | 238 ++++++++++++++++++ vortex-serde/src/file/mod.rs | 4 + .../src/io/futures.rs | 0 {vortex-ipc => vortex-serde}/src/io/mod.rs | 0 {vortex-ipc => vortex-serde}/src/io/monoio.rs | 0 .../src/io/object_store.rs | 6 +- {vortex-ipc => vortex-serde}/src/io/offset.rs | 4 + vortex-serde/src/io/read.rs | 149 +++++++++++ {vortex-ipc => vortex-serde}/src/io/tokio.rs | 4 + {vortex-ipc => vortex-serde}/src/io/write.rs | 0 {vortex-ipc => vortex-serde}/src/lib.rs | 64 +++-- .../src/message_reader.rs | 32 ++- .../src/message_writer.rs | 0 {vortex-ipc => vortex-serde}/src/messages.rs | 4 +- .../src/stream_reader/mod.rs | 4 +- {vortex-ipc => vortex-serde}/src/writer.rs | 0 41 files changed, 803 insertions(+), 246 deletions(-) delete mode 100644 vortex-ipc/src/io/read.rs rename {vortex-ipc => vortex-serde}/Cargo.toml (94%) rename {vortex-ipc => vortex-serde}/README.md (100%) rename {vortex-ipc => vortex-serde}/benches/ipc_array_reader_take.rs (94%) rename {vortex-ipc => vortex-serde}/benches/ipc_take.rs (96%) rename {vortex-ipc => vortex-serde}/build.rs (100%) create mode 100644 vortex-serde/flatbuffers/footer.fbs rename {vortex-ipc => vortex-serde}/flatbuffers/message.fbs (96%) rename {vortex-ipc => vortex-serde}/src/chunked_reader/mod.rs (65%) rename {vortex-ipc => vortex-serde}/src/chunked_reader/take_rows.rs (88%) create mode 100644 vortex-serde/src/file/column_metadata.rs create mode 100644 vortex-serde/src/file/file_metadata.rs create mode 100644 vortex-serde/src/file/file_reader.rs create mode 100644 vortex-serde/src/file/file_writer.rs create mode 100644 vortex-serde/src/file/mod.rs rename {vortex-ipc => vortex-serde}/src/io/futures.rs (100%) rename {vortex-ipc => vortex-serde}/src/io/mod.rs (100%) rename {vortex-ipc => vortex-serde}/src/io/monoio.rs (100%) rename {vortex-ipc => vortex-serde}/src/io/object_store.rs (95%) rename {vortex-ipc => vortex-serde}/src/io/offset.rs (88%) create mode 100644 vortex-serde/src/io/read.rs rename {vortex-ipc => vortex-serde}/src/io/tokio.rs (94%) rename {vortex-ipc => vortex-serde}/src/io/write.rs (100%) rename {vortex-ipc => vortex-serde}/src/lib.rs (70%) rename {vortex-ipc => vortex-serde}/src/message_reader.rs (93%) rename {vortex-ipc => vortex-serde}/src/message_writer.rs (100%) rename {vortex-ipc => vortex-serde}/src/messages.rs (98%) rename 
{vortex-ipc => vortex-serde}/src/stream_reader/mod.rs (95%) rename {vortex-ipc => vortex-serde}/src/writer.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 9145d07dc2..8e606dfa93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -457,10 +457,10 @@ dependencies = [ "vortex-dtype", "vortex-error", "vortex-fastlanes", - "vortex-ipc", "vortex-roaring", "vortex-runend", "vortex-sampling-compressor", + "vortex-serde", "xshell", ] @@ -4239,39 +4239,6 @@ dependencies = [ "flatbuffers", ] -[[package]] -name = "vortex-ipc" -version = "0.1.0" -dependencies = [ - "arrow", - "arrow-array", - "arrow-ipc", - "arrow-schema", - "arrow-select", - "build-vortex", - "bytes", - "criterion", - "flatbuffers", - "futures-executor", - "futures-util", - "itertools 0.13.0", - "monoio", - "object_store", - "pin-project", - "rand", - "simplelog", - "tokio", - "vortex-alp", - "vortex-array", - "vortex-buffer", - "vortex-dtype", - "vortex-error", - "vortex-fastlanes", - "vortex-flatbuffers", - "vortex-sampling-compressor", - "vortex-scalar", -] - [[package]] name = "vortex-roaring" version = "0.1.0" @@ -4357,6 +4324,40 @@ dependencies = [ "vortex-flatbuffers", ] +[[package]] +name = "vortex-serde" +version = "0.1.0" +dependencies = [ + "arrow", + "arrow-array", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "build-vortex", + "bytes", + "criterion", + "flatbuffers", + "futures", + "futures-executor", + "futures-util", + "itertools 0.13.0", + "monoio", + "object_store", + "pin-project", + "rand", + "simplelog", + "tokio", + "vortex-alp", + "vortex-array", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-fastlanes", + "vortex-flatbuffers", + "vortex-sampling-compressor", + "vortex-scalar", +] + [[package]] name = "vortex-zigzag" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 51336a85a0..79853109c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ "vortex-error", "vortex-expr", "vortex-flatbuffers", - "vortex-ipc", + "vortex-serde", "vortex-sampling-compressor", ] resolver = "2" @@ -32,6 +32,7 @@ rust-version = "1.76" [workspace.dependencies] ahash = "0.8.11" allocator-api2 = "0.2.16" +anyhow = "1.0" arrayref = "0.3.7" arrow = { version = "52.0.0", features = ["pyarrow"] } arrow-arith = "52.0.0" @@ -53,7 +54,6 @@ chrono = "0.4.38" criterion = { version = "0.5.1", features = ["html_reports"] } croaring = "2.0.0" csv = "1.3.0" -object_store = "0.10.1" datafusion = "40.0.0" datafusion-common = "40.0.0" datafusion-execution = "40.0.0" @@ -81,9 +81,11 @@ itertools = "0.13.0" lazy_static = "1.4.0" leb128 = "0.2.5" log = "0.4.21" +mimalloc = "0.1.42" monoio = "0.2.3" num-traits = "0.2.18" num_enum = "0.7.2" +object_store = "0.10.1" parquet = "52.0.0" paste = "1.0.14" pin-project = "1.1.5" @@ -108,8 +110,8 @@ uninit = "0.6.2" uuid = "1.8.0" walkdir = "2.5.0" worker = "0.3.0" +xshell = "0.2.6" zigzag = "0.1.0" -mimalloc = "0.1.42" [workspace.lints.rust] warnings = "deny" diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index ea3baf45bb..61cad8f6d0 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -15,7 +15,7 @@ rust-version = { workspace = true } workspace = true [dependencies] -anyhow = "1.0" +anyhow = { workspace = true } arrow-array = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } @@ -51,11 +51,11 @@ vortex-dict = { path = "../encodings/dict" } vortex-dtype = { path = "../vortex-dtype" } vortex-error = { path = "../vortex-error", features = ["parquet"] } vortex-fastlanes = { path = 
"../encodings/fastlanes" } -vortex-ipc = { path = "../vortex-ipc", features = ["object_store"] } vortex-roaring = { path = "../encodings/roaring" } vortex-runend = { path = "../encodings/runend" } vortex-sampling-compressor = { path = "../vortex-sampling-compressor" } -xshell = "0.2.6" +vortex-serde = { path = "../vortex-serde", features = ["object_store"] } +xshell = { workspace = true } [dev-dependencies] criterion = { workspace = true, features = ["html_reports", "async_tokio"] } diff --git a/bench-vortex/src/data_downloads.rs b/bench-vortex/src/data_downloads.rs index d1bdf017b7..dc07b745f0 100644 --- a/bench-vortex/src/data_downloads.rs +++ b/bench-vortex/src/data_downloads.rs @@ -14,8 +14,8 @@ use vortex::arrow::FromArrowType; use vortex::{IntoArray, ToArrayData}; use vortex_dtype::DType; use vortex_error::{VortexError, VortexResult}; -use vortex_ipc::io::TokioAdapter; -use vortex_ipc::writer::ArrayWriter; +use vortex_serde::io::TokioAdapter; +use vortex_serde::writer::ArrayWriter; use crate::idempotent; use crate::reader::BATCH_SIZE; diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index c25c94a481..bd00a4e9ee 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -31,11 +31,11 @@ use vortex::{Array, IntoArray, IntoCanonical, ToArrayData}; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_err, VortexResult}; -use vortex_ipc::chunked_reader::ChunkedArrayReader; -use vortex_ipc::io::{ObjectStoreExt, TokioAdapter, VortexReadAt, VortexWrite}; -use vortex_ipc::writer::ArrayWriter; -use vortex_ipc::MessageReader; use vortex_sampling_compressor::SamplingCompressor; +use vortex_serde::chunked_reader::ChunkedArrayReader; +use vortex_serde::io::{ObjectStoreExt, TokioAdapter, VortexReadAt, VortexWrite}; +use vortex_serde::writer::ArrayWriter; +use vortex_serde::MessageReader; use crate::{COMPRESSORS, CTX}; @@ -153,7 +153,7 @@ pub async fn read_vortex_footer_format( ChunkedArrayReader::try_new( reader, CTX.clone(), - dtype, + dtype.into(), PrimitiveArray::from(footer.byte_offsets).into_array(), PrimitiveArray::from(footer.row_offsets).into_array(), ) diff --git a/bench-vortex/src/taxi_data.rs b/bench-vortex/src/taxi_data.rs index e4d67ea1ad..348e424132 100644 --- a/bench-vortex/src/taxi_data.rs +++ b/bench-vortex/src/taxi_data.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use futures::executor::block_on; use vortex_buffer::io_buf::IoBuf; use vortex_error::VortexError; -use vortex_ipc::io::VortexWrite; +use vortex_serde::io::VortexWrite; use crate::data_downloads::{data_vortex_uncompressed, download_data}; use crate::reader::rewrite_parquet_as_vortex; diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index 7802743448..8893ee0c29 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -26,15 +26,11 @@ impl StructArray { self.len(), )) } -} -impl<'a> StructArray { - pub fn children(&'a self) -> impl Iterator + '_ { + pub fn children(&self) -> impl Iterator + '_ { (0..self.nfields()).map(move |idx| self.field(idx).unwrap()) } -} -impl StructArray { pub fn try_new( names: FieldNames, fields: Vec, @@ -85,11 +81,8 @@ impl StructArray { Self::try_new(FieldNames::from(names), fields, len, Validity::NonNullable) .expect("building StructArray with helper") } -} -impl StructArray { // TODO(aduffy): Add equivalent function to support field masks for nested column access. - /// Return a new StructArray with the given projection applied. 
/// /// Projection does not copy data arrays. Projection is defined by an ordinal array slice diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index e56dfac773..df946ba109 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -28,13 +28,7 @@ impl ArrayCompute for VarBinArray { impl ScalarAtFn for VarBinArray { fn scalar_at(&self, index: usize) -> VortexResult { if self.is_valid(index) { - Ok(varbin_scalar( - self.bytes_at(index)? - // TODO(ngates): update to use buffer when we refactor scalars. - .into_vec() - .unwrap_or_else(|b| b.as_ref().to_vec()), - self.dtype(), - )) + Ok(varbin_scalar(self.bytes_at(index)?, self.dtype())) } else { Ok(Scalar::null(self.dtype().clone())) } diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index f46ed70b4e..3a82467ae5 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -216,12 +216,11 @@ impl<'a> FromIterator> for VarBinArray { } } -pub fn varbin_scalar(value: Vec, dtype: &DType) -> Scalar { +pub fn varbin_scalar(value: Buffer, dtype: &DType) -> Scalar { if matches!(dtype, DType::Utf8(_)) { - let str = unsafe { String::from_utf8_unchecked(value) }; - Scalar::utf8(str, dtype.nullability()) + Scalar::try_utf8(value, dtype.nullability()).unwrap() } else { - Scalar::binary(value.into(), dtype.nullability()) + Scalar::binary(value, dtype.nullability()) } } diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs index 834b69bfc5..157f1990ff 100644 --- a/vortex-array/src/array/varbin/stats.rs +++ b/vortex-array/src/array/varbin/stats.rs @@ -1,6 +1,7 @@ use std::cmp::Ordering; use std::collections::HashMap; +use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::VortexResult; @@ -101,8 +102,8 @@ impl<'a> VarBinAccumulator<'a> { pub fn finish(&self, dtype: &DType) -> StatsSet { StatsSet::from(HashMap::from([ - (Stat::Min, varbin_scalar(self.min.to_vec(), dtype)), - (Stat::Max, varbin_scalar(self.max.to_vec(), dtype)), + (Stat::Min, varbin_scalar(Buffer::from(self.min), dtype)), + (Stat::Max, varbin_scalar(Buffer::from(self.max), dtype)), (Stat::RunCount, self.runs.into()), (Stat::IsSorted, self.is_sorted.into()), (Stat::IsStrictSorted, self.is_strict_sorted.into()), diff --git a/vortex-array/src/array/varbinview/compute.rs b/vortex-array/src/array/varbinview/compute.rs index 938236d7d9..184eec0f8f 100644 --- a/vortex-array/src/array/varbinview/compute.rs +++ b/vortex-array/src/array/varbinview/compute.rs @@ -1,3 +1,4 @@ +use vortex_buffer::Buffer; use vortex_error::VortexResult; use vortex_scalar::Scalar; @@ -22,7 +23,7 @@ impl ScalarAtFn for VarBinViewArray { fn scalar_at(&self, index: usize) -> VortexResult { if self.is_valid(index) { self.bytes_at(index) - .map(|bytes| varbin_scalar(bytes, self.dtype())) + .map(|bytes| varbin_scalar(Buffer::from(bytes), self.dtype())) } else { Ok(Scalar::null(self.dtype().clone())) } diff --git a/vortex-ipc/src/io/read.rs b/vortex-ipc/src/io/read.rs deleted file mode 100644 index ebbf38d23d..0000000000 --- a/vortex-ipc/src/io/read.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::future::Future; -use std::io; -use std::io::{Cursor, Read}; - -use bytes::BytesMut; -use vortex_buffer::Buffer; -use vortex_error::vortex_err; - -pub trait VortexRead { - fn read_into(&mut self, buffer: BytesMut) -> impl Future>; -} - -pub trait VortexReadAt { - fn read_at_into( - &self, - pos: u64, - buffer: 
BytesMut, - ) -> impl Future>; - - // TODO(ngates): the read implementation should be able to hint at its latency/throughput - // allowing the caller to make better decisions about how to coalesce reads. - fn performance_hint(&self) -> usize { - 0 - } -} - -impl VortexRead for BytesMut { - async fn read_into(&mut self, buffer: BytesMut) -> io::Result { - if buffer.len() > self.len() { - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - vortex_err!("unexpected eof"), - )) - } else { - Ok(self.split_to(buffer.len())) - } - } -} - -impl VortexRead for Cursor> { - async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result { - Read::read_exact(self, buffer.as_mut())?; - Ok(buffer) - } -} - -impl VortexRead for Cursor<&[u8]> { - async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result { - Read::read_exact(self, buffer.as_mut())?; - Ok(buffer) - } -} - -impl VortexRead for Cursor { - async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result { - Read::read_exact(self, buffer.as_mut())?; - Ok(buffer) - } -} - -impl VortexReadAt for Vec { - fn read_at_into( - &self, - pos: u64, - buffer: BytesMut, - ) -> impl Future> { - VortexReadAt::read_at_into(self.as_slice(), pos, buffer) - } -} - -impl VortexReadAt for [u8] { - async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result { - let buffer_len = buffer.len(); - buffer.copy_from_slice(&self[pos as usize..][..buffer_len]); - Ok(buffer) - } -} - -impl VortexReadAt for Buffer { - async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result { - let buffer_len = buffer.len(); - buffer.copy_from_slice( - self.slice(pos as usize..pos as usize + buffer_len) - .as_slice(), - ); - Ok(buffer) - } -} diff --git a/vortex-scalar/src/utf8.rs b/vortex-scalar/src/utf8.rs index 1e02f909cc..90d3bb624c 100644 --- a/vortex-scalar/src/utf8.rs +++ b/vortex-scalar/src/utf8.rs @@ -29,12 +29,22 @@ impl<'a> Utf8Scalar<'a> { impl Scalar { pub fn utf8(str: B, nullability: Nullability) -> Self where - BufferString: From, + B: Into, { - Self { + Self::try_utf8(str, nullability).unwrap() + } + + pub fn try_utf8( + str: B, + nullability: Nullability, + ) -> Result>::Error> + where + B: TryInto, + { + Ok(Self { dtype: DType::Utf8(nullability), - value: ScalarValue::BufferString(BufferString::from(str)), - } + value: ScalarValue::BufferString(str.try_into()?), + }) } } diff --git a/vortex-ipc/Cargo.toml b/vortex-serde/Cargo.toml similarity index 94% rename from vortex-ipc/Cargo.toml rename to vortex-serde/Cargo.toml index 5443b5b1a4..fff6e21018 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-serde/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "vortex-ipc" +name = "vortex-serde" version = { workspace = true } -description = "Vortex IPC" +description = "Vortex Serialisation" homepage = { workspace = true } repository = { workspace = true } authors = { workspace = true } @@ -14,6 +14,7 @@ rust-version = { workspace = true } [dependencies] bytes = { workspace = true } flatbuffers = { workspace = true } +futures = { workspace = true } futures-util = { workspace = true } itertools = { workspace = true } monoio = { workspace = true, optional = true, features = ["bytes"] } @@ -52,6 +53,7 @@ workspace = true default = ["futures", "monoio", "tokio"] futures = ["futures-util/io"] monoio = ["dep:monoio"] +tokio = ["dep:tokio"] [[bench]] name = "ipc_take" diff --git a/vortex-ipc/README.md b/vortex-serde/README.md similarity index 100% rename from vortex-ipc/README.md rename to vortex-serde/README.md diff --git 
a/vortex-ipc/benches/ipc_array_reader_take.rs b/vortex-serde/benches/ipc_array_reader_take.rs similarity index 94% rename from vortex-ipc/benches/ipc_array_reader_take.rs rename to vortex-serde/benches/ipc_array_reader_take.rs index 886a26d245..28f6ed9c5f 100644 --- a/vortex-ipc/benches/ipc_array_reader_take.rs +++ b/vortex-serde/benches/ipc_array_reader_take.rs @@ -10,9 +10,9 @@ use vortex::array::primitive::PrimitiveArray; use vortex::stream::ArrayStreamExt; use vortex::validity::Validity; use vortex::{Context, IntoArray}; -use vortex_ipc::io::FuturesAdapter; -use vortex_ipc::writer::ArrayWriter; -use vortex_ipc::MessageReader; +use vortex_serde::io::FuturesAdapter; +use vortex_serde::writer::ArrayWriter; +use vortex_serde::MessageReader; // 100 record batches, 100k rows each // take from the first 20 batches and last batch diff --git a/vortex-ipc/benches/ipc_take.rs b/vortex-serde/benches/ipc_take.rs similarity index 96% rename from vortex-ipc/benches/ipc_take.rs rename to vortex-serde/benches/ipc_take.rs index 7c50297b21..2b0cb8cc4d 100644 --- a/vortex-ipc/benches/ipc_take.rs +++ b/vortex-serde/benches/ipc_take.rs @@ -15,10 +15,10 @@ use vortex::array::primitive::PrimitiveArray; use vortex::compress::CompressionStrategy; use vortex::compute::take; use vortex::{Context, IntoArray}; -use vortex_ipc::io::FuturesAdapter; -use vortex_ipc::writer::ArrayWriter; -use vortex_ipc::MessageReader; use vortex_sampling_compressor::SamplingCompressor; +use vortex_serde::io::FuturesAdapter; +use vortex_serde::writer::ArrayWriter; +use vortex_serde::MessageReader; fn ipc_take(c: &mut Criterion) { let mut group = c.benchmark_group("ipc_take"); diff --git a/vortex-ipc/build.rs b/vortex-serde/build.rs similarity index 100% rename from vortex-ipc/build.rs rename to vortex-serde/build.rs diff --git a/vortex-serde/flatbuffers/footer.fbs b/vortex-serde/flatbuffers/footer.fbs new file mode 100644 index 0000000000..39e377f484 --- /dev/null +++ b/vortex-serde/flatbuffers/footer.fbs @@ -0,0 +1,25 @@ +namespace vortex.serde; + +table OpaqueLayout { + byte_offset: uint64; +} + +table ArrayLayout { + children: [Layout]; + encoding: uint16; +} + +union LayoutVariant { + OpaqueLayout, + ArrayLayout, +} + +table Layout { + layout: LayoutVariant; +} + +table Footer { + layout: ArrayLayout; +} + +root_type Footer; \ No newline at end of file diff --git a/vortex-ipc/flatbuffers/message.fbs b/vortex-serde/flatbuffers/message.fbs similarity index 96% rename from vortex-ipc/flatbuffers/message.fbs rename to vortex-serde/flatbuffers/message.fbs index cfd6947843..c37b83a203 100644 --- a/vortex-ipc/flatbuffers/message.fbs +++ b/vortex-serde/flatbuffers/message.fbs @@ -1,7 +1,7 @@ include "vortex-array/flatbuffers/array.fbs"; include "vortex-dtype/flatbuffers/dtype.fbs"; -namespace vortex.ipc; +namespace vortex.serde; enum Version: uint8 { V0 = 0, diff --git a/vortex-ipc/src/chunked_reader/mod.rs b/vortex-serde/src/chunked_reader/mod.rs similarity index 65% rename from vortex-ipc/src/chunked_reader/mod.rs rename to vortex-serde/src/chunked_reader/mod.rs index 0c987400d0..82868f269c 100644 --- a/vortex-ipc/src/chunked_reader/mod.rs +++ b/vortex-serde/src/chunked_reader/mod.rs @@ -1,10 +1,14 @@ +use std::io::Cursor; use std::sync::Arc; +use vortex::compute::unary::scalar_at::scalar_at; +use vortex::stream::ArrayStream; use vortex::{Array, Context}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; use crate::io::VortexReadAt; +use crate::stream_reader::StreamArrayReader; mod take_rows; @@ -12,7 +16,7 @@ mod 
take_rows; pub struct ChunkedArrayReader { read: R, context: Arc, - dtype: DType, + dtype: Arc, // One row per chunk + 1 row for the end of the last chunk. byte_offsets: Array, @@ -23,7 +27,7 @@ impl ChunkedArrayReader { pub fn try_new( read: R, context: Arc, - dtype: DType, + dtype: Arc, byte_offsets: Array, row_offsets: Array, ) -> VortexResult { @@ -47,4 +51,14 @@ impl ChunkedArrayReader { } Ok(()) } + + pub async fn array_stream(&mut self) -> impl ArrayStream + '_ { + let mut cursor = Cursor::new(&self.read); + cursor.set_position(u64::try_from(&scalar_at(&self.byte_offsets, 0).unwrap()).unwrap()); + StreamArrayReader::try_new(cursor, self.context.clone()) + .await + .unwrap() + .with_dtype(self.dtype.clone()) + .into_array_stream() + } } diff --git a/vortex-ipc/src/chunked_reader/take_rows.rs b/vortex-serde/src/chunked_reader/take_rows.rs similarity index 88% rename from vortex-ipc/src/chunked_reader/take_rows.rs rename to vortex-serde/src/chunked_reader/take_rows.rs index 236f68fcc7..9d2c4ab3a1 100644 --- a/vortex-ipc/src/chunked_reader/take_rows.rs +++ b/vortex-serde/src/chunked_reader/take_rows.rs @@ -4,13 +4,15 @@ use std::ops::Range; use bytes::BytesMut; use futures_util::{stream, StreamExt, TryStreamExt}; use itertools::Itertools; + +use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; -use vortex::compute::unary::{subtract_scalar, try_cast}; -use vortex::compute::{search_sorted, slice, take, SearchSortedSide}; +use vortex::compute::{search_sorted, SearchResult, SearchSortedSide, slice, take}; +use vortex::compute::unary::cast::try_cast; +use vortex::compute::unary::scalar_subtract::subtract_scalar; use vortex::stats::ArrayStatistics; use vortex::stream::{ArrayStream, ArrayStreamExt}; -use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; use vortex_dtype::PType; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::Scalar; @@ -94,7 +96,7 @@ impl ChunkedArrayReader { .try_collect() .await?; - Ok(ChunkedArray::try_new(chunks, self.dtype.clone())?.into_array()) + Ok(ChunkedArray::try_new(chunks, (*self.dtype).clone())?.into_array()) } /// Coalesce reads for the given chunks. @@ -167,9 +169,11 @@ fn find_chunks(row_offsets: &Array, indices: &Array) -> VortexResult(); for (pos, idx) in indices.maybe_null_slice::().iter().enumerate() { - let chunk_idx = row_offsets_ref.binary_search(idx).unwrap_or_else(|x| x - 1); + let chunk_idx = match search_sorted(row_offsets.array(), *idx, SearchSortedSide::Left)? { + SearchResult::Found(i) => i, + SearchResult::NotFound(i) => i - 1, + }; chunks .entry(chunk_idx as u32) .and_modify(|chunk_indices: &mut ChunkIndices| { @@ -204,41 +208,43 @@ mod test { use std::io::Cursor; use std::sync::Arc; + use futures_executor::block_on; use itertools::Itertools; + + use vortex::{Context, IntoArray, IntoArrayVariant}; use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; - use vortex::{Context, IntoArray, IntoCanonical}; use vortex_buffer::Buffer; use vortex_dtype::PType; use vortex_error::VortexResult; use crate::chunked_reader::ChunkedArrayReader; - use crate::writer::ArrayWriter; use crate::MessageReader; + use crate::writer::ArrayWriter; - async fn chunked_array() -> VortexResult>> { + fn chunked_array() -> VortexResult>> { let c = ChunkedArray::try_new( vec![PrimitiveArray::from((0i32..1000).collect_vec()).into_array(); 10], PType::I32.into(), )? 
.into_array(); - ArrayWriter::new(vec![]).write_array(c).await + block_on(async { ArrayWriter::new(vec![]).write_array(c).await }) } - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn test_take_rows() -> VortexResult<()> { - let writer = chunked_array().await?; + #[test] + fn test_take_rows() -> VortexResult<()> { + let writer = chunked_array()?; let array_layout = writer.array_layouts()[0].clone(); - let row_offsets = PrimitiveArray::from(array_layout.chunks.row_offsets.clone()); let byte_offsets = PrimitiveArray::from(array_layout.chunks.byte_offsets.clone()); + let row_offsets = PrimitiveArray::from(array_layout.chunks.row_offsets.clone()); let buffer = Buffer::from(writer.into_inner()); - let mut msgs = MessageReader::try_new(Cursor::new(buffer.clone())).await?; - let dtype = msgs.read_dtype().await?; + let mut msgs = + block_on(async { MessageReader::try_new(Cursor::new(buffer.clone())).await })?; + let dtype = Arc::new(block_on(async { msgs.read_dtype().await })?); let mut reader = ChunkedArrayReader::try_new( buffer, @@ -249,11 +255,12 @@ mod test { ) .unwrap(); - let result = reader - .take_rows(&PrimitiveArray::from(vec![0u64, 10, 10_000 - 1]).into_array()) - .await? - .into_canonical()? - .into_primitive()?; + let result = block_on(async { + reader + .take_rows(&PrimitiveArray::from(vec![0u64, 10, 10_000 - 1]).into_array()) + .await + })? + .into_primitive()?; assert_eq!(result.len(), 3); assert_eq!(result.maybe_null_slice::(), &[0, 10, 999]); diff --git a/vortex-serde/src/file/column_metadata.rs b/vortex-serde/src/file/column_metadata.rs new file mode 100644 index 0000000000..cb3b9f99d3 --- /dev/null +++ b/vortex-serde/src/file/column_metadata.rs @@ -0,0 +1,130 @@ +use std::sync::Arc; + +use itertools::Itertools; +use vortex::array::struct_::{Struct, StructArray}; +use vortex::validity::Validity; +use vortex::{Array, ArrayDef, Context, IntoArray}; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; + +use crate::chunked_reader::ChunkedArrayReader; +use crate::io::VortexReadAt; +use crate::writer::ChunkLayout; + +#[derive(Debug)] +pub struct ColumnMetadata { + array: Array, +} + +impl ColumnMetadata { + pub fn try_new(array: Array) -> VortexResult { + if array.encoding().id() != Struct::ID { + vortex_bail!("Metadata table array must be a struct") + } + + Ok(Self { array }) + } + + pub fn read( + &self, + read: R, + dtype: Arc, + ctx: Arc, + ) -> VortexResult> { + ChunkedArrayReader::try_new( + read, + ctx, + dtype, + self.array + .with_dyn(|a| a.as_struct_array().and_then(|a| a.field(0))) + .ok_or_else(|| vortex_err!("Missing column"))?, + self.array + .with_dyn(|a| a.as_struct_array().and_then(|a| a.field(2))) + .ok_or_else(|| vortex_err!("Missing column"))?, + ) + } + + pub fn from_chunk_layout(mut layout: ChunkLayout) -> VortexResult { + let len = layout.byte_offsets.len() - 1; + let byte_counts = layout + .byte_offsets + .iter() + .skip(1) + .zip(layout.byte_offsets.iter()) + .map(|(a, b)| a - b) + .collect_vec(); + let row_counts = layout + .row_offsets + .iter() + .skip(1) + .zip(layout.row_offsets.iter()) + .map(|(a, b)| a - b) + .collect_vec(); + layout.byte_offsets.truncate(len); + layout.row_offsets.truncate(len); + + let metadata_array = StructArray::try_new( + [ + "byte_offset".into(), + "byte_count".into(), + "row_offset".into(), + "row_count".into(), + ] + .into(), + vec![ + layout.byte_offsets.into_array(), + byte_counts.into_array(), + layout.row_offsets.into_array(), + row_counts.into_array(), + ], + len, + 
Validity::NonNullable, + )?; + ColumnMetadata::try_new(metadata_array.into_array()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures::executor::block_on; + use itertools::Itertools; + use vortex::array::primitive::PrimitiveArray; + use vortex::stream::ArrayStreamExt; + use vortex::validity::Validity; + use vortex::{ArrayDType, Context, ToArray}; + use vortex_error::VortexResult; + + use crate::file::column_metadata::ColumnMetadata; + use crate::writer::ArrayWriter; + + #[test] + fn read_using_metadata() -> VortexResult<()> { + let data = PrimitiveArray::from_vec((0u64..100_000).collect_vec(), Validity::NonNullable); + let mut buffer = Vec::new(); + + let mut writer = ArrayWriter::new(buffer); + writer = block_on(async { writer.write_array(data.to_array()).await })?; + let meta_table = + ColumnMetadata::from_chunk_layout(writer.array_layouts()[0].chunks.clone())?; + buffer = writer.into_inner(); + + let mut reader = meta_table.read( + buffer, + Arc::new(data.dtype().clone()), + Arc::new(Context::default()), + )?; + + let array = block_on(async { reader.array_stream().await.collect_chunked().await })?; + assert_eq!( + array + .chunk(0) + .unwrap() + .as_primitive() + .maybe_null_slice::(), + data.maybe_null_slice::() + ); + Ok(()) + } +} diff --git a/vortex-serde/src/file/file_metadata.rs b/vortex-serde/src/file/file_metadata.rs new file mode 100644 index 0000000000..199ac4fdff --- /dev/null +++ b/vortex-serde/src/file/file_metadata.rs @@ -0,0 +1,10 @@ +use crate::file::column_metadata::ColumnMetadata; + +#[allow(dead_code)] +#[derive(Debug)] +pub struct FileMetadata { + column_metas: Vec, + row_boundaries: Vec, +} + +impl FileMetadata {} diff --git a/vortex-serde/src/file/file_reader.rs b/vortex-serde/src/file/file_reader.rs new file mode 100644 index 0000000000..64d665f363 --- /dev/null +++ b/vortex-serde/src/file/file_reader.rs @@ -0,0 +1,49 @@ +use bytes::BytesMut; +use vortex_error::VortexResult; + +use crate::file::file_metadata::FileMetadata; +use crate::io::VortexReadAt; + +pub struct FileReader { + read: R, + len: Option, +} + +impl FileReader { + const FOOTER_READ_SIZE: usize = 8 * 1024 * 1024; + + pub fn new(read: R) -> Self { + Self { read, len: None } + } + + pub fn with_length(mut self, len: u64) -> Self { + self.len = Some(len); + self + } + + pub async fn read_metadata(&mut self, _columns: &[&str]) -> VortexResult { + let mut buf = BytesMut::with_capacity(Self::FOOTER_READ_SIZE); + unsafe { buf.set_len(Self::FOOTER_READ_SIZE) } + let read_offset = self.len().await - Self::FOOTER_READ_SIZE as u64; + buf = self.read.read_at_into(read_offset, buf).await?; + + // Ok(FileMetadata {}) + todo!() + } + + async fn len(&mut self) -> u64 { + match self.len { + None => { + self.len = Some(self.read.len().await); + self.len.unwrap() + } + Some(l) => l, + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn read() {} +} diff --git a/vortex-serde/src/file/file_writer.rs b/vortex-serde/src/file/file_writer.rs new file mode 100644 index 0000000000..e356398ceb --- /dev/null +++ b/vortex-serde/src/file/file_writer.rs @@ -0,0 +1,238 @@ +use std::mem; + +use futures::{Stream, TryStreamExt}; +use itertools::Itertools; +use vortex::array::chunked::ChunkedArray; +use vortex::array::struct_::StructArray; +use vortex::array::varbin::builder::VarBinBuilder; +use vortex::stream::ArrayStream; +use vortex::validity::Validity; +use vortex::{Array, ArrayDType, IntoArray}; +use vortex_dtype::{DType, Nullability}; +use vortex_error::{vortex_bail, VortexResult}; + +use 
crate::io::VortexWrite; +use crate::writer::ChunkLayout; +use crate::MessageWriter; + +pub const MAGIC_BYTES: [u8; 4] = *b"SP1R"; + +pub struct FileWriter { + msgs: MessageWriter, + + dtype: Option, + column_chunks: Vec, +} + +impl FileWriter { + pub fn new(write: W) -> Self { + FileWriter { + msgs: MessageWriter::new(write), + dtype: None, + column_chunks: Vec::new(), + } + } + + pub async fn write_array_columns(self, array: Array) -> VortexResult { + if let Ok(chunked) = ChunkedArray::try_from(&array) { + self.write_array_columns_stream(chunked.array_stream()) + .await + } else { + self.write_array_columns_stream(array.into_array_stream()) + .await + } + } + + pub async fn write_array_columns_stream( + mut self, + mut array_stream: S, + ) -> VortexResult { + match self.dtype { + None => self.dtype = Some(array_stream.dtype().clone()), + Some(ref sd) => { + if sd != array_stream.dtype() { + vortex_bail!( + "Expected all arrays in the stream to have the same dtype {}, found {}", + sd, + array_stream.dtype() + ) + } + } + } + + while let Some(columns) = array_stream.try_next().await? { + let st = StructArray::try_from(&columns)?; + for (i, field) in st.children().enumerate() { + let chunk_pos = self + .write_column_chunks(field.into_array_stream(), i) + .await?; + + self.merge_chunk_offsets(i, chunk_pos); + } + } + + Ok(self) + } + + async fn write_column_chunks( + &mut self, + mut stream: S, + column_idx: usize, + ) -> VortexResult + where + S: Stream> + Unpin, + { + let column_row_offset = self + .column_chunks + .get(column_idx) + .and_then(|c| c.row_offsets.last()) + .copied() + .unwrap_or(0u64); + let mut byte_offsets = vec![self.msgs.tell()]; + let mut row_offsets = vec![column_row_offset]; + + while let Some(chunk) = stream.try_next().await? { + row_offsets.push( + row_offsets + .last() + .map(|off| off + chunk.len() as u64) + .expect("Row offsets should be initialized with a value"), + ); + self.msgs.write_chunk(chunk).await?; + byte_offsets.push(self.msgs.tell()); + } + + Ok(ChunkLayout { + byte_offsets, + row_offsets, + }) + } + + fn merge_chunk_offsets(&mut self, column_idx: usize, chunk_pos: ChunkLayout) { + if let Some(chunk) = self.column_chunks.get_mut(column_idx) { + chunk.byte_offsets.extend(chunk_pos.byte_offsets); + chunk.row_offsets.extend(chunk_pos.row_offsets); + } else { + self.column_chunks.push(chunk_pos); + } + } + + async fn write_metadata_arrays(&mut self) -> VortexResult { + let DType::Struct(s, _) = self.dtype.as_ref().expect("Should have written values") else { + unreachable!("Values are a structarray") + }; + + let mut column_names = VarBinBuilder::::with_capacity(s.names().len()); + let mut metadata_offsets = Vec::new(); + + for (name, mut chunk) in s.names().iter().zip(mem::take(&mut self.column_chunks)) { + let len = chunk.byte_offsets.len() - 1; + let byte_counts = chunk + .byte_offsets + .iter() + .skip(1) + .zip(chunk.byte_offsets.iter()) + .map(|(a, b)| a - b) + .collect_vec(); + let row_counts = chunk + .row_offsets + .iter() + .skip(1) + .zip(chunk.row_offsets.iter()) + .map(|(a, b)| a - b) + .collect_vec(); + chunk.byte_offsets.truncate(len); + chunk.row_offsets.truncate(len); + + let metadata_array = StructArray::try_new( + [ + "byte_offset".into(), + "byte_count".into(), + "row_offset".into(), + "row_count".into(), + ] + .into(), + vec![ + chunk.byte_offsets.into_array(), + byte_counts.into_array(), + chunk.row_offsets.into_array(), + row_counts.into_array(), + ], + len, + Validity::NonNullable, + )?; + + column_names.push_value(name.as_bytes()); 
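        // Record the current write position before serialising this column's metadata table,
        // so the file trailer written later can point back to where the table starts.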
+ metadata_offsets.push(self.msgs.tell()); + self.msgs.write_dtype(metadata_array.dtype()).await?; + self.msgs.write_chunk(metadata_array.into_array()).await?; + } + + let meta_array = StructArray::try_new( + ["names".into(), "metadata_offsets".into()].into(), + vec![ + column_names + .finish(DType::Utf8(Nullability::NonNullable)) + .into_array(), + metadata_offsets.into_array(), + ], + s.names().len(), + Validity::NonNullable, + ) + .unwrap() + .into_array(); + + Ok(meta_array) + } + + async fn write_metadata_offsets(&mut self, metadata_offsets: Array) -> VortexResult { + let offset = self.msgs.tell(); + + self.msgs.write_dtype(metadata_offsets.dtype()).await?; + self.msgs.write_chunk(metadata_offsets).await?; + Ok(offset) + } + + async fn write_file_trailer(self, metadata_offsets_offset: u64) -> VortexResult { + let mut w = self.msgs.into_inner(); + w.write_all(metadata_offsets_offset.to_le_bytes()).await?; + w.write_all(MAGIC_BYTES).await?; + Ok(w) + } + + pub async fn finalize(mut self) -> VortexResult { + let metadata_offsets = self.write_metadata_arrays().await?; + let metadata_offsets = self.write_metadata_offsets(metadata_offsets).await?; + self.write_file_trailer(metadata_offsets).await + } +} + +#[cfg(test)] +mod tests { + use futures_executor::block_on; + use vortex::array::primitive::PrimitiveArray; + use vortex::array::struct_::StructArray; + use vortex::array::varbin::VarBinArray; + use vortex::validity::Validity; + use vortex::IntoArray; + + use crate::file::file_writer::FileWriter; + + #[test] + fn write_columns() { + let strings = VarBinArray::from(vec!["ab", "foo", "bar", "baz"]); + let numbers = PrimitiveArray::from(vec![1u32, 2, 3, 4]); + let st = StructArray::try_new( + ["strings".into(), "numbers".into()].into(), + vec![strings.into_array(), numbers.into_array()], + 4, + Validity::NonNullable, + ) + .unwrap(); + let buf = Vec::new(); + let mut writer = FileWriter::new(buf); + writer = block_on(async { writer.write_array_columns(st.into_array()).await }).unwrap(); + let written = block_on(async { writer.finalize().await }).unwrap(); + assert!(!written.is_empty()); + } +} diff --git a/vortex-serde/src/file/mod.rs b/vortex-serde/src/file/mod.rs new file mode 100644 index 0000000000..1dd4d84c4c --- /dev/null +++ b/vortex-serde/src/file/mod.rs @@ -0,0 +1,4 @@ +mod column_metadata; +mod file_metadata; +mod file_reader; +mod file_writer; diff --git a/vortex-ipc/src/io/futures.rs b/vortex-serde/src/io/futures.rs similarity index 100% rename from vortex-ipc/src/io/futures.rs rename to vortex-serde/src/io/futures.rs diff --git a/vortex-ipc/src/io/mod.rs b/vortex-serde/src/io/mod.rs similarity index 100% rename from vortex-ipc/src/io/mod.rs rename to vortex-serde/src/io/mod.rs diff --git a/vortex-ipc/src/io/monoio.rs b/vortex-serde/src/io/monoio.rs similarity index 100% rename from vortex-ipc/src/io/monoio.rs rename to vortex-serde/src/io/monoio.rs diff --git a/vortex-ipc/src/io/object_store.rs b/vortex-serde/src/io/object_store.rs similarity index 95% rename from vortex-ipc/src/io/object_store.rs rename to vortex-serde/src/io/object_store.rs index 3bb7191565..1a8daf49d1 100644 --- a/vortex-ipc/src/io/object_store.rs +++ b/vortex-serde/src/io/object_store.rs @@ -66,7 +66,7 @@ impl<'a, 'b, O: ObjectStore> ObjectStoreReadAt<'a, 'b, O> { } impl<'a, 'b, O: ObjectStore> VortexReadAt for ObjectStoreReadAt<'a, 'b, O> { - async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> std::io::Result { + async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result { let 
start_range = pos as usize; let bytes = self .object_store @@ -75,6 +75,10 @@ impl<'a, 'b, O: ObjectStore> VortexReadAt for ObjectStoreReadAt<'a, 'b, O> { buffer.as_mut().copy_from_slice(bytes.as_ref()); Ok(buffer) } + + async fn len(&self) -> u64 { + self.object_store.head(self.location).await.unwrap().size as u64 + } } pub struct ObjectStoreWriter { diff --git a/vortex-ipc/src/io/offset.rs b/vortex-serde/src/io/offset.rs similarity index 88% rename from vortex-ipc/src/io/offset.rs rename to vortex-serde/src/io/offset.rs index e515d0a4e3..f152af6b56 100644 --- a/vortex-ipc/src/io/offset.rs +++ b/vortex-serde/src/io/offset.rs @@ -28,4 +28,8 @@ impl VortexReadAt for OffsetReadAt { fn performance_hint(&self) -> usize { self.read.performance_hint() } + + async fn len(&self) -> u64 { + self.read.len().await - self.offset + } } diff --git a/vortex-serde/src/io/read.rs b/vortex-serde/src/io/read.rs new file mode 100644 index 0000000000..dfe35ba06d --- /dev/null +++ b/vortex-serde/src/io/read.rs @@ -0,0 +1,149 @@ +use std::future::Future; +use std::io; +use std::io::Cursor; +use std::sync::Arc; + +use bytes::BytesMut; +use vortex_buffer::Buffer; +use vortex_error::vortex_err; + +pub trait VortexRead { + fn read_into(&mut self, buffer: BytesMut) -> impl Future>; +} + +#[allow(clippy::len_without_is_empty)] +pub trait VortexReadAt { + fn read_at_into( + &self, + pos: u64, + buffer: BytesMut, + ) -> impl Future>; + + // TODO(ngates): the read implementation should be able to hint at its latency/throughput + // allowing the caller to make better decisions about how to coalesce reads. + fn performance_hint(&self) -> usize { + 0 + } + + fn len(&self) -> impl Future; +} + +impl VortexReadAt for Arc { + async fn read_at_into(&self, pos: u64, buffer: BytesMut) -> io::Result { + T::read_at_into(self, pos, buffer).await + } + + fn performance_hint(&self) -> usize { + T::performance_hint(self) + } + + async fn len(&self) -> u64 { + T::len(self).await + } +} + +impl VortexRead for BytesMut { + async fn read_into(&mut self, buffer: BytesMut) -> io::Result { + if buffer.len() > self.len() { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + vortex_err!("unexpected eof"), + )) + } else { + Ok(self.split_to(buffer.len())) + } + } +} + +impl VortexRead for &mut R { + async fn read_into(&mut self, buffer: BytesMut) -> io::Result { + R::read_into(*self, buffer).await + } +} + +impl VortexRead for Cursor { + async fn read_into(&mut self, buffer: BytesMut) -> io::Result { + let res = R::read_at_into(self.get_ref(), self.position(), buffer).await?; + self.set_position(self.position() + res.len() as u64); + Ok(res) + } +} + +impl VortexReadAt for &R { + async fn read_at_into(&self, pos: u64, buffer: BytesMut) -> io::Result { + R::read_at_into(*self, pos, buffer).await + } + + fn performance_hint(&self) -> usize { + R::performance_hint(*self) + } + + async fn len(&self) -> u64 { + R::len(*self).await + } +} + +impl VortexReadAt for Vec { + fn read_at_into( + &self, + pos: u64, + buffer: BytesMut, + ) -> impl Future> { + VortexReadAt::read_at_into(self.as_slice(), pos, buffer) + } + + async fn len(&self) -> u64 { + self.len() as u64 + } +} + +impl VortexReadAt for [u8] { + async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result { + if buffer.len() + pos as usize > self.len() { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + vortex_err!("unexpected eof"), + )) + } else { + let buffer_len = buffer.len(); + buffer.copy_from_slice(&self[pos as usize..][..buffer_len]); + Ok(buffer) + } + } + + 
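    // Total length of the backing byte slice; readers (e.g. FileReader) use this to work out
    // where the end-of-file footer/trailer read should begin.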
async fn len(&self) -> u64 { + self.len() as u64 + } +} + +// impl VortexReadAt for &[u8] { +// async fn read_at_into(&self, pos: u64, buffer: BytesMut) -> io::Result { +// VortexReadAt::read_at_into(*self, pos, buffer).await +// } +// +// fn len(&self) -> usize { +// (*self).len() +// } +// } + +impl VortexReadAt for Buffer { + async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result { + if buffer.len() + pos as usize > self.len() { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + vortex_err!("unexpected eof"), + )) + } else { + let buffer_len = buffer.len(); + buffer.copy_from_slice( + self.slice(pos as usize..pos as usize + buffer_len) + .as_slice(), + ); + Ok(buffer) + } + } + + async fn len(&self) -> u64 { + self.len() as u64 + } +} diff --git a/vortex-ipc/src/io/tokio.rs b/vortex-serde/src/io/tokio.rs similarity index 94% rename from vortex-ipc/src/io/tokio.rs rename to vortex-serde/src/io/tokio.rs index 8703e6c8b5..6df3bb7a5e 100644 --- a/vortex-ipc/src/io/tokio.rs +++ b/vortex-serde/src/io/tokio.rs @@ -25,6 +25,10 @@ impl VortexReadAt for TokioAdapter { std_file.read_exact_at(buffer.as_mut(), pos)?; Ok(buffer) } + + async fn len(&self) -> u64 { + self.0.metadata().await.unwrap().len() + } } impl VortexWrite for TokioAdapter { diff --git a/vortex-ipc/src/io/write.rs b/vortex-serde/src/io/write.rs similarity index 100% rename from vortex-ipc/src/io/write.rs rename to vortex-serde/src/io/write.rs diff --git a/vortex-ipc/src/lib.rs b/vortex-serde/src/lib.rs similarity index 70% rename from vortex-ipc/src/lib.rs rename to vortex-serde/src/lib.rs index 6a1e550a26..3af3cee528 100644 --- a/vortex-ipc/src/lib.rs +++ b/vortex-serde/src/lib.rs @@ -2,6 +2,7 @@ pub use message_reader::*; pub use message_writer::*; pub mod chunked_reader; +pub mod file; pub mod io; mod message_reader; mod message_writer; @@ -39,9 +40,10 @@ pub mod flatbuffers { } #[cfg(test)] -pub mod test { +mod test { use std::sync::Arc; + use futures_executor::block_on; use futures_util::io::Cursor; use futures_util::{pin_mut, StreamExt, TryStreamExt}; use itertools::Itertools; @@ -56,23 +58,17 @@ pub mod test { use crate::writer::ArrayWriter; use crate::MessageReader; - pub async fn create_stream() -> Vec { - let array = PrimitiveArray::from(vec![0, 1, 2]).into_array(); - let chunked_array = - ChunkedArray::try_new(vec![array.clone(), array.clone()], array.dtype().clone()) + fn write_ipc(array: A) -> Vec { + block_on(async { + ArrayWriter::new(vec![]) + .write_array(array.into_array()) + .await .unwrap() - .into_array(); - - ArrayWriter::new(vec![]) - .write_array(array) - .await - .unwrap() - .write_array(chunked_array) - .await - .unwrap() - .into_inner() + .into_inner() + }) } +<<<<<<< HEAD:vortex-ipc/src/lib.rs async fn write_ipc(array: A) -> Vec { ArrayWriter::new(vec![]) .write_array(array.into_array()) @@ -84,28 +80,33 @@ pub mod test { #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_empty_index() -> VortexResult<()> { +======= + #[test] + fn test_empty_index() -> VortexResult<()> { +>>>>>>> 7a239b3c (Add vortex file format):vortex-serde/src/lib.rs let data = PrimitiveArray::from((0i32..3_000_000).collect_vec()); - let buffer = write_ipc(data).await; + let buffer = write_ipc(data); let indices = PrimitiveArray::from(vec![1, 2, 10]).into_array(); let ctx = Arc::new(Context::default()); - let mut messages = MessageReader::try_new(FuturesAdapter(Cursor::new(buffer))) - .await - .unwrap(); - let reader = messages.array_stream_from_messages(ctx).await?; + let mut messages = block_on(async { 
+ MessageReader::try_new(FuturesAdapter(Cursor::new(buffer))) + .await + .unwrap() + }); + let reader = block_on(async { messages.array_stream_from_messages(ctx).await })?; let result_iter = reader.take_rows(indices).unwrap(); pin_mut!(result_iter); - let result = result_iter.next().await.unwrap().unwrap(); + let result = block_on(async { result_iter.next().await.unwrap().unwrap() }); println!("Taken {:?}", result); Ok(()) } - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn test_write_read_chunked() -> VortexResult<()> { + #[test] + fn test_write_read_chunked() -> VortexResult<()> { let indices = PrimitiveArray::from(vec![ 10u32, 11, 12, 13, 100_000, 2_999_999, 2_999_999, 3_000_000, ]) @@ -116,18 +117,17 @@ pub mod test { let data2 = PrimitiveArray::from((3_000_000i32..6_000_000).rev().collect_vec()).into_array(); let chunked = ChunkedArray::try_new(vec![data.clone(), data2], data.dtype().clone())?; - let buffer = write_ipc(chunked).await; + let buffer = write_ipc(chunked); - let mut messages = MessageReader::try_new(FuturesAdapter(Cursor::new(buffer))).await?; + let mut messages = + block_on(async { MessageReader::try_new(FuturesAdapter(Cursor::new(buffer))).await })?; let ctx = Arc::new(Context::default()); - let take_iter = messages - .array_stream_from_messages(ctx) - .await? + let take_iter = block_on(async { messages.array_stream_from_messages(ctx).await })? .take_rows(indices)?; pin_mut!(take_iter); - let next = take_iter.try_next().await?.expect("Expected a chunk"); + let next = block_on(async { take_iter.try_next().await })?.expect("Expected a chunk"); assert_eq!(next.encoding().id(), PrimitiveEncoding.id()); assert_eq!( @@ -135,9 +135,7 @@ pub mod test { vec![2999989, 2999988, 2999987, 2999986, 2899999, 0, 0] ); assert_eq!( - take_iter - .try_next() - .await? + block_on(async { take_iter.try_next().await })? .expect("Expected a chunk") .as_primitive() .maybe_null_slice::(), diff --git a/vortex-ipc/src/message_reader.rs b/vortex-serde/src/message_reader.rs similarity index 93% rename from vortex-ipc/src/message_reader.rs rename to vortex-serde/src/message_reader.rs index c27f80b244..a4fcc4f1d0 100644 --- a/vortex-ipc/src/message_reader.rs +++ b/vortex-serde/src/message_reader.rs @@ -5,13 +5,14 @@ use bytes::{Buf, BytesMut}; use flatbuffers::{root, root_unchecked}; use futures_util::stream::try_unfold; use itertools::Itertools; -use vortex::stream::{ArrayStream, ArrayStreamAdapter}; + use vortex::{Array, ArrayView, Context, IntoArray, ToArray}; +use vortex::stream::{ArrayStream, ArrayStreamAdapter}; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; -use crate::flatbuffers::ipc as fb; +use crate::flatbuffers::serde as fb; use crate::io::VortexRead; pub struct MessageReader { @@ -194,7 +195,7 @@ impl MessageReader { Ok(Some(view.into_array())) } - /// Construct an ArrayStream pulling the ViewContext and DType from the stream. + /// Construct an ArrayStream pulling the DType from the stream. 
pub async fn array_stream_from_messages( &mut self, ctx: Arc, @@ -282,24 +283,29 @@ mod test { use std::io::Cursor; use bytes::Bytes; + use futures_executor::block_on; + use vortex_buffer::Buffer; use crate::{MessageReader, MessageWriter}; - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn read_write_page() { + #[test] + fn read_write_page() { let write = Vec::new(); let mut writer = MessageWriter::new(write); - writer - .write_page(Buffer::Bytes(Bytes::from("somevalue"))) - .await - .unwrap(); + block_on(async { + writer + .write_page(Buffer::Bytes(Bytes::from("somevalue"))) + .await + }) + .unwrap(); let written = writer.into_inner(); - let mut reader = MessageReader::try_new(Cursor::new(written.as_slice())) - .await + let mut reader = + block_on(async { MessageReader::try_new(Cursor::new(written.as_slice())).await }) + .unwrap(); + let read_page = block_on(async { reader.maybe_read_page().await }) + .unwrap() .unwrap(); - let read_page = reader.maybe_read_page().await.unwrap().unwrap(); assert_eq!(read_page, Buffer::Bytes(Bytes::from("somevalue"))); } } diff --git a/vortex-ipc/src/message_writer.rs b/vortex-serde/src/message_writer.rs similarity index 100% rename from vortex-ipc/src/message_writer.rs rename to vortex-serde/src/message_writer.rs diff --git a/vortex-ipc/src/messages.rs b/vortex-serde/src/messages.rs similarity index 98% rename from vortex-ipc/src/messages.rs rename to vortex-serde/src/messages.rs index e56bdba912..d2d2a75f16 100644 --- a/vortex-ipc/src/messages.rs +++ b/vortex-serde/src/messages.rs @@ -6,8 +6,8 @@ use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer}; -use crate::flatbuffers::ipc as fb; -use crate::flatbuffers::ipc::Compression; +use crate::flatbuffers::serde as fb; +use crate::flatbuffers::serde::Compression; use crate::ALIGNMENT; pub enum IPCMessage<'a> { diff --git a/vortex-ipc/src/stream_reader/mod.rs b/vortex-serde/src/stream_reader/mod.rs similarity index 95% rename from vortex-ipc/src/stream_reader/mod.rs rename to vortex-serde/src/stream_reader/mod.rs index 6368f310c1..078616b798 100644 --- a/vortex-ipc/src/stream_reader/mod.rs +++ b/vortex-serde/src/stream_reader/mod.rs @@ -27,10 +27,10 @@ impl StreamArrayReader { }) } - pub fn with_dtype(self, dtype: DType) -> Self { + pub fn with_dtype(self, dtype: Arc) -> Self { assert!(self.dtype.is_none(), "DType already set"); Self { - dtype: Some(Arc::new(dtype)), + dtype: Some(dtype), ..self } } diff --git a/vortex-ipc/src/writer.rs b/vortex-serde/src/writer.rs similarity index 100% rename from vortex-ipc/src/writer.rs rename to vortex-serde/src/writer.rs From e6c4818c383039fdc432b84a19e8af979783ac4b Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 23 Jul 2024 14:14:10 +0100 Subject: [PATCH 02/47] less --- vortex-serde/Cargo.toml | 4 +- vortex-serde/flatbuffers/footer.fbs | 15 +- vortex-serde/src/chunked_reader/mod.rs | 2 +- vortex-serde/src/chunked_reader/take_rows.rs | 3 +- vortex-serde/src/file/file_reader.rs | 6 +- vortex-serde/src/file/file_writer.rs | 106 +++++++++----- vortex-serde/src/file/layouts.rs | 145 +++++++++++++++++++ vortex-serde/src/file/mod.rs | 1 + vortex-serde/src/lib.rs | 13 +- 9 files changed, 242 insertions(+), 53 deletions(-) create mode 100644 vortex-serde/src/file/layouts.rs diff --git a/vortex-serde/Cargo.toml b/vortex-serde/Cargo.toml index fff6e21018..08d996283d 100644 --- a/vortex-serde/Cargo.toml +++ b/vortex-serde/Cargo.toml @@ -46,8 +46,8 @@ vortex-alp = { path = "../encodings/alp" } vortex-fastlanes 
= { path = "../encodings/fastlanes" } vortex-sampling-compressor = { path = "../vortex-sampling-compressor" } -[lints] -workspace = true +#[lints] +#workspace = true [features] default = ["futures", "monoio", "tokio"] diff --git a/vortex-serde/flatbuffers/footer.fbs b/vortex-serde/flatbuffers/footer.fbs index 39e377f484..0a73356531 100644 --- a/vortex-serde/flatbuffers/footer.fbs +++ b/vortex-serde/flatbuffers/footer.fbs @@ -1,17 +1,18 @@ -namespace vortex.serde; +namespace vortex.footer; -table OpaqueLayout { - byte_offset: uint64; +table FlatLayout { + begin: uint64; + end: uint64; } -table ArrayLayout { +table NestedLayout { children: [Layout]; encoding: uint16; } union LayoutVariant { - OpaqueLayout, - ArrayLayout, + FlatLayout, + NestedLayout, } table Layout { @@ -19,7 +20,7 @@ table Layout { } table Footer { - layout: ArrayLayout; + layout: Layout; } root_type Footer; \ No newline at end of file diff --git a/vortex-serde/src/chunked_reader/mod.rs b/vortex-serde/src/chunked_reader/mod.rs index 82868f269c..3d845ae259 100644 --- a/vortex-serde/src/chunked_reader/mod.rs +++ b/vortex-serde/src/chunked_reader/mod.rs @@ -1,7 +1,7 @@ use std::io::Cursor; use std::sync::Arc; -use vortex::compute::unary::scalar_at::scalar_at; +use vortex::compute::unary::scalar_at; use vortex::stream::ArrayStream; use vortex::{Array, Context}; use vortex_dtype::DType; diff --git a/vortex-serde/src/chunked_reader/take_rows.rs b/vortex-serde/src/chunked_reader/take_rows.rs index 9d2c4ab3a1..85a9b1489b 100644 --- a/vortex-serde/src/chunked_reader/take_rows.rs +++ b/vortex-serde/src/chunked_reader/take_rows.rs @@ -9,8 +9,7 @@ use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; use vortex::compute::{search_sorted, SearchResult, SearchSortedSide, slice, take}; -use vortex::compute::unary::cast::try_cast; -use vortex::compute::unary::scalar_subtract::subtract_scalar; +use vortex::compute::unary::{subtract_scalar, try_cast}; use vortex::stats::ArrayStatistics; use vortex::stream::{ArrayStream, ArrayStreamExt}; use vortex_dtype::PType; diff --git a/vortex-serde/src/file/file_reader.rs b/vortex-serde/src/file/file_reader.rs index 64d665f363..934848cbe2 100644 --- a/vortex-serde/src/file/file_reader.rs +++ b/vortex-serde/src/file/file_reader.rs @@ -22,10 +22,10 @@ impl FileReader { } pub async fn read_metadata(&mut self, _columns: &[&str]) -> VortexResult { - let mut buf = BytesMut::with_capacity(Self::FOOTER_READ_SIZE); - unsafe { buf.set_len(Self::FOOTER_READ_SIZE) } + let mut _buf = BytesMut::with_capacity(Self::FOOTER_READ_SIZE); + unsafe { _buf.set_len(Self::FOOTER_READ_SIZE) } let read_offset = self.len().await - Self::FOOTER_READ_SIZE as u64; - buf = self.read.read_at_into(read_offset, buf).await?; + _buf = self.read.read_at_into(read_offset, _buf).await?; // Ok(FileMetadata {}) todo!() diff --git a/vortex-serde/src/file/file_writer.rs b/vortex-serde/src/file/file_writer.rs index e356398ceb..a7bb3861bb 100644 --- a/vortex-serde/src/file/file_writer.rs +++ b/vortex-serde/src/file/file_writer.rs @@ -1,16 +1,20 @@ use std::mem; +use flatbuffers::{FlatBufferBuilder, WIPOffset}; use futures::{Stream, TryStreamExt}; use itertools::Itertools; use vortex::array::chunked::ChunkedArray; use vortex::array::struct_::StructArray; -use vortex::array::varbin::builder::VarBinBuilder; use vortex::stream::ArrayStream; use vortex::validity::Validity; use vortex::{Array, ArrayDType, IntoArray}; -use vortex_dtype::{DType, Nullability}; 
diff --git a/vortex-serde/src/file/file_writer.rs b/vortex-serde/src/file/file_writer.rs
index e356398ceb..a7bb3861bb 100644
--- a/vortex-serde/src/file/file_writer.rs
+++ b/vortex-serde/src/file/file_writer.rs
@@ -1,16 +1,20 @@
 use std::mem;

+use flatbuffers::{FlatBufferBuilder, WIPOffset};
 use futures::{Stream, TryStreamExt};
 use itertools::Itertools;
 use vortex::array::chunked::ChunkedArray;
 use vortex::array::struct_::StructArray;
-use vortex::array::varbin::builder::VarBinBuilder;
 use vortex::stream::ArrayStream;
 use vortex::validity::Validity;
 use vortex::{Array, ArrayDType, IntoArray};
-use vortex_dtype::{DType, Nullability};
+use vortex_buffer::io_buf::IoBuf;
+use vortex_dtype::DType;
 use vortex_error::{vortex_bail, VortexResult};
+use vortex_flatbuffers::WriteFlatBuffer;

+use crate::file::layouts::{ChunkedLayout, FlatLayout, Layout, StructLayout};
+use crate::flatbuffers::footer as fb;
 use crate::io::VortexWrite;
 use crate::writer::ChunkLayout;
 use crate::MessageWriter;
@@ -24,6 +28,33 @@ pub struct FileWriter {
     column_chunks: Vec<ChunkLayout>,
 }

+pub struct Footer {
+    layout: Layout,
+}
+
+impl Footer {
+    pub fn new(layout: Layout) -> Self {
+        Self { layout }
+    }
+}
+
+impl WriteFlatBuffer for Footer {
+    type Target<'a> = fb::Footer<'a>;
+
+    fn write_flatbuffer<'fb>(
+        &self,
+        fbb: &mut FlatBufferBuilder<'fb>,
+    ) -> WIPOffset<Self::Target<'fb>> {
+        let layout_offset = self.layout.write_flatbuffer(fbb);
+        fb::Footer::create(
+            fbb,
+            &fb::FooterArgs {
+                layout: Some(layout_offset),
+            },
+        )
+    }
+}
+
 impl<W: VortexWrite> FileWriter<W> {
     pub fn new(write: W) -> Self {
         FileWriter {
@@ -117,15 +148,16 @@ impl FileWriter {
         }
     }

-    async fn write_metadata_arrays(&mut self) -> VortexResult<Array> {
-        let DType::Struct(s, _) = self.dtype.as_ref().expect("Should have written values") else {
+    async fn write_metadata_arrays(&mut self) -> VortexResult<StructLayout> {
+        let DType::Struct(..) = self.dtype.as_ref().expect("Should have written values") else {
             unreachable!("Values are a structarray")
         };

-        let mut column_names = VarBinBuilder::<i32>::with_capacity(s.names().len());
-        let mut metadata_offsets = Vec::new();
+        let mut column_layouts = Vec::with_capacity(self.column_chunks.len());
+
+        for mut chunk in mem::take(&mut self.column_chunks) {
+            let mut chunks = Vec::new();

-        for (name, mut chunk) in s.names().iter().zip(mem::take(&mut self.column_chunks)) {
             let len = chunk.byte_offsets.len() - 1;
             let byte_counts = chunk
                 .byte_offsets
                 .iter()
                 .skip(1)
                 .zip(chunk.byte_offsets.iter())
                 .map(|(a, b)| a - b)
                 .collect_vec();
+            chunks.extend(
+                chunk
+                    .byte_offsets
+                    .iter()
+                    .zip(chunk.byte_offsets.iter().skip(1))
+                    .map(|(begin, end)| Layout::Flat(FlatLayout::new(*begin, *end))),
+            );
             let row_counts = chunk
                 .row_offsets
                 .iter()
@@ -162,48 +201,43 @@ impl FileWriter {
                 Validity::NonNullable,
             )?;

-            column_names.push_value(name.as_bytes());
-            metadata_offsets.push(self.msgs.tell());
+            let metadata_table_begin = self.msgs.tell();
             self.msgs.write_dtype(metadata_array.dtype()).await?;
             self.msgs.write_chunk(metadata_array.into_array()).await?;
+            chunks.push(Layout::Flat(FlatLayout::new(
+                metadata_table_begin,
+                self.msgs.tell(),
+            )));
+            column_layouts.push(Layout::Chunked(ChunkedLayout::new(chunks)));
         }

-        let meta_array = StructArray::try_new(
-            ["names".into(), "metadata_offsets".into()].into(),
-            vec![
-                column_names
-                    .finish(DType::Utf8(Nullability::NonNullable))
-                    .into_array(),
-                metadata_offsets.into_array(),
-            ],
-            s.names().len(),
-            Validity::NonNullable,
-        )
-        .unwrap()
-        .into_array();
-
-        Ok(meta_array)
+        Ok(StructLayout::new(column_layouts))
     }

-    async fn write_metadata_offsets(&mut self, metadata_offsets: Array) -> VortexResult<u64> {
-        let offset = self.msgs.tell();
+    async fn write_file_trailer(self, footer: Footer, schema_offset: u64) -> VortexResult<W> {
+        let footer_offset = self.msgs.tell();
+        let mut w = self.msgs.into_inner();

-        self.msgs.write_dtype(metadata_offsets.dtype()).await?;
-        self.msgs.write_chunk(metadata_offsets).await?;
-        Ok(offset)
-    }
+        let mut fbb = FlatBufferBuilder::new();
+        footer.write_flatbuffer(&mut fbb);
+        let (buffer, buffer_begin) = fbb.collapse();
+        let buffer_end = buffer.len();

-    async fn write_file_trailer(self, metadata_offsets_offset: u64) -> VortexResult<W> {
-        let mut w = self.msgs.into_inner();
-        w.write_all(metadata_offsets_offset.to_le_bytes()).await?;
+        w.write_all(buffer.slice(buffer_begin, buffer_end)).await?;
+        w.write_all(schema_offset.to_le_bytes()).await?;
+        w.write_all(footer_offset.to_le_bytes()).await?;
         w.write_all(MAGIC_BYTES).await?;
         Ok(w)
     }

     pub async fn finalize(mut self) -> VortexResult<W> {
-        let metadata_offsets = self.write_metadata_arrays().await?;
-        let metadata_offsets = self.write_metadata_offsets(metadata_offsets).await?;
-        self.write_file_trailer(metadata_offsets).await
+        let top_level_layout = self.write_metadata_arrays().await?;
+        let schema_offset = self.msgs.tell();
+        self.msgs
+            .write_dtype(self.dtype.as_ref().expect("Should have gotten a schema"))
+            .await?;
+        self.write_file_trailer(Footer::new(Layout::Struct(top_level_layout)), schema_offset)
+            .await
     }
 }
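The `Layout` enum below mirrors the `footer.fbs` union: `Flat` leaves carry absolute byte ranges, while `Chunked` and `Struct` both serialize as `NestedLayout` and differ only by their encoding tag. The writer builds one `Chunked` layout per column, whose last child is the column's metadata table, and wraps all columns in a top-level `Struct` layout. A sketch of that shape for a single column with two chunks (all offsets illustrative):

    // Illustrative offsets only: two data chunks followed by the column's
    // metadata table, nested under the file's top-level struct layout.
    let column = Layout::Chunked(ChunkedLayout::new(vec![
        Layout::Flat(FlatLayout::new(0, 1024)),
        Layout::Flat(FlatLayout::new(1024, 2048)),
        Layout::Flat(FlatLayout::new(2048, 2304)), // metadata table range
    ]));
    let file_layout = Layout::Struct(StructLayout::new(vec![column]));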
diff --git a/vortex-serde/src/file/layouts.rs b/vortex-serde/src/file/layouts.rs
new file mode 100644
index 0000000000..f378697d61
--- /dev/null
+++ b/vortex-serde/src/file/layouts.rs
@@ -0,0 +1,145 @@
+use flatbuffers::{FlatBufferBuilder, WIPOffset};
+use vortex_flatbuffers::WriteFlatBuffer;
+
+use crate::flatbuffers::footer as fb;
+use crate::writer::ByteRange;
+
+#[derive(Debug, Clone)]
+pub enum Layout {
+    Chunked(ChunkedLayout),
+    Struct(StructLayout),
+    Flat(FlatLayout),
+}
+
+impl WriteFlatBuffer for Layout {
+    type Target<'a> = fb::Layout<'a>;
+
+    fn write_flatbuffer<'fb>(
+        &self,
+        fbb: &mut FlatBufferBuilder<'fb>,
+    ) -> WIPOffset<Self::Target<'fb>> {
+        let layout_variant = match self {
+            Self::Chunked(l) => l.write_flatbuffer(fbb).as_union_value(),
+            Self::Struct(l) => l.write_flatbuffer(fbb).as_union_value(),
+            Self::Flat(l) => l.write_flatbuffer(fbb).as_union_value(),
+        };
+
+        let mut layout = fb::LayoutBuilder::new(fbb);
+        layout.add_layout_type(match self {
+            Self::Chunked(_) => fb::LayoutVariant::NestedLayout,
+            Self::Struct(_) => fb::LayoutVariant::NestedLayout,
+            Self::Flat(_) => fb::LayoutVariant::FlatLayout,
+        });
+        layout.add_layout(layout_variant);
+        layout.finish()
+    }
+}
+
+#[derive(Debug, Copy, Clone)]
+pub struct FlatLayout {
+    range: ByteRange,
+}
+
+impl WriteFlatBuffer for FlatLayout {
+    type Target<'a> = fb::FlatLayout<'a>;
+
+    fn write_flatbuffer<'fb>(
+        &self,
+        fbb: &mut FlatBufferBuilder<'fb>,
+    ) -> WIPOffset<Self::Target<'fb>> {
+        fb::FlatLayout::create(
+            fbb,
+            &fb::FlatLayoutArgs {
+                begin: self.range.begin,
+                end: self.range.end,
+            },
+        )
+    }
+}
+
+impl FlatLayout {
+    pub fn new(begin: u64, end: u64) -> Self {
+        Self {
+            range: ByteRange { begin, end },
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ChunkedLayout {
+    children: Vec<Layout>,
+}
+
+impl WriteFlatBuffer for ChunkedLayout {
+    type Target<'a> = fb::NestedLayout<'a>;
+
+    fn write_flatbuffer<'fb>(
+        &self,
+        fbb: &mut FlatBufferBuilder<'fb>,
+    ) -> WIPOffset<Self::Target<'fb>> {
+        let child_offsets = self
+            .children
+            .iter()
+            .map(|c| c.write_flatbuffer(fbb))
+            .collect::<Vec<_>>();
+        let child_vector = fbb.create_vector(&child_offsets);
+        fb::NestedLayout::create(
+            fbb,
+            &fb::NestedLayoutArgs {
+                children: Some(child_vector),
+                // TODO(robert): Make this pluggable
+                encoding: 1u16,
+            },
+        )
+    }
+}
+
+impl ChunkedLayout {
+    pub fn new(child_ranges: Vec<Layout>) -> Self {
+        Self {
+            children: child_ranges,
+        }
+    }
+
+    pub fn metadata_range(&self) -> &Layout {
+        &self.children[self.children.len() - 1]
+    }
+}
+
+// TODO(robert): Should struct layout store a schema? How do you pick a child by name
+#[derive(Debug, Clone)]
+pub struct StructLayout {
+    children: Vec<Layout>,
+}
+
+impl WriteFlatBuffer for StructLayout {
+    type Target<'a> = fb::NestedLayout<'a>;
+
+    fn write_flatbuffer<'fb>(
+        &self,
+        fbb: &mut FlatBufferBuilder<'fb>,
+    ) -> WIPOffset<Self::Target<'fb>> {
+        let child_offsets = self
+            .children
+            .iter()
+            .map(|c| c.write_flatbuffer(fbb))
+            .collect::<Vec<_>>();
+        let child_vector = fbb.create_vector(&child_offsets);
+        fb::NestedLayout::create(
+            fbb,
+            &fb::NestedLayoutArgs {
+                children: Some(child_vector),
+                // TODO(robert): Make this pluggable
+                encoding: 2u16,
+            },
+        )
+    }
+}
+
+impl StructLayout {
+    pub fn new(child_ranges: Vec<Layout>) -> Self {
+        Self {
+            children: child_ranges,
+        }
+    }
+}
diff --git a/vortex-serde/src/file/mod.rs b/vortex-serde/src/file/mod.rs
index 1dd4d84c4c..0e1c8b701a 100644
--- a/vortex-serde/src/file/mod.rs
+++ b/vortex-serde/src/file/mod.rs
@@ -2,3 +2,4 @@ mod column_metadata;
 mod file_metadata;
 mod file_reader;
 mod file_writer;
+mod layouts;
diff --git a/vortex-serde/src/lib.rs b/vortex-serde/src/lib.rs
index 3af3cee528..4cfee840f0 100644
--- a/vortex-serde/src/lib.rs
+++ b/vortex-serde/src/lib.rs
@@ -13,16 +13,25 @@ pub mod writer;
 pub const ALIGNMENT: usize = 64;

 pub mod flatbuffers {
-    pub use generated::vortex::*;
+    pub use generated_footer::vortex::*;
+    pub use generated_message::vortex::*;

     #[allow(unused_imports)]
     #[allow(dead_code)]
     #[allow(non_camel_case_types)]
     #[allow(clippy::all)]
-    mod generated {
+    mod generated_message {
         include!(concat!(env!("OUT_DIR"), "/flatbuffers/message.rs"));
     }

+    #[allow(unused_imports)]
+    #[allow(dead_code)]
+    #[allow(non_camel_case_types)]
+    #[allow(clippy::all)]
+    mod generated_footer {
+        include!(concat!(env!("OUT_DIR"), "/flatbuffers/footer.rs"));
+    }
+
     mod deps {
         pub mod array {
             pub use vortex::flatbuffers as array;

From b47f95c3395a60114d6bbfb5e47ad26966ecc34b Mon Sep 17 00:00:00 2001
From: Adam Gutglick
Date: Tue, 23 Jul 2024 15:56:53 +0100
Subject: [PATCH 03/47] bug fix and minimal reader

---
 vortex-serde/src/file/file_metadata.rs |  10 --
 vortex-serde/src/file/file_reader.rs   | 132 ++++++++++++++++++++++---
 vortex-serde/src/file/file_writer.rs   |   3 +-
 vortex-serde/src/file/footer.rs        |  19 ++++
 vortex-serde/src/file/layouts.rs       |  45 +++++++++
 vortex-serde/src/file/mod.rs           |   2 +-
 6 files changed, 185 insertions(+), 26 deletions(-)
 delete mode 100644 vortex-serde/src/file/file_metadata.rs
 create mode 100644 vortex-serde/src/file/footer.rs

diff --git a/vortex-serde/src/file/file_metadata.rs b/vortex-serde/src/file/file_metadata.rs
deleted file mode 100644
index 199ac4fdff..0000000000
--- a/vortex-serde/src/file/file_metadata.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-use crate::file::column_metadata::ColumnMetadata;
-
-#[allow(dead_code)]
-#[derive(Debug)]
-pub struct FileMetadata {
-    column_metas: Vec<ColumnMetadata>,
-    row_boundaries: Vec<u64>,
-}
-
-impl FileMetadata {}
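The next hunk replaces the old `FileReader::new`/`read_metadata` surface with a small builder plus a footer-oriented reader. A hypothetical usage sketch (`store_reader` stands in for any `VortexReadAt` implementation and `known_len` for an already-known file size):

    // Hypothetical usage of the builder introduced below. Supplying the length
    // up front lets the reader skip a separate size lookup.
    let mut reader = FileReaderBuilder::new(store_reader)
        .with_length(known_len)
        .build();
    let footer = reader.read_footer().await?;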
diff --git a/vortex-serde/src/file/file_reader.rs b/vortex-serde/src/file/file_reader.rs
index 934848cbe2..9a19ad5ed5 100644
--- a/vortex-serde/src/file/file_reader.rs
+++ b/vortex-serde/src/file/file_reader.rs
@@ -1,7 +1,13 @@
 use bytes::BytesMut;
+use flatbuffers::root;
+use futures::Stream;
+use vortex::Array;
 use vortex_error::VortexResult;

-use crate::file::file_metadata::FileMetadata;
+use super::layouts::Layout;
+use crate::file::file_writer::MAGIC_BYTES;
+use crate::file::footer::Footer;
+use crate::flatbuffers::footer as fb;
 use crate::io::VortexReadAt;

 pub struct FileReader<R> {
     read: R,
     len: Option<u64>,
 }

-impl<R: VortexReadAt> FileReader<R> {
-    const FOOTER_READ_SIZE: usize = 8 * 1024 * 1024;
+pub struct FileReaderBuilder<R> {
+    inner: R,
+    len: Option<u64>,
+}

-    pub fn new(read: R) -> Self {
-        Self { read, len: None }
+impl<R: VortexReadAt> FileReaderBuilder<R> {
+    pub fn new(reader: R) -> Self {
+        Self {
+            inner: reader,
+            len: None,
+        }
     }

     pub fn with_length(mut self, len: u64) -> Self {
@@ -21,14 +33,48 @@ impl FileReader {
         self
     }

-    pub async fn read_metadata(&mut self, _columns: &[&str]) -> VortexResult<FileMetadata> {
-        let mut _buf = BytesMut::with_capacity(Self::FOOTER_READ_SIZE);
-        unsafe { _buf.set_len(Self::FOOTER_READ_SIZE) }
-        let read_offset = self.len().await - Self::FOOTER_READ_SIZE as u64;
-        _buf = self.read.read_at_into(read_offset, _buf).await?;
+    pub fn build(self) -> FileReader<R> {
+        FileReader {
+            read: self.inner,
+            len: self.len,
+        }
+    }
+}

-        // Ok(FileMetadata {})
-        todo!()
+impl<R: VortexReadAt> FileReader<R> {
+    const FOOTER_READ_SIZE: usize = 8 * 1024 * 1024;
+
+    pub async fn read_footer(&mut self) -> VortexResult