diff --git a/build.sbt b/build.sbt
index 36534b2c1..b6473688f 100644
--- a/build.sbt
+++ b/build.sbt
@@ -28,6 +28,8 @@ lazy val excludeAvro = ExclusionRule(organization = "org.apache.avro", name = "a
 lazy val excludeSpark = ExclusionRule(organization = "org.apache.spark")
 lazy val excludeFasterXML = ExclusionRule(organization = "com.fasterxml.jackson.module", name= "jackson-module-scala_2.12")
 lazy val excludeMetricsCore = ExclusionRule(organization = "io.dropwizard.metrics", name= "metrics-core")
+lazy val excludeLog4j = ExclusionRule(organization = "org.apache.logging.log4j")
+lazy val excludeParquet = ExclusionRule(organization = "org.apache.parquet")

 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
@@ -65,7 +67,8 @@ libraryDependencies ++= Seq(
   "org.apache.hudi" %% "hudi-spark-bundle" % "0.5.2-incubating" % "provided" excludeAll excludeFasterXML,
   "org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided",
   "org.apache.avro" % "avro" % "1.8.2" % "provided",
-  "org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll)
+  "org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll, excludeLog4j, excludeParquet),
+  "org.apache.hadoop" % "hadoop-aws" % "2.7.3" % "provided"
 )

 // Temporary fix for https://github.com/databricks/spark-redshift/issues/315#issuecomment-285294306
diff --git a/examples/movies_metric.yaml b/examples/movies_metric.yaml
index 5c90683d0..e90c449e4 100644
--- a/examples/movies_metric.yaml
+++ b/examples/movies_metric.yaml
@@ -19,15 +19,7 @@ steps:
       FROM moviesWithRatings
       WHERE genres LIKE '%Fantasy%'
 - dataFrameName: topFantasyMovies
-  sql:
-    SELECT movieId,
-           title,
-           avg(rating) AS averageRating
-    FROM fantasyMoviesWithRatings
-    GROUP BY movieId,
-             title
-    ORDER BY averageRating DESC
-    LIMIT 100
+  file: topFantasyMovies.sql
 - dataFrameName: myFavoriteMovieRated
   sql:
     SELECT *
diff --git a/examples/topFantasyMovies.sql b/examples/topFantasyMovies.sql
new file mode 100644
index 000000000..242a4c991
--- /dev/null
+++ b/examples/topFantasyMovies.sql
@@ -0,0 +1,8 @@
+SELECT movieId,
+       title,
+       avg(rating) AS averageRating
+FROM fantasyMoviesWithRatings
+GROUP BY movieId,
+         title
+ORDER BY averageRating DESC
+LIMIT 100
\ No newline at end of file
diff --git a/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala b/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala
index 0c3d390e1..41e410ac8 100644
--- a/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala
+++ b/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala
@@ -1,7 +1,5 @@
 package com.yotpo.metorikku.configuration.job

-import java.nio.file.{Files, Paths}
-
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.yotpo.metorikku.exceptions.{MetorikkuException, MetorikkuInvalidMetricFileException}
@@ -22,14 +20,6 @@ object ConfigurationParser {
     opt[String]('c', "config")
       .text("Path to the job config file (YAML/JSON)")
       .action((x, c) => c.copy(filename = Option(x)))
-      .validate(x => {
-        if (Files.exists(Paths.get(x))) {
-          success
-        }
-        else {
-          failure("Supplied file not found")
-        }
-      })
     help("help") text "use command line arguments to specify the configuration file path or content"
   }

diff --git a/src/main/scala/com/yotpo/metorikku/configuration/metric/ConfigurationParser.scala b/src/main/scala/com/yotpo/metorikku/configuration/metric/ConfigurationParser.scala
index 87b6c12f9..ed095096c 100644
--- a/src/main/scala/com/yotpo/metorikku/configuration/metric/ConfigurationParser.scala
+++ b/src/main/scala/com/yotpo/metorikku/configuration/metric/ConfigurationParser.scala
@@ -1,6 +1,6 @@
 package com.yotpo.metorikku.configuration.metric

-import java.io.File
+import java.io.{File, FileNotFoundException}

 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.yotpo.metorikku.exceptions.MetorikkuInvalidMetricFileException
@@ -20,15 +20,20 @@ object ConfigurationParser {
     validExtensions.contains(extension)
   }

-  def parse(path: File): Metric = {
-    val fileName: String = path.getName
-    val metricDir: File = path.getParentFile
+  def parse(path: String): Metric = {
+    val hadoopPath = FileUtils.getHadoopPath(path)
+    val fileName = hadoopPath.getName
+    val metricDir = FileUtils.isLocalFile(path) match {
+      case true => Option(new File(path).getParentFile)
+      case false => None
+    }

     log.info(s"Initializing Metric file $fileName")
     try {
-      val metricConfig = parseFile(path.getPath)
+      val metricConfig = parseFile(path)
       Metric(metricConfig, metricDir, FilenameUtils.removeExtension(fileName))
     } catch {
+      case e: FileNotFoundException => throw e
       case e: Exception => throw MetorikkuInvalidMetricFileException(s"Failed to parse metric file $fileName", e)
     }
   }
diff --git a/src/main/scala/com/yotpo/metorikku/metric/Metric.scala b/src/main/scala/com/yotpo/metorikku/metric/Metric.scala
index bb19afe3d..59f6ca70b 100644
--- a/src/main/scala/com/yotpo/metorikku/metric/Metric.scala
+++ b/src/main/scala/com/yotpo/metorikku/metric/Metric.scala
@@ -1,26 +1,22 @@
 package com.yotpo.metorikku.metric

 import java.io.File
-import java.util.concurrent.TimeUnit

 import com.yotpo.metorikku.Job
 import com.yotpo.metorikku.configuration.job.Streaming
 import com.yotpo.metorikku.configuration.metric.{Configuration, Output}
-import com.yotpo.metorikku.configuration.metric.OutputType.OutputType
 import com.yotpo.metorikku.exceptions.{MetorikkuFailedStepException, MetorikkuWriteFailedException}
 import com.yotpo.metorikku.instrumentation.InstrumentationProvider
 import com.yotpo.metorikku.output.{Writer, WriterFactory}
 import org.apache.log4j.LogManager
-import org.apache.spark.sql.types.TimestampType
-import org.apache.spark.sql.{DataFrame, Dataset}
-import org.apache.spark.streaming.Seconds
+import org.apache.spark.sql.DataFrame

 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer

 case class StreamingWritingConfiguration(dataFrame: DataFrame, outputConfig: Output, writers: ListBuffer[Writer] = ListBuffer.empty)
 case class StreamingWriting(streamingWritingConfiguration: StreamingWritingConfiguration)
-case class Metric(configuration: Configuration, metricDir: File, metricName: String) {
+case class Metric(configuration: Configuration, metricDir: Option[File], metricName: String) {
   val log = LogManager.getLogger(this.getClass)

   def calculate(job: Job): Unit = {
diff --git a/src/main/scala/com/yotpo/metorikku/metric/MetricSet.scala b/src/main/scala/com/yotpo/metorikku/metric/MetricSet.scala
index 9be96116d..3b8dd9872 100644
--- a/src/main/scala/com/yotpo/metorikku/metric/MetricSet.scala
+++ b/src/main/scala/com/yotpo/metorikku/metric/MetricSet.scala
@@ -26,8 +26,14 @@ class MetricSet(metricSet: String, write: Boolean = true) {
   def parseMetrics(metricSet: String): Seq[Metric] = {
     log.info(s"Starting to parse metricSet")

-    val metricsToCalculate = FileUtils.getListOfFiles(metricSet)
-    metricsToCalculate.filter(ConfigurationParser.isValidFile(_)).map(ConfigurationParser.parse(_))
+
+    FileUtils.isLocalDirectory(metricSet) match {
+      case true => {
+        val metricsToCalculate = FileUtils.getListOfLocalFiles(metricSet)
+        metricsToCalculate.filter(ConfigurationParser.isValidFile(_)).map(f => ConfigurationParser.parse(f.getPath))
+      }
+      case false => Seq(ConfigurationParser.parse(metricSet))
+    }
   }

   def run(job: Job) {
diff --git a/src/main/scala/com/yotpo/metorikku/metric/StepActionFactory.scala b/src/main/scala/com/yotpo/metorikku/metric/StepActionFactory.scala
index 719e5e639..e8886bbde 100644
--- a/src/main/scala/com/yotpo/metorikku/metric/StepActionFactory.scala
+++ b/src/main/scala/com/yotpo/metorikku/metric/StepActionFactory.scala
@@ -9,7 +9,7 @@ import com.yotpo.metorikku.metric.stepActions.Code
 import com.yotpo.metorikku.utils.FileUtils

 object StepFactory {
-  def getStepAction(configuration: Step, metricDir: File, metricName: String,
+  def getStepAction(configuration: Step, metricDir: Option[File], metricName: String,
                     showPreviewLines: Int, cacheOnPreview: Option[Boolean], showQuery: Option[Boolean]):
   StepAction[_] = {
     configuration.sql match {
@@ -17,8 +17,12 @@ object StepFactory {
       case None => {
         configuration.file match {
           case Some(filePath) =>
+            val path = metricDir match {
+              case Some(dir) => new File(dir, filePath).getPath
+              case _ => filePath
+            }
             Sql(
-              FileUtils.readConfigurationFile(new File(metricDir, filePath).getPath),
+              FileUtils.readConfigurationFile(path),
               configuration.dataFrameName, showPreviewLines, cacheOnPreview, showQuery
             )
           case None => {
diff --git a/src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala b/src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala
index 7b3588068..29b2fd37b 100644
--- a/src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala
+++ b/src/main/scala/com/yotpo/metorikku/utils/FileUtils.scala
@@ -1,24 +1,29 @@
 package com.yotpo.metorikku.utils

 import java.io.{BufferedReader, File, FileNotFoundException, InputStreamReader}
-import java.net.URI
 import java.util.stream.Collectors

 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.yotpo.metorikku.exceptions.MetorikkuException
 import org.apache.commons.io.FilenameUtils
 import org.apache.commons.text.StringSubstitutor
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
 import org.apache.spark.sql.SparkSession
-import org.json4s.DefaultFormats
-import org.json4s.native.JsonMethods

-import scala.io.Source
 import scala.collection.JavaConverters._

+case class HadoopPath(path: Path, fs: FileSystem) {
+  def open: FSDataInputStream = {
+    fs.open(path)
+  }
+
+  def getName: String = {
+    path.getName
+  }
+}
+
 object FileUtils {
-  def getListOfFiles(dir: String): List[File] = {
+  def getListOfLocalFiles(dir: String): List[File] = {
     val d = new File(dir)
     if (d.isDirectory) {
       d.listFiles.filter(_.isFile).toList
@@ -29,23 +34,6 @@
     }
   }

-  def jsonFileToObject[T: Manifest](file: File): T = {
-    implicit val formats = DefaultFormats
-    val jsonString = scala.io.Source.fromFile(file).mkString
-
-    try {
-      val json = JsonMethods.parse(jsonString)
-      json.extract[T]
-    } catch {
-      case cast: ClassCastException => throw MetorikkuException(s"Failed to cast json file " + file, cast)
-      case other: Throwable => throw other
-    }
-  }
-
-  def getContentFromFileAsString(file: File): String = {
-    scala.io.Source.fromFile(file).mkString // //By scala.io. on read spark fail with legit error when path does not exists
-  }
-
   def getObjectMapperByExtension(extension: String): Option[ObjectMapper] = {
     extension match {
       case "json" => Option(new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
@@ -69,13 +57,31 @@ object FileUtils {
     StringSubstitutor.replace(fileContents, envAndSystemProperties.asJava)
   }

-  def readFileWithHadoop(path: String): String = {
+
+
+  def getHadoopPath(path: String): HadoopPath = {
     val hadoopConf = SparkSession.builder().getOrCreate().sessionState.newHadoopConf()
     val file = new Path(path)
+
     val fs = file.getFileSystem(hadoopConf)
-    val fsFile = fs.open(file)
+    HadoopPath(file, fs)
+  }
+
+  def readFileWithHadoop(path: String): String = {
+    val hadoopPath = getHadoopPath(path)
+
+    val fsFile = hadoopPath.open
+
     val reader = new BufferedReader(new InputStreamReader(fsFile))
     reader.lines.collect(Collectors.joining("\n"))
   }
+
+  def isLocalDirectory(path: String): Boolean = {
+    new File(path).isDirectory
+  }
+
+  def isLocalFile(path: String): Boolean = {
+    new File(path).isFile
+  }
 }
diff --git a/src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala b/src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala
index b49cc3ea1..360cd3717 100644
--- a/src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala
+++ b/src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala
@@ -42,7 +42,7 @@ class MetorikkuTest extends FunSuite with BeforeAndAfterAll {
     val thrown = intercept[FileNotFoundException] {
       Metorikku.main(Array("-c", "src/test/scala/com/yotpo/metorikku/test/metorikku-test-config-invalid-metrics.yaml"))
     }
-    assert(thrown.getMessage.startsWith("No Files to Run"))
+    assert(thrown.getMessage.endsWith("does not exist"))
   }
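
Usage note: after this patch, a metric path may be a local file or directory as before, or any path with a Hadoop FileSystem implementation (hadoop-aws is added above so s3a:// URLs resolve). Below is a minimal sketch of the new flow, not part of the patch: the object name and the s3a bucket path are illustrative, and it assumes an active SparkSession, since getHadoopPath derives its Hadoop configuration from the current session.

import org.apache.spark.sql.SparkSession
import com.yotpo.metorikku.utils.FileUtils

// Hypothetical driver exercising the helpers introduced in FileUtils.scala.
object RemoteMetricFileExample {
  def main(args: Array[String]): Unit = {
    // getHadoopPath calls SparkSession.builder().getOrCreate() internally,
    // so a session must exist before any FileUtils Hadoop call.
    SparkSession.builder().master("local[*]").appName("example").getOrCreate()

    // Illustrative path; any scheme with a Hadoop FileSystem binding works.
    val path = "s3a://example-bucket/metrics/movies_metric.yaml"

    if (FileUtils.isLocalDirectory(path)) {
      // Local directories are still expanded file by file, as in MetricSet.parseMetrics.
      FileUtils.getListOfLocalFiles(path).foreach(f => println(f.getPath))
    } else {
      // Remote (or single local) files are streamed through the Hadoop FileSystem API,
      // which raises FileNotFoundException with a "... does not exist" message for bad paths.
      println(FileUtils.readFileWithHadoop(path))
    }
  }
}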