it: cross navigation spec
peel committed Jun 13, 2024
1 parent 52ad0a0 commit b60022b
Showing 3 changed files with 43 additions and 22 deletions.
@@ -0,0 +1,9 @@
+{
+  "schema": "iglu:com.snowplowanalytics.snowplow.enrichments/cross_navigation_config/jsonschema/1-0-0",
+
+  "data": {
+    "enabled": true,
+    "vendor": "com.snowplowanalytics.snowplow.enrichments",
+    "name": "cross_navigation_config"
+  }
+}
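For context: the cross-navigation enrichment decodes the "_sp" querystring parameter that Snowplow trackers append to outbound links for cross-domain tracking; in its simplest form the value is "<domainUserId>.<timestampMillis>". A minimal Scala sketch of that decoding (illustrative only; parseSp is a hypothetical helper, not the enrichment's actual code):

// Sketch only: decodes a basic "_sp" value of the form
// "<domainUserId>.<timestampMillis>"; the real enrichment accepts more fields.
def parseSp(sp: String): Option[(String, Long)] =
  sp.split('.') match {
    case Array(duid, ts) => ts.toLongOption.map(duid -> _)
    case _               => None
  }

// parseSp("6de9024e-17b9-4026-bd4d-efec50ae84cb.1680681134458")
// => Some(("6de9024e-17b9-4026-bd4d-efec50ae84cb", 1680681134458L))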
@@ -48,7 +48,7 @@ object CollectorPayloadGen {
       version <- Gen.const("tp2")
       api = CollectorPayload.Api(vendor, version)
 
-      queryString = Nil
+      queryString = List(new org.apache.http.message.BasicNameValuePair("url", "http://example.com?_sp=6de9024e-17b9-4026-bd4d-efec50ae84cb.1680681134458"))
 
       contentType = Some("application/json")
 
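The generator change above seeds every payload with a page URL carrying a known "_sp" value, so the spec can assert on the enrichment output deterministically. Assuming the enrichment fires, each good event should gain a derived context roughly like the sketch below (the schema path and field names are an assumption, inferred from the domain_user_id lookup in the spec that follows):

// Assumed shape of the derived context attached by the enrichment (illustrative):
// {
//   "schema": "iglu:com.snowplowanalytics.snowplow/cross_navigation/jsonschema/1-0-0",
//   "data": {
//     "domain_user_id": "6de9024e-17b9-4026-bd4d-efec50ae84cb",
//     "timestamp": "2023-04-05T07:52:14.458Z"
//   }
// }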
@@ -44,8 +44,8 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
   val enrichedStream = "it-enrich-kinesis-enriched"
   val badRowsStream = "it-enrich-kinesis-bad"
 
-  val nbGood = 100l
-  val nbBad = 10l
+  val nbGood = 100L
+  val nbBad = 10L
 
   type AggregateGood = List[Event]
   type AggregateBad = List[String]
@@ -55,12 +55,12 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
   val bootstrapServers = s"localhost:$kafkaPort"
 
   val consumerConf: Map[String, String] = Map(
-    "group.id" -> "it-enrich",
-    "auto.offset.reset" -> "earliest",
-    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
-    "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
-    "security.protocol" -> "PLAINTEXT",
-    "sasl.mechanism" -> "GSSAPI"
+    "group.id" -> "it-enrich",
+    "auto.offset.reset" -> "earliest",
+    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
+    "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+    "security.protocol" -> "PLAINTEXT",
+    "sasl.mechanism" -> "GSSAPI"
   )
 
   val producerConf: Map[String, String] = Map(
@@ -71,35 +71,42 @@ class EnrichKafkaSpec extends Specification with CatsEffect {

   def run(): IO[Aggregates] = {
 
-    val resources = Sink.init[IO](OutKafka(collectorPayloadsStream, bootstrapServers, "", Set.empty, producerConf), classOf[SourceAuthHandler].getName)
+    val resources =
+      Sink.init[IO](OutKafka(collectorPayloadsStream, bootstrapServers, "", Set.empty, producerConf), classOf[SourceAuthHandler].getName)
 
     resources.use { sink =>
       val generate =
-        CollectorPayloadGen.generate[IO](nbGood, nbBad)
+        CollectorPayloadGen
+          .generate[IO](nbGood, nbBad)
           .evalMap(events => sink(List(events)))
           .onComplete(fs2.Stream.eval(Logger[IO].info(s"Random data has been generated and sent to $collectorPayloadsStream")))
 
       def consume(refGood: Ref[IO, AggregateGood], refBad: Ref[IO, AggregateBad]): Stream[IO, Unit] =
         consumeGood(refGood).merge(consumeBad(refBad))
 
       def consumeGood(ref: Ref[IO, AggregateGood]): Stream[IO, Unit] =
-        Source.init[IO](InKafka(enrichedStream, bootstrapServers, consumerConf), classOf[GoodSinkAuthHandler].getName).map(_.record.value).evalMap(aggregateGood(_, ref))
+        Source
+          .init[IO](InKafka(enrichedStream, bootstrapServers, consumerConf), classOf[GoodSinkAuthHandler].getName)
+          .map(_.record.value)
+          .evalMap(aggregateGood(_, ref))
 
       def consumeBad(ref: Ref[IO, AggregateBad]): Stream[IO, Unit] =
-        Source.init[IO](InKafka(badRowsStream, bootstrapServers, consumerConf), classOf[BadSinkAuthHandler].getName).map(_.record.value).evalMap(aggregateBad(_, ref))
+        Source
+          .init[IO](InKafka(badRowsStream, bootstrapServers, consumerConf), classOf[BadSinkAuthHandler].getName)
+          .map(_.record.value)
+          .evalMap(aggregateBad(_, ref))
 
       def aggregateGood(r: Array[Byte], ref: Ref[IO, AggregateGood]): IO[Unit] =
         for {
           e <- IO(Event.parse(new String(r)).getOrElse(throw new RuntimeException("can't parse enriched event")))
           _ <- ref.update(updateAggregateGood(_, e))
         } yield ()
 
-      def aggregateBad(r: Array[Byte], ref: Ref[IO, AggregateBad]): IO[Unit] = {
+      def aggregateBad(r: Array[Byte], ref: Ref[IO, AggregateBad]): IO[Unit] =
         for {
           br <- IO(new String(r))
           _ <- ref.update(updateAggregateBad(_, br))
         } yield ()
-      }
 
       def updateAggregateGood(aggregate: AggregateGood, e: Event): AggregateGood =
         e :: aggregate
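The aggregateGood/aggregateBad helpers above follow the usual cats-effect pattern for collecting results from concurrent streams: prepend each record to a list held in a Ref. Reduced to its essence (a sketch, not the spec's code):

import cats.effect.{IO, Ref}

// Sketch: each consumed record is prepended to a list held in a Ref;
// concurrent updates are safe because Ref#update is atomic.
def aggregate[A](ref: Ref[IO, List[A]])(a: A): IO[Unit] =
  ref.update(a :: _)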
@@ -110,13 +117,12 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
       for {
         refGood <- Ref.of[IO, AggregateGood](Nil)
         refBad <- Ref.of[IO, AggregateBad](Nil)
-        _ <-
-          generate
-            .merge(consume(refGood, refBad))
-            .interruptAfter(30.seconds)
-            .attempt
-            .compile
-            .drain
+        _ <- generate
+               .merge(consume(refGood, refBad))
+               .interruptAfter(30.seconds)
+               .attempt
+               .compile
+               .drain
         aggregateGood <- refGood.get
         aggregateBad <- refBad.get
       } yield Aggregates(aggregateGood, aggregateBad)
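The reformatted block keeps the same shape: produce and consume concurrently, cut the run off after a fixed window, then read back whatever the Refs accumulated. The pattern in isolation (a sketch under the same fs2/cats-effect assumptions; boundedRun is a hypothetical name):

import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

// Sketch: run producer and consumer streams concurrently for a bounded
// window; .attempt surfaces failures as values so the run always terminates.
def boundedRun(produce: Stream[IO, Unit], consume: Stream[IO, Unit]): IO[Unit] =
  produce
    .merge(consume)
    .interruptAfter(30.seconds)
    .attempt
    .compile
    .drain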
@@ -130,6 +136,12 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
       aggregates.good.size must beEqualTo(nbGood)
     }
 
+    "contain cross navigation context" in {
+      val crossNavigationExpected =
+        aggregates.good.map(_.derived_contexts.data.flatMap(_.data.findAllByKey("domain_user_id").flatMap(_.asString.toList)))
+      crossNavigationExpected must contain(contain(be_===("6de9024e-17b9-4026-bd4d-efec50ae84cb"))).foreach
+    }
+
     "emit the expected bad rows events" in {
       aggregates.bad.size must beEqualTo(nbBad)
     }
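The nested specs2 matcher reads inside-out: for every good event, collect all domain_user_id values found in its derived contexts, and require that each collection contains the duid embedded in the generated "_sp" value. A plain-Scala restatement of the same check (illustrative; crossNavigationExpected is the List[List[String]] computed in the spec above):

// Illustrative restatement of the assertion above: every good event's
// extracted domain_user_id list must include the expected duid.
val expectedDuid = "6de9024e-17b9-4026-bd4d-efec50ae84cb"
val allEventsCarryDuid: Boolean =
  crossNavigationExpected.forall(_.contains(expectedDuid))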