diff --git a/pom.xml b/pom.xml index 8a21faa..037064c 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 com.yahoo.bullet bullet-spark - 1.0.5-SNAPSHOT + 1.1.0-SNAPSHOT jar bullet-spark @@ -54,8 +54,8 @@ 2.12 1.3.0 3.0.1 - 1.3.1 - 1.1.6 + 1.4.2 + 1.1.7 1.9.2 2.6.0 2.2.1 diff --git a/src/main/scala/com/yahoo/bullet/spark/QueryDataUnioning.scala b/src/main/scala/com/yahoo/bullet/spark/QueryDataUnioning.scala index 2316c3f..0e95985 100644 --- a/src/main/scala/com/yahoo/bullet/spark/QueryDataUnioning.scala +++ b/src/main/scala/com/yahoo/bullet/spark/QueryDataUnioning.scala @@ -6,7 +6,7 @@ package com.yahoo.bullet.spark import com.yahoo.bullet.pubsub.Metadata.Signal -import com.yahoo.bullet.pubsub.PubSubMessage +import com.yahoo.bullet.pubsub.{PubSubMessage, PubSubMessageSerDe} import com.yahoo.bullet.record.BulletRecord import com.yahoo.bullet.spark.data.{BulletData, BulletErrorData, BulletSignalData, RunningQueryData} import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkUtils} @@ -40,8 +40,9 @@ object QueryDataUnioning { ): DStream[(String, BulletData)] = { // Create (id, BulletData) pairs out of the queries. val metrics = BulletSparkMetrics.getInstance(ssc, broadcastedConfig) + val messageSerDe = PubSubMessageSerDe.from(broadcastedConfig.value) val queryPairStream = queryStream.map(m => { - val output = (m.getId, BulletSparkUtils.createBulletData(m, broadcastedConfig)) + val output = (m.getId, BulletSparkUtils.createBulletData(messageSerDe.fromMessage(m), broadcastedConfig)) if (!output._2.isInstanceOf[BulletSignalData]) { BulletSparkMetrics.newQueryReceived(metrics) } diff --git a/src/main/scala/com/yahoo/bullet/spark/utils/BulletSparkUtils.scala b/src/main/scala/com/yahoo/bullet/spark/utils/BulletSparkUtils.scala index 1df44dc..04502e4 100644 --- a/src/main/scala/com/yahoo/bullet/spark/utils/BulletSparkUtils.scala +++ b/src/main/scala/com/yahoo/bullet/spark/utils/BulletSparkUtils.scala @@ -9,10 +9,9 @@ package com.yahoo.bullet.spark.utils import scala.collection.JavaConverters._ // scalastyle:on -import com.yahoo.bullet.common.{BulletError, SerializerDeserializer} +import com.yahoo.bullet.common.BulletError import com.yahoo.bullet.pubsub.Metadata.Signal import com.yahoo.bullet.pubsub.{Metadata, PubSubMessage} -import com.yahoo.bullet.query.Query import com.yahoo.bullet.querying.{Querier, RateLimitError, RunningQuery} import com.yahoo.bullet.result.{Clip, Meta} import com.yahoo.bullet.spark.data.{BulletData, BulletErrorData, BulletSignalData, QuerierData, RunningQueryData} @@ -27,7 +26,6 @@ object BulletSparkUtils { */ def createBulletData(pubSubMessage: PubSubMessage, broadcastedConfig: Broadcast[BulletSparkConfig]): BulletData = { val id = pubSubMessage.getId - val content = pubSubMessage.getContent val metadata = pubSubMessage.getMetadata val config = broadcastedConfig.value try { @@ -35,7 +33,7 @@ object BulletSparkUtils { if (signal != null && signal == Metadata.Signal.KILL || signal == Metadata.Signal.COMPLETE) { new BulletSignalData(metadata, signal) } else { - val query: Query = SerializerDeserializer.fromBytes(content) + val query = pubSubMessage.getContentAsQuery val runningQuery = new RunningQuery(id, query, metadata) new RunningQueryData(metadata, runningQuery) } diff --git a/src/test/scala/com/yahoo/bullet/spark/QueryDataUnioningTest.scala b/src/test/scala/com/yahoo/bullet/spark/QueryDataUnioningTest.scala index 33a2424..efa708a 100644 --- a/src/test/scala/com/yahoo/bullet/spark/QueryDataUnioningTest.scala +++ b/src/test/scala/com/yahoo/bullet/spark/QueryDataUnioningTest.scala @@ -5,12 +5,10 @@ */ package com.yahoo.bullet.spark -import com.yahoo.bullet.common.SerializerDeserializer - import scala.collection.mutable import scala.collection.mutable.ListBuffer import com.yahoo.bullet.pubsub.Metadata.Signal -import com.yahoo.bullet.pubsub.{Metadata, PubSubMessage} +import com.yahoo.bullet.pubsub.{ByteArrayPubSubMessageSerDe, PubSubMessage} import com.yahoo.bullet.query.{Projection, Query, Window} import com.yahoo.bullet.query.QueryUtils.makeFieldFilterQuery import com.yahoo.bullet.query.aggregations.Raw @@ -19,7 +17,7 @@ import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkUtils} import org.apache.spark.rdd.RDD class QueryDataUnioningTest extends BulletSparkTest { - private val metadata = new Metadata() + private val messageSerDe = new ByteArrayPubSubMessageSerDe(new BulletSparkConfig("src/test/resources/test_config.yaml")) behavior of "The query data unioning stage" @@ -38,9 +36,10 @@ class QueryDataUnioningTest extends BulletSparkTest { ssc.start() val query = makeFieldFilterQuery("b235gf23b") - val pubSubMessage1 = new PubSubMessage("id1", SerializerDeserializer.toBytes(query), metadata) + val pubSubMessage1 = messageSerDe.toMessage("id1", query, null) // Json parsing error. - val pubSubMessage2 = new PubSubMessage("id2", "This is a json parsing error pubsub message.") + val pubSubMessage2 = messageSerDe.toMessage("id2", null, null) + pubSubMessage2.setContent("This is a query deserialization error.") inputQueries += sc.makeRDD(Seq(pubSubMessage1, pubSubMessage2)) wait1second() // T = 1s @@ -92,7 +91,7 @@ class QueryDataUnioningTest extends BulletSparkTest { ssc.start() val query = new Query(new Projection(), null, new Raw(1), null, new Window(), 1L) - val pubSubMessage1 = new PubSubMessage("id1", SerializerDeserializer.toBytes(query), metadata) + val pubSubMessage1 = messageSerDe.toMessage("id1", query, null) inputQueries += sc.makeRDD(Seq(pubSubMessage1)) wait1second() // T = 1s