From 0fe0531bde582c131eb1bf3e72819d4a4a28d1d4 Mon Sep 17 00:00:00 2001
From: Stefan Bocutiu
Date: Mon, 25 Nov 2024 09:54:34 +0000
Subject: [PATCH] Feat/es support pk from key (#162)

* ElasticSearch Document Primary Key

The ES sink connector lacked the ability to build the document primary key from the record Key or a Header. No SMT can move data from the Key into the Value payload, so the connector could not be used in scenarios where the Key or a Header carries information that should form part of the ElasticSearch document primary key.

The change refines TransformAndExtractPK to take the Key and Headers. It also adds tests that were missing for PrimaryKeyExtractor, JsonPayloadExtractor and TransformAndExtractPK.

* Reduce the code complexity

Co-authored-by: David Sloan <33483659+davidsloan@users.noreply.github.com>

* Improve the test for json payload to mix in OptionValues and reduce the code required

Make _key/_value/_header constants.

* Avoid deserialising the key as JSON if there is no _key path in the primary keys list

* Enhance PK path extraction by allowing the path to be specified as _key or as a nested path like _key.fieldA.fieldB. This change broadens the scope of supported incoming types, ensuring compatibility with all Kafka Connect Struct types as well as schemaless input. It provides more flexibility and robustness in handling diverse data formats for primary key extraction.

* Fix the unit tests and the handling of bytes/string

* Remove unused import

---------

Co-authored-by: stheppi
Co-authored-by: David Sloan <33483659+davidsloan@users.noreply.github.com>
---
 .../connect/elastic6/ElasticJsonWriter.scala  |  20 +-
 .../elastic6/JsonPayloadExtractor.scala       | 215 +++++++
 .../connect/elastic6/KafkaMessageParts.scala  |  23 +
 .../elastic6/PrimaryKeyExtractor.scala        |  12 +-
 .../elastic6/TransformAndExtractPK.scala      | 161 +++--
 .../elastic6/JsonPayloadExtractorTest.scala   | 188 ++++++
 .../elastic6/PrimaryKeyExtractorTest.scala    | 269 ++++++++
 .../elastic6/TransformAndExtractPKTest.scala  | 601 ++++++++++++++++++
 .../connect/elastic7/ElasticJsonWriter.scala  |   4 +-
 .../elastic7/JsonPayloadExtractor.scala       | 208 ++++++
 .../connect/elastic7/KafkaMessageParts.scala  |  22 +
 .../elastic7/PrimaryKeyExtractor.scala        |  12 +-
 .../elastic7/TransformAndExtractPK.scala      | 155 +++--
 .../elastic7/JsonPayloadExtractorTest.scala   | 188 ++++++
 .../elastic7/PrimaryKeyExtractorTest.scala    | 268 ++++++++
 .../elastic7/TransformAndExtractPKTest.scala  | 601 ++++++++++++++++++
 16 files changed, 2756 insertions(+), 191 deletions(-)
 create mode 100644 kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractor.scala
 create mode 100644 kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KafkaMessageParts.scala
 create mode 100644 kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractorTest.scala
 create mode 100644 kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractorTest.scala
 create mode 100644 kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPKTest.scala
 create mode 100644 kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractor.scala
 create mode 100644 kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KafkaMessageParts.scala
 create mode 100644 kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractorTest.scala
create mode 100644 kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractorTest.scala create mode 100644 kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPKTest.scala diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriter.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriter.scala index 5aab0f914f..3ea9398bf3 100644 --- a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriter.scala +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriter.scala @@ -145,11 +145,13 @@ class ElasticJsonWriter(client: KElasticClient, settings: ElasticSettings) ) } else { TransformAndExtractPK( - kcqlValue.fields, - kcqlValue.primaryKeysPath, + kcqlValue, r.valueSchema(), r.value(), kcql.hasRetainStructure, + r.keySchema(), + r.key(), + r.headers(), ) } val idFromPk = pks.mkString(settings.pkJoinerSeparator) @@ -211,15 +213,13 @@ class ElasticJsonWriter(client: KElasticClient, settings: ElasticSettings) pks.mkString(settings.pkJoinerSeparator) } - private case class KcqlValues( - fields: Seq[Field], - ignoredFields: Seq[Field], - primaryKeysPath: Seq[Vector[String]], - behaviorOnNullValues: NullValueBehavior, - ) - } - +case class KcqlValues( + fields: Seq[Field], + ignoredFields: Seq[Field], + primaryKeysPath: Seq[Vector[String]], + behaviorOnNullValues: NullValueBehavior, +) case object IndexableJsonNode extends Indexable[JsonNode] { override def json(t: JsonNode): String = t.toString } diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractor.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractor.scala new file mode 100644 index 0000000000..a13115283f --- /dev/null +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractor.scala @@ -0,0 +1,215 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic6 + +import cats.implicits.catsSyntaxEitherId +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.BigIntegerNode +import com.fasterxml.jackson.databind.node.BooleanNode +import com.fasterxml.jackson.databind.node.DecimalNode +import com.fasterxml.jackson.databind.node.DoubleNode +import com.fasterxml.jackson.databind.node.FloatNode +import com.fasterxml.jackson.databind.node.IntNode +import com.fasterxml.jackson.databind.node.LongNode +import com.fasterxml.jackson.databind.node.TextNode +import io.lenses.json.sql.JacksonJson +import io.lenses.streamreactor.connect.json.SimpleJsonConverter +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.Struct + +import java.nio.ByteBuffer +import scala.jdk.CollectionConverters.CollectionHasAsScala +import scala.jdk.CollectionConverters.MapHasAsScala +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +object JsonPayloadExtractor { + lazy val simpleJsonConverter = new SimpleJsonConverter() + + def extractJsonNode(value: Any, schema: Schema): Either[String, Option[JsonNode]] = + (Option(value), Option(schema).map(_.`type`())) match { + case (None, _) => Right(None) + case (Some(_), Some(Schema.Type.BYTES)) => handleBytes(value) + case (Some(_), Some(Schema.Type.STRING)) => handleString(value) + case (Some(_), Some(Schema.Type.INT8)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT16)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT32)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT64)) => handleLong(value) + case (Some(_), Some(Schema.Type.FLOAT32)) => handleFloat(value) + case (Some(_), Some(Schema.Type.FLOAT64)) => handleDouble(value) + case (Some(_), Some(Schema.Type.STRUCT)) => handleStruct(value) + case (Some(_), Some(Schema.Type.BOOLEAN)) => handleBoolean(value) + case (Some(_), Some(Schema.Type.ARRAY)) => handleArray(value) + case (Some(_), Some(Schema.Type.MAP)) => handleMap(value) + case (Some(_), Some(other)) => Left(s"Unsupported Schema type: $other") + case (Some(v), None) => handleSchemaLess(v) + } + + private def handleArray(value: Any): Either[String, Option[JsonNode]] = + value match { + case l: Iterable[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + l.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + + case jc: java.util.Collection[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + jc.asScala.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + case a: Array[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + a.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + case other => Left(s"Expected array but got: $other") + } + + private def handleMap(value: Any): Either[String, Option[JsonNode]] = + value match { + case m: java.util.Map[_, _] => + val map = m.asInstanceOf[java.util.Map[String, Any]] + val mapNode = JacksonJson.mapper.createObjectNode() + map.asScala.foreach { + case (key, value) => + extractJsonNode(value, null) match { + case Right(Some(node)) => 
mapNode.set(key, node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(mapNode)) + case other => Left(s"Expected map but got: $other") + } + private def handleBoolean(value: Any): Either[String, Option[JsonNode]] = + value match { + case b: Boolean => Some(BooleanNode.valueOf(b)).asRight[String] + case other => Left(s"Expected boolean but got: $other") + } + private def handleDouble(value: Any): Either[String, Option[JsonNode]] = + value match { + case f: Float => Some(DoubleNode.valueOf(f.toDouble)).asRight[String] + case d: Double => Some(DoubleNode.valueOf(d)).asRight[String] + case other => Left(s"Expected double but got: $other") + } + + private def handleFloat(value: Any): Either[String, Option[JsonNode]] = + value match { + case f: Float => Some(FloatNode.valueOf(f)).asRight[String] + case d: Double => Some(FloatNode.valueOf(d.toFloat)).asRight[String] + case other => Left(s"Expected float but got: $other") + } + + private def handleLong(value: Any): Either[String, Option[JsonNode]] = + value match { + case b: Byte => Some(LongNode.valueOf(b.toLong)).asRight[String] + case s: Short => Some(LongNode.valueOf(s.toLong)).asRight[String] + case i: Int => Some(LongNode.valueOf(i.toLong)).asRight[String] + case l: Long => Some(LongNode.valueOf(l)).asRight[String] + case other => Left(s"Expected long but got: $other") + } + + private def handleBytes(value: Any): Either[String, Option[JsonNode]] = + value match { + case bytes: Array[Byte] => + tryReadJson(bytes).map(Some(_)) + case byteBuffer: ByteBuffer => + val bytes = new Array[Byte](byteBuffer.remaining()) + byteBuffer.get(bytes) + tryReadJson(bytes).map(Some(_)) + case other => Left(s"Expected byte array or ByteBuffer but got: $other") + } + + private def handleString(value: Any): Either[String, Option[JsonNode]] = + value match { + case s: String => + tryParseJson(s).map(Some(_)) match { + case Left(_) => TextNode.valueOf(s).asRight[String].map(Some(_)) + case r => r + } + case other => Left(s"Expected string but got: $other") + } + + private def handleStruct(value: Any): Either[String, Option[JsonNode]] = + value match { + case struct: Struct => + Try(simpleJsonConverter.fromConnectData(struct.schema(), struct)) match { + case Success(jsonNode) => Right(Some(jsonNode)) + case Failure(e) => Left(s"Failed to convert Struct to JsonNode: ${e.getMessage}") + } + case other => Left(s"Expected Struct but got: $other") + } + + private def handleSchemaLess(value: Any): Either[String, Option[JsonNode]] = + value match { + case m: java.util.Map[_, _] => + Try { + val map = m.asInstanceOf[java.util.Map[String, Any]] + JacksonJson.mapper.valueToTree[JsonNode](map) + } match { + case Success(node) => Right(Some(node)) + case Failure(e) => Left(s"Failed to convert Map to JsonNode: ${e.getMessage}") + } + + case s: String => + tryParseJson(s).map(Some(_)) match { + case Left(_) => TextNode.valueOf(s).asRight[String].map(Some(_)) + case r => r + } + case b: Array[Byte] => tryReadJson(b).map(Some(_)) + case b: Byte => IntNode.valueOf(b.toInt).asRight[String].map(Some(_)) + case s: Short => IntNode.valueOf(s.toInt).asRight[String].map(Some(_)) + case i: Int => IntNode.valueOf(i).asRight[String].map(Some(_)) + case l: Long => LongNode.valueOf(l).asRight[String].map(Some(_)) + case f: Float => FloatNode.valueOf(f).asRight[String].map(Some(_)) + case double: Double => DoubleNode.valueOf(double).asRight[String].map(Some(_)) + case bigDecimal: BigDecimal => 
DecimalNode.valueOf(bigDecimal.bigDecimal).asRight[String].map(Some(_)) + case bigDecimal: java.math.BigDecimal => DecimalNode.valueOf(bigDecimal).asRight[String].map(Some(_)) + case boolean: Boolean => BooleanNode.valueOf(boolean).asRight[String].map(Some(_)) + case bi: BigInt => BigIntegerNode.valueOf(bi.bigInteger).asRight[String].map(Some(_)) + case bi: java.math.BigInteger => BigIntegerNode.valueOf(bi).asRight[String].map(Some(_)) + case other => Left(s"Unsupported value type: ${other.getClass.getName}") + } + + private def tryParseJson(str: String): Either[String, JsonNode] = + Try(JacksonJson.asJson(str)) match { + case Success(json) => Right(json) + case Failure(e) => Left(s"Invalid JSON string: ${e.getMessage}") + } + + private def tryReadJson(bytes: Array[Byte]): Either[String, JsonNode] = + Try(JacksonJson.mapper.readTree(bytes)) match { + case Success(json) => Right(json) + case Failure(e) => Left(s"Invalid JSON bytes: ${e.getMessage}") + } +} diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KafkaMessageParts.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KafkaMessageParts.scala new file mode 100644 index 0000000000..9fe64ddfcd --- /dev/null +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KafkaMessageParts.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.elastic6 + +object KafkaMessageParts { + val Key = "_key" + val Value = "_value" + val Header = "_header" + val Topic = "_topic" +} diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractor.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractor.scala index 34e45a4ff0..471d745f58 100644 --- a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractor.scala +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractor.scala @@ -25,13 +25,13 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.jdk.CollectionConverters.ListHasAsScala object PrimaryKeyExtractor { - def extract(node: JsonNode, path: Vector[String]): Any = { + def extract(node: JsonNode, path: Vector[String], prefix: String = ""): Any = { @tailrec def innerExtract(n: JsonNode, p: Vector[String]): Any = { def checkValidPath(): Unit = if (p.nonEmpty) { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. It doesn't resolve to a primitive field", + s"Invalid field selection for '$prefix${path.mkString(".")}'. It doesn't resolve to a primitive field", ) } @@ -79,24 +79,24 @@ object PrimaryKeyExtractor { case node: ObjectNode => if (p.isEmpty) { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. 
The path is not resolving to a primitive field", + s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is not resolving to a primitive field", ) } val childNode = Option(node.get(p.head)).getOrElse { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. Can't find ${p.head} field. Field found are:${node.fieldNames().asScala.mkString(",")}", + s"Invalid field selection for '$prefix${path.mkString(".")}'. Can't find ${p.head} field. Field found are:${node.fieldNames().asScala.mkString(",")}", ) } innerExtract(childNode, p.tail) case _: ArrayNode => throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. The path is involving an array structure", + s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is involving an array structure", ) case other => throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. $other is not handled", + s"Invalid field selection for '$prefix${path.mkString(".")}'. $other is not handled", ) } } diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPK.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPK.scala index 354d59a00d..8e4f0d6e19 100644 --- a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPK.scala +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPK.scala @@ -15,17 +15,12 @@ */ package io.lenses.streamreactor.connect.elastic6 -import java.nio.ByteBuffer - -import io.lenses.streamreactor.connect.json.SimpleJsonConverter import com.fasterxml.jackson.databind.JsonNode -import io.lenses.connect.sql.StructSql._ -import io.lenses.json.sql.JacksonJson -import io.lenses.json.sql.JsonSql._ -import io.lenses.sql.Field import com.typesafe.scalalogging.StrictLogging +import io.lenses.json.sql.JsonSql._ +import io.lenses.streamreactor.connect.json.SimpleJsonConverter import org.apache.kafka.connect.data.Schema -import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.header.Headers import scala.util.Failure import scala.util.Success @@ -35,94 +30,88 @@ private object TransformAndExtractPK extends StrictLogging { lazy val simpleJsonConverter = new SimpleJsonConverter() def apply( - fields: Seq[Field], - primaryKeysPaths: Seq[Vector[String]], - schema: Schema, - value: Any, - withStructure: Boolean, - ): (Option[JsonNode], Seq[Any]) = { - def raiseException(msg: String, t: Throwable) = throw new IllegalArgumentException(msg, t) - + kcqlValues: KcqlValues, + schema: Schema, + value: Any, + withStructure: Boolean, + keySchema: Schema, + key: Any, + headers: Headers, + ): (Option[JsonNode], Seq[Any]) = if (value == null) { (None, Seq.empty) } else { - if (schema != null) { - schema.`type`() match { - case Schema.Type.BYTES => - //we expected to be json - val array = value match { - case a: Array[Byte] => a - case b: ByteBuffer => b.array() - case other => raiseException(s"Invalid payload:$other for schema Schema.BYTES.", null) - } + val result = for { + jsonNode <- extractJsonNode(value, schema) + transformedJson = jsonNode.sql(kcqlValues.fields, !withStructure) + keyJsonNodeOpt: Option[JsonNode] <- if (hasKeyFieldPath(kcqlValues.primaryKeysPath)) + extractOptionalJsonNode(key, keySchema) + else Try(Option.empty[JsonNode]) + } yield { + val primaryKeys = kcqlValues.primaryKeysPath.map { path => + 
extractPrimaryKey(path, jsonNode, keyJsonNodeOpt, headers) + } + (Option(transformedJson), primaryKeys) + } - Try(JacksonJson.mapper.readTree(array)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(fields, !withStructure)) match { - case Failure(e) => raiseException(s"A KCQL exception occurred. ${e.getMessage}", e) - case Success(jn) => - (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - } - } + result match { + case Success(value) => value + case Failure(e) => throw e + } + } - case Schema.Type.STRING => - //we expected to be json - Try(JacksonJson.asJson(value.asInstanceOf[String])) match { - case Failure(e) => raiseException("Invalid json", e) - case Success(json) => - Try(json.sql(fields, !withStructure)) match { - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - } + private def hasKeyFieldPath(paths: Seq[Vector[String]]): Boolean = + paths.exists(_.head == KafkaMessageParts.Key) - case Schema.Type.STRUCT => - val struct = value.asInstanceOf[Struct] - Try(struct.sql(fields, !withStructure)) match { - case Success(s) => - (Option(simpleJsonConverter.fromConnectData(s.schema(), s)), - primaryKeysPaths.map(PrimaryKeyExtractor.extract(struct, _)), - ) + private def extractJsonNode(value: Any, schema: Schema): Try[JsonNode] = + JsonPayloadExtractor.extractJsonNode(value, schema) match { + case Left(error) => Failure(new IllegalArgumentException(error)) + case Right(Some(node)) => Success(node) + case Right(None) => Failure(new IllegalArgumentException("Failed to extract JsonNode from value")) + } - case Failure(e) => raiseException(s"A KCQL error occurred.${e.getMessage}", e) - } + private def extractOptionalJsonNode(value: Any, schema: Schema): Try[Option[JsonNode]] = + if (value == null) Success(None) + else { + JsonPayloadExtractor.extractJsonNode(value, schema) match { + case Left(error) => Failure(new IllegalArgumentException(error)) + case Right(nodeOpt) => Success(nodeOpt) + } + } - case other => raiseException(s"Can't transform Schema type:$other.", null) + private def extractPrimaryKey( + path: Vector[String], + jsonNode: JsonNode, + keyJsonNodeOpt: Option[JsonNode], + headers: Headers, + ): Any = + path.head match { + case KafkaMessageParts.Key => + keyJsonNodeOpt match { + case Some(keyNode) => PrimaryKeyExtractor.extract(keyNode, path.tail, "_key.") + case None => + throw new IllegalArgumentException( + s"Key path '${path.mkString(".")}' has a null value", + ) } - } else { - //we can handle java.util.Map (this is what JsonConverter can spit out) - value match { - case m: java.util.Map[_, _] => - val map = m.asInstanceOf[java.util.Map[String, Any]] - val jsonNode: JsonNode = JacksonJson.mapper.valueToTree[JsonNode](map) - Try(jsonNode.sql(fields, !withStructure)) match { - case Success(j) => (Option(j), primaryKeysPaths.map(PrimaryKeyExtractor.extract(jsonNode, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - case s: String => - Try(JacksonJson.asJson(s)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(fields, !withStructure)) match { - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - } - - case b: Array[Byte] => - 
Try(JacksonJson.mapper.readTree(b)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(fields, !withStructure)) match { - case Failure(e) => raiseException(s"A KCQL exception occurred. ${e.getMessage}", e) - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - } + case KafkaMessageParts.Value => + PrimaryKeyExtractor.extract(jsonNode, path.tail) + case KafkaMessageParts.Header => + if (path.tail.size != 1) { + throw new IllegalArgumentException( + s"Invalid field selection for '${path.mkString(".")}'. " + + s"Headers lookup only supports single-level keys. Nested header keys are not supported.", + ) + } + headers.lastWithName(path.tail.head) match { + case null => throw new IllegalArgumentException(s"Header with key '${path.tail.head}' not found") + case header => header.value() match { + case value: String => value + case null => throw new IllegalArgumentException(s"Header '${path.tail.head}' has a null value") + case _ => throw new IllegalArgumentException(s"Header '${path.tail.head}' is not a string") } - //we take it as String - case other => raiseException(s"Value:$other is not handled!", null) } - } + case _ => + PrimaryKeyExtractor.extract(jsonNode, path) } - } } diff --git a/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractorTest.scala b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractorTest.scala new file mode 100644 index 0000000000..ce799d3739 --- /dev/null +++ b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractorTest.scala @@ -0,0 +1,188 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic6 + +import com.fasterxml.jackson.databind.node.TextNode +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.scalatest.EitherValues +import org.scalatest.OptionValues +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.nio.ByteBuffer +import java.util.{ HashMap => JHashMap } + +class JsonPayloadExtractorTest extends AnyFunSuite with Matchers with EitherValues with OptionValues { + + test("handle null values") { + val result = JsonPayloadExtractor.extractJsonNode(null, null) + result shouldBe Right(None) + } + + test("handle string JSON with schema") { + val jsonString = """{"name": "test"}""" + val schema = Schema.STRING_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(jsonString, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("primitive string is read as TextNode") { + val invalidJson = "invalid json" + val schema = Schema.STRING_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(invalidJson, schema) + + result.value.value shouldBe a[TextNode] + result.value.value.asText() shouldBe invalidJson + } + + test("handle byte array with schema") { + val jsonString = """{"name": "test"}""" + val bytes = jsonString.getBytes("UTF-8") + val schema = Schema.BYTES_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(bytes, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle ByteBuffer with schema") { + val jsonString = """{"name": "test"}""" + val byteBuffer = ByteBuffer.wrap(jsonString.getBytes("UTF-8")) + val schema = Schema.BYTES_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(byteBuffer, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle Struct with schema") { + val schema = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("name", "John") + .put("age", 30) + + val result = JsonPayloadExtractor.extractJsonNode(struct, schema) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "John" + jsonNode.get("age").asInt() shouldBe 30 + } + + test("handle Map without schema") { + val javaMap = new JHashMap[String, Any]() + javaMap.put("name", "test") + javaMap.put("value", Integer.valueOf(42)) + + val result = JsonPayloadExtractor.extractJsonNode(javaMap, null) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("value").asInt() shouldBe 42 + } + + test("handle nested Map without schema") { + val innerMap = new JHashMap[String, Any]() + innerMap.put("age", Integer.valueOf(25)) + innerMap.put("city", "New York") + + val outerMap = new JHashMap[String, Any]() + outerMap.put("name", "test") + outerMap.put("details", innerMap) + + val result = JsonPayloadExtractor.extractJsonNode(outerMap, null) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("details").get("age").asInt() shouldBe 25 + jsonNode.get("details").get("city").asText() shouldBe "New York" + } + + test("handle string JSON without schema") { + val jsonString = """{"name": "test"}""" + val result = JsonPayloadExtractor.extractJsonNode(jsonString, null) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle byte array without schema") { + val jsonString = 
"""{"name": "test"}""" + val bytes = jsonString.getBytes("UTF-8") + val result = JsonPayloadExtractor.extractJsonNode(bytes, null) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle complex nested JSON") { + val jsonString = """ + { + "name": "test", + "details": { + "age": 30, + "address": { + "street": "123 Main St", + "city": "Test City" + }, + "hobbies": ["reading", "coding"] + } + } + """ + val result = JsonPayloadExtractor.extractJsonNode(jsonString, Schema.STRING_SCHEMA) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("details").get("age").asInt() shouldBe 30 + jsonNode.get("details").get("address").get("street").asText() shouldBe "123 Main St" + jsonNode.get("details").get("hobbies").get(0).asText() shouldBe "reading" + } + + test("handle boolean value type") { + val result = JsonPayloadExtractor.extractJsonNode(true, Schema.BOOLEAN_SCHEMA) + + result.value.value.asBoolean() shouldBe true + } + + test("handle int value type") { + val result = JsonPayloadExtractor.extractJsonNode(42, null) + + result.value.value.asInt() shouldBe 42 + } + + test("handle invalid type for schema") { + val result = JsonPayloadExtractor.extractJsonNode(42, Schema.STRING_SCHEMA) + + result.left.value should include("Expected string but got") + } + + test("handle invalid struct type") { + val schema = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .build() + + val result = JsonPayloadExtractor.extractJsonNode("not a struct", schema) + + result.left.value should include("Expected Struct but got") + } + + test("handle invalid bytes type") { + val result = JsonPayloadExtractor.extractJsonNode("not bytes", Schema.BYTES_SCHEMA) + result.left.value should include("Expected byte array or ByteBuffer") + } +} diff --git a/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractorTest.scala b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractorTest.scala new file mode 100644 index 0000000000..917d66dac1 --- /dev/null +++ b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractorTest.scala @@ -0,0 +1,269 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic6 + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.connect.data._ + +import scala.jdk.CollectionConverters._ +import org.apache.kafka.connect.errors.ConnectException + +class PrimaryKeyExtractorTest extends AnyFunSuite with Matchers { + def parseJson(jsonString: String): JsonNode = { + val mapper = new ObjectMapper() + mapper.readTree(jsonString) + } + + test("extract should retrieve a primitive value from JsonNode") { + val jsonString = """{"field": "value"}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val result = PrimaryKeyExtractor.extract(jsonNode, path) + result shouldEqual "value" + } + + test("extract should retrieve a nested primitive value from JsonNode") { + val jsonString = """{"parent": {"child": 123}}""" + val jsonNode = parseJson(jsonString) + val path = Vector("parent", "child") + + val result = PrimaryKeyExtractor.extract(jsonNode, path) + result shouldEqual 123 + } + + test("extract should throw exception if path does not exist in JsonNode") { + val jsonString = """{"field": "value"}""" + val jsonNode = parseJson(jsonString) + val path = Vector("nonexistent") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("Can't find nonexistent field") + } + + test("extract should throw exception if path leads to non-primitive field in JsonNode") { + val jsonString = """{"field": {"subfield": "value"}}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("The path is not resolving to a primitive field") + } + + test("extract should throw exception when encountering an array in JsonNode") { + val jsonString = """{"field": [1, 2, 3]}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("The path is involving an array structure") + } + + test("extract should handle various primitive types in JsonNode") { + val jsonString = + """ + |{ + | "stringField": "text", + | "intField": 42, + | "floatField": 3.14, + | "booleanField": true, + | "nullField": null + |} + |""".stripMargin + val jsonNode = parseJson(jsonString) + + PrimaryKeyExtractor.extract(jsonNode, Vector("stringField")) shouldEqual "text" + PrimaryKeyExtractor.extract(jsonNode, Vector("intField")) shouldEqual 42 + PrimaryKeyExtractor.extract(jsonNode, Vector("floatField")) shouldEqual 3.14 + PrimaryKeyExtractor.extract(jsonNode, Vector("booleanField")) shouldEqual true + PrimaryKeyExtractor.extract(jsonNode, Vector("nullField")).asInstanceOf[AnyRef] shouldBe null + } + + test("extract should retrieve a primitive value from Struct") { + val schema = SchemaBuilder.struct() + .field("field", Schema.STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", "value") + + val path = Vector("field") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual "value" + } + + test("extract should retrieve a nested primitive value from Struct") { + val nestedSchema = SchemaBuilder.struct() + .field("child", Schema.INT32_SCHEMA) + .build() + 
+ val schema = SchemaBuilder.struct() + .field("parent", nestedSchema) + .build() + + val nestedStruct = new Struct(nestedSchema) + .put("child", 123) + + val struct = new Struct(schema) + .put("parent", nestedStruct) + + val path = Vector("parent", "child") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual 123 + } + + test("extract should throw exception if path does not exist in Struct") { + val schema = SchemaBuilder.struct() + .field("field", Schema.STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", "value") + + val path = Vector("nonexistent") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("Couldn't find field 'nonexistent'") + } + + test("extract should throw exception if field in Struct is null") { + val schema = SchemaBuilder.struct() + .field("field", Schema.OPTIONAL_STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", null) + + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("Field 'field' is null") + } + + test("extract should handle various primitive types in Struct") { + val schema = SchemaBuilder.struct() + .field("stringField", Schema.STRING_SCHEMA) + .field("intField", Schema.INT32_SCHEMA) + .field("floatField", Schema.FLOAT32_SCHEMA) + .field("booleanField", Schema.BOOLEAN_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("stringField", "text") + .put("intField", 42) + .put("floatField", 3.14f) + .put("booleanField", true) + + PrimaryKeyExtractor.extract(struct, Vector("stringField")) shouldEqual "text" + PrimaryKeyExtractor.extract(struct, Vector("intField")) shouldEqual 42 + PrimaryKeyExtractor.extract(struct, Vector("floatField")) shouldEqual 3.14f + PrimaryKeyExtractor.extract(struct, Vector("booleanField")) shouldEqual true + } + + test("extract should handle logical types in Struct (e.g., Date, Time, Timestamp)") { + val dateSchema = Date.SCHEMA + val timeSchema = Time.SCHEMA + val timestampSchema = Timestamp.SCHEMA + + val schema = SchemaBuilder.struct() + .field("dateField", dateSchema) + .field("timeField", timeSchema) + .field("timestampField", timestampSchema) + .build() + + val dateValue = new java.util.Date(1627843200000L) // Aug 1, 2021 + val timeValue = new java.util.Date(3600000L) // 1 hour in milliseconds + val timestampValue = new java.util.Date(1627843200000L) + + val struct = new Struct(schema) + .put("dateField", dateValue) + .put("timeField", timeValue) + .put("timestampField", timestampValue) + + PrimaryKeyExtractor.extract(struct, Vector("dateField")) shouldEqual dateValue + PrimaryKeyExtractor.extract(struct, Vector("timeField")) shouldEqual timeValue + PrimaryKeyExtractor.extract(struct, Vector("timestampField")) shouldEqual timestampValue + } + + test("extract should throw exception when encountering unsupported schema type in Struct") { + val schema = SchemaBuilder.struct() + .field("mapField", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA)) + .build() + + val mapValue = Map("key" -> 123).asJava + + val struct = new Struct(schema) + .put("mapField", mapValue) + + val path = Vector("mapField") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("It doesn't resolve to a primitive field") + } + + test("extract should traverse nested 
maps in Struct") { + val mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build() + + val schema = SchemaBuilder.struct() + .field("mapField", mapSchema) + .build() + + val mapValue = Map("innerKey" -> 123).asJava + + val struct = new Struct(schema) + .put("mapField", mapValue) + + val path = Vector("mapField", "innerKey") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual 123 + } + + test("extract should throw exception when encountering unsupported schema type") { + val schema = SchemaBuilder.struct() + .field("arrayField", SchemaBuilder.array(Schema.INT32_SCHEMA)) + .build() + + val arrayValue = List(1, 2, 3).asJava + + val struct = new Struct(schema) + .put("arrayField", arrayValue) + + val path = Vector("arrayField") + + val exception = intercept[ConnectException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("ARRAY is not a recognized schema") + } +} diff --git a/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPKTest.scala b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPKTest.scala new file mode 100644 index 0000000000..1636e52438 --- /dev/null +++ b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPKTest.scala @@ -0,0 +1,601 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic6 + +import io.lenses.sql.Field +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.header.ConnectHeaders +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +class TransformAndExtractPKTest extends AnyFunSuite with Matchers { + // Helper method to create KcqlValues + def createKcqlValues(fields: Seq[(String, String, Vector[String])]): KcqlValues = { + val fieldObjects = fields.map { + case (name, alias, parents) => Field(name, alias, parents) + } + KcqlValues( + fields = fieldObjects, + ignoredFields = Seq.empty, + primaryKeysPath = Seq.empty, + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + } + + test("should return None and empty Seq when value is null") { + val result = TransformAndExtractPK.apply( + kcqlValues = createKcqlValues(Seq.empty), + schema = null, + value = null, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + result shouldEqual (None, Seq.empty) + } + + test("should handle valid JSON value and extract primary keys") { + val jsonString = """{"field1": "value1", "field2": 2}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + Field(name = "field2", alias = "field2", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_value", "field1"), + Vector("_key", "keyField"), + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + transformedJson.get("field2").asInt() shouldEqual 2 + + primaryKeys should have size 3 + primaryKeys(0) shouldEqual "value1" + primaryKeys(1) shouldEqual "keyValue" + primaryKeys(2) shouldEqual "headerValue" + } + + test("should throw exception when header is missing") { + + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_header", "missingHeader")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = headers, + ) + } + + exception.getMessage should include("Header with key 'missingHeader' not found") + } + + test("should extract primary key from Struct value") { + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + 
val struct = new Struct(schema)
+      .put("field1", "value1")
+      .put("field2", 2)
+
+    val value = struct
+
+    val kcqlValues = KcqlValues(
+      fields = Seq(
+        Field(name = "field1", alias = "field1", parents = Vector.empty),
+        Field(name = "field2", alias = "field2", parents = Vector.empty),
+      ),
+      ignoredFields = Seq.empty,
+      primaryKeysPath = Seq(Vector("_value", "field1")),
+      behaviorOnNullValues = NullValueBehavior.FAIL,
+    )
+
+    val result = TransformAndExtractPK.apply(
+      kcqlValues = kcqlValues,
+      schema = schema,
+      value = value,
+      withStructure = false,
+      keySchema = null,
+      key = null,
+      headers = new ConnectHeaders(),
+    )
+
+    val (transformedJsonOpt, primaryKeys) = result
+
+    transformedJsonOpt shouldBe defined
+    val transformedJson = transformedJsonOpt.get
+
+    transformedJson.get("field1").asText() shouldEqual "value1"
+    transformedJson.get("field2").asInt() shouldEqual 2
+
+    primaryKeys should have size 1
+    primaryKeys(0) shouldEqual "value1"
+  }
+
+  test("should use the primary key value when the field path is just _key and the key payload is a primitive LONG") {
+    // key payload is a primitive long
+    val key       = 123L
+    val keySchema = Schema.INT64_SCHEMA
+
+    val kcqlValues = KcqlValues(
+      fields = Seq.empty,
+      ignoredFields = Seq.empty,
+      primaryKeysPath = Seq(Vector("_key")),
+      behaviorOnNullValues = NullValueBehavior.FAIL,
+    )
+
+    val schema = SchemaBuilder.struct()
+      .field("field1", Schema.STRING_SCHEMA)
+      .field("field2", Schema.INT32_SCHEMA)
+      .build()
+
+    val struct = new Struct(schema)
+      .put("field1", "value1")
+      .put("field2", 2)
+
+    val result = TransformAndExtractPK.apply(
+      kcqlValues = kcqlValues,
+      schema = schema,
+      value = struct,
+      withStructure = false,
+      keySchema = keySchema,
+      key = key,
+      headers = new ConnectHeaders(),
+    )
+
+    val (transformedJsonOpt, primaryKeys) = result
+    primaryKeys should have size 1
+    primaryKeys(0) shouldEqual 123L
+  }
+
+  test("should use the primary key value when the field path is just _key and the key payload is a primitive STRING") {
+    // key payload is a primitive string
+    val key       = "keyValue"
+    val keySchema = Schema.STRING_SCHEMA
+
+    val kcqlValues = KcqlValues(
+      fields = Seq.empty,
+      ignoredFields = Seq.empty,
+      primaryKeysPath = Seq(Vector("_key")),
+      behaviorOnNullValues = NullValueBehavior.FAIL,
+    )
+
+    val schema = SchemaBuilder.struct()
+      .field("field1", Schema.STRING_SCHEMA)
+      .field("field2", Schema.INT32_SCHEMA)
+      .build()
+
+    val struct = new Struct(schema)
+      .put("field1", "value1")
+      .put("field2", 2)
+
+    val result = TransformAndExtractPK.apply(
+      kcqlValues = kcqlValues,
+      schema = schema,
+      value = struct,
+      withStructure = false,
+      keySchema = keySchema,
+      key = key,
+      headers = new ConnectHeaders(),
+    )
+
+    val (transformedJsonOpt, primaryKeys) = result
+    primaryKeys should have size 1
+    primaryKeys(0) shouldEqual "keyValue"
+  }
+
+  test("fail when the PK path uses _key.a when the key is only a primitive STRING") {
+    // key payload is a primitive string
+    val key       = "keyValue"
+    val keySchema = Schema.STRING_SCHEMA
+
+    val kcqlValues = KcqlValues(
+      fields = Seq.empty,
+      ignoredFields = Seq.empty,
+      primaryKeysPath = Seq(Vector("_key", "a")),
+      behaviorOnNullValues = NullValueBehavior.FAIL,
+    )
+
+    val schema = SchemaBuilder.struct()
+      .field("field1", Schema.STRING_SCHEMA)
+      .field("field2", Schema.INT32_SCHEMA)
+      .build()
+
+    val struct = new Struct(schema)
+      .put("field1", "value1")
+      .put("field2", 2)
+
+    val exception = the[IllegalArgumentException] thrownBy {
+      TransformAndExtractPK.apply(
+        kcqlValues =
kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + } + + exception.getMessage should include("Invalid field selection for '_key.a'") + } + + test("should throw exception when primary key path is invalid") { + + val jsonString = """{"field1": {"nestedField": "value1"}}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_value", "field1", "nonexistentField")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + } + + exception.getMessage should include("Can't find nonexistentField field") + } + + test("should return the path when the message _key is involved") { + + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "keyField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" // Extracted from _key.keyField + } + test("return the primary key when the _key path is involved and the path is 2 levels deep") { + + val jsonString = """{"field1": {"nestedField": "value1"}}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "keyField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").get("nestedField").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" + } + + test("returns the primary key from a header entry") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = 
Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "headerValue" + } + + test("fail when primary key path involves a missing header key") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "missingHeader"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage should include("Header with key 'missingHeader' not found") + } + test("fail when the header key has a null value") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", null) + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage should include("Header 'headerKey' has a null value") + } + test("fail when the primary key path uses a nested header key") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey", "nonexistentField"), + ), + 
behaviorOnNullValues = NullValueBehavior.FAIL,
+    )
+
+    val exception = intercept[IllegalArgumentException] {
+      TransformAndExtractPK.apply(
+        kcqlValues = kcqlValues,
+        schema = schema,
+        value = value,
+        withStructure = false,
+        keySchema = keySchema,
+        key = key,
+        headers = headers,
+      )
+    }
+
+    exception.getMessage shouldBe "Invalid field selection for '_header.headerKey.nonexistentField'. Headers lookup only supports single-level keys. Nested header keys are not supported."
+  }
+
+  // this test exercises a key path, not a header key path
+  test("fail when primary key path uses a key path which does not exist") {
+    val jsonString = """{"field1": "value1"}"""
+    val value      = jsonString
+    val schema     = Schema.STRING_SCHEMA
+
+    val keyJsonString = """{"keyField": "keyValue"}"""
+    val key           = keyJsonString
+    val keySchema     = Schema.STRING_SCHEMA
+
+    val headers = new ConnectHeaders()
+    headers.addString("headerKey", "headerValue")
+
+    val kcqlValues = KcqlValues(
+      fields = Seq(
+        Field(name = "field1", alias = "field1", parents = Vector.empty),
+      ),
+      ignoredFields = Seq.empty,
+      primaryKeysPath = Seq(
+        Vector("_key", "nonexistentField"),
+      ),
+      behaviorOnNullValues = NullValueBehavior.FAIL,
+    )
+
+    val exception = intercept[IllegalArgumentException] {
+      TransformAndExtractPK.apply(
+        kcqlValues = kcqlValues,
+        schema = schema,
+        value = value,
+        withStructure = false,
+        keySchema = keySchema,
+        key = key,
+        headers = headers,
+      )
+    }
+
+    exception.getMessage shouldBe "Invalid field selection for '_key.nonexistentField'. Can't find nonexistentField field. Field found are:keyField"
+  }
+}
diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/ElasticJsonWriter.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/ElasticJsonWriter.scala
index 37e0373c35..7841c8dcfc 100644
--- a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/ElasticJsonWriter.scala
+++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/ElasticJsonWriter.scala
@@ -140,10 +140,12 @@ class ElasticJsonWriter(client: KElasticClient, settings: ElasticSettings)
       (Transform(kcqlValue.fields, r.valueSchema(), r.value(), kcql.hasRetainStructure), Seq.empty)
     } else {
       TransformAndExtractPK(kcqlValue,
-                            kcqlValue.primaryKeysPath,
                             r.valueSchema(),
                             r.value(),
                             kcql.hasRetainStructure,
+                            r.keySchema(),
+                            r.key(),
+                            r.headers(),
       )
     }
     val idFromPk = pks.mkString(settings.pkJoinerSeparator)
diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractor.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractor.scala
new file mode 100644
index 0000000000..3195cf4c4c
--- /dev/null
+++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractor.scala
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package io.lenses.streamreactor.connect.elastic7 + +import cats.implicits.catsSyntaxEitherId +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node._ +import io.lenses.json.sql.JacksonJson +import io.lenses.streamreactor.connect.json.SimpleJsonConverter +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.data.Schema + +import java.nio.ByteBuffer +import scala.jdk.CollectionConverters.MapHasAsScala +import scala.jdk.CollectionConverters.CollectionHasAsScala +import scala.util.Try +import scala.util.Success +import scala.util.Failure + +object JsonPayloadExtractor { + lazy val simpleJsonConverter = new SimpleJsonConverter() + + def extractJsonNode(value: Any, schema: Schema): Either[String, Option[JsonNode]] = + (Option(value), Option(schema).map(_.`type`())) match { + case (None, _) => Right(None) + case (Some(_), Some(Schema.Type.BYTES)) => handleBytes(value) + case (Some(_), Some(Schema.Type.STRING)) => handleString(value) + case (Some(_), Some(Schema.Type.INT8)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT16)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT32)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT64)) => handleLong(value) + case (Some(_), Some(Schema.Type.FLOAT32)) => handleFloat(value) + case (Some(_), Some(Schema.Type.FLOAT64)) => handleDouble(value) + case (Some(_), Some(Schema.Type.STRUCT)) => handleStruct(value) + case (Some(_), Some(Schema.Type.BOOLEAN)) => handleBoolean(value) + case (Some(_), Some(Schema.Type.ARRAY)) => handleArray(value) + case (Some(_), Some(Schema.Type.MAP)) => handleMap(value) + case (Some(_), Some(other)) => Left(s"Unsupported Schema type: $other") + case (Some(v), None) => handleSchemaLess(v) + } + + private def handleArray(value: Any): Either[String, Option[JsonNode]] = + value match { + case l: Iterable[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + l.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + + case jc: java.util.Collection[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + jc.asScala.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + case a: Array[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + a.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + case other => Left(s"Expected array but got: $other") + } + + private def handleMap(value: Any): Either[String, Option[JsonNode]] = + value match { + case m: java.util.Map[_, _] => + val map = m.asInstanceOf[java.util.Map[String, Any]] + val mapNode = JacksonJson.mapper.createObjectNode() + map.asScala.foreach { + case (key, value) => + extractJsonNode(value, null) match { + case Right(Some(node)) => mapNode.set(key, node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(mapNode)) + case other => Left(s"Expected map but got: $other") + } + private def handleBoolean(value: Any): Either[String, Option[JsonNode]] = + value match { + case b: Boolean => Some(BooleanNode.valueOf(b)).asRight[String] + case other => Left(s"Expected boolean but got: 
$other") + } + private def handleDouble(value: Any): Either[String, Option[JsonNode]] = + value match { + case f: Float => Some(DoubleNode.valueOf(f.toDouble)).asRight[String] + case d: Double => Some(DoubleNode.valueOf(d)).asRight[String] + case other => Left(s"Expected double but got: $other") + } + + private def handleFloat(value: Any): Either[String, Option[JsonNode]] = + value match { + case f: Float => Some(FloatNode.valueOf(f)).asRight[String] + case d: Double => Some(FloatNode.valueOf(d.toFloat)).asRight[String] + case other => Left(s"Expected float but got: $other") + } + + private def handleLong(value: Any): Either[String, Option[JsonNode]] = + value match { + case b: Byte => Some(LongNode.valueOf(b.toLong)).asRight[String] + case s: Short => Some(LongNode.valueOf(s.toLong)).asRight[String] + case i: Int => Some(LongNode.valueOf(i.toLong)).asRight[String] + case l: Long => Some(LongNode.valueOf(l)).asRight[String] + case other => Left(s"Expected long but got: $other") + } + + private def handleBytes(value: Any): Either[String, Option[JsonNode]] = + value match { + case bytes: Array[Byte] => + tryReadJson(bytes).map(Some(_)) + case byteBuffer: ByteBuffer => + val bytes = new Array[Byte](byteBuffer.remaining()) + byteBuffer.get(bytes) + tryReadJson(bytes).map(Some(_)) + case other => Left(s"Expected byte array or ByteBuffer but got: $other") + } + + private def handleString(value: Any): Either[String, Option[JsonNode]] = + value match { + case s: String => + tryParseJson(s).map(Some(_)) match { + case Left(_) => TextNode.valueOf(s).asRight[String].map(Some(_)) + case r => r + } + case other => Left(s"Expected string but got: $other") + } + + private def handleStruct(value: Any): Either[String, Option[JsonNode]] = + value match { + case struct: Struct => + Try(simpleJsonConverter.fromConnectData(struct.schema(), struct)) match { + case Success(jsonNode) => Right(Some(jsonNode)) + case Failure(e) => Left(s"Failed to convert Struct to JsonNode: ${e.getMessage}") + } + case other => Left(s"Expected Struct but got: $other") + } + + private def handleSchemaLess(value: Any): Either[String, Option[JsonNode]] = + value match { + case m: java.util.Map[_, _] => + Try { + val map = m.asInstanceOf[java.util.Map[String, Any]] + JacksonJson.mapper.valueToTree[JsonNode](map) + } match { + case Success(node) => Right(Some(node)) + case Failure(e) => Left(s"Failed to convert Map to JsonNode: ${e.getMessage}") + } + + case s: String => + tryParseJson(s).map(Some(_)) match { + case Left(_) => TextNode.valueOf(s).asRight[String].map(Some(_)) + case r => r + } + case b: Array[Byte] => tryReadJson(b).map(Some(_)) + case b: Byte => IntNode.valueOf(b.toInt).asRight[String].map(Some(_)) + case s: Short => IntNode.valueOf(s.toInt).asRight[String].map(Some(_)) + case i: Int => IntNode.valueOf(i).asRight[String].map(Some(_)) + case l: Long => LongNode.valueOf(l).asRight[String].map(Some(_)) + case f: Float => FloatNode.valueOf(f).asRight[String].map(Some(_)) + case double: Double => DoubleNode.valueOf(double).asRight[String].map(Some(_)) + case bigDecimal: BigDecimal => DecimalNode.valueOf(bigDecimal.bigDecimal).asRight[String].map(Some(_)) + case bigDecimal: java.math.BigDecimal => DecimalNode.valueOf(bigDecimal).asRight[String].map(Some(_)) + case boolean: Boolean => BooleanNode.valueOf(boolean).asRight[String].map(Some(_)) + case bi: BigInt => BigIntegerNode.valueOf(bi.bigInteger).asRight[String].map(Some(_)) + case bi: java.math.BigInteger => BigIntegerNode.valueOf(bi).asRight[String].map(Some(_)) + case 
other => Left(s"Unsupported value type: ${other.getClass.getName}") + } + + private def tryParseJson(str: String): Either[String, JsonNode] = + Try(JacksonJson.asJson(str)) match { + case Success(json) => Right(json) + case Failure(e) => Left(s"Invalid JSON string: ${e.getMessage}") + } + + private def tryReadJson(bytes: Array[Byte]): Either[String, JsonNode] = + Try(JacksonJson.mapper.readTree(bytes)) match { + case Success(json) => Right(json) + case Failure(e) => Left(s"Invalid JSON bytes: ${e.getMessage}") + } +} diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KafkaMessageParts.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KafkaMessageParts.scala new file mode 100644 index 0000000000..402429c8de --- /dev/null +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KafkaMessageParts.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.elastic7 + +object KafkaMessageParts { + val Key = "_key" + val Value = "_value" + val Header = "_header" +} diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractor.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractor.scala index 5b54ff38e1..46c8d4839e 100644 --- a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractor.scala +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractor.scala @@ -25,13 +25,13 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.jdk.CollectionConverters.ListHasAsScala object PrimaryKeyExtractor { - def extract(node: JsonNode, path: Vector[String]): Any = { + def extract(node: JsonNode, path: Vector[String], prefix: String = ""): Any = { @tailrec def innerExtract(n: JsonNode, p: Vector[String]): Any = { def checkValidPath(): Unit = if (p.nonEmpty) { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. It doesn't resolve to a primitive field", + s"Invalid field selection for '$prefix${path.mkString(".")}'. It doesn't resolve to a primitive field", ) } @@ -79,24 +79,24 @@ object PrimaryKeyExtractor { case node: ObjectNode => if (p.isEmpty) { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. The path is not resolving to a primitive field", + s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is not resolving to a primitive field", ) } val childNode = Option(node.get(p.head)).getOrElse { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. Can't find ${p.head} field. Field found are:${node.fieldNames().asScala.mkString(",")}", + s"Invalid field selection for '$prefix${path.mkString(".")}'. Can't find ${p.head} field. 
Field found are:${node.fieldNames().asScala.mkString(",")}", ) } innerExtract(childNode, p.tail) case _: ArrayNode => throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. The path is involving an array structure", + s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is involving an array structure", ) case other => throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. $other is not handled", + s"Invalid field selection for '$prefix${path.mkString(".")}'. $other is not handled", ) } } diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPK.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPK.scala index 70a6be24b7..1dfadb1661 100644 --- a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPK.scala +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPK.scala @@ -17,14 +17,11 @@ package io.lenses.streamreactor.connect.elastic7 import com.fasterxml.jackson.databind.JsonNode import com.typesafe.scalalogging.StrictLogging -import io.lenses.connect.sql.StructSql._ -import io.lenses.json.sql.JacksonJson import io.lenses.json.sql.JsonSql._ import io.lenses.streamreactor.connect.json.SimpleJsonConverter import org.apache.kafka.connect.data.Schema -import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.header.Headers -import java.nio.ByteBuffer import scala.util.Failure import scala.util.Success import scala.util.Try @@ -33,94 +30,88 @@ private object TransformAndExtractPK extends StrictLogging { lazy val simpleJsonConverter = new SimpleJsonConverter() def apply( - kcqlValues: KcqlValues, - primaryKeysPaths: Seq[Vector[String]], - schema: Schema, - value: Any, - withStructure: Boolean, - ): (Option[JsonNode], Seq[Any]) = { - def raiseException(msg: String, t: Throwable) = throw new IllegalArgumentException(msg, t) - + kcqlValues: KcqlValues, + schema: Schema, + value: Any, + withStructure: Boolean, + keySchema: Schema, + key: Any, + headers: Headers, + ): (Option[JsonNode], Seq[Any]) = if (value == null) { (None, Seq.empty) } else { - if (schema != null) { - schema.`type`() match { - case Schema.Type.BYTES => - //we expected to be json - val array = value match { - case a: Array[Byte] => a - case b: ByteBuffer => b.array() - case other => raiseException(s"Invalid payload:$other for schema Schema.BYTES.", null) - } + val result = for { + jsonNode <- extractJsonNode(value, schema) + transformedJson = jsonNode.sql(kcqlValues.fields, !withStructure) + keyJsonNodeOpt: Option[JsonNode] <- if (hasKeyFieldPath(kcqlValues.primaryKeysPath)) + extractOptionalJsonNode(key, keySchema) + else Try(Option.empty[JsonNode]) + } yield { + val primaryKeys = kcqlValues.primaryKeysPath.map { path => + extractPrimaryKey(path, jsonNode, keyJsonNodeOpt, headers) + } + (Option(transformedJson), primaryKeys) + } - Try(JacksonJson.mapper.readTree(array)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(kcqlValues.fields, !withStructure)) match { - case Failure(e) => raiseException(s"A KCQL exception occurred. 
${e.getMessage}", e) - case Success(jn) => - (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - } - } + result match { + case Success(value) => value + case Failure(e) => throw e + } + } - case Schema.Type.STRING => - //we expected to be json - Try(JacksonJson.asJson(value.asInstanceOf[String])) match { - case Failure(e) => raiseException("Invalid json", e) - case Success(json) => - Try(json.sql(kcqlValues.fields, !withStructure)) match { - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - } + private def hasKeyFieldPath(paths: Seq[Vector[String]]): Boolean = + paths.exists(_.head == KafkaMessageParts.Key) - case Schema.Type.STRUCT => - val struct = value.asInstanceOf[Struct] - Try(struct.sql(kcqlValues.fields, !withStructure)) match { - case Success(s) => - (Option(simpleJsonConverter.fromConnectData(s.schema(), s)), - primaryKeysPaths.map(PrimaryKeyExtractor.extract(struct, _)), - ) + private def extractJsonNode(value: Any, schema: Schema): Try[JsonNode] = + JsonPayloadExtractor.extractJsonNode(value, schema) match { + case Left(error) => Failure(new IllegalArgumentException(error)) + case Right(Some(node)) => Success(node) + case Right(None) => Failure(new IllegalArgumentException("Failed to extract JsonNode from value")) + } - case Failure(e) => raiseException(s"A KCQL error occurred.${e.getMessage}", e) - } + private def extractOptionalJsonNode(value: Any, schema: Schema): Try[Option[JsonNode]] = + if (value == null) Success(None) + else { + JsonPayloadExtractor.extractJsonNode(value, schema) match { + case Left(error) => Failure(new IllegalArgumentException(error)) + case Right(nodeOpt) => Success(nodeOpt) + } + } - case other => raiseException(s"Can't transform Schema type:$other.", null) + private def extractPrimaryKey( + path: Vector[String], + jsonNode: JsonNode, + keyJsonNodeOpt: Option[JsonNode], + headers: Headers, + ): Any = + path.head match { + case KafkaMessageParts.Key => + keyJsonNodeOpt match { + case Some(keyNode) => PrimaryKeyExtractor.extract(keyNode, path.tail, "_key.") + case None => + throw new IllegalArgumentException( + s"Key path '${path.mkString(".")}' has a null value", + ) } - } else { - //we can handle java.util.Map (this is what JsonConverter can spit out) - value match { - case m: java.util.Map[_, _] => - val map = m.asInstanceOf[java.util.Map[String, Any]] - val jsonNode: JsonNode = JacksonJson.mapper.valueToTree[JsonNode](map) - Try(jsonNode.sql(kcqlValues.fields, !withStructure)) match { - case Success(j) => (Option(j), primaryKeysPaths.map(PrimaryKeyExtractor.extract(jsonNode, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - case s: String => - Try(JacksonJson.asJson(s)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(kcqlValues.fields, !withStructure)) match { - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - } - - case b: Array[Byte] => - Try(JacksonJson.mapper.readTree(b)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(kcqlValues.fields, !withStructure)) match { - case Failure(e) => raiseException(s"A KCQL exception occurred. 
${e.getMessage}", e) - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - } + case KafkaMessageParts.Value => + PrimaryKeyExtractor.extract(jsonNode, path.tail) + case KafkaMessageParts.Header => + if (path.tail.size != 1) { + throw new IllegalArgumentException( + s"Invalid field selection for '${path.mkString(".")}'. " + + s"Headers lookup only supports single-level keys. Nested header keys are not supported.", + ) + } + headers.lastWithName(path.tail.head) match { + case null => throw new IllegalArgumentException(s"Header with key '${path.tail.head}' not found") + case header => header.value() match { + case value: String => value + case null => throw new IllegalArgumentException(s"Header '${path.tail.head}' has a null value") + case _ => throw new IllegalArgumentException(s"Header '${path.tail.head}' is not a string") } - //we take it as String - case other => raiseException(s"Value:$other is not handled!", null) } - } + case _ => + PrimaryKeyExtractor.extract(jsonNode, path) } - } } diff --git a/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractorTest.scala b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractorTest.scala new file mode 100644 index 0000000000..16ffbe1aea --- /dev/null +++ b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractorTest.scala @@ -0,0 +1,188 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic7 + +import com.fasterxml.jackson.databind.node.TextNode +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.scalatest.EitherValues +import org.scalatest.OptionValues + +import java.nio.ByteBuffer +import java.util.{ HashMap => JHashMap } + +class JsonPayloadExtractorTest extends AnyFunSuite with Matchers with EitherValues with OptionValues { + + test("handle null values") { + val result = JsonPayloadExtractor.extractJsonNode(null, null) + result shouldBe Right(None) + } + + test("handle string JSON with schema") { + val jsonString = """{"name": "test"}""" + val schema = Schema.STRING_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(jsonString, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("primitive string is read as TextNode") { + val invalidJson = "invalid json" + val schema = Schema.STRING_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(invalidJson, schema) + + result.value.value shouldBe a[TextNode] + result.value.value.asText() shouldBe invalidJson + } + + test("handle byte array with schema") { + val jsonString = """{"name": "test"}""" + val bytes = jsonString.getBytes("UTF-8") + val schema = Schema.BYTES_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(bytes, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle ByteBuffer with schema") { + val jsonString = """{"name": "test"}""" + val byteBuffer = ByteBuffer.wrap(jsonString.getBytes("UTF-8")) + val schema = Schema.BYTES_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(byteBuffer, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle Struct with schema") { + val schema = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("name", "John") + .put("age", 30) + + val result = JsonPayloadExtractor.extractJsonNode(struct, schema) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "John" + jsonNode.get("age").asInt() shouldBe 30 + } + + test("handle Map without schema") { + val javaMap = new JHashMap[String, Any]() + javaMap.put("name", "test") + javaMap.put("value", Integer.valueOf(42)) + + val result = JsonPayloadExtractor.extractJsonNode(javaMap, null) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("value").asInt() shouldBe 42 + } + + test("handle nested Map without schema") { + val innerMap = new JHashMap[String, Any]() + innerMap.put("age", Integer.valueOf(25)) + innerMap.put("city", "New York") + + val outerMap = new JHashMap[String, Any]() + outerMap.put("name", "test") + outerMap.put("details", innerMap) + + val result = JsonPayloadExtractor.extractJsonNode(outerMap, null) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("details").get("age").asInt() shouldBe 25 + jsonNode.get("details").get("city").asText() shouldBe "New York" + } + + test("handle string JSON without schema") { + val jsonString = """{"name": "test"}""" + val result = JsonPayloadExtractor.extractJsonNode(jsonString, null) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle byte array without schema") { + val jsonString = 
"""{"name": "test"}""" + val bytes = jsonString.getBytes("UTF-8") + val result = JsonPayloadExtractor.extractJsonNode(bytes, null) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle complex nested JSON") { + val jsonString = """ + { + "name": "test", + "details": { + "age": 30, + "address": { + "street": "123 Main St", + "city": "Test City" + }, + "hobbies": ["reading", "coding"] + } + } + """ + val result = JsonPayloadExtractor.extractJsonNode(jsonString, Schema.STRING_SCHEMA) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("details").get("age").asInt() shouldBe 30 + jsonNode.get("details").get("address").get("street").asText() shouldBe "123 Main St" + jsonNode.get("details").get("hobbies").get(0).asText() shouldBe "reading" + } + + test("handle boolean value type") { + val result = JsonPayloadExtractor.extractJsonNode(true, Schema.BOOLEAN_SCHEMA) + + result.value.value.asBoolean() shouldBe true + } + + test("handle int value type") { + val result = JsonPayloadExtractor.extractJsonNode(42, null) + + result.value.value.asInt() shouldBe 42 + } + + test("handle invalid type for schema") { + val result = JsonPayloadExtractor.extractJsonNode(42, Schema.STRING_SCHEMA) + + result.left.value should include("Expected string but got") + } + + test("handle invalid struct type") { + val schema = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .build() + + val result = JsonPayloadExtractor.extractJsonNode("not a struct", schema) + + result.left.value should include("Expected Struct but got") + } + + test("handle invalid bytes type") { + val result = JsonPayloadExtractor.extractJsonNode("not bytes", Schema.BYTES_SCHEMA) + result.left.value should include("Expected byte array or ByteBuffer") + } +} diff --git a/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractorTest.scala b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractorTest.scala new file mode 100644 index 0000000000..62c18f7fc9 --- /dev/null +++ b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractorTest.scala @@ -0,0 +1,268 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic7 + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.connect.data._ +import scala.jdk.CollectionConverters._ +import org.apache.kafka.connect.errors.ConnectException + +class PrimaryKeyExtractorTest extends AnyFunSuite with Matchers { + def parseJson(jsonString: String): JsonNode = { + val mapper = new ObjectMapper() + mapper.readTree(jsonString) + } + + test("extract should retrieve a primitive value from JsonNode") { + val jsonString = """{"field": "value"}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val result = PrimaryKeyExtractor.extract(jsonNode, path) + result shouldEqual "value" + } + + test("extract should retrieve a nested primitive value from JsonNode") { + val jsonString = """{"parent": {"child": 123}}""" + val jsonNode = parseJson(jsonString) + val path = Vector("parent", "child") + + val result = PrimaryKeyExtractor.extract(jsonNode, path) + result shouldEqual 123 + } + + test("extract should throw exception if path does not exist in JsonNode") { + val jsonString = """{"field": "value"}""" + val jsonNode = parseJson(jsonString) + val path = Vector("nonexistent") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("Can't find nonexistent field") + } + + test("extract should throw exception if path leads to non-primitive field in JsonNode") { + val jsonString = """{"field": {"subfield": "value"}}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("The path is not resolving to a primitive field") + } + + test("extract should throw exception when encountering an array in JsonNode") { + val jsonString = """{"field": [1, 2, 3]}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("The path is involving an array structure") + } + + test("extract should handle various primitive types in JsonNode") { + val jsonString = + """ + |{ + | "stringField": "text", + | "intField": 42, + | "floatField": 3.14, + | "booleanField": true, + | "nullField": null + |} + |""".stripMargin + val jsonNode = parseJson(jsonString) + + PrimaryKeyExtractor.extract(jsonNode, Vector("stringField")) shouldEqual "text" + PrimaryKeyExtractor.extract(jsonNode, Vector("intField")) shouldEqual 42 + PrimaryKeyExtractor.extract(jsonNode, Vector("floatField")) shouldEqual 3.14 + PrimaryKeyExtractor.extract(jsonNode, Vector("booleanField")) shouldEqual true + PrimaryKeyExtractor.extract(jsonNode, Vector("nullField")).asInstanceOf[AnyRef] shouldBe null + } + + test("extract should retrieve a primitive value from Struct") { + val schema = SchemaBuilder.struct() + .field("field", Schema.STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", "value") + + val path = Vector("field") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual "value" + } + + test("extract should retrieve a nested primitive value from Struct") { + val nestedSchema = SchemaBuilder.struct() + .field("child", Schema.INT32_SCHEMA) + .build() + + 
val schema = SchemaBuilder.struct() + .field("parent", nestedSchema) + .build() + + val nestedStruct = new Struct(nestedSchema) + .put("child", 123) + + val struct = new Struct(schema) + .put("parent", nestedStruct) + + val path = Vector("parent", "child") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual 123 + } + + test("extract should throw exception if path does not exist in Struct") { + val schema = SchemaBuilder.struct() + .field("field", Schema.STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", "value") + + val path = Vector("nonexistent") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("Couldn't find field 'nonexistent'") + } + + test("extract should throw exception if field in Struct is null") { + val schema = SchemaBuilder.struct() + .field("field", Schema.OPTIONAL_STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", null) + + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("Field 'field' is null") + } + + test("extract should handle various primitive types in Struct") { + val schema = SchemaBuilder.struct() + .field("stringField", Schema.STRING_SCHEMA) + .field("intField", Schema.INT32_SCHEMA) + .field("floatField", Schema.FLOAT32_SCHEMA) + .field("booleanField", Schema.BOOLEAN_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("stringField", "text") + .put("intField", 42) + .put("floatField", 3.14f) + .put("booleanField", true) + + PrimaryKeyExtractor.extract(struct, Vector("stringField")) shouldEqual "text" + PrimaryKeyExtractor.extract(struct, Vector("intField")) shouldEqual 42 + PrimaryKeyExtractor.extract(struct, Vector("floatField")) shouldEqual 3.14f + PrimaryKeyExtractor.extract(struct, Vector("booleanField")) shouldEqual true + } + + test("extract should handle logical types in Struct (e.g., Date, Time, Timestamp)") { + val dateSchema = Date.SCHEMA + val timeSchema = Time.SCHEMA + val timestampSchema = Timestamp.SCHEMA + + val schema = SchemaBuilder.struct() + .field("dateField", dateSchema) + .field("timeField", timeSchema) + .field("timestampField", timestampSchema) + .build() + + val dateValue = new java.util.Date(1627843200000L) // Aug 1, 2021 + val timeValue = new java.util.Date(3600000L) // 1 hour in milliseconds + val timestampValue = new java.util.Date(1627843200000L) + + val struct = new Struct(schema) + .put("dateField", dateValue) + .put("timeField", timeValue) + .put("timestampField", timestampValue) + + PrimaryKeyExtractor.extract(struct, Vector("dateField")) shouldEqual dateValue + PrimaryKeyExtractor.extract(struct, Vector("timeField")) shouldEqual timeValue + PrimaryKeyExtractor.extract(struct, Vector("timestampField")) shouldEqual timestampValue + } + + test("extract should throw exception when encountering unsupported schema type in Struct") { + val schema = SchemaBuilder.struct() + .field("mapField", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA)) + .build() + + val mapValue = Map("key" -> 123).asJava + + val struct = new Struct(schema) + .put("mapField", mapValue) + + val path = Vector("mapField") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("It doesn't resolve to a primitive field") + } + + test("extract should traverse nested maps 
in Struct") { + val mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build() + + val schema = SchemaBuilder.struct() + .field("mapField", mapSchema) + .build() + + val mapValue = Map("innerKey" -> 123).asJava + + val struct = new Struct(schema) + .put("mapField", mapValue) + + val path = Vector("mapField", "innerKey") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual 123 + } + + test("extract should throw exception when encountering unsupported schema type") { + val schema = SchemaBuilder.struct() + .field("arrayField", SchemaBuilder.array(Schema.INT32_SCHEMA)) + .build() + + val arrayValue = List(1, 2, 3).asJava + + val struct = new Struct(schema) + .put("arrayField", arrayValue) + + val path = Vector("arrayField") + + val exception = intercept[ConnectException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("ARRAY is not a recognized schema") + } +} diff --git a/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPKTest.scala b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPKTest.scala new file mode 100644 index 0000000000..a9994d25d2 --- /dev/null +++ b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPKTest.scala @@ -0,0 +1,601 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic7 + +import io.lenses.sql.Field +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.header.ConnectHeaders + +class TransformAndExtractPKTest extends AnyFunSuite with Matchers { + // Helper method to create KcqlValues + def createKcqlValues(fields: Seq[(String, String, Vector[String])]): KcqlValues = { + val fieldObjects = fields.map { + case (name, alias, parents) => Field(name, alias, parents) + } + KcqlValues( + fields = fieldObjects, + ignoredFields = Seq.empty, + primaryKeysPath = Seq.empty, + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + } + + test("should return None and empty Seq when value is null") { + val result = TransformAndExtractPK.apply( + kcqlValues = createKcqlValues(Seq.empty), + schema = null, + value = null, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + result shouldEqual (None, Seq.empty) + } + + test("should handle valid JSON value and extract primary keys") { + val jsonString = """{"field1": "value1", "field2": 2}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + Field(name = "field2", alias = "field2", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_value", "field1"), + Vector("_key", "keyField"), + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + transformedJson.get("field2").asInt() shouldEqual 2 + + primaryKeys should have size 3 + primaryKeys(0) shouldEqual "value1" + primaryKeys(1) shouldEqual "keyValue" + primaryKeys(2) shouldEqual "headerValue" + } + + test("should throw exception when header is missing") { + + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_header", "missingHeader")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = headers, + ) + } + + exception.getMessage should include("Header with key 'missingHeader' not found") + } + + test("should extract primary key from Struct value") { + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + 
val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val value = struct + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + Field(name = "field2", alias = "field2", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_value", "field1")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + transformedJson.get("field2").asInt() shouldEqual 2 + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "value1" + } + + test("should use the primary key value when the field path is just _key and the key payload is a primitive LONG") { + //key payload is a primitive long + val key = 123L + val keySchema = Schema.INT64_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq.empty, + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_key")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + + val (transformedJsonOpt, primaryKeys) = result + primaryKeys should have size 1 + primaryKeys(0) shouldEqual 123L + } + test("should use the primary key value when the field path is just _key and the key payload is a primitive STRING") { + //key payload is a primitive string + val key = "keyValue" + val keySchema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq.empty, + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_key")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + + val (transformedJsonOpt, primaryKeys) = result + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" + + } + + test("fail when the PK path uses _key.a when the key is only a primitive STRING") { + //key payload is a primitive string + val key = "keyValue" + val keySchema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq.empty, + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_key", "a")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val exception = the[IllegalArgumentException] thrownBy { + TransformAndExtractPK.apply( + kcqlValues =
kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + } + + exception.getMessage should include("Invalid field selection for '_key.a'") + } + + test("should throw exception when primary key path is invalid") { + + val jsonString = """{"field1": {"nestedField": "value1"}}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_value", "field1", "nonexistentField")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + } + + exception.getMessage should include("Can't find nonexistentField field") + } + + test("should return the path when the message _key is involved") { + + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "keyField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" // Extracted from _key.keyField + } + test("return the primary key when the _key path is involved and the path is 2 levels deep") { + + val jsonString = """{"field1": {"nestedField": "value1"}}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "keyField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").get("nestedField").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" + } + + test("returns the primary key from a header entry") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = 
Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "headerValue" + } + + test("fail when primary key path involves a missing header key") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "missingHeader"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage should include("Header with key 'missingHeader' not found") + } + test("fail when the header key has a null value") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", null) + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage should include("Header 'headerKey' has a null value") + } + test("fail when the primary key path uses a nested header key") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey", "nonexistentField"), + ), + 
behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage shouldBe "Invalid field selection for '_header.headerKey.nonexistentField'. Headers lookup only supports single-level keys. Nested header keys are not supported." + } + + // This test exercises a _key path, not a header path + test("fail when primary key path uses a key path which does not exist") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "nonexistentField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage shouldBe "Invalid field selection for '_key.nonexistentField'. Can't find nonexistentField field. Field found are:keyField" + } +}