Skip to content

Commit

Permalink
[SPARK-50350][SQL] Avro: add new function schema_of_avro (scala s…
Browse files Browse the repository at this point in the history
…ide)

### What changes were proposed in this pull request?
The pr aims to add new function `schema_of_avro` for `avro`.

### Why are the changes needed?
- The schema format of Avro is different from that of Spark when presented to end users. In order to facilitate the intuitive understanding of Avro's schema by end users.
- Similar functions exist in other formats of data, such as `csv`, `json` and `xml`,
https://github.com/apache/spark/blob/87a5b37ec3c4b383a5938144612c07187d597ff8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L872-L875
https://github.com/apache/spark/blob/87a5b37ec3c4b383a5938144612c07187d597ff8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L836-L839
https://github.com/apache/spark/blob/87a5b37ec3c4b383a5938144612c07187d597ff8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L877-L880

### Does this PR introduce _any_ user-facing change?
Yes, end-users will be able to clearly know what `Avro's schema` format should look like in `Spark` through the function `schema_of_avro`.

### How was this patch tested?
- Add new UT.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48889 from panbingkun/SPARK-50350.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
panbingkun authored and dongjoon-hyun committed Dec 6, 2024
1 parent 934a387 commit be0780b
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.avro

import org.apache.avro.Schema

import org.apache.spark.sql.catalyst.util.{ParseMode, PermissiveMode}
import org.apache.spark.unsafe.types.UTF8String

object AvroExpressionEvalUtils {

def schemaOfAvro(
avroOptions: AvroOptions,
parseMode: ParseMode,
expectedSchema: Schema): UTF8String = {
val dt = SchemaConverters.toSqlType(
expectedSchema,
avroOptions.useStableIdForUnionType,
avroOptions.stableIdPrefixForUnionType,
avroOptions.recursiveFieldMaxDepth).dataType
val schema = parseMode match {
// With PermissiveMode, the output Catalyst row might contain columns of null values for
// corrupt records, even if some of the columns are not nullable in the user-provided schema.
// Therefore we force the schema to be all nullable here.
case PermissiveMode => dt.asNullable
case _ => dt
}
UTF8String.fromString(schema.sql)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.avro

import org.apache.avro.Schema

import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, Literal, RuntimeReplaceable}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, ObjectType}

private[sql] case class SchemaOfAvro(
jsonFormatSchema: String,
options: Map[String, String])
extends LeafExpression with RuntimeReplaceable {

override def dataType: DataType = SQLConf.get.defaultStringType

override def nullable: Boolean = false

@transient private lazy val avroOptions = AvroOptions(options)

@transient private lazy val actualSchema =
new Schema.Parser().setValidateDefaults(false).parse(jsonFormatSchema)

@transient private lazy val expectedSchema = avroOptions.schema.getOrElse(actualSchema)

@transient private lazy val parseMode: ParseMode = {
val mode = avroOptions.parseMode
if (mode != PermissiveMode && mode != FailFastMode) {
throw QueryCompilationErrors.parseModeUnsupportedError(
prettyName, mode
)
}
mode
}

override def prettyName: String = "schema_of_avro"

@transient private lazy val avroOptionsObjectType = ObjectType(classOf[AvroOptions])
@transient private lazy val parseModeObjectType = ObjectType(classOf[ParseMode])
@transient private lazy val schemaObjectType = ObjectType(classOf[Schema])

override def replacement: Expression = StaticInvoke(
AvroExpressionEvalUtils.getClass,
dataType,
"schemaOfAvro",
Seq(
Literal(avroOptions, avroOptionsObjectType),
Literal(parseMode, parseModeObjectType),
Literal(expectedSchema, schemaObjectType)),
Seq(avroOptionsObjectType, parseModeObjectType, schemaObjectType)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -629,4 +629,40 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
assert(readbackPerson2.get(2).toString === person2.get(2))
}
}

test("schema_of_avro") {
val df = spark.range(1)
val avroIntType = s"""
|{
| "type": "int",
| "name": "id"
|}""".stripMargin
checkAnswer(df.select(functions.schema_of_avro(avroIntType)), Row("INT"))

val avroStructType =
"""
|{
| "type": "record",
| "name": "person",
| "fields": [
| {"name": "name", "type": "string"},
| {"name": "age", "type": "int"},
| {"name": "country", "type": "string"}
| ]
|}""".stripMargin
checkAnswer(df.select(functions.schema_of_avro(avroStructType)),
Row("STRUCT<name: STRING NOT NULL, age: INT NOT NULL, country: STRING NOT NULL>"))

val avroMultiType =
"""
|{
| "type": "record",
| "name": "person",
| "fields": [
| {"name": "u", "type": ["int", "string"]}
| ]
|}""".stripMargin
checkAnswer(df.select(functions.schema_of_avro(avroMultiType)),
Row("STRUCT<u: STRUCT<member0: INT, member1: STRING> NOT NULL>"))
}
}
28 changes: 28 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,32 @@ object functions {
def to_avro(data: Column, jsonFormatSchema: String): Column = {
Column.fn("to_avro", data, lit(jsonFormatSchema))
}

/**
* Returns schema in the DDL format of the avro schema in JSON string format.
*
* @param jsonFormatSchema
* the avro schema in JSON string format.
*
* @since 4.0.0
*/
@Experimental
def schema_of_avro(jsonFormatSchema: String): Column = {
Column.fn("schema_of_avro", lit(jsonFormatSchema))
}

/**
* Returns schema in the DDL format of the avro schema in JSON string format.
*
* @param jsonFormatSchema
* the avro schema in JSON string format.
* @param options
* options to control how the Avro record is parsed.
*
* @since 4.0.0
*/
@Experimental
def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, String]): Column = {
Column.fnWithOptions("schema_of_avro", options.asScala.iterator, lit(jsonFormatSchema))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,7 @@ object FunctionRegistry {
// Avro
expression[FromAvro]("from_avro"),
expression[ToAvro]("to_avro"),
expression[SchemaOfAvro]("schema_of_avro"),

// Protobuf
expression[FromProtobuf]("from_protobuf"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,96 @@ case class ToAvro(child: Expression, jsonFormatSchema: Expression)
override def prettyName: String =
getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("to_avro")
}

/**
* Returns schema in the DDL format of the avro schema in JSON string format.
* This is a thin wrapper over the [[SchemaOfAvro]] class to create a SQL function.
*
* @param jsonFormatSchema the Avro schema in JSON string format.
* @param options the options to use when performing the conversion.
*
* @since 4.0.0
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """
_FUNC_(jsonFormatSchema, options) - Returns schema in the DDL format of the avro schema in JSON string format.
""",
examples = """
Examples:
> SELECT _FUNC_('{"type": "record", "name": "struct", "fields": [{"name": "u", "type": ["int", "string"]}]}', map());
STRUCT<u: STRUCT<member0: INT, member1: STRING> NOT NULL>
""",
group = "misc_funcs",
since = "4.0.0"
)
// scalastyle:on line.size.limit
case class SchemaOfAvro(jsonFormatSchema: Expression, options: Expression)
extends BinaryExpression with RuntimeReplaceable {

override def left: Expression = jsonFormatSchema
override def right: Expression = options

override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): Expression =
copy(jsonFormatSchema = newLeft, options = newRight)

def this(jsonFormatSchema: Expression) =
this(jsonFormatSchema, Literal.create(null))

override def checkInputDataTypes(): TypeCheckResult = {
val schemaCheck = jsonFormatSchema.dataType match {
case _: StringType |
_: NullType
if jsonFormatSchema.foldable =>
None
case _ =>
Some(TypeCheckResult.TypeCheckFailure("The first argument of the SCHEMA_OF_AVRO SQL " +
"function must be a constant string containing the JSON representation of the schema " +
"to use for converting the value from AVRO format"))
}
val optionsCheck = options.dataType match {
case MapType(StringType, StringType, _) |
MapType(NullType, NullType, _) |
_: NullType
if options.foldable =>
None
case _ =>
Some(TypeCheckResult.TypeCheckFailure("The second argument of the SCHEMA_OF_AVRO SQL " +
"function must be a constant map of strings to strings containing the options to use " +
"for converting the value from AVRO format"))
}
schemaCheck.getOrElse(
optionsCheck.getOrElse(
TypeCheckResult.TypeCheckSuccess))
}

override lazy val replacement: Expression = {
val schemaValue: String = jsonFormatSchema.eval() match {
case s: UTF8String =>
s.toString
case null =>
""
}
val optionsValue: Map[String, String] = options.eval() match {
case a: ArrayBasedMapData if a.keyArray.array.nonEmpty =>
val keys: Array[String] = a.keyArray.array.map(_.toString)
val values: Array[String] = a.valueArray.array.map(_.toString)
keys.zip(values).toMap
case _ =>
Map.empty
}
val constructor = try {
Utils.classForName("org.apache.spark.sql.avro.SchemaOfAvro").getConstructors.head
} catch {
case _: java.lang.ClassNotFoundException =>
throw QueryCompilationErrors.avroNotLoadedSqlFunctionsUnusable(
functionName = "SCHEMA_OF_AVRO")
}
val expr = constructor.newInstance(schemaValue, optionsValue)
expr.asInstanceOf[Expression]
}

override def prettyName: String =
getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("schema_of_avro")
}
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ class CollationExpressionWalkerSuite extends SparkFunSuite with SharedSparkSessi
// other functions which are not yet supported
"to_avro",
"from_avro",
"schema_of_avro",
"to_protobuf",
"from_protobuf"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class ExpressionsSchemaSuite extends QueryTest with SharedSparkSession {
// SET spark.sql.parser.escapedStringLiterals=true
example.split(" > ").tail.filterNot(_.trim.startsWith("SET")).take(1).foreach {
case _ if funcName == "from_avro" || funcName == "to_avro" ||
funcName == "from_protobuf" || funcName == "to_protobuf" =>
funcName == "schema_of_avro" || funcName == "from_protobuf" ||
funcName == "to_protobuf" =>
// Skip running the example queries for the from_avro, to_avro, from_protobuf and
// to_protobuf functions because these functions dynamically load the
// AvroDataToCatalyst or CatalystDataToAvro classes which are not available in this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
// Requires dynamic class loading not available in this test suite.
"org.apache.spark.sql.catalyst.expressions.FromAvro",
"org.apache.spark.sql.catalyst.expressions.ToAvro",
"org.apache.spark.sql.catalyst.expressions.SchemaOfAvro",
"org.apache.spark.sql.catalyst.expressions.FromProtobuf",
"org.apache.spark.sql.catalyst.expressions.ToProtobuf",
classOf[CurrentUser].getName,
Expand Down

0 comments on commit be0780b

Please sign in to comment.