diff --git a/modules/common/src/main/resources/application.conf b/modules/common/src/main/resources/application.conf index ed7903df..498df2a4 100644 --- a/modules/common/src/main/resources/application.conf +++ b/modules/common/src/main/resources/application.conf @@ -83,4 +83,8 @@ "monitoring": { } + + "gcpUserAgent": { + "productName": "Snowplow OSS" + } } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/config/AllAppsConfig.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/config/AllAppsConfig.scala index 2db7ea41..1954007c 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/config/AllAppsConfig.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/config/AllAppsConfig.scala @@ -12,8 +12,8 @@ */ package com.snowplowanalytics.snowplow.storage.bigquery.common.config +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent import com.snowplowanalytics.snowplow.storage.bigquery.common.config.model._ - import io.circe.{Decoder, Encoder} import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} @@ -22,11 +22,17 @@ final case class AllAppsConfig( loader: Config.Loader, mutator: Config.Mutator, repeater: Config.Repeater, - monitoring: Monitoring + monitoring: Monitoring, + gcpUserAgent: GcpUserAgent ) object AllAppsConfig { + final case class GcpUserAgent(productName: String) + + implicit val gcpUserAgentEncoder: Encoder[GcpUserAgent] = deriveEncoder[GcpUserAgent] + implicit val gcpUserAgentDecoder: Decoder[GcpUserAgent] = deriveDecoder[GcpUserAgent] + implicit val allAppsConfigDecoder: Decoder[AllAppsConfig] = deriveDecoder[AllAppsConfig] implicit val allAppsConfigEncoder: Encoder[AllAppsConfig] = deriveEncoder[AllAppsConfig] diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/config/Environment.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/config/Environment.scala index eeeaab90..a817386c 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/config/Environment.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/config/Environment.scala @@ -12,11 +12,17 @@ */ package com.snowplowanalytics.snowplow.storage.bigquery.common.config +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent import io.circe.Json - import com.snowplowanalytics.snowplow.storage.bigquery.common.config.model.{Config, Monitoring} -final case class Environment[A](config: A, resolverJson: Json, projectId: String, monitoring: Monitoring) { +final case class Environment[A]( + config: A, + resolverJson: Json, + projectId: String, + monitoring: Monitoring, + gcpUserAgent: GcpUserAgent +) { def getFullSubName(sub: String): String = s"projects/$projectId/subscriptions/$sub" def getFullTopicName(topic: String): String = s"projects/$projectId/topics/$topic" } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/package.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/package.scala index 8224fa68..59f893a3 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/package.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/package.scala @@ -1,11 +1,17 @@ package com.snowplowanalytics.snowplow.storage.bigquery +import com.google.api.gax.rpc.FixedHeaderProvider import com.snowplowanalytics.iglu.client.resolver.StorageTime import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.iglu.schemaddl.bigquery.Field import com.snowplowanalytics.lrumap.LruMap +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent package object common { - type FieldKey = (SchemaKey, StorageTime) + type FieldKey = (SchemaKey, StorageTime) type FieldCache[F[_]] = LruMap[F, FieldKey, Field] + + def createGcpUserAgentHeader(gcpUserAgent: GcpUserAgent): FixedHeaderProvider = + FixedHeaderProvider.create("user-agent", s"${gcpUserAgent.productName}/bigquery-loader (GPN:Snowplow;)") + } diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/GcpUserAgentSpec.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/GcpUserAgentSpec.scala new file mode 100644 index 00000000..0aff9aec --- /dev/null +++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/GcpUserAgentSpec.scala @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.storage.bigquery.common + +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent +import org.specs2.mutable.Specification + +import java.util.regex.Pattern + +class GcpUserAgentSpec extends Specification { + + "createUserAgent" should { + "create user agent string correctly" in { + val gcpUserAgent = GcpUserAgent(productName = "Snowplow OSS") + val resultUserAgent = createGcpUserAgentHeader(gcpUserAgent).getHeaders.get("user-agent") + val expectedUserAgent = s"Snowplow OSS/bigquery-loader (GPN:Snowplow;)" + + val userAgentRegex = Pattern.compile( + """(?iU)(?:[^\(\)\/]+\/[^\/]+\s+)*(?:[^\s][^\(\)\/]+\/[^\/]+\s?\([^\(\)]*)gpn:(.*)[;\)]""" + ) + val matcher = userAgentRegex.matcher(resultUserAgent) + val matched = if (matcher.find()) Some(matcher.group(1)) else None + val expectedMatched = "Snowplow;" + + resultUserAgent must beEqualTo(expectedUserAgent) + matched must beSome(expectedMatched) + } + } + +} diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/SpecHelpers.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/SpecHelpers.scala index ffa1ce33..c0539e72 100644 --- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/SpecHelpers.scala +++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/common/SpecHelpers.scala @@ -24,6 +24,7 @@ import com.snowplowanalytics.lrumap.CreateLruMap import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent} import com.snowplowanalytics.snowplow.badrows.Processor +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent import com.snowplowanalytics.snowplow.storage.bigquery.common.config.Environment import com.snowplowanalytics.snowplow.storage.bigquery.common.config.Environment.{ LoaderEnvironment, @@ -658,6 +659,7 @@ object SpecHelpers { private val terminationTimeout = FiniteDuration(60, SECONDS) + private val gcpUserAgent: GcpUserAgent = GcpUserAgent("Snowplow OSS") private val loader: Config.Loader = Config.Loader(lInput, lOutput, consumerSettings, sinkSettings, retrySettings, terminationTimeout) @@ -684,11 +686,12 @@ object SpecHelpers { private val sentry: SentryConfig = SentryConfig(URI.create("http://sentry.acme.com")) private val monitoring: Monitoring = Monitoring(Some(statsd), Some(stdout), Some(sentry)) - private[bigquery] val loaderEnv: LoaderEnvironment = Environment(loader, validResolverJson, projectId, monitoring) + private[bigquery] val loaderEnv: LoaderEnvironment = + Environment(loader, validResolverJson, projectId, monitoring, gcpUserAgent) private[bigquery] val mutatorEnv: MutatorEnvironment = - Environment(mutator, validResolverJson, projectId, monitoring) + Environment(mutator, validResolverJson, projectId, monitoring, gcpUserAgent) private[bigquery] val repeaterEnv: RepeaterEnvironment = - Environment(repeater, validResolverJson, projectId, monitoring) + Environment(repeater, validResolverJson, projectId, monitoring, gcpUserAgent) } object cache { diff --git a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/Main.scala b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/Main.scala index 82f072f6..a601443a 100644 --- a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/Main.scala +++ b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/Main.scala @@ -52,7 +52,7 @@ object Main extends IOApp { case Right(c: MutatorCommand.Create) => for { - client <- TableReference.BigQueryTable.getClient(c.env.projectId) + client <- TableReference.BigQueryTable.getClient(c.env.projectId, c.env.gcpUserAgent) _ <- TableReference.BigQueryTable.create(c, client) } yield ExitCode.Success diff --git a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/Mutator.scala b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/Mutator.scala index 782c2f9b..3e77831a 100644 --- a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/Mutator.scala +++ b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/Mutator.scala @@ -69,7 +69,7 @@ object Mutator { verbose: Boolean )(implicit c: Concurrent[IO], logger: Logger[IO]): EitherT[IO, String, Pipe[IO, List[ShreddedType], Unit]] = for { - bqClient <- EitherT.liftF(TableReference.BigQueryTable.getClient(env.projectId)) + bqClient <- EitherT.liftF(TableReference.BigQueryTable.getClient(env.projectId, env.gcpUserAgent)) table = new TableReference.BigQueryTable( bqClient, env.config.output.good.datasetId, diff --git a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/MutatorCli.scala b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/MutatorCli.scala index 79989f67..11ad7bce 100644 --- a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/MutatorCli.scala +++ b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/MutatorCli.scala @@ -30,7 +30,7 @@ object MutatorCli { private val options: Opts[MutatorEnvironment] = CliConfig.options.map { case CliConfig.Parsed(config, resolver) => - Environment(config.mutator, resolver, config.projectId, config.monitoring) + Environment(config.mutator, resolver, config.projectId, config.monitoring, config.gcpUserAgent) } private val schema: Opts[SchemaKey] = Opts.option[String]("schema", "Iglu URI to add to the table").mapValidated { diff --git a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/TableReference.scala b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/TableReference.scala index c150d985..2f37f78b 100644 --- a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/TableReference.scala +++ b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/TableReference.scala @@ -12,11 +12,11 @@ */ package com.snowplowanalytics.snowplow.storage.bigquery.mutator -import com.snowplowanalytics.snowplow.storage.bigquery.common.{Adapter, LoaderRow} +import com.snowplowanalytics.snowplow.storage.bigquery.common.{Adapter, LoaderRow, createGcpUserAgentHeader} import com.snowplowanalytics.snowplow.storage.bigquery.mutator.MutatorCli.MutatorCommand - import cats.effect.IO import com.google.cloud.bigquery.{Schema => BqSchema, _} +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent import scala.jdk.CollectionConverters._ @@ -56,9 +56,14 @@ object TableReference { } object BigQueryTable { - def getClient(projectId: String): IO[BigQuery] = + def getClient(projectId: String, gcpUserAgent: GcpUserAgent): IO[BigQuery] = IO( - BigQueryOptions.newBuilder.setProjectId(projectId).build.getService + BigQueryOptions + .newBuilder + .setProjectId(projectId) + .setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)) + .build + .getService ) def create(args: MutatorCommand.Create, client: BigQuery): IO[Table] = IO { diff --git a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/TypeReceiver.scala b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/TypeReceiver.scala index cd352656..0f704e70 100644 --- a/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/TypeReceiver.scala +++ b/modules/mutator/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/mutator/TypeReceiver.scala @@ -23,9 +23,9 @@ import cats.effect.std.Queue import cats.effect.unsafe.IORuntime import cats.syntax.either._ import cats.syntax.show._ -import com.google.api.gax.rpc.FixedHeaderProvider import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber} import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage} +import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader import io.circe.{Decoder, DecodingFailure, Error, Json} import io.circe.jawn.parse @@ -83,8 +83,6 @@ class TypeReceiver(queue: Queue[IO, List[ShreddedType]], verbose: Boolean)(impli } object TypeReceiver { - private val UserAgent = - FixedHeaderProvider.create("User-Agent", generated.BuildInfo.userAgent) /** Decode inventory items either in legacy (non-self-describing) format or as `shredded_types` schema'ed */ def decodeItems(json: Json): Decoder.Result[List[ShreddedType]] = @@ -113,7 +111,10 @@ object TypeReceiver { def startSubscription(env: MutatorEnvironment, listener: TypeReceiver): IO[Unit] = IO { val subscription = ProjectSubscriptionName.of(env.projectId, env.config.input.subscription) - val subscriber = Subscriber.newBuilder(subscription, listener).setHeaderProvider(UserAgent).build() + val subscriber = Subscriber + .newBuilder(subscription, listener) + .setHeaderProvider(createGcpUserAgentHeader(env.gcpUserAgent)) + .build() subscriber.startAsync().awaitRunning(10L, TimeUnit.SECONDS) } } diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala index 7814a424..83a6a4b2 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala @@ -33,7 +33,8 @@ object Repeater extends IOApp { .getEvents( resources.env.projectId, resources.env.config.input.subscription, - resources.uninsertable + resources.uninsertable, + resources.env.gcpUserAgent ) .interruptWhen(resources.stop) .through[IO, Unit](Flow.sink(resources)) diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/RepeaterCli.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/RepeaterCli.scala index 52cd510b..8a596c13 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/RepeaterCli.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/RepeaterCli.scala @@ -31,7 +31,7 @@ object RepeaterCli { private val options: Opts[RepeaterEnvironment] = CliConfig.options.map { case CliConfig.Parsed(config, resolver) => - Environment(config.repeater, resolver, config.projectId, config.monitoring) + Environment(config.repeater, resolver, config.projectId, config.monitoring, config.gcpUserAgent) } private val bufferSize = Opts diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala index 4ec75eae..5082fb7b 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala @@ -69,7 +69,7 @@ object Resources { // It's a function because blocker needs to be created as Resource val initResources: F[(Sentry[F]) => F[Resources[F]]] = for { env <- Sync[F].delay(cmd.env) - bigQuery <- services.Database.getClient[F](cmd.env.projectId) + bigQuery <- services.Database.getClient[F](cmd.env.projectId, cmd.env.gcpUserAgent) bucket <- Sync[F].fromEither( validateBucket(env.config.output.deadLetters.bucket) .toEither diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/Database.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/Database.scala index 1c4669f3..4848b7f4 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/Database.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/Database.scala @@ -14,10 +14,11 @@ package com.snowplowanalytics.snowplow.storage.bigquery.repeater.services import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload} import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater} - import cats.effect.Sync import cats.syntax.all._ import com.google.cloud.bigquery.{Option => _, _} +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent +import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader import org.typelevel.log4cats.Logger import io.circe.syntax._ @@ -75,9 +76,14 @@ object Database { } } - def getClient[F[_]: Sync](projectId: String): F[BigQuery] = + def getClient[F[_]: Sync](projectId: String, gcpUserAgent: GcpUserAgent): F[BigQuery] = Sync[F].delay( - BigQueryOptions.newBuilder.setProjectId(projectId).build.getService + BigQueryOptions + .newBuilder + .setProjectId(projectId) + .setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)) + .build + .getService ) /** The first argument passed to addRow is an ID used to deduplicate inserts. diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala index 977e65fd..f5a5f124 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala @@ -14,13 +14,14 @@ package com.snowplowanalytics.snowplow.storage.bigquery.repeater.services import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload} import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater} - import cats.effect._ import cats.effect.std.Queue import cats.syntax.all._ import com.google.pubsub.v1.PubsubMessage import com.permutive.pubsub.consumer.{ConsumerRecord, Model} import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig} +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent +import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader import fs2.Stream import org.typelevel.log4cats.Logger @@ -31,13 +32,17 @@ object PubSub { def getEvents[F[_]: Sync: Logger]( projectId: String, subscription: String, - uninsertable: Queue[F, BadRow] + uninsertable: Queue[F, BadRow], + gcpUserAgent: GcpUserAgent ): Stream[F, ConsumerRecord[F, EventContainer]] = PubsubGoogleConsumer.subscribe[F, EventContainer]( Model.ProjectId(projectId), Model.Subscription(subscription), (msg, err, ack, _) => callback[F](msg, err, ack, uninsertable), - PubsubGoogleConsumerConfig[F](onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t")) + PubsubGoogleConsumerConfig[F]( + onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"), + customizeSubscriber = Some(_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))) + ) ) private def callback[F[_]: Sync](msg: PubsubMessage, err: Throwable, ack: F[Unit], uninsertable: Queue[F, BadRow]) = { diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Bigquery.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Bigquery.scala index 667f4c1b..b73ff531 100644 --- a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Bigquery.scala +++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Bigquery.scala @@ -12,10 +12,9 @@ */ package com.snowplowanalytics.snowplow.storage.bigquery.streamloader -import com.snowplowanalytics.snowplow.storage.bigquery.common.LoaderRow +import com.snowplowanalytics.snowplow.storage.bigquery.common.{LoaderRow, createGcpUserAgentHeader} import com.snowplowanalytics.snowplow.storage.bigquery.common.config.model.{BigQueryRetrySettings, Output} import com.snowplowanalytics.snowplow.storage.bigquery.common.metrics.Metrics - import cats.Parallel import cats.effect.{Async, Sync} import cats.implicits._ @@ -24,6 +23,7 @@ import com.google.api.client.json.gson.GsonFactory import com.google.api.gax.retrying.RetrySettings import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, InsertAllRequest, InsertAllResponse, TableId} import com.google.cloud.bigquery.InsertAllRequest.RowToInsert +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent import org.threeten.bp.Duration import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -99,7 +99,7 @@ object Bigquery { } } - def getClient[F[_]: Sync](rs: BigQueryRetrySettings, projectId: String): F[BigQuery] = { + def getClient[F[_]: Sync](rs: BigQueryRetrySettings, projectId: String, gcpUserAgent: GcpUserAgent): F[BigQuery] = { val retrySettings = RetrySettings .newBuilder() @@ -110,7 +110,13 @@ object Bigquery { .build Sync[F].delay( - BigQueryOptions.newBuilder.setRetrySettings(retrySettings).setProjectId(projectId).build.getService + BigQueryOptions + .newBuilder + .setRetrySettings(retrySettings) + .setProjectId(projectId) + .setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)) + .build + .getService ) } diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Producer.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Producer.scala index 292546ac..4f98c14b 100644 --- a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Producer.scala +++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Producer.scala @@ -14,17 +14,14 @@ package com.snowplowanalytics.snowplow.storage.bigquery.streamloader import java.time.Instant import java.nio.charset.StandardCharsets - import cats.implicits._ - import cats.effect.{Async, Resource} - import com.permutive.pubsub.producer.{Model, PubsubProducer} import com.permutive.pubsub.producer.encoder.MessageEncoder import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig} - import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload => BadRowPayload} - +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent +import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader import org.typelevel.log4cats.Logger import scala.concurrent.duration._ @@ -42,9 +39,10 @@ object Producer { topic: String, batchSize: Long, delay: FiniteDuration, - oversizeBadRowProducer: Producer[F, BadRow.SizeViolation] + oversizeBadRowProducer: Producer[F, BadRow.SizeViolation], + gcpUserAgent: GcpUserAgent ): Resource[F, Producer[F, A]] = - mkPubsubProducer[F, A](projectId, topic, batchSize, delay).map { p => + mkPubsubProducer[F, A](projectId, topic, batchSize, delay, gcpUserAgent).map { p => (data: A) => produceWrtToSize[F, A](data, p.produce(_).void, oversizeBadRowProducer) } @@ -52,9 +50,10 @@ object Producer { projectId: String, topic: String, batchSize: Long, - delay: FiniteDuration + delay: FiniteDuration, + gcpUserAgent: GcpUserAgent ): Resource[F, (Producer[F, BadRow], Producer[F, BadRow.SizeViolation])] = - mkPubsubProducer[F, BadRow](projectId, topic, batchSize, delay).map { p => + mkPubsubProducer[F, BadRow](projectId, topic, batchSize, delay, gcpUserAgent).map { p => val badRowProducer = new Producer[F, BadRow] { override def produce(badRow: BadRow): F[Unit] = { produceWrtToSize[F, BadRow](badRow, p.produce(_).void, p.produce(_).void) @@ -72,7 +71,8 @@ object Producer { projectId: String, topic: String, batchSize: Long, - delay: FiniteDuration + delay: FiniteDuration, + gcpUserAgent: GcpUserAgent ): Resource[F, PubsubProducer[F, A]] = GooglePubsubProducer.of[F, A]( Model.ProjectId(projectId), @@ -80,7 +80,8 @@ object Producer { config = PubsubProducerConfig[F]( batchSize = batchSize, delayThreshold = delay, - onFailedTerminate = e => Logger[F].error(e)(s"Error in PubSub producer") + onFailedTerminate = e => Logger[F].error(e)(s"Error in PubSub producer"), + customizePublisher = Some(_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))) ) ) diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Resources.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Resources.scala index 60569dd4..096192ba 100644 --- a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Resources.scala +++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Resources.scala @@ -19,18 +19,12 @@ import com.snowplowanalytics.lrumap.CreateLruMap import com.snowplowanalytics.snowplow.analytics.scalasdk.Data.ShreddedType import com.snowplowanalytics.snowplow.badrows.BadRow import com.snowplowanalytics.snowplow.storage.bigquery.common.config.Environment.LoaderEnvironment -import com.snowplowanalytics.snowplow.storage.bigquery.common.config.model.{ - BigQueryRetrySettings, - Monitoring, - Output, - SinkSettings -} +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.model.{BigQueryRetrySettings, Monitoring, Output, SinkSettings} import com.snowplowanalytics.snowplow.storage.bigquery.streamloader.StreamLoader.{StreamBadRow, StreamLoaderRow} import com.snowplowanalytics.snowplow.storage.bigquery.common.metrics.Metrics import com.snowplowanalytics.snowplow.storage.bigquery.common.metrics.Metrics.ReportingApp import com.snowplowanalytics.snowplow.storage.bigquery.common.{FieldCache, FieldKey, Sentry} import com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery.FailedInsert - import cats.Parallel import cats.effect.{Async, Resource, Sync} import cats.implicits._ @@ -39,9 +33,9 @@ import com.google.cloud.bigquery.BigQuery import com.snowplowanalytics.iglu.client.resolver.Resolver import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig import com.snowplowanalytics.iglu.schemaddl.bigquery.Field +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent import fs2.{Pipe, Stream} import io.circe.Json - import org.typelevel.log4cats.Logger import org.http4s.ember.client.EmberClientBuilder @@ -97,26 +91,29 @@ object Resources { resolverConfig <- mkResolverConfig(env.resolverJson) resolver <- mkResolver(resolverConfig) metrics <- Resource.eval(mkMetricsReporter[F](env.monitoring)) - bigquery <- Resource.eval[F, BigQuery](Bigquery.getClient(env.config.retrySettings, env.projectId)) + bigquery <- Resource.eval[F, BigQuery](Bigquery.getClient(env.config.retrySettings, env.projectId, env.gcpUserAgent)) (badRowProducer, oversizeBadRowProducer) <- Producer.mkBadRowProducers[F]( env.projectId, env.config.output.bad.topic, env.config.sinkSettings.bad.producerBatchSize, - env.config.sinkSettings.bad.producerDelayThreshold + env.config.sinkSettings.bad.producerDelayThreshold, + env.gcpUserAgent ) types <- mkTypeSink[F]( env.projectId, env.config.output.types.topic, env.config.sinkSettings.types, metrics, - oversizeBadRowProducer + oversizeBadRowProducer, + env.gcpUserAgent ) failedInserts <- Producer.mkProducer[F, Bigquery.FailedInsert]( env.projectId, env.config.output.failedInserts.topic, env.config.sinkSettings.failedInserts.producerBatchSize, env.config.sinkSettings.failedInserts.producerDelayThreshold, - oversizeBadRowProducer + oversizeBadRowProducer, + env.gcpUserAgent ) badSink = mkBadSink[F]( env.config.sinkSettings.bad.sinkConcurrency, @@ -127,7 +124,7 @@ object Resources { http <- EmberClientBuilder.default[F].build lookup <- Resource.eval(CreateLruMap[F, FieldKey, Field].create(resolverConfig.cacheSize)) goodSink = mkGoodSink[F](env.config.output.good, bigquery, failedInserts, metrics, types, env.config.sinkSettings.good, env.config.sinkSettings.types, env.config.retrySettings) - source = Source.getStream[F](env.projectId, env.config.input.subscription, env.config.consumerSettings) + source = Source.getStream[F](env.projectId, env.config.input.subscription, env.config.consumerSettings, env.gcpUserAgent) registryLookup = Http4sRegistryLookup(http) } yield new Resources[F](source, resolver, badSink, goodSink, metrics, sentry, registryLookup, lookup) // format: on @@ -150,14 +147,16 @@ object Resources { topic: String, sinkSettingsTypes: SinkSettings.Types, metrics: Metrics[F], - oversizeBadRowProducer: Producer[F, BadRow.SizeViolation] + oversizeBadRowProducer: Producer[F, BadRow.SizeViolation], + gcpUserAgent: GcpUserAgent ): Resource[F, Pipe[F, Set[ShreddedType], Nothing]] = Producer.mkProducer[F, Set[ShreddedType]]( projectId, topic, sinkSettingsTypes.producerBatchSize, sinkSettingsTypes.producerDelayThreshold, - oversizeBadRowProducer + oversizeBadRowProducer, + gcpUserAgent ).map { p => _.parEvalMapUnordered(sinkSettingsTypes.sinkConcurrency) { types => p.produce(types).void *> metrics.typesCount(types.size) diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Source.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Source.scala index 9e9a5299..f65f0bc7 100644 --- a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Source.scala +++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/Source.scala @@ -20,13 +20,16 @@ import com.google.api.gax.batching.FlowControlSettings import com.google.pubsub.v1.PubsubMessage import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig} import com.permutive.pubsub.consumer.Model +import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent +import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader import fs2.Stream object Source { def getStream[F[_]: Sync: Logger]( projectId: String, subscription: String, - cs: ConsumerSettings + cs: ConsumerSettings, + gcpUserAgent: GcpUserAgent ): Stream[F, Payload[F]] = { val onFailedTerminate: Throwable => F[Unit] = @@ -47,7 +50,9 @@ object Source { maxAckExtensionPeriod = cs.maxAckExtensionPeriod, awaitTerminatePeriod = cs.awaitTerminatePeriod, onFailedTerminate = onFailedTerminate, - customizeSubscriber = Some(builder => builder.setFlowControlSettings(flowControlSettings)) + customizeSubscriber = Some { builder => + builder.setFlowControlSettings(flowControlSettings).setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)) + } ) val errorHandler: (PubsubMessage, Throwable, F[Unit], F[Unit]) => F[Unit] = diff --git a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/StreamLoaderCli.scala b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/StreamLoaderCli.scala index 82b1881a..666b83dd 100644 --- a/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/StreamLoaderCli.scala +++ b/modules/streamloader/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/streamloader/StreamLoaderCli.scala @@ -26,5 +26,11 @@ object StreamLoaderCli { def parse(args: Seq[String]): Either[String, LoaderEnvironment] = for { parsed <- command.parse(args).leftMap(_.show) - } yield Environment(parsed.config.loader, parsed.resolver, parsed.config.projectId, parsed.config.monitoring) + } yield Environment( + parsed.config.loader, + parsed.resolver, + parsed.config.projectId, + parsed.config.monitoring, + parsed.config.gcpUserAgent + ) }