diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala index 414d8b61d..95a2666bd 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala @@ -5,8 +5,6 @@ package org.apache.spark.sql.flint.datatype -import java.time.format.DateTimeFormatterBuilder - import org.json4s.{Formats, JField, JValue, NoTypeHints} import org.json4s.JsonAST.{JNothing, JObject, JString} import org.json4s.jackson.JsonMethods @@ -78,8 +76,11 @@ object FlintDataType { // object types case JString("object") | JNothing => deserializeJValue(fieldProperties) + // binary types + case JString("binary") => BinaryType + // not supported - case _ => throw new IllegalStateException(s"unsupported data type") + case unknown => throw new IllegalStateException(s"unsupported data type: $unknown") } DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build()) } @@ -112,13 +113,16 @@ object FlintDataType { JsonMethods.compact(JsonMethods.render(jValue)) } - def serializeJValue(structType: StructType): JValue = { - JObject("properties" -> JObject(structType.fields.map(field => serializeField(field)).toList)) + private def serializeJValue(structType: StructType): JValue = { + JObject( + "properties" -> JObject( + structType.fields + .map(field => JField(field.name, serializeField(field.dataType, field.metadata))) + .toList)) } - def serializeField(structField: StructField): JField = { - val metadata = structField.metadata - val dataType = structField.dataType match { + def serializeField(dataType: DataType, metadata: Metadata): JValue = { + dataType match { // boolean case BooleanType => JObject("type" -> JString("boolean")) @@ -147,8 +151,14 @@ object FlintDataType { // objects case st: StructType 
=> serializeJValue(st) - case _ => throw new IllegalStateException(s"unsupported data type") + + // array + case ArrayType(elementType, _) => serializeField(elementType, Metadata.empty) + + // binary + case BinaryType => JObject("type" -> JString("binary")) + + case unknown => throw new IllegalStateException(s"unsupported data type: ${unknown.sql}") } - JField(structField.name, dataType) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 44a20e487..656cc387d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -198,7 +198,7 @@ object FlintSparkMaterializedView { .sql(query) .schema .map { field => - field.name -> field.dataType.typeName + field.name -> field.dataType.simpleString } .toMap FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions) diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala index eb3c2a371..e2bde6b98 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala @@ -42,6 +42,9 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { | }, | "textField": { | "type": "text" + | }, + | "binaryField": { + | "type": "binary" | } | } |}""".stripMargin @@ -59,6 +62,7 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { StringType, true, new MetadataBuilder().putString("osType", "text").build()) :: + StructField("binaryField", BinaryType, true) :: Nil) 
FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType) @@ -192,6 +196,40 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType) } + test("spark array type should map to array element type in OpenSearch") { + val flintDataType = """{ + | "properties": { + | "varcharField": { + | "type": "keyword" + | }, + | "charField": { + | "type": "keyword" + | } + | } + |}""".stripMargin + val sparkStructType = + StructType( + StructField("arrayIntField", ArrayType(IntegerType), true) :: + StructField( + "arrayObjectField", + StructType(StructField("booleanField", BooleanType, true) :: Nil), + true) :: Nil) + FlintDataType.serialize(sparkStructType) shouldBe compactJson(s"""{ + | "properties": { + | "arrayIntField": { + | "type": "integer" + | }, + | "arrayObjectField": { + | "properties": { + | "booleanField":{ + | "type": "boolean" + | } + | } + | } + | } + |}""".stripMargin) + } + def compactJson(json: String): String = { val data: JValue = JsonMethods.parse(json) JsonMethods.compact(JsonMethods.render(data)) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 1b16a9e16..6bc85a241 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -70,7 +70,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | "columnType": "timestamp" | },{ | "columnName": "count", - | "columnType": "long" + | "columnType": "bigint" | }], | "options": { | "auto_refresh": "true", @@ -197,7 +197,9 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { Map( "auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath, - "watermark_delay" -> 
"1 Minute")) // This must be small to ensure window closed soon + "watermark_delay" -> "1 Minute" + ) + ) // This must be small to ensure window closed soon flint .materializedView() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index f9bd3967a..79e49c2fd 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -157,6 +157,48 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") } + test("issue 112, https://github.com/opensearch-project/opensearch-spark/issues/112") { + val tableName = "spark_catalog.default.issue112" + createTableIssue112(tableName) + sql(s""" + |CREATE MATERIALIZED VIEW $testMvName AS + | SELECT + | rs.resource.attributes.key as resource_key, + | rs.resource.attributes.value.stringValue as resource_value, + | ss.scope.name as scope_name, + | ss.scope.version as scope_version, + | span.attributes.key as span_key, + | span.attributes.value.intValue as span_int_value, + | span.attributes.value.stringValue as span_string_value, + | span.endTimeUnixNano, + | span.kind, + | span.name as span_name, + | span.parentSpanId, + | span.spanId, + | span.startTimeUnixNano, + | span.traceId + | FROM $tableName + | LATERAL VIEW + | EXPLODE(resourceSpans) as rs + | LATERAL VIEW + | EXPLODE(rs.scopeSpans) as ss + | LATERAL VIEW + | EXPLODE(ss.spans) as span + | LATERAL VIEW + | EXPLODE(span.attributes) as span_attr + |WITH ( + | auto_refresh = false + |) + """.stripMargin) + + val indexData = spark.read.format(FLINT_DATASOURCE).load(testFlintIndex) + flint.describeIndex(testFlintIndex) shouldBe defined + indexData.count() shouldBe 0 + + sql(s"REFRESH MATERIALIZED 
VIEW $testMvName") + indexData.count() shouldBe 2 + } + test("create materialized view with quoted name and column name") { val testQuotedQuery = """ SELECT @@ -182,9 +224,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { test("show all materialized views in catalog and database") { // Show in catalog flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() - checkAnswer( - sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), - Seq(Row("mv1"))) + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), Seq(Row("mv1"))) // Show in catalog.database flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create() @@ -208,7 +248,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { checkAnswer( sql(s"DESC MATERIALIZED VIEW $testMvName"), - Seq(Row("startTime", "timestamp"), Row("count", "long"))) + Seq(Row("startTime", "timestamp"), Row("count", "bigint"))) } test("should return empty when describe nonexistent materialized view") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 211ddb57b..a5596bfe9 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -147,4 +147,67 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland')") sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')") } + + protected def createTableIssue112(testTable: String): Unit = { + sql(s""" + | CREATE TABLE $testTable ( + | resourceSpans ARRAY<STRUCT< + | resource: STRUCT< + | attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<stringValue: STRING>>>>, + | scopeSpans: ARRAY<STRUCT< + | scope: STRUCT<name: STRING, version: STRING>, + | spans: ARRAY<STRUCT< + | attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<intValue: STRING, + | stringValue: STRING>>>, + | endTimeUnixNano: STRING, + | kind: BIGINT, + | name: STRING, + | parentSpanId: STRING, + | spanId: STRING, + | startTimeUnixNano: 
STRING, + | traceId: STRING>>>>>>) + | USING json + |""".stripMargin) + sql(s"""INSERT INTO $testTable + |VALUES ( + | array( + | named_struct( + | 'resource', + | named_struct( + | 'attributes', + | array( + | named_struct('key','telemetry.sdk.version', 'value', named_struct('stringValue', '1.3.0')), + | named_struct('key','telemetry.sdk.name', 'value', named_struct('stringValue', 'opentelemetry')) + | ) + | ), + | 'scopeSpans', + | array( + | named_struct( + | 'scope', + | named_struct('name', 'opentelemetry_ecto', 'version', '1.1.1'), + | 'spans', + | array( + | named_struct( + | 'attributes', + | array( + | named_struct('key', 'total_time_microseconds', + | 'value', named_struct('stringValue', null, 'intValue', + | '31286')), + | named_struct('key', 'source', 'value', named_struct + | ('stringValue', 'featureflags', 'intValue', null)) + | ), + | 'endTimeUnixNano', '1698098982202276205', + | 'kind', 3, + | 'name', 'featureflagservice.repo.query:featureflags', + | 'parentSpanId', '9b355ca40dd98f5e', + | 'spanId', '87acd6659b425f80', + | 'startTimeUnixNano', '1698098982170068232', + | 'traceId', 'bc342fb3fbfa54c2188595b89b0b1cd8' + | ) + | ) + | ) + | ) + | ) + | ) + |)""".stripMargin) + } }