Skip to content

Commit

Permalink
builder
Browse files Browse the repository at this point in the history
Signed-off-by: coldWater <[email protected]>
  • Loading branch information
forsaken628 committed Oct 29, 2024
1 parent 700ea6e commit 65a59c2
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 11 deletions.
10 changes: 10 additions & 0 deletions src/query/service/src/pipelines/builders/builder_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_expression::with_number_mapped_type;
use databend_common_expression::SortColumnDescription;
use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
use databend_common_sql::executor::physical_plans::Window;
use databend_common_sql::executor::physical_plans::WindowPartition;
use databend_storages_common_cache::TempDirManager;
Expand All @@ -31,6 +32,7 @@ use opendal::Operator;

use crate::pipelines::processors::transforms::FrameBound;
use crate::pipelines::processors::transforms::TransformWindow;
use crate::pipelines::processors::transforms::TransformWindowPartialTopN;
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
use crate::pipelines::processors::transforms::WindowFunctionInfo;
use crate::pipelines::processors::transforms::WindowPartitionExchange;
Expand Down Expand Up @@ -169,6 +171,14 @@ impl PipelineBuilder {
})
.collect::<Result<Vec<_>>>()?;

if let Some(limit) = window_partition.limit {
if limit > 0 {
self.main_pipeline.add_transformer(|| {
TransformWindowPartialTopN::new(partition_by.clone(), sort_desc.clone(), limit)
})
}
}

self.main_pipeline.exchange(
num_processors,
WindowPartitionExchange::create(partition_by.clone(), num_partitions),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ pub struct TransformWindowPartialTopN {
}

impl TransformWindowPartialTopN {
pub fn try_new(
pub fn new(
partition_indices: Vec<usize>,
order_by: Vec<SortColumnDescription>,
limit: usize,
) -> Result<Self> {
) -> Self {
assert!(limit > 0);
let partition_indices = partition_indices.into_boxed_slice();
let sort_desc = partition_indices
Expand All @@ -44,12 +44,12 @@ impl TransformWindowPartialTopN {
.collect::<Vec<_>>()
.into();

Ok(Self {
Self {
partition_indices,
limit,
sort_desc,
indices: Vec::new(),
})
}
}
}

Expand Down Expand Up @@ -151,11 +151,8 @@ mod tests {
data.check_valid()?;

{
let mut transform = TransformWindowPartialTopN::try_new(
partition_indices.clone(),
order_by.clone(),
3,
)?;
let mut transform =
TransformWindowPartialTopN::new(partition_indices.clone(), order_by.clone(), 3);

let got = transform.transform(data.clone())?;
let want = DataBlock::new_from_columns(vec![
Expand All @@ -168,8 +165,7 @@ mod tests {
}

{
let mut transform =
TransformWindowPartialTopN::try_new(partition_indices, order_by, 1)?;
let mut transform = TransformWindowPartialTopN::new(partition_indices, order_by, 1);

let got = transform.transform(data.clone())?;
let want = DataBlock::new_from_columns(vec![
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/executor/physical_plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pub trait PhysicalPlanReplacer {
partition_by: plan.partition_by.clone(),
order_by: plan.order_by.clone(),
after_exchange: plan.after_exchange,
limit: plan.limit,
stat_info: plan.stat_info.clone(),
}))
}
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/executor/physical_plans/physical_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl PhysicalPlanBuilder {
partition_by: window_partition.clone(),
order_by: order_by.clone(),
after_exchange: sort.after_exchange,
limit: sort.limit,
stat_info: Some(stat_info.clone()),
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct WindowPartition {
pub partition_by: Vec<IndexType>,
pub order_by: Vec<SortDesc>,
pub after_exchange: Option<bool>,
pub limit: Option<usize>,

pub stat_info: Option<PlanStatsInfo>,
}
Expand Down

0 comments on commit 65a59c2

Please sign in to comment.