Further refactoring
davidsloan committed Oct 2, 2023
1 parent bdb5bc8 commit 5825694
Showing 253 changed files with 1,822 additions and 1,688 deletions.
6 changes: 2 additions & 4 deletions build.sbt
@@ -81,7 +81,7 @@ lazy val `cloud-common` = (project in file("kafka-connect-cloud-common"))
Seq(
name := "kafka-connect-cloud-common",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
- libraryDependencies ++= baseDeps ++ kafkaConnectS3Deps,
+ libraryDependencies ++= baseDeps ++ kafkaConnectCloudCommonDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
@@ -91,13 +91,11 @@ lazy val `cloud-common` = (project in file("kafka-connect-cloud-common"))
)
.configureAssembly()
.configureTests(baseTestDeps)
- //.configureIntegrationTests(kafkaConnectS3TestDeps)
- //.configureFunctionalTests(kafkaConnectS3FuncTestDeps)
.enablePlugins(PackPlugin)

lazy val `aws-s3` = (project in file("kafka-connect-aws-s3"))
.dependsOn(common)
- .dependsOn(`cloud-common`)
+ .dependsOn(`cloud-common` % "compile->compile;test->test")
.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
@@ -19,19 +19,20 @@ package io.lenses.streamreactor.connect.aws.s3.formats
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
- import io.lenses.streamreactor.connect.aws.s3.utils.SampleData.checkRecord
- import io.lenses.streamreactor.connect.aws.s3.utils.SampleData.topic
- import io.lenses.streamreactor.connect.cloud.model.CompressionCodecName.UNCOMPRESSED
- import io.lenses.streamreactor.connect.cloud.model.location.FileUtils.toBufferedOutputStream
- import io.lenses.streamreactor.connect.cloud.config.AvroFormatSelection
- import io.lenses.streamreactor.connect.cloud.formats.writer.AvroFormatWriter
- import io.lenses.streamreactor.connect.cloud.formats.writer.MessageDetail
- import io.lenses.streamreactor.connect.cloud.formats.writer.NullSinkData
- import io.lenses.streamreactor.connect.cloud.formats.writer.StructSinkData
- import io.lenses.streamreactor.connect.cloud.model.CompressionCodec
- import io.lenses.streamreactor.connect.cloud.model.Offset
- import io.lenses.streamreactor.connect.cloud.model.Topic
- import io.lenses.streamreactor.connect.cloud.stream.BuildLocalOutputStream
+ import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
+ import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
+ import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
+ import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer.AvroFormatWriter
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
+ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
+ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
+ import io.lenses.streamreactor.connect.cloud.common.model.Offset
+ import io.lenses.streamreactor.connect.cloud.common.model.Topic
+ import io.lenses.streamreactor.connect.cloud.common.model.location.FileUtils.toBufferedOutputStream
+ import io.lenses.streamreactor.connect.cloud.common.stream.BuildLocalOutputStream
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

@@ -47,13 +48,13 @@ class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyC
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))

val avroFormatWriter = new AvroFormatWriter(blobStream)
- avroFormatWriter.write(MessageDetail(NullSinkData(None),
- StructSinkData(users.head),
- Map.empty,
- None,
- topic,
- 1,
- Offset(1),
+ avroFormatWriter.write(writer.MessageDetail(NullSinkData(None),
+ StructSinkData(users.head),
+ Map.empty,
+ None,
+ topic,
+ 1,
+ Offset(1),
))
avroFormatWriter.complete() should be(Right(()))
val bytes = localFileAsBytes(localFile)
@@ -109,7 +110,7 @@ class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyC
val avroFormatWriter = new AvroFormatWriter(blobStream)
firstUsers.foreach(u =>
avroFormatWriter.write(
- MessageDetail(NullSinkData(None), StructSinkData(u), Map.empty, None, topic, 1, Offset(2)),
+ writer.MessageDetail(NullSinkData(None), StructSinkData(u), Map.empty, None, topic, 1, Offset(2)),
) should be(
Right(()),
),
@@ -16,28 +16,24 @@

package io.lenses.streamreactor.connect.aws.s3.formats

- import io.lenses.streamreactor.connect.cloud.model.CompressionCodecName.BROTLI
- import io.lenses.streamreactor.connect.cloud.model.CompressionCodecName.LZ4
- import io.lenses.streamreactor.connect.cloud.model.CompressionCodecName.LZO
- import io.lenses.streamreactor.connect.cloud.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
- import io.lenses.streamreactor.connect.aws.s3.utils.SampleData.checkRecord
- import io.lenses.streamreactor.connect.aws.s3.utils.SampleData.topic
- import io.lenses.streamreactor.connect.cloud.model.location.FileUtils.toBufferedOutputStream
- import io.lenses.streamreactor.connect.cloud.config.ParquetFormatSelection
- import io.lenses.streamreactor.connect.cloud.formats.reader.ParquetFormatReader
- import io.lenses.streamreactor.connect.cloud.formats.writer.ArraySinkData
- import io.lenses.streamreactor.connect.cloud.formats.writer.MapSinkData
- import io.lenses.streamreactor.connect.cloud.formats.writer.MessageDetail
- import io.lenses.streamreactor.connect.cloud.formats.writer.NullSinkData
- import io.lenses.streamreactor.connect.cloud.formats.writer.ParquetFormatWriter
- import io.lenses.streamreactor.connect.cloud.formats.writer.StructSinkData
- import io.lenses.streamreactor.connect.cloud.model.CompressionCodec
- import io.lenses.streamreactor.connect.cloud.model.Offset
- import io.lenses.streamreactor.connect.cloud.model.Topic
- import io.lenses.streamreactor.connect.cloud.stream.BuildLocalOutputStream
+ import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
+ import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
+ import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection
+ import io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetFormatReader
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer._
+ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
+ import io.lenses.streamreactor.connect.cloud.common.model.Offset
+ import io.lenses.streamreactor.connect.cloud.common.model.Topic
+ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.BROTLI
+ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.LZ4
+ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.LZO
+ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
+ import io.lenses.streamreactor.connect.cloud.common.model.location.FileUtils.toBufferedOutputStream
+ import io.lenses.streamreactor.connect.cloud.common.stream.BuildLocalOutputStream
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.scalatest.EitherValues
@@ -60,31 +56,31 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro

val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
- parquetFormatWriter.write(MessageDetail(NullSinkData(None),
- StructSinkData(users.head),
- Map.empty,
- None,
- topic,
- 1,
- Offset(1),
+ parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
+ StructSinkData(users.head),
+ Map.empty,
+ None,
+ topic,
+ 1,
+ Offset(1),
))
parquetFormatWriter.getPointer should be(21)
- parquetFormatWriter.write(MessageDetail(NullSinkData(None),
- StructSinkData(users(1)),
- Map.empty,
- None,
- topic,
- 1,
- Offset(2),
+ parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
+ StructSinkData(users(1)),
+ Map.empty,
+ None,
+ topic,
+ 1,
+ Offset(2),
))
parquetFormatWriter.getPointer should be(44)
- parquetFormatWriter.write(MessageDetail(NullSinkData(None),
- StructSinkData(users(2)),
- Map.empty,
- None,
- topic,
- 1,
- Offset(3),
+ parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
+ StructSinkData(users(2)),
+ Map.empty,
+ None,
+ topic,
+ 1,
+ Offset(3),
))
parquetFormatWriter.getPointer should be(59)
parquetFormatWriter.complete() should be(Right(()))
@@ -111,7 +107,7 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(
- MessageDetail(
+ writer.MessageDetail(
NullSinkData(None),
ArraySinkData(
Seq(
@@ -136,7 +132,7 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(
- MessageDetail(
+ writer.MessageDetail(
NullSinkData(None),
MapSinkData(
Map(
@@ -189,13 +185,13 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro

val parquetFormatWriter = new ParquetFormatWriter(blobStream)
firstUsers.foreach(u =>
- parquetFormatWriter.write(MessageDetail(NullSinkData(None),
- StructSinkData(u),
- Map.empty,
- None,
- topic,
- 1,
- Offset(1),
+ parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
+ StructSinkData(u),
+ Map.empty,
+ None,
+ topic,
+ 1,
+ Offset(1),
)) should be(
Right(()),
),
@@ -18,36 +18,37 @@ package io.lenses.streamreactor.connect.aws.s3.sink

import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.aws.s3.config._
- import io.lenses.streamreactor.connect.aws.s3.formats.AvroFormatReader
- import io.lenses.streamreactor.connect.cloud.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
- import io.lenses.streamreactor.connect.cloud.sink.config.PartitionSelection.defaultPartitionSelection
import io.lenses.streamreactor.connect.aws.s3.sink.config.OffsetSeekerOptions
- import io.lenses.streamreactor.connect.cloud.sink.config.PartitionDisplay.Values
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
import io.lenses.streamreactor.connect.aws.s3.sink.config.SinkBucketOptions
- import io.lenses.streamreactor.connect.aws.s3.sink.naming.OffsetS3FileNamer
- import io.lenses.streamreactor.connect.aws.s3.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
- import io.lenses.streamreactor.connect.cloud.config.AvroFormatSelection
- import io.lenses.streamreactor.connect.cloud.config.ConnectorTaskId
- import io.lenses.streamreactor.connect.cloud.config.DataStorageSettings
- import io.lenses.streamreactor.connect.cloud.formats.writer.MessageDetail
- import io.lenses.streamreactor.connect.cloud.formats.writer.NullSinkData
- import io.lenses.streamreactor.connect.cloud.formats.writer.SinkData
- import io.lenses.streamreactor.connect.cloud.formats.writer.StructSinkData
- import io.lenses.streamreactor.connect.cloud.model.location.CloudLocation
- import io.lenses.streamreactor.connect.cloud.model.Offset
- import io.lenses.streamreactor.connect.cloud.model.Topic
- import io.lenses.streamreactor.connect.cloud.model.TopicPartitionOffset
- import io.lenses.streamreactor.connect.cloud.sink.commit.CommitPolicy
- import io.lenses.streamreactor.connect.cloud.sink.commit.Count
- import io.lenses.streamreactor.connect.cloud.sink.config.LocalStagingArea
- import io.lenses.streamreactor.connect.cloud.sink.config.padding.LeftPadPaddingStrategy
- import io.lenses.streamreactor.connect.cloud.sink.config.padding.NoOpPaddingStrategy
- import io.lenses.streamreactor.connect.cloud.sink.config.padding.PaddingService
- import io.lenses.streamreactor.connect.cloud.sink.config.padding.PaddingStrategy
+ import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
+ import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
+ import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
+ import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
+ import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
+ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
+ import io.lenses.streamreactor.connect.cloud.common.model.Offset
+ import io.lenses.streamreactor.connect.cloud.common.model.Topic
+ import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
+ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
+ import io.lenses.streamreactor.connect.cloud.common.sink.commit.CommitPolicy
+ import io.lenses.streamreactor.connect.cloud.common.sink.commit.Count
+ import io.lenses.streamreactor.connect.cloud.common.sink.config.LocalStagingArea
+ import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionDisplay.Values
+ import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection.defaultPartitionSelection
+ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadPaddingStrategy
+ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
+ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
+ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
+ import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetS3FileNamer
+ import io.lenses.streamreactor.connect.cloud.common.sink.naming.S3KeyNamer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
@@ -104,7 +105,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
)

"avro sink" should "write 2 records to avro format in s3" in {
- val sink = S3WriterManager.from(avroConfig)
+ val sink = S3WriterManagerCreator.from(avroConfig)
firstUsers.zipWithIndex.foreach {
case (struct: Struct, index: Int) =>
val writeRes = sink.write(
@@ -148,18 +149,18 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
new Struct(secondSchema).put("name", "coco").put("designation", null).put("salary", 395.44),
)

- val sink = S3WriterManager.from(avroConfig)
+ val sink = S3WriterManagerCreator.from(avroConfig)
firstUsers.concat(usersWithNewSchema).zipWithIndex.foreach {
case (user, index) =>
sink.write(
TopicPartitionOffset(Topic(TopicName), 1, Offset((index + 1).toLong)),
- MessageDetail(NullSinkData(None),
- StructSinkData(user),
- Map.empty[String, SinkData],
- None,
- Topic(TopicName),
- 1,
- Offset((index + 1).toLong),
+ writer.MessageDetail(NullSinkData(None),
+ StructSinkData(user),
+ Map.empty[String, SinkData],
+ None,
+ Topic(TopicName),
+ 1,
+ Offset((index + 1).toLong),
),
)
}