Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Oct 30, 2023
1 parent f97307d commit 7e0c964
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ object EnrichmentManager {
case (event, _) =>
pageQsMap match {
case Some(qsMap) =>
WPE
CNE
.parseCrossDomain(qsMap)
.bimap(
err =>
Expand All @@ -623,15 +623,10 @@ object EnrichmentManager {
case None => NonEmptyList.one(err)
},
crossNavMap => {
val crossNavKeys = crossNavMap.keySet
val duidKey = CNE.CrossNavProps(0)
val tstampKey = CNE.CrossNavProps(1)
if (crossNavKeys.contains(duidKey))
crossNavMap(duidKey).foreach(event.refr_domain_userid = _)
if (crossNavKeys.contains(tstampKey))
crossNavMap(tstampKey).foreach(event.refr_dvce_tstamp = _)
crossNavMap.duid.foreach(event.refr_domain_userid = _)
crossNavMap.tstamp.foreach(event.refr_dvce_tstamp = _)
crossNavEnrichment match {
case Some(cn) => cn.getCrossNavigationContext(crossNavMap)
case Some(_) => crossNavMap.getCrossNavigationContext
case None => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import java.time.format.DateTimeFormatter
import cats.data.ValidatedNel
import cats.syntax.either._
import cats.syntax.option._
import cats.syntax.traverse._

import io.circe.Json
import io.circe.syntax._
Expand All @@ -26,12 +27,16 @@ import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EventEnrichments => EE}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.CrossNavigationConf
import com.snowplowanalytics.snowplow.enrich.common.utils.{ConversionUtils => CU}
import com.snowplowanalytics.snowplow.enrich.common.QueryStringParameters

/**
* Companion object to create an instance of CrossNavigationEnrichment
* from the configuration.
*/
object CrossNavigationEnrichment extends ParseableEnrichment {

type CrossNavTransformation = String => Either[FailureDetails.EnrichmentFailure, Option[String]]

val supportedSchema = SchemaCriterion(
"com.snowplowanalytics.snowplow.enrichments",
"cross_navigation_config",
Expand All @@ -47,16 +52,6 @@ object CrossNavigationEnrichment extends ParseableEnrichment {
SchemaVer.Full(1, 0, 0)
)

val CrossNavProps = List(
"domain_user_id",
"timestamp",
"session_id",
"user_id",
"source_id",
"source_platform",
"reason"
)

/**
* Creates a CrossNavigationConf instance from a Json.
* @param config The cross_navigation_config enrichment JSON
Expand All @@ -73,105 +68,139 @@ object CrossNavigationEnrichment extends ParseableEnrichment {
} yield CrossNavigationConf(schemaKey)).toValidatedNel

/**
* Wrapper around CU.decodeBase64Url.
* If passed an empty string returns Right(None).
* @param str The string to decode
* @return either the decoded string or enrichment failure
*/
private def decodeWithFailure(str: String): Either[FailureDetails.EnrichmentFailure, Option[String]] =
CU.decodeBase64Url(str) match {
case Right(r) => Option(r).filter(_.trim.nonEmpty).asRight
case Left(msg) =>
FailureDetails
.EnrichmentFailure(
None,
FailureDetails.EnrichmentFailureMessage.Simple(msg)
)
.asLeft
}

/**
* Wrapper around EE.extractTimestamp
* If passed an empty string returns Right(None).
* @param str The string to extract the timestamp from
* @return either the extracted timestamp or enrichment failure
*/
private def extractTstamp(str: String): Either[FailureDetails.EnrichmentFailure, Option[String]] =
str match {
case "" => None.asRight
case s => EE.extractTimestamp("sp_dtm", s).map(_.some)
}

/**
* Converts a timestamp to an ISO-8601 format
* Extract the referrer domain user ID and timestamp from the "_sp={{DUID}}.{{TSTAMP}}"
* portion of the querystring
*
* @param tstamp The timestamp expected as output of EE.extractTimestamp
* @return ISO-8601 timestamp
* @param qsMap The querystring parameters
* @return Validation boxing a pair of optional strings corresponding to the two fields
*/
private def reformatTstamp(tstamp: Option[String]): Option[String] = {
val pFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
val formatter = DateTimeFormatter.ISO_DATE_TIME

tstamp match {
case Some(t) =>
Some(formatter.format(pFormatter.parse(t)).replaceAll(" ", "T") + "Z")
case None => None
def parseCrossDomain(qsMap: QueryStringParameters): Either[FailureDetails.EnrichmentFailure, CrossDomainMap] =
qsMap.toMap
.map { case (k, v) => (k, v.getOrElse("")) }
.get("_sp") match {
case Some("") => CrossDomainMap.empty.asRight
case Some(sp) => CrossDomainMap.makeCrossDomainMap(sp)
case None => CrossDomainMap.empty.asRight
}
}

/**
* Finalizes the cross navigation map by reformatting its timestamp key
*
* @param inputMap A Map of cross navigation properties
* @return The finalized Map
*/
private def finalizeCrossNavigationMap(inputMap: Map[String, Option[String]]): Map[String, Option[String]] =
inputMap
.map({
case ("timestamp", t) => ("timestamp" -> reformatTstamp(t))
case kvPair => kvPair
})
case class CrossDomainMap(domainMap: Map[String, Option[String]]) {

/**
* Gets the cross navigation parameters as self-describing JSON.
*
* @param cnMap The map of cross navigation data
* @return the cross navigation context wrapped in a List
*/
def getCrossNavigationContext: List[SelfDescribingData[Json]] =
domainMap match {
case m: Map[String, Option[String]] if m.isEmpty => Nil
case _ =>
List(
SelfDescribingData(
CrossNavigationEnrichment.outputSchema,
finalizeCrossNavigationMap.asJson
)
)
}

def duid: Option[String] = domainMap.get(CrossDomainMap.domainUserIdFieldName).flatten

def tstamp: Option[String] = domainMap.get(CrossDomainMap.timestampFieldName).flatten

/**
* Finalizes the cross navigation map by reformatting its timestamp key
*
* @param inputMap A Map of cross navigation properties
* @return The finalized Map
*/
private def finalizeCrossNavigationMap: Map[String, Option[String]] =
domainMap
.map {
case ("timestamp", t) => ("timestamp" -> CrossDomainMap.reformatTstamp(t))
case kvPair => kvPair
}
}

/**
* Parses the QueryString into a Map
* @param sp QueryString
* @return either a map of query string parameters or enrichment failure
*/
def makeCrossDomainMap(sp: String): Either[FailureDetails.EnrichmentFailure, Map[String, Option[String]]] =
sp.split("\\.", -1)
.padTo(
CrossNavProps.size,
""
object CrossDomainMap {
val domainUserIdFieldName = "domain_user_id"
val timestampFieldName = "timestamp"
val CrossNavProps: List[(String, CrossNavTransformation)] =
List(
(domainUserIdFieldName, CU.makeTsvSafe(_).some.asRight),
(timestampFieldName, extractTstamp),
("session_id", Option(_: String).filter(_.trim.nonEmpty).asRight),
("user_id", decodeWithFailure),
("source_id", decodeWithFailure),
("source_platform", Option(_: String).filter(_.trim.nonEmpty).asRight),
("reason", decodeWithFailure)
)
.toList match {
case List(
duidElt,
tstampElt,
sessionIdElt,
userIdElt,
sourceIdElt,
sourcePlatformElt,
reasonElt
) =>
for {
domainUserId <- CU.makeTsvSafe(duidElt).some.asRight
timestamp <- extractTstamp(tstampElt)
sessionId <- Option(sessionIdElt).filter(_.trim.nonEmpty).asRight
userId <- decodeWithFailure(userIdElt)
sourceId <- decodeWithFailure(sourceIdElt)
sourcePlatform <- Option(sourcePlatformElt).filter(_.trim.nonEmpty).asRight
reason <- decodeWithFailure(reasonElt)
} yield (CrossNavProps zip List(
domainUserId,
timestamp,
sessionId,
userId,
sourceId,
sourcePlatform,
reason
)).filterNot(t => t._1 != "timestamp" && t._2 == None).toMap
case _ => Map.empty[String, Option[String]].asRight

/**
* Parses the QueryString into a Map
* @param sp QueryString
* @return either a map of query string parameters or enrichment failure
*/
def makeCrossDomainMap(sp: String): Either[FailureDetails.EnrichmentFailure, CrossDomainMap] = {
val values = sp.split("\\.", -1)
.padTo(
CrossNavProps.size,
""
).toList
val result = if (values.size == CrossNavProps.size)
values.zip(CrossNavProps).map {
case (value, (propName, f)) => f(value).map(propName -> _)
}.sequence
.map(_.filterNot { case (key, value) => key != timestampFieldName && value.isEmpty }.toMap)
else Map.empty[String, Option[String]].asRight
result.map(CrossDomainMap(_))
}

def empty: CrossDomainMap = CrossDomainMap(Map.empty)

/**
* Wrapper around CU.decodeBase64Url.
* If passed an empty string returns Right(None).
*
* @param str The string to decode
* @return either the decoded string or enrichment failure
*/
private def decodeWithFailure(str: String): Either[FailureDetails.EnrichmentFailure, Option[String]] =
CU.decodeBase64Url(str) match {
case Right(r) => Option(r).filter(_.trim.nonEmpty).asRight
case Left(msg) =>
FailureDetails
.EnrichmentFailure(
None,
FailureDetails.EnrichmentFailureMessage.Simple(msg)
)
.asLeft
}

/**
* Wrapper around EE.extractTimestamp
* If passed an empty string returns Right(None).
*
* @param str The string to extract the timestamp from
* @return either the extracted timestamp or enrichment failure
*/
private def extractTstamp(str: String): Either[FailureDetails.EnrichmentFailure, Option[String]] =
str match {
case "" => None.asRight
case s => EE.extractTimestamp("sp_dtm", s).map(_.some)
}

/**
* Converts a timestamp to an ISO-8601 format
*
* @param tstamp The timestamp expected as output of EE.extractTimestamp
* @return ISO-8601 timestamp
*/
private def reformatTstamp(tstamp: Option[String]): Option[String] = {
val pFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
val formatter = DateTimeFormatter.ISO_DATE_TIME
tstamp.map(t => formatter.format(pFormatter.parse(t)).replaceAll(" ", "T") + "Z")
}
}
}

/**
Expand All @@ -181,33 +210,13 @@ final case class CrossNavigationEnrichment(schemaKey: SchemaKey) extends Enrichm
private val enrichmentInfo =
FailureDetails.EnrichmentInformation(schemaKey, "cross-navigation").some

/**
* Gets the cross navigation parameters as self-describing JSON.
* @param cnMap The map of cross navigation data
* @return the cross navigation context wrapped in a List
*/
def getCrossNavigationContext(cnMap: Map[String, Option[String]]): List[SelfDescribingData[Json]] =
cnMap match {
case m: Map[String, Option[String]] if m.isEmpty => Nil
case _ =>
List(
SelfDescribingData(
CrossNavigationEnrichment.outputSchema,
CrossNavigationEnrichment.finalizeCrossNavigationMap(cnMap).asJson
)
)
}

/**
* Given an EnrichmentFailure, returns one with the cross-navigation
* enrichment information added.
* @param failure The input enrichment failure
* @return the EnrichmentFailure with cross-navigation enrichment information
*/
def addEnrichmentInfo(failure: FailureDetails.EnrichmentFailure): FailureDetails.EnrichmentFailure =
failure match {
case FailureDetails.EnrichmentFailure(_, msg) => FailureDetails.EnrichmentFailure(enrichmentInfo, msg)
case _ => failure
}
failure.copy(enrichment = enrichmentInfo)

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import java.net.URI

import cats.syntax.either._

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{CrossNavigationEnrichment => CNE}
import com.snowplowanalytics.snowplow.badrows.FailureDetails

import utils.{ConversionUtils => CU}
Expand Down Expand Up @@ -46,19 +45,4 @@ object PageEnrichments {
FailureDetails.EnrichmentFailureMessage.Simple(f)
)
)

/**
* Extract the referrer domain user ID and timestamp from the "_sp={{DUID}}.{{TSTAMP}}"
* portion of the querystring
* @param qsMap The querystring parameters
* @return Validation boxing a pair of optional strings corresponding to the two fields
*/
def parseCrossDomain(qsMap: QueryStringParameters): Either[FailureDetails.EnrichmentFailure, Map[String, Option[String]]] =
qsMap.toMap
.map { case (k, v) => (k, v.getOrElse("")) }
.get("_sp") match {
case Some("") => Map.empty[String, Option[String]].asRight
case Some(sp) => CNE.makeCrossDomainMap(sp)
case None => Map.empty[String, Option[String]].asRight
}
}
Loading

0 comments on commit 7e0c964

Please sign in to comment.