diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Bigquery.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Bigquery.scala
index 60aa0353..667f4c1b 100644
--- a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Bigquery.scala
+++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Bigquery.scala
@@ -24,7 +24,6 @@ import com.google.api.client.json.gson.GsonFactory
 import com.google.api.gax.retrying.RetrySettings
 import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, InsertAllRequest, InsertAllResponse, TableId}
 import com.google.cloud.bigquery.InsertAllRequest.RowToInsert
-import com.permutive.pubsub.producer.PubsubProducer
 import org.threeten.bp.Duration
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
@@ -44,7 +43,7 @@ object Bigquery {
    * Exceptions from the underlying `insertAll` call are propagated.
    */
   def insert[F[_]: Parallel: Sync](
-    failedInsertProducer: PubsubProducer[F, FailedInsert],
+    failedInsertProducer: Producer[F, FailedInsert],
     metrics: Metrics[F],
     toLoad: List[LoaderRow]
   )(
@@ -149,7 +148,7 @@ object Bigquery {
 
   private def handleFailedRows[F[_]: Sync](
     metrics: Metrics[F],
-    failedInsertProducer: PubsubProducer[F, FailedInsert],
+    failedInsertProducer: Producer[F, FailedInsert],
     rows: List[LoaderRow]
   ): F[Unit] = {
     val tableRows = rows.map { lr =>
diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Producer.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Producer.scala
new file mode 100644
index 00000000..292546ac
--- /dev/null
+++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Producer.scala
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.storage.bigquery.streamloader
+
+import java.time.Instant
+import java.nio.charset.StandardCharsets
+
+import cats.implicits._
+
+import cats.effect.{Async, Resource}
+
+import com.permutive.pubsub.producer.{Model, PubsubProducer}
+import com.permutive.pubsub.producer.encoder.MessageEncoder
+import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}
+
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload => BadRowPayload}
+
+import org.typelevel.log4cats.Logger
+
+import scala.concurrent.duration._
+
+trait Producer[F[_], A] {
+  def produce(data: A): F[Unit]
+}
+
+object Producer {
+
+  val MaxPayloadLength = 9000000 // Stay under Pubsub Maximum of 10MB
+
+  def mkProducer[F[_]: Async: Logger, A: MessageEncoder](
+    projectId: String,
+    topic: String,
+    batchSize: Long,
+    delay: FiniteDuration,
+    oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
+  ): Resource[F, Producer[F, A]] =
+    mkPubsubProducer[F, A](projectId, topic, batchSize, delay).map { p =>
+      (data: A) => produceWrtToSize[F, A](data, p.produce(_).void, oversizeBadRowProducer)
+    }
+
+  def mkBadRowProducers[F[_]: Async: Logger](
+    projectId: String,
+    topic: String,
+    batchSize: Long,
+    delay: FiniteDuration
+  ): Resource[F, (Producer[F, BadRow], Producer[F, BadRow.SizeViolation])] =
+    mkPubsubProducer[F, BadRow](projectId, topic, batchSize, delay).map { p =>
+      val badRowProducer = new Producer[F, BadRow] {
+        override def produce(badRow: BadRow): F[Unit] = {
+          produceWrtToSize[F, BadRow](badRow, p.produce(_).void, p.produce(_).void)
+        }
+      }
+      val oversizeBadRowProducer = new Producer[F, BadRow.SizeViolation] {
+        override def produce(data: BadRow.SizeViolation): F[Unit] =
+          p.produce(data).void
+      }
+      (badRowProducer, oversizeBadRowProducer)
+    }
+
+  /** Construct a PubSub producer. */
+  private def mkPubsubProducer[F[_]: Async: Logger, A: MessageEncoder](
+    projectId: String,
+    topic: String,
+    batchSize: Long,
+    delay: FiniteDuration
+  ): Resource[F, PubsubProducer[F, A]] =
+    GooglePubsubProducer.of[F, A](
+      Model.ProjectId(projectId),
+      Model.Topic(topic),
+      config = PubsubProducerConfig[F](
+        batchSize = batchSize,
+        delayThreshold = delay,
+        onFailedTerminate = e => Logger[F].error(e)(s"Error in PubSub producer")
+      )
+    )
+
+  def produceWrtToSize[F[_]: Async, A: MessageEncoder](
+    data: A,
+    producer: Producer[F, A],
+    oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
+  ): F[Unit] = {
+    val dataSize = getSize(data)
+    if (dataSize >= MaxPayloadLength) {
+      val msg = s"Pubsub message exceeds allowed size"
+      val payload = MessageEncoder[A].encode(data)
+        .map(bytes => new String(bytes, StandardCharsets.UTF_8))
+        .getOrElse("Pubsub message can't be converted to string")
+        .take(MaxPayloadLength / 10)
+      val badRow = BadRow.SizeViolation(
+        processor,
+        Failure.SizeViolation(Instant.now(), MaxPayloadLength, dataSize, msg),
+        BadRowPayload.RawPayload(payload)
+      )
+      oversizeBadRowProducer.produce(badRow)
+    } else {
+      producer.produce(data).void
+    }
+  }
+
+  private def getSize[A: MessageEncoder](a: A): Int =
+    MessageEncoder[A].encode(a).map(_.length).getOrElse(Int.MaxValue)
+}
diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Resources.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Resources.scala
index 8223f85d..60569dd4 100644
--- a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Resources.scala
+++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Resources.scala
@@ -36,9 +36,6 @@ import cats.effect.{Async, Resource, Sync}
 import cats.implicits._
 import retry.RetryPolicies
 import com.google.cloud.bigquery.BigQuery
-import com.permutive.pubsub.producer.{Model, PubsubProducer}
-import com.permutive.pubsub.producer.encoder.MessageEncoder
-import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}
 import com.snowplowanalytics.iglu.client.resolver.Resolver
 import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
 import com.snowplowanalytics.iglu.schemaddl.bigquery.Field
@@ -100,15 +97,32 @@ object Resources {
       resolverConfig <- mkResolverConfig(env.resolverJson)
       resolver       <- mkResolver(resolverConfig)
       metrics        <- Resource.eval(mkMetricsReporter[F](env.monitoring))
-      types          <- mkTypeSink[F](env.projectId, env.config.output.types.topic, env.config.sinkSettings.types, metrics)
       bigquery       <- Resource.eval[F, BigQuery](Bigquery.getClient(env.config.retrySettings, env.projectId))
-      failedInserts  <- mkProducer[F, Bigquery.FailedInsert](
+      (badRowProducer, oversizeBadRowProducer) <- Producer.mkBadRowProducers[F](
+        env.projectId,
+        env.config.output.bad.topic,
+        env.config.sinkSettings.bad.producerBatchSize,
+        env.config.sinkSettings.bad.producerDelayThreshold
+      )
+      types <- mkTypeSink[F](
+        env.projectId,
+        env.config.output.types.topic,
+        env.config.sinkSettings.types,
+        metrics,
+        oversizeBadRowProducer
+      )
+      failedInserts <- Producer.mkProducer[F, Bigquery.FailedInsert](
         env.projectId,
         env.config.output.failedInserts.topic,
         env.config.sinkSettings.failedInserts.producerBatchSize,
-        env.config.sinkSettings.failedInserts.producerDelayThreshold
+        env.config.sinkSettings.failedInserts.producerDelayThreshold,
+        oversizeBadRowProducer
+      )
+      badSink = mkBadSink[F](
+        env.config.sinkSettings.bad.sinkConcurrency,
+        metrics,
+        badRowProducer
       )
-      badSink        <- mkBadSink[F](env.projectId, env.config.output.bad.topic, env.config.sinkSettings.bad.sinkConcurrency, metrics, env.config.sinkSettings.bad)
       sentry         <- Sentry.init(env.monitoring.sentry)
       http           <- EmberClientBuilder.default[F].build
       lookup         <- Resource.eval(CreateLruMap[F, FieldKey, Field].create(resolverConfig.cacheSize))
@@ -118,56 +132,32 @@
     } yield new Resources[F](source, resolver, badSink, goodSink, metrics, sentry, registryLookup, lookup)
   // format: on
 
-  /** Construct a PubSub producer. */
-  private def mkProducer[F[_]: Async: Logger, A: MessageEncoder](
-    projectId: String,
-    topic: String,
-    batchSize: Long,
-    delay: FiniteDuration
-  ): Resource[F, PubsubProducer[F, A]] =
-    GooglePubsubProducer.of[F, A](
-      Model.ProjectId(projectId),
-      Model.Topic(topic),
-      config = PubsubProducerConfig[F](
-        batchSize = batchSize,
-        delayThreshold = delay,
-        onFailedTerminate = e => Logger[F].error(e)(s"Error in PubSub producer")
-      )
-    )
-
   // TODO: Can failed inserts be given their own sink like bad rows and types?
   // Acks the event after processing it as a `BadRow`
-  private def mkBadSink[F[_]: Async: Logger](
-    projectId: String,
-    topic: String,
+  private def mkBadSink[F[_]: Async](
     maxConcurrency: Int,
     metrics: Metrics[F],
-    sinkSettingsBad: SinkSettings.Bad
-  ): Resource[F, Pipe[F, StreamBadRow[F], Nothing]] =
-    mkProducer[F, BadRow](
-      projectId,
-      topic,
-      sinkSettingsBad.producerBatchSize,
-      sinkSettingsBad.producerDelayThreshold
-    ).map { p =>
-      _.parEvalMapUnordered(maxConcurrency) { badRow =>
-        p.produce(badRow.row) *> badRow.ack *> metrics.badCount
-      }.drain
-    }
+    badRowProducer: Producer[F, BadRow]
+  ): Pipe[F, StreamBadRow[F], Nothing] =
+    _.parEvalMapUnordered(maxConcurrency) { badRow =>
+      badRowProducer.produce(badRow.row) *> badRow.ack *> metrics.badCount
+    }.drain
 
   // Does not ack the event -- it still needs to end up in one of the other targets
   private def mkTypeSink[F[_]: Async: Logger](
     projectId: String,
     topic: String,
     sinkSettingsTypes: SinkSettings.Types,
-    metrics: Metrics[F]
+    metrics: Metrics[F],
+    oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
   ): Resource[F, Pipe[F, Set[ShreddedType], Nothing]] =
-    mkProducer[F, Set[ShreddedType]](
+    Producer.mkProducer[F, Set[ShreddedType]](
       projectId,
      topic,
       sinkSettingsTypes.producerBatchSize,
-      sinkSettingsTypes.producerDelayThreshold
+      sinkSettingsTypes.producerDelayThreshold,
+      oversizeBadRowProducer
     ).map { p =>
       _.parEvalMapUnordered(sinkSettingsTypes.sinkConcurrency) { types =>
         p.produce(types).void *> metrics.typesCount(types.size)
@@ -189,7 +179,7 @@
   private def mkGoodSink[F[_]: Async: Parallel](
     good: Output.BigQuery,
     bigQuery: BigQuery,
-    producer: PubsubProducer[F, FailedInsert],
+    producer: Producer[F, FailedInsert],
     metrics: Metrics[F],
     typeSink: Pipe[F, Set[ShreddedType], Nothing],
     sinkSettingsGood: SinkSettings.Good,
diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/StreamLoader.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/StreamLoader.scala
index 1eb903ae..30e2bd50 100644
--- a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/StreamLoader.scala
+++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/StreamLoader.scala
@@ -14,7 +14,7 @@ package com.snowplowanalytics.snowplow.storage.bigquery.streamloader
 import com.snowplowanalytics.iglu.client.Resolver
 import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
 
-import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
+import com.snowplowanalytics.snowplow.badrows.BadRow
 
 import com.snowplowanalytics.snowplow.storage.bigquery.common.{FieldCache, LoaderRow}
 import com.snowplowanalytics.snowplow.storage.bigquery.common.config.Environment.LoaderEnvironment
@@ -27,7 +27,6 @@ import fs2.Pipe
 import org.typelevel.log4cats.Logger
 
 object StreamLoader {
-  private val processor: Processor = Processor(generated.BuildInfo.name, generated.BuildInfo.version)
 
   /**
    * PubSub message with successfully parsed row, ready to be inserted into BQ.
diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/package.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/package.scala
index cc62fadf..5363cffd 100644
--- a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/package.scala
+++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/package.scala
@@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.storage.bigquery
 import java.nio.charset.StandardCharsets
 
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Data.ShreddedType
-import com.snowplowanalytics.snowplow.badrows.BadRow
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
 import com.snowplowanalytics.snowplow.storage.bigquery.common.Codecs.toPayload
 import com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery.FailedInsert
@@ -27,6 +27,8 @@ package object streamloader {
 
   type Payload[F[_]] = ConsumerRecord[F, String]
 
+  val processor: Processor = Processor(generated.BuildInfo.name, generated.BuildInfo.version)
+
   implicit val messageDecoder: MessageDecoder[String] = (bytes: Array[Byte]) => {
     Right(new String(bytes, StandardCharsets.UTF_8))
   }
diff --git a/modules/streamloader/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/ProducerSpec.scala b/modules/streamloader/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/ProducerSpec.scala
new file mode 100644
index 00000000..5cfeb774
--- /dev/null
+++ b/modules/streamloader/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/ProducerSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.storage.bigquery.streamloader
+
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
+import java.nio.charset.StandardCharsets
+import cats.effect.IO
+import cats.effect.unsafe.IORuntime
+import com.permutive.pubsub.producer.encoder.MessageEncoder
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload, Failure}
+import org.specs2.mutable.Specification
+
+class ProducerSpec extends Specification {
+
+  implicit val runtime: IORuntime = IORuntime.global
+
+  implicit val stringEncoder: MessageEncoder[String] = { str =>
+    Right(str.getBytes(StandardCharsets.UTF_8))
+  }
+
+  class MockProducer[A] extends Producer[IO, A] {
+    val buf: ListBuffer[A] = ListBuffer.empty
+    override def produce(data: A): IO[Unit] = IO.delay(buf.addOne(data))
+    def get: List[A] = buf.toList
+  }
+
+  "produceWrtToSize" should {
+    "send normal size messages to normal producer" in {
+      val producer = new MockProducer[String]
+      val oversizeBadRowProducer = new MockProducer[BadRow.SizeViolation]
+
+      val messages = (1 to 100).map(i => s"test-$i")
+
+      messages.foreach(
+        Producer.produceWrtToSize(_, producer, oversizeBadRowProducer).unsafeRunSync()
+      )
+
+      producer.get must beEqualTo(messages)
+      oversizeBadRowProducer.get must beEmpty
+    }
+
+    "send oversize messages to oversize bad row producer" in {
+      val producer = new MockProducer[String]
+      val oversizeBadRowProducer = new MockProducer[BadRow.SizeViolation]
+
+      val messageSize = Producer.MaxPayloadLength + 1
+      val message = Random.alphanumeric.take(messageSize).mkString
+
+      Producer.produceWrtToSize(message, producer, oversizeBadRowProducer).unsafeRunSync()
+
+      val expectedBadRowPayload = message.take(Producer.MaxPayloadLength / 10)
+
+      producer.get must beEmpty
+      oversizeBadRowProducer.get must beLike {
+        case List(BadRow.SizeViolation(
+          `processor`,
+          Failure.SizeViolation(_, Producer.MaxPayloadLength, `messageSize`, _),
+          Payload.RawPayload(`expectedBadRowPayload`)
+        )) => ok
+      }
+    }
+  }
+
+}
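Note on usage (not part of the patch above): the sketch below is a minimal, hypothetical illustration of how the pieces introduced by this change compose. The project ID, topic names, batch size, delay and the String payload type with its MessageEncoder are placeholder assumptions for the example only; the loader itself wires these producers in Resources.scala exactly as shown in the diff.

import scala.concurrent.duration._
import java.nio.charset.StandardCharsets

import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global

import com.permutive.pubsub.producer.encoder.MessageEncoder
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Producer

// Hypothetical wiring sketch; running it for real would need valid GCP
// credentials and existing PubSub topics.
object ProducerWiringSketch {

  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  // Assumed encoder for the example: payloads are plain UTF-8 strings.
  implicit val stringEncoder: MessageEncoder[String] =
    str => Right(str.getBytes(StandardCharsets.UTF_8))

  // A single underlying producer for the bad topic yields both the regular
  // bad row producer and the oversize (SizeViolation) producer. Every other
  // producer is built with the oversize producer, so any payload at or above
  // Producer.MaxPayloadLength is rerouted to the bad topic as a truncated
  // BadRow.SizeViolation instead of being rejected by the PubSub size limit.
  val producerResource: Resource[IO, Producer[IO, String]] =
    Producer
      .mkBadRowProducers[IO]("my-project", "bad-topic", batchSize = 100L, delay = 1.second)
      .flatMap { case (_, oversizeBadRowProducer) =>
        Producer.mkProducer[IO, String]("my-project", "good-topic", 100L, 1.second, oversizeBadRowProducer)
      }

  def main(args: Array[String]): Unit =
    producerResource.use(_.produce("hello world")).unsafeRunSync()
}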