Skip to content

Commit

Permalink
Refactoring to enable additional sources (#1032)
Browse files Browse the repository at this point in the history
* GCP Storage Source Refactoring

* Refactors to allow reusability of components in both sink and source and reduce code existing code.

* Remove LazyLogging from CloudConfigDef subclasses

* Reduce implicit usage of the validator on Source and Sink tasks
  • Loading branch information
davidsloan authored Feb 23, 2024
1 parent 7d8f2dc commit 047b953
Show file tree
Hide file tree
Showing 82 changed files with 1,506 additions and 894 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
private implicit val cloudLocationValidator = S3LocationValidator
private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
private def avroConfig = S3SinkConfig(
S3Config(
S3ConnectionConfig(
None,
Some(s3Container.identity.identity),
Some(s3Container.identity.credential),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont

val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
val config = S3SinkConfig(
S3Config(
S3ConnectionConfig(
None,
Some(s3Container.identity.identity),
Some(s3Container.identity.credential),
Expand Down Expand Up @@ -123,7 +123,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont

val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
val config = S3SinkConfig(
S3Config(
S3ConnectionConfig(
None,
Some(s3Container.identity.identity),
Some(s3Container.identity.credential),
Expand Down Expand Up @@ -181,7 +181,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont

val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
val config = S3SinkConfig(
S3Config(
S3ConnectionConfig(
None,
Some(s3Container.identity.identity),
Some(s3Container.identity.credential),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC

private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
private def parquetConfig = S3SinkConfig(
S3Config(
S3ConnectionConfig(
None,
Some(s3Container.identity.identity),
Some(s3Container.identity.credential),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.lenses.streamreactor.connect.aws.s3.sink
import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.unsafe.implicits.global
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface
import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
Expand All @@ -27,7 +28,9 @@ import software.amazon.awssdk.services.s3.S3Client
import scala.jdk.CollectionConverters.MapHasAsJava

class S3SinkTaskTest
extends CoreSinkTaskTestCases[S3FileMetadata, AwsS3StorageInterface, S3SinkTask, S3Client]("S3SinkTask")
extends CoreSinkTaskTestCases[S3FileMetadata, AwsS3StorageInterface, S3SinkConfig, S3Client, S3SinkTask](
"S3SinkTask",
)
with S3ProxyContainerTest {

"S3SinkTask" should "fail with message when deprecated properties are used" in {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.lenses.streamreactor.connect.aws.s3.source

import cats.implicits.catsSyntaxEitherId
import io.lenses.streamreactor.connect.aws.s3.source.config.SourcePartitionSearcherSettingsKeys
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
import org.apache.avro.SchemaBuilder
import org.apache.avro.file.CodecFactory
import org.apache.avro.file.DataFileWriter
Expand All @@ -26,7 +26,7 @@ class S3SourceAvroEnvelopeTest
with AnyFlatSpecLike
with Matchers
with EitherValues
with SourcePartitionSearcherSettingsKeys {
with CloudSourceSettingsKeys {

def DefaultProps: Map[String, String] = defaultProps + (
SOURCE_PARTITION_SEARCH_INTERVAL_MILLIS -> "1000",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.lenses.streamreactor.connect.aws.s3.source

import cats.implicits.catsSyntaxEitherId
import io.lenses.streamreactor.connect.aws.s3.source.config.SourcePartitionSearcherSettingsKeys
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
import org.apache.avro.SchemaBuilder
import org.apache.avro.file.CodecFactory
import org.apache.avro.file.DataFileWriter
Expand All @@ -28,7 +28,7 @@ class S3SourceAvroWithValueAsArrayEnvelopeTest
with AnyFlatSpecLike
with Matchers
with EitherValues
with SourcePartitionSearcherSettingsKeys {
with CloudSourceSettingsKeys {

def DefaultProps: Map[String, String] = defaultProps + (
SOURCE_PARTITION_SEARCH_INTERVAL_MILLIS -> "1000",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.lenses.streamreactor.connect.aws.s3.source

import cats.implicits.catsSyntaxEitherId
import io.lenses.streamreactor.connect.aws.s3.source.config.SourcePartitionSearcherSettingsKeys
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
import org.apache.avro
import org.apache.avro.SchemaBuilder
import org.apache.avro.file.CodecFactory
Expand All @@ -28,7 +28,7 @@ class S3SourceAvroWithValueAsOptionalArrayEnvelopeTest
with AnyFlatSpecLike
with Matchers
with EitherValues
with SourcePartitionSearcherSettingsKeys {
with CloudSourceSettingsKeys {

def DefaultProps: Map[String, String] = defaultProps + (
SOURCE_PARTITION_SEARCH_INTERVAL_MILLIS -> "1000",
Expand Down Expand Up @@ -155,7 +155,7 @@ class S3SourceAvroWithValueAsOptionalArrayMixValuesEnvelopeTest
with AnyFlatSpecLike
with Matchers
with EitherValues
with SourcePartitionSearcherSettingsKeys {
with CloudSourceSettingsKeys {

def DefaultProps: Map[String, String] = defaultProps + (
SOURCE_PARTITION_SEARCH_INTERVAL_MILLIS -> "1000",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.lenses.streamreactor.connect.aws.s3.source

import cats.implicits.catsSyntaxEitherId
import io.lenses.streamreactor.connect.aws.s3.source.config.SourcePartitionSearcherSettingsKeys
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
import org.apache.kafka.connect.source.SourceRecord
import org.scalatest.EitherValues
import org.scalatest.concurrent.Eventually.eventually
Expand All @@ -22,7 +22,7 @@ class S3SourceJsonEnvelopeTest
with AnyFlatSpecLike
with Matchers
with EitherValues
with SourcePartitionSearcherSettingsKeys {
with CloudSourceSettingsKeys {

def DefaultProps: Map[String, String] = defaultProps + (
SOURCE_PARTITION_SEARCH_INTERVAL_MILLIS -> "1000",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.lenses.streamreactor.connect.aws.s3.source

import cats.implicits.catsSyntaxEitherId
import io.lenses.streamreactor.connect.aws.s3.source.config.SourcePartitionSearcherSettingsKeys
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.formats.writer.parquet.ParquetOutputFile
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
import io.lenses.streamreactor.connect.cloud.common.stream.CloudByteArrayOutputStream
import org.apache.avro.SchemaBuilder
import org.apache.kafka.connect.source.SourceRecord
Expand All @@ -28,7 +28,7 @@ class S3SourceParquetEnvelopeTest
with AnyFlatSpecLike
with Matchers
with EitherValues
with SourcePartitionSearcherSettingsKeys {
with CloudSourceSettingsKeys {

def DefaultProps: Map[String, String] = defaultProps + (
SOURCE_PARTITION_SEARCH_INTERVAL_MILLIS -> "1000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package io.lenses.streamreactor.connect.aws.s3.source
import cats.implicits._
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
import io.lenses.streamreactor.connect.aws.s3.source.S3SourceTaskTest.formats
import io.lenses.streamreactor.connect.aws.s3.source.config.SourcePartitionSearcherSettingsKeys
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
import org.scalatest.EitherValues
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.flatspec.AnyFlatSpecLike
Expand All @@ -20,7 +20,7 @@ class S3SourceTaskBucketRootTest
with AnyFlatSpecLike
with Matchers
with EitherValues
with SourcePartitionSearcherSettingsKeys {
with CloudSourceSettingsKeys {

def DefaultProps: Map[String, String] = defaultProps + (
SOURCE_PARTITION_SEARCH_INTERVAL_MILLIS -> "1000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
import io.lenses.streamreactor.connect.aws.s3.source.S3SourceTaskTest.formats
import io.lenses.streamreactor.connect.aws.s3.source.config.SourcePartitionSearcherSettingsKeys
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3DirectoryLister
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
Expand All @@ -16,6 +15,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.Format.Bytes
import io.lenses.streamreactor.connect.cloud.common.config.FormatOptions
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
import io.lenses.streamreactor.connect.cloud.common.storage.DirectoryFindCompletionConfig
import io.lenses.streamreactor.connect.cloud.common.storage.DirectoryFindResults
import org.apache.kafka.connect.source.SourceTaskContext
Expand Down Expand Up @@ -51,7 +51,7 @@ class S3SourceTaskTest
with LazyLogging
with BeforeAndAfter
with Eventually
with SourcePartitionSearcherSettingsKeys {
with CloudSourceSettingsKeys {

override implicit def patienceConfig: PatienceConfig =
PatienceConfig(timeout = Span(10, Seconds), interval = Span(500, Milliseconds))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.lenses.streamreactor.connect.aws.s3.source

import cats.implicits._
import io.lenses.streamreactor.connect.aws.s3.source.config.SourcePartitionSearcherSettingsKeys
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
import org.apache.kafka.connect.source.SourceRecord
import org.scalatest.EitherValues
import org.scalatest.concurrent.Eventually.eventually
Expand All @@ -22,7 +22,7 @@ class S3SourceTaskXmlReaderTest
with AnyFlatSpecLike
with Matchers
with EitherValues
with SourcePartitionSearcherSettingsKeys {
with CloudSourceSettingsKeys {

def DefaultProps: Map[String, String] = defaultProps + (
SOURCE_PARTITION_SEARCH_INTERVAL_MILLIS -> "1000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package io.lenses.streamreactor.connect.aws.s3.utils
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.auth.AwsS3ClientCreator
import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
import io.lenses.streamreactor.connect.aws.s3.config.S3Config
import io.lenses.streamreactor.connect.aws.s3.config.S3ConnectionConfig
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
import io.lenses.streamreactor.connect.aws.s3.sink.S3SinkTask
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface
import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
Expand All @@ -24,7 +25,7 @@ import java.nio.file.Files
import scala.util.Try

trait S3ProxyContainerTest
extends CloudPlatformEmulatorSuite[S3FileMetadata, AwsS3StorageInterface, S3SinkTask, S3Client]
extends CloudPlatformEmulatorSuite[S3FileMetadata, AwsS3StorageInterface, S3SinkConfig, S3Client, S3SinkTask]
with TaskIndexKey
with LazyLogging {

Expand All @@ -40,7 +41,7 @@ trait S3ProxyContainerTest

override def createClient(): Either[Throwable, S3Client] = {

val s3Config: S3Config = S3Config(
val s3Config: S3ConnectionConfig = S3ConnectionConfig(
region = Some("eu-west-1"),
accessKey = Some(s3Container.identity.identity),
secretKey = Some(s3Container.identity.credential),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink
import cats.implicits.catsSyntaxEitherId
import cats.implicits.catsSyntaxOptionId
import cats.implicits.toBifunctorOps
import io.lenses.streamreactor.connect.cloud.common.config.traits.CloudSinkConfig
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.cloud.common.utils.RemoteFileHelper
Expand All @@ -13,8 +14,13 @@ import org.scalatest.flatspec.AnyFlatSpec

import scala.util.Try

trait CloudPlatformEmulatorSuite[SM <: FileMetadata, SI <: StorageInterface[SM], T <: CloudSinkTask[SM], C]
extends AnyFlatSpec
trait CloudPlatformEmulatorSuite[
SM <: FileMetadata,
SI <: StorageInterface[SM],
CSC <: CloudSinkConfig,
C,
T <: CloudSinkTask[SM, CSC, C],
] extends AnyFlatSpec
with BeforeAndAfter
with BeforeAndAfterAll
with RemoteFileHelper[SI] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.lenses.streamreactor.common.config.base.const.TraitConfigConst.MAX_RET
import io.lenses.streamreactor.common.config.base.const.TraitConfigConst.RETRY_INTERVAL_PROP_SUFFIX
import com.opencsv.CSVReader
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.config.traits.CloudSinkConfig
import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer.BytesFormatWriter
Expand Down Expand Up @@ -43,9 +44,14 @@ import scala.jdk.CollectionConverters.SeqHasAsJava
import scala.util.Failure
import scala.util.Success
import scala.util.Try
abstract class CoreSinkTaskTestCases[SM <: FileMetadata, SI <: StorageInterface[SM], ST <: CloudSinkTask[SM], C](
unitUnderTest: String,
) extends CloudPlatformEmulatorSuite[SM, SI, ST, C]
abstract class CoreSinkTaskTestCases[
SM <: FileMetadata,
SI <: StorageInterface[SM],
CSC <: CloudSinkConfig,
C,
T <: CloudSinkTask[SM, CSC, C],
](unitUnderTest: String,
) extends CloudPlatformEmulatorSuite[SM, SI, CSC, C, T]
with Matchers
with MockitoSugar
with LazyLogging {
Expand Down Expand Up @@ -2059,7 +2065,7 @@ abstract class CoreSinkTaskTestCases[SM <: FileMetadata, SI <: StorageInterface[
struct
}

private def createHeaders[T](keyValuePair: (String, T)*): lang.Iterable[Header] = {
private def createHeaders[HX](keyValuePair: (String, HX)*): lang.Iterable[Header] = {
val headers = new ConnectHeaders()
keyValuePair.foreach {
case (key: String, value) => headers.add(key, value, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package io.lenses.streamreactor.connect.aws.s3.auth
import cats.implicits.catsSyntaxEitherId
import cats.implicits.toBifunctorOps
import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
import io.lenses.streamreactor.connect.aws.s3.config.S3Config
import io.lenses.streamreactor.connect.aws.s3.config.S3ConnectionConfig
import io.lenses.streamreactor.connect.cloud.common.auth.ClientCreator
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentials
Expand All @@ -36,14 +36,14 @@ import java.net.URI
import java.time.Duration
import scala.util.Try

object AwsS3ClientCreator extends ClientCreator[S3Config, S3Client] {
object AwsS3ClientCreator extends ClientCreator[S3ConnectionConfig, S3Client] {

private val missingCredentialsError =
"Configured to use credentials however one or both of `AWS_ACCESS_KEY` or `AWS_SECRET_KEY` are missing."

private val defaultCredentialsProvider: AwsCredentialsProvider = DefaultCredentialsProvider.create()

def make(config: S3Config): Either[Throwable, S3Client] =
def make(config: S3ConnectionConfig): Either[Throwable, S3Client] =
for {
retryPolicy <- Try {
RetryPolicy
Expand Down Expand Up @@ -92,7 +92,7 @@ object AwsS3ClientCreator extends ClientCreator[S3Config, S3Client] {
}
} yield s3Client

private def credentialsFromConfig(awsConfig: S3Config): Either[String, AwsCredentialsProvider] =
private def credentialsFromConfig(awsConfig: S3ConnectionConfig): Either[String, AwsCredentialsProvider] =
awsConfig.accessKey.zip(awsConfig.secretKey) match {
case Some((access, secret)) =>
new AwsCredentialsProvider {
Expand All @@ -101,7 +101,7 @@ object AwsS3ClientCreator extends ClientCreator[S3Config, S3Client] {
case None => missingCredentialsError.asLeft
}

private def credentialsProvider(config: S3Config): Either[String, AwsCredentialsProvider] =
private def credentialsProvider(config: S3ConnectionConfig): Either[String, AwsCredentialsProvider] =
config.authMode match {
case AuthMode.Credentials => credentialsFromConfig(config)
case AuthMode.Default => defaultCredentialsProvider.asRight[String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,6 @@ object S3ConfigSettings {
s"Maximum index files to allow per topic/partition. Advisable to not raise this: if a large number of files build up this means there is a problem with file deletion."
val SEEK_MAX_INDEX_FILES_DEFAULT = 5

val SOURCE_PARTITION_EXTRACTOR_TYPE = s"$CONNECTOR_PREFIX.source.partition.extractor.type"
val SOURCE_PARTITION_EXTRACTOR_TYPE_DOC =
"If you want to read to specific partitions when running the source. Options are 'hierarchical' (to match the sink's hierarchical file storage pattern) and 'regex' (supply a custom regex). Any other value will ignore original partitions and they should be evenly distributed through available partitions (Kafka dependent)."

val SOURCE_PARTITION_EXTRACTOR_REGEX = s"$CONNECTOR_PREFIX.source.partition.extractor.regex"
val SOURCE_PARTITION_EXTRACTOR_REGEX_DOC = "If reading filename from regex, supply the regex here."

val POOL_MAX_CONNECTIONS = s"$CONNECTOR_PREFIX.pool.max.connections"
val POOL_MAX_CONNECTIONS_DOC = "Max connections in pool. -1: Use default according to underlying client."
val POOL_MAX_CONNECTIONS_DEFAULT: Int = -1
Expand Down
Loading

0 comments on commit 047b953

Please sign in to comment.