diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriter.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriter.scala index 5aab0f914..3ea9398bf 100644 --- a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriter.scala +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriter.scala @@ -145,11 +145,13 @@ class ElasticJsonWriter(client: KElasticClient, settings: ElasticSettings) ) } else { TransformAndExtractPK( - kcqlValue.fields, - kcqlValue.primaryKeysPath, + kcqlValue, r.valueSchema(), r.value(), kcql.hasRetainStructure, + r.keySchema(), + r.key(), + r.headers(), ) } val idFromPk = pks.mkString(settings.pkJoinerSeparator) @@ -211,15 +213,13 @@ class ElasticJsonWriter(client: KElasticClient, settings: ElasticSettings) pks.mkString(settings.pkJoinerSeparator) } - private case class KcqlValues( - fields: Seq[Field], - ignoredFields: Seq[Field], - primaryKeysPath: Seq[Vector[String]], - behaviorOnNullValues: NullValueBehavior, - ) - } - +case class KcqlValues( + fields: Seq[Field], + ignoredFields: Seq[Field], + primaryKeysPath: Seq[Vector[String]], + behaviorOnNullValues: NullValueBehavior, +) case object IndexableJsonNode extends Indexable[JsonNode] { override def json(t: JsonNode): String = t.toString } diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractor.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractor.scala new file mode 100644 index 000000000..a13115283 --- /dev/null +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractor.scala @@ -0,0 +1,215 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic6 + +import cats.implicits.catsSyntaxEitherId +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.BigIntegerNode +import com.fasterxml.jackson.databind.node.BooleanNode +import com.fasterxml.jackson.databind.node.DecimalNode +import com.fasterxml.jackson.databind.node.DoubleNode +import com.fasterxml.jackson.databind.node.FloatNode +import com.fasterxml.jackson.databind.node.IntNode +import com.fasterxml.jackson.databind.node.LongNode +import com.fasterxml.jackson.databind.node.TextNode +import io.lenses.json.sql.JacksonJson +import io.lenses.streamreactor.connect.json.SimpleJsonConverter +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.Struct + +import java.nio.ByteBuffer +import scala.jdk.CollectionConverters.CollectionHasAsScala +import scala.jdk.CollectionConverters.MapHasAsScala +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +object JsonPayloadExtractor { + lazy val simpleJsonConverter = new SimpleJsonConverter() + + def extractJsonNode(value: Any, schema: Schema): Either[String, Option[JsonNode]] = + (Option(value), Option(schema).map(_.`type`())) match { + case (None, _) => Right(None) + case (Some(_), Some(Schema.Type.BYTES)) => handleBytes(value) + case (Some(_), Some(Schema.Type.STRING)) => handleString(value) + case (Some(_), Some(Schema.Type.INT8)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT16)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT32)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT64)) => handleLong(value) + case (Some(_), Some(Schema.Type.FLOAT32)) => handleFloat(value) + case (Some(_), Some(Schema.Type.FLOAT64)) => handleDouble(value) + case (Some(_), Some(Schema.Type.STRUCT)) => handleStruct(value) + case (Some(_), Some(Schema.Type.BOOLEAN)) => handleBoolean(value) + case (Some(_), Some(Schema.Type.ARRAY)) => handleArray(value) + case (Some(_), Some(Schema.Type.MAP)) => handleMap(value) + case (Some(_), Some(other)) => Left(s"Unsupported Schema type: $other") + case (Some(v), None) => handleSchemaLess(v) + } + + private def handleArray(value: Any): Either[String, Option[JsonNode]] = + value match { + case l: Iterable[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + l.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + + case jc: java.util.Collection[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + jc.asScala.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + case a: Array[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + a.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + case other => Left(s"Expected array but got: $other") + } + + private def handleMap(value: Any): Either[String, Option[JsonNode]] = + value match { + case m: java.util.Map[_, _] => + val map = m.asInstanceOf[java.util.Map[String, Any]] + val mapNode = JacksonJson.mapper.createObjectNode() + map.asScala.foreach { + case (key, value) => + extractJsonNode(value, null) match { + case Right(Some(node)) => 
mapNode.set(key, node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(mapNode)) + case other => Left(s"Expected map but got: $other") + } + private def handleBoolean(value: Any): Either[String, Option[JsonNode]] = + value match { + case b: Boolean => Some(BooleanNode.valueOf(b)).asRight[String] + case other => Left(s"Expected boolean but got: $other") + } + private def handleDouble(value: Any): Either[String, Option[JsonNode]] = + value match { + case f: Float => Some(DoubleNode.valueOf(f.toDouble)).asRight[String] + case d: Double => Some(DoubleNode.valueOf(d)).asRight[String] + case other => Left(s"Expected double but got: $other") + } + + private def handleFloat(value: Any): Either[String, Option[JsonNode]] = + value match { + case f: Float => Some(FloatNode.valueOf(f)).asRight[String] + case d: Double => Some(FloatNode.valueOf(d.toFloat)).asRight[String] + case other => Left(s"Expected float but got: $other") + } + + private def handleLong(value: Any): Either[String, Option[JsonNode]] = + value match { + case b: Byte => Some(LongNode.valueOf(b.toLong)).asRight[String] + case s: Short => Some(LongNode.valueOf(s.toLong)).asRight[String] + case i: Int => Some(LongNode.valueOf(i.toLong)).asRight[String] + case l: Long => Some(LongNode.valueOf(l)).asRight[String] + case other => Left(s"Expected long but got: $other") + } + + private def handleBytes(value: Any): Either[String, Option[JsonNode]] = + value match { + case bytes: Array[Byte] => + tryReadJson(bytes).map(Some(_)) + case byteBuffer: ByteBuffer => + val bytes = new Array[Byte](byteBuffer.remaining()) + byteBuffer.get(bytes) + tryReadJson(bytes).map(Some(_)) + case other => Left(s"Expected byte array or ByteBuffer but got: $other") + } + + private def handleString(value: Any): Either[String, Option[JsonNode]] = + value match { + case s: String => + tryParseJson(s).map(Some(_)) match { + case Left(_) => TextNode.valueOf(s).asRight[String].map(Some(_)) + case r => r + } + case other => Left(s"Expected string but got: $other") + } + + private def handleStruct(value: Any): Either[String, Option[JsonNode]] = + value match { + case struct: Struct => + Try(simpleJsonConverter.fromConnectData(struct.schema(), struct)) match { + case Success(jsonNode) => Right(Some(jsonNode)) + case Failure(e) => Left(s"Failed to convert Struct to JsonNode: ${e.getMessage}") + } + case other => Left(s"Expected Struct but got: $other") + } + + private def handleSchemaLess(value: Any): Either[String, Option[JsonNode]] = + value match { + case m: java.util.Map[_, _] => + Try { + val map = m.asInstanceOf[java.util.Map[String, Any]] + JacksonJson.mapper.valueToTree[JsonNode](map) + } match { + case Success(node) => Right(Some(node)) + case Failure(e) => Left(s"Failed to convert Map to JsonNode: ${e.getMessage}") + } + + case s: String => + tryParseJson(s).map(Some(_)) match { + case Left(_) => TextNode.valueOf(s).asRight[String].map(Some(_)) + case r => r + } + case b: Array[Byte] => tryReadJson(b).map(Some(_)) + case b: Byte => IntNode.valueOf(b.toInt).asRight[String].map(Some(_)) + case s: Short => IntNode.valueOf(s.toInt).asRight[String].map(Some(_)) + case i: Int => IntNode.valueOf(i).asRight[String].map(Some(_)) + case l: Long => LongNode.valueOf(l).asRight[String].map(Some(_)) + case f: Float => FloatNode.valueOf(f).asRight[String].map(Some(_)) + case double: Double => DoubleNode.valueOf(double).asRight[String].map(Some(_)) + case bigDecimal: BigDecimal => 
DecimalNode.valueOf(bigDecimal.bigDecimal).asRight[String].map(Some(_)) + case bigDecimal: java.math.BigDecimal => DecimalNode.valueOf(bigDecimal).asRight[String].map(Some(_)) + case boolean: Boolean => BooleanNode.valueOf(boolean).asRight[String].map(Some(_)) + case bi: BigInt => BigIntegerNode.valueOf(bi.bigInteger).asRight[String].map(Some(_)) + case bi: java.math.BigInteger => BigIntegerNode.valueOf(bi).asRight[String].map(Some(_)) + case other => Left(s"Unsupported value type: ${other.getClass.getName}") + } + + private def tryParseJson(str: String): Either[String, JsonNode] = + Try(JacksonJson.asJson(str)) match { + case Success(json) => Right(json) + case Failure(e) => Left(s"Invalid JSON string: ${e.getMessage}") + } + + private def tryReadJson(bytes: Array[Byte]): Either[String, JsonNode] = + Try(JacksonJson.mapper.readTree(bytes)) match { + case Success(json) => Right(json) + case Failure(e) => Left(s"Invalid JSON bytes: ${e.getMessage}") + } +} diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KafkaMessageParts.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KafkaMessageParts.scala new file mode 100644 index 000000000..9fe64ddfc --- /dev/null +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KafkaMessageParts.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.elastic6 + +object KafkaMessageParts { + val Key = "_key" + val Value = "_value" + val Header = "_header" + val Topic = "_topic" +} diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractor.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractor.scala index 34e45a4ff..471d745f5 100644 --- a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractor.scala +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractor.scala @@ -25,13 +25,13 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.jdk.CollectionConverters.ListHasAsScala object PrimaryKeyExtractor { - def extract(node: JsonNode, path: Vector[String]): Any = { + def extract(node: JsonNode, path: Vector[String], prefix: String = ""): Any = { @tailrec def innerExtract(n: JsonNode, p: Vector[String]): Any = { def checkValidPath(): Unit = if (p.nonEmpty) { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. It doesn't resolve to a primitive field", + s"Invalid field selection for '$prefix${path.mkString(".")}'. It doesn't resolve to a primitive field", ) } @@ -79,24 +79,24 @@ object PrimaryKeyExtractor { case node: ObjectNode => if (p.isEmpty) { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. 
The path is not resolving to a primitive field", + s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is not resolving to a primitive field", ) } val childNode = Option(node.get(p.head)).getOrElse { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. Can't find ${p.head} field. Field found are:${node.fieldNames().asScala.mkString(",")}", + s"Invalid field selection for '$prefix${path.mkString(".")}'. Can't find ${p.head} field. Field found are:${node.fieldNames().asScala.mkString(",")}", ) } innerExtract(childNode, p.tail) case _: ArrayNode => throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. The path is involving an array structure", + s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is involving an array structure", ) case other => throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. $other is not handled", + s"Invalid field selection for '$prefix${path.mkString(".")}'. $other is not handled", ) } } diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPK.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPK.scala index 354d59a00..8e4f0d6e1 100644 --- a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPK.scala +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPK.scala @@ -15,17 +15,12 @@ */ package io.lenses.streamreactor.connect.elastic6 -import java.nio.ByteBuffer - -import io.lenses.streamreactor.connect.json.SimpleJsonConverter import com.fasterxml.jackson.databind.JsonNode -import io.lenses.connect.sql.StructSql._ -import io.lenses.json.sql.JacksonJson -import io.lenses.json.sql.JsonSql._ -import io.lenses.sql.Field import com.typesafe.scalalogging.StrictLogging +import io.lenses.json.sql.JsonSql._ +import io.lenses.streamreactor.connect.json.SimpleJsonConverter import org.apache.kafka.connect.data.Schema -import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.header.Headers import scala.util.Failure import scala.util.Success @@ -35,94 +30,88 @@ private object TransformAndExtractPK extends StrictLogging { lazy val simpleJsonConverter = new SimpleJsonConverter() def apply( - fields: Seq[Field], - primaryKeysPaths: Seq[Vector[String]], - schema: Schema, - value: Any, - withStructure: Boolean, - ): (Option[JsonNode], Seq[Any]) = { - def raiseException(msg: String, t: Throwable) = throw new IllegalArgumentException(msg, t) - + kcqlValues: KcqlValues, + schema: Schema, + value: Any, + withStructure: Boolean, + keySchema: Schema, + key: Any, + headers: Headers, + ): (Option[JsonNode], Seq[Any]) = if (value == null) { (None, Seq.empty) } else { - if (schema != null) { - schema.`type`() match { - case Schema.Type.BYTES => - //we expected to be json - val array = value match { - case a: Array[Byte] => a - case b: ByteBuffer => b.array() - case other => raiseException(s"Invalid payload:$other for schema Schema.BYTES.", null) - } + val result = for { + jsonNode <- extractJsonNode(value, schema) + transformedJson = jsonNode.sql(kcqlValues.fields, !withStructure) + keyJsonNodeOpt: Option[JsonNode] <- if (hasKeyFieldPath(kcqlValues.primaryKeysPath)) + extractOptionalJsonNode(key, keySchema) + else Try(Option.empty[JsonNode]) + } yield { + val primaryKeys = kcqlValues.primaryKeysPath.map { path => + 
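// dispatch on the path's root segment: _key, _value, _header, or, with no recognized prefix, the value payload +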
extractPrimaryKey(path, jsonNode, keyJsonNodeOpt, headers) + } + (Option(transformedJson), primaryKeys) + } - Try(JacksonJson.mapper.readTree(array)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(fields, !withStructure)) match { - case Failure(e) => raiseException(s"A KCQL exception occurred. ${e.getMessage}", e) - case Success(jn) => - (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - } - } + result match { + case Success(value) => value + case Failure(e) => throw e + } + } - case Schema.Type.STRING => - //we expected to be json - Try(JacksonJson.asJson(value.asInstanceOf[String])) match { - case Failure(e) => raiseException("Invalid json", e) - case Success(json) => - Try(json.sql(fields, !withStructure)) match { - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - } + private def hasKeyFieldPath(paths: Seq[Vector[String]]): Boolean = + paths.exists(_.head == KafkaMessageParts.Key) - case Schema.Type.STRUCT => - val struct = value.asInstanceOf[Struct] - Try(struct.sql(fields, !withStructure)) match { - case Success(s) => - (Option(simpleJsonConverter.fromConnectData(s.schema(), s)), - primaryKeysPaths.map(PrimaryKeyExtractor.extract(struct, _)), - ) + private def extractJsonNode(value: Any, schema: Schema): Try[JsonNode] = + JsonPayloadExtractor.extractJsonNode(value, schema) match { + case Left(error) => Failure(new IllegalArgumentException(error)) + case Right(Some(node)) => Success(node) + case Right(None) => Failure(new IllegalArgumentException("Failed to extract JsonNode from value")) + } - case Failure(e) => raiseException(s"A KCQL error occurred.${e.getMessage}", e) - } + private def extractOptionalJsonNode(value: Any, schema: Schema): Try[Option[JsonNode]] = + if (value == null) Success(None) + else { + JsonPayloadExtractor.extractJsonNode(value, schema) match { + case Left(error) => Failure(new IllegalArgumentException(error)) + case Right(nodeOpt) => Success(nodeOpt) + } + } - case other => raiseException(s"Can't transform Schema type:$other.", null) + private def extractPrimaryKey( + path: Vector[String], + jsonNode: JsonNode, + keyJsonNodeOpt: Option[JsonNode], + headers: Headers, + ): Any = + path.head match { + case KafkaMessageParts.Key => + keyJsonNodeOpt match { + case Some(keyNode) => PrimaryKeyExtractor.extract(keyNode, path.tail, "_key.") + case None => + throw new IllegalArgumentException( + s"Key path '${path.mkString(".")}' has a null value", + ) } - } else { - //we can handle java.util.Map (this is what JsonConverter can spit out) - value match { - case m: java.util.Map[_, _] => - val map = m.asInstanceOf[java.util.Map[String, Any]] - val jsonNode: JsonNode = JacksonJson.mapper.valueToTree[JsonNode](map) - Try(jsonNode.sql(fields, !withStructure)) match { - case Success(j) => (Option(j), primaryKeysPaths.map(PrimaryKeyExtractor.extract(jsonNode, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - case s: String => - Try(JacksonJson.asJson(s)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(fields, !withStructure)) match { - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - } - - case b: Array[Byte] => - 
Try(JacksonJson.mapper.readTree(b)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(fields, !withStructure)) match { - case Failure(e) => raiseException(s"A KCQL exception occurred. ${e.getMessage}", e) - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - } + case KafkaMessageParts.Value => + PrimaryKeyExtractor.extract(jsonNode, path.tail) + case KafkaMessageParts.Header => + if (path.tail.size != 1) { + throw new IllegalArgumentException( + s"Invalid field selection for '${path.mkString(".")}'. " + + s"Headers lookup only supports single-level keys. Nested header keys are not supported.", + ) + } + headers.lastWithName(path.tail.head) match { + case null => throw new IllegalArgumentException(s"Header with key '${path.tail.head}' not found") + case header => header.value() match { + case value: String => value + case null => throw new IllegalArgumentException(s"Header '${path.tail.head}' has a null value") + case _ => throw new IllegalArgumentException(s"Header '${path.tail.head}' is not a string") } - //we take it as String - case other => raiseException(s"Value:$other is not handled!", null) } - } + case _ => + PrimaryKeyExtractor.extract(jsonNode, path) } - } } diff --git a/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractorTest.scala b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractorTest.scala new file mode 100644 index 000000000..ce799d373 --- /dev/null +++ b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/JsonPayloadExtractorTest.scala @@ -0,0 +1,188 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic6 + +import com.fasterxml.jackson.databind.node.TextNode +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.scalatest.EitherValues +import org.scalatest.OptionValues +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.nio.ByteBuffer +import java.util.{ HashMap => JHashMap } + +class JsonPayloadExtractorTest extends AnyFunSuite with Matchers with EitherValues with OptionValues { + + test("handle null values") { + val result = JsonPayloadExtractor.extractJsonNode(null, null) + result shouldBe Right(None) + } + + test("handle string JSON with schema") { + val jsonString = """{"name": "test"}""" + val schema = Schema.STRING_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(jsonString, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("primitive string is read as TextNode") { + val invalidJson = "invalid json" + val schema = Schema.STRING_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(invalidJson, schema) + + result.value.value shouldBe a[TextNode] + result.value.value.asText() shouldBe invalidJson + } + + test("handle byte array with schema") { + val jsonString = """{"name": "test"}""" + val bytes = jsonString.getBytes("UTF-8") + val schema = Schema.BYTES_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(bytes, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle ByteBuffer with schema") { + val jsonString = """{"name": "test"}""" + val byteBuffer = ByteBuffer.wrap(jsonString.getBytes("UTF-8")) + val schema = Schema.BYTES_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(byteBuffer, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle Struct with schema") { + val schema = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("name", "John") + .put("age", 30) + + val result = JsonPayloadExtractor.extractJsonNode(struct, schema) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "John" + jsonNode.get("age").asInt() shouldBe 30 + } + + test("handle Map without schema") { + val javaMap = new JHashMap[String, Any]() + javaMap.put("name", "test") + javaMap.put("value", Integer.valueOf(42)) + + val result = JsonPayloadExtractor.extractJsonNode(javaMap, null) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("value").asInt() shouldBe 42 + } + + test("handle nested Map without schema") { + val innerMap = new JHashMap[String, Any]() + innerMap.put("age", Integer.valueOf(25)) + innerMap.put("city", "New York") + + val outerMap = new JHashMap[String, Any]() + outerMap.put("name", "test") + outerMap.put("details", innerMap) + + val result = JsonPayloadExtractor.extractJsonNode(outerMap, null) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("details").get("age").asInt() shouldBe 25 + jsonNode.get("details").get("city").asText() shouldBe "New York" + } + + test("handle string JSON without schema") { + val jsonString = """{"name": "test"}""" + val result = JsonPayloadExtractor.extractJsonNode(jsonString, null) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle byte array without schema") { + val jsonString = 
"""{"name": "test"}""" + val bytes = jsonString.getBytes("UTF-8") + val result = JsonPayloadExtractor.extractJsonNode(bytes, null) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle complex nested JSON") { + val jsonString = """ + { + "name": "test", + "details": { + "age": 30, + "address": { + "street": "123 Main St", + "city": "Test City" + }, + "hobbies": ["reading", "coding"] + } + } + """ + val result = JsonPayloadExtractor.extractJsonNode(jsonString, Schema.STRING_SCHEMA) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("details").get("age").asInt() shouldBe 30 + jsonNode.get("details").get("address").get("street").asText() shouldBe "123 Main St" + jsonNode.get("details").get("hobbies").get(0).asText() shouldBe "reading" + } + + test("handle boolean value type") { + val result = JsonPayloadExtractor.extractJsonNode(true, Schema.BOOLEAN_SCHEMA) + + result.value.value.asBoolean() shouldBe true + } + + test("handle int value type") { + val result = JsonPayloadExtractor.extractJsonNode(42, null) + + result.value.value.asInt() shouldBe 42 + } + + test("handle invalid type for schema") { + val result = JsonPayloadExtractor.extractJsonNode(42, Schema.STRING_SCHEMA) + + result.left.value should include("Expected string but got") + } + + test("handle invalid struct type") { + val schema = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .build() + + val result = JsonPayloadExtractor.extractJsonNode("not a struct", schema) + + result.left.value should include("Expected Struct but got") + } + + test("handle invalid bytes type") { + val result = JsonPayloadExtractor.extractJsonNode("not bytes", Schema.BYTES_SCHEMA) + result.left.value should include("Expected byte array or ByteBuffer") + } +} diff --git a/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractorTest.scala b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractorTest.scala new file mode 100644 index 000000000..917d66dac --- /dev/null +++ b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/PrimaryKeyExtractorTest.scala @@ -0,0 +1,269 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic6 + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.connect.data._ + +import scala.jdk.CollectionConverters._ +import org.apache.kafka.connect.errors.ConnectException + +class PrimaryKeyExtractorTest extends AnyFunSuite with Matchers { + def parseJson(jsonString: String): JsonNode = { + val mapper = new ObjectMapper() + mapper.readTree(jsonString) + } + + test("extract should retrieve a primitive value from JsonNode") { + val jsonString = """{"field": "value"}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val result = PrimaryKeyExtractor.extract(jsonNode, path) + result shouldEqual "value" + } + + test("extract should retrieve a nested primitive value from JsonNode") { + val jsonString = """{"parent": {"child": 123}}""" + val jsonNode = parseJson(jsonString) + val path = Vector("parent", "child") + + val result = PrimaryKeyExtractor.extract(jsonNode, path) + result shouldEqual 123 + } + + test("extract should throw exception if path does not exist in JsonNode") { + val jsonString = """{"field": "value"}""" + val jsonNode = parseJson(jsonString) + val path = Vector("nonexistent") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("Can't find nonexistent field") + } + + test("extract should throw exception if path leads to non-primitive field in JsonNode") { + val jsonString = """{"field": {"subfield": "value"}}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("The path is not resolving to a primitive field") + } + + test("extract should throw exception when encountering an array in JsonNode") { + val jsonString = """{"field": [1, 2, 3]}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("The path is involving an array structure") + } + + test("extract should handle various primitive types in JsonNode") { + val jsonString = + """ + |{ + | "stringField": "text", + | "intField": 42, + | "floatField": 3.14, + | "booleanField": true, + | "nullField": null + |} + |""".stripMargin + val jsonNode = parseJson(jsonString) + + PrimaryKeyExtractor.extract(jsonNode, Vector("stringField")) shouldEqual "text" + PrimaryKeyExtractor.extract(jsonNode, Vector("intField")) shouldEqual 42 + PrimaryKeyExtractor.extract(jsonNode, Vector("floatField")) shouldEqual 3.14 + PrimaryKeyExtractor.extract(jsonNode, Vector("booleanField")) shouldEqual true + PrimaryKeyExtractor.extract(jsonNode, Vector("nullField")).asInstanceOf[AnyRef] shouldBe null + } + + test("extract should retrieve a primitive value from Struct") { + val schema = SchemaBuilder.struct() + .field("field", Schema.STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", "value") + + val path = Vector("field") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual "value" + } + + test("extract should retrieve a nested primitive value from Struct") { + val nestedSchema = SchemaBuilder.struct() + .field("child", Schema.INT32_SCHEMA) + .build() + 
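+ // The outer schema nests the child struct under "parent", so the PK path + // Vector("parent", "child") must walk one level of Struct nesting.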
+ val schema = SchemaBuilder.struct() + .field("parent", nestedSchema) + .build() + + val nestedStruct = new Struct(nestedSchema) + .put("child", 123) + + val struct = new Struct(schema) + .put("parent", nestedStruct) + + val path = Vector("parent", "child") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual 123 + } + + test("extract should throw exception if path does not exist in Struct") { + val schema = SchemaBuilder.struct() + .field("field", Schema.STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", "value") + + val path = Vector("nonexistent") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("Couldn't find field 'nonexistent'") + } + + test("extract should throw exception if field in Struct is null") { + val schema = SchemaBuilder.struct() + .field("field", Schema.OPTIONAL_STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", null) + + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("Field 'field' is null") + } + + test("extract should handle various primitive types in Struct") { + val schema = SchemaBuilder.struct() + .field("stringField", Schema.STRING_SCHEMA) + .field("intField", Schema.INT32_SCHEMA) + .field("floatField", Schema.FLOAT32_SCHEMA) + .field("booleanField", Schema.BOOLEAN_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("stringField", "text") + .put("intField", 42) + .put("floatField", 3.14f) + .put("booleanField", true) + + PrimaryKeyExtractor.extract(struct, Vector("stringField")) shouldEqual "text" + PrimaryKeyExtractor.extract(struct, Vector("intField")) shouldEqual 42 + PrimaryKeyExtractor.extract(struct, Vector("floatField")) shouldEqual 3.14f + PrimaryKeyExtractor.extract(struct, Vector("booleanField")) shouldEqual true + } + + test("extract should handle logical types in Struct (e.g., Date, Time, Timestamp)") { + val dateSchema = Date.SCHEMA + val timeSchema = Time.SCHEMA + val timestampSchema = Timestamp.SCHEMA + + val schema = SchemaBuilder.struct() + .field("dateField", dateSchema) + .field("timeField", timeSchema) + .field("timestampField", timestampSchema) + .build() + + val dateValue = new java.util.Date(1627843200000L) // Aug 1, 2021 + val timeValue = new java.util.Date(3600000L) // 1 hour in milliseconds + val timestampValue = new java.util.Date(1627843200000L) + + val struct = new Struct(schema) + .put("dateField", dateValue) + .put("timeField", timeValue) + .put("timestampField", timestampValue) + + PrimaryKeyExtractor.extract(struct, Vector("dateField")) shouldEqual dateValue + PrimaryKeyExtractor.extract(struct, Vector("timeField")) shouldEqual timeValue + PrimaryKeyExtractor.extract(struct, Vector("timestampField")) shouldEqual timestampValue + } + + test("extract should throw exception when encountering unsupported schema type in Struct") { + val schema = SchemaBuilder.struct() + .field("mapField", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA)) + .build() + + val mapValue = Map("key" -> 123).asJava + + val struct = new Struct(schema) + .put("mapField", mapValue) + + val path = Vector("mapField") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("It doesn't resolve to a primitive field") + } + + test("extract should traverse nested 
maps in Struct") { + val mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build() + + val schema = SchemaBuilder.struct() + .field("mapField", mapSchema) + .build() + + val mapValue = Map("innerKey" -> 123).asJava + + val struct = new Struct(schema) + .put("mapField", mapValue) + + val path = Vector("mapField", "innerKey") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual 123 + } + + test("extract should throw exception when encountering unsupported schema type") { + val schema = SchemaBuilder.struct() + .field("arrayField", SchemaBuilder.array(Schema.INT32_SCHEMA)) + .build() + + val arrayValue = List(1, 2, 3).asJava + + val struct = new Struct(schema) + .put("arrayField", arrayValue) + + val path = Vector("arrayField") + + val exception = intercept[ConnectException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("ARRAY is not a recognized schema") + } +} diff --git a/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPKTest.scala b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPKTest.scala new file mode 100644 index 000000000..1636e5243 --- /dev/null +++ b/kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/TransformAndExtractPKTest.scala @@ -0,0 +1,601 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic6 + +import io.lenses.sql.Field +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.header.ConnectHeaders +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +class TransformAndExtractPKTest extends AnyFunSuite with Matchers { + // Helper method to create KcqlValues + def createKcqlValues(fields: Seq[(String, String, Vector[String])]): KcqlValues = { + val fieldObjects = fields.map { + case (name, alias, parents) => Field(name, alias, parents) + } + KcqlValues( + fields = fieldObjects, + ignoredFields = Seq.empty, + primaryKeysPath = Seq.empty, + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + } + + test("should return None and empty Seq when value is null") { + val result = TransformAndExtractPK.apply( + kcqlValues = createKcqlValues(Seq.empty), + schema = null, + value = null, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + result shouldEqual (None, Seq.empty) + } + + test("should handle valid JSON value and extract primary keys") { + val jsonString = """{"field1": "value1", "field2": 2}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + Field(name = "field2", alias = "field2", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_value", "field1"), + Vector("_key", "keyField"), + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + transformedJson.get("field2").asInt() shouldEqual 2 + + primaryKeys should have size 3 + primaryKeys(0) shouldEqual "value1" + primaryKeys(1) shouldEqual "keyValue" + primaryKeys(2) shouldEqual "headerValue" + } + + test("should throw exception when header is missing") { + + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_header", "missingHeader")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = headers, + ) + } + + exception.getMessage should include("Header with key 'missingHeader' not found") + } + + test("should extract primary key from Struct value") { + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + 
val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val value = struct + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + Field(name = "field2", alias = "field2", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_value", "field1")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + transformedJson.get("field2").asInt() shouldEqual 2 + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "value1" + } + + test("should use the primary key value when the field path is just _key and the key payload is a primitive LONG") { + //key payload is a primitive long + val key = 123L + val keySchema = Schema.INT64_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq.empty, + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_key")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + + val (_, primaryKeys) = result + primaryKeys should have size 1 + primaryKeys(0) shouldEqual 123L + } + + test("should use the primary key value when the field path is just _key and the key payload is a primitive STRING") { + //key payload is a primitive string + val key = "keyValue" + val keySchema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq.empty, + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_key")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + + val (_, primaryKeys) = result + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" + } + + test("fail when the PK path uses _key.a when the key is only a primitive STRING") { + //key payload is a primitive string + val key = "keyValue" + val keySchema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq.empty, + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_key", "a")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val exception = the[IllegalArgumentException] thrownBy { + TransformAndExtractPK.apply( + kcqlValues =
kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + } + + exception.getMessage should include("Invalid field selection for '_key.a'") + } + + test("should throw exception when primary key path is invalid") { + + val jsonString = """{"field1": {"nestedField": "value1"}}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_value", "field1", "nonexistentField")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + } + + exception.getMessage should include("Can't find nonexistentField field") + } + + test("should extract the primary key when the path references the message _key") { + + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "keyField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" // Extracted from _key.keyField + } + + test("return the primary key from _key when the value payload is nested two levels deep") { + + val jsonString = """{"field1": {"nestedField": "value1"}}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "keyField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").get("nestedField").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" + } + + test("returns the primary key from a header entry") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema =
Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "headerValue" + } + + test("fail when primary key path involves a missing header key") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "missingHeader"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage should include("Header with key 'missingHeader' not found") + } + test("fail when the header key has a null value") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", null) + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage should include("Header 'headerKey' has a null value") + } + test("fail when the primary key path uses a nested header key") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey", "nonexistentField"), + ), + 
behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage shouldBe "Invalid field selection for '_header.headerKey.nonexistentField'. Headers lookup only supports single-level keys. Nested header keys are not supported." + } + + // unlike the header tests above, this exercises a missing field under the _key path + test("fail when primary key path uses a key path which does not exist") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "nonexistentField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage shouldBe "Invalid field selection for '_key.nonexistentField'. Can't find nonexistentField field. Field found are:keyField" + } +} diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/ElasticJsonWriter.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/ElasticJsonWriter.scala index 37e0373c3..7841c8dcf 100644 --- a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/ElasticJsonWriter.scala +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/ElasticJsonWriter.scala @@ -140,10 +140,12 @@ class ElasticJsonWriter(client: KElasticClient, settings: ElasticSettings) (Transform(kcqlValue.fields, r.valueSchema(), r.value(), kcql.hasRetainStructure), Seq.empty) } else { TransformAndExtractPK(kcqlValue, - kcqlValue.primaryKeysPath, r.valueSchema(), r.value(), kcql.hasRetainStructure, + r.keySchema(), + r.key(), + r.headers(), ) } val idFromPk = pks.mkString(settings.pkJoinerSeparator) diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractor.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractor.scala new file mode 100644 index 000000000..3195cf4c4 --- /dev/null +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractor.scala @@ -0,0 +1,208 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.lenses.streamreactor.connect.elastic7 + +import cats.implicits.catsSyntaxEitherId +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node._ +import io.lenses.json.sql.JacksonJson +import io.lenses.streamreactor.connect.json.SimpleJsonConverter +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.data.Schema + +import java.nio.ByteBuffer +import scala.jdk.CollectionConverters.MapHasAsScala +import scala.jdk.CollectionConverters.CollectionHasAsScala +import scala.util.Try +import scala.util.Success +import scala.util.Failure + +object JsonPayloadExtractor { + lazy val simpleJsonConverter = new SimpleJsonConverter() + + def extractJsonNode(value: Any, schema: Schema): Either[String, Option[JsonNode]] = + (Option(value), Option(schema).map(_.`type`())) match { + case (None, _) => Right(None) + case (Some(_), Some(Schema.Type.BYTES)) => handleBytes(value) + case (Some(_), Some(Schema.Type.STRING)) => handleString(value) + case (Some(_), Some(Schema.Type.INT8)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT16)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT32)) => handleLong(value) + case (Some(_), Some(Schema.Type.INT64)) => handleLong(value) + case (Some(_), Some(Schema.Type.FLOAT32)) => handleFloat(value) + case (Some(_), Some(Schema.Type.FLOAT64)) => handleDouble(value) + case (Some(_), Some(Schema.Type.STRUCT)) => handleStruct(value) + case (Some(_), Some(Schema.Type.BOOLEAN)) => handleBoolean(value) + case (Some(_), Some(Schema.Type.ARRAY)) => handleArray(value) + case (Some(_), Some(Schema.Type.MAP)) => handleMap(value) + case (Some(_), Some(other)) => Left(s"Unsupported Schema type: $other") + case (Some(v), None) => handleSchemaLess(v) + } + + private def handleArray(value: Any): Either[String, Option[JsonNode]] = + value match { + case l: Iterable[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + l.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + + case jc: java.util.Collection[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + jc.asScala.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + case a: Array[_] => + val arrayNode = JacksonJson.mapper.createArrayNode() + a.foreach { item => + extractJsonNode(item, null) match { + case Right(Some(node)) => arrayNode.add(node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(arrayNode)) + case other => Left(s"Expected array but got: $other") + } + + private def handleMap(value: Any): Either[String, Option[JsonNode]] = + value match { + case m: java.util.Map[_, _] => + val map = m.asInstanceOf[java.util.Map[String, Any]] + val mapNode = JacksonJson.mapper.createObjectNode() + map.asScala.foreach { + case (key, value) => + extractJsonNode(value, null) match { + case Right(Some(node)) => mapNode.set(key, node) + case Right(None) => // ignore + case Left(err) => return Left(err) + } + } + Right(Some(mapNode)) + case other => Left(s"Expected map but got: $other") + } + private def handleBoolean(value: Any): Either[String, Option[JsonNode]] = + value match { + case b: Boolean => Some(BooleanNode.valueOf(b)).asRight[String] + case other => Left(s"Expected boolean but got: 
$other") + } + private def handleDouble(value: Any): Either[String, Option[JsonNode]] = + value match { + case f: Float => Some(DoubleNode.valueOf(f.toDouble)).asRight[String] + case d: Double => Some(DoubleNode.valueOf(d)).asRight[String] + case other => Left(s"Expected double but got: $other") + } + + private def handleFloat(value: Any): Either[String, Option[JsonNode]] = + value match { + case f: Float => Some(FloatNode.valueOf(f)).asRight[String] + case d: Double => Some(FloatNode.valueOf(d.toFloat)).asRight[String] + case other => Left(s"Expected float but got: $other") + } + + private def handleLong(value: Any): Either[String, Option[JsonNode]] = + value match { + case b: Byte => Some(LongNode.valueOf(b.toLong)).asRight[String] + case s: Short => Some(LongNode.valueOf(s.toLong)).asRight[String] + case i: Int => Some(LongNode.valueOf(i.toLong)).asRight[String] + case l: Long => Some(LongNode.valueOf(l)).asRight[String] + case other => Left(s"Expected long but got: $other") + } + + private def handleBytes(value: Any): Either[String, Option[JsonNode]] = + value match { + case bytes: Array[Byte] => + tryReadJson(bytes).map(Some(_)) + case byteBuffer: ByteBuffer => + val bytes = new Array[Byte](byteBuffer.remaining()) + byteBuffer.get(bytes) + tryReadJson(bytes).map(Some(_)) + case other => Left(s"Expected byte array or ByteBuffer but got: $other") + } + + private def handleString(value: Any): Either[String, Option[JsonNode]] = + value match { + case s: String => + tryParseJson(s).map(Some(_)) match { + case Left(_) => TextNode.valueOf(s).asRight[String].map(Some(_)) + case r => r + } + case other => Left(s"Expected string but got: $other") + } + + private def handleStruct(value: Any): Either[String, Option[JsonNode]] = + value match { + case struct: Struct => + Try(simpleJsonConverter.fromConnectData(struct.schema(), struct)) match { + case Success(jsonNode) => Right(Some(jsonNode)) + case Failure(e) => Left(s"Failed to convert Struct to JsonNode: ${e.getMessage}") + } + case other => Left(s"Expected Struct but got: $other") + } + + private def handleSchemaLess(value: Any): Either[String, Option[JsonNode]] = + value match { + case m: java.util.Map[_, _] => + Try { + val map = m.asInstanceOf[java.util.Map[String, Any]] + JacksonJson.mapper.valueToTree[JsonNode](map) + } match { + case Success(node) => Right(Some(node)) + case Failure(e) => Left(s"Failed to convert Map to JsonNode: ${e.getMessage}") + } + + case s: String => + tryParseJson(s).map(Some(_)) match { + case Left(_) => TextNode.valueOf(s).asRight[String].map(Some(_)) + case r => r + } + case b: Array[Byte] => tryReadJson(b).map(Some(_)) + case b: Byte => IntNode.valueOf(b.toInt).asRight[String].map(Some(_)) + case s: Short => IntNode.valueOf(s.toInt).asRight[String].map(Some(_)) + case i: Int => IntNode.valueOf(i).asRight[String].map(Some(_)) + case l: Long => LongNode.valueOf(l).asRight[String].map(Some(_)) + case f: Float => FloatNode.valueOf(f).asRight[String].map(Some(_)) + case double: Double => DoubleNode.valueOf(double).asRight[String].map(Some(_)) + case bigDecimal: BigDecimal => DecimalNode.valueOf(bigDecimal.bigDecimal).asRight[String].map(Some(_)) + case bigDecimal: java.math.BigDecimal => DecimalNode.valueOf(bigDecimal).asRight[String].map(Some(_)) + case boolean: Boolean => BooleanNode.valueOf(boolean).asRight[String].map(Some(_)) + case bi: BigInt => BigIntegerNode.valueOf(bi.bigInteger).asRight[String].map(Some(_)) + case bi: java.math.BigInteger => BigIntegerNode.valueOf(bi).asRight[String].map(Some(_)) + case 
other => Left(s"Unsupported value type: ${other.getClass.getName}") + } + + private def tryParseJson(str: String): Either[String, JsonNode] = + Try(JacksonJson.asJson(str)) match { + case Success(json) => Right(json) + case Failure(e) => Left(s"Invalid JSON string: ${e.getMessage}") + } + + private def tryReadJson(bytes: Array[Byte]): Either[String, JsonNode] = + Try(JacksonJson.mapper.readTree(bytes)) match { + case Success(json) => Right(json) + case Failure(e) => Left(s"Invalid JSON bytes: ${e.getMessage}") + } +} diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KafkaMessageParts.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KafkaMessageParts.scala new file mode 100644 index 000000000..402429c8d --- /dev/null +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KafkaMessageParts.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.elastic7 + +object KafkaMessageParts { + val Key = "_key" + val Value = "_value" + val Header = "_header" +} diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractor.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractor.scala index 5b54ff38e..46c8d4839 100644 --- a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractor.scala +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractor.scala @@ -25,13 +25,13 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala import scala.jdk.CollectionConverters.ListHasAsScala object PrimaryKeyExtractor { - def extract(node: JsonNode, path: Vector[String]): Any = { + def extract(node: JsonNode, path: Vector[String], prefix: String = ""): Any = { @tailrec def innerExtract(n: JsonNode, p: Vector[String]): Any = { def checkValidPath(): Unit = if (p.nonEmpty) { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. It doesn't resolve to a primitive field", + s"Invalid field selection for '$prefix${path.mkString(".")}'. It doesn't resolve to a primitive field", ) } @@ -79,24 +79,24 @@ object PrimaryKeyExtractor { case node: ObjectNode => if (p.isEmpty) { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. The path is not resolving to a primitive field", + s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is not resolving to a primitive field", ) } val childNode = Option(node.get(p.head)).getOrElse { throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. Can't find ${p.head} field. Field found are:${node.fieldNames().asScala.mkString(",")}", + s"Invalid field selection for '$prefix${path.mkString(".")}'. Can't find ${p.head} field. 
Field found are:${node.fieldNames().asScala.mkString(",")}", ) } innerExtract(childNode, p.tail) case _: ArrayNode => throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. The path is involving an array structure", + s"Invalid field selection for '$prefix${path.mkString(".")}'. The path is involving an array structure", ) case other => throw new IllegalArgumentException( - s"Invalid field selection for '${path.mkString(".")}'. $other is not handled", + s"Invalid field selection for '$prefix${path.mkString(".")}'. $other is not handled", ) } } diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPK.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPK.scala index 70a6be24b..1dfadb166 100644 --- a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPK.scala +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPK.scala @@ -17,14 +17,11 @@ package io.lenses.streamreactor.connect.elastic7 import com.fasterxml.jackson.databind.JsonNode import com.typesafe.scalalogging.StrictLogging -import io.lenses.connect.sql.StructSql._ -import io.lenses.json.sql.JacksonJson import io.lenses.json.sql.JsonSql._ import io.lenses.streamreactor.connect.json.SimpleJsonConverter import org.apache.kafka.connect.data.Schema -import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.header.Headers -import java.nio.ByteBuffer import scala.util.Failure import scala.util.Success import scala.util.Try @@ -33,94 +30,88 @@ private object TransformAndExtractPK extends StrictLogging { lazy val simpleJsonConverter = new SimpleJsonConverter() def apply( - kcqlValues: KcqlValues, - primaryKeysPaths: Seq[Vector[String]], - schema: Schema, - value: Any, - withStructure: Boolean, - ): (Option[JsonNode], Seq[Any]) = { - def raiseException(msg: String, t: Throwable) = throw new IllegalArgumentException(msg, t) - + kcqlValues: KcqlValues, + schema: Schema, + value: Any, + withStructure: Boolean, + keySchema: Schema, + key: Any, + headers: Headers, + ): (Option[JsonNode], Seq[Any]) = if (value == null) { (None, Seq.empty) } else { - if (schema != null) { - schema.`type`() match { - case Schema.Type.BYTES => - //we expected to be json - val array = value match { - case a: Array[Byte] => a - case b: ByteBuffer => b.array() - case other => raiseException(s"Invalid payload:$other for schema Schema.BYTES.", null) - } + val result = for { + jsonNode <- extractJsonNode(value, schema) + transformedJson = jsonNode.sql(kcqlValues.fields, !withStructure) + keyJsonNodeOpt: Option[JsonNode] <- if (hasKeyFieldPath(kcqlValues.primaryKeysPath)) + extractOptionalJsonNode(key, keySchema) + else Try(Option.empty[JsonNode]) + } yield { + val primaryKeys = kcqlValues.primaryKeysPath.map { path => + extractPrimaryKey(path, jsonNode, keyJsonNodeOpt, headers) + } + (Option(transformedJson), primaryKeys) + } - Try(JacksonJson.mapper.readTree(array)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(kcqlValues.fields, !withStructure)) match { - case Failure(e) => raiseException(s"A KCQL exception occurred. 
${e.getMessage}", e) - case Success(jn) => - (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - } - } + result match { + case Success(value) => value + case Failure(e) => throw e + } + } - case Schema.Type.STRING => - //we expected to be json - Try(JacksonJson.asJson(value.asInstanceOf[String])) match { - case Failure(e) => raiseException("Invalid json", e) - case Success(json) => - Try(json.sql(kcqlValues.fields, !withStructure)) match { - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - } + private def hasKeyFieldPath(paths: Seq[Vector[String]]): Boolean = + paths.exists(_.head == KafkaMessageParts.Key) - case Schema.Type.STRUCT => - val struct = value.asInstanceOf[Struct] - Try(struct.sql(kcqlValues.fields, !withStructure)) match { - case Success(s) => - (Option(simpleJsonConverter.fromConnectData(s.schema(), s)), - primaryKeysPaths.map(PrimaryKeyExtractor.extract(struct, _)), - ) + private def extractJsonNode(value: Any, schema: Schema): Try[JsonNode] = + JsonPayloadExtractor.extractJsonNode(value, schema) match { + case Left(error) => Failure(new IllegalArgumentException(error)) + case Right(Some(node)) => Success(node) + case Right(None) => Failure(new IllegalArgumentException("Failed to extract JsonNode from value")) + } - case Failure(e) => raiseException(s"A KCQL error occurred.${e.getMessage}", e) - } + private def extractOptionalJsonNode(value: Any, schema: Schema): Try[Option[JsonNode]] = + if (value == null) Success(None) + else { + JsonPayloadExtractor.extractJsonNode(value, schema) match { + case Left(error) => Failure(new IllegalArgumentException(error)) + case Right(nodeOpt) => Success(nodeOpt) + } + } - case other => raiseException(s"Can't transform Schema type:$other.", null) + private def extractPrimaryKey( + path: Vector[String], + jsonNode: JsonNode, + keyJsonNodeOpt: Option[JsonNode], + headers: Headers, + ): Any = + path.head match { + case KafkaMessageParts.Key => + keyJsonNodeOpt match { + case Some(keyNode) => PrimaryKeyExtractor.extract(keyNode, path.tail, "_key.") + case None => + throw new IllegalArgumentException( + s"Key path '${path.mkString(".")}' has a null value", + ) } - } else { - //we can handle java.util.Map (this is what JsonConverter can spit out) - value match { - case m: java.util.Map[_, _] => - val map = m.asInstanceOf[java.util.Map[String, Any]] - val jsonNode: JsonNode = JacksonJson.mapper.valueToTree[JsonNode](map) - Try(jsonNode.sql(kcqlValues.fields, !withStructure)) match { - case Success(j) => (Option(j), primaryKeysPaths.map(PrimaryKeyExtractor.extract(jsonNode, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - case s: String => - Try(JacksonJson.asJson(s)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(kcqlValues.fields, !withStructure)) match { - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - case Failure(e) => raiseException(s"A KCQL exception occurred.${e.getMessage}", e) - } - } - - case b: Array[Byte] => - Try(JacksonJson.mapper.readTree(b)) match { - case Failure(e) => raiseException("Invalid json.", e) - case Success(json) => - Try(json.sql(kcqlValues.fields, !withStructure)) match { - case Failure(e) => raiseException(s"A KCQL exception occurred. 
${e.getMessage}", e) - case Success(jn) => (Option(jn), primaryKeysPaths.map(PrimaryKeyExtractor.extract(json, _))) - } + case KafkaMessageParts.Value => + PrimaryKeyExtractor.extract(jsonNode, path.tail) + case KafkaMessageParts.Header => + if (path.tail.size != 1) { + throw new IllegalArgumentException( + s"Invalid field selection for '${path.mkString(".")}'. " + + s"Headers lookup only supports single-level keys. Nested header keys are not supported.", + ) + } + headers.lastWithName(path.tail.head) match { + case null => throw new IllegalArgumentException(s"Header with key '${path.tail.head}' not found") + case header => header.value() match { + case value: String => value + case null => throw new IllegalArgumentException(s"Header '${path.tail.head}' has a null value") + case _ => throw new IllegalArgumentException(s"Header '${path.tail.head}' is not a string") } - //we take it as String - case other => raiseException(s"Value:$other is not handled!", null) } - } + case _ => + PrimaryKeyExtractor.extract(jsonNode, path) } - } } diff --git a/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractorTest.scala b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractorTest.scala new file mode 100644 index 000000000..16ffbe1ae --- /dev/null +++ b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/JsonPayloadExtractorTest.scala @@ -0,0 +1,188 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic7 + +import com.fasterxml.jackson.databind.node.TextNode +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.scalatest.EitherValues +import org.scalatest.OptionValues + +import java.nio.ByteBuffer +import java.util.{ HashMap => JHashMap } + +class JsonPayloadExtractorTest extends AnyFunSuite with Matchers with EitherValues with OptionValues { + + test("handle null values") { + val result = JsonPayloadExtractor.extractJsonNode(null, null) + result shouldBe Right(None) + } + + test("handle string JSON with schema") { + val jsonString = """{"name": "test"}""" + val schema = Schema.STRING_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(jsonString, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("primitive string is read as TextNode") { + val invalidJson = "invalid json" + val schema = Schema.STRING_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(invalidJson, schema) + + result.value.value shouldBe a[TextNode] + result.value.value.asText() shouldBe invalidJson + } + + test("handle byte array with schema") { + val jsonString = """{"name": "test"}""" + val bytes = jsonString.getBytes("UTF-8") + val schema = Schema.BYTES_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(bytes, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle ByteBuffer with schema") { + val jsonString = """{"name": "test"}""" + val byteBuffer = ByteBuffer.wrap(jsonString.getBytes("UTF-8")) + val schema = Schema.BYTES_SCHEMA + val result = JsonPayloadExtractor.extractJsonNode(byteBuffer, schema) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle Struct with schema") { + val schema = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("name", "John") + .put("age", 30) + + val result = JsonPayloadExtractor.extractJsonNode(struct, schema) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "John" + jsonNode.get("age").asInt() shouldBe 30 + } + + test("handle Map without schema") { + val javaMap = new JHashMap[String, Any]() + javaMap.put("name", "test") + javaMap.put("value", Integer.valueOf(42)) + + val result = JsonPayloadExtractor.extractJsonNode(javaMap, null) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("value").asInt() shouldBe 42 + } + + test("handle nested Map without schema") { + val innerMap = new JHashMap[String, Any]() + innerMap.put("age", Integer.valueOf(25)) + innerMap.put("city", "New York") + + val outerMap = new JHashMap[String, Any]() + outerMap.put("name", "test") + outerMap.put("details", innerMap) + + val result = JsonPayloadExtractor.extractJsonNode(outerMap, null) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("details").get("age").asInt() shouldBe 25 + jsonNode.get("details").get("city").asText() shouldBe "New York" + } + + test("handle string JSON without schema") { + val jsonString = """{"name": "test"}""" + val result = JsonPayloadExtractor.extractJsonNode(jsonString, null) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle byte array without schema") { + val jsonString = 
"""{"name": "test"}""" + val bytes = jsonString.getBytes("UTF-8") + val result = JsonPayloadExtractor.extractJsonNode(bytes, null) + + result.value.value.get("name").asText() shouldBe "test" + } + + test("handle complex nested JSON") { + val jsonString = """ + { + "name": "test", + "details": { + "age": 30, + "address": { + "street": "123 Main St", + "city": "Test City" + }, + "hobbies": ["reading", "coding"] + } + } + """ + val result = JsonPayloadExtractor.extractJsonNode(jsonString, Schema.STRING_SCHEMA) + + val jsonNode = result.value.value + jsonNode.get("name").asText() shouldBe "test" + jsonNode.get("details").get("age").asInt() shouldBe 30 + jsonNode.get("details").get("address").get("street").asText() shouldBe "123 Main St" + jsonNode.get("details").get("hobbies").get(0).asText() shouldBe "reading" + } + + test("handle boolean value type") { + val result = JsonPayloadExtractor.extractJsonNode(true, Schema.BOOLEAN_SCHEMA) + + result.value.value.asBoolean() shouldBe true + } + + test("handle int value type") { + val result = JsonPayloadExtractor.extractJsonNode(42, null) + + result.value.value.asInt() shouldBe 42 + } + + test("handle invalid type for schema") { + val result = JsonPayloadExtractor.extractJsonNode(42, Schema.STRING_SCHEMA) + + result.left.value should include("Expected string but got") + } + + test("handle invalid struct type") { + val schema = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .build() + + val result = JsonPayloadExtractor.extractJsonNode("not a struct", schema) + + result.left.value should include("Expected Struct but got") + } + + test("handle invalid bytes type") { + val result = JsonPayloadExtractor.extractJsonNode("not bytes", Schema.BYTES_SCHEMA) + result.left.value should include("Expected byte array or ByteBuffer") + } +} diff --git a/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractorTest.scala b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractorTest.scala new file mode 100644 index 000000000..62c18f7fc --- /dev/null +++ b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/PrimaryKeyExtractorTest.scala @@ -0,0 +1,268 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic7 + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.connect.data._ +import scala.jdk.CollectionConverters._ +import org.apache.kafka.connect.errors.ConnectException + +class PrimaryKeyExtractorTest extends AnyFunSuite with Matchers { + def parseJson(jsonString: String): JsonNode = { + val mapper = new ObjectMapper() + mapper.readTree(jsonString) + } + + test("extract should retrieve a primitive value from JsonNode") { + val jsonString = """{"field": "value"}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val result = PrimaryKeyExtractor.extract(jsonNode, path) + result shouldEqual "value" + } + + test("extract should retrieve a nested primitive value from JsonNode") { + val jsonString = """{"parent": {"child": 123}}""" + val jsonNode = parseJson(jsonString) + val path = Vector("parent", "child") + + val result = PrimaryKeyExtractor.extract(jsonNode, path) + result shouldEqual 123 + } + + test("extract should throw exception if path does not exist in JsonNode") { + val jsonString = """{"field": "value"}""" + val jsonNode = parseJson(jsonString) + val path = Vector("nonexistent") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("Can't find nonexistent field") + } + + test("extract should throw exception if path leads to non-primitive field in JsonNode") { + val jsonString = """{"field": {"subfield": "value"}}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("The path is not resolving to a primitive field") + } + + test("extract should throw exception when encountering an array in JsonNode") { + val jsonString = """{"field": [1, 2, 3]}""" + val jsonNode = parseJson(jsonString) + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(jsonNode, path) + } + exception.getMessage should include("The path is involving an array structure") + } + + test("extract should handle various primitive types in JsonNode") { + val jsonString = + """ + |{ + | "stringField": "text", + | "intField": 42, + | "floatField": 3.14, + | "booleanField": true, + | "nullField": null + |} + |""".stripMargin + val jsonNode = parseJson(jsonString) + + PrimaryKeyExtractor.extract(jsonNode, Vector("stringField")) shouldEqual "text" + PrimaryKeyExtractor.extract(jsonNode, Vector("intField")) shouldEqual 42 + PrimaryKeyExtractor.extract(jsonNode, Vector("floatField")) shouldEqual 3.14 + PrimaryKeyExtractor.extract(jsonNode, Vector("booleanField")) shouldEqual true + PrimaryKeyExtractor.extract(jsonNode, Vector("nullField")).asInstanceOf[AnyRef] shouldBe null + } + + test("extract should retrieve a primitive value from Struct") { + val schema = SchemaBuilder.struct() + .field("field", Schema.STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", "value") + + val path = Vector("field") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual "value" + } + + test("extract should retrieve a nested primitive value from Struct") { + val nestedSchema = SchemaBuilder.struct() + .field("child", Schema.INT32_SCHEMA) + .build() + + 
val schema = SchemaBuilder.struct() + .field("parent", nestedSchema) + .build() + + val nestedStruct = new Struct(nestedSchema) + .put("child", 123) + + val struct = new Struct(schema) + .put("parent", nestedStruct) + + val path = Vector("parent", "child") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual 123 + } + + test("extract should throw exception if path does not exist in Struct") { + val schema = SchemaBuilder.struct() + .field("field", Schema.STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", "value") + + val path = Vector("nonexistent") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("Couldn't find field 'nonexistent'") + } + + test("extract should throw exception if field in Struct is null") { + val schema = SchemaBuilder.struct() + .field("field", Schema.OPTIONAL_STRING_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field", null) + + val path = Vector("field") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("Field 'field' is null") + } + + test("extract should handle various primitive types in Struct") { + val schema = SchemaBuilder.struct() + .field("stringField", Schema.STRING_SCHEMA) + .field("intField", Schema.INT32_SCHEMA) + .field("floatField", Schema.FLOAT32_SCHEMA) + .field("booleanField", Schema.BOOLEAN_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("stringField", "text") + .put("intField", 42) + .put("floatField", 3.14f) + .put("booleanField", true) + + PrimaryKeyExtractor.extract(struct, Vector("stringField")) shouldEqual "text" + PrimaryKeyExtractor.extract(struct, Vector("intField")) shouldEqual 42 + PrimaryKeyExtractor.extract(struct, Vector("floatField")) shouldEqual 3.14f + PrimaryKeyExtractor.extract(struct, Vector("booleanField")) shouldEqual true + } + + test("extract should handle logical types in Struct (e.g., Date, Time, Timestamp)") { + val dateSchema = Date.SCHEMA + val timeSchema = Time.SCHEMA + val timestampSchema = Timestamp.SCHEMA + + val schema = SchemaBuilder.struct() + .field("dateField", dateSchema) + .field("timeField", timeSchema) + .field("timestampField", timestampSchema) + .build() + + val dateValue = new java.util.Date(1627843200000L) // Aug 1, 2021 + val timeValue = new java.util.Date(3600000L) // 1 hour in milliseconds + val timestampValue = new java.util.Date(1627843200000L) + + val struct = new Struct(schema) + .put("dateField", dateValue) + .put("timeField", timeValue) + .put("timestampField", timestampValue) + + PrimaryKeyExtractor.extract(struct, Vector("dateField")) shouldEqual dateValue + PrimaryKeyExtractor.extract(struct, Vector("timeField")) shouldEqual timeValue + PrimaryKeyExtractor.extract(struct, Vector("timestampField")) shouldEqual timestampValue + } + + test("extract should throw exception when encountering unsupported schema type in Struct") { + val schema = SchemaBuilder.struct() + .field("mapField", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA)) + .build() + + val mapValue = Map("key" -> 123).asJava + + val struct = new Struct(schema) + .put("mapField", mapValue) + + val path = Vector("mapField") + + val exception = intercept[IllegalArgumentException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("It doesn't resolve to a primitive field") + } + + test("extract should traverse nested maps 
in Struct") { + val mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build() + + val schema = SchemaBuilder.struct() + .field("mapField", mapSchema) + .build() + + val mapValue = Map("innerKey" -> 123).asJava + + val struct = new Struct(schema) + .put("mapField", mapValue) + + val path = Vector("mapField", "innerKey") + + val result = PrimaryKeyExtractor.extract(struct, path) + result shouldEqual 123 + } + + test("extract should throw exception when encountering unsupported schema type") { + val schema = SchemaBuilder.struct() + .field("arrayField", SchemaBuilder.array(Schema.INT32_SCHEMA)) + .build() + + val arrayValue = List(1, 2, 3).asJava + + val struct = new Struct(schema) + .put("arrayField", arrayValue) + + val path = Vector("arrayField") + + val exception = intercept[ConnectException] { + PrimaryKeyExtractor.extract(struct, path) + } + exception.getMessage should include("ARRAY is not a recognized schema") + } +} diff --git a/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPKTest.scala b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPKTest.scala new file mode 100644 index 000000000..a9994d25d --- /dev/null +++ b/kafka-connect-elastic7/src/test/scala/io/lenses/streamreactor/connect/elastic7/TransformAndExtractPKTest.scala @@ -0,0 +1,601 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.elastic7 + +import io.lenses.sql.Field +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct +import org.apache.kafka.connect.header.ConnectHeaders + +class TransformAndExtractPKTest extends AnyFunSuite with Matchers { + // Helper method to create KcqlValues + def createKcqlValues(fields: Seq[(String, String, Vector[String])]): KcqlValues = { + val fieldObjects = fields.map { + case (name, alias, parents) => Field(name, alias, parents) + } + KcqlValues( + fields = fieldObjects, + ignoredFields = Seq.empty, + primaryKeysPath = Seq.empty, + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + } + + test("should return None and empty Seq when value is null") { + val result = TransformAndExtractPK.apply( + kcqlValues = createKcqlValues(Seq.empty), + schema = null, + value = null, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + result shouldEqual (None, Seq.empty) + } + + test("should handle valid JSON value and extract primary keys") { + val jsonString = """{"field1": "value1", "field2": 2}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + Field(name = "field2", alias = "field2", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_value", "field1"), + Vector("_key", "keyField"), + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + transformedJson.get("field2").asInt() shouldEqual 2 + + primaryKeys should have size 3 + primaryKeys(0) shouldEqual "value1" + primaryKeys(1) shouldEqual "keyValue" + primaryKeys(2) shouldEqual "headerValue" + } + + test("should throw exception when header is missing") { + + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_header", "missingHeader")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = headers, + ) + } + + exception.getMessage should include("Header with key 'missingHeader' not found") + } + + test("should extract primary key from Struct value") { + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + 
val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val value = struct + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + Field(name = "field2", alias = "field2", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_value", "field1")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + transformedJson.get("field2").asInt() shouldEqual 2 + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "value1" + } + + test("should use the primary key value when the field path is just _key and the key payload is a primitive LONG") { + // key payload is a primitive long + val key = 123L + val keySchema = Schema.INT64_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq.empty, + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_key")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + + val (transformedJsonOpt, primaryKeys) = result + primaryKeys should have size 1 + primaryKeys(0) shouldEqual 123L + } + + test("should use the primary key value when the field path is just _key and the key payload is a primitive STRING") { + // key payload is a primitive string + val key = "keyValue" + val keySchema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq.empty, + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_key")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + + val (transformedJsonOpt, primaryKeys) = result + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" + } + + test("fail when the PK path uses _key.a when the key is only a primitive STRING") { + // key payload is a primitive string + val key = "keyValue" + val keySchema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq.empty, + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_key", "a")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val schema = SchemaBuilder.struct() + .field("field1", Schema.STRING_SCHEMA) + .field("field2", Schema.INT32_SCHEMA) + .build() + + val struct = new Struct(schema) + .put("field1", "value1") + .put("field2", 2) + + val exception = the[IllegalArgumentException] thrownBy { + TransformAndExtractPK.apply( + kcqlValues =
kcqlValues, + schema = schema, + value = struct, + withStructure = false, + keySchema = keySchema, + key = key, + headers = new ConnectHeaders(), + ) + } + + exception.getMessage should include("Invalid field selection for '_key.a'") + } + + test("should throw exception when primary key path is invalid") { + + val jsonString = """{"field1": {"nestedField": "value1"}}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq(Vector("_value", "field1", "nonexistentField")), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = null, + key = null, + headers = new ConnectHeaders(), + ) + } + + exception.getMessage should include("Can't find nonexistentField field") + } + + test("should return the primary key when the message _key is involved") { + + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "keyField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" // Extracted from _key.keyField + } + + test("return the primary key when the _key path is involved and the path is 2 levels deep") { + + val jsonString = """{"field1": {"nestedField": "value1"}}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "keyField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").get("nestedField").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "keyValue" + } + + test("returns the primary key from a header entry") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema =
Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val result = TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + + val (transformedJsonOpt, primaryKeys) = result + + transformedJsonOpt shouldBe defined + val transformedJson = transformedJsonOpt.get + + transformedJson.get("field1").asText() shouldEqual "value1" + + primaryKeys should have size 1 + primaryKeys(0) shouldEqual "headerValue" + } + + test("fail when primary key path involves a missing header key") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "missingHeader"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage should include("Header with key 'missingHeader' not found") + } + test("fail when the header key has a null value") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", null) + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage should include("Header 'headerKey' has a null value") + } + test("fail when the primary key path uses a nested header key") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_header", "headerKey", "nonexistentField"), + ), + 
behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage shouldBe "Invalid field selection for '_header.headerKey.nonexistentField'. Headers lookup only supports single-level keys. Nested header keys are not supported." + } + + // this is not a header key path test + test("fail when primary key path uses a key path which does not exist") { + val jsonString = """{"field1": "value1"}""" + val value = jsonString + val schema = Schema.STRING_SCHEMA + + val keyJsonString = """{"keyField": "keyValue"}""" + val key = keyJsonString + val keySchema = Schema.STRING_SCHEMA + + val headers = new ConnectHeaders() + headers.addString("headerKey", "headerValue") + + val kcqlValues = KcqlValues( + fields = Seq( + Field(name = "field1", alias = "field1", parents = Vector.empty), + ), + ignoredFields = Seq.empty, + primaryKeysPath = Seq( + Vector("_key", "nonexistentField"), + ), + behaviorOnNullValues = NullValueBehavior.FAIL, + ) + + val exception = intercept[IllegalArgumentException] { + TransformAndExtractPK.apply( + kcqlValues = kcqlValues, + schema = schema, + value = value, + withStructure = false, + keySchema = keySchema, + key = key, + headers = headers, + ) + } + + exception.getMessage shouldBe "Invalid field selection for '_key.nonexistentField'. Can't find nonexistentField field. Field found are:keyField" + } +}
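
For reference, a minimal usage sketch (not part of the patch) showing how the new `_key`/`_value`/`_header` primary-key paths fit together, mirroring the tests above. `TransformAndExtractPK` is package-private, so the sketch assumes it sits in the same `elastic7` package; the `tenant` header name and the `PrimaryKeyPathsSketch` object are illustrative only.

package io.lenses.streamreactor.connect.elastic7

import io.lenses.sql.Field
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.header.ConnectHeaders

object PrimaryKeyPathsSketch extends App {
  // Illustrative header; only single-level header keys are supported (nested keys throw).
  val headers = new ConnectHeaders()
  headers.addString("tenant", "acme")

  val kcqlValues = KcqlValues(
    fields        = Seq(Field(name = "field1", alias = "field1", parents = Vector.empty)),
    ignoredFields = Seq.empty,
    // One PK component per source: record value, record key, record header.
    primaryKeysPath = Seq(
      Vector("_value", "field1"),
      Vector("_key", "keyField"),
      Vector("_header", "tenant"),
    ),
    behaviorOnNullValues = NullValueBehavior.FAIL,
  )

  val (transformedJson, pks) = TransformAndExtractPK(
    kcqlValues    = kcqlValues,
    schema        = Schema.STRING_SCHEMA,
    value         = """{"field1": "value1"}""",
    withStructure = false,
    keySchema     = Schema.STRING_SCHEMA,
    key           = """{"keyField": "keyValue"}""",
    headers       = headers,
  )

  println(transformedJson) // Some({"field1":"value1"})
  // ElasticJsonWriter joins the PK parts with settings.pkJoinerSeparator to form the document id.
  println(pks.mkString("-")) // value1-keyValue-acme
}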