diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index 5b2cacb18..8143d15f9 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -46,7 +46,8 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, Conversion object EnrichmentManager { - private type EnrichmentResult[F[_]] = IorT[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities], (EnrichedEvent, List[SelfDescribingData[Json]])] + private type EnrichmentResult[F[_]] = + IorT[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities], (EnrichedEvent, List[SelfDescribingData[Json]])] /** * Run the enrichment workflow @@ -103,18 +104,18 @@ object EnrichmentManager { .leftMap(NonEmptyList.one) .possiblyExitingEarly(emitIncomplete) validContexts <- validateEnriched( - enriched, - raw, - enrichmentsContexts, - client, - processor, - registryLookup, - featureFlags.acceptInvalid, - invalidCount, - atomicFields - ) - .leftMap(NonEmptyList.one) - .possiblyExitingEarly(emitIncomplete) + enriched, + raw, + enrichmentsContexts, + client, + processor, + registryLookup, + featureFlags.acceptInvalid, + invalidCount, + atomicFields + ) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) } yield (enriched, validContexts ::: extractResult.validationInfoContexts) // derived contexts are set lastly because we want to include failure entities @@ -125,18 +126,20 @@ object EnrichmentManager { private def setDerivedContexts[F[_]: Sync](enriched: EnrichmentResult[F]): EnrichmentResult[F] = IorT( - enriched.value.flatTap(v => Sync[F].delay { - val (derivedContexts, enriched) = v match { - case Ior.Right((e, l)) => (l, e.some) - case Ior.Left(l) => (extractFailureEntities(l), None) - case Ior.Both(b, (e, l)) => (l ::: extractFailureEntities(b), e.some) + enriched.value.flatTap(v => + Sync[F].delay { + val (derivedContexts, enriched) = v match { + case Ior.Right((e, l)) => (l, e.some) + case Ior.Left(l) => (extractFailureEntities(l), None) + case Ior.Both(b, (e, l)) => (l ::: extractFailureEntities(b), e.some) + } + for { + c <- ME.formatContexts(derivedContexts) + e <- enriched + _ = e.derived_contexts = c + } yield () } - for { - c <- ME.formatContexts(derivedContexts) - e <- enriched - _ = e.derived_contexts = c - } yield () - }) + ) ) private def extractFailureEntities(l: NonEmptyList[FailureEntity.BadRowWithFailureEntities]): List[SelfDescribingData[Json]] = @@ -157,12 +160,12 @@ object EnrichmentManager { } yield extract iorT.leftMap { violations => - buildSchemaViolationsBadRow( - violations, - EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent), - RawEvent.toRawEvent(raw), - processor - ) + buildSchemaViolationsBadRow( + violations, + EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent), + RawEvent.toRawEvent(raw), + processor + ) } } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/FailureEntity.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/FailureEntity.scala index 87d7dcf62..fab511a61 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/FailureEntity.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/FailureEntity.scala @@ -158,8 +158,9 @@ object FailureEntity { case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ResolutionError(lh)) => val message = s"Resolution error: schema ${schemaKey.toSchemaUri} not found" val lookupHistory = lh.toList - .map { case (repo, lookups) => - lookups.asJson.deepMerge(Json.obj("repository" := repo.asJson)) + .map { + case (repo, lookups) => + lookups.asJson.deepMerge(Json.obj("repository" := repo.asJson)) } FailureEntity( failureType = "ResolutionError", @@ -176,9 +177,7 @@ object FailureEntity { componentName = processor.artifact, componentVersion = processor.version ) - case FailureDetails.SchemaViolation.IgluError( - schemaKey, - ClientError.ValidationError(ValidatorError.InvalidData(e), _)) => + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidData(e), _)) => val errors = e.toList.map { r => Json.obj( "message" := r.message, @@ -197,9 +196,7 @@ object FailureEntity { componentName = processor.artifact, componentVersion = processor.version ) - case FailureDetails.SchemaViolation.IgluError( - schemaKey, - ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) => + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) => val errors = e.toList.map { r => Json.obj( "message" := s"Invalid schema: $schemaKey - ${r.message}", diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala index 4a406a6e4..7f270bb08 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala @@ -204,7 +204,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), _), None) => ok + case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), _), None) => + ok case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(ie: SchemaViolation.IgluError, _, _), _), None) => ko(s"IgluError [$ie] is not ValidationError") case other => ko(s"[$other] is not an error with IgluError") @@ -219,7 +220,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), _), None) => ok + case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), _), None) => + ok case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(ie: SchemaViolation.IgluError, _, _), _), None) => ko(s"IgluError [$ie] is not ValidationError") case other => ko(s"[$other] is not an error with IgluError") @@ -234,7 +236,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ResolutionError), _, _), _), None) => ok + case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ResolutionError), _, _), _), None) => + ok case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(ie: SchemaViolation.IgluError, _, _), _), None) => ko(s"IgluError [$ie] is not a ResolutionError") case other => ko(s"[$other] is not an error with IgluError") @@ -488,7 +491,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), Nil), Nil) => ok + case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), Nil), Nil) => + ok case other => ko(s"[$other] is not one ValidationError") } }