From 59a56b5db188c7be67c13e9e16035fac9a0109ac Mon Sep 17 00:00:00 2001
From: Liran Yogev <23477645+lyogev@users.noreply.github.com>
Date: Thu, 21 May 2020 10:39:14 +0300
Subject: [PATCH] feat(config): allow reading configuration files remotely
 (s3, HDFS) (#329)

* feat(config): allow reading configuration files remotely (s3, HDFS)

* feat(config): allow reading configuration files remotely (s3, HDFS)
---
 README.md                                     | 11 +++++++++++
 .../metric/ConfigurationParser.scala          |  2 +-
 .../input/readers/file/FileInputBase.scala    |  2 +-
 .../metorikku/metric/StepActionFactory.scala  |  2 +-
 .../com/yotpo/metorikku/utils/FileUtils.scala | 19 +++++++++++++------
 5 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 2d0c33a3e..1b56eeecb 100644
--- a/README.md
+++ b/README.md
@@ -74,6 +74,17 @@ There are currently 3 options to run Metorikku.
 * Run the following command:
 `spark-submit --class com.yotpo.metorikku.Metorikku metorikku.jar -c config.yaml`
 
+*Running with remote job/metric files:*
+
+Metorikku supports reading job/metric files from remote locations.
+
+Simply use the full path to the job/metric file, for example: `s3://bucket/job.yaml`
+
+Any filesystem supported by Hadoop can be used (S3, HDFS, etc.).
+
+To make it easier to run both locally and remotely, you can set the following environment variable at runtime to add a prefix to all of your configuration file paths:
+`CONFIG_FILES_PATH_PREFIX=s3://bucket/`
+
 #### Run locally
 *Metorikku is released with a JAR that includes a bundled spark.*
 * Download the [last released Standalone JAR](https://github.com/YotpoLtd/metorikku/releases/latest)
diff --git a/src/main/scala/com/yotpo/metorikku/configuration/metric/ConfigurationParser.scala b/src/main/scala/com/yotpo/metorikku/configuration/metric/ConfigurationParser.scala
index a54aef719..87b6c12f9 100644
--- a/src/main/scala/com/yotpo/metorikku/configuration/metric/ConfigurationParser.scala
+++ b/src/main/scala/com/yotpo/metorikku/configuration/metric/ConfigurationParser.scala
@@ -26,7 +26,7 @@ object ConfigurationParser {
     log.info(s"Initializing Metric file $fileName")
 
     try {
-      val metricConfig = parseFile(path.getAbsolutePath)
+      val metricConfig = parseFile(path.getPath)
       Metric(metricConfig, metricDir, FilenameUtils.removeExtension(fileName))
     } catch {
       case e: Exception => throw MetorikkuInvalidMetricFileException(s"Failed to parse metric file $fileName", e)
diff --git a/src/main/scala/com/yotpo/metorikku/input/readers/file/FileInputBase.scala b/src/main/scala/com/yotpo/metorikku/input/readers/file/FileInputBase.scala
index edea73a03..ef2f6311d 100644
--- a/src/main/scala/com/yotpo/metorikku/input/readers/file/FileInputBase.scala
+++ b/src/main/scala/com/yotpo/metorikku/input/readers/file/FileInputBase.scala
@@ -44,7 +44,7 @@ trait FileInputBase {
   def getSchemaStruct(schemaPath: Option[String], sparkSession: SparkSession): Option[StructType] = {
     schemaPath match {
       case Some(path) => {
-        Option(SchemaConverter.convert(FileUtils.readFileWithHadoop(path, sparkSession)))
+        Option(SchemaConverter.convert(FileUtils.readFileWithHadoop(path)))
       }
       case None => None
     }
diff --git a/src/main/scala/com/yotpo/metorikku/metric/StepActionFactory.scala b/src/main/scala/com/yotpo/metorikku/metric/StepActionFactory.scala
index 293782170..719e5e639 100644
--- a/src/main/scala/com/yotpo/metorikku/metric/StepActionFactory.scala
+++ b/src/main/scala/com/yotpo/metorikku/metric/StepActionFactory.scala
@@ -18,7 +18,7 @@ object StepFactory {
     configuration.file match {
       case Some(filePath) =>
         Sql(
-          FileUtils.getContentFromFileAsString(new File(metricDir, filePath)),
+          FileUtils.readConfigurationFile(new File(metricDir, filePath).getPath),
           configuration.dataFrameName, showPreviewLines, cacheOnPreview, showQuery
         )
       case None => {
diff --git a/src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala b/src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala
index 61245fa82..7b3588068 100644
--- a/src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala
+++ b/src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala
@@ -1,6 +1,7 @@
 package com.yotpo.metorikku.utils
 
 import java.io.{BufferedReader, File, FileNotFoundException, InputStreamReader}
+import java.net.URI
 import java.util.stream.Collectors
 
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -58,17 +59,23 @@ object FileUtils {
   }
 
   def readConfigurationFile(path: String): String = {
-    val fileContents = Source.fromFile(path).getLines.mkString("\n")
-    val interpolationMap = System.getProperties().asScala ++= System.getenv().asScala
-    StringSubstitutor.replace(fileContents, interpolationMap.asJava)
+    val envAndSystemProperties = System.getProperties().asScala ++= System.getenv().asScala
+    val prefix = envAndSystemProperties.get("CONFIG_FILES_PATH_PREFIX") match {
+      case Some(prefix) => prefix
+      case _ => ""
+    }
+
+    val fileContents = readFileWithHadoop(prefix + path)
+    StringSubstitutor.replace(fileContents, envAndSystemProperties.asJava)
   }
 
-  def readFileWithHadoop(path: String, sparkSession: SparkSession): String = {
+  def readFileWithHadoop(path: String): String = {
+    val hadoopConf = SparkSession.builder().getOrCreate().sessionState.newHadoopConf()
+
     val file = new Path(path)
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val fs = file.getFileSystem(hadoopConf)
     val fsFile = fs.open(file)
     val reader = new BufferedReader(new InputStreamReader(fsFile))
-    reader.lines.collect(Collectors.joining)
+    reader.lines.collect(Collectors.joining("\n"))
   }
 }
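
A note on how the patched pieces fit together: `readConfigurationFile` now prepends the optional `CONFIG_FILES_PATH_PREFIX` and delegates to `readFileWithHadoop`, which resolves a `FileSystem` from the path's scheme, so the same code path serves local files, S3, and HDFS. The sketch below is a minimal standalone illustration of that Hadoop-backed read, not part of the patch; it assumes a local Spark setup with the relevant Hadoop filesystem connectors on the classpath (e.g. `hadoop-aws` for `s3://`), and the bucket/path are hypothetical.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.util.stream.Collectors

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object RemoteConfigReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("remote-config-read-example")
      .master("local[*]")
      .getOrCreate()

    // Same mechanism as the patched readFileWithHadoop: derive a Hadoop
    // configuration from the active session, then let the Path's scheme
    // (file://, s3://, hdfs://, ...) select the concrete FileSystem.
    val hadoopConf = spark.sessionState.newHadoopConf()
    val file = new Path("s3://bucket/job.yaml") // hypothetical remote path
    val fs = file.getFileSystem(hadoopConf)

    val reader = new BufferedReader(new InputStreamReader(fs.open(file)))
    try {
      // Join with "\n" so multi-line YAML survives, mirroring the
      // Collectors.joining("\n") fix in this patch.
      val contents = reader.lines.collect(Collectors.joining("\n"))
      println(contents)
    } finally {
      reader.close()
    }
  }
}
```

With `CONFIG_FILES_PATH_PREFIX=s3://bucket/` exported, invoking Metorikku with `-c job.yaml` resolves the file as `s3://bucket/job.yaml`; leaving the variable unset keeps local relative paths working unchanged.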