Skip to content

Commit

Permalink
[WIP] Add failure entity for incomplete events
Browse files Browse the repository at this point in the history
With this commit, failure entities are started to be attached to incomplete
events as derived contexts.

There are some cases that we need more information than in the bad rows to create
failure entities therefore it isn't possible to create failure entities from bad rows directly.
For this purpose, we created wrapper classes to attach extra information about failure entities.
  • Loading branch information
spenes committed Mar 27, 2024
1 parent e6df527 commit 22a978b
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,17 @@ object AtomicFields {
AtomicFields(withLimits)
}

def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureDetails.SchemaViolation = {
def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureEntity.SchemaViolationWithExtraContext = {
val clientError = ValidationError(ValidatorError.InvalidData(errors), None)

FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
FailureEntity.SchemaViolationWithExtraContext(
schemaViolation = FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
),
// Source atomic field and actual value of the field should be already on the ValidatorReport list
source = "atomic_field",
data = None
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import cats.implicits._

import com.snowplowanalytics.iglu.client.validator.ValidatorReport

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.snowplow.enrich.common.enrichments.AtomicFields.LimitedAtomicField
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

Expand All @@ -37,7 +35,7 @@ object AtomicFieldsLengthValidator {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, FailureDetails.SchemaViolation, Unit] =
): IorT[F, FailureEntity.SchemaViolationWithExtraContext, Unit] =
IorT {
atomicFields.value
.map(validateField(event, _).toValidatedNel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, Conversion

object EnrichmentManager {

private type EnrichmentResult[F[_]] = IorT[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities], (EnrichedEvent, List[SelfDescribingData[Json]])]

/**
* Run the enrichment workflow
* @param registry Contain configuration for all enrichments to apply
Expand All @@ -71,8 +73,8 @@ object EnrichmentManager {
atomicFields: AtomicFields,
emitIncomplete: Boolean
): IorT[F, BadRow, EnrichedEvent] = {
val iorT: IorT[F, NonEmptyList[BadRow], EnrichedEvent] = for {
enriched <- IorT.pure[F, NonEmptyList[BadRow]](new EnrichedEvent)
val iorT: EnrichmentResult[F] = for {
enriched <- IorT.pure[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities]](new EnrichedEvent)
extractResult <- mapAndValidateInput(
raw,
enriched,
Expand Down Expand Up @@ -100,11 +102,10 @@ object EnrichmentManager {
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
_ <- validateEnriched(
validContexts <- validateEnriched(
enriched,
raw,
enrichmentsContexts,
extractResult.validationInfoContexts,
client,
processor,
registryLookup,
Expand All @@ -114,32 +115,54 @@ object EnrichmentManager {
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
} yield enriched
} yield (enriched, validContexts ::: extractResult.validationInfoContexts)

iorT.leftMap(_.head)
// derived contexts are set lastly because we want to include failure entities
// to derived contexts as well and we can get failure entities only in the end
// of the enrichment process
setDerivedContexts(iorT).leftMap(_.head.badRow).map(_._1)
}

private def setDerivedContexts[F[_]: Sync](enriched: EnrichmentResult[F]): EnrichmentResult[F] =
IorT(
enriched.value.flatTap(v => Sync[F].delay {
val (derivedContexts, enriched) = v match {
case Ior.Right((e, l)) => (l, e.some)
case Ior.Left(l) => (extractFailureEntities(l), None)
case Ior.Both(b, (e, l)) => (l ::: extractFailureEntities(b), e.some)
}
for {
c <- ME.formatContexts(derivedContexts)
e <- enriched
_ = e.derived_contexts = c
} yield ()
})
)

private def extractFailureEntities(l: NonEmptyList[FailureEntity.BadRowWithFailureEntities]): List[SelfDescribingData[Json]] =
l.toList.flatMap(_.failureEntities).map(FailureEntity.toSDJ)

private def mapAndValidateInput[F[_]: Sync](
raw: RawEvent,
enrichedEvent: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor,
client: IgluCirceClient[F],
registryLookup: RegistryLookup[F]
): IorT[F, BadRow, IgluUtils.EventExtractResult] = {
): IorT[F, FailureEntity.BadRowWithFailureEntities, IgluUtils.EventExtractResult] = {
val iorT = for {
_ <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor)
.leftMap(NonEmptyList.one)
extract <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
} yield extract

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
}
}

Expand All @@ -157,7 +180,7 @@ object EnrichmentManager {
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean
): IorT[F, BadRow, List[SelfDescribingData[Json]]] =
): IorT[F, FailureEntity.BadRowWithFailureEntities, List[SelfDescribingData[Json]]] =
IorT {
accState(registry, raw, inputContexts, unstructEvent, legacyOrder)
.runS(Accumulation(enriched, Nil, Nil))
Expand All @@ -184,21 +207,19 @@ object EnrichmentManager {
enriched: EnrichedEvent,
raw: RawEvent,
enrichmentsContexts: List[SelfDescribingData[Json]],
validationInfoContexts: List[SelfDescribingData[Json]],
client: IgluCirceClient[F],
processor: Processor,
registryLookup: RegistryLookup[F],
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, BadRow, Unit] = {
): IorT[F, FailureEntity.BadRowWithFailureEntities, List[SelfDescribingData[Json]]] = {
val iorT = for {
validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup)
_ = ME.formatContexts(validContexts ::: validationInfoContexts).foreach(enriched.derived_contexts = _)
_ <- AtomicFieldsLengthValidator
.validate[F](enriched, acceptInvalid, invalidCount, atomicFields)
.leftMap(NonEmptyList.one)
} yield ()
} yield validContexts

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
Expand Down Expand Up @@ -336,7 +357,7 @@ object EnrichmentManager {
e: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor
): IorT[F, FailureDetails.SchemaViolation, Unit] =
): IorT[F, FailureEntity.SchemaViolationWithExtraContext, Unit] =
IorT {
Sync[F].delay {
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
Expand Down Expand Up @@ -848,28 +869,41 @@ object EnrichmentManager {
}

private def buildSchemaViolationsBadRow(
vs: NonEmptyList[FailureDetails.SchemaViolation],
vs: NonEmptyList[FailureEntity.SchemaViolationWithExtraContext],
pee: Payload.PartiallyEnrichedEvent,
re: Payload.RawEvent,
processor: Processor
): BadRow.SchemaViolations =
BadRow.SchemaViolations(
processor,
Failure.SchemaViolations(Instant.now(), vs),
Payload.EnrichmentPayload(pee, re)
): FailureEntity.BadRowWithFailureEntities = {
val now = Instant.now()
val failureEntities = vs.map(v => FailureEntity.fromSchemaViolation(v, now, processor))
FailureEntity.BadRowWithFailureEntities(
badRow = BadRow.SchemaViolations(
processor,
Failure.SchemaViolations(now, vs.map(_.schemaViolation)),
Payload.EnrichmentPayload(pee, re)
),
failureEntities = failureEntities.toList
)
}

private def buildEnrichmentFailuresBadRow(
fs: NonEmptyList[FailureDetails.EnrichmentFailure],
pee: Payload.PartiallyEnrichedEvent,
re: Payload.RawEvent,
processor: Processor
) =
BadRow.EnrichmentFailures(
processor,
Failure.EnrichmentFailures(Instant.now(), fs),
Payload.EnrichmentPayload(pee, re)
): FailureEntity.BadRowWithFailureEntities = {
val now = Instant.now()
// TODO: Fill it correctly
val failureEntities = Nil
FailureEntity.BadRowWithFailureEntities(
badRow = BadRow.EnrichmentFailures(
processor,
Failure.EnrichmentFailures(now, fs),
Payload.EnrichmentPayload(pee, re)
),
failureEntities = failureEntities
)
}

private implicit class IorTOps[F[_], A, B](val iorT: IorT[F, A, B]) extends AnyVal {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2024-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments

import java.time.Instant

import io.circe.{Encoder, Json}
import io.circe.generic.semiauto._
import io.circe.syntax._

import com.snowplowanalytics.snowplow.badrows._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

/**
* Represents a failure encountered during enrichment of the event.
* Failure entities will be attached to incomplete events as derived contexts.
*/
case class FailureEntity(
failureType: String,
errors: List[Json],
schema: Option[String],
data: Option[String],
timestamp: Instant,
componentName: String,
componentVersion: String
)

object FailureEntity {

val schemaKey = SchemaKey("com.snowplowanalytics.snowplow", "failure", "jsonschema", SchemaVer.Full(1, 0, 0))

case class BadRowWithFailureEntities(
badRow: BadRow,
failureEntities: List[FailureEntity]
)

/**
* Wrapper for schema violation failure that stores extra information about the failure.
* These extra information will be used while creating the failure entities that will be
* attached to incomplete events.
*/
case class SchemaViolationWithExtraContext(
schemaViolation: FailureDetails.SchemaViolation,
source: String,
data: Option[String]
)

def toSDJ(failure: FailureEntity): SelfDescribingData[Json] =
SelfDescribingData(
schemaKey,
failure.asJson
)

implicit val instantEncoder: Encoder[Instant] = Encoder.encodeString.contramap[Instant](_.toString)
implicit val encoder: Encoder[FailureEntity] = deriveEncoder[FailureEntity]

def fromSchemaViolation(
v: SchemaViolationWithExtraContext,
timestamp: Instant,
processor: Processor
): FailureEntity =
v.schemaViolation match {
case f: FailureDetails.SchemaViolation.NotJson =>
FailureEntity(
failureType = "NotJson",
errors = List(Json.obj("message" -> f.error.asJson, "source" -> v.source.asJson)),
schema = None,
data = v.data,
timestamp = timestamp,
componentName = processor.artifact,
componentVersion = processor.version
)
case f: FailureDetails.SchemaViolation.NotIglu =>
val message = f.error.message("").split(":").headOption
FailureEntity(
failureType = "NotIglu",
errors = List(Json.obj("message" -> message.asJson, "source" -> v.source.asJson)),
schema = None,
data = v.data,
timestamp = timestamp,
componentName = processor.artifact,
componentVersion = processor.version
)
// TODO: Implement remaining cases
case _ => throw new Exception("")
}
}
Loading

0 comments on commit 22a978b

Please sign in to comment.