diff --git a/Cargo.lock b/Cargo.lock
index 5c3db8676d..4350a047ca 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4165,9 +4165,11 @@ dependencies = [
 name = "vortex-datafusion"
 version = "0.2.0"
 dependencies = [
+ "anyhow",
  "arrow-array",
  "arrow-schema",
  "async-trait",
+ "chrono",
  "datafusion",
  "datafusion-common",
  "datafusion-execution",
@@ -4177,13 +4179,18 @@ dependencies = [
  "futures",
  "itertools 0.13.0",
  "lazy_static",
+ "log",
+ "object_store",
  "pin-project",
+ "tempfile",
  "tokio",
+ "url",
  "vortex-array",
  "vortex-dtype",
  "vortex-error",
  "vortex-expr",
  "vortex-scalar",
+ "vortex-serde",
 ]

 [[package]]
@@ -4238,6 +4245,7 @@ name = "vortex-error"
 version = "0.2.0"
 dependencies = [
  "arrow-schema",
+ "datafusion-common",
  "flatbuffers",
  "flexbuffers",
  "object_store",
diff --git a/Cargo.toml b/Cargo.toml
index 250a4e3641..3c19d7289e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,8 +4,8 @@ members = [
     "encodings/*",
     "pyvortex",
     "vortex-array",
-    "vortex-build",
     "vortex-buffer",
+    "vortex-build",
     "vortex-datafusion",
     "vortex-dtype",
     "vortex-error",
@@ -87,7 +87,7 @@ mimalloc = "0.1.42"
 monoio = "0.2.3"
 num-traits = "0.2.18"
 num_enum = "0.7.2"
-object_store = "0.10.1"
+object_store = "0.10.2"
 parquet = "52.0.0"
 paste = "1.0.14"
 pin-project = "1.1.5"
@@ -139,6 +139,8 @@ walkdir = "2.5.0"
 worker = "0.3.0"
 xshell = "0.2.6"
 zigzag = "0.1.0"
+url = "2"
+tempfile = "3"

 [workspace.lints.rust]
 warnings = "deny"
diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs
index 5429781d36..7e2ce50e8f 100644
--- a/bench-vortex/benches/datafusion_benchmark.rs
+++ b/bench-vortex/benches/datafusion_benchmark.rs
@@ -16,7 +16,7 @@ use lazy_static::lazy_static;
 use vortex::compress::CompressionStrategy;
 use vortex::encoding::EncodingRef;
 use vortex::{Array, Context, IntoArray, ToArrayData};
-use vortex_datafusion::{VortexMemTable, VortexMemTableOptions};
+use vortex_datafusion::memory::{VortexMemTable, VortexMemTableOptions};
 use vortex_dict::DictEncoding;
 use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};
 use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs
index 7d856ecc26..3d69ad9c2c 100644
--- a/bench-vortex/benches/random_access.rs
+++ b/bench-vortex/benches/random_access.rs
@@ -8,6 +8,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use mimalloc::MiMalloc;
 use object_store::aws::AmazonS3Builder;
 use object_store::local::LocalFileSystem;
+use object_store::ObjectStore;
 use tokio::runtime::Runtime;

 #[global_allocator]
@@ -31,7 +32,7 @@ fn random_access_vortex(c: &mut Criterion) {
             .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) })
     });

-    let local_fs = LocalFileSystem::new();
+    let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
     let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
     group.bench_function("localfs", |b| {
         b.to_async(Runtime::new().unwrap()).iter(|| async {
@@ -43,7 +44,7 @@
         })
     });

-    let r2_fs = AmazonS3Builder::from_env().build().unwrap();
+    let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc<dyn ObjectStore>;
     let r2_path =
         object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap())
             .unwrap();
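> Review note (not part of the patch): the benchmark changes above coerce concrete stores to `Arc<dyn ObjectStore>`, matching the new owned-handle API in `vortex-serde`. A minimal sketch of the pattern — the helper name is hypothetical:

```rust
use std::sync::Arc;

use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::ObjectStore;

// Any backend coerces to the same trait object, so one non-generic code
// path (e.g. `take_vortex_object_store`) serves local FS, S3, memory, etc.
fn open_store(in_memory: bool) -> Arc<dyn ObjectStore> {
    if in_memory {
        Arc::new(InMemory::new())
    } else {
        Arc::new(LocalFileSystem::new())
    }
}
```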
diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs
index bd00a4e9ee..79a9f06889 100644
--- a/bench-vortex/src/reader.rs
+++ b/bench-vortex/src/reader.rs
@@ -159,8 +159,8 @@ pub async fn read_vortex_footer_format(
     )
 }

-pub async fn take_vortex_object_store<O: ObjectStore>(
-    fs: &O,
+pub async fn take_vortex_object_store(
+    fs: &Arc<dyn ObjectStore>,
     path: &object_store::path::Path,
     indices: &[u64],
 ) -> VortexResult<Array> {
diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs
index fc81921483..8d2f061dac 100644
--- a/bench-vortex/src/tpch/mod.rs
+++ b/bench-vortex/src/tpch/mod.rs
@@ -10,7 +10,8 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
 use vortex::array::chunked::ChunkedArray;
 use vortex::arrow::FromArrowArray;
 use vortex::{Array, ArrayDType, ArrayData, IntoArray};
-use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};
+use vortex_datafusion::memory::VortexMemTableOptions;
+use vortex_datafusion::SessionContextExt;

 use crate::idempotent_async;
diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs
index 8893ee0c29..c8c04406d9 100644
--- a/vortex-array/src/array/struct_/mod.rs
+++ b/vortex-array/src/array/struct_/mod.rs
@@ -1,6 +1,6 @@
 use serde::{Deserialize, Serialize};
 use vortex_dtype::{DType, FieldName, FieldNames, Nullability, StructDType};
-use vortex_error::{vortex_bail, VortexResult};
+use vortex_error::{vortex_bail, vortex_err, VortexResult};

 use crate::stats::{ArrayStatisticsCompute, StatsSet};
 use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
@@ -97,12 +97,12 @@ impl StructArray {
         let mut children = Vec::with_capacity(projection.len());
         let mut names = Vec::with_capacity(projection.len());

-        for column_idx in projection {
+        for &column_idx in projection {
             children.push(
-                self.field(*column_idx)
-                    .expect("column must not exceed bounds"),
+                self.field(column_idx)
+                    .ok_or(vortex_err!(OutOfBounds: column_idx, 0, self.dtypes().len()))?,
             );
-            names.push(self.names()[*column_idx].clone());
+            names.push(self.names()[column_idx].clone());
         }

         StructArray::try_new(
@@ -124,7 +124,9 @@ impl ArrayVariants for StructArray {

 impl StructArrayTrait for StructArray {
     fn field(&self, idx: usize) -> Option<Array> {
-        self.array().child(idx, &self.dtypes()[idx], self.len())
+        self.dtypes()
+            .get(idx)
+            .and_then(|dtype| self.array().child(idx, dtype, self.len()))
     }
 }
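> Review note (not part of the patch): with the `StructArray` change above, `project` now surfaces an `OutOfBounds` error instead of panicking. A hedged sketch, assuming `project` returns `VortexResult<StructArray>` as the `?` usage above implies:

```rust
use vortex::array::struct_::StructArray;
use vortex_error::VortexResult;

// `people` is a hypothetical two-column struct array.
fn project_safely(people: &StructArray) -> VortexResult<()> {
    let head = people.project(&[0])?; // in bounds: Ok(projected struct)
    assert_eq!(head.names().len(), 1);
    assert!(people.project(&[9]).is_err()); // out of bounds: Err, no panic
    Ok(())
}
```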
diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml
index 38696fbc11..dca4d66c6c 100644
--- a/vortex-datafusion/Cargo.toml
+++ b/vortex-datafusion/Cargo.toml
@@ -11,12 +11,15 @@ include = { workspace = true }

 edition = { workspace = true }
 rust-version = { workspace = true }

-[dependencies]
+[lib]
+name = "vortex_datafusion"
+path = "src/lib.rs"
+[dependencies]
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
 async-trait = { workspace = true }
-
+chrono = { workspace = true }
 datafusion = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-execution = { workspace = true }
@@ -26,15 +29,21 @@ datafusion-physical-plan = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
 lazy_static = { workspace = true }
+log = { workspace = true }
+object_store = { workspace = true }
 pin-project = { workspace = true }
 vortex-array = { workspace = true }
 vortex-dtype = { workspace = true }
-vortex-error = { workspace = true }
+vortex-error = { workspace = true, features = ["datafusion"] }
 vortex-expr = { workspace = true }
 vortex-scalar = { workspace = true, features = ["datafusion"] }
+vortex-serde = { workspace = true, features = ["object_store"] }

 [dev-dependencies]
-tokio = { workspace = true, features = ["test-util"] }
+anyhow = { workspace = true }
+tempfile = { workspace = true }
+tokio = { workspace = true, features = ["test-util", "rt-multi-thread"] }
+url = { workspace = true }

 [lints]
 workspace = true
diff --git a/vortex-datafusion/examples/table_provider.rs b/vortex-datafusion/examples/table_provider.rs
new file mode 100644
index 0000000000..88e4aeee3e
--- /dev/null
+++ b/vortex-datafusion/examples/table_provider.rs
@@ -0,0 +1,98 @@
+use std::sync::Arc;
+
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::prelude::SessionContext;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use object_store::local::LocalFileSystem;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use tempfile::tempdir;
+use tokio::fs::OpenOptions;
+use url::Url;
+use vortex::array::chunked::ChunkedArray;
+use vortex::array::primitive::PrimitiveArray;
+use vortex::array::struct_::StructArray;
+use vortex::array::varbin::VarBinArray;
+use vortex::validity::Validity;
+use vortex::IntoArray;
+use vortex_datafusion::persistent::config::{VortexFile, VortexTableConfig};
+use vortex_datafusion::persistent::provider::VortexFileTableProvider;
+use vortex_serde::file::file_writer::FileWriter;
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let temp_dir = tempdir()?;
+    let strings = ChunkedArray::from_iter([
+        VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
+        VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
+    ])
+    .into_array();
+
+    let numbers = ChunkedArray::from_iter([
+        PrimitiveArray::from(vec![1u32, 2, 3, 4]).into_array(),
+        PrimitiveArray::from(vec![5u32, 6, 7, 8]).into_array(),
+    ])
+    .into_array();
+
+    let st = StructArray::try_new(
+        ["strings".into(), "numbers".into()].into(),
+        vec![strings, numbers],
+        8,
+        Validity::NonNullable,
+    )
+    .unwrap();
+
+    let filepath = temp_dir.path().join("a.vtx");
+
+    let f = OpenOptions::new()
+        .write(true)
+        .truncate(true)
+        .create(true)
+        .open(&filepath)
+        .await?;
+
+    let writer = FileWriter::new(f);
+    let writer = writer.write_array_columns(st.into_array()).await?;
+    writer.finalize().await?;
+
+    let f = tokio::fs::File::open(&filepath).await?;
+    let file_size = f.metadata().await?.len();
+
+    let object_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+    let url = ObjectStoreUrl::local_filesystem();
+
+    let p = Path::from_filesystem_path(filepath)?;
+
+    let config = VortexTableConfig::new(
+        Arc::new(Schema::new(vec![
+            Field::new("strings", DataType::Utf8, false),
+            Field::new("numbers", DataType::UInt32, false),
+        ])),
+        vec![VortexFile::new(p, file_size)],
+    );
+
+    let provider = Arc::new(VortexFileTableProvider::try_new(url, config)?);
+
+    let ctx = SessionContext::new();
+    ctx.register_table("vortex_tbl", Arc::clone(&provider) as _)?;
+
+    let url = Url::try_from("file://").unwrap();
+    ctx.register_object_store(&url, object_store);
+
+    run_query(&ctx, "SELECT * FROM vortex_tbl").await?;
+
+    Ok(())
+}
+
+async fn run_query(ctx: &SessionContext, query_string: impl AsRef<str>) -> anyhow::Result<()> {
+    let query_string = query_string.as_ref();
+
+    ctx.sql(&format!("EXPLAIN {query_string}"))
+        .await?
+        .show()
+        .await?;
+
+    ctx.sql(query_string).await?.show().await?;
+
+    Ok(())
+}
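> Review note (not part of the patch): the example above is runnable with `cargo run --example table_provider`. Once `vortex_tbl` is registered, the same data is also reachable through the DataFrame API; a small sketch:

```rust
use datafusion::prelude::{col, lit, SessionContext};

// Equivalent of `SELECT * FROM vortex_tbl WHERE numbers > 5` without SQL.
async fn query_numbers(ctx: &SessionContext) -> anyhow::Result<()> {
    ctx.table("vortex_tbl")
        .await?
        .filter(col("numbers").gt(lit(5u32)))?
        .show()
        .await?;
    Ok(())
}
```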
diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs
index d3f8e23d5a..fee7fdf6ca 100644
--- a/vortex-datafusion/src/lib.rs
+++ b/vortex-datafusion/src/lib.rs
@@ -11,27 +11,20 @@ use std::task::{Context, Poll};

 use arrow_array::{RecordBatch, StructArray as ArrowStructArray};
 use arrow_schema::{DataType, SchemaRef};
-use async_trait::async_trait;
-use datafusion::dataframe::DataFrame;
-use datafusion::datasource::TableProvider;
-use datafusion::execution::context::SessionState;
 use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{DataFrame, SessionContext};
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult};
-use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType};
-use datafusion_physical_expr::EquivalenceProperties;
-use datafusion_physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
-};
+use datafusion_expr::{Expr, Operator};
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use futures::Stream;
 use itertools::Itertools;
+use memory::{VortexMemTable, VortexMemTableOptions};
 use vortex::array::chunked::ChunkedArray;
 use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical};
-use vortex_dtype::DType;

-use crate::datatype::infer_schema;
-use crate::plans::{RowSelectorExec, TakeRowsExec};
+pub mod memory;
+pub mod persistent;

 mod datatype;
 mod eval;
@@ -47,25 +40,16 @@ const SUPPORTED_BINARY_OPS: &[Operator] = &[
     Operator::LtEq,
 ];

-/// Optional configurations to pass when loading a [VortexMemTable].
-#[derive(Debug, Clone)]
-pub struct VortexMemTableOptions {
-    pub enable_pushdown: bool,
-}
-
-impl Default for VortexMemTableOptions {
-    fn default() -> Self {
-        Self {
-            enable_pushdown: true,
-        }
-    }
-}
-
-impl VortexMemTableOptions {
-    pub fn with_pushdown(mut self, enable_pushdown: bool) -> Self {
-        self.enable_pushdown = enable_pushdown;
-        self
-    }
+fn supported_data_types(dt: DataType) -> bool {
+    dt.is_integer()
+        || dt.is_signed_integer()
+        || dt.is_floating()
+        || dt.is_null()
+        || dt == DataType::Boolean
+        || dt == DataType::Binary
+        || dt == DataType::Utf8
+        || dt == DataType::BinaryView
+        || dt == DataType::Utf8View
 }

 pub trait SessionContextExt {
@@ -96,7 +81,7 @@
         options: VortexMemTableOptions,
     ) -> DFResult<()> {
         assert!(
-            matches!(array.dtype(), DType::Struct(_, _)),
+            array.dtype().is_struct(),
             "Vortex arrays must have struct type"
         );

@@ -111,7 +96,7 @@
         options: VortexMemTableOptions,
     ) -> DFResult<DataFrame> {
         assert!(
-            matches!(array.dtype(), DType::Struct(_, _)),
+            array.dtype().is_struct(),
             "Vortex arrays must have struct type"
        );

@@ -121,203 +106,6 @@
     }
 }

-/// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine.
-///
-/// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as
-/// a table to DataFusion.
-#[derive(Debug, Clone)]
-pub struct VortexMemTable {
-    array: ChunkedArray,
-    schema_ref: SchemaRef,
-    options: VortexMemTableOptions,
-}
-
-impl VortexMemTable {
-    /// Build a new table provider from an existing [struct type](vortex_dtype::StructDType) array.
-    ///
-    /// # Panics
-    ///
-    /// Creation will panic if the provided array is not of `DType::Struct` type.
-    pub fn new(array: Array, options: VortexMemTableOptions) -> Self {
-        let arrow_schema = infer_schema(array.dtype());
-        let schema_ref = SchemaRef::new(arrow_schema);
-
-        let array = match ChunkedArray::try_from(&array) {
-            Ok(a) => a,
-            _ => {
-                let dtype = array.dtype().clone();
-                ChunkedArray::try_new(vec![array], dtype).unwrap()
-            }
-        };
-
-        Self {
-            array,
-            schema_ref,
-            options,
-        }
-    }
-}
-
-#[async_trait]
-impl TableProvider for VortexMemTable {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema_ref)
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
-
-    /// Plan an array scan.
-    ///
-    /// Currently, projection pushdown is supported, but not filter pushdown.
-    /// The array is flattened directly into the nearest Arrow-compatible encoding.
-    async fn scan(
-        &self,
-        state: &SessionState,
-        projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        _limit: Option<usize>,
-    ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec<usize> {
-            let referenced_columns: HashSet<String> =
-                exprs.iter().flat_map(get_column_references).collect();
-
-            let projection: Vec<usize> = referenced_columns
-                .iter()
-                .map(|col_name| schema.column_with_name(col_name).unwrap().0)
-                .sorted()
-                .collect();
-
-            projection
-        }
-
-        let filter_exprs = if filters.is_empty() {
-            None
-        } else {
-            Some(filters)
-        };
-
-        let output_projection: Vec<usize> = match projection {
-            None => (0..self.schema_ref.fields().len()).collect(),
-            Some(proj) => proj.clone(),
-        };
-
-        match filter_exprs {
-            // If there is a filter expression, we execute in two phases, first performing a filter
-            // on the input to get back row indices, and then taking the remaining struct columns
-            // using the calculated indices from the filter.
-            Some(filter_exprs) => {
-                let filter_projection =
-                    get_filter_projection(filter_exprs, self.schema_ref.clone());
-
-                Ok(make_filter_then_take_plan(
-                    self.schema_ref.clone(),
-                    filter_exprs,
-                    filter_projection,
-                    self.array.clone(),
-                    output_projection.clone(),
-                    state,
-                ))
-            }
-
-            // If no filters were pushed down, we materialize the entire StructArray into a
-            // RecordBatch and let DataFusion process the entire query.
-            _ => {
-                let output_schema = Arc::new(
-                    self.schema_ref
-                        .project(output_projection.as_slice())
-                        .expect("project output schema"),
-                );
-                let plan_properties = PlanProperties::new(
-                    EquivalenceProperties::new(output_schema),
-                    // non-pushdown scans execute in single partition, where the partition
-                    // yields one RecordBatch per chunk in the input ChunkedArray
-                    Partitioning::UnknownPartitioning(1),
-                    ExecutionMode::Bounded,
-                );
-
-                Ok(Arc::new(VortexScanExec {
-                    array: self.array.clone(),
-                    scan_projection: output_projection.clone(),
-                    plan_properties,
-                }))
-            }
-        }
-    }
-
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
-        // In the case the caller has configured this provider with filter pushdown disabled,
-        // do not attempt to apply any filters at scan time.
-        if !self.options.enable_pushdown {
-            return Ok(filters
-                .iter()
-                .map(|_| TableProviderFilterPushDown::Unsupported)
-                .collect());
-        }
-
-        filters
-            .iter()
-            .map(|expr| {
-                if can_be_pushed_down(expr) {
-                    Ok(TableProviderFilterPushDown::Exact)
-                } else {
-                    Ok(TableProviderFilterPushDown::Unsupported)
-                }
-            })
-            .try_collect()
-    }
-}
-
-/// Construct an operator plan that executes in two stages.
-///
-/// The first plan stage only materializes the columns related to the provided set of filter
-/// expressions. It evaluates the filters into a row selection.
-///
-/// The second stage receives the row selection above and dispatches a `take` on the remaining
-/// columns.
-fn make_filter_then_take_plan(
-    schema: SchemaRef,
-    filter_exprs: &[Expr],
-    filter_projection: Vec<usize>,
-    chunked_array: ChunkedArray,
-    output_projection: Vec<usize>,
-    _session_state: &SessionState,
-) -> Arc<dyn ExecutionPlan> {
-    let row_selector_op = Arc::new(RowSelectorExec::new(
-        filter_exprs,
-        filter_projection,
-        &chunked_array,
-    ));
-
-    Arc::new(TakeRowsExec::new(
-        schema.clone(),
-        &output_projection,
-        row_selector_op.clone(),
-        &chunked_array,
-    ))
-}
-
-fn supported_data_types(dt: DataType) -> bool {
-    dt.is_integer()
-        || dt.is_signed_integer()
-        || dt.is_floating()
-        || dt.is_null()
-        || dt == DataType::Boolean
-        || dt == DataType::Binary
-        || dt == DataType::Utf8
-        || dt == DataType::Binary
-        || dt == DataType::BinaryView
-        || dt == DataType::Utf8View
-}
-
 fn can_be_pushed_down(expr: &Expr) -> bool {
     match expr {
         Expr::BinaryExpr(expr)
@@ -331,6 +119,19 @@ fn can_be_pushed_down(expr: &Expr) -> bool {
     }
 }

+fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec<usize> {
+    let referenced_columns: HashSet<String> =
+        exprs.iter().flat_map(get_column_references).collect();
+
+    let projection: Vec<usize> = referenced_columns
+        .iter()
+        .map(|col_name| schema.column_with_name(col_name).unwrap().0)
+        .sorted()
+        .collect();
+
+    projection
+}
+
 /// Extract out the columns from our table referenced by the expression.
 fn get_column_references(expr: &Expr) -> HashSet<String> {
     let mut references = HashSet::new();
@@ -475,164 +276,3 @@ impl ExecutionPlan for VortexScanExec {
         }))
     }
 }
-
-#[cfg(test)]
-mod test {
-    use arrow_array::types::Int64Type;
-    use datafusion::arrow::array::AsArray;
-    use datafusion::functions_aggregate::count::count_distinct;
-    use datafusion::prelude::SessionContext;
-    use datafusion_common::{Column, TableReference};
-    use datafusion_expr::{and, col, lit, BinaryExpr, Expr, Operator};
-    use vortex::array::primitive::PrimitiveArray;
-    use vortex::array::struct_::StructArray;
-    use vortex::array::varbin::VarBinArray;
-    use vortex::validity::Validity;
-    use vortex::{Array, IntoArray};
-    use vortex_dtype::{DType, Nullability};
-
-    use crate::{can_be_pushed_down, SessionContextExt, VortexMemTableOptions};
-
-    fn presidents_array() -> Array {
-        let names = VarBinArray::from_vec(
-            vec![
-                "Washington",
-                "Adams",
-                "Jefferson",
-                "Madison",
-                "Monroe",
-                "Adams",
-            ],
-            DType::Utf8(Nullability::NonNullable),
-        );
-        let term_start = PrimitiveArray::from_vec(
-            vec![1789u16, 1797, 1801, 1809, 1817, 1825],
-            Validity::NonNullable,
-        );
-
-        StructArray::from_fields(&[
-            ("president", names.into_array()),
-            ("term_start", term_start.into_array()),
-        ])
-        .into_array()
-    }
-
-    #[tokio::test]
-    #[cfg_attr(miri, ignore)]
-    async fn test_datafusion_pushdown() {
-        let ctx = SessionContext::new();
-
-        let df = ctx.read_vortex(presidents_array()).unwrap();
-
-        let distinct_names = df
-            .filter(col("term_start").gt_eq(lit(1795)))
-            .unwrap()
-            .aggregate(vec![], vec![count_distinct(col("president"))])
-            .unwrap()
-            .collect()
-            .await
-            .unwrap();
-
-        assert_eq!(distinct_names.len(), 1);
-
-        assert_eq!(
-            *distinct_names[0]
-                .column(0)
-                .as_primitive::<Int64Type>()
-                .values()
-                .first()
-                .unwrap(),
-            4i64
-        );
-    }
-
-    #[tokio::test]
-    #[cfg_attr(miri, ignore)]
-    async fn test_datafusion_no_pushdown() {
-        let ctx = SessionContext::new();
-
-        let df = ctx
-            .read_vortex_opts(
-                presidents_array(),
-                // Disable pushdown. We run this test to make sure that the naive codepath also
-                // produces correct results and does not panic anywhere.
-                VortexMemTableOptions::default().with_pushdown(false),
-            )
-            .unwrap();
-
-        let distinct_names = df
-            .filter(col("term_start").gt_eq(lit(1795)))
-            .unwrap()
-            .filter(col("term_start").lt(lit(2000)))
-            .unwrap()
-            .aggregate(vec![], vec![count_distinct(col("president"))])
-            .unwrap()
-            .collect()
-            .await
-            .unwrap();
-
-        assert_eq!(distinct_names.len(), 1);
-
-        assert_eq!(
-            *distinct_names[0]
-                .column(0)
-                .as_primitive::<Int64Type>()
-                .values()
-                .first()
-                .unwrap(),
-            4i64
-        );
-    }
-
-    #[test]
-    fn test_can_be_pushed_down0() {
-        let e = BinaryExpr {
-            left: Box::new(
-                Column {
-                    relation: Some(TableReference::Bare {
-                        table: "orders".into(),
-                    }),
-                    name: "o_orderstatus".to_string(),
-                }
-                .into(),
-            ),
-            op: Operator::Eq,
-            right: Box::new(lit("F")),
-        };
-        let e = Expr::BinaryExpr(e);
-
-        assert!(can_be_pushed_down(&e));
-    }
-
-    #[test]
-    fn test_can_be_pushed_down1() {
-        let e = lit("hello");
-
-        assert!(can_be_pushed_down(&e));
-    }
-
-    #[test]
-    fn test_can_be_pushed_down2() {
-        let e = lit(3);
-
-        assert!(can_be_pushed_down(&e));
-    }
-
-    #[test]
-    fn test_can_be_pushed_down3() {
-        let e = BinaryExpr {
-            left: Box::new(col("nums")),
-            op: Operator::Modulo,
-            right: Box::new(lit(5)),
-        };
-        let e = Expr::BinaryExpr(e);
-
-        assert!(!can_be_pushed_down(&e));
-    }
-
-    #[test]
-    fn test_can_be_pushed_down4() {
-        let e = and((col("a")).eq(lit(2u64)), col("b").eq(lit(true)));
-        assert!(can_be_pushed_down(&e));
-    }
-}
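> Review note (not part of the patch): `can_be_pushed_down` and `SUPPORTED_BINARY_OPS`, which stay in lib.rs, gate which filters the mem table reports back to DataFusion as `Exact`. A sketch of the boundary, mirroring the tests above:

```rust
use datafusion_expr::{binary_expr, col, lit, Operator};

fn demo_pushdown_rules() {
    // Comparisons and conjunctions over columns/literals are pushdown-eligible...
    let pushable = col("term_start").gt_eq(lit(1795)).and(col("president").eq(lit("Adams")));
    // ...while arithmetic such as modulo is not, and falls back to DataFusion.
    let not_pushable = binary_expr(col("nums"), Operator::Modulo, lit(5));
    let _ = (pushable, not_pushable);
}
```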
diff --git a/vortex-datafusion/src/memory.rs b/vortex-datafusion/src/memory.rs
new file mode 100644
index 0000000000..abe924c873
--- /dev/null
+++ b/vortex-datafusion/src/memory.rs
@@ -0,0 +1,427 @@
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion::prelude::*;
+use datafusion_common::Result as DFResult;
+use datafusion_expr::{TableProviderFilterPushDown, TableType};
+use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_plan::{ExecutionMode, ExecutionPlan, Partitioning, PlanProperties};
+use itertools::Itertools;
+use vortex::array::chunked::ChunkedArray;
+use vortex::{Array, ArrayDType as _};
+use vortex_dtype::DType;
+
+use crate::datatype::infer_schema;
+use crate::plans::{RowSelectorExec, TakeRowsExec};
+use crate::{can_be_pushed_down, get_filter_projection, VortexScanExec};
+
+/// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine.
+///
+/// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as
+/// a table to DataFusion.
+#[derive(Debug, Clone)]
+pub struct VortexMemTable {
+    array: ChunkedArray,
+    schema_ref: SchemaRef,
+    options: VortexMemTableOptions,
+}
+
+impl VortexMemTable {
+    /// Build a new table provider from an existing [struct type](vortex_dtype::StructDType) array.
+    ///
+    /// # Panics
+    ///
+    /// Creation will panic if the provided array is not of `DType::Struct` type.
+    pub fn new(array: Array, options: VortexMemTableOptions) -> Self {
+        let arrow_schema = infer_schema(array.dtype());
+        let schema_ref = SchemaRef::new(arrow_schema);
+
+        let array = match ChunkedArray::try_from(&array) {
+            Ok(a) => a,
+            _ => {
+                let dtype = array.dtype().clone();
+                ChunkedArray::try_new(vec![array], dtype).unwrap()
+            }
+        };
+
+        Self {
+            array,
+            schema_ref,
+            options,
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for VortexMemTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema_ref)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    /// Plan an array scan.
+    ///
+    /// Projection pushdown is always applied; filter pushdown is used when enabled in options.
+    /// The array is flattened directly into the nearest Arrow-compatible encoding.
+    async fn scan(
+        &self,
+        state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let filter_exprs = if filters.is_empty() {
+            None
+        } else {
+            Some(filters)
+        };
+
+        let output_projection: Vec<usize> = match projection {
+            None => (0..self.schema_ref.fields().len()).collect(),
+            Some(proj) => proj.clone(),
+        };
+
+        match filter_exprs {
+            // If there is a filter expression, we execute in two phases, first performing a filter
+            // on the input to get back row indices, and then taking the remaining struct columns
+            // using the calculated indices from the filter.
+            Some(filter_exprs) => {
+                let filter_projection =
+                    get_filter_projection(filter_exprs, self.schema_ref.clone());
+
+                Ok(make_filter_then_take_plan(
+                    self.schema_ref.clone(),
+                    filter_exprs,
+                    filter_projection,
+                    self.array.clone(),
+                    output_projection.clone(),
+                    state,
+                ))
+            }
+
+            // If no filters were pushed down, we materialize the entire StructArray into a
+            // RecordBatch and let DataFusion process the entire query.
+            _ => {
+                let output_schema = Arc::new(
+                    self.schema_ref
+                        .project(output_projection.as_slice())
+                        .expect("project output schema"),
+                );
+                let plan_properties = PlanProperties::new(
+                    EquivalenceProperties::new(output_schema),
+                    // non-pushdown scans execute in single partition, where the partition
+                    // yields one RecordBatch per chunk in the input ChunkedArray
+                    Partitioning::UnknownPartitioning(1),
+                    ExecutionMode::Bounded,
+                );
+
+                Ok(Arc::new(VortexScanExec {
+                    array: self.array.clone(),
+                    scan_projection: output_projection.clone(),
+                    plan_properties,
+                }))
+            }
+        }
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+        // In the case the caller has configured this provider with filter pushdown disabled,
+        // do not attempt to apply any filters at scan time.
+        if !self.options.enable_pushdown {
+            return Ok(filters
+                .iter()
+                .map(|_| TableProviderFilterPushDown::Unsupported)
+                .collect());
+        }
+
+        filters
+            .iter()
+            .map(|expr| {
+                if can_be_pushed_down(expr) {
+                    Ok(TableProviderFilterPushDown::Exact)
+                } else {
+                    Ok(TableProviderFilterPushDown::Unsupported)
+                }
+            })
+            .try_collect()
+    }
+}
+
+/// Optional configurations to pass when loading a [VortexMemTable].
+#[derive(Debug, Clone)]
+pub struct VortexMemTableOptions {
+    pub enable_pushdown: bool,
+}
+
+impl Default for VortexMemTableOptions {
+    fn default() -> Self {
+        Self {
+            enable_pushdown: true,
+        }
+    }
+}
+
+impl VortexMemTableOptions {
+    pub fn with_pushdown(mut self, enable_pushdown: bool) -> Self {
+        self.enable_pushdown = enable_pushdown;
+        self
+    }
+}
+
+pub trait SessionContextExt {
+    fn register_vortex<S: AsRef<str>>(&self, name: S, array: Array) -> DFResult<()> {
+        self.register_vortex_opts(name, array, VortexMemTableOptions::default())
+    }
+
+    fn register_vortex_opts<S: AsRef<str>>(
+        &self,
+        name: S,
+        array: Array,
+        options: VortexMemTableOptions,
+    ) -> DFResult<()>;
+
+    fn read_vortex(&self, array: Array) -> DFResult<DataFrame> {
+        self.read_vortex_opts(array, VortexMemTableOptions::default())
+    }
+
+    fn read_vortex_opts(&self, array: Array, options: VortexMemTableOptions)
+        -> DFResult<DataFrame>;
+}
+
+impl SessionContextExt for SessionContext {
+    fn register_vortex_opts<S: AsRef<str>>(
+        &self,
+        name: S,
+        array: Array,
+        options: VortexMemTableOptions,
+    ) -> DFResult<()> {
+        assert!(
+            matches!(array.dtype(), DType::Struct(_, _)),
+            "Vortex arrays must have struct type"
+        );
+
+        let vortex_table = VortexMemTable::new(array, options);
+        self.register_table(name.as_ref(), Arc::new(vortex_table))
+            .map(|_| ())
+    }
+
+    fn read_vortex_opts(
+        &self,
+        array: Array,
+        options: VortexMemTableOptions,
+    ) -> DFResult<DataFrame> {
+        assert!(
+            matches!(array.dtype(), DType::Struct(_, _)),
+            "Vortex arrays must have struct type"
+        );
+
+        let vortex_table = VortexMemTable::new(array, options);
+
+        self.read_table(Arc::new(vortex_table))
+    }
+}
+
+/// Construct an operator plan that executes in two stages.
+///
+/// The first plan stage only materializes the columns related to the provided set of filter
+/// expressions. It evaluates the filters into a row selection.
+///
+/// The second stage receives the row selection above and dispatches a `take` on the remaining
+/// columns.
+fn make_filter_then_take_plan(
+    schema: SchemaRef,
+    filter_exprs: &[Expr],
+    filter_projection: Vec<usize>,
+    chunked_array: ChunkedArray,
+    output_projection: Vec<usize>,
+    _session_state: &SessionState,
+) -> Arc<dyn ExecutionPlan> {
+    let row_selector_op = Arc::new(RowSelectorExec::new(
+        filter_exprs,
+        filter_projection,
+        &chunked_array,
+    ));
+
+    Arc::new(TakeRowsExec::new(
+        schema.clone(),
+        &output_projection,
+        row_selector_op.clone(),
+        &chunked_array,
+    ))
+}
+
+#[cfg(test)]
+mod test {
+    use arrow_array::cast::AsArray as _;
+    use arrow_array::types::Int64Type;
+    use datafusion::functions_aggregate::count::count_distinct;
+    use datafusion::prelude::SessionContext;
+    use datafusion_common::{Column, TableReference};
+    use datafusion_expr::{and, col, lit, BinaryExpr, Expr, Operator};
+    use vortex::array::primitive::PrimitiveArray;
+    use vortex::array::struct_::StructArray;
+    use vortex::array::varbin::VarBinArray;
+    use vortex::validity::Validity;
+    use vortex::{Array, IntoArray};
+    use vortex_dtype::{DType, Nullability};
+
+    use crate::can_be_pushed_down;
+    use crate::memory::{SessionContextExt as _, VortexMemTableOptions};
+
+    fn presidents_array() -> Array {
+        let names = VarBinArray::from_vec(
+            vec![
+                "Washington",
+                "Adams",
+                "Jefferson",
+                "Madison",
+                "Monroe",
+                "Adams",
+            ],
+            DType::Utf8(Nullability::NonNullable),
+        );
+        let term_start = PrimitiveArray::from_vec(
+            vec![1789u16, 1797, 1801, 1809, 1817, 1825],
+            Validity::NonNullable,
+        );
+
+        StructArray::from_fields(&[
+            ("president", names.into_array()),
+            ("term_start", term_start.into_array()),
+        ])
+        .into_array()
+    }
+
+    #[tokio::test]
+    #[cfg_attr(miri, ignore)]
+    async fn test_datafusion_pushdown() {
+        let ctx = SessionContext::new();
+
+        let df = ctx.read_vortex(presidents_array()).unwrap();
+
+        let distinct_names = df
+            .filter(col("term_start").gt_eq(lit(1795)))
+            .unwrap()
+            .aggregate(vec![], vec![count_distinct(col("president"))])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        assert_eq!(distinct_names.len(), 1);
+
+        assert_eq!(
+            *distinct_names[0]
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .values()
+                .first()
+                .unwrap(),
+            4i64
+        );
+    }
+
+    #[tokio::test]
+    #[cfg_attr(miri, ignore)]
+    async fn test_datafusion_no_pushdown() {
+        let ctx = SessionContext::new();
+
+        let df = ctx
+            .read_vortex_opts(
+                presidents_array(),
+                // Disable pushdown. We run this test to make sure that the naive codepath also
+                // produces correct results and does not panic anywhere.
+                VortexMemTableOptions::default().with_pushdown(false),
+            )
+            .unwrap();
+
+        let distinct_names = df
+            .filter(col("term_start").gt_eq(lit(1795)))
+            .unwrap()
+            .filter(col("term_start").lt(lit(2000)))
+            .unwrap()
+            .aggregate(vec![], vec![count_distinct(col("president"))])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        assert_eq!(distinct_names.len(), 1);
+
+        assert_eq!(
+            *distinct_names[0]
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .values()
+                .first()
+                .unwrap(),
+            4i64
+        );
+    }
+
+    #[test]
+    fn test_can_be_pushed_down0() {
+        let e = BinaryExpr {
+            left: Box::new(
+                Column {
+                    relation: Some(TableReference::Bare {
+                        table: "orders".into(),
+                    }),
+                    name: "o_orderstatus".to_string(),
+                }
+                .into(),
+            ),
+            op: Operator::Eq,
+            right: Box::new(lit("F")),
+        };
+        let e = Expr::BinaryExpr(e);
+
+        assert!(can_be_pushed_down(&e));
+    }
+
+    #[test]
+    fn test_can_be_pushed_down1() {
+        let e = lit("hello");
+
+        assert!(can_be_pushed_down(&e));
+    }
+
+    #[test]
+    fn test_can_be_pushed_down2() {
+        let e = lit(3);
+
+        assert!(can_be_pushed_down(&e));
+    }
+
+    #[test]
+    fn test_can_be_pushed_down3() {
+        let e = BinaryExpr {
+            left: Box::new(col("nums")),
+            op: Operator::Modulo,
+            right: Box::new(lit(5)),
+        };
+        let e = Expr::BinaryExpr(e);
+
+        assert!(!can_be_pushed_down(&e));
+    }
+
+    #[test]
+    fn test_can_be_pushed_down4() {
+        let e = and((col("a")).eq(lit(2u64)), col("b").eq(lit(true)));
+        assert!(can_be_pushed_down(&e));
+    }
+}
diff --git a/vortex-datafusion/src/persistent/config.rs b/vortex-datafusion/src/persistent/config.rs
new file mode 100644
index 0000000000..f60e7ed943
--- /dev/null
+++ b/vortex-datafusion/src/persistent/config.rs
@@ -0,0 +1,44 @@
+use arrow_schema::SchemaRef;
+use chrono::TimeZone as _;
+use datafusion::datasource::listing::PartitionedFile;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+
+#[derive(Clone)]
+pub struct VortexFile {
+    pub(crate) object_meta: ObjectMeta,
+}
+
+impl From<VortexFile> for PartitionedFile {
+    fn from(value: VortexFile) -> Self {
+        PartitionedFile::new(value.object_meta.location, value.object_meta.size as u64)
+    }
+}
+
+impl VortexFile {
+    pub fn new(path: impl Into<String>, size: u64) -> Self {
+        Self {
+            object_meta: ObjectMeta {
+                location: Path::from(path.into()),
+                last_modified: chrono::Utc.timestamp_nanos(0),
+                size: size as usize,
+                e_tag: None,
+                version: None,
+            },
+        }
+    }
+}
+
+pub struct VortexTableConfig {
+    pub(crate) data_files: Vec<VortexFile>,
+    pub(crate) schema: Option<SchemaRef>,
+}
+
+impl VortexTableConfig {
+    pub fn new(schema: SchemaRef, data_files: Vec<VortexFile>) -> Self {
+        Self {
+            data_files,
+            schema: Some(schema),
+        }
+    }
+}
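> Review note (not part of the patch): a hedged sketch of describing a multi-file table with the config API above. The paths and sizes are hypothetical; `VortexFile` fabricates the remaining `ObjectMeta` fields (zero mtime, no e-tag) as shown:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use vortex_datafusion::persistent::config::{VortexFile, VortexTableConfig};

fn two_file_table() -> VortexTableConfig {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, false)]));
    // Sizes should match the real object lengths, since they are propagated
    // into DataFusion's PartitionedFile via the From impl above.
    VortexTableConfig::new(
        schema,
        vec![
            VortexFile::new("data/part-0.vtx", 1024),
            VortexFile::new("data/part-1.vtx", 2048),
        ],
    )
}
```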
diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs
new file mode 100644
index 0000000000..51f8f84003
--- /dev/null
+++ b/vortex-datafusion/src/persistent/execution.rs
@@ -0,0 +1,103 @@
+use std::fmt;
+use std::sync::Arc;
+
+use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
+use datafusion_common::Result as DFResult;
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
+};
+
+use crate::persistent::opener::VortexFileOpener;
+
+#[derive(Debug)]
+pub struct VortexExec {
+    file_scan_config: FileScanConfig,
+    metrics: ExecutionPlanMetricsSet,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    plan_properties: PlanProperties,
+    projection: Option<Vec<usize>>,
+}
+
+impl VortexExec {
+    pub fn try_new(
+        file_scan_config: FileScanConfig,
+        metrics: ExecutionPlanMetricsSet,
+        projection: Option<&Vec<usize>>,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+    ) -> DFResult<Self> {
+        let partitioning = Partitioning::UnknownPartitioning(1);
+        let plan_properties = PlanProperties::new(
+            EquivalenceProperties::new(file_scan_config.file_schema.clone()),
+            partitioning,
+            ExecutionMode::Bounded,
+        );
+        let projection = projection.cloned();
+
+        Ok(Self {
+            file_scan_config,
+            metrics,
+            predicate,
+            projection,
+            plan_properties,
+        })
+    }
+
+    pub(crate) fn into_arc(self) -> Arc<dyn ExecutionPlan> {
+        Arc::new(self) as _
+    }
+}
+
+impl DisplayAs for VortexExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "VortexExec: ")?;
+        self.file_scan_config.fmt_as(t, f)?;
+
+        Ok(())
+    }
+}
+
+impl ExecutionPlan for VortexExec {
+    fn name(&self) -> &str {
+        "VortexExec"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.file_scan_config.object_store_url)?;
+        let opener = VortexFileOpener {
+            object_store,
+            projection: self.projection.clone(),
+            batch_size: None,
+            predicate: self.predicate.clone(),
+        };
+        let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;
+
+        Ok(Box::pin(stream))
+    }
+}
diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs
new file mode 100644
index 0000000000..bed3bd9d43
--- /dev/null
+++ b/vortex-datafusion/src/persistent/mod.rs
@@ -0,0 +1,4 @@
+pub mod config;
+pub mod execution;
+pub mod opener;
+pub mod provider;
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
new file mode 100644
index 0000000000..1ce621e026
--- /dev/null
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -0,0 +1,59 @@
+use std::sync::Arc;
+
+use arrow_array::cast::as_struct_array;
+use arrow_array::RecordBatch;
+use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
+use datafusion_common::Result as DFResult;
+use datafusion_physical_expr::PhysicalExpr;
+use futures::{FutureExt as _, TryStreamExt};
+use object_store::ObjectStore;
+use vortex::IntoCanonical;
+use vortex_serde::file::reader::projections::Projection;
+use vortex_serde::file::reader::VortexBatchReaderBuilder;
+use vortex_serde::io::ObjectStoreReadAt;
+
+pub struct VortexFileOpener {
+    pub object_store: Arc<dyn ObjectStore>,
+    pub batch_size: Option<usize>,
+    pub projection: Option<Vec<usize>>,
+    pub predicate: Option<Arc<dyn PhysicalExpr>>,
+}
+
+impl FileOpener for VortexFileOpener {
+    fn open(&self, file_meta: FileMeta) -> DFResult<FileOpenFuture> {
+        let read_at =
+            ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone());
+
+        let mut builder = VortexBatchReaderBuilder::new(read_at);
+
+        if let Some(batch_size) = self.batch_size {
+            builder = builder.with_batch_size(batch_size);
+        }
+
+        if let Some(_predicate) = self.predicate.as_ref() {
+            log::warn!("Missing logic to turn a physical expression into a RowFilter");
+        }
+
+        if let Some(projection) = self.projection.as_ref() {
+            builder = builder.with_projection(Projection::new(projection))
+        }
+
+        Ok(async move {
+            let reader = builder.build().await?;
+
+            let stream = reader
+                .map_ok(|array| {
+                    let arrow = array
+                        .into_canonical()
+                        .expect("struct arrays must canonicalize")
+                        .into_arrow();
+                    let struct_array = as_struct_array(arrow.as_ref());
+                    RecordBatch::from(struct_array)
+                })
+                .map_err(|e| e.into());
+
+            Ok(Box::pin(stream) as _)
+        }
+        .boxed())
+    }
+}
diff --git a/vortex-datafusion/src/persistent/provider.rs b/vortex-datafusion/src/persistent/provider.rs
new file mode 100644
index 0000000000..460d350803
--- /dev/null
+++ b/vortex-datafusion/src/persistent/provider.rs
@@ -0,0 +1,92 @@
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion_common::{DFSchema, Result as DFResult, Statistics};
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_expr::utils::conjunction;
+use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::ExecutionPlan;
+
+use super::config::VortexTableConfig;
+use crate::persistent::execution::VortexExec;
+
+pub struct VortexFileTableProvider {
+    schema_ref: SchemaRef,
+    object_store_url: ObjectStoreUrl,
+    config: VortexTableConfig,
+}
+
+impl VortexFileTableProvider {
+    pub fn try_new(object_store_url: ObjectStoreUrl, config: VortexTableConfig) -> DFResult<Self> {
+        Ok(Self {
+            schema_ref: config.schema.clone().unwrap(),
+            object_store_url,
+            config,
+        })
+    }
+}
+
+#[async_trait]
+impl TableProvider for VortexFileTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema_ref)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let df_schema = DFSchema::try_from(self.schema())?;
+        let predicate = conjunction(filters.to_vec());
+        let predicate = predicate
+            .map(|predicate| state.create_physical_expr(predicate, &df_schema))
+            .transpose()?;
+
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        // TODO: Point at some files and/or ranges
+        let file_scan_config = FileScanConfig::new(self.object_store_url.clone(), self.schema())
+            .with_file_group(
+                self.config
+                    .data_files
+                    .iter()
+                    .cloned()
+                    .map(|f| f.into())
+                    .collect(),
+            )
+            .with_projection(projection.cloned());
+
+        let exec =
+            VortexExec::try_new(file_scan_config, metrics, projection, predicate)?.into_arc();
+
+        Ok(exec)
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+    }
+
+    fn statistics(&self) -> Option<Statistics> {
+        None
+    }
+}
diff --git a/vortex-error/Cargo.toml b/vortex-error/Cargo.toml
index a36a426160..97fbf6f3f6 100644
--- a/vortex-error/Cargo.toml
+++ b/vortex-error/Cargo.toml
@@ -15,8 +15,12 @@ rust-version = { workspace = true }
 name = "vortex_error"
 path = "src/lib.rs"

+[features]
+datafusion = ["datafusion-common"]
+
 [dependencies]
 arrow-schema = { workspace = true }
+datafusion-common = { workspace = true, optional = true }
 flatbuffers = { workspace = true }
 flexbuffers = { workspace = true, optional = true }
 object_store = { workspace = true, optional = true }
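> Review note (not part of the patch): the `datafusion` feature wired up above feeds the two `From` impls added to vortex-error below. A sketch of what that enables — `?` now crosses the boundary directly; the function and its arguments are hypothetical:

```rust
use vortex_error::{vortex_err, VortexResult};

fn checked_index(field: usize, len: usize) -> datafusion_common::Result<usize> {
    let res: VortexResult<usize> = if field < len {
        Ok(field)
    } else {
        Err(vortex_err!(OutOfBounds: field, 0, len))
    };
    // VortexError -> DataFusionError::External via the new From impl.
    Ok(res?)
}
```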
diff --git a/vortex-error/src/lib.rs b/vortex-error/src/lib.rs
index 505563d3b1..f9d44e74fa 100644
--- a/vortex-error/src/lib.rs
+++ b/vortex-error/src/lib.rs
@@ -194,6 +194,23 @@ macro_rules! vortex_bail {
     };
 }

+#[cfg(feature = "datafusion")]
+impl From<VortexError> for datafusion_common::DataFusionError {
+    fn from(value: VortexError) -> Self {
+        Self::External(Box::new(value))
+    }
+}
+
+#[cfg(feature = "datafusion")]
+impl From<VortexError> for datafusion_common::arrow::error::ArrowError {
+    fn from(value: VortexError) -> Self {
+        match value {
+            VortexError::ArrowError(e) => e,
+            _ => Self::from_external_error(Box::new(value)),
+        }
+    }
+}
+
 // Not public, referenced by macros only.
 #[doc(hidden)]
 pub mod __private {
diff --git a/vortex-serde/src/file/reader/filtering.rs b/vortex-serde/src/file/reader/filtering.rs
index d66cbf499e..e3ddc3347f 100644
--- a/vortex-serde/src/file/reader/filtering.rs
+++ b/vortex-serde/src/file/reader/filtering.rs
@@ -3,7 +3,7 @@ use vortex_error::VortexResult;

 use super::projections::Projection;

-pub trait FilteringPredicate {
+pub trait FilteringPredicate: Send + Sync {
     fn projection(&self) -> &Projection;
     fn evaluate(&mut self, array: &Array) -> VortexResult<Array>;
 }
diff --git a/vortex-serde/src/file/reader/mod.rs b/vortex-serde/src/file/reader/mod.rs
index f6a4905bdf..618e3ee06f 100644
--- a/vortex-serde/src/file/reader/mod.rs
+++ b/vortex-serde/src/file/reader/mod.rs
@@ -299,16 +299,19 @@ impl Stream for VortexBatchStream {
                     batch = filter(&batch, &current_predicate)?;

-                    let projected = match &self.projection {
-                        Projection::All => batch,
-                        Projection::Partial(indices) => StructArray::try_from(batch.clone())
-                            .unwrap()
-                            .project(indices.as_ref())
-                            .unwrap()
-                            .into_array(),
+                    let projected = {
+                        let array = match &self.projection {
+                            Projection::All => batch,
+                            Projection::Partial(indices) => {
+                                StructArray::try_from(batch.clone())?
+                                    .project(indices.as_ref())?
+                                    .into_array()
+                            }
+                        };
+                        Ok(array)
                     };

-                    return Poll::Ready(Some(Ok(projected)));
+                    return Poll::Ready(Some(projected));
                 }
                 None => {
.get_range(self.location, start_range..(start_range + buffer.len())) + .get_range(&self.location, start_range..(start_range + buffer.len())) .await?; buffer.as_mut().copy_from_slice(bytes.as_ref()); Ok(buffer) } async fn size(&self) -> u64 { - self.object_store.head(self.location).await.unwrap().size as u64 + self.object_store.head(&self.location).await.unwrap().size as u64 } }