Commit

Add GZIP support for JSON in S3 sink connector
brandon-powers committed Feb 20, 2024
1 parent b8d9c6b commit 0ecfe34
Showing 11 changed files with 129 additions and 46 deletions.
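
The change is driven entirely by sink configuration. A minimal sketch of connector properties that would exercise it, assuming the standard connect.s3 property prefix and the compression keys introduced in this commit (the connector class, bucket, topic and KCQL are illustrative only):

// Illustrative sink properties for GZIP-compressed JSON output; not part of the diff.
// The codec/level keys come from CompressionCodecConfigKeys further down; everything else is assumed.
val sinkProps: Map[String, String] = Map(
  "connector.class"              -> "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "connect.s3.kcql"              -> "INSERT INTO my-bucket:json-data SELECT * FROM my-topic STOREAS `JSON`",
  "connect.s3.compression.codec" -> "GZIP",
  "connect.s3.compression.level" -> "6" // optional; the JSON writer falls back to GZIP level 6
)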

@@ -18,7 +18,6 @@ package io.lenses.streamreactor.connect.aws.s3.sink.config
import io.lenses.streamreactor.common.config.base.traits._
import io.lenses.streamreactor.connect.aws.s3.config.DeleteModeSettings
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings
import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkConfigDefBuilder

import java.util
@@ -31,7 +30,6 @@ case class S3SinkConfigDefBuilder(props: util.Map[String, String])
with NumberRetriesSettings
with UserSettings
with ConnectionSettings
with CompressionCodecSettings
with DeleteModeSettings {

def getParsedValues: Map[String, _] = values().asScala.toMap

@@ -16,7 +16,6 @@
package io.lenses.streamreactor.connect.datalake.sink.config

import io.lenses.streamreactor.common.config.base.traits._
import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings
import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkConfigDefBuilder
import io.lenses.streamreactor.connect.datalake.config.AuthModeSettings
import io.lenses.streamreactor.connect.datalake.config.AzureConfigSettings
@@ -31,7 +30,6 @@ case class DatalakeSinkConfigDefBuilder(props: util.Map[String, String])
with NumberRetriesSettings
with UserSettings
with ConnectionSettings
with CompressionCodecSettings
with AuthModeSettings {

def getParsedValues: Map[String, _] = values().asScala.toMap

@@ -25,7 +25,7 @@ trait CompressionCodecConfigKeys extends WithConnectorPrefix {

def COMPRESSION_CODEC = s"$connectorPrefix.compression.codec"

val COMPRESSION_CODEC_DOC = "Compression codec to use for Avro or Parquet."
val COMPRESSION_CODEC_DOC = "Compression codec to use for Avro, Parquet or JSON."
val COMPRESSION_CODEC_DEFAULT: String = UNCOMPRESSED.entryName

def COMPRESSION_LEVEL = s"$connectorPrefix.compression.level"
@@ -41,9 +41,25 @@ trait CompressionCodecSettings extends BaseSettings with CompressionCodecConfigK
CompressionCodecName.withNameInsensitiveOption(getString(COMPRESSION_CODEC)).getOrElse(
CompressionCodecName.UNCOMPRESSED,
)
val level = getInt(COMPRESSION_LEVEL)
val levelOpt = Option.when(level != -1)(level.toInt)

CompressionCodec(codec, levelOpt)
val level = getInt(COMPRESSION_LEVEL)
val levelOpt = Option.when(level != -1)(level.toInt)
val extension = getCompressedFileExtension(codec)

CompressionCodec(codec, levelOpt, extension)
}

private def getCompressedFileExtension(codec: CompressionCodecName): Option[String] =
codec match {
case CompressionCodecName.UNCOMPRESSED => None
case CompressionCodecName.SNAPPY => Some("sz")
case CompressionCodecName.GZIP => Some("gz")
case CompressionCodecName.LZO => Some("lzo")
case CompressionCodecName.BROTLI => Some("br")
case CompressionCodecName.LZ4 => Some("lz4")
case CompressionCodecName.BZIP2 => Some("bz2")
case CompressionCodecName.ZSTD => Some("zst")
case CompressionCodecName.DEFLATE => Some("gz")
case CompressionCodecName.XZ => Some("xz")
}
}
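
For illustration, the resolution above maps a GZIP configuration onto the extended model like this (a sketch, assuming the default compression level sentinel of -1 implied by the Option.when guard):

// Sketch only: values produced by getCompressionCodec for a GZIP configuration.
CompressionCodec(CompressionCodecName.GZIP, level = None, extension = Some("gz"))    // no compression level set
CompressionCodec(CompressionCodecName.GZIP, level = Some(9), extension = Some("gz")) // compression level 9 configured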

@@ -111,6 +111,11 @@ case object FormatSelection {
}

case object JsonFormatSelection extends FormatSelection {
override def availableCompressionCodecs: Map[CompressionCodecName, Boolean] = Map(
UNCOMPRESSED -> true,
GZIP -> true, // Only applies to sink currently.
)

override def toStreamReader(
input: ReaderBuilderContext,
): CloudStreamReader = {

@@ -41,12 +41,12 @@ class AvroFormatWriter(outputStream: CloudOutputStream)(implicit compressionCode

private val avroCompressionCodec: CodecFactory = {
compressionCodec match {
case CompressionCodec(UNCOMPRESSED, _) => CodecFactory.nullCodec()
case CompressionCodec(SNAPPY, _) => CodecFactory.snappyCodec()
case CompressionCodec(BZIP2, _) => CodecFactory.bzip2Codec()
case CompressionCodec(ZSTD, Some(level)) => CodecFactory.zstandardCodec(level)
case CompressionCodec(DEFLATE, Some(level)) => CodecFactory.deflateCodec(level)
case CompressionCodec(XZ, Some(level)) => CodecFactory.xzCodec(level)
case CompressionCodec(UNCOMPRESSED, _, _) => CodecFactory.nullCodec()
case CompressionCodec(SNAPPY, _, _) => CodecFactory.snappyCodec()
case CompressionCodec(BZIP2, _, _) => CodecFactory.bzip2Codec()
case CompressionCodec(ZSTD, Some(level), _) => CodecFactory.zstandardCodec(level)
case CompressionCodec(DEFLATE, Some(level), _) => CodecFactory.deflateCodec(level)
case CompressionCodec(XZ, Some(level), _) => CodecFactory.xzCodec(level)
case _ =>
throw new IllegalArgumentException("No or invalid compressionCodec specified - does codec require a level?")
}

@@ -16,48 +16,77 @@
package io.lenses.streamreactor.connect.cloud.common.formats.writer

import JsonFormatWriter._
import LineSeparatorUtil.LineSeparatorBytes
import io.lenses.streamreactor.connect.cloud.common.formats.writer.LineSeparatorUtil.LineSeparatorBytes
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.GZIP
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.ToJsonDataConverter
import io.lenses.streamreactor.connect.cloud.common.stream.CloudOutputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
import org.apache.commons.compress.compressors.gzip.GzipParameters
import org.apache.kafka.connect.json.JsonConverter
import org.apache.kafka.connect.json.DecimalFormat
import org.apache.kafka.connect.json.JsonConverterConfig

import scala.jdk.CollectionConverters.MapHasAsJava
import scala.util.Try
class JsonFormatWriter(outputStream: CloudOutputStream) extends FormatWriter {
import java.io.OutputStream

class JsonFormatWriter(outputStream: CloudOutputStream)(implicit compressionCodec: CompressionCodec)
extends FormatWriter {
private var compressedOutputStream: OutputStream = _

private val jsonCompressionCodec: CompressionCodec = {
compressionCodec match {
case CompressionCodec(UNCOMPRESSED, _, _) => compressionCodec
case CompressionCodec(GZIP, _, _) => compressionCodec
case _ => throw new IllegalArgumentException("Invalid or missing `compressionCodec` specified.")
}
}

override def write(messageDetail: MessageDetail): Either[Throwable, Unit] = {
val topic = messageDetail.topic
val valueSinkData = messageDetail.value
override def write(messageDetail: MessageDetail): Either[Throwable, Unit] =
Try {
val dataBytes: Array[Byte] = ToJsonDataConverter.convertMessageValueToByteArray(
Converter,
messageDetail.topic,
messageDetail.value,
)

val dataBytes = messageDetail.value match {
case data: PrimitiveSinkData =>
Converter.fromConnectData(topic.value, valueSinkData.schema().orNull, data.safeValue)
case StructSinkData(structVal) =>
Converter.fromConnectData(topic.value, valueSinkData.schema().orNull, structVal)
case MapSinkData(map, schema) =>
Converter.fromConnectData(topic.value, schema.orNull, map)
case ArraySinkData(array, schema) =>
Converter.fromConnectData(topic.value, schema.orNull, array)
case ByteArraySinkData(_, _) => throw new IllegalStateException("Cannot currently write byte array as json")
case NullSinkData(schema) => Converter.fromConnectData(topic.value, schema.orNull, null)
}
if (jsonCompressionCodec.compressionCodec == CompressionCodecName.GZIP) {
if (compressedOutputStream == null) {
val parameters = new GzipParameters()
val defaultGzipCompressionLevel = 6

parameters.setCompressionLevel(jsonCompressionCodec.level.getOrElse(defaultGzipCompressionLevel))
compressedOutputStream = new GzipCompressorOutputStream(outputStream, parameters)
}

outputStream.write(dataBytes)
outputStream.write(LineSeparatorBytes)
outputStream.flush()
compressedOutputStream.write(dataBytes)
compressedOutputStream.flush()
} else {
outputStream.write(dataBytes)
outputStream.write(LineSeparatorBytes)
outputStream.flush()
}
}.toEither
}

override def rolloverFileOnSchemaChange(): Boolean = false

override def complete(): Either[SinkError, Unit] =
for {
closed <- outputStream.complete()
_ <- Suppress(outputStream.flush())
_ <- Suppress(outputStream.close())
} yield closed
if (compressedOutputStream != null)
for {
_ <- Suppress(compressedOutputStream.flush())
_ <- Suppress(compressedOutputStream.close())
closed <- outputStream.complete()
} yield closed
else
for {
_ <- Suppress(outputStream.flush())
_ <- Suppress(outputStream.close())
closed <- outputStream.complete()
} yield closed

override def getPointer: Long = outputStream.getPointer
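
The GZIP branch above delegates to Apache Commons Compress. A standalone sketch of the same wrapping, to show how the optional level is applied (not part of the diff; file name, payload and level are arbitrary):

import java.io.{BufferedOutputStream, FileOutputStream}
import org.apache.commons.compress.compressors.gzip.{GzipCompressorOutputStream, GzipParameters}

// Sketch only: wrap an OutputStream in GZIP with an explicit compression level,
// mirroring what JsonFormatWriter does with the configured codec (default level 6).
val params = new GzipParameters()
params.setCompressionLevel(6) // 1 = fastest, 9 = smallest
val gzipOut = new GzipCompressorOutputStream(
  new BufferedOutputStream(new FileOutputStream("records.gz")),
  params
)
gzipOut.write("""{"id":1}""".getBytes("UTF-8"))
gzipOut.close() // writes the GZIP trailer; JsonFormatWriter defers this to complete()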

@@ -41,4 +41,13 @@ object CompressionCodecName extends Enum[CompressionCodecName] {
override def values: IndexedSeq[CompressionCodecName] = findValues
}

case class CompressionCodec(compressionCodec: CompressionCodecName, level: Option[Int] = Option.empty)
/**
* @param extension
* Some format selections have compression built-in, such as Avro and Parquet.
* Text formats like CSV and JSON do not, and require an update to the file extension when compressed.
*/
case class CompressionCodec(
compressionCodec: CompressionCodecName,
level: Option[Int] = Option.empty,
extension: Option[String] = None,
)
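
To make the distinction concrete, using the extensions from getCompressedFileExtension earlier in this commit (illustrative only, not part of the diff):

// Avro and Parquet embed the codec inside the file, so the format keeps its own
// extension and this field is ignored when naming sink files.
CompressionCodec(CompressionCodecName.SNAPPY, extension = Some("sz"))
// JSON has no built-in compression, so the codec's extension replaces "json":
// a GZIP-compressed JSON object is written with a "gz" file extension.
CompressionCodec(CompressionCodecName.GZIP, extension = Some("gz"))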

@@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.BytesFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.config.FormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.sink.commit.CloudCommitPolicy
@@ -47,6 +48,7 @@ object CloudSinkBucketOptions extends LazyLogging {
config.getKCQL.map { kcql: Kcql =>
for {
formatSelection <- FormatSelection.fromKcql(kcql, SinkPropsSchema.schema)
fileExtension = extractFileExtension(config, formatSelection)
sinkProps = CloudSinkProps.fromKcql(kcql)
partitionSelection = PartitionSelection(kcql, sinkProps)
paddingService <- PaddingService.fromConfig(config, sinkProps)
@@ -55,12 +57,12 @@ object CloudSinkBucketOptions extends LazyLogging {
new TopicPartitionOffsetFileNamer(
paddingService.padderFor("partition"),
paddingService.padderFor("offset"),
formatSelection.extension,
fileExtension,
)
} else {
new OffsetFileNamer(
paddingService.padderFor("offset"),
formatSelection.extension,
fileExtension,
)
}
keyNamer = CloudKeyNamer(formatSelection, partitionSelection, fileNamer, paddingService)
@@ -107,6 +109,14 @@ object CloudSinkBucketOptions extends LazyLogging {
else
new IllegalArgumentException(s"Envelope is not supported for format ${format.extension.toUpperCase()}.").asLeft
}

private def extractFileExtension(config: CloudSinkConfigDefBuilder, formatSelection: FormatSelection): String = {
// Avro and Parquet do not change filenames when compressed; this guard applies to JSON only.
if (formatSelection != JsonFormatSelection)
return formatSelection.extension

config.getCompressionCodec().extension.getOrElse(formatSelection.extension)
}
}

case class CloudSinkBucketOptions(

@@ -17,9 +17,11 @@ package io.lenses.streamreactor.connect.cloud.common.sink.config

import io.lenses.streamreactor.common.config.base.traits.KcqlSettings
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategySettings
import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings

trait CloudSinkConfigDefBuilder
extends KcqlSettings
with FlushSettings
with LocalStagingAreaSettings
with PaddingStrategySettings {}
with PaddingStrategySettings
with CompressionCodecSettings {}

@@ -15,13 +15,31 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.conversion

import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import org.apache.kafka.connect.json.JsonConverter

import java.nio.ByteBuffer

object ToJsonDataConverter {

def convertMessageValueToByteArray(converter: JsonConverter, topic: Topic, data: SinkData): Array[Byte] =
data match {
case data: PrimitiveSinkData => converter.fromConnectData(topic.value, data.schema().orNull, data.safeValue)
case StructSinkData(structVal) => converter.fromConnectData(topic.value, data.schema().orNull, structVal)
case MapSinkData(map, schema) => converter.fromConnectData(topic.value, schema.orNull, map)
case ArraySinkData(array, schema) => converter.fromConnectData(topic.value, schema.orNull, array)
case ByteArraySinkData(_, _) => throw new IllegalStateException("Cannot currently write byte array as json")
case NullSinkData(schema) => converter.fromConnectData(topic.value, schema.orNull, null)
case other => throw new IllegalStateException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
}

def convert(data: SinkData): Any = data match {
case data: PrimitiveSinkData => data.safeValue
case ByteArraySinkData(bArray, _) => ByteBuffer.wrap(bArray)

@@ -16,7 +16,6 @@
package io.lenses.streamreactor.connect.gcp.storage.sink.config

import io.lenses.streamreactor.common.config.base.traits._
import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings
import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkConfigDefBuilder
import io.lenses.streamreactor.connect.gcp.storage.config.AuthModeSettings
import io.lenses.streamreactor.connect.gcp.storage.config.GCPConfigSettings
@@ -32,7 +31,6 @@ case class GCPStorageSinkConfigDefBuilder(props: util.Map[String, String])
with NumberRetriesSettings
with UserSettings
with ConnectionSettings
with CompressionCodecSettings
with AuthModeSettings
with UploadSettings {
