Add tumble function and IT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Sep 19, 2023
1 parent 7434e5a commit 267afed
Showing 4 changed files with 91 additions and 0 deletions.
org/opensearch/flint/spark/FlintSparkExtensions.scala
@@ -5,6 +5,7 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.function.TumbleFunction
import org.opensearch.flint.spark.sql.FlintSparkSqlParser

import org.apache.spark.sql.SparkSessionExtensions
@@ -18,6 +19,9 @@ class FlintSparkExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectParser { (spark, parser) =>
new FlintSparkSqlParser(parser)
}

extensions.injectFunction(TumbleFunction.description)

extensions.injectOptimizerRule { spark =>
new FlintSparkOptimizer(spark)
}
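For context, a minimal sketch of how this extension is typically enabled and how the newly registered function then becomes callable from SQL; this is not part of the commit, and the session setup and the events table are illustrative:

import org.apache.spark.sql.SparkSession

// Enable the Flint extensions; Spark instantiates FlintSparkExtensions and
// applies it to the session's SparkSessionExtensions at startup.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()

// Once registered, TUMBLE resolves by name in Spark SQL.
spark.sql("SELECT TUMBLE(timestamp, '10 minutes') AS window FROM events")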
org/opensearch/flint/spark/function/TumbleFunction.scala
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.function

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.functions.window

/**
 * Tumble windowing function that groups rows into fixed-interval windows without overlap.
*/
object TumbleFunction {

val identifier: FunctionIdentifier = FunctionIdentifier("tumble")

val exprInfo: ExpressionInfo = new ExpressionInfo(classOf[Column].getCanonicalName, "window")

val functionBuilder: Seq[Expression] => Expression =
(children: Seq[Expression]) => {
// Delegate actual implementation to window() function
val timeColumn = children.head
val windowDuration = children(1)
window(new Column(timeColumn), windowDuration.toString()).expr
}

/**
 * Function description used to register this function with the Spark extensions.
*/
val description: (FunctionIdentifier, ExpressionInfo, FunctionBuilder) =
(identifier, exprInfo, functionBuilder)
}
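The builder above delegates to Spark's built-in window() function, so a two-argument TUMBLE call produces the same expression as the equivalent DataFrame API form. A minimal sketch of that equivalence (not part of this commit; the sample data mirrors the integration test below):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Sample events with a string timestamp column, as in the IT suite below.
val events = Seq(
  (1L, "2023-01-01 00:00:00"),
  (2L, "2023-01-01 00:09:00")).toDF("id", "timestamp")

// TUMBLE(timestamp, '10 minutes') builds the same expression as window():
val windowed = events.select(window(col("timestamp"), "10 minutes").as("window"))
windowed.show(truncate = false)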
org/opensearch/flint/spark/function/TumbleFunctionSuite.scala
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.function

import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite

class TumbleFunctionSuite extends FlintSuite with Matchers {

test("should require both column name and window expression as arguments") {
// TumbleFunction.functionBuilder(AttributeReference())
}
}
org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import java.sql.Timestamp

import org.apache.spark.FlintSuite
import org.apache.spark.sql.{QueryTest, Row}

class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite {

test("tumble windowing function") {
val inputDF = spark
.createDataFrame(
Seq(
(1L, "2023-01-01 00:00:00"),
(2L, "2023-01-01 00:09:00"),
(3L, "2023-01-01 00:15:00")))
.toDF("id", "timestamp")

val resultDF = inputDF.selectExpr("TUMBLE(timestamp, '10 minutes') AS window")
val expectedData = Seq(
Row(Row(timestamp("2023-01-01 00:00:00"), timestamp("2023-01-01 00:10:00"))),
Row(Row(timestamp("2023-01-01 00:00:00"), timestamp("2023-01-01 00:10:00"))),
Row(Row(timestamp("2023-01-01 00:10:00"), timestamp("2023-01-01 00:20:00"))))

checkAnswer(resultDF, expectedData)
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}
