Skip to content

Commit

Permalink
Merge pull request #60 from badrinathpatchikolla/master
Browse files Browse the repository at this point in the history
fixed test cases issue and added csv option in target deserializer
  • Loading branch information
mantovani authored Oct 4, 2022
2 parents fc58c7c + 8d39204 commit 329c632
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ private[almaren] trait Deserializer extends Core {
case "JSON" => json
case "XML" => xml
case "AVRO" => avro
case "CSV" => csv
case d => throw InvalidDecoder(d)
}
}
Expand Down
51 changes: 0 additions & 51 deletions src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -251,57 +251,6 @@ class Test extends FunSuite with BeforeAndAfter {
}
}

def testSourceSql(tableName: String): DataFrame = {
almaren.builder
.sourceSql(s"select * from $tableName")
.batch

}

def testTargetFileTarget(format: String, path: String, saveMode: SaveMode, params: Map[String, String], partitionBy: List[String], bucketBy: (Int, List[String]), sortBy: List[String], tableName: Option[String]): DataFrame = {
almaren.builder
.sourceDataFrame(movies)
.targetFile(format, path, saveMode, params, partitionBy, bucketBy, sortBy, tableName)
.batch
}

def testTargetFileBucketPartition(path: String, partitionBy: List[String], bucketBy: (Int, List[String]),fileFormat: String) = {
val filesList = getListOfDirectories(path).map(_.toString)
if (partitionBy.nonEmpty) {
val extractFiles = filesList.map(a => a.substring(a.lastIndexOf("=") + 1))
val distinctValues = movies.select(partitionBy(0)).distinct.as[String].collect.toList
val checkLists = extractFiles.intersect(distinctValues)
test(s"partitionBy_$fileFormat") {
assert(checkLists.size == distinctValues.size)
}
}
if (bucketBy._2.nonEmpty) {
val check = filesList.map(f => getListOfFiles(f).size)
val bool = if (check.forall(_ == check.head)) check.head == 2 * bucketBy._1 else false
test(s"bucketBy_$fileFormat") {
assert(bool == true)
}
}
}

def getListOfDirectories(dir: String): List[File] = {
val d = new File(dir)
if (d.exists && d.isDirectory) {
d.listFiles.filter(_.isDirectory).toList
} else {
List[File]()
}
}

def getListOfFiles(dir: String): List[File] = {
val d = new File(dir)
if (d.exists && d.isDirectory) {
d.listFiles.filter(_.isFile).toList
} else {
List[File]()
}
}

def repartitionAndColaeseTest(dataFrame: DataFrame) {
val repartition_df = almaren.builder.sourceSql(s"select * from $testTable")
.repartition(10).batch
Expand Down

0 comments on commit 329c632

Please sign in to comment.