Skip to content

Commit

Permalink
Emit bad row if enriching times out
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Oct 10, 2023
1 parent b13099e commit 0e4ceae
Showing 1 changed file with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.concurrent.duration._
import org.joda.time.DateTime

import cats.data.{NonEmptyList, ValidatedNel}
import cats.data.Validated
import cats.{Monad, Parallel}
import cats.implicits._

Expand Down Expand Up @@ -85,10 +86,23 @@ object Enrich {
.evalMap(chunk =>
for {
begin <- Clock[F].realTime(TimeUnit.MILLISECONDS)
result <-
env.semaphore.withPermit(
chunk.toList.map { case (orig, bytes) => enrich(bytes).map((orig, _)) }.parSequenceN(env.streamsSettings.concurrency.enrich)
)
result <- env.semaphore.withPermit(
chunk.toList
.map {
case (orig, bytes) =>
val timeout = 1.minute
val badRow = genericBadRow(
bytes,
Instant.now(),
new RuntimeException(s"Enriching the event timed out ($timeout)"),
env.processor
)
enrich(bytes)
.timeoutTo(timeout, Sync[F].pure((List(Validated.Invalid(badRow)), None)))
.map((orig, _))
}
.parSequenceN(env.streamsSettings.concurrency.enrich)
)
end <- Clock[F].realTime(TimeUnit.MILLISECONDS)
_ <- Logger[F].debug(s"Chunk of size ${chunk.size} enriched in ${end - begin} ms")
} yield result
Expand All @@ -112,7 +126,7 @@ object Enrich {
* Enrich a single `CollectorPayload` to get list of bad rows and/or enriched events
* @return enriched event or bad row, along with the collector timestamp
*/
def enrichWith[F[_]: Clock: ContextShift: RegistryLookup: Sync: HttpClient](
def enrichWith[F[_]: Clock: ContextShift: RegistryLookup: Sync: Timer: HttpClient](
enrichRegistry: F[EnrichmentRegistry[F]],
adapterRegistry: AdapterRegistry,
igluClient: IgluCirceClient[F],
Expand Down

0 comments on commit 0e4ceae

Please sign in to comment.