Skip to content

Commit

Permalink
Add assertion on DF schema in IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Sep 20, 2023
1 parent 3e7b770 commit fb191eb
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class TumbleFunctionSuite extends FlintSuite with Matchers {
}
}

test("should fail if only argument type is wrong") {
test("should fail if argument type is wrong") {
assertThrows[AnalysisException] {
TumbleFunction.functionBuilder(Seq(col("timestamp").expr, lit(10).expr))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ package org.opensearch.flint.spark

import java.sql.Timestamp

import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.FlintSuite
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite {

Expand All @@ -22,12 +25,15 @@ class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite {
.toDF("id", "timestamp")

val resultDF = inputDF.selectExpr("TUMBLE(timestamp, '10 minutes') AS window")
val expectedData = Seq(
Row(Row(timestamp("2023-01-01 00:00:00"), timestamp("2023-01-01 00:10:00"))),
Row(Row(timestamp("2023-01-01 00:00:00"), timestamp("2023-01-01 00:10:00"))),
Row(Row(timestamp("2023-01-01 00:10:00"), timestamp("2023-01-01 00:20:00"))))

checkAnswer(resultDF, expectedData)
resultDF.schema shouldBe StructType.fromDDL(
"window struct<start:timestamp,end:timestamp> NOT NULL")
checkAnswer(
resultDF,
Seq(
Row(Row(timestamp("2023-01-01 00:00:00"), timestamp("2023-01-01 00:10:00"))),
Row(Row(timestamp("2023-01-01 00:00:00"), timestamp("2023-01-01 00:10:00"))),
Row(Row(timestamp("2023-01-01 00:10:00"), timestamp("2023-01-01 00:20:00")))))
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
Expand Down

0 comments on commit fb191eb

Please sign in to comment.