diff --git a/integration-tests/enrich-kafka/config/enrichments/cross_navigation_config.json b/integration-tests/enrich-kafka/config/enrichments/cross_navigation_config.json
new file mode 100644
index 000000000..4eced189c
--- /dev/null
+++ b/integration-tests/enrich-kafka/config/enrichments/cross_navigation_config.json
@@ -0,0 +1,9 @@
+{
+  "schema": "iglu:com.snowplowanalytics.snowplow.enrichments/cross_navigation_config/jsonschema/1-0-0",
+
+  "data": {
+    "enabled": true,
+    "vendor": "com.snowplowanalytics.snowplow.enrichments",
+    "name": "cross_navigation_config"
+  }
+}
diff --git a/modules/common-fs2/src/it/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/CollectorPayloadGen.scala b/modules/common-fs2/src/it/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/CollectorPayloadGen.scala
index f8c9157d5..af1d81c1f 100644
--- a/modules/common-fs2/src/it/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/CollectorPayloadGen.scala
+++ b/modules/common-fs2/src/it/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/CollectorPayloadGen.scala
@@ -48,7 +48,7 @@ object CollectorPayloadGen {
       version <- Gen.const("tp2")
       api = CollectorPayload.Api(vendor, version)
 
-      queryString = Nil
+      queryString = List(new org.apache.http.message.BasicNameValuePair("url", "http://example.com?_sp=6de9024e-17b9-4026-bd4d-efec50ae84cb.1680681134458"))
 
       contentType = Some("application/json")
 
diff --git a/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala b/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala
index ea30c37c6..2ac7fcbbe 100644
--- a/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala
+++ b/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala
@@ -44,8 +44,8 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
   val enrichedStream = "it-enrich-kinesis-enriched"
   val badRowsStream = "it-enrich-kinesis-bad"
 
-  val nbGood = 100l
-  val nbBad = 10l
+  val nbGood = 100L
+  val nbBad = 10L
 
   type AggregateGood = List[Event]
   type AggregateBad = List[String]
@@ -55,12 +55,12 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
   val bootstrapServers = s"localhost:$kafkaPort"
 
   val consumerConf: Map[String, String] = Map(
-    "group.id"           -> "it-enrich",
-    "auto.offset.reset"  -> "earliest",
-    "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
-    "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
-    "security.protocol"  -> "PLAINTEXT",
-    "sasl.mechanism"     -> "GSSAPI"
+    "group.id" -> "it-enrich",
+    "auto.offset.reset" -> "earliest",
+    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
+    "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+    "security.protocol" -> "PLAINTEXT",
+    "sasl.mechanism" -> "GSSAPI"
   )
 
   val producerConf: Map[String, String] = Map(
@@ -71,11 +71,13 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
 
   def run(): IO[Aggregates] = {
 
-    val resources = Sink.init[IO](OutKafka(collectorPayloadsStream, bootstrapServers, "", Set.empty, producerConf), classOf[SourceAuthHandler].getName)
+    val resources =
+      Sink.init[IO](OutKafka(collectorPayloadsStream, bootstrapServers, "", Set.empty, producerConf), classOf[SourceAuthHandler].getName)
 
     resources.use { sink =>
       val generate =
-        CollectorPayloadGen.generate[IO](nbGood, nbBad)
+        CollectorPayloadGen
+          .generate[IO](nbGood, nbBad)
           .evalMap(events => sink(List(events)))
           .onComplete(fs2.Stream.eval(Logger[IO].info(s"Random data has been generated and sent to $collectorPayloadsStream")))
 
@@ -83,10 +85,16 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
         consumeGood(refGood).merge(consumeBad(refBad))
 
       def consumeGood(ref: Ref[IO, AggregateGood]): Stream[IO, Unit] =
-        Source.init[IO](InKafka(enrichedStream, bootstrapServers, consumerConf), classOf[GoodSinkAuthHandler].getName).map(_.record.value).evalMap(aggregateGood(_, ref))
+        Source
+          .init[IO](InKafka(enrichedStream, bootstrapServers, consumerConf), classOf[GoodSinkAuthHandler].getName)
+          .map(_.record.value)
+          .evalMap(aggregateGood(_, ref))
 
       def consumeBad(ref: Ref[IO, AggregateBad]): Stream[IO, Unit] =
-        Source.init[IO](InKafka(badRowsStream, bootstrapServers, consumerConf), classOf[BadSinkAuthHandler].getName).map(_.record.value).evalMap(aggregateBad(_, ref))
+        Source
+          .init[IO](InKafka(badRowsStream, bootstrapServers, consumerConf), classOf[BadSinkAuthHandler].getName)
+          .map(_.record.value)
+          .evalMap(aggregateBad(_, ref))
 
       def aggregateGood(r: Array[Byte], ref: Ref[IO, AggregateGood]): IO[Unit] =
         for {
@@ -94,12 +102,11 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
           _ <- ref.update(updateAggregateGood(_, e))
         } yield ()
 
-      def aggregateBad(r: Array[Byte], ref: Ref[IO, AggregateBad]): IO[Unit] = {
+      def aggregateBad(r: Array[Byte], ref: Ref[IO, AggregateBad]): IO[Unit] =
         for {
           br <- IO(new String(r))
           _ <- ref.update(updateAggregateBad(_, br))
         } yield ()
-      }
 
       def updateAggregateGood(aggregate: AggregateGood, e: Event): AggregateGood =
         e :: aggregate
@@ -110,13 +117,12 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
       for {
        refGood <- Ref.of[IO, AggregateGood](Nil)
        refBad <- Ref.of[IO, AggregateBad](Nil)
-        _ <-
-          generate
-            .merge(consume(refGood, refBad))
-            .interruptAfter(30.seconds)
-            .attempt
-            .compile
-            .drain
+        _ <- generate
+               .merge(consume(refGood, refBad))
+               .interruptAfter(30.seconds)
+               .attempt
+               .compile
+               .drain
        aggregateGood <- refGood.get
        aggregateBad <- refBad.get
      } yield Aggregates(aggregateGood, aggregateBad)
@@ -130,6 +136,12 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
       aggregates.good.size must beEqualTo(nbGood)
     }
 
+    "contain cross navigation context" in {
+      val crossNavigationExpected =
+        aggregates.good.map(_.derived_contexts.data.flatMap(_.data.findAllByKey("domain_user_id").flatMap(_.asString.toList)))
+      crossNavigationExpected must contain(contain(be_===("6de9024e-17b9-4026-bd4d-efec50ae84cb"))).foreach
+    }
+
     "emit the expected bad rows events" in {
       aggregates.bad.size must beEqualTo(nbBad)
     }