diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/consumers/ConsumerGroupsWriter.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/consumers/ConsumerGroupsWriter.scala index 048bc505f..07a0078fe 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/consumers/ConsumerGroupsWriter.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/consumers/ConsumerGroupsWriter.scala @@ -37,25 +37,37 @@ class ConsumerGroupsWriter(location: S3ObjectKey, uploader: Uploader, taskId: Co records.traverse(extractOffsets) .map(_.flatten) .map { offsets => - offsets.foldLeft(Map.empty[GroupTopicPartition, Long]) { - case (acc, OffsetDetails(key, metadata)) => - acc + (key.key -> metadata.offset) + offsets.foldLeft(Map.empty[GroupTopicPartition, OffsetAction]) { + case (acc, action: OffsetAction) => + acc + (action.key -> action) } }.flatMap { map => map.toList.traverse { - case (groupTopicPartition, offset) => - val content = ByteBuffer.allocate(8).putLong(offset).rewind() + case (groupTopicPartition, action) => val s3KeySuffix = s"${groupTopicPartition.group}/${groupTopicPartition.topic}/${groupTopicPartition.partition}" val s3Key = location.prefix.fold(s3KeySuffix)(prefix => s"$prefix/$s3KeySuffix") - logger.debug(s"[${taskId.show}] Uploading offset $offset to $s3Key") - val result = uploader.upload( - content, - location.bucket, - s3Key, - ) - logger.debug(s"[${taskId.show}] Uploaded offset $offset to $s3Key") - result + + action match { + case WriteOffset(offset) => + val content = ByteBuffer.allocate(8).putLong(offset.metadata.offset).rewind() + logger.debug(s"[${taskId.show}] Uploading offset $offset to $s3Key") + val result = uploader.upload( + content, + location.bucket, + s3Key, + ) + logger.debug(s"[${taskId.show}] Uploaded offset $offset to $s3Key") + result + case DeleteOffset(_) => + logger.debug(s"[${taskId.show}] Deleting offset $s3Key") + val result = uploader.delete( + location.bucket, + s3Key, + ) + logger.debug(s"[${taskId.show}] Deleted offset $s3Key") + result + } }.map(_ => ()) } } @@ -68,7 +80,7 @@ object ConsumerGroupsWriter extends StrictLogging { * @param record The [[SinkRecord]] to extract the offset details from. * @return Either an error or the offset details. */ - def extractOffsets(record: SinkRecord): Either[Throwable, Option[OffsetDetails]] = + def extractOffsets(record: SinkRecord): Either[Throwable, Option[OffsetAction]] = Option(record.key()) match { case None => Right(None) case Some(key) => @@ -92,9 +104,10 @@ object ConsumerGroupsWriter extends StrictLogging { ) metadata <- OffsetAndMetadata.from(ByteBuffer.wrap(valueBytes)) } yield { - Some(OffsetDetails(key, metadata)) + Some(WriteOffset(OffsetDetails(key, metadata))) } - case None => Right(None) + case None => + Right(Some(DeleteOffset(key.key))) } } yield value } else { @@ -116,3 +129,12 @@ object ConsumerGroupsWriter extends StrictLogging { } case class OffsetDetails(key: OffsetKey, metadata: OffsetAndMetadata) + +sealed trait OffsetAction { + def key: GroupTopicPartition +} + +case class WriteOffset(offset: OffsetDetails) extends OffsetAction { + override def key: GroupTopicPartition = offset.key.key +} +case class DeleteOffset(key: GroupTopicPartition) extends OffsetAction diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3Uploader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3Uploader.scala index 0994ec82d..3cdf45de2 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3Uploader.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3Uploader.scala @@ -21,6 +21,7 @@ import com.typesafe.scalalogging.LazyLogging import io.lenses.streamreactor.connect.aws.s3.config.ConnectorTaskId import software.amazon.awssdk.core.sync.RequestBody import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest import software.amazon.awssdk.services.s3.model.PutObjectRequest import java.nio.ByteBuffer @@ -47,4 +48,16 @@ class AwsS3Uploader(s3Client: S3Client, connectorTaskId: ConnectorTaskId) extend ex } } + + override def delete(bucket: String, path: String): Either[Throwable, Unit] = { + logger.debug(s"[{}] AWS Deleting from s3 {}:{}", connectorTaskId.show, bucket, path) + Try { + s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(path).build()) + logger.debug(s"[{}] Completed delete from s3 {}:{}", connectorTaskId.show, bucket, path) + } + .toEither.leftMap { ex => + logger.error(s"[{}] Failed delete from s3 {}:{}", connectorTaskId.show, bucket, path, ex) + ex + } + } } diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/StorageInterface.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/StorageInterface.scala index 593c6e3f9..27266d7fe 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/StorageInterface.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/StorageInterface.scala @@ -68,4 +68,6 @@ trait StorageInterface { trait Uploader extends AutoCloseable { def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] + + def delete(bucket: String, path: String): Either[Throwable, Unit] } diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/consumers/ConsumerGroupsWriterTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/consumers/ConsumerGroupsWriterTest.scala index ee59cd69a..988d17a1f 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/consumers/ConsumerGroupsWriterTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/consumers/ConsumerGroupsWriterTest.scala @@ -38,6 +38,7 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers { Right(()) } + override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(()) override def close(): Unit = {} } @@ -71,7 +72,7 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers { writes = writes :+ (path, source.array()) Right(()) } - + override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(()) override def close(): Unit = {} } val writer = new ConsumerGroupsWriter(location, uploader, taskId) @@ -133,7 +134,7 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers { writes = writes :+ (path, source.array()) Right(()) } - + override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(()) override def close(): Unit = {} } val writer = new ConsumerGroupsWriter(location, uploader, taskId) @@ -169,7 +170,7 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers { val uploader = new Uploader { override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = Left(new RuntimeException("Boom!")) - + override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(()) override def close(): Unit = {} } val writer = new ConsumerGroupsWriter(location, uploader, taskId) @@ -189,6 +190,107 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers { result.left.getOrElse(fail("should not fail")).getMessage shouldBe "Boom!" } + test("write the offset and delete on empty value payload") { + val location = S3ObjectKey("bucket", None) + var writes = Vector[(String, Array[Byte])]() + + var deletes = Vector[String]() + val uploader = new Uploader { + override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = { + writes = writes :+ (path, source.array()) + Right(()) + } + override def delete(bucket: String, path: String): Either[Throwable, Unit] = { + deletes = deletes :+ path + Right(()) + } + override def close(): Unit = {} + } + val writer = new ConsumerGroupsWriter(location, uploader, taskId) + val records = List( + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(123L), + 100, + ), + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + null, + 100, + ), + ) + + writer.write(records) shouldBe Right(()) + writes shouldBe empty + deletes should contain theSameElementsAs List( + "group1/topic1/1", + ) + } + + test("write, delete , write writes the offset") { + val location = S3ObjectKey("bucket", None) + var writes = Vector[(String, Array[Byte])]() + + var deletes = Vector[String]() + val uploader = new Uploader { + override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = { + writes = writes :+ (path, source.array()) + Right(()) + } + + override def delete(bucket: String, path: String): Either[Throwable, Unit] = { + deletes = deletes :+ path + Right(()) + } + + override def close(): Unit = {} + } + val writer = new ConsumerGroupsWriter(location, uploader, taskId) + val records = List( + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(123L), + 100, + ), + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + null, + 100, + ), + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(333L), + 100, + ), + ) + + writer.write(records) shouldBe Right(()) + val writesToLong = writes.map(w => (w._1, ByteBuffer.wrap(w._2).getLong)) + writesToLong should contain theSameElementsAs List( + ("group1/topic1/1", 333L), + ) + deletes shouldBe empty + } private def generateOffsetKey(group: String, topic: String, partition: Int): Array[Byte] = { val buffer = ByteBuffer.allocate(256) buffer.putShort(0.toShort)