Fix for exceeding Pubsub max message lengths
spenes committed Oct 12, 2023
1 parent 7b3c5f6 commit 255b5f0
Showing 5 changed files with 141 additions and 37 deletions.
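
The change adds a small Producer abstraction that wraps the Permutive PubsubProducer: before publishing, the encoded payload size is checked against a 9,000,000-byte threshold (PubSub caps messages at 10 MB), and oversized payloads are converted into BadRow.SizeViolation rows carrying a truncated copy of the payload and sent to the bad-row topic instead. Below is a simplified, self-contained sketch of that guard, using hypothetical standalone types rather than the module's actual API:

object SizeGuardSketch {
  // Stay safely under the 10 MB PubSub message limit
  val MaxPayloadLength = 9000000

  // Stand-in for BadRow.SizeViolation from snowplow-badrows
  final case class SizeViolation(limit: Int, actualSize: Int, truncatedPayload: String)

  def produce(
    payload: Array[Byte],
    publishGood: Array[Byte] => Unit,
    publishBad: SizeViolation => Unit
  ): Unit =
    if (payload.length >= MaxPayloadLength) {
      // Keep only a prefix of the payload so the bad row itself stays well under the limit
      val truncated = new String(payload, "UTF-8").take(MaxPayloadLength / 10)
      publishBad(SizeViolation(MaxPayloadLength, payload.length, truncated))
    } else publishGood(payload)
}
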
Bigquery.scala
@@ -24,7 +24,6 @@ import com.google.api.client.json.gson.GsonFactory
import com.google.api.gax.retrying.RetrySettings
import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, InsertAllRequest, InsertAllResponse, TableId}
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert
import com.permutive.pubsub.producer.PubsubProducer
import org.threeten.bp.Duration
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
@@ -44,7 +43,7 @@ object Bigquery {
* Exceptions from the underlying `insertAll` call are propagated.
*/
def insert[F[_]: Parallel: Sync](
failedInsertProducer: PubsubProducer[F, FailedInsert],
failedInsertProducer: Producer[F, FailedInsert],
metrics: Metrics[F],
toLoad: List[LoaderRow]
)(
@@ -149,7 +148,7 @@ object Bigquery {

private def handleFailedRows[F[_]: Sync](
metrics: Metrics[F],
failedInsertProducer: PubsubProducer[F, FailedInsert],
failedInsertProducer: Producer[F, FailedInsert],
rows: List[LoaderRow]
): F[Unit] = {
val tableRows = rows.map { lr =>
Producer.scala
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.storage.bigquery.streamloader

import java.time.Instant
import java.nio.charset.StandardCharsets

import cats.implicits._

import cats.effect.{Async, Resource}

import com.permutive.pubsub.producer.{Model, PubsubProducer}
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload => BadRowPayload}

import org.typelevel.log4cats.Logger

import scala.concurrent.duration._

trait Producer[F[_], A] {
def produce(data: A): F[Unit]
}

object Producer {

val MaxPayloadLength = 9000000 // Stay under Pubsub Maximum of 10MB

def mkProducer[F[_]: Async: Logger, A: MessageEncoder](
projectId: String,
topic: String,
batchSize: Long,
delay: FiniteDuration,
oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
): Resource[F, Producer[F, A]] =
mkProducer[F, A](projectId, topic, batchSize, delay).map { p =>
(data: A) => {
val dataSize = getSize(data)
if (dataSize >= MaxPayloadLength) {
val badRow = createSizeViolationBadRow(data, dataSize)
oversizeBadRowProducer.produce(badRow)
} else {
p.produce(data).void
}
}
}

def mkOversizeBadRowProducer[F[_]: Async: Logger](
projectId: String,
topic: String,
batchSize: Long,
delay: FiniteDuration
): Resource[F, Producer[F, BadRow.SizeViolation]] =
mkProducer[F, BadRow](projectId, topic, batchSize, delay).map { p =>
(data: BadRow.SizeViolation) => p.produce(data).void
}

/** Construct a PubSub producer. */
private def mkProducer[F[_]: Async: Logger, A: MessageEncoder](
projectId: String,
topic: String,
batchSize: Long,
delay: FiniteDuration
): Resource[F, PubsubProducer[F, A]] =
GooglePubsubProducer.of[F, A](
Model.ProjectId(projectId),
Model.Topic(topic),
config = PubsubProducerConfig[F](
batchSize = batchSize,
delayThreshold = delay,
onFailedTerminate = e => Logger[F].error(e)(s"Error in PubSub producer")
)
)

private def createSizeViolationBadRow[A: MessageEncoder](data: A, actualDataSize: Int): BadRow.SizeViolation = {
val msg = s"Pubsub message exceeds allowed size"
val payload = MessageEncoder[A].encode(data)
.map(bytes => new String(bytes, StandardCharsets.UTF_8))
.getOrElse("Pubsub message can't be converted to string")
BadRow
.SizeViolation(
processor,
Failure.SizeViolation(Instant.now(), MaxPayloadLength, actualDataSize, msg),
BadRowPayload.RawPayload(payload.take(MaxPayloadLength / 10))
)
}

private def getSize[A: MessageEncoder](a: A): Int =
MessageEncoder[A].encode(a).map(_.length).getOrElse(Int.MaxValue)
}
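
The constructors above are composed so that every producer shares one oversize bad-row producer. A hypothetical wiring sketch (made-up project, topic, batch size, and delay values; it assumes the module's implicit MessageEncoder instances are in scope; the actual wiring is in Resources.scala below):

object ProducerWiringSketch {
  import scala.concurrent.duration._
  import cats.effect.{IO, Resource}
  import org.typelevel.log4cats.Logger
  import org.typelevel.log4cats.slf4j.Slf4jLogger

  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  // Build the bad-row producer first, then hand it to every other producer so that any
  // payload at or above MaxPayloadLength is published as a SizeViolation bad row instead.
  // Assumes an implicit MessageEncoder[Bigquery.FailedInsert] is in scope (defined elsewhere in the module).
  def producers: Resource[IO, Producer[IO, Bigquery.FailedInsert]] =
    for {
      oversized     <- Producer.mkOversizeBadRowProducer[IO]("my-project", "bad-rows", 4L, 200.millis)
      failedInserts <- Producer.mkProducer[IO, Bigquery.FailedInsert]("my-project", "failed-inserts", 4L, 200.millis, oversized)
    } yield failedInserts
}
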
Resources.scala
@@ -36,9 +36,6 @@ import cats.effect.{Async, Resource, Sync}
import cats.implicits._
import retry.RetryPolicies
import com.google.cloud.bigquery.BigQuery
import com.permutive.pubsub.producer.{Model, PubsubProducer}
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.schemaddl.bigquery.Field
@@ -100,15 +97,34 @@ object Resources {
resolverConfig <- mkResolverConfig(env.resolverJson)
resolver <- mkResolver(resolverConfig)
metrics <- Resource.eval(mkMetricsReporter[F](env.monitoring))
types <- mkTypeSink[F](env.projectId, env.config.output.types.topic, env.config.sinkSettings.types, metrics)
bigquery <- Resource.eval[F, BigQuery](Bigquery.getClient(env.config.retrySettings, env.projectId))
failedInserts <- mkProducer[F, Bigquery.FailedInsert](
oversizeBadRowProducer <- Producer.mkOversizeBadRowProducer[F](
env.projectId,
env.config.output.bad.topic,
env.config.sinkSettings.bad.producerBatchSize,
env.config.sinkSettings.bad.producerDelayThreshold
)
types <- mkTypeSink[F](
env.projectId,
env.config.output.types.topic,
env.config.sinkSettings.types,
metrics,
oversizeBadRowProducer
)
failedInserts <- Producer.mkProducer[F, Bigquery.FailedInsert](
env.projectId,
env.config.output.failedInserts.topic,
env.config.sinkSettings.failedInserts.producerBatchSize,
env.config.sinkSettings.failedInserts.producerDelayThreshold
env.config.sinkSettings.failedInserts.producerDelayThreshold,
oversizeBadRowProducer
)
badSink <- mkBadSink[F](
env.projectId,
env.config.output.bad.topic,
env.config.sinkSettings.bad.sinkConcurrency,
metrics, env.config.sinkSettings.bad,
oversizeBadRowProducer
)
badSink <- mkBadSink[F](env.projectId, env.config.output.bad.topic, env.config.sinkSettings.bad.sinkConcurrency, metrics, env.config.sinkSettings.bad)
sentry <- Sentry.init(env.monitoring.sentry)
http <- EmberClientBuilder.default[F].build
lookup <- Resource.eval(CreateLruMap[F, FieldKey, Field].create(resolverConfig.cacheSize))
@@ -118,23 +134,6 @@
} yield new Resources[F](source, resolver, badSink, goodSink, metrics, sentry, registryLookup, lookup)
// format: on

/** Construct a PubSub producer. */
private def mkProducer[F[_]: Async: Logger, A: MessageEncoder](
projectId: String,
topic: String,
batchSize: Long,
delay: FiniteDuration
): Resource[F, PubsubProducer[F, A]] =
GooglePubsubProducer.of[F, A](
Model.ProjectId(projectId),
Model.Topic(topic),
config = PubsubProducerConfig[F](
batchSize = batchSize,
delayThreshold = delay,
onFailedTerminate = e => Logger[F].error(e)(s"Error in PubSub producer")
)
)

// TODO: Can failed inserts be given their own sink like bad rows and types?

// Acks the event after processing it as a `BadRow`
@@ -143,13 +142,15 @@
topic: String,
maxConcurrency: Int,
metrics: Metrics[F],
sinkSettingsBad: SinkSettings.Bad
sinkSettingsBad: SinkSettings.Bad,
oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
): Resource[F, Pipe[F, StreamBadRow[F], Nothing]] =
mkProducer[F, BadRow](
Producer.mkProducer[F, BadRow](
projectId,
topic,
sinkSettingsBad.producerBatchSize,
sinkSettingsBad.producerDelayThreshold
sinkSettingsBad.producerDelayThreshold,
oversizeBadRowProducer
).map { p =>
_.parEvalMapUnordered(maxConcurrency) { badRow =>
p.produce(badRow.row) *> badRow.ack *> metrics.badCount
@@ -161,13 +162,15 @@
projectId: String,
topic: String,
sinkSettingsTypes: SinkSettings.Types,
metrics: Metrics[F]
metrics: Metrics[F],
oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
): Resource[F, Pipe[F, Set[ShreddedType], Nothing]] =
mkProducer[F, Set[ShreddedType]](
Producer.mkProducer[F, Set[ShreddedType]](
projectId,
topic,
sinkSettingsTypes.producerBatchSize,
sinkSettingsTypes.producerDelayThreshold
sinkSettingsTypes.producerDelayThreshold,
oversizeBadRowProducer
).map { p =>
_.parEvalMapUnordered(sinkSettingsTypes.sinkConcurrency) { types =>
p.produce(types).void *> metrics.typesCount(types.size)
@@ -189,7 +192,7 @@
private def mkGoodSink[F[_]: Async: Parallel](
good: Output.BigQuery,
bigQuery: BigQuery,
producer: PubsubProducer[F, FailedInsert],
producer: Producer[F, FailedInsert],
metrics: Metrics[F],
typeSink: Pipe[F, Set[ShreddedType], Nothing],
sinkSettingsGood: SinkSettings.Good,
StreamLoader.scala
@@ -14,7 +14,7 @@ package com.snowplowanalytics.snowplow.storage.bigquery.streamloader

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.storage.bigquery.common.{FieldCache, LoaderRow}
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.Environment.LoaderEnvironment

@@ -27,7 +27,6 @@ import fs2.Pipe
import org.typelevel.log4cats.Logger

object StreamLoader {
private val processor: Processor = Processor(generated.BuildInfo.name, generated.BuildInfo.version)

/**
* PubSub message with successfully parsed row, ready to be inserted into BQ.
package.scala
@@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.storage.bigquery
import java.nio.charset.StandardCharsets

import com.snowplowanalytics.snowplow.analytics.scalasdk.Data.ShreddedType
import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
import com.snowplowanalytics.snowplow.storage.bigquery.common.Codecs.toPayload
import com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery.FailedInsert

@@ -27,6 +27,8 @@ package object streamloader {

type Payload[F[_]] = ConsumerRecord[F, String]

val processor: Processor = Processor(generated.BuildInfo.name, generated.BuildInfo.version)

implicit val messageDecoder: MessageDecoder[String] = (bytes: Array[Byte]) => {
Right(new String(bytes, StandardCharsets.UTF_8))
}
