feat(config): allow reading configuration files remotely (s3, HDFS) (#329)

* feat(config): allow reading configuration files remotely (s3, HDFS)

lyogev authored May 21, 2020
1 parent 57684ea commit 59a56b5
Showing 5 changed files with 27 additions and 9 deletions.
11 changes: 11 additions & 0 deletions README.md
@@ -74,6 +74,17 @@ There are currently 3 options to run Metorikku.
 * Run the following command:
 `spark-submit --class com.yotpo.metorikku.Metorikku metorikku.jar -c config.yaml`
 
+*Running with remote job/metric files:*
+
+Metorikku supports reading job/metric files from remote storage.
+
+Simply use the full path to the job/metric file, for example: `s3://bucket/job.yaml`
+
+Any filesystem supported by Hadoop can be used (S3, HDFS, etc.).
+
+To make it easier to run both locally and remotely, you can set the following environment variable at runtime to add a prefix to all configuration file paths:
+`CONFIG_FILES_PATH_PREFIX=s3://bucket/`
+
 #### Run locally
 *Metorikku is released with a JAR that includes a bundled spark.*
 * Download the [last released Standalone JAR](https://github.com/YotpoLtd/metorikku/releases/latest)
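For illustration, a hypothetical invocation combining the two additions above — the bucket and file names are placeholders, and this assumes `job.yaml` is stored under `s3://bucket/`:

`CONFIG_FILES_PATH_PREFIX=s3://bucket/ spark-submit --class com.yotpo.metorikku.Metorikku metorikku.jar -c job.yaml`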
1 change: 1 addition & 1 deletion ConfigurationParser.scala
@@ -26,7 +26,7 @@ object ConfigurationParser {
 
     log.info(s"Initializing Metric file $fileName")
     try {
-      val metricConfig = parseFile(path.getAbsolutePath)
+      val metricConfig = parseFile(path.getPath)
       Metric(metricConfig, metricDir, FilenameUtils.removeExtension(fileName))
     } catch {
       case e: Exception => throw MetorikkuInvalidMetricFileException(s"Failed to parse metric file $fileName", e)
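A small sketch of the likely motivation for switching from `getAbsolutePath` to `getPath` above: resolving a metric path against the local working directory would break remote resolution, while the path as given can still be prefixed and handed to a remote filesystem. This is an illustrative example, not code from the commit; the file name is made up:

```scala
import java.io.File

object PathResolutionSketch {
  def main(args: Array[String]): Unit = {
    val metricFile = new File("metrics/metric.yaml")

    // getAbsolutePath resolves against the local working directory,
    // e.g. "/home/user/metrics/metric.yaml" -- prepending a remote
    // prefix such as "s3://bucket/" to that yields a broken URI.
    println(metricFile.getAbsolutePath)

    // getPath returns the path exactly as constructed, so the
    // remote-aware reader can still prepend CONFIG_FILES_PATH_PREFIX.
    println(metricFile.getPath) // prints "metrics/metric.yaml"
  }
}
```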
1 change: 1 addition & 1 deletion FileInputBase.scala
@@ -44,7 +44,7 @@ trait FileInputBase {
   def getSchemaStruct(schemaPath: Option[String], sparkSession: SparkSession): Option[StructType] = {
     schemaPath match {
       case Some(path) => {
-        Option(SchemaConverter.convert(FileUtils.readFileWithHadoop(path, sparkSession)))
+        Option(SchemaConverter.convert(FileUtils.readFileWithHadoop(path)))
       }
       case None => None
     }
1 change: 1 addition & 1 deletion StepFactory.scala
@@ -18,7 +18,7 @@ object StepFactory {
     configuration.file match {
       case Some(filePath) =>
         Sql(
-          FileUtils.getContentFromFileAsString(new File(metricDir, filePath)),
+          FileUtils.readConfigurationFile(new File(metricDir, filePath).getPath),
           configuration.dataFrameName, showPreviewLines, cacheOnPreview, showQuery
         )
       case None => {
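With this change, SQL step files go through `readConfigurationFile` and therefore get the same `${...}` variable interpolation as other configuration files (see the FileUtils diff below). A minimal, self-contained sketch of that interpolation, assuming Apache `commons-text` on the classpath; the SQL string and the `PROCESS_DATE` variable are invented for the example:

```scala
import scala.collection.JavaConverters._

import org.apache.commons.text.StringSubstitutor

object InterpolationSketch {
  def main(args: Array[String]): Unit = {
    // A made-up SQL step containing a ${...} placeholder.
    val sql = "SELECT * FROM events WHERE dt = '${PROCESS_DATE}'"

    // Merge JVM system properties and environment variables, as
    // readConfigurationFile does, and use them as the substitution map.
    val vars = (System.getProperties().asScala ++ System.getenv().asScala).asJava

    // Replaces ${PROCESS_DATE} when it is defined; leaves it intact otherwise.
    println(StringSubstitutor.replace(sql, vars))
  }
}
```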
19 changes: 13 additions & 6 deletions src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala
@@ -1,6 +1,7 @@
 package com.yotpo.metorikku.utils
 
 import java.io.{BufferedReader, File, FileNotFoundException, InputStreamReader}
+import java.net.URI
 import java.util.stream.Collectors
 
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -58,17 +59,23 @@ object FileUtils {
   }
 
   def readConfigurationFile(path: String): String = {
-    val fileContents = Source.fromFile(path).getLines.mkString("\n")
-    val interpolationMap = System.getProperties().asScala ++= System.getenv().asScala
-    StringSubstitutor.replace(fileContents, interpolationMap.asJava)
+    val envAndSystemProperties = System.getProperties().asScala ++= System.getenv().asScala
+    val prefix = envAndSystemProperties.get("CONFIG_FILES_PATH_PREFIX") match {
+      case Some(prefix) => prefix
+      case _ => ""
+    }
+
+    val fileContents = readFileWithHadoop(prefix + path)
+    StringSubstitutor.replace(fileContents, envAndSystemProperties.asJava)
   }
 
-  def readFileWithHadoop(path: String, sparkSession: SparkSession): String = {
+  def readFileWithHadoop(path: String): String = {
+    val hadoopConf = SparkSession.builder().getOrCreate().sessionState.newHadoopConf()
+
     val file = new Path(path)
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val fs = file.getFileSystem(hadoopConf)
     val fsFile = fs.open(file)
     val reader = new BufferedReader(new InputStreamReader(fsFile))
-    reader.lines.collect(Collectors.joining)
+    reader.lines.collect(Collectors.joining("\n"))
   }
 }
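To make the new reading path concrete, here is a self-contained sketch of reading a file through the Hadoop `FileSystem` API the way the reworked `readFileWithHadoop` does: the filesystem implementation is resolved from the path's scheme, so `file://`, `hdfs://` and `s3://` all take the same route. The path below is a placeholder, and reading from S3 additionally assumes the matching Hadoop S3 connector on the classpath:

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.util.stream.Collectors

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object RemoteReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("remote-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // Placeholder path; any scheme Hadoop understands works here.
    val file = new Path("hdfs:///configs/job.yaml")

    // The FileSystem implementation is chosen from the path's scheme.
    val fs = file.getFileSystem(spark.sessionState.newHadoopConf())

    // Read the file and re-join its lines, as readFileWithHadoop does.
    val reader = new BufferedReader(new InputStreamReader(fs.open(file)))
    try println(reader.lines.collect(Collectors.joining("\n")))
    finally reader.close()

    spark.stop()
  }
}
```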
