Skip to content

Commit

Permalink
enrich-pubsub: set UserAgent header in Pubsub publisher and consumer (c…
Browse files Browse the repository at this point in the history
…lose #826)
  • Loading branch information
spenes committed Nov 14, 2023
1 parent 29f2672 commit c8153c2
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 91 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ lazy val root = project.in(file("."))
.settings(projectSettings)
.settings(compilerSettings)
.settings(resolverSettings)
.aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, kafka, kafkaDistroless, nsq, nsqDistroless)
.aggregate(common, commonFs2, pubsub, kinesis, kafka, nsq)

lazy val common = project
.in(file("modules/common"))
Expand All @@ -49,7 +49,7 @@ lazy val pubsub = project
.settings(libraryDependencies ++= pubsubDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2)
.dependsOn(commonFs2 % "test->test;compile->compile")

lazy val pubsubDistroless = project
.in(file("modules/distroless/pubsub"))
Expand All @@ -59,7 +59,7 @@ lazy val pubsubDistroless = project
.settings(libraryDependencies ++= pubsubDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2)
.dependsOn(commonFs2 % "test->test;compile->compile")


lazy val kinesis = project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ object ConfigFile {
case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _) if output.streamName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if t.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if t.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if topicName.isEmpty =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ object ParsedConfigs {

private[config] def outputAttributes(output: OutputConfig): EnrichedEvent => Map[String, String] =
output match {
case OutputConfig.PubSub(_, Some(attributes), _, _, _) => attributesFromFields(attributes)
case OutputConfig.PubSub(_, Some(attributes), _, _, _, _) => attributesFromFields(attributes)
case OutputConfig.Kafka(_, _, _, headers, _) => attributesFromFields(headers)
case _ => _ => Map.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ object io {
parallelPullCount: Int,
maxQueueSize: Int,
maxRequestBytes: Int,
maxAckExtensionPeriod: FiniteDuration
maxAckExtensionPeriod: FiniteDuration,
gcpUserAgent: GcpUserAgent
) extends Input {
val (project, name) =
subscription.split("/").toList match {
Expand Down Expand Up @@ -192,7 +193,7 @@ object io {
implicit val inputDecoder: Decoder[Input] =
deriveConfiguredDecoder[Input]
.emap {
case s @ PubSub(sub, _, _, _, _) =>
case s @ PubSub(sub, _, _, _, _, _) =>
sub.split("/").toList match {
case List("projects", _, "subscriptions", _) =>
s.asRight
Expand All @@ -204,13 +205,13 @@ object io {
case other => other.asRight
}
.emap {
case PubSub(_, p, _, _, _) if p <= 0 =>
case PubSub(_, p, _, _, _, _) if p <= 0 =>
"PubSub parallelPullCount must be > 0".asLeft
case PubSub(_, _, m, _, _) if m <= 0 =>
case PubSub(_, _, m, _, _, _) if m <= 0 =>
"PubSub maxQueueSize must be > 0".asLeft
case PubSub(_, _, _, m, _) if m <= 0 =>
case PubSub(_, _, _, m, _, _) if m <= 0 =>
"PubSub maxRequestBytes must be > 0".asLeft
case PubSub(_, _, _, _, m) if m < Duration.Zero =>
case PubSub(_, _, _, _, m, _) if m < Duration.Zero =>
"PubSub maxAckExtensionPeriod must be >= 0".asLeft
case other =>
other.asRight
Expand Down Expand Up @@ -253,7 +254,8 @@ object io {
attributes: Option[Set[String]],
delayThreshold: FiniteDuration,
maxBatchSize: Long,
maxBatchBytes: Long
maxBatchBytes: Long,
gcpUserAgent: GcpUserAgent
) extends Output {
val (project, name) =
topic.split("/").toList match {
Expand Down Expand Up @@ -282,7 +284,7 @@ object io {
.emap {
case Kafka(topicName, bootstrapServers, _, _, _) if topicName.isEmpty ^ bootstrapServers.isEmpty =>
"Both topicName and bootstrapServers have to be set".asLeft
case s @ PubSub(top, _, _, _, _) if top.nonEmpty =>
case s @ PubSub(top, _, _, _, _, _) if top.nonEmpty =>
top.split("/").toList match {
case List("projects", _, "topics", _) =>
s.asRight
Expand Down Expand Up @@ -475,6 +477,15 @@ object io {
)
}

case class GcpUserAgent(productName: String)

object GcpUserAgent {
implicit val gcpUserAgentDecoder: Decoder[GcpUserAgent] =
deriveConfiguredDecoder[GcpUserAgent]
implicit val gcpUserAgentEncoder: Encoder[GcpUserAgent] =
deriveConfiguredEncoder[GcpUserAgent]
}

object AdaptersSchemasEncoderDecoders {
implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] =
deriveConfiguredDecoder[AdaptersSchemas]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,67 +34,6 @@ import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas

class ConfigFileSpec extends Specification with CatsIO {
"parse" should {
"parse reference example for PubSub" in {
val configPath = Paths.get(getClass.getResource("/config.pubsub.extended.hocon").toURI)
val expected = ConfigFile(
io.Input.PubSub("projects/test-project/subscriptions/collector-payloads-sub", 1, 3000, 50000000, 1.hour),
io.Outputs(
io.Output.PubSub("projects/test-project/topics/enriched", Some(Set("app_id")), 200.milliseconds, 1000, 8000000),
Some(io.Output.PubSub("projects/test-project/topics/pii", None, 200.milliseconds, 1000, 8000000)),
io.Output.PubSub("projects/test-project/topics/bad", None, 200.milliseconds, 1000, 8000000)
),
io.Concurrency(256, 3),
Some(7.days),
io.RemoteAdapterConfigs(
10.seconds,
45.seconds,
10,
List(
io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com")
)
),
io.Monitoring(
Some(Sentry(URI.create("http://sentry.acme.com"))),
io.MetricsReporters(
Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)),
Some(io.MetricsReporters.Stdout(10.seconds, None)),
true
)
),
io.Telemetry(
false,
15.minutes,
"POST",
"collector-g.snowplowanalytics.com",
443,
true,
Some("my_pipeline"),
Some("hfy67e5ydhtrd"),
Some("665bhft5u6udjf"),
Some("enrich-kinesis-ce"),
Some("1.0.0")
),
io.FeatureFlags(
false,
false,
false
),
Some(
io.Experimental(
Some(
io.Metadata(
Uri.uri("https://my_pipeline.my_domain.com/iglu"),
5.minutes,
UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"),
UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784")
)
)
)
),
adaptersSchemas
)
ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected))
}

"parse reference example for Kinesis" in {
val configPath = Paths.get(getClass.getResource("/config.kinesis.extended.hocon").toURI)
Expand Down Expand Up @@ -309,29 +248,41 @@ class ConfigFileSpec extends Specification with CatsIO {
"parallelPullCount": 1,
"maxQueueSize": 3000,
"maxRequestBytes": 50000000,
"maxAckExtensionPeriod": 1 hour
"maxAckExtensionPeriod": 1 hour,
"gcpUserAgent": {
"productName": "Snowplow OSS"
}
},
"output": {
"good": {
"type": "PubSub",
"topic": "projects/test-project/topics/good-topic",
"delayThreshold": "200 milliseconds",
"maxBatchSize": 1000,
"maxBatchBytes": 8000000
"maxBatchBytes": 8000000,
"gcpUserAgent": {
"productName": "Snowplow OSS"
}
},
"pii": {
"type": "PubSub",
"topic": "projects/test-project/topics/pii-topic",
"delayThreshold": "200 milliseconds",
"maxBatchSize": 1000,
"maxBatchBytes": 8000000
"maxBatchBytes": 8000000,
"gcpUserAgent": {
"productName": "Snowplow OSS"
}
},
"bad": {
"type": "PubSub",
"topic": "projects/test-project/topics/bad-topic",
"delayThreshold": "200 milliseconds",
"maxBatchSize": 1000,
"maxBatchBytes": 8000000
"maxBatchBytes": 8000000,
"gcpUserAgent": {
"productName": "Snowplow OSS"
}
}
},
"concurrency": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,41 @@ class ParsedConfigsSpec extends Specification with CatsIO {
val invalidAttr2 = "invalidAttr2"

val configFile = ConfigFile(
io.Input.PubSub("projects/test-project/subscriptions/inputSub", 1, 3000, 50000000, 1.hour),
io.Input.PubSub(
"projects/test-project/subscriptions/inputSub",
1,
3000,
50000000,
1.hour,
io.GcpUserAgent("Snowplow OSS")
),
io.Outputs(
io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), 200.milliseconds, 1000, 10000000),
io.Output.PubSub(
"projects/test-project/topics/good-topic",
Some(Set("app_id", invalidAttr1)),
200.milliseconds,
1000,
10000000,
io.GcpUserAgent("Snowplow OSS")
),
Some(
io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), 200.milliseconds, 1000, 10000000)
io.Output.PubSub(
"projects/test-project/topics/pii-topic",
Some(Set("app_id", invalidAttr2)),
200.milliseconds,
1000,
10000000,
io.GcpUserAgent("Snowplow OSS")
)
),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, 200.milliseconds, 1000, 10000000)
io.Output.PubSub(
"projects/test-project/topics/bad-topic",
None,
200.milliseconds,
1000,
10000000,
io.GcpUserAgent("Snowplow OSS")
)
),
io.Concurrency(10000, 64),
Some(7.days),
Expand Down Expand Up @@ -100,7 +128,14 @@ class ParsedConfigsSpec extends Specification with CatsIO {

"outputAttributes" should {
"fetch attribute values" in {
val output = io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), 200.milliseconds, 1000, 10000000)
val output = io.Output.PubSub(
"projects/test-project/topics/good-topic",
Some(Set("app_id")),
200.milliseconds,
1000,
10000000,
io.GcpUserAgent("Snowplow OSS")
)
val ee = new EnrichedEvent()
ee.app_id = "test_app"

Expand Down
20 changes: 16 additions & 4 deletions modules/pubsub/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@
"parallelPullCount": 1
"maxQueueSize": 3000
"maxRequestBytes": 50000000
"maxAckExtensionPeriod": 1 hour
"maxAckExtensionPeriod": 1 hour,
"gcpUserAgent": {
"productName": "Snowplow OSS"
}
}

"output": {
"good": {
"type": "PubSub"
"delayThreshold": 200 milliseconds
"maxBatchSize": 1000
"maxBatchBytes": 8000000
"maxBatchBytes": 8000000,
"gcpUserAgent": {
"productName": "Snowplow OSS"
}
}

"pii": {
Expand All @@ -21,14 +27,20 @@
"topic": ""
"delayThreshold": 200 milliseconds
"maxBatchSize": 1000
"maxBatchBytes": 8000000
"maxBatchBytes": 8000000,
"gcpUserAgent": {
"productName": "Snowplow OSS"
}
}

"bad": {
"type": "PubSub"
"delayThreshold": 200 milliseconds
"maxBatchSize": 1000
"maxBatchBytes": 8000000
"maxBatchBytes": 8000000,
"gcpUserAgent": {
"productName": "Snowplow OSS"
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ object Sink {
batchSize = output.maxBatchSize,
requestByteThreshold = Some(output.maxBatchBytes),
delayThreshold = output.delayThreshold,
onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error")
onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error"),
customizePublisher = Some(_.setHeaderProvider(Utils.createPubsubUserAgentHeader(output.gcpUserAgent)))
)

GooglePubsubProducer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ object Source {
parallelPullCount = input.parallelPullCount,
maxQueueSize = input.maxQueueSize,
maxAckExtensionPeriod = input.maxAckExtensionPeriod,
customizeSubscriber = Some(builder => builder.setFlowControlSettings(flowControlSettings))
customizeSubscriber = Some {
_.setFlowControlSettings(flowControlSettings)
.setHeaderProvider(Utils.createPubsubUserAgentHeader(input.gcpUserAgent))
}
)

val projectId = Model.ProjectId(input.project)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2019-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.pubsub

import com.google.api.gax.rpc.FixedHeaderProvider

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.GcpUserAgent

object Utils {

def createPubsubUserAgentHeader(gcpUserAgent: GcpUserAgent): FixedHeaderProvider =
FixedHeaderProvider.create("user-agent", s"${gcpUserAgent.productName}/enrich (GPN:Snowplow;)")

}
Loading

0 comments on commit c8153c2

Please sign in to comment.