Skip to content

Commit

Permalink
Failure entities for EnrichmentFailure
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Mar 29, 2024
1 parent 22a978b commit 231d5f2
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, Conversion

object EnrichmentManager {

private type EnrichmentResult[F[_]] = IorT[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities], (EnrichedEvent, List[SelfDescribingData[Json]])]
private type EnrichmentResult[F[_]] =
IorT[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities], (EnrichedEvent, List[SelfDescribingData[Json]])]

/**
* Run the enrichment workflow
Expand Down Expand Up @@ -103,18 +104,18 @@ object EnrichmentManager {
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
validContexts <- validateEnriched(
enriched,
raw,
enrichmentsContexts,
client,
processor,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
enriched,
raw,
enrichmentsContexts,
client,
processor,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
} yield (enriched, validContexts ::: extractResult.validationInfoContexts)

// derived contexts are set lastly because we want to include failure entities
Expand All @@ -125,18 +126,20 @@ object EnrichmentManager {

private def setDerivedContexts[F[_]: Sync](enriched: EnrichmentResult[F]): EnrichmentResult[F] =
IorT(
enriched.value.flatTap(v => Sync[F].delay {
val (derivedContexts, enriched) = v match {
case Ior.Right((e, l)) => (l, e.some)
case Ior.Left(l) => (extractFailureEntities(l), None)
case Ior.Both(b, (e, l)) => (l ::: extractFailureEntities(b), e.some)
enriched.value.flatTap(v =>
Sync[F].delay {
val (derivedContexts, enriched) = v match {
case Ior.Right((e, l)) => (l, e.some)
case Ior.Left(l) => (extractFailureEntities(l), None)
case Ior.Both(b, (e, l)) => (l ::: extractFailureEntities(b), e.some)
}
for {
c <- ME.formatContexts(derivedContexts)
e <- enriched
_ = e.derived_contexts = c
} yield ()
}
for {
c <- ME.formatContexts(derivedContexts)
e <- enriched
_ = e.derived_contexts = c
} yield ()
})
)
)

private def extractFailureEntities(l: NonEmptyList[FailureEntity.BadRowWithFailureEntities]): List[SelfDescribingData[Json]] =
Expand All @@ -157,12 +160,12 @@ object EnrichmentManager {
} yield extract

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
}
}

Expand Down Expand Up @@ -875,14 +878,14 @@ object EnrichmentManager {
processor: Processor
): FailureEntity.BadRowWithFailureEntities = {
val now = Instant.now()
val failureEntities = vs.map(v => FailureEntity.fromSchemaViolation(v, now, processor))
val failureEntities = vs.toList.map(v => FailureEntity.fromSchemaViolation(v, now, processor))
FailureEntity.BadRowWithFailureEntities(
badRow = BadRow.SchemaViolations(
processor,
Failure.SchemaViolations(now, vs.map(_.schemaViolation)),
Payload.EnrichmentPayload(pee, re)
),
failureEntities = failureEntities.toList
failureEntities = failureEntities
)
}

Expand All @@ -893,8 +896,7 @@ object EnrichmentManager {
processor: Processor
): FailureEntity.BadRowWithFailureEntities = {
val now = Instant.now()
// TODO: Fill it correctly
val failureEntities = Nil
val failureEntities = fs.toList.flatMap(v => FailureEntity.fromEnrichmentFailure(v, now, processor))
FailureEntity.BadRowWithFailureEntities(
badRow = BadRow.EnrichmentFailures(
processor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments

import java.time.Instant

import cats.syntax.option._

import io.circe.{Encoder, Json}
import io.circe.generic.semiauto._
import io.circe.syntax._

import com.snowplowanalytics.snowplow.badrows._

import com.snowplowanalytics.iglu.client.ClientError
import com.snowplowanalytics.iglu.client.validator.ValidatorError

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits.schemaKeyCirceJsonEncoder

/**
* Represents a failure encountered during enrichment of the event.
Expand All @@ -27,7 +33,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData
case class FailureEntity(
failureType: String,
errors: List[Json],
schema: Option[String],
schema: Option[SchemaKey],
data: Option[String],
timestamp: Instant,
componentName: String,
Expand All @@ -36,7 +42,9 @@ case class FailureEntity(

object FailureEntity {

val schemaKey = SchemaKey("com.snowplowanalytics.snowplow", "failure", "jsonschema", SchemaVer.Full(1, 0, 0))
val failureEntitySchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "failure", "jsonschema", SchemaVer.Full(1, 0, 0))

implicit val failureEntityEncoder: Encoder[FailureEntity] = deriveEncoder[FailureEntity]

case class BadRowWithFailureEntities(
badRow: BadRow,
Expand All @@ -56,12 +64,52 @@ object FailureEntity {

def toSDJ(failure: FailureEntity): SelfDescribingData[Json] =
SelfDescribingData(
schemaKey,
failureEntitySchemaKey,
failure.asJson
)

implicit val instantEncoder: Encoder[Instant] = Encoder.encodeString.contramap[Instant](_.toString)
implicit val encoder: Encoder[FailureEntity] = deriveEncoder[FailureEntity]
/**
 * Converts an enrichment failure into a failure entity.
 *
 * @param ef enrichment failure to convert
 * @param timestamp value stored in the `timestamp` field of the entity
 * @param processor supplies `componentName` (artifact) and `componentVersion` (version)
 * @return the failure entity for `InputData` and `Simple` messages,
 *         `None` for `EnrichmentFailureMessage.IgluError`
 */
def fromEnrichmentFailure(
  ef: FailureDetails.EnrichmentFailure,
  timestamp: Instant,
  processor: Processor
): Option[FailureEntity] = {
  val failureType = s"EnrichmentError: ${ef.enrichment.map(_.identifier).getOrElse("")}"

  // Everything except the error payload and the data is shared by all message kinds
  def entity(error: Json, data: Option[String]): FailureEntity =
    FailureEntity(
      failureType = failureType,
      errors = List(error),
      schema = ef.enrichment.map(_.schemaKey),
      data = data,
      timestamp = timestamp,
      componentName = processor.artifact,
      componentVersion = processor.version
    )

  ef.message match {
    case m: FailureDetails.EnrichmentFailureMessage.InputData =>
      // Encode field and value as JSON string literals so that quotes, backslashes
      // and control characters in the input cannot produce malformed JSON.
      // For plain strings the output is identical to raw interpolation.
      val field = m.field.asJson.noSpaces
      val value = m.value.getOrElse("").asJson.noSpaces
      entity(
        Json.obj(
          "message" := s"${m.field} - ${m.expectation}",
          "source" := m.field
        ),
        s"{$field : $value}".some
      ).some
    case m: FailureDetails.EnrichmentFailureMessage.Simple =>
      entity(
        Json.obj(
          "message" := m.error
        ),
        None
      ).some
    case _: FailureDetails.EnrichmentFailureMessage.IgluError =>
      // EnrichmentFailureMessage.IgluError isn't used anywhere in the project
      // therefore we don't expect it in here
      None
  }
}

def fromSchemaViolation(
v: SchemaViolationWithExtraContext,
Expand All @@ -71,8 +119,8 @@ object FailureEntity {
v.schemaViolation match {
case f: FailureDetails.SchemaViolation.NotJson =>
FailureEntity(
failureType = "NotJson",
errors = List(Json.obj("message" -> f.error.asJson, "source" -> v.source.asJson)),
failureType = "NotJSON",
errors = List(Json.obj("message" := f.error.asJson, "source" := v.source.asJson)),
schema = None,
data = v.data,
timestamp = timestamp,
Expand All @@ -83,14 +131,87 @@ object FailureEntity {
val message = f.error.message("").split(":").headOption
FailureEntity(
failureType = "NotIglu",
errors = List(Json.obj("message" -> message.asJson, "source" -> v.source.asJson)),
errors = List(Json.obj("message" := message.asJson, "source" := v.source.asJson)),
schema = None,
data = v.data,
timestamp = timestamp,
componentName = processor.artifact,
componentVersion = processor.version
)
// TODO: Implement remaining cases
case _ => throw new Exception("")
case f: FailureDetails.SchemaViolation.CriterionMismatch =>
val message = s"Unexpected schema: ${f.schemaKey.toSchemaUri} does not match the criterion"
FailureEntity(
failureType = "CriterionMismatch",
errors = List(
Json.obj(
"message" := message,
"source" := v.source,
"criterion" := f.schemaCriterion.asString
)
),
schema = f.schemaKey.some,
data = v.data,
timestamp = timestamp,
componentName = processor.artifact,
componentVersion = processor.version
)
case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ResolutionError(lh)) =>
val message = s"Resolution error: schema ${schemaKey.toSchemaUri} not found"
val lookupHistory = lh.toList
.map {
case (repo, lookups) =>
lookups.asJson.deepMerge(Json.obj("repository" := repo.asJson))
}
FailureEntity(
failureType = "ResolutionError",
errors = List(
Json.obj(
"message" := message,
"source" := v.source,
"lookupHistory" := lookupHistory
)
),
schema = schemaKey.some,
data = v.data,
timestamp = timestamp,
componentName = processor.artifact,
componentVersion = processor.version
)
case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidData(e), _)) =>
val errors = e.toList.map { r =>
Json.obj(
"message" := r.message,
"source" := v.source,
"path" := r.path,
"keyword" := r.keyword,
"targets" := r.targets
)
}
FailureEntity(
failureType = "ValidationError",
errors = errors,
schema = schemaKey.some,
data = v.data,
timestamp = timestamp,
componentName = processor.artifact,
componentVersion = processor.version
)
case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) =>
val errors = e.toList.map { r =>
Json.obj(
"message" := s"Invalid schema: $schemaKey - ${r.message}",
"source" := v.source,
"path" := r.path
)
}
FailureEntity(
failureType = "ValidationError",
errors = errors,
schema = schemaKey.some,
data = v.data,
timestamp = timestamp,
componentName = processor.artifact,
componentVersion = processor.version
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ object IgluUtils {
SchemaViolationWithExtraContext(
schemaViolation = FailureDetails.SchemaViolation.NotJson(field, rawJson.some, e),
source = field,
data = s"{$field : $rawJson".some
data = s"""{"$field" : "$rawJson"}""".some
)
)
.toEitherT[F]
Expand Down
Loading

0 comments on commit 231d5f2

Please sign in to comment.