diff --git a/src/main/scala/com/yahoo/bullet/spark/JoinStreaming.scala b/src/main/scala/com/yahoo/bullet/spark/JoinStreaming.scala index e0bc8ed..e4e7be4 100644 --- a/src/main/scala/com/yahoo/bullet/spark/JoinStreaming.scala +++ b/src/main/scala/com/yahoo/bullet/spark/JoinStreaming.scala @@ -122,7 +122,7 @@ object JoinStreaming { } // If the query is not finished and data is not empty, consume data and check if it should be emitted. - if (!querierState.finished && filterResultData.data.nonEmpty) { + if (!querierState.finished && filterResultData.data != null && filterResultData.data.nonEmpty) { querierState.querier.combine(filterResultData.data) emitResults(key, querierState, bulletResults, metrics) } diff --git a/src/test/scala/com/yahoo/bullet/spark/JoinStreamingTest.scala b/src/test/scala/com/yahoo/bullet/spark/JoinStreamingTest.scala index 2ceb253..bc88453 100644 --- a/src/test/scala/com/yahoo/bullet/spark/JoinStreamingTest.scala +++ b/src/test/scala/com/yahoo/bullet/spark/JoinStreamingTest.scala @@ -73,6 +73,36 @@ class JoinStreamingTest extends BulletSparkTest { } } + it should "handle queries for which there is no state" in { + val inputQueries: mutable.Queue[RDD[(String, BulletData)]] = mutable.Queue() + val outputCollector = ListBuffer.empty[Array[(String, BulletResult)]] + + val pubSubMessage = new PubSubMessage("id", + makeSimpleAggregationFilterQuery("field", List("b235gf23b").asJava, Operation.EQUALS, RAW, 1)) + val runningQuery = new RunningQuery("id", pubSubMessage.getContent, config) + + val inputQueryStream = ssc.queueStream(inputQueries) + + val broadcastedConfig = BulletSparkConfig.getInstance(ssc, config) + val outputStream = JoinStreaming.join(ssc, inputQueryStream, broadcastedConfig) + + outputStream.foreachRDD(rdd => outputCollector += rdd.collect()) + + ssc.checkpoint("target/spark-test") + + ssc.start() + + // Consuming a RunningQuery in the join stage for which there is no state yet + val bulletQuery = BulletSparkUtils.createBulletQuerier(runningQuery, Querier.Mode.PARTITION, broadcastedConfig) + inputQueries += sc.makeRDD(Seq(("id", new FilterResultData(metadata, runningQuery, bulletQuery.getData)))) + + wait1second() // T = 1s + + eventually { + outputCollector.last.length should equal(0) + } + } + it should "output join results on query timeout" in { val inputQueries: mutable.Queue[RDD[(String, BulletData)]] = mutable.Queue() val outputCollector = ListBuffer.empty[Array[(String, BulletResult)]]