Skip to content

Commit

Permalink
Pushdown ORDER BY for realtime caggs
Browse files Browse the repository at this point in the history
Previously ordered queries on realtime caggs would always lead to full
table scan as the query plan would have a sort with the limit on top.
With this patch this gets changed so that the ORDER BY can be pushed
down so the query can benefit from the ordered append optimization and
does not require full table scan.

Fixes timescale#4861
  • Loading branch information
svenklemm committed Sep 28, 2024
1 parent 57c1dec commit 52b7d00
Show file tree
Hide file tree
Showing 18 changed files with 729 additions and 160 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7271
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7271 Push down ORDER BY in real time continuous aggregate queries
2 changes: 1 addition & 1 deletion src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ ts_tsl_loaded(PG_FUNCTION_ARGS)
}

static void
preprocess_query_tsl_default_fn_community(Query *parse)
preprocess_query_tsl_default_fn_community(Query *parse, int *cursor_opts)
{
/* No op in community licensed code */
}
Expand Down
2 changes: 1 addition & 1 deletion src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ typedef struct CrossModuleFunctions
PGFunction chunk_unfreeze_chunk;
PGFunction recompress_chunk_segmentwise;
PGFunction get_compressed_chunk_index_for_recompression;
void (*preprocess_query_tsl)(Query *parse);
void (*preprocess_query_tsl)(Query *parse, int *cursor_opts);
} CrossModuleFunctions;

extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions;
Expand Down
12 changes: 12 additions & 0 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ bool ts_guc_enable_qual_propagation = true;
bool ts_guc_enable_cagg_reorder_groupby = true;
bool ts_guc_enable_now_constify = true;
bool ts_guc_enable_foreign_key_propagation = true;
TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown = true;
TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify = true;
TSDLLEXPORT int ts_guc_cagg_max_individual_materializations = 10;
bool ts_guc_enable_osm_reads = true;
Expand Down Expand Up @@ -562,6 +563,17 @@ _guc_init(void)
NULL,
NULL);

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_cagg_sort_pushdown"),
"Enable sort pushdown for continuous aggregates",
"Enable pushdown of ORDER BY clause for continuous aggregates",
&ts_guc_enable_cagg_sort_pushdown,
true,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_cagg_watermark_constify"),
"Enable cagg watermark constify",
"Enable constifying cagg watermark for real-time caggs",
Expand Down
3 changes: 2 additions & 1 deletion src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ extern bool ts_guc_enable_cagg_reorder_groupby;
extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
extern bool ts_guc_enable_now_constify;
extern bool ts_guc_enable_foreign_key_propagation;
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown;
extern TSDLLEXPORT bool ts_guc_enable_cagg_watermark_constify;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression_tuple_filtering;
extern TSDLLEXPORT bool ts_guc_enable_compressed_direct_batch_delete;
Expand Down
2 changes: 1 addition & 1 deletion src/planner/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ timescaledb_planner(Query *parse, const char *query_string, int cursor_opts,
preprocess_query((Node *) parse, &context);

if (ts_guc_enable_optimizations)
ts_cm_functions->preprocess_query_tsl(parse);
ts_cm_functions->preprocess_query_tsl(parse, &cursor_opts);
}

if (prev_planner_hook != NULL)
Expand Down
98 changes: 98 additions & 0 deletions tsl/src/continuous_aggs/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,101 @@ constify_cagg_watermark(Query *parse)
if (context.valid_query)
replace_watermark_with_const(&context);
}

/*
* Push down ORDER BY and LIMIT into subqueries of UNION for realtime
* continuous aggregates when sorting by time.
*/
void
cagg_sort_pushdown(Query *parse, int *cursor_opts)
{
ListCell *lc;

/* Nothing to do if we have no valid sort clause */
if (list_length(parse->rtable) != 1 || list_length(parse->sortClause) != 1 ||
!OidIsValid(linitial_node(SortGroupClause, parse->sortClause)->sortop))
return;

Cache *cache = ts_hypertable_cache_pin();

foreach (lc, parse->rtable)
{
RangeTblEntry *rte = lfirst(lc);

/*
* Realtime cagg view will have 2 rtable entries, one for the materialized data and one for
* the not yet materialized data.
*/
if (rte->rtekind != RTE_SUBQUERY || rte->relkind != RELKIND_VIEW ||
list_length(rte->subquery->rtable) != 2)
continue;

ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(rte->relid);

/*
* This optimization only applies to realtime caggs.
*/
if (!cagg || !cagg->data.finalized || cagg->data.materialized_only)
continue;

Hypertable *ht = ts_hypertable_cache_get_entry_by_id(cache, cagg->data.mat_hypertable_id);
Dimension const *dim = hyperspace_get_open_dimension(ht->space, 0);

/* We should only encounter hypertables with an open dimension */
if (!dim)
continue;

SortGroupClause *sort = linitial_node(SortGroupClause, parse->sortClause);
TargetEntry *tle = get_sortgroupref_tle(sort->tleSortGroupRef, parse->targetList);

/*
* We only pushdown ORDER BY when it's single column
* ORDER BY on the time column.
*/
AttrNumber time_col = dim->column_attno;
if (!IsA(tle->expr, Var) || castNode(Var, tle->expr)->varattno != time_col)
continue;

RangeTblEntry *mat_rte = linitial_node(RangeTblEntry, rte->subquery->rtable);
RangeTblEntry *rt_rte = lsecond_node(RangeTblEntry, rte->subquery->rtable);

mat_rte->subquery->sortClause = list_copy(parse->sortClause);
rt_rte->subquery->sortClause = list_copy(parse->sortClause);

TargetEntry *mat_tle = list_nth(mat_rte->subquery->targetList, time_col - 1);
TargetEntry *rt_tle = list_nth(rt_rte->subquery->targetList, time_col - 1);
linitial_node(SortGroupClause, mat_rte->subquery->sortClause)->tleSortGroupRef =
mat_tle->ressortgroupref;
linitial_node(SortGroupClause, rt_rte->subquery->sortClause)->tleSortGroupRef =
rt_tle->ressortgroupref;

SortGroupClause *cagg_group = linitial(rt_rte->subquery->groupClause);
cagg_group = list_nth(rt_rte->subquery->groupClause, rt_tle->ressortgroupref - 1);
cagg_group->sortop = sort->sortop;
cagg_group->nulls_first = sort->nulls_first;

Oid placeholder;
int16 strategy;
get_ordering_op_properties(sort->sortop, &placeholder, &placeholder, &strategy);

/*
* If this is DESC order and the sortop is the commutator of the cagg_group sortop,
* we can align the sortop of the cagg_group with the sortop of the sort clause, which
* will allow us to have the GroupAggregate node to produce the correct order and avoid
* having to resort.
*/
if (strategy == BTGreaterStrategyNumber)
{
rte->subquery->rtable = list_make2(rt_rte, mat_rte);
}

/*
* We have to prevent parallelism when we do this optimization because
* the subplans of the Append have to be processed sequentially.
*/
*cursor_opts = *cursor_opts & ~CURSOR_OPT_PARALLEL_OK;
parse->sortClause = NIL;
rte->subquery->sortClause = NIL;
}
ts_cache_release(cache);
}
1 change: 1 addition & 0 deletions tsl/src/continuous_aggs/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
#include "planner/planner.h"

void constify_cagg_watermark(Query *parse);
void cagg_sort_pushdown(Query *parse, int *cursor_opts);

#endif
8 changes: 7 additions & 1 deletion tsl/src/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ tsl_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntr
* Run preprocess query optimizations
*/
void
tsl_preprocess_query(Query *parse)
tsl_preprocess_query(Query *parse, int *cursor_opts)
{
Assert(parse != NULL);

Expand All @@ -220,6 +220,12 @@ tsl_preprocess_query(Query *parse)
{
constify_cagg_watermark(parse);
}

/* Push down ORDER BY and LIMIT for realtime cagg */
if (ts_guc_enable_cagg_sort_pushdown)
{
cagg_sort_pushdown(parse, cursor_opts);
}
}

/*
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ void tsl_create_upper_paths_hook(PlannerInfo *, UpperRelationKind, RelOptInfo *,
void tsl_set_rel_pathlist_query(PlannerInfo *, RelOptInfo *, Index, RangeTblEntry *, Hypertable *);
void tsl_set_rel_pathlist_dml(PlannerInfo *, RelOptInfo *, Index, RangeTblEntry *, Hypertable *);
void tsl_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte);
void tsl_preprocess_query(Query *parse);
void tsl_preprocess_query(Query *parse, int *cursor_opts);
void tsl_postprocess_plan(PlannedStmt *stmt);
Loading

0 comments on commit 52b7d00

Please sign in to comment.