Skip to content

Commit

Permalink
Merge pull request #92 from DV1207/master
Browse files Browse the repository at this point in the history
Added Cache Argument Method
  • Loading branch information
badrinathpatchikolla authored Oct 5, 2023
2 parents d2ab401 + cbdc55f commit bac2643
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 34 deletions.
34 changes: 21 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,34 @@ The Almaren Framework provides a simplified consistent minimalistic layer over A
To add Almaren Framework dependency to your sbt build:

```
libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % "0.9.10-3.4"
libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % "0.9.11-3.5"
```

To run in spark-shell:

For scala version(2.12):
```
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.4"
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.5"
```
For scala version(2.13):
```
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.4"
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.5"
```
Almaren connector is available in
[Maven Central](https://mvnrepository.com/artifact/com.github.music-of-the-ainur) repository.

| version | Connector Artifact |
|----------------------------|------------------------------------------------------------------|
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.4` |
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.4` |
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:almaren-framework_2.11:0.9.10-2.4` |
| version | Connector Artifact |
|----------------------------|-------------------------------------------------------------------|
| Spark 3.5.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.5` |
| Spark 3.5.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.5` |
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.4` |
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.4` |
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:almaren-framework_2.11:0.9.11-2.4` |


### Batch Example
Expand Down Expand Up @@ -358,6 +360,12 @@ Cache/Uncache both DataFrame or Table
cache(true)
```

Cache Dataframe with Storage Level

```scala
cache(true,storageLevel = MEMORY_AND_DISK)
```

#### Coalesce

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
Expand Down
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ lazy val scala213 = "2.13.9"
crossScalaVersions := Seq(scala212,scala213)
ThisBuild / scalaVersion := scala213

val sparkVersion = "3.4.0"
val sparkVersion = "3.5.0"
val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r

val majorVersionReg(majorVersion) = sparkVersion
Expand All @@ -19,10 +19,10 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided",
"org.apache.spark" %% "spark-avro" % sparkVersion,
"com.databricks" %% "spark-xml" % "0.14.0",
"com.github.music-of-the-ainur" %% "quenya-dsl" % s"1.2.3-${majorVersion}-2",
"org.scalatest" %% "scalatest" % "3.2.14" % "test",
"org.postgresql" % "postgresql" % "42.2.8" % "test"
"com.databricks" %% "spark-xml" % "0.17.0",
"com.github.music-of-the-ainur" %% "quenya-dsl" % s"1.2.3-${majorVersion}",
"org.scalatest" %% "scalatest" % "3.2.17" % "test",
"org.postgresql" % "postgresql" % "42.6.0" % "test"
)

enablePlugins(GitVersioning)
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.7.1
sbt.version=1.9.6
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.github.music.of.the.ainur.almaren.Tree
import com.github.music.of.the.ainur.almaren.builder.Core
import com.github.music.of.the.ainur.almaren.state.core._
import org.apache.spark.sql.Column
import org.apache.spark.storage.StorageLevel

private[almaren] trait Main extends Core {
def sql(sql: String): Option[Tree] =
Expand All @@ -12,8 +13,8 @@ private[almaren] trait Main extends Core {
def alias(alias:String): Option[Tree] =
Alias(alias)

def cache(opType:Boolean = true,tableName:Option[String] = None): Option[Tree] =
Cache(opType, tableName)
def cache(opType:Boolean = true,tableName:Option[String] = None,storageLevel: Option[StorageLevel] = None): Option[Tree] =
Cache(opType, tableName = tableName, storageLevel = storageLevel)

def coalesce(size:Int): Option[Tree] =
Coalesce(size)
Expand All @@ -33,12 +34,12 @@ private[almaren] trait Main extends Core {
def dsl(dsl:String): Option[Tree] =
Dsl(dsl)

def sqlExpr(exprs:String*): Option[Tree] =
def sqlExpr(exprs:String*): Option[Tree] =
SqlExpr(exprs:_*)

def where(expr:String): Option[Tree] =
def where(expr:String): Option[Tree] =
Where(expr)

def drop(drop:String*): Option[Tree] =
def drop(drop:String*): Option[Tree] =
Drop(drop:_*)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.github.music.of.the.ainur.almaren.state.core
import com.github.music.of.the.ainur.almaren.State
import com.github.music.of.the.ainur.almaren.util.Constants
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.storage.StorageLevel

private[almaren] abstract class Main extends State {
override def executor(df: DataFrame): DataFrame = core(df)
Expand Down Expand Up @@ -81,22 +82,30 @@ case class Alias(alias:String) extends Main {
}
}

case class Cache(opType:Boolean = true,tableName:Option[String] = None) extends Main {
case class Cache(opType: Boolean = true, tableName: Option[String] = None, storageLevel: Option[StorageLevel] = None) extends Main {
override def core(df: DataFrame): DataFrame = cache(df)

def cache(df: DataFrame): DataFrame = {
logger.info(s"opType:{$opType}, tableName{$tableName}")
logger.info(s"opType:{$opType}, tableName:{$tableName}, StorageType:{$storageLevel}")
tableName match {
case Some(t) => cacheTable(df,t)
case None => cacheDf(df)
case Some(t) => cacheTable(df, t)
case None => cacheDf(df, storageLevel)
}
df
}
private def cacheDf(df:DataFrame): Unit = opType match {
case true => df.persist()

private def cacheDf(df: DataFrame, storageLevel: Option[StorageLevel]): Unit = opType match {
case true => {
storageLevel match {
case Some(value) => df.persist(value)
case None => df.persist()
}
}
case false => df.unpersist()

}
private def cacheTable(df:DataFrame,tableName: String): Unit =

private def cacheTable(df: DataFrame, tableName: String): Unit =
opType match {
case true => df.sqlContext.cacheTable(tableName)
case false => df.sqlContext.uncacheTable(tableName)
Expand All @@ -112,7 +121,7 @@ case class SqlExpr(exprs:String*) extends Main {

case class Where(where:String) extends Main {
override def core(df: DataFrame): DataFrame = {
logger.info(s"where:{$where}")
logger.info(s"where:{$where}")
df.where(where)
}
}
Expand All @@ -122,4 +131,4 @@ case class Drop(drop:String*) extends Main {
logger.info(s"""drop:{${drop.mkString("\n")}}""")
df.drop(drop:_*)
}
}
}
14 changes: 14 additions & 0 deletions src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SaveMode}
import org.scalatest._
import org.apache.spark.sql.avro._
import org.scalatest.funsuite.AnyFunSuite
import org.apache.spark.storage.StorageLevel._

import java.io.File
import scala.collection.immutable._
Expand Down Expand Up @@ -383,6 +384,18 @@ class Test extends AnyFunSuite with BeforeAndAfter {
assert(bool_cache)
}

val testCacheDfStorage: DataFrame = almaren.builder.sourceSql("select * from cache_test").cache(true, storageLevel = Some(MEMORY_ONLY)).batch
val bool_cache_storage = testCacheDfStorage.storageLevel.useMemory
test("Testing Cache Memory Storage") {
assert(bool_cache_storage)
}

val testCacheDfDiskStorage: DataFrame = almaren.builder.sourceSql("select * from cache_test").cache(true, storageLevel = Some(DISK_ONLY)).batch
val bool_cache_disk_storage = testCacheDfDiskStorage.storageLevel.useDisk
test("Testing Cache Disk Storage") {
assert(bool_cache_disk_storage)
}

val testUnCacheDf = almaren.builder.sourceSql("select * from cache_test").cache(false).batch
val bool_uncache = testUnCacheDf.storageLevel.useMemory
test("Testing Uncache") {
Expand Down Expand Up @@ -416,6 +429,7 @@ class Test extends AnyFunSuite with BeforeAndAfter {
test(jsonschmeadf, resDf, "Deserialize JSON Schema")
}


def deserializerCsvTest(): Unit = {
val df = Seq(
("John,Chris", "Smith", "London"),
Expand Down

0 comments on commit bac2643

Please sign in to comment.