Skip to content

Commit

Permalink
Using PubSubMessageSerDe (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
0aix authored Jul 7, 2021
1 parent df71219 commit d1a2470
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 16 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-spark</artifactId>
<version>1.0.5-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>bullet-spark</name>

Expand Down Expand Up @@ -54,8 +54,8 @@
<scala.dep.version>2.12</scala.dep.version>
<scoverage.plugin.version>1.3.0</scoverage.plugin.version>
<spark.version>3.0.1</spark.version>
<bullet.core.version>1.3.1</bullet.core.version>
<bullet.dsl.version>1.1.6</bullet.dsl.version>
<bullet.core.version>1.4.2</bullet.core.version>
<bullet.dsl.version>1.1.7</bullet.dsl.version>
<avro.version>1.9.2</avro.version>
<kafka.clients.version>2.6.0</kafka.clients.version>
<pulsar.client.version>2.2.1</pulsar.client.version>
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/com/yahoo/bullet/spark/QueryDataUnioning.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package com.yahoo.bullet.spark

import com.yahoo.bullet.pubsub.Metadata.Signal
import com.yahoo.bullet.pubsub.PubSubMessage
import com.yahoo.bullet.pubsub.{PubSubMessage, PubSubMessageSerDe}
import com.yahoo.bullet.record.BulletRecord
import com.yahoo.bullet.spark.data.{BulletData, BulletErrorData, BulletSignalData, RunningQueryData}
import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkUtils}
Expand Down Expand Up @@ -40,8 +40,9 @@ object QueryDataUnioning {
): DStream[(String, BulletData)] = {
// Create (id, BulletData) pairs out of the queries.
val metrics = BulletSparkMetrics.getInstance(ssc, broadcastedConfig)
val messageSerDe = PubSubMessageSerDe.from(broadcastedConfig.value)
val queryPairStream = queryStream.map(m => {
val output = (m.getId, BulletSparkUtils.createBulletData(m, broadcastedConfig))
val output = (m.getId, BulletSparkUtils.createBulletData(messageSerDe.fromMessage(m), broadcastedConfig))
if (!output._2.isInstanceOf[BulletSignalData]) {
BulletSparkMetrics.newQueryReceived(metrics)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ package com.yahoo.bullet.spark.utils
import scala.collection.JavaConverters._
// scalastyle:on

import com.yahoo.bullet.common.{BulletError, SerializerDeserializer}
import com.yahoo.bullet.common.BulletError
import com.yahoo.bullet.pubsub.Metadata.Signal
import com.yahoo.bullet.pubsub.{Metadata, PubSubMessage}
import com.yahoo.bullet.query.Query
import com.yahoo.bullet.querying.{Querier, RateLimitError, RunningQuery}
import com.yahoo.bullet.result.{Clip, Meta}
import com.yahoo.bullet.spark.data.{BulletData, BulletErrorData, BulletSignalData, QuerierData, RunningQueryData}
Expand All @@ -27,15 +26,14 @@ object BulletSparkUtils {
*/
def createBulletData(pubSubMessage: PubSubMessage, broadcastedConfig: Broadcast[BulletSparkConfig]): BulletData = {
val id = pubSubMessage.getId
val content = pubSubMessage.getContent
val metadata = pubSubMessage.getMetadata
val config = broadcastedConfig.value
try {
val signal: Metadata.Signal = if (metadata == null) null else metadata.getSignal
if (signal != null && signal == Metadata.Signal.KILL || signal == Metadata.Signal.COMPLETE) {
new BulletSignalData(metadata, signal)
} else {
val query: Query = SerializerDeserializer.fromBytes(content)
val query = pubSubMessage.getContentAsQuery
val runningQuery = new RunningQuery(id, query, metadata)
new RunningQueryData(metadata, runningQuery)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
*/
package com.yahoo.bullet.spark

import com.yahoo.bullet.common.SerializerDeserializer

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import com.yahoo.bullet.pubsub.Metadata.Signal
import com.yahoo.bullet.pubsub.{Metadata, PubSubMessage}
import com.yahoo.bullet.pubsub.{ByteArrayPubSubMessageSerDe, PubSubMessage}
import com.yahoo.bullet.query.{Projection, Query, Window}
import com.yahoo.bullet.query.QueryUtils.makeFieldFilterQuery
import com.yahoo.bullet.query.aggregations.Raw
Expand All @@ -19,7 +17,7 @@ import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkUtils}
import org.apache.spark.rdd.RDD

class QueryDataUnioningTest extends BulletSparkTest {
private val metadata = new Metadata()
private val messageSerDe = new ByteArrayPubSubMessageSerDe(new BulletSparkConfig("src/test/resources/test_config.yaml"))

behavior of "The query data unioning stage"

Expand All @@ -38,9 +36,10 @@ class QueryDataUnioningTest extends BulletSparkTest {
ssc.start()

val query = makeFieldFilterQuery("b235gf23b")
val pubSubMessage1 = new PubSubMessage("id1", SerializerDeserializer.toBytes(query), metadata)
val pubSubMessage1 = messageSerDe.toMessage("id1", query, null)
// Json parsing error.
val pubSubMessage2 = new PubSubMessage("id2", "This is a json parsing error pubsub message.")
val pubSubMessage2 = messageSerDe.toMessage("id2", null, null)
pubSubMessage2.setContent("This is a query deserialization error.")
inputQueries += sc.makeRDD(Seq(pubSubMessage1, pubSubMessage2))
wait1second() // T = 1s

Expand Down Expand Up @@ -92,7 +91,7 @@ class QueryDataUnioningTest extends BulletSparkTest {
ssc.start()

val query = new Query(new Projection(), null, new Raw(1), null, new Window(), 1L)
val pubSubMessage1 = new PubSubMessage("id1", SerializerDeserializer.toBytes(query), metadata)
val pubSubMessage1 = messageSerDe.toMessage("id1", query, null)
inputQueries += sc.makeRDD(Seq(pubSubMessage1))
wait1second() // T = 1s

Expand Down

0 comments on commit d1a2470

Please sign in to comment.