Skip to content

Commit

Permalink
some simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Aug 8, 2024
1 parent cc59175 commit dac788d
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 29 deletions.
8 changes: 0 additions & 8 deletions vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,13 @@ pub trait SessionContextExt {
options: VortexMemTableOptions,
) -> DFResult<DataFrame>;

/// Convenience wrapper around `register_disk_vortex_opts` that supplies
/// `VortexTableOptions::default()` as the options.
///
/// `name` and `url` are forwarded unchanged, and the result of the
/// delegated call is returned as-is.
fn register_disk_vortex<S: AsRef<str>>(&self, name: S, url: ObjectStoreUrl) -> DFResult<()> {
    let default_options = VortexTableOptions::default();
    self.register_disk_vortex_opts(name, url, default_options)
}

/// Required method: no default body is given here, so each implementor of
/// the trait supplies the behaviour.
///
/// Takes a table `name` (anything viewable as `&str`), an `ObjectStoreUrl`,
/// and explicit `VortexTableOptions`, and returns `DFResult<()>`.
///
/// NOTE(review): presumably registers an on-disk Vortex table under `name`
/// in the session; confirm against the implementation, which is not
/// visible here.
fn register_disk_vortex_opts<S: AsRef<str>>(
&self,
name: S,
url: ObjectStoreUrl,
options: VortexTableOptions,
) -> DFResult<()>;

/// Convenience wrapper around `read_disk_vortex_opts` that supplies
/// `VortexTableOptions::default()` as the options.
///
/// `url` is forwarded unchanged, and the `DFResult<DataFrame>` produced by
/// the delegated call is returned as-is.
fn read_disk_vortex(&self, url: ObjectStoreUrl) -> DFResult<DataFrame> {
    let default_options = VortexTableOptions::default();
    self.read_disk_vortex_opts(url, default_options)
}

fn read_disk_vortex_opts(
&self,
url: ObjectStoreUrl,
Expand Down
18 changes: 4 additions & 14 deletions vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt;
use std::sync::Arc;

use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
use datafusion_common::Result as DFResult;
use datafusion_common::{project_schema, Result as DFResult};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
Expand All @@ -18,7 +18,6 @@ pub struct VortexExec {
metrics: ExecutionPlanMetricsSet,
predicate: Option<Arc<dyn PhysicalExpr>>,
plan_properties: PlanProperties,
projection: Option<Vec<usize>>,
}

impl VortexExec {
Expand All @@ -28,26 +27,17 @@ impl VortexExec {
projection: Option<&Vec<usize>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
) -> DFResult<Self> {
let partitioning = Partitioning::UnknownPartitioning(1);

let projected_schema = if let Some(projection) = projection {
Arc::new(file_scan_config.file_schema.clone().project(projection)?)
} else {
file_scan_config.file_schema.clone()
};

let projected_schema = project_schema(&file_scan_config.file_schema, projection)?;
let plan_properties = PlanProperties::new(
EquivalenceProperties::new(projected_schema),
partitioning,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
);
let projection = projection.cloned();

Ok(Self {
file_scan_config,
metrics,
predicate,
projection,
plan_properties,
})
}
Expand Down Expand Up @@ -99,7 +89,7 @@ impl ExecutionPlan for VortexExec {
.object_store(&self.file_scan_config.object_store_url)?;
let opener = VortexFileOpener {
object_store,
projection: self.projection.clone(),
projection: self.file_scan_config.projection.clone(),
batch_size: None,
predicate: self.predicate.clone(),
};
Expand Down
14 changes: 7 additions & 7 deletions vortex-datafusion/src/persistent/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use async_trait::async_trait;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion_common::{DFSchema, Result as DFResult, Statistics};
use datafusion_common::{project_schema, Result as DFResult, Statistics, ToDFSchema};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::ExecutionPlan;

Expand Down Expand Up @@ -53,13 +54,12 @@ impl TableProvider for VortexFileTableProvider {
filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
let projected_schema = if let Some(projection) = projection {
Arc::new(self.schema().project(projection)?)
} else {
self.schema()
};
if self.config.data_files.is_empty() {
let projected_schema = project_schema(&self.schema(), projection)?;
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

let df_schema = DFSchema::try_from(projected_schema.clone())?;
let df_schema = self.schema().to_dfschema()?;
let predicate = conjunction(filters.to_vec());
let predicate = predicate
.map(|predicate| state.create_physical_expr(predicate, &df_schema))
Expand Down

0 comments on commit dac788d

Please sign in to comment.