Create SizeViolation bad row for oversized messages sent to Pubsub (close #361)
spenes committed Oct 13, 2023
1 parent 7b3c5f6 commit 1e65e33
Showing 6 changed files with 226 additions and 49 deletions.
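
The heart of the change is a size guard in front of every Pub/Sub producer: any payload whose encoded size reaches MaxPayloadLength (9,000,000 bytes, safely under Pub/Sub's 10MB message limit) is not published as-is but reported as a BadRow.SizeViolation carrying a truncated copy of the payload. Below is a minimal, dependency-free sketch of that routing, simplified from the new Producer.scala in this diff; the real version runs in F[_]: Async and encodes through MessageEncoder.

// Simplified sketch of the size-based routing this commit adds.
object SizeRoutingSketch {
  val MaxPayloadLength = 9000000 // stay under the Pub/Sub maximum of 10MB

  final case class SizeViolation(maxAllowed: Int, actualSize: Int, truncatedPayload: String)

  def produceWrtToSize(
    data: String,
    produce: String => Unit,             // normal path: publish to the target topic
    produceBadRow: SizeViolation => Unit // oversize path: emit a bad row instead
  ): Unit = {
    val size = data.getBytes("UTF-8").length
    if (size >= MaxPayloadLength) {
      // keep only a prefix (a tenth of the limit) so the bad row itself fits in a message
      produceBadRow(SizeViolation(MaxPayloadLength, size, data.take(MaxPayloadLength / 10)))
    } else produce(data)
  }
}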

Bigquery.scala
@@ -24,7 +24,6 @@ import com.google.api.client.json.gson.GsonFactory
 import com.google.api.gax.retrying.RetrySettings
 import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, InsertAllRequest, InsertAllResponse, TableId}
 import com.google.cloud.bigquery.InsertAllRequest.RowToInsert
-import com.permutive.pubsub.producer.PubsubProducer
 import org.threeten.bp.Duration
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
@@ -44,7 +43,7 @@ object Bigquery {
    * Exceptions from the underlying `insertAll` call are propagated.
    */
   def insert[F[_]: Parallel: Sync](
-    failedInsertProducer: PubsubProducer[F, FailedInsert],
+    failedInsertProducer: Producer[F, FailedInsert],
     metrics: Metrics[F],
     toLoad: List[LoaderRow]
   )(
@@ -149,7 +148,7 @@

   private def handleFailedRows[F[_]: Sync](
     metrics: Metrics[F],
-    failedInsertProducer: PubsubProducer[F, FailedInsert],
+    failedInsertProducer: Producer[F, FailedInsert],
     rows: List[LoaderRow]
   ): F[Unit] = {
     val tableRows = rows.map { lr =>

Producer.scala
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.storage.bigquery.streamloader

import java.time.Instant
import java.nio.charset.StandardCharsets

import cats.implicits._

import cats.effect.{Async, Resource}

import com.permutive.pubsub.producer.{Model, PubsubProducer}
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload => BadRowPayload}

import org.typelevel.log4cats.Logger

import scala.concurrent.duration._

trait Producer[F[_], A] {
  def produce(data: A): F[Unit]
}

object Producer {

  val MaxPayloadLength = 9000000 // Stay under Pubsub Maximum of 10MB

  def mkProducer[F[_]: Async: Logger, A: MessageEncoder](
    projectId: String,
    topic: String,
    batchSize: Long,
    delay: FiniteDuration,
    oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
  ): Resource[F, Producer[F, A]] =
    mkPubsubProducer[F, A](projectId, topic, batchSize, delay).map { p =>
      (data: A) => produceWrtToSize[F, A](data, p.produce(_).void, oversizeBadRowProducer)
    }

  def mkBadRowProducers[F[_]: Async: Logger](
    projectId: String,
    topic: String,
    batchSize: Long,
    delay: FiniteDuration
  ): Resource[F, (Producer[F, BadRow], Producer[F, BadRow.SizeViolation])] =
    mkPubsubProducer[F, BadRow](projectId, topic, batchSize, delay).map { p =>
      val badRowProducer = new Producer[F, BadRow] {
        override def produce(badRow: BadRow): F[Unit] = {
          produceWrtToSize[F, BadRow](badRow, p.produce(_).void, p.produce(_).void)
        }
      }
      val oversizeBadRowProducer = new Producer[F, BadRow.SizeViolation] {
        override def produce(data: BadRow.SizeViolation): F[Unit] =
          p.produce(data).void
      }
      (badRowProducer, oversizeBadRowProducer)
    }

  /** Construct a PubSub producer. */
  private def mkPubsubProducer[F[_]: Async: Logger, A: MessageEncoder](
    projectId: String,
    topic: String,
    batchSize: Long,
    delay: FiniteDuration
  ): Resource[F, PubsubProducer[F, A]] =
    GooglePubsubProducer.of[F, A](
      Model.ProjectId(projectId),
      Model.Topic(topic),
      config = PubsubProducerConfig[F](
        batchSize = batchSize,
        delayThreshold = delay,
        onFailedTerminate = e => Logger[F].error(e)(s"Error in PubSub producer")
      )
    )

  def produceWrtToSize[F[_]: Async, A: MessageEncoder](
    data: A,
    producer: Producer[F, A],
    oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
  ): F[Unit] = {
    val dataSize = getSize(data)
    if (dataSize >= MaxPayloadLength) {
      val msg = s"Pubsub message exceeds allowed size"
      val payload = MessageEncoder[A].encode(data)
        .map(bytes => new String(bytes, StandardCharsets.UTF_8))
        .getOrElse("Pubsub message can't be converted to string")
        .take(MaxPayloadLength / 10)
      val badRow = BadRow.SizeViolation(
        processor,
        Failure.SizeViolation(Instant.now(), MaxPayloadLength, dataSize, msg),
        BadRowPayload.RawPayload(payload)
      )
      oversizeBadRowProducer.produce(badRow)
    } else {
      producer.produce(data).void
    }
  }

  private def getSize[A: MessageEncoder](a: A): Int =
    MessageEncoder[A].encode(a).map(_.length).getOrElse(Int.MaxValue)
}
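
Two details of Producer.scala are easy to miss. First, getSize falls back to Int.MaxValue when encoding fails, so a message that cannot be encoded at all is also diverted to the bad-row path rather than crashing the pipeline. Second, in mkBadRowProducers the same underlying Pub/Sub producer serves as both the normal and the oversize handler (p.produce(_).void twice): an oversized bad row is replaced by a much smaller SizeViolation bad row published to the same bad topic, which type-checks because BadRow.SizeViolation is itself a BadRow. The following is a hypothetical wiring sketch against these signatures; the project ID, topic names, batch size and delay are placeholders.

import java.nio.charset.StandardCharsets
import cats.effect.{IO, Resource}
import com.permutive.pubsub.producer.encoder.MessageEncoder
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.concurrent.duration._

object WiringSketch {
  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
  implicit val enc: MessageEncoder[String] = s => Right(s.getBytes(StandardCharsets.UTF_8))

  // One shared bad topic receives ordinary bad rows and SizeViolation rows alike;
  // the oversize handler it yields backs every other producer.
  val producer: Resource[IO, Producer[IO, String]] =
    Producer.mkBadRowProducers[IO]("example-project", "bad-topic", 100L, 200.millis).flatMap {
      case (_, oversizeBadRowProducer) =>
        Producer.mkProducer[IO, String]("example-project", "events-topic", 100L, 200.millis, oversizeBadRowProducer)
    }
}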

Resources.scala
@@ -36,9 +36,6 @@ import cats.effect.{Async, Resource, Sync}
 import cats.implicits._
 import retry.RetryPolicies
 import com.google.cloud.bigquery.BigQuery
-import com.permutive.pubsub.producer.{Model, PubsubProducer}
-import com.permutive.pubsub.producer.encoder.MessageEncoder
-import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}
 import com.snowplowanalytics.iglu.client.resolver.Resolver
 import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
 import com.snowplowanalytics.iglu.schemaddl.bigquery.Field
@@ -100,15 +97,32 @@ object Resources {
       resolverConfig <- mkResolverConfig(env.resolverJson)
       resolver <- mkResolver(resolverConfig)
       metrics <- Resource.eval(mkMetricsReporter[F](env.monitoring))
-      types <- mkTypeSink[F](env.projectId, env.config.output.types.topic, env.config.sinkSettings.types, metrics)
       bigquery <- Resource.eval[F, BigQuery](Bigquery.getClient(env.config.retrySettings, env.projectId))
-      failedInserts <- mkProducer[F, Bigquery.FailedInsert](
+      (badRowProducer, oversizeBadRowProducer) <- Producer.mkBadRowProducers[F](
+        env.projectId,
+        env.config.output.bad.topic,
+        env.config.sinkSettings.bad.producerBatchSize,
+        env.config.sinkSettings.bad.producerDelayThreshold
+      )
+      types <- mkTypeSink[F](
+        env.projectId,
+        env.config.output.types.topic,
+        env.config.sinkSettings.types,
+        metrics,
+        oversizeBadRowProducer
+      )
+      failedInserts <- Producer.mkProducer[F, Bigquery.FailedInsert](
         env.projectId,
         env.config.output.failedInserts.topic,
         env.config.sinkSettings.failedInserts.producerBatchSize,
-        env.config.sinkSettings.failedInserts.producerDelayThreshold
+        env.config.sinkSettings.failedInserts.producerDelayThreshold,
+        oversizeBadRowProducer
       )
-      badSink <- mkBadSink[F](env.projectId, env.config.output.bad.topic, env.config.sinkSettings.bad.sinkConcurrency, metrics, env.config.sinkSettings.bad)
+      badSink = mkBadSink[F](
+        env.config.sinkSettings.bad.sinkConcurrency,
+        metrics,
+        badRowProducer
+      )
       sentry <- Sentry.init(env.monitoring.sentry)
       http <- EmberClientBuilder.default[F].build
       lookup <- Resource.eval(CreateLruMap[F, FieldKey, Field].create(resolverConfig.cacheSize))
@@ -118,56 +132,32 @@ object Resources {
     } yield new Resources[F](source, resolver, badSink, goodSink, metrics, sentry, registryLookup, lookup)
   // format: on

-  /** Construct a PubSub producer. */
-  private def mkProducer[F[_]: Async: Logger, A: MessageEncoder](
-    projectId: String,
-    topic: String,
-    batchSize: Long,
-    delay: FiniteDuration
-  ): Resource[F, PubsubProducer[F, A]] =
-    GooglePubsubProducer.of[F, A](
-      Model.ProjectId(projectId),
-      Model.Topic(topic),
-      config = PubsubProducerConfig[F](
-        batchSize = batchSize,
-        delayThreshold = delay,
-        onFailedTerminate = e => Logger[F].error(e)(s"Error in PubSub producer")
-      )
-    )
-
   // TODO: Can failed inserts be given their own sink like bad rows and types?

   // Acks the event after processing it as a `BadRow`
-  private def mkBadSink[F[_]: Async: Logger](
-    projectId: String,
-    topic: String,
+  private def mkBadSink[F[_]: Async](
     maxConcurrency: Int,
     metrics: Metrics[F],
-    sinkSettingsBad: SinkSettings.Bad
-  ): Resource[F, Pipe[F, StreamBadRow[F], Nothing]] =
-    mkProducer[F, BadRow](
-      projectId,
-      topic,
-      sinkSettingsBad.producerBatchSize,
-      sinkSettingsBad.producerDelayThreshold
-    ).map { p =>
-      _.parEvalMapUnordered(maxConcurrency) { badRow =>
-        p.produce(badRow.row) *> badRow.ack *> metrics.badCount
-      }.drain
-    }
+    badRowProducer: Producer[F, BadRow]
+  ): Pipe[F, StreamBadRow[F], Nothing] =
+    _.parEvalMapUnordered(maxConcurrency) { badRow =>
+      badRowProducer.produce(badRow.row) *> badRow.ack *> metrics.badCount
+    }.drain

   // Does not ack the event -- it still needs to end up in one of the other targets
   private def mkTypeSink[F[_]: Async: Logger](
     projectId: String,
     topic: String,
     sinkSettingsTypes: SinkSettings.Types,
-    metrics: Metrics[F]
+    metrics: Metrics[F],
+    oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
   ): Resource[F, Pipe[F, Set[ShreddedType], Nothing]] =
-    mkProducer[F, Set[ShreddedType]](
+    Producer.mkProducer[F, Set[ShreddedType]](
       projectId,
      topic,
       sinkSettingsTypes.producerBatchSize,
-      sinkSettingsTypes.producerDelayThreshold
+      sinkSettingsTypes.producerDelayThreshold,
+      oversizeBadRowProducer
     ).map { p =>
       _.parEvalMapUnordered(sinkSettingsTypes.sinkConcurrency) { types =>
         p.produce(types).void *> metrics.typesCount(types.size)
@@ -189,7 +179,7 @@
   private def mkGoodSink[F[_]: Async: Parallel](
     good: Output.BigQuery,
     bigQuery: BigQuery,
-    producer: PubsubProducer[F, FailedInsert],
+    producer: Producer[F, FailedInsert],
     metrics: Metrics[F],
     typeSink: Pipe[F, Set[ShreddedType], Nothing],
     sinkSettingsGood: SinkSettings.Good,
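
A knock-on simplification in Resources.scala: because the bad-row producer is now created up front and injected, mkBadSink allocates nothing itself, so it returns a plain Pipe rather than a Resource and is bound with badSink = ... instead of badSink <- ... in the for-comprehension above.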

StreamLoader.scala
@@ -14,7 +14,7 @@ package com.snowplowanalytics.snowplow.storage.bigquery.streamloader

 import com.snowplowanalytics.iglu.client.Resolver
 import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
-import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
+import com.snowplowanalytics.snowplow.badrows.BadRow
 import com.snowplowanalytics.snowplow.storage.bigquery.common.{FieldCache, LoaderRow}
 import com.snowplowanalytics.snowplow.storage.bigquery.common.config.Environment.LoaderEnvironment

@@ -27,7 +27,6 @@ import fs2.Pipe
 import org.typelevel.log4cats.Logger

 object StreamLoader {
-  private val processor: Processor = Processor(generated.BuildInfo.name, generated.BuildInfo.version)

   /**
    * PubSub message with successfully parsed row, ready to be inserted into BQ.

package.scala
@@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.storage.bigquery
 import java.nio.charset.StandardCharsets

 import com.snowplowanalytics.snowplow.analytics.scalasdk.Data.ShreddedType
-import com.snowplowanalytics.snowplow.badrows.BadRow
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
 import com.snowplowanalytics.snowplow.storage.bigquery.common.Codecs.toPayload
 import com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery.FailedInsert

@@ -27,6 +27,8 @@ package object streamloader {

   type Payload[F[_]] = ConsumerRecord[F, String]

+  val processor: Processor = Processor(generated.BuildInfo.name, generated.BuildInfo.version)
+
   implicit val messageDecoder: MessageDecoder[String] = (bytes: Array[Byte]) => {
     Right(new String(bytes, StandardCharsets.UTF_8))
   }
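
Moving processor out of StreamLoader (previous file) into the streamloader package object makes it visible both to Producer.produceWrtToSize, which stamps it into every SizeViolation bad row, and to the new ProducerSpec below.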

ProducerSpec.scala
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.storage.bigquery.streamloader

import scala.collection.mutable.ListBuffer
import scala.util.Random
import java.nio.charset.StandardCharsets
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload, Failure}
import org.specs2.mutable.Specification

class ProducerSpec extends Specification {

  implicit val runtime: IORuntime = IORuntime.global

  implicit val stringEncoder: MessageEncoder[String] = { str =>
    Right(str.getBytes(StandardCharsets.UTF_8))
  }

  class MockProducer[A] extends Producer[IO, A] {
    val buf: ListBuffer[A] = ListBuffer.empty
    override def produce(data: A): IO[Unit] = IO.delay(buf.addOne(data))
    def get: List[A] = buf.toList
  }

  "produceWrtToSize" should {
    "send normal size messages to normal producer" in {
      val producer = new MockProducer[String]
      val oversizeBadRowProducer = new MockProducer[BadRow.SizeViolation]

      val messages = (1 to 100).map(i => s"test-$i")

      messages.foreach(
        Producer.produceWrtToSize(_, producer, oversizeBadRowProducer).unsafeRunSync()
      )

      producer.get must beEqualTo(messages)
      oversizeBadRowProducer.get must beEmpty
    }

    "send oversize messages to oversize bad row producer" in {
      val producer = new MockProducer[String]
      val oversizeBadRowProducer = new MockProducer[BadRow.SizeViolation]

      val messageSize = Producer.MaxPayloadLength + 1
      val message = Random.alphanumeric.take(messageSize).mkString

      Producer.produceWrtToSize(message, producer, oversizeBadRowProducer).unsafeRunSync()

      val expectedBadRowPayload = message.take(Producer.MaxPayloadLength / 10)

      producer.get must beEmpty
      oversizeBadRowProducer.get must beLike {
        case List(BadRow.SizeViolation(
              `processor`,
              Failure.SizeViolation(_, Producer.MaxPayloadLength, `messageSize`, _),
              Payload.RawPayload(`expectedBadRowPayload`)
            )) => ok
      }
    }
  }

}
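
The oversize test pins down the observable contract: nothing reaches the normal producer, and the emitted bad row records the configured limit (9,000,000 bytes), the actual size (MaxPayloadLength + 1) and a payload truncated to MaxPayloadLength / 10 = 900,000 characters.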
