Skip to content

Commit

Permalink
File-based table provider for Datafusion (#546)
Browse files Browse the repository at this point in the history
A very minimal datafusion `TableProvider` to read vortex-encoded files.
  • Loading branch information
AdamGS authored Aug 6, 2024
1 parent 31198a0 commit 5a8c2d8
Show file tree
Hide file tree
Showing 21 changed files with 944 additions and 431 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ members = [
"encodings/*",
"pyvortex",
"vortex-array",
"vortex-build",
"vortex-buffer",
"vortex-build",
"vortex-datafusion",
"vortex-dtype",
"vortex-error",
Expand Down Expand Up @@ -87,7 +87,7 @@ mimalloc = "0.1.42"
monoio = "0.2.3"
num-traits = "0.2.18"
num_enum = "0.7.2"
object_store = "0.10.1"
object_store = "0.10.2"
parquet = "52.0.0"
paste = "1.0.14"
pin-project = "1.1.5"
Expand Down Expand Up @@ -139,6 +139,8 @@ walkdir = "2.5.0"
worker = "0.3.0"
xshell = "0.2.6"
zigzag = "0.1.0"
url = "2"
tempfile = "3"

[workspace.lints.rust]
warnings = "deny"
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/benches/datafusion_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use lazy_static::lazy_static;
use vortex::compress::CompressionStrategy;
use vortex::encoding::EncodingRef;
use vortex::{Array, Context, IntoArray, ToArrayData};
use vortex_datafusion::{VortexMemTable, VortexMemTableOptions};
use vortex_datafusion::memory::{VortexMemTable, VortexMemTableOptions};
use vortex_dict::DictEncoding;
use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};
use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
Expand Down
5 changes: 3 additions & 2 deletions bench-vortex/benches/random_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use mimalloc::MiMalloc;
use object_store::aws::AmazonS3Builder;
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use tokio::runtime::Runtime;

#[global_allocator]
Expand All @@ -31,7 +32,7 @@ fn random_access_vortex(c: &mut Criterion) {
.iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) })
});

let local_fs = LocalFileSystem::new();
let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
group.bench_function("localfs", |b| {
b.to_async(Runtime::new().unwrap()).iter(|| async {
Expand All @@ -43,7 +44,7 @@ fn random_access_vortex(c: &mut Criterion) {
})
});

let r2_fs = AmazonS3Builder::from_env().build().unwrap();
let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
let r2_path =
object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap())
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ pub async fn read_vortex_footer_format<R: VortexReadAt>(
)
}

pub async fn take_vortex_object_store<O: ObjectStore>(
fs: &O,
pub async fn take_vortex_object_store(
fs: &Arc<dyn ObjectStore>,
path: &object_store::path::Path,
indices: &[u64],
) -> VortexResult<Array> {
Expand Down
3 changes: 2 additions & 1 deletion bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use vortex::array::chunked::ChunkedArray;
use vortex::arrow::FromArrowArray;
use vortex::{Array, ArrayDType, ArrayData, IntoArray};
use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};
use vortex_datafusion::memory::VortexMemTableOptions;
use vortex_datafusion::SessionContextExt;

use crate::idempotent_async;

Expand Down
14 changes: 8 additions & 6 deletions vortex-array/src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use vortex_dtype::{DType, FieldName, FieldNames, Nullability, StructDType};
use vortex_error::{vortex_bail, VortexResult};
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::stats::{ArrayStatisticsCompute, StatsSet};
use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
Expand Down Expand Up @@ -97,12 +97,12 @@ impl StructArray {
let mut children = Vec::with_capacity(projection.len());
let mut names = Vec::with_capacity(projection.len());

for column_idx in projection {
for &column_idx in projection {
children.push(
self.field(*column_idx)
.expect("column must not exceed bounds"),
self.field(column_idx)
.ok_or(vortex_err!(OutOfBounds: column_idx, 0, self.dtypes().len()))?,
);
names.push(self.names()[*column_idx].clone());
names.push(self.names()[column_idx].clone());
}

StructArray::try_new(
Expand All @@ -124,7 +124,9 @@ impl ArrayVariants for StructArray {

impl StructArrayTrait for StructArray {
fn field(&self, idx: usize) -> Option<Array> {
self.array().child(idx, &self.dtypes()[idx], self.len())
self.dtypes()
.get(idx)
.and_then(|dtype| self.array().child(idx, dtype, self.len()))
}
}

Expand Down
17 changes: 13 additions & 4 deletions vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ include = { workspace = true }
edition = { workspace = true }
rust-version = { workspace = true }

[dependencies]
[lib]
name = "vortex_datafusion"
path = "src/lib.rs"

[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }

chrono = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
Expand All @@ -26,15 +29,21 @@ datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
pin-project = { workspace = true }
vortex-array = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-error = { workspace = true, features = ["datafusion"] }
vortex-expr = { workspace = true }
vortex-scalar = { workspace = true, features = ["datafusion"] }
vortex-serde = { workspace = true, features = ["object_store"] }

[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
anyhow = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["test-util", "rt-multi-thread"] }
url = { workspace = true }

[lints]
workspace = true
98 changes: 98 additions & 0 deletions vortex-datafusion/examples/table_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::SessionContext;
use datafusion_execution::object_store::ObjectStoreUrl;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::ObjectStore;
use tempfile::tempdir;
use tokio::fs::OpenOptions;
use url::Url;
use vortex::array::chunked::ChunkedArray;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::struct_::StructArray;
use vortex::array::varbin::VarBinArray;
use vortex::validity::Validity;
use vortex::IntoArray;
use vortex_datafusion::persistent::config::{VortexFile, VortexTableConfig};
use vortex_datafusion::persistent::provider::VortexFileTableProvider;
use vortex_serde::file::file_writer::FileWriter;

/// End-to-end example: write a two-column Vortex file to a temp directory,
/// expose it through `VortexFileTableProvider`, and query it via DataFusion.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let tmp = tempdir()?;

    // Two chunks of UTF-8 strings (8 rows total).
    let string_col = ChunkedArray::from_iter([
        VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
        VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
    ])
    .into_array();

    // Two chunks of u32 values (8 rows total).
    let number_col = ChunkedArray::from_iter([
        PrimitiveArray::from(vec![1u32, 2, 3, 4]).into_array(),
        PrimitiveArray::from(vec![5u32, 6, 7, 8]).into_array(),
    ])
    .into_array();

    // Assemble both columns into a single non-nullable struct array of length 8.
    let struct_arr = StructArray::try_new(
        ["strings".into(), "numbers".into()].into(),
        vec![string_col, number_col],
        8,
        Validity::NonNullable,
    )
    .unwrap();

    let file_path = tmp.path().join("a.vtx");

    // Serialize the struct array to disk with the Vortex file writer.
    let out = OpenOptions::new()
        .write(true)
        .truncate(true)
        .create(true)
        .open(&file_path)
        .await?;
    let writer = FileWriter::new(out).write_array_columns(struct_arr.into_array()).await?;
    writer.finalize().await?;

    // The table config needs the on-disk size of the file we just wrote.
    let file_size = tokio::fs::File::open(&file_path).await?.metadata().await?.len();

    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let store_url = ObjectStoreUrl::local_filesystem();
    let object_path = Path::from_filesystem_path(file_path)?;

    // Declare the table's Arrow schema and the single file backing it.
    let table_config = VortexTableConfig::new(
        Arc::new(Schema::new(vec![
            Field::new("strings", DataType::Utf8, false),
            Field::new("numbers", DataType::UInt32, false),
        ])),
        vec![VortexFile::new(object_path, file_size)],
    );

    let provider = Arc::new(VortexFileTableProvider::try_new(store_url, table_config)?);

    // Register the provider as a named table and wire up the local object store
    // so DataFusion can resolve `file://` paths at execution time.
    let ctx = SessionContext::new();
    ctx.register_table("vortex_tbl", Arc::clone(&provider) as _)?;
    let url = Url::try_from("file://").unwrap();
    ctx.register_object_store(&url, store);

    run_query(&ctx, "SELECT * FROM vortex_tbl").await?;

    Ok(())
}

/// Print the query plan for `query_string`, then execute it and print the results.
async fn run_query(ctx: &SessionContext, query_string: impl AsRef<str>) -> anyhow::Result<()> {
    let query_string = query_string.as_ref();

    // First show the plan so the reader can see how the Vortex provider
    // participates in DataFusion's planning.
    let explain = format!("EXPLAIN {query_string}");
    ctx.sql(&explain).await?.show().await?;

    // Then run the query itself and print the resulting batches.
    let results = ctx.sql(query_string).await?;
    results.show().await?;

    Ok(())
}
Loading

0 comments on commit 5a8c2d8

Please sign in to comment.