diff --git a/README.md b/README.md
index ae117062..de209781 100644
--- a/README.md
+++ b/README.md
@@ -41,6 +41,7 @@ The Almaren Framework provides a simplified consistent minimalistic layer over A
     + [targetSolr](#targetsolr)
     + [targetMongoDb](#targetmongodb)
     + [targetBigQuery](#targetbigquery)
+    + [targetFile](#targetfile)
 - [Executors](#executors)
   * [Batch](#batch)
   * [Streaming Kafka](#streaming-kafka)
@@ -481,6 +482,14 @@ Write to BigQuery using [BigQuery Connector](https://github.com/music-of-the-ain
 
 Write to Neo4j using [Neo4j Connector](https://github.com/music-of-the-ainur/neo4j.almaren)
 
+#### targetFile
+
+Write to a file. Provide the file format, the path, the SaveMode and any writer parameters as a Map. Optionally provide a list of columns for partitioning, the number of buckets and a list of columns for bucketing, a list of columns for sorting, and a table name. Check the [documentation](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html) for the full list of parameters.
+
+```scala
+targetFile("parquet","/home/abc/targetlocation/output.parquet",SaveMode.Overwrite,Map("batchSize"->10000),List("partitionColumns"),(5,List("bucketingColumns")),List("sortingColumns"),Some("sampleTableName"))
+```
+
 ## Executors
 
 Executors are responsible to execute Almaren Tree i.e ```Option[Tree]``` to Apache Spark. Without invoke an _executor_, code won't be executed by Apache Spark. Follow the list of _executors_:
@@ -636,7 +645,8 @@ val sourcePolicy = almaren.builder.sourceHbase("""{
 |"status":{"cf":"Policy", "col":"status", "type":"string"},
 |"person_id":{"cf":"Policy", "col":"source", "type":"long"}
 |}
-|}""").sql(""" SELECT * FROM __TABLE__ WHERE status = "ACTIVE" """).alias("policy")
+|}""").alias("hbase")
+  .sql(""" SELECT * FROM hbase WHERE status = "ACTIVE" """).alias("policy")
 
 val sourcePerson = almaren.builder.sourceHbase("""{
 |"table":{"namespace":"default", "name":"person"},
@@ -647,7 +657,8 @@ val sourcePerson = almaren.builder.sourceHbase("""{
 |"type":{"cf":"Policy", "col":"type", "type":"string"},
 |"age":{"cf":"Policy", "col":"source", "type":"string"}
 |}
-|}""").sql(""" SELECT * FROM __TABLE__ WHERE type = "PREMIUM" """).alias("person")
+|}""").alias("hbase")
+  .sql(""" SELECT * FROM hbase WHERE type = "PREMIUM" """).alias("person")
 
 almaren.builder.sql(""" SELECT * FROM person JOIN policy ON policy.person_id = person.id """).alias("table")
   .sql("SELECT *,unix_timestamp() as timestamp FROM table").alias("table1")
diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Deserializer.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Deserializer.scala
index ef5816ba..0b532bd3 100644
--- a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Deserializer.scala
+++ b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Deserializer.scala
@@ -2,24 +2,28 @@ package com.github.music.of.the.ainur.almaren.builder.core
 
 import com.github.music.of.the.ainur.almaren.builder.Core
 import com.github.music.of.the.ainur.almaren.state.core._
-import com.github.music.of.the.ainur.almaren.{Tree, InvalidDecoder, SchemaRequired, State}
+import com.github.music.of.the.ainur.almaren.{InvalidDecoder, SchemaRequired, State, Tree}
 
-private[almaren] trait Deserializer extends Core {
-  def deserializer(decoder:String,columnName:String,schemaInfo:Option[String] = None): Option[Tree] = {
+import java.util.Collections
 
-    def json(): State =
-      JsonDeserializer(columnName,schemaInfo)
-    def xml(): State =
-      XMLDeserializer(columnName,schemaInfo)
-    def avro(): State =
-      AvroDeserializer(columnName,schemaInfo.getOrElse(throw SchemaRequired(decoder)))
+private[almaren] trait Deserializer extends Core {
+  def deserializer(decoder:String,columnName:String,schemaInfo:Option[String] = None,options:Map[String,String] = Map(),autoFlatten:Boolean = true): Option[Tree] = {
+    def json: State =
+      JsonDeserializer(columnName,schemaInfo,options,autoFlatten)
+    def xml: State =
+      XMLDeserializer(columnName,schemaInfo,options,autoFlatten)
+    def avro: State =
+      AvroDeserializer(columnName,None,options,autoFlatten,schemaInfo.getOrElse(throw SchemaRequired(decoder)))
+    def csv: State =
+      CSVDeserializer(columnName, schemaInfo, options, autoFlatten)
 
     decoder.toUpperCase match {
       case "JSON" => json
       case "XML" => xml
       case "AVRO" => avro
+      case "CSV" => csv
       case d => throw InvalidDecoder(d)
     }
   }
-}
+}
\ No newline at end of file
diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Target.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Target.scala
index 10afcea5..d41e1927 100644
--- a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Target.scala
+++ b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Target.scala
@@ -2,7 +2,7 @@ package com.github.music.of.the.ainur.almaren.builder.core
 
 import com.github.music.of.the.ainur.almaren.Tree
 import com.github.music.of.the.ainur.almaren.builder.Core
-import com.github.music.of.the.ainur.almaren.state.core.{TargetJdbc, TargetSql, TargetKafka}
+import com.github.music.of.the.ainur.almaren.state.core.{TargetFile, TargetJdbc, TargetSql, TargetKafka}
 import org.apache.spark.sql.SaveMode
 
 private[almaren] trait Target extends Core {
@@ -14,4 +14,14 @@ private[almaren] trait Target extends Core {
 
   def targetKafka(servers: String, options: Map[String, String] = Map()): Option[Tree] =
     TargetKafka(servers, options)
+
+  def targetFile(format: String,
+                 path: String,
+                 saveMode: SaveMode = SaveMode.Overwrite,
+                 params: Map[String, String] = Map(),
+                 partitionBy: List[String] = List.empty,
+                 bucketBy: (Int, List[String]) = (64, List.empty),
+                 sortBy: List[String] = List.empty,
+                 tableName: Option[String]): Option[Tree] =
+    TargetFile(format, path, params, saveMode, partitionBy, bucketBy, sortBy, tableName)
 }
diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Deserializer.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Deserializer.scala
index 8b6faf8a..eddf5734 100644
--- a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Deserializer.scala
+++ b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Deserializer.scala
@@ -1,61 +1,123 @@
 package com.github.music.of.the.ainur.almaren.state.core
 
-import com.github.music.of.the.ainur.almaren.State
-import org.apache.spark.sql.DataFrame
+import com.github.music.of.the.ainur.almaren.{Almaren, SchemaRequired, State}
+import org.apache.spark.sql.{DataFrame, DataFrameReader, Dataset}
 import org.apache.spark.sql.types.{DataType, StructType}
 
 import scala.language.implicitConversions
-import com.github.music.of.the.ainur.almaren.Almaren
 import com.github.music.of.the.ainur.almaren.util.Constants
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.functions.col
+
+import javax.xml.crypto.Data
+
+trait Deserializer extends State {
+
+  def columnName: String
+  def schema: Option[String]
+  def options: Map[String, String]
+  def autoFlatten: Boolean
+
+  override def executor(df: DataFrame): DataFrame = {
+    val newDf = deserializer(df)
+    if(autoFlatten)
+      autoFlatten(newDf,columnName)
+    else
+      newDf
+  }
 
-abstract class Deserializer() extends State {
-  override def executor(df: DataFrame): DataFrame = deserializer(df)
   def deserializer(df: DataFrame): DataFrame
-  implicit def string2Schema(schema: String): DataType =
+
+  implicit def string2Schema(schema: String): StructType =
     StructType.fromDDL(schema)
+
+  def autoFlatten(df: DataFrame, columnName: String): DataFrame =
+    df.
+      select("*", columnName.concat(".*")).
+      drop(columnName)
+
+  def sampleData[T](df: Dataset[T]): Dataset[T] = {
+    df.sample(
+      options.getOrElse("samplingRatio","1.0").toDouble
+    ).limit(
+      options.getOrElse("samplingMaxLines","10000").toInt
+    )
+  }
+
+  def getReadWithOptions: DataFrameReader =
+    Almaren.spark.getOrCreate().read.options(options)
+
+  def getDDL(df:DataFrame): String =
+    df.schema.toDDL
 }
 
-case class AvroDeserializer(columnName: String,schema: String) extends Deserializer {
-  import org.apache.spark.sql.avro._
+case class AvroDeserializer(columnName: String, schema: Option[String] = None, options: Map[String, String], autoFlatten: Boolean, mandatorySchema: String) extends Deserializer {
+
+  import org.apache.spark.sql.avro.functions.from_avro
   import org.apache.spark.sql.functions._
+  import collection.JavaConversions._
+
+  schema.map(_ => throw SchemaRequired(s"AvroDeserializer, don't use 'schema' it must be None, use 'mandatorySchema' "))
+
   override def deserializer(df: DataFrame): DataFrame = {
-    logger.info(s"columnName:{$columnName}, schema:{$schema}")
-    df.withColumn(columnName,from_avro(col(columnName),schema))
-      .select("*",columnName.concat(".*")).drop(columnName)
+    logger.info(s"columnName:{$columnName}, schema:{$mandatorySchema}, options:{$options}, autoFlatten:{$autoFlatten}")
+    df.withColumn(columnName, from_avro(col(columnName), mandatorySchema, options))
   }
 }
 
-case class JsonDeserializer(columnName: String,schema: Option[String]) extends Deserializer {
+case class JsonDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {
+
   import org.apache.spark.sql.functions._
+  import collection.JavaConversions._
+
   override def deserializer(df: DataFrame): DataFrame = {
     import df.sparkSession.implicits._
-    logger.info(s"columnName:{$columnName}, schema:{$schema}")
+    logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
     df.withColumn(columnName,
-      from_json(col(columnName),
-        schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)]))))
-      .select("*",columnName.concat(".*"))
-      .drop(columnName)
+      from_json(
+        col(columnName),
+        schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)])),
+        options
+      ))
   }
+
   private def getSchemaDDL(df: Dataset[String]): String =
-    Almaren.spark.getOrCreate().read.json(df.sample(Constants.sampleDeserializer)).schema.toDDL
+    getDDL(getReadWithOptions.json(sampleData(df)))
 }
 
-case class XMLDeserializer(columnName: String, schema: Option[String]) extends Deserializer {
+case class XMLDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {
+
   import com.databricks.spark.xml.functions.from_xml
   import com.databricks.spark.xml.schema_of_xml
   import org.apache.spark.sql.functions._
 
   override def deserializer(df: DataFrame): DataFrame = {
-    logger.info(s"columnName:{$columnName}")
+    logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
     import df.sparkSession.implicits._
     val xmlSchema = schema match {
       case Some(s) => StructType.fromDDL(s)
-      case None => schema_of_xml(df.select(columnName).as[String])
+      case None => schema_of_xml(sampleData(df.select(columnName).as[String]), options = options)
     }
     df
-      .withColumn(columnName, from_xml(col(columnName), xmlSchema))
-      .select("*",columnName.concat(".*"))
-      .drop(columnName)
+      .withColumn(columnName, from_xml(col(columnName), xmlSchema, options))
   }
 }
+
+case class CSVDeserializer(columnName: String, schema: Option[String], options: Map[String, String], autoFlatten: Boolean) extends Deserializer {
+
+  import org.apache.spark.sql.functions._
+  import collection.JavaConversions._
+
+  override def deserializer(df: DataFrame): DataFrame = {
+    import df.sparkSession.implicits._
+    logger.info(s"columnName:{$columnName}, schema:{$schema}, options:{$options}, autoFlatten:{$autoFlatten}")
+    df.withColumn(columnName,
+      from_csv(
+        col(columnName),
+        schema.getOrElse(getSchemaDDL(df.selectExpr(columnName).as[(String)])),
+        options
+      ))
+  }
+
+  private def getSchemaDDL(df: Dataset[String]): String =
+    getDDL(getReadWithOptions.csv(sampleData(df)))
+}
\ No newline at end of file
diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Target.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Target.scala
index 6a0d7fb2..1bdb5da7 100644
--- a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Target.scala
+++ b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Target.scala
@@ -51,17 +51,36 @@ case class TargetKafka(servers: String, options: Map[String, String]) extends Ta
   }
 }
 
-case class TargetFile(
-  format: String,
-  path: String,
-  params: Map[String, String],
-  saveMode: SaveMode) extends Target {
+case class TargetFile(format: String,
+                      path: String,
+                      params: Map[String, String],
+                      saveMode: SaveMode,
+                      partitionBy: List[String],
+                      bucketBy: (Int, List[String]),
+                      sortBy: List[String],
+                      tableName: Option[String]) extends Target {
   override def target(df: DataFrame): DataFrame = {
-    logger.info(s"format:{$format}, path:{$path}, params:{$params}")
-    df.write
+    logger.info(s"format:{$format}, path:{$path}, params:{$params}, partitionBy:{$partitionBy}, bucketBy:{$bucketBy}, sort:{$sortBy},tableName:{$tableName}")
+    val write = df.write
       .format(format)
+      .option("path", path)
       .options(params)
-      .save()
+      .mode(saveMode)
+
+    if (partitionBy.nonEmpty)
+      write.partitionBy(partitionBy: _*)
+
+    if (bucketBy._2.nonEmpty) {
+      write.bucketBy(bucketBy._1, bucketBy._2.head, bucketBy._2.tail: _*)
+      if (sortBy.nonEmpty)
+        write.sortBy(sortBy.head, sortBy.tail: _*)
+    }
+
+    tableName match {
+      case Some(tableName) => write.saveAsTable(tableName)
+      case _ => write.save
+    }
+
     df
   }
 }
diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/util/Constants.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/util/Constants.scala
index 08a8f7eb..0f6b271c 100644
--- a/src/main/scala/com/github/music/of/the/ainur/almaren/util/Constants.scala
+++ b/src/main/scala/com/github/music/of/the/ainur/almaren/util/Constants.scala
@@ -4,6 +4,4 @@ object Constants {
   val TempTableName = "__TABLE__"
   val TempStreamTableName = "__STREAMING__"
   val FrameworkName = "Almaren Framework"
-
-  val sampleDeserializer = 1
 }
diff --git a/src/test/resources/data/csvDeserializer.parquet/._SUCCESS.crc b/src/test/resources/data/csvDeserializer.parquet/._SUCCESS.crc
new file mode 100644
index 00000000..3b7b0449
Binary files /dev/null and b/src/test/resources/data/csvDeserializer.parquet/._SUCCESS.crc differ
diff --git a/src/test/resources/data/csvDeserializer.parquet/.part-00000-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc b/src/test/resources/data/csvDeserializer.parquet/.part-00000-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc
new file mode 100644
index 00000000..f10411c8
Binary files /dev/null and b/src/test/resources/data/csvDeserializer.parquet/.part-00000-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/data/csvDeserializer.parquet/.part-00001-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc b/src/test/resources/data/csvDeserializer.parquet/.part-00001-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc
new file mode 100644
index 00000000..1e74c33d
Binary files /dev/null and b/src/test/resources/data/csvDeserializer.parquet/.part-00001-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/data/csvDeserializer.parquet/.part-00002-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc b/src/test/resources/data/csvDeserializer.parquet/.part-00002-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc
new file mode 100644
index 00000000..7989c5ad
Binary files /dev/null and b/src/test/resources/data/csvDeserializer.parquet/.part-00002-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/data/csvDeserializer.parquet/.part-00003-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc b/src/test/resources/data/csvDeserializer.parquet/.part-00003-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc
new file mode 100644
index 00000000..7a7a3cd0
Binary files /dev/null and b/src/test/resources/data/csvDeserializer.parquet/.part-00003-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/data/csvDeserializer.parquet/_SUCCESS b/src/test/resources/data/csvDeserializer.parquet/_SUCCESS
new file mode 100644
index 00000000..e69de29b
diff --git a/src/test/resources/data/csvDeserializer.parquet/part-00000-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet b/src/test/resources/data/csvDeserializer.parquet/part-00000-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet
new file mode 100644
index 00000000..cdf569c4
Binary files /dev/null and b/src/test/resources/data/csvDeserializer.parquet/part-00000-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet differ
diff --git a/src/test/resources/data/csvDeserializer.parquet/part-00001-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet b/src/test/resources/data/csvDeserializer.parquet/part-00001-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet
new file mode 100644
index 00000000..7ea6bec8
Binary files /dev/null and b/src/test/resources/data/csvDeserializer.parquet/part-00001-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet differ
diff --git a/src/test/resources/data/csvDeserializer.parquet/part-00002-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet b/src/test/resources/data/csvDeserializer.parquet/part-00002-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet
new file mode 100644
index 00000000..2268a17e
Binary files /dev/null and b/src/test/resources/data/csvDeserializer.parquet/part-00002-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet differ
diff --git a/src/test/resources/data/csvDeserializer.parquet/part-00003-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet b/src/test/resources/data/csvDeserializer.parquet/part-00003-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet
new file mode 100644
index 00000000..07eb47a5
Binary files /dev/null and b/src/test/resources/data/csvDeserializer.parquet/part-00003-481960de-7c18-4a69-a9f9-1c122d6c7cf2-c000.snappy.parquet differ
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/._SUCCESS.crc b/src/test/resources/data/csvDeserializerSchema.parquet/._SUCCESS.crc
new file mode 100644
index 00000000..3b7b0449
Binary files /dev/null and b/src/test/resources/data/csvDeserializerSchema.parquet/._SUCCESS.crc differ
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/.part-00000-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc b/src/test/resources/data/csvDeserializerSchema.parquet/.part-00000-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc
new file mode 100644
index 00000000..aec0f1ab
Binary files /dev/null and b/src/test/resources/data/csvDeserializerSchema.parquet/.part-00000-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/.part-00001-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc b/src/test/resources/data/csvDeserializerSchema.parquet/.part-00001-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc
new file mode 100644
index 00000000..9b93eb0c
Binary files /dev/null and b/src/test/resources/data/csvDeserializerSchema.parquet/.part-00001-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/.part-00002-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc b/src/test/resources/data/csvDeserializerSchema.parquet/.part-00002-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc
new file mode 100644
index 00000000..e8ef725c
Binary files /dev/null and b/src/test/resources/data/csvDeserializerSchema.parquet/.part-00002-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/.part-00003-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc b/src/test/resources/data/csvDeserializerSchema.parquet/.part-00003-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc
new file mode 100644
index 00000000..1b0f7d5a
Binary files /dev/null and b/src/test/resources/data/csvDeserializerSchema.parquet/.part-00003-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/_SUCCESS b/src/test/resources/data/csvDeserializerSchema.parquet/_SUCCESS
new file mode 100644
index 00000000..e69de29b
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/part-00000-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet b/src/test/resources/data/csvDeserializerSchema.parquet/part-00000-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet
new file mode 100644
index 00000000..0d9a3a62
Binary files /dev/null and b/src/test/resources/data/csvDeserializerSchema.parquet/part-00000-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet differ
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/part-00001-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet b/src/test/resources/data/csvDeserializerSchema.parquet/part-00001-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet
new file mode 100644
index 00000000..b54cb034
Binary files /dev/null and b/src/test/resources/data/csvDeserializerSchema.parquet/part-00001-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet differ
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/part-00002-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet b/src/test/resources/data/csvDeserializerSchema.parquet/part-00002-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet
new file mode 100644
index 00000000..7fd3bb52
Binary files /dev/null and b/src/test/resources/data/csvDeserializerSchema.parquet/part-00002-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet differ
diff --git a/src/test/resources/data/csvDeserializerSchema.parquet/part-00003-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet b/src/test/resources/data/csvDeserializerSchema.parquet/part-00003-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet
new file mode 100644
index 00000000..08723d1c
Binary files /dev/null and b/src/test/resources/data/csvDeserializerSchema.parquet/part-00003-83307a04-3932-47db-bdfa-c1d0e4dcfb49-c000.snappy.parquet differ
diff --git a/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala b/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
index 1211e2d3..1fda37ae 100644
--- a/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
+++ b/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
@@ -6,8 +6,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SaveMode}
 import org.scalatest._
 
 import org.apache.spark.sql.avro._
-
-
+import java.io.File
 import scala.collection.immutable._
 
 class Test extends FunSuite with BeforeAndAfter {
@@ -62,10 +61,42 @@ class Test extends FunSuite with BeforeAndAfter {
   test(testSourceTargetJdbc(moviesDf), moviesDf, "SourceTargetJdbcTest")
   test(testSourceTargetJdbcUserPassword(moviesDf), moviesDf, "SourceTargetJdbcTestUserPassword")
 
-  test(testSourceFile("parquet","src/test/resources/sample_data/emp.parquet"),
-    spark.read.parquet("src/test/resources/sample_output/employee.parquet"),"SourceParquetFileTest")
-  test(testSourceFile("avro","src/test/resources/sample_data/emp.avro"),
-    spark.read.parquet("src/test/resources/sample_output/employee.parquet"),"SourceAvroFileTest")
+  test(testSourceFile("parquet", "src/test/resources/sample_data/emp.parquet"),
+    spark.read.parquet("src/test/resources/sample_output/employee.parquet"), "SourceParquetFileTest")
+  test(testSourceFile("avro", "src/test/resources/sample_data/emp.avro"),
+    spark.read.parquet("src/test/resources/sample_output/employee.parquet"), "SourceAvroFileTest")
+  test(testTargetFileTarget("parquet",
+    "/tmp/target.parquet",
+    SaveMode.Overwrite,
+    Map(),
+    List("year"),
+    (3, List("title")),
+    List("title"),
+    Some("table1")),
+    movies, "TargetParquetFileTest")
+  test(testTargetFileTarget("avro", "/tmp/target.avro",
+    SaveMode.Overwrite,
+    Map(),
+    List("year"),
+    (3, List("title")),
+    List("title"),
+    Some("table2")),
+    movies, "TargetAvroFileTest")
+
+  test(testSourceFile("parquet", "/tmp/target.parquet"), movies, "TargetParquetFileTest1")
+  test(testSourceFile("avro", "/tmp/target.avro"), movies, "TargetAvroFileTest1")
+
+  test(
+    testTargetFileTarget("parquet", "/tmp/target.parquet", SaveMode.Overwrite, Map(), List("year"), (3, List("title")), List("title"), Some("tableTarget1")),
+    testSourceSql("tableTarget1"),
+    "TargetParquetFileTest2")
+  test(
+    testTargetFileTarget("avro", "/tmp/target.avro", SaveMode.Overwrite, Map(), List("year"), (3, List("title")), List("title"), Some("tableTarget2")),
+    testSourceSql("tableTarget2"),
+    "TargetAvroFileTest2")
+
+  testTargetFileBucketPartition("/tmp/target.parquet", List("year"), (3, List("title")), "parquet")
+  testTargetFileBucketPartition("/tmp/target.avro", List("year"), (3, List("title")), "avro")
   repartitionAndColaeseTest(moviesDf)
   repartitionWithColumnTest(df)
   repartitionWithSizeAndColumnTest(df)
@@ -79,6 +110,7 @@ class Test extends FunSuite with BeforeAndAfter {
   deserializerJsonTest()
   deserializerXmlTest()
   deserializerAvroTest()
+  deserializerCsvTest()
 
   testInferSchemaJsonColumn()
   testInferSchemaDataframe(moviesDf)
@@ -160,11 +192,61 @@ class Test extends FunSuite with BeforeAndAfter {
       .batch
   }
 
-  def testSourceFile(format:String,path:String):DataFrame ={
+  def testSourceFile(format: String, path: String): DataFrame = {
+    almaren.builder
+      .sourceFile(format, path, Map())
+      .batch
+
+  }
+
+  def testSourceSql(tableName: String): DataFrame = {
+    almaren.builder
+      .sourceSql(s"select * from $tableName")
+      .batch
+  }
+
+  def testTargetFileTarget(format: String, path: String, saveMode: SaveMode, params: Map[String, String], partitionBy: List[String], bucketBy: (Int, List[String]), sortBy: List[String], tableName: Option[String]): DataFrame = {
     almaren.builder
-      .sourceFile(format,path,Map())
+      .sourceDataFrame(movies)
+      .targetFile(format, path, saveMode, params, partitionBy, bucketBy, sortBy, tableName)
       .batch
+  }
+  def testTargetFileBucketPartition(path: String, partitionBy: List[String], bucketBy: (Int, List[String]), fileFormat: String) = {
+    val filesList = getListOfDirectories(path).map(_.toString)
+    if (partitionBy.nonEmpty) {
+      val extractFiles = filesList.map(a => a.substring(a.lastIndexOf("=") + 1))
+      val distinctValues = movies.select(partitionBy(0)).distinct.as[String].collect.toList
+      val checkLists = extractFiles.intersect(distinctValues)
+      test(s"partitionBy_$fileFormat") {
+        assert(checkLists.size == distinctValues.size)
+      }
+    }
+    if (bucketBy._2.nonEmpty) {
+      val check = filesList.map(f => getListOfFiles(f).size)
+      val bool = if (check.forall(_ == check.head)) check.head == 2 * bucketBy._1 else false
+      test(s"bucketBy_$fileFormat") {
+        assert(bool == true)
+      }
+    }
+  }
+
+  def getListOfDirectories(dir: String): List[File] = {
+    val d = new File(dir)
+    if (d.exists && d.isDirectory) {
+      d.listFiles.filter(_.isDirectory).toList
+    } else {
+      List[File]()
+    }
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+    val d = new File(dir)
+    if (d.exists && d.isDirectory) {
+      d.listFiles.filter(_.isFile).toList
+    } else {
+      List[File]()
+    }
   }
 
   def repartitionAndColaeseTest(dataFrame: DataFrame) {
@@ -237,6 +319,7 @@ class Test extends FunSuite with BeforeAndAfter {
       assert(aliasTableCount > 0)
     }
   }
+
   def testingDrop(moviesDf: DataFrame): Unit = {
 
     moviesDf.createTempView("Test_drop")
@@ -247,6 +330,7 @@ class Test extends FunSuite with BeforeAndAfter {
 
     test(testDF, testDropcompare, "Testing Drop")
   }
+
   def testingWhere(moviesDf: DataFrame): Unit = {
 
     moviesDf.createTempView("Test_where")
@@ -257,6 +341,7 @@ class Test extends FunSuite with BeforeAndAfter {
 
     test(testDF, testWherecompare, "Testing Where")
   }
+
   def testingSqlExpr(): Unit = {
 
     val df = Seq(
@@ -269,10 +354,11 @@ class Test extends FunSuite with BeforeAndAfter {
 
     df.createOrReplaceTempView("person_info")
 
-    val testDF = almaren.builder.sourceSql("select CAST (salary as INT) from person_info" ).batch
+    val testDF = almaren.builder.sourceSql("select CAST (salary as INT) from person_info").batch
     val testSqlExprcompare = almaren.builder.sourceSql("select * from person_info").sqlExpr("CAST(salary as int)").batch
     test(testDF, testSqlExprcompare, "Testing sqlExpr")
-   }
+  }
+
   def testingSourceDataFrame(): Unit = {
 
     val testDS = spark.range(3)
@@ -283,8 +369,7 @@ class Test extends FunSuite with BeforeAndAfter {
 
   }
 
-
-   def cacheTest(df: DataFrame): Unit = {
+  def cacheTest(df: DataFrame): Unit = {
 
     df.createTempView("cache_test")
 
@@ -304,7 +389,7 @@ class Test extends FunSuite with BeforeAndAfter {
   def testingPipe(df: DataFrame): Unit = {
     df.createTempView("pipe_view")
 
-    val pipeDf = almaren.builder.sql("select * from pipe_view").pipe("echo","Testing Echo Command").batch
+    val pipeDf = almaren.builder.sql("select * from pipe_view").pipe("echo", "Testing Echo Command").batch
     val pipeDfCount = pipeDf.count()
     test("Testing Pipe") {
       assert(pipeDfCount > 0)
@@ -391,13 +476,34 @@ class Test extends FunSuite with BeforeAndAfter {
     val df = spark.sql("select * from sample_json_table")
     val jsonSchema = "`address` STRING,`age` BIGINT,`name` STRING"
 
-    val generatedSchema = Util.genDDLFromJsonString(df, "json_string",0.1)
+    val generatedSchema = Util.genDDLFromJsonString(df, "json_string", 0.1)
     testSchema(jsonSchema, generatedSchema, "Test infer schema for json column")
   }
 
+  def deserializerCsvTest(): Unit = {
+    val df = Seq(
+      ("John,Chris", "Smith", "London"),
+      ("David,Michael", "Jones", "India"),
+      ("Joseph,Mike", "Lee", "Russia"),
+      ("Chris,Tony", "Brown", "Indonesia"),
+    ).toDF("first_name", "last_name", "country")
+    val newCsvDF = almaren.builder
+      .sourceDataFrame(df)
+      .deserializer("CSV", "first_name", options = Map("header" -> "false"))
+      .batch
+    val newCsvSchemaDf = almaren.builder
+      .sourceDataFrame(df)
+      .deserializer("CSV", "first_name", Some("`first_name_1` STRING,`first_name_2` STRING"), Map("header" -> "true"))
+      .batch
+    val csvDf = spark.read.parquet("src/test/resources/data/csvDeserializer.parquet")
+    val csvSchemaDf = spark.read.parquet("src/test/resources/data/csvDeserializerSchema.parquet")
+    test(newCsvDF, csvDf, "Deserialize CSV")
+    test(newCsvSchemaDf, csvSchemaDf, "Deserialize CSV Schema")
+  }
+
   def testInferSchemaDataframe(df: DataFrame): Unit = {
     val dfSchema = "`cast` ARRAY,`genres` ARRAY,`title` STRING,`year` BIGINT"
-    val generatedSchema = Util.genDDLFromDataFrame(df,0.1)
+    val generatedSchema = Util.genDDLFromDataFrame(df, 0.1)
     testSchema(dfSchema, generatedSchema, "Test infer schema for dataframe")
   }