Skip to content

Commit

Permalink
Fix for exceeding Pubsub max message lengths
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Oct 11, 2023
1 parent 7b3c5f6 commit d9eb0be
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ object LoaderRow {

val LoadTstampField = Field("load_tstamp", Type.Timestamp, Mode.Nullable)

val MaxBadRowPayloadLength = 5000000

/**
* Parse the enriched TSV line into a loader row, that can be loaded into BigQuery.
* If Loader is able to figure out that row cannot be loaded into BQ,
Expand All @@ -60,7 +62,7 @@ object LoaderRow {
case Validated.Valid(event) =>
fromEvent[F](resolver, processor, fieldCache)(event)
case Validated.Invalid(error) =>
val badRowError = BadRow.LoaderParsingError(processor, error, Payload.RawPayload(record))
val badRowError = BadRow.LoaderParsingError(processor, error, Payload.RawPayload(record.take(MaxBadRowPayloadLength)))
Monad[F].pure(badRowError.asLeft)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,23 @@ import com.permutive.pubsub.producer.encoder.MessageEncoder

package object streamloader {

val MaxPayloadLength = 9000000 // Stay under Pubsub Maximum of 10MB

type Payload[F[_]] = ConsumerRecord[F, String]

implicit val messageDecoder: MessageDecoder[String] = (bytes: Array[Byte]) => {
Right(new String(bytes, StandardCharsets.UTF_8))
Right(new String(bytes, StandardCharsets.UTF_8).take(MaxPayloadLength))
}

implicit val badRowEncoder: MessageEncoder[BadRow] = { br =>
Right(br.compact.getBytes(StandardCharsets.UTF_8))
Right(br.compact.getBytes(StandardCharsets.UTF_8).take(MaxPayloadLength))
}

implicit val shreddedTypesEncoder: MessageEncoder[Set[ShreddedType]] = { t =>
Right(toPayload(t).noSpaces.getBytes(StandardCharsets.UTF_8))
Right(toPayload(t).noSpaces.getBytes(StandardCharsets.UTF_8).take(MaxPayloadLength))
}

implicit val messageEncoder: MessageEncoder[FailedInsert] = { tr =>
Right(tr.tableRow.getBytes(StandardCharsets.UTF_8))
Right(tr.tableRow.getBytes(StandardCharsets.UTF_8).take(MaxPayloadLength))
}
}

0 comments on commit d9eb0be

Please sign in to comment.