Skip to content

Commit

Permalink
Pushdown LIMIT and ORDER BY for realtime caggs
Browse files Browse the repository at this point in the history
Previously, ordered queries on realtime caggs would always lead to a full
table scan, as the query plan would have a sort with the limit on top.
With this patch, the ORDER BY and LIMIT can
be pushed down so the query can benefit from the ordered append
optimization and does not require a full table scan.

Fixes timescale#4861
  • Loading branch information
svenklemm committed Sep 23, 2024
1 parent d29ccee commit bd4f332
Show file tree
Hide file tree
Showing 12 changed files with 529 additions and 122 deletions.
12 changes: 12 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ bool ts_guc_enable_qual_propagation = true;
bool ts_guc_enable_cagg_reorder_groupby = true;
bool ts_guc_enable_now_constify = true;
bool ts_guc_enable_foreign_key_propagation = true;
/*
 * Backing variable for the "enable_cagg_sort_pushdown" setting registered in
 * _guc_init(); enabled by default. Exported because it is read from
 * tsl_preprocess_query() in the TSL module.
 */
TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown = true;
TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true;
TSDLLEXPORT int ts_guc_cagg_max_individual_materializations = 10;
bool ts_guc_enable_osm_reads = true;
Expand Down Expand Up @@ -562,6 +563,17 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_cagg_sort_pushdown"),
"Enable sort pushdown for continuous aggregates",
"Enable pushdown of ORDER BY clause for continuous aggregates",
&ts_guc_enable_cagg_sort_pushdown,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_cagg_watermark_constify"),
"Enable cagg watermark constify",
"Enable constifying cagg watermark for real-time caggs",
Expand Down
3 changes: 2 additions & 1 deletion src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ extern bool ts_guc_enable_cagg_reorder_groupby;
extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
extern bool ts_guc_enable_now_constify;
extern bool ts_guc_enable_foreign_key_propagation;
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern bool ts_guc_enable_osm_reads;
/* Gates the ORDER BY pushdown for continuous aggregates (see guc.c). */
extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown;
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression_tuple_filtering;
extern TSDLLEXPORT bool ts_guc_enable_compressed_direct_batch_delete;
Expand Down
109 changes: 109 additions & 0 deletions tsl/src/continuous_aggs/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,112 @@ constify_cagg_watermark(Query *parse)
if (context.valid_query)
replace_watermark_with_const(&context);
}

/*
 * Push down ORDER BY and LIMIT into subqueries of UNION for realtime
 * continuous aggregates when sorting by time.
 *
 * parse is modified in place. When the pushdown applies, the single ORDER BY
 * clause is copied into both branches of the cagg view (materialized and
 * not-yet-materialized data), the toplevel sort clause is removed, and for
 * descending sorts the two branches are swapped so the branch order matches
 * the requested order.
 *
 * The pushdown is skipped (the query is left untouched) unless all of the
 * following hold:
 *  - the query has exactly one range table entry and exactly one sort clause
 *    with a valid sort operator,
 *  - the toplevel query does not aggregate, group, apply DISTINCT or use
 *    window functions (those steps run after the subquery scan and could
 *    reorder rows once the toplevel sort is gone),
 *  - the range table entry is a finalized realtime cagg view,
 *  - the sort expression is a plain Var on the time column,
 *  - the sort operator is either the operator used for grouping in the
 *    realtime branch or its commutator.
 *
 * NOTE(review): once the toplevel sort is removed the result order depends
 * on the UNION branches being scanned in range table order; confirm that no
 * parallel plan can interleave them.
 */
void
cagg_sort_pushdown(Query *parse)
{
	ListCell *lc;

	/* Nothing to do if we have no valid sort clause */
	if (list_length(parse->rtable) != 1 || list_length(parse->sortClause) != 1 ||
		!OidIsValid(linitial_node(SortGroupClause, parse->sortClause)->sortop))
		return;

	/*
	 * We remove the toplevel sort below, so nothing above the subquery scan
	 * may be able to change the row order.
	 */
	if (parse->hasAggs || parse->hasWindowFuncs || parse->groupClause != NIL ||
		parse->groupingSets != NIL || parse->distinctClause != NIL)
		return;

	SortGroupClause *sort = linitial_node(SortGroupClause, parse->sortClause);
	Cache *cache = ts_hypertable_cache_pin();

	foreach (lc, parse->rtable)
	{
		RangeTblEntry *rte = lfirst(lc);

		/*
		 * Realtime cagg view will have 2 rtable entries, one for the materialized data and one for
		 * the not yet materialized data.
		 */
		if (rte->rtekind != RTE_SUBQUERY || rte->relkind != RELKIND_VIEW ||
			list_length(rte->subquery->rtable) != 2)
			continue;

		ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(rte->relid);

		/*
		 * This optimization only applies to realtime caggs.
		 */
		if (!cagg || !cagg->data.finalized || cagg->data.materialized_only)
			continue;

		Hypertable *ht = ts_hypertable_cache_get_entry_by_id(cache, cagg->data.mat_hypertable_id);

		/* Defensive: do not dereference a failed cache lookup */
		if (!ht)
			continue;

		Dimension const *dim = hyperspace_get_open_dimension(ht->space, 0);

		/* We should only encounter hypertables with an open dimension */
		if (!dim)
			continue;

		TargetEntry *tle = get_sortgroupref_tle(sort->tleSortGroupRef, parse->targetList);

		/*
		 * We only pushdown ORDER BY when it's single column
		 * ORDER BY on the time column.
		 */
		AttrNumber time_col = dim->column_attno;
		if (!IsA(tle->expr, Var) || castNode(Var, tle->expr)->varattno != time_col)
			continue;

		RangeTblEntry *mat_rte = linitial_node(RangeTblEntry, rte->subquery->rtable);
		RangeTblEntry *rt_rte = lsecond_node(RangeTblEntry, rte->subquery->rtable);

		/* Both branches have to be subqueries for the rewrite below */
		if (mat_rte->rtekind != RTE_SUBQUERY || rt_rte->rtekind != RTE_SUBQUERY)
			continue;

		TargetEntry *mat_tle = list_nth(mat_rte->subquery->targetList, time_col - 1);
		TargetEntry *rt_tle = list_nth(rt_rte->subquery->targetList, time_col - 1);

		/*
		 * A sortgroupref of 0 means the target entry is not referenced by any
		 * sort/group clause, so a pushed down sort clause could not be matched
		 * to the time column reliably.
		 */
		if (mat_tle->ressortgroupref == 0 || rt_tle->ressortgroupref == 0)
			continue;

		/*
		 * Look up the grouping clause of the time column by its sortgroupref
		 * instead of assuming its position in the groupClause list.
		 */
		SortGroupClause *cagg_group =
			get_sortgroupref_clause_noerr(rt_tle->ressortgroupref, rt_rte->subquery->groupClause);

		if (cagg_group == NULL)
			continue;

		/*
		 * Determine the sort direction before modifying anything.
		 *
		 * To avoid additional catalog lookups here checking the sort strategy we implicitly check
		 * for DESC here. The sortop on grouping will always be ASC and when the toplevel sortop
		 * is the commutator of the grouping sortop, it means the toplevel sortop is DESC.
		 *
		 * Any other operator is left alone since we would not know in which order
		 * the two branches have to be scanned.
		 */
		bool descending;

		if (cagg_group->sortop == sort->sortop)
			descending = false;
		else if (cagg_group->sortop == get_commutator(sort->sortop))
			descending = true;
		else
			continue;

		/*
		 * When we have a LIMIT and no offset we can push down the LIMIT into the subqueries.
		 *
		 * This is only safe when every subquery row becomes exactly one result row, so
		 * toplevel quals or set-returning functions in the target list prevent it.
		 * Each query gets its own copy of the expression to avoid sharing nodes.
		 */
		if (parse->limitOption == LIMIT_OPTION_COUNT && parse->limitCount && !parse->limitOffset &&
			!parse->hasTargetSRFs && parse->jointree != NULL && parse->jointree->quals == NULL)
		{
			mat_rte->subquery->limitOption = LIMIT_OPTION_COUNT;
			rt_rte->subquery->limitOption = LIMIT_OPTION_COUNT;
			mat_rte->subquery->limitCount = copyObject(parse->limitCount);
			rt_rte->subquery->limitCount = copyObject(parse->limitCount);
		}

		/*
		 * list_copy() only copies the list cells, which would leave both subqueries
		 * sharing one SortGroupClause node. We need a private node per subquery
		 * because tleSortGroupRef is set individually below.
		 */
		mat_rte->subquery->sortClause = copyObject(parse->sortClause);
		rt_rte->subquery->sortClause = copyObject(parse->sortClause);

		linitial_node(SortGroupClause, mat_rte->subquery->sortClause)->tleSortGroupRef =
			mat_tle->ressortgroupref;
		linitial_node(SortGroupClause, rt_rte->subquery->sortClause)->tleSortGroupRef =
			rt_tle->ressortgroupref;

		if (descending)
		{
			/*
			 * Align the sortop of the cagg_group with the sortop of the sort clause, which
			 * will allow us to have the GroupAggregate node to produce the correct order and
			 * avoid having to resort.
			 */
			cagg_group->sortop = sort->sortop;
			cagg_group->nulls_first = sort->nulls_first;

			/* Scan the realtime branch first for descending order */
			rte->subquery->rtable = list_make2(rt_rte, mat_rte);
		}

		parse->sortClause = NIL;

		/* The sort clause has been consumed, nothing left to push down */
		break;
	}
	ts_cache_release(cache);
}
1 change: 1 addition & 0 deletions tsl/src/continuous_aggs/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
#include "planner/planner.h"

/* Defined in tsl/src/continuous_aggs/planner.c; operates on the query tree in place. */
void constify_cagg_watermark(Query *parse);
/*
 * Push down ORDER BY and LIMIT into the UNION subqueries of a realtime
 * continuous aggregate when sorting by time. Modifies parse in place.
 */
void cagg_sort_pushdown(Query *parse);

#endif
6 changes: 6 additions & 0 deletions tsl/src/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ tsl_preprocess_query(Query *parse)
{
constify_cagg_watermark(parse);
}

/* Push down ORDER BY and LIMIT for realtime cagg */
if (ts_guc_enable_cagg_sort_pushdown)
{
cagg_sort_pushdown(parse);
}
}

/*
Expand Down
Loading

0 comments on commit bd4f332

Please sign in to comment.