Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚧 it: cross navigation spec #900

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.enrichments/cross_navigation_config/jsonschema/1-0-0",

"data": {
"enabled": true,
"vendor": "com.snowplowanalytics.snowplow.enrichments",
"name": "cross_navigation_config"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object CollectorPayloadGen {
version <- Gen.const("tp2")
api = CollectorPayload.Api(vendor, version)

queryString = Nil
queryString = List(new org.apache.http.message.BasicNameValuePair("url", "http://example.com?_sp=6de9024e-17b9-4026-bd4d-efec50ae84cb.1680681134458"))

contentType = Some("application/json")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
val enrichedStream = "it-enrich-kinesis-enriched"
val badRowsStream = "it-enrich-kinesis-bad"

val nbGood = 100l
val nbBad = 10l
val nbGood = 100L
val nbBad = 10L

type AggregateGood = List[Event]
type AggregateBad = List[String]
Expand All @@ -55,12 +55,12 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
val bootstrapServers = s"localhost:$kafkaPort"

val consumerConf: Map[String, String] = Map(
"group.id" -> "it-enrich",
"auto.offset.reset" -> "earliest",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"security.protocol" -> "PLAINTEXT",
"sasl.mechanism" -> "GSSAPI"
"group.id" -> "it-enrich",
"auto.offset.reset" -> "earliest",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"security.protocol" -> "PLAINTEXT",
"sasl.mechanism" -> "GSSAPI"
)

val producerConf: Map[String, String] = Map(
Expand All @@ -71,35 +71,42 @@ class EnrichKafkaSpec extends Specification with CatsEffect {

def run(): IO[Aggregates] = {

val resources = Sink.init[IO](OutKafka(collectorPayloadsStream, bootstrapServers, "", Set.empty, producerConf), classOf[SourceAuthHandler].getName)
val resources =
Sink.init[IO](OutKafka(collectorPayloadsStream, bootstrapServers, "", Set.empty, producerConf), classOf[SourceAuthHandler].getName)

resources.use { sink =>
val generate =
CollectorPayloadGen.generate[IO](nbGood, nbBad)
CollectorPayloadGen
.generate[IO](nbGood, nbBad)
.evalMap(events => sink(List(events)))
.onComplete(fs2.Stream.eval(Logger[IO].info(s"Random data has been generated and sent to $collectorPayloadsStream")))

def consume(refGood: Ref[IO, AggregateGood], refBad: Ref[IO, AggregateBad]): Stream[IO, Unit] =
consumeGood(refGood).merge(consumeBad(refBad))

def consumeGood(ref: Ref[IO, AggregateGood]): Stream[IO, Unit] =
Source.init[IO](InKafka(enrichedStream, bootstrapServers, consumerConf), classOf[GoodSinkAuthHandler].getName).map(_.record.value).evalMap(aggregateGood(_, ref))
Source
.init[IO](InKafka(enrichedStream, bootstrapServers, consumerConf), classOf[GoodSinkAuthHandler].getName)
.map(_.record.value)
.evalMap(aggregateGood(_, ref))

def consumeBad(ref: Ref[IO, AggregateBad]): Stream[IO, Unit] =
Source.init[IO](InKafka(badRowsStream, bootstrapServers, consumerConf), classOf[BadSinkAuthHandler].getName).map(_.record.value).evalMap(aggregateBad(_, ref))
Source
.init[IO](InKafka(badRowsStream, bootstrapServers, consumerConf), classOf[BadSinkAuthHandler].getName)
.map(_.record.value)
.evalMap(aggregateBad(_, ref))

def aggregateGood(r: Array[Byte], ref: Ref[IO, AggregateGood]): IO[Unit] =
for {
e <- IO(Event.parse(new String(r)).getOrElse(throw new RuntimeException("can't parse enriched event")))
_ <- ref.update(updateAggregateGood(_, e))
} yield ()

def aggregateBad(r: Array[Byte], ref: Ref[IO, AggregateBad]): IO[Unit] = {
def aggregateBad(r: Array[Byte], ref: Ref[IO, AggregateBad]): IO[Unit] =
for {
br <- IO(new String(r))
_ <- ref.update(updateAggregateBad(_, br))
} yield ()
}

def updateAggregateGood(aggregate: AggregateGood, e: Event): AggregateGood =
e :: aggregate
Expand All @@ -110,13 +117,12 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
for {
refGood <- Ref.of[IO, AggregateGood](Nil)
refBad <- Ref.of[IO, AggregateBad](Nil)
_ <-
generate
.merge(consume(refGood, refBad))
.interruptAfter(30.seconds)
.attempt
.compile
.drain
_ <- generate
.merge(consume(refGood, refBad))
.interruptAfter(30.seconds)
.attempt
.compile
.drain
aggregateGood <- refGood.get
aggregateBad <- refBad.get
} yield Aggregates(aggregateGood, aggregateBad)
Expand All @@ -130,6 +136,12 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
aggregates.good.size must beEqualTo(nbGood)
}

"contain cross platform context" in {
val crossNavigationExpected =
aggregates.good.map(_.derived_contexts.data.flatMap(_.data.findAllByKey("domain_user_id").flatMap(_.asString.toList)))
crossNavigationExpected must contain(contain(be_===("6de9024e-17b9-4026-bd4d-efec50ae84cb"))).foreach
}

"emit the expected bad rows events" in {
aggregates.bad.size must beEqualTo(nbBad)
}
Expand Down
Loading