From ef770ef71f28efbf05f9912876dc3dfd3f475548 Mon Sep 17 00:00:00 2001
From: Irena Reznikov
Date: Wed, 13 May 2020 17:05:06 +0300
Subject: [PATCH] feat(DATA-2956_influx_lag_reporter) (#308)

* feat(DATA-2956_influx_lag_reporter)
* from map to string type
* influx_lag_reporter-fix
* new case class for StreamingWriting
* support lag units
* using TimeUnit
* unitTest + getMaxDfTime function
* removed outside from Metric class
* move MetricReporting to new file
* logging
---
 config/metric_config_sample.yaml             |  6 ++
 .../hudi/ratings_no_partitions_metric.yaml   |  4 +
 .../configuration/metric/Output.scala        |  3 +
 .../com/yotpo/metorikku/metric/Metric.scala  | 54 +++++++-----
 .../metorikku/metric/MetricReporting.scala   | 46 ++++++++++
 .../configurations/mocks/ratings_time.csv    | 21 +++++
 .../metric/test/MetricReporterTester.scala   | 87 +++++++++++++++++++
 7 files changed, 201 insertions(+), 20 deletions(-)
 create mode 100644 src/main/scala/com/yotpo/metorikku/metric/MetricReporting.scala
 create mode 100644 src/test/configurations/mocks/ratings_time.csv
 create mode 100644 src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala

diff --git a/config/metric_config_sample.yaml b/config/metric_config_sample.yaml
index 7b553c35f..d9864874c 100644
--- a/config/metric_config_sample.yaml
+++ b/config/metric_config_sample.yaml
@@ -148,6 +148,12 @@ output:
     extraOptions:
       es.index.auto.create: false
 - outputType: Hudi
+  # Optional: If enabled, the lag between the current timestamp and the max value of the dataframe's time column (configurable) will be reported.
+  reportLag: true
+  # Optional: Column name used for calculating the lag. Supported types: (Long, Timestamp, Integer). Must be provided if "reportLag" is enabled.
+  reportLagTimeColumn: Option[String]
+  # Optional: Time unit of "reportLagTimeColumn" when it holds a numeric value (Long or Integer). Possible values: [MILLISECONDS (default), SECONDS]
+  reportLagTimeColumnUnits: Option[String]
   outputOptions:
     path: path
     # The key to use for upserts
diff --git a/examples/hudi/ratings_no_partitions_metric.yaml b/examples/hudi/ratings_no_partitions_metric.yaml
index 4e6fd8a8d..f6f53bcce 100644
--- a/examples/hudi/ratings_no_partitions_metric.yaml
+++ b/examples/hudi/ratings_no_partitions_metric.yaml
@@ -3,11 +3,15 @@ steps:
     sql: SELECT
       *,
       userId AS hoodie_key,
+      CAST(timestamp AS LONG) AS lag_timestamp,
       'no_partitions' as test_partitions
       FROM ratings
 output:
 - dataFrameName: hive_sync_test
   outputType: Hudi
+  reportLag: true
+  reportLagTimeColumn: lag_timestamp
+  reportLagTimeColumnUnits: SECONDS
   outputOptions:
     path: table_view_manual_hive_sync_no_partitions
     keyColumn: hoodie_key
diff --git a/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala b/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala
index 2a0deeb0b..fcd4eb723 100644
--- a/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala
+++ b/src/main/scala/com/yotpo/metorikku/configuration/metric/Output.scala
@@ -6,6 +6,9 @@ import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
 case class Output(name: Option[String],
                   dataFrameName: String,
                   @JsonScalaEnumeration(classOf[OutputTypeReference]) outputType: OutputType.OutputType,
+                  reportLag: Option[Boolean],
+                  reportLagTimeColumn: Option[String],
+                  reportLagTimeColumnUnits: Option[String],
                   repartition: Option[Int],
                   coalesce: Option[Boolean],
                   outputOptions: Map[String, Any])
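Editor's note: the three new output keys are picked up from the metric YAML and bound onto the Output case class above by a Jackson-based configuration loader (the project registers the Scala module for YAML parsing). The following self-contained sketch only illustrates that mapping; LagOutputConfig is a hypothetical stand-in, not Metorikku's actual Output class or loader code.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical, trimmed-down stand-in for the Output case class.
case class LagOutputConfig(reportLag: Option[Boolean],
                           reportLagTimeColumn: Option[String],
                           reportLagTimeColumnUnits: Option[String])

object OutputConfigSketch {
  def main(args: Array[String]): Unit = {
    val yaml =
      """reportLag: true
        |reportLagTimeColumn: lag_timestamp
        |reportLagTimeColumnUnits: SECONDS
        |""".stripMargin

    // YAML -> case class via Jackson with the Scala module; missing keys become None.
    val mapper = new ObjectMapper(new YAMLFactory())
    mapper.registerModule(DefaultScalaModule)
    val parsed = mapper.readValue(yaml, classOf[LagOutputConfig])
    println(parsed) // LagOutputConfig(Some(true),Some(lag_timestamp),Some(SECONDS))
  }
}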
diff --git a/src/main/scala/com/yotpo/metorikku/metric/Metric.scala b/src/main/scala/com/yotpo/metorikku/metric/Metric.scala
index 3057cef64..8b2a01052 100644
--- a/src/main/scala/com/yotpo/metorikku/metric/Metric.scala
+++ b/src/main/scala/com/yotpo/metorikku/metric/Metric.scala
@@ -1,6 +1,7 @@
 package com.yotpo.metorikku.metric

 import java.io.File
+import java.util.concurrent.TimeUnit

 import com.yotpo.metorikku.Job
 import com.yotpo.metorikku.configuration.job.Streaming
@@ -10,13 +11,15 @@ import com.yotpo.metorikku.exceptions.{MetorikkuFailedStepException, MetorikkuWr
 import com.yotpo.metorikku.instrumentation.InstrumentationProvider
 import com.yotpo.metorikku.output.{Writer, WriterFactory}
 import org.apache.log4j.LogManager
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.streaming.Seconds

 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer

-case class StreamingWritingConfiguration(dataFrame: DataFrame, writers: ListBuffer[Writer] = ListBuffer.empty)
-
+case class StreamingWritingConfiguration(dataFrame: DataFrame, outputConfig: Output, writers: ListBuffer[Writer] = ListBuffer.empty)
+case class StreamingWriting(streamingWritingConfiguration: StreamingWritingConfiguration)
 case class Metric(configuration: Configuration, metricDir: File, metricName: String) {
   val log = LogManager.getLogger(this.getClass)

@@ -43,21 +46,27 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
     }
   }

-  private def writeStream(dataFrameName: String, writerConfig: StreamingWritingConfiguration,
-                          streamingConfig: Option[Streaming]): Unit = {
+  private def writeStream(dataFrameName: String,
+                          writerConfig: StreamingWritingConfiguration,
+                          streamingConfig: Option[Streaming],
+                          instrumentationProvider: InstrumentationProvider): Unit = {
     log.info(s"Starting to write streaming results of ${dataFrameName}")
-
     streamingConfig match {
       case Some(config) => {
         val streamWriter = writerConfig.dataFrame.writeStream
         config.applyOptions(streamWriter)
-
         config.batchMode match {
           case Some(true) => {
             val query = streamWriter.foreachBatch((batchDF: DataFrame, _: Long) => {
               writerConfig.writers.foreach(writer => writer.write(batchDF))
+              writerConfig.outputConfig.reportLag match {
+                case Some(true) => new MetricReporting().reportLagTime(batchDF, writerConfig.outputConfig.reportLagTimeColumn,
+                  writerConfig.outputConfig.reportLagTimeColumnUnits, instrumentationProvider)
+                case _ =>
+              }
             }).start()
             query.awaitTermination()
+
             // Exit this function after streaming is completed
             return
           }
@@ -79,7 +88,7 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
   private def writeBatch(dataFrame: DataFrame,
                          dataFrameName: String,
                          writer: Writer,
-                         outputType: OutputType,
+                         outputConfig: Output,
                          instrumentationProvider: InstrumentationProvider,
                          cacheCountOnOutput: Option[Boolean]): Unit = {

@@ -90,16 +99,19 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
       }
       case _ => 0
     }
-    val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName, "output_type" -> outputType.toString)
+    val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName, "output_type" -> outputConfig.outputType.toString)
     instrumentationProvider.count(name="counter", value=dataFrameCount, tags=tags)
     log.info(s"Starting to Write results of ${dataFrameName}")
     try {
       writer.write(dataFrame)
-    }
-    catch {
+      outputConfig.reportLag match {
+        case Some(true) => new MetricReporting().reportLagTime(dataFrame, outputConfig.reportLagTimeColumn,
+          outputConfig.reportLagTimeColumnUnits, instrumentationProvider)
+        case _ =>
+      }
+    } catch {
       case ex: Exception => {
         throw MetorikkuWriteFailedException(s"Failed to write dataFrame: " +
-          s"$dataFrameName to output: ${outputType} on metric: ${metricName}", ex)
+          s"$dataFrameName to output: ${outputConfig.outputType} on metric: ${metricName}", ex)
       }
     }
   }
@@ -117,31 +129,33 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
     }
   }

+
   def write(job: Job): Unit = {
+
     configuration.output match {
       case Some(output) => {
-        val streamingWriterList: mutable.Map[String, StreamingWritingConfiguration] = mutable.Map()
-
+        val streamingWriterList: mutable.Map[String, StreamingWriting] = mutable.Map()
         output.foreach(outputConfig => {
           val writer = WriterFactory.get(outputConfig, metricName, job.config, job)
           val dataFrameName = outputConfig.dataFrameName
           val dataFrame = repartition(outputConfig, job.sparkSession.table(dataFrameName))

           if (dataFrame.isStreaming) {
-            val streamingWriterConfig = streamingWriterList.getOrElse(dataFrameName, StreamingWritingConfiguration(dataFrame))
-            streamingWriterConfig.writers += writer
+            val streamingWriterConfig = streamingWriterList.getOrElse(dataFrameName, StreamingWriting(StreamingWritingConfiguration(dataFrame, outputConfig)))
+            streamingWriterConfig.streamingWritingConfiguration.writers += writer
             streamingWriterList += (dataFrameName -> streamingWriterConfig)
           }
           else {
-            writeBatch(dataFrame, dataFrameName, writer,
-              outputConfig.outputType, job.instrumentationClient, job.config.cacheCountOnOutput)
+            writeBatch(dataFrame, dataFrameName, writer, outputConfig, job.instrumentationClient,
+              job.config.cacheCountOnOutput)
           }
         })

-        for ((dataFrameName, config) <- streamingWriterList) writeStream(dataFrameName, config, job.config.streaming)
+        for ((dataFrameName, streamingConfig) <- streamingWriterList) writeStream(dataFrameName,
+          streamingConfig.streamingWritingConfiguration, job.config.streaming, job.instrumentationClient)
+
       }
       case None =>
     }
   }
 }
-
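Editor's note: on the streaming path above, lag is reported once per micro-batch from inside foreachBatch. The self-contained sketch below shows that wiring in isolation; it is not Metorikku code, it uses Spark's built-in rate source as an assumed event-time column and prints the value instead of calling InstrumentationProvider.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.max

object StreamingLagSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("streaming-lag-sketch").master("local[*]").getOrCreate()

    // The built-in "rate" source emits a `timestamp` column we treat as the event-time column.
    val stream = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    // Explicitly typed function avoids the foreachBatch overload ambiguity in Scala 2.12.
    val writeAndReport: (DataFrame, Long) => Unit = (batchDF, _) => {
      // 1) write the batch (omitted here), 2) report lag for the batch
      val row = batchDF.agg(max("timestamp")).collect()(0)
      if (!row.isNullAt(0)) {
        val lagMs = System.currentTimeMillis - row.getTimestamp(0).getTime
        println(s"lag=${lagMs}ms") // stand-in for instrumentationProvider.gauge(name = "lag", lagMs)
      }
    }

    stream.writeStream.foreachBatch(writeAndReport).start().awaitTermination()
  }
}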
diff --git a/src/main/scala/com/yotpo/metorikku/metric/MetricReporting.scala b/src/main/scala/com/yotpo/metorikku/metric/MetricReporting.scala
new file mode 100644
index 000000000..2c2552674
--- /dev/null
+++ b/src/main/scala/com/yotpo/metorikku/metric/MetricReporting.scala
@@ -0,0 +1,46 @@
+package com.yotpo.metorikku.metric
+
+import java.util.concurrent.TimeUnit
+
+import com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException
+import com.yotpo.metorikku.instrumentation.InstrumentationProvider
+import org.apache.log4j.LogManager
+import org.apache.spark.sql.DataFrame
+
+class MetricReporting {
+  val log = LogManager.getLogger(this.getClass)
+
+  def getMaxDataframeTime(dataFrame: DataFrame, reportLagTimeColumn: Option[String],
+                          reportLagTimeColumnUnits: Option[String]): Long = {
+    reportLagTimeColumn match {
+      case Some(timeColumn) => {
+        dataFrame.cache()
+        try {
+          reportLagTimeColumnUnits match {
+            case Some(units) => TimeUnit.valueOf(units) match {
+              case TimeUnit.SECONDS => TimeUnit.SECONDS.toMillis(dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getLong(0))
+              case TimeUnit.MILLISECONDS => TimeUnit.MILLISECONDS.toMillis(dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getLong(0))
+            }
+            case _ => dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getTimestamp(0).getTime()
+          }
+        } catch {
+          case e: ClassCastException => throw new ClassCastException(s"Lag instrumentation column -${timeColumn} " +
+            s"cannot be cast to spark.sql.Timestamp or spark.sql.Long")
+          case e: IllegalArgumentException => throw new MetorikkuWriteFailedException(
+            s"${reportLagTimeColumnUnits} is not a legal argument for units, use one of the following: [SECONDS,MILLISECONDS]")
+        }
+      }
+      case _ => throw MetorikkuWriteFailedException("Failed to report lag time, reportLagTimeColumn is not defined")
+    }
+  }
+
+  def reportLagTime(dataFrame: DataFrame, reportLagTimeColumn: Option[String],
+                    reportLagTimeColumnUnits: Option[String],
+                    instrumentationProvider: InstrumentationProvider): Unit = {
+    val maxDataframeTime = getMaxDataframeTime(dataFrame, reportLagTimeColumn, reportLagTimeColumnUnits)
+    log.info(s"Max column ${reportLagTimeColumn} value is ${maxDataframeTime} for ${dataFrame}")
+    val lag = System.currentTimeMillis - maxDataframeTime
+    log.info(s"Reporting lag value: ${lag} for ${dataFrame}")
+    instrumentationProvider.gauge(name = "lag", lag)
+  }
+}
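Editor's note: getMaxDataframeTime normalizes the column's maximum to epoch milliseconds, using getTime for Timestamp columns and TimeUnit for numeric columns with the configured unit (milliseconds by default). A minimal, self-contained sketch of that conversion on a static DataFrame with made-up data (not Metorikku code):

import java.util.concurrent.TimeUnit
import org.apache.spark.sql.SparkSession

object MaxTimeConversionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("max-time-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // created_at holds epoch seconds, mirroring the lag_timestamp column in the Hudi example above.
    val df = Seq((1, 1260759144L), (2, 1260759205L)).toDF("userId", "created_at")

    // Max of the column, then SECONDS -> MILLISECONDS, then lag against the wall clock.
    val maxSeconds = df.agg("created_at" -> "max").collect()(0).getLong(0)
    val maxMillis = TimeUnit.SECONDS.toMillis(maxSeconds)
    val lagMs = System.currentTimeMillis - maxMillis
    println(s"maxMillis=${maxMillis} lagMs=${lagMs}")

    spark.stop()
  }
}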
spark.sql.Timestamp or spark.sql.Long") + case e: IllegalArgumentException => throw new MetorikkuWriteFailedException( + s"${reportLagTimeColumnUnits} is not a legal argument for units, use one of the following: [SECONDS,MILLISECONDS]") + } + } + case _=> throw MetorikkuWriteFailedException("Failed to report lag time, reportLagTimeColumn is not defined") + } + } + + def reportLagTime(dataFrame: DataFrame, reportLagTimeColumn: Option[String], + reportLagTimeColumnUnits:Option[String], + instrumentationProvider: InstrumentationProvider) : Unit ={ + val maxDataframeTime = getMaxDataframeTime(dataFrame, reportLagTimeColumn, reportLagTimeColumnUnits) + log.info(s"Max column ${reportLagTimeColumn} value is ${maxDataframeTime} for ${dataFrame}") + val lag = System.currentTimeMillis - maxDataframeTime + log.info(s"Reporting lag value: ${lag} for ${dataFrame}") + instrumentationProvider.gauge(name = "lag", lag) + } +} diff --git a/src/test/configurations/mocks/ratings_time.csv b/src/test/configurations/mocks/ratings_time.csv new file mode 100644 index 000000000..78ac94b7d --- /dev/null +++ b/src/test/configurations/mocks/ratings_time.csv @@ -0,0 +1,21 @@ +"userId","movieId","rating","created_at","updated_at" +1,31,2.5,1260759144,"2016-05-05T22:37:36.000" +1,1029,3,1260759179,"2016-05-05T22:43:36.000" +1,1061,3,1260759182,"2016-05-05T22:38:36.000" +1,1129,2,1260759185,"2016-05-05T22:39:36.000" +1,1172,4,1260759205,"2016-05-05T22:30:36.000" +1,1263,2,1260759151,"2016-05-05T22:31:36.000" +1,1287,2,1260759187,"2016-05-05T22:32:36.000" +1,1293,2,1260759148,"2016-05-05T22:33:36.000" +1,1339,3.5,1260759125,"2016-05-05T22:34:36.000" +1,1343,2,1260759131,"2016-05-05T22:35:36.000" +1,1371,2.5,1260759135,"2016-05-05T22:36:36.000" +1,1405,1,1260759203,"2016-05-05T22:37:36.000" +1,1953,4,1260759191,"2016-05-05T20:37:36.000" +1,2105,4,1260759139,"2016-05-05T20:37:33.000" +1,2150,3,1260759194,"2016-05-05T22:37:36.000" +1,2193,2,1260759198,"2016-05-05T21:30:36.000" +1,2294,2,1260759108,"2016-05-05T20:37:36.000" +1,2455,2.5,1260759113,"2016-05-05T20:27:36.000" +1,2968,1,1260759200,"2016-05-05T20:25:36.000" +1,3671,3,1260759117,"2016-05-05T20:15:36.000" \ No newline at end of file diff --git a/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala b/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala new file mode 100644 index 000000000..ba5be3177 --- /dev/null +++ b/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala @@ -0,0 +1,87 @@ +package com.yotpo.metorikku.metric.test +import com.yotpo.metorikku.metric.{Metric, MetricReporting} +import org.apache.log4j.{Level, LogManager, Logger} +import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession} +import org.scalatest.{FunSuite, _} +import org.apache.spark.sql.types._ + +//noinspection ScalaStyle +class MetricReporterTester extends FunSuite with BeforeAndAfterEach { + private val log: Logger = LogManager.getLogger(this.getClass) + private var sparkSession : SparkSession = _ + Logger.getLogger("org").setLevel(Level.WARN) + + override def beforeEach() { + sparkSession = SparkSession.builder().appName("udf tests") + .master("local") + .config("", "") + .getOrCreate() + } + + test("Test getMaxDataframeTime") { + + val schema = StructType(Array( + StructField("userId", IntegerType, true), + StructField("movieId", IntegerType, true), + StructField("rating", DoubleType, true), + StructField("created_at", LongType, true), + StructField("updated_at", TimestampType, true))) + + val sparkSession = 
diff --git a/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala b/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala
new file mode 100644
index 000000000..ba5be3177
--- /dev/null
+++ b/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala
@@ -0,0 +1,87 @@
+package com.yotpo.metorikku.metric.test
+import com.yotpo.metorikku.metric.{Metric, MetricReporting}
+import org.apache.log4j.{Level, LogManager, Logger}
+import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
+import org.scalatest.{FunSuite, _}
+import org.apache.spark.sql.types._
+
+//noinspection ScalaStyle
+class MetricReporterTester extends FunSuite with BeforeAndAfterEach {
+  private val log: Logger = LogManager.getLogger(this.getClass)
+  private var sparkSession: SparkSession = _
+  Logger.getLogger("org").setLevel(Level.WARN)
+
+  override def beforeEach() {
+    sparkSession = SparkSession.builder().appName("udf tests")
+      .master("local")
+      .config("", "")
+      .getOrCreate()
+  }
+
+  test("Test getMaxDataframeTime") {
+
+    val schema = StructType(Array(
+      StructField("userId", IntegerType, true),
+      StructField("movieId", IntegerType, true),
+      StructField("rating", DoubleType, true),
+      StructField("created_at", LongType, true),
+      StructField("updated_at", TimestampType, true)))
+
+    val sparkSession = SparkSession.builder.appName("test").getOrCreate()
+    val sqlContext = new SQLContext(sparkSession.sparkContext)
+    val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv")
+    val metricReport = new MetricReporting()
+    val maxUpdatedAt = metricReport.getMaxDataframeTime(df, Option("updated_at"), None)
+    val maxCreatedAt = metricReport.getMaxDataframeTime(df, Option("created_at"), Option("SECONDS"))
+
+    df.cache
+    assert(maxUpdatedAt == 1462488216000L)
+    assert(maxCreatedAt == 1260759205000L)
+  }
+
+  test("Test getMaxDataframeTime FAILs with invalid reportLagTimeColumn Units specified") {
+
+    val schema = StructType(Array(
+      StructField("userId", IntegerType, true),
+      StructField("movieId", IntegerType, true),
+      StructField("rating", DoubleType, true),
+      StructField("created_at", LongType, true),
+      StructField("updated_at", TimestampType, true)))
+
+    val sparkSession = SparkSession.builder.appName("test").getOrCreate()
+    val sqlContext = new SQLContext(sparkSession.sparkContext)
+
+    val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv")
+    val metricReport = new MetricReporting()
+
+    val thrown = intercept[Exception] {
+      metricReport.getMaxDataframeTime(df, Option("created_at"), Option("HOUR"))
+    }
+    assert(thrown.getMessage.startsWith("Some(HOUR) is not a legal argument for units, use one of the following: [SECONDS,MILLISECONDS]"))
+  }
+
+  test("Test getMaxDataframeTime FAILs reportLagTimeColumn is not defined") {
+
+    val schema = StructType(Array(
+      StructField("userId", IntegerType, true),
+      StructField("movieId", IntegerType, true),
+      StructField("rating", DoubleType, true),
+      StructField("created_at", LongType, true),
+      StructField("updated_at", TimestampType, true)))
+
+    val sparkSession = SparkSession.builder.appName("test").getOrCreate()
+    val sqlContext = new SQLContext(sparkSession.sparkContext)
+
+    val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv")
+    val metricReport = new MetricReporting()
+
+    val thrown = intercept[Exception] {
+      metricReport.getMaxDataframeTime(df, None, None)
+    }
+    assert(thrown.getMessage.startsWith("Failed to report lag time, reportLagTimeColumn is not defined"))
+  }
+
+  override def afterEach() {
+    sparkSession.stop()
+  }
+}