Skip to content

Commit

Permalink
Address Ben's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Mar 29, 2024
1 parent 231d5f2 commit 36fc8fd
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments

import cats.data.NonEmptyList

import io.circe.Json
import io.circe.syntax._

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.iglu.client.ClientError.ValidationError
Expand Down Expand Up @@ -133,17 +136,19 @@ object AtomicFields {
AtomicFields(withLimits)
}

def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureEntity.SchemaViolationWithExtraContext = {
def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureEntity.SchemaViolation = {
val clientError = ValidationError(ValidatorError.InvalidData(errors), None)

FailureEntity.SchemaViolationWithExtraContext(
val failureData = Json.obj(errors.toList.flatMap(e => e.path.map(p => p := e.keyword)): _*)

FailureEntity.SchemaViolation(
schemaViolation = FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
),
// The source atomic field and the actual value of that field should already be present in the ValidatorReport list
source = "atomic_field",
data = None
data = failureData
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object AtomicFieldsLengthValidator {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, FailureEntity.SchemaViolationWithExtraContext, Unit] =
): IorT[F, FailureEntity.SchemaViolation, Unit] =
IorT {
atomicFields.value
.map(validateField(event, _).toValidatedNel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,16 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, Conversion
object EnrichmentManager {

private type EnrichmentResult[F[_]] =
IorT[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities], (EnrichedEvent, List[SelfDescribingData[Json]])]
IorT[F, NonEmptyList[IntermediateBadRow], (EnrichedEvent, List[SelfDescribingData[Json]])]

// We need this intermediate representation because the partially enriched event has to be
// created right after an enrichment/validation step completes. If it were created only at
// the end instead, the partially enriched event could reflect updates made by later steps.
//
// failureEntities: the failures reported by the step that produced this value
// partiallyEnrichedEvent: the partially enriched event captured when this value was built
private case class IntermediateBadRow(
failureEntities: NonEmptyList[FailureEntity],
partiallyEnrichedEvent: Payload.PartiallyEnrichedEvent
)

/**
* Run the enrichment workflow
Expand Down Expand Up @@ -75,7 +84,7 @@ object EnrichmentManager {
emitIncomplete: Boolean
): IorT[F, BadRow, EnrichedEvent] = {
val iorT: EnrichmentResult[F] = for {
enriched <- IorT.pure[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities]](new EnrichedEvent)
enriched <- IorT.pure[F, NonEmptyList[IntermediateBadRow]](new EnrichedEvent)
extractResult <- mapAndValidateInput(
raw,
enriched,
Expand All @@ -94,7 +103,6 @@ object EnrichmentManager {
}
enrichmentsContexts <- runEnrichments(
registry,
processor,
raw,
enriched,
extractResult.contexts,
Expand All @@ -105,10 +113,8 @@ object EnrichmentManager {
.possiblyExitingEarly(emitIncomplete)
validContexts <- validateEnriched(
enriched,
raw,
enrichmentsContexts,
client,
processor,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
Expand All @@ -121,17 +127,36 @@ object EnrichmentManager {
// Derived contexts are set last because we want to include the failure entities
// in the derived contexts as well, and the failure entities are only available
// at the end of the enrichment process
setDerivedContexts(iorT).leftMap(_.head.badRow).map(_._1)
setDerivedContexts(iorT, processor)
.leftMap(createBadRow(_, RawEvent.toRawEvent(raw), processor))
.map(_._1)
}

private def setDerivedContexts[F[_]: Sync](enriched: EnrichmentResult[F]): EnrichmentResult[F] =
/**
 * Builds the bad row for a failed event from the accumulated intermediate bad rows.
 *
 * Only the first intermediate bad row is used. The kind of its first failure entity
 * decides which bad row is built, and only the failure entities of that same kind
 * are carried over into it.
 *
 * @param fe intermediate bad rows collected while processing the event
 * @param re raw event placed in the bad row payload
 * @param processor processor passed on to the bad row builders
 * @return a schema violations or an enrichment failures bad row
 */
private def createBadRow(
  fe: NonEmptyList[IntermediateBadRow],
  re: Payload.RawEvent,
  processor: Processor
): BadRow = {
  val IntermediateBadRow(entities, partiallyEnriched) = fe.head
  val remaining = entities.tail
  entities.head match {
    case first: FailureEntity.SchemaViolation =>
      // Keep only the schema violations that follow the first one
      val others = remaining.collect { case violation: FailureEntity.SchemaViolation => violation }
      buildSchemaViolationsBadRow(NonEmptyList(first, others), partiallyEnriched, re, processor)
    case first: FailureEntity.EnrichmentFailure =>
      // Keep only the enrichment failures that follow the first one
      val others = remaining.collect { case failure: FailureEntity.EnrichmentFailure => failure }
      buildEnrichmentFailuresBadRow(NonEmptyList(first, others), partiallyEnriched, re, processor)
  }
}

private def setDerivedContexts[F[_]: Sync](enriched: EnrichmentResult[F], processor: Processor): EnrichmentResult[F] =
IorT(
enriched.value.flatTap(v =>
Sync[F].delay {
val now = Instant.now()
val (derivedContexts, enriched) = v match {
case Ior.Right((e, l)) => (l, e.some)
case Ior.Left(l) => (extractFailureEntities(l), None)
case Ior.Both(b, (e, l)) => (l ::: extractFailureEntities(b), e.some)
case Ior.Left(l) => (convertFailureEntitiesToSDJ(l, now, processor), None)
case Ior.Both(b, (e, l)) => (l ::: convertFailureEntitiesToSDJ(b, now, processor), e.some)
}
for {
c <- ME.formatContexts(derivedContexts)
Expand All @@ -142,8 +167,12 @@ object EnrichmentManager {
)
)

private def extractFailureEntities(l: NonEmptyList[FailureEntity.BadRowWithFailureEntities]): List[SelfDescribingData[Json]] =
l.toList.flatMap(_.failureEntities).map(FailureEntity.toSDJ)
/**
 * Converts every failure entity held by the given intermediate bad rows into a
 * self-describing JSON, preserving the order of the bad rows and of their entities.
 *
 * @param l intermediate bad rows whose failure entities are converted
 * @param timestamp timestamp passed to `FailureEntity.toSDJ` for each entity
 * @param processor processor passed to `FailureEntity.toSDJ` for each entity
 * @return one self-describing JSON per failure entity
 */
private def convertFailureEntitiesToSDJ(
  l: NonEmptyList[IntermediateBadRow],
  timestamp: Instant,
  processor: Processor
): List[SelfDescribingData[Json]] =
  for {
    badRow <- l.toList
    entity <- badRow.failureEntities.toList
  } yield FailureEntity.toSDJ(entity, timestamp, processor)

private def mapAndValidateInput[F[_]: Sync](
raw: RawEvent,
Expand All @@ -152,21 +181,16 @@ object EnrichmentManager {
processor: Processor,
client: IgluCirceClient[F],
registryLookup: RegistryLookup[F]
): IorT[F, FailureEntity.BadRowWithFailureEntities, IgluUtils.EventExtractResult] = {
): IorT[F, IntermediateBadRow, IgluUtils.EventExtractResult] = {
val iorT = for {
_ <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor)
.leftMap(NonEmptyList.one)
extract <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
extract <- IgluUtils
.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
.leftMap { l: NonEmptyList[FailureEntity] => l }
} yield extract

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
}
iorT.leftMap(v => IntermediateBadRow(v, EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent)))
}

/**
Expand All @@ -177,13 +201,12 @@ object EnrichmentManager {
*/
private def runEnrichments[F[_]: Monad](
registry: EnrichmentRegistry[F],
processor: Processor,
raw: RawEvent,
enriched: EnrichedEvent,
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean
): IorT[F, FailureEntity.BadRowWithFailureEntities, List[SelfDescribingData[Json]]] =
): IorT[F, IntermediateBadRow, List[SelfDescribingData[Json]]] =
IorT {
accState(registry, raw, inputContexts, unstructEvent, legacyOrder)
.runS(Accumulation(enriched, Nil, Nil))
Expand All @@ -192,12 +215,7 @@ object EnrichmentManager {
failures.toNel match {
case Some(nel) =>
Ior.both(
buildEnrichmentFailuresBadRow(
nel,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
),
IntermediateBadRow(nel.map(FailureEntity.EnrichmentFailure), EnrichedEvent.toPartiallyEnrichedEvent(enriched)),
contexts
)
case None =>
Expand All @@ -208,30 +226,21 @@ object EnrichmentManager {

private def validateEnriched[F[_]: Clock: Monad](
enriched: EnrichedEvent,
raw: RawEvent,
enrichmentsContexts: List[SelfDescribingData[Json]],
client: IgluCirceClient[F],
processor: Processor,
registryLookup: RegistryLookup[F],
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, FailureEntity.BadRowWithFailureEntities, List[SelfDescribingData[Json]]] = {
): IorT[F, IntermediateBadRow, List[SelfDescribingData[Json]]] = {
val iorT = for {
validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup)
_ <- AtomicFieldsLengthValidator
.validate[F](enriched, acceptInvalid, invalidCount, atomicFields)
.leftMap(NonEmptyList.one)
.leftMap { v: FailureEntity => NonEmptyList.one(v) }
} yield validContexts

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
)
}
iorT.leftMap(v => IntermediateBadRow(v, EnrichedEvent.toPartiallyEnrichedEvent(enriched)))
}

private[enrichments] case class Accumulation(
Expand Down Expand Up @@ -360,7 +369,7 @@ object EnrichmentManager {
e: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor
): IorT[F, FailureEntity.SchemaViolationWithExtraContext, Unit] =
): IorT[F, FailureEntity.SchemaViolation, Unit] =
IorT {
Sync[F].delay {
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
Expand Down Expand Up @@ -872,38 +881,30 @@ object EnrichmentManager {
}

private def buildSchemaViolationsBadRow(
vs: NonEmptyList[FailureEntity.SchemaViolationWithExtraContext],
fe: NonEmptyList[FailureEntity.SchemaViolation],
pee: Payload.PartiallyEnrichedEvent,
re: Payload.RawEvent,
processor: Processor
): FailureEntity.BadRowWithFailureEntities = {
): BadRow = {
val now = Instant.now()
val failureEntities = vs.toList.map(v => FailureEntity.fromSchemaViolation(v, now, processor))
FailureEntity.BadRowWithFailureEntities(
badRow = BadRow.SchemaViolations(
processor,
Failure.SchemaViolations(now, vs.map(_.schemaViolation)),
Payload.EnrichmentPayload(pee, re)
),
failureEntities = failureEntities
BadRow.SchemaViolations(
processor,
Failure.SchemaViolations(now, fe.map(_.schemaViolation)),
Payload.EnrichmentPayload(pee, re)
)
}

private def buildEnrichmentFailuresBadRow(
fs: NonEmptyList[FailureDetails.EnrichmentFailure],
fe: NonEmptyList[FailureEntity.EnrichmentFailure],
pee: Payload.PartiallyEnrichedEvent,
re: Payload.RawEvent,
processor: Processor
): FailureEntity.BadRowWithFailureEntities = {
): BadRow = {
val now = Instant.now()
val failureEntities = fs.toList.flatMap(v => FailureEntity.fromEnrichmentFailure(v, now, processor))
FailureEntity.BadRowWithFailureEntities(
badRow = BadRow.EnrichmentFailures(
processor,
Failure.EnrichmentFailures(now, fs),
Payload.EnrichmentPayload(pee, re)
),
failureEntities = failureEntities
BadRow.EnrichmentFailures(
processor,
Failure.EnrichmentFailures(now, fe.map(_.enrichmentFailure)),
Payload.EnrichmentPayload(pee, re)
)
}

Expand Down
Loading

0 comments on commit 36fc8fd

Please sign in to comment.