diff --git a/README.md b/README.md
index b898836e..d182b757 100644
--- a/README.md
+++ b/README.md
@@ -41,6 +41,7 @@ The Almaren Framework provides a simplified consistent minimalistic layer over A
     + [targetSolr](#targetsolr)
     + [targetMongoDb](#targetmongodb)
     + [targetBigQuery](#targetbigquery)
+    + [targetFile](#targetfile)
 - [Executors](#executors)
   * [Batch](#batch)
   * [Streaming Kafka](#streaming-kafka)
@@ -481,6 +482,14 @@ Write to BigQuery using [BigQuery Connector](https://github.com/music-of-the-ain
 
 Write to Neo4j using [Neo4j Connector](https://github.com/music-of-the-ainur/neo4j.almaren)
 
+#### targetFile
+
+Write to a file. You must provide the format, the path, the SaveMode of the file, and any extra options as a Map. For partitioning, provide a list of columns; for bucketing, provide the number of buckets together with a list of columns; for sorting, provide a list of columns. To register the output as a table, provide an optional table name. Check the [documentation](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html) for the full list of parameters.
+
+```scala
+targetFile("parquet", "/home/abc/targetlocation/output.parquet", SaveMode.Overwrite, Map("batchSize" -> "10000"), List("partitionColumns"), (5, List("bucketingColumns")), List("sortingColumns"))
+```
+
 ## Executors
 
 Executors are responsible for executing the Almaren Tree, i.e. ```Option[Tree]```, on Apache Spark. Without invoking an _executor_, the code won't be executed by Apache Spark. The available _executors_ are:
diff --git a/build.sbt b/build.sbt
index 7b151a73..fdb0cf9d 100755
--- a/build.sbt
+++ b/build.sbt
@@ -42,6 +42,12 @@ ThisBuild / developers := List(
     name = "Daniel Mantovani",
     email = "daniel.mantovani@modak.com",
     url = url("https://github.com/music-of-the-ainur")
+  ),
+  Developer(
+    id = "badrinathpatchikolla",
+    name = "Badrinath Patchikolla",
+    email = "badrinath.patchikolla@modakanalytics.com",
+    url = url("https://github.com/music-of-the-ainur")
   )
 )
 
diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Target.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Target.scala
index 10afcea5..cc2c4dcb 100644
--- a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Target.scala
+++ b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Target.scala
@@ -2,7 +2,7 @@ package com.github.music.of.the.ainur.almaren.builder.core
 
 import com.github.music.of.the.ainur.almaren.Tree
 import com.github.music.of.the.ainur.almaren.builder.Core
-import com.github.music.of.the.ainur.almaren.state.core.{TargetJdbc, TargetSql, TargetKafka}
+import com.github.music.of.the.ainur.almaren.state.core.{TargetFile, TargetJdbc, TargetKafka, TargetSql}
 import org.apache.spark.sql.SaveMode
 
 private[almaren] trait Target extends Core {
@@ -14,4 +14,14 @@ private[almaren] trait Target extends Core {
 
   def targetKafka(servers: String, options: Map[String, String] = Map()): Option[Tree] =
     TargetKafka(servers, options)
+
+  def targetFile(format: String,
+                 path: String,
+                 saveMode: SaveMode = SaveMode.Overwrite,
+                 params: Map[String, String] = Map(),
+                 partitionBy: List[String] = List.empty,
+                 bucketBy: (Int, List[String]) = (64, List.empty),
+                 sortBy: List[String] = List.empty,
+                 tableName: Option[String] = None): Option[Tree] =
+    TargetFile(format, path, params, saveMode, partitionBy, bucketBy, sortBy, tableName)
 }
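As a quick orientation, here is a minimal sketch of how the new `targetFile` builder method chains together, mirroring the `testTargetFileTarget` helper in the tests further down; the source DataFrame, output path, options, table and column names are placeholders:

```scala
almaren.builder
  .sourceDataFrame(sourceDf)                 // any Almaren source works here
  .targetFile(
    "parquet",
    "/tmp/movies_output",
    SaveMode.Overwrite,
    Map("compression" -> "snappy"),
    partitionBy = List("year"),              // one directory per year value
    bucketBy = (5, List("title")),           // 5 buckets on title
    sortBy = List("title"),
    tableName = Some("movies_output"))       // registers the output via saveAsTable
  .batch
```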
diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Target.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Target.scala
index d4bb103a..1107bb7d 100644
--- a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Target.scala
+++ b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Target.scala
@@ -2,7 +2,8 @@ package com.github.music.of.the.ainur.almaren.state.core
 
 import com.github.music.of.the.ainur.almaren.State
 import com.github.music.of.the.ainur.almaren.util.Constants
-import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.{Column, DataFrame, DataFrameWriter, SaveMode}
+
 
 private[almaren] abstract class Target extends State {
   override def executor(df: DataFrame): DataFrame = target(df)
@@ -52,18 +53,38 @@ case class TargetKafka(servers: String, options: Map[String, String]) extends Ta
   }
 }
 
-case class TargetFile(
-  format: String,
-  path: String,
-  params: Map[String, String],
-  saveMode: SaveMode) extends Target {
+case class TargetFile(format: String,
+                      path: String,
+                      params: Map[String, String],
+                      saveMode: SaveMode,
+                      partitionBy: List[String],
+                      bucketBy: (Int, List[String]),
+                      sortBy: List[String],
+                      tableName: Option[String]) extends Target {
   override def target(df: DataFrame): DataFrame = {
-    logger.info(s"format:{$format}, path:{$path}, params:{$params}")
-    df.write
+    logger.info(s"format:{$format}, path:{$path}, params:{$params}, partitionBy:{$partitionBy}, bucketBy:{$bucketBy}, sort:{$sortBy},tableName:{$tableName}")
+    val write = df.write
       .format(format)
+      .option("path", path)
       .options(params)
-      .save()
+      .mode(saveMode)
+
+    if (partitionBy.nonEmpty)
+      write.partitionBy(partitionBy: _*)
+
+    if (bucketBy._2.nonEmpty) {
+      write.bucketBy(bucketBy._1, bucketBy._2.head, bucketBy._2.tail: _*)
+      if (sortBy.nonEmpty)
+        write.sortBy(sortBy.head, sortBy.tail: _*)
+    }
+
+    tableName match {
+      case Some(tableName) => write.saveAsTable(tableName)
+      case _ => write.save
+    }
+
     df
   }
 }
+
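A note on the write path above: Spark's `DataFrameWriter` is mutable, so the `partitionBy`/`bucketBy`/`sortBy` calls configure `write` in place even though their return values are discarded, and bucketing/sorting only take effect through `saveAsTable` (a plain path-based `save()` does not support bucketed writes). A rough plain-Spark sketch of what `TargetFile` does when every option is set (all names and values are placeholders):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Approximate equivalent of TargetFile("parquet", "/tmp/movies_output", Map(),
// SaveMode.Overwrite, List("year"), (5, List("title")), List("title"), Some("movies_output"))
def writeLikeTargetFile(df: DataFrame): Unit =
  df.write
    .format("parquet")
    .option("path", "/tmp/movies_output")  // keeps the table's data at an explicit location
    .mode(SaveMode.Overwrite)
    .partitionBy("year")                   // honoured by both save() and saveAsTable()
    .bucketBy(5, "title")                  // honoured only by saveAsTable()
    .sortBy("title")                       // requires bucketBy
    .saveAsTable("movies_output")
```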
diff --git a/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala b/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
index 864a18b6..b0a3de46 100644
--- a/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
+++ b/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
@@ -1,13 +1,12 @@
 package com.github.music.of.the.ainur.almaren
 
 import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
+import org.apache.spark.sql.avro._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SaveMode}
 import org.scalatest._
-import org.apache.spark.sql.avro._
-
-
+import java.io.File
 import scala.collection.immutable._
 
 class Test extends FunSuite with BeforeAndAfter {
@@ -62,10 +61,46 @@ class Test extends FunSuite with BeforeAndAfter {
 
   test(testSourceTargetJdbc(moviesDf), moviesDf, "SourceTargetJdbcTest")
   test(testSourceTargetJdbcUserPassword(moviesDf), moviesDf, "SourceTargetJdbcTestUserPassword")
-  test(testSourceFile("parquet","src/test/resources/sample_data/emp.parquet"),
-    spark.read.parquet("src/test/resources/sample_output/employee.parquet"),"SourceParquetFileTest")
-  test(testSourceFile("avro","src/test/resources/sample_data/emp.avro"),
-    spark.read.parquet("src/test/resources/sample_output/employee.parquet"),"SourceAvroFileTest")
+  test(testSourceFile("parquet", "src/test/resources/sample_data/emp.parquet"),
+    spark.read.parquet("src/test/resources/sample_output/employee.parquet"), "SourceParquetFileTest")
+  test(testSourceFile("avro", "src/test/resources/sample_data/emp.avro"),
+    spark.read.parquet("src/test/resources/sample_output/employee.parquet"), "SourceAvroFileTest")
+
+  test(
+    testTargetFileTarget("parquet",
+      "src/test/resources/sample_target/target.parquet",
+      SaveMode.Overwrite,
+      Map(),
+      List("year"),
+      (3, List("title")),
+      List("title"),
+      Some("table1")),
+    movies, "TargetParquetFileTest")
+  test(
+    testTargetFileTarget("avro", "src/test/resources/sample_target/target.avro",
+      SaveMode.Overwrite,
+      Map(),
+      List("year"),
+      (3, List("title")),
+      List("title"),
+      Some("table2")),
+    movies, "TargetAvroFileTest")
+
+  test(testSourceFile("parquet", "src/test/resources/sample_target/target.parquet"), movies, "TargetParquetFileTest1")
+  test(testSourceFile("avro", "src/test/resources/sample_target/target.avro"), movies, "TargetAvroFileTest1")
+
+  test(
+    testTargetFileTarget("parquet", "src/test/resources/sample_target/target.parquet", SaveMode.Overwrite, Map(), List("year"), (3, List("title")), List("title"), Some("tableTarget1")),
+    testSourceSql("tableTarget1"),
+    "TargetParquetFileTest2")
+  test(
+    testTargetFileTarget("avro", "src/test/resources/sample_target/target.avro", SaveMode.Overwrite, Map(), List("year"), (3, List("title")), List("title"), Some("tableTarget2")),
+    testSourceSql("tableTarget2"),
+    "TargetAvroFileTest2")
+
+  testTargetFileBucketPartition("src/test/resources/sample_target/target.parquet", List("year"), (3, List("title")), "parquet")
+  testTargetFileBucketPartition("src/test/resources/sample_target/target.avro", List("year"), (3, List("title")), "avro")
+
   repartitionAndColaeseTest(moviesDf)
   repartitionWithColumnTest(df)
   repartitionWithSizeAndColumnTest(df)
@@ -152,21 +187,72 @@ class Test extends FunSuite with BeforeAndAfter {
 
   def testSourceTargetJdbc(df: DataFrame): DataFrame = {
     almaren.builder
       .sourceSql(s"select * from $testTable")
-      .targetJdbc("jdbc:postgresql://localhost/almaren", "org.postgresql.Driver", "movies_test", SaveMode.Overwrite)
+      .targetJdbc("jdbc:postgresql://localhost/almaren", "org.postgresql.Driver", "movies_test", SaveMode.Overwrite, Some("postgres"), Some("foo"))
+      .batch
+
+    almaren.builder
+      .sourceJdbc("jdbc:postgresql://localhost/almaren", "org.postgresql.Driver", "select * from movies_test", Some("postgres"), Some("foo"))
       .batch
+  }
 
+  def testSourceFile(format: String, path: String): DataFrame = {
     almaren.builder
-      .sourceJdbc("jdbc:postgresql://localhost/almaren", "org.postgresql.Driver", "select * from movies_test")
+      .sourceFile(format, path, Map())
       .batch
+  }
 
-  def testSourceFile(format:String,path:String):DataFrame ={
+  def testSourceSql(tableName: String): DataFrame = {
     almaren.builder
-      .sourceFile(format,path,Map())
+      .sourceSql(s"select * from $tableName")
       .batch
   }
 
+  def testTargetFileTarget(format: String, path: String, saveMode: SaveMode, params: Map[String, String], partitionBy: List[String], bucketBy: (Int, List[String]), sortBy: List[String], tableName: Option[String]): DataFrame = {
+    almaren.builder
+      .sourceDataFrame(movies)
+      .targetFile(format, path, saveMode, params, partitionBy, bucketBy, sortBy, tableName)
+      .batch
+  }
+
+  def testTargetFileBucketPartition(path: String, partitionBy: List[String], bucketBy: (Int, List[String]), fileFormat: String) = {
+    val filesList = getListOfDirectories(path).map(_.toString)
+    if (partitionBy.nonEmpty) {
+      val extractFiles = filesList.map(a => a.substring(a.lastIndexOf("=") + 1))
+      val distinctValues = movies.select(partitionBy(0)).distinct.as[String].collect.toList
+      val checkLists = extractFiles.intersect(distinctValues)
+      test(s"partitionBy_$fileFormat") {
+        assert(checkLists.size == distinctValues.size)
+      }
+    }
+    if (bucketBy._2.nonEmpty) {
+      val check = filesList.map(f => getListOfFiles(f).size)
+      val bool = if (check.forall(_ == check.head)) check.head == 2 * bucketBy._1 else false
+      test(s"bucketBy_$fileFormat") {
+        assert(bool == true)
+      }
+    }
+  }
+
+  def getListOfDirectories(dir: String): List[File] = {
+    val d = new File(dir)
+    if (d.exists && d.isDirectory) {
+      d.listFiles.filter(_.isDirectory).toList
+    } else {
+      List[File]()
+    }
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+    val d = new File(dir)
+    if (d.exists && d.isDirectory) {
+      d.listFiles.filter(_.isFile).toList
+    } else {
+      List[File]()
+    }
+  }
+
   def repartitionAndColaeseTest(dataFrame: DataFrame) {
     val repartition_df = almaren.builder.sourceSql(s"select * from $testTable")
       .repartition(10).batch
@@ -237,6 +323,7 @@ class Test extends FunSuite with BeforeAndAfter {
       assert(aliasTableCount > 0)
     }
   }
+
 
   def testingDrop(moviesDf: DataFrame): Unit = {
     moviesDf.createTempView("Test_drop")
@@ -247,6 +334,7 @@ class Test extends FunSuite with BeforeAndAfter {
 
     test(testDF, testDropcompare, "Testing Drop")
   }
+
 
   def testingWhere(moviesDf: DataFrame): Unit = {
     moviesDf.createTempView("Test_where")
@@ -257,6 +345,7 @@ class Test extends FunSuite with BeforeAndAfter {
 
     test(testDF, testWherecompare, "Testing Where")
   }
+
 
   def testingSqlExpr(): Unit = {
 
     val df = Seq(
@@ -269,10 +358,11 @@ class Test extends FunSuite with BeforeAndAfter {
 
     df.createOrReplaceTempView("person_info")
 
-    val testDF = almaren.builder.sourceSql("select CAST (salary as INT) from person_info" ).batch
+    val testDF = almaren.builder.sourceSql("select CAST (salary as INT) from person_info").batch
     val testSqlExprcompare = almaren.builder.sourceSql("select * from person_info").sqlExpr("CAST(salary as int)").batch
 
     test(testDF, testSqlExprcompare, "Testing sqlExpr")
-  }
+  }
+
 
   def testingSourceDataFrame(): Unit = {
     val testDS = spark.range(3)
@@ -283,8 +373,7 @@ class Test extends FunSuite with BeforeAndAfter {
 
   }
 
-
-  def cacheTest(df: DataFrame): Unit = {
+  def cacheTest(df: DataFrame): Unit = {
 
     df.createTempView("cache_test")
@@ -304,7 +393,7 @@ class Test extends FunSuite with BeforeAndAfter {
   def testingPipe(df: DataFrame): Unit = {
     df.createTempView("pipe_view")
-    val pipeDf = almaren.builder.sql("select * from pipe_view").pipe("echo","Testing Echo Command").batch
+    val pipeDf = almaren.builder.sql("select * from pipe_view").pipe("echo", "Testing Echo Command").batch
     val pipeDfCount = pipeDf.count()
     test("Testing Pipe") {
       assert(pipeDfCount > 0)
@@ -391,13 +480,13 @@ class Test extends FunSuite with BeforeAndAfter {
     val df = spark.sql("select * from sample_json_table")
     val jsonSchema = "`address` STRING,`age` BIGINT,`name` STRING"
-    val generatedSchema = Util.genDDLFromJsonString(df, "json_string",0.1)
+    val generatedSchema = Util.genDDLFromJsonString(df, "json_string", 0.1)
     testSchema(jsonSchema, generatedSchema, "Test infer schema for json column")
   }
 
   def testInferSchemaDataframe(df: DataFrame): Unit = {
     val dfSchema = "`cast` ARRAY,`genres` ARRAY,`title` STRING,`year` BIGINT"
-    val generatedSchema = Util.genDDLFromDataFrame(df,0.1)
+    val generatedSchema = Util.genDDLFromDataFrame(df, 0.1)
     testSchema(dfSchema, generatedSchema, "Test infer schema for dataframe")
   }
 
@@ -409,3 +498,4 @@ class Test extends FunSuite with BeforeAndAfter {
   }
 
 }
+
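Because `saveAsTable` registers the written output in the session catalog, the read-back tests above can query it either by table name or straight from the path. Roughly, using the same table name and path as the tests (with `spark` being the test session):

```scala
// Catalog read: the bucketing metadata recorded by saveAsTable is preserved here.
val fromTable = spark.table("table1")

// Path read: partition directories (e.g. year=1990/) are still discovered,
// but bucketing metadata is not applied when the files are read directly.
val fromPath = spark.read.parquet("src/test/resources/sample_target/target.parquet")
```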