Skip to content

Commit

Permalink
Add SLUL acceptance flag
Browse files Browse the repository at this point in the history
If the user doesn't accept the license, the error message looks like this:

```
[io-compute-blocker-4] ERROR com.snowplowanalytics.snowplow.bigquery.Run - Cannot resolve config: DecodingFailure at .license: Please accept the terms of the Snowplow Limited Use License Agreement to proceed. See https://docs.snowplow.io/limited-use-license-1.0/ for more information on the license and how to configure this.
```

The license model is provided by the new common-streams version, which also brings OAuth2 support for Kafka
  • Loading branch information
pondzix committed Mar 12, 2024
1 parent 7ae3f3c commit 6fce84d
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 19 deletions.
5 changes: 5 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
{

license {
accept = false
accept = ${?ACCEPT_LIMITED_USE_LICENSE}
}

"gcpUserAgent": {
"productName": "Snowplow OSS"
"productName": ${?GCP_USER_AGENT_PRODUCT_NAME}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ import com.comcast.ip4s.Port
import org.http4s.{ParseFailure, Uri}

import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Telemetry}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Telemetry}
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

case class Config[+Source, +Sink](
Expand All @@ -29,7 +28,8 @@ case class Config[+Source, +Sink](
batching: Config.Batching,
retries: Config.Retries,
telemetry: Telemetry.Config,
monitoring: Config.Monitoring
monitoring: Config.Monitoring,
license: AcceptedLicense
)

object Config {
Expand Down Expand Up @@ -117,6 +117,11 @@ object Config {
implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries]
implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]

// TODO add bigquery docs
implicit val licenseDecoder =
AcceptedLicense.decoder(AcceptedLicense.DocumentationLink("https://docs.snowplow.io/limited-use-license-1.0/"))

deriveConfiguredDecoder[Config[Source, Sink]]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@
*/
package com.snowplowanalytics.snowplow.bigquery

import com.snowplowanalytics.snowplow.azure.AzureAuthenticationCallbackHandler
import com.snowplowanalytics.snowplow.sources.kafka.{KafkaSource, KafkaSourceConfig}
import com.snowplowanalytics.snowplow.sinks.kafka.{KafkaSink, KafkaSinkConfig}

import scala.reflect.classTag

class SourceAuthHandler extends AzureAuthenticationCallbackHandler

class SinkAuthHandler extends AzureAuthenticationCallbackHandler

object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo) {

override def source: SourceProvider = KafkaSource.build(_)
override def source: SourceProvider = KafkaSource.build(_, classTag[SourceAuthHandler])

override def badSink: SinkProvider = KafkaSink.resource(_)
override def badSink: SinkProvider = KafkaSink.resource(_, classTag[SinkAuthHandler])
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import cats.effect.{ExitCode, IO}
import com.comcast.ip4s.Port
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, Telemetry}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Telemetry}
import com.snowplowanalytics.snowplow.sinks.kafka.KafkaSinkConfig
import com.snowplowanalytics.snowplow.sources.kafka.KafkaSourceConfig
import org.http4s.implicits.http4sLiteralsSyntax
Expand Down Expand Up @@ -65,7 +65,10 @@ object KafkaConfigSpec {
consumerConf = Map(
"group.id" -> "snowplow-bigquery-loader",
"allow.auto.create.topics" -> "false",
"auto.offset.reset" -> "latest"
"auto.offset.reset" -> "latest",
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
)
),
output = Config.Output(
Expand All @@ -81,7 +84,10 @@ object KafkaConfigSpec {
topicName = "sp-dev-bad",
bootstrapServers = "localhost:9092",
producerConf = Map(
"client.id" -> "snowplow-bigquery-loader"
"client.id" -> "snowplow-bigquery-loader",
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
)
),
maxRecordSize = 1000000
Expand Down Expand Up @@ -114,7 +120,8 @@ object KafkaConfigSpec {
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = None
)
),
license = AcceptedLicense()
)

private val extendedConfig = Config[KafkaSourceConfig, KafkaSinkConfig](
Expand All @@ -125,7 +132,10 @@ object KafkaConfigSpec {
"group.id" -> "snowplow-bigquery-loader",
"enable.auto.commit" -> "false",
"allow.auto.create.topics" -> "false",
"auto.offset.reset" -> "earliest"
"auto.offset.reset" -> "earliest",
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
)
),
output = Config.Output(
Expand All @@ -141,7 +151,10 @@ object KafkaConfigSpec {
topicName = "sp-dev-bad",
bootstrapServers = "localhost:9092",
producerConf = Map(
"client.id" -> "snowplow-bigquery-loader"
"client.id" -> "snowplow-bigquery-loader",
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
)
),
maxRecordSize = 1000000
Expand Down Expand Up @@ -187,6 +200,7 @@ object KafkaConfigSpec {
unhealthyLatency = 5.minutes
),
webhook = Some(Config.Webhook(endpoint = uri"https://webhook.acme.com", tags = Map("pipeline" -> "production")))
)
),
license = AcceptedLicense()
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import cats.effect.{ExitCode, IO}
import com.comcast.ip4s.Port
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, Telemetry}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Telemetry}
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig}
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
import eu.timepit.refined.types.all.PosInt
Expand Down Expand Up @@ -116,7 +116,8 @@ object KinesisConfigSpec {
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = None
)
),
license = AcceptedLicense()
)

private val extendedConfig = Config[KinesisSourceConfig, KinesisSinkConfig](
Expand Down Expand Up @@ -189,6 +190,7 @@ object KinesisConfigSpec {
unhealthyLatency = 5.minutes
),
webhook = Some(Config.Webhook(endpoint = uri"https://webhook.acme.com", tags = Map("pipeline" -> "production")))
)
),
license = AcceptedLicense()
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.comcast.ip4s.Port
import com.snowplowanalytics.snowplow.bigquery.Config.GcpUserAgent
import com.snowplowanalytics.snowplow.pubsub.{GcpUserAgent => PubsubUserAgent}
import com.snowplowanalytics.snowplow.runtime.Metrics.StatsdConfig
import com.snowplowanalytics.snowplow.runtime.{ConfigParser, Telemetry}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, ConfigParser, Telemetry}
import com.snowplowanalytics.snowplow.sinks.pubsub.PubsubSinkConfig
import com.snowplowanalytics.snowplow.sources.pubsub.PubsubSourceConfig
import org.http4s.implicits.http4sLiteralsSyntax
Expand Down Expand Up @@ -115,7 +115,8 @@ object PubsubConfigSpec {
sentry = None,
healthProbe = Config.HealthProbe(port = Port.fromInt(8000).get, unhealthyLatency = 5.minutes),
webhook = None
)
),
license = AcceptedLicense()
)

private val extendedConfig = Config[PubsubSourceConfig, PubsubSinkConfig](
Expand Down Expand Up @@ -187,6 +188,7 @@ object PubsubConfigSpec {
unhealthyLatency = 5.minutes
),
webhook = Some(Config.Webhook(endpoint = uri"https://webhook.acme.com", tags = Map("pipeline" -> "production")))
)
),
license = AcceptedLicense()
)
}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object Dependencies {
val bigquery = "2.34.2"

// Snowplow
val streams = "0.3.0"
val streams = "0.5.0-M1"
val igluClient = "3.0.0"

// tests
Expand Down

0 comments on commit 6fce84d

Please sign in to comment.