From 4145646feec887def726b4cffc43979141482933 Mon Sep 17 00:00:00 2001 From: Doe-Ed <63307024+Doe-Ed@users.noreply.github.com> Date: Sun, 24 May 2020 11:14:35 +0300 Subject: [PATCH] fix(MetricReporting):match on all possible time units. (#325) --- .../com/yotpo/metorikku/metric/MetricReporting.scala | 9 +++++---- .../metorikku/metric/test/MetricReporterTester.scala | 2 -- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/metric/MetricReporting.scala b/src/main/scala/com/yotpo/metorikku/metric/MetricReporting.scala index 2c2552674..4583bb97b 100644 --- a/src/main/scala/com/yotpo/metorikku/metric/MetricReporting.scala +++ b/src/main/scala/com/yotpo/metorikku/metric/MetricReporting.scala @@ -2,7 +2,7 @@ package com.yotpo.metorikku.metric import java.util.concurrent.TimeUnit -import com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException +import com.yotpo.metorikku.exceptions.{MetorikkuException, MetorikkuWriteFailedException} import com.yotpo.metorikku.instrumentation.InstrumentationProvider import org.apache.log4j.LogManager import org.apache.spark.sql.DataFrame @@ -18,10 +18,11 @@ class MetricReporting { try { reportLagTimeColumnUnits match { case Some(units) => TimeUnit.valueOf(units) match { - case TimeUnit.SECONDS => TimeUnit.SECONDS.toMillis(dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getLong(0)) - case TimeUnit.MILLISECONDS => TimeUnit.MILLISECONDS.toMillis(dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getLong(0)) + case TimeUnit.MILLISECONDS => TimeUnit.MILLISECONDS.toMillis(dataFrame.agg({timeColumn -> "max"}).collect()(0).getLong(0)) + case TimeUnit.SECONDS => TimeUnit.SECONDS.toMillis(dataFrame.agg({timeColumn -> "max"}).collect()(0).getLong(0)) + case _ => throw MetorikkuException("Unsupported time unit type " + TimeUnit.valueOf(units)) } - case _=> dataFrame.agg({timeColumn.toString -> "max"}).collect()(0).getTimestamp(0).getTime() + case _ => dataFrame.agg({timeColumn -> "max"}).collect()(0).getTimestamp(0).getTime() } } catch { case e: ClassCastException => throw new ClassCastException(s"Lag instrumentation column -${timeColumn} " + diff --git a/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala b/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala index ba5be3177..4c1cf13ba 100644 --- a/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala +++ b/src/test/scala/com/yotpo/metorikku/metric/test/MetricReporterTester.scala @@ -28,7 +28,6 @@ class MetricReporterTester extends FunSuite with BeforeAndAfterEach { StructField("updated_at", TimestampType, true))) val sparkSession = SparkSession.builder.appName("test").getOrCreate() - val sqlContext= new SQLContext(sparkSession.sparkContext) val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv") val metricReport = new MetricReporting() val maxUpdatedAt = metricReport.getMaxDataframeTime(df, Option("updated_at"), None) @@ -70,7 +69,6 @@ class MetricReporterTester extends FunSuite with BeforeAndAfterEach { StructField("updated_at", TimestampType, true))) val sparkSession = SparkSession.builder.appName("test").getOrCreate() - val sqlContext= new SQLContext(sparkSession.sparkContext) val df = sparkSession.read.format("csv").option("header", "true").schema(schema).load("src/test/configurations/mocks/ratings_time.csv") val metricReport = new MetricReporting()