enrich-kafka: add blob storage support (close #831)
spenes committed Nov 17, 2023
1 parent 4c373e9 commit 7e0554f
Showing 26 changed files with 939 additions and 362 deletions.
46 changes: 42 additions & 4 deletions build.sbt
@@ -41,6 +41,29 @@ lazy val commonFs2 = project
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common % "test->test;compile->compile")

+lazy val awsUtils = project
+  .in(file("modules/cloudutils/aws"))
+  .enablePlugins(BuildInfoPlugin)
+  .settings(awsUtilsBuildSettings)
+  .settings(libraryDependencies ++= awsUtilsDependencies)
+  .settings(addCompilerPlugin(betterMonadicFor))
+  .dependsOn(commonFs2 % "test->test;compile->compile")
+
+lazy val gcpUtils = project
+  .in(file("modules/cloudutils/gcp"))
+  .enablePlugins(BuildInfoPlugin)
+  .settings(gcpUtilsBuildSettings)
+  .settings(libraryDependencies ++= gcpUtilsDependencies)
+  .settings(addCompilerPlugin(betterMonadicFor))
+  .dependsOn(commonFs2 % "test->test;compile->compile")
+
+lazy val azureUtils = project
+  .in(file("modules/cloudutils/azure"))
+  .enablePlugins(BuildInfoPlugin)
+  .settings(azureUtilsBuildSettings)
+  .settings(libraryDependencies ++= azureUtilsDependencies)
+  .settings(addCompilerPlugin(betterMonadicFor))
+  .dependsOn(commonFs2 % "test->test;compile->compile")

lazy val pubsub = project
.in(file("modules/pubsub"))
@@ -50,6 +73,7 @@ lazy val pubsub = project
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2 % "test->test;compile->compile")
+  .dependsOn(gcpUtils % "compile->compile;it->it")

lazy val pubsubDistroless = project
.in(file("modules/distroless/pubsub"))
@@ -60,7 +84,7 @@ lazy val pubsubDistroless = project
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2 % "test->test;compile->compile")
-
+  .dependsOn(gcpUtils % "compile->compile;it->it")

lazy val kinesis = project
.in(file("modules/kinesis"))
@@ -69,7 +93,8 @@ lazy val kinesis = project
.settings(libraryDependencies ++= kinesisDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
-  .dependsOn(commonFs2)
+  .dependsOn(commonFs2 % "test->test;compile->compile")
+  .dependsOn(awsUtils % "compile->compile;it->it")

lazy val kinesisDistroless = project
.in(file("modules/distroless/kinesis"))
@@ -84,6 +109,7 @@ lazy val kinesisDistroless = project
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2 % "compile->compile;it->it")
+  .dependsOn(awsUtils % "compile->compile;it->it")
.settings(Defaults.itSettings)
.configs(IntegrationTest)
.settings((IntegrationTest / test) := (IntegrationTest / test).dependsOn(Docker / publishLocal).value)
@@ -101,7 +127,10 @@ lazy val kafka = project
.settings(Defaults.itSettings)
.configs(IntegrationTest)
.settings(addCompilerPlugin(betterMonadicFor))
-  .dependsOn(commonFs2 % "compile->compile;it->it")
+  .dependsOn(commonFs2 % "compile->compile;test->test;it->it")
+  .dependsOn(awsUtils % "compile->compile;it->it")
+  .dependsOn(gcpUtils % "compile->compile;it->it")
+  .dependsOn(azureUtils % "compile->compile;it->it")

lazy val kafkaDistroless = project
.in(file("modules/distroless/kafka"))
@@ -112,6 +141,9 @@ lazy val kafkaDistroless = project
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2)
+  .dependsOn(awsUtils % "compile->compile;it->it")
+  .dependsOn(gcpUtils % "compile->compile;it->it")
+  .dependsOn(azureUtils % "compile->compile;it->it")

lazy val nsq = project
.in(file("modules/nsq"))
@@ -120,7 +152,10 @@ lazy val nsq = project
.settings(libraryDependencies ++= nsqDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
-  .dependsOn(commonFs2)
+  .dependsOn(commonFs2 % "compile->compile;test->test")
+  .dependsOn(awsUtils % "compile->compile;it->it")
+  .dependsOn(gcpUtils % "compile->compile;it->it")
+  .dependsOn(azureUtils % "compile->compile;it->it")

lazy val nsqDistroless = project
.in(file("modules/distroless/nsq"))
@@ -135,6 +170,9 @@ lazy val nsqDistroless = project
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2 % "compile->compile;it->it")
+  .dependsOn(awsUtils % "compile->compile;it->it")
+  .dependsOn(gcpUtils % "compile->compile;it->it")
+  .dependsOn(azureUtils % "compile->compile;it->it")
.settings(Defaults.itSettings)
.configs(IntegrationTest)
.settings((IntegrationTest / test) := (IntegrationTest / test).dependsOn(Docker / publishLocal).value)
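A note on the configuration mappings used in the dependsOn calls above: in sbt, "compile->compile;it->it" maps the depending project's Compile and IntegrationTest configurations onto the dependency's, so the shared cloud-utils code sits on the main classpath and its integration-test helpers are visible from it sources. A minimal sketch of the pattern, with hypothetical project names that are not part of this commit:

// Minimal sbt sketch of the dependsOn mapping pattern used above (hypothetical names).
lazy val exampleUtils = project
  .in(file("modules/example-utils"))
  .configs(IntegrationTest)
  .settings(Defaults.itSettings)

lazy val exampleAsset = project
  .in(file("modules/example-asset"))
  .configs(IntegrationTest)
  .settings(Defaults.itSettings)
  // Compile depends on exampleUtils' Compile; IntegrationTest on its IntegrationTest
  .dependsOn(exampleUtils % "compile->compile;it->it")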
@@ -10,7 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
-package com.snowplowanalytics.snowplow.enrich.nsq
+package com.snowplowanalytics.snowplow.enrich.aws

import java.net.URI

@@ -36,7 +36,8 @@ object S3Client {
s3Client <- Resource.fromAutoCloseable(ConcurrentEffect[F].delay(S3AsyncClient.builder().region(getRegion).build()))
store <- Resource.eval(S3Store.builder[F](s3Client).build.toEither.leftMap(_.head).pure[F].rethrow)
} yield new Client[F] {
val prefixes = List("s3")
def canDownload(uri: URI): Boolean =
uri.getScheme == "s3"

def download(uri: URI): Stream[F, Byte] =
Stream.eval(Url.parseF[F](uri.toString)).flatMap { url =>
@@ -64,3 +65,4 @@
Region.EU_CENTRAL_1
}
}
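The prefixes list is gone: each Client now answers canDownload for a given URI. A hedged usage sketch of the relocated S3 client, assuming a cats-effect runtime with ConcurrentEffect[IO] in scope; the bucket and key are hypothetical:

import java.net.URI
import cats.effect.IO

// Hedged sketch: stream an object if this client claims the URI (hypothetical bucket/key).
val uri = new URI("s3://my-bucket/enrichments/GeoLite2-City.mmdb")
S3Client.mk[IO].use { client =>
  if (client.canDownload(uri)) client.download(uri).compile.drain // scheme is "s3"
  else IO.unit
}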

@@ -0,0 +1,71 @@
/*
* Copyright (c) 2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.azure

import blobstore.azure.AzureStore
import blobstore.url.exception.{AuthorityParseError, MultipleUrlValidationException, Throwables}
import blobstore.url.{Authority, Path, Url}
import cats.data.Validated.{Invalid, Valid}
import cats.data.ValidatedNec
import cats.effect._
import cats.implicits._
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.blob.{BlobServiceClientBuilder, BlobUrlParts}
import fs2.Stream
import java.net.URI
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client

object AzureStorageClient {

def mk[F[_]: ConcurrentEffect](storageAccountName: String): Resource[F, Client[F]] =
for {
store <- createStore(storageAccountName)
} yield new Client[F] {
def canDownload(uri: URI): Boolean =
uri.toString.contains("core.windows.net")

def download(uri: URI): Stream[F, Byte] =
createStorageUrlFrom(uri.toString) match {
case Valid(url) => store.get(url, 16 * 1024)
case Invalid(errors) => Stream.raiseError[F](MultipleUrlValidationException(errors))
}
}

private def createStore[F[_]: ConcurrentEffect: Async](storageAccountName: String): Resource[F, AzureStore[F]] =
for {
client <- Resource.eval {
ConcurrentEffect[F].delay {
val builder = new BlobServiceClientBuilder().credential(new DefaultAzureCredentialBuilder().build)
val storageEndpoint = createStorageEndpoint(storageAccountName)
builder.endpoint(storageEndpoint).buildAsyncClient()
}
}
store <- AzureStore
.builder[F](client)
.build
.fold(
errors => Resource.eval(ConcurrentEffect[F].raiseError(errors.reduce(Throwables.collapsingSemigroup))),
s => Resource.pure[F, AzureStore[F]](s)
)
} yield store

private def createStorageUrlFrom(input: String): ValidatedNec[AuthorityParseError, Url[String]] = {
val inputParts = BlobUrlParts.parse(input)
Authority
.parse(inputParts.getBlobContainerName)
.map(authority => Url(inputParts.getScheme, authority, Path(inputParts.getBlobName)))
}

private def createStorageEndpoint(storageAccountName: String): String =
s"https://$storageAccountName.blob.core.windows.net"
}
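A hedged usage sketch of the new Azure client; the account, container and blob names are hypothetical. DefaultAzureCredential resolves credentials from the environment (environment variables, managed identity, Azure CLI), so the config only needs the storage account name:

import java.net.URI
import cats.effect.IO

// Hedged sketch (hypothetical names; assumes ConcurrentEffect[IO] is in scope).
val blobUri = new URI("https://myaccount.blob.core.windows.net/enrichments/referer-tests.json")
AzureStorageClient.mk[IO]("myaccount").use { client =>
  // canDownload matches the core.windows.net host; download streams the blob in 16 KiB chunks
  client.download(blobUri).compile.drain
}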
@@ -10,7 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
-package com.snowplowanalytics.snowplow.enrich.nsq
+package com.snowplowanalytics.snowplow.enrich.gcp

import java.net.URI

@@ -32,10 +32,10 @@ object GcsClient {
def mk[F[_]: ConcurrentEffect: ContextShift: Timer](blocker: Blocker): F[Client[F]] =
ConcurrentEffect[F].delay(StorageOptions.getDefaultInstance.getService).map { service =>
new Client[F] {
val prefixes = List("gs")

val store = GcsStore.builder(service, blocker).unsafe

+  def canDownload(uri: URI): Boolean = uri.getScheme == "gs"

def download(uri: URI): Stream[F, Byte] =
Stream.eval(Url.parseF[F](uri.toString)).flatMap { url =>
store
@@ -28,7 +28,15 @@ import retry.syntax.all._

import com.snowplowanalytics.snowplow.badrows.Processor

-import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Cloud, Input, Monitoring, Output, RetryCheckpointing}
+import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{
+  BackoffPolicy,
+  BlobStorageClients,
+  Cloud,
+  Input,
+  Monitoring,
+  Output,
+  RetryCheckpointing
+}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Retries, Source}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client
@@ -49,7 +57,7 @@ object Run {
mkSinkPii: (Blocker, Output) => Resource[F, AttributedByteSink[F]],
mkSinkBad: (Blocker, Output) => Resource[F, ByteSink[F]],
checkpoint: List[A] => F[Unit],
-  mkClients: List[Blocker => Resource[F, Client[F]]],
+  mkClients: BlobStorageClients => List[Blocker => Resource[F, Client[F]]],
getPayload: A => Array[Byte],
maxRecordSize: Int,
cloud: Option[Cloud],
@@ -79,7 +87,7 @@
case _ =>
mkSinkBad(blocker, file.output.bad)
}
-  clients = mkClients.map(mk => mk(blocker)).sequence
+  clients = mkClients(file.blobStorage).map(mk => mk(blocker)).sequence
exit <- file.input match {
case p: Input.FileSystem =>
val env = Environment
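mkClients is now a function of the new blobStorage config block, so each asset builds only the clients it is configured for. A hedged sketch of an argument a caller could pass; the wiring is hypothetical (the actual per-asset Main changes are among the other changed files) and assumes the three client objects above are imported:

// Hedged sketch of a mkClients argument (hypothetical wiring).
def mkClients[F[_]: ConcurrentEffect: ContextShift: Timer](
  conf: BlobStorageClients
): List[Blocker => Resource[F, Client[F]]] =
  List(
    if (conf.gcs) Some((b: Blocker) => Resource.eval(GcsClient.mk[F](b))) else None,
    if (conf.s3) Some((_: Blocker) => S3Client.mk[F]) else None,
    conf.azureStorage.map(az => (_: Blocker) => AzureStorageClient.mk[F](az.storageAccountName))
  ).flatten // keep only the clients enabled in config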
@@ -49,7 +49,8 @@ final case class ConfigFile(
telemetry: Telemetry,
featureFlags: FeatureFlags,
experimental: Option[Experimental],
-  adaptersSchemas: AdaptersSchemas
+  adaptersSchemas: AdaptersSchemas,
+  blobStorage: BlobStorageClients
)

object ConfigFile {
@@ -61,16 +62,17 @@

implicit val configFileDecoder: Decoder[ConfigFile] =
deriveConfiguredDecoder[ConfigFile].emap {
-    case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _) if aup._1 <= 0L =>
+    case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _, _) if aup._1 <= 0L =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Remove pii output if streamName and region empty
-    case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _) if output.streamName.isEmpty =>
+    case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _, _) if output.streamName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
-    case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if t.isEmpty =>
+    case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _) if t.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
-    case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if topicName.isEmpty =>
+    case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _)
+        if topicName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
case other => other.asRight
}
@@ -486,6 +486,24 @@ object io {
deriveConfiguredEncoder[GcpUserAgent]
}

+  case class BlobStorageClients(
+    gcs: Boolean,
+    s3: Boolean,
+    azureStorage: Option[BlobStorageClients.AzureStorage]
+  )
+  object BlobStorageClients {
+    case class AzureStorage(storageAccountName: String)
+
+    implicit val azureStorageDecoder: Decoder[AzureStorage] =
+      deriveConfiguredDecoder[AzureStorage]
+    implicit val azureStorageEncoder: Encoder[AzureStorage] =
+      deriveConfiguredEncoder[AzureStorage]
+    implicit val blobStorageClientDecoder: Decoder[BlobStorageClients] =
+      deriveConfiguredDecoder[BlobStorageClients]
+    implicit val blobStorageClientEncoder: Encoder[BlobStorageClients] =
+      deriveConfiguredEncoder[BlobStorageClients]
+  }
+
object AdaptersSchemasEncoderDecoders {
implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] =
deriveConfiguredDecoder[AdaptersSchemas]
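The new blobStorage block decodes into BlobStorageClients. A small illustration of the decoded shape, with hypothetical values:

// Decoded shape of the new config block (hypothetical values).
val blobStorage = BlobStorageClients(
  gcs = true,                                                       // build the GCS client
  s3 = false,                                                       // skip the S3 client
  azureStorage = Some(BlobStorageClients.AzureStorage("myaccount")) // Azure account to target
)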
@@ -33,7 +33,7 @@ case class Clients[F[_]: ConcurrentEffect](clients: List[Client[F]]) {

/** Download a URI as a stream of bytes, using the appropriate client */
def download(uri: URI): Stream[F, Byte] =
-    clients.find(_.prefixes.contains(uri.getScheme())) match {
+    clients.find(_.canDownload(uri)) match {
case Some(client) =>
client.download(uri)
case None =>
Expand All @@ -47,7 +47,10 @@ object Clients {

def wrapHttpClient[F[_]: ConcurrentEffect](client: Http4sClient[F]): Client[F] =
new Client[F] {
val prefixes = List("http", "https")
def canDownload(uri: URI): Boolean =
// Since Azure Blob Storage urls' scheme are https as well and we want to fetch them with
// their own client, we added second condition to not pick up those urls
(uri.getScheme == "http" || uri.getScheme == "https") && !uri.toString.contains("core.windows.net")

def download(uri: URI): Stream[F, Byte] = {
val request = Request[F](uri = Uri.unsafeFromString(uri.toString))
@@ -97,7 +100,7 @@
}

trait Client[F[_]] {
-  val prefixes: List[String]
+  def canDownload(uri: URI): Boolean
def download(uri: URI): Stream[F, Byte]
}
}
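Dispatch in Clients.download now takes the first client whose canDownload returns true, and the host check above keeps Azure Blob Storage URLs, which are also https, away from the generic HTTP client. A self-contained sketch of the rule:

// Self-contained sketch mirroring the predicates above.
import java.net.URI

def isAzureBlob(uri: URI): Boolean = uri.toString.contains("core.windows.net")
def httpCanDownload(uri: URI): Boolean =
  (uri.getScheme == "http" || uri.getScheme == "https") && !isAzureBlob(uri)

assert(!httpCanDownload(new URI("https://acct.blob.core.windows.net/c/f"))) // Azure client's job
assert(httpCanDownload(new URI("https://example.com/enrichment.json")))     // HTTP client's job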