Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/1.7.0 #364

Merged
merged 2 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 1.7.0 (2023-11-20)
--------------------------
Set GCP user agent header for BQ and Pubsub (#363)

Version 1.6.8 (2023-10-13)
--------------------------
Create SizeViolation bad row for oversized messages sent to Pubsub (#361)
Expand Down
4 changes: 4 additions & 0 deletions modules/common/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,8 @@

"monitoring": {
}

"gcpUserAgent": {
"productName": "Snowplow OSS"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
*/
package com.snowplowanalytics.snowplow.storage.bigquery.common.config

import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.model._

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}

Expand All @@ -22,11 +22,17 @@ final case class AllAppsConfig(
loader: Config.Loader,
mutator: Config.Mutator,
repeater: Config.Repeater,
monitoring: Monitoring
monitoring: Monitoring,
gcpUserAgent: GcpUserAgent
)

object AllAppsConfig {

/** Configuration for the user agent reported to GCP services.
  *
  * @param productName product name placed at the start of the `user-agent` header value
  */
final case class GcpUserAgent(productName: String)

// Semi-automatically derived circe codecs for GcpUserAgent; the AllAppsConfig codecs
// derived in this object have a `gcpUserAgent` field and so rely on these being in scope.
implicit val gcpUserAgentEncoder: Encoder[GcpUserAgent] = deriveEncoder[GcpUserAgent]
implicit val gcpUserAgentDecoder: Decoder[GcpUserAgent] = deriveDecoder[GcpUserAgent]

implicit val allAppsConfigDecoder: Decoder[AllAppsConfig] = deriveDecoder[AllAppsConfig]
implicit val allAppsConfigEncoder: Encoder[AllAppsConfig] = deriveEncoder[AllAppsConfig]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@
*/
package com.snowplowanalytics.snowplow.storage.bigquery.common.config

import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent
import io.circe.Json

import com.snowplowanalytics.snowplow.storage.bigquery.common.config.model.{Config, Monitoring}

final case class Environment[A](config: A, resolverJson: Json, projectId: String, monitoring: Monitoring) {
final case class Environment[A](
config: A,
resolverJson: Json,
projectId: String,
monitoring: Monitoring,
gcpUserAgent: GcpUserAgent
) {
def getFullSubName(sub: String): String = s"projects/$projectId/subscriptions/$sub"
def getFullTopicName(topic: String): String = s"projects/$projectId/topics/$topic"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package com.snowplowanalytics.snowplow.storage.bigquery

import com.google.api.gax.rpc.FixedHeaderProvider
import com.snowplowanalytics.iglu.client.resolver.StorageTime
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.schemaddl.bigquery.Field
import com.snowplowanalytics.lrumap.LruMap
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent

package object common {
type FieldKey = (SchemaKey, StorageTime)
type FieldKey = (SchemaKey, StorageTime)
type FieldCache[F[_]] = LruMap[F, FieldKey, Field]

/** Builds the header provider that identifies this application to GCP clients.
  *
  * The provider carries a single `user-agent` header whose value has the form
  * `<productName>/bigquery-loader (GPN:Snowplow;)`.
  *
  * @param gcpUserAgent configuration supplying the product name
  * @return a provider exposing the fixed `user-agent` header
  */
def createGcpUserAgentHeader(gcpUserAgent: GcpUserAgent): FixedHeaderProvider = {
  val headerValue = s"${gcpUserAgent.productName}/bigquery-loader (GPN:Snowplow;)"
  FixedHeaderProvider.create("user-agent", headerValue)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.storage.bigquery.common

import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent
import org.specs2.mutable.Specification

import java.util.regex.Pattern

/** Verifies the `user-agent` header produced by `createGcpUserAgentHeader`. */
class GcpUserAgentSpec extends Specification {

  "createGcpUserAgentHeader" should {
    "create user agent string correctly" in {
      val gcpUserAgent      = GcpUserAgent(productName = "Snowplow OSS")
      val expectedUserAgent = "Snowplow OSS/bigquery-loader (GPN:Snowplow;)"
      val resultUserAgent   = createGcpUserAgentHeader(gcpUserAgent).getHeaders.get("user-agent")

      // Compare the raw value before touching the regex: an absent header comes back as
      // null from the Java map, and should surface as a failed expectation here rather
      // than as a NullPointerException inside the matcher below.
      resultUserAgent must beEqualTo(expectedUserAgent)

      // The header must also be recognisable by this pattern, with the GPN token in group 1.
      val userAgentRegex = Pattern.compile(
        """(?iU)(?:[^\(\)\/]+\/[^\/]+\s+)*(?:[^\s][^\(\)\/]+\/[^\/]+\s?\([^\(\)]*)gpn:(.*)[;\)]"""
      )
      val matched = Option(userAgentRegex.matcher(resultUserAgent)).filter(_.find()).map(_.group(1))

      // Group 1 keeps the trailing ';' because the greedy `(.*)` consumes it and the
      // final character class then matches the closing parenthesis.
      matched must beSome("Snowplow;")
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent}
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.Environment
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.Environment.{
LoaderEnvironment,
Expand Down Expand Up @@ -658,6 +659,7 @@ object SpecHelpers {

private val terminationTimeout = FiniteDuration(60, SECONDS)

private val gcpUserAgent: GcpUserAgent = GcpUserAgent("Snowplow OSS")
private val loader: Config.Loader =
Config.Loader(lInput, lOutput, consumerSettings, sinkSettings, retrySettings, terminationTimeout)

Expand All @@ -684,11 +686,12 @@ object SpecHelpers {
private val sentry: SentryConfig = SentryConfig(URI.create("http://sentry.acme.com"))
private val monitoring: Monitoring = Monitoring(Some(statsd), Some(stdout), Some(sentry))

private[bigquery] val loaderEnv: LoaderEnvironment = Environment(loader, validResolverJson, projectId, monitoring)
private[bigquery] val loaderEnv: LoaderEnvironment =
Environment(loader, validResolverJson, projectId, monitoring, gcpUserAgent)
private[bigquery] val mutatorEnv: MutatorEnvironment =
Environment(mutator, validResolverJson, projectId, monitoring)
Environment(mutator, validResolverJson, projectId, monitoring, gcpUserAgent)
private[bigquery] val repeaterEnv: RepeaterEnvironment =
Environment(repeater, validResolverJson, projectId, monitoring)
Environment(repeater, validResolverJson, projectId, monitoring, gcpUserAgent)
}

object cache {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object Main extends IOApp {

case Right(c: MutatorCommand.Create) =>
for {
client <- TableReference.BigQueryTable.getClient(c.env.projectId)
client <- TableReference.BigQueryTable.getClient(c.env.projectId, c.env.gcpUserAgent)
_ <- TableReference.BigQueryTable.create(c, client)
} yield ExitCode.Success

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ object Mutator {
verbose: Boolean
)(implicit c: Concurrent[IO], logger: Logger[IO]): EitherT[IO, String, Pipe[IO, List[ShreddedType], Unit]] =
for {
bqClient <- EitherT.liftF(TableReference.BigQueryTable.getClient(env.projectId))
bqClient <- EitherT.liftF(TableReference.BigQueryTable.getClient(env.projectId, env.gcpUserAgent))
table = new TableReference.BigQueryTable(
bqClient,
env.config.output.good.datasetId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object MutatorCli {
private val options: Opts[MutatorEnvironment] =
CliConfig.options.map {
case CliConfig.Parsed(config, resolver) =>
Environment(config.mutator, resolver, config.projectId, config.monitoring)
Environment(config.mutator, resolver, config.projectId, config.monitoring, config.gcpUserAgent)
}

private val schema: Opts[SchemaKey] = Opts.option[String]("schema", "Iglu URI to add to the table").mapValidated {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
*/
package com.snowplowanalytics.snowplow.storage.bigquery.mutator

import com.snowplowanalytics.snowplow.storage.bigquery.common.{Adapter, LoaderRow}
import com.snowplowanalytics.snowplow.storage.bigquery.common.{Adapter, LoaderRow, createGcpUserAgentHeader}
import com.snowplowanalytics.snowplow.storage.bigquery.mutator.MutatorCli.MutatorCommand

import cats.effect.IO
import com.google.cloud.bigquery.{Schema => BqSchema, _}
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -56,9 +56,14 @@ object TableReference {
}

object BigQueryTable {
def getClient(projectId: String): IO[BigQuery] =
def getClient(projectId: String, gcpUserAgent: GcpUserAgent): IO[BigQuery] =
IO(
BigQueryOptions.newBuilder.setProjectId(projectId).build.getService
BigQueryOptions
.newBuilder
.setProjectId(projectId)
.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))
.build
.getService
)

def create(args: MutatorCommand.Create, client: BigQuery): IO[Table] = IO {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import cats.effect.std.Queue
import cats.effect.unsafe.IORuntime
import cats.syntax.either._
import cats.syntax.show._
import com.google.api.gax.rpc.FixedHeaderProvider
import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber}
import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}
import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader
import io.circe.{Decoder, DecodingFailure, Error, Json}
import io.circe.jawn.parse

Expand Down Expand Up @@ -83,8 +83,6 @@ class TypeReceiver(queue: Queue[IO, List[ShreddedType]], verbose: Boolean)(impli
}

object TypeReceiver {
private val UserAgent =
FixedHeaderProvider.create("User-Agent", generated.BuildInfo.userAgent)

/** Decode inventory items either in legacy (non-self-describing) format or as `shredded_types` schema'ed */
def decodeItems(json: Json): Decoder.Result[List[ShreddedType]] =
Expand Down Expand Up @@ -113,7 +111,10 @@ object TypeReceiver {
def startSubscription(env: MutatorEnvironment, listener: TypeReceiver): IO[Unit] =
IO {
val subscription = ProjectSubscriptionName.of(env.projectId, env.config.input.subscription)
val subscriber = Subscriber.newBuilder(subscription, listener).setHeaderProvider(UserAgent).build()
val subscriber = Subscriber
.newBuilder(subscription, listener)
.setHeaderProvider(createGcpUserAgentHeader(env.gcpUserAgent))
.build()
subscriber.startAsync().awaitRunning(10L, TimeUnit.SECONDS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ object Repeater extends IOApp {
.getEvents(
resources.env.projectId,
resources.env.config.input.subscription,
resources.uninsertable
resources.uninsertable,
resources.env.gcpUserAgent
)
.interruptWhen(resources.stop)
.through[IO, Unit](Flow.sink(resources))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object RepeaterCli {
private val options: Opts[RepeaterEnvironment] =
CliConfig.options.map {
case CliConfig.Parsed(config, resolver) =>
Environment(config.repeater, resolver, config.projectId, config.monitoring)
Environment(config.repeater, resolver, config.projectId, config.monitoring, config.gcpUserAgent)
}

private val bufferSize = Opts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ object Resources {
// It's a function because blocker needs to be created as Resource
val initResources: F[(Sentry[F]) => F[Resources[F]]] = for {
env <- Sync[F].delay(cmd.env)
bigQuery <- services.Database.getClient[F](cmd.env.projectId)
bigQuery <- services.Database.getClient[F](cmd.env.projectId, cmd.env.gcpUserAgent)
bucket <- Sync[F].fromEither(
validateBucket(env.config.output.deadLetters.bucket)
.toEither
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ package com.snowplowanalytics.snowplow.storage.bigquery.repeater.services

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}

import cats.effect.Sync
import cats.syntax.all._
import com.google.cloud.bigquery.{Option => _, _}
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent
import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader
import org.typelevel.log4cats.Logger
import io.circe.syntax._

Expand Down Expand Up @@ -75,9 +76,14 @@ object Database {
}
}

def getClient[F[_]: Sync](projectId: String): F[BigQuery] =
def getClient[F[_]: Sync](projectId: String, gcpUserAgent: GcpUserAgent): F[BigQuery] =
Sync[F].delay(
BigQueryOptions.newBuilder.setProjectId(projectId).build.getService
BigQueryOptions
.newBuilder
.setProjectId(projectId)
.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))
.build
.getService
)

/** The first argument passed to addRow is an ID used to deduplicate inserts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ package com.snowplowanalytics.snowplow.storage.bigquery.repeater.services

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}

import cats.effect._
import cats.effect.std.Queue
import cats.syntax.all._
import com.google.pubsub.v1.PubsubMessage
import com.permutive.pubsub.consumer.{ConsumerRecord, Model}
import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent
import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader
import fs2.Stream
import org.typelevel.log4cats.Logger

Expand All @@ -31,13 +32,17 @@ object PubSub {
def getEvents[F[_]: Sync: Logger](
projectId: String,
subscription: String,
uninsertable: Queue[F, BadRow]
uninsertable: Queue[F, BadRow],
gcpUserAgent: GcpUserAgent
): Stream[F, ConsumerRecord[F, EventContainer]] =
PubsubGoogleConsumer.subscribe[F, EventContainer](
Model.ProjectId(projectId),
Model.Subscription(subscription),
(msg, err, ack, _) => callback[F](msg, err, ack, uninsertable),
PubsubGoogleConsumerConfig[F](onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"))
PubsubGoogleConsumerConfig[F](
onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"),
customizeSubscriber = Some(_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)))
)
)

private def callback[F[_]: Sync](msg: PubsubMessage, err: Throwable, ack: F[Unit], uninsertable: Queue[F, BadRow]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
*/
package com.snowplowanalytics.snowplow.storage.bigquery.streamloader

import com.snowplowanalytics.snowplow.storage.bigquery.common.LoaderRow
import com.snowplowanalytics.snowplow.storage.bigquery.common.{LoaderRow, createGcpUserAgentHeader}
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.model.{BigQueryRetrySettings, Output}
import com.snowplowanalytics.snowplow.storage.bigquery.common.metrics.Metrics

import cats.Parallel
import cats.effect.{Async, Sync}
import cats.implicits._
Expand All @@ -24,6 +23,7 @@ import com.google.api.client.json.gson.GsonFactory
import com.google.api.gax.retrying.RetrySettings
import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, InsertAllRequest, InsertAllResponse, TableId}
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent
import org.threeten.bp.Duration
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
Expand Down Expand Up @@ -99,7 +99,7 @@ object Bigquery {
}
}

def getClient[F[_]: Sync](rs: BigQueryRetrySettings, projectId: String): F[BigQuery] = {
def getClient[F[_]: Sync](rs: BigQueryRetrySettings, projectId: String, gcpUserAgent: GcpUserAgent): F[BigQuery] = {
val retrySettings =
RetrySettings
.newBuilder()
Expand All @@ -110,7 +110,13 @@ object Bigquery {
.build

Sync[F].delay(
BigQueryOptions.newBuilder.setRetrySettings(retrySettings).setProjectId(projectId).build.getService
BigQueryOptions
.newBuilder
.setRetrySettings(retrySettings)
.setProjectId(projectId)
.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))
.build
.getService
)
}

Expand Down
Loading
Loading