From 5ebf989b6bcca7a78030195d184f299ed302cc7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Cio=C5=82ecki?= Date: Mon, 18 Nov 2024 12:12:52 +0100 Subject: [PATCH] NU-1848: Improve missing Flink Kafka Source / Sink TypeInformation (#7116) --- docs/Changelog.md | 1 + docs/MigrationGuide.md | 8 + .../TypeInformationDetection.scala | 11 +- .../FlinkKafkaComponentProvider.scala | 8 +- .../ConsumerRecordTypeSerializerSnapshot.java | 44 ++++ ...inkUniversalSchemaBasedSerdeProvider.scala | 26 +++ .../typeinfo/ConsumerRecordTypeInfo.scala | 204 ++++++++++++++++++ .../sink/flink/FlinkKafkaUniversalSink.scala | 17 +- ...KeyValueDeserializationSchemaFactory.scala | 68 ++++++ .../ConsumerRecordSerializerSpec.scala | 118 ++++++++++ ...erRecordDeserializationSchemaFactory.scala | 67 +++--- .../SerializableConsumerRecord.scala | 3 - .../UniversalSchemaBasedSerdeProvider.scala | 2 +- ...KeyValueDeserializationSchemaFactory.scala | 49 ++--- .../sink/UniversalKafkaSinkFactory.scala | 1 + .../nussknacker/test/RandomImplicits.scala | 25 +++ 16 files changed, 580 insertions(+), 72 deletions(-) create mode 100644 engine/flink/schemed-kafka-components-utils/src/main/java/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeSerializerSnapshot.java create mode 100644 engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/FlinkUniversalSchemaBasedSerdeProvider.scala create mode 100644 engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeInfo.scala create mode 100644 engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory.scala create mode 100644 engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala create mode 100644 utils/test-utils/src/main/scala/pl/touk/nussknacker/test/RandomImplicits.scala diff --git a/docs/Changelog.md b/docs/Changelog.md index 8b71162cd36..cd3132a2435 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -11,6 +11,7 @@ ### 1.19.0 (Not released yet) * [#7145](https://github.com/TouK/nussknacker/pull/7145) Lift TypingResult information for dictionaries +* [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation ## 1.18 diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 24ae0dbbc3d..308a2d12f8d 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -2,6 +2,14 @@ To see the biggest differences please consult the [changelog](Changelog.md). 
+## In version 1.19.0 (Not released yet)
+
+### Other changes
+
+* [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation
+  * Support for the old ConsumerRecord constructor (kept for Flink 1.14 / 1.15 compatibility) was dropped
+  * If you used Kafka source/sink components in your scenarios, the state of these scenarios won't be restored
+
 ## In version 1.18.0 (Not released yet)
 
 ### Configuration changes
diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala
index 50ad362f158..34bfbee9e3b 100644
--- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala
+++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.flink.api.typeinformation
 
 import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
 import pl.touk.nussknacker.engine.api.context.ValidationContext
+import pl.touk.nussknacker.engine.api.generics.GenericType
 import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
 import pl.touk.nussknacker.engine.api.{Context, ValueWithContext}
 import pl.touk.nussknacker.engine.util.Implicits.RichStringList
@@ -38,8 +39,14 @@ trait TypeInformationDetection extends Serializable {
     forClass(klass)
   }
 
-  def forClass[T](klass: Class[T]): TypeInformation[T] =
-    forType[T](Typed.typedClass(klass))
+  def forClass[T](klass: Class[T]): TypeInformation[T] = {
+    // Typed.typedClass doesn't support Any
+    if (klass == classOf[Any]) {
+      Types.GENERIC(klass)
+    } else {
+      forType[T](Typed.typedClass(klass))
+    }
+  }
 
   def forType[T](typingResult: TypingResult): TypeInformation[T]
 
diff --git a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala
index 9aa42b14e6d..dc8899248ac 100644
--- a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala
+++ b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala
@@ -12,11 +12,9 @@ import pl.touk.nussknacker.engine.api.component.{
 import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
 import pl.touk.nussknacker.engine.kafka.KafkaConfig
 import pl.touk.nussknacker.engine.kafka.source.flink.FlinkKafkaSourceImplFactory
+import pl.touk.nussknacker.engine.schemedkafka.FlinkUniversalSchemaBasedSerdeProvider
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{
-  UniversalSchemaBasedSerdeProvider,
-  UniversalSchemaRegistryClientFactory
-}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaRegistryClientFactory
 import pl.touk.nussknacker.engine.schemedkafka.sink.UniversalKafkaSinkFactory
 import pl.touk.nussknacker.engine.schemedkafka.sink.flink.FlinkKafkaUniversalSinkImplFactory
 import pl.touk.nussknacker.engine.schemedkafka.source.UniversalKafkaSourceFactory
@@ -36,7 +34,7 @@ class FlinkKafkaComponentProvider extends ComponentProvider {
     import docsConfig._
     def universal(componentType: ComponentType) = s"DataSourcesAndSinks#kafka-$componentType"
 
-    val universalSerdeProvider = UniversalSchemaBasedSerdeProvider.create(schemaRegistryClientFactory)
+    val universalSerdeProvider = FlinkUniversalSchemaBasedSerdeProvider.create(schemaRegistryClientFactory)
 
     List(
       ComponentDefinition(
diff --git a/engine/flink/schemed-kafka-components-utils/src/main/java/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeSerializerSnapshot.java b/engine/flink/schemed-kafka-components-utils/src/main/java/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeSerializerSnapshot.java
new file mode 100644
index 00000000000..964a2d55dd5
--- /dev/null
+++ b/engine/flink/schemed-kafka-components-utils/src/main/java/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeSerializerSnapshot.java
@@ -0,0 +1,44 @@
+package pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * A {@link TypeSerializerSnapshot} for the Scala {@link ConsumerRecordTypeInfo}.
+ */
+public final class ConsumerRecordTypeSerializerSnapshot<K, V>
+        extends CompositeTypeSerializerSnapshot<ConsumerRecord<K, V>, ConsumerRecordSerializer<K, V>> {
+
+    final private static int VERSION = 1;
+
+    public ConsumerRecordTypeSerializerSnapshot() {
+        super();
+    }
+
+    public ConsumerRecordTypeSerializerSnapshot(ConsumerRecordSerializer<K, V> serializerInstance) {
+        super(serializerInstance);
+    }
+
+    @Override
+    protected int getCurrentOuterSnapshotVersion() {
+        return VERSION;
+    }
+
+    @Override
+    protected TypeSerializer<?>[] getNestedSerializers(ConsumerRecordSerializer<K, V> outerSerializer) {
+        return new TypeSerializer<?>[] { outerSerializer.keySerializer(), outerSerializer.valueSerializer() };
+    }
+
+    @Override
+    protected ConsumerRecordSerializer<K, V> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+        @SuppressWarnings("unchecked")
+        TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
+
+        @SuppressWarnings("unchecked")
+        TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
+
+        return new ConsumerRecordSerializer<>(keySerializer, valueSerializer);
+    }
+}
diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/FlinkUniversalSchemaBasedSerdeProvider.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/FlinkUniversalSchemaBasedSerdeProvider.scala
new file mode 100644
index 00000000000..4f14299516e
--- /dev/null
+++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/FlinkUniversalSchemaBasedSerdeProvider.scala
@@ -0,0 +1,26 @@
+package pl.touk.nussknacker.engine.schemedkafka
+
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.KafkaSchemaRegistryBasedValueSerializationSchemaFactory
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider.createSchemaIdFromMessageExtractor
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{UniversalKafkaDeserializerFactory, UniversalSchemaValidator, UniversalSerializerFactory, UniversalToJsonFormatterFactory}
+import 
pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory} +import pl.touk.nussknacker.engine.schemedkafka.source.flink.FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory + +object FlinkUniversalSchemaBasedSerdeProvider { + + def create(schemaRegistryClientFactory: SchemaRegistryClientFactory): SchemaBasedSerdeProvider = { + SchemaBasedSerdeProvider( + new KafkaSchemaRegistryBasedValueSerializationSchemaFactory( + schemaRegistryClientFactory, + UniversalSerializerFactory + ), + new FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory( + schemaRegistryClientFactory, + new UniversalKafkaDeserializerFactory(createSchemaIdFromMessageExtractor) + ), + new UniversalToJsonFormatterFactory(schemaRegistryClientFactory, createSchemaIdFromMessageExtractor), + UniversalSchemaValidator + ) + } + +} diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeInfo.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeInfo.scala new file mode 100644 index 00000000000..38643da0fe5 --- /dev/null +++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeInfo.scala @@ -0,0 +1,204 @@ +package pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo + +import com.github.ghik.silencer.silent +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} +import org.apache.kafka.common.record.TimestampType + +import java.util.{Objects, Optional} + +class ConsumerRecordTypeInfo[K, V](val keyTypeInfo: TypeInformation[K], val valueTypeInfo: TypeInformation[V]) + extends TypeInformation[ConsumerRecord[K, V]] { + + override def getTypeClass: Class[ConsumerRecord[K, V]] = classOf[ConsumerRecord[K, V]] + + @silent("deprecated") + override def createSerializer( + config: org.apache.flink.api.common.ExecutionConfig + ): TypeSerializer[ConsumerRecord[K, V]] = { + new ConsumerRecordSerializer[K, V](keyTypeInfo.createSerializer(config), valueTypeInfo.createSerializer(config)) + } + + // ConsumerRecord 8 simple fields + override def getArity: Int = 8 + + // TODO: find out what's the correct value here + // ConsumerRecord 8 fields (w/o: headers, key, value) + Headers 2 fields + key.fields + value.fields + override def getTotalFields: Int = 8 + 2 + keyTypeInfo.getTotalFields + valueTypeInfo.getTotalFields + + override def isKeyType: Boolean = false + + override def isBasicType: Boolean = false + + override def isTupleType: Boolean = false + + override def toString: String = + s"ConsumerRecordTypeInfo($keyTypeInfo, $valueTypeInfo)" + + override def canEqual(obj: Any): Boolean = + obj.isInstanceOf[ConsumerRecordTypeInfo[_, _]] + + override def equals(obj: Any): Boolean = + obj match { + case info: ConsumerRecordTypeInfo[_, _] => + keyTypeInfo.equals(info.keyTypeInfo) && valueTypeInfo.equals(info.valueTypeInfo) + case _ => false + } + + override def hashCode(): Int = + Objects.hashCode(keyTypeInfo, valueTypeInfo) +} + +class ConsumerRecordSerializer[K, V](val keySerializer: TypeSerializer[K], val valueSerializer: 
TypeSerializer[V]) + extends TypeSerializer[ConsumerRecord[K, V]] { + + override def getLength: Int = -1 + + override def isImmutableType: Boolean = true + + override def createInstance(): ConsumerRecord[K, V] = + new ConsumerRecord[K, V](null, 0, 0, null.asInstanceOf[K], null.asInstanceOf[V]) + + override def duplicate(): TypeSerializer[ConsumerRecord[K, V]] = { + val keyDuplicated = keySerializer.duplicate() + val valueDuplicated = valueSerializer.duplicate() + + if (keyDuplicated.equals(keySerializer) && valueDuplicated.equals(valueSerializer)) { + this + } else { + new ConsumerRecordSerializer(keyDuplicated, valueDuplicated) + } + } + + override def copy(record: ConsumerRecord[K, V]): ConsumerRecord[K, V] = + new ConsumerRecord[K, V]( + record.topic(), + record.partition(), + record.offset(), + record.timestamp(), + record.timestampType(), + ConsumerRecord.NULL_SIZE, + ConsumerRecord.NULL_SIZE, + record.key(), + record.value(), + record.headers(), + record.leaderEpoch() + ) + + override def copy(record: ConsumerRecord[K, V], reuse: ConsumerRecord[K, V]): ConsumerRecord[K, V] = + copy(record) + + override def copy(source: DataInputView, target: DataOutputView): Unit = + serialize(deserialize(source), target) + + override def serialize(record: ConsumerRecord[K, V], target: DataOutputView): Unit = { + target.writeUTF(record.topic()) + target.writeInt(record.partition()) + target.writeLong(record.offset()) + target.writeLong(record.timestamp()) + + // Short takes less space than int + target.writeShort(record.timestampType().id) + + target.writeInt(record.serializedKeySize()) + target.writeInt(record.serializedValueSize()) + + // Serialize the key (can be null) + if (record.key() == null) { + target.writeBoolean(false) + } else { + target.writeBoolean(true) + keySerializer.serialize(record.key(), target) + } + + // Serialize the value (can be null) + if (record.value() == null) { + target.writeBoolean(false) + } else { + target.writeBoolean(true) + valueSerializer.serialize(record.value(), target) + } + + if (record.leaderEpoch().isPresent) { + target.writeBoolean(true) + target.writeInt(record.leaderEpoch.get()) + } else { + target.writeBoolean(false) + } + + target.writeInt(record.headers().toArray.length) + record.headers().forEach { header => + target.writeUTF(header.key()) + target.writeInt(header.value().length) + target.write(header.value()) + } + } + + override def deserialize(reuse: ConsumerRecord[K, V], source: DataInputView): ConsumerRecord[K, V] = + deserialize(source) + + override def deserialize(source: DataInputView): ConsumerRecord[K, V] = { + val topic = source.readUTF() + val partition = source.readInt() + val offset = source.readLong() + val timestamp = source.readLong() + val timestampTypeId = source.readShort().toInt + val serializedKeySize = source.readInt() + val serializedValueSize = source.readInt() + + val key = if (source.readBoolean()) keySerializer.deserialize(source) else null.asInstanceOf[K] + val value = if (source.readBoolean()) valueSerializer.deserialize(source) else null.asInstanceOf[V] + val leaderEpoch = if (source.readBoolean()) Optional.of[Integer](source.readInt()) else Optional.empty[Integer] + + val headers = (0 until source.readInt()).foldLeft(new RecordHeaders) { (headers, _) => + val name = source.readUTF() + val len = source.readInt() + + val value = new Array[Byte](len) + source.read(value) + + val header = new RecordHeader(name, value) + headers.add(header) + headers + } + + val timestampType = + TimestampType + .values() + .toList + 
.find(_.id == timestampTypeId) + .getOrElse(throw new IllegalArgumentException(s"Unknown TimestampType id: $timestampTypeId.")) + + new ConsumerRecord[K, V]( + topic, + partition, + offset, + timestamp, + timestampType, + serializedKeySize, + serializedValueSize, + key, + value, + headers, + leaderEpoch + ) + } + + override def snapshotConfiguration(): TypeSerializerSnapshot[ConsumerRecord[K, V]] = + new ConsumerRecordTypeSerializerSnapshot() + + override def equals(obj: Any): Boolean = { + obj match { + case other: ConsumerRecordSerializer[_, _] => + keySerializer.equals(other.keySerializer) && valueSerializer.equals(other.valueSerializer) + case _ => false + } + } + + override def hashCode(): Int = + Objects.hashCode(keySerializer, valueSerializer) + +} diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala index a7563c0ac54..4ef5e35d98a 100644 --- a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala +++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala @@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.sink.flink import com.typesafe.scalalogging.LazyLogging import io.confluent.kafka.schemaregistry.ParsedSchema import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext} +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} import org.apache.flink.configuration.Configuration import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} @@ -13,6 +14,7 @@ import pl.touk.nussknacker.engine.api.validation.ValidationMode import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext} import pl.touk.nussknacker.engine.flink.api.exception.{ExceptionHandler, WithExceptionHandler} import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSink} +import pl.touk.nussknacker.engine.flink.typeinformation.KeyedValueType import pl.touk.nussknacker.engine.flink.util.keyed import pl.touk.nussknacker.engine.flink.util.keyed.KeyedValueMapper import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchema @@ -40,12 +42,21 @@ class FlinkKafkaUniversalSink( override def registerSink( dataStream: DataStream[ValueWithContext[Value]], flinkNodeContext: FlinkCustomNodeContext - ): DataStreamSink[_] = - // FIXME: Missing map TypeInformation + ): DataStreamSink[_] = { + + // TODO: Creating TypeInformation for Avro / Json Schema is difficult because of schema evolution, therefore we rely on Kryo, e.g. 
serializer for GenericRecordWithSchemaId + val typeInfo = KeyedValueType + .info( + Types.STRING, // KafkaSink for key supports only String + Types.GENERIC(classOf[AnyRef]) + ) + .asInstanceOf[TypeInformation[KeyedValue[AnyRef, AnyRef]]] + dataStream - .map(new EncodeAvroRecordFunction(flinkNodeContext)) + .map(new EncodeAvroRecordFunction(flinkNodeContext), typeInfo) .filter(_.value != null) .addSink(toFlinkFunction) + } def prepareValue( ds: DataStream[Context], diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory.scala new file mode 100644 index 00000000000..3045db38a58 --- /dev/null +++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory.scala @@ -0,0 +1,68 @@ +package pl.touk.nussknacker.engine.schemedkafka.source.flink + +import io.confluent.kafka.schemaregistry.ParsedSchema +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization.Deserializer +import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection +import pl.touk.nussknacker.engine.kafka.KafkaConfig +import pl.touk.nussknacker.engine.kafka.consumerrecord.ConsumerRecordKafkaDeserializationSchema +import pl.touk.nussknacker.engine.kafka.serialization.KafkaDeserializationSchema +import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData +import pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo.ConsumerRecordTypeInfo +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.{ + KafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory, + SchemaRegistryBasedDeserializerFactory +} + +import scala.reflect.ClassTag + +class FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory( + schemaRegistryClientFactory: SchemaRegistryClientFactory, + deserializerFactory: SchemaRegistryBasedDeserializerFactory +) extends KafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory( + schemaRegistryClientFactory, + deserializerFactory + ) { + + override def create[K: ClassTag, V: ClassTag]( + kafkaConfig: KafkaConfig, + keySchemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]], + valueSchemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]] + ): KafkaDeserializationSchema[ConsumerRecord[K, V]] = { + + // We extend by ResultTypeQueryable because we want to use Flink TypeInformation + new ConsumerRecordKafkaDeserializationSchema[K, V] with ResultTypeQueryable[ConsumerRecord[K, V]] { + + @transient + override protected lazy val keyDeserializer: Deserializer[K] = + createKeyOrUseStringDeserializer[K](keySchemaDataOpt, kafkaConfig) + + @transient + override protected lazy val valueDeserializer: Deserializer[V] = + createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig) + + private lazy val typeInformationDetector = TypeInformationDetection.instance + + private lazy val keyTypeInfo: TypeInformation[K] = { + if (kafkaConfig.useStringForKey) { + 
Types.STRING.asInstanceOf[TypeInformation[K]] + } else { + // TODO: Creating TypeInformation for Avro / Json Schema is difficult because of schema evolution, therefore we rely on Kryo, e.g. serializer for GenericRecordWithSchemaId + typeInformationDetector.forClass[K] + } + } + + private lazy val valueTypeInfo: TypeInformation[V] = + // TODO: Creating TypeInformation for Avro / Json Schema is difficult because of schema evolution, therefore we rely on Kryo, e.g. serializer for GenericRecordWithSchemaId + typeInformationDetector.forClass[V] + + override def getProducedType: TypeInformation[ConsumerRecord[K, V]] = + new ConsumerRecordTypeInfo(keyTypeInfo, valueTypeInfo) + + } + } + +} diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala new file mode 100644 index 00000000000..bc7335aeb0f --- /dev/null +++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala @@ -0,0 +1,118 @@ +package pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.serialization.SerializerConfigImpl +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} +import org.apache.kafka.common.record.TimestampType +import org.scalatest.Assertion +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.must.Matchers +import org.scalatest.prop.TableDrivenPropertyChecks +import pl.touk.nussknacker.engine.flink.test.FlinkTestConfiguration +import pl.touk.nussknacker.test.ProcessUtils.convertToAnyShouldWrapper + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.nio.charset.StandardCharsets +import java.time.LocalDate +import java.util.Optional +import scala.reflect.{ClassTag, classTag} +import scala.util.Random + +class ConsumerRecordSerializerSpec extends AnyFunSuite with Matchers with TableDrivenPropertyChecks { + + import pl.touk.nussknacker.test.RandomImplicits._ + + private val bufferSize = 1024 + + private val serializerConfig = { + val executionConfig = new ExecutionConfig() + val configuration = FlinkTestConfiguration.configuration() + new SerializerConfigImpl(configuration, executionConfig) + } + + test("should serialize and deserialize simple record") { + val table = Table[ConsumerRecord[_, _]]( + "record", + createConsumerRecord(Random.nextInt(), Random.nextInt()), + createConsumerRecord(Random.nextString(), Random.nextString()), + createConsumerRecord(Random.nextInt(), Random.nextString()), + createConsumerRecord(Random.nextString(), Random.nextInt()), + createConsumerRecord(Random.nextString(), Random.nextFloat()), + createConsumerRecord(Random.nextString(), Random.nextDouble()), + createConsumerRecord(Random.nextString(), Random.nextBoolean()), + createConsumerRecord(Random.nextString(), LocalDate.now()), + ) + + forAll(table) { record => + val results = serializeAndDeserialize(record) + compare(results, record) + } + + } + + // ConsumerRecord 
doesn't implement hashCode & equals methods + private def compare(result: ConsumerRecord[_, _], expected: ConsumerRecord[_, _]): Assertion = { + result.topic() shouldBe expected.topic() + result.partition() shouldBe expected.partition() + result.offset() shouldBe expected.offset() + result.timestamp() shouldBe expected.timestamp() + result.timestampType() shouldBe expected.timestampType() + result.serializedKeySize() shouldBe expected.serializedKeySize() + result.serializedValueSize() shouldBe expected.serializedValueSize() + result.key() shouldBe expected.key() + result.value() shouldBe expected.value() + result.headers() shouldBe expected.headers() + result.leaderEpoch() shouldBe expected.leaderEpoch() + } + + private def createConsumerRecord[K, V](key: K, value: V): ConsumerRecord[K, V] = { + val timestampTypes = TimestampType.values().toList + val timestampType = timestampTypes(Random.nextInt(timestampTypes.length)) + + val leaderEpoch = + if (System.currentTimeMillis() % 2 == 0) Optional.empty[Integer]() + else Optional.of(Random.nextInt().asInstanceOf[Integer]) + + val headers = (0 until Random.nextInt(25)).foldLeft(new RecordHeaders()) { (headers, _) => + headers.add(new RecordHeader(Random.nextString(), Random.nextString().getBytes(StandardCharsets.UTF_8))) + headers + } + + new ConsumerRecord[K, V]( + Random.nextString(), + Random.nextInt(), + Random.nextLong(), + Random.nextLong(), + timestampType, + ConsumerRecord.NULL_SIZE, + ConsumerRecord.NULL_SIZE, + key, + value, + headers, + leaderEpoch + ) + } + + private def serializeAndDeserialize[K: ClassTag, V: ClassTag](in: ConsumerRecord[K, V]): ConsumerRecord[K, V] = { + val keySerializer = getSerializer[K] + val valueSerializer = getSerializer[V] + + val serializer = new ConsumerRecordSerializer(keySerializer, valueSerializer) + + val outStream = new ByteArrayOutputStream(bufferSize) + val outWrapper = new DataOutputViewStreamWrapper(outStream) + + serializer.serialize(in, outWrapper) + serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(outStream.toByteArray))) + } + + private def getSerializer[T: ClassTag]: TypeSerializer[T] = + TypeExtractor + .getForClass(classTag[T].runtimeClass.asInstanceOf[Class[T]]) + .createSerializer(serializerConfig) + +} diff --git a/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala b/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala index cfc8b264854..d2cf4776338 100644 --- a/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala +++ b/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala @@ -1,11 +1,10 @@ package pl.touk.nussknacker.engine.kafka.consumerrecord import cats.data.NonEmptyList -import com.github.ghik.silencer.silent import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.Deserializer import pl.touk.nussknacker.engine.api.process.TopicName -import pl.touk.nussknacker.engine.kafka.serialization.KafkaDeserializationSchemaFactory +import pl.touk.nussknacker.engine.kafka.serialization.{KafkaDeserializationSchema, KafkaDeserializationSchemaFactory} import pl.touk.nussknacker.engine.kafka.{KafkaConfig, serialization} /** @@ -28,36 +27,48 @@ abstract class 
ConsumerRecordDeserializationSchemaFactory[K, V] topics: NonEmptyList[TopicName.ForSource], kafkaConfig: KafkaConfig ): serialization.KafkaDeserializationSchema[ConsumerRecord[K, V]] = { - - new serialization.KafkaDeserializationSchema[ConsumerRecord[K, V]] { + new ConsumerRecordKafkaDeserializationSchema[K, V] { @transient - private lazy val keyDeserializer = createKeyDeserializer(kafkaConfig) + override protected lazy val keyDeserializer: Deserializer[K] = + createKeyDeserializer(kafkaConfig) + @transient - private lazy val valueDeserializer = createValueDeserializer(kafkaConfig) - - @silent("deprecated") // using deprecated constructor for Flink 1.14/15 compatibility - override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[K, V] = { - val key = keyDeserializer.deserialize(record.topic(), record.key()) - val value = valueDeserializer.deserialize(record.topic(), record.value()) - new ConsumerRecord[K, V]( - record.topic(), - record.partition(), - record.offset(), - record.timestamp(), - record.timestampType(), - ConsumerRecord.NULL_CHECKSUM.longValue(), - record.serializedKeySize(), - record.serializedValueSize(), - key, - value, - record.headers(), - record.leaderEpoch() - ) - } - - override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false + override protected lazy val valueDeserializer: Deserializer[V] = + createValueDeserializer(kafkaConfig) + } } } + +trait ConsumerRecordKafkaDeserializationSchema[K, V] extends KafkaDeserializationSchema[ConsumerRecord[K, V]] { + + @transient + protected val keyDeserializer: Deserializer[K] + + @transient + protected val valueDeserializer: Deserializer[V] + + override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[K, V] = { + val key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key()) + val value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value()) + + new ConsumerRecord[K, V]( + record.topic(), + record.partition(), + record.offset(), + record.timestamp(), + record.timestampType(), + record.serializedKeySize(), + record.serializedValueSize(), + key, + value, + record.headers(), + record.leaderEpoch() + ) + } + + override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false + +} diff --git a/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/SerializableConsumerRecord.scala b/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/SerializableConsumerRecord.scala index 41e7dae7b0f..8a6d6f93081 100644 --- a/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/SerializableConsumerRecord.scala +++ b/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/SerializableConsumerRecord.scala @@ -1,6 +1,5 @@ package pl.touk.nussknacker.engine.kafka.consumerrecord -import com.github.ghik.silencer.silent import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.record.TimestampType import pl.touk.nussknacker.engine.api.process.TopicName @@ -28,7 +27,6 @@ case class SerializableConsumerRecord[K, V]( /** * Converts SerializableConsumerRecord to ConsumerRecord, uses default values in case of missing attributes. 
*/ - @silent("deprecated") // using deprecated constructor for Flink 1.14/15 compatibility def toKafkaConsumerRecord( formatterTopic: TopicName.ForSource, serializeKeyValue: (Option[K], V) => (Array[Byte], Array[Byte]) @@ -43,7 +41,6 @@ case class SerializableConsumerRecord[K, V]( offset.getOrElse(0L), timestamp.getOrElse(ConsumerRecord.NO_TIMESTAMP), timestampType.map(TimestampType.forName).getOrElse(TimestampType.NO_TIMESTAMP_TYPE), - ConsumerRecord.NULL_CHECKSUM.longValue(), ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE, keyBytes, diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaBasedSerdeProvider.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaBasedSerdeProvider.scala index 36e5a2b1720..acb539b703c 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaBasedSerdeProvider.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaBasedSerdeProvider.scala @@ -35,7 +35,7 @@ object UniversalSchemaBasedSerdeProvider { ) } - private def createSchemaIdFromMessageExtractor( + def createSchemaIdFromMessageExtractor( schemaRegistryClient: SchemaRegistryClient ): ChainedSchemaIdFromMessageExtractor = { val isConfluent = schemaRegistryClient.isInstanceOf[ConfluentSchemaRegistryClient] diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/serialization/KafkaSchemaBasedKeyValueDeserializationSchemaFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/serialization/KafkaSchemaBasedKeyValueDeserializationSchemaFactory.scala index f5490f82b1b..fc6ee285886 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/serialization/KafkaSchemaBasedKeyValueDeserializationSchemaFactory.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/serialization/KafkaSchemaBasedKeyValueDeserializationSchemaFactory.scala @@ -1,11 +1,11 @@ package pl.touk.nussknacker.engine.schemedkafka.serialization -import com.github.ghik.silencer.silent import io.confluent.kafka.schemaregistry.ParsedSchema import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer} import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData import pl.touk.nussknacker.engine.kafka.KafkaConfig +import pl.touk.nussknacker.engine.kafka.consumerrecord.ConsumerRecordKafkaDeserializationSchema import pl.touk.nussknacker.engine.kafka.serialization.KafkaDeserializationSchema import scala.reflect.ClassTag @@ -18,6 +18,17 @@ import scala.reflect.ClassTag abstract class KafkaSchemaBasedKeyValueDeserializationSchemaFactory extends KafkaSchemaBasedDeserializationSchemaFactory { + protected def createKeyOrUseStringDeserializer[K: ClassTag]( + schemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]], + kafkaConfig: KafkaConfig + ): Deserializer[K] = { + if (kafkaConfig.useStringForKey) { + createStringKeyDeserializer.asInstanceOf[Deserializer[K]] + } else { + createKeyDeserializer[K](schemaDataOpt, kafkaConfig) + } + } + protected def createKeyDeserializer[K: ClassTag]( schemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]], 
kafkaConfig: KafkaConfig @@ -36,40 +47,18 @@ abstract class KafkaSchemaBasedKeyValueDeserializationSchemaFactory valueSchemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]] ): KafkaDeserializationSchema[ConsumerRecord[K, V]] = { - new KafkaDeserializationSchema[ConsumerRecord[K, V]] { + new ConsumerRecordKafkaDeserializationSchema[K, V] { @transient - private lazy val keyDeserializer = if (kafkaConfig.useStringForKey) { - createStringKeyDeserializer.asInstanceOf[Deserializer[K]] - } else { - createKeyDeserializer[K](keySchemaDataOpt, kafkaConfig) - } - @transient - private lazy val valueDeserializer = createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig) - - @silent("deprecated") // using deprecated constructor for Flink 1.14/15 compatibility - override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[K, V] = { - val key = keyDeserializer.deserialize(record.topic(), record.headers(), record.key()) - val value = valueDeserializer.deserialize(record.topic(), record.headers(), record.value()) - new ConsumerRecord[K, V]( - record.topic(), - record.partition(), - record.offset(), - record.timestamp(), - record.timestampType(), - ConsumerRecord.NULL_CHECKSUM.longValue(), - record.serializedKeySize(), - record.serializedValueSize(), - key, - value, - record.headers(), - record.leaderEpoch() - ) - } + override protected lazy val keyDeserializer: Deserializer[K] = + createKeyOrUseStringDeserializer[K](keySchemaDataOpt, kafkaConfig) - override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false + @transient + override protected lazy val valueDeserializer: Deserializer[V] = + createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig) } + } } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala index a672d844e35..2c584efc02e 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala @@ -238,6 +238,7 @@ class UniversalKafkaSinkFactory( Option(finalState.schema), kafkaConfig ) + val clientId = s"${TypedNodeDependency[MetaData].extract(dependencies).name}-${preparedTopic.prepared}" val validationMode = if (params.extractUnsafe[Boolean](sinkRawEditorParamName)) { validationModeParamDeclaration.extractValue(params) match { diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/RandomImplicits.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/RandomImplicits.scala new file mode 100644 index 00000000000..5a6825da1cb --- /dev/null +++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/RandomImplicits.scala @@ -0,0 +1,25 @@ +package pl.touk.nussknacker.test + +import scala.util.Random + +object RandomImplicits { + + implicit class RandomExt(rand: Random) { + + private val AllowedStringLetters = ('a' to 'z') ++ ('A' to 'Z') + + private val MinStringLength = 4 + + private val MaxStringLength = 32 + + def nextString(): String = + randomString(MinStringLength + rand.nextInt(MaxStringLength - MinStringLength)) + + def randomString(length: Int): String = { + require(length >= 0, "Length must be non-negative") + (0 until length).map(_ => 
AllowedStringLetters(rand.nextInt(AllowedStringLetters.length))).mkString + } + + } + +}
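
A minimal usage sketch (not part of the patch) of the new ConsumerRecordTypeInfo / ConsumerRecordSerializer introduced above, mirroring the round trip exercised by ConsumerRecordSerializerSpec. The object name and the topic/key/value literals are made-up sample data; everything else uses only the APIs shown in this diff.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.apache.kafka.clients.consumer.ConsumerRecord
import pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo.ConsumerRecordTypeInfo

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

object ConsumerRecordTypeInfoRoundTrip extends App {

  // Key and value are plain Strings here; in the real source the value typically falls back
  // to a generic (Kryo-backed) TypeInformation because of schema evolution (see the TODOs above).
  val typeInfo = new ConsumerRecordTypeInfo[String, String](Types.STRING, Types.STRING)

  // createSerializer(ExecutionConfig) is deprecated in newer Flink versions,
  // but it is the variant used by ConsumerRecordTypeInfo itself.
  val serializer = typeInfo.createSerializer(new ExecutionConfig())

  // Hypothetical record: topic, offset, key and value are sample data only.
  val record = new ConsumerRecord[String, String]("transactions", 0, 42L, "key-1", "value-1")

  val out = new ByteArrayOutputStream()
  serializer.serialize(record, new DataOutputViewStreamWrapper(out))

  val roundTripped =
    serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))

  // ConsumerRecord has no equals(), so compare fields explicitly, as ConsumerRecordSerializerSpec does.
  assert(roundTripped.topic() == record.topic())
  assert(roundTripped.offset() == record.offset())
  assert(roundTripped.key() == record.key() && roundTripped.value() == record.value())
}
```

This is the same serialization path Flink uses once FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory exposes ConsumerRecordTypeInfo via ResultTypeQueryable: the source's produced type is then handled by ConsumerRecordSerializer instead of falling back to generic Kryo serialization.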