Skip to content

Commit

Permalink
Add Cross Navigation Enrichment
Browse files Browse the repository at this point in the history
  • Loading branch information
adatzer committed Oct 16, 2023
1 parent b13099e commit f97307d
Show file tree
Hide file tree
Showing 10 changed files with 827 additions and 35 deletions.
9 changes: 9 additions & 0 deletions config/enrichments/cross_navigation_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.enrichments/cross_navigation_config/jsonschema/1-0-0",

"data": {
"enabled": false,
"vendor": "com.snowplowanalytics.snowplow.enrichments",
"name": "cross_navigation_config"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processo
import adapters.RawEvent
import enrichments.{EventEnrichments => EE}
import enrichments.{MiscEnrichments => ME}
import enrichments.registry._
import enrichments.registry.{CrossNavigationEnrichment => CNE, _}
import enrichments.registry.apirequest.ApiRequestEnrichment
import enrichments.registry.pii.PiiPseudonymizerEnrichment
import enrichments.registry.sqlquery.SqlQueryEnrichment
Expand Down Expand Up @@ -205,7 +205,7 @@ object EnrichmentManager {
_ <- getRefererUri[F](registry.refererParser) // Potentially set the referrer details and URL components
qsMap <- extractQueryString[F](pageUri, raw.source.encoding) // Parse the page URI's querystring
_ <- setCampaign[F](qsMap, registry.campaignAttribution) // Marketing attribution
_ <- getCrossDomain[F](qsMap) // Cross-domain tracking
_ <- getCrossDomain[F](qsMap, registry.crossNavigation) // Cross-domain tracking
_ <- setEventFingerprint[F](raw.parameters, registry.eventFingerprint) // This enrichment cannot fail
_ <- getCookieContexts // Execute cookie extractor enrichment
_ <- getHttpHeaderContexts // Execute header extractor enrichment
Expand All @@ -232,7 +232,7 @@ object EnrichmentManager {
_ <- getRefererUri[F](registry.refererParser) // Potentially set the referrer details and URL components
qsMap <- extractQueryString[F](pageUri, raw.source.encoding) // Parse the page URI's querystring
_ <- setCampaign[F](qsMap, registry.campaignAttribution) // Marketing attribution
_ <- getCrossDomain[F](qsMap) // Cross-domain tracking
_ <- getCrossDomain[F](qsMap, registry.crossNavigation) // Cross-domain tracking
_ <- setEventFingerprint[F](raw.parameters, registry.eventFingerprint) // This enrichment cannot fail
_ <- getCookieContexts // Execute cookie extractor enrichment
_ <- getHttpHeaderContexts // Execute header extractor enrichment
Expand Down Expand Up @@ -607,18 +607,35 @@ object EnrichmentManager {
}

def getCrossDomain[F[_]: Applicative](
pageQsMap: Option[QueryStringParameters]
pageQsMap: Option[QueryStringParameters],
crossNavEnrichment: Option[CNE]
): EStateT[F, Unit] =
EStateT.fromEither {
case (event, _) =>
pageQsMap match {
case Some(qsMap) =>
val crossDomainParseResult = WPE.parseCrossDomain(qsMap)
for ((maybeRefrDomainUserid, maybeRefrDvceTstamp) <- crossDomainParseResult.toOption) {
maybeRefrDomainUserid.foreach(event.refr_domain_userid = _)
maybeRefrDvceTstamp.foreach(event.refr_dvce_tstamp = _)
}
crossDomainParseResult.bimap(NonEmptyList.one(_), _ => Nil)
WPE
.parseCrossDomain(qsMap)
.bimap(
err =>
crossNavEnrichment match {
case Some(cn) => NonEmptyList.one(cn.addEnrichmentInfo(err))
case None => NonEmptyList.one(err)
},
crossNavMap => {
val crossNavKeys = crossNavMap.keySet
val duidKey = CNE.CrossNavProps(0)
val tstampKey = CNE.CrossNavProps(1)
if (crossNavKeys.contains(duidKey))
crossNavMap(duidKey).foreach(event.refr_domain_userid = _)
if (crossNavKeys.contains(tstampKey))
crossNavMap(tstampKey).foreach(event.refr_dvce_tstamp = _)
crossNavEnrichment match {
case Some(cn) => cn.getCrossNavigationContext(crossNavMap)
case None => Nil
}
}
)
case None => Nil.asRight
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ object EnrichmentRegistry {
registry <- er
} yield registry.copy(weather = enrichment.some)
case c: YauaaConf => er.map(_.copy(yauaa = c.enrichment.some))
case c: CrossNavigationConf => er.map(_.copy(crossNavigation = c.enrichment.some))
}
}

Expand Down Expand Up @@ -226,6 +227,8 @@ object EnrichmentRegistry {
PiiPseudonymizerEnrichment.parse(enrichmentConfig, schemaKey).map(_.some)
case "iab_spiders_and_robots_enrichment" =>
IabEnrichment.parse(enrichmentConfig, schemaKey, localMode).map(_.some)
case "cross_navigation_config" =>
CrossNavigationEnrichment.parse(enrichmentConfig, schemaKey).map(_.some)
case _ =>
Option.empty[EnrichmentConf].validNel // Enrichment is not recognized
}
Expand All @@ -250,5 +253,6 @@ final case class EnrichmentRegistry[F[_]](
uaParser: Option[UaParserEnrichment[F]] = None,
userAgentUtils: Option[UserAgentUtilsEnrichment] = None,
weather: Option[WeatherEnrichment[F]] = None,
yauaa: Option[YauaaEnrichment] = None
yauaa: Option[YauaaEnrichment] = None,
crossNavigation: Option[CrossNavigationEnrichment] = None
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/**
* Copyright (c) 2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import java.time.format.DateTimeFormatter

import cats.data.ValidatedNel
import cats.syntax.either._
import cats.syntax.option._

import io.circe.Json
import io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EventEnrichments => EE}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.CrossNavigationConf
import com.snowplowanalytics.snowplow.enrich.common.utils.{ConversionUtils => CU}

/**
 * Companion object of [[CrossNavigationEnrichment]].
 *
 * Parses the enrichment configuration and turns the dot-separated
 * cross-navigation string into a map of named properties.
 */
object CrossNavigationEnrichment extends ParseableEnrichment {

  /** Criterion that a configuration schema must satisfy to be accepted by `parse`. */
  val supportedSchema: SchemaCriterion = SchemaCriterion(
    "com.snowplowanalytics.snowplow.enrichments",
    "cross_navigation_config",
    "jsonschema",
    1,
    0
  )

  /** Schema key attached to the cross navigation context built by the enrichment. */
  val outputSchema: SchemaKey = SchemaKey(
    "com.snowplowanalytics.snowplow",
    "cross_navigation",
    "jsonschema",
    SchemaVer.Full(1, 0, 0)
  )

  /**
   * Names of the cross navigation properties, in the order in which their
   * values appear in the dot-separated input of `makeCrossDomainMap`.
   */
  val CrossNavProps: List[String] = List(
    "domain_user_id",
    "timestamp",
    "session_id",
    "user_id",
    "source_id",
    "source_platform",
    "reason"
  )

  /** Key whose value is always kept in the map and is reformatted before serialization. */
  private val TimestampKey: String = "timestamp"

  // DateTimeFormatter instances are immutable and thread-safe,
  // so they are built once instead of on every reformatTstamp call.
  private val InputTstampFormatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
  private val OutputTstampFormatter: DateTimeFormatter =
    DateTimeFormatter.ISO_DATE_TIME

  /**
   * Creates a CrossNavigationConf instance from a Json.
   * @param config The cross_navigation_config enrichment JSON
   * @param schemaKey provided for the enrichment, must be supported by this enrichment
   * @param localMode not used by this enrichment
   * @return a CrossNavigation configuration
   */
  override def parse(
    config: Json,
    schemaKey: SchemaKey,
    localMode: Boolean = false
  ): ValidatedNel[String, CrossNavigationConf] =
    isParseable(config, schemaKey)
      .map(_ => CrossNavigationConf(schemaKey))
      .toValidatedNel

  /**
   * Wraps a string in an Option, mapping null and blank
   * (empty after trimming) strings to None.
   * @param str The string to wrap
   * @return Some(str) if it contains a non-whitespace character, None otherwise
   */
  private def nonBlank(str: String): Option[String] =
    Option(str).filter(_.trim.nonEmpty)

  /**
   * Wrapper around CU.decodeBase64Url.
   * A blank decoded value yields Right(None); a decoding error message is
   * wrapped in an EnrichmentFailure carrying no enrichment information.
   * @param str The string to decode
   * @return either the decoded string or enrichment failure
   */
  private def decodeWithFailure(str: String): Either[FailureDetails.EnrichmentFailure, Option[String]] =
    CU.decodeBase64Url(str)
      .bimap(
        msg =>
          FailureDetails.EnrichmentFailure(
            None,
            FailureDetails.EnrichmentFailureMessage.Simple(msg)
          ),
        nonBlank
      )

  /**
   * Wrapper around EE.extractTimestamp
   * If passed an empty string returns Right(None).
   * @param str The string to extract the timestamp from
   * @return either the extracted timestamp or enrichment failure
   */
  private def extractTstamp(str: String): Either[FailureDetails.EnrichmentFailure, Option[String]] =
    if (str == "") None.asRight
    else EE.extractTimestamp("sp_dtm", str).map(_.some)

  /**
   * Converts a timestamp to an ISO-8601 format with a trailing "Z".
   *
   * The input is parsed with the pattern "yyyy-MM-dd HH:mm:ss.SSS"; a value
   * that does not match this pattern makes the parser throw.
   *
   * @param tstamp The timestamp expected as output of EE.extractTimestamp
   * @return ISO-8601 timestamp
   */
  private def reformatTstamp(tstamp: Option[String]): Option[String] =
    // ISO_DATE_TIME already separates date and time with 'T',
    // so only the "Z" suffix has to be appended.
    tstamp.map(t => OutputTstampFormatter.format(InputTstampFormatter.parse(t)) + "Z")

  /**
   * Finalizes the cross navigation map by reformatting its timestamp key
   *
   * @param inputMap A Map of cross navigation properties
   * @return The finalized Map
   */
  private def finalizeCrossNavigationMap(inputMap: Map[String, Option[String]]): Map[String, Option[String]] =
    inputMap.map {
      case (TimestampKey, tstamp) => TimestampKey -> reformatTstamp(tstamp)
      case other => other
    }

  /**
   * Parses the dot-separated cross navigation string into a Map keyed by
   * the names in `CrossNavProps`.
   *
   * Missing trailing fields are treated as empty strings. The timestamp key
   * is always present in the result (possibly with a None value); any other
   * key whose value is None is left out. An input with more fields than
   * `CrossNavProps` yields an empty map.
   *
   * @param sp The dot-separated string of cross navigation values
   * @return either a map of cross navigation properties or enrichment failure
   */
  def makeCrossDomainMap(sp: String): Either[FailureDetails.EnrichmentFailure, Map[String, Option[String]]] =
    sp.split("\\.", -1).padTo(CrossNavProps.size, "").toList match {
      case List(
            duidElt,
            tstampElt,
            sessionIdElt,
            userIdElt,
            sourceIdElt,
            sourcePlatformElt,
            reasonElt
          ) =>
        // These fields cannot fail, so they are computed outside the for-comprehension.
        val domainUserId = CU.makeTsvSafe(duidElt).some
        val sessionId = nonBlank(sessionIdElt)
        val sourcePlatform = nonBlank(sourcePlatformElt)

        // The first failing field short-circuits, in the order listed here.
        for {
          timestamp <- extractTstamp(tstampElt)
          userId <- decodeWithFailure(userIdElt)
          sourceId <- decodeWithFailure(sourceIdElt)
          reason <- decodeWithFailure(reasonElt)
        } yield CrossNavProps
          .zip(
            List(
              domainUserId,
              timestamp,
              sessionId,
              userId,
              sourceId,
              sourcePlatform,
              reason
            )
          )
          .filter { case (key, value) => key == TimestampKey || value.isDefined }
          .toMap
      case _ => Map.empty[String, Option[String]].asRight
    }
}

/**
 * Enrichment adding cross navigation context
 *
 * @param schemaKey schema key of the enrichment configuration, reported
 *        in the enrichment information attached to failures
 */
final case class CrossNavigationEnrichment(schemaKey: SchemaKey) extends Enrichment {
  private val enrichmentInfo: Option[FailureDetails.EnrichmentInformation] =
    FailureDetails.EnrichmentInformation(schemaKey, "cross-navigation").some

  /**
   * Gets the cross navigation parameters as self-describing JSON.
   * The timestamp entry of the map is reformatted before serialization.
   * @param cnMap The map of cross navigation data
   * @return the cross navigation context wrapped in a List,
   *         or an empty List when the map is empty
   */
  def getCrossNavigationContext(cnMap: Map[String, Option[String]]): List[SelfDescribingData[Json]] =
    if (cnMap.isEmpty) Nil
    else
      List(
        SelfDescribingData(
          CrossNavigationEnrichment.outputSchema,
          CrossNavigationEnrichment.finalizeCrossNavigationMap(cnMap).asJson
        )
      )

  /**
   * Given an EnrichmentFailure, returns one with the cross-navigation
   * enrichment information added. The failure message is kept as is.
   * @param failure The input enrichment failure
   * @return the EnrichmentFailure with cross-navigation enrichment information
   */
  def addEnrichmentInfo(failure: FailureDetails.EnrichmentFailure): FailureDetails.EnrichmentFailure =
    failure match {
      case FailureDetails.EnrichmentFailure(_, msg) =>
        FailureDetails.EnrichmentFailure(enrichmentInfo, msg)
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,8 @@ object EnrichmentConf {
) extends EnrichmentConf {
def enrichment: YauaaEnrichment = YauaaEnrichment(cacheSize)
}

/**
 * Configuration of the cross navigation enrichment.
 * @param schemaKey schema key that is passed on to the enrichment
 */
final case class CrossNavigationConf(schemaKey: SchemaKey) extends EnrichmentConf {
/** Builds the CrossNavigationEnrichment from this configuration's schemaKey. */
def enrichment: CrossNavigationEnrichment = CrossNavigationEnrichment(schemaKey)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ package web
import java.net.URI

import cats.syntax.either._
import cats.syntax.option._
import com.snowplowanalytics.snowplow.badrows._

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{CrossNavigationEnrichment => CNE}
import com.snowplowanalytics.snowplow.badrows.FailureDetails

import utils.{ConversionUtils => CU}

Expand Down Expand Up @@ -52,19 +53,12 @@ object PageEnrichments {
* @param qsMap The querystring parameters
* @return Either an enrichment failure or a map from cross navigation property name to its optional value
*/
def parseCrossDomain(qsMap: QueryStringParameters): Either[FailureDetails.EnrichmentFailure, (Option[String], Option[String])] =
def parseCrossDomain(qsMap: QueryStringParameters): Either[FailureDetails.EnrichmentFailure, Map[String, Option[String]]] =
qsMap.toMap
.map { case (k, v) => (k, v.getOrElse("")) }
.get("_sp") match {
case Some("") => (None, None).asRight
case Some(sp) =>
val crossDomainElements = sp.split("\\.")
val duid = CU.makeTsvSafe(crossDomainElements(0)).some
val tstamp = crossDomainElements.lift(1) match {
case Some(spDtm) => EventEnrichments.extractTimestamp("sp_dtm", spDtm).map(_.some)
case None => None.asRight
}
tstamp.map(duid -> _)
case None => (None -> None).asRight
case Some("") => Map.empty[String, Option[String]].asRight
case Some(sp) => CNE.makeCrossDomainMap(sp)
case None => Map.empty[String, Option[String]].asRight
}
}
Loading

0 comments on commit f97307d

Please sign in to comment.