Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use vortex-buffer over bytes::Bytes #1713

Merged
merged 43 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
ff69266
Message cleanup
gatesn Dec 14, 2024
d1cd362
Move expr
gatesn Dec 14, 2024
54ddfd8
Clean up
gatesn Dec 14, 2024
b368ec0
Clean up
gatesn Dec 14, 2024
34edbf5
Clean up
gatesn Dec 14, 2024
4d02906
message reader
gatesn Dec 14, 2024
cc65637
Message codec
gatesn Dec 16, 2024
21ce1b9
Message codec
gatesn Dec 16, 2024
831d41e
Message codec
gatesn Dec 16, 2024
0010f00
Message codec
gatesn Dec 16, 2024
8c55100
Message codec
gatesn Dec 16, 2024
29a78a1
Message codec
gatesn Dec 16, 2024
bbda9e6
Message Codecs
gatesn Dec 16, 2024
5a7c8ab
Message Codecs
gatesn Dec 16, 2024
75fd1e0
Message Codecs
gatesn Dec 16, 2024
0c333b9
merge
gatesn Dec 16, 2024
92a7d6e
merge
gatesn Dec 16, 2024
b474b4b
Merge
gatesn Dec 16, 2024
7e1c80f
Merge
gatesn Dec 16, 2024
92062d8
Fix docs
gatesn Dec 16, 2024
87ba232
Assert alignment
gatesn Dec 16, 2024
f4d6fdf
Merge branch 'develop' into ngates/message-reader
gatesn Dec 17, 2024
1b285e6
Address comments
gatesn Dec 17, 2024
e532fa6
Address comments
gatesn Dec 17, 2024
ca4e632
Address comments
gatesn Dec 17, 2024
d7c38be
Address comments
gatesn Dec 17, 2024
fe46f61
Address comments
gatesn Dec 17, 2024
80c6fcc
Address comments
gatesn Dec 17, 2024
20f8b18
Address comments
gatesn Dec 17, 2024
78d9f0d
Fix decoder
gatesn Dec 18, 2024
94ae305
Fix decoder
gatesn Dec 18, 2024
5b21ae7
Remove unpin bounds
gatesn Dec 18, 2024
c5277d0
Remove unpin bounds
gatesn Dec 18, 2024
24648c9
Remove bytes
gatesn Dec 18, 2024
6313dca
Add PrimInt bound to VarBinOffset
gatesn Dec 18, 2024
2040b0d
Add PrimInt bound to VarBinOffset
gatesn Dec 18, 2024
641d1c7
Add PrimInt bound to VarBinOffset
gatesn Dec 18, 2024
5633dd3
Merge branch 'develop' into ngates/message-reader
gatesn Dec 18, 2024
e73ef79
Merge branch 'ngates/message-reader' into ngates/read-at-alignment
gatesn Dec 18, 2024
bc128b0
Some fixes
gatesn Dec 18, 2024
124441a
Some fixes
gatesn Dec 18, 2024
e77ff30
Buffer alignment
gatesn Dec 18, 2024
ddbd714
Buffer alignment
gatesn Dec 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 139 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ categories = ["database-implementations", "data-structures", "compression"]

[workspace.dependencies]
anyhow = "1.0"
aligned-buffer = "0.2.0"
arbitrary = "1.3.2"
arrayref = "0.3.7"
arrow = { version = "53.0.0" }
arrow-arith = "53.0.0"
arrow-array = "53.0.0"
arrow-buffer = "53.0.0"
arrow-cast = "53.0.0"
arrow-ipc = "53.0.0"
arrow-ord = "53.0.0"
arrow-schema = "53.0.0"
arrow-select = "53.0.0"
Expand Down Expand Up @@ -103,6 +103,7 @@ once_cell = "1.20.2"
parquet = "53.0.0"
paste = "1.0.14"
pin-project = "1.1.5"
pin-project-lite = "0.2.15"
prettytable-rs = "0.10.0"
tabled = { version = "0.17.0", default-features = false }
prost = "0.13.0"
Expand Down
35 changes: 10 additions & 25 deletions bench-vortex/benches/bytes_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use futures::StreamExt;
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::buffer::Buffer;
use vortex::dtype::{DType, Nullability};
use vortex::io::VortexBufReader;
use vortex::ipc::stream_reader::StreamArrayReader;
use vortex::ipc::stream_writer::StreamArrayWriter;
use vortex::ipc::iterator::{ArrayIteratorIPC, SyncIPCReader};
use vortex::iter::ArrayIteratorExt;
use vortex::validity::Validity;
use vortex::{Context, IntoArrayData, IntoCanonical};
use vortex::{Context, IntoArrayData, IntoArrayVariant};

fn array_data_fixture() -> VarBinArray {
VarBinArray::try_new(
Expand All @@ -27,27 +23,16 @@ fn array_data_fixture() -> VarBinArray {

fn array_view_fixture() -> VarBinViewArray {
let array_data = array_data_fixture();
let mut buffer = Vec::new();

let writer = StreamArrayWriter::new(&mut buffer);
block_on(writer.write_array(array_data.into_array())).unwrap();
let buffer = array_data
.into_array()
.into_array_iterator()
.write_ipc(vec![])
.unwrap();

let buffer = Buffer::from(buffer);

let ctx = Arc::new(Context::default());
let reader = block_on(StreamArrayReader::try_new(
VortexBufReader::new(buffer),
ctx.clone(),
))
.unwrap();
let reader = block_on(reader.load_dtype()).unwrap();

let mut stream = Box::pin(reader.into_array_stream());

block_on(stream.next())
.unwrap()
SyncIPCReader::try_new(buffer.as_slice(), Arc::new(Context::default()))
.unwrap()
.into_canonical()
.into_array_data()
.unwrap()
.into_varbinview()
.unwrap()
Expand Down
15 changes: 8 additions & 7 deletions bench-vortex/src/data_downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ use std::path::PathBuf;

use arrow_array::RecordBatchReader;
use bzip2::read::BzDecoder;
use futures::StreamExt;
use log::info;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::runtime::Runtime;
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::dtype::DType;
use vortex::error::{VortexError, VortexResult};
use vortex::io::TokioAdapter;
use vortex::ipc::stream_writer::StreamArrayWriter;
use vortex::io::{TokioAdapter, VortexWrite};
use vortex::ipc::stream::ArrayStreamIPC;
use vortex::{ArrayData, IntoArrayData};

use crate::idempotent;
Expand Down Expand Up @@ -56,11 +57,11 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa
Runtime::new()
.unwrap()
.block_on(async move {
let write = TokioAdapter(tokio::fs::File::create(path).await.unwrap());
StreamArrayWriter::new(write)
.write_array(array)
.await
.unwrap();
let mut write = TokioAdapter(tokio::fs::File::create(path).await.unwrap());
let mut bytes = array.into_array_stream().into_ipc();
while let Some(buffer) = bytes.next().await {
write.write_all(buffer.unwrap()).await.unwrap();
}
Ok::<(), VortexError>(())
})
.unwrap();
Expand Down
1 change: 0 additions & 1 deletion vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
backtrace = { workspace = true }
bytes = { workspace = true }
enum-iterator = { workspace = true }
flatbuffers = { workspace = true }
flexbuffers = { workspace = true }
Expand Down
Loading
Loading