diff --git a/docs/index.md b/docs/index.md index 411877c30..288ea8e4f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -234,6 +234,7 @@ User can provide the following options in `WITH` clause of create statement: + `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query. + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied. + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied. ++ `id_expression`: an expression string that generates an ID column to avoid duplicate data when the incremental refresh job restarts, especially for a covering index. If unspecified, an ID column is generated from the source file path and the `timestamp` or `@timestamp` column. If neither column is found, no ID column is generated, which may cause duplicate data when the refresh job restarts. + `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}' Note that the index option name is case-sensitive. Here is an example: @@ -246,6 +247,7 @@ WITH ( watermark_delay = '1 Second', output_mode = 'complete', index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}', + id_expression = 'uuid()', extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}' ) ``` diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index af1e9fa74..779e2eac6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -9,14 +9,16 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.flint.datatype.FlintDataType +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StructType /** * Flint index interface in Spark.
*/ -trait FlintSparkIndex { +trait FlintSparkIndex extends Logging { /** * Index type @@ -79,6 +81,32 @@ object FlintSparkIndex { */ val ID_COLUMN: String = "__id__" + /** + * Generate an ID column in the precedence below: (1) use the ID expression directly if provided + * in the index option; (2) SHA-1 based on the source file path and timestamp column if present; + * (3) no ID column generated otherwise + * + * @param df + * data frame to generate ID column for + * @param idExpr + * ID expression option + * @return + * optional ID column expression + */ + def generateIdColumn(df: DataFrame, idExpr: Option[String]): Option[Column] = { + def timestampColumn: Option[String] = { + df.columns.toSet.intersect(Set("timestamp", "@timestamp")).headOption + } + + if (idExpr.isDefined) { + Some(expr(idExpr.get)) + } else if (timestampColumn.isDefined) { + Some(sha1(concat(input_file_name(), col(timestampColumn.get)))) + } else { + None + } + } + /** * Common prefix of Flint index name which is "flint_database_table_" * diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index ffb479b54..2751c8762 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -8,7 +8,7 @@ package org.opensearch.flint.spark import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY} +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY} import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames /** @@ -70,6 +70,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { */ def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS) + /** + * An expression that generates a unique value to use as the source data row ID.
+ * + * @return + * ID expression + */ + def idExpression(): Option[String] = getOptionValue(ID_EXPRESSION) + /** * Extra streaming source options that can be simply passed to DataStreamReader or * Relation.options @@ -136,6 +144,7 @@ object FlintSparkIndexOptions { val OUTPUT_MODE: OptionName.Value = Value("output_mode") val INDEX_SETTINGS: OptionName.Value = Value("index_settings") val EXTRA_OPTIONS: OptionName.Value = Value("extra_options") + val ID_EXPRESSION: OptionName.Value = Value("id_expression") } /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index cdb3a3462..9cd781dcb 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -9,11 +9,12 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark._ -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder} +import org.opensearch.flint.spark.FlintSparkIndex._ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} import org.apache.spark.sql._ +import org.apache.spark.sql.execution.SimpleMode /** * Flint covering index in Spark. @@ -59,14 +60,29 @@ case class FlintSparkCoveringIndex( } override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = { - val colNames = indexedColumns.keys.toSeq - val job = df.getOrElse(spark.read.table(tableName)) + var colNames = indexedColumns.keys.toSeq + var job = df.getOrElse(spark.read.table(tableName)) + + // Add optional ID column + val idColumn = generateIdColumn(job, options.idExpression()) + if (idColumn.isDefined) { + logInfo(s"Generating ID column based on expression ${idColumn.get}") + colNames = colNames :+ ID_COLUMN + job = job.withColumn(ID_COLUMN, idColumn.get) + } else { + logWarning("Cannot generate ID column, which may cause duplicate data when the refresh job restarts") + } // Add optional filtering condition - filterCondition - .map(job.where) - .getOrElse(job) - .select(colNames.head, colNames.tail: _*) + if (filterCondition.isDefined) { + job = job.where(filterCondition.get) + } + + // Add indexed columns + job = job.select(colNames.head, colNames.tail: _*) + + logInfo("Building covering index with query plan: " + job.queryExecution.explainString(SimpleMode)) + job } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala index b678096ca..1f37c8547 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala @@ -19,6 +19,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { WATERMARK_DELAY.toString shouldBe "watermark_delay" OUTPUT_MODE.toString shouldBe "output_mode" INDEX_SETTINGS.toString shouldBe "index_settings" + ID_EXPRESSION.toString shouldBe "id_expression" EXTRA_OPTIONS.toString shouldBe "extra_options" } @@ -31,6 +32,7 @@ class
FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { "watermark_delay" -> "30 Seconds", "output_mode" -> "complete", "index_settings" -> """{"number_of_shards": 3}""", + "id_expression" -> """sha1(col("timestamp"))""", "extra_options" -> """ { | "alb_logs": { @@ -48,6 +50,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { options.watermarkDelay() shouldBe Some("30 Seconds") options.outputMode() shouldBe Some("complete") options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""") + options.idExpression() shouldBe Some("""sha1(col("timestamp"))""") options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1") options.extraSinkOptions() shouldBe Map("opt2" -> "val2", "opt3" -> "val3") } @@ -75,6 +78,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { options.watermarkDelay() shouldBe empty options.outputMode() shouldBe empty options.indexSettings() shouldBe empty + options.idExpression() shouldBe empty options.extraSourceOptions("alb_logs") shouldBe empty options.extraSinkOptions() shouldBe empty options.optionsWithDefault should contain("auto_refresh" -> "false") diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index f52e6ef85..ba2868caa 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -5,15 +5,22 @@ package org.opensearch.flint.spark.covering -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN +import org.opensearch.flint.spark.FlintSparkIndexOptions +import org.scalatest.matchers.should.Matchers import org.apache.spark.FlintSuite +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions._ -class FlintSparkCoveringIndexSuite extends FlintSuite { +class FlintSparkCoveringIndexSuite extends FlintSuite with Matchers { + + /** Test table name */ + val testTable = "spark_catalog.default.ci_test" test("get covering index name") { val index = - new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string")) + FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string")) index.name() shouldBe "flint_spark_catalog_default_test_ci_index" } @@ -32,7 +39,115 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { test("should fail if no indexed column given") { assertThrows[IllegalArgumentException] { - new FlintSparkCoveringIndex("ci", "default.test", Map.empty) + FlintSparkCoveringIndex("ci", "default.test", Map.empty) + } + } + + test("should generate id column based on ID expression in index options") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON") + val index = + FlintSparkCoveringIndex( + "name_idx", + testTable, + Map("name" -> "string"), + options = FlintSparkIndexOptions(Map("id_expression" -> "now()"))) + + assertDataFrameEquals( + index.build(spark, None), + spark + .table(testTable) + .withColumn(ID_COLUMN, expr("now()")) + .select(col("name"), col(ID_COLUMN))) + } + } + + test("should generate id column based on timestamp column if found") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING 
JSON") + val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string")) + + assertDataFrameEquals( + index.build(spark, None), + spark + .table(testTable) + .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("timestamp")))) + .select(col("name"), col(ID_COLUMN))) + } + } + + test("should generate id column based on @timestamp column if found") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (`@timestamp` TIMESTAMP, name STRING) USING JSON") + val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string")) + + assertDataFrameEquals( + index.build(spark, None), + spark + .table(testTable) + .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("@timestamp")))) + .select(col("name"), col(ID_COLUMN))) } } + + test("should not generate id column if no ID expression or timestamp column") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING, age INTEGER) USING JSON") + val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string")) + + assertDataFrameEquals( + index.build(spark, None), + spark + .table(testTable) + .select(col("name"))) + } + } + + test("should generate id column if micro batch has timestamp column") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON") + val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string")) + val batch = spark.read.table(testTable).select("timestamp", "name") + + assertDataFrameEquals( + index.build(spark, Some(batch)), + batch + .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("timestamp")))) + .select(col("name"), col(ID_COLUMN))) + } + } + + test("should not generate id column if micro batch doesn't have timestamp column") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON") + val index = FlintSparkCoveringIndex("name_idx", testTable, Map("name" -> "string")) + val batch = spark.read.table(testTable).select("name") + + assertDataFrameEquals(index.build(spark, Some(batch)), batch.select(col("name"))) + } + } + + test("should build with filtering condition") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (timestamp TIMESTAMP, name STRING) USING JSON") + val index = FlintSparkCoveringIndex( + "name_idx", + testTable, + Map("name" -> "string"), + Some("name = 'test'")) + + assertDataFrameEquals( + index.build(spark, None), + spark + .table(testTable) + .withColumn(ID_COLUMN, sha1(concat(input_file_name(), col("timestamp")))) + .where("name = 'test'") + .select(col("name"), col(ID_COLUMN))) + } + } + + /* Assert unresolved logical plan in DataFrame equals without semantic analysis */ + private def assertDataFrameEquals(df1: DataFrame, df2: DataFrame): Unit = { + comparePlans(df1.queryExecution.logical, df2.queryExecution.logical, checkAnalysis = false) + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 27419b616..ea1ea5029 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import 
org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.{defined, have} +import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -24,14 +24,17 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { /** Test table and index name */ - private val testTable = "spark_catalog.default.covering_sql_test" private val testIndex = "name_and_age" + private val testTable = "spark_catalog.default.covering_sql_test" private val testFlintIndex = getFlintIndexName(testIndex, testTable) + private val testTimeSeriesTable = "spark_catalog.default.ci_time_test" + private val testFlintTimeSeriesIndex = getFlintIndexName(testIndex, testTimeSeriesTable) override def beforeAll(): Unit = { super.beforeAll() createPartitionedTable(testTable) + createTimeSeriesTable(testTimeSeriesTable) } override def afterEach(): Unit = { @@ -39,6 +42,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { // Delete all test indices flint.deleteIndex(testFlintIndex) + flint.deleteIndex(testFlintTimeSeriesIndex) } test("create covering index with auto refresh") { @@ -211,7 +215,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { |""".stripMargin) } - test("create skipping index with quoted index, table and column name") { + test("create covering index with quoted index, table and column name") { sql(s""" | CREATE INDEX `$testIndex` ON `spark_catalog`.`default`.`covering_sql_test` | (`name`, `age`) @@ -226,6 +230,72 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("name", "age") } + test("create covering index on time series table with timestamp column") { + sql(s""" + | CREATE INDEX $testIndex ON $testTimeSeriesTable + | (timestamp, age) + | WITH (auto_refresh = true) + |""".stripMargin) + + val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex) + awaitStreamingComplete(job.get.id.toString) + + val indexData = flint.queryIndex(testFlintTimeSeriesIndex) + indexData.count() shouldBe 5 + } + + test("create covering index on time series table without indexing timestamp column") { + sql(s""" + | CREATE INDEX $testIndex ON $testTimeSeriesTable + | (name) + | WITH (auto_refresh = true) + |""".stripMargin) + + val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex) + awaitStreamingComplete(job.get.id.toString) + + val indexData = flint.queryIndex(testFlintTimeSeriesIndex) + indexData.count() shouldBe 5 + } + + test("create covering index on time series table with @timestamp column") { + val testTimeSeriesTable2 = "spark_catalog.default.ci_time_table2" + val testFlintTimeSeriesIndex2 = getFlintIndexName(testIndex, testTimeSeriesTable2) + withTable(testTimeSeriesTable2) { + sql(s"CREATE TABLE $testTimeSeriesTable2 (`@timestamp` TIMESTAMP, name STRING) USING JSON") + sql(s"INSERT INTO $testTimeSeriesTable2 VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A')") + sql(s"INSERT INTO $testTimeSeriesTable2 VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B')") + sql(s""" + | CREATE INDEX $testIndex ON $testTimeSeriesTable2 + | (`@timestamp`, name) + | WITH (auto_refresh = true) + |""".stripMargin) + + val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex2) + awaitStreamingComplete(job.get.id.toString) + + val indexData =
flint.queryIndex(testFlintTimeSeriesIndex2) + indexData.count() shouldBe 2 + } + } + + test("create covering index on time series table with ID expression") { + sql(s""" + | CREATE INDEX $testIndex ON $testTimeSeriesTable + | (timestamp, age) + | WITH ( + | auto_refresh = true, + | id_expression = 'address' + | ) + |""".stripMargin) + + val job = spark.streams.active.find(_.name == testFlintTimeSeriesIndex) + awaitStreamingComplete(job.get.id.toString) + + val indexData = flint.queryIndex(testFlintTimeSeriesIndex) + indexData.count() shouldBe 3 // only 3 rows left because rows with the same address share the same ID + } + test("show all covering index on the source table") { flint .coveringIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 1b16a9e16..17a29d7e9 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -28,7 +28,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | window.start AS startTime, | COUNT(*) AS count | FROM $testTable - | GROUP BY TUMBLE(time, '10 Minutes') + | GROUP BY TUMBLE(timestamp, '10 Minutes') |""".stripMargin override def beforeAll(): Unit = { @@ -134,7 +134,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | window.start AS startTime, | COUNT(*) AS count | FROM $testTable - | GROUP BY TUMBLE(time, '1 Hour') + | GROUP BY TUMBLE(timestamp, '1 Hour') |""".stripMargin withIncrementalMaterializedView(largeWindowQuery) { indexData => @@ -159,7 +159,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | COUNT(*) AS count | FROM $testTable | WHERE address = 'Seattle' - | GROUP BY TUMBLE(time, '5 Minutes') + | GROUP BY TUMBLE(timestamp, '5 Minutes') |""".stripMargin withIncrementalMaterializedView(filterQuery) { indexData => diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index f9bd3967a..ce77a510c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -34,7 +34,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { | window.start AS startTime, | COUNT(*) AS count | FROM $testTable - | GROUP BY TUMBLE(time, '10 Minutes') + | GROUP BY TUMBLE(timestamp, '10 Minutes') |""".stripMargin override def beforeAll(): Unit = { @@ -163,7 +163,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { | window.start AS `start.time`, | COUNT(*) AS `count` | FROM `spark_catalog`.`default`.`mv_test` - | GROUP BY TUMBLE(`time`, '10 Minutes')""".stripMargin.trim + | GROUP BY TUMBLE(`timestamp`, '10 Minutes')""".stripMargin.trim sql(s""" | CREATE MATERIALIZED VIEW `spark_catalog`.`default`.`mv_test_metrics` diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 29b8b95a6..f623db2a9 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -89,7 +89,7 @@ trait FlintSparkSuite extends
QueryTest with FlintSuite with OpenSearchSuite wit sql(s""" | CREATE TABLE $testTable | ( - | time TIMESTAMP, + | timestamp TIMESTAMP, | name STRING, | age INT, | address STRING
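For reference, a minimal usage sketch of the new `id_expression` option, mirroring the time-series integration test above (the index, table, and column names are the ones that test uses):

```sql
-- Covering index keyed by the 'address' column: rows that share an address
-- get the same document ID, so a restarted incremental refresh overwrites
-- them instead of producing duplicates.
CREATE INDEX name_and_age ON spark_catalog.default.ci_time_test
(timestamp, age)
WITH (
  auto_refresh = true,
  id_expression = 'address'
)
```

If `id_expression` is omitted, the ID falls back to a SHA-1 of the source file path and the `timestamp` or `@timestamp` column, as implemented in `generateIdColumn` above; with neither available, no ID column is generated.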