LC-202 Backing up multiple topics at once #1220

Merged · 7 commits · May 14, 2024
Changes from all commits
@@ -84,7 +84,7 @@ class S3SinkTaskAvroEnvelopeTest
     record
   }
 
-  "S3SinkTask" should "write to avro format using V1 format" in {
+  "S3SinkTask" should "write to avro format" in {
 
     testWritingAvro(
       (
@@ -95,6 +95,17 @@ class S3SinkTaskAvroEnvelopeTest
       )
   }
 
+  "S3SinkTask" should "write envelope when * is used as KCQL source" in {
+
+    testWritingAvro(
+      (
+        defaultProps +
+          ("connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from `*` STOREAS AVRO PROPERTIES('store.envelope'=true, 'padding.length.partition'='12', 'padding.length.offset'='12', '${FlushCount.entryName}'=3)")
+      ).asJava,
+      "streamReactorBackups/myTopic/000000000001/000000000003_10001_10003.avro",
+    )
+  }
+
   private def testWritingAvro(props: util.Map[String, String], expected: String) = {
     val task = new S3SinkTask()
     val ctx  = mock[SinkTaskContext]
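
The new test exercises the feature end to end: a single KCQL statement whose source is the backtick-quoted `*` wildcard makes the sink accept records from every topic, while the expected output key shows the path is still partitioned by the record's actual topic (`myTopic`). As a rough sketch, a connector configuration enabling multi-topic backup might look like the following; the bucket and prefix names are placeholders, and the literal `flush.count` key is an assumption (the test resolves it via `FlushCount.entryName`):

```scala
// Minimal sketch of sink properties for backing up all topics at once.
// Bucket/prefix names are placeholders; 'flush.count' is assumed to be
// the KCQL property key behind FlushCount.entryName.
val wildcardBackupProps: Map[String, String] = Map(
  "connect.s3.kcql" ->
    ("insert into my-bucket:backups select * from `*` STOREAS AVRO " +
      "PROPERTIES('store.envelope'=true, 'flush.count'=3)"),
)
```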
@@ -7,7 +7,6 @@ import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
 import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
 import org.apache.avro.SchemaBuilder
 import org.apache.avro.file.CodecFactory
-//import org.apache.avro.file.CodecFactory
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.GenericDatumWriter
 import org.scalatest.EitherValues
@@ -77,33 +77,30 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
       "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS CSV PROPERTIES('${FlushCount.entryName}'=1)",
     )
 
-    CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value)  => fail(value.toString)
-      case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.Default))
-    }
+    CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings.Default),
+    )
   }
 
   "S3SinkConfigDefBuilder" should "default all fields to true when envelope is set" in {
     val props = Map(
       "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true,'${FlushCount.entryName}'=1)",
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value)  => fail(value.toString)
-      case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.enabled))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings.enabled),
+    )
   }
 
   "S3SinkConfigDefBuilder" should "enable Value and Key only" in {
     val props = Map(
       "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `PARQUET` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false,'${FlushCount.entryName}'=1,'${PartitionIncludeKeys.entryName}'=false)",
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value) => fail(value.toString)
-      case Right(value) =>
-        value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false)))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(true, true, true, false, false)),
+    )
 
   }
 
   "S3SinkConfigDefBuilder" should "data storage for each SQL statement" in {
@@ -124,13 +121,9 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
         |""".stripMargin,
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value) => fail(value.toString)
-      case Right(value) =>
-        value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false),
-                                                DataStorageSettings(true, true, true, false, true),
-        ))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(true, true, true, false, false), DataStorageSettings(true, true, true, false, true)),
+    )
 
   }
   "S3SinkConfigDefBuilder" should "respect default flush settings" in {
@@ -200,33 +193,29 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
       "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false, '${FlushCount.entryName}'=1)",
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value) => fail(value.toString)
-      case Right(value) =>
-        value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
-                                                                    key      = true,
-                                                                    value    = true,
-                                                                    metadata = false,
-                                                                    headers  = false,
-        )))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(envelope = true, key = true, value = true, metadata = false, headers = false)),
+    )
   }
 
   "S3SinkConfigDefBuilder" should "return false on escape new lines" in {
     val props = Map(
       "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false, '${FlushCount.entryName}'=1)",
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value) => fail(value.toString)
-      case Right(value) =>
-        value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
-                                                                    key      = true,
-                                                                    value    = true,
-                                                                    metadata = false,
-                                                                    headers  = false,
-        )))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(envelope = true, key = true, value = true, metadata = false, headers = false)),
+    )
   }
 
+  "S3SinkConfigDefBuilder" should "support selecting from *" in {
+    val props = Map(
+      "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from `*` STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${FlushCount.entryName}'=1)",
+    )
+
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(envelope = true, key = true, value = true, metadata = true, headers = true)),
+    )
+  }
+
   "S3SinkConfigDefBuilder" should "error when old BYTES settings used" in {
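
Mechanically, the test cleanup above swaps the repeated `match`/`fail` boilerplate for ScalaTest's `EitherValues` syntax: calling `.value` on an `Either` unwraps the `Right` contents and fails the test with a descriptive `TestFailedException` when the result is a `Left`. A minimal self-contained illustration of the pattern:

```scala
import org.scalatest.EitherValues
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class EitherValuesExampleSpec extends AnyFlatSpec with Matchers with EitherValues {
  "value" should "unwrap a Right and fail the test on a Left" in {
    val result: Either[String, Int] = Right(42)
    // Equivalent to matching on Right and calling fail() on Left,
    // but as a single expression with a useful failure message.
    result.value should be(42)
  }
}
```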
@@ -41,7 +41,7 @@ import scala.jdk.CollectionConverters.SeqHasAsJava
  */
 class AddConnectSchemaTransformer(topic: Topic, settings: DataStorageSettings) extends Transformer {
   override def transform(message: MessageDetail): Either[RuntimeException, MessageDetail] =
-    if (topic != message.topic) {
+    if (topic != message.topic && topic != Topic.All) {
       Left(
         new RuntimeException(
           s"Invalid state reached. Schema enrichment transformer topic [${topic.value}] does not match incoming message topic [${message.topic.value}].",
@@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala
  */
 case class EnvelopeWithSchemaTransformer(topic: Topic, settings: DataStorageSettings) extends Transformer {
   def transform(message: MessageDetail): Either[RuntimeException, MessageDetail] =
-    if (message.topic != topic) {
+    if (message.topic != topic && topic != Topic.All) {
       Left(
         new RuntimeException(
           s"Invalid state reached. Envelope transformer topic [${topic.value}] does not match incoming message topic [${message.topic.value}].",
@@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters.SeqHasAsJava
  */
 case class SchemalessEnvelopeTransformer(topic: Topic, settings: DataStorageSettings) extends Transformer {
   def transform(message: MessageDetail): Either[RuntimeException, MessageDetail] =
-    if (message.topic != topic) {
+    if (message.topic != topic && topic != Topic.All) {
       Left(
         new RuntimeException(
           s"Invalid state reached. Envelope transformer topic [${topic.value}] does not match incoming message topic [${message.topic.value}].",
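
The same one-line relaxation is applied to all three envelope transformers: a transformer constructed for `Topic.All` now accepts messages from any topic, while one bound to a concrete topic still rejects mismatches. The shared guard, restated in isolation as a sketch:

```scala
// Illustrative restatement of the guard added to each transformer:
// only a concrete, non-wildcard binding can mismatch the incoming topic.
def topicMismatch(bound: Topic, incoming: Topic): Boolean =
  bound != incoming && bound != Topic.All
```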
@@ -16,6 +16,7 @@
 package io.lenses.streamreactor.connect.cloud.common.sink.transformers
 
 import cats.implicits.catsSyntaxEitherId
+import com.typesafe.scalalogging.StrictLogging
 import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
 import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
 import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection
@@ -40,19 +41,21 @@ case class SequenceTransformer(transformers: Transformer*) extends Transformer {
 case class TopicsTransformers(transformers: Map[Topic, Transformer]) extends Transformer {
   def get(topic: Topic): Option[Transformer] = transformers.get(topic)
   def transform(message: MessageDetail): Either[RuntimeException, MessageDetail] =
-    transformers.get(message.topic).fold(message.asRight[RuntimeException])(_.transform(message))
+    transformers.get(message.topic)
+      // check to see if there is a `*`
+      .orElse(transformers.get(Topic.All))
+      .fold(message.asRight[RuntimeException])(_.transform(message))
 }
 
-object TopicsTransformers {
+object TopicsTransformers extends StrictLogging {
   def from(bucketOptions: Seq[WithTransformableDataStorage]): TopicsTransformers = {
 
     val transformersMap =
       bucketOptions
         .filter(_.sourceTopic.nonEmpty)
         .foldLeft(Map.empty[Topic, Transformer]) {
           case (map, bo) =>
             if (bo.dataStorage.hasEnvelope) {
-              val topic = Topic(bo.sourceTopic.get)
+              // replace empty with `*`. This is because CloudSinkBucketOptions moves `*` to empty
+              val topic = Topic(bo.sourceTopic.getOrElse("*"))
               bo.formatSelection match {
                 case JsonFormatSelection =>
                   val transformer = SequenceTransformer(SchemalessEnvelopeTransformer(topic, bo.dataStorage))
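
`TopicsTransformers.transform` now resolves a transformer in two steps: an exact-topic lookup first, falling back to the `Topic.All` entry registered for a `*` KCQL source, and passing the message through unchanged if neither exists. A small sketch of that resolution, using empty `SequenceTransformer` instances as hypothetical stand-ins for real transformers:

```scala
// Hypothetical stand-ins, just to demonstrate the lookup order.
val ordersTransformer: Transformer   = SequenceTransformer()
val catchAllTransformer: Transformer = SequenceTransformer()

val routing: Map[Topic, Transformer] = Map(
  Topic("orders") -> ordersTransformer,   // bound to a concrete topic
  Topic.All       -> catchAllTransformer, // created for a `*` source
)

// Resolution mirrors TopicsTransformers.transform: exact topic first,
// then the wildcard; "payments" has no exact entry, so the catch-all wins.
val chosen: Option[Transformer] =
  routing.get(Topic("payments")).orElse(routing.get(Topic.All))
```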
@@ -22,6 +22,12 @@ case class Topic(value: String) extends AnyVal {
     TopicPartition(this, partition)
 }
 
+object Topic {
+
+  val All: Topic = Topic("*")
+
+}
+
 object Offset {
 
   implicit def orderingByOffsetValue[A <: Offset]: Ordering[A] =
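
Since `Topic` is an `AnyVal` wrapper with structural equality, the new `Topic.All` sentinel is simply the `*` string: any `Topic` parsed from a wildcard KCQL source compares equal to it, and every concrete topic does not. For example, in a REPL:

```scala
assert(Topic("*") == Topic.All)       // a parsed wildcard matches the sentinel
assert(Topic("myTopic") != Topic.All) // concrete topics never do
```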