IPC Prototype (#181)
Very early prototype: serde is not yet correctly implemented for arrays, but
the structure is almost there.
gatesn authored Apr 3, 2024
1 parent 4b59675 commit 538d089
Showing 110 changed files with 2,992 additions and 946 deletions.
447 changes: 327 additions & 120 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -11,6 +11,8 @@ members = [
"vortex-dict",
"vortex-error",
"vortex-fastlanes",
"vortex-flatbuffers",
"vortex-ipc",
"vortex-ree",
"vortex-roaring",
"vortex-schema",
@@ -47,6 +49,7 @@ criterion = { version = "0.5.1", features = ["html_reports"] }
croaring = "1.0.1"
divan = "0.1.14"
flatbuffers = "23.5.26"
flexbuffers = "2.0.0"
flatc = "0.2.2"
half = { version = "^2", features = ["std", "num-traits"] }
hashbrown = "0.14.3"
48 changes: 31 additions & 17 deletions README.md
@@ -159,8 +159,15 @@ without prior discussion infeasible. If you are interested in contributing, plea

This repo uses submodules for non-Rust dependencies (e.g., for the zig fastlanez repo). Before building make sure to run

* `git submodule update --init --recursive`
* `./zigup` (this will install the zig version required by fastlanez)
```bash
git submodule update --init --recursive

# Install the zig version required by fastlanez
./zigup

# Install Rye from https://rye-up.com, and set up the virtualenv
rye sync
```

## License

@@ -172,24 +179,31 @@ This project is inspired by and--in some cases--directly based upon the existing
and OSS developers.

In particular, the following academic papers greatly influenced the development:
* Maximilian Kuschewski, David Sauerwein, Adnan Alhomssi, and Viktor Leis. 2023. [BtrBlocks: Efficient Columnar Compression
for Data Lakes](https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf). Proc. ACM Manag. Data 1, 2,
Article 118 (June 2023), 14 pages. https://doi.org/10.1145/3589263
* Azim Afroozeh and Peter Boncz. [The FastLanes Compression Layout: Decoding >100 Billion Integers per Second with Scalar
Code](https://www.vldb.org/pvldb/vol16/p2132-afroozeh.pdf). PVLDB, 16(9): 2132 - 2144, 2023.
* Peter Boncz, Thomas Neumann, and Viktor Leis. [FSST: Fast Random Access String
Compression](https://www.vldb.org/pvldb/vol13/p2649-boncz.pdf).
PVLDB, 13(11): 2649-2661, 2020.
* Azim Afroozeh, Leonardo X. Kuffo, and Peter Boncz. 2023. [ALP: Adaptive Lossless floating-Point
Compression](https://ir.cwi.nl/pub/33334/33334.pdf). Proc. ACM
Manag. Data 1, 4 (SIGMOD), Article 230 (December 2023), 26 pages. https://doi.org/10.1145/3626717

* Maximilian Kuschewski, David Sauerwein, Adnan Alhomssi, and Viktor Leis.
2023. [BtrBlocks: Efficient Columnar Compression
for Data Lakes](https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf). Proc. ACM Manag. Data 1,
2,
Article 118 (June 2023), 14 pages. https://doi.org/10.1145/3589263
* Azim Afroozeh and Peter
Boncz. [The FastLanes Compression Layout: Decoding >100 Billion Integers per Second with Scalar
Code](https://www.vldb.org/pvldb/vol16/p2132-afroozeh.pdf). PVLDB, 16(9): 2132 - 2144, 2023.
* Peter Boncz, Thomas Neumann, and Viktor Leis. [FSST: Fast Random Access String
Compression](https://www.vldb.org/pvldb/vol13/p2649-boncz.pdf).
PVLDB, 13(11): 2649-2661, 2020.
* Azim Afroozeh, Leonardo X. Kuffo, and Peter Boncz. 2023. [ALP: Adaptive Lossless floating-Point
Compression](https://ir.cwi.nl/pub/33334/33334.pdf). Proc. ACM
Manag. Data 1, 4 (SIGMOD), Article 230 (December 2023), 26 pages. https://doi.org/10.1145/3626717

Additionally, we benefited greatly from:
* the collected OSS work of [Daniel Lemire](https://github.com/lemire), such as [FastPFor](https://github.com/lemire/FastPFor),
and [StreamVByte](https://github.com/lemire/streamvbyte).
* the [parquet2](https://github.com/jorgecarleitao/parquet2) project by [Jorge Leitao](https://github.com/jorgecarleitao).

* the collected OSS work of [Daniel Lemire](https://github.com/lemire), such
as [FastPFor](https://github.com/lemire/FastPFor),
and [StreamVByte](https://github.com/lemire/streamvbyte).
* the [parquet2](https://github.com/jorgecarleitao/parquet2) project
by [Jorge Leitao](https://github.com/jorgecarleitao).
* the public discussions around choices of compression codecs, as well as the C++ implementations thereof,
from [duckdb](https://github.com/duckdb/duckdb).
from [duckdb](https://github.com/duckdb/duckdb).
* the existence, ideas, & implementation of the [Apache Arrow](https://arrow.apache.org) project.
* the [Velox](https://github.com/facebookincubator/velox) project and discussions with its maintainers.

1 change: 1 addition & 0 deletions bench-vortex/Cargo.toml
@@ -23,6 +23,7 @@ vortex-datetime = { path = "../vortex-datetime" }
vortex-dict = { path = "../vortex-dict" }
vortex-error = { path = "../vortex-error", features = ["parquet"] }
vortex-fastlanes = { path = "../vortex-fastlanes" }
vortex-ipc = { path = "../vortex-ipc" }
vortex-ree = { path = "../vortex-ree" }
vortex-roaring = { path = "../vortex-roaring" }
vortex-schema = { path = "../vortex-schema" }
46 changes: 46 additions & 0 deletions bench-vortex/src/bin/ipc.rs
@@ -0,0 +1,46 @@
use log::LevelFilter;
use std::fs::File;

use bench_vortex::reader::open_vortex;
use bench_vortex::setup_logger;
use bench_vortex::taxi_data::taxi_data_vortex;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::Array;
use vortex::compute::take::take;
use vortex::serde::context::SerdeContext;
use vortex_error::VortexResult;
use vortex_ipc::iter::FallibleLendingIterator;
use vortex_ipc::reader::StreamReader;
use vortex_ipc::writer::StreamWriter;

pub fn main() -> VortexResult<()> {
    setup_logger(LevelFilter::Error);

    let array = open_vortex(&taxi_data_vortex())?;
    println!("Array {}", &array);

    //let ipc = idempotent("ipc.vortex", |path| {
    let ipc = "bench-vortex/data/ipc.vortex";
    let mut write = File::create("bench-vortex/data/ipc.vortex")?;
    let ctx = SerdeContext::default();
    let mut writer = StreamWriter::try_new(&mut write, ctx)?;
    writer.write(&array)?;
    //})?;

    // Now try to read from the IPC stream.
    let mut read = File::open(ipc)?;
    let mut ipc_reader = StreamReader::try_new(&mut read)?;

    // We know we only wrote a single array.
    // TODO(ngates): create an option to skip the multi-array reader?
    let mut array_reader = ipc_reader.next()?.unwrap();
    println!("DType: {:?}", array_reader.dtype());
    // Read some number of chunks from the stream.
    while let Some(chunk) = array_reader.next().unwrap() {
        println!("VIEW: {}", (&chunk as &dyn Array));
        let taken = take(&chunk, &PrimitiveArray::from(vec![0, 1, 0, 1])).unwrap();
        println!("Taken: {}", &taken);
    }

    Ok(())
}
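
The read loop above calls `next()` on a reader that returns `Result<Option<_>>`, which is the shape of a fallible lending iterator. The definition of `vortex_ipc::iter::FallibleLendingIterator` is not part of this excerpt, so the following is only a minimal sketch of the general pattern, assuming a generic associated type for the lent item. `ChunkReader` and `chunk_lengths` are hypothetical names invented for illustration.

```rust
/// Hypothetical stand-in for a fallible lending iterator; the real
/// `vortex_ipc` trait may be defined differently.
trait FallibleLendingIterator {
    type Error;
    type Item<'next>
    where
        Self: 'next;

    /// `Ok(None)` signals end of stream, `Err(_)` signals a read failure.
    fn next(&mut self) -> Result<Option<Self::Item<'_>>, Self::Error>;
}

/// Toy reader that lends fixed-size chunks borrowed from its own buffer.
struct ChunkReader {
    data: Vec<u8>,
    pos: usize,
    chunk_len: usize,
}

impl FallibleLendingIterator for ChunkReader {
    type Error = String;
    type Item<'next> = &'next [u8] where Self: 'next;

    fn next(&mut self) -> Result<Option<Self::Item<'_>>, Self::Error> {
        if self.chunk_len == 0 {
            return Err("chunk_len must be non-zero".to_string());
        }
        if self.pos >= self.data.len() {
            return Ok(None);
        }
        let start = self.pos;
        let end = (start + self.chunk_len).min(self.data.len());
        self.pos = end;
        // The returned slice borrows from `self`, so the caller must drop it
        // before calling `next` again.
        Ok(Some(&self.data[start..end]))
    }
}

/// Drain the reader with the same `while let Some(..) = next()?` loop shape.
fn chunk_lengths(data: Vec<u8>, chunk_len: usize) -> Result<Vec<usize>, String> {
    let mut reader = ChunkReader { data, pos: 0, chunk_len };
    let mut lengths = Vec::new();
    while let Some(chunk) = reader.next()? {
        lengths.push(chunk.len());
    }
    Ok(lengths)
}
```

Because each item borrows from the reader, this pattern cannot be expressed with `std::iter::Iterator`, which is one plausible reason the prototype uses its own trait.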
3 changes: 2 additions & 1 deletion bench-vortex/src/lib.rs
@@ -13,9 +13,10 @@ use crate::taxi_data::taxi_data_parquet;
use vortex::array::chunked::ChunkedArray;
use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::IntoArray;
use vortex::array::{Array, ArrayRef, EncodingRef, ENCODINGS};
use vortex::array::{Array, ArrayRef};
use vortex::arrow::FromArrowType;
use vortex::compress::{CompressConfig, CompressCtx};
use vortex::encoding::{EncodingRef, ENCODINGS};
use vortex::formatter::display_tree;
use vortex_alp::ALPEncoding;
use vortex_datetime::DateTimeEncoding;
55 changes: 55 additions & 0 deletions flatbuffers.build.rs
@@ -0,0 +1,55 @@
use flatc::flatc;
use std::env;
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;

use walkdir::WalkDir;

fn main() {
    let flatbuffers_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap())
        .canonicalize()
        .expect("Failed to canonicalize CARGO_MANIFEST_DIR")
        .join("flatbuffers");
    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap())
        .canonicalize()
        .expect("Failed to canonicalize OUT_DIR");

    let fbs_files = WalkDir::new(&flatbuffers_dir)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().extension() == Some(OsStr::new("fbs")))
        .map(|e| {
            rerun_if_changed(e.path());
            e.path().to_path_buf()
        })
        .collect::<Vec<_>>();

    if !Command::new(flatc())
        .arg("--rust")
        .arg("--filename-suffix")
        .arg("")
        .arg("-I")
        .arg(flatbuffers_dir.join("../../"))
        .arg("--include-prefix")
        .arg("flatbuffers::deps")
        .arg("-o")
        .arg(out_dir.join("flatbuffers"))
        .args(fbs_files)
        .status()
        .unwrap()
        .success()
    {
        panic!("Failed to run flatc");
    }
}

fn rerun_if_changed(path: &Path) {
    println!(
        "cargo:rerun-if-changed={}",
        path.canonicalize()
            .unwrap_or_else(|_| panic!("failed to canonicalize {}", path.to_str().unwrap()))
            .to_str()
            .unwrap()
    );
}
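
The build script above selects `.fbs` schemas by extension and emits a `cargo:rerun-if-changed` directive for each one. As a rough, std-only sketch of those two steps in isolation (no `walkdir` or `flatc`), the helpers below show the filtering and the directive formatting. `select_fbs_files` and `rerun_directive` are hypothetical names, and unlike the script this sketch does not canonicalize paths.

```rust
use std::ffi::OsStr;
use std::path::{Path, PathBuf};

/// Keep only the paths whose extension is exactly `fbs`.
fn select_fbs_files<I>(paths: I) -> Vec<PathBuf>
where
    I: IntoIterator<Item = PathBuf>,
{
    paths
        .into_iter()
        .filter(|p| p.extension() == Some(OsStr::new("fbs")))
        .collect()
}

/// Format the Cargo directive that requests a rebuild when `path` changes.
/// A build script would print this line to stdout.
fn rerun_directive(path: &Path) -> String {
    format!("cargo:rerun-if-changed={}", path.display())
}
```

A file literally named `fbs` has no extension, so it is filtered out along with non-schema files.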
3 changes: 2 additions & 1 deletion pyvortex/src/array.rs
@@ -10,7 +10,8 @@ use vortex::array::sparse::{SparseArray, SparseEncoding};
use vortex::array::struct_::{StructArray, StructEncoding};
use vortex::array::varbin::{VarBinArray, VarBinEncoding};
use vortex::array::varbinview::{VarBinViewArray, VarBinViewEncoding};
use vortex::array::{Array, ArrayKind, ArrayRef, EncodingRef};
use vortex::array::{Array, ArrayKind, ArrayRef};
use vortex::encoding::EncodingRef;
use vortex_alp::{ALPArray, ALPEncoding};
use vortex_dict::{DictArray, DictEncoding};
use vortex_fastlanes::{
2 changes: 1 addition & 1 deletion pyvortex/src/compress.rs
@@ -1,7 +1,7 @@
use pyo3::types::PyType;
use pyo3::{pyclass, pyfunction, pymethods, Py, PyResult, Python};
use std::sync::Arc;
use vortex::array::ENCODINGS;
use vortex::encoding::ENCODINGS;

use vortex::compress::{CompressConfig, CompressCtx};

2 changes: 1 addition & 1 deletion pyvortex/src/lib.rs
@@ -23,7 +23,7 @@ fn _lib(_py: Python, m: &PyModule) -> PyResult<()> {

debug!(
"Discovered encodings: {:?}",
vortex::array::ENCODINGS
vortex::encoding::ENCODINGS
.iter()
.map(|e| e.id().to_string())
.collect::<Vec<String>>()
52 changes: 51 additions & 1 deletion requirements-dev.lock
@@ -5,54 +5,104 @@
# pre: false
# features: []
# all-features: false
# with-sources: false

-e file:pyvortex
-e file:.
babel==2.14.0
# via mkdocs-material
bracex==2.4
# via wcmatch
certifi==2024.2.2
# via requests
charset-normalizer==3.3.2
# via requests
click==8.1.7
# via mkdocs
colorama==0.4.6
# via mkdocs-material
ghp-import==2.1.0
# via mkdocs
idna==3.6
# via requests
importlib-metadata==7.0.1
# via mike
importlib-resources==6.1.2
# via mike
iniconfig==2.0.0
# via pytest
jinja2==3.1.3
# via mike
# via mkdocs
# via mkdocs-material
markdown==3.5.2
# via mkdocs
# via mkdocs-material
# via pymdown-extensions
markupsafe==2.1.5
# via jinja2
# via mkdocs
maturin==1.4.0
mergedeep==1.3.4
# via mkdocs
mike==2.0.0
mkdocs==1.5.3
# via mike
# via mkdocs-include-markdown-plugin
# via mkdocs-material
mkdocs-include-markdown-plugin==6.0.4
mkdocs-material==9.5.12
mkdocs-material-extensions==1.3.1
# via mkdocs-material
numpy==1.26.4
# via pyarrow
packaging==23.2
# via mkdocs
# via pytest
paginate==0.5.6
# via mkdocs-material
pathspec==0.12.1
# via mkdocs
platformdirs==4.2.0
# via mkdocs
pluggy==1.4.0
# via pytest
py-cpuinfo==9.0.0
# via pytest-benchmark
pyarrow==15.0.0
pygments==2.17.2
# via mkdocs-material
pymdown-extensions==10.7
# via mkdocs-material
pyparsing==3.1.1
# via mike
pytest==7.4.0
# via pytest-benchmark
pytest-benchmark==4.0.0
python-dateutil==2.9.0
# via ghp-import
pyyaml==6.0.1
# via mike
# via mkdocs
# via pymdown-extensions
# via pyyaml-env-tag
pyyaml-env-tag==0.1
# via mkdocs
regex==2023.12.25
# via mkdocs-material
requests==2.31.0
# via mkdocs-material
ruff==0.2.2
six==1.16.0
# via python-dateutil
urllib3==2.2.1
# via requests
verspec==0.1.0
# via mike
watchdog==4.0.0
# via mkdocs
wcmatch==8.5.1
# via mkdocs-include-markdown-plugin
zipp==3.17.0
# The following packages are considered to be unsafe in a requirements file:
# via importlib-metadata
pip==24.0
1 change: 1 addition & 0 deletions requirements.lock
@@ -5,6 +5,7 @@
# pre: false
# features: []
# all-features: false
# with-sources: false

-e file:pyvortex
-e file:.
10 changes: 8 additions & 2 deletions vortex-alp/src/array.rs
@@ -1,12 +1,14 @@
use std::sync::{Arc, RwLock};

use vortex::array::{Array, ArrayKind, ArrayRef, Encoding, EncodingId, EncodingRef};
use vortex::array::{Array, ArrayKind, ArrayRef};
use vortex::compress::EncodingCompression;
use vortex::compute::ArrayCompute;
use vortex::encoding::{Encoding, EncodingId, EncodingRef};
use vortex::formatter::{ArrayDisplay, ArrayFormatter};
use vortex::impl_array;
use vortex::serde::{ArraySerde, EncodingSerde};
use vortex::stats::{Stats, StatsSet};
use vortex::validity::{ArrayValidity, Validity};
use vortex::{impl_array, ArrayWalker};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_schema::{DType, IntWidth, Signedness};

@@ -115,6 +117,10 @@ impl Array for ALPArray {
    fn serde(&self) -> Option<&dyn ArraySerde> {
        Some(self)
    }

    fn walk(&self, walker: &mut dyn ArrayWalker) -> VortexResult<()> {
        walker.visit_child(self.encoded())
    }
}

impl ArrayDisplay for ALPArray {
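
The new `walk` method hands the array's single encoded child to an `ArrayWalker`, which reads like a visitor over the array tree. The `ArrayWalker` trait itself is not shown in this excerpt, so the sketch below uses simplified, hypothetical stand-ins (`Node`, `Walker`, `Wrapper`, `Leaf`, `NameCollector`) to illustrate how such a walker might recurse through nested encodings.

```rust
/// Hypothetical stand-in for `ArrayWalker`: receives each child of a node.
trait Walker {
    fn visit_child(&mut self, child: &dyn Node) -> Result<(), String>;
}

/// Hypothetical stand-in for `Array`: reports its direct children to a walker.
trait Node {
    fn name(&self) -> &str;
    fn walk(&self, walker: &mut dyn Walker) -> Result<(), String>;
}

/// A node with no children, e.g. a primitive array.
struct Leaf;

impl Node for Leaf {
    fn name(&self) -> &str {
        "leaf"
    }

    fn walk(&self, _walker: &mut dyn Walker) -> Result<(), String> {
        Ok(())
    }
}

/// A node wrapping one encoded child, similar in shape to `ALPArray`.
struct Wrapper {
    encoded: Box<dyn Node>,
}

impl Node for Wrapper {
    fn name(&self) -> &str {
        "wrapper"
    }

    fn walk(&self, walker: &mut dyn Walker) -> Result<(), String> {
        walker.visit_child(self.encoded.as_ref())
    }
}

/// Walker that records node names depth-first.
struct NameCollector {
    names: Vec<String>,
}

impl Walker for NameCollector {
    fn visit_child(&mut self, child: &dyn Node) -> Result<(), String> {
        self.names.push(child.name().to_string());
        // Recurse so that grandchildren are visited as well.
        child.walk(self)
    }
}

/// Collect the names of `root` and all of its descendants.
fn collect_names(root: &dyn Node) -> Result<Vec<String>, String> {
    let mut collector = NameCollector {
        names: vec![root.name().to_string()],
    };
    root.walk(&mut collector)?;
    Ok(collector.names)
}
```

In this sketch the walker decides whether to recurse, which keeps each node's `walk` limited to listing its own children.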
2 changes: 1 addition & 1 deletion vortex-alp/src/lib.rs
@@ -2,7 +2,7 @@ pub use alp::*;
pub use array::*;

use linkme::distributed_slice;
use vortex::array::{EncodingRef, ENCODINGS};
use vortex::encoding::{EncodingRef, ENCODINGS};

mod alp;
mod array;
4 changes: 4 additions & 0 deletions vortex-alp/src/serde.rs
@@ -13,6 +13,10 @@ impl ArraySerde for ALPArray {
        ctx.write_fixed_slice([self.exponents().e, self.exponents().f])?;
        ctx.write(self.encoded())
    }

    fn metadata(&self) -> VortexResult<Option<Vec<u8>>> {
        Ok(Some(vec![self.exponents().e, self.exponents().f]))
    }
}

impl EncodingSerde for ALPEncoding {
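
The `metadata` method above packs the two ALP exponents into a two-byte vector. The matching read side is not visible in this excerpt, so the sketch below assumes the natural inverse: two `u8` fields written in the order `e`, `f` and read back in the same order. `Exponents` here is a local stand-in struct, and `to_metadata` and `from_metadata` are hypothetical helper names.

```rust
/// Local stand-in for the ALP exponents pair; both fields are single bytes,
/// matching the `vec![e, f]` metadata written in the diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Exponents {
    e: u8,
    f: u8,
}

/// Encode the exponents as metadata bytes, in the order `e`, then `f`.
fn to_metadata(exp: Exponents) -> Vec<u8> {
    vec![exp.e, exp.f]
}

/// Decode metadata bytes back into exponents (assumed inverse of the above),
/// rejecting any buffer that is not exactly two bytes long.
fn from_metadata(bytes: &[u8]) -> Result<Exponents, String> {
    match bytes {
        [e, f] => Ok(Exponents { e: *e, f: *f }),
        other => Err(format!("expected 2 metadata bytes, got {}", other.len())),
    }
}
```

Checking the length on decode keeps a truncated or oversized metadata buffer from being silently misread.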