Skip to content

Commit

Permalink
Pushdown ORDER BY for realtime caggs
Browse files Browse the repository at this point in the history
Previously ordered queries on realtime caggs would always lead to full
table scan as the query plan would have a sort with the limit on top.
With this patch this gets changed so that the ORDER BY can be pushed
down so the query can benefit from the ordered append optimization and
does not require full table scan.

Fixes timescale#4861
  • Loading branch information
svenklemm committed Sep 25, 2024
1 parent d29ccee commit f4954e3
Show file tree
Hide file tree
Showing 13 changed files with 695 additions and 122 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7271
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7271 Push down ORDER BY in real time continuous aggregate queries
12 changes: 12 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ bool ts_guc_enable_qual_propagation = true;
bool ts_guc_enable_cagg_reorder_groupby = true;
bool ts_guc_enable_now_constify = true;
bool ts_guc_enable_foreign_key_propagation = true;
TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown = true;
TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true;
TSDLLEXPORT int ts_guc_cagg_max_individual_materializations = 10;
bool ts_guc_enable_osm_reads = true;
Expand Down Expand Up @@ -562,6 +563,17 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_cagg_sort_pushdown"),
"Enable sort pushdown for continuous aggregates",
"Enable pushdown of ORDER BY clause for continuous aggregates",
&ts_guc_enable_cagg_sort_pushdown,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_cagg_watermark_constify"),
"Enable cagg watermark constify",
"Enable constifying cagg watermark for real-time caggs",
Expand Down
3 changes: 2 additions & 1 deletion src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ extern bool ts_guc_enable_cagg_reorder_groupby;
extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
extern bool ts_guc_enable_now_constify;
extern bool ts_guc_enable_foreign_key_propagation;
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown;
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression_tuple_filtering;
extern TSDLLEXPORT bool ts_guc_enable_compressed_direct_batch_delete;
Expand Down
97 changes: 97 additions & 0 deletions tsl/src/continuous_aggs/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,100 @@ constify_cagg_watermark(Query *parse)
if (context.valid_query)
replace_watermark_with_const(&context);
}

/*
* Push down ORDER BY and LIMIT into subqueries of UNION for realtime
* continuous aggregates when sorting by time.
*/
void
cagg_sort_pushdown(Query *parse)
{
ListCell *lc;

/* Nothing to do if we have no valid sort clause */
if (list_length(parse->rtable) != 1 || list_length(parse->sortClause) != 1 ||
!OidIsValid(linitial_node(SortGroupClause, parse->sortClause)->sortop))
return;

Cache *cache = ts_hypertable_cache_pin();

foreach (lc, parse->rtable)
{
RangeTblEntry *rte = lfirst(lc);

/*
* Realtime cagg view will have 2 rtable entries, one for the materialized data and one for
* the not yet materialized data.
*/
if (rte->rtekind != RTE_SUBQUERY || rte->relkind != RELKIND_VIEW ||
list_length(rte->subquery->rtable) != 2)
continue;

ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(rte->relid);

/*
* This optimization only applies to realtime caggs.
*/
if (!cagg || !cagg->data.finalized || cagg->data.materialized_only)
continue;

Hypertable *ht = ts_hypertable_cache_get_entry_by_id(cache, cagg->data.mat_hypertable_id);
Dimension const *dim = hyperspace_get_open_dimension(ht->space, 0);

/* We should only encounter hypertables with an open dimension */
if (!dim)
continue;

SortGroupClause *sort = linitial_node(SortGroupClause, parse->sortClause);
TargetEntry *tle = get_sortgroupref_tle(sort->tleSortGroupRef, parse->targetList);

/*
* We only pushdown ORDER BY when it's single column
* ORDER BY on the time column.
*/
AttrNumber time_col = dim->column_attno;
if (!IsA(tle->expr, Var) || castNode(Var, tle->expr)->varattno != time_col)
continue;

RangeTblEntry *mat_rte = linitial_node(RangeTblEntry, rte->subquery->rtable);
RangeTblEntry *rt_rte = lsecond_node(RangeTblEntry, rte->subquery->rtable);

mat_rte->subquery->sortClause = list_copy(parse->sortClause);
rt_rte->subquery->sortClause = list_copy(parse->sortClause);

TargetEntry *mat_tle = list_nth(mat_rte->subquery->targetList, time_col - 1);
TargetEntry *rt_tle = list_nth(rt_rte->subquery->targetList, time_col - 1);
linitial_node(SortGroupClause, mat_rte->subquery->sortClause)->tleSortGroupRef =
mat_tle->ressortgroupref;
linitial_node(SortGroupClause, rt_rte->subquery->sortClause)->tleSortGroupRef =
rt_tle->ressortgroupref;

SortGroupClause *cagg_group = linitial(rt_rte->subquery->groupClause);
cagg_group = list_nth(rt_rte->subquery->groupClause, rt_tle->ressortgroupref - 1);
cagg_group->sortop = sort->sortop;
cagg_group->nulls_first = sort->nulls_first;

Oid placeholder;
int16 strategy;
get_ordering_op_properties(sort->sortop, &placeholder, &placeholder, &strategy);

/*
* If this is DESC order and the sortop is the commutator of the cagg_group sortop,
* we can align the sortop of the cagg_group with the sortop of the sort clause, which
* will allow us to have the GroupAggregate node to produce the correct order and avoid
* having to resort.
*/
if (strategy == BTGreaterStrategyNumber)
{
/*
* Try to align the sortop of the cagg_group with the sortop of the
* sort clause.
*/
rte->subquery->rtable = list_make2(rt_rte, mat_rte);
}

parse->sortClause = NIL;
rte->subquery->sortClause = NIL;
}
ts_cache_release(cache);
}
1 change: 1 addition & 0 deletions tsl/src/continuous_aggs/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
#include "planner/planner.h"

void constify_cagg_watermark(Query *parse);
void cagg_sort_pushdown(Query *parse);

#endif
6 changes: 6 additions & 0 deletions tsl/src/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ tsl_preprocess_query(Query *parse)
{
constify_cagg_watermark(parse);
}

/* Push down ORDER BY and LIMIT for realtime cagg */
if (ts_guc_enable_cagg_sort_pushdown)
{
cagg_sort_pushdown(parse);
}
}

/*
Expand Down
Loading

0 comments on commit f4954e3

Please sign in to comment.