Skip to content

Commit

Permalink
Merge pull request #59 from badrinathpatchikolla/spark-3.1
Browse files Browse the repository at this point in the history
Added CSV Deserializer for Target File
  • Loading branch information
mantovani authored Oct 4, 2022
2 parents 0303311 + 230d0cc commit 3ac23a2
Show file tree
Hide file tree
Showing 27 changed files with 273 additions and 63 deletions.
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The Almaren Framework provides a simplified consistent minimalistic layer over A
+ [targetSolr](#targetsolr)
+ [targetMongoDb](#targetmongodb)
+ [targetBigQuery](#targetbigquery)
+ [targetFile](#targetfile)
- [Executors](#executors)
* [Batch](#batch)
* [Streaming Kafka](#streaming-kafka)
Expand Down Expand Up @@ -481,6 +482,14 @@ Write to BigQuery using [BigQuery Connector](https://github.com/music-of-the-ain

Write to Neo4j using [Neo4j Connector](https://github.com/music-of-the-ainur/neo4j.almaren)

#### targetFile

Write to File, you must have the following parameters: format, path, saveMode of the file and parameters as a Map. For partitioning provide a list of columns, for bucketing provide number of buckets and list of columns, for sorting provide list of columns, and tableName. Check the [documentation](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html) for the full list of parameters.

```scala
targetFile("parquet","/home/abc/targetlocation/output.parquet",SaveMode.Overwrite,Map("batchSize"->10000),List("partitionColumns"),(5,List("bucketingColumns")),List("sortingColumns"),Some("sampleTableName"))
```

## Executors

Executors are responsible to execute Almaren Tree i.e ```Option[Tree]``` to Apache Spark. Without invoke an _executor_, code won't be executed by Apache Spark. Follow the list of _executors_:
Expand Down Expand Up @@ -636,7 +645,8 @@ val sourcePolicy = almaren.builder.sourceHbase("""{
|"status":{"cf":"Policy", "col":"status", "type":"string"},
|"person_id":{"cf":"Policy", "col":"source", "type":"long"}
|}
|}""").sql(""" SELECT * FROM __TABLE__ WHERE status = "ACTIVE" """).alias("policy")
|}""").alias("hbase")
.sql(""" SELECT * FROM hbase WHERE status = "ACTIVE" """).alias("policy")

val sourcePerson = almaren.builder.sourceHbase("""{
|"table":{"namespace":"default", "name":"person"},
Expand All @@ -647,7 +657,8 @@ val sourcePerson = almaren.builder.sourceHbase("""{
|"type":{"cf":"Policy", "col":"type", "type":"string"},
|"age":{"cf":"Policy", "col":"source", "type":"string"}
|}
|}""").sql(""" SELECT * FROM __TABLE__ WHERE type = "PREMIUM" """).alias("person")
|}""").alias("hbase")
.sql(""" SELECT * FROM hbase WHERE type = "PREMIUM" """).alias("person")

almaren.builder.sql(""" SELECT * FROM person JOIN policy ON policy.person_id = person.id """).alias("table")
.sql("SELECT *,unix_timestamp() as timestamp FROM table").alias("table1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,28 @@ package com.github.music.of.the.ainur.almaren.builder.core

import com.github.music.of.the.ainur.almaren.builder.Core
import com.github.music.of.the.ainur.almaren.state.core._
import com.github.music.of.the.ainur.almaren.{Tree, InvalidDecoder, SchemaRequired, State}
import com.github.music.of.the.ainur.almaren.{InvalidDecoder, SchemaRequired, State, Tree}

private[almaren] trait Deserializer extends Core {
def deserializer(decoder:String,columnName:String,schemaInfo:Option[String] = None): Option[Tree] = {
import java.util.Collections

def json(): State =
JsonDeserializer(columnName,schemaInfo)
def xml(): State =
XMLDeserializer(columnName,schemaInfo)
def avro(): State =
AvroDeserializer(columnName,schemaInfo.getOrElse(throw SchemaRequired(decoder)))
private[almaren] trait Deserializer extends Core {
def deserializer(decoder:String,columnName:String,schemaInfo:Option[String] = None,options:Map[String,String] = Map(),autoFlatten:Boolean = true): Option[Tree] = {

def json: State =
JsonDeserializer(columnName,schemaInfo,options,autoFlatten)
def xml: State =
XMLDeserializer(columnName,schemaInfo,options,autoFlatten)
def avro: State =
AvroDeserializer(columnName,None,options,autoFlatten,schemaInfo.getOrElse(throw SchemaRequired(decoder)))
def csv: State =
CSVDeserializer(columnName, schemaInfo, options, autoFlatten)

decoder.toUpperCase match {
case "JSON" => json
case "XML" => xml
case "AVRO" => avro
case "CSV" => csv
case d => throw InvalidDecoder(d)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.github.music.of.the.ainur.almaren.builder.core

import com.github.music.of.the.ainur.almaren.Tree
import com.github.music.of.the.ainur.almaren.builder.Core
import com.github.music.of.the.ainur.almaren.state.core.{TargetJdbc, TargetSql, TargetKafka}
import com.github.music.of.the.ainur.almaren.state.core.{TargetFile, TargetJdbc, TargetSql, TargetKafka}
import org.apache.spark.sql.SaveMode

private[almaren] trait Target extends Core {
Expand All @@ -14,4 +14,14 @@ private[almaren] trait Target extends Core {

def targetKafka(servers: String, options: Map[String, String] = Map()): Option[Tree] =
TargetKafka(servers, options)

def targetFile(format: String,
path: String,
saveMode: SaveMode = SaveMode.Overwrite,
params: Map[String, String] = Map(),
partitionBy: List[String] = List.empty,
bucketBy: (Int, List[String]) = (64, List.empty),
sortBy: List[String] = List.empty,
tableName: Option[String]): Option[Tree] =
TargetFile(format, path, params, saveMode, partitionBy, bucketBy, sortBy, tableName)
}
Original file line number Diff line number Diff line change
@@ -1,61 +1,123 @@
package com.github.music.of.the.ainur.almaren.state.core

import com.github.music.of.the.ainur.almaren.State
import org.apache.spark.sql.DataFrame
import com.github.music.of.the.ainur.almaren.{Almaren, SchemaRequired, State}
import org.apache.spark.sql.{DataFrame, DataFrameReader, Dataset}
import org.apache.spark.sql.types.{DataType, StructType}

import scala.language.implicitConversions
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.util.Constants
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col

import javax.xml.crypto.Data

trait Deserializer extends State {

def columnName: String
def schema: Option[String]
def options: Map[String, String]
def autoFlatten: Boolean

override def executor(df: DataFrame): DataFrame = {
val newDf = deserializer(df)
if(autoFlatten)
autoFlatten(newDf,columnName)
else
newDf
}

abstract class Deserializer() extends State {
override def executor(df: DataFrame): DataFrame = deserializer(df)
def deserializer(df: DataFrame): DataFrame
implicit def string2Schema(schema: String): DataType =

implicit def string2Schema(schema: String): StructType =
StructType.fromDDL(schema)

def autoFlatten(df: DataFrame, columnName: String): DataFrame =
df.
select("*", columnName.concat(".*")).
drop(columnName)

def sampleData[T](df: Dataset[T]): Dataset[T] = {
df.sample(
options.getOrElse("samplingRatio","1.0").toDouble
).limit(
options.getOrElse("samplingMaxLines","10000").toInt
)
}

def getReadWithOptions: DataFrameReader =
Almaren.spark.getOrCreate().read.options(options)

def getDDL(df:DataFrame): String =
df.schema.toDDL
}

case class AvroDeserializer(columnName: String,schema: String) extends Deserializer {
import org.apache.spark.sql.avro._
case class AvroDeserializer(columnName: String, schema: Option[String] = None, options: Map[String, String], autoFlatten: Boolean, mandatorySchema: String) extends Deserializer {

import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions._
import collection.JavaConversions._

schema.map(_ => throw SchemaRequired(s"AvroDeserializer, don't use 'schema' it must be None, use 'mandatorySchema' "))

override def deserializer(df: DataFrame): DataFrame = {
logger.info(s"columnName:{$columnName}, schema:{$schema}")
df.withColumn(columnName,from_avro(col(columnName),schema))
.select("*",columnName.concat(".*")).drop(columnName)
logger.info(s"columnName:{$columnName}, schema:{$mandatorySchema}, options:{$options}, autoFlatten:{$autoFlatten}")
df.withColumn(columnName, from_avro(col(columnName), mandatorySchema, options))
}
}

case class JsonDeserializer(columnName: String,schema: Option[String]) extends Deserializer {
case class JsonDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {

import org.apache.spark.sql.functions._
import collection.JavaConversions._

override def deserializer(df: DataFrame): DataFrame = {
import df.sparkSession.implicits._
logger.info(s"columnName:{$columnName}, schema:{$schema}")
logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
df.withColumn(columnName,
from_json(col(columnName),
schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)]))))
.select("*",columnName.concat(".*"))
.drop(columnName)
from_json(
col(columnName),
schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)])),
options
))
}

private def getSchemaDDL(df: Dataset[String]): String =
Almaren.spark.getOrCreate().read.json(df.sample(Constants.sampleDeserializer)).schema.toDDL
getDDL(getReadWithOptions.json(sampleData(df)))
}

case class XMLDeserializer(columnName: String, schema: Option[String]) extends Deserializer {
case class XMLDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import org.apache.spark.sql.functions._

override def deserializer(df: DataFrame): DataFrame = {
logger.info(s"columnName:{$columnName}")
logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
import df.sparkSession.implicits._
val xmlSchema = schema match {
case Some(s) => StructType.fromDDL(s)
case None => schema_of_xml(df.select(columnName).as[String])
case None => schema_of_xml(sampleData(df.select(columnName).as[String]), options = options)
}
df
.withColumn(columnName, from_xml(col(columnName), xmlSchema))
.select("*",columnName.concat(".*"))
.drop(columnName)
.withColumn(columnName, from_xml(col(columnName), xmlSchema, options))
}
}

case class CSVDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {

import org.apache.spark.sql.functions._
import collection.JavaConversions._

override def deserializer(df: DataFrame): DataFrame = {
import df.sparkSession.implicits._
logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
df.withColumn(columnName,
from_csv(
col(columnName),
schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)])),
options
))
}

private def getSchemaDDL(df: Dataset[String]): String =
getDDL(getReadWithOptions.csv(sampleData(df)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,36 @@ case class TargetKafka(servers: String, options: Map[String, String]) extends Ta
}
}

case class TargetFile(
format: String,
path: String,
params: Map[String, String],
saveMode: SaveMode) extends Target {
case class TargetFile(format: String,
path: String,
params: Map[String, String],
saveMode: SaveMode,
partitionBy: List[String],
bucketBy: (Int, List[String]),
sortBy: List[String],
tableName: Option[String]) extends Target {
override def target(df: DataFrame): DataFrame = {
logger.info(s"format:{$format}, path:{$path}, params:{$params}")
df.write
logger.info(s"format:{$format}, path:{$path}, params:{$params}, partitionBy:{$partitionBy}, bucketBy:{$bucketBy}, sort:{$sortBy},tableName:{$tableName}")
val write = df.write
.format(format)
.option("path", path)
.options(params)
.save()
.mode(saveMode)

if (partitionBy.nonEmpty)
write.partitionBy(partitionBy: _*)

if (bucketBy._2.nonEmpty) {
write.bucketBy(bucketBy._1, bucketBy._2.head, bucketBy._2.tail: _*)
if (sortBy.nonEmpty)
write.sortBy(sortBy.head, sortBy.tail: _*)
}

tableName match {
case Some(tableName) => write.saveAsTable(tableName)
case _ => write.save
}

df
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@ object Constants {
val TempTableName = "__TABLE__"
val TempStreamTableName = "__STREAMING__"
val FrameworkName = "Almaren Framework"

val sampleDeserializer = 1
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit 3ac23a2

Please sign in to comment.