Revert "Data-3002 support empty folder protection (#317)" (#324)
This reverts commit f077829.
liebstein authored May 19, 2020
1 parent 59e0867 commit aa2737c
Showing 8 changed files with 2 additions and 93 deletions.
2 changes: 0 additions & 2 deletions config/metric_config_sample.yaml
@@ -38,8 +38,6 @@ output:
path: file.parquet
# Optional: If enabled, adding current timestamp suffix to path of the file
createUniquePath: true
# Optional: Abort writing an empty df to output OR updating external table in Hive with empty parquets.
protectFromEmptyOutput: true
# partition by the following columns
partitionBy:
- col1
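The flag removed above rode through Metorikku's generic outputOptions map. As an illustrative sketch (plain Scala, not Metorikku code), this is the usual shape of reading such an optional boolean out of an untyped options map, mirroring the match on Some(true) deleted from Metric.scala further down:

    object OptionFlagSketch {
      def main(args: Array[String]): Unit = {
        // Parsed YAML output options typically land in an untyped map.
        val outputOptions: Map[String, Any] = Map(
          "saveMode" -> "Overwrite",
          "path" -> "file.parquet",
          "protectFromEmptyOutput" -> true
        )
        // Matching on Some(true) keeps absent or false values permissive.
        outputOptions.get("protectFromEmptyOutput") match {
          case Some(true) => println("guard enabled: abort on empty output")
          case _          => println("guard disabled")
        }
      }
    }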
1 change: 0 additions & 1 deletion examples/hive/movies_metric.yaml
@@ -15,7 +15,6 @@ output:
saveMode: Overwrite
path: comedyFantasyRomanceSciFiMovies.parquet
tableName: testMetorikkuHiveExternal
protectFromEmptyOutput: true
- dataFrameName: comedyFantasyRomanceSciFiMoviesAddColumn
outputType: Parquet
outputOptions:
@@ -11,7 +11,6 @@ case class Output(name: Option[String],
reportLagTimeColumnUnits: Option[String],
repartition: Option[Int],
coalesce: Option[Boolean],
protectFromEmptyOutput: Option[Boolean],
outputOptions: Map[String, Any])

object OutputType extends Enumeration {
11 changes: 0 additions & 11 deletions src/main/scala/com/yotpo/metorikku/metric/Metric.scala
@@ -140,16 +140,6 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
val dataFrameName = outputConfig.dataFrameName
val dataFrame = repartition(outputConfig, job.sparkSession.table(dataFrameName))

outputConfig.outputOptions.get("protectFromEmptyOutput") match {
case Some(true) => {
if (dataFrame.head(1).isEmpty) {
throw MetorikkuWriteFailedException(s"Abort writing dataframe: ${dataFrameName}, " +
s"empty dataframe output is not allowed according to configuration")
}
}
case _ =>
}

if (dataFrame.isStreaming) {
val streamingWriterConfig = streamingWriterList.getOrElse(dataFrameName, StreamingWriting(StreamingWritingConfiguration(dataFrame, outputConfig)))
streamingWriterConfig.streamingWritingConfiguration.writers += writer
@@ -169,4 +159,3 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
}
}
}

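For context on the guard deleted above: it relied on dataFrame.head(1).isEmpty, which fetches at most one row and therefore short-circuits, unlike count() == 0, which would scan every partition. A minimal standalone sketch of the same check, assuming only spark-sql on the classpath (not Metorikku code):

    import org.apache.spark.sql.SparkSession

    object EmptyGuardSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("empty-guard-sketch")
          .getOrCreate()
        import spark.implicits._

        val df = Seq.empty[(String, Int)].toDF("name", "value")

        // head(1) pulls at most one row, so empty inputs are detected
        // without materializing or counting the whole dataset.
        if (df.head(1).isEmpty) {
          println("refusing to write an empty dataframe")
        }

        spark.stop()
      }
    }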
src/main/scala/com/yotpo/metorikku/output/writers/file/FileOutputWriter.scala
@@ -1,12 +1,13 @@
package com.yotpo.metorikku.output.writers.file

import java.text.SimpleDateFormat

import com.yotpo.metorikku.configuration.job.Streaming
import com.yotpo.metorikku.configuration.job.output.File
import com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException
import com.yotpo.metorikku.output.Writer
import org.apache.log4j.LogManager
import org.apache.spark.sql.{DataFrame, DataFrameWriter, SparkSession}
import org.joda.time.DateTime

class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) extends Writer {
val log = LogManager.getLogger(this.getClass)
@@ -19,7 +20,6 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext
tableName: Option[String],
format: Option[String],
alwaysUpdateSchemaInCatalog: Boolean,
protectFromEmptyOutput: Option[Boolean],
extraOptions: Option[Map[String, String]])

val fileOutputProperties = FileOutputProperties(
@@ -31,7 +31,6 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext
props.get("tableName").asInstanceOf[Option[String]],
props.get("format").asInstanceOf[Option[String]],
props.get("alwaysUpdateSchemaInCatalog").asInstanceOf[Option[Boolean]].getOrElse(true),
props.get("protectFromEmptyOutput").asInstanceOf[Option[Boolean]],
props.get("extraOptions").asInstanceOf[Option[Map[String, String]]])


@@ -113,8 +112,6 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext

writer.save()

protectFromEmptyOutput(ss, fileOutputProperties.protectFromEmptyOutput, fileOutputProperties.format, filePath, tableName)

catalog.tableExists(tableName) match {
// Quick overwrite (using alter table + refresh instead of drop + write + refresh)
case true => {
@@ -203,26 +200,6 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext
case _=> Option(file.dir + "/" + path)
}
}

def protectFromEmptyOutput(ss: SparkSession, protectFromEmptyOutput: Option[Boolean], format: Option[String],
path: String, tableName: String): Unit = {
fileOutputProperties.protectFromEmptyOutput match {
case Some(true) => {
log.info(s"Applying protection from updating Hive table: ${tableName} with empty parquets")
val dfFromFile = fileOutputProperties.format match {
case Some(format) => {
ss.read.format(format.toLowerCase).load(path)
}
case _=> ss.read.parquet(path)
}
if (dfFromFile.head(1).isEmpty) {
throw MetorikkuWriteFailedException(s"Aborting Hive external table ${tableName} update -> data files are empty!")
}}
case _ =>
}
}
}
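The method above was the Hive half of the reverted feature: after writer.save(), it re-read the files just written and aborted the external-table update when they held no rows. A self-contained sketch of that read-back pattern, with a hypothetical helper name (verifyNonEmptyOutput here is illustrative, not Metorikku's API):

    import org.apache.spark.sql.SparkSession

    object ReadBackGuardSketch {
      // Hypothetical helper: fail before any catalog update if the files
      // just written to `path` contain no rows.
      def verifyNonEmptyOutput(spark: SparkSession,
                               path: String,
                               format: Option[String]): Unit = {
        val written = format match {
          case Some(f) => spark.read.format(f.toLowerCase).load(path)
          case None    => spark.read.parquet(path)
        }
        if (written.head(1).isEmpty) {
          throw new IllegalStateException(s"data files under $path are empty, aborting table update")
        }
      }
    }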

This file was deleted.

7 changes: 0 additions & 7 deletions src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala
@@ -565,14 +565,7 @@ class MetorikkuTest extends FunSuite with BeforeAndAfterAll {
}
}

test("Test Metorikku should fail on writing an empty df, when protectFromEmptyOutput is enabled") {

val thrown = intercept[Exception] {
Metorikku.main(Array(
"-c", "src/test/scala/com/yotpo/metorikku/test/metorikku-test-empty-output-protection-config.yaml"))
}
assert(thrown.getMessage.startsWith("Abort writing dataframe: resultDfEmpty, empty dataframe output is not allowed according to configuration"))
}

private def assertMismatchExpected(definedKeys: List[String], thrownMsg: String, expectedRow: Map[String, Any], rowIndex: Int, keyColumns: KeyColumns) = {
assertMismatchByType(ResultsType.expected, definedKeys, thrownMsg, expectedRow, 1, 0, rowIndex, keyColumns)
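The deleted test above follows the standard ScalaTest pattern: intercept both asserts that the block throws and returns the exception for message assertions. A self-contained example of the same idiom (AnyFunSuite is the ScalaTest 3.1+ name for the FunSuite style used in this file):

    import org.scalatest.funsuite.AnyFunSuite

    class FailureMessageSketch extends AnyFunSuite {
      test("intercept returns the thrown exception for message assertions") {
        val thrown = intercept[IllegalArgumentException] {
          require(1 == 2, "empty dataframe output is not allowed")
        }
        // require() prefixes its message with "requirement failed: "
        assert(thrown.getMessage.contains("empty dataframe output is not allowed"))
      }
    }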

This file was deleted.
