Commit

bug fix, support array datatype in MV (#211)
* bug fix, support array datatype in MV

Signed-off-by: Peng Huo <[email protected]>

* fix format

Signed-off-by: Peng Huo <[email protected]>

* Fix IT, change columnType long to bigint

Signed-off-by: Peng Huo <[email protected]>

---------

Signed-off-by: Peng Huo <[email protected]>
penghuo authored Jan 5, 2024
1 parent c72d773 commit d9b9dda
Showing 6 changed files with 170 additions and 17 deletions.
FlintDataType.scala

@@ -5,8 +5,6 @@

package org.apache.spark.sql.flint.datatype

import java.time.format.DateTimeFormatterBuilder

import org.json4s.{Formats, JField, JValue, NoTypeHints}
import org.json4s.JsonAST.{JNothing, JObject, JString}
import org.json4s.jackson.JsonMethods
@@ -78,8 +76,11 @@ object FlintDataType {
// object types
case JString("object") | JNothing => deserializeJValue(fieldProperties)

// binary types
case JString("binary") => BinaryType

// not supported
case _ => throw new IllegalStateException(s"unsupported data type")
case unknown => throw new IllegalStateException(s"unsupported data type: $unknown")
}
DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build())
}
@@ -112,13 +113,16 @@ object FlintDataType {
JsonMethods.compact(JsonMethods.render(jValue))
}

def serializeJValue(structType: StructType): JValue = {
JObject("properties" -> JObject(structType.fields.map(field => serializeField(field)).toList))
private def serializeJValue(structType: StructType): JValue = {
JObject(
"properties" -> JObject(
structType.fields
.map(field => JField(field.name, serializeField(field.dataType, field.metadata)))
.toList))
}

def serializeField(structField: StructField): JField = {
val metadata = structField.metadata
val dataType = structField.dataType match {
def serializeField(dataType: DataType, metadata: Metadata): JValue = {
dataType match {
// boolean
case BooleanType => JObject("type" -> JString("boolean"))

@@ -147,8 +151,14 @@

// objects
case st: StructType => serializeJValue(st)
case _ => throw new IllegalStateException(s"unsupported data type")

// array
case ArrayType(elementType, _) => serializeField(elementType, Metadata.empty)

// binary
case BinaryType => JObject("type" -> JString("binary"))

case unknown => throw new IllegalStateException(s"unsupported data type: ${unknown.sql}")
}
JField(structField.name, dataType)
}
}
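
Why flattening is the right mapping: OpenSearch has no dedicated array field type, so any field may hold one or more values of its mapped type. The new serializeField therefore maps a Spark ArrayType to its element type's mapping. A minimal sketch of the resulting behavior, with the entry point and expected output taken from the tests below:

    import org.apache.spark.sql.flint.datatype.FlintDataType
    import org.apache.spark.sql.types._

    // An array column serializes to its element type's mapping; the
    // "array-ness" stays implicit, since OpenSearch mappings cannot record it.
    val schema = StructType(
      StructField("scores", ArrayType(IntegerType), nullable = true) :: Nil)

    FlintDataType.serialize(schema)
    // => {"properties":{"scores":{"type":"integer"}}}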
FlintSparkMaterializedView.scala

@@ -198,7 +198,7 @@ object FlintSparkMaterializedView {
.sql(query)
.schema
.map { field =>
field.name -> field.dataType.typeName
field.name -> field.dataType.simpleString
}
.toMap
FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions)
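Context for the change above: typeName and simpleString differ exactly where this bug bites, since simpleString yields the SQL form and keeps type parameters. This is standard Spark DataType behavior:

    import org.apache.spark.sql.types._

    LongType.typeName                   // "long"       (internal name)
    LongType.simpleString               // "bigint"     (SQL name; see the IT change below)
    ArrayType(IntegerType).typeName     // "array"      (element type dropped)
    ArrayType(IntegerType).simpleString // "array<int>" (element type kept)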
FlintDataTypeSuite.scala

@@ -42,6 +42,9 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
| },
| "textField": {
| "type": "text"
| },
| "binaryField": {
| "type": "binary"
| }
| }
|}""".stripMargin
@@ -59,6 +62,7 @@
StringType,
true,
new MetadataBuilder().putString("osType", "text").build()) ::
StructField("binaryField", BinaryType, true) ::
Nil)

FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType)
@@ -192,6 +196,40 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType)
}

test("spark array type map to should map to array element type in OpenSearch") {
val flintDataType = """{
| "properties": {
| "varcharField": {
| "type": "keyword"
| },
| "charField": {
| "type": "keyword"
| }
| }
|}""".stripMargin
val sparkStructType =
StructType(
StructField("arrayIntField", ArrayType(IntegerType), true) ::
StructField(
"arrayObjectField",
StructType(StructField("booleanField", BooleanType, true) :: Nil),
true) :: Nil)
FlintDataType.serialize(sparkStructType) shouldBe compactJson(s"""{
| "properties": {
| "arrayIntField": {
| "type": "integer"
| },
| "arrayObjectField": {
| "properties": {
| "booleanField":{
| "type": "boolean"
| }
| }
| }
| }
|}""".stripMargin)
}

def compactJson(json: String): String = {
val data: JValue = JsonMethods.parse(json)
JsonMethods.compact(JsonMethods.render(data))
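Note that the flattening is one-way: the emitted mapping carries no array marker, so reading it back yields the element type rather than an ArrayType. A sketch, assuming FlintDataType.deserialize is the inverse of the serialize entry point shown above:

    val original = StructType(
      StructField("ids", ArrayType(IntegerType), nullable = true) :: Nil)

    // Round-tripping drops the array wrapper: OpenSearch can only express
    // "a field of type X that may repeat", never "array of X".
    val roundTripped = FlintDataType.deserialize(FlintDataType.serialize(original))
    // roundTripped == StructType(StructField("ids", IntegerType, true) :: Nil)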
FlintSparkMaterializedViewITSuite.scala

@@ -70,7 +70,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| "columnType": "timestamp"
| },{
| "columnName": "count",
| "columnType": "long"
| "columnType": "bigint"
| }],
| "options": {
| "auto_refresh": "true",
@@ -197,7 +197,9 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath,
"watermark_delay" -> "1 Minute")) // This must be small to ensure window closed soon
"watermark_delay" -> "1 Minute"
)
) // This must be small to ensure window closed soon

flint
.materializedView()
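Context for the watermark_delay comment above: under Spark Structured Streaming semantics, a window only finalizes once the watermark (max observed event time minus the delay) passes the window's end, so a small delay lets test windows close and emit promptly. A generic sketch of the underlying API; eventsDf and its timestamp column are assumed names, not part of this patch:

    import org.apache.spark.sql.functions.{col, window}

    // Rows more than 1 minute behind max(event time) are dropped, and a
    // 10-minute window emits once the watermark passes its end.
    val counts = eventsDf
      .withWatermark("timestamp", "1 minute")
      .groupBy(window(col("timestamp"), "10 minutes"))
      .count()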
FlintSparkMaterializedViewSqlITSuite.scala

@@ -157,6 +157,48 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
}

test("issue 112, https://github.com/opensearch-project/opensearch-spark/issues/112") {
val tableName = "spark_catalog.default.issue112"
createTableIssue112(tableName)
sql(s"""
|CREATE MATERIALIZED VIEW $testMvName AS
| SELECT
| rs.resource.attributes.key as resource_key,
| rs.resource.attributes.value.stringValue as resource_value,
| ss.scope.name as scope_name,
| ss.scope.version as scope_version,
| span.attributes.key as span_key,
| span.attributes.value.intValue as span_int_value,
| span.attributes.value.stringValue as span_string_value,
| span.endTimeUnixNano,
| span.kind,
| span.name as span_name,
| span.parentSpanId,
| span.spanId,
| span.startTimeUnixNano,
| span.traceId
| FROM $tableName
| LATERAL VIEW
| EXPLODE(resourceSpans) as rs
| LATERAL VIEW
| EXPLODE(rs.scopeSpans) as ss
| LATERAL VIEW
| EXPLODE(ss.spans) as span
| LATERAL VIEW
| EXPLODE(span.attributes) as span_attr
|WITH (
| auto_refresh = false
|)
""".stripMargin)

val indexData = spark.read.format(FLINT_DATASOURCE).load(testFlintIndex)
flint.describeIndex(testFlintIndex) shouldBe defined
indexData.count() shouldBe 0

sql(s"REFRESH MATERIALIZED VIEW $testMvName")
indexData.count() shouldBe 2
}
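
Why the refresh lands exactly two rows: each LATERAL VIEW EXPLODE emits one row per array element, and the record inserted by createTableIssue112 (last file below) holds one resource span, one scope span, one span, and two span attributes, so 1 × 1 × 1 × 2 = 2. A generic illustration of the fan-out, assuming a SparkSession named spark:

    // Chained explodes multiply row counts: 1 element × 2 elements = 2 rows.
    spark.sql("""
      SELECT a, b
      FROM VALUES (array(1), array('x', 'y')) AS t(arr1, arr2)
      LATERAL VIEW EXPLODE(arr1) AS a
      LATERAL VIEW EXPLODE(arr2) AS b
    """).count() // 2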

test("create materialized view with quoted name and column name") {
val testQuotedQuery =
""" SELECT
@@ -182,9 +224,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
test("show all materialized views in catalog and database") {
// Show in catalog
flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
checkAnswer(
sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
Seq(Row("mv1")))
checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), Seq(Row("mv1")))

// Show in catalog.database
flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
@@ -208,7 +248,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {

checkAnswer(
sql(s"DESC MATERIALIZED VIEW $testMvName"),
Seq(Row("startTime", "timestamp"), Row("count", "long")))
Seq(Row("startTime", "timestamp"), Row("count", "bigint")))
}

test("should return empty when describe nonexistent materialized view") {
FlintSparkSuite.scala

@@ -147,4 +147,67 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland')")
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
}

protected def createTableIssue112(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable (
| resourceSpans ARRAY<STRUCT<
| resource: STRUCT<
| attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<stringValue: STRING>>>>,
| scopeSpans: ARRAY<STRUCT<
| scope: STRUCT<name: STRING, version: STRING>,
| spans: ARRAY<STRUCT<
| attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<intValue: STRING, stringValue: STRING>>>,
| endTimeUnixNano: STRING,
| kind: BIGINT,
| name: STRING,
| parentSpanId: STRING,
| spanId: STRING,
| startTimeUnixNano: STRING,
| traceId: STRING>>>>>>)
| USING json
|""".stripMargin)
sql(s"""INSERT INTO $testTable
|VALUES (
| array(
| named_struct(
| 'resource',
| named_struct(
| 'attributes',
| array(
| named_struct('key','telemetry.sdk.version', 'value', named_struct('stringValue', '1.3.0')),
| named_struct('key','telemetry.sdk.name', 'value', named_struct('stringValue', 'opentelemetry'))
| )
| ),
| 'scopeSpans',
| array(
| named_struct(
| 'scope',
| named_struct('name', 'opentelemetry_ecto', 'version', '1.1.1'),
| 'spans',
| array(
| named_struct(
| 'attributes',
| array(
| named_struct('key', 'total_time_microseconds',
| 'value', named_struct('stringValue', null, 'intValue',
| '31286')),
| named_struct('key', 'source', 'value', named_struct
| ('stringValue', 'featureflags', 'intValue', null))
| ),
| 'endTimeUnixNano', '1698098982202276205',
| 'kind', 3,
| 'name', 'featureflagservice.repo.query:featureflags',
| 'parentSpanId', '9b355ca40dd98f5e',
| 'spanId', '87acd6659b425f80',
| 'startTimeUnixNano', '1698098982170068232',
| 'traceId', 'bc342fb3fbfa54c2188595b89b0b1cd8'
| )
| )
| )
| )
| )
| )
|)""".stripMargin)
}
}
