Improve exception handling immediately after altering the table
When the BigQuery Loader encounters a new schema for the first time, it alters the table to add new
columns. But there can be a delay between adding the new column and the column becoming available
for receiving new events.

Opening a Writer is not considered successful until it is opened in a state where it knows about
the newly added column. This commit adds a delay/backoff when opening the Writer, so that it retries
gracefully.
istreeter committed Feb 14, 2024
1 parent 79b94dc commit f3e62a2
Showing 13 changed files with 184 additions and 75 deletions.
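In outline, the fix reopens the Writer and retries with a backoff until the freshly opened Writer can see the new columns. Below is a minimal, self-contained sketch of that idea, using the same cats-retry building blocks that appear in the diff (fibonacciBackoff and retryingOnFailures); the object and method names here are illustrative only, not part of the codebase.

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import retry.{RetryDetails, RetryPolicies}
import retry.implicits._

object AlterTableWaitSketch extends IOApp.Simple {

  // Stand-in for closing the cached Writer, reopening it, and checking whether the
  // freshly opened Writer still lacks the columns that were just added to the table.
  def reopenAndCheckMissingColumns: IO[Boolean] =
    IO.pure(false) // pretend the new columns are already visible

  def logRetry(stillMissing: Boolean, details: RetryDetails): IO[Unit] =
    IO.println(s"Newly added columns are not yet visible to the Writer. $details")

  def run: IO[Unit] =
    reopenAndCheckMissingColumns
      .retryingOnFailures(
        policy = RetryPolicies.fibonacciBackoff[IO](1.second),
        wasSuccessful = { stillMissing => IO.pure(!stillMissing) },
        onFailure = logRetry
      )
      .void
}

The real implementation (Processing.waitForAlteredWriter in the diff below) follows the same shape: it closes the cached Writer, reopens it, re-checks whether an alter is still required, and logs a warning on each retry.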
config/config.azure.reference.hocon (7 additions & 1 deletion)
@@ -74,12 +74,18 @@
"delay": "30 seconds"
}

# -- Configures exponential backoff for errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}

# -- Configures backoff when waiting for the BigQuery Writer to detect that we have altered the
# -- table by adding new columns
"alterTableWait": {
"delay": "1 second"
}
}

"monitoring": {
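Assembled from the hunk above, the retry section of the reference configuration now reads roughly as follows. This is a sketch: the enclosing "retries" key and the "setupErrors" key name are inferred from the Config.Retries decoder further down, and unrelated settings are omitted.

"retries": {

  "setupErrors": {
    "delay": "30 seconds"
  }

  # -- Configures exponential backoff for errors that are likely to be transient.
  # -- Examples include server errors and network errors
  "transientErrors": {
    "delay": "1 second"
    "attempts": 5
  }

  # -- Configures backoff when waiting for the BigQuery Writer to detect that we have altered the
  # -- table by adding new columns
  "alterTableWait": {
    "delay": "1 second"
  }
}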
config/config.kinesis.reference.hocon (7 additions & 1 deletion)
@@ -96,12 +96,18 @@
"delay": "30 seconds"
}

# -- Configures exponential backoff for errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}

# -- Configures backoff when waiting for the BigQuery Writer to detect that we have altered the
# -- table by adding new columns
"alterTableWait": {
"delay": "1 second"
}
}

"monitoring": {
config/config.pubsub.reference.hocon (7 additions & 1 deletion)
@@ -76,12 +76,18 @@
"delay": "30 seconds"
}

# -- Configures exponential backoff for errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}

# -- Configures backoff when waiting for the BigQuery Writer to detect that we have altered the
# -- table by adding new columns
"alterTableWait": {
"delay": "1 second"
}
}

"monitoring": {
modules/core/src/main/resources/reference.conf (3 additions & 0 deletions)
@@ -27,6 +27,9 @@
"delay": "1 second"
"attempts": 5
}
"alterTableWait": {
"delay": "1 second"
}
}

"monitoring": {
@@ -81,11 +81,13 @@ object Config {
final case class Webhook(endpoint: Uri, tags: Map[String, String])

case class SetupErrorRetries(delay: FiniteDuration)
case class AlterTableWaitRetries(delay: FiniteDuration)
case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)

case class Retries(
setupErrors: SetupErrorRetries,
transientErrors: TransientErrorRetries
transientErrors: TransientErrorRetries,
alterTableWait: AlterTableWaitRetries
)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
@@ -112,6 +114,7 @@
implicit val webhookDecoder = deriveConfiguredDecoder[Webhook]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries]
implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
deriveConfiguredDecoder[Config[Source, Sink]]
@@ -14,11 +14,12 @@ import cats.effect.unsafe.implicits.global
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder
import io.sentry.Sentry
import retry.RetryPolicy

import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.bigquery.processing.{BigQueryUtils, TableManager, Writer}
import com.snowplowanalytics.snowplow.bigquery.processing.{BigQueryRetrying, BigQueryUtils, TableManager, Writer}
import com.snowplowanalytics.snowplow.runtime.{AppInfo, HealthProbe}

case class Environment[F[_]](
@@ -31,6 +32,7 @@
writer: Writer.Provider[F],
metrics: Metrics[F],
appHealth: AppHealth[F],
alterTableWaitPolicy: RetryPolicy[F],
batching: Config.Batching,
badRowMaxSize: Int
)
@@ -60,20 +62,21 @@
metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics))
creds <- Resource.eval(BigQueryUtils.credentials(config.main.output.good))
tableManager <- Resource.eval(TableManager.make(config.main.output.good, config.main.retries, creds, appHealth, monitoring))
writerBuilder <- Writer.builder(config.main.output.good, creds, appHealth)
writerBuilder <- Writer.builder(config.main.output.good, creds)
writerProvider <- Writer.provider(writerBuilder, config.main.retries, appHealth, monitoring)
} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
resolver = resolver,
httpClient = httpClient,
tableManager = tableManager,
writer = writerProvider,
metrics = metrics,
appHealth = appHealth,
batching = config.main.batching,
badRowMaxSize = config.main.output.bad.maxRecordSize
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
resolver = resolver,
httpClient = httpClient,
tableManager = tableManager,
writer = writerProvider,
metrics = metrics,
appHealth = appHealth,
alterTableWaitPolicy = BigQueryRetrying.policyForAlterTableWait(config.main.retries),
batching = config.main.batching,
badRowMaxSize = config.main.output.bad.maxRecordSize
)

private def enableSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] =
@@ -68,6 +68,9 @@ object BigQueryRetrying {
private def policyForTransientErrors[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] =
RetryPolicies.fullJitter[F](config.transientErrors.delay).join(RetryPolicies.limitRetries(config.transientErrors.attempts - 1))

def policyForAlterTableWait[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] =
RetryPolicies.fibonacciBackoff[F](config.alterTableWait.delay)

private def logErrorAndSendAlert[F[_]: Sync](
monitoring: Monitoring[F],
toAlert: Throwable => Alert,
@@ -80,7 +83,7 @@
private def logError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
Logger[F].error(error)(s"Executing BigQuery command failed. ${extractRetryDetails(details)}")

private def extractRetryDetails(details: RetryDetails): String = details match {
def extractRetryDetails(details: RetryDetails): String = details match {
case RetryDetails.GivingUp(totalRetries, totalDelay) =>
s"Giving up on retrying, total retries: $totalRetries, total delay: ${totalDelay.toSeconds} seconds"
case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) =>
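Note the asymmetry between the two policies: policyForTransientErrors joins a full-jitter backoff with a retry limit, whereas the new policyForAlterTableWait has no retry cap and backs off along the Fibonacci sequence, so with the default 1 second base delay the waits grow roughly as 1s, 1s, 2s, 3s, 5s, 8s, and so on. A small sketch of the two policies side by side (IO and the literal values are used purely for illustration; the defaults come from reference.conf):

object RetryPolicySketch {
  import scala.concurrent.duration._
  import cats.effect.IO
  import retry.RetryPolicies

  // Transient errors: full-jitter backoff, capped at attempts - 1 retries (5 - 1 = 4 with the defaults).
  val transientErrors = RetryPolicies.fullJitter[IO](1.second).join(RetryPolicies.limitRetries[IO](4))

  // Waiting for the altered table to become visible: Fibonacci backoff, no retry cap.
  val alterTableWait = RetryPolicies.fibonacciBackoff[IO](1.second)
}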
@@ -15,11 +15,14 @@ import cats.effect.kernel.Unique
import fs2.{Chunk, Pipe, Stream}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry.RetryDetails
import retry.implicits._

import java.nio.charset.StandardCharsets

import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}
import com.snowplowanalytics.iglu.schemaddl.parquet.Field
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
@@ -196,8 +199,6 @@
val attempt = env.writer.opened
.use(_.write(batch.toBeInserted.map(_._2)))
.flatMap {
case Writer.WriteResult.WriterIsClosed =>
env.writer.closed.use_ *> writeUntilSuccessful(env, badProcessor, batch)
case Writer.WriteResult.Success =>
Sync[F].pure(batch.copy(toBeInserted = List.empty))
case Writer.WriteResult.SerializationFailures(failures) =>
@@ -227,22 +228,50 @@
* Alters the table to add any columns that were present in the Events but not currently in the
* table
*/
private def handleSchemaEvolution[F[_]: Sync](
private def handleSchemaEvolution[F[_]: Async](
env: Environment[F]
): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
_.evalTap { batch =>
env.writer.opened.use(_.descriptor).flatMap { descriptor =>
val fields = batch.entities.fields.flatMap { tte =>
tte.mergedField :: tte.recoveries.map(_._2)
}
if (BigQuerySchemaUtils.alterTableRequired(descriptor, fields)) {
env.tableManager.addColumns(fields) *> env.writer.closed.use_
} else {
val fields = batch.entities.fields.flatMap { tte =>
tte.mergedField :: tte.recoveries.map(_._2)
}
val attempt = alterTableRequired(env, fields).flatMap {
case true =>
env.tableManager.addColumns(fields) *>
waitForAlteredWriter(env, fields)
case false =>
Sync[F].unit
}
}

attempt
.onError { _ =>
env.appHealth.setServiceHealth(AppHealth.Service.BigQueryClient, isHealthy = false)
}
}

private def alterTableRequired[F[_]: Sync](env: Environment[F], fields: List[Field]): F[Boolean] =
env.writer.opened.use(_.descriptor).map { descriptor =>
BigQuerySchemaUtils.alterTableRequired(descriptor, fields)
}

/**
* Repeatedly close and open the Writer until it picks up the new columns we just added to the table
*/
private def waitForAlteredWriter[F[_]: Async](env: Environment[F], fields: List[Field]): F[Unit] = {
def logFailure(a: Boolean, details: RetryDetails): F[Unit] = {
val _ = a
val extractedDetail = BigQueryRetrying.extractRetryDetails(details)
Logger[F].warn(s"Newly added columns have not yet propagated to the BigQuery Writer. $extractedDetail")
}
(env.writer.closed.use_ *> alterTableRequired(env, fields))
.retryingOnFailures(
policy = env.alterTableWaitPolicy,
wasSuccessful = { required => Sync[F].pure(!required) },
onFailure = logFailure
)
.void
}

private def sendFailedEvents[F[_]: Sync](
env: Environment[F],
badRowProcessor: BadRowProcessor
@@ -78,11 +78,6 @@ object Writer {

case object Success extends WriteResult

/**
* The result of `write` when it raises an exception which closed the Writer
*/
case object WriterIsClosed extends WriteResult

/**
* The result of `write` when some events could not be serialized according to the underlying
* protobuf descriptor
@@ -99,13 +94,12 @@

def builder[F[_]: Async](
config: Config.BigQuery,
credentials: Credentials,
appHealth: AppHealth[F]
credentials: Credentials
): Resource[F, Builder[F]] =
for {
client <- createWriteClient(config, credentials)
} yield new Builder[F] {
def build: F[CloseableWriter[F]] = buildWriter[F](config, client).map(impl(_, appHealth))
def build: F[CloseableWriter[F]] = buildWriter[F](config, client).map(impl[F])
}

def provider[F[_]: Async](
@@ -131,10 +125,7 @@
Resource.makeFull(make)(_.close)
}

private def impl[F[_]: Async](
writer: JsonStreamWriter,
appHealth: AppHealth[F]
): CloseableWriter[F] =
private def impl[F[_]: Async](writer: JsonStreamWriter): CloseableWriter[F] =
new CloseableWriter[F] {

def descriptor: F[Descriptors.Descriptor] =
@@ -149,13 +140,6 @@
FutureInterop
.fromFuture(fut)
.as[WriteResult](WriteResult.Success)
.attemptTap { either =>
appHealth.setServiceHealth(AppHealth.Service.BigQueryClient, isHealthy = either.isRight)
}
.recoverWith {
case (e: Throwable) if writer.isClosed =>
Logger[F].warn(e)(s"Writer is closed, with an associated exception").as(WriteResult.WriterIsClosed)
}
case Left(appendSerializationError) =>
Applicative[F].pure {
WriteResult.SerializationFailures {
@@ -15,17 +15,23 @@ import com.google.protobuf.{DescriptorProtos, Descriptors}
* table
*/
object AtomicDescriptor {
val get: Descriptors.Descriptor = {

val eventId = DescriptorProtos.FieldDescriptorProto.newBuilder
.setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
.setName("event_id")
.setNumber(1)
def get: Descriptors.Descriptor = {
val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder
.addField(0, eventId.setNumber(1)) // For laziness, only adding one atomic field
fromDescriptorProtoBuilder(descriptorProto)
}

def withWebPage: Descriptors.Descriptor = {
val descriptorProto = DescriptorProtos.DescriptorProto.newBuilder
.setName("event")
.addField(0, eventId) // For laziness, only adding one atomic field
.addField(0, eventId.setNumber(1))
.addField(1, webPage.setNumber(2))
.addNestedType(webPageNestedType)
fromDescriptorProtoBuilder(descriptorProto)
}

private def fromDescriptorProtoBuilder(descriptorProto: DescriptorProtos.DescriptorProto.Builder): Descriptors.Descriptor = {
descriptorProto.setName("event")

val fdp = DescriptorProtos.FileDescriptorProto.newBuilder
.addMessageType(descriptorProto)
@@ -35,4 +41,26 @@
fd.findMessageTypeByName("event")
}

private def eventId: DescriptorProtos.FieldDescriptorProto.Builder =
DescriptorProtos.FieldDescriptorProto.newBuilder
.setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
.setName("event_id")

private def webPage: DescriptorProtos.FieldDescriptorProto.Builder = DescriptorProtos.FieldDescriptorProto.newBuilder
.setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)
.setTypeName("web_page_1")
.setName("unstruct_event_com_snowplowanalytics_snowplow_web_page_1")

private def webPageId: DescriptorProtos.FieldDescriptorProto.Builder =
DescriptorProtos.FieldDescriptorProto.newBuilder
.setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL)
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
.setName("id")

private def webPageNestedType: DescriptorProtos.DescriptorProto.Builder =
DescriptorProtos.DescriptorProto.newBuilder
.addField(0, webPageId.setNumber(1))
.setName("web_page_1")

}
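The fixture now exposes two descriptors, presumably so tests can model the Writer before and after it picks up a newly added column. A sketch of how they might be used (assuming AtomicDescriptor is in scope, as in the loader's test sources):

import com.google.protobuf.Descriptors

object AtomicDescriptorSketch {
  // The descriptor as the Writer sees the table before the alter: only event_id.
  val beforeAlter: Descriptors.Descriptor = AtomicDescriptor.get

  // The descriptor after the Writer has picked up the change: event_id plus the nested
  // unstruct_event_com_snowplowanalytics_snowplow_web_page_1 column.
  val afterAlter: Descriptors.Descriptor = AtomicDescriptor.withWebPage
}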