Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(planner): improve infer filter #16361

Merged
merged 19 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions src/query/service/src/pipelines/builders/builder_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl PipelineBuilder {
self.main_pipeline.get_scopes(),
);
right_side_builder.cte_state = self.cte_state.clone();
right_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
right_side_builder.hash_join_states = self.hash_join_states.clone();

let mut right_res = right_side_builder.finalize(&range_join.right)?;
Expand Down Expand Up @@ -148,6 +149,7 @@ impl PipelineBuilder {
self.main_pipeline.get_scopes(),
);
build_side_builder.cte_state = self.cte_state.clone();
build_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
build_side_builder.hash_join_states = self.hash_join_states.clone();
let mut build_res = build_side_builder.finalize(build)?;

Expand Down Expand Up @@ -231,52 +233,63 @@ impl PipelineBuilder {
&mut self,
materialized_cte: &MaterializedCte,
) -> Result<()> {
self.expand_left_side_pipeline(
&materialized_cte.left,
self.cte_scan_offsets.insert(
materialized_cte.cte_idx,
&materialized_cte.left_output_columns,
materialized_cte.cte_scan_offset.clone(),
);
self.expand_materialized_side_pipeline(
&materialized_cte.right,
materialized_cte.cte_idx,
&materialized_cte.materialized_output_columns,
)?;
self.build_pipeline(&materialized_cte.right)
self.build_pipeline(&materialized_cte.left)
}

fn expand_left_side_pipeline(
fn expand_materialized_side_pipeline(
&mut self,
left_side: &PhysicalPlan,
materialized_side: &PhysicalPlan,
cte_idx: IndexType,
left_output_columns: &[ColumnBinding],
materialized_output_columns: &[ColumnBinding],
) -> Result<()> {
let left_side_ctx = QueryContext::create_from(self.ctx.clone());
let materialized_side_ctx = QueryContext::create_from(self.ctx.clone());
let state = Arc::new(MaterializedCteState::new(self.ctx.clone()));
self.cte_state.insert(cte_idx, state.clone());
let mut left_side_builder = PipelineBuilder::create(
let mut materialized_side_builder = PipelineBuilder::create(
self.func_ctx.clone(),
self.settings.clone(),
left_side_ctx,
materialized_side_ctx,
self.main_pipeline.get_scopes(),
);
left_side_builder.cte_state = self.cte_state.clone();
left_side_builder.hash_join_states = self.hash_join_states.clone();
let mut left_side_pipeline = left_side_builder.finalize(left_side)?;
assert!(left_side_pipeline.main_pipeline.is_pulling_pipeline()?);
materialized_side_builder.cte_state = self.cte_state.clone();
materialized_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
materialized_side_builder.hash_join_states = self.hash_join_states.clone();
let mut materialized_side_pipeline =
materialized_side_builder.finalize(materialized_side)?;
assert!(
materialized_side_pipeline
.main_pipeline
.is_pulling_pipeline()?
);

PipelineBuilder::build_result_projection(
&self.func_ctx,
left_side.output_schema()?,
left_output_columns,
&mut left_side_pipeline.main_pipeline,
materialized_side.output_schema()?,
materialized_output_columns,
&mut materialized_side_pipeline.main_pipeline,
false,
)?;

left_side_pipeline.main_pipeline.add_sink(|input| {
materialized_side_pipeline.main_pipeline.add_sink(|input| {
let transform = Sinker::<MaterializedCteSink>::create(
input,
MaterializedCteSink::create(self.ctx.clone(), cte_idx, state.clone())?,
);
Ok(ProcessorPtr::create(transform))
})?;
self.pipelines
.push(left_side_pipeline.main_pipeline.finalize());
self.pipelines.extend(left_side_pipeline.sources_pipelines);
.push(materialized_side_pipeline.main_pipeline.finalize());
self.pipelines
.extend(materialized_side_pipeline.sources_pipelines);
Ok(())
}
}
4 changes: 4 additions & 0 deletions src/query/service/src/pipelines/builders/builder_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ impl PipelineBuilder {
cte_scan.cte_idx,
self.cte_state.get(&cte_scan.cte_idx.0).unwrap().clone(),
cte_scan.offsets.clone(),
self.cte_scan_offsets
.get(&cte_scan.cte_idx.0)
.unwrap()
.clone(),
)
},
max_threads as usize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl PipelineBuilder {
self.main_pipeline.get_scopes(),
);
pipeline_builder.cte_state = self.cte_state.clone();
pipeline_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
pipeline_builder.hash_join_states = self.hash_join_states.clone();

let mut build_res = pipeline_builder.finalize(input)?;
Expand Down
5 changes: 4 additions & 1 deletion src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ pub struct PipelineBuilder {
pub merge_into_probe_data_fields: Option<Vec<DataField>>,
pub join_state: Option<Arc<HashJoinBuildState>>,

// Cte -> state, each cte has its own state
// The cte state of each materialized cte.
pub cte_state: HashMap<IndexType, Arc<MaterializedCteState>>,
// The column offsets used by cte scan
pub cte_scan_offsets: HashMap<IndexType, Vec<usize>>,

pub(crate) exchange_injector: Arc<dyn ExchangeInjector>,

Expand All @@ -75,6 +77,7 @@ impl PipelineBuilder {
main_pipeline: Pipeline::with_scopes(scopes),
exchange_injector: DefaultExchangeInjector::create(),
cte_state: HashMap::new(),
cte_scan_offsets: HashMap::new(),
merge_into_probe_data_fields: None,
join_state: None,
hash_join_states: HashMap::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ pub struct MaterializedCteSource {
cte_idx: (IndexType, IndexType),
ctx: Arc<QueryContext>,
cte_state: Arc<MaterializedCteState>,
offsets: Vec<IndexType>,
column_offsets: Vec<IndexType>,
scan_offsets: Vec<usize>,
}

impl MaterializedCteSource {
Expand All @@ -146,13 +147,15 @@ impl MaterializedCteSource {
output_port: Arc<OutputPort>,
cte_idx: (IndexType, IndexType),
cte_state: Arc<MaterializedCteState>,
offsets: Vec<IndexType>,
column_offsets: Vec<IndexType>,
scan_offsets: Vec<usize>,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(ctx.clone(), output_port, MaterializedCteSource {
ctx,
cte_idx,
cte_state,
offsets,
column_offsets,
scan_offsets,
})
}
}
Expand All @@ -167,19 +170,19 @@ impl AsyncSource for MaterializedCteSource {
let materialized_cte = self.ctx.get_materialized_cte(self.cte_idx)?;
if let Some(blocks) = materialized_cte {
let mut blocks_guard = blocks.write();
let block = blocks_guard.pop();
if let Some(b) = block {
if self.offsets.len() == b.num_columns() {
return Ok(Some(b));
let data_block = blocks_guard.pop();
if let Some(data_block) = data_block {
if self.column_offsets.len() == data_block.num_columns() {
return Ok(Some(data_block));
}
let row_len = b.num_rows();
let num_rows = data_block.num_rows();
let pruned_columns = self
.offsets
.column_offsets
.iter()
.map(|offset| b.get_by_offset(*offset).clone())
.map(|offset| data_block.get_by_offset(self.scan_offsets[*offset]).clone())
.collect::<Vec<BlockEntry>>();

Ok(Some(DataBlock::new(pruned_columns, row_len)))
Ok(Some(DataBlock::new(pruned_columns, num_rows)))
} else {
Ok(None)
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/sql/src/executor/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ impl PhysicalPlan {
}
PhysicalPlan::CteScan(cte_scan) => cte_scan_to_format_tree(cte_scan),
PhysicalPlan::MaterializedCte(materialized_cte) => {
let left_child = materialized_cte.left.format_join(metadata)?;
let right_child = materialized_cte.right.format_join(metadata)?;
let left_child = materialized_cte.left.format_join(metadata)?;
let children = vec![
FormatTreeNode::with_children("Left".to_string(), vec![left_child]),
FormatTreeNode::with_children("Right".to_string(), vec![right_child]),
FormatTreeNode::with_children("Left".to_string(), vec![left_child]),
];
Ok(FormatTreeNode::with_children(
format!("MaterializedCte: {}", materialized_cte.cte_idx),
Expand Down Expand Up @@ -1766,8 +1766,8 @@ fn materialized_cte_to_format_tree(
"output columns: [{}]",
format_output_columns(plan.output_schema()?, metadata, true)
)),
to_format_tree(&plan.left, metadata, profs)?,
to_format_tree(&plan.right, metadata, profs)?,
to_format_tree(&plan.left, metadata, profs)?,
];
Ok(FormatTreeNode::with_children(
"MaterializedCTE".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ impl PhysicalPlan {
PhysicalPlan::MutationOrganize(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::AddStreamColumn(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::MaterializedCte(plan) => Box::new(
std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
std::iter::once(plan.right.as_ref()).chain(std::iter::once(plan.left.as_ref())),
),
PhysicalPlan::Udf(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::AsyncFunction(plan) => Box::new(std::iter::once(plan.input.as_ref())),
Expand Down
4 changes: 4 additions & 0 deletions src/query/sql/src/executor/physical_plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use databend_common_catalog::plan::Partitions;
Expand Down Expand Up @@ -40,6 +41,8 @@ pub struct PhysicalPlanBuilder {
pub(crate) dry_run: bool,
// Record cte_idx and the cte's output columns
pub(crate) cte_output_columns: HashMap<IndexType, Vec<ColumnBinding>>,
// The used column offsets of each materialized cte.
pub(crate) cet_used_column_offsets: HashMap<IndexType, HashSet<usize>>,
// DataMutation info, used to build MergeInto physical plan
pub(crate) mutation_build_info: Option<MutationBuildInfo>,
}
Expand All @@ -53,6 +56,7 @@ impl PhysicalPlanBuilder {
func_ctx,
dry_run,
cte_output_columns: Default::default(),
cet_used_column_offsets: Default::default(),
mutation_build_info: None,
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/query/sql/src/executor/physical_plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,15 +277,16 @@ pub trait PhysicalPlanReplacer {
}

fn replace_materialized_cte(&mut self, plan: &MaterializedCte) -> Result<PhysicalPlan> {
let left = self.replace(&plan.left)?;
let right = self.replace(&plan.right)?;
let left = self.replace(&plan.left)?;

Ok(PhysicalPlan::MaterializedCte(MaterializedCte {
plan_id: plan.plan_id,
left: Box::new(left),
right: Box::new(right),
cte_idx: plan.cte_idx,
left_output_columns: plan.left_output_columns.clone(),
cte_scan_offset: plan.cte_scan_offset.clone(),
materialized_output_columns: plan.materialized_output_columns.clone(),
}))
}

Expand Down
17 changes: 14 additions & 3 deletions src/query/sql/src/executor/physical_plans/physical_cte_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,27 @@ impl PhysicalPlanBuilder {
used_columns = required.intersection(&used_columns).cloned().collect();
let mut pruned_fields = vec![];
let mut pruned_offsets = vec![];
let mut pruned_materialized_indexes = vec![];
let cte_output_columns = self.cte_output_columns.get(&cte_scan.cte_idx.0).unwrap();
for field in cte_scan.fields.iter() {
for (field, column_index) in cte_scan
.fields
.iter()
.zip(cte_scan.materialized_indexes.iter())
{
if used_columns.contains(&field.name().parse()?) {
pruned_fields.push(field.clone());
pruned_materialized_indexes.push(*column_index);
}
}
for field in pruned_fields.iter() {
for column_index in pruned_materialized_indexes.iter() {
for (offset, col) in cte_output_columns.iter().enumerate() {
if col.index.eq(&field.name().parse::<IndexType>()?) {
if col.index.eq(column_index) {
pruned_offsets.push(offset);
self.cet_used_column_offsets
.entry(cte_scan.cte_idx.0)
.and_modify(|column_offsets| {
column_offsets.insert(offset);
});
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use databend_common_exception::Result;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;

use crate::executor::PhysicalPlan;
use crate::executor::PhysicalPlanBuilder;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::ColumnBinding;
use crate::ColumnSet;
Expand All @@ -31,12 +32,13 @@ pub struct MaterializedCte {
pub left: Box<PhysicalPlan>,
pub right: Box<PhysicalPlan>,
pub cte_idx: IndexType,
pub left_output_columns: Vec<ColumnBinding>,
pub cte_scan_offset: Vec<usize>,
pub materialized_output_columns: Vec<ColumnBinding>,
}

impl MaterializedCte {
pub fn output_schema(&self) -> Result<DataSchemaRef> {
let fields = self.right.output_schema()?.fields().clone();
let fields = self.left.output_schema()?.fields().clone();
Ok(DataSchemaRefExt::create(fields))
}
}
Expand All @@ -49,36 +51,35 @@ impl PhysicalPlanBuilder {
required: ColumnSet,
) -> Result<PhysicalPlan> {
// 1. Prune unused Columns.
let left_output_column = RelExpr::with_s_expr(s_expr)
.derive_relational_prop_child(0)?
.output_columns
.clone();
let right_used_column = RelExpr::with_s_expr(s_expr)
.derive_relational_prop_child(1)?
.used_columns
.clone();
// Get the intersection of `left_output_column` and `right_used_column`
let left_required = left_output_column
.intersection(&right_used_column)
.cloned()
.collect::<ColumnSet>();
self.cte_output_columns
.insert(cte.cte_idx, cte.materialized_output_columns.clone());
self.cet_used_column_offsets
.insert(cte.cte_idx, HashSet::new());
let left = Box::new(self.build(s_expr.child(0)?, required).await?);

let mut required_output_columns = vec![];
for column in cte.left_output_columns.iter() {
if left_required.contains(&column.index) {
required_output_columns.push(column.clone());
let mut materialize_required = ColumnSet::new();
let mut materialized_output_columns = vec![];
let mut cte_scan_offset = Vec::with_capacity(cte.materialized_output_columns.len());
let used_column_offset = self.cet_used_column_offsets.get(&cte.cte_idx).unwrap();
for (offset, column) in cte.materialized_output_columns.iter().enumerate() {
if used_column_offset.contains(&offset) {
cte_scan_offset.push(materialized_output_columns.len());
materialize_required.insert(column.index);
materialized_output_columns.push(column.clone());
} else {
cte_scan_offset.push(0);
}
}
self.cte_output_columns
.insert(cte.cte_idx, required_output_columns.clone());
let right = Box::new(self.build(s_expr.child(1)?, materialize_required).await?);

// 2. Build physical plan.
Ok(PhysicalPlan::MaterializedCte(MaterializedCte {
plan_id: 0,
left: Box::new(self.build(s_expr.child(0)?, left_required).await?),
right: Box::new(self.build(s_expr.child(1)?, required).await?),
left,
right,
cte_idx: cte.cte_idx,
left_output_columns: required_output_columns,
cte_scan_offset,
materialized_output_columns,
}))
}
}
Loading
Loading