Skip to content

Commit

Permalink
row_number aggregate window function
Browse files Browse the repository at this point in the history
  • Loading branch information
jaceklaskowski committed Apr 8, 2024
1 parent 119f7fb commit b04bf9c
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 54 deletions.
18 changes: 17 additions & 1 deletion docs/expressions/AggregateWindowFunction.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,21 @@

* `NthValue`
* `RankLike`
* `RowNumberLike`
* [RowNumberLike](RowNumberLike.md)
* `SizeBasedWindowFunction`

## Frame { #frame }

??? note "WindowFunction"

```scala
frame: WindowFrame
```

`frame` is part of the [WindowFunction](WindowFunction.md#frame) abstraction.

`frame` is a `SpecifiedWindowFrame` with the following:

* `RowFrame` type
* `UnboundedPreceding` lower expression
* `CurrentRow` lower expression
7 changes: 6 additions & 1 deletion docs/expressions/DeclarativeAggregate.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ title: DeclarativeAggregate

## Contract

### <span id="evaluateExpression"> evaluateExpression
### Evaluating Expression { #evaluateExpression }

```scala
evaluateExpression: Expression
```

Catalyst [Expression](Expression.md) to calculate the final value of this aggregate function

See:

* [RowNumber](RowNumber.md#evaluateExpression)

Used when:

* `EliminateAggregateFilter` logical optimization is executed
Expand Down Expand Up @@ -73,6 +77,7 @@ Catalyst [Expression](Expression.md)s to update the mutable aggregation buffer b
See:

* [Count](Count.md#updateExpressions)
* [RowNumber](RowNumber.md#updateExpressions)
* [SimpleTypedAggregateExpression](SimpleTypedAggregateExpression.md#updateExpressions)

Used when:
Expand Down
35 changes: 35 additions & 0 deletions docs/expressions/RowNumber.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
title: RowNumber
---

# RowNumber Aggregate Window Leaf Expression

`RowNumber` is a [RowNumberLike](RowNumberLike.md) leaf expression known as [row_number](#prettyName).

## Evaluating Expression { #evaluateExpression }

??? note "DeclarativeAggregate"

```scala
evaluateExpression: AttributeReference
```

`evaluateExpression` is part of the [DeclarativeAggregate](DeclarativeAggregate.md#evaluateExpression) abstraction.

`evaluateExpression` is the [rowNumber](RowNumberLike.md#rowNumber) attribute reference.

## Pretty Name { #prettyName }

??? note "Expression"

```scala
prettyName
```

`prettyName` is part of the [Expression](Expression.md#prettyName) abstraction.

`prettyName` is the following text:

```text
row_number
```
49 changes: 49 additions & 0 deletions docs/expressions/RowNumberLike.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
title: RowNumberLike
---

# RowNumberLike Aggregate Window Function Expressions

`RowNumberLike` is an extension of the [AggregateWindowFunction](AggregateWindowFunction.md) abstraction for [aggregate window function leaf expressions](#implementations).

## Implementations

* `CumeDist`
* `NTile`
* [RowNumber](RowNumber.md)

## rowNumber Attribute Reference { #rowNumber }

```scala
rowNumber: AttributeReference
```

`rowNumber` is an `AttributeReference` with the following properties:

* `rowNumber` name
* `IntegerType` data type
* [nullable](Expression.md#nullable) disabled (`false`)

## Aggregation Buffer Attributes { #aggBufferAttributes }

??? note "AggregateFunction"

```scala
aggBufferAttributes: Seq[AttributeReference]
```

`aggBufferAttributes` is part of the [AggregateFunction](AggregateFunction.md#aggBufferAttributes) abstraction.

`aggBufferAttributes` is a collection with the [rowNumber](#rowNumber) attribute reference.

## Update Expressions { #updateExpressions }

??? note "DeclarativeAggregate"

```scala
updateExpressions: Seq[Expression]
```

`updateExpressions` is part of the [DeclarativeAggregate](DeclarativeAggregate.md#updateExpressions) abstraction.

`updateExpressions` is a collection with the [rowNumber](#rowNumber) attribute reference incremented (by 1).
10 changes: 8 additions & 2 deletions docs/expressions/WindowFunction.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@
1. `ResolveWindowOrder`
1. [ExtractWindowExpressions](../logical-analysis-rules/ExtractWindowExpressions.md)

## <span id="frame"> WindowFrame
## WindowFrame { #frame }

```scala
frame: WindowFrame
```

`frame` is `UnspecifiedFrame` by default.
`frame` is an `UnspecifiedFrame` by default.

See:

* [AggregateWindowFunction](AggregateWindowFunction.md#frame)

---

`frame` is used when:

Expand Down
100 changes: 50 additions & 50 deletions docs/standard-functions/windows-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,64 @@
title: Window
---

# Standard Functions for Window Aggregation (Window Functions)
# Aggregate Window Functions

**Window aggregate functions** (aka **window functions** or **windowed aggregates**) are functions that perform a calculation over a group of records called **window** that are in _some_ relation to the current record (i.e. can be in the same partition or frame as the current row).
**Aggregate Window Functions** (aka **window functions** or **windowed aggregates**) are functions that perform a calculation over a group of records called **window** that are in _some_ relation to the current record (i.e. can be in the same partition or frame as the current row).

In other words, when executed, a window function computes a value for each and every row in a window (per [window specification](../window-functions/WindowSpec.md)).

Window functions are also called **over functions** due to how they are applied using [over](../Column.md#over) operator.

Spark SQL supports three kinds of window functions:

* _ranking_ functions
* _analytic_ functions
* _aggregate_ functions
* [Aggregate functions](aggregate-functions.md)
* Analytic functions
* [Ranking functions](#ranking-functions)

## Ranking Functions

### row_number { #row_number }

```scala
row_number(): Column
```

`row_number` assigns a unique, sequential number to each row within a window partition according to the ordering of rows (starting from 1).

Internally, `row_number` creates a [RowNumber](../expressions/RowNumber.md) aggregate window leaf expression.

```text
val buckets = spark.range(5).withColumn("bucket", 'id % 3)
// Make duplicates
val input = buckets.union(buckets)
```

```scala
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
```

```scala
val q = input.withColumn("row_number", row_number() over windowSpec)
```

```text
scala> q.show
+---+------+----------+
| id|bucket|row_number|
+---+------+----------+
| 0| 0| 1|
| 0| 0| 2|
| 3| 0| 3|
| 3| 0| 4|
| 1| 1| 1|
| 1| 1| 2|
| 4| 1| 3|
| 4| 1| 4|
| 2| 2| 1|
| 2| 2| 2|
+---+------+----------+
```

<!---
## Review Me
Expand All @@ -40,9 +85,6 @@ Spark SQL supports three kinds of window functions:
| <<ntile, ntile>>
|
| <<row_number, row_number>>
|
.3+^.^| *Analytic functions*
| <<cume_dist, cume_dist>>
Expand Down Expand Up @@ -718,48 +760,6 @@ scala> dataset.withColumn("cume_dist", cume_dist over windowSpec).show
+---+------+------------------+
----
=== [[row_number]] Sequential numbering per window partition -- `row_number` Window Function
[source, scala]
----
row_number(): Column
----
`row_number` returns a sequential number starting at `1` within a window partition.
[source, scala]
----
val buckets = spark.range(9).withColumn("bucket", 'id % 3)
// Make duplicates
val dataset = buckets.union(buckets)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("row_number", row_number() over windowSpec).show
+---+------+----------+
| id|bucket|row_number|
+---+------+----------+
| 0| 0| 1|
| 0| 0| 2|
| 3| 0| 3|
| 3| 0| 4|
| 6| 0| 5|
| 6| 0| 6|
| 1| 1| 1|
| 1| 1| 2|
| 4| 1| 3|
| 4| 1| 4|
| 7| 1| 5|
| 7| 1| 6|
| 2| 2| 1|
| 2| 2| 2|
| 5| 2| 3|
| 5| 2| 4|
| 8| 2| 5|
| 8| 2| 6|
+---+------+----------+
----
=== [[ntile]] `ntile` Window Function
[source, scala]
Expand Down
2 changes: 2 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ nav:
- Predicate: expressions/Predicate.md
- Projection: expressions/Projection.md
- expressions/PythonUDF.md
- RowNumber: expressions/RowNumber.md
- RowNumberLike: expressions/RowNumberLike.md
- RowOrdering: expressions/RowOrdering.md
- RuntimeReplaceable: expressions/RuntimeReplaceable.md
- ScalaAggregator: expressions/ScalaAggregator.md
Expand Down

0 comments on commit b04bf9c

Please sign in to comment.