Skip to content

Commit

Permalink
Merge pull request #58 from badrinathpatchikolla/master
Browse files Browse the repository at this point in the history
Added CSV Deserializer
  • Loading branch information
mantovani authored Sep 29, 2022
2 parents 3ffb24e + 2b80e74 commit 691a9d9
Show file tree
Hide file tree
Showing 23 changed files with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ private[almaren] trait Deserializer extends Core {
XMLDeserializer(columnName,schemaInfo,options,autoFlatten)
// Builds the Avro deserializer. A schema is mandatory here: when `schemaInfo` is empty,
// SchemaRequired is thrown for the requested decoder.
def avro: State = {
  val requiredSchema = schemaInfo.getOrElse(throw SchemaRequired(decoder))
  AvroDeserializer(columnName, None, options, autoFlatten, requiredSchema)
}
// Builds the CSV deserializer; the schema is optional and is passed through as-is.
def csv: State =
  CSVDeserializer(
    columnName = columnName,
    schema = schemaInfo,
    options = options,
    autoFlatten = autoFlatten
  )

decoder.toUpperCase match {
case "JSON" => json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,23 @@ case class XMLDeserializer(columnName: String, schema: Option[String], options:
.withColumn(columnName, from_xml(col(columnName), xmlSchema, options))
}
}

/**
 * Deserializer that parses a CSV-encoded string column with Spark's `from_csv`
 * and stores the parsed value back under the same column name.
 *
 * @param columnName  name of the column holding the CSV text; it is replaced by the parsed result
 * @param schema      optional schema as a DDL string; when empty, a DDL is derived from the
 *                    column's own data via `getSchemaDDL`
 * @param options     options handed to `from_csv`
 * @param autoFlatten not read anywhere in this class; presumably consumed by the parent
 *                    `Deserializer` -- TODO confirm
 */
case class CSVDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {

  import org.apache.spark.sql.functions._
  // Explicit converters instead of the implicit `JavaConversions`, which is deprecated
  // in Scala 2.12 and removed in 2.13.
  import scala.collection.JavaConverters._

  /**
   * Replaces `columnName` in `df` with the result of parsing its CSV content.
   *
   * @param df input DataFrame containing `columnName`
   * @return `df` with `columnName` overwritten by the `from_csv` output
   */
  override def deserializer(df: DataFrame): DataFrame = {
    // Supplies the Encoder[String] needed by `.as[String]` below.
    import df.sparkSession.implicits._
    logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")

    // `getOrElse` takes its argument by name, so inference only runs when no schema was given.
    val schemaDDL = schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[String]))

    // Use the (Column, Column, java.util.Map) overload of `from_csv`: the DDL string is
    // passed as a literal column and the options are converted explicitly.
    df.withColumn(
      columnName,
      from_csv(col(columnName), lit(schemaDDL), options.asJava)
    )
  }

  /** Derives a schema DDL string by reading the sampled column values as CSV. */
  private def getSchemaDDL(df: Dataset[String]): String =
    getDDL(getReadWithOptions.csv(sampleData(df)))
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
22 changes: 22 additions & 0 deletions src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class Test extends FunSuite with BeforeAndAfter {
deserializerJsonTest()
deserializerXmlTest()
deserializerAvroTest()
deserializerCsvTest()
testInferSchemaJsonColumn()
testInferSchemaDataframe(moviesDf)

Expand Down Expand Up @@ -464,6 +465,27 @@ class Test extends FunSuite with BeforeAndAfter {
test(jsonschmeadf, resDf, "Deserialize JSON Schema")
}

/**
 * Exercises the CSV deserializer on the `first_name` column twice -- once without a schema
 * and once with an explicit DDL schema -- and compares each result with its parquet fixture.
 */
def deserializerCsvTest(): Unit = {
  // `first_name` holds two comma-separated values per row, i.e. one CSV record each.
  val rows = Seq(
    ("John,Chris", "Smith", "London"),
    ("David,Michael", "Jones", "India"),
    ("Joseph,Mike", "Lee", "Russia"),
    ("Chris,Tony", "Brown", "Indonesia")
  )
  val inputDf = rows.toDF("first_name", "last_name", "country")

  // No schema supplied.
  val actualNoSchemaDf = almaren.builder
    .sourceDataFrame(inputDf)
    .deserializer("CSV", "first_name", options = Map("header" -> "false"))
    .batch

  // Explicit DDL schema supplied.
  val explicitSchema = Some("`first_name_1` STRING,`first_name_2` STRING")
  val actualWithSchemaDf = almaren.builder
    .sourceDataFrame(inputDf)
    .deserializer("CSV", "first_name", explicitSchema, Map("header" -> "true"))
    .batch

  // Expected results stored as parquet fixtures.
  val expectedNoSchemaDf = spark.read.parquet("src/test/resources/data/csvDeserializer.parquet")
  val expectedWithSchemaDf = spark.read.parquet("src/test/resources/data/csvDeserializerSchema.parquet")

  test(actualNoSchemaDf, expectedNoSchemaDf, "Deserialize CSV")
  test(actualWithSchemaDf, expectedWithSchemaDf, "Deserialize CSV Schema")
}

def deserializerXmlTest(): Unit = {
val xmlStr = Seq(
""" <json_string>
Expand Down

0 comments on commit 691a9d9

Please sign in to comment.