feat(DATA-2956_influx_lag_reporter) (#308)
* feat(DATA-2956_influx_lag_reporter)

* from map to string type

* influx_lag_reporter-fix

* new case class for StreamingWriting

* support lag units

* using TimeUnit

* unitTest + getMaxDfTime function

* removed outside from Metric class

* move MetricReporting to new file

* logging
Irenez753 authored May 13, 2020
1 parent 8fb6786 commit ef770ef
Showing 7 changed files with 201 additions and 20 deletions.
6 changes: 6 additions & 0 deletions config/metric_config_sample.yaml
@@ -148,6 +148,12 @@ output:
extraOptions:
es.index.auto.create: false
- outputType: Hudi
# Optional: If enabled, the lag between the current timestamp and the maximum value of the dataframe's time column (configurable below) will be reported.
reportLag: true
# Optional: Column name that will be used for calculating the lag, supported types: (Long, Timestamp, Integer), must be provided if "reportLag" is enabled.
reportLagTimeColumn: Option[String]
# Optional: The time unit of "reportLagTimeColumn" when it holds a numeric value (Long or Integer). Supported values: [MILLISECONDS (Default), SECONDS]
reportLagTimeColumnUnits: Option[String]
outputOptions:
path: path
# The key to use for upserts
4 changes: 4 additions & 0 deletions examples/hudi/ratings_no_partitions_metric.yaml
@@ -3,11 +3,15 @@ steps:
sql:
SELECT *,
userId AS hoodie_key,
CAST(timestamp AS LONG) AS lag_timestamp,
'no_partitions' as test_partitions
FROM ratings
output:
- dataFrameName: hive_sync_test
outputType: Hudi
reportLag: true
reportLagTimeColumn: lag_timestamp
reportLagTimeColumnUnits: SECONDS
outputOptions:
path: table_view_manual_hive_sync_no_partitions
keyColumn: hoodie_key
@@ -6,6 +6,9 @@ import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
case class Output(name: Option[String],
dataFrameName: String,
@JsonScalaEnumeration(classOf[OutputTypeReference]) outputType: OutputType.OutputType,
reportLag: Option[Boolean],
reportLagTimeColumn: Option[String],
reportLagTimeColumnUnits: Option[String],
repartition: Option[Int],
coalesce: Option[Boolean],
outputOptions: Map[String, Any])
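Since the three new fields are plain `Option`s on the output configuration case class, they can simply be left out of the metric YAML. Below is a rough, hypothetical sketch of how such optional keys come back as `None` when parsed with a jackson-module-scala + jackson-dataformat-yaml setup; `LagOutputConfig` and `LagOutputConfigSketch` are illustrative names, not classes from this repository:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical, trimmed-down stand-in for the repo's Output case class
case class LagOutputConfig(reportLag: Option[Boolean],
                           reportLagTimeColumn: Option[String],
                           reportLagTimeColumnUnits: Option[String])

object LagOutputConfigSketch extends App {
  val mapper = new ObjectMapper(new YAMLFactory())
  mapper.registerModule(DefaultScalaModule)

  val yaml =
    """reportLag: true
      |reportLagTimeColumn: lag_timestamp
      |""".stripMargin

  // reportLagTimeColumnUnits is omitted, so it deserializes to None
  // (the reporter then treats the column as MILLISECONDS by default)
  val parsed = mapper.readValue(yaml, classOf[LagOutputConfig])
  println(parsed) // LagOutputConfig(Some(true),Some(lag_timestamp),None)
}
```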
54 changes: 34 additions & 20 deletions src/main/scala/com/yotpo/metorikku/metric/Metric.scala
@@ -1,6 +1,7 @@
package com.yotpo.metorikku.metric

import java.io.File
import java.util.concurrent.TimeUnit

import com.yotpo.metorikku.Job
import com.yotpo.metorikku.configuration.job.Streaming
@@ -10,13 +11,15 @@ import com.yotpo.metorikku.exceptions.{MetorikkuFailedStepException, MetorikkuWr
import com.yotpo.metorikku.instrumentation.InstrumentationProvider
import com.yotpo.metorikku.output.{Writer, WriterFactory}
import org.apache.log4j.LogManager
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.streaming.Seconds

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

case class StreamingWritingConfiguration(dataFrame: DataFrame, writers: ListBuffer[Writer] = ListBuffer.empty)

case class StreamingWritingConfiguration(dataFrame: DataFrame, outputConfig: Output, writers: ListBuffer[Writer] = ListBuffer.empty)
case class StreamingWriting(streamingWritingConfiguration: StreamingWritingConfiguration)
case class Metric(configuration: Configuration, metricDir: File, metricName: String) {
val log = LogManager.getLogger(this.getClass)

@@ -43,21 +46,27 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
}
}

private def writeStream(dataFrameName: String, writerConfig: StreamingWritingConfiguration,
streamingConfig: Option[Streaming]): Unit = {
private def writeStream(dataFrameName: String,
writerConfig: StreamingWritingConfiguration,
streamingConfig: Option[Streaming],
instrumentationProvider: InstrumentationProvider): Unit = {
log.info(s"Starting to write streaming results of ${dataFrameName}")

streamingConfig match {
case Some(config) => {
val streamWriter = writerConfig.dataFrame.writeStream
config.applyOptions(streamWriter)

config.batchMode match {
case Some(true) => {
val query = streamWriter.foreachBatch((batchDF: DataFrame, _: Long) => {
writerConfig.writers.foreach(writer => writer.write(batchDF))
writerConfig.outputConfig.reportLag match {
case Some(true) => new MetricReporting().reportLagTime(batchDF, writerConfig.outputConfig.reportLagTimeColumn,
writerConfig.outputConfig.reportLagTimeColumnUnits, instrumentationProvider)
case _ =>
}
}).start()
query.awaitTermination()

// Exit this function after streaming is completed
return
}
@@ -79,7 +88,7 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
private def writeBatch(dataFrame: DataFrame,
dataFrameName: String,
writer: Writer,
outputType: OutputType,
outputConfig: Output,
instrumentationProvider: InstrumentationProvider,
cacheCountOnOutput: Option[Boolean]): Unit = {

@@ -90,16 +99,19 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
}
case _ => 0
}
val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName, "output_type" -> outputType.toString)
val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName, "output_type" -> outputConfig.outputType.toString)
instrumentationProvider.count(name="counter", value=dataFrameCount, tags=tags)
log.info(s"Starting to Write results of ${dataFrameName}")
try {
writer.write(dataFrame)
}
catch {
outputConfig.reportLag match {
case Some(true) => new MetricReporting().reportLagTime(dataFrame, outputConfig.reportLagTimeColumn,
outputConfig.reportLagTimeColumnUnits, instrumentationProvider)
case _ =>
} } catch {
case ex: Exception => {
throw MetorikkuWriteFailedException(s"Failed to write dataFrame: " +
s"$dataFrameName to output: ${outputType} on metric: ${metricName}", ex)
s"$dataFrameName to output: ${outputConfig.outputType} on metric: ${metricName}", ex)
}
}
}
@@ -117,31 +129,33 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
}
}


def write(job: Job): Unit = {

configuration.output match {
case Some(output) => {
val streamingWriterList: mutable.Map[String, StreamingWritingConfiguration] = mutable.Map()

val streamingWriterList: mutable.Map[String, StreamingWriting] = mutable.Map()
output.foreach(outputConfig => {
val writer = WriterFactory.get(outputConfig, metricName, job.config, job)
val dataFrameName = outputConfig.dataFrameName
val dataFrame = repartition(outputConfig, job.sparkSession.table(dataFrameName))

if (dataFrame.isStreaming) {
val streamingWriterConfig = streamingWriterList.getOrElse(dataFrameName, StreamingWritingConfiguration(dataFrame))
streamingWriterConfig.writers += writer
val streamingWriterConfig = streamingWriterList.getOrElse(dataFrameName, StreamingWriting(StreamingWritingConfiguration(dataFrame, outputConfig)))
streamingWriterConfig.streamingWritingConfiguration.writers += writer
streamingWriterList += (dataFrameName -> streamingWriterConfig)
}
else {
writeBatch(dataFrame, dataFrameName, writer,
outputConfig.outputType, job.instrumentationClient, job.config.cacheCountOnOutput)
writeBatch(dataFrame, dataFrameName, writer, outputConfig, job.instrumentationClient,
job.config.cacheCountOnOutput)
}
})

for ((dataFrameName, config) <- streamingWriterList) writeStream(dataFrameName, config, job.config.streaming)
for ((dataFrameName, streamingConfig) <- streamingWriterList) writeStream(dataFrameName,
streamingConfig.streamingWritingConfiguration, job.config.streaming, job.instrumentationClient)

}
case None =>
}
}
}
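In the streaming path above, lag is reported once per micro-batch from inside `foreachBatch`, after the batch's writers have run. The following is a self-contained sketch of that pattern against Spark's built-in `rate` source; `StreamingLagSketch` is an illustrative name, and the real code sends the value through `MetricReporting` and `InstrumentationProvider` rather than printing it:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.max

object StreamingLagSketch extends App {
  val spark = SparkSession.builder().appName("streaming-lag-sketch").master("local[*]").getOrCreate()

  // The built-in rate source emits a TimestampType column named "timestamp"
  val stream = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

  val query = stream.writeStream
    .foreachBatch { (batchDF: DataFrame, _: Long) =>
      // writers would run here first; the same micro-batch is then used for lag reporting
      val maxTs = batchDF.agg(max("timestamp")).collect()(0).getTimestamp(0)
      if (maxTs != null) {
        val lagMs = System.currentTimeMillis - maxTs.getTime
        println(s"lag=${lagMs}ms") // the commit reports this as a "lag" gauge instead
      }
    }
    .start()

  query.awaitTermination()
}
```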

46 changes: 46 additions & 0 deletions src/main/scala/com/yotpo/metorikku/metric/MetricReporting.scala
@@ -0,0 +1,46 @@
package com.yotpo.metorikku.metric

import java.util.concurrent.TimeUnit

import com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException
import com.yotpo.metorikku.instrumentation.InstrumentationProvider
import org.apache.log4j.LogManager
import org.apache.spark.sql.DataFrame

class MetricReporting {
val log = LogManager.getLogger(this.getClass)

def getMaxDataframeTime(dataFrame: DataFrame, reportLagTimeColumn: Option[String],
reportLagTimeColumnUnits:Option[String]): Long ={
reportLagTimeColumn match {
case Some(timeColumn) => {
dataFrame.cache()
try {
reportLagTimeColumnUnits match {
case Some(units) => TimeUnit.valueOf(units) match {
case TimeUnit.SECONDS => TimeUnit.SECONDS.toMillis(dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getLong(0))
case TimeUnit.MILLISECONDS => TimeUnit.MILLISECONDS.toMillis(dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getLong(0))
}
case _=> dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getTimestamp(0).getTime()
}
} catch {
case e: ClassCastException => throw new ClassCastException(s"Lag instrumentation column -${timeColumn} " +
s"cannot be cast to spark.sql.Timestamp or spark.sql.Long")
case e: IllegalArgumentException => throw new MetorikkuWriteFailedException(
s"${reportLagTimeColumnUnits} is not a legal argument for units, use one of the following: [SECONDS,MILLISECONDS]")
}
}
case _=> throw MetorikkuWriteFailedException("Failed to report lag time, reportLagTimeColumn is not defined")
}
}

def reportLagTime(dataFrame: DataFrame, reportLagTimeColumn: Option[String],
reportLagTimeColumnUnits:Option[String],
instrumentationProvider: InstrumentationProvider) : Unit ={
val maxDataframeTime = getMaxDataframeTime(dataFrame, reportLagTimeColumn, reportLagTimeColumnUnits)
log.info(s"Max column ${reportLagTimeColumn} value is ${maxDataframeTime} for ${dataFrame}")
val lag = System.currentTimeMillis - maxDataframeTime
log.info(s"Reporting lag value: ${lag} for ${dataFrame}")
instrumentationProvider.gauge(name = "lag", lag)
}
}
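A minimal usage sketch of the new class, mirroring the unit test further down: the maximum of a numeric epoch-seconds column is normalized to milliseconds before the lag against the current time is computed. `LagReportingSketch` and the inline rows are illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import com.yotpo.metorikku.metric.MetricReporting

object LagReportingSketch extends App {
  val spark = SparkSession.builder().appName("lag-reporting-sketch").master("local[*]").getOrCreate()

  // Epoch seconds, like the "created_at" column in the mock ratings_time.csv
  val df = spark.createDataFrame(Seq(
    (1, 1260759144L),
    (1, 1260759205L)
  )).toDF("userId", "created_at")

  val reporter = new MetricReporting()

  // With SECONDS units the max column value is converted to milliseconds
  val maxTimeMs = reporter.getMaxDataframeTime(df, Some("created_at"), Some("SECONDS")) // 1260759205000L
  val lagMs = System.currentTimeMillis - maxTimeMs
  println(s"maxTimeMs=$maxTimeMs lagMs=$lagMs") // reportLagTime pushes this as a "lag" gauge via InstrumentationProvider
}
```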
21 changes: 21 additions & 0 deletions src/test/configurations/mocks/ratings_time.csv
@@ -0,0 +1,21 @@
"userId","movieId","rating","created_at","updated_at"
1,31,2.5,1260759144,"2016-05-05T22:37:36.000"
1,1029,3,1260759179,"2016-05-05T22:43:36.000"
1,1061,3,1260759182,"2016-05-05T22:38:36.000"
1,1129,2,1260759185,"2016-05-05T22:39:36.000"
1,1172,4,1260759205,"2016-05-05T22:30:36.000"
1,1263,2,1260759151,"2016-05-05T22:31:36.000"
1,1287,2,1260759187,"2016-05-05T22:32:36.000"
1,1293,2,1260759148,"2016-05-05T22:33:36.000"
1,1339,3.5,1260759125,"2016-05-05T22:34:36.000"
1,1343,2,1260759131,"2016-05-05T22:35:36.000"
1,1371,2.5,1260759135,"2016-05-05T22:36:36.000"
1,1405,1,1260759203,"2016-05-05T22:37:36.000"
1,1953,4,1260759191,"2016-05-05T20:37:36.000"
1,2105,4,1260759139,"2016-05-05T20:37:33.000"
1,2150,3,1260759194,"2016-05-05T22:37:36.000"
1,2193,2,1260759198,"2016-05-05T21:30:36.000"
1,2294,2,1260759108,"2016-05-05T20:37:36.000"
1,2455,2.5,1260759113,"2016-05-05T20:27:36.000"
1,2968,1,1260759200,"2016-05-05T20:25:36.000"
1,3671,3,1260759117,"2016-05-05T20:15:36.000"
@@ -0,0 +1,87 @@
package com.yotpo.metorikku.metric.test
import com.yotpo.metorikku.metric.{Metric, MetricReporting}
import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.scalatest.{FunSuite, _}
import org.apache.spark.sql.types._

//noinspection ScalaStyle
class MetricReporterTester extends FunSuite with BeforeAndAfterEach {
private val log: Logger = LogManager.getLogger(this.getClass)
private var sparkSession : SparkSession = _
Logger.getLogger("org").setLevel(Level.WARN)

override def beforeEach() {
sparkSession = SparkSession.builder().appName("udf tests")
.master("local")
.config("", "")
.getOrCreate()
}

test("Test getMaxDataframeTime") {

val schema = StructType(Array(
StructField("userId", IntegerType, true),
StructField("movieId", IntegerType, true),
StructField("rating", DoubleType, true),
StructField("created_at", LongType, true),
StructField("updated_at", TimestampType, true)))

val sparkSession = SparkSession.builder.appName("test").getOrCreate()
val sqlContext= new SQLContext(sparkSession.sparkContext)
val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv")
val metricReport = new MetricReporting()
val maxUpdatedAt = metricReport.getMaxDataframeTime(df, Option("updated_at"), None)
val maxCreatedAt = metricReport.getMaxDataframeTime(df, Option("created_at"), Option("SECONDS"))

df.cache
assert(maxUpdatedAt == 1462488216000L)
assert(maxCreatedAt == 1260759205000L)
}

test("Test getMaxDataframeTime FAILs with invalid reportLagTimeColumn Units specified") {

val schema = StructType(Array(
StructField("userId", IntegerType, true),
StructField("movieId", IntegerType, true),
StructField("rating", DoubleType, true),
StructField("created_at", LongType, true),
StructField("updated_at", TimestampType, true)))

val sparkSession = SparkSession.builder.appName("test").getOrCreate()
val sqlContext= new SQLContext(sparkSession.sparkContext)

val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv")
val metricReport = new MetricReporting()

val thrown = intercept[Exception] {
metricReport.getMaxDataframeTime(df, Option("created_at"), Option("HOUR"))
}
assert(thrown.getMessage.startsWith("Some(HOUR) is not a legal argument for units, use one of the following: [SECONDS,MILLISECONDS]"))
}

test("Test getMaxDataframeTime FAILs reportLagTimeColumn is not defined") {

val schema = StructType(Array(
StructField("userId", IntegerType, true),
StructField("movieId", IntegerType, true),
StructField("rating", DoubleType, true),
StructField("created_at", LongType, true),
StructField("updated_at", TimestampType, true)))

val sparkSession = SparkSession.builder.appName("test").getOrCreate()
val sqlContext= new SQLContext(sparkSession.sparkContext)

val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv")
val metricReport = new MetricReporting()

val thrown = intercept[Exception] {
metricReport.getMaxDataframeTime(df, None, None)
}
assert(thrown.getMessage.startsWith("Failed to report lag time, reportLagTimeColumn is not defined"))
}

override def afterEach() {
sparkSession.stop()
}
}
