Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Window function #884

Merged
merged 10 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Adds support for collection aggregation functions in the EvaluatingCompiler and experimental planner
- Adds support for the syntactic sugar of using aggregations functions in place of their collection aggregation function
counterparts (in the experimental planner)
- Experimental implementation for window function `Lag` and `Lead`.

### Changed
- Now `CompileOption` uses `TypedOpParameter.HONOR_PARAMETERS` as default.
Expand Down
32 changes: 32 additions & 0 deletions lang/antlr/PartiQL.g4
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,25 @@ groupAlias
groupKey
: key=exprSelect (AS symbolPrimitive)?;

/**
*
* Window Function
* TODO: Remove from experimental once https://github.com/partiql/partiql-docs/issues/31 is resolved and a RFC is approved
*
*/

over
: OVER PAREN_LEFT windowPartitionList? windowSortSpecList? PAREN_RIGHT
;

windowPartitionList
: PARTITION BY expr (COMMA expr)*
;

windowSortSpecList
: ORDER BY orderSortSpec (COMMA orderSortSpec)*
;

/**
*
* SIMPLE CLAUSES
Expand Down Expand Up @@ -526,6 +545,7 @@ exprPrimary
| caseExpr # ExprPrimaryBase
| valueList # ExprPrimaryBase
| values # ExprPrimaryBase
| windowFunction # ExprPrimaryBase
;

/**
Expand Down Expand Up @@ -574,6 +594,18 @@ aggregate
| func=(COUNT|MAX|MIN|SUM|AVG) PAREN_LEFT setQuantifierStrategy? expr PAREN_RIGHT # AggregateBase
;

// TODO: Remove from experimental once https://github.com/partiql/partiql-docs/issues/31 is resolved and a RFC is approved
/**
*
* Supported Window Functions:
* 1. LAG(expr, [offset [, default]]) OVER([window_partition] window_ordering)
* 2. LEAD(expr, [offset [, default]]) OVER([window_partition] window_ordering)
*
*/
windowFunction
: func=(LAG|LEAD) PAREN_LEFT expr ( COMMA expr (COMMA expr)?)? PAREN_RIGHT over #LagLeadFunction
;

cast
: CAST PAREN_LEFT expr AS type PAREN_RIGHT;

Expand Down
12 changes: 12 additions & 0 deletions lang/antlr/PartiQLTokens.g4
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,18 @@ WORK: 'WORK';
WRITE: 'WRITE';
ZONE: 'ZONE';


/**
* window related
*/
// TODO: Move the keywords to the corresponding section once https://github.com/partiql/partiql-docs/issues/31 is resolved and a RFC is approved
// i.e. Move OVER/PARTITION to KEYWORDS.
LAG: 'LAG';
LEAD: 'LEAD';
OVER: 'OVER';
PARTITION: 'PARTITION';


/**
* OTHER
*/
Expand Down
31 changes: 29 additions & 2 deletions lang/resources/org/partiql/type-domains/partiql.ion
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ may then be further optimized by selecting better implementations of each operat
(path root::expr steps::(* path_step 1))
(call func_name::symbol args::(* expr 1))
(call_agg setq::set_quantifier func_name::symbol arg::expr)
// TODO: In the feature, when we allow use of aggregation function as window function, we need to consider incorporate in setq
yliuuuu marked this conversation as resolved.
Show resolved Hide resolved
(call_window func_name::symbol over::over args::(* expr 1))
(cast value::expr as_type::type)
(can_cast value::expr as_type::type)
(can_lossless_cast value::expr as_type::type)
Expand Down Expand Up @@ -353,6 +355,18 @@ may then be further optimized by selecting better implementations of each operat
(nulls_last)
)

// Over clause of a window function.
// OVER ([PARTITION BY <expr> [, <expr>]... ] [ORDER BY <sort_spec> [, <sort_spec>]... ])
// TODO: In the future, we need to add support for custom declared frame clause
// TODO: Remove from experimental once https://github.com/partiql/partiql-docs/issues/31 is resolved and a RFC is approved
(product over partition_by::(? window_partition_list ) order_by::(? window_sort_spec_list ) )

// <expr>[, <expr>]...
(product window_partition_list exprs::(* expr 1 ))

// <sort_spec>[, <sort_spec>]...
(product window_sort_spec_list sort_specs::(* sort_spec 1 ))

// Indicates if variable lookup should be case-sensitive or not.
(sum case_sensitivity (case_sensitive) (case_insensitive))

Expand Down Expand Up @@ -564,7 +578,9 @@ may then be further optimized by selecting better implementations of each operat

(with expr
// Remove the select and struct node from the `expr` sum type, which will be replaced below.
(exclude select struct)
// Remove the call_window node from the `expr` sum type, because the call_window contains two parts:
// Window specification and window function, we want to define a stand alone operator for this
(exclude select struct call_window)

// CallAgg's such as SUM, MIN, MAX, etc. either become calls to built-in-functions (example: call COLL_SUM)
// or aggregate functions within the logical aggregate operator.
Expand Down Expand Up @@ -641,6 +657,8 @@ may then be further optimized by selecting better implementations of each operat
)
)

(include (product window_expression decl::var_decl func_name::symbol args::(* expr 1)))

// Now we include new stuff, including PartiQL's relational algebra.
(include
// Every instance of `var_decl` introduces a new binding in the current scope.
Expand Down Expand Up @@ -692,7 +710,13 @@ may then be further optimized by selecting better implementations of each operat
predicate::(? expr)
)

// Converts a bindings collection into a sorted bindings collection.
(window
source:: bexpr
window_specification:: over
window_expression_list:: (* window_expression 1)
)

// Converts a bindings collection into a sorted bindings collection.
(sort source::bexpr sort_specs::(* sort_spec 1))

// Skips `row_count` rows, then emits all remaining rows.
Expand Down Expand Up @@ -899,6 +923,9 @@ may then be further optimized by selecting better implementations of each operat
(offset i::impl row_count::expr source::bexpr)
(limit i::impl row_count::expr source::bexpr)
(let i::impl source::bexpr bindings::(* let_binding 1))
// Notice that the physical window operator contains a list of window expression
// That is because, we want to combine the window functions that are operating on the same window to a single window operator
(window i::impl source:: bexpr window_specification:: over window_expression_list:: (* window_expression 1))
)
)
)
Expand Down
4 changes: 4 additions & 0 deletions lang/src/org/partiql/lang/compiler/PartiQLCompilerBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import org.partiql.lang.eval.physical.operators.RelationalOperatorFactory
import org.partiql.lang.eval.physical.operators.ScanRelationalOperatorFactoryDefault
import org.partiql.lang.eval.physical.operators.SortOperatorFactoryDefault
import org.partiql.lang.eval.physical.operators.UnpivotOperatorFactoryDefault
import org.partiql.lang.eval.physical.operators.WindowRelationalOperatorFactoryDefault
import org.partiql.lang.eval.physical.window.ExperimentalWindowFunc
import org.partiql.lang.planner.EvaluatorOptions
import org.partiql.lang.types.CustomType

Expand Down Expand Up @@ -83,6 +85,8 @@ class PartiQLCompilerBuilder private constructor() {
OffsetRelationalOperatorFactoryDefault,
LimitRelationalOperatorFactoryDefault,
LetRelationalOperatorFactoryDefault,
@OptIn(ExperimentalWindowFunc::class)
WindowRelationalOperatorFactoryDefault
)

@JvmStatic
Expand Down
10 changes: 10 additions & 0 deletions lang/src/org/partiql/lang/errors/ErrorCode.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,16 @@ enum class ErrorCode(
"Union type not permitted"
),

PARSE_EXPECTED_WINDOW_ORDER_BY(
ErrorCategory.PARSER,
LOC_TOKEN,
"Expect ORDER BY in window specification"
) {
override fun getErrorMessage(errorContext: PropertyValueMap?): String {
return "Expect ORDER BY in window specification"
}
},

// Generic errors
UNIMPLEMENTED_FEATURE(
ErrorCategory.SEMANTIC,
Expand Down
1 change: 1 addition & 0 deletions lang/src/org/partiql/lang/eval/EvaluatingCompiler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ internal class EvaluatingCompiler(
is PartiqlAst.Expr.BagOp -> compileBagOp(expr, metas)

is PartiqlAst.Expr.GraphMatch -> TODO("Compilation of GraphMatch expression")
is PartiqlAst.Expr.CallWindow -> TODO("Evaluating Compiler doesn't support window function")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.partiql.lang.eval.physical.operators.AggregateOperatorFactory
import org.partiql.lang.eval.physical.operators.CompiledAggregateFunction
import org.partiql.lang.eval.physical.operators.CompiledGroupKey
import org.partiql.lang.eval.physical.operators.CompiledSortKey
import org.partiql.lang.eval.physical.operators.CompiledWindowFunction
import org.partiql.lang.eval.physical.operators.FilterRelationalOperatorFactory
import org.partiql.lang.eval.physical.operators.JoinRelationalOperatorFactory
import org.partiql.lang.eval.physical.operators.LetRelationalOperatorFactory
Expand All @@ -25,7 +26,10 @@ import org.partiql.lang.eval.physical.operators.RelationalOperatorKind
import org.partiql.lang.eval.physical.operators.ScanRelationalOperatorFactory
import org.partiql.lang.eval.physical.operators.SortOperatorFactory
import org.partiql.lang.eval.physical.operators.UnpivotOperatorFactory
import org.partiql.lang.eval.physical.operators.WindowRelationalOperatorFactory
import org.partiql.lang.eval.physical.operators.valueExpression
import org.partiql.lang.eval.physical.window.ExperimentalWindowFunc
import org.partiql.lang.eval.physical.window.createBuiltinWindowFunction
import org.partiql.lang.util.toIntExact

/** A specialization of [Thunk] that we use for evaluation of physical plans. */
Expand Down Expand Up @@ -290,6 +294,38 @@ internal class PhysicalBexprToThunkConverter(
val value = exprConverter.convert(spec.expr).toValueExpr(spec.expr.metas.sourceLocationMeta)
CompiledSortKey(comp, value)
}

// TODO: Remove from experimental once https://github.com/partiql/partiql-docs/issues/31 is resolved and a RFC is approved
@ExperimentalWindowFunc
override fun convertWindow(node: PartiqlPhysical.Bexpr.Window): RelationThunkEnv {
val source = this.convert(node.source)

val windowPartitionList = node.windowSpecification.partitionBy

val windowSortSpecList = node.windowSpecification.orderBy

val compiledPartitionBy = windowPartitionList?.exprs?.map {
exprConverter.convert(it).toValueExpr(it.metas.sourceLocationMeta)
} ?: emptyList()

val compiledOrderBy = windowSortSpecList?.sortSpecs?.let { compileSortSpecs(it) } ?: emptyList()

val compiledWindowFunctions = node.windowExpressionList.map { windowExpression ->
CompiledWindowFunction(
createBuiltinWindowFunction(windowExpression.funcName.text),
windowExpression.args.map { exprConverter.convert(it).toValueExpr(it.metas.sourceLocationMeta) },
windowExpression.decl
)
}

// locate operator factory
val factory = findOperatorFactory<WindowRelationalOperatorFactory>(RelationalOperatorKind.WINDOW, node.i.name.text)

// create operator implementation
val bindingsExpr = factory.create(source, compiledPartitionBy, compiledOrderBy, compiledWindowFunctions)
// wrap in thunk
return bindingsExpr.toRelationThunk(node.metas)
}
}

private fun PartiqlPhysical.Expr.isLitTrue() =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ internal class PhysicalPlanCompilerImpl(
is PartiqlPhysical.Expr.BagOp -> compileBagOp(expr, metas)
is PartiqlPhysical.Expr.BindingsToValues -> compileBindingsToValues(expr)
is PartiqlPhysical.Expr.Pivot -> compilePivot(expr, metas)

is PartiqlPhysical.Expr.GraphMatch -> TODO("Physical compilation of GraphMatch expression")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ enum class RelationalOperatorKind {
UNPIVOT,
FILTER,
JOIN,
WINDOW,
OFFSET,
LIMIT,
LET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ internal class SortOperatorDefault(private val sortKeys: List<CompiledSortKey>,
* Returns a [Comparator] that compares arrays of registers by using un-evaluated sort keys. It does this by modifying
* the [state] to allow evaluation of the [sortKeys]
*/
private fun getSortingComparator(sortKeys: List<CompiledSortKey>, state: EvaluatorState): Comparator<Array<ExprValue>> {
internal fun getSortingComparator(sortKeys: List<CompiledSortKey>, state: EvaluatorState): Comparator<Array<ExprValue>> {
val initial: Comparator<Array<ExprValue>>? = null
return sortKeys.interruptibleFold(initial) { intermediate, sortKey ->
if (intermediate == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.partiql.lang.eval.physical.operators

import org.partiql.lang.domains.PartiqlPhysical
import org.partiql.lang.eval.physical.SetVariableFunc
import org.partiql.lang.eval.physical.window.ExperimentalWindowFunc
import org.partiql.lang.eval.physical.window.WindowFunction

// TODO: Remove from experimental once https://github.com/partiql/partiql-docs/issues/31 is resolved and a RFC is approved
@ExperimentalWindowFunc
abstract class WindowRelationalOperatorFactory(name: String) : RelationalOperatorFactory {

final override val key: RelationalOperatorFactoryKey = RelationalOperatorFactoryKey(RelationalOperatorKind.WINDOW, name)

/** Creates a [RelationExpression] instance for [PartiqlPhysical.Bexpr.Window]. */
abstract fun create(
source: RelationExpression,
windowPartitionList: List<ValueExpression>,
windowSortSpecList: List<CompiledSortKey>,
compiledWindowFunctions: List<CompiledWindowFunction>

): RelationExpression
}

@ExperimentalWindowFunc
class CompiledWindowFunction(
val func: WindowFunction,
val parameters: List<ValueExpression>,
/**
* This is [PartiqlPhysical.VarDecl] instead of [SetVariableFunc] because we would like to access the index of variable in the register
* when processing rows within the partition.
*/
val windowVarDecl: PartiqlPhysical.VarDecl
)
Loading