Feat/5.0 changes (#1008)
* An issue was present in the S3 sink when handling null values. This led to an exception because the Connect struct for the envelope does not contain the "value" field when the value is null. The code change ensures the transformer does not populate the struct unless the field is present.
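
A minimal sketch of the guard this describes, under the assumption that the envelope schema simply omits the "value" field for tombstone records; the names below are illustrative and not the connector's actual transformer code.

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct

object EnvelopeNullValueSketch {

  // Only set "value" when the envelope schema actually declares the field;
  // Struct.put raises a DataException for a field the schema does not know,
  // which is the failure described above for null (tombstone) values.
  def populateValue(envelope: Struct, value: Option[AnyRef]): Struct = {
    val hasValueField = envelope.schema().field("value") != null
    value.filter(_ => hasValueField).foreach(v => envelope.put("value", v))
    envelope
  }

  def main(args: Array[String]): Unit = {
    // Envelope schema built for a tombstone: the "value" field is simply absent.
    val tombstoneSchema: Schema = SchemaBuilder.struct().field("key", Schema.STRING_SCHEMA).build()
    val tombstone = new Struct(tombstoneSchema).put("key", "user-1")
    populateValue(tombstone, None) // no exception; "value" is never touched
  }
}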

* The PR introduces S3 sink support for BigDecimal. A new case class extending SinkData is introduced. For text storage, the safeValue function is overridden to return the java BigDecimal. The JsonConverter is configured to store decimals as numbers rather than base64 (the default). While this is a breaking change, it should not be a cause for concern: BigDecimal was not handled before, so any earlier attempt to write one would have raised an error.

Unit and integration tests were added to ensure the storage can handle BigDecimal.
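
The bullet above mentions a new SinkData case class for decimals and switching the JsonConverter to numeric decimal output. The sketch below is a hedged illustration of both pieces; apart from the Kafka Connect JsonConverter class and its "decimal.format" property, every name is made up for the example.

import java.math.{BigDecimal => JBigDecimal}
import org.apache.kafka.connect.json.JsonConverter
import scala.jdk.CollectionConverters._

// Illustrative stand-in for the connector's SinkData hierarchy.
sealed trait SinkDataSketch {
  def safeValue: Any
}

// The decimal case: safeValue hands back the underlying java.math.BigDecimal,
// so text storage writes the plain numeric form rather than a wrapped object.
final case class DecimalSinkDataSketch(value: JBigDecimal) extends SinkDataSketch {
  override def safeValue: Any = value
}

object JsonDecimalFormatSketch {
  def main(args: Array[String]): Unit = {
    val converter = new JsonConverter()
    // "decimal.format" -> "NUMERIC" makes the converter emit Connect Decimal
    // values as JSON numbers instead of the default base64-encoded bytes.
    converter.configure(
      Map("schemas.enable" -> "false", "decimal.format" -> "NUMERIC").asJava,
      false
    )
    val _ = DecimalSinkDataSketch(new JBigDecimal("100.43")).safeValue // 100.43
  }
}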

* Initialised the BigDecimal using a string literal

* Fix the integration tests' compilation errors

* Fix the integration tests' compilation errors

* Set the S3 URI used in the test; otherwise the test fails

---------

Co-authored-by: stheppi <[email protected]>
stheppi and stheppi authored Nov 30, 2023
1 parent 190239b commit 5ed9e7a
Showing 13 changed files with 642 additions and 9 deletions.
@@ -49,13 +49,17 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingS
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.UsersSchemaDecimal
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.nio.ByteBuffer

class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {

private val writerManagerCreator = new WriterManagerCreator[S3FileMetadata, S3SinkConfig]()
@@ -134,6 +138,69 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont

}

"avro sink" should "write BigDecimal" in {
val sink = writerManagerCreator.from(avroConfig)
val usersWithDecimal1 =
new Struct(UsersSchemaDecimal)
.put("name", "sam")
.put("title", "mr")
.put(
"salary",
BigDecimal("100.43").setScale(18).bigDecimal,
)
val writeRes1 = sink.write(
TopicPartitionOffset(Topic(TopicName), 1, Offset(1L)),
MessageDetail(
NullSinkData(None),
StructSinkData(usersWithDecimal1),
Map.empty,
None,
Topic(TopicName),
1,
Offset(1L),
),
)

writeRes1.isRight should be(true)

val usersWithDecimal2 =
new Struct(UsersSchemaDecimal)
.put("name", "maria")
.put("title", "ms")
.put(
"salary",
BigDecimal("100.43").setScale(18).bigDecimal,
)

val writeRes2 = sink.write(
TopicPartitionOffset(Topic(TopicName), 1, Offset(2L)),
MessageDetail(
NullSinkData(None),
StructSinkData(usersWithDecimal2),
Map.empty,
None,
Topic(TopicName),
1,
Offset(2L),
),
)
writeRes2.isRight should be(true)
sink.close()

listBucketPath(BucketName, "streamReactorBackups/myTopic/1/").size should be(1)

val byteArray = remoteFileAsBytes(BucketName, "streamReactorBackups/myTopic/1/2.avro")
val genericRecords: List[GenericRecord] = avroFormatReader.read(byteArray)
genericRecords.size should be(2)

genericRecords(0).get("name").toString should be("sam")
val bb1 = genericRecords(0).get("salary").asInstanceOf[ByteBuffer]
Decimal.toLogical(Decimal.builder(18).optional().build(), bb1.array()) should be(
BigDecimal("100.43").setScale(18).bigDecimal,
)

}

"avro sink" should "write start a new file in case of schema change" in {

val secondSchema: Schema = SchemaBuilder.struct()
@@ -50,6 +50,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingS
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.UsersSchemaDecimal
import org.apache.kafka.connect.data.Struct
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
@@ -176,4 +177,74 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
)
}

"json sink" should "write bigdecimal to json" in {

val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
val config = S3SinkConfig(
S3Config(
None,
Some(s3Container.identity.identity),
Some(s3Container.identity.credential),
AuthMode.Credentials,
),
bucketOptions = Seq(
CloudSinkBucketOptions(
TopicName.some,
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(1)),
formatSelection = JsonFormatSelection,
keyNamer = new CloudKeyNamer(
AvroFormatSelection,
defaultPartitionSelection(Values),
new OffsetFileNamer(
identity[String],
JsonFormatSelection.extension,
),
new PaddingService(Map[String, PaddingStrategy](
"partition" -> NoOpPaddingStrategy,
"offset" -> LeftPadPaddingStrategy(12, 0),
)),
),
localStagingArea = LocalStagingArea(localRoot),
partitionSelection = defaultPartitionSelection(Values),
dataStorage = DataStorageSettings.disabled,
),
),
offsetSeekerOptions = OffsetSeekerOptions(5),
compressionCodec,
batchDelete = true,
)

val sink = writerManagerCreator.from(config)

val topic = Topic(TopicName)
val offset = Offset(1L)
val usersWithDecimal =
new Struct(UsersSchemaDecimal)
.put("name", "sam")
.put("title", "mr")
.put(
"salary",
BigDecimal("100.43").setScale(18).bigDecimal,
)
sink.write(
TopicPartitionOffset(topic, 1, offset),
MessageDetail(NullSinkData(None),
StructSinkData(usersWithDecimal),
Map.empty[String, SinkData],
None,
topic,
0,
offset,
),
)

sink.close()

listBucketPath(BucketName, "streamReactorBackups/myTopic/1/").size should be(1)

remoteFileAsString(BucketName, "streamReactorBackups/myTopic/1/1.json") should be(
s"""{"name":"sam","title":"mr","salary":100.430000000000000000}""",
)
}
}
