From 0e4ceaea26c7ced71489a79cac839ed0a06288b2 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 5 Oct 2023 22:28:11 +0200 Subject: [PATCH] Emit bad row if enriching times out --- .../snowplow/enrich/common/fs2/Enrich.scala | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala index d44b8cbb8..85caf45e9 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala @@ -22,6 +22,7 @@ import scala.concurrent.duration._ import org.joda.time.DateTime import cats.data.{NonEmptyList, ValidatedNel} +import cats.data.Validated import cats.{Monad, Parallel} import cats.implicits._ @@ -85,10 +86,23 @@ object Enrich { .evalMap(chunk => for { begin <- Clock[F].realTime(TimeUnit.MILLISECONDS) - result <- - env.semaphore.withPermit( - chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(env.streamsSettings.concurrency.enrich) - ) + result <- env.semaphore.withPermit( + chunk.toList + .map { + case (orig, bytes) => + val timeout = 1.minute + val badRow = genericBadRow( + bytes, + Instant.now(), + new RuntimeException(s"Enriching the event timed out ($timeout)"), + env.processor + ) + enrich(bytes) + .timeoutTo(timeout, Sync[F].pure((List(Validated.Invalid(badRow)), None))) + .map((orig, _)) + } + .parSequenceN(env.streamsSettings.concurrency.enrich) + ) end <- Clock[F].realTime(TimeUnit.MILLISECONDS) _ <- Logger[F].debug(s"Chunk of size ${chunk.size} enriched in ${end - begin} ms") } yield result @@ -112,7 +126,7 @@ object Enrich { * Enrich a single `CollectorPayload` to get list of bad rows and/or enriched events * @return enriched event or bad row, along with the collector timestamp */ - def enrichWith[F[_]: Clock: ContextShift: RegistryLookup: Sync: HttpClient]( + def enrichWith[F[_]: Clock: ContextShift: RegistryLookup: Sync: Timer: HttpClient]( enrichRegistry: F[EnrichmentRegistry[F]], adapterRegistry: AdapterRegistry, igluClient: IgluCirceClient[F],