diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskAvroEnvelopeTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskAvroEnvelopeTest.scala
index 2b2026aee..fd1bf9b34 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskAvroEnvelopeTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskAvroEnvelopeTest.scala
@@ -84,7 +84,7 @@ class S3SinkTaskAvroEnvelopeTest
     record
   }
 
-  "S3SinkTask" should "write to avro format using V1 format" in {
+  "S3SinkTask" should "write to avro format" in {
 
     testWritingAvro(
       (
@@ -95,6 +95,17 @@ class S3SinkTaskAvroEnvelopeTest
     )
   }
 
+  "S3SinkTask" should "write envelope when * is used as KCQL source" in {
+
+    testWritingAvro(
+      (
+        defaultProps +
+          ("connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from `*` STOREAS AVRO PROPERTIES('store.envelope'=true, 'padding.length.partition'='12', 'padding.length.offset'='12', '${FlushCount.entryName}'=3)")
+      ).asJava,
+      "streamReactorBackups/myTopic/000000000001/000000000003_10001_10003.avro",
+    )
+  }
+
   private def testWritingAvro(props: util.Map[String, String], expected: String) = {
     val task = new S3SinkTask()
     val ctx  = mock[SinkTaskContext]
diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceAvroEnvelopeTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceAvroEnvelopeTest.scala
index 3600ea656..48c53a61c 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceAvroEnvelopeTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceAvroEnvelopeTest.scala
@@ -7,7 +7,6 @@ import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
 import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceSettingsKeys
 import org.apache.avro.SchemaBuilder
 import org.apache.avro.file.CodecFactory
-//import org.apache.avro.file.CodecFactory
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.GenericDatumWriter
 import org.scalatest.EitherValues
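The new sink test pins the object key to "streamReactorBackups/myTopic/000000000001/000000000003_10001_10003.avro": the partition directory and the offset part of the file name are zero-padded to the 12 characters requested via 'padding.length.partition' and 'padding.length.offset'. A minimal sketch of that padding, assuming a plain left-pad (the pad helper is illustrative, not the connector's actual padding service):

  // Illustrative left-pad producing the segments seen in the expected key.
  def pad(value: Long, length: Int = 12): String =
    ("0" * length + value.toString).takeRight(length)

  pad(1L) // "000000000001" -> partition directory
  pad(3L) // "000000000003" -> offset portion of the file name
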
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala
index 2aade2f1d..4a09a98ae 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala
@@ -77,10 +77,9 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
       "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS CSV PROPERTIES('${FlushCount.entryName}'=1)",
     )
 
-    CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value)  => fail(value.toString)
-      case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.Default))
-    }
+    CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings.Default),
+    )
   }
 
   "S3SinkConfigDefBuilder" should "default all fields to true when envelope is set" in {
@@ -88,10 +87,9 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
       "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true,'${FlushCount.entryName}'=1)",
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value)  => fail(value.toString)
-      case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.enabled))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings.enabled),
+    )
   }
 
   "S3SinkConfigDefBuilder" should "enable Value and Key only" in {
@@ -99,11 +97,10 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
       "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `PARQUET` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false,'${FlushCount.entryName}'=1,'${PartitionIncludeKeys.entryName}'=false)",
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value) => fail(value.toString)
-      case Right(value) =>
-        value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false)))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(true, true, true, false, false)),
+    )
+
   }
 
   "S3SinkConfigDefBuilder" should "data storage for each SQL statement" in {
@@ -124,13 +121,9 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
         |""".stripMargin,
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value) => fail(value.toString)
-      case Right(value) =>
-        value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false),
-                                                DataStorageSettings(true, true, true, false, true),
-        ))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(true, true, true, false, false), DataStorageSettings(true, true, true, false, true)),
+    )
   }
 
   "S3SinkConfigDefBuilder" should "respect default flush settings" in {
@@ -200,16 +193,9 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
       "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false, '${FlushCount.entryName}'=1)",
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value) => fail(value.toString)
-      case Right(value) =>
-        value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
-                                                                    key      = true,
-                                                                    value    = true,
-                                                                    metadata = false,
-                                                                    headers  = false,
-        )))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(envelope = true, key = true, value = true, metadata = false, headers = false)),
+    )
   }
 
   "S3SinkConfigDefBuilder" should "return false on escape new lines" in {
@@ -217,16 +203,19 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
       "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false, '${FlushCount.entryName}'=1)",
     )
 
-    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match {
-      case Left(value) => fail(value.toString)
-      case Right(value) =>
-        value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
-                                                                    key      = true,
-                                                                    value    = true,
-                                                                    metadata = false,
-                                                                    headers  = false,
-        )))
-    }
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(envelope = true, key = true, value = true, metadata = false, headers = false)),
+    )
+  }
+
+  "S3SinkConfigDefBuilder" should "support selecting from *" in {
+    val props = Map(
+      "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from `*` STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${FlushCount.entryName}'=1)",
+    )
+
+    config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be(
+      List(DataStorageSettings(envelope = true, key = true, value = true, metadata = true, headers = true)),
+    )
   }
 
   "S3SinkConfigDefBuilder" should "error when old BYTES settings used" in {
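The assertion rewrites above replace the Left/Right match-and-fail boilerplate with ScalaTest's EitherValues syntax: calling .value on an Either yields the Right contents and fails the test with the Left rendered in the message. A minimal sketch of the pattern (standalone values are illustrative; older ScalaTest versions spell it .right.value):

  import org.scalatest.EitherValues._

  val parsed: Either[String, Int] = Right(42)
  assert(parsed.value == 42) // a Left here would fail the test with its contents
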
"S3SinkConfigDefBuilder" should "return false on escape new lines" in { @@ -217,16 +203,19 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false, '${FlushCount.entryName}'=1)", ) - config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)) match { - case Left(value) => fail(value.toString) - case Right(value) => - value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true, - key = true, - value = true, - metadata = false, - headers = false, - ))) - } + config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be( + List(DataStorageSettings(envelope = true, key = true, value = true, metadata = false, headers = false)), + ) + } + + "S3SinkConfigDefBuilder" should "support selecting from *" in { + val props = Map( + "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from `*` STOREAS `JSON` PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${FlushCount.entryName}'=1)", + ) + + config.CloudSinkBucketOptions(connectorTaskId, S3SinkConfigDefBuilder(props)).value.map(_.dataStorage) should be( + List(DataStorageSettings(envelope = true, key = true, value = true, metadata = true, headers = true)), + ) } "S3SinkConfigDefBuilder" should "error when old BYTES settings used" in { diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/AddConnectSchemaTransformer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/AddConnectSchemaTransformer.scala index af6771151..afc11f25e 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/AddConnectSchemaTransformer.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/AddConnectSchemaTransformer.scala @@ -41,7 +41,7 @@ import scala.jdk.CollectionConverters.SeqHasAsJava */ class AddConnectSchemaTransformer(topic: Topic, settings: DataStorageSettings) extends Transformer { override def transform(message: MessageDetail): Either[RuntimeException, MessageDetail] = - if (topic != message.topic) { + if (topic != message.topic && topic != Topic.All) { Left( new RuntimeException( s"Invalid state reached. 
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/EnvelopeWithSchemaTransformer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/EnvelopeWithSchemaTransformer.scala
index ba1a7bf0d..f785223f8 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/EnvelopeWithSchemaTransformer.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/EnvelopeWithSchemaTransformer.scala
@@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala
  */
 case class EnvelopeWithSchemaTransformer(topic: Topic, settings: DataStorageSettings) extends Transformer {
   def transform(message: MessageDetail): Either[RuntimeException, MessageDetail] =
-    if (message.topic != topic) {
+    if (message.topic != topic && topic != Topic.All) {
       Left(
         new RuntimeException(
           s"Invalid state reached. Envelope transformer topic [${topic.value}] does not match incoming message topic [${message.topic.value}].",
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/SchemalessEnvelopeTransformer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/SchemalessEnvelopeTransformer.scala
index f80a7ac6b..c79570514 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/SchemalessEnvelopeTransformer.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/SchemalessEnvelopeTransformer.scala
@@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters.SeqHasAsJava
  */
 case class SchemalessEnvelopeTransformer(topic: Topic, settings: DataStorageSettings) extends Transformer {
   def transform(message: MessageDetail): Either[RuntimeException, MessageDetail] =
-    if (message.topic != topic) {
+    if (message.topic != topic && topic != Topic.All) {
       Left(
         new RuntimeException(
           s"Invalid state reached. Envelope transformer topic [${topic.value}] does not match incoming message topic [${message.topic.value}].",
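The same Topic.All escape hatch is applied to both envelope transformers, so a single `*` KCQL mapping can wrap records from every topic. For orientation, with all DataStorageSettings flags enabled the envelope bundles the record's parts roughly as below; this is an illustrative Scala literal of the shape, not the connector's exact wire layout:

  val envelope = Map(
    "key"      -> "record-key",
    "value"    -> Map("field" -> "data"),
    "metadata" -> Map("topic" -> "myTopic", "partition" -> 1, "offset" -> 3L, "timestamp" -> 1700000000000L),
    "headers"  -> Map("header-1" -> "value-1"),
  )
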
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/Transformers.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/Transformers.scala
index 6ff834bd4..bc381cbf7 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/Transformers.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/transformers/Transformers.scala
@@ -16,6 +16,7 @@
 package io.lenses.streamreactor.connect.cloud.common.sink.transformers
 
 import cats.implicits.catsSyntaxEitherId
+import com.typesafe.scalalogging.StrictLogging
 import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
 import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
 import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection
@@ -40,19 +41,21 @@ case class SequenceTransformer(transformers: Transformer*) extends Transformer {
 
 case class TopicsTransformers(transformers: Map[Topic, Transformer]) extends Transformer {
   def get(topic: Topic): Option[Transformer] = transformers.get(topic)
   def transform(message: MessageDetail): Either[RuntimeException, MessageDetail] =
-    transformers.get(message.topic).fold(message.asRight[RuntimeException])(_.transform(message))
+    transformers.get(message.topic)
+      // fall back to a transformer registered for `*` (all topics)
+      .orElse(transformers.get(Topic.All))
+      .fold(message.asRight[RuntimeException])(_.transform(message))
 }
 
-object TopicsTransformers {
+object TopicsTransformers extends StrictLogging {
   def from(bucketOptions: Seq[WithTransformableDataStorage]): TopicsTransformers = {
     val transformersMap = bucketOptions
-      .filter(_.sourceTopic.nonEmpty)
       .foldLeft(Map.empty[Topic, Transformer]) {
         case (map, bo) =>
           if (bo.dataStorage.hasEnvelope) {
-            val topic = Topic(bo.sourceTopic.get)
+            // restore `*`: CloudSinkBucketOptions maps a `*` source to an empty sourceTopic
+            val topic = Topic(bo.sourceTopic.getOrElse("*"))
             bo.formatSelection match {
               case JsonFormatSelection =>
                 val transformer = SequenceTransformer(SchemalessEnvelopeTransformer(topic, bo.dataStorage))
diff --git a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/TopicPartitionOffset.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/TopicPartitionOffset.scala
index d1e8eeb12..2a28351ca 100644
--- a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/TopicPartitionOffset.scala
+++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/TopicPartitionOffset.scala
@@ -22,6 +22,12 @@ case class Topic(value: String) extends AnyVal {
     TopicPartition(this, partition)
 }
 
+object Topic {
+
+  val All: Topic = Topic("*")
+
+}
+
 object Offset {
 
   implicit def orderingByOffsetValue[A <: Offset]: Ordering[A] =
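Taken together, Topic.All and the new lookup give TopicsTransformers a two-step resolution: an exact topic binding wins, otherwise the `*` binding applies. A self-contained model of that order (Topic redefined here so the snippet runs on its own):

  final case class Topic(value: String)
  object Topic { val All: Topic = Topic("*") }

  def resolve[A](bindings: Map[Topic, A], incoming: Topic): Option[A] =
    bindings.get(incoming).orElse(bindings.get(Topic.All))

  val bindings = Map(Topic("orders") -> "exact", Topic.All -> "wildcard")
  resolve(bindings, Topic("orders"))   // Some("exact")    -> specific mapping wins
  resolve(bindings, Topic("payments")) // Some("wildcard") -> `*` catches the rest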