From 9b068b3ce4ae9616879fd0554c758eccb702ee9f Mon Sep 17 00:00:00 2001 From: spenes Date: Mon, 1 Apr 2024 10:54:41 +0300 Subject: [PATCH 1/2] [Temp] Iglu repo for failure entity schema Remove this after failure entity schema is merged --- .../SpecHelpers.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala index 6bf35e859..b7953076a 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala @@ -65,6 +65,16 @@ object SpecHelpers extends CatsEffect { } } }, + { + "name": "Temp Iglu Central", + "priority": 0, + "vendorPrefixes": [], + "connection": { + "http": { + "uri": "https://raw.githubusercontent.com/snowplow/iglu-central/incomplete-events-schema/" + } + } + }, { "name": "Embedded src/test/resources", "priority": 100, From c4ab89b0f188cb3b31f629eaf5c60036d5c2cda4 Mon Sep 17 00:00:00 2001 From: spenes Date: Tue, 26 Mar 2024 15:37:58 +0300 Subject: [PATCH 2/2] Add failure entity for incomplete events With this commit, failure entities are started to be attached to incomplete events as derived contexts. There are some cases that we need more information than in the bad rows to create failure entities therefore it isn't possible to create failure entities from bad rows directly. For this purpose, we created wrapper classes to attach extra information about failure entities. --- .../common/enrichments/AtomicFields.scala | 17 +- .../AtomicFieldsLengthValidator.scala | 4 +- .../enrichments/EnrichmentManager.scala | 225 +++++----- .../common/enrichments/Failure.scala | 195 +++++++++ .../common/utils/IgluUtils.scala | 89 ++-- .../SpecHelpers.scala | 8 +- .../enrichments/AtomicFieldsSpec.scala | 53 +++ .../enrichments/EnrichmentManagerSpec.scala | 402 +++++++++++++++--- .../enrichments/FailureSpec.scala | 392 +++++++++++++++++ .../utils/IgluUtilsSpec.scala | 270 +++++++++--- 10 files changed, 1395 insertions(+), 260 deletions(-) create mode 100644 modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala create mode 100644 modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala create mode 100644 modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala index 26e513760..2ddbc83b4 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala @@ -12,6 +12,8 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments import cats.data.NonEmptyList +import io.circe.syntax._ + import com.snowplowanalytics.snowplow.badrows.FailureDetails import com.snowplowanalytics.iglu.client.ClientError.ValidationError @@ -133,12 +135,19 @@ object AtomicFields { AtomicFields(withLimits) } - def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureDetails.SchemaViolation = { + def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): Failure.SchemaViolation = { val clientError = ValidationError(ValidatorError.InvalidData(errors), None) - FailureDetails.SchemaViolation.IgluError( - AtomicFields.atomicSchema, - clientError + val failureData = errors.toList.flatMap(e => e.path.map(p => p := e.keyword)).toMap.asJson + + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + AtomicFields.atomicSchema, + clientError + ), + // Source atomic field and actual value of the field should be already on the ValidatorReport list + source = "atomic_field", + data = failureData ) } } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala index c14a7a6d2..01929f75a 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala @@ -19,8 +19,6 @@ import cats.implicits._ import com.snowplowanalytics.iglu.client.validator.ValidatorReport -import com.snowplowanalytics.snowplow.badrows.FailureDetails - import com.snowplowanalytics.snowplow.enrich.common.enrichments.AtomicFields.LimitedAtomicField import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent @@ -37,7 +35,7 @@ object AtomicFieldsLengthValidator { acceptInvalid: Boolean, invalidCount: F[Unit], atomicFields: AtomicFields - ): IorT[F, FailureDetails.SchemaViolation, Unit] = + ): IorT[F, Failure.SchemaViolation, Unit] = IorT { atomicFields.value .map(validateField(event, _).toValidatedNel) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index 54be85e71..6737dccfe 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -30,7 +30,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.badrows._ -import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor} +import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure} import com.snowplowanalytics.snowplow.enrich.common.{EtlPipeline, QueryStringParameters, RawEventParameters} import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent @@ -71,52 +71,109 @@ object EnrichmentManager { atomicFields: AtomicFields, emitIncomplete: Boolean ): IorT[F, BadRow, EnrichedEvent] = { - val iorT: IorT[F, NonEmptyList[BadRow], EnrichedEvent] = for { - enriched <- IorT.pure[F, NonEmptyList[BadRow]](new EnrichedEvent) - extractResult <- mapAndValidateInput( - raw, - enriched, - etlTstamp, - processor, - client, - registryLookup - ) - .leftMap(NonEmptyList.one) - .possiblyExitingEarly(emitIncomplete) - // Next 2 lines remove the invalid contexts and the invalid unstructured event from the event. - // This should be done after the bad row was created and only if emitIncomplete is enabled. - _ = { - enriched.contexts = ME.formatContexts(extractResult.contexts).orNull - enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull - } - enrichmentsContexts <- runEnrichments( - registry, - processor, - raw, - enriched, - extractResult.contexts, - extractResult.unstructEvent, - featureFlags.legacyEnrichmentOrder - ) - .leftMap(NonEmptyList.one) - .possiblyExitingEarly(emitIncomplete) - _ <- validateEnriched( - enriched, - raw, - enrichmentsContexts, - extractResult.validationInfoContexts, - client, - processor, - registryLookup, - featureFlags.acceptInvalid, - invalidCount, - atomicFields - ) - .leftMap(NonEmptyList.one) - .possiblyExitingEarly(emitIncomplete) - } yield enriched + def enrich(enriched: EnrichedEvent): IorT[F, NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]] = + for { + extractResult <- mapAndValidateInput( + raw, + enriched, + etlTstamp, + processor, + client, + registryLookup + ) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) + // Next 2 lines remove the invalid contexts and the invalid unstructured event from the event. + // This should be done after the bad row was created and only if emitIncomplete is enabled. + _ = { + enriched.contexts = ME.formatContexts(extractResult.contexts).orNull + enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull + } + enrichmentsContexts <- runEnrichments( + registry, + raw, + enriched, + extractResult.contexts, + extractResult.unstructEvent, + featureFlags.legacyEnrichmentOrder + ) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) + validContexts <- validateEnriched( + enriched, + enrichmentsContexts, + client, + registryLookup, + featureFlags.acceptInvalid, + invalidCount, + atomicFields + ) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) + derivedContexts = validContexts ::: extractResult.validationInfoContexts + } yield derivedContexts + + // derived contexts are set lastly because we want to include failure entities + // to derived contexts as well and we can get failure entities only in the end + // of the enrichment process + IorT( + for { + enrichedEvent <- Sync[F].delay(new EnrichedEvent) + enrichmentResult <- enrich(enrichedEvent).value + now = Instant.now() + _ = setDerivedContexts(enrichedEvent, enrichmentResult, now, processor) + result = enrichmentResult + .leftMap { fe => + createBadRow( + fe, + EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent), + RawEvent.toRawEvent(raw), + now, + processor + ) + } + .map(_ => enrichedEvent) + } yield result + ) + } + + private def createBadRow( + fe: NonEmptyList[NonEmptyList[Failure]], + pe: Payload.PartiallyEnrichedEvent, + re: Payload.RawEvent, + timestamp: Instant, + processor: Processor + ): BadRow = { + val firstList = fe.head + firstList.head match { + case h: Failure.SchemaViolation => + val sv = firstList.tail.collect { case f: Failure.SchemaViolation => f } + BadRow.SchemaViolations( + processor, + BadRowFailure.SchemaViolations(timestamp, NonEmptyList(h, sv).map(_.schemaViolation)), + Payload.EnrichmentPayload(pe, re) + ) + case h: Failure.EnrichmentFailure => + val ef = firstList.tail.collect { case f: Failure.EnrichmentFailure => f } + BadRow.EnrichmentFailures( + processor, + BadRowFailure.EnrichmentFailures(timestamp, NonEmptyList(h, ef).map(_.enrichmentFailure)), + Payload.EnrichmentPayload(pe, re) + ) + } + } - iorT.leftMap(_.head) + def setDerivedContexts( + enriched: EnrichedEvent, + enrichmentResult: Ior[NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]], + timestamp: Instant, + processor: Processor + ): Unit = { + val derivedContexts = enrichmentResult.leftMap { ll => + ll.flatten.toList + .map(_.toSDJ(timestamp, processor)) + }.merge + ME.formatContexts(derivedContexts).foreach(c => enriched.derived_contexts = c) } private def mapAndValidateInput[F[_]: Sync]( @@ -126,23 +183,15 @@ object EnrichmentManager { processor: Processor, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): IorT[F, BadRow, IgluUtils.EventExtractResult] = { - val iorT = for { + ): IorT[F, NonEmptyList[Failure], IgluUtils.EventExtractResult] = + for { _ <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor) .leftMap(NonEmptyList.one) - extract <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup) + extract <- IgluUtils + .extractAndValidateInputJsons(enrichedEvent, client, registryLookup) + .leftMap { l: NonEmptyList[Failure] => l } } yield extract - iorT.leftMap { violations => - buildSchemaViolationsBadRow( - violations, - EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent), - RawEvent.toRawEvent(raw), - processor - ) - } - } - /** * Run all the enrichments * @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\ @@ -151,13 +200,12 @@ object EnrichmentManager { */ private def runEnrichments[F[_]: Monad]( registry: EnrichmentRegistry[F], - processor: Processor, raw: RawEvent, enriched: EnrichedEvent, inputContexts: List[SelfDescribingData[Json]], unstructEvent: Option[SelfDescribingData[Json]], legacyOrder: Boolean - ): IorT[F, BadRow, List[SelfDescribingData[Json]]] = + ): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] = IorT { accState(registry, raw, inputContexts, unstructEvent, legacyOrder) .runS(Accumulation(enriched, Nil, Nil)) @@ -166,12 +214,7 @@ object EnrichmentManager { failures.toNel match { case Some(nel) => Ior.both( - buildEnrichmentFailuresBadRow( - nel, - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ), + nel.map(Failure.EnrichmentFailure), contexts ) case None => @@ -182,33 +225,19 @@ object EnrichmentManager { private def validateEnriched[F[_]: Clock: Monad]( enriched: EnrichedEvent, - raw: RawEvent, enrichmentsContexts: List[SelfDescribingData[Json]], - validationInfoContexts: List[SelfDescribingData[Json]], client: IgluCirceClient[F], - processor: Processor, registryLookup: RegistryLookup[F], acceptInvalid: Boolean, invalidCount: F[Unit], atomicFields: AtomicFields - ): IorT[F, BadRow, Unit] = { - val iorT = for { + ): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] = + for { validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup) - _ = ME.formatContexts(validContexts ::: validationInfoContexts).foreach(enriched.derived_contexts = _) _ <- AtomicFieldsLengthValidator .validate[F](enriched, acceptInvalid, invalidCount, atomicFields) - .leftMap(NonEmptyList.one) - } yield () - - iorT.leftMap { violations => - buildSchemaViolationsBadRow( - violations, - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ) - } - } + .leftMap { v: Failure => NonEmptyList.one(v) } + } yield validContexts private[enrichments] case class Accumulation( event: EnrichedEvent, @@ -336,7 +365,7 @@ object EnrichmentManager { e: EnrichedEvent, etlTstamp: DateTime, processor: Processor - ): IorT[F, FailureDetails.SchemaViolation, Unit] = + ): IorT[F, Failure.SchemaViolation, Unit] = IorT { Sync[F].delay { e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter @@ -847,30 +876,6 @@ object EnrichmentManager { } } - private def buildSchemaViolationsBadRow( - vs: NonEmptyList[FailureDetails.SchemaViolation], - pee: Payload.PartiallyEnrichedEvent, - re: Payload.RawEvent, - processor: Processor - ): BadRow.SchemaViolations = - BadRow.SchemaViolations( - processor, - Failure.SchemaViolations(Instant.now(), vs), - Payload.EnrichmentPayload(pee, re) - ) - - private def buildEnrichmentFailuresBadRow( - fs: NonEmptyList[FailureDetails.EnrichmentFailure], - pee: Payload.PartiallyEnrichedEvent, - re: Payload.RawEvent, - processor: Processor - ) = - BadRow.EnrichmentFailures( - processor, - Failure.EnrichmentFailures(Instant.now(), fs), - Payload.EnrichmentPayload(pee, re) - ) - private implicit class IorTOps[F[_], A, B](val iorT: IorT[F, A, B]) extends AnyVal { /** If the incomplete events feature is disabled, then convert a Both to a Left, so we don't waste time with next steps */ diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala new file mode 100644 index 000000000..6e982866a --- /dev/null +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2024-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.enrich.common.enrichments + +import java.time.Instant + +import cats.syntax.option._ + +import io.circe.{Encoder, Json} +import io.circe.generic.semiauto._ +import io.circe.syntax._ + +import com.snowplowanalytics.snowplow.badrows._ + +import com.snowplowanalytics.iglu.client.ClientError +import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport} + +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.core.circe.implicits.schemaKeyCirceJsonEncoder + +/** + * Represents a failure encountered during enrichment of the event. + * Failure entities will be attached to incomplete events as derived contexts. + */ +sealed trait Failure { + def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] +} + +object Failure { + + val failureSchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "failure", "jsonschema", SchemaVer.Full(1, 0, 0)) + + case class SchemaViolation( + schemaViolation: FailureDetails.SchemaViolation, + source: String, + data: Json + ) extends Failure { + def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] = { + val feJson = fromSchemaViolation(this, timestamp, processor) + SelfDescribingData(failureSchemaKey, feJson.asJson) + } + + } + + case class EnrichmentFailure( + enrichmentFailure: FailureDetails.EnrichmentFailure + ) extends Failure { + def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] = { + val feJson = fromEnrichmentFailure(this, timestamp, processor) + SelfDescribingData(failureSchemaKey, feJson.asJson) + } + } + + case class FailureContext( + failureType: String, + errors: List[Json], + schema: Option[SchemaKey], + data: Option[Json], + timestamp: Instant, + componentName: String, + componentVersion: String + ) + + object FailureContext { + implicit val failureContextEncoder: Encoder[FailureContext] = deriveEncoder[FailureContext] + } + + def fromEnrichmentFailure( + ef: EnrichmentFailure, + timestamp: Instant, + processor: Processor + ): FailureContext = { + val failureType = s"EnrichmentError: ${ef.enrichmentFailure.enrichment.map(_.identifier).getOrElse("")}" + val schemaKey = ef.enrichmentFailure.enrichment.map(_.schemaKey) + val (errors, data) = ef.enrichmentFailure.message match { + case FailureDetails.EnrichmentFailureMessage.InputData(field, value, expectation) => + ( + List( + Json.obj( + "message" := s"$field - $expectation", + "source" := field + ) + ), + Json.obj(field := value).some + ) + case FailureDetails.EnrichmentFailureMessage.Simple(error) => + ( + List( + Json.obj( + "message" := error + ) + ), + None + ) + case FailureDetails.EnrichmentFailureMessage.IgluError(_, error) => + // EnrichmentFailureMessage.IgluError isn't used anywhere in the project. + // We are return this value for completeness. + ( + List( + Json.obj( + "message" := error + ) + ), + None + ) + } + FailureContext( + failureType = failureType, + errors = errors, + schema = schemaKey, + data = data, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + } + + def fromSchemaViolation( + v: SchemaViolation, + timestamp: Instant, + processor: Processor + ): FailureContext = { + val (failureType, errors, schema, data) = v.schemaViolation match { + case FailureDetails.SchemaViolation.NotJson(_, _, err) => + val error = Json.obj("message" := err, "source" := v.source) + ("NotJSON", List(error), None, Json.obj(v.source := v.data).some) + case FailureDetails.SchemaViolation.NotIglu(_, err) => + val message = err.message("").split(":").headOption + val error = Json.obj("message" := message, "source" := v.source) + ("NotIglu", List(error), None, v.data.some) + case FailureDetails.SchemaViolation.CriterionMismatch(schemaKey, schemaCriterion) => + val message = s"Unexpected schema: ${schemaKey.toSchemaUri} does not match the criterion" + val error = Json.obj( + "message" := message, + "source" := v.source, + "criterion" := schemaCriterion.asString + ) + ("CriterionMismatch", List(error), schemaKey.some, v.data.some) + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ResolutionError(lh)) => + val message = s"Resolution error: schema ${schemaKey.toSchemaUri} not found" + val lookupHistory = lh.toList + .map { + case (repo, lookups) => + lookups.asJson.deepMerge(Json.obj("repository" := repo.asJson)) + } + val error = Json.obj( + "message" := message, + "source" := v.source, + "lookupHistory" := lookupHistory + ) + ("ResolutionError", List(error), schemaKey.some, v.data.some) + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidData(e), _)) => + val isAtomicField = schemaKey == AtomicFields.atomicSchema + // If error is for atomic field, we want to set the source to atomic field name. Since ValidatorReport.path + // is set to atomic field name, we are using path as source. + def source(r: ValidatorReport) = if (isAtomicField) r.path.getOrElse(v.source) else v.source + val errors = e.toList.map { r => + Json.obj( + "message" := r.message, + "source" := source(r), + "path" := r.path, + "keyword" := r.keyword, + "targets" := r.targets + ) + } + ("ValidationError", errors, schemaKey.some, v.data.some) + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) => + val errors = e.toList.map { r => + Json.obj( + "message" := s"Invalid schema: ${schemaKey.toSchemaUri} - ${r.message}", + "source" := v.source, + "path" := r.path + ) + } + ("ValidationError", errors, schemaKey.some, v.data.some) + } + FailureContext( + failureType = failureType, + errors = errors, + schema = schema, + data = data, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + } +} diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala index 8483b2cca..6cea198d6 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala @@ -25,6 +25,8 @@ import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ +import com.snowplowanalytics.snowplow.enrich.common.enrichments.Failure + import com.snowplowanalytics.snowplow.badrows._ import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent @@ -51,7 +53,7 @@ object IgluUtils { enriched: EnrichedEvent, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], EventExtractResult] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], EventExtractResult] = for { contexts <- extractAndValidateInputContexts(enriched, client, registryLookup) unstruct <- extractAndValidateUnstructEvent(enriched, client, registryLookup) @@ -77,9 +79,9 @@ object IgluUtils { enriched: EnrichedEvent, client: IgluCirceClient[F], registryLookup: RegistryLookup[F], - field: String = "ue_properties", + field: String = "unstruct", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0) - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], Option[SdjExtractResult]] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], Option[SdjExtractResult]] = Option(enriched.unstruct_event) match { case Some(rawUnstructEvent) => val iorT = for { @@ -88,11 +90,11 @@ object IgluUtils { .leftMap(NonEmptyList.one) .toIor // Parse Json unstructured event as SelfDescribingData[Json] - unstructSDJ <- parseAndValidateSDJ(unstruct, client, registryLookup) + unstructSDJ <- parseAndValidateSDJ(unstruct, client, registryLookup, field) } yield unstructSDJ.some iorT.recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, None)) } case None => - IorT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]](none[SdjExtractResult]) + IorT.rightT[F, NonEmptyList[Failure.SchemaViolation]](none[SdjExtractResult]) } /** @@ -110,7 +112,7 @@ object IgluUtils { registryLookup: RegistryLookup[F], field: String = "contexts", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0) - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], List[SdjExtractResult]] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], List[SdjExtractResult]] = Option(enriched.contexts) match { case Some(rawContexts) => val iorT = for { @@ -122,7 +124,7 @@ object IgluUtils { // Parse and validate each SDJ and merge the errors contextsSdj <- contexts .traverse( - parseAndValidateSDJ(_, client, registryLookup) + parseAndValidateSDJ(_, client, registryLookup, field) .map(sdj => List(sdj)) .recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } ) @@ -130,7 +132,7 @@ object IgluUtils { } yield contextsSdj iorT.recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } case None => - IorT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]](Nil) + IorT.rightT[F, NonEmptyList[Failure.SchemaViolation]](Nil) } /** @@ -146,13 +148,16 @@ object IgluUtils { client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]], registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], List[SelfDescribingData[Json]]] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], List[SelfDescribingData[Json]]] = checkList(client, sdjs, registryLookup) .leftMap( _.map { - case (schemaKey, clientError) => - val f: FailureDetails.SchemaViolation = FailureDetails.SchemaViolation.IgluError(schemaKey, clientError) - f + case (sdj, clientError) => + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError(sdj.schema, clientError), + source = "derived_contexts", + data = sdj.asJson + ) } ) @@ -163,34 +168,54 @@ object IgluUtils { expectedCriterion: SchemaCriterion, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): EitherT[F, FailureDetails.SchemaViolation, Json] = + ): EitherT[F, Failure.SchemaViolation, Json] = for { // Parse Json string with the SDJ json <- JsonUtils .extractJson(rawJson) - .leftMap(e => FailureDetails.SchemaViolation.NotJson(field, rawJson.some, e)) + .leftMap(e => + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotJson(field, rawJson.some, e), + source = field, + data = rawJson.asJson + ) + ) .toEitherT[F] // Parse Json as SelfDescribingData[Json] (which contains the .data that we want) sdj <- SelfDescribingData .parse(json) - .leftMap(FailureDetails.SchemaViolation.NotIglu(json, _)) + .leftMap(e => + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotIglu(json, e), + source = field, + data = json + ) + ) .toEitherT[F] // Check that the schema of SelfDescribingData[Json] is the expected one _ <- if (validateCriterion(sdj, expectedCriterion)) - EitherT.rightT[F, FailureDetails.SchemaViolation](sdj) + EitherT.rightT[F, Failure.SchemaViolation](sdj) else EitherT .leftT[F, SelfDescribingData[Json]]( - FailureDetails.SchemaViolation.CriterionMismatch(sdj.schema, expectedCriterion) + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.CriterionMismatch(sdj.schema, expectedCriterion), + source = field, + data = sdj.asJson + ) ) // Check that the SDJ holding the .data is valid _ <- check(client, sdj, registryLookup) .leftMap { case (schemaKey, clientError) => - FailureDetails.SchemaViolation.IgluError(schemaKey, clientError) + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), + source = field, + data = sdj.asJson + ) } // Extract .data of SelfDescribingData[Json] - data <- EitherT.rightT[F, FailureDetails.SchemaViolation](sdj.data) + data <- EitherT.rightT[F, Failure.SchemaViolation](sdj.data) } yield data /** Check that the schema of a SDJ matches the expected one */ @@ -217,11 +242,11 @@ object IgluUtils { client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]], registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[(SchemaKey, ClientError)], List[SelfDescribingData[Json]]] = + ): IorT[F, NonEmptyList[(SelfDescribingData[Json], ClientError)], List[SelfDescribingData[Json]]] = sdjs.map { sdj => check(client, sdj, registryLookup) .map(_ => List(sdj)) - .leftMap(NonEmptyList.one) + .leftMap(e => NonEmptyList.one((sdj, e._2))) .toIor .recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } }.foldA @@ -230,19 +255,29 @@ object IgluUtils { private def parseAndValidateSDJ[F[_]: Monad: Clock]( json: Json, client: IgluCirceClient[F], - registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], SdjExtractResult] = + registryLookup: RegistryLookup[F], + field: String + ): IorT[F, NonEmptyList[Failure.SchemaViolation], SdjExtractResult] = for { sdj <- IorT .fromEither[F](SelfDescribingData.parse(json)) - .leftMap[FailureDetails.SchemaViolation](FailureDetails.SchemaViolation.NotIglu(json, _)) + .leftMap[Failure.SchemaViolation](e => + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotIglu(json, e), + source = field, + data = json.asJson + ) + ) .leftMap(NonEmptyList.one) supersedingSchema <- check(client, sdj, registryLookup) .leftMap { case (schemaKey, clientError) => - FailureDetails.SchemaViolation - .IgluError(schemaKey, clientError): FailureDetails.SchemaViolation - + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation + .IgluError(schemaKey, clientError): FailureDetails.SchemaViolation, + source = field, + data = json.asJson + ) } .leftMap(NonEmptyList.one) .toIor diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala index b7953076a..c8a0b2fe4 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala @@ -139,15 +139,17 @@ object SpecHelpers extends CatsEffect { .flatMap(SelfDescribingData.parse[Json]) .leftMap(err => s"Can't parse Json [$rawJson] as as SelfDescribingData, error: [$err]") - def listContextsSchemas(rawContexts: String): List[SchemaKey] = + def listContexts(rawContexts: String): List[SelfDescribingData[Json]] = jsonStringToSDJ(rawContexts) .map(_.data.asArray.get.toList) - .flatMap(contexts => contexts.traverse(c => SelfDescribingData.parse[Json](c).map(_.schema))) match { + .flatMap(contexts => contexts.traverse(c => SelfDescribingData.parse[Json](c))) match { case Left(err) => throw new IllegalArgumentException(s"Couldn't list contexts schemas. Error: [$err]") - case Right(schemas) => schemas + case Right(sdjs) => sdjs } + def listContextsSchemas(rawContexts: String): List[SchemaKey] = listContexts(rawContexts).map(_.schema) + def getUnstructSchema(rawUnstruct: String): SchemaKey = jsonStringToSDJ(rawUnstruct) .map(_.data) diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala new file mode 100644 index 000000000..22b5e6baf --- /dev/null +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.enrich.common.enrichments + +import cats.data.NonEmptyList +import cats.syntax.option._ + +import io.circe.Json +import io.circe.syntax._ + +import com.snowplowanalytics.iglu.client.ClientError.ValidationError +import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport} +import com.snowplowanalytics.snowplow.badrows.FailureDetails + +import org.specs2.mutable.Specification + +class AtomicFieldsSpec extends Specification { + + "errorsToSchemaViolation" should { + "convert ValidatorReports to SchemaViolation correctly" >> { + val vrList = NonEmptyList( + ValidatorReport(message = "testMessage", path = "testPath1".some, targets = List("t1, t2"), keyword = "testKeyword1".some), + List( + ValidatorReport(message = "testMessage", path = None, targets = List.empty, keyword = "testKeyword2".some), + ValidatorReport(message = "testMessage", path = "testPath3".some, targets = List("t1", "t2"), keyword = None), + ValidatorReport(message = "testMessage", path = "testPath4".some, targets = List.empty, keyword = "testKeyword4".some) + ) + ) + val expected = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + schemaKey = AtomicFields.atomicSchema, + error = ValidationError(ValidatorError.InvalidData(vrList), None) + ), + source = "atomic_field", + data = Json.obj( + "testPath1" := "testKeyword1", + "testPath3" := Json.Null, + "testPath4" := "testKeyword4" + ) + ) + val result = AtomicFields.errorsToSchemaViolation(vrList) + result must beEqualTo(expected) + } + } +} diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala index fd1aff96c..fe8317c5d 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala @@ -10,6 +10,8 @@ */ package com.snowplowanalytics.snowplow.enrich.common.enrichments +import java.time.Instant + import org.apache.commons.codec.digest.DigestUtils import org.specs2.mutable.Specification @@ -29,8 +31,10 @@ import io.circe.syntax._ import org.joda.time.DateTime import com.snowplowanalytics.snowplow.badrows._ +import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure} import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.enrich.common.QueryStringParameters import com.snowplowanalytics.snowplow.enrich.common.loaders._ @@ -175,10 +179,10 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations(_, - NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), - List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) - ) + BadRowFailure.SchemaViolations(_, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), + List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) + ) ), _ ) @@ -242,7 +246,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.EnrichmentFailures( _, - Failure.EnrichmentFailures( + BadRowFailure.EnrichmentFailures( _, NonEmptyList( FailureDetails.EnrichmentFailure( @@ -311,7 +315,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations( + BadRowFailure.SchemaViolations( _, NonEmptyList( _: FailureDetails.SchemaViolation.IgluError, @@ -1237,6 +1241,15 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE JavascriptScriptEnrichment(schemaKey, script) ) ) + val invalidUe = + """{ + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "emailAddress": "hello@world.com", + "emailAddress2": "foo@bar.org", + "unallowedAdditionalField": "foo@bar.org" + } + }""" val parameters = Map( "e" -> "pp", @@ -1252,20 +1265,34 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE } """, "ue_pr" -> - """ + s""" { "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", - "data":{ - "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", - "data": { - "emailAddress": "hello@world.com", - "emailAddress2": "foo@bar.org", - "unallowedAdditionalField": "foo@bar.org" - } - } + "data":$invalidUe }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = { + val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List(SelfDescribingData(Failure.`failureSchemaKey`, feJson), `emailSentSDJ`) + if feJson.field("failureType") == "ValidationError".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "$.unallowedAdditionalField: is not defined in the schema and the schema does not allow additional properties", + "source" := "unstruct", + "path" := "$", + "keyword" := "additionalProperties", + "targets" := List("unallowedAdditionalField") + ) + ) && + feJson.field("schema") == emailSentSchema.asJson && + feJson.field("data") == jparse(invalidUe).toOption.get => + true + case _ => false + } + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1278,11 +1305,12 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE atomicFieldLimits, emitIncomplete = true ) + enriched.value.map { case Ior.Both(_: BadRow.SchemaViolations, enriched) if Option(enriched.unstruct_event).isEmpty && SpecHelpers.listContextsSchemas(enriched.contexts) == List(clientSessionSchema) && - SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event without the unstructured event") } @@ -1305,24 +1333,24 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE JavascriptScriptEnrichment(schemaKey, script) ) ) + val invalidContext = + """{ + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }""" val parameters = Map( "e" -> "pp", "tv" -> "js-0.13.1", "p" -> "web", "co" -> - """ + s""" { "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0", - "data": [ - { - "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", - "data": { - "foo": "hello@world.com", - "emailAddress2": "foo@bar.org" - } - } - ] + "data": [$invalidContext] } """, "ue_pr" -> @@ -1333,6 +1361,34 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = { + val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List(SelfDescribingData(Failure.`failureSchemaKey`, feJson), `emailSentSDJ`) + if feJson.field("failureType") == "ValidationError".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "$.emailAddress: is missing but it is required", + "source" := "contexts", + "path" := "$", + "keyword" := "required", + "targets" := List("emailAddress") + ), + Json.obj( + "message" := "$.foo: is not defined in the schema and the schema does not allow additional properties", + "source" := "contexts", + "path" := "$", + "keyword" := "additionalProperties", + "targets" := List("foo") + ) + ) && + feJson.field("schema") == emailSentSchema.asJson && + feJson.field("data") == jparse(invalidContext).toOption.get => + true + case _ => false + } + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1349,7 +1405,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Both(_: BadRow.SchemaViolations, enriched) if Option(enriched.contexts).isEmpty && SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && - SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event with no input contexts") } @@ -1372,6 +1428,15 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE JavascriptScriptEnrichment(schemaKey, script) ) ) + val invalidContext = + """ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }""" val parameters = Map( "e" -> "pp", @@ -1382,13 +1447,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE { "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0", "data": [ - { - "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", - "data": { - "foo": "hello@world.com", - "emailAddress2": "foo@bar.org" - } - }, + $invalidContext, $clientSession ] } @@ -1401,6 +1460,34 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = { + val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List(SelfDescribingData(Failure.`failureSchemaKey`, feJson), `emailSentSDJ`) + if feJson.field("failureType") == "ValidationError".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "$.emailAddress: is missing but it is required", + "source" := "contexts", + "path" := "$", + "keyword" := "required", + "targets" := List("emailAddress") + ), + Json.obj( + "message" := "$.foo: is not defined in the schema and the schema does not allow additional properties", + "source" := "contexts", + "path" := "$", + "keyword" := "additionalProperties", + "targets" := List("foo") + ) + ) && + feJson.field("schema") == emailSentSchema.asJson && + feJson.field("data") == jparse(invalidContext).toOption.get => + true + case _ => false + } + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1417,7 +1504,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Both(_: BadRow.SchemaViolations, enriched) if SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && SpecHelpers.listContextsSchemas(enriched.contexts) == List(clientSessionSchema) && - SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event with 1 input context") } @@ -1455,6 +1542,24 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List( + SelfDescribingData(Failure.`failureSchemaKey`, feJson), + SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _) + ) + if feJson.field("failureType") == "EnrichmentError: Javascript enrichment".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "Error during execution of JavaScript function: [Javascript exception in at line number 3 at column number 10]" + ) + ) && + feJson.field("schema") == JavascriptScriptEnrichment.supportedSchema.copy(addition = 0.some).asString.asJson && + feJson.field("data") == Json.Null => + true + case _ => false + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1470,7 +1575,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE enriched.value.map { case Ior.Both(_: BadRow.EnrichmentFailures, enriched) if SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && - !SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not an EnrichmentFailures bad row and an enriched event") } @@ -1508,6 +1613,36 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List( + SelfDescribingData(Failure.`failureSchemaKey`, validationError), + SelfDescribingData(Failure.`failureSchemaKey`, enrichmentError) + ) + if validationError.field("failureType") == "ValidationError".asJson && + validationError.field("errors") == Json.arr( + Json.obj( + "message" := "Cannot be converted to java.math.BigDecimal. Error : Character f is neither a decimal digit number, decimal point, nor \"e\" notation exponential mark.", + "source" := "tr_tt", + "path" := "tr_tt", + "keyword" := "foo", + "targets" := Json.arr() + ) + ) && + validationError.field("schema") == AtomicFields.atomicSchema.asJson && + validationError.field("data") == Json.obj("tr_tt" := "foo") && + enrichmentError.field("failureType") == "EnrichmentError: Javascript enrichment".asJson && + enrichmentError.field("errors") == Json.arr( + Json.obj( + "message" := "Error during execution of JavaScript function: [Javascript exception in at line number 3 at column number 10]" + ) + ) && + enrichmentError.field("schema") == JavascriptScriptEnrichment.supportedSchema.copy(addition = 0.some).asString.asJson && + enrichmentError.field("data") == Json.Null => + true + case _ => false + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1521,23 +1656,26 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE emitIncomplete = true ) enriched.value.map { - case Ior.Both(_: BadRow.SchemaViolations, _) => ok + case Ior.Both(_: BadRow.SchemaViolations, enriched) if expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] doesn't have a SchemaViolations bad row in the Left") } } "remove an invalid enrichment context and return the enriched event if emitIncomplete is set to true" >> { + val invalidContext = + """ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }""" val script = s""" function process(event) { return [ - { - "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", - "data": { - "foo": "hello@world.com", - "emailAddress2": "foo@bar.org" - } - } + $invalidContext ]; }""" val schemaKey = SchemaKey( @@ -1565,6 +1703,35 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List( + SelfDescribingData(Failure.`failureSchemaKey`, feJson), + SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _) + ) + if feJson.field("failureType") == "ValidationError".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "$.emailAddress: is missing but it is required", + "source" := "derived_contexts", + "path" := "$", + "keyword" := "required", + "targets" := List("emailAddress") + ), + Json.obj( + "message" := "$.foo: is not defined in the schema and the schema does not allow additional properties", + "source" := "derived_contexts", + "path" := "$", + "keyword" := "additionalProperties", + "targets" := List("foo") + ) + ) && + feJson.field("schema") == emailSentSchema.asJson && + feJson.field("data") == jparse(invalidContext).toOption.get => + true + case _ => false + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1580,11 +1747,97 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE enriched.value.map { case Ior.Both(_: BadRow.SchemaViolations, enriched) if SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && - !SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event without the faulty enrichment context") } } + + "return a bad row that contains validation errors only from ue if there is validation error in both ue and derived contexts when emitIncomplete is set to true" >> { + val invalidContext = + """ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }""" + val invalidUe = + """{ + "schema":"iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-1", + "data": { + "unallowedAdditionalField": "foo@bar.org" + } + }""" + val script = + s""" + function process(event) { + return [ + $invalidContext + ]; + }""" + val schemaKey = SchemaKey( + "com.snowplowanalytics.snowplow", + "javascript_script_config", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val enrichmentReg = EnrichmentRegistry[IO]( + javascriptScript = List( + JavascriptScriptEnrichment(schemaKey, script) + ) + ) + val parameters = Map( + "e" -> "pp", + "tv" -> "js-0.13.1", + "p" -> "web", + "ue_pr" -> + s""" + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data": $invalidUe + }""" + ).toOpt + val rawEvent = RawEvent(api, parameters, None, source, context) + val enriched = EnrichmentManager.enrichEvent[IO]( + enrichmentReg, + client, + processor, + timestamp, + rawEvent, + AcceptInvalid.featureFlags, + IO.unit, + SpecHelpers.registryLookup, + atomicFieldLimits, + emitIncomplete = true + ) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = + SpecHelpers.listContextsSchemas(enriched.derived_contexts).count(_ == Failure.failureSchemaKey) == 2 + + def expectedBadRow(badRow: BadRow): Boolean = + badRow match { + case BadRow.SchemaViolations( + _, + BadRowFailure.SchemaViolations( + _, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(`clientSessionSchema`, _), Nil) + ), + _ + ) => + true + case _ => false + } + + enriched.value.map { + case Ior.Both(badRow, enriched) + if Option(enriched.unstruct_event).isEmpty && + expectedDerivedContexts(enriched) && + expectedBadRow(badRow) => + ok + case other => ko(s"[$other] is not expected one") + } + } } "getCrossDomain" should { @@ -2066,7 +2319,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations(_, NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), Nil)), + BadRowFailure.SchemaViolations(_, NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), Nil)), _ ) ) => @@ -2147,10 +2400,10 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations(_, - NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), - List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) - ) + BadRowFailure.SchemaViolations(_, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), + List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) + ) ), _ ) @@ -2165,6 +2418,50 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE } } } + + "setDerivedContexts" should { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotJson("testField", "testValue".some, "testError"), + source = "testSource", + data = Json.obj("testKey" := "testValue") + ) + val ef = Failure.EnrichmentFailure( + FailureDetails.EnrichmentFailure( + None, + FailureDetails.EnrichmentFailureMessage.Simple("testError") + ) + ) + val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get + val timestamp = Instant.now() + "set derived contexts correctly if enrichment result is Ior.Left" >> { + val enriched = new EnrichedEvent() + val enrichmentResult = Ior.Left(NonEmptyList.of(NonEmptyList.of(sv, ef), NonEmptyList.of(sv, ef))) + EnrichmentManager.setDerivedContexts(enriched, enrichmentResult, timestamp, processor) + val schemas = SpecHelpers.listContextsSchemas(enriched.derived_contexts) + schemas.size must beEqualTo(4) + forall(schemas)(s => s must beEqualTo(Failure.failureSchemaKey)) + } + "set derived contexts correctly if enrichment result is Ior.Right" >> { + val enriched = new EnrichedEvent() + val enrichmentResult = Ior.Right(List(emailSentSDJ, emailSentSDJ)) + EnrichmentManager.setDerivedContexts(enriched, enrichmentResult, timestamp, processor) + val schemas = SpecHelpers.listContextsSchemas(enriched.derived_contexts) + schemas.size must beEqualTo(2) + forall(schemas)(s => s must beEqualTo(emailSentSchema)) + } + "set derived contexts correctly if enrichment result is Ior.Both" >> { + val enriched = new EnrichedEvent() + val enrichmentResult = Ior.Both( + NonEmptyList.of(NonEmptyList.of(sv, ef), NonEmptyList.of(sv, ef)), + List(emailSentSDJ, emailSentSDJ) + ) + EnrichmentManager.setDerivedContexts(enriched, enrichmentResult, timestamp, processor) + val schemas = SpecHelpers.listContextsSchemas(enriched.derived_contexts) + schemas.size must beEqualTo(6) + schemas.count(_ == Failure.failureSchemaKey) must beEqualTo(4) + schemas.count(_ == emailSentSchema) must beEqualTo(2) + } + } } object EnrichmentManagerSpec { @@ -2267,4 +2564,9 @@ object EnrichmentManagerSpec { "userId": "20d631b8-7837-49df-a73e-6da73154e6fd" } }""" + + implicit class JsonFieldGetter(json: Json) { + def field(f: String): Json = + json.hcursor.downField(f).as[Json].toOption.get + } } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala new file mode 100644 index 000000000..678d65d51 --- /dev/null +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala @@ -0,0 +1,392 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.enrich.common.enrichments + +import java.time.Instant + +import scala.collection.immutable.SortedMap + +import cats.effect.testing.specs2.CatsEffect +import cats.effect.unsafe.implicits.global +import cats.effect.IO + +import cats.data.NonEmptyList +import cats.syntax.option._ + +import io.circe.syntax._ +import io.circe.Json + +import org.specs2.mutable.Specification +import org.specs2.matcher.ValidatedMatchers +import org.specs2.ScalaCheck + +import org.scalacheck.{Gen, Prop} + +import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers +import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Processor} +import com.snowplowanalytics.iglu.core.{ParseError, SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.client.ClientError +import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport} +import com.snowplowanalytics.iglu.client.resolver.LookupHistory +import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup + +class FailureSpec extends Specification with ValidatedMatchers with CatsEffect with ScalaCheck { + + val timestamp = Instant.now() + val processor = Processor("unit tests SCE", "v42") + val schemaKey = SchemaKey("com.snowplowanalytics", "test", "jsonschema", SchemaVer.Full(1, 0, 0)) + val schemaCriterion = SchemaCriterion.apply("com.snowplowanalytics", "test", "jsonschema", 1) + + "FailureEntityContext should be valid against its schema" >> { + implicit val registryLookup: RegistryLookup[IO] = SpecHelpers.registryLookup + + val genFeContext = for { + failureType <- Gen.alphaNumStr + jsonGen = Gen.oneOf( + Json.obj(), + Json.obj("test1" := "value1"), + Json.obj("test1" := "value1", "test2" := "value2"), + Json.obj("test1" := "value1", "test2" := "value2", "test3" := "value3") + ) + errors <- Gen.listOf(jsonGen) + data <- Gen.option(jsonGen) + schema <- Gen.option(Gen.const(schemaKey)) + } yield Failure.FailureContext( + failureType = failureType, + errors = errors, + schema = schema, + data = data, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + + Prop.forAll(genFeContext) { feContext: Failure.FailureContext => + val sdj = SelfDescribingData(schema = Failure.failureSchemaKey, data = feContext.asJson) + SpecHelpers.client + .check(sdj) + .value + .map(_ must beRight) + .unsafeRunSync() + } + } + + "fromEnrichmentFailure" should { + "convert InputData correctly" >> { + val ef = Failure.EnrichmentFailure( + enrichmentFailure = FailureDetails.EnrichmentFailure( + enrichment = FailureDetails + .EnrichmentInformation( + schemaKey = schemaKey, + identifier = "enrichmentId" + ) + .some, + message = FailureDetails.EnrichmentFailureMessage.InputData( + field = "testField", + value = "testValue".some, + expectation = "testExpectation" + ) + ) + ) + val result = Failure.fromEnrichmentFailure(ef, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "EnrichmentError: enrichmentId", + errors = List( + Json.obj( + "message" := "testField - testExpectation", + "source" := "testField" + ) + ), + schema = schemaKey.some, + data = Json.obj("testField" := "testValue").some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + result must beEqualTo(expected) + } + + "convert Simple correctly" >> { + val ef = Failure.EnrichmentFailure( + enrichmentFailure = FailureDetails.EnrichmentFailure( + enrichment = FailureDetails + .EnrichmentInformation( + schemaKey = schemaKey, + identifier = "enrichmentId" + ) + .some, + message = FailureDetails.EnrichmentFailureMessage.Simple(error = "testError") + ) + ) + val result = Failure.fromEnrichmentFailure(ef, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "EnrichmentError: enrichmentId", + errors = List(Json.obj("message" := "testError")), + schema = schemaKey.some, + data = None, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + result must beEqualTo(expected) + } + } + + "fromSchemaViolation" should { + "convert NotJson correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotJson( + field = "testField", + value = "testValue".some, + error = "testError" + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "NotJSON", + errors = List( + Json.obj( + "message" := "testError", + "source" := "testSource" + ) + ), + schema = None, + data = Json.obj("testSource" := "testData").some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + + "convert NotIglu correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotIglu( + json = Json.Null, + error = ParseError.InvalidSchema + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "NotIglu", + errors = List( + Json.obj( + "message" := "Invalid schema", + "source" := "testSource" + ) + ), + schema = None, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + + "convert CriterionMismatch correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.CriterionMismatch( + schemaKey = schemaKey, + schemaCriterion = schemaCriterion + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "CriterionMismatch", + errors = List( + Json.obj( + "message" := "Unexpected schema: iglu:com.snowplowanalytics/test/jsonschema/1-0-0 does not match the criterion", + "source" := "testSource", + "criterion" := "iglu:com.snowplowanalytics/test/jsonschema/1-*-*" + ) + ), + schema = schemaKey.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + + "convert ResolutionError correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + schemaKey = schemaKey, + error = ClientError.ResolutionError( + value = SortedMap( + "repo1" -> LookupHistory( + errors = Set.empty, + attempts = 1, + lastAttempt = timestamp + ), + "repo2" -> LookupHistory( + errors = Set.empty, + attempts = 2, + lastAttempt = timestamp + ) + ) + ) + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "ResolutionError", + errors = List( + Json.obj( + "message" := "Resolution error: schema iglu:com.snowplowanalytics/test/jsonschema/1-0-0 not found", + "source" := "testSource", + "lookupHistory" := Json.arr( + Json.obj("repository" := "repo1", "errors" := List.empty[String], "attempts" := 1, "lastAttempt" := timestamp), + Json.obj("repository" := "repo2", "errors" := List.empty[String], "attempts" := 2, "lastAttempt" := timestamp) + ) + ) + ), + schema = schemaKey.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + + "convert InvalidData correctly" >> { + def createSv(schemaKey: SchemaKey) = + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + schemaKey = schemaKey, + error = ClientError.ValidationError( + error = ValidatorError.InvalidData( + messages = NonEmptyList.of( + ValidatorReport(message = "testMessage1", + path = "testPath1".some, + targets = List("testTarget1"), + keyword = "testKeyword1".some + ), + ValidatorReport(message = "testMessage2", + path = "testPath2".some, + targets = List("testTarget2"), + keyword = "testKeyword2".some + ) + ) + ), + supersededBy = None + ) + ), + source = "testSource", + data = "testData".asJson + ) + + val svWithAtomicSchema = createSv(AtomicFields.atomicSchema) + val svWithOrdinarySchema = createSv(schemaKey) + val feWithAtomicSchema = Failure.fromSchemaViolation(svWithAtomicSchema, timestamp, processor) + val feWithOrdinarySchema = Failure.fromSchemaViolation(svWithOrdinarySchema, timestamp, processor) + val expectedWithAtomicSchema = Failure.FailureContext( + failureType = "ValidationError", + errors = List( + Json.obj("message" := "testMessage1", + "source" := "testPath1", + "path" := "testPath1", + "keyword" := "testKeyword1", + "targets" := List("testTarget1") + ), + Json.obj("message" := "testMessage2", + "source" := "testPath2", + "path" := "testPath2", + "keyword" := "testKeyword2", + "targets" := List("testTarget2") + ) + ), + schema = AtomicFields.atomicSchema.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + val expectedWithOrdinarySchema = Failure.FailureContext( + failureType = "ValidationError", + errors = List( + Json.obj("message" := "testMessage1", + "source" := "testSource", + "path" := "testPath1", + "keyword" := "testKeyword1", + "targets" := List("testTarget1") + ), + Json.obj("message" := "testMessage2", + "source" := "testSource", + "path" := "testPath2", + "keyword" := "testKeyword2", + "targets" := List("testTarget2") + ) + ), + schema = schemaKey.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + + feWithAtomicSchema must beEqualTo(expectedWithAtomicSchema) + feWithOrdinarySchema must beEqualTo(expectedWithOrdinarySchema) + } + + "convert InvalidSchema correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + schemaKey = schemaKey, + error = ClientError.ValidationError( + error = ValidatorError.InvalidSchema( + issues = NonEmptyList.of( + ValidatorError.SchemaIssue(path = "testPath1", message = "testMessage1"), + ValidatorError.SchemaIssue(path = "testPath2", message = "testMessage2") + ) + ), + supersededBy = None + ) + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "ValidationError", + errors = List( + Json.obj( + "message" := "Invalid schema: iglu:com.snowplowanalytics/test/jsonschema/1-0-0 - testMessage1", + "source" := "testSource", + "path" := "testPath1" + ), + Json.obj( + "message" := "Invalid schema: iglu:com.snowplowanalytics/test/jsonschema/1-0-0 - testMessage2", + "source" := "testSource", + "path" := "testPath2" + ) + ), + schema = schemaKey.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + } +} diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala index 607b3a5c3..38aa5500d 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala @@ -16,6 +16,8 @@ import org.specs2.matcher.ValidatedMatchers import cats.effect.testing.specs2.CatsEffect import io.circe.parser.parse +import io.circe.Json +import io.circe.syntax._ import cats.data.{Ior, NonEmptyList} @@ -24,7 +26,9 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.iglu.client.ClientError.{ResolutionError, ValidationError} import com.snowplowanalytics.snowplow.badrows._ +import com.snowplowanalytics.snowplow.badrows.FailureDetails +import com.snowplowanalytics.snowplow.enrich.common.enrichments.Failure import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent @@ -42,7 +46,12 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect val processor = Processor("unit tests SCE", "v42") val enriched = new EnrichedEvent() + val unstructFieldName = "unstruct" + val contextsFieldName = "contexts" + val derivedContextsFieldName = "derived_contexts" + val notJson = "foo" + val jsonNotJson = notJson.asJson // Just jsonized version of the string val notIglu = """{"foo":"bar"}""" val unstructSchema = SchemaKey( @@ -142,7 +151,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect } } - "return a SchemaViolation.NotJson if unstruct_event does not contain a properly formatted JSON string" >> { + "return a FailureDetails.SchemaViolation.NotJson if unstruct_event does not contain a properly formatted JSON string" >> { val input = new EnrichedEvent input.setUnstruct_event(notJson) @@ -150,12 +159,20 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, _), None) => ok + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `unstructFieldName`, `jsonNotJson`), + _ + ), + None + ) => + ok case other => ko(s"[$other] is not an error with NotJson") } } - "return a SchemaViolation.NotIglu if unstruct_event contains a properly formatted JSON string that is not self-describing" >> { + "return a FailureDetails.SchemaViolation.NotIglu if unstruct_event contains a properly formatted JSON string that is not self-describing" >> { + val json = notIglu.toJson val input = new EnrichedEvent input.setUnstruct_event(notIglu) @@ -163,12 +180,17 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotIglu, _), None) => ok + case Ior.Both( + NonEmptyList(Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotIglu, `unstructFieldName`, `json`), _), + None + ) => + ok case other => ko(s"[$other] is not an error with NotIglu") } } - "return a SchemaViolation.CriterionMismatch if unstruct_event contains a self-describing JSON but not with the expected schema for unstructured events" >> { + "return a FailureDetails.SchemaViolation.CriterionMismatch if unstruct_event contains a self-describing JSON but not with the expected schema for unstructured events" >> { + val json = noSchema.toJson val input = new EnrichedEvent input.setUnstruct_event(noSchema) @@ -176,66 +198,102 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.CriterionMismatch, _), None) => ok + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.CriterionMismatch, `unstructFieldName`, `json`), + _ + ), + None + ) => + ok case other => ko(s"[$other] is not an error with CriterionMismatch") } } - "return a SchemaViolation.NotJson if the JSON in .data is not a JSON" >> { + "return a FailureDetails.SchemaViolation.NotJson if the JSON in .data is not a JSON" >> { val input = new EnrichedEvent - input.setUnstruct_event(buildUnstruct(notJson)) + val ue = buildUnstruct(notJson) + val ueJson = ue.asJson + input.setUnstruct_event(ue) IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, _), None) => ok + case Ior.Both(NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `unstructFieldName`, `ueJson`), + _ + ), + None + ) => + ok case other => ko(s"[$other] is not an error with NotJson") } } - "return a SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not self-describing" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not self-describing" >> { val input = new EnrichedEvent - input.setUnstruct_event(buildUnstruct(notIglu)) + val ue = buildUnstruct(notIglu) + val ueJson = ue.toJson + input.setUnstruct_event(ue) IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), None) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, _), None) => - ko(s"IgluError [$ie] is not ValidationError") - case other => ko(s"[$other] is not an error with IgluError") + case Ior.Both(NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `unstructFieldName`, + `ueJson` + ), + _ + ), + None + ) => + ok + case other => ko(s"[$other] is not expected one") } } - "return a SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not a valid SDJ" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not a valid SDJ" >> { val input = new EnrichedEvent + val json = invalidEmailSent.toJson input.setUnstruct_event(buildUnstruct(invalidEmailSent)) IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), None) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, _), None) => - ko(s"IgluError [$ie] is not ValidationError") - case other => ko(s"[$other] is not an error with IgluError") + case Ior.Both(NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `unstructFieldName`, + `json` + ), + _ + ), + None + ) => + ok + case other => ko(s"[$other] is not expected one") } } - "return a SchemaViolation.IgluError containing a ResolutionError if the schema of the SDJ in .data can't be resolved" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ResolutionError if the schema of the SDJ in .data can't be resolved" >> { val input = new EnrichedEvent + val json = noSchema.toJson input.setUnstruct_event(buildUnstruct(noSchema)) IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_)), _), None) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, _), None) => - ko(s"IgluError [$ie] is not a ResolutionError") - case other => ko(s"[$other] is not an error with IgluError") + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), `unstructFieldName`, `json`), + _ + ), + None + ) => + ok + case other => ko(s"[$other] is not expected one") } } @@ -307,7 +365,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect } } - "return a SchemaViolation.NotJson if .contexts does not contain a properly formatted JSON string" >> { + "return a FailureDetails.SchemaViolation.NotJson if .contexts does not contain a properly formatted JSON string" >> { val input = new EnrichedEvent input.setContexts(notJson) @@ -315,89 +373,122 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, Nil), Nil) => ok + case Ior.Both( + NonEmptyList(Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `contextsFieldName`, `jsonNotJson`), Nil), + Nil + ) => + ok case other => ko(s"[$other] is not an error with NotJson") } } - "return a SchemaViolation.NotIglu if .contexts contains a properly formatted JSON string that is not self-describing" >> { + "return a FailureDetails.SchemaViolation.NotIglu if .contexts contains a properly formatted JSON string that is not self-describing" >> { val input = new EnrichedEvent + val json = notIglu.toJson input.setContexts(notIglu) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotIglu, Nil), Nil) => ok + case Ior.Both( + NonEmptyList(Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotIglu, `contextsFieldName`, `json`), Nil), + Nil + ) => + ok case other => ko(s"[$other] is not an error with NotIglu") } } - "return a SchemaViolation.CriterionMismatch if .contexts contains a self-describing JSON but not with the right schema" >> { + "return a FailureDetails.SchemaViolation.CriterionMismatch if .contexts contains a self-describing JSON but not with the right schema" >> { val input = new EnrichedEvent + val json = noSchema.toJson input.setContexts(noSchema) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.CriterionMismatch, Nil), Nil) => ok + case Ior.Both(NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.CriterionMismatch, `contextsFieldName`, `json`), + Nil + ), + Nil + ) => + ok case other => ko(s"[$other] is not an error with CriterionMismatch") } } - "return a SchemaViolation.IgluError containing a ValidationError if .data does not contain an array of JSON objects" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ValidationError if .data does not contain an array of JSON objects" >> { val input = new EnrichedEvent val notArrayContexts = s"""{"schema": "${inputContextsSchema.toSchemaUri}", "data": ${emailSent1}}""" + val json = notArrayContexts.toJson input.setContexts(notArrayContexts) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil), Nil) => + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), `contextsFieldName`, `json`), + Nil + ), + Nil + ) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil), Nil) => - ko(s"IgluError [$ie] is not ValidationError") - case other => ko(s"[$other] is not an error with IgluError") + case other => ko(s"[$other] is not expected one") } } - "return a SchemaViolation.IgluError containing a ValidationError if .data contains one invalid context" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ValidationError if .data contains one invalid context" >> { val input = new EnrichedEvent + val json = invalidEmailSent.toJson input.setContexts(buildInputContexts(List(invalidEmailSent))) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil), Nil) => + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), `contextsFieldName`, `json`), + Nil + ), + Nil + ) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil), Nil) => - ko(s"IgluError [$ie] is not ValidationError") - case other => ko(s"[$other] is not an error with IgluError") + case other => ko(s"[$other] is not expected one") } } - "return a SchemaViolation.IgluError containing a ResolutionError if .data contains one context whose schema can't be resolved" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ResolutionError if .data contains one context whose schema can't be resolved" >> { val input = new EnrichedEvent + val json = noSchema.toJson input.setContexts(buildInputContexts(List(noSchema))) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_)), Nil), Nil) => + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), `contextsFieldName`, `json`), + Nil + ), + Nil + ) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil), Nil) => - ko(s"IgluError [$ie] is not ResolutionError") - case other => ko(s"[$other] is not an error with IgluError") + case other => ko(s"[$other] is not expected one") } } "return 2 expected failures for 2 invalid contexts" >> { val input = new EnrichedEvent + val invalidEmailSentJson = invalidEmailSent.toJson + val noSchemaJson = noSchema.toJson input.setContexts(buildInputContexts(List(invalidEmailSent, noSchema))) IgluUtils @@ -405,8 +496,16 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both(NonEmptyList( - FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), - List(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_))) + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `contextsFieldName`, + `invalidEmailSentJson` + ), + List( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), + `contextsFieldName`, + `noSchemaJson` + ) + ) ), Nil ) => @@ -417,14 +516,19 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "return an expected failure and an expected SDJ if one context is invalid and one is valid" >> { val input = new EnrichedEvent + val noSchemaJson = noSchema.toJson input.setContexts(buildInputContexts(List(emailSent1, noSchema))) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.IgluError, Nil), List(extract)) - if extract.sdj.schema == emailSentSchema => + case Ior.Both(NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.IgluError, `contextsFieldName`, `noSchemaJson`), + Nil + ), + List(extract) + ) if extract.sdj.schema == emailSentSchema => ok case other => ko(s"[$other] is not one IgluError and one SDJ with schema $emailSentSchema") } @@ -478,6 +582,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "validateEnrichmentsContexts" should { "return one expected SchemaViolation for one invalid context" >> { + val json = invalidEmailSent.toJson val contexts = List( SpecHelpers.jsonStringToSDJ(invalidEmailSent).right.get ) @@ -486,12 +591,23 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil), Nil) => ok + case Ior.Both( + NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `derivedContextsFieldName`, + `json` + ), + Nil + ), + Nil + ) => + ok case other => ko(s"[$other] is not one ValidationError") } } "return two expected SchemaViolation for two invalid contexts" >> { + val invalidEmailSentJson = invalidEmailSent.toJson + val noSchemaJson = noSchema.toJson val contexts = List( SpecHelpers.jsonStringToSDJ(invalidEmailSent).right.get, SpecHelpers.jsonStringToSDJ(noSchema).right.get @@ -501,8 +617,17 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), - List(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_))) + case Ior.Both(NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `derivedContextsFieldName`, + `invalidEmailSentJson` + ), + List( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), + `derivedContextsFieldName`, + `noSchemaJson` + ) + ) ), Nil ) => @@ -512,6 +637,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect } "return one expected SchemaViolation for one invalid context and one valid" >> { + val invalidEmailSentJson = invalidEmailSent.toJson val contexts = List( SpecHelpers.jsonStringToSDJ(invalidEmailSent).right.get, SpecHelpers.jsonStringToSDJ(emailSent1).right.get @@ -522,7 +648,10 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both(NonEmptyList( - FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `derivedContextsFieldName`, + `invalidEmailSentJson` + ), Nil ), List(sdj) @@ -563,7 +692,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - _: FailureDetails.SchemaViolation, + _: Failure.SchemaViolation, Nil ), IgluUtils.EventExtractResult(Nil, None, Nil) @@ -587,7 +716,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - _: FailureDetails.SchemaViolation, + _: Failure.SchemaViolation, Nil ), IgluUtils.EventExtractResult(Nil, None, Nil) @@ -612,8 +741,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - _: FailureDetails.SchemaViolation, - List(_: FailureDetails.SchemaViolation) + _: Failure.SchemaViolation, + List(_: Failure.SchemaViolation) ), IgluUtils.EventExtractResult(Nil, None, Nil) ) => @@ -649,6 +778,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "return the SchemaViolation of the invalid context in the Left and the extracted unstructured event in the Right" >> { val input = new EnrichedEvent + val invalidEmailSentJson = invalidEmailSent.toJson input.setUnstruct_event(buildUnstruct(emailSent1)) input.setContexts(buildInputContexts(List(invalidEmailSent))) @@ -661,7 +791,12 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), + NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `contextsFieldName`, + `invalidEmailSentJson` + ), + _ + ), extract ) if extract.contexts.isEmpty && extract.unstructEvent.isDefined && extract.unstructEvent.get.schema == emailSentSchema => ok @@ -674,6 +809,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "return the SchemaViolation of the invalid unstructured event in the Left and the valid context in the Right" >> { val input = new EnrichedEvent + val invalidEmailSentJson = invalidEmailSent.toJson input.setUnstruct_event(buildUnstruct(invalidEmailSent)) input.setContexts(buildInputContexts(List(emailSent1))) @@ -686,7 +822,12 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), + NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `unstructFieldName`, + `invalidEmailSentJson` + ), + _ + ), extract ) if extract.contexts.size == 1 && extract.contexts.head.schema == emailSentSchema && extract.unstructEvent.isEmpty => ok @@ -702,12 +843,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect input.setUnstruct_event(buildUnstruct(supersedingExample1)) input.setContexts(buildInputContexts(List(supersedingExample1, supersedingExample2))) - val expectedValidationInfoContext = parse( + val expectedValidationInfoContext = """ { | "originalSchema" : "iglu:com.acme/superseding_example/jsonschema/1-0-0", | "validatedWith" : "1-0-1" - |}""".stripMargin - ).toOption.get + |}""".stripMargin.toJson IgluUtils .extractAndValidateInputJsons( @@ -737,4 +877,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect def buildInputContexts(sdjs: List[String] = List.empty[String]) = s"""{"schema": "${inputContextsSchema.toSchemaUri}", "data": [${sdjs.mkString(",")}]}""" + + implicit class StringToJson(str: String) { + def toJson: Json = parse(str).toOption.get + } }