Skip to content

Commit

Permalink
Fixed null-pointer (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanSpeidel authored Jan 11, 2019
1 parent 9179865 commit 5696c19
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/main/scala/com/yahoo/bullet/spark/JoinStreaming.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ object JoinStreaming {
}

// If the query is not finished and data is not empty, consume data and check if it should be emitted.
if (!querierState.finished && filterResultData.data.nonEmpty) {
if (!querierState.finished && filterResultData.data != null && filterResultData.data.nonEmpty) {
querierState.querier.combine(filterResultData.data)
emitResults(key, querierState, bulletResults, metrics)
}
Expand Down
30 changes: 30 additions & 0 deletions src/test/scala/com/yahoo/bullet/spark/JoinStreamingTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,36 @@ class JoinStreamingTest extends BulletSparkTest {
}
}

it should "handle queries for which there is no state" in {
val inputQueries: mutable.Queue[RDD[(String, BulletData)]] = mutable.Queue()
val outputCollector = ListBuffer.empty[Array[(String, BulletResult)]]

val pubSubMessage = new PubSubMessage("id",
makeSimpleAggregationFilterQuery("field", List("b235gf23b").asJava, Operation.EQUALS, RAW, 1))
val runningQuery = new RunningQuery("id", pubSubMessage.getContent, config)

val inputQueryStream = ssc.queueStream(inputQueries)

val broadcastedConfig = BulletSparkConfig.getInstance(ssc, config)
val outputStream = JoinStreaming.join(ssc, inputQueryStream, broadcastedConfig)

outputStream.foreachRDD(rdd => outputCollector += rdd.collect())

ssc.checkpoint("target/spark-test")

ssc.start()

// Consuming a RunningQuery in the join stage for which there is no state yet
val bulletQuery = BulletSparkUtils.createBulletQuerier(runningQuery, Querier.Mode.PARTITION, broadcastedConfig)
inputQueries += sc.makeRDD(Seq(("id", new FilterResultData(metadata, runningQuery, bulletQuery.getData))))

wait1second() // T = 1s

eventually {
outputCollector.last.length should equal(0)
}
}

it should "output join results on query timeout" in {
val inputQueries: mutable.Queue[RDD[(String, BulletData)]] = mutable.Queue()
val outputCollector = ListBuffer.empty[Array[(String, BulletResult)]]
Expand Down

0 comments on commit 5696c19

Please sign in to comment.