From aa2737c8787c77128d7d7ebce877900287bdc14e Mon Sep 17 00:00:00 2001 From: Assaf Liebstein Date: Tue, 19 May 2020 09:55:26 +0300 Subject: [PATCH] Revert "Data-3002 support empty folder protection (#317)" (#324) This reverts commit f077829d1b1a6ca3fed7e1f477c4912599118900. --- config/metric_config_sample.yaml | 2 - examples/hive/movies_metric.yaml | 1 - .../configuration/metric/Output.scala | 1 - .../com/yotpo/metorikku/metric/Metric.scala | 11 ------ .../writers/file/FileOutputWriter.scala | 27 +------------- ...u-tester-test-empty-output-protection.yaml | 37 ------------------- .../yotpo/metorikku/test/MetorikkuTest.scala | 7 ---- ...u-test-empty-output-protection-config.yaml | 9 ----- 8 files changed, 2 insertions(+), 93 deletions(-) delete mode 100644 src/test/configurations/metorikku-tester-test-empty-output-protection.yaml delete mode 100644 src/test/scala/com/yotpo/metorikku/test/metorikku-test-empty-output-protection-config.yaml diff --git a/config/metric_config_sample.yaml b/config/metric_config_sample.yaml index bca1fe288..cf3ca3b56 100644 --- a/config/metric_config_sample.yaml +++ b/config/metric_config_sample.yaml @@ -38,8 +38,6 @@ output: path: file.parquet # Optional: If enabled, adding current timestamp suffix to path of the file createUniquePath: true - # Optional: Abort writing an empty df to output OR updating external table in Hive with empty parquets. - protectFromEmptyOutput: true # partition by the following columns partitionBy: - col1 diff --git a/examples/hive/movies_metric.yaml b/examples/hive/movies_metric.yaml index e124d5a68..442f09d0f 100644 --- a/examples/hive/movies_metric.yaml +++ b/examples/hive/movies_metric.yaml @@ -15,7 +15,6 @@ output: saveMode: Overwrite path: comedyFantasyRomanceSciFiMovies.parquet tableName: testMetorikkuHiveExternal - protectFromEmptyOutput: true - dataFrameName: comedyFantasyRomanceSciFiMoviesAddColumn outputType: Parquet outputOptions: diff --git a/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala b/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala index 7c214d646..fcd4eb723 100644 --- a/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala +++ b/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala @@ -11,7 +11,6 @@ case class Output(name: Option[String], reportLagTimeColumnUnits: Option[String], repartition: Option[Int], coalesce: Option[Boolean], - protectFromEmptyOutput: Option[Boolean], outputOptions: Map[String, Any]) object OutputType extends Enumeration { diff --git a/src/main/scala/com/yotpo/metorikku/metric/Metric.scala b/src/main/scala/com/yotpo/metorikku/metric/Metric.scala index 78faaeb6b..8b2a01052 100644 --- a/src/main/scala/com/yotpo/metorikku/metric/Metric.scala +++ b/src/main/scala/com/yotpo/metorikku/metric/Metric.scala @@ -140,16 +140,6 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str val dataFrameName = outputConfig.dataFrameName val dataFrame = repartition(outputConfig, job.sparkSession.table(dataFrameName)) - outputConfig.outputOptions.get("protectFromEmptyOutput") match { - case Some(true) => { - if (dataFrame.head(1).isEmpty) { - throw MetorikkuWriteFailedException(s"Abort writing dataframe: ${dataFrameName}, " + - s"empty dataframe output is not allowed according to configuration") - } - } - case _ => - } - if (dataFrame.isStreaming) { val streamingWriterConfig = streamingWriterList.getOrElse(dataFrameName, StreamingWriting(StreamingWritingConfiguration(dataFrame, outputConfig))) streamingWriterConfig.streamingWritingConfiguration.writers += writer @@ -169,4 +159,3 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str } } } - diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/file/FileOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/file/FileOutputWriter.scala index a58e75774..38d4bc81c 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/file/FileOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/file/FileOutputWriter.scala @@ -1,12 +1,13 @@ package com.yotpo.metorikku.output.writers.file +import java.text.SimpleDateFormat import com.yotpo.metorikku.configuration.job.Streaming import com.yotpo.metorikku.configuration.job.output.File -import com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException import com.yotpo.metorikku.output.Writer import org.apache.log4j.LogManager import org.apache.spark.sql.{DataFrame, DataFrameWriter, SparkSession} +import org.joda.time.DateTime class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) extends Writer { val log = LogManager.getLogger(this.getClass) @@ -19,7 +20,6 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext tableName: Option[String], format: Option[String], alwaysUpdateSchemaInCatalog: Boolean, - protectFromEmptyOutput: Option[Boolean], extraOptions: Option[Map[String, String]]) val fileOutputProperties = FileOutputProperties( @@ -31,7 +31,6 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext props.get("tableName").asInstanceOf[Option[String]], props.get("format").asInstanceOf[Option[String]], props.get("alwaysUpdateSchemaInCatalog").asInstanceOf[Option[Boolean]].getOrElse(true), - props.get("protectFromEmptyOutput").asInstanceOf[Option[Boolean]], props.get("extraOptions").asInstanceOf[Option[Map[String, String]]]) @@ -113,8 +112,6 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext writer.save() - protectFromEmptyOutput(ss, fileOutputProperties.protectFromEmptyOutput, fileOutputProperties.format, filePath, tableName) - catalog.tableExists(tableName) match { // Quick overwrite (using alter table + refresh instead of drop + write + refresh) case true => { @@ -203,26 +200,6 @@ class FileOutputWriter(props: Map[String, Object], outputFile: Option[File]) ext case _=> Option(file.dir + "/" + path) } } - - def protectFromEmptyOutput(ss: SparkSession, protectFromEmptyOutput: Option[Boolean], format: Option[String], - path: String, tableName: String): Unit = { - fileOutputProperties.protectFromEmptyOutput match { - case Some(true) => { - log.info(s"Applying protection from updating Hive table: ${tableName} with empty parquets") - val dfFromFile = fileOutputProperties.format match { - case Some(format) => { - ss.read.format(format.toLowerCase).load(path) - } - case _=> ss.read.parquet(path) - } - if (dfFromFile.head(1).isEmpty) { - throw MetorikkuWriteFailedException(s"Aborting Hive external table ${tableName} update -> data files are empty!") - }} - case _ => - } - } } - - diff --git a/src/test/configurations/metorikku-tester-test-empty-output-protection.yaml b/src/test/configurations/metorikku-tester-test-empty-output-protection.yaml deleted file mode 100644 index ef1d6434d..000000000 --- a/src/test/configurations/metorikku-tester-test-empty-output-protection.yaml +++ /dev/null @@ -1,37 +0,0 @@ -steps: - - dataFrameName: resultDfEmpty - sql: - SELECT * - FROM reviews - WHERE id=101 - - dataFrameName: resultDf - sql: - SELECT * - FROM reviews - -output: - - dataFrameName: resultDf - outputType: CSV - outputOptions: - saveMode: Overwrite - path: resultDfCsv.csv - protectFromEmptyOutput: true - - dataFrameName: resultDf - outputType: JSON - outputOptions: - saveMode: Overwrite - path: resultDfJson.json - protectFromEmptyOutput: true - - dataFrameName: resultDf - outputType: Parquet - outputOptions: - saveMode: Overwrite - path: resultDfParquet.parquet - protectFromEmptyOutput: true - - dataFrameName: resultDfEmpty - outputType: CSV - outputOptions: - saveMode: Overwrite - path: emptyResult.csv - protectFromEmptyOutput: true - diff --git a/src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala b/src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala index bcc003962..f0eda59e0 100644 --- a/src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala +++ b/src/test/scala/com/yotpo/metorikku/test/MetorikkuTest.scala @@ -565,14 +565,7 @@ class MetorikkuTest extends FunSuite with BeforeAndAfterAll { } } - test("Test Metorikku should fail on writing an empty df, when protectFromEmptyOutput is enabled") { - val thrown = intercept[Exception] { - Metorikku.main(Array( - "-c", "src/test/scala/com/yotpo/metorikku/test/metorikku-test-empty-output-protection-config.yaml")) - } - assert(thrown.getMessage.startsWith("Abort writing dataframe: resultDfEmpty, empty dataframe output is not allowed according to configuration")) - } private def assertMismatchExpected(definedKeys: List[String], thrownMsg: String, expectedRow: Map[String, Any], rowIndex: Int, keyColumns: KeyColumns) = { assertMismatchByType(ResultsType.expected, definedKeys, thrownMsg, expectedRow, 1, 0, rowIndex, keyColumns) diff --git a/src/test/scala/com/yotpo/metorikku/test/metorikku-test-empty-output-protection-config.yaml b/src/test/scala/com/yotpo/metorikku/test/metorikku-test-empty-output-protection-config.yaml deleted file mode 100644 index a255eaeb2..000000000 --- a/src/test/scala/com/yotpo/metorikku/test/metorikku-test-empty-output-protection-config.yaml +++ /dev/null @@ -1,9 +0,0 @@ -metrics: - - src/test/configurations/metorikku-tester-test-empty-output-protection.yaml -inputs: - reviews: - file: - path: src/test/configurations/mocks/reviews_complex.csv -output: - file: - dir: src/test/out/