Merge pull request #39 from badrinathpatchikolla/master
Added Target File Code and Test Cases
mantovani authored Mar 28, 2022
2 parents aef17c9 + 8b6148f commit 47cf50a
Showing 5 changed files with 164 additions and 28 deletions.
9 changes: 9 additions & 0 deletions README.md
@@ -41,6 +41,7 @@ The Almaren Framework provides a simplified consistent minimalistic layer over A
+ [targetSolr](#targetsolr)
+ [targetMongoDb](#targetmongodb)
+ [targetBigQuery](#targetbigquery)
+ [targetFile](#targetfile)
- [Executors](#executors)
* [Batch](#batch)
* [Streaming Kafka](#streaming-kafka)
@@ -481,6 +482,14 @@ Write to BigQuery using [BigQuery Connector](https://github.com/music-of-the-ain

Write to Neo4j using [Neo4j Connector](https://github.com/music-of-the-ainur/neo4j.almaren)

#### targetFile

Write to a file. Provide the file format, path, and saveMode, plus any writer options as a Map. For partitioning, provide a list of columns; for bucketing, provide the number of buckets and a list of columns; for sorting, provide a list of columns; to save the output as a table, provide a table name. Check the [documentation](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html) for the full list of options.

```scala
targetFile("parquet","/home/abc/targetlocation/output.parquet",SaveMode.Overwrite,Map("batchSize"->10000),List("partitionColumns"),(5,List("bucketingColumns")),List("sortingColumns"))
```
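
For readability, the same call can be written with named arguments inside a builder pipeline. This is a minimal sketch assuming the `targetFile` signature added in this PR and an `almaren` instance set up as in the test suite; the source table, output path, column names, and output table name are hypothetical placeholders:

```scala
import org.apache.spark.sql.SaveMode

almaren.builder
  .sourceSql("SELECT * FROM movies")                   // hypothetical source table
  .targetFile(
    format = "parquet",
    path = "/home/abc/targetlocation/output.parquet",  // hypothetical output path
    saveMode = SaveMode.Overwrite,
    params = Map("batchSize" -> "10000"),
    partitionBy = List("year"),                        // one directory per distinct value
    bucketBy = (5, List("title")),                     // 5 buckets hashed on "title"
    sortBy = List("title"),                            // sortBy applies only together with bucketBy
    tableName = Some("movies_output")                  // hypothetical table; enables saveAsTable
  )
  .batch                                               // executor: nothing runs until this call
```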

## Executors

Executors are responsible for executing the Almaren Tree, i.e. ```Option[Tree]```, on Apache Spark. Without invoking an _executor_ (see the sketch below), code won't be executed by Apache Spark. The list of _executors_ follows:
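
A minimal sketch of the `batch` executor, assuming an `almaren` instance set up as in the test suite; the table name is a hypothetical placeholder:

```scala
// No Spark job runs while the tree is being built; .batch executes it and returns a DataFrame.
val resultDf = almaren.builder
  .sourceSql("SELECT * FROM movies")  // hypothetical table registered in the Spark session
  .batch

resultDf.show(false)
```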
6 changes: 6 additions & 0 deletions build.sbt
@@ -42,6 +42,12 @@ ThisBuild / developers := List(
name = "Daniel Mantovani",
email = "[email protected]",
url = url("https://github.com/music-of-the-ainur")
),
Developer(
id = "badrinathpatchikolla",
name = "Badrinath Patchikolla",
email = "[email protected]",
url = url("https://github.com/music-of-the-ainur")
)
)

@@ -2,7 +2,7 @@ package com.github.music.of.the.ainur.almaren.builder.core

import com.github.music.of.the.ainur.almaren.Tree
import com.github.music.of.the.ainur.almaren.builder.Core
import com.github.music.of.the.ainur.almaren.state.core.{TargetJdbc, TargetSql, TargetKafka}
import com.github.music.of.the.ainur.almaren.state.core.{TargetFile, TargetJdbc, TargetKafka, TargetSql}
import org.apache.spark.sql.SaveMode

private[almaren] trait Target extends Core {
@@ -14,4 +14,14 @@ private[almaren] trait Target extends Core {

def targetKafka(servers: String, options: Map[String, String] = Map()): Option[Tree] =
TargetKafka(servers, options)

def targetFile(format: String,
path: String,
saveMode: SaveMode = SaveMode.Overwrite,
params: Map[String, String] = Map(),
partitionBy: List[String] = List.empty,
bucketBy: (Int, List[String]) = (64, List.empty),
sortBy: List[String] = List.empty,
tableName: Option[String]): Option[Tree] =
TargetFile(format, path, params, saveMode, partitionBy, bucketBy, sortBy, tableName)
}
@@ -2,7 +2,8 @@ package com.github.music.of.the.ainur.almaren.state.core

import com.github.music.of.the.ainur.almaren.State
import com.github.music.of.the.ainur.almaren.util.Constants
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.{Column, DataFrame, DataFrameWriter, SaveMode}


private[almaren] abstract class Target extends State {
override def executor(df: DataFrame): DataFrame = target(df)
@@ -52,18 +53,38 @@ case class TargetKafka(servers: String, options: Map[String, String]) extends Ta
}
}

case class TargetFile(
format: String,
path: String,
params: Map[String, String],
saveMode: SaveMode) extends Target {
case class TargetFile(format: String,
path: String,
params: Map[String, String],
saveMode: SaveMode,
partitionBy: List[String],
bucketBy: (Int, List[String]),
sortBy: List[String],
tableName: Option[String]) extends Target {
override def target(df: DataFrame): DataFrame = {
logger.info(s"format:{$format}, path:{$path}, params:{$params}")
df.write
logger.info(s"format:{$format}, path:{$path}, params:{$params}, partitionBy:{$partitionBy}, bucketBy:{$bucketBy}, sort:{$sortBy},tableName:{$tableName}")
val write = df.write
.format(format)
.option("path", path)
.options(params)
.save()
.mode(saveMode)

if (partitionBy.nonEmpty)
write.partitionBy(partitionBy: _*)

if (bucketBy._2.nonEmpty) {
write.bucketBy(bucketBy._1, bucketBy._2.head, bucketBy._2.tail: _*)
if (sortBy.nonEmpty)
write.sortBy(sortBy.head, sortBy.tail: _*)
}

tableName match {
case Some(tableName) => write.saveAsTable(tableName)
case _ => write.save
}

df
}
}


126 changes: 108 additions & 18 deletions src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
@@ -1,13 +1,12 @@
package com.github.music.of.the.ainur.almaren

import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SaveMode}
import org.scalatest._
import org.apache.spark.sql.avro._



import java.io.File
import scala.collection.immutable._

class Test extends FunSuite with BeforeAndAfter {
@@ -62,10 +61,46 @@ class Test extends FunSuite with BeforeAndAfter {

test(testSourceTargetJdbc(moviesDf), moviesDf, "SourceTargetJdbcTest")
test(testSourceTargetJdbcUserPassword(moviesDf), moviesDf, "SourceTargetJdbcTestUserPassword")
test(testSourceFile("parquet","src/test/resources/sample_data/emp.parquet"),
spark.read.parquet("src/test/resources/sample_output/employee.parquet"),"SourceParquetFileTest")
test(testSourceFile("avro","src/test/resources/sample_data/emp.avro"),
spark.read.parquet("src/test/resources/sample_output/employee.parquet"),"SourceAvroFileTest")
test(testSourceFile("parquet", "src/test/resources/sample_data/emp.parquet"),
spark.read.parquet("src/test/resources/sample_output/employee.parquet"), "SourceParquetFileTest")
test(testSourceFile("avro", "src/test/resources/sample_data/emp.avro"),
spark.read.parquet("src/test/resources/sample_output/employee.parquet"), "SourceAvroFileTest")

test(
testTargetFileTarget("parquet",
"src/test/resources/sample_target/target.parquet",
SaveMode.Overwrite,
Map(),
List("year"),
(3, List("title")),
List("title"),
Some("table1")),
movies, "TargetParquetFileTest")
test(
testTargetFileTarget("avro", "src/test/resources/sample_target/target.avro",
SaveMode.Overwrite,
Map(),
List("year"),
(3, List("title")),
List("title"),
Some("table2")),
movies, "TargetAvroFileTest")

test(testSourceFile("parquet", "src/test/resources/sample_target/target.parquet"), movies, "TargetParquetFileTest1")
test(testSourceFile("avro", "src/test/resources/sample_target/target.avro"), movies, "TargetAvroFileTest1")

test(
testTargetFileTarget("parquet", "src/test/resources/sample_target/target.parquet", SaveMode.Overwrite, Map(), List("year"), (3, List("title")), List("title"), Some("tableTarget1")),
testSourceSql("tableTarget1"),
"TargetParquetFileTest2")
test(
testTargetFileTarget("avro", "src/test/resources/sample_target/target.avro", SaveMode.Overwrite, Map(), List("year"), (3, List("title")), List("title"), Some("tableTarget2")),
testSourceSql("tableTarget2"),
"TargetAvroFileTest2")

testTargetFileBucketPartition("src/test/resources/sample_target/target.parquet", List("year"), (3, List("title")),"parquet")
testTargetFileBucketPartition("src/test/resources/sample_target/target.avro",List("year"),(3,List("title")),"avro")

repartitionAndColaeseTest(moviesDf)
repartitionWithColumnTest(df)
repartitionWithSizeAndColumnTest(df)
@@ -152,21 +187,72 @@ class Test extends FunSuite with BeforeAndAfter {
def testSourceTargetJdbc(df: DataFrame): DataFrame = {
almaren.builder
.sourceSql(s"select * from $testTable")
.targetJdbc("jdbc:postgresql://localhost/almaren", "org.postgresql.Driver", "movies_test", SaveMode.Overwrite)
.targetJdbc("jdbc:postgresql://localhost/almaren", "org.postgresql.Driver", "movies_test", SaveMode.Overwrite, Some("postgres"), Some("foo"))
.batch

almaren.builder
.sourceJdbc("jdbc:postgresql://localhost/almaren", "org.postgresql.Driver", "select * from movies_test", Some("postgres"), Some("foo"))
.batch
}

def testSourceFile(format: String, path: String): DataFrame = {
almaren.builder
.sourceJdbc("jdbc:postgresql://localhost/almaren", "org.postgresql.Driver", "select * from movies_test")
.sourceFile(format, path, Map())
.batch

}

def testSourceFile(format:String,path:String):DataFrame ={
def testSourceSql(tableName: String): DataFrame = {
almaren.builder
.sourceFile(format,path,Map())
.sourceSql(s"select * from $tableName")
.batch

}

def testTargetFileTarget(format: String, path: String, saveMode: SaveMode, params: Map[String, String], partitionBy: List[String], bucketBy: (Int, List[String]), sortBy: List[String], tableName: Option[String]): DataFrame = {
almaren.builder
.sourceDataFrame(movies)
.targetFile(format, path, saveMode, params, partitionBy, bucketBy, sortBy, tableName)
.batch
}

def testTargetFileBucketPartition(path: String, partitionBy: List[String], bucketBy: (Int, List[String]),fileFormat: String) = {
val filesList = getListOfDirectories(path).map(_.toString)
if (partitionBy.nonEmpty) {
val extractFiles = filesList.map(a => a.substring(a.lastIndexOf("=") + 1))
val distinctValues = movies.select(partitionBy(0)).distinct.as[String].collect.toList
val checkLists = extractFiles.intersect(distinctValues)
test(s"partitionBy_$fileFormat") {
assert(checkLists.size == distinctValues.size)
}
}
if (bucketBy._2.nonEmpty) {
val check = filesList.map(f => getListOfFiles(f).size)
val bool = if (check.forall(_ == check.head)) check.head == 2 * bucketBy._1 else false
test(s"bucketBy_$fileFormat") {
assert(bool == true)
}
}
}

def getListOfDirectories(dir: String): List[File] = {
val d = new File(dir)
if (d.exists && d.isDirectory) {
d.listFiles.filter(_.isDirectory).toList
} else {
List[File]()
}
}

def getListOfFiles(dir: String): List[File] = {
val d = new File(dir)
if (d.exists && d.isDirectory) {
d.listFiles.filter(_.isFile).toList
} else {
List[File]()
}
}

def repartitionAndColaeseTest(dataFrame: DataFrame) {
val repartition_df = almaren.builder.sourceSql(s"select * from $testTable")
.repartition(10).batch
@@ -237,6 +323,7 @@ class Test extends FunSuite with BeforeAndAfter {
assert(aliasTableCount > 0)
}
}

def testingDrop(moviesDf: DataFrame): Unit = {

moviesDf.createTempView("Test_drop")
@@ -247,6 +334,7 @@
test(testDF, testDropcompare, "Testing Drop")

}

def testingWhere(moviesDf: DataFrame): Unit = {

moviesDf.createTempView("Test_where")
@@ -257,6 +345,7 @@

test(testDF, testWherecompare, "Testing Where")
}

def testingSqlExpr(): Unit = {

val df = Seq(
@@ -269,10 +358,11 @@
df.createOrReplaceTempView("person_info")


val testDF = almaren.builder.sourceSql("select CAST (salary as INT) from person_info" ).batch
val testDF = almaren.builder.sourceSql("select CAST (salary as INT) from person_info").batch
val testSqlExprcompare = almaren.builder.sourceSql("select * from person_info").sqlExpr("CAST(salary as int)").batch
test(testDF, testSqlExprcompare, "Testing sqlExpr")
}
}

def testingSourceDataFrame(): Unit = {

val testDS = spark.range(3)
@@ -283,8 +373,7 @@
}



def cacheTest(df: DataFrame): Unit = {
def cacheTest(df: DataFrame): Unit = {

df.createTempView("cache_test")

@@ -304,7 +393,7 @@

def testingPipe(df: DataFrame): Unit = {
df.createTempView("pipe_view")
val pipeDf = almaren.builder.sql("select * from pipe_view").pipe("echo","Testing Echo Command").batch
val pipeDf = almaren.builder.sql("select * from pipe_view").pipe("echo", "Testing Echo Command").batch
val pipeDfCount = pipeDf.count()
test("Testing Pipe") {
assert(pipeDfCount > 0)
@@ -391,13 +480,13 @@

val df = spark.sql("select * from sample_json_table")
val jsonSchema = "`address` STRING,`age` BIGINT,`name` STRING"
val generatedSchema = Util.genDDLFromJsonString(df, "json_string",0.1)
val generatedSchema = Util.genDDLFromJsonString(df, "json_string", 0.1)
testSchema(jsonSchema, generatedSchema, "Test infer schema for json column")
}

def testInferSchemaDataframe(df: DataFrame): Unit = {
val dfSchema = "`cast` ARRAY<STRING>,`genres` ARRAY<STRING>,`title` STRING,`year` BIGINT"
val generatedSchema = Util.genDDLFromDataFrame(df,0.1)
val generatedSchema = Util.genDDLFromDataFrame(df, 0.1)
testSchema(dfSchema, generatedSchema, "Test infer schema for dataframe")
}

@@ -409,3 +498,4 @@
}

}
