bug fix, support array datatype in MV #211

Merged: 3 commits, Jan 5, 2024

Changes from all commits
FlintDataType.scala
@@ -5,8 +5,6 @@

 package org.apache.spark.sql.flint.datatype

-import java.time.format.DateTimeFormatterBuilder
-
 import org.json4s.{Formats, JField, JValue, NoTypeHints}
 import org.json4s.JsonAST.{JNothing, JObject, JString}
 import org.json4s.jackson.JsonMethods
@@ -78,8 +76,11 @@ object FlintDataType {
       // object types
       case JString("object") | JNothing => deserializeJValue(fieldProperties)

+      // binary types
+      case JString("binary") => BinaryType
+
       // not supported
-      case _ => throw new IllegalStateException(s"unsupported data type")
+      case unknown => throw new IllegalStateException(s"unsupported data type: $unknown")
     }
     DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build())
   }
@@ -112,13 +113,16 @@ object FlintDataType {
     JsonMethods.compact(JsonMethods.render(jValue))
   }

-  def serializeJValue(structType: StructType): JValue = {
-    JObject("properties" -> JObject(structType.fields.map(field => serializeField(field)).toList))
+  private def serializeJValue(structType: StructType): JValue = {
+    JObject(
+      "properties" -> JObject(
+        structType.fields
+          .map(field => JField(field.name, serializeField(field.dataType, field.metadata)))
+          .toList))
   }

-  def serializeField(structField: StructField): JField = {
-    val metadata = structField.metadata
-    val dataType = structField.dataType match {
+  def serializeField(dataType: DataType, metadata: Metadata): JValue = {
+    dataType match {
       // boolean
       case BooleanType => JObject("type" -> JString("boolean"))

@@ -147,8 +151,14 @@ object FlintDataType {

       // objects
       case st: StructType => serializeJValue(st)
-      case _ => throw new IllegalStateException(s"unsupported data type")
+
+      // array
+      case ArrayType(elementType, _) => serializeField(elementType, Metadata.empty)
+
+      // binary
+      case BinaryType => JObject("type" -> JString("binary"))
+
+      case unknown => throw new IllegalStateException(s"unsupported data type: ${unknown.sql}")
     }
-    JField(structField.name, dataType)
   }
 }
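A note on the new ArrayType case above: OpenSearch has no dedicated array type (any field may hold one or more values of its mapped type), so a Spark array is serialized as its element type's mapping and the array wrapper is simply dropped. A minimal sketch of the observable behavior, using the serialize API exercised in the test suite below:

import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types._

// An array field maps to its element type's mapping; OpenSearch will accept
// either a single integer or a list of integers for this field.
val schema = StructType(StructField("scores", ArrayType(IntegerType), true) :: Nil)
FlintDataType.serialize(schema)
// roughly: {"properties":{"scores":{"type":"integer"}}}

Note that the element is serialized with Metadata.empty, so any OpenSearch-specific metadata hints attached to the array field itself are not propagated.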
FlintSparkMaterializedView.scala
@@ -198,7 +198,7 @@ object FlintSparkMaterializedView {
       .sql(query)
       .schema
       .map { field =>
-        field.name -> field.dataType.typeName
+        field.name -> field.dataType.simpleString
       }
       .toMap
     FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions)
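Context for this one-line fix: typeName returns Spark's internal name, which for nested types drops exactly the detail the MV output schema needs, while simpleString returns the full SQL type name. A quick illustration in a Spark shell:

import org.apache.spark.sql.types._

LongType.typeName                   // "long"
LongType.simpleString               // "bigint"
ArrayType(IntegerType).typeName     // "array"      (element type lost)
ArrayType(IntegerType).simpleString // "array<int>" (full SQL name)

This is also why the expected columnType in the integration tests below changes from long to bigint.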
FlintDataTypeSuite.scala
@@ -42,6 +42,9 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
        |  },
        |  "textField": {
        |    "type": "text"
+       |  },
+       |  "binaryField": {
+       |    "type": "binary"
        |  }
        |}""".stripMargin

@@ -59,6 +62,7 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
           StringType,
           true,
           new MetadataBuilder().putString("osType", "text").build()) ::
+        StructField("binaryField", BinaryType, true) ::
         Nil)

     FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType)
@@ -192,6 +196,30 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
     FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType)
   }

+  test("spark array type should map to array element type in OpenSearch") {
+    val sparkStructType =
+      StructType(
+        StructField("arrayIntField", ArrayType(IntegerType), true) ::
+          StructField(
+            "arrayObjectField",
+            StructType(StructField("booleanField", BooleanType, true) :: Nil),
+            true) :: Nil)
+    FlintDataType.serialize(sparkStructType) shouldBe compactJson("""{
+        |  "properties": {
+        |    "arrayIntField": {
+        |      "type": "integer"
+        |    },
+        |    "arrayObjectField": {
+        |      "properties": {
+        |        "booleanField": {
+        |          "type": "boolean"
+        |        }
+        |      }
+        |    }
+        |  }
+        |}""".stripMargin)
+  }
+
   def compactJson(json: String): String = {
     val data: JValue = JsonMethods.parse(json)
     JsonMethods.compact(JsonMethods.render(data))
FlintSparkMaterializedViewITSuite.scala
@@ -70,7 +70,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
        |     "columnType": "timestamp"
        |   },{
        |     "columnName": "count",
-       |     "columnType": "long"
+       |     "columnType": "bigint"
        |   }],
        |   "options": {
        |     "auto_refresh": "true",
@@ -197,7 +197,9 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
         Map(
           "auto_refresh" -> "true",
           "checkpoint_location" -> checkpointDir.getAbsolutePath,
-          "watermark_delay" -> "1 Minute")) // This must be small to ensure window closed soon
+          "watermark_delay" -> "1 Minute"
+        )
+      ) // This must be small to ensure window closed soon

     flint
       .materializedView()
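For readers skimming the option being re-wrapped here: watermark_delay tells the MV's incremental-refresh stream how long to wait for late events before a tumbling window is considered closed, which is why the comment insists it stay small in a test. A hedged sketch of the equivalent structured-streaming call (the rate source and column names are illustrative, not the suite's):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Stand-in streaming source; the real MV job reads the source table instead.
val events = spark.readStream.format("rate").load() // columns: timestamp, value

// "watermark_delay" -> "1 Minute" corresponds to withWatermark's delay
// threshold: a 10-minute window can only emit (in append mode) once event
// time passes window end + 1 minute, so a small delay surfaces results sooner.
val counts = events
  .withWatermark("timestamp", "1 minute")
  .groupBy(window(events("timestamp"), "10 minutes"))
  .count()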
FlintSparkMaterializedViewSqlITSuite.scala
@@ -157,6 +157,48 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
   }

+  test("issue 112, https://github.com/opensearch-project/opensearch-spark/issues/112") {
+    val tableName = "spark_catalog.default.issue112"
+    createTableIssue112(tableName)
+    sql(s"""
+         |CREATE MATERIALIZED VIEW $testMvName AS
+         |  SELECT
+         |    rs.resource.attributes.key as resource_key,
+         |    rs.resource.attributes.value.stringValue as resource_value,
+         |    ss.scope.name as scope_name,
+         |    ss.scope.version as scope_version,
+         |    span.attributes.key as span_key,
+         |    span.attributes.value.intValue as span_int_value,
+         |    span.attributes.value.stringValue as span_string_value,
+         |    span.endTimeUnixNano,
+         |    span.kind,
+         |    span.name as span_name,
+         |    span.parentSpanId,
+         |    span.spanId,
+         |    span.startTimeUnixNano,
+         |    span.traceId
+         |  FROM $tableName
+         |  LATERAL VIEW
+         |    EXPLODE(resourceSpans) as rs
+         |  LATERAL VIEW
+         |    EXPLODE(rs.scopeSpans) as ss
+         |  LATERAL VIEW
+         |    EXPLODE(ss.spans) as span
+         |  LATERAL VIEW
+         |    EXPLODE(span.attributes) as span_attr
+         |WITH (
+         |  auto_refresh = false
+         |)
+      """.stripMargin)
+
+    val indexData = spark.read.format(FLINT_DATASOURCE).load(testFlintIndex)
+    flint.describeIndex(testFlintIndex) shouldBe defined
+    indexData.count() shouldBe 0
+
+    sql(s"REFRESH MATERIALIZED VIEW $testMvName")
+    indexData.count() shouldBe 2
+  }
+
   test("create materialized view with quoted name and column name") {
     val testQuotedQuery =
       """ SELECT
@@ -182,9 +224,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
   test("show all materialized views in catalog and database") {
     // Show in catalog
     flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
-    checkAnswer(
-      sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
-      Seq(Row("mv1")))
+    checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), Seq(Row("mv1")))

     // Show in catalog.database
     flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
@@ -208,7 +248,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {

     checkAnswer(
       sql(s"DESC MATERIALIZED VIEW $testMvName"),
-      Seq(Row("startTime", "timestamp"), Row("count", "long")))
+      Seq(Row("startTime", "timestamp"), Row("count", "bigint")))
   }

   test("should return empty when describe nonexistent materialized view") {
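An aside on the row-count assertions in the issue-112 test above: each LATERAL VIEW EXPLODE multiplies rows by the length of the exploded array, and the single record inserted by createTableIssue112 (defined in the next file) holds one resource span, one scope span, one span, and two span attributes, hence 1 x 1 x 1 x 2 = 2 rows after refresh. A minimal standalone sketch of the explode semantics (not part of the suite):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// One input row with a two-element array explodes into two output rows.
val rows = Seq(Seq("total_time_microseconds", "source")).toDF("attrs")
rows.selectExpr("explode(attrs) AS key").count() // 2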
FlintSparkSuite.scala
@@ -107,4 +107,66 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland')")
     sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
   }
+
+  protected def createTableIssue112(testTable: String): Unit = {
+    sql(s"""
+         | CREATE TABLE $testTable (
+         |   resourceSpans ARRAY<STRUCT<
+         |     resource: STRUCT<
+         |       attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<stringValue: STRING>>>>,
+         |     scopeSpans: ARRAY<STRUCT<
+         |       scope: STRUCT<name: STRING, version: STRING>,
+         |       spans: ARRAY<STRUCT<
+         |         attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<intValue: STRING, stringValue: STRING>>>,
+         |         endTimeUnixNano: STRING,
+         |         kind: BIGINT,
+         |         name: STRING,
+         |         parentSpanId: STRING,
+         |         spanId: STRING,
+         |         startTimeUnixNano: STRING,
+         |         traceId: STRING>>>>>>)
+         | USING json
+         |""".stripMargin)
+    sql(s"""INSERT INTO $testTable
+           |VALUES (
+           |  array(
+           |    named_struct(
+           |      'resource',
+           |      named_struct(
+           |        'attributes',
+           |        array(
+           |          named_struct('key', 'telemetry.sdk.version', 'value', named_struct('stringValue', '1.3.0')),
+           |          named_struct('key', 'telemetry.sdk.name', 'value', named_struct('stringValue', 'opentelemetry'))
+           |        )
+           |      ),
+           |      'scopeSpans',
+           |      array(
+           |        named_struct(
+           |          'scope',
+           |          named_struct('name', 'opentelemetry_ecto', 'version', '1.1.1'),
+           |          'spans',
+           |          array(
+           |            named_struct(
+           |              'attributes',
+           |              array(
+           |                named_struct('key', 'total_time_microseconds',
+           |                  'value', named_struct('stringValue', null, 'intValue', '31286')),
+           |                named_struct('key', 'source',
+           |                  'value', named_struct('stringValue', 'featureflags', 'intValue', null))
+           |              ),
+           |              'endTimeUnixNano', '1698098982202276205',
+           |              'kind', 3,
+           |              'name', 'featureflagservice.repo.query:featureflags',
+           |              'parentSpanId', '9b355ca40dd98f5e',
+           |              'spanId', '87acd6659b425f80',
+           |              'startTimeUnixNano', '1698098982170068232',
+           |              'traceId', 'bc342fb3fbfa54c2188595b89b0b1cd8'
+           |            )
+           |          )
+           |        )
+           |      )
+           |    )
+           |  )
+           |)""".stripMargin)
+  }
 }
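One more note on this fixture: the MV query reaches into the nesting with dot paths, and a dot path applied to an array of structs yields an array of the leaf values. For example, a hypothetical ad-hoc query against the fixture table (assuming createTableIssue112("spark_catalog.default.issue112") has run):

spark.sql("""
  SELECT span.name, span.attributes.key
  FROM spark_catalog.default.issue112
  LATERAL VIEW EXPLODE(resourceSpans) AS rs
  LATERAL VIEW EXPLODE(rs.scopeSpans) AS ss
  LATERAL VIEW EXPLODE(ss.spans) AS span
""").show(truncate = false)
// span.attributes.key comes back as ARRAY<STRING>, e.g.
// [total_time_microseconds, source]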