Skip to content

Commit

Permalink
fix(MetricReporting):match on all possible time units. (#325)
Browse files Browse the repository at this point in the history
  • Loading branch information
Doe-Ed authored May 24, 2020
1 parent 9d5058d commit 4145646
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.yotpo.metorikku.metric

import java.util.concurrent.TimeUnit

import com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException
import com.yotpo.metorikku.exceptions.{MetorikkuException, MetorikkuWriteFailedException}
import com.yotpo.metorikku.instrumentation.InstrumentationProvider
import org.apache.log4j.LogManager
import org.apache.spark.sql.DataFrame
Expand All @@ -18,10 +18,11 @@ class MetricReporting {
try {
reportLagTimeColumnUnits match {
case Some(units) => TimeUnit.valueOf(units) match {
case TimeUnit.SECONDS => TimeUnit.SECONDS.toMillis(dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getLong(0))
case TimeUnit.MILLISECONDS => TimeUnit.MILLISECONDS.toMillis(dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getLong(0))
case TimeUnit.MILLISECONDS => TimeUnit.MILLISECONDS.toMillis(dataFrame.agg({timeColumn -> "max"}).collect()(0).getLong(0))
case TimeUnit.SECONDS => TimeUnit.SECONDS.toMillis(dataFrame.agg({timeColumn -> "max"}).collect()(0).getLong(0))
case _ => throw MetorikkuException("Unsupported time unit type " + TimeUnit.valueOf(units))
}
case _=> dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getTimestamp(0).getTime()
case _ => dataFrame.agg({timeColumn -> "max"}).collect()(0).getTimestamp(0).getTime()
}
} catch {
case e: ClassCastException => throw new ClassCastException(s"Lag instrumentation column -${timeColumn} " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class MetricReporterTester extends FunSuite with BeforeAndAfterEach {
StructField("updated_at", TimestampType, true)))

val sparkSession = SparkSession.builder.appName("test").getOrCreate()
val sqlContext= new SQLContext(sparkSession.sparkContext)
val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv")
val metricReport = new MetricReporting()
val maxUpdatedAt = metricReport.getMaxDataframeTime(df, Option("updated_at"), None)
Expand Down Expand Up @@ -70,7 +69,6 @@ class MetricReporterTester extends FunSuite with BeforeAndAfterEach {
StructField("updated_at", TimestampType, true)))

val sparkSession = SparkSession.builder.appName("test").getOrCreate()
val sqlContext= new SQLContext(sparkSession.sparkContext)

val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv")
val metricReport = new MetricReporting()
Expand Down

0 comments on commit 4145646

Please sign in to comment.