Skip to content

Commit

Permalink
Handle offset deletes
Browse files Browse the repository at this point in the history
  • Loading branch information
stheppi committed Oct 31, 2023
1 parent a163ba9 commit f082c4a
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,37 @@ class ConsumerGroupsWriter(location: S3ObjectKey, uploader: Uploader, taskId: Co
records.traverse(extractOffsets)
.map(_.flatten)
.map { offsets =>
offsets.foldLeft(Map.empty[GroupTopicPartition, Long]) {
case (acc, OffsetDetails(key, metadata)) =>
acc + (key.key -> metadata.offset)
offsets.foldLeft(Map.empty[GroupTopicPartition, OffsetAction]) {
case (acc, action: OffsetAction) =>
acc + (action.key -> action)
}
}.flatMap { map =>
map.toList.traverse {
case (groupTopicPartition, offset) =>
val content = ByteBuffer.allocate(8).putLong(offset).rewind()
case (groupTopicPartition, action) =>
val s3KeySuffix =
s"${groupTopicPartition.group}/${groupTopicPartition.topic}/${groupTopicPartition.partition}"
val s3Key = location.prefix.fold(s3KeySuffix)(prefix => s"$prefix/$s3KeySuffix")
logger.debug(s"[${taskId.show}] Uploading offset $offset to $s3Key")
val result = uploader.upload(
content,
location.bucket,
s3Key,
)
logger.debug(s"[${taskId.show}] Uploaded offset $offset to $s3Key")
result

action match {
case WriteOffset(offset) =>
val content = ByteBuffer.allocate(8).putLong(offset.metadata.offset).rewind()
logger.debug(s"[${taskId.show}] Uploading offset $offset to $s3Key")
val result = uploader.upload(
content,
location.bucket,
s3Key,
)
logger.debug(s"[${taskId.show}] Uploaded offset $offset to $s3Key")
result
case DeleteOffset(_) =>
logger.debug(s"[${taskId.show}] Deleting offset $s3Key")
val result = uploader.delete(
location.bucket,
s3Key,
)
logger.debug(s"[${taskId.show}] Deleted offset $s3Key")
result
}
}.map(_ => ())
}
}
Expand All @@ -68,7 +80,7 @@ object ConsumerGroupsWriter extends StrictLogging {
* @param record The [[SinkRecord]] to extract the offset details from.
* @return Either an error or the offset details.
*/
def extractOffsets(record: SinkRecord): Either[Throwable, Option[OffsetDetails]] =
def extractOffsets(record: SinkRecord): Either[Throwable, Option[OffsetAction]] =
Option(record.key()) match {
case None => Right(None)
case Some(key) =>
Expand All @@ -92,9 +104,10 @@ object ConsumerGroupsWriter extends StrictLogging {
)
metadata <- OffsetAndMetadata.from(ByteBuffer.wrap(valueBytes))
} yield {
Some(OffsetDetails(key, metadata))
Some(WriteOffset(OffsetDetails(key, metadata)))
}
case None => Right(None)
case None =>
Right(Some(DeleteOffset(key.key)))
}
} yield value
} else {
Expand All @@ -116,3 +129,12 @@ object ConsumerGroupsWriter extends StrictLogging {
}

case class OffsetDetails(key: OffsetKey, metadata: OffsetAndMetadata)

sealed trait OffsetAction {
def key: GroupTopicPartition
}

case class WriteOffset(offset: OffsetDetails) extends OffsetAction {
override def key: GroupTopicPartition = offset.key.key
}
case class DeleteOffset(key: GroupTopicPartition) extends OffsetAction
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.config.ConnectorTaskId
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest
import software.amazon.awssdk.services.s3.model.PutObjectRequest

import java.nio.ByteBuffer
Expand All @@ -47,4 +48,16 @@ class AwsS3Uploader(s3Client: S3Client, connectorTaskId: ConnectorTaskId) extend
ex
}
}

override def delete(bucket: String, path: String): Either[Throwable, Unit] = {
logger.debug(s"[{}] AWS Deleting from s3 {}:{}", connectorTaskId.show, bucket, path)
Try {
s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(path).build())
logger.debug(s"[{}] Completed delete from s3 {}:{}", connectorTaskId.show, bucket, path)
}
.toEither.leftMap { ex =>
logger.error(s"[{}] Failed delete from s3 {}:{}", connectorTaskId.show, bucket, path, ex)
ex
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ trait StorageInterface {

trait Uploader extends AutoCloseable {
def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit]

def delete(bucket: String, path: String): Either[Throwable, Unit]
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers {
Right(())
}

override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(())
override def close(): Unit = {}
}

Expand Down Expand Up @@ -71,7 +72,7 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers {
writes = writes :+ (path, source.array())
Right(())
}

override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(())
override def close(): Unit = {}
}
val writer = new ConsumerGroupsWriter(location, uploader, taskId)
Expand Down Expand Up @@ -133,7 +134,7 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers {
writes = writes :+ (path, source.array())
Right(())
}

override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(())
override def close(): Unit = {}
}
val writer = new ConsumerGroupsWriter(location, uploader, taskId)
Expand Down Expand Up @@ -169,7 +170,7 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers {
val uploader = new Uploader {
override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] =
Left(new RuntimeException("Boom!"))

override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(())
override def close(): Unit = {}
}
val writer = new ConsumerGroupsWriter(location, uploader, taskId)
Expand All @@ -189,6 +190,107 @@ class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers {
result.left.getOrElse(fail("should not fail")).getMessage shouldBe "Boom!"
}

test("write the offset and delete on empty value payload") {
val location = S3ObjectKey("bucket", None)
var writes = Vector[(String, Array[Byte])]()

var deletes = Vector[String]()
val uploader = new Uploader {
override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = {
writes = writes :+ (path, source.array())
Right(())
}
override def delete(bucket: String, path: String): Either[Throwable, Unit] = {
deletes = deletes :+ path
Right(())
}
override def close(): Unit = {}
}
val writer = new ConsumerGroupsWriter(location, uploader, taskId)
val records = List(
new SinkRecord(
"__consumer_offsets",
77,
null,
generateOffsetKey("group1", "topic1", 1),
null,
generateOffsetDetails(123L),
100,
),
new SinkRecord(
"__consumer_offsets",
77,
null,
generateOffsetKey("group1", "topic1", 1),
null,
null,
100,
),
)

writer.write(records) shouldBe Right(())
writes shouldBe empty
deletes should contain theSameElementsAs List(
"group1/topic1/1",
)
}

test("write, delete , write writes the offset") {
val location = S3ObjectKey("bucket", None)
var writes = Vector[(String, Array[Byte])]()

var deletes = Vector[String]()
val uploader = new Uploader {
override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = {
writes = writes :+ (path, source.array())
Right(())
}

override def delete(bucket: String, path: String): Either[Throwable, Unit] = {
deletes = deletes :+ path
Right(())
}

override def close(): Unit = {}
}
val writer = new ConsumerGroupsWriter(location, uploader, taskId)
val records = List(
new SinkRecord(
"__consumer_offsets",
77,
null,
generateOffsetKey("group1", "topic1", 1),
null,
generateOffsetDetails(123L),
100,
),
new SinkRecord(
"__consumer_offsets",
77,
null,
generateOffsetKey("group1", "topic1", 1),
null,
null,
100,
),
new SinkRecord(
"__consumer_offsets",
77,
null,
generateOffsetKey("group1", "topic1", 1),
null,
generateOffsetDetails(333L),
100,
),
)

writer.write(records) shouldBe Right(())
val writesToLong = writes.map(w => (w._1, ByteBuffer.wrap(w._2).getLong))
writesToLong should contain theSameElementsAs List(
("group1/topic1/1", 333L),
)
deletes shouldBe empty
}
private def generateOffsetKey(group: String, topic: String, partition: Int): Array[Byte] = {
val buffer = ByteBuffer.allocate(256)
buffer.putShort(0.toShort)
Expand Down

0 comments on commit f082c4a

Please sign in to comment.