window
is a standard function that generates tumbling, sliding or delayed stream time window ranges (on a timestamp column).
window(
timeColumn: Column,
windowDuration: String): Column // (1)
window(
timeColumn: Column,
windowDuration: String,
slideDuration: String): Column // (2)
window(
timeColumn: Column,
windowDuration: String,
slideDuration: String,
startTime: String): Column // (3)
- (1) Creates a tumbling time window with slideDuration as windowDuration and 0 second for startTime
- (2) Creates a sliding time window with 0 second for startTime
- (3) Creates a delayed time window
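The three variants above can be sketched as follows (the durations are illustrative; any valid interval strings work):

```scala
import org.apache.spark.sql.functions.window

// (1) Tumbling window: consecutive, non-overlapping 5-second ranges
val tumbling = window($"time", "5 seconds")

// (2) Sliding window: 10-second ranges that start every 5 seconds,
//     so a single row can belong to two overlapping windows
val sliding = window($"time", "10 seconds", "5 seconds")

// (3) Delayed window: the same sliding ranges, shifted by 3 seconds
val delayed = window($"time", "10 seconds", "5 seconds", "3 seconds")
```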
scala> val timeColumn = window($"time", "5 seconds")
timeColumn: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 0) AS `window`
timeColumn should be of TimestampType, i.e. with java.sql.Timestamp values.
Tip: Use the java.sql.Timestamp.from or java.sql.Timestamp.valueOf factory methods to create Timestamp instances.
// https://docs.oracle.com/javase/8/docs/api/java/time/LocalDateTime.html
import java.time.LocalDateTime
// https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
import java.sql.Timestamp
val levels = Seq(
// (year, month, dayOfMonth, hour, minute, second)
((2012, 12, 12, 12, 12, 12), 5),
((2012, 12, 12, 12, 12, 14), 9),
((2012, 12, 12, 13, 13, 14), 4),
((2016, 8, 13, 0, 0, 0), 10),
((2017, 5, 27, 0, 0, 0), 15)).
map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
toDF("time", "level")
scala> levels.show
+-------------------+-----+
| time|level|
+-------------------+-----+
|2012-12-12 12:12:12| 5|
|2012-12-12 12:12:14| 9|
|2012-12-12 13:13:14| 4|
|2016-08-13 00:00:00| 10|
|2017-05-27 00:00:00| 15|
+-------------------+-----+
val q = levels.select(window($"time", "5 seconds"), $"level")
scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5 |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9 |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4 |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10 |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15 |
+---------------------------------------------+-----+
scala> q.printSchema
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- level: integer (nullable = false)
// calculating the sum of levels every 5 seconds
val sums = levels.
groupBy(window($"time", "5 seconds")).
agg(sum("level") as "level_sum").
select("window.start", "window.end", "level_sum")
scala> sums.show
+-------------------+-------------------+---------+
| start| end|level_sum|
+-------------------+-------------------+---------+
|2012-12-12 13:13:10|2012-12-12 13:13:15| 4|
|2012-12-12 12:12:10|2012-12-12 12:12:15| 14|
|2016-08-13 00:00:00|2016-08-13 00:00:05| 10|
|2017-05-27 00:00:00|2017-05-27 00:00:05| 15|
+-------------------+-------------------+---------+
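The aggregation above uses tumbling windows only. A sliding variant over the same levels Dataset would look like the following sketch, where each row can contribute to two overlapping 10-second windows:

```scala
import org.apache.spark.sql.functions.{window, sum}

// 10-second windows that start every 5 seconds
val slidingSums = levels.
  groupBy(window($"time", "10 seconds", "5 seconds")).
  agg(sum("level") as "level_sum").
  select("window.start", "window.end", "level_sum")
```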
windowDuration and slideDuration are strings specifying the width of the window and the sliding interval, respectively.
Tip: Use the CalendarInterval format (e.g. "5 seconds") for valid duration identifiers.
There are a couple of rules governing the durations:
- The window duration must be greater than 0.
- The slide duration must be greater than 0.
- The start time must be greater than or equal to 0.
- The slide duration must be less than or equal to the window duration.
- The start time must be less than the slide duration.
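As a sketch of how startTime shifts window boundaries (durations here are illustrative; violating any of the rules above is expected to be reported at analysis time):

```scala
import org.apache.spark.sql.functions.window

// 5-second tumbling windows offset by 2 seconds, i.e. windows start
// at :02, :07, :12, ... rather than :00, :05, :10, ...
val offset = window($"time", "5 seconds", "5 seconds", "2 seconds")
```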
Note: Only one window expression is supported in a query.

Note: null values are filtered out in a window expression.
Internally, window creates a Column with a TimeWindow Catalyst expression under the window alias.
scala> val timeColumn = window($"time", "5 seconds")
timeColumn: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 0) AS `window`
val windowExpr = timeColumn.expr
scala> println(windowExpr.numberedTreeString)
00 timewindow('time, 5000000, 5000000, 0) AS window#23
01 +- timewindow('time, 5000000, 5000000, 0)
02 +- 'time
Internally, the TimeWindow Catalyst expression is simply a struct type with two fields, start and end, both of TimestampType.
scala> println(windowExpr.dataType)
StructType(StructField(start,TimestampType,true), StructField(end,TimestampType,true))
scala> println(windowExpr.dataType.prettyJson)
{
"type" : "struct",
"fields" : [ {
"name" : "start",
"type" : "timestamp",
"nullable" : true,
"metadata" : { }
}, {
"name" : "end",
"type" : "timestamp",
"nullable" : true,
"metadata" : { }
} ]
}
Note: Find more about the Spark SQL logical query plan analyzer in the Mastering Apache Spark 2 gitbook.
Note: The example is borrowed from Introducing Stream Windows in Apache Flink.
The example shows how to use the window function to model a traffic sensor that counts every 15 seconds the number of vehicles passing a certain location.
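The counting described above can be sketched as follows (the sensor Dataset and its time and vehicleId columns are hypothetical names for this illustration):

```scala
import org.apache.spark.sql.functions.{window, count}

// sensor: a Dataset with (time: Timestamp, vehicleId: String) rows
// Count vehicles per 15-second tumbling window
val vehicleCounts = sensor.
  groupBy(window($"time", "15 seconds")).
  agg(count("vehicleId") as "vehicle_count").
  select("window.start", "window.end", "vehicle_count")
```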