Skip to content

Commit

Permalink
un_revert_support_empty_folder_protection (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
Irenez753 authored May 19, 2020
1 parent e8eda2d commit b45ec72
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 0 deletions.
2 changes: 2 additions & 0 deletions config/metric_config_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ output:
path: file.parquet
# Optional: If enabled, adds a current-timestamp suffix to the file path
createUniquePath: true
# Optional: If enabled, aborts writing an empty dataframe to the output, or updating an external Hive table with empty parquet files.
protectFromEmptyOutput: true
# partition by the following columns
partitionBy:
- col1
Expand Down
1 change: 1 addition & 0 deletions examples/hive/movies_metric.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ output:
saveMode: Overwrite
path: comedyFantasyRomanceSciFiMovies.parquet
tableName: testMetorikkuHiveExternal
protectFromEmptyOutput: true
- dataFrameName: comedyFantasyRomanceSciFiMoviesAddColumn
outputType: Parquet
outputOptions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ case class Output(name: Option[String],
reportLagTimeColumnUnits: Option[String],
repartition: Option[Int],
coalesce: Option[Boolean],
protectFromEmptyOutput: Option[Boolean],
outputOptions: Map[String, Any])

object OutputType extends Enumeration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.text.SimpleDateFormat

import com.yotpo.metorikku.configuration.job.Streaming
import com.yotpo.metorikku.configuration.job.output.File
import com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException
import com.yotpo.metorikku.output.Writer
import org.apache.log4j.LogManager
import org.apache.spark.sql.{DataFrame, DataFrameWriter, SparkSession}
Expand All @@ -20,6 +21,7 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext
tableName: Option[String],
format: Option[String],
alwaysUpdateSchemaInCatalog: Boolean,
protectFromEmptyOutput: Option[Boolean],
extraOptions: Option[Map[String, String]])

val fileOutputProperties = FileOutputProperties(
Expand All @@ -31,6 +33,7 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext
props.get("tableName").asInstanceOf[Option[String]],
props.get("format").asInstanceOf[Option[String]],
props.get("alwaysUpdateSchemaInCatalog").asInstanceOf[Option[Boolean]].getOrElse(true),
props.get("protectFromEmptyOutput").asInstanceOf[Option[Boolean]],
props.get("extraOptions").asInstanceOf[Option[Map[String, String]]])


Expand Down Expand Up @@ -112,6 +115,8 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext

writer.save()

protectFromEmptyOutput(ss, fileOutputProperties.protectFromEmptyOutput, fileOutputProperties.format, filePath, tableName)

catalog.tableExists(tableName) match {
// Quick overwrite (using alter table + refresh instead of drop + write + refresh)
case true => {
Expand Down Expand Up @@ -200,6 +205,24 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext
case _=> Option(file.dir + "/" + path)
}
}

def protectFromEmptyOutput(ss: SparkSession, protectFromEmptyOutput: Option[Boolean], format: Option[String],
                           path: String, tableName: String): Unit = {
  // Guard against replacing a Hive external table's data with empty output files.
  // Fix: use the passed-in `protectFromEmptyOutput` and `format` parameters — the
  // original body ignored them and re-read the same values from
  // `fileOutputProperties`, leaving the parameters dead and the method misleading.
  // Behavior at the existing call site is unchanged (it passes exactly those fields).
  protectFromEmptyOutput match {
    case Some(true) =>
      log.info(s"Applying protection from updating Hive table: ${tableName} with empty parquets")
      // Read back the files just written; default to parquet when no format is configured.
      val dfFromFile = format match {
        case Some(fmt) => ss.read.format(fmt.toLowerCase).load(path)
        case _         => ss.read.parquet(path)
      }
      // head(1) avoids a full scan/count; an empty result means the written files have no rows.
      if (dfFromFile.head(1).isEmpty) {
        throw MetorikkuWriteFailedException(s"Aborting Hive external table ${tableName} update -> data files are empty!")
      }
    case _ => // protection disabled or not configured: nothing to do
  }
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
steps:
- dataFrameName: resultDfEmpty
sql:
SELECT *
FROM reviews
WHERE id=101
- dataFrameName: resultDf
sql:
SELECT *
FROM reviews

output:
- dataFrameName: resultDf
outputType: CSV
outputOptions:
saveMode: Overwrite
path: resultDfCsv.csv
protectFromEmptyOutput: true
- dataFrameName: resultDf
outputType: JSON
outputOptions:
saveMode: Overwrite
path: resultDfJson.json
protectFromEmptyOutput: true
- dataFrameName: resultDf
outputType: Parquet
outputOptions:
saveMode: Overwrite
path: resultDfParquet.parquet
protectFromEmptyOutput: true
- dataFrameName: resultDfEmpty
outputType: CSV
outputOptions:
saveMode: Overwrite
path: emptyResult.csv
protectFromEmptyOutput: true
8 changes: 8 additions & 0 deletions src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,14 @@ class MetorikkuTest extends FunSuite with BeforeAndAfterAll {
}
}

test("Test Metorikku should fail on writing an empty df, when protectFromEmptyOutput is enabled") {
  // Running the job with protectFromEmptyOutput enabled on an empty dataframe
  // must abort with the dedicated write-failure message.
  val cliArgs = Array(
    "-c", "src/test/scala/com/yotpo/metorikku/test/metorikku-test-empty-output-protection-config.yaml")
  val failure = intercept[Exception] {
    Metorikku.main(cliArgs)
  }
  assert(failure.getMessage.startsWith("Abort writing dataframe: resultDfEmpty, empty dataframe output is not allowed according to configuration"))
}



private def assertMismatchExpected(definedKeys: List[String], thrownMsg: String, expectedRow: Map[String, Any], rowIndex: Int, keyColumns: KeyColumns) = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
metrics:
- src/test/configurations/metorikku-tester-test-empty-output-protection.yaml
inputs:
reviews:
file:
path: src/test/configurations/mocks/reviews_complex.csv
output:
file:
dir: src/test/out/

0 comments on commit b45ec72

Please sign in to comment.