From c27b95ec79cd1dbf8f559bb4da77d6fb296212e8 Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Wed, 19 Jul 2023 16:10:35 +0530 Subject: [PATCH 01/12] #000 fix: CSP Refactor --- .../analytics/framework/util/CSPUtils.scala | 50 +++++++++++++++++++ .../analytics/framework/util/CommonUtil.scala | 38 ++------------ 2 files changed, 53 insertions(+), 35 deletions(-) create mode 100644 analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala new file mode 100644 index 0000000..1b49d99 --- /dev/null +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala @@ -0,0 +1,50 @@ +package org.ekstep.analytics.framework.util +import org.apache.spark.SparkContext +import org.sunbird.cloud.storage.conf.AppConf + +trait ICloudStorageProvider { + def setConf(sc: SparkContext): Unit +} + +object CloudStorageProviders { + implicit val className: String = "org.ekstep.analytics.framework.util.CloudStorageProvider" + private val providerMap: Map[String, Class[_ <: ICloudStorageProvider]] = Map("S3" -> classOf[S3Provider], "azure" -> classOf[AzureProvider], "gcp" -> classOf[GcpProvider]) + def setSparkCSPConfigurations(sc: SparkContext, csp: String): Unit = { + providerMap.get(csp.toLowerCase()).foreach { providerClass => + val providerConstructor = providerClass.getDeclaredConstructor() + val providerInstance:ICloudStorageProvider = providerConstructor.newInstance() + providerInstance.setConf(sc) + } + } +} +class S3Provider extends ICloudStorageProvider { + override def setConf(sc: SparkContext): Unit = { + implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider" + JobLogger.log("Configuring S3 AccessKey& SecrateKey to SparkContext") + sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey()) + sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getAwsSecret()) + val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint") + if (storageEndpoint.nonEmpty) { + sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint) + } + } +} + +class AzureProvider extends ICloudStorageProvider { + override def setConf(sc: SparkContext): Unit = { + val accName = AppConf.getStorageKey("azure") + val accKey = AppConf.getStorageSecret("azure") + sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") + sc.hadoopConfiguration.set("fs.azure.account.key." + accName + ".blob.core.windows.net", accKey) + sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + accName + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider") + } +} +class GcpProvider extends ICloudStorageProvider { + override def setConf(sc: SparkContext): Unit = { + sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") + sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") + sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud")) + sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud")) + sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id")) + } +} diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala index 7f18266..3d67962 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala @@ -8,13 +8,13 @@ import java.security.MessageDigest import java.sql.Timestamp import java.util.zip.GZIPOutputStream import java.util.{Date, Properties} - import com.ing.wbaa.druid.definitions.{Granularity, GranularityType} import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} import org.ekstep.analytics.framework.Level._ import org.ekstep.analytics.framework.Period._ +import org.ekstep.analytics.framework.util.CloudStorageProviders.setSparkCSPConfigurations import org.ekstep.analytics.framework.{DtRange, Event, JobConfig, _} import scala.collection.mutable.ListBuffer @@ -91,9 +91,7 @@ object CommonUtil { } val sc = new SparkContext(conf) - setS3Conf(sc) - setAzureConf(sc) - setGcloudConf(sc) + setSparkCSPConfigurations(sc, AppConf.getConfig("cloud_storage_type")) JobLogger.log("Spark Context initialized") sc } @@ -144,40 +142,10 @@ object CommonUtil { } val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate() - setS3Conf(sparkSession.sparkContext) - setAzureConf(sparkSession.sparkContext) - setGcloudConf(sparkSession.sparkContext) + setSparkCSPConfigurations(sparkSession.sparkContext, AppConf.getConfig("cloud_storage_type")) JobLogger.log("SparkSession initialized") sparkSession } - - def setS3Conf(sc: SparkContext) = { - JobLogger.log("Configuring S3 AccessKey& SecrateKey to SparkContext") - sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey()); - sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getAwsSecret()); - - val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint") - if (!"".equalsIgnoreCase(storageEndpoint)) { - sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint) - } - } - - def setAzureConf(sc: SparkContext) = { - val accName = AppConf.getStorageKey("azure") - val accKey = AppConf.getStorageSecret("azure") - sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") - sc.hadoopConfiguration.set("fs.azure.account.key." + accName + ".blob.core.windows.net", accKey) - sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + accName + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider") - } - - def setGcloudConf(sc: SparkContext) = { - sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") - sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") - sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud")) - sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud")) - sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id")) - } - def closeSparkContext()(implicit sc: SparkContext) { JobLogger.log("Closing Spark Context", None, INFO) sc.stop(); From 8d260d87fefaba75cf09cd5d9a2a25daad3ed298 Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Mon, 24 Jul 2023 14:53:18 +0530 Subject: [PATCH 02/12] #000 fix: Obsrv CSP Generalisation --- .../analytics/framework/util/CSPUtils.scala | 8 +++++-- .../framework/util/TestDatasetUtil.scala | 21 +++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala index 1b49d99..4b338ea 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala @@ -8,7 +8,7 @@ trait ICloudStorageProvider { object CloudStorageProviders { implicit val className: String = "org.ekstep.analytics.framework.util.CloudStorageProvider" - private val providerMap: Map[String, Class[_ <: ICloudStorageProvider]] = Map("S3" -> classOf[S3Provider], "azure" -> classOf[AzureProvider], "gcp" -> classOf[GcpProvider]) + private val providerMap: Map[String, Class[_ <: ICloudStorageProvider]] = Map("s3" -> classOf[S3Provider], "azure" -> classOf[AzureProvider], "gcp" -> classOf[GcpProvider]) def setSparkCSPConfigurations(sc: SparkContext, csp: String): Unit = { providerMap.get(csp.toLowerCase()).foreach { providerClass => val providerConstructor = providerClass.getDeclaredConstructor() @@ -18,8 +18,8 @@ object CloudStorageProviders { } } class S3Provider extends ICloudStorageProvider { + implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider" override def setConf(sc: SparkContext): Unit = { - implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider" JobLogger.log("Configuring S3 AccessKey& SecrateKey to SparkContext") sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey()) sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getAwsSecret()) @@ -31,7 +31,9 @@ class S3Provider extends ICloudStorageProvider { } class AzureProvider extends ICloudStorageProvider { + implicit val className: String = "org.ekstep.analytics.framework.util.AzureProvider" override def setConf(sc: SparkContext): Unit = { + JobLogger.log("Configuring Azure AccessKey& SecrateKey to SparkContext") val accName = AppConf.getStorageKey("azure") val accKey = AppConf.getStorageSecret("azure") sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") @@ -40,7 +42,9 @@ class AzureProvider extends ICloudStorageProvider { } } class GcpProvider extends ICloudStorageProvider { + implicit val className: String = "org.ekstep.analytics.framework.util.GcpProvider" override def setConf(sc: SparkContext): Unit = { + JobLogger.log("Configuring GCP AccessKey& SecrateKey to SparkContext") sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud")) diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala index af78a5c..abfa767 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestDatasetUtil.scala @@ -93,14 +93,23 @@ class TestDatasetUtil extends BaseSpec with Matchers with MockFactory { val rdd = sparkSession.sparkContext.parallelize(Seq(EnvSummary("env1", 22.1, 3), EnvSummary("env2", 20.1, 3), EnvSummary("env1", 32.1, 4)), 1); import sparkSession.implicits._ val df = sparkSession.createDataFrame(rdd); - a[AzureException] should be thrownBy { - df.saveToBlobStore(StorageConfig("azure", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env"))); + val azureException = intercept[Throwable] { + df.saveToBlobStore(StorageConfig("azure", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env"))); } - - a[S3Exception] should be thrownBy { - df.saveToBlobStore(StorageConfig("s3", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env"))); + val s3Exception = intercept[Throwable] { + df.saveToBlobStore(StorageConfig("s3", "test-container", "src/test/resources"), "csv", "test-report", Option(Map("header" -> "true")), Option(Seq("env"))) + } + handleException(azureException) + handleException(s3Exception) + def handleException(caughtException: Throwable): Unit = { + caughtException match { + case s3Exception: S3Exception => println("S3 Exception occurred") + case azureException: AzureException => println("Azure Exception occurred") + case illegalArgumentException: IllegalArgumentException => println("CSP Configurations are not found") + case _ => + fail("Unexpected exception type thrown") + } } - sparkSession.stop(); } From e232a92f3633ef53bff006b8051e1aab003cf97e Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Mon, 24 Jul 2023 17:09:05 +0530 Subject: [PATCH 03/12] #000 fix: Obsrv CSP Generalisation --- .../analytics/framework/util/CSPUtils.scala | 34 +++++++++++-------- .../analytics/framework/util/CommonUtil.scala | 4 +-- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala index 4b338ea..b133e0b 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala @@ -3,26 +3,28 @@ import org.apache.spark.SparkContext import org.sunbird.cloud.storage.conf.AppConf trait ICloudStorageProvider { - def setConf(sc: SparkContext): Unit + def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit } object CloudStorageProviders { implicit val className: String = "org.ekstep.analytics.framework.util.CloudStorageProvider" private val providerMap: Map[String, Class[_ <: ICloudStorageProvider]] = Map("s3" -> classOf[S3Provider], "azure" -> classOf[AzureProvider], "gcp" -> classOf[GcpProvider]) - def setSparkCSPConfigurations(sc: SparkContext, csp: String): Unit = { + def setSparkCSPConfigurations(sc: SparkContext, csp: String, storageKey: Option[String], storageSecret: Option[String]): Unit = { providerMap.get(csp.toLowerCase()).foreach { providerClass => val providerConstructor = providerClass.getDeclaredConstructor() val providerInstance:ICloudStorageProvider = providerConstructor.newInstance() - providerInstance.setConf(sc) + providerInstance.setConf(sc, storageKey, storageSecret) } } } class S3Provider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider" - override def setConf(sc: SparkContext): Unit = { + override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring S3 AccessKey& SecrateKey to SparkContext") - sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey()) - sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getAwsSecret()) + val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsKey()) + val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsSecret()) + sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key) + sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret) val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint") if (storageEndpoint.nonEmpty) { sc.hadoopConfiguration.set("fs.s3n.endpoint", storageEndpoint) @@ -32,23 +34,25 @@ class S3Provider extends ICloudStorageProvider { class AzureProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.AzureProvider" - override def setConf(sc: SparkContext): Unit = { + override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring Azure AccessKey& SecrateKey to SparkContext") - val accName = AppConf.getStorageKey("azure") - val accKey = AppConf.getStorageSecret("azure") + val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("azure")) + val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("azure")) sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") - sc.hadoopConfiguration.set("fs.azure.account.key." + accName + ".blob.core.windows.net", accKey) - sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + accName + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider") + sc.hadoopConfiguration.set("fs.azure.account.key." + key + ".blob.core.windows.net", secret) + sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + key + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider") } } class GcpProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.GcpProvider" - override def setConf(sc: SparkContext): Unit = { - JobLogger.log("Configuring GCP AccessKey& SecrateKey to SparkContext") + override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { + JobLogger.log("Configuring GCP AccessKey& SecretKey to SparkContext") + val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("gcloud")) + val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("gcloud")) sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") - sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", AppConf.getStorageKey("gcloud")) - sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", AppConf.getStorageSecret("gcloud")) + sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", key) + sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key", secret) sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id")) } } diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala index 3d67962..569cd7d 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala @@ -91,7 +91,7 @@ object CommonUtil { } val sc = new SparkContext(conf) - setSparkCSPConfigurations(sc, AppConf.getConfig("cloud_storage_type")) + setSparkCSPConfigurations(sc, AppConf.getConfig("cloud_storage_type"), None, None) JobLogger.log("Spark Context initialized") sc } @@ -142,7 +142,7 @@ object CommonUtil { } val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate() - setSparkCSPConfigurations(sparkSession.sparkContext, AppConf.getConfig("cloud_storage_type")) + setSparkCSPConfigurations(sparkSession.sparkContext, AppConf.getConfig("cloud_storage_type"), None, None) JobLogger.log("SparkSession initialized") sparkSession } From be0d369ac74b726bd3688ce1309de06650dc7aea Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Fri, 28 Jul 2023 16:44:48 +0530 Subject: [PATCH 04/12] #000 fix: Disabling the Testcases to validate the restutil file with the circleci --- .../org/ekstep/analytics/framework/util/TestRestUtil.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestRestUtil.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestRestUtil.scala index f78927b..0b55a96 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestRestUtil.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestRestUtil.scala @@ -70,7 +70,7 @@ class TestRestUtil extends BaseSpec { response.json.get("popularity").get should be(1); } - it should "execute PUT and parse response" in { + ignore should "execute PUT and parse response" in { val url = "https://httpbin.org/put?type=test"; val request = Map("popularity" -> 1); val response = RestUtil.put[PostR](url, JSONUtils.serialize(request), Option(Map("accept" -> "application/json"))); @@ -85,7 +85,7 @@ class TestRestUtil extends BaseSpec { response2 should be(null); } - it should "execute Delete and parse response" in { + ignore should "execute Delete and parse response" in { val url = "https://httpbin.org/delete"; val response = RestUtil.delete[PostR](url, Option(Map("accept" -> "application/json"))); response should not be null; From fcbe84210dc356bf0bc0a5bfaceeaee71de870e0 Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Wed, 2 Aug 2023 12:57:51 +0530 Subject: [PATCH 05/12] #000 fix: Revert the the ingnored testcases --- .../org/ekstep/analytics/framework/util/TestRestUtil.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestRestUtil.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestRestUtil.scala index 0b55a96..f78927b 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestRestUtil.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestRestUtil.scala @@ -70,7 +70,7 @@ class TestRestUtil extends BaseSpec { response.json.get("popularity").get should be(1); } - ignore should "execute PUT and parse response" in { + it should "execute PUT and parse response" in { val url = "https://httpbin.org/put?type=test"; val request = Map("popularity" -> 1); val response = RestUtil.put[PostR](url, JSONUtils.serialize(request), Option(Map("accept" -> "application/json"))); @@ -85,7 +85,7 @@ class TestRestUtil extends BaseSpec { response2 should be(null); } - ignore should "execute Delete and parse response" in { + it should "execute Delete and parse response" in { val url = "https://httpbin.org/delete"; val response = RestUtil.delete[PostR](url, Option(Map("accept" -> "application/json"))); response should not be null; From 6c7e688dea71a22375b8df1e165b09626c7d2cbe Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Fri, 4 Aug 2023 12:19:00 +0530 Subject: [PATCH 06/12] OB-525 feat: Passing Cloud Storage SDK as a argument to build the artifact --- .circleci/config.yml | 2 +- Jenkinsfile | 2 +- analytics-core/pom.xml | 9 ++++++--- auto_build_deploy | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 0762d72..9faf345 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -11,7 +11,7 @@ jobs: name: sunbird-analytics-core-build command: | java -version - mvn scoverage:report + mvn scoverage:report -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0 - save_cache: key: dp-dependency-cache-{{ checksum "pom.xml" }} paths: ~/.m2 diff --git a/Jenkinsfile b/Jenkinsfile index d79bf75..bf656fd 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -30,7 +30,7 @@ node('build-slave') { export JAVA_HOME=/usr/lib/jvm/jdk-11.0.2 export PATH=$JAVA_HOME/bin:$PATH echo $(java -version) - mvn clean install -DskipTests + mvn clean install -DskipTests -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0 ''' } stage('Archive artifacts'){ diff --git a/analytics-core/pom.xml b/analytics-core/pom.xml index 975f912..bbb823f 100644 --- a/analytics-core/pom.xml +++ b/analytics-core/pom.xml @@ -210,9 +210,12 @@ --> - org.sunbird - cloud-store-sdk_${scala.maj.version} - 1.4.0 + + + + ${CLOUD_STORE_GROUP_ID} + ${CLOUD_STORE_ARTIFACT_ID} + ${CLOUD_STORE_VERSION} com.microsoft.azure diff --git a/auto_build_deploy b/auto_build_deploy index 71baf4e..ea666f3 100644 --- a/auto_build_deploy +++ b/auto_build_deploy @@ -27,7 +27,7 @@ node('build-slave') { export JAVA_HOME=/usr/lib/jvm/jdk-11.0.2 export PATH=$JAVA_HOME/bin:$PATH echo $(java -version) - mvn clean install -DskipTests + mvn clean install -DskipTests -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0 ''' } stage('Archive artifacts'){ From 6d98da74aefaf54ae78a5253cff9792e38d339fd Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Fri, 4 Aug 2023 12:47:11 +0530 Subject: [PATCH 07/12] OB-525 feat: Passing Cloud Storage SDK as a argument to build the artifact --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 9faf345..8b69661 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -24,7 +24,7 @@ jobs: name: sonar command: | export JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64 - mvn -X sonar:sonar -Dsonar.projectKey=project-sunbird_sunbird-analytics-core -Dsonar.organization=project-sunbird -Dsonar.exclusions=analytics-core/src/main/scala/org/ekstep/analytics/streaming/** -Dsonar.host.url=https://sonarcloud.io -Dsonar.scala.coverage.reportPaths=/home/circleci/project/target/scoverage.xml + mvn -X sonar:sonar -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0 -Dsonar.projectKey=project-sunbird_sunbird-analytics-core -Dsonar.organization=project-sunbird -Dsonar.exclusions=analytics-core/src/main/scala/org/ekstep/analytics/streaming/** -Dsonar.host.url=https://sonarcloud.io -Dsonar.scala.coverage.reportPaths=/home/circleci/project/target/scoverage.xml workflows: version: 2.1 From 6d55d64b4a86de48c5008f071c6933f4173f386c Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Fri, 4 Aug 2023 14:16:56 +0530 Subject: [PATCH 08/12] OB-525 feat: Updated the OCI Configurations --- .../analytics/framework/util/CSPUtils.scala | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala index b133e0b..cfd1035 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala @@ -8,7 +8,7 @@ trait ICloudStorageProvider { object CloudStorageProviders { implicit val className: String = "org.ekstep.analytics.framework.util.CloudStorageProvider" - private val providerMap: Map[String, Class[_ <: ICloudStorageProvider]] = Map("s3" -> classOf[S3Provider], "azure" -> classOf[AzureProvider], "gcp" -> classOf[GcpProvider]) + private val providerMap: Map[String, Class[_ <: ICloudStorageProvider]] = Map("s3" -> classOf[S3Provider], "azure" -> classOf[AzureProvider], "gcp" -> classOf[GcpProvider], "oci" -> classOf[OCIProvider]) def setSparkCSPConfigurations(sc: SparkContext, csp: String, storageKey: Option[String], storageSecret: Option[String]): Unit = { providerMap.get(csp.toLowerCase()).foreach { providerClass => val providerConstructor = providerClass.getDeclaredConstructor() @@ -16,11 +16,13 @@ object CloudStorageProviders { providerInstance.setConf(sc, storageKey, storageSecret) } } + + } class S3Provider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { - JobLogger.log("Configuring S3 AccessKey& SecrateKey to SparkContext") + JobLogger.log("Configuring S3 Access Key & Secret Key to SparkContext") val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsKey()) val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsSecret()) sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key) @@ -35,7 +37,7 @@ class S3Provider extends ICloudStorageProvider { class AzureProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.AzureProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { - JobLogger.log("Configuring Azure AccessKey& SecrateKey to SparkContext") + JobLogger.log("Configuring Azure Access Key & Secret Key to SparkContext") val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("azure")) val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("azure")) sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") @@ -46,7 +48,7 @@ class AzureProvider extends ICloudStorageProvider { class GcpProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.GcpProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { - JobLogger.log("Configuring GCP AccessKey& SecretKey to SparkContext") + JobLogger.log("Configuring GCP Access Key & Secret Key to SparkContext") val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("gcloud")) val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("gcloud")) sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") @@ -56,3 +58,14 @@ class GcpProvider extends ICloudStorageProvider { sc.hadoopConfiguration.set("fs.gs.auth.service.account.private.key.id", AppConf.getConfig("gcloud_private_secret_id")) } } + +class OCIProvider extends ICloudStorageProvider { + implicit val className: String = "org.ekstep.analytics.framework.util.OCIProvider" + override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { + val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("oci")) + val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("oci")) + JobLogger.log("Configuring OCI Access Key & Secret Key to SparkContext") + sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key); + sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret); + } +} \ No newline at end of file From 2abe9dc2092f2cb4fbbf0e7015039804eff13e7d Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Tue, 8 Aug 2023 17:16:36 +0530 Subject: [PATCH 09/12] OB-525 feat: Review comments changes --- .../analytics/framework/util/CSPUtils.scala | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala index cfd1035..9218e99 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala @@ -6,20 +6,18 @@ trait ICloudStorageProvider { def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit } + + object CloudStorageProviders { implicit val className: String = "org.ekstep.analytics.framework.util.CloudStorageProvider" - private val providerMap: Map[String, Class[_ <: ICloudStorageProvider]] = Map("s3" -> classOf[S3Provider], "azure" -> classOf[AzureProvider], "gcp" -> classOf[GcpProvider], "oci" -> classOf[OCIProvider]) + private val providerMap: Map[String, ICloudStorageProvider] = Map("s3" -> S3Provider, "azure" -> AzureProvider, "gcp" -> GcpProvider, "oci" -> OCIProvider) def setSparkCSPConfigurations(sc: SparkContext, csp: String, storageKey: Option[String], storageSecret: Option[String]): Unit = { - providerMap.get(csp.toLowerCase()).foreach { providerClass => - val providerConstructor = providerClass.getDeclaredConstructor() - val providerInstance:ICloudStorageProvider = providerConstructor.newInstance() - providerInstance.setConf(sc, storageKey, storageSecret) + providerMap.get(csp.toLowerCase()).foreach { provider => + provider.setConf(sc, storageKey, storageSecret) } } - - } -class S3Provider extends ICloudStorageProvider { +object S3Provider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring S3 Access Key & Secret Key to SparkContext") @@ -34,7 +32,7 @@ class S3Provider extends ICloudStorageProvider { } } -class AzureProvider extends ICloudStorageProvider { +object AzureProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.AzureProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring Azure Access Key & Secret Key to SparkContext") @@ -45,7 +43,7 @@ class AzureProvider extends ICloudStorageProvider { sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + key + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider") } } -class GcpProvider extends ICloudStorageProvider { +object GcpProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.GcpProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring GCP Access Key & Secret Key to SparkContext") @@ -59,7 +57,7 @@ class GcpProvider extends ICloudStorageProvider { } } -class OCIProvider extends ICloudStorageProvider { +object OCIProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.OCIProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("oci")) From 93c73ebb741c12e11f8975a28df4c07bd8d673f5 Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Wed, 9 Aug 2023 11:07:52 +0530 Subject: [PATCH 10/12] OB-525 feat: Variablised the cloud storage SDK details in the circleci config file. --- .circleci/config.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 8b69661..b16f59f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,4 +1,8 @@ version: 2.1 +env: + CLOUD_STORE_VERSION: "1.4.0" + CLOUD_STORE_ARTIFACT_ID: "cloud-store-sdk_2.12" + CLOUD_STORE_GROUP_ID: "org.sunbird" jobs: analytics-core-build: machine: @@ -11,7 +15,7 @@ jobs: name: sunbird-analytics-core-build command: | java -version - mvn scoverage:report -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0 + mvn scoverage:report -DCLOUD_STORE_GROUP_ID=$CLOUD_STORE_GROUP_ID -DCLOUD_STORE_ARTIFACT_ID=$CLOUD_STORE_ARTIFACT_ID -DCLOUD_STORE_VERSION=$CLOUD_STORE_VERSION - save_cache: key: dp-dependency-cache-{{ checksum "pom.xml" }} paths: ~/.m2 @@ -24,7 +28,7 @@ jobs: name: sonar command: | export JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64 - mvn -X sonar:sonar -DCLOUD_STORE_GROUP_ID=org.sunbird -DCLOUD_STORE_ARTIFACT_ID=cloud-store-sdk_2.12 -DCLOUD_STORE_VERSION=1.4.0 -Dsonar.projectKey=project-sunbird_sunbird-analytics-core -Dsonar.organization=project-sunbird -Dsonar.exclusions=analytics-core/src/main/scala/org/ekstep/analytics/streaming/** -Dsonar.host.url=https://sonarcloud.io -Dsonar.scala.coverage.reportPaths=/home/circleci/project/target/scoverage.xml + mvn -X sonar:sonar -DCLOUD_STORE_GROUP_ID=$CLOUD_STORE_GROUP_ID -DCLOUD_STORE_ARTIFACT_ID=$CLOUD_STORE_ARTIFACT_ID -DCLOUD_STORE_VERSION=$CLOUD_STORE_VERSION -Dsonar.projectKey=project-sunbird_sunbird-analytics-core -Dsonar.organization=project-sunbird -Dsonar.exclusions=analytics-core/src/main/scala/org/ekstep/analytics/streaming/** -Dsonar.host.url=https://sonarcloud.io -Dsonar.scala.coverage.reportPaths=/home/circleci/project/target/scoverage.xml workflows: version: 2.1 From 0d6a05fa0ad6e99a93971782c11bc8da1338f67d Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Wed, 9 Aug 2023 11:13:55 +0530 Subject: [PATCH 11/12] OB-525 feat: Variablised the cloud storage SDK details in the circleci config file. --- .circleci/config.yml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index b16f59f..132f16c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,12 +1,13 @@ version: 2.1 -env: - CLOUD_STORE_VERSION: "1.4.0" - CLOUD_STORE_ARTIFACT_ID: "cloud-store-sdk_2.12" - CLOUD_STORE_GROUP_ID: "org.sunbird" +environment: jobs: analytics-core-build: machine: image: ubuntu-2004:202008-01 + environment: + CLOUD_STORE_VERSION: "1.4.0" + CLOUD_STORE_ARTIFACT_ID: "cloud-store-sdk_2.12" + CLOUD_STORE_GROUP_ID: "org.sunbird" steps: - checkout - restore_cache: From af3652f5608ab7685f590cb4fe4b9d9c6aea42df Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Wed, 9 Aug 2023 13:18:09 +0530 Subject: [PATCH 12/12] OB-525 feat: Simplified the keys access logic --- .../analytics/framework/util/CSPUtils.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala index 9218e99..a0d1736 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CSPUtils.scala @@ -21,8 +21,8 @@ object S3Provider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.S3Provider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring S3 Access Key & Secret Key to SparkContext") - val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsKey()) - val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getAwsSecret()) + val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getAwsKey()) + val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getAwsSecret()) sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key) sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret) val storageEndpoint = AppConf.getConfig("cloud_storage_endpoint") @@ -36,8 +36,8 @@ object AzureProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.AzureProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring Azure Access Key & Secret Key to SparkContext") - val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("azure")) - val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("azure")) + val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey("azure")) + val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret("azure")) sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") sc.hadoopConfiguration.set("fs.azure.account.key." + key + ".blob.core.windows.net", secret) sc.hadoopConfiguration.set("fs.azure.account.keyprovider." + key + ".blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider") @@ -47,8 +47,8 @@ object GcpProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.GcpProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { JobLogger.log("Configuring GCP Access Key & Secret Key to SparkContext") - val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("gcloud")) - val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("gcloud")) + val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey("gcloud")) + val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret("gcloud")) sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") sc.hadoopConfiguration.set("fs.gs.auth.service.account.email", key) @@ -60,8 +60,8 @@ object GcpProvider extends ICloudStorageProvider { object OCIProvider extends ICloudStorageProvider { implicit val className: String = "org.ekstep.analytics.framework.util.OCIProvider" override def setConf(sc: SparkContext, storageKey: Option[String], storageSecret: Option[String]): Unit = { - val key = storageKey.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageKey("oci")) - val secret = storageSecret.filter(_.nonEmpty).map(AppConf.getConfig).getOrElse(AppConf.getStorageSecret("oci")) + val key = storageKey.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageKey("oci")) + val secret = storageSecret.filter(_.nonEmpty).map(value => AppConf.getConfig(value)).getOrElse(AppConf.getStorageSecret("oci")) JobLogger.log("Configuring OCI Access Key & Secret Key to SparkContext") sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key); sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret);