From 9323e57d3e5454bb71f1af41bfbd1d07a6f478a1 Mon Sep 17 00:00:00 2001 From: pradeepmaripala Date: Wed, 4 Oct 2023 13:10:24 +0530 Subject: [PATCH 1/3] Added Cache Argument Method for 3.3 --- README.md | 32 ++++++++++++------- .../the/ainur/almaren/builder/core/Main.scala | 5 +-- .../the/ainur/almaren/state/core/Main.scala | 20 ++++++++---- .../music/of/the/ainur/almaren/Test.scala | 13 ++++++++ 4 files changed, 49 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index d307c04b..5aca9f5a 100644 --- a/README.md +++ b/README.md @@ -67,27 +67,29 @@ libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % To run in spark-shell: For scala version-2.12: ``` -spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.3" +spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.3" ``` For scala version-2.13: ``` -spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.3" +spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.3" ``` Almaren connector is available in [Maven Central](https://mvnrepository.com/artifact/com.github.music-of-the-ainur) repository. 
-| version | Connector Artifact | -|----------------------------|------------------------------------------------------------------| -| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.4` | -| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.4` | -| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.3` | -| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.3` | -| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.2` | -| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.1` | -| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-2.4` | -| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:almaren-framework_2.11:0.9.10-2.4` | +| version | Connector Artifact | +|----------------------------|-------------------------------------------------------------------| +| Spark 3.5.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.5` | +| Spark 3.5.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.5` | +| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.4` | +| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.4` | +| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.3` | +| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.3` | +| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.2` | +| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.1` | +| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-2.4` | +| Spark 2.4.x and scala 2.11 | 
`com.github.music-of-the-ainur:almaren-framework_2.11:0.9.11-2.4` | ### Batch Example ```scala @@ -357,6 +359,12 @@ Cache/Uncache both DataFrame or Table cache(true) ``` +Cache DataFrame with Storage Level + +```scala +cache(true,storageLevel = Some(MEMORY_AND_DISK)) +``` + #### Coalesce Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Main.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Main.scala index a5dc8fca..ab59a966 100644 --- a/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Main.scala +++ b/src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Main.scala @@ -4,6 +4,7 @@ import com.github.music.of.the.ainur.almaren.Tree import com.github.music.of.the.ainur.almaren.builder.Core import com.github.music.of.the.ainur.almaren.state.core._ import org.apache.spark.sql.Column +import org.apache.spark.storage.StorageLevel private[almaren] trait Main extends Core { def sql(sql: String): Option[Tree] = @@ -12,8 +13,8 @@ private[almaren] trait Main extends Core { def alias(alias:String): Option[Tree] = Alias(alias) - def cache(opType:Boolean = true,tableName:Option[String] = None): Option[Tree] = - Cache(opType, tableName) + def cache(opType:Boolean = true,tableName:Option[String] = None,storageLevel: Option[StorageLevel] = None): Option[Tree] = + Cache(opType, tableName = tableName, storageLevel = storageLevel) def coalesce(size:Int): Option[Tree] = Coalesce(size) diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala index d82476a5..1d4ee87b 100644 --- a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala +++ b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala @@ -3,6 +3,7 @@ package 
com.github.music.of.the.ainur.almaren.state.core import com.github.music.of.the.ainur.almaren.State import com.github.music.of.the.ainur.almaren.util.Constants import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.storage.StorageLevel private[almaren] abstract class Main extends State { override def executor(df: DataFrame): DataFrame = core(df) @@ -81,22 +82,27 @@ case class Alias(alias:String) extends Main { } } -case class Cache(opType:Boolean = true,tableName:Option[String] = None) extends Main { +case class Cache(opType: Boolean = true, tableName: Option[String] = None, storageLevel: Option[StorageLevel] = None) extends Main { override def core(df: DataFrame): DataFrame = cache(df) def cache(df: DataFrame): DataFrame = { - logger.info(s"opType:{$opType}, tableName{$tableName}") + logger.info(s"opType:{$opType}, tableName:{$tableName}, StorageType:{$storageLevel}") tableName match { - case Some(t) => cacheTable(df,t) - case None => cacheDf(df) + case Some(t) => cacheTable(df, t) + case None => cacheDf(df, storageLevel) } df } - private def cacheDf(df:DataFrame): Unit = opType match { - case true => df.persist() + private def cacheDf(df: DataFrame, storageLevel: Option[StorageLevel]): Unit = opType match { + case true => { + storageLevel match { + case Some(value) => df.persist(value) + case None => df.persist() + } + } case false => df.unpersist() } - private def cacheTable(df:DataFrame,tableName: String): Unit = + private def cacheTable(df: DataFrame, tableName: String): Unit = opType match { case true => df.sqlContext.cacheTable(tableName) case false => df.sqlContext.uncacheTable(tableName) diff --git a/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala b/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala index 967389bd..c01d72f9 100644 --- a/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala +++ b/src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala @@ -10,6 +10,7 @@ import 
org.scalatest.funsuite.AnyFunSuite import java.io.File import scala.collection.immutable._ +import org.apache.spark.storage.StorageLevel._ class Test extends AnyFunSuite with BeforeAndAfter { val almaren = Almaren("App Test") @@ -383,6 +384,18 @@ class Test extends AnyFunSuite with BeforeAndAfter { assert(bool_cache) } + val testCacheDfStorage: DataFrame = almaren.builder.sourceSql("select * from cache_test").cache(true,storageLevel = Some(MEMORY_ONLY)).batch + val bool_cache_storage = testCacheDfStorage.storageLevel.useMemory + test("Testing Cache Memory Storage") { + assert(bool_cache_storage) + } + + val testCacheDfDiskStorage: DataFrame = almaren.builder.sourceSql("select * from cache_test").cache(true, storageLevel = Some(DISK_ONLY)).batch + val bool_cache_disk_storage = testCacheDfDiskStorage.storageLevel.useDisk + test("Testing Cache Disk Storage") { + assert(bool_cache_disk_storage) + } + val testUnCacheDf = almaren.builder.sourceSql("select * from cache_test").cache(false).batch val bool_uncache = testUnCacheDf.storageLevel.useMemory test("Testing Uncache") { From 8298dbd3a06aaf1efa67f977e1cc4fbed3d154e1 Mon Sep 17 00:00:00 2001 From: pradeepmaripala Date: Wed, 4 Oct 2023 13:40:25 +0530 Subject: [PATCH 2/3] Added Cache Argument Method for 3.3 --- README.md | 2 +- .../com/github/music/of/the/ainur/almaren/state/core/Main.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5aca9f5a..91bc1fd1 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ The Almaren Framework provides a simplified consistent minimalistic layer over A To add Almaren Framework dependency to your sbt build: ``` -libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % "0.9.10-3.3" +libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % "0.9.11-3.3" ``` To run in spark-shell: diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala 
b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala index 1d4ee87b..68920ba7 100644 --- a/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala +++ b/src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala @@ -84,6 +84,7 @@ case class Alias(alias:String) extends Main { case class Cache(opType: Boolean = true, tableName: Option[String] = None, storageLevel: Option[StorageLevel] = None) extends Main { override def core(df: DataFrame): DataFrame = cache(df) + def cache(df: DataFrame): DataFrame = { logger.info(s"opType:{$opType}, tableName:{$tableName}, StorageType:{$storageLevel}") tableName match { From 827f76d395d18b756f15f84b81ef927d300b949e Mon Sep 17 00:00:00 2001 From: pradeepmaripala Date: Thu, 5 Oct 2023 17:49:40 +0530 Subject: [PATCH 3/3] updated libs and sbt to latest versions for 3.3 --- build.sbt | 8 ++++---- project/build.properties | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/build.sbt b/build.sbt index 2be06e9b..579380e0 100755 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ lazy val scala213 = "2.13.9" crossScalaVersions := Seq(scala212,scala213) ThisBuild / scalaVersion := scala213 -val sparkVersion = "3.3.0" +val sparkVersion = "3.3.3" val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r val majorVersionReg(majorVersion) = sparkVersion @@ -19,10 +19,10 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided", "org.apache.spark" %% "spark-avro" % sparkVersion, - "com.databricks" %% "spark-xml" % "0.14.0", + "com.databricks" %% "spark-xml" % "0.17.0", "com.github.music-of-the-ainur" %% "quenya-dsl" % s"1.2.3-${majorVersion}", - "org.scalatest" %% "scalatest" % "3.2.14" % "test", - "org.postgresql" % "postgresql" % "42.2.8" % "test" + "org.scalatest" %% "scalatest" % "3.2.17" % "test", + "org.postgresql" % "postgresql" % "42.6.0" % "test" ) 
enablePlugins(GitVersioning) diff --git a/project/build.properties b/project/build.properties index 22af2628..27430827 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.7.1 +sbt.version=1.9.6