diff --git a/.github/workflows/bench-pr.yml b/.github/workflows/bench-pr.yml index d52ccfffe8..01736364f1 100644 --- a/.github/workflows/bench-pr.yml +++ b/.github/workflows/bench-pr.yml @@ -22,6 +22,8 @@ jobs: labels: benchmark - uses: actions/checkout@v4 + with: + submodules: recursive - uses: ./.github/actions/setup-zig - uses: ./.github/actions/setup-rust diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 5180858bef..f9068a1247 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -16,6 +16,8 @@ jobs: if: ${{ github.event_name == 'workflow_dispatch' || (contains(github.event.head_commit.message, '[benchmark]') && github.ref_name == 'develop') }} steps: - uses: actions/checkout@v4 + with: + submodules: recursive - uses: ./.github/actions/setup-zig - uses: ./.github/actions/setup-rust diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9c7dcad81e..9967c8911d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,6 +17,8 @@ jobs: runs-on: ubuntu-latest-medium steps: - uses: actions/checkout@v4 + with: + submodules: recursive - uses: ./.github/actions/setup-zig - uses: ./.github/actions/setup-rust - uses: ./.github/actions/setup-python diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..bfbcfba415 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "deps/fastlanez"] + path = deps/fastlanez + url = https://github.com/fulcrum-so/fastlanez.git diff --git a/Cargo.lock b/Cargo.lock index 49fafdbf0a..13a0456f84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "arrayref" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" + [[package]] name = "arrow" version = "50.0.0" @@ -819,6 +825,18 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95765f67b4b18863968b4a1bd5bb576f732b29a4a28c7cd84c09fa3e2875f33c" +[[package]] +name = "fastlanez-sys" +version = "0.1.0" +dependencies = [ + "arrayref", + "bindgen", + "paste", + "seq-macro", + "uninit", + "walkdir", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -2038,6 +2056,7 @@ dependencies = [ "vortex", "vortex-alp", "vortex-dict", + "vortex-fastlanes", "vortex-ffor", "vortex-ree", "vortex-roaring", @@ -2743,6 +2762,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" +[[package]] +name = "uninit" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "359fdaaabedff944f100847f2e0ea88918d8012fe64baf5b54c191ad010168c9" + [[package]] name = "url" version = "2.5.0" @@ -2826,6 +2851,7 @@ dependencies = [ "vortex", "vortex-alp", "vortex-dict", + "vortex-fastlanes", "vortex-ffor", "vortex-ree", "vortex-roaring", @@ -2845,6 +2871,18 @@ dependencies = [ "vortex", ] +[[package]] +name = "vortex-fastlanes" +version = "0.1.0" +dependencies = [ + "arrayref", + "fastlanez-sys", + "itertools 0.12.1", + "linkme", + "log", + "vortex", +] + [[package]] name = "vortex-ffor" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 62b7edd0bd..0013fbe9e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,11 +3,13 @@ members = [ "bench-vortex", "codecz", "codecz-sys", + "fastlanez-sys", "pyvortex", "vortex", "vortex-alloc", "vortex-alp", "vortex-dict", + "vortex-fastlanes", "vortex-ffor", "vortex-ree", "vortex-roaring", diff --git a/bench-vortex/.gitignore b/bench-vortex/.gitignore new file mode 100644 index 0000000000..6320cd248d --- /dev/null +++ b/bench-vortex/.gitignore @@ -0,0 +1 @@ +data \ No newline at end of file diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index 70939d3d62..12755e461f 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -11,6 +11,7 @@ arrow-array = "50.0.0" vortex = { path = "../vortex" } vortex-alp = { path = "../vortex-alp" } vortex-dict = { path = "../vortex-dict" } +vortex-fastlanes = { path = "../vortex-fastlanes" } vortex-ffor = { path = "../vortex-ffor" } vortex-ree = { path = "../vortex-ree" } vortex-roaring = { path = "../vortex-roaring" } diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs index 29acd61b33..cebc61ce0a 100644 --- a/bench-vortex/benches/compress_benchmark.rs +++ b/bench-vortex/benches/compress_benchmark.rs @@ -29,11 +29,10 @@ use vortex::array::{Array, ArrayRef}; use vortex::compress::CompressCtx; use vortex::dtype::DType; use vortex::error::{VortexError, VortexResult}; - use vortex_bench::enumerate_arrays; fn download_taxi_data() -> &'static Path { - let download_path = Path::new("../../pyspiral/bench/.data/https-d37ci6vzurychx-cloudfront-net-trip-data-yellow-tripdata-2023-11.parquet"); + let download_path = Path::new("data/yellow-tripdata-2023-11.parquet"); if download_path.exists() { return download_path; } diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 6f04584280..91bd55eaef 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -17,7 +17,7 @@ use itertools::Itertools; use vortex::array::Encoding; use vortex_alp::ALPEncoding; use vortex_dict::DictEncoding; -use vortex_ffor::FFoREncoding; +use vortex_fastlanes::{BitPackedEncoding, FoREncoding}; use vortex_ree::REEEncoding; use vortex_roaring::{RoaringBoolEncoding, RoaringIntEncoding}; use vortex_zigzag::ZigZagEncoding; @@ -26,7 +26,9 @@ pub fn enumerate_arrays() { let encodings: Vec<&dyn Encoding> = vec![ &ALPEncoding, &DictEncoding, - &FFoREncoding, + &BitPackedEncoding, + &FoREncoding, + //&FFoREncoding, &REEEncoding, &RoaringBoolEncoding, &RoaringIntEncoding, @@ -46,15 +48,16 @@ mod test { use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; - use crate::enumerate_arrays; use vortex::array::chunked::ChunkedArray; use vortex::array::{Array, ArrayRef}; use vortex::compress::CompressCtx; use vortex::dtype::DType; use vortex::error::{VortexError, VortexResult}; + use crate::enumerate_arrays; + pub fn download_taxi_data() -> &'static Path { - let download_path = Path::new("../../pyspiral/bench/.data/https-d37ci6vzurychx-cloudfront-net-trip-data-yellow-tripdata-2023-11.parquet"); + let download_path = Path::new("data/yellow-tripdata-2023-11.parquet"); if download_path.exists() { return download_path; } diff --git a/deps/fastlanez b/deps/fastlanez new file mode 160000 index 0000000000..d4ed218868 --- /dev/null +++ b/deps/fastlanez @@ -0,0 +1 @@ +Subproject commit d4ed218868fdd8cf5a50f3e13fcbee34bc3af4e4 diff --git a/fastlanez-sys/Cargo.toml b/fastlanez-sys/Cargo.toml new file mode 100644 index 0000000000..c86126aa42 --- /dev/null +++ b/fastlanez-sys/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "fastlanez-sys" +version = "0.1.0" +edition = "2021" +links = "fastlanez" + +[lints] +workspace = true + +[dependencies] +arrayref = "0.3.7" +paste = "1.0.14" +seq-macro = "0.3.5" +uninit = "0.6.2" + +[build-dependencies] +bindgen = "0.69.1" +walkdir = "2.4.0" \ No newline at end of file diff --git a/fastlanez-sys/build.rs b/fastlanez-sys/build.rs new file mode 100644 index 0000000000..0a6988dc18 --- /dev/null +++ b/fastlanez-sys/build.rs @@ -0,0 +1,121 @@ +use std::env; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; + +use walkdir::WalkDir; + +fn main() { + let buildrs_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap()) + .canonicalize() + .expect("Failed to canonicalize CARGO_MANIFEST_DIR"); + let root_dir = buildrs_dir + .join("../") + .canonicalize() + .expect("Failed to canonicalize root dir"); + let fastlanez_dir = root_dir.join("deps/fastlanez"); + + // Tell cargo to tell rustc to link codecz + println!( + "cargo:rustc-link-search={}", + fastlanez_dir.join("zig-out/lib").to_str().unwrap() + ); + println!("cargo:rustc-link-lib=fastlanez"); + + rerun_if_changed(&buildrs_dir.join("build.rs")); + WalkDir::new(fastlanez_dir.join("src")) + .into_iter() + .filter_map(|e| e.ok()) + .for_each(|e| rerun_if_changed(e.path())); + + let zig_opt = get_zig_opt(); + println!("cargo:info=invoking `zig build` with {}", zig_opt); + if !Command::new("zig") + .args(["build", "lib"]) + .arg(zig_opt) + .args(["--summary", "all"]) + .current_dir(fastlanez_dir.clone()) + .spawn() + .expect("Could not invoke `zig build`") + .wait() + .unwrap() + .success() + { + // Panic if the command was not successful. + panic!( + "failed to successfully invoke `zig build` in {}", + root_dir.to_str().unwrap() + ); + } + + let bindings = bindgen::Builder::default() + .header( + fastlanez_dir + .join("zig-out/include/fastlanez.h") + .to_str() + .unwrap(), + ) + .clang_args(&[ + get_zig_include().as_ref(), + "-DZIG_TARGET_MAX_INT_ALIGNMENT=16", + ]) + .parse_callbacks(Box::new(bindgen::CargoCallbacks::new())) + .allowlist_item("fl_.*") + .generate() + .expect("Unable to generate bindings"); + + // Write the bindings to the $OUT_DIR/bindings.rs file. + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); + bindings + .write_to_file(out_path.join("bindings.rs")) + .expect("Couldn't write bindings!"); +} + +fn rerun_if_changed(path: &Path) { + println!( + "cargo:rerun-if-changed={}", + path.canonicalize() + .unwrap_or_else(|_| panic!("failed to canonicalize {}", path.to_str().unwrap())) + .to_str() + .unwrap() + ); +} + +fn get_zig_opt() -> &'static str { + let profile_env = env::var("PROFILE").unwrap(); + let opt_level_zero = env::var("OPT_LEVEL").unwrap() == "0"; + + // based on https://doc.rust-lang.org/cargo/reference/environment-variables.html + // + // confusingly, the PROFILE env var will be either "debug" or "release" depending on whether the cargo profile + // derives from the "dev" or "release" profile, respectively. *facepalm* + // so `cargo build` and `cargo test` will be "debug"; `cargo build --release` and `cargo bench` will be "release" + // + // we also check whether debug_assertions are enabled (to pick a sane value for custom profiles) + if profile_env == "debug" || cfg!(debug_assertions) { + "-Doptimize=Debug" + } else if profile_env == "release" || !opt_level_zero { + "-Doptimize=ReleaseSmall" + } else { + // we're in a custom profile, the opt_level is 0, but debug assertions aren't enabled + // pretty weird case, let's default to debug + println!( + "cargo:warning=unrecognized cargo profile {}, defaulting to `zig build -Doptimize=Debug`", profile_env + ); + "-Doptimize=Debug" + } +} + +fn get_zig_include() -> String { + String::from_utf8( + Command::new("bash") + .arg("-c") + .arg("zig env | grep lib_dir | awk -F'\"' '{print \"-I\"$4}'") + .stdout(Stdio::piped()) + .output() + .expect("Failed to execute command") + .stdout, + ) + .expect("Failed to convert command output to string") + .trim_end() + .to_string() +} diff --git a/fastlanez-sys/src/lib.rs b/fastlanez-sys/src/lib.rs new file mode 100644 index 0000000000..001195c6e1 --- /dev/null +++ b/fastlanez-sys/src/lib.rs @@ -0,0 +1,115 @@ +/* + * (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#![allow(incomplete_features)] +#![feature(generic_const_exprs)] +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] + +use std::mem::{size_of, MaybeUninit}; + +use arrayref::array_mut_ref; +use seq_macro::seq; +use uninit::prelude::VecCapacity; + +include!(concat!(env!("OUT_DIR"), "/bindings.rs")); + +pub struct Pred; + +pub trait Satisfied {} + +impl Satisfied for Pred {} + +/// BitPack into a compile-time known bit-width. +pub trait BitPack +where + Self: Sized, + Pred<{ W > 0 }>: Satisfied, + Pred<{ W < 8 * size_of::() }>: Satisfied, +{ + fn bitpack<'a>( + input: &[Self; 1024], + output: &'a mut [MaybeUninit; 128 * W], + ) -> &'a [u8; 128 * W]; +} + +#[derive(Debug)] +pub struct UnsupportedBitWidth; + +/// Try to bitpack into a runtime-known bit width. +pub trait TryBitPack +where + Self: Sized, +{ + fn try_bitpack<'a>( + input: &[Self; 1024], + width: usize, + output: &'a mut [MaybeUninit], + ) -> Result<&'a [u8], UnsupportedBitWidth>; + + fn try_bitpack_into( + input: &[Self; 1024], + width: usize, + output: &mut Vec, + ) -> Result<(), UnsupportedBitWidth> { + Self::try_bitpack(input, width, output.reserve_uninit(width * 128))?; + unsafe { output.set_len(output.len() + (width * 128)) } + Ok(()) + } +} + +macro_rules! bitpack_impl { + ($T:ty, $W:literal) => { + paste::item! { + seq!(N in 1..$W { + impl BitPack for $T { + #[inline] + fn bitpack<'a>( + input: &[Self; 1024], + output: &'a mut [MaybeUninit; 128 * N], + ) -> &'a [u8; 128 * N] { + unsafe { + let output_array: &mut [u8; 128 * N] = std::mem::transmute(output); + []~N(input, output_array); + output_array + } + } + } + }); + } + + impl TryBitPack for $T { + fn try_bitpack<'a>( + input: &[Self; 1024], + width: usize, + output: &'a mut [MaybeUninit], + ) -> Result<&'a [u8], UnsupportedBitWidth> { + seq!(N in 1..$W { + match width { + #(N => Ok(BitPack::::bitpack(input, array_mut_ref![output, 0, N * 128]).as_slice()),)* + _ => Err(UnsupportedBitWidth), + } + }) + } + } + }; +} + +bitpack_impl!(u8, 8); +bitpack_impl!(u16, 16); +bitpack_impl!(u32, 32); +bitpack_impl!(u64, 64); diff --git a/pyproject.toml b/pyproject.toml index 2398bb5dae..3e7b88c721 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,8 +22,6 @@ dev-dependencies = [ "ruff>=0.1.11", "pip>=23.3.2", "maturin>=1.4.0", - "snakeviz>=2.2.0", - "py-spy>=0.3.14", "mkdocs>=1.5.3", "mkdocs-material>=9.5.11", "mkdocs-include-markdown-plugin>=6.0.4", diff --git a/pyvortex/Cargo.toml b/pyvortex/Cargo.toml index 3f648d72bc..594c646bf8 100644 --- a/pyvortex/Cargo.toml +++ b/pyvortex/Cargo.toml @@ -15,6 +15,7 @@ arrow = { version = "50.0.0", features = ["ffi"] } vortex = { path = "../vortex" } vortex-alp = { path = "../vortex-alp" } vortex-dict = { path = "../vortex-dict" } +vortex-fastlanes = { path = "../vortex-fastlanes" } vortex-ffor = { path = "../vortex-ffor" } vortex-ree = { path = "../vortex-ree" } vortex-roaring = { path = "../vortex-roaring" } diff --git a/pyvortex/src/array.rs b/pyvortex/src/array.rs index 61604548c9..bd35b64e36 100644 --- a/pyvortex/src/array.rs +++ b/pyvortex/src/array.rs @@ -28,6 +28,7 @@ use vortex::array::varbinview::VarBinViewArray; use vortex::array::{Array, ArrayKind, ArrayRef}; use vortex_alp::{ALPArray, ALP_ENCODING}; use vortex_dict::{DictArray, DICT_ENCODING}; +use vortex_fastlanes::{BitPackedArray, FoRArray, FL_BITPACKED_ENCODING, FL_FOR_ENCODING}; use vortex_ffor::{FFORArray, FFOR_ENCODING}; use vortex_ree::{REEArray, REE_ENCODING}; use vortex_roaring::{ @@ -78,6 +79,8 @@ pyarray!(VarBinArray, "VarBinArray"); pyarray!(VarBinViewArray, "VarBinViewArray"); pyarray!(ALPArray, "ALPArray"); +pyarray!(BitPackedArray, "BitPackedArray"); +pyarray!(FoRArray, "FoRArray"); pyarray!(DictArray, "DictArray"); pyarray!(FFORArray, "FFORArray"); pyarray!(REEArray, "REEArray"); @@ -137,6 +140,15 @@ impl PyArray { PyDictArray::wrap(py, inner.into_any().downcast::().unwrap())? .extract(py) } + FL_FOR_ENCODING => { + PyFoRArray::wrap(py, inner.into_any().downcast::().unwrap())? + .extract(py) + } + FL_BITPACKED_ENCODING => PyBitPackedArray::wrap( + py, + inner.into_any().downcast::().unwrap(), + )? + .extract(py), FFOR_ENCODING => { PyFFORArray::wrap(py, inner.into_any().downcast::().unwrap())? .extract(py) diff --git a/pyvortex/test/test_compress.py b/pyvortex/test/test_compress.py index 84ae84eddb..3e3db9b310 100644 --- a/pyvortex/test/test_compress.py +++ b/pyvortex/test/test_compress.py @@ -25,6 +25,12 @@ def test_primitive_compress(): assert arr_compressed.nbytes < a.nbytes +def test_for_compress(): + a = pa.array(np.arange(10_000) + 10_000_000) + arr_compressed = vortex.compress(vortex.encode(a)) + assert not isinstance(arr_compressed, vortex.PrimitiveArray) + + def test_bool_compress(): a = vortex.encode(pa.array([False] * 10_000 + [True] * 10_000)) arr_compressed = vortex.compress(a) diff --git a/requirements-dev.lock b/requirements-dev.lock index ccda876e09..43512784c8 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -5,7 +5,6 @@ # pre: false # features: [] # all-features: false -# with-sources: false -e file:pyvortex -e file:. @@ -28,7 +27,7 @@ mergedeep==1.3.4 mike==2.0.0 mkdocs==1.5.3 mkdocs-include-markdown-plugin==6.0.4 -mkdocs-material==9.5.11 +mkdocs-material==9.5.12 mkdocs-material-extensions==1.3.1 numpy==1.26.4 packaging==23.2 @@ -44,7 +43,7 @@ pymdown-extensions==10.7 pyparsing==3.1.1 pytest==7.4.0 pytest-benchmark==4.0.0 -python-dateutil==2.8.2 +python-dateutil==2.9.0 pyyaml==6.0.1 pyyaml-env-tag==0.1 regex==2023.12.25 diff --git a/requirements.lock b/requirements.lock index 8f59ff5e9c..c9283eb85a 100644 --- a/requirements.lock +++ b/requirements.lock @@ -5,7 +5,6 @@ # pre: false # features: [] # all-features: false -# with-sources: false -e file:pyvortex -e file:. diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 23591c9686..b2b30bc401 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,5 @@ [toolchain] -channel = "stable" +channel = "nightly" components = ["rust-src", "rustfmt", "clippy"] profile = "minimal" + diff --git a/vortex-fastlanes/Cargo.toml b/vortex-fastlanes/Cargo.toml new file mode 100644 index 0000000000..05bb5f9236 --- /dev/null +++ b/vortex-fastlanes/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "vortex-fastlanes" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[dependencies] +arrayref = "0.3.7" +vortex = { "path" = "../vortex" } +linkme = "0.3.22" +itertools = "0.12.1" +fastlanez-sys = { path = "../fastlanez-sys" } +log = "0.4.20" diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs new file mode 100644 index 0000000000..e55f029287 --- /dev/null +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -0,0 +1,253 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrayref::array_ref; +use log::debug; + +use fastlanez_sys::TryBitPack; +use vortex::array::downcast::DowncastArrayBuiltin; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::sparse::SparseArray; +use vortex::array::{Array, ArrayRef}; +use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; +use vortex::match_each_integer_ptype; +use vortex::ptype::{NativePType, PType}; +use vortex::scalar::ListScalarVec; +use vortex::stats::Stat; + +use crate::{BitPackedArray, BitPackedEncoding}; + +impl EncodingCompression for BitPackedEncoding { + fn compressor( + &self, + array: &dyn Array, + _config: &CompressConfig, + ) -> Option<&'static Compressor> { + // Only support primitive arrays + let Some(parray) = array.maybe_primitive() else { + debug!("Skipping BitPacking: not primitive"); + return None; + }; + + // Only supports ints + if !parray.ptype().is_int() { + debug!("Skipping BitPacking: not int"); + return None; + } + + // Check that the min > zero + if parray + .stats() + .get_or_compute_cast::(&Stat::Min) + .unwrap() + < 0 + { + debug!("Skipping BitPacking: min is zero"); + return None; + } + + let bit_width_freq = parray + .stats() + .get_or_compute_as::>(&Stat::BitWidthFreq) + .unwrap() + .0; + let bit_width = best_bit_width(parray.ptype(), &bit_width_freq); + + // Check that the bit width is less than the type's bit width + if bit_width == parray.ptype().bit_width() { + debug!("Skipping BitPacking: bit packing has no effect"); + return None; + } + + debug!("Compressing with BitPacking"); + Some(&(bitpacked_compressor as Compressor)) + } +} + +fn bitpacked_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { + let parray = array.as_primitive(); + let bit_width_freq = parray + .stats() + .get_or_compute_as::>(&Stat::BitWidthFreq) + .unwrap() + .0; + + let like_bp = like.map(|l| l.as_any().downcast_ref::().unwrap()); + + let bit_width = like_bp + .map(|bp| bp.bit_width()) + .unwrap_or_else(|| best_bit_width(parray.ptype(), &bit_width_freq)); + let num_exceptions = count_exceptions(bit_width, &bit_width_freq); + + // If we pack into zero bits, then we have an empty byte array. + let packed = if bit_width == 0 { + PrimitiveArray::from_vec(Vec::::new()).boxed() + } else { + bitpack(parray, bit_width) + }; + + let validity = parray + .validity() + .map(|v| ctx.compress(v.as_ref(), like_bp.and_then(|bp| bp.validity()))); + + let patches = if num_exceptions > 0 { + Some(ctx.compress( + bitpack_patches(parray, bit_width, num_exceptions).as_ref(), + like_bp.and_then(|bp| bp.patches()), + )) + } else { + None + }; + + return BitPackedArray::try_new( + packed, + validity, + patches, + bit_width, + parray.dtype().clone(), + parray.len(), + ) + .unwrap() + .boxed(); +} + +fn bitpack(parray: &PrimitiveArray, bit_width: usize) -> ArrayRef { + // We know the min is > 0, so it's safe to re-interpret signed integers as unsigned. + // TODO(ngates): we should implement this using a vortex cast to centralize this hack. + use PType::*; + let bytes = match parray.ptype() { + I8 | U8 => bitpack_primitive(parray.buffer().typed_data::(), bit_width), + I16 | U16 => bitpack_primitive(parray.buffer().typed_data::(), bit_width), + I32 | U32 => bitpack_primitive(parray.buffer().typed_data::(), bit_width), + I64 | U64 => bitpack_primitive(parray.buffer().typed_data::(), bit_width), + _ => panic!("Unsupported ptype {:?}", parray.ptype()), + }; + PrimitiveArray::from_vec(bytes).boxed() +} + +fn bitpack_primitive(array: &[T], bit_width: usize) -> Vec { + // How many fastlanes vectors we will process. + let num_chunks = (array.len() + 1023) / 1024; + + // Allocate a result byte array. + let mut output = Vec::with_capacity(num_chunks * bit_width * 128); + + // Loop over all but the last chunk. + (0..num_chunks - 1).for_each(|i| { + let start_elem = i * 1024; + let chunk: &[T; 1024] = array_ref![array, start_elem, 1024]; + TryBitPack::try_bitpack_into(chunk, bit_width, &mut output).unwrap(); + }); + + // Pad the last chunk with zeros to a full 1024 elements. + let last_chunk_size = array.len() % 1024; + let mut last_chunk: [T; 1024] = [T::default(); 1024]; + last_chunk[..last_chunk_size].copy_from_slice(&array[array.len() - last_chunk_size..]); + TryBitPack::try_bitpack_into(&last_chunk, bit_width, &mut output).unwrap(); + + output +} + +fn bitpack_patches( + parray: &PrimitiveArray, + bit_width: usize, + num_exceptions_hint: usize, +) -> ArrayRef { + match_each_integer_ptype!(parray.ptype(), |$T| { + let mut indices: Vec = Vec::with_capacity(num_exceptions_hint); + let mut values: Vec<$T> = Vec::with_capacity(num_exceptions_hint); + for (i, v) in parray.buffer().typed_data::<$T>().iter().enumerate() { + if (v.leading_zeros() as usize) < parray.ptype().bit_width() - bit_width { + indices.push(i as u64); + values.push(*v); + } + } + let len = indices.len(); + SparseArray::new( + PrimitiveArray::from_vec(indices).boxed(), + PrimitiveArray::from_vec(values).boxed(), + len, + ).boxed() + }) +} + +/// Assuming exceptions cost 1 value + 1 u32 index, figure out the best bit-width to use. +/// We could try to be clever, but we can never really predict how the exceptions will compress. +fn best_bit_width(ptype: &PType, bit_width_freq: &[usize]) -> usize { + let len: usize = bit_width_freq.iter().sum(); + let bytes_per_exception = ptype.byte_width() + 4; + + if bit_width_freq.len() > u8::MAX as usize { + panic!("Too many bit widths"); + } + + let mut num_packed = 0; + let mut best_cost = len * bytes_per_exception; + let mut best_width = 0; + for (bit_width, freq) in bit_width_freq.iter().enumerate() { + num_packed += *freq; + let packed_cost = ((bit_width * len) + 7) / 8; + let exceptions_cost = (len - num_packed) * bytes_per_exception; + let cost = exceptions_cost + packed_cost; + if cost < best_cost { + best_cost = cost; + best_width = bit_width; + } + } + + best_width +} + +fn count_exceptions(bit_width: usize, bit_width_freq: &[usize]) -> usize { + bit_width_freq[bit_width + 1..].iter().sum() +} + +#[cfg(test)] +mod test { + use std::collections::HashSet; + + use vortex::array::primitive::PrimitiveEncoding; + use vortex::array::Encoding; + + use super::*; + + #[test] + fn test_best_bit_width() { + // 10 1-bit values, 20 2-bit, etc. + let freq = vec![0, 10, 20, 15, 1, 0, 0, 0]; + // 3-bits => (46 * 3) + (8 * 1 * 5) => 178 bits => 23 bytes and zero exceptions + assert_eq!(best_bit_width(&PType::U8, &freq), 3); + } + + #[test] + fn test_compress() { + // FIXME(ngates): remove PrimitiveEncoding https://github.com/fulcrum-so/vortex/issues/35 + let cfg = CompressConfig::new( + HashSet::from([PrimitiveEncoding.id(), BitPackedEncoding.id()]), + HashSet::default(), + ); + let ctx = CompressCtx::new(&cfg); + + let compressed = ctx.compress( + &PrimitiveArray::from_vec(Vec::from_iter((0..10_000).map(|i| (i % 63) as u8))), + None, + ); + assert_eq!(compressed.encoding().id(), BitPackedEncoding.id()); + let bp = compressed + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(bp.bit_width(), 6); + } +} diff --git a/vortex-fastlanes/src/bitpacking/mod.rs b/vortex-fastlanes/src/bitpacking/mod.rs new file mode 100644 index 0000000000..f94dabc26c --- /dev/null +++ b/vortex-fastlanes/src/bitpacking/mod.rs @@ -0,0 +1,214 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use vortex::array::{ + check_validity_buffer, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef, +}; +use vortex::compress::EncodingCompression; +use vortex::dtype::DType; +use vortex::error::VortexResult; +use vortex::formatter::{ArrayDisplay, ArrayFormatter}; +use vortex::scalar::{NullableScalar, Scalar}; +use vortex::serde::{ArraySerde, EncodingSerde}; +use vortex::stats::{Stat, Stats, StatsCompute, StatsSet}; + +mod compress; +mod serde; + +#[derive(Debug, Clone)] +pub struct BitPackedArray { + encoded: ArrayRef, + validity: Option, + patches: Option, + len: usize, + bit_width: usize, + dtype: DType, + stats: Arc>, +} + +impl BitPackedArray { + pub fn try_new( + encoded: ArrayRef, + validity: Option, + patches: Option, + bit_width: usize, + dtype: DType, + len: usize, + ) -> VortexResult { + let validity = validity.filter(|v| !v.is_empty()); + check_validity_buffer(validity.as_ref())?; + + // TODO(ngates): check encoded has type u8 + + Ok(Self { + encoded, + validity, + patches, + bit_width, + len, + dtype, + stats: Arc::new(RwLock::new(StatsSet::new())), + }) + } + + #[inline] + pub fn encoded(&self) -> &dyn Array { + self.encoded.as_ref() + } + + #[inline] + pub fn bit_width(&self) -> usize { + self.bit_width + } + + #[inline] + pub fn validity(&self) -> Option<&dyn Array> { + self.validity.as_deref() + } + + #[inline] + pub fn patches(&self) -> Option<&dyn Array> { + self.patches.as_deref() + } + + pub fn is_valid(&self, index: usize) -> bool { + self.validity() + .map(|v| v.scalar_at(index).and_then(|v| v.try_into()).unwrap()) + .unwrap_or(true) + } +} + +impl Array for BitPackedArray { + #[inline] + fn as_any(&self) -> &dyn Any { + self + } + + #[inline] + fn boxed(self) -> ArrayRef { + Box::new(self) + } + + #[inline] + fn into_any(self: Box) -> Box { + self + } + + #[inline] + fn len(&self) -> usize { + self.len + } + + #[inline] + fn is_empty(&self) -> bool { + self.len == 0 + } + + #[inline] + fn dtype(&self) -> &DType { + &self.dtype + } + + #[inline] + fn stats(&self) -> Stats { + Stats::new(&self.stats, self) + } + + fn scalar_at(&self, index: usize) -> VortexResult> { + if !self.is_valid(index) { + return Ok(NullableScalar::none(self.dtype().clone()).boxed()); + } + + if let Some(patch) = self + .patches() + .and_then(|p| p.scalar_at(index).ok()) + .and_then(|p| p.into_nonnull()) + { + return Ok(patch); + } + + todo!("Decode single element from BitPacked array"); + } + + fn iter_arrow(&self) -> Box { + todo!() + } + + fn slice(&self, _start: usize, _stop: usize) -> VortexResult { + unimplemented!("BitPackedArray::slice") + } + + #[inline] + fn encoding(&self) -> EncodingRef { + &BitPackedEncoding + } + + #[inline] + fn nbytes(&self) -> usize { + self.encoded().nbytes() + + self.patches().map(|p| p.nbytes()).unwrap_or(0) + + self.validity().map(|v| v.nbytes()).unwrap_or(0) + } + + fn serde(&self) -> &dyn ArraySerde { + self + } +} + +impl<'arr> AsRef<(dyn Array + 'arr)> for BitPackedArray { + fn as_ref(&self) -> &(dyn Array + 'arr) { + self + } +} + +impl ArrayDisplay for BitPackedArray { + fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result { + f.writeln(format!("packed: u{}", self.bit_width()))?; + if let Some(p) = self.patches() { + f.writeln("patches:")?; + f.indent(|indent| indent.array(p.as_ref()))?; + } + f.array(self.encoded()) + } +} + +impl StatsCompute for BitPackedArray { + fn compute(&self, _stat: &Stat) -> StatsSet { + // TODO(ngates): implement based on the encoded array + StatsSet::from(HashMap::new()) + } +} + +#[derive(Debug)] +pub struct BitPackedEncoding; + +pub const FL_BITPACKED_ENCODING: EncodingId = EncodingId::new("fastlanes.bitpacked"); + +impl Encoding for BitPackedEncoding { + fn id(&self) -> &EncodingId { + &FL_BITPACKED_ENCODING + } + + fn compression(&self) -> Option<&dyn EncodingCompression> { + Some(self) + } + + fn serde(&self) -> Option<&dyn EncodingSerde> { + Some(self) + } +} diff --git a/vortex-fastlanes/src/bitpacking/serde.rs b/vortex-fastlanes/src/bitpacking/serde.rs new file mode 100644 index 0000000000..676d442878 --- /dev/null +++ b/vortex-fastlanes/src/bitpacking/serde.rs @@ -0,0 +1,47 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; + +use vortex::array::{Array, ArrayRef}; +use vortex::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; + +use crate::{BitPackedArray, BitPackedEncoding}; + +impl ArraySerde for BitPackedArray { + fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> { + ctx.write(self.encoded())?; + ctx.write_optional_array(self.validity())?; + ctx.write_optional_array(self.patches())?; + ctx.write_usize(self.bit_width())?; + ctx.dtype(self.dtype())?; + ctx.write_usize(self.len()) + } +} + +impl EncodingSerde for BitPackedEncoding { + fn read(&self, ctx: &mut ReadCtx) -> io::Result { + let encoded = ctx.read()?; + let validity = ctx.read_optional_array()?; + let patches = ctx.read_optional_array()?; + let bit_width = ctx.read_usize()?; + let dtype = ctx.dtype()?; + let len = ctx.read_usize()?; + Ok( + BitPackedArray::try_new(encoded, validity, patches, bit_width, dtype, len) + .unwrap() + .boxed(), + ) + } +} diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs new file mode 100644 index 0000000000..1f5bab357f --- /dev/null +++ b/vortex-fastlanes/src/for/compress.rs @@ -0,0 +1,117 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use log::debug; + +use vortex::array::downcast::DowncastArrayBuiltin; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::{Array, ArrayRef}; +use vortex::compress::{CompressConfig, CompressCtx, Compressor, EncodingCompression}; +use vortex::match_each_integer_ptype; +use vortex::stats::Stat; + +use crate::{FoRArray, FoREncoding}; + +impl EncodingCompression for FoREncoding { + fn compressor( + &self, + array: &dyn Array, + _config: &CompressConfig, + ) -> Option<&'static Compressor> { + // Only support primitive arrays + let Some(parray) = array.maybe_primitive() else { + debug!("Skipping FoR: not primitive"); + return None; + }; + + // Only supports integers + if !parray.ptype().is_int() { + debug!("Skipping FoR: not int"); + return None; + } + + // Nothing for us to do if the min is already zero. + if parray + .stats() + .get_or_compute_cast::(&Stat::Min) + .unwrap() + == 0 + { + debug!("Skipping BitPacking: min is zero"); + return None; + } + + debug!("Compressing with FoR"); + Some(&(for_compressor as Compressor)) + } +} + +fn for_compressor(array: &dyn Array, like: Option<&dyn Array>, ctx: CompressCtx) -> ArrayRef { + let parray = array.as_primitive(); + + let child = match_each_integer_ptype!(parray.ptype(), |$T| { + let min = parray.stats().get_or_compute_as::<$T>(&Stat::Min).unwrap(); + // TODO(ngates): check for overflow + let values = parray.buffer().typed_data::<$T>().iter().map(|v| v - min) + // TODO(ngates): cast to unsigned + // .map(|v| v as parray.ptype().to_unsigned()::T) + .collect_vec(); + PrimitiveArray::from_vec(values) + }); + + // TODO(ngates): remove FoR as a potential encoding from the ctx + let compressed_child = ctx.compress( + child.as_ref(), + like.map(|l| l.as_any().downcast_ref::().unwrap().child()), + ); + let reference = parray.stats().get(&Stat::Min).unwrap(); + FoRArray::try_new(compressed_child, reference) + .unwrap() + .boxed() +} + +#[cfg(test)] +mod test { + use std::collections::HashSet; + + use vortex::array::primitive::PrimitiveEncoding; + use vortex::array::Encoding; + + use crate::BitPackedEncoding; + + use super::*; + + #[test] + fn test_compress() { + let cfg = CompressConfig::new( + // We need some BitPacking else we will need choose FoR. + HashSet::from([ + PrimitiveEncoding.id(), + FoREncoding.id(), + BitPackedEncoding.id(), + ]), + HashSet::default(), + ); + let ctx = CompressCtx::new(&cfg); + + // Create a range offset by a million + let array = PrimitiveArray::from_vec((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); + + let compressed = ctx.compress(&array, None); + assert_eq!(compressed.encoding().id(), FoREncoding.id()); + let fa = compressed.as_any().downcast_ref::().unwrap(); + assert_eq!(fa.reference().try_into(), Ok(1_000_000u32)); + } +} diff --git a/vortex-fastlanes/src/for/mod.rs b/vortex-fastlanes/src/for/mod.rs new file mode 100644 index 0000000000..05aa2dee80 --- /dev/null +++ b/vortex-fastlanes/src/for/mod.rs @@ -0,0 +1,164 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use vortex::array::{Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef}; +use vortex::compress::EncodingCompression; +use vortex::dtype::DType; +use vortex::error::VortexResult; +use vortex::formatter::{ArrayDisplay, ArrayFormatter}; +use vortex::scalar::Scalar; +use vortex::serde::{ArraySerde, EncodingSerde}; +use vortex::stats::{Stat, Stats, StatsCompute, StatsSet}; + +mod compress; +mod serde; + +#[derive(Debug, Clone)] +pub struct FoRArray { + child: ArrayRef, + reference: Box, + stats: Arc>, +} + +impl FoRArray { + pub fn try_new(child: ArrayRef, reference: Box) -> VortexResult { + // TODO(ngates): check the dtype of reference == child.dtype() + Ok(Self { + child, + reference, + stats: Arc::new(RwLock::new(StatsSet::new())), + }) + } + + #[inline] + pub fn child(&self) -> &dyn Array { + self.child.as_ref() + } + + #[inline] + pub fn reference(&self) -> &dyn Scalar { + self.reference.as_ref() + } +} + +impl Array for FoRArray { + #[inline] + fn as_any(&self) -> &dyn Any { + self + } + + #[inline] + fn boxed(self) -> ArrayRef { + Box::new(self) + } + + #[inline] + fn into_any(self: Box) -> Box { + self + } + + #[inline] + fn len(&self) -> usize { + self.child.len() + } + + #[inline] + fn is_empty(&self) -> bool { + self.child.is_empty() + } + + #[inline] + fn dtype(&self) -> &DType { + self.child.dtype() + } + + #[inline] + fn stats(&self) -> Stats { + Stats::new(&self.stats, self) + } + + fn scalar_at(&self, _index: usize) -> VortexResult> { + todo!() + } + + fn iter_arrow(&self) -> Box { + todo!() + } + + fn slice(&self, start: usize, stop: usize) -> VortexResult { + Ok(Self { + child: self.child.slice(start, stop)?, + reference: self.reference.clone(), + stats: Arc::new(RwLock::new(StatsSet::new())), + } + .boxed()) + } + + #[inline] + fn encoding(&self) -> EncodingRef { + &FoREncoding + } + + #[inline] + fn nbytes(&self) -> usize { + self.child.nbytes() + self.reference.nbytes() + } + + fn serde(&self) -> &dyn ArraySerde { + self + } +} + +impl<'arr> AsRef<(dyn Array + 'arr)> for FoRArray { + fn as_ref(&self) -> &(dyn Array + 'arr) { + self + } +} + +impl ArrayDisplay for FoRArray { + fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result { + f.writeln(format!("reference: {}", self.reference))?; + f.indent(|indent| indent.array(self.child())) + } +} + +impl StatsCompute for FoRArray { + fn compute(&self, _stat: &Stat) -> StatsSet { + // TODO(ngates): implement based on the encoded array + StatsSet::from(HashMap::new()) + } +} + +#[derive(Debug)] +pub struct FoREncoding; + +pub const FL_FOR_ENCODING: EncodingId = EncodingId::new("fastlanes.for"); + +impl Encoding for FoREncoding { + fn id(&self) -> &EncodingId { + &FL_FOR_ENCODING + } + + fn compression(&self) -> Option<&dyn EncodingCompression> { + Some(self) + } + + fn serde(&self) -> Option<&dyn EncodingSerde> { + Some(self) + } +} diff --git a/vortex-fastlanes/src/for/serde.rs b/vortex-fastlanes/src/for/serde.rs new file mode 100644 index 0000000000..d8c237b2b6 --- /dev/null +++ b/vortex-fastlanes/src/for/serde.rs @@ -0,0 +1,66 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; + +use vortex::array::{Array, ArrayRef}; +use vortex::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; + +use crate::{FoRArray, FoREncoding}; + +impl ArraySerde for FoRArray { + fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> { + ctx.scalar(self.reference())?; + ctx.write(self.child()) + } +} + +impl EncodingSerde for FoREncoding { + fn read(&self, ctx: &mut ReadCtx) -> io::Result { + let reference = ctx.scalar()?; + let child = ctx.read()?; + Ok(FoRArray::try_new(child, reference).unwrap().boxed()) + } +} + +#[cfg(test)] +mod test { + use std::io; + + use vortex::array::primitive::PrimitiveArray; + use vortex::array::{Array, ArrayRef}; + use vortex::scalar::Scalar; + use vortex::serde::{ReadCtx, WriteCtx}; + + use super::*; + + fn roundtrip_array(array: &dyn Array) -> io::Result { + let mut buf = Vec::::new(); + let mut write_ctx = WriteCtx::new(&mut buf); + write_ctx.write(array)?; + let mut read = buf.as_slice(); + let mut read_ctx = ReadCtx::new(array.dtype(), &mut read); + read_ctx.read() + } + + #[test] + fn roundtrip() { + let arr = FoRArray::try_new( + PrimitiveArray::from_vec(vec![-7i64, -13, 17, 23]).boxed(), + >>::into(-7i64), + ) + .unwrap(); + roundtrip_array(arr.as_ref()).unwrap(); + } +} diff --git a/vortex-fastlanes/src/lib.rs b/vortex-fastlanes/src/lib.rs new file mode 100644 index 0000000000..9b5ddce8ee --- /dev/null +++ b/vortex-fastlanes/src/lib.rs @@ -0,0 +1,28 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use linkme::distributed_slice; + +pub use bitpacking::*; +pub use r#for::*; +use vortex::array::{EncodingRef, ENCODINGS}; + +mod bitpacking; +mod r#for; + +#[distributed_slice(ENCODINGS)] +static ENCODINGS_FL_BITPACKING: EncodingRef = &BitPackedEncoding; + +#[distributed_slice(ENCODINGS)] +static ENCODINGS_FL_FOR: EncodingRef = &FoREncoding; diff --git a/vortex-ffor/src/lib.rs b/vortex-ffor/src/lib.rs index fdfe4dc597..2adcfcbaef 100644 --- a/vortex-ffor/src/lib.rs +++ b/vortex-ffor/src/lib.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use ffor::*; use linkme::distributed_slice; + +pub use ffor::*; use vortex::array::{EncodingRef, ENCODINGS}; mod compress; diff --git a/vortex/src/array/mod.rs b/vortex/src/array/mod.rs index f22e09e039..c9386f10fb 100644 --- a/vortex/src/array/mod.rs +++ b/vortex/src/array/mod.rs @@ -112,6 +112,7 @@ pub fn check_index_bounds(array: &dyn Array, index: usize) -> VortexResult<()> { } pub fn check_validity_buffer(validity: Option<&ArrayRef>) -> VortexResult<()> { + // TODO(ngates): take a length parameter and check that the length of the validity buffer matches if validity .map(|v| !matches!(v.dtype(), DType::Bool(Nullability::NonNullable))) .unwrap_or(false) diff --git a/vortex/src/compress.rs b/vortex/src/compress.rs index e94a004a31..9210c5484f 100644 --- a/vortex/src/compress.rs +++ b/vortex/src/compress.rs @@ -148,7 +148,6 @@ pub fn sampled_compression(array: &dyn Array, ctx: CompressCtx) -> ArrayRef { let candidate_compressors: Vec<&Compressor> = ENCODINGS .iter() - .filter(|encoding| ctx.options.is_enabled(encoding.id())) // TODO(robert): Avoid own encoding to avoid infinite recursion .filter(|encoding| encoding.id().name() != array.encoding().id().name()) .filter(|encoding| ctx.options().is_enabled(encoding.id())) diff --git a/vortex/src/ptype.rs b/vortex/src/ptype.rs index f1c153fc04..4564842478 100644 --- a/vortex/src/ptype.rs +++ b/vortex/src/ptype.rs @@ -115,6 +115,22 @@ macro_rules! match_each_integer_ptype { } pub use match_each_integer_ptype; +#[macro_export] +macro_rules! match_each_unsigned_integer_ptype { + ($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({ + macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )} + use $crate::ptype::PType; + match $self { + PType::U8 => __with__! { u8 }, + PType::U16 => __with__! { u16 }, + PType::U32 => __with__! { u32 }, + PType::U64 => __with__! { u64 }, + _ => panic!("Unsupported ptype {:?}", $self), + } + }) +} +pub use match_each_unsigned_integer_ptype; + impl PType { pub fn is_unsigned_int(self) -> bool { matches!(self, PType::U8 | PType::U16 | PType::U32 | PType::U64) @@ -135,6 +151,10 @@ impl PType { pub fn byte_width(&self) -> usize { match_each_native_ptype!(self, |$T| std::mem::size_of::<$T>()) } + + pub fn bit_width(&self) -> usize { + self.byte_width() * 8 + } } impl TryFrom<&DType> for PType { diff --git a/vortex/src/serde/mod.rs b/vortex/src/serde/mod.rs index 771ab3dd36..b2cec9fc08 100644 --- a/vortex/src/serde/mod.rs +++ b/vortex/src/serde/mod.rs @@ -119,6 +119,20 @@ impl<'a> ReadCtx<'a> { .map(|u| u as usize) } + pub fn read_option_tag(&mut self) -> io::Result { + let mut tag = [0; 1]; + self.r.read_exact(&mut tag)?; + Ok(tag[0] == 0x01) + } + + pub fn read_optional_array(&mut self) -> io::Result> { + if self.read_option_tag()? { + self.read().map(Some) + } else { + Ok(None) + } + } + pub fn read(&mut self) -> io::Result { let encoding_id = self.read_usize()?; if let Some(serde) = ENCODINGS @@ -178,6 +192,15 @@ impl<'a> WriteCtx<'a> { self.w.write_all(&[if present { 0x01 } else { 0x00 }]) } + pub fn write_optional_array(&mut self, array: Option<&dyn Array>) -> io::Result<()> { + self.write_option_tag(array.is_some())?; + if let Some(array) = array { + self.write(array) + } else { + Ok(()) + } + } + pub fn write(&mut self, array: &dyn Array) -> io::Result<()> { let encoding_id = self .available_encodings