Skip to content

Commit

Permalink
[amendment] retrying moved into common-streams
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Aug 20, 2024
1 parent 645caf6 commit 178178e
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,54 +9,24 @@
package com.snowplowanalytics.snowplow.bigquery

import cats.implicits._
import cats.{Eq, Show}
import cats.Show
import cats.implicits.showInterpolator

import com.snowplowanalytics.snowplow.runtime.SetupExceptionMessages

sealed trait Alert
object Alert {

// Alert variants; each one carries the details that are reported together with the alert.
// NOTE(review): FailedToCreateEventsTable, FailedToAddColumns and FailedToOpenBigQueryWriter are each
// declared twice below (once carrying a Throwable, once carrying SetupExceptionMessages). This looks
// like two revisions interleaved — confirm which set should remain, the duplicates cannot coexist.
final case class FailedToCreateEventsTable(cause: Throwable) extends Alert
final case class FailedToAddColumns(columns: Vector[String], cause: Throwable) extends Alert
final case class FailedToOpenBigQueryWriter(cause: Throwable) extends Alert
final case class FailedToGetTable(reasons: SetupExceptionMessages) extends Alert
final case class FailedToCreateEventsTable(reasons: SetupExceptionMessages) extends Alert
final case class FailedToAddColumns(columns: Vector[String], cause: SetupExceptionMessages) extends Alert
final case class FailedToOpenBigQueryWriter(reasons: SetupExceptionMessages) extends Alert

// Renders every alert as a single human-readable line using the cats `show` interpolator.
// NOTE(review): FailedToCreateEventsTable is matched twice; the second case ("Failed to create table")
// sits behind the first one and can never be selected — confirm which message is intended.
implicit def showAlert: Show[Alert] = Show[Alert] {
case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause"
case FailedToGetTable(cause) => show"Failed to get table: $cause"
case FailedToCreateEventsTable(cause) => show"Failed to create table: $cause"
case FailedToAddColumns(columns, cause) => show"Failed to add columns: ${columns.mkString("[", ",", "]")}. Cause: $cause"
case FailedToOpenBigQueryWriter(cause) => show"Failed to open BigQuery writer: $cause"
}

/**
 * Equality for alerts: two alerts are equal when they are the same variant and their causes render
 * to the same text. The column list of `FailedToAddColumns` is not part of the comparison.
 */
implicit def eqAlert: Eq[Alert] = {
  // Causes are compared by their rendered message chain rather than by reference
  def sameCause(left: Throwable, right: Throwable): Boolean =
    throwableShow.show(left) === throwableShow.show(right)

  Eq.instance[Alert] { (first, second) =>
    (first, second) match {
      case (FailedToCreateEventsTable(a), FailedToCreateEventsTable(b)) => sameCause(a, b)
      case (FailedToAddColumns(_, a), FailedToAddColumns(_, b)) => sameCause(a, b)
      case (FailedToOpenBigQueryWriter(a), FailedToOpenBigQueryWriter(b)) => sameCause(a, b)
      case _ => false
    }
  }
}

/**
 * Renders a throwable as the messages of its cause chain joined with ": ", outermost first.
 * Throwables without a message are left out, and neighbouring messages are collapsed when one of
 * them already contains the other.
 */
private implicit def throwableShow: Show[Throwable] = {
  // Walks from the given throwable down through its causes, keeping every non-null message in order
  @scala.annotation.tailrec
  def messageChain(current: Throwable, collected: Vector[String]): List[String] = {
    val withCurrent = collected ++ Option(current.getMessage).toList
    Option(current.getCause) match {
      case Some(cause) => messageChain(cause, withCurrent)
      case None => withCurrent.toList
    }
  }

  // Keeps a running "current" message; a neighbour is absorbed when either one contains the other
  // (the containing message survives), otherwise the current message is emitted and the neighbour
  // becomes the new current one.
  def collapseRepeats(messages: List[String]): List[String] =
    messages match {
      case Nil => Nil
      case first :: remaining =>
        val (emitted, last) = remaining.foldLeft((Vector.empty[String], first)) { case ((done, current), next) =>
          if (current.contains(next)) (done, current)
          else if (next.contains(current)) (done, next)
          else (done :+ current, next)
        }
        (emitted :+ last).toList
    }

  Show.show { t =>
    collapseRepeats(messageChain(t, Vector.empty)).mkString(": ")
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.comcast.ip4s.Port
import scala.concurrent.duration.FiniteDuration
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

Expand Down Expand Up @@ -80,14 +80,12 @@ object Config {
webhook: Webhook.Config
)

// Retry settings, one case class per kind of retried situation. Each holds a delay;
// TransientErrorRetries additionally holds a number of attempts.
case class SetupErrorRetries(delay: FiniteDuration)
case class AlterTableWaitRetries(delay: FiniteDuration)
case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)
case class TooManyColumnsRetries(delay: FiniteDuration)

// Groups the retry settings for every kind of retried situation.
// NOTE(review): `setupErrors` and `transientErrors` are each declared twice (once with the local
// case classes, once with the Retrying.Config types) — two revisions appear interleaved; confirm
// which pair should remain.
case class Retries(
setupErrors: SetupErrorRetries,
transientErrors: TransientErrorRetries,
setupErrors: Retrying.Config.ForSetup,
transientErrors: Retrying.Config.ForTransient,
alterTableWait: AlterTableWaitRetries,
tooManyColumns: TooManyColumnsRetries
)
Expand All @@ -112,9 +110,7 @@ object Config {
implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries]
implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
implicit val tooManyColsRetries = deriveConfiguredDecoder[TooManyColumnsRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,90 +8,40 @@
*/
package com.snowplowanalytics.snowplow.bigquery.processing

import cats.Applicative
import cats.{Applicative, Eq}
import cats.effect.Sync
import cats.implicits._
import com.google.api.gax.rpc.PermissionDeniedException
import com.google.cloud.bigquery.BigQueryException
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import io.grpc.{Status => GrpcStatus, StatusRuntimeException}
import retry._
import retry.implicits.retrySyntaxError

import com.snowplowanalytics.snowplow.runtime.AppHealth
import com.snowplowanalytics.snowplow.runtime.{AppHealth, Retrying, SetupExceptionMessages}
import com.snowplowanalytics.snowplow.bigquery.{Alert, Config, RuntimeService}
import com.snowplowanalytics.snowplow.bigquery.processing.BigQueryUtils.BQExceptionSyntax

object BigQueryRetrying {

// Implicit logger for this object, obtained from Slf4jLogger for any effect type with a Sync instance.
private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

// Runs `action` with retries.
// The hand-rolled chain visible here first retries setup errors using the setup-error policy (logging and
// sending an alert on each failure), then retries any remaining error using the transient-error policy
// (logging and reporting the BigQuery client as unhealthy). After success, the BigQuery client is
// reported healthy again.
// NOTE(review): this span holds two revisions at once — `toAlert` is declared twice with different
// types, and a delegation to `Retrying.withRetries` follows the hand-rolled retry chain. Confirm which
// revision should remain.
def withRetries[F[_]: Sync: Sleep, A](
appHealth: AppHealth.Interface[F, Alert, RuntimeService],
config: Config.Retries,
toAlert: Throwable => Alert
toAlert: SetupExceptionMessages => Alert
)(
action: F[A]
): F[A] =
retryUntilSuccessful(appHealth, config, toAlert, action) <*
appHealth.becomeHealthyForRuntimeService(RuntimeService.BigQueryClient)

private def retryUntilSuccessful[F[_]: Sync: Sleep, A](
appHealth: AppHealth.Interface[F, Alert, RuntimeService],
config: Config.Retries,
toAlert: Throwable => Alert,
action: F[A]
): F[A] =
action
.retryingOnSomeErrors(
isWorthRetrying = isSetupError[F](_),
policy = policyForSetupErrors[F](config),
onError = logErrorAndSendAlert[F](appHealth, toAlert, _, _)
)
.retryingOnAllErrors(
policy = policyForTransientErrors[F](config),
onError = logErrorAndReportUnhealthy[F](appHealth, _, _)
)
Retrying.withRetries(appHealth, config.transientErrors, config.setupErrors, RuntimeService.BigQueryClient, toAlert, isSetupError)(
action
)

// Recognises exceptions that are treated as setup errors: a BigQueryException whose lower-cased reason
// is "notfound" or "accessdenied", and permission-denied errors.
// NOTE(review): two revisions are interleaved here — one signature returns F[Boolean], the other is a
// PartialFunction[Throwable, String] yielding the exception message, and their case bodies are mixed.
// Confirm which revision should remain.
private def isSetupError[F[_]: Sync](t: Throwable): F[Boolean] = t match {
def isSetupError: PartialFunction[Throwable, String] = {
case bqe: BigQueryException if Set("notfound", "accessdenied").contains(bqe.lowerCaseReason) =>
true.pure[F]
case _: PermissionDeniedException =>
true.pure[F]
case _ =>
false.pure[F]
bqe.getMessage
case sre: StatusRuntimeException if sre.getStatus.getCode === GrpcStatus.Code.PERMISSION_DENIED =>
sre.getMessage
}

/** Retry policy for setup errors: exponential backoff starting from the configured setup-error delay. */
private def policyForSetupErrors[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] = {
  val initialDelay = config.setupErrors.delay
  RetryPolicies.exponentialBackoff[F](initialDelay)
}

/** Retry policy for transient errors: full-jitter backoff, joined with a cap on the number of retries. */
private def policyForTransientErrors[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] = {
  val settings = config.transientErrors
  val jittered = RetryPolicies.fullJitter[F](settings.delay)
  // One fewer retry than `attempts` — presumably because `attempts` includes the initial try
  val capped = RetryPolicies.limitRetries[F](settings.attempts - 1)
  jittered.join(capped)
}

/** Retry policy used while waiting for an altered table: fibonacci backoff from the configured delay. */
def policyForAlterTableWait[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] = {
  val initialDelay = config.alterTableWait.delay
  RetryPolicies.fibonacciBackoff[F](initialDelay)
}

/**
 * Logs a failed BigQuery command together with its retry details, then reports the app as unhealthy
 * for setup using the alert built from the error.
 */
private def logErrorAndSendAlert[F[_]: Sync](
  appHealth: AppHealth.Interface[F, Alert, RuntimeService],
  toAlert: Throwable => Alert,
  error: Throwable,
  details: RetryDetails
): F[Unit] = {
  val logFailure = Logger[F].error(error)(s"Executing BigQuery command failed. ${extractRetryDetails(details)}")
  val raiseAlert = appHealth.becomeUnhealthyForSetup(toAlert(error))
  logFailure *> raiseAlert
}
// Eq instance based on universal equality, so gRPC status codes can be compared with `===`.
private implicit def statusCodeEq: Eq[GrpcStatus.Code] = Eq.fromUniversalEquals

/**
 * Logs a failed BigQuery command together with its retry details, then reports the BigQuery client
 * runtime service as unhealthy.
 */
private def logErrorAndReportUnhealthy[F[_]: Sync](
  appHealth: AppHealth.Interface[F, ?, RuntimeService],
  error: Throwable,
  details: RetryDetails
): F[Unit] = {
  val logFailure = Logger[F].error(error)(s"Executing BigQuery command failed. ${extractRetryDetails(details)}")
  val markUnhealthy = appHealth.becomeUnhealthyForRuntimeService(RuntimeService.BigQueryClient)
  logFailure *> markUnhealthy
}

/**
 * Describes the state of a retry loop for log messages: either that retrying has been abandoned
 * (with totals so far) or when the next retry will happen (with progress so far). Durations are
 * reported in whole seconds.
 */
def extractRetryDetails(details: RetryDetails): String =
  details match {
    case givingUp: RetryDetails.GivingUp =>
      s"Giving up on retrying, total retries: ${givingUp.totalRetries}, total delay: ${givingUp.totalDelay.toSeconds} seconds"
    case willRetry: RetryDetails.WillDelayAndRetry =>
      s"Will retry in ${willRetry.nextDelay.toSeconds} seconds, retries so far: ${willRetry.retriesSoFar}, total delay so far: ${willRetry.cumulativeDelay.toSeconds} seconds"
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProce
import com.snowplowanalytics.snowplow.sinks.ListOfList
import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
import com.snowplowanalytics.snowplow.runtime.Retrying.showRetryDetails
import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, NonAtomicFields, SchemaSubVersion, TabledEntity, Transform}
import com.snowplowanalytics.snowplow.bigquery.{Environment, Metrics, RuntimeService}

Expand All @@ -41,7 +42,7 @@ object Processing {
// Builds the application stream: table setup first, then acquiring and releasing the writer once
// (`opened.use_`), and finally the event source driven by `eventProcessor`, with windowing disabled.
// NOTE(review): both `createTable` and `createTableIfNotExists` are chained below — two revisions
// appear interleaved; confirm that only one table-setup step is intended.
def stream[F[_]: Async](env: Environment[F]): Stream[F, Nothing] = {
implicit val lookup: RegistryLookup[F] = Http4sRegistryLookup(env.httpClient)
val eventProcessingConfig = EventProcessingConfig(EventProcessingConfig.NoWindowing)
Stream.eval(env.tableManager.createTable) *>
Stream.eval(env.tableManager.createTableIfNotExists) *>
Stream.eval(env.writer.opened.use_) *>
env.source.stream(eventProcessingConfig, eventProcessor(env))
}
Expand Down Expand Up @@ -201,7 +202,7 @@ object Processing {
_.parEvalMap(env.batching.writeBatchConcurrency) { batch =>
writeUntilSuccessful(env, badProcessor, batch)
.onError { _ =>
env.appHealth.becomeUnhealthyForRuntimeService(RuntimeService.BigQueryClient)
env.appHealth.beUnhealthyForRuntimeService(RuntimeService.BigQueryClient)
}
}

Expand All @@ -218,8 +219,7 @@ object Processing {

def handlingServerSideSchemaMismatches(env: Environment[F]): F[Writer.WriteResult] = {
def onFailure(wr: Writer.WriteResult, details: RetryDetails): F[Unit] = {
val extractedDetail = BigQueryRetrying.extractRetryDetails(details)
val msg = s"Newly added columns have not yet propagated to the BigQuery Writer server-side. $extractedDetail"
val msg = show"Newly added columns have not yet propagated to the BigQuery Writer server-side. $details"
val log = wr match {
case Writer.WriteResult.ServerSideSchemaMismatch(e) if details.retriesSoFar > errorsAllowedWithShortLogging =>
Logger[F].warn(e)(msg)
Expand Down Expand Up @@ -318,7 +318,7 @@ object Processing {
}
}
.onError { _ =>
env.appHealth.becomeUnhealthyForRuntimeService(RuntimeService.BigQueryClient)
env.appHealth.beUnhealthyForRuntimeService(RuntimeService.BigQueryClient)
}
}

Expand All @@ -331,8 +331,7 @@ object Processing {
(!BigQuerySchemaUtils.fieldsMissingFromDescriptor(descriptor, fieldsToExist)).pure[F]
},
onFailure = { case (_, details) =>
val extractedDetail = BigQueryRetrying.extractRetryDetails(details)
val msg = s"Newly added columns have not yet propagated to the BigQuery Writer client-side. $extractedDetail"
val msg = show"Newly added columns have not yet propagated to the BigQuery Writer client-side. $details"
Logger[F].warn(msg) *> env.writer.closed.use_
}
)
Expand All @@ -349,7 +348,7 @@ object Processing {
env.badSink
.sinkSimple(serialized)
.onError { _ =>
env.appHealth.becomeUnhealthyForRuntimeService(RuntimeService.BadSink)
env.appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)
}
} else Applicative[F].unit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import com.google.auth.Credentials
import com.google.cloud.bigquery.BigQueryException

import com.snowplowanalytics.iglu.schemaddl.parquet.Field
import com.snowplowanalytics.snowplow.runtime.AppHealth
import com.snowplowanalytics.snowplow.runtime.{AppHealth, SetupExceptionMessages}
import com.snowplowanalytics.snowplow.loaders.transform.AtomicFields
import com.snowplowanalytics.snowplow.bigquery.{Alert, Config, RuntimeService}
import com.snowplowanalytics.snowplow.bigquery.processing.BigQueryUtils.BQExceptionSyntax
Expand All @@ -48,6 +48,8 @@ trait TableManager[F[_]] {
*/
def addColumns(columns: Vector[Field]): F[FieldList]

def tableExists: F[Boolean]

def createTable: F[Unit]

}
Expand All @@ -56,7 +58,10 @@ object TableManager {

// Implicit logger for this object, obtained from Slf4jLogger for any effect type with a Sync instance.
private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

// Variant of the table manager exposing `addColumns` and `createTableIfNotExists`.
// NOTE(review): the trait is declared twice here (once extending TableManager, once standalone with
// its own members) — two revisions appear interleaved; confirm which declaration should remain.
trait WithHandledErrors[F[_]] extends TableManager[F]
trait WithHandledErrors[F[_]] {
def addColumns(columns: Vector[Field]): F[FieldList]
def createTableIfNotExists: F[Unit]
}

def make[F[_]: Async](
config: Config.BigQuery,
Expand All @@ -79,27 +84,37 @@ object TableManager {
case true =>
BigQueryRetrying
.withRetries(appHealth, retries, Alert.FailedToAddColumns(columns.map(_.name), _)) {
Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]") *>
underlying
.addColumns(columns)
.recoverWith(handleTooManyColumns(retries, appHealth, addingColumnsEnabled, columns))
.onError(logOnRaceCondition)
underlying
.addColumns(columns)
.recoverWith(handleTooManyColumns(retries, appHealth, addingColumnsEnabled, columns))
.onError(logOnRaceCondition)
}
case false =>
FieldList.of().pure[F]
}

/**
 * Creates the events table with retries, alerting with `FailedToCreateEventsTable` on setup errors.
 * "duplicate" and "accessdenied" BigQuery errors are logged and otherwise ignored, on the
 * assumption that the table already exists.
 */
def createTable: F[Unit] = {
  // Errors after which table creation is considered to have succeeded
  val ignorable: PartialFunction[Throwable, F[Unit]] = {
    case bqe: BigQueryException if bqe.lowerCaseReason === "duplicate" =>
      // Table already exists
      Logger[F].info(s"Ignoring error when creating table: ${bqe.getMessage}")
    case bqe: BigQueryException if bqe.lowerCaseReason === "accessdenied" =>
      Logger[F].info(s"Access denied when trying to create table. Will ignore error and assume table already exists.")
  }

  BigQueryRetrying.withRetries(appHealth, retries, Alert.FailedToCreateEventsTable(_)) {
    underlying.createTable.recoverWith(ignorable)
  }
}
/**
 * Checks whether the table exists and creates it only when the check reports `false`.
 *
 * The check is retried with `FailedToGetTable` alerts; an "accessdenied" error during the check is
 * logged and treated as "does not exist", so creation is attempted. The creation is retried with
 * `FailedToCreateEventsTable` alerts, and a "duplicate" error during creation is logged and ignored.
 */
def createTableIfNotExists: F[Unit] = {
  def checkExists: F[Boolean] =
    BigQueryRetrying.withRetries(appHealth, retries, Alert.FailedToGetTable(_)) {
      underlying.tableExists.recoverWith {
        case bqe: BigQueryException if bqe.lowerCaseReason === "accessdenied" =>
          Logger[F].info(bqe)("Failed to get details of existing table. Will fallback to creating the table...").as(false)
      }
    }

  def createWithRetries: F[Unit] =
    BigQueryRetrying.withRetries(appHealth, retries, Alert.FailedToCreateEventsTable(_)) {
      underlying.createTable.recoverWith {
        case bqe: BigQueryException if bqe.lowerCaseReason === "duplicate" =>
          // Table already exists
          Logger[F].info(s"Ignoring error when creating table: ${bqe.getMessage}")
      }
    }

  checkExists.flatMap { exists =>
    if (exists) Sync[F].unit else createWithRetries
  }
}
}

private def impl[F[_]: Async](
Expand All @@ -109,21 +124,31 @@ object TableManager {

// Adds the given columns to the configured table and yields the merged field list.
// Steps: fetch the table, merge `columns` into its current schema fields, build a new schema from the
// merged fields, apply it to the table object, then push the update through the BigQuery client.
// Fetching and updating are blocking client calls, run via `Sync[F].blocking`.
def addColumns(columns: Vector[Field]): F[FieldList] =
for {
_ <- Logger[F].info(s"Attempting to fetch details of table ${config.dataset}.${config.table}...")
// NOTE(review): the BigQuery client documents `getTable` as returning null when the table is not
// found; if that can happen here the next steps would fail on a null `table` — confirm.
table <- Sync[F].blocking(client.getTable(config.dataset, config.table))
_ <- Logger[F].info("Successfully fetched details of table")
schema <- Sync[F].pure(table.getDefinition[TableDefinition].getSchema)
// `fields` and `schema` are deliberately re-bound below: existing fields first, then the merged result
fields <- Sync[F].pure(schema.getFields)
fields <- Sync[F].pure(BigQuerySchemaUtils.mergeInColumns(fields, columns))
schema <- Sync[F].pure(Schema.of(fields))
table <- Sync[F].pure(setTableSchema(table, schema))
_ <- Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]...")
_ <- Sync[F].blocking(table.update())
_ <- Logger[F].info("Successfully altered table schema")
} yield fields

/**
 * Checks whether the configured table already exists in BigQuery.
 *
 * The BigQuery client's `getTable` returns `null` rather than throwing when the table is not found,
 * so the result is wrapped in `Option` and a missing table is reported as `false`. Previously the
 * result was ignored and the method always yielded `true`, which meant a missing table was never
 * created.
 *
 * @return `true` when the table was found, `false` when the client reported no such table. Any
 *         exception raised by the client (e.g. access denied) is left for the caller to handle.
 */
def tableExists: F[Boolean] =
  for {
    _ <- Logger[F].info(s"Attempting to fetch details of table ${config.dataset}.${config.table}...")
    // Option(...) turns the client's null "not found" result into None
    maybeTable <- Sync[F].blocking(Option(client.getTable(config.dataset, config.table)))
    exists <- maybeTable match {
                case Some(_) => Logger[F].info("Successfully fetched details of table").as(true)
                case None => Logger[F].info("Tried to fetch details of table but it does not exist").as(false)
              }
  } yield exists

/**
 * Creates the table described by `atomicTableInfo(config)`, logging before and after the blocking
 * call to the BigQuery client.
 *
 * The body used to consist of two separate expressions, so the first one (the "Creating table" log
 * chained to a create call) was a discarded effect value and never ran. All steps are now chained
 * into a single effect, with exactly one call to `client.create`.
 */
def createTable: F[Unit] = {
  val tableInfo = atomicTableInfo(config)
  Logger[F].info(show"Creating table $tableInfo") *>
    Sync[F].blocking(client.create(tableInfo)) *>
    Logger[F].info(show"Successfully created table $tableInfo")
}
}

Expand All @@ -148,9 +173,9 @@ object TableManager {
val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true)
for {
_ <- Logger[F].error(bqe)(s"Could not alter table schema because of too many columns")
_ <- appHealth.becomeUnhealthyForSetup(
Alert.FailedToAddColumns(columns.map(_.name), bqe)
) // TODO: app will immediately become healthy again and send a heartbeat
_ <- appHealth.beUnhealthyForSetup(
Alert.FailedToAddColumns(columns.map(_.name), SetupExceptionMessages(List(bqe.getMessage)))
)
_ <- addingColumnsEnabled.set(false)
_ <- enableAfterDelay.start
} yield FieldList.of()
Expand Down
Loading

0 comments on commit 178178e

Please sign in to comment.