Skip to content

Commit

Permalink
some simplification
Browse files: browse the repository at this point in the history
  • Loading branch information
AdamGS committed Aug 8, 2024
1 parent cc59175 commit 6709da8
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 36 deletions.
12 changes: 5 additions & 7 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::fs;
use std::path::Path;
use std::sync::Arc;

use arrow_array::StructArray;
use arrow_array::StructArray as ArrowStructArray;
use arrow_schema::Schema;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::MemTable;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use tokio::fs::OpenOptions;
use vortex::array::ChunkedArray;
use vortex::array::{ChunkedArray, StructArray};
use vortex::arrow::FromArrowArray;
use vortex::variants::StructArrayTrait;
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
Expand Down Expand Up @@ -249,7 +249,7 @@ async fn register_vortex_file(
})
.collect::<Vec<_>>();

let data = vortex::array::StructArray::from_fields(&fields).into_array();
let data = StructArray::from_fields(&fields).into_array();

let data = if enable_compression {
let compressor = SamplingCompressor::default();
Expand All @@ -266,17 +266,15 @@ async fn register_vortex_file(
.await?;

let mut writer = LayoutWriter::new(f);

writer = writer.write_array_columns(data).await?;

writer.finalize().await?;

anyhow::Ok(())
},
)
.await?;

let f = tokio::fs::File::options()
let f = OpenOptions::new()
.read(true)
.write(true)
.open(&vtx_file)
Expand Down Expand Up @@ -324,7 +322,7 @@ async fn register_vortex(
let chunks: Vec<Array> = record_batches
.iter()
.cloned()
.map(StructArray::from)
.map(ArrowStructArray::from)
.map(|struct_array| Array::from_arrow(&struct_array, false))
.collect();

Expand Down
8 changes: 0 additions & 8 deletions vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,13 @@ pub trait SessionContextExt {
options: VortexMemTableOptions,
) -> DFResult<DataFrame>;

fn register_disk_vortex<S: AsRef<str>>(&self, name: S, url: ObjectStoreUrl) -> DFResult<()> {
self.register_disk_vortex_opts(name, url, VortexTableOptions::default())
}

fn register_disk_vortex_opts<S: AsRef<str>>(
&self,
name: S,
url: ObjectStoreUrl,
options: VortexTableOptions,
) -> DFResult<()>;

fn read_disk_vortex(&self, url: ObjectStoreUrl) -> DFResult<DataFrame> {
self.read_disk_vortex_opts(url, VortexTableOptions::default())
}

fn read_disk_vortex_opts(
&self,
url: ObjectStoreUrl,
Expand Down
18 changes: 4 additions & 14 deletions vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt;
use std::sync::Arc;

use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
use datafusion_common::Result as DFResult;
use datafusion_common::{project_schema, Result as DFResult};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
Expand All @@ -18,7 +18,6 @@ pub struct VortexExec {
metrics: ExecutionPlanMetricsSet,
predicate: Option<Arc<dyn PhysicalExpr>>,
plan_properties: PlanProperties,
projection: Option<Vec<usize>>,
}

impl VortexExec {
Expand All @@ -28,26 +27,17 @@ impl VortexExec {
projection: Option<&Vec<usize>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
) -> DFResult<Self> {
let partitioning = Partitioning::UnknownPartitioning(1);

let projected_schema = if let Some(projection) = projection {
Arc::new(file_scan_config.file_schema.clone().project(projection)?)
} else {
file_scan_config.file_schema.clone()
};

let projected_schema = project_schema(&file_scan_config.file_schema, projection)?;
let plan_properties = PlanProperties::new(
EquivalenceProperties::new(projected_schema),
partitioning,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
);
let projection = projection.cloned();

Ok(Self {
file_scan_config,
metrics,
predicate,
projection,
plan_properties,
})
}
Expand Down Expand Up @@ -99,7 +89,7 @@ impl ExecutionPlan for VortexExec {
.object_store(&self.file_scan_config.object_store_url)?;
let opener = VortexFileOpener {
object_store,
projection: self.projection.clone(),
projection: self.file_scan_config.projection.clone(),
batch_size: None,
predicate: self.predicate.clone(),
};
Expand Down
14 changes: 7 additions & 7 deletions vortex-datafusion/src/persistent/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use async_trait::async_trait;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion_common::{DFSchema, Result as DFResult, Statistics};
use datafusion_common::{project_schema, Result as DFResult, Statistics, ToDFSchema};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::ExecutionPlan;

Expand Down Expand Up @@ -53,13 +54,12 @@ impl TableProvider for VortexFileTableProvider {
filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
let projected_schema = if let Some(projection) = projection {
Arc::new(self.schema().project(projection)?)
} else {
self.schema()
};
if self.config.data_files.is_empty() {
let projected_schema = project_schema(&self.schema(), projection)?;
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

let df_schema = DFSchema::try_from(projected_schema.clone())?;
let df_schema = self.schema().to_dfschema()?;
let predicate = conjunction(filters.to_vec());
let predicate = predicate
.map(|predicate| state.create_physical_expr(predicate, &df_schema))
Expand Down

0 comments on commit 6709da8

Please sign in to comment.