Skip to content

Commit

Permalink
Report stats to DataFusion (#1506)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Nov 28, 2024
1 parent 1b62acf commit 29a5d45
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt;
use std::sync::Arc;

use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
use datafusion_common::{project_schema, Result as DFResult};
use datafusion_common::{project_schema, Result as DFResult, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
Expand All @@ -19,6 +19,7 @@ pub struct VortexExec {
metrics: ExecutionPlanMetricsSet,
predicate: Option<Arc<dyn PhysicalExpr>>,
plan_properties: PlanProperties,
projected_statistics: Statistics,
ctx: Arc<Context>,
}

Expand All @@ -39,11 +40,23 @@ impl VortexExec {
ExecutionMode::Bounded,
);

// We project our statistics to only the selected columns
// We must also take care to report in-exact statistics if we have any form of filter
// push-down.
let mut projected_statistics = file_scan_config
.statistics
.clone()
.project(file_scan_config.projection.as_ref());
if predicate.is_some() {
projected_statistics = projected_statistics.to_inexact();
}

Ok(Self {
file_scan_config,
metrics,
predicate,
plan_properties,
projected_statistics,
ctx,
})
}
Expand Down Expand Up @@ -107,4 +120,8 @@ impl ExecutionPlan for VortexExec {

Ok(Box::pin(stream))
}

fn statistics(&self) -> DFResult<Statistics> {
Ok(self.projected_statistics.clone())
}
}

0 comments on commit 29a5d45

Please sign in to comment.