fix(configuration): fix remote file loading (#333)
lyogev authored May 24, 2020
1 parent 59a56b5 commit 9d5058d
Showing 10 changed files with 72 additions and 62 deletions.
build.sbt: 5 changes (4 additions & 1 deletion)
@@ -28,6 +28,8 @@ lazy val excludeAvro = ExclusionRule(organization = "org.apache.avro", name = "a
 lazy val excludeSpark = ExclusionRule(organization = "org.apache.spark")
 lazy val excludeFasterXML = ExclusionRule(organization = "com.fasterxml.jackson.module", name= "jackson-module-scala_2.12")
 lazy val excludeMetricsCore = ExclusionRule(organization = "io.dropwizard.metrics", name= "metrics-core")
+lazy val excludeLog4j = ExclusionRule(organization = "org.apache.logging.log4j")
+lazy val excludeParquet = ExclusionRule(organization = "org.apache.parquet")
 
 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
@@ -65,7 +67,8 @@ libraryDependencies ++= Seq(
"org.apache.hudi" %% "hudi-spark-bundle" % "0.5.2-incubating" % "provided" excludeAll excludeFasterXML,
"org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided",
"org.apache.avro" % "avro" % "1.8.2" % "provided",
"org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll)
"org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll, excludeLog4j, excludeParquet),
"org.apache.hadoop" % "hadoop-aws" % "2.7.3" % "provided"
)

// Temporary fix for https://github.com/databricks/spark-redshift/issues/315#issuecomment-285294306
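The new `hadoop-aws` dependency is what supplies an S3 implementation to Hadoop's `FileSystem` API, which the remote file loading below relies on; the new exclusion rules strip hive-jdbc's transitive log4j and parquet artifacts, presumably to avoid clashes with the versions Spark ships. A minimal sketch of the lookup this enables (bucket and key are hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// With hadoop-aws (and its AWS SDK) on the classpath, the s3a:// scheme resolves
// to S3AFileSystem; without it, getFileSystem fails with "No FileSystem for scheme".
val path = new Path("s3a://my-bucket/metrics/movies_metric.yaml") // hypothetical path
val fs = path.getFileSystem(new Configuration())
```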
examples/movies_metric.yaml: 10 changes (1 addition & 9 deletions)
@@ -19,15 +19,7 @@ steps:
     FROM moviesWithRatings
     WHERE genres LIKE '%Fantasy%'
 - dataFrameName: topFantasyMovies
-  sql:
-    SELECT movieId,
-    title,
-    avg(rating) AS averageRating
-    FROM fantasyMoviesWithRatings
-    GROUP BY movieId,
-    title
-    ORDER BY averageRating DESC
-    LIMIT 100
+  file: topFantasyMovies.sql
 - dataFrameName: myFavoriteMovieRated
   sql:
     SELECT *
examples/topFantasyMovies.sql: 8 changes (8 additions & 0 deletions)
@@ -0,0 +1,8 @@
+SELECT movieId,
+title,
+avg(rating) AS averageRating
+FROM fantasyMoviesWithRatings
+GROUP BY movieId,
+title
+ORDER BY averageRating DESC
+LIMIT 100
src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala
@@ -1,7 +1,5 @@
 package com.yotpo.metorikku.configuration.job
 
-import java.nio.file.{Files, Paths}
-
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.yotpo.metorikku.exceptions.{MetorikkuException, MetorikkuInvalidMetricFileException}
@@ -22,14 +20,6 @@ object ConfigurationParser {
     opt[String]('c', "config")
       .text("Path to the job config file (YAML/JSON)")
       .action((x, c) => c.copy(filename = Option(x)))
-      .validate(x => {
-        if (Files.exists(Paths.get(x))) {
-          success
-        }
-        else {
-          failure("Supplied file not found")
-        }
-      })
     help("help") text "use command line arguments to specify the configuration file path or content"
   }
 

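Dropping the scopt `validate` block is the crux of this file: `Files.exists` consults only the local filesystem, so a perfectly readable remote config was rejected before Spark ever saw it. Existence is now checked lazily, when the file is actually opened through Hadoop. A sketch of the old failure mode (path hypothetical):

```scala
import java.nio.file.{Files, Paths}

// The removed eager check: a remote URI is just an odd-looking relative path to
// java.nio, so this is false even when Hadoop could read the object fine.
val remoteConfig = "s3a://my-bucket/job.yaml" // hypothetical path
println(Files.exists(Paths.get(remoteConfig))) // false
```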
src/main/scala/com/yotpo/metorikku/configuration/metric/ConfigurationParser.scala
@@ -1,6 +1,6 @@
 package com.yotpo.metorikku.configuration.metric
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
 
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.yotpo.metorikku.exceptions.MetorikkuInvalidMetricFileException
@@ -20,15 +20,20 @@ object ConfigurationParser {
     validExtensions.contains(extension)
   }
 
-  def parse(path: File): Metric = {
-    val fileName: String = path.getName
-    val metricDir: File = path.getParentFile
+  def parse(path: String): Metric = {
+    val hadoopPath = FileUtils.getHadoopPath(path)
+    val fileName = hadoopPath.getName
+    val metricDir = FileUtils.isLocalFile(path) match {
+      case true => Option(new File(path).getParentFile)
+      case false => None
+    }
 
     log.info(s"Initializing Metric file $fileName")
     try {
-      val metricConfig = parseFile(path.getPath)
+      val metricConfig = parseFile(path)
       Metric(metricConfig, metricDir, FilenameUtils.removeExtension(fileName))
     } catch {
+      case e: FileNotFoundException => throw e
       case e: Exception => throw MetorikkuInvalidMetricFileException(s"Failed to parse metric file $fileName", e)
     }
   }
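With `parse` keyed on a plain `String`, one entry point now serves local and remote metrics alike; `metricDir`, which later anchors relative `file:` steps, only exists in the local case. A usage sketch (paths hypothetical; the remote case assumes an active SparkSession, which `FileUtils.getHadoopPath` uses to build its Hadoop configuration):

```scala
// Local metric: metricDir = Some(<parent directory>), so step files resolve against it.
val local = ConfigurationParser.parse("examples/movies_metric.yaml")

// Remote metric: metricDir = None, so step file paths are taken verbatim.
val remote = ConfigurationParser.parse("s3a://my-bucket/metrics/movies_metric.yaml")
```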
src/main/scala/com/yotpo/metorikku/metric/Metric.scala: 8 changes (2 additions & 6 deletions)
@@ -1,26 +1,22 @@
 package com.yotpo.metorikku.metric
 
-import java.io.File
 import java.util.concurrent.TimeUnit
 
 import com.yotpo.metorikku.Job
 import com.yotpo.metorikku.configuration.job.Streaming
 import com.yotpo.metorikku.configuration.metric.{Configuration, Output}
 import com.yotpo.metorikku.configuration.metric.OutputType.OutputType
 import com.yotpo.metorikku.exceptions.{MetorikkuFailedStepException, MetorikkuWriteFailedException}
 import com.yotpo.metorikku.instrumentation.InstrumentationProvider
 import com.yotpo.metorikku.output.{Writer, WriterFactory}
 import org.apache.log4j.LogManager
-import org.apache.spark.sql.types.TimestampType
-import org.apache.spark.sql.{DataFrame, Dataset}
-import org.apache.spark.streaming.Seconds
+import org.apache.spark.sql.DataFrame
 
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 case class StreamingWritingConfiguration(dataFrame: DataFrame, outputConfig: Output, writers: ListBuffer[Writer] = ListBuffer.empty)
 case class StreamingWriting(streamingWritingConfiguration: StreamingWritingConfiguration)
-case class Metric(configuration: Configuration, metricDir: File, metricName: String) {
+case class Metric(configuration: Configuration, metricDir: Option[File], metricName: String) {
   val log = LogManager.getLogger(this.getClass)
 
   def calculate(job: Job): Unit = {
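Since `metricDir` is now an `Option[File]`, every consumer must handle the remote case where no local parent directory exists. A simplified, self-contained stand-in for the shape of the change (not the real `Metric`, which also carries its parsed configuration):

```scala
import java.io.File

case class MetricRef(metricDir: Option[File], metricName: String) // illustrative stand-in

val localRef  = MetricRef(Some(new File("examples")), "movies_metric") // parsed from disk
val remoteRef = MetricRef(None, "movies_metric")                       // parsed from a remote URI
```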
src/main/scala/com/yotpo/metorikku/metric/MetricSet.scala: 10 changes (8 additions & 2 deletions)
@@ -26,8 +26,14 @@ class MetricSet(metricSet: String, write: Boolean = true) {

   def parseMetrics(metricSet: String): Seq[Metric] = {
     log.info(s"Starting to parse metricSet")
-    val metricsToCalculate = FileUtils.getListOfFiles(metricSet)
-    metricsToCalculate.filter(ConfigurationParser.isValidFile(_)).map(ConfigurationParser.parse(_))
+
+    FileUtils.isLocalDirectory(metricSet) match {
+      case true => {
+        val metricsToCalculate = FileUtils.getListOfLocalFiles(metricSet)
+        metricsToCalculate.filter(ConfigurationParser.isValidFile(_)).map(f => ConfigurationParser.parse(f.getPath))
+      }
+      case false => Seq(ConfigurationParser.parse(metricSet))
+    }
   }
 
   def run(job: Job) {
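`parseMetrics` now branches on a single question: is the metric-set path a local directory? If so it is expanded to its valid metric files, exactly as before; anything else, a single local file or a remote URI, goes to the parser whole. A construction sketch (paths hypothetical; the remote case ultimately needs an active SparkSession):

```scala
val localSet  = new MetricSet("examples/")                                  // directory: parse each valid file
val remoteSet = new MetricSet("s3a://my-bucket/metrics/movies_metric.yaml") // single remote file
```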
src/main/scala/com/yotpo/metorikku/metric/StepFactory.scala
@@ -9,16 +9,20 @@ import com.yotpo.metorikku.metric.stepActions.Code
 import com.yotpo.metorikku.utils.FileUtils
 
 object StepFactory {
-  def getStepAction(configuration: Step, metricDir: File, metricName: String,
+  def getStepAction(configuration: Step, metricDir: Option[File], metricName: String,
                     showPreviewLines: Int, cacheOnPreview: Option[Boolean],
                     showQuery: Option[Boolean]): StepAction[_] = {
     configuration.sql match {
       case Some(expression) => Sql(expression, configuration.dataFrameName, showPreviewLines, cacheOnPreview, showQuery)
       case None => {
         configuration.file match {
           case Some(filePath) =>
+            val path = metricDir match {
+              case Some(dir) => new File(dir, filePath).getPath
+              case _ => filePath
+            }
             Sql(
-              FileUtils.readConfigurationFile(new File(metricDir, filePath).getPath),
+              FileUtils.readConfigurationFile(path),
               configuration.dataFrameName, showPreviewLines, cacheOnPreview, showQuery
             )
           case None => {
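The resolution rule for `file:` steps falls out of the new `Option`: a local metric resolves relative paths against its own directory, while a remote metric passes the path through untouched. A self-contained sketch of that rule, mirroring the `match` above:

```scala
import java.io.File

def resolveStepFile(metricDir: Option[File], filePath: String): String = metricDir match {
  case Some(dir) => new File(dir, filePath).getPath // local metric: relative to its directory
  case _         => filePath                        // remote metric: use the path as given
}

resolveStepFile(Some(new File("examples")), "topFantasyMovies.sql") // "examples/topFantasyMovies.sql"
resolveStepFile(None, "s3a://my-bucket/topFantasyMovies.sql")       // hypothetical remote path, unchanged
```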
src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala: 58 changes (32 additions & 26 deletions)
@@ -1,24 +1,29 @@
 package com.yotpo.metorikku.utils
 
 import java.io.{BufferedReader, File, FileNotFoundException, InputStreamReader}
-import java.net.URI
 import java.util.stream.Collectors
 
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
 import com.yotpo.metorikku.exceptions.MetorikkuException
 import org.apache.commons.io.FilenameUtils
 import org.apache.commons.text.StringSubstitutor
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
 import org.apache.spark.sql.SparkSession
-import org.json4s.DefaultFormats
-import org.json4s.native.JsonMethods
 
-import scala.io.Source
 import scala.collection.JavaConverters._
 
+case class HadoopPath(path: Path, fs: FileSystem) {
+  def open: FSDataInputStream = {
+    fs.open(path)
+  }
+
+  def getName: String = {
+    path.getName
+  }
+}
+
 object FileUtils {
-  def getListOfFiles(dir: String): List[File] = {
+  def getListOfLocalFiles(dir: String): List[File] = {
     val d = new File(dir)
     if (d.isDirectory) {
       d.listFiles.filter(_.isFile).toList
@@ -29,23 +34,6 @@ object FileUtils {
     }
   }
 
-  def jsonFileToObject[T: Manifest](file: File): T = {
-    implicit val formats = DefaultFormats
-    val jsonString = scala.io.Source.fromFile(file).mkString
-
-    try {
-      val json = JsonMethods.parse(jsonString)
-      json.extract[T]
-    } catch {
-      case cast: ClassCastException => throw MetorikkuException(s"Failed to cast json file " + file, cast)
-      case other: Throwable => throw other
-    }
-  }
-
-  def getContentFromFileAsString(file: File): String = {
-    scala.io.Source.fromFile(file).mkString // By scala.io. on read spark fail with legit error when path does not exists
-  }
-
   def getObjectMapperByExtension(extension: String): Option[ObjectMapper] = {
     extension match {
       case "json" => Option(new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
@@ -69,13 +57,31 @@
     StringSubstitutor.replace(fileContents, envAndSystemProperties.asJava)
   }
 
-  def readFileWithHadoop(path: String): String = {
+  def getHadoopPath(path: String): HadoopPath = {
     val hadoopConf = SparkSession.builder().getOrCreate().sessionState.newHadoopConf()
 
     val file = new Path(path)
 
     val fs = file.getFileSystem(hadoopConf)
-    val fsFile = fs.open(file)
+    HadoopPath(file, fs)
+  }
+
+  def readFileWithHadoop(path: String): String = {
+    val hadoopPath = getHadoopPath(path)
+
+    val fsFile = hadoopPath.open
 
     val reader = new BufferedReader(new InputStreamReader(fsFile))
     reader.lines.collect(Collectors.joining("\n"))
   }
+
+  def isLocalDirectory(path: String): Boolean = {
+    new File(path).isDirectory
+  }
+
+  def isLocalFile(path: String): Boolean = {
+    new File(path).isFile
+  }
 }
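Taken together, `HadoopPath`, the `isLocal*` probes, and the rebuilt `readFileWithHadoop` give callers one façade over both filesystems: cheap `java.io.File` checks decide locality, and everything else is read through whatever `FileSystem` the path's scheme maps to. A usage sketch (path hypothetical; `getHadoopPath` pulls its Hadoop configuration from the active SparkSession, so one must exist):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("remote-read").getOrCreate()

val path = "s3a://my-bucket/metrics/movies_metric.yaml" // hypothetical remote path

// Locality probes are plain java.io.File checks and never touch Hadoop.
assert(!FileUtils.isLocalFile(path) && !FileUtils.isLocalDirectory(path))

// Any other path is opened via the scheme's FileSystem (S3A here) and read line by line.
val contents = FileUtils.readFileWithHadoop(path)
```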
src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala
@@ -42,7 +42,7 @@ class MetorikkuTest extends FunSuite with BeforeAndAfterAll {
     val thrown = intercept[FileNotFoundException] {
       Metorikku.main(Array("-c", "src/test/scala/com/yotpo/metorikku/test/metorikku-test-config-invalid-metrics.yaml"))
     }
-    assert(thrown.getMessage.startsWith("No Files to Run"))
+    assert(thrown.getMessage.endsWith("does not exist"))
 
   }

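The expected message changes because a missing metric now surfaces as Hadoop's own `FileNotFoundException`, whose messages end with "does not exist", instead of the removed "No Files to Run" guard. A sketch of the path the test now exercises (file name hypothetical):

```scala
import java.io.FileNotFoundException

// Inside a ScalaTest suite: reading a nonexistent path through Hadoop's local
// FileSystem raises FileNotFoundException("... does not exist").
val thrown = intercept[FileNotFoundException] {
  FileUtils.readFileWithHadoop("no/such/metric.yaml")
}
assert(thrown.getMessage.endsWith("does not exist"))
```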
