diff --git a/build.sbt b/build.sbt index 072cd428a..45f173257 100644 --- a/build.sbt +++ b/build.sbt @@ -41,6 +41,29 @@ lazy val commonFs2 = project .settings(addCompilerPlugin(betterMonadicFor)) .dependsOn(common % "test->test;compile->compile") +lazy val awsUtils = project + .in(file("modules/cloudutils/aws")) + .enablePlugins(BuildInfoPlugin) + .settings(awsUtilsBuildSettings) + .settings(libraryDependencies ++= awsUtilsDependencies) + .settings(addCompilerPlugin(betterMonadicFor)) + .dependsOn(commonFs2 % "test->test;compile->compile") + +lazy val gcpUtils = project + .in(file("modules/cloudutils/gcp")) + .enablePlugins(BuildInfoPlugin) + .settings(gcpUtilsBuildSettings) + .settings(libraryDependencies ++= gcpUtilsDependencies) + .settings(addCompilerPlugin(betterMonadicFor)) + .dependsOn(commonFs2 % "test->test;compile->compile") + +lazy val azureUtils = project + .in(file("modules/cloudutils/azure")) + .enablePlugins(BuildInfoPlugin) + .settings(azureUtilsBuildSettings) + .settings(libraryDependencies ++= azureUtilsDependencies) + .settings(addCompilerPlugin(betterMonadicFor)) + .dependsOn(commonFs2 % "test->test;compile->compile") lazy val pubsub = project .in(file("modules/pubsub")) @@ -50,6 +73,7 @@ lazy val pubsub = project .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) .dependsOn(commonFs2 % "test->test;compile->compile") + .dependsOn(gcpUtils % "compile->compile;it->it") lazy val pubsubDistroless = project .in(file("modules/distroless/pubsub")) @@ -60,7 +84,7 @@ lazy val pubsubDistroless = project .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) .dependsOn(commonFs2 % "test->test;compile->compile") - + .dependsOn(gcpUtils % "compile->compile;it->it") lazy val kinesis = project .in(file("modules/kinesis")) @@ -69,7 +93,8 @@ lazy val kinesis = project .settings(libraryDependencies ++= kinesisDependencies) .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) - .dependsOn(commonFs2) + .dependsOn(commonFs2 % "test->test;compile->compile") + .dependsOn(awsUtils % "compile->compile;it->it") lazy val kinesisDistroless = project .in(file("modules/distroless/kinesis")) @@ -84,6 +109,7 @@ lazy val kinesisDistroless = project .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) .dependsOn(commonFs2 % "compile->compile;it->it") + .dependsOn(awsUtils % "compile->compile;it->it") .settings(Defaults.itSettings) .configs(IntegrationTest) .settings((IntegrationTest / test) := (IntegrationTest / test).dependsOn(Docker / publishLocal).value) @@ -101,7 +127,10 @@ lazy val kafka = project .settings(Defaults.itSettings) .configs(IntegrationTest) .settings(addCompilerPlugin(betterMonadicFor)) - .dependsOn(commonFs2 % "compile->compile;it->it") + .dependsOn(commonFs2 % "compile->compile;test->test;it->it") + .dependsOn(awsUtils % "compile->compile;it->it") + .dependsOn(gcpUtils % "compile->compile;it->it") + .dependsOn(azureUtils % "compile->compile;it->it") lazy val kafkaDistroless = project .in(file("modules/distroless/kafka")) @@ -112,6 +141,9 @@ lazy val kafkaDistroless = project .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) .dependsOn(commonFs2) + .dependsOn(awsUtils % "compile->compile;it->it") + .dependsOn(gcpUtils % "compile->compile;it->it") + .dependsOn(azureUtils % "compile->compile;it->it") lazy val nsq = project .in(file("modules/nsq")) @@ -120,7 +152,10 @@ lazy val nsq = 
project .settings(libraryDependencies ++= nsqDependencies) .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) - .dependsOn(commonFs2) + .dependsOn(commonFs2 % "compile->compile;test->test") + .dependsOn(awsUtils % "compile->compile;it->it") + .dependsOn(gcpUtils % "compile->compile;it->it") + .dependsOn(azureUtils % "compile->compile;it->it") lazy val nsqDistroless = project .in(file("modules/distroless/nsq")) @@ -135,6 +170,9 @@ lazy val nsqDistroless = project .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) .dependsOn(commonFs2 % "compile->compile;it->it") + .dependsOn(awsUtils % "compile->compile;it->it") + .dependsOn(gcpUtils % "compile->compile;it->it") + .dependsOn(azureUtils % "compile->compile;it->it") .settings(Defaults.itSettings) .configs(IntegrationTest) .settings((IntegrationTest / test) := (IntegrationTest / test).dependsOn(Docker / publishLocal).value) diff --git a/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/S3Client.scala b/modules/cloudutils/aws/src/main/scala/com/snowplowanalytics/snowplow/enrich/aws/S3Client.scala similarity index 95% rename from modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/S3Client.scala rename to modules/cloudutils/aws/src/main/scala/com/snowplowanalytics/snowplow/enrich/aws/S3Client.scala index d53a57322..1d73ca3cb 100644 --- a/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/S3Client.scala +++ b/modules/cloudutils/aws/src/main/scala/com/snowplowanalytics/snowplow/enrich/aws/S3Client.scala @@ -10,7 +10,7 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. */ -package com.snowplowanalytics.snowplow.enrich.nsq +package com.snowplowanalytics.snowplow.enrich.aws import java.net.URI @@ -36,7 +36,8 @@ object S3Client { s3Client <- Resource.fromAutoCloseable(ConcurrentEffect[F].delay(S3AsyncClient.builder().region(getRegion).build())) store <- Resource.eval(S3Store.builder[F](s3Client).build.toEither.leftMap(_.head).pure[F].rethrow) } yield new Client[F] { - val prefixes = List("s3") + def canDownload(uri: URI): Boolean = + uri.getScheme == "s3" def download(uri: URI): Stream[F, Byte] = Stream.eval(Url.parseF[F](uri.toString)).flatMap { url => @@ -64,3 +65,4 @@ object S3Client { Region.EU_CENTRAL_1 } } + diff --git a/modules/cloudutils/azure/src/main/scala/com/snowplowanalytics/snowplow/enrich/azure/AzureStorageClient.scala b/modules/cloudutils/azure/src/main/scala/com/snowplowanalytics/snowplow/enrich/azure/AzureStorageClient.scala new file mode 100644 index 000000000..0cf84afff --- /dev/null +++ b/modules/cloudutils/azure/src/main/scala/com/snowplowanalytics/snowplow/enrich/azure/AzureStorageClient.scala @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.azure + +import blobstore.azure.AzureStore +import blobstore.url.exception.{AuthorityParseError, MultipleUrlValidationException, Throwables} +import blobstore.url.{Authority, Path, Url} +import cats.data.Validated.{Invalid, Valid} +import cats.data.ValidatedNec +import cats.effect._ +import cats.implicits._ +import com.azure.identity.DefaultAzureCredentialBuilder +import com.azure.storage.blob.{BlobServiceClientBuilder, BlobUrlParts} +import fs2.Stream +import java.net.URI +import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client + +object AzureStorageClient { + + def mk[F[_]: ConcurrentEffect](storageAccountName: String): Resource[F, Client[F]] = + for { + store <- createStore(storageAccountName) + } yield new Client[F] { + def canDownload(uri: URI): Boolean = + uri.toString.contains("core.windows.net") + + def download(uri: URI): Stream[F, Byte] = + createStorageUrlFrom(uri.toString) match { + case Valid(url) => store.get(url, 16 * 1024) + case Invalid(errors) => Stream.raiseError[F](MultipleUrlValidationException(errors)) + } + } + + private def createStore[F[_]: ConcurrentEffect: Async](storageAccountName: String): Resource[F, AzureStore[F]] = + for { + client <- Resource.eval { + ConcurrentEffect[F].delay { + val builder = new BlobServiceClientBuilder().credential(new DefaultAzureCredentialBuilder().build) + val storageEndpoint = createStorageEndpoint(storageAccountName) + builder.endpoint(storageEndpoint).buildAsyncClient() + } + } + store <- AzureStore + .builder[F](client) + .build + .fold( + errors => Resource.eval(ConcurrentEffect[F].raiseError(errors.reduce(Throwables.collapsingSemigroup))), + s => Resource.pure[F, AzureStore[F]](s) + ) + } yield store + + private def createStorageUrlFrom(input: String): ValidatedNec[AuthorityParseError, Url[String]] = { + val inputParts = BlobUrlParts.parse(input) + Authority + .parse(inputParts.getBlobContainerName) + .map(authority => Url(inputParts.getScheme, authority, Path(inputParts.getBlobName))) + } + + private def createStorageEndpoint(storageAccountName: String): String = + s"https://$storageAccountName.blob.core.windows.net" +} diff --git a/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/GcsClient.scala b/modules/cloudutils/gcp/src/main/scala/com/snowplowanalytics/snowplow/enrich/gcp/GcsClient.scala similarity index 94% rename from modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/GcsClient.scala rename to modules/cloudutils/gcp/src/main/scala/com/snowplowanalytics/snowplow/enrich/gcp/GcsClient.scala index 7bac98532..d54acca0a 100644 --- a/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/GcsClient.scala +++ b/modules/cloudutils/gcp/src/main/scala/com/snowplowanalytics/snowplow/enrich/gcp/GcsClient.scala @@ -10,7 +10,7 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
*/ -package com.snowplowanalytics.snowplow.enrich.nsq +package com.snowplowanalytics.snowplow.enrich.gcp import java.net.URI @@ -32,10 +32,10 @@ object GcsClient { def mk[F[_]: ConcurrentEffect: ContextShift: Timer](blocker: Blocker): F[Client[F]] = ConcurrentEffect[F].delay(StorageOptions.getDefaultInstance.getService).map { service => new Client[F] { - val prefixes = List("gs") - val store = GcsStore.builder(service, blocker).unsafe + def canDownload(uri: URI): Boolean = uri.getScheme == "gs" + def download(uri: URI): Stream[F, Byte] = Stream.eval(Url.parseF[F](uri.toString)).flatMap { url => store diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala index d4248fc8e..4d5ace1b6 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala @@ -28,7 +28,15 @@ import retry.syntax.all._ import com.snowplowanalytics.snowplow.badrows.Processor -import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Cloud, Input, Monitoring, Output, RetryCheckpointing} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{ + BackoffPolicy, + BlobStorageClients, + Cloud, + Input, + Monitoring, + Output, + RetryCheckpointing +} import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs} import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Retries, Source} import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client @@ -49,7 +57,7 @@ object Run { mkSinkPii: (Blocker, Output) => Resource[F, AttributedByteSink[F]], mkSinkBad: (Blocker, Output) => Resource[F, ByteSink[F]], checkpoint: List[A] => F[Unit], - mkClients: List[Blocker => Resource[F, Client[F]]], + mkClients: BlobStorageClients => List[Blocker => Resource[F, Client[F]]], getPayload: A => Array[Byte], maxRecordSize: Int, cloud: Option[Cloud], @@ -79,7 +87,7 @@ object Run { case _ => mkSinkBad(blocker, file.output.bad) } - clients = mkClients.map(mk => mk(blocker)).sequence + clients = mkClients(file.blobStorage).map(mk => mk(blocker)).sequence exit <- file.input match { case p: Input.FileSystem => val env = Environment diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala index c6a2f8143..a37354fba 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala @@ -49,7 +49,8 @@ final case class ConfigFile( telemetry: Telemetry, featureFlags: FeatureFlags, experimental: Option[Experimental], - adaptersSchemas: AdaptersSchemas + adaptersSchemas: AdaptersSchemas, + blobStorage: BlobStorageClients ) object ConfigFile { @@ -61,16 +62,17 @@ object ConfigFile { implicit val configFileDecoder: Decoder[ConfigFile] = deriveConfiguredDecoder[ConfigFile].emap { - case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _) if aup._1 <= 0L => + case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _, _) if aup._1 <= 0L => "assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype // Remove pii output if streamName and region empty 
- case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _) if output.streamName.isEmpty => + case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _, _) if output.streamName.isEmpty => c.copy(output = Outputs(good, None, bad)).asRight // Remove pii output if topic empty - case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if t.isEmpty => + case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _) if t.isEmpty => c.copy(output = Outputs(good, None, bad)).asRight // Remove pii output if topic empty - case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if topicName.isEmpty => + case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _) + if topicName.isEmpty => c.copy(output = Outputs(good, None, bad)).asRight case other => other.asRight } } diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index 6cef4f1df..44a757ad0 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -486,6 +486,24 @@ object io { deriveConfiguredEncoder[GcpUserAgent] } + case class BlobStorageClients( + gcs: Boolean, + s3: Boolean, + azureStorage: Option[BlobStorageClients.AzureStorage] + ) + object BlobStorageClients { + case class AzureStorage(storageAccountName: String) + + implicit val azureStorageDecoder: Decoder[AzureStorage] = + deriveConfiguredDecoder[AzureStorage] + implicit val azureStorageEncoder: Encoder[AzureStorage] = + deriveConfiguredEncoder[AzureStorage] + implicit val blobStorageClientDecoder: Decoder[BlobStorageClients] = + deriveConfiguredDecoder[BlobStorageClients] + implicit val blobStorageClientEncoder: Encoder[BlobStorageClients] = + deriveConfiguredEncoder[BlobStorageClients] + } + object AdaptersSchemasEncoderDecoders { implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] = deriveConfiguredDecoder[AdaptersSchemas] diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Clients.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Clients.scala index 929fb872a..efb781c3c 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Clients.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Clients.scala @@ -33,7 +33,7 @@ case class Clients[F[_]: ConcurrentEffect](clients: List[Client[F]]) { /** Download a URI as a stream of bytes, using the appropriate client */ def download(uri: URI): Stream[F, Byte] = - clients.find(_.prefixes.contains(uri.getScheme())) match { + clients.find(_.canDownload(uri)) match { case Some(client) => client.download(uri) case None => @@ -47,7 +47,10 @@ object Clients { def wrapHttpClient[F[_]: ConcurrentEffect](client: Http4sClient[F]): Client[F] = new Client[F] { - val prefixes = List("http", "https") + def canDownload(uri: URI): Boolean = + // Since Azure Blob Storage URLs also use the https scheme and we want to fetch them with + // their own client, we add a second condition so that this client does not
pick up those urls + (uri.getScheme == "http" || uri.getScheme == "https") && !uri.toString.contains("core.windows.net") def download(uri: URI): Stream[F, Byte] = { val request = Request[F](uri = Uri.unsafeFromString(uri.toString)) @@ -97,7 +100,7 @@ object Clients { } trait Client[F[_]] { - val prefixes: List[String] + def canDownload(uri: URI): Boolean def download(uri: URI): Stream[F, Byte] } } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala index 152a269b5..1ca51b49d 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala @@ -12,12 +12,8 @@ */ package com.snowplowanalytics.snowplow.enrich.common.fs2.config -import java.net.URI -import java.util.UUID import java.nio.file.Paths -import scala.concurrent.duration._ - import cats.syntax.either._ import cats.effect.IO @@ -25,220 +21,10 @@ import cats.effect.testing.specs2.CatsIO import com.typesafe.config.ConfigFactory -import org.http4s.Uri - import org.specs2.mutable.Specification -import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BackoffPolicy -import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas - class ConfigFileSpec extends Specification with CatsIO { "parse" should { - - "parse reference example for Kinesis" in { - val configPath = Paths.get(getClass.getResource("/config.kinesis.extended.hocon").toURI) - val expected = ConfigFile( - io.Input.Kinesis( - "snowplow-enrich-kinesis", - "collector-payloads", - Some("eu-central-1"), - io.Input.Kinesis.InitPosition.TrimHorizon, - io.Input.Kinesis.Retrieval.Polling(10000), - 3, - io.BackoffPolicy(100.milli, 10.second, Some(10)), - None, - None, - None - ), - io.Outputs( - io.Output.Kinesis( - "enriched", - Some("eu-central-1"), - None, - io.BackoffPolicy(100.millis, 10.seconds, Some(10)), - io.BackoffPolicy(100.millis, 1.second, None), - 500, - 5242880, - None - ), - Some( - io.Output.Kinesis( - "pii", - Some("eu-central-1"), - None, - io.BackoffPolicy(100.millis, 10.seconds, Some(10)), - io.BackoffPolicy(100.millis, 1.second, None), - 500, - 5242880, - None - ) - ), - io.Output.Kinesis( - "bad", - Some("eu-central-1"), - None, - io.BackoffPolicy(100.millis, 10.seconds, Some(10)), - io.BackoffPolicy(100.millis, 1.second, None), - 500, - 5242880, - None - ) - ), - io.Concurrency(256, 1), - Some(7.days), - io.RemoteAdapterConfigs( - 10.seconds, - 45.seconds, - 10, - List( - io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com") - ) - ), - io.Monitoring( - Some(Sentry(URI.create("http://sentry.acme.com"))), - io.MetricsReporters( - Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)), - Some(io.MetricsReporters.Stdout(10.seconds, None)), - true - ) - ), - io.Telemetry( - false, - 15.minutes, - "POST", - "collector-g.snowplowanalytics.com", - 443, - true, - Some("my_pipeline"), - Some("hfy67e5ydhtrd"), - Some("665bhft5u6udjf"), - Some("enrich-kinesis-ce"), - Some("1.0.0") - ), - io.FeatureFlags( - false, - false, - false - ), - Some( - io.Experimental( - Some( - io.Metadata( - Uri.uri("https://my_pipeline.my_domain.com/iglu"), - 5.minutes, - UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"), - 
UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784") - ) - ) - ) - ), - adaptersSchemas - ) - ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) - } - - "parse reference example for NSQ" in { - val configPath = Paths.get(getClass.getResource("/config.nsq.extended.hocon").toURI) - val expected = ConfigFile( - io.Input.Nsq( - "collector-payloads", - "collector-payloads-channel", - "127.0.0.1", - 4161, - 3000, - BackoffPolicy( - minBackoff = 100.milliseconds, - maxBackoff = 10.seconds, - maxRetries = Some(10) - ) - ), - io.Outputs( - io.Output.Nsq( - "enriched", - "127.0.0.1", - 4150, - BackoffPolicy( - minBackoff = 100.milliseconds, - maxBackoff = 10.seconds, - maxRetries = Some(10) - ) - ), - Some( - io.Output.Nsq( - "pii", - "127.0.0.1", - 4150, - BackoffPolicy( - minBackoff = 100.milliseconds, - maxBackoff = 10.seconds, - maxRetries = Some(10) - ) - ) - ), - io.Output.Nsq( - "bad", - "127.0.0.1", - 4150, - BackoffPolicy( - minBackoff = 100.milliseconds, - maxBackoff = 10.seconds, - maxRetries = Some(10) - ) - ) - ), - io.Concurrency(256, 3), - Some(7.days), - io.RemoteAdapterConfigs( - 10.seconds, - 45.seconds, - 10, - List( - io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com") - ) - ), - io.Monitoring( - Some(Sentry(URI.create("http://sentry.acme.com"))), - io.MetricsReporters( - Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)), - Some(io.MetricsReporters.Stdout(10.seconds, None)), - true - ) - ), - io.Telemetry( - false, - 15.minutes, - "POST", - "collector-g.snowplowanalytics.com", - 443, - true, - Some("my_pipeline"), - Some("hfy67e5ydhtrd"), - Some("665bhft5u6udjf"), - Some("enrich-nsq-ce"), - Some("1.0.0") - ), - io.FeatureFlags( - false, - false, - false - ), - Some( - io.Experimental( - Some( - io.Metadata( - Uri.uri("https://my_pipeline.my_domain.com/iglu"), - 5.minutes, - UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"), - UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784") - ) - ) - ) - ), - adaptersSchemas - ) - ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) - } - "parse valid 0 minutes as None" in { val input = """{ @@ -317,6 +103,10 @@ class ConfigFileSpec extends Specification with CatsIO { "organizationId": "c5f3a09f-75f8-4309-bec5-fea560f78455", "pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784" } + }, + "blobStorage": { + "gcs": true + "s3": true } }""" @@ -380,6 +170,10 @@ class ConfigFileSpec extends Specification with CatsIO { "acceptInvalid": false, "legacyEnrichmentOrder": false, "tryBase64Decoding": false + }, + "blobStorage": { + "gcs": true, + "s3": true } }""" diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala index b42684f20..1817660fc 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala @@ -26,6 +26,7 @@ import org.specs2.mutable.Specification import com.typesafe.config.ConfigFactory +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BlobStorageClients import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import 
com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas @@ -119,7 +120,8 @@ class ParsedConfigsSpec extends Specification with CatsIO { ) ) ), - adaptersSchemas + adaptersSchemas, + BlobStorageClients(gcs = false, s3 = true, azureStorage = None) ) ParsedConfigs.validateConfig[IO](configFile).value.map(result => result must beLeft) diff --git a/modules/kafka/src/main/resources/application.conf b/modules/kafka/src/main/resources/application.conf index 5bbf2496e..4efa7e45f 100644 --- a/modules/kafka/src/main/resources/application.conf +++ b/modules/kafka/src/main/resources/application.conf @@ -71,4 +71,9 @@ "legacyEnrichmentOrder": false "tryBase64Decoding": false } + + "blobStorage": { + "gcs": false + "s3": false + } } diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala index 29e0a5de0..76ae811c1 100644 --- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala +++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala @@ -17,9 +17,14 @@ import java.util.concurrent.{Executors, TimeUnit} import scala.concurrent.ExecutionContext import cats.{Applicative, Parallel} import cats.implicits._ -import cats.effect.{ExitCode, IO, IOApp, Resource, SyncIO} +import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource, SyncIO} import fs2.kafka.CommittableConsumerRecord import com.snowplowanalytics.snowplow.enrich.common.fs2.Run +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BlobStorageClients => BlobStorageClientsConfig} +import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client +import com.snowplowanalytics.snowplow.enrich.aws.S3Client +import com.snowplowanalytics.snowplow.enrich.gcp.GcsClient +import com.snowplowanalytics.snowplow.enrich.azure.AzureStorageClient import com.snowplowanalytics.snowplow.enrich.kafka.generated.BuildInfo object Main extends IOApp.WithContext { @@ -57,7 +62,7 @@ object Main extends IOApp.WithContext { (blocker, out) => Sink.initAttributed(blocker, out), (blocker, out) => Sink.init(blocker, out), checkpoint, - List.empty, + createBlobStorageClient, _.record.value, MaxRecordSize, None, @@ -73,4 +78,11 @@ object Main extends IOApp.WithContext { .values .toList .parTraverse_(_.offset.commit) + + private def createBlobStorageClient(conf: BlobStorageClientsConfig): List[Blocker => Resource[IO, Client[IO]]] = { + val gcs = if (conf.gcs) Some((b: Blocker) => Resource.eval(GcsClient.mk[IO](b))) else None + val aws = if (conf.s3) Some((_: Blocker) => S3Client.mk[IO]) else None + val azure = conf.azureStorage.map(s => (_: Blocker) => AzureStorageClient.mk[IO](s.storageAccountName)) + List(gcs, aws, azure).flatten + } } diff --git a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/enrich/kafka/ConfigSpec.scala b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/enrich/kafka/ConfigSpec.scala new file mode 100644 index 000000000..b439eb63b --- /dev/null +++ b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/enrich/kafka/ConfigSpec.scala @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2019-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. 
+ * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.kafka + +import java.net.URI +import java.util.UUID +import java.nio.file.Paths + +import scala.concurrent.duration._ + +import cats.syntax.either._ +import cats.effect.IO + +import org.http4s.Uri + +import cats.effect.testing.specs2.CatsIO + +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, Sentry} +import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas + +import org.specs2.mutable.Specification + +class ConfigSpec extends Specification with CatsIO { + + "parse" should { + "parse reference example for Kafka" in { + val configPath = Paths.get(getClass.getResource("/config.kafka.extended.hocon").toURI) + val expected = ConfigFile( + io.Input.Kafka( + "collector-payloads", + "localhost:9092", + Map( + "auto.offset.reset" -> "earliest", + "session.timeout.ms" -> "45000", + "enable.auto.commit" -> "false", + "group.id" -> "enrich" + ) + ), + io.Outputs( + io.Output.Kafka( + "enriched", + "localhost:9092", + "app_id", + Set("app_id"), + Map("acks" -> "all") + ), + Some( + io.Output.Kafka( + "pii", + "localhost:9092", + "app_id", + Set("app_id"), + Map("acks" -> "all") + ) + ), + io.Output.Kafka( + "bad", + "localhost:9092", + "", + Set(), + Map("acks" -> "all") + ) + ), + io.Concurrency(256, 1), + Some(7.days), + io.RemoteAdapterConfigs( + 10.seconds, + 45.seconds, + 10, + List( + io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com") + ) + ), + io.Monitoring( + Some(Sentry(URI.create("http://sentry.acme.com"))), + io.MetricsReporters( + Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)), + Some(io.MetricsReporters.Stdout(10.seconds, None)), + true + ) + ), + io.Telemetry( + false, + 15.minutes, + "POST", + "collector-g.snowplowanalytics.com", + 443, + true, + Some("my_pipeline"), + Some("hfy67e5ydhtrd"), + Some("665bhft5u6udjf"), + Some("enrich-kafka-ce"), + Some("1.0.0") + ), + io.FeatureFlags( + false, + false, + false + ), + Some( + io.Experimental( + Some( + io.Metadata( + Uri.uri("https://my_pipeline.my_domain.com/iglu"), + 5.minutes, + UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"), + UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784") + ) + ) + ) + ), + adaptersSchemas, + io.BlobStorageClients(gcs = false, s3 = false, azureStorage = None) + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + + "parse minimal example for Kafka" in { + val configPath = Paths.get(getClass.getResource("/config.kafka.minimal.hocon").toURI) + val expected = ConfigFile( + io.Input.Kafka( + "collector-payloads", + "localhost:9092", + Map( + "auto.offset.reset" -> "earliest", + "enable.auto.commit" -> "false", + "group.id" -> "enrich" + ) + ), + io.Outputs( + io.Output.Kafka( + "enriched", + "localhost:9092", + "", + Set(), + Map("acks" -> "all") + ), + None, + io.Output.Kafka( + "bad", + "localhost:9092", + "", + Set(), + Map("acks" -> "all") + ) + ), + io.Concurrency(256, 1), + None, + io.RemoteAdapterConfigs( + 10.seconds, + 45.seconds, + 10, + List() + ), + 
io.Monitoring( + None, + io.MetricsReporters( + None, + None, + true + ) + ), + io.Telemetry( + false, + 15.minutes, + "POST", + "collector-g.snowplowanalytics.com", + 443, + true, + None, + None, + None, + None, + None + ), + io.FeatureFlags( + false, + false, + false + ), + None, + adaptersSchemas, + io.BlobStorageClients(gcs = false, s3 = false, azureStorage = None) + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + } + +} diff --git a/modules/kinesis/src/main/resources/application.conf b/modules/kinesis/src/main/resources/application.conf index bd763f030..a75006bde 100644 --- a/modules/kinesis/src/main/resources/application.conf +++ b/modules/kinesis/src/main/resources/application.conf @@ -98,4 +98,9 @@ "legacyEnrichmentOrder": false "tryBase64Decoding": false } + + "blobStorage": { + "gcs": false + "s3": true + } } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala index 6ee094b1c..c0d02ae6c 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala @@ -30,6 +30,8 @@ import software.amazon.kinesis.exceptions.ShutdownException import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud import com.snowplowanalytics.snowplow.enrich.common.fs2.Run +import com.snowplowanalytics.snowplow.enrich.aws.S3Client + import com.snowplowanalytics.snowplow.enrich.kinesis.generated.BuildInfo object KinesisRun { @@ -53,7 +55,7 @@ object KinesisRun { (blocker, out) => Sink.initAttributed(blocker, out), (blocker, out) => Sink.init(blocker, out), checkpoint[F], - List(_ => S3Client.mk[F]), + _ => List(_ => S3Client.mk[F]), getPayload, MaxRecordSize, Some(Cloud.Aws), diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/S3Client.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/S3Client.scala deleted file mode 100644 index 5a23f3386..000000000 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/S3Client.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
- */ -package com.snowplowanalytics.snowplow.enrich.kinesis - -import java.net.URI - -import cats.implicits._ -import cats.effect.{ConcurrentEffect, Resource, Timer} - -import fs2.Stream - -import blobstore.url.Url -import blobstore.s3.S3Store - -import software.amazon.awssdk.services.s3.S3AsyncClient -import software.amazon.awssdk.core.exception.SdkException - -import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.{Client, RetryableFailure} - -object S3Client { - - def mk[F[_]: ConcurrentEffect: Timer]: Resource[F, Client[F]] = - for { - s3Client <- Resource.fromAutoCloseable(ConcurrentEffect[F].delay(S3AsyncClient.builder().build())) - store <- Resource.eval(S3Store.builder[F](s3Client).build.toEither.leftMap(_.head).pure[F].rethrow) - } yield new Client[F] { - val prefixes = List("s3") - - def download(uri: URI): Stream[F, Byte] = - Stream.eval(Url.parseF[F](uri.toString)).flatMap { url => - store - .get(url, 16 * 1024) - .handleErrorWith { e => - val e2 = e match { - case sdke: SdkException if sdke.retryable => - new RetryableFailure { - override def getMessage: String = sdke.getMessage - override def getCause: Throwable = sdke - } - case e => e - } - Stream.raiseError[F](e2) - } - } - } -} diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/enrich/kinesis/ConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/enrich/kinesis/ConfigSpec.scala new file mode 100644 index 000000000..f4734bf63 --- /dev/null +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/enrich/kinesis/ConfigSpec.scala @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2019-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.enrich.kinesis + +import java.net.URI +import java.util.UUID +import java.nio.file.Paths + +import scala.concurrent.duration._ + +import cats.syntax.either._ +import cats.effect.IO + +import cats.effect.testing.specs2.CatsIO + +import org.http4s.Uri + +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, Sentry} +import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas + +import org.specs2.mutable.Specification + +class ConfigSpec extends Specification with CatsIO { + + "parse" should { + "parse reference example for Kinesis" in { + val configPath = Paths.get(getClass.getResource("/config.kinesis.extended.hocon").toURI) + val expected = ConfigFile( + io.Input.Kinesis( + "snowplow-enrich-kinesis", + "collector-payloads", + Some("eu-central-1"), + io.Input.Kinesis.InitPosition.TrimHorizon, + io.Input.Kinesis.Retrieval.Polling(10000), + 3, + io.BackoffPolicy(100.milli, 10.second, Some(10)), + None, + None, + None + ), + io.Outputs( + io.Output.Kinesis( + "enriched", + Some("eu-central-1"), + None, + io.BackoffPolicy(100.millis, 10.seconds, Some(10)), + io.BackoffPolicy(100.millis, 1.second, None), + 500, + 5242880, + None + ), + Some( + io.Output.Kinesis( + "pii", + Some("eu-central-1"), + None, + io.BackoffPolicy(100.millis, 10.seconds, Some(10)), + io.BackoffPolicy(100.millis, 1.second, None), + 500, + 5242880, + None + ) + ), + io.Output.Kinesis( + "bad", + Some("eu-central-1"), + None, + io.BackoffPolicy(100.millis, 10.seconds, Some(10)), + io.BackoffPolicy(100.millis, 1.second, None), + 500, + 5242880, + None + ) + ), + io.Concurrency(256, 1), + Some(7.days), + io.RemoteAdapterConfigs( + 10.seconds, + 45.seconds, + 10, + List( + io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com") + ) + ), + io.Monitoring( + Some(Sentry(URI.create("http://sentry.acme.com"))), + io.MetricsReporters( + Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)), + Some(io.MetricsReporters.Stdout(10.seconds, None)), + true + ) + ), + io.Telemetry( + false, + 15.minutes, + "POST", + "collector-g.snowplowanalytics.com", + 443, + true, + Some("my_pipeline"), + Some("hfy67e5ydhtrd"), + Some("665bhft5u6udjf"), + Some("enrich-kinesis-ce"), + Some("1.0.0") + ), + io.FeatureFlags( + false, + false, + false + ), + Some( + io.Experimental( + Some( + io.Metadata( + Uri.uri("https://my_pipeline.my_domain.com/iglu"), + 5.minutes, + UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"), + UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784") + ) + ) + ) + ), + adaptersSchemas, + io.BlobStorageClients(gcs = false, s3 = true, azureStorage = None) + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + + "parse minimal example for Kinesis" in { + val configPath = Paths.get(getClass.getResource("/config.kinesis.minimal.hocon").toURI) + val expected = ConfigFile( + io.Input.Kinesis( + "snowplow-enrich-kinesis", + "collector-payloads", + None, + io.Input.Kinesis.InitPosition.TrimHorizon, + io.Input.Kinesis.Retrieval.Polling(10000), + 3, + io.BackoffPolicy(100.milli, 10.second, Some(10)), + None, + None, + None + ), + io.Outputs( + io.Output.Kinesis( + "enriched", + None, + None, + io.BackoffPolicy(100.millis, 10.seconds, Some(10)), + io.BackoffPolicy(100.millis, 1.second, None), + 500, + 5242880, + None + ), + None, + io.Output.Kinesis( + "bad", + None, + None, + 
io.BackoffPolicy(100.millis, 10.seconds, Some(10)), + io.BackoffPolicy(100.millis, 1.second, None), + 500, + 5242880, + None + ) + ), + io.Concurrency(256, 1), + None, + io.RemoteAdapterConfigs( + 10.seconds, + 45.seconds, + 10, + List() + ), + io.Monitoring( + None, + io.MetricsReporters( + None, + None, + true + ) + ), + io.Telemetry( + false, + 15.minutes, + "POST", + "collector-g.snowplowanalytics.com", + 443, + true, + None, + None, + None, + None, + None + ), + io.FeatureFlags( + false, + false, + false + ), + None, + adaptersSchemas, + io.BlobStorageClients(gcs = false, s3 = true, azureStorage = None) + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + } +} diff --git a/modules/nsq/src/main/resources/application.conf b/modules/nsq/src/main/resources/application.conf index 58eac0065..ad6d39326 100644 --- a/modules/nsq/src/main/resources/application.conf +++ b/modules/nsq/src/main/resources/application.conf @@ -61,4 +61,9 @@ "legacyEnrichmentOrder": false "tryBase64Decoding": false } + + "blobStorage": { + "gcs": true + "s3": true + } } diff --git a/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/Main.scala b/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/Main.scala index d1849be02..6417170a2 100644 --- a/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/Main.scala +++ b/modules/nsq/src/main/scala/com/snowplowanalytics/snowplow/enrich/nsq/Main.scala @@ -16,13 +16,19 @@ import cats.Parallel import cats.implicits._ -import cats.effect.{ExitCode, IO, IOApp, Resource, Sync, SyncIO} +import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource, Sync, SyncIO} import java.util.concurrent.{Executors, TimeUnit} import scala.concurrent.ExecutionContext import com.snowplowanalytics.snowplow.enrich.common.fs2.Run +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BlobStorageClients => BlobStorageClientsConfig} +import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client + +import com.snowplowanalytics.snowplow.enrich.aws.S3Client +import com.snowplowanalytics.snowplow.enrich.gcp.GcsClient +import com.snowplowanalytics.snowplow.enrich.azure.AzureStorageClient import com.snowplowanalytics.snowplow.enrich.nsq.generated.BuildInfo @@ -61,7 +67,7 @@ object Main extends IOApp.WithContext { (blocker, out) => Sink.initAttributed(blocker, out), (blocker, out) => Sink.init(blocker, out), checkpoint, - List(b => Resource.eval(GcsClient.mk[IO](b)), _ => S3Client.mk[IO]), + createBlobStorageClient, _.data, MaxRecordSize, None, @@ -70,4 +76,11 @@ object Main extends IOApp.WithContext { private def checkpoint[F[_]: Parallel: Sync](records: List[Record[F]]): F[Unit] = records.parTraverse_(_.ack) + + private def createBlobStorageClient(conf: BlobStorageClientsConfig): List[Blocker => Resource[IO, Client[IO]]] = { + val gcs = if (conf.gcs) Some((b: Blocker) => Resource.eval(GcsClient.mk[IO](b))) else None + val aws = if (conf.s3) Some((_: Blocker) => S3Client.mk[IO]) else None + val azure = conf.azureStorage.map(s => (_: Blocker) => AzureStorageClient.mk[IO](s.storageAccountName)) + List(gcs, aws, azure).flatten + } } diff --git a/modules/nsq/src/test/scala/com/snowplowanalytics/snowplow/enrich/nsq/ConfigSpec.scala b/modules/nsq/src/test/scala/com/snowplowanalytics/snowplow/enrich/nsq/ConfigSpec.scala new file mode 100644 index 000000000..52f652d8d --- /dev/null +++ b/modules/nsq/src/test/scala/com/snowplowanalytics/snowplow/enrich/nsq/ConfigSpec.scala @@ -0,0 +1,220 @@ +/* 
+ * Copyright (c) 2019-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.nsq + +import java.net.URI +import java.util.UUID +import java.nio.file.Paths + +import scala.concurrent.duration._ + +import cats.syntax.either._ +import cats.effect.IO + +import org.http4s.Uri + +import cats.effect.testing.specs2.CatsIO + +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BackoffPolicy +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, Sentry} +import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas + +import org.specs2.mutable.Specification + +class ConfigSpec extends Specification with CatsIO { + + "parse" should { + "parse reference example for NSQ" in { + val configPath = Paths.get(getClass.getResource("/config.nsq.extended.hocon").toURI) + val expected = ConfigFile( + io.Input.Nsq( + "collector-payloads", + "collector-payloads-channel", + "127.0.0.1", + 4161, + 3000, + BackoffPolicy( + minBackoff = 100.milliseconds, + maxBackoff = 10.seconds, + maxRetries = Some(10) + ) + ), + io.Outputs( + io.Output.Nsq( + "enriched", + "127.0.0.1", + 4150, + BackoffPolicy( + minBackoff = 100.milliseconds, + maxBackoff = 10.seconds, + maxRetries = Some(10) + ) + ), + Some( + io.Output.Nsq( + "pii", + "127.0.0.1", + 4150, + BackoffPolicy( + minBackoff = 100.milliseconds, + maxBackoff = 10.seconds, + maxRetries = Some(10) + ) + ) + ), + io.Output.Nsq( + "bad", + "127.0.0.1", + 4150, + BackoffPolicy( + minBackoff = 100.milliseconds, + maxBackoff = 10.seconds, + maxRetries = Some(10) + ) + ) + ), + io.Concurrency(256, 3), + Some(7.days), + io.RemoteAdapterConfigs( + 10.seconds, + 45.seconds, + 10, + List( + io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com") + ) + ), + io.Monitoring( + Some(Sentry(URI.create("http://sentry.acme.com"))), + io.MetricsReporters( + Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)), + Some(io.MetricsReporters.Stdout(10.seconds, None)), + false + ) + ), + io.Telemetry( + false, + 15.minutes, + "POST", + "collector-g.snowplowanalytics.com", + 443, + true, + Some("my_pipeline"), + Some("hfy67e5ydhtrd"), + Some("665bhft5u6udjf"), + Some("enrich-nsq-ce"), + Some("1.0.0") + ), + io.FeatureFlags( + false, + false, + false + ), + Some( + io.Experimental( + Some( + io.Metadata( + Uri.uri("https://my_pipeline.my_domain.com/iglu"), + 5.minutes, + UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"), + UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784") + ) + ) + ) + ), + adaptersSchemas, + io.BlobStorageClients(gcs = true, s3 = true, azureStorage = None) + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + + "parse minimal example for NSQ" in { + val configPath = 
Paths.get(getClass.getResource("/config.nsq.minimal.hocon").toURI) + val expected = ConfigFile( + io.Input.Nsq( + "collector-payloads", + "collector-payloads-channel", + "127.0.0.1", + 4161, + 3000, + BackoffPolicy( + minBackoff = 100.milliseconds, + maxBackoff = 10.seconds, + maxRetries = Some(10) + ) + ), + io.Outputs( + io.Output.Nsq( + "enriched", + "127.0.0.1", + 4150, + BackoffPolicy( + minBackoff = 100.milliseconds, + maxBackoff = 10.seconds, + maxRetries = Some(10) + ) + ), + None, + io.Output.Nsq( + "bad", + "127.0.0.1", + 4150, + BackoffPolicy( + minBackoff = 100.milliseconds, + maxBackoff = 10.seconds, + maxRetries = Some(10) + ) + ) + ), + io.Concurrency(256, 3), + None, + io.RemoteAdapterConfigs( + 10.seconds, + 45.seconds, + 10, + List() + ), + io.Monitoring( + None, + io.MetricsReporters( + None, + None, + false + ) + ), + io.Telemetry( + false, + 15.minutes, + "POST", + "collector-g.snowplowanalytics.com", + 443, + true, + None, + None, + None, + None, + None + ), + io.FeatureFlags( + false, + false, + false + ), + None, + adaptersSchemas, + io.BlobStorageClients(gcs = true, s3 = true, azureStorage = None) + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + } +} diff --git a/modules/pubsub/src/main/resources/application.conf b/modules/pubsub/src/main/resources/application.conf index c8e13272f..bdb750caf 100644 --- a/modules/pubsub/src/main/resources/application.conf +++ b/modules/pubsub/src/main/resources/application.conf @@ -76,4 +76,9 @@ "legacyEnrichmentOrder": false "tryBase64Decoding": false } + + "blobStorage": { + "gcs": true + "s3": false + } } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/GcsClient.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/GcsClient.scala deleted file mode 100644 index 0c32fe6e0..000000000 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/GcsClient.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
- */ -package com.snowplowanalytics.snowplow.enrich.pubsub - -import java.net.URI - -import cats.implicits._ -import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Timer} - -import fs2.Stream - -import blobstore.gcs.GcsStore -import blobstore.url.Url - -import com.google.cloud.storage.StorageOptions -import com.google.cloud.BaseServiceException - -import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.{Client, RetryableFailure} - -object GcsClient { - - def mk[F[_]: ConcurrentEffect: ContextShift: Timer](blocker: Blocker): F[Client[F]] = - ConcurrentEffect[F].delay(StorageOptions.getDefaultInstance.getService).map { service => - new Client[F] { - val prefixes = List("gs") - - val store = GcsStore.builder(service, blocker).unsafe - - def download(uri: URI): Stream[F, Byte] = - Stream.eval(Url.parseF[F](uri.toString)).flatMap { url => - store - .get(url, 16 * 1024) - .handleErrorWith { e => - val e2 = e match { - case bse: BaseServiceException if bse.isRetryable => - new RetryableFailure { - override def getMessage: String = bse.getMessage - override def getCause: Throwable = bse - } - case e => e - } - Stream.raiseError[F](e2) - } - } - } - } -} diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala index 09adbca4c..07bf6e3e3 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala @@ -26,6 +26,8 @@ import com.permutive.pubsub.consumer.ConsumerRecord import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud import com.snowplowanalytics.snowplow.enrich.common.fs2.Run +import com.snowplowanalytics.snowplow.enrich.gcp.GcsClient + import com.snowplowanalytics.snowplow.enrich.pubsub.generated.BuildInfo object Main extends IOApp.WithContext { @@ -68,7 +70,7 @@ object Main extends IOApp.WithContext { (_, out) => Sink.initAttributed(out), (_, out) => Sink.init(out), checkpoint, - List(b => Resource.eval(GcsClient.mk[IO](b))), + _ => List(b => Resource.eval(GcsClient.mk[IO](b))), _.value, MaxRecordSize, Some(Cloud.Gcp), diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala index 2aac06fb6..02c1ac51e 100644 --- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala @@ -121,7 +121,8 @@ class ConfigSpec extends Specification with CatsIO { ) ) ), - adaptersSchemas + adaptersSchemas, + io.BlobStorageClients(gcs = true, s3 = false, azureStorage = None) ) ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) } @@ -191,7 +192,8 @@ class ConfigSpec extends Specification with CatsIO { false ), None, - adaptersSchemas + adaptersSchemas, + io.BlobStorageClients(gcs = true, s3 = false, azureStorage = None) ) ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) } diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 7812d1ada..562060b12 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -50,6 +50,24 @@ object BuildSettings { description := "Common functionality for streaming enrich applications built on top of 
functional streams" ) + lazy val awsUtilsProjectSettings = projectSettings ++ Seq( + name := "snowplow-enrich-aws-utils", + moduleName := "snowplow-enrich-aws-utils", + description := "AWS specific utils" + ) + + lazy val gcpUtilsProjectSettings = projectSettings ++ Seq( + name := "snowplow-enrich-gcp-utils", + moduleName := "snowplow-enrich-gcp-utils", + description := "GCP specific utils" + ) + + lazy val azureUtilsProjectSettings = projectSettings ++ Seq( + name := "snowplow-enrich-azure-utils", + moduleName := "snowplow-enrich-azure-utils", + description := "Azure specific utils" + ) + lazy val pubsubProjectSettings = projectSettings ++ Seq( name := "snowplow-enrich-pubsub", moduleName := "snowplow-enrich-pubsub", @@ -211,6 +229,27 @@ object BuildSettings { scoverageSettings ++ noParallelTestExecution ++ addExampleConfToTestCp } + lazy val awsUtilsBuildSettings = { + // Project + awsUtilsProjectSettings ++ buildSettings ++ + // Tests + scoverageSettings ++ noParallelTestExecution ++ addExampleConfToTestCp + } + + lazy val gcpUtilsBuildSettings = { + // Project + gcpUtilsProjectSettings ++ buildSettings ++ + // Tests + scoverageSettings ++ noParallelTestExecution ++ addExampleConfToTestCp + } + + lazy val azureUtilsBuildSettings = { + // Project + azureUtilsProjectSettings ++ buildSettings ++ + // Tests + scoverageSettings ++ noParallelTestExecution ++ addExampleConfToTestCp + } + lazy val pubsubBuildSettings = { // Project pubsubProjectSettings ++ buildSettings ++ @@ -230,7 +269,7 @@ object BuildSettings { assemblySettings ++ dockerSettingsFocal ++ Seq(Docker / packageName := "snowplow-enrich-kinesis") ++ // Tests - scoverageSettings ++ noParallelTestExecution ++ Seq(Test / fork := true) + scoverageSettings ++ noParallelTestExecution ++ Seq(Test / fork := true) ++ addExampleConfToTestCp } lazy val kinesisDistrolessBuildSettings = kinesisBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless @@ -242,7 +281,7 @@ object BuildSettings { assemblySettings ++ dockerSettingsFocal ++ Seq(Docker / packageName := "snowplow-enrich-kafka") ++ // Tests - scoverageSettings ++ noParallelTestExecution + scoverageSettings ++ noParallelTestExecution ++ addExampleConfToTestCp } lazy val kafkaDistrolessBuildSettings = kafkaBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless @@ -254,7 +293,7 @@ object BuildSettings { assemblySettings ++ dockerSettingsFocal ++ Seq(Docker / packageName := "snowplow-enrich-nsq") ++ // Tests - scoverageSettings ++ noParallelTestExecution + scoverageSettings ++ noParallelTestExecution ++ addExampleConfToTestCp } lazy val nsqDistrolessBuildSettings = nsqBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b3e116a48..4ef6837fe 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -86,6 +86,7 @@ object Dependencies { val fs2Aws = "3.1.1" val fs2Kafka = "1.10.0" val fs2BlobStorage = "0.8.6" + val azureIdentity = "1.11.0" val http4s = "0.21.34" val log4cats = "1.3.0" val catsRetry = "2.1.0" @@ -192,12 +193,14 @@ object Dependencies { .exclude("software.amazon.glue", "schema-registry-common") .exclude("software.amazon.glue", "schema-registry-serde") val stsSdk2 = "software.amazon.awssdk" % "sts" % V.awsSdk2 % Runtime + val azureIdentity = "com.azure" % "azure-identity" % V.azureIdentity val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s val log4cats = "org.typelevel" %% 
"log4cats-slf4j" % V.log4cats val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry val fs2BlobS3 = "com.github.fs2-blobstore" %% "s3" % V.fs2BlobStorage val fs2BlobGcs = "com.github.fs2-blobstore" %% "gcs" % V.fs2BlobStorage + val fs2BlobAzure = "com.github.fs2-blobstore" %% "azure" % V.fs2BlobStorage val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s % Test val http4sServer = "org.http4s" %% "http4s-blaze-server" % V.http4s % Test val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.snowplowTracker @@ -282,6 +285,21 @@ object Dependencies { circeParser % Test ) + val awsUtilsDependencies = Seq( + fs2BlobS3, + s3Sdk2 + ) + + val gcpUtilsDependencies = Seq( + fs2BlobGcs, + gcs + ) + + val azureUtilsDependencies = Seq( + fs2BlobAzure, + azureIdentity + ) + val pubsubDependencies = Seq( fs2BlobGcs, gcs, @@ -302,20 +320,25 @@ object Dependencies { kinesisClient2, stsSdk2, sts, - specs2 + specs2, + specs2CE ) val kafkaDependencies = Seq( fs2Kafka, kafkaClients, // override kafka-clients 2.8.1 from fs2Kafka to address https://security.snyk.io/vuln/SNYK-JAVA-ORGAPACHEKAFKA-3027430 - mskAuth + mskAuth, + specs2, + specs2CE ) val nsqDependencies = Seq( nsqClient, fs2BlobS3, fs2BlobGcs, - log4j // for security vulnerabilities + log4j, // for security vulnerabilities + specs2, + specs2CE ) // exclusions