diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
index 8bfc86120..cb32e74d3 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
@@ -15,7 +15,6 @@ import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.FlintSuite
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.dsl.expressions.{count, intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper}
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
@@ -33,7 +32,7 @@ import org.apache.spark.unsafe.types.UTF8String
 class FlintSparkMaterializedViewSuite extends FlintSuite {
 
   /** Test table, MV name and query */
-  val testTable = "mv_build_test"
+  val testTable = "spark_catalog.default.mv_build_test"
   val testMvName = "spark_catalog.default.mv"
   val testQuery = "SELECT 1"
 
@@ -100,13 +99,15 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     val options = Map("watermark_delay" -> "30 Seconds")
 
     withAggregateMaterializedView(testQuery, options) { actualPlan =>
-      assert(
-        actualPlan.sameSemantics(
-          streamingRelation(testTable)
-            .watermark($"time", "30 Seconds")
-            .groupBy($"TUMBLE".function($"time", "1 Minute"))(
-              $"window.start" as "startTime",
-              count(1) as "count")))
+      comparePlans(
+        actualPlan,
+        streamingRelation(testTable)
+          .watermark($"time", "30 Seconds")
+          .groupBy($"TUMBLE".function($"time", "1 Minute"))(
+            $"window.start" as "startTime",
+            $"COUNT".function(1) as "count"),
+        checkAnalysis = false
+      ) // don't analyze due to full test table name
     }
   }
 
@@ -123,14 +124,15 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     val options = Map("watermark_delay" -> "30 Seconds")
 
     withAggregateMaterializedView(testQuery, options) { actualPlan =>
-      assert(
-        actualPlan.sameSemantics(
-          streamingRelation(testTable)
-            .where($"age" > 30)
-            .watermark($"time", "30 Seconds")
-            .groupBy($"TUMBLE".function($"time", "1 Minute"))(
-              $"window.start" as "startTime",
-              count(1) as "count")))
+      comparePlans(
+        actualPlan,
+        streamingRelation(testTable)
+          .where($"age" > 30)
+          .watermark($"time", "30 Seconds")
+          .groupBy($"TUMBLE".function($"time", "1 Minute"))(
+            $"window.start" as "startTime",
+            $"COUNT".function(1) as "count"),
+        checkAnalysis = false)
     }
   }
 
@@ -138,11 +140,12 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     val testQuery = s"SELECT name, age FROM $testTable WHERE age > 30"
 
     withAggregateMaterializedView(testQuery, Map.empty) { actualPlan =>
-      assert(
-        actualPlan.sameSemantics(
-          streamingRelation(testTable)
-            .where($"age" > 30)
-            .select($"name", $"age")))
+      comparePlans(
+        actualPlan,
+        streamingRelation(testTable)
+          .where($"age" > 30)
+          .select($"name", $"age"),
+        checkAnalysis = false)
     }
   }
 
@@ -151,9 +154,11 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     val options = Map("extra_options" -> s"""{"$testTable": {"maxFilesPerTrigger": "1"}}""")
 
     withAggregateMaterializedView(testQuery, options) { actualPlan =>
-      val expectPlan =
-        streamingRelation(testTable, Map("maxFilesPerTrigger" -> "1")).select("name", "age")
-      assert(actualPlan.sameSemantics(expectPlan))
+      comparePlans(
+        actualPlan,
+        streamingRelation(testTable, Map("maxFilesPerTrigger" -> "1"))
+          .select($"name", $"age"),
+        checkAnalysis = false)
     }
   }
 
@@ -195,8 +200,8 @@ object FlintSparkMaterializedViewSuite {
   def streamingRelation(
       tableName: String,
       extraOptions: Map[String, String] = Map.empty): UnresolvedRelation = {
-    UnresolvedRelation(
-      TableIdentifier(tableName),
+    new UnresolvedRelation(
+      tableName.split('.'),
       new CaseInsensitiveStringMap(extraOptions.asJava),
       isStreaming = true)
   }