Address Ian's feedback
spenes committed Oct 12, 2023
1 parent 255b5f0 commit 6572661
Showing 3 changed files with 122 additions and 49 deletions.
@@ -44,30 +44,31 @@ object Producer {
     delay: FiniteDuration,
     oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
   ): Resource[F, Producer[F, A]] =
-    mkProducer[F, A](projectId, topic, batchSize, delay).map { p =>
-      (data: A) => {
-        val dataSize = getSize(data)
-        if (dataSize >= MaxPayloadLength) {
-          val badRow = createSizeViolationBadRow(data, dataSize)
-          oversizeBadRowProducer.produce(badRow)
-        } else {
-          p.produce(data).void
-        }
-      }
+    mkPubsubProducer[F, A](projectId, topic, batchSize, delay).map { p =>
+      (data: A) => produceWrtToSize[F, A](data, p.produce(_).void, oversizeBadRowProducer)
     }
 
-  def mkOversizeBadRowProducer[F[_]: Async: Logger](
+  def mkBadRowProducers[F[_]: Async: Logger](
     projectId: String,
     topic: String,
     batchSize: Long,
     delay: FiniteDuration
-  ): Resource[F, Producer[F, BadRow.SizeViolation]] =
-    mkProducer[F, BadRow](projectId, topic, batchSize, delay).map { p =>
-      (data: BadRow.SizeViolation) => p.produce(data).void
+  ): Resource[F, (Producer[F, BadRow], Producer[F, BadRow.SizeViolation])] =
+    mkPubsubProducer[F, BadRow](projectId, topic, batchSize, delay).map { p =>
+      val badRowProducer = new Producer[F, BadRow] {
+        override def produce(badRow: BadRow): F[Unit] = {
+          produceWrtToSize[F, BadRow](badRow, p.produce(_).void, p.produce(_).void)
+        }
+      }
+      val oversizeBadRowProducer = new Producer[F, BadRow.SizeViolation] {
+        override def produce(data: BadRow.SizeViolation): F[Unit] =
+          p.produce(data).void
+      }
+      (badRowProducer, oversizeBadRowProducer)
     }
 
   /** Construct a PubSub producer. */
-  private def mkProducer[F[_]: Async: Logger, A: MessageEncoder](
+  private def mkPubsubProducer[F[_]: Async: Logger, A: MessageEncoder](
     projectId: String,
     topic: String,
     batchSize: Long,
@@ -83,17 +84,27 @@ object Producer {
       )
     )
 
-  private def createSizeViolationBadRow[A: MessageEncoder](data: A, actualDataSize: Int): BadRow.SizeViolation = {
-    val msg = s"Pubsub message exceeds allowed size"
-    val payload = MessageEncoder[A].encode(data)
-      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
-      .getOrElse("Pubsub message can't be converted to string")
-    BadRow
-      .SizeViolation(
-        processor,
-        Failure.SizeViolation(Instant.now(), MaxPayloadLength, actualDataSize, msg),
-        BadRowPayload.RawPayload(payload.take(MaxPayloadLength / 10))
-      )
-  }
+  def produceWrtToSize[F[_]: Async, A: MessageEncoder](
+    data: A,
+    producer: Producer[F, A],
+    oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
+  ): F[Unit] = {
+    val dataSize = getSize(data)
+    if (dataSize >= MaxPayloadLength) {
+      val msg = s"Pubsub message exceeds allowed size"
+      val payload = MessageEncoder[A].encode(data)
+        .map(bytes => new String(bytes, StandardCharsets.UTF_8))
+        .getOrElse("Pubsub message can't be converted to string")
+        .take(MaxPayloadLength / 10)
+      val badRow = BadRow.SizeViolation(
+        processor,
+        Failure.SizeViolation(Instant.now(), MaxPayloadLength, dataSize, msg),
+        BadRowPayload.RawPayload(payload)
+      )
+      oversizeBadRowProducer.produce(badRow)
+    } else {
+      producer.produce(data).void
+    }
+  }
 
   private def getSize[A: MessageEncoder](a: A): Int =
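A side note on the call sites above: produceWrtToSize expects Producer arguments, yet the diff passes the function literal p.produce(_).void. This works because Producer has a single abstract method, so function literals convert to it. A minimal runnable sketch of that conversion; the trait shape is assumed from the diff, and the other names are hypothetical:

import cats.effect.IO
import cats.effect.unsafe.implicits.global

object SamConversionSketch extends App {
  // assumed shape of the codebase's Producer trait: one abstract method
  trait Producer[F[_], A] {
    def produce(data: A): F[Unit]
  }

  // a function literal converts to the single-abstract-method trait
  val asProducer: Producer[IO, String] = s => IO.println(s"producing $s")

  def produceWith(p: Producer[IO, String]): IO[Unit] = p.produce("hello")

  produceWith(asProducer).unsafeRunSync()
}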
@@ -98,7 +98,7 @@ object Resources {
       resolver <- mkResolver(resolverConfig)
       metrics <- Resource.eval(mkMetricsReporter[F](env.monitoring))
       bigquery <- Resource.eval[F, BigQuery](Bigquery.getClient(env.config.retrySettings, env.projectId))
-      oversizeBadRowProducer <- Producer.mkOversizeBadRowProducer[F](
+      (badRowProducer, oversizeBadRowProducer) <- Producer.mkBadRowProducers[F](
        env.projectId,
        env.config.output.bad.topic,
        env.config.sinkSettings.bad.producerBatchSize,
@@ -118,12 +118,10 @@ object Resources {
        env.config.sinkSettings.failedInserts.producerDelayThreshold,
        oversizeBadRowProducer
      )
-      badSink <- mkBadSink[F](
-        env.projectId,
-        env.config.output.bad.topic,
+      badSink = mkBadSink[F](
        env.config.sinkSettings.bad.sinkConcurrency,
-        metrics, env.config.sinkSettings.bad,
-        oversizeBadRowProducer
+        metrics,
+        badRowProducer
      )
      sentry <- Sentry.init(env.monitoring.sentry)
      http <- EmberClientBuilder.default[F].build
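The switch from badSink <- to badSink = works because mkBadSink no longer returns a Resource: inside a for-comprehension over Resource, = binds a plain value without allocating anything. A small runnable sketch of that distinction, with hypothetical names:

import cats.effect.{IO, IOApp, Resource}

// A toy Resource for-comprehension: `<-` allocates, `=` just names a value.
object BindVsAssignSketch extends IOApp.Simple {
  def run: IO[Unit] =
    (for {
      producer <- Resource.make(IO.println("producer opened").as("producer"))(_ => IO.println("producer closed"))
      sink = (msg: String) => IO.println(s"$producer sinks $msg") // pure binding, nothing allocated
    } yield sink).use(sink => sink("hello"))
}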
@@ -137,25 +135,14 @@
   // TODO: Can failed inserts be given their own sink like bad rows and types?
 
   // Acks the event after processing it as a `BadRow`
-  private def mkBadSink[F[_]: Async: Logger](
-    projectId: String,
-    topic: String,
+  private def mkBadSink[F[_]: Async](
     maxConcurrency: Int,
     metrics: Metrics[F],
-    sinkSettingsBad: SinkSettings.Bad,
-    oversizeBadRowProducer: Producer[F, BadRow.SizeViolation]
-  ): Resource[F, Pipe[F, StreamBadRow[F], Nothing]] =
-    Producer.mkProducer[F, BadRow](
-      projectId,
-      topic,
-      sinkSettingsBad.producerBatchSize,
-      sinkSettingsBad.producerDelayThreshold,
-      oversizeBadRowProducer
-    ).map { p =>
-      _.parEvalMapUnordered(maxConcurrency) { badRow =>
-        p.produce(badRow.row) *> badRow.ack *> metrics.badCount
-      }.drain
-    }
+    badRowProducer: Producer[F, BadRow]
+  ): Pipe[F, StreamBadRow[F], Nothing] =
+    _.parEvalMapUnordered(maxConcurrency) { badRow =>
+      badRowProducer.produce(badRow.row) *> badRow.ack *> metrics.badCount
+    }.drain
 
   // Does not ack the event -- it still needs to end up in one of the other targets
   private def mkTypeSink[F[_]: Async: Logger](
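Since mkBadSink now returns a bare Pipe[F, StreamBadRow[F], Nothing], callers attach it to a stream with through. A runnable sketch of that shape, simplified to String events and a println-backed producer (both stand-ins, as is the reduced mkBadSink signature here):

import cats.effect.{IO, IOApp}
import fs2.{Pipe, Stream}

object BadSinkShapeSketch extends IOApp.Simple {
  trait Producer[A] { def produce(a: A): IO[Unit] }

  // same shape as the new mkBadSink: a pure Pipe over a pre-built producer
  def mkBadSink(maxConcurrency: Int, producer: Producer[String]): Pipe[IO, String, Nothing] =
    _.parEvalMapUnordered(maxConcurrency)(producer.produce).drain

  val producer = new Producer[String] {
    def produce(a: String): IO[Unit] = IO.println(s"bad row: $a")
  }

  def run: IO[Unit] =
    Stream("r1", "r2", "r3").covary[IO].through(mkBadSink(2, producer)).compile.drain
}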
@@ -0,0 +1,75 @@
/*
 * Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
 * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the Apache License Version 2.0 is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
 */
package com.snowplowanalytics.snowplow.storage.bigquery.streamloader

import scala.collection.mutable.ListBuffer
import scala.util.Random
import java.nio.charset.StandardCharsets
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload, Failure}
import org.specs2.mutable.Specification

class ProducerSpec extends Specification {

  implicit val runtime: IORuntime = IORuntime.global

  implicit val stringEncoder: MessageEncoder[String] = { str =>
    Right(str.getBytes(StandardCharsets.UTF_8))
  }

  class MockProducer[A] extends Producer[IO, A] {
    val buf: ListBuffer[A] = ListBuffer.empty
    override def produce(data: A): IO[Unit] = IO.delay(buf.addOne(data))
    def get: List[A] = buf.toList
  }

  "produceWrtToSize" should {
    "send normal size messages to normal producer" in {
      val producer = new MockProducer[String]
      val oversizeBadRowProducer = new MockProducer[BadRow.SizeViolation]

      val messages = (1 to 100).map(i => s"test-$i")

      messages.foreach(
        Producer.produceWrtToSize(_, producer, oversizeBadRowProducer).unsafeRunSync()
      )

      producer.get must beEqualTo(messages)
      oversizeBadRowProducer.get must beEmpty
    }

    "send oversize messages to oversize bad row producer" in {
      val producer = new MockProducer[String]
      val oversizeBadRowProducer = new MockProducer[BadRow.SizeViolation]

      val messageSize = Producer.MaxPayloadLength + 1
      val message = Random.alphanumeric.take(messageSize).mkString

      Producer.produceWrtToSize(message, producer, oversizeBadRowProducer).unsafeRunSync()

      val expectedBadRowPayload = message.take(Producer.MaxPayloadLength / 10)

      producer.get must beEmpty
      oversizeBadRowProducer.get must beLike {
        case List(BadRow.SizeViolation(
              `processor`,
              Failure.SizeViolation(_, Producer.MaxPayloadLength, `messageSize`, _),
              Payload.RawPayload(`expectedBadRowPayload`)
            )) => ok
      }
    }
  }

}
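One boundary the spec leaves untested: produceWrtToSize uses >=, so a message of exactly MaxPayloadLength bytes is also routed to the oversize producer. A hypothetical extra example in the same style, which would sit inside ProducerSpec next to the cases above:

    "treat a message of exactly MaxPayloadLength bytes as oversize" in {
      val producer = new MockProducer[String]
      val oversizeBadRowProducer = new MockProducer[BadRow.SizeViolation]

      // alphanumeric characters are one UTF-8 byte each, so size == MaxPayloadLength
      val message = Random.alphanumeric.take(Producer.MaxPayloadLength).mkString

      Producer.produceWrtToSize(message, producer, oversizeBadRowProducer).unsafeRunSync()

      producer.get must beEmpty
      oversizeBadRowProducer.get must haveSize(1)
    }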
