diff --git a/fips-pom.xml b/fips-pom.xml index d0fe21b6..351c9234 100644 --- a/fips-pom.xml +++ b/fips-pom.xml @@ -44,7 +44,6 @@ 4.3.0 2.13.2 2.13.4.2 - 2.13.5 @@ -145,11 +144,6 @@ jackson-annotations ${jackson.version} - - com.fasterxml.jackson.module - jackson-module-scala_2.12 - ${jackson.module.scala.version} - diff --git a/pom.xml b/pom.xml index 1e3e8367..eb40208b 100644 --- a/pom.xml +++ b/pom.xml @@ -46,7 +46,6 @@ 4.3.0 2.13.2 2.13.4.2 - 2.13.5 @@ -133,11 +132,6 @@ jackson-annotations ${jackson.version} - - com.fasterxml.jackson.module - jackson-module-scala_2.12 - ${jackson.module.scala.version} - diff --git a/src/main/scala/com/snowflake/snowpark/Session.scala b/src/main/scala/com/snowflake/snowpark/Session.scala index 90f7b83e..1beac1c0 100644 --- a/src/main/scala/com/snowflake/snowpark/Session.scala +++ b/src/main/scala/com/snowflake/snowpark/Session.scala @@ -1,8 +1,5 @@ package com.snowflake.snowpark -import com.fasterxml.jackson.databind.json.JsonMapper -import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} - import java.io.{File, FileInputStream, FileNotFoundException} import java.net.URI import java.sql.{Connection, Date, Time, Timestamp} @@ -29,7 +26,6 @@ import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeDriver, Snowfl import scala.concurrent.{ExecutionContext, Future} import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag -import scala.util.Try /** * @@ -65,11 +61,6 @@ import scala.util.Try * @since 0.1.0 */ class Session private (private[snowpark] val conn: ServerConnection) extends Logging { - private val jsonMapper = JsonMapper - .builder() - .addModule(DefaultScalaModule) - .build() :: ClassTagExtensions - private val STAGE_PREFIX = "@" // URI and file name with md5 private val classpathURIs = new ConcurrentHashMap[URI, Option[String]]().asScala @@ -397,7 +388,7 @@ class Session private (private[snowpark] val conn: ServerConnection) extends Log * successful, or `None` otherwise. */ private def parseJsonString(jsonString: String): Option[Map[String, Any]] = { - Try(jsonMapper.readValue[Map[String, Any]](jsonString)).toOption + Utils.jsonToMap(jsonString) } /** @@ -408,7 +399,7 @@ class Session private (private[snowpark] val conn: ServerConnection) extends Log * or `None` otherwise. */ private def toJsonString(map: Map[String, Any]): Option[String] = { - Try(jsonMapper.writeValueAsString(map)).toOption + Utils.mapToJson(map) } /* diff --git a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala index 82108d43..8a219847 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala @@ -60,51 +60,33 @@ object OpenTelemetry extends Logging { execName: String, execHandler: String, execFilePath: String)(func: => T): T = { - try { - spanInfo.withValue[T](spanInfo.value match { - // empty info means this is the entry of the recursion - case None => - val stacks = Thread.currentThread().getStackTrace - val (fileName, lineNumber) = findLineNumber(stacks) - Some( - UdfInfo( - className, - funcName, - fileName, - lineNumber, - execName, - execHandler, - execFilePath)) - // if value is not empty, this function call should be recursion. - // do not issue new SpanInfo, use the info inherited from previous. - case other => other - }) { - val result: T = func - OpenTelemetry.emit(spanInfo.value.get) - result - } - } catch { - case error: Throwable => - OpenTelemetry.reportError(className, funcName, error) - throw error - } + val stacks = Thread.currentThread().getStackTrace + val (fileName, lineNumber) = findLineNumber(stacks) + val newSpan = + UdfInfo(className, funcName, fileName, lineNumber, execName, execHandler, execFilePath) + emitSpan(newSpan, className, funcName, func) } // wrapper of all action functions def action[T](className: String, funcName: String, methodChain: String)(func: => T): T = { + val stacks = Thread.currentThread().getStackTrace + val (fileName, lineNumber) = findLineNumber(stacks) + val newInfo = + ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName") + emitSpan(newInfo, className, funcName, func) + } + + private def emitSpan[T](span: SpanInfo, className: String, funcName: String, thunk: => T): T = { try { - spanInfo.withValue[T](spanInfo.value match { - // empty info means this is the entry of the recursion + spanInfo.value match { case None => - val stacks = Thread.currentThread().getStackTrace - val (fileName, lineNumber) = findLineNumber(stacks) - Some(ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName")) - // if value is not empty, this function call should be recursion. - // do not issue new SpanInfo, use the info inherited from previous. - case other => other - }) { - val result: T = func - OpenTelemetry.emit(spanInfo.value.get) - result + spanInfo.withValue(Some(span)) { + val result: T = thunk + // only emit one time, in the top level action + OpenTelemetry.emit(spanInfo.value.get) + result + } + case _ => + thunk } } catch { case error: Throwable => diff --git a/src/main/scala/com/snowflake/snowpark/internal/UDFClassPath.scala b/src/main/scala/com/snowflake/snowpark/internal/UDFClassPath.scala index 31c8200f..aedbc8a6 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/UDFClassPath.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/UDFClassPath.scala @@ -3,7 +3,6 @@ package com.snowflake.snowpark.internal import com.fasterxml.jackson.annotation.JsonView import com.fasterxml.jackson.core.TreeNode import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.module.scala.DefaultScalaModule import java.io.File import java.net.{URI, URLClassLoader} @@ -25,8 +24,6 @@ object UDFClassPath extends Logging { val jacksonDatabindClass: Class[JsonNode] = classOf[com.fasterxml.jackson.databind.JsonNode] val jacksonCoreClass: Class[TreeNode] = classOf[com.fasterxml.jackson.core.TreeNode] val jacksonAnnotationClass: Class[JsonView] = classOf[com.fasterxml.jackson.annotation.JsonView] - val jacksonModuleScalaClass: Class[DefaultScalaModule] = - classOf[com.fasterxml.jackson.module.scala.DefaultScalaModule] val jacksonJarSeq = Seq( RequiredLibrary( getPathForClass(jacksonDatabindClass), @@ -36,11 +33,7 @@ object UDFClassPath extends Logging { RequiredLibrary( getPathForClass(jacksonAnnotationClass), "jackson-annotation", - jacksonAnnotationClass), - RequiredLibrary( - getPathForClass(jacksonModuleScalaClass), - "jackson-module-scala", - jacksonModuleScalaClass)) + jacksonAnnotationClass)) /* * Libraries required to compile java code generated by snowpark for user's lambda. diff --git a/src/main/scala/com/snowflake/snowpark/internal/Utils.scala b/src/main/scala/com/snowflake/snowpark/internal/Utils.scala index d464566c..9b1b9a3b 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/Utils.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/Utils.scala @@ -1,5 +1,7 @@ package com.snowflake.snowpark.internal +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import com.fasterxml.jackson.databind.node.JsonNodeType import com.snowflake.snowpark.Column import com.snowflake.snowpark.internal.analyzer.{ Attribute, @@ -15,6 +17,7 @@ import java.util.Locale import com.snowflake.snowpark.udtf.UDTF import net.snowflake.client.jdbc.SnowflakeSQLException +import scala.collection.JavaConverters.{asScalaIteratorConverter, mapAsScalaMapConverter} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -447,4 +450,69 @@ object Utils extends Logging { case _ => throw ErrorMessage.DF_JOIN_WITH_WRONG_ARGUMENT() } } + + private val objectMapper = new ObjectMapper() + + private[snowpark] def jsonToMap(jsonString: String): Option[Map[String, Any]] = { + try { + val node = objectMapper.readTree(jsonString) + assert(node.getNodeType == JsonNodeType.OBJECT) + Some(jsonToScala(node).asInstanceOf[Map[String, Any]]) + } catch { + case ex: Exception => + logError(ex.getMessage) + None + } + } + + private def jsonToScala(node: JsonNode): Any = { + node.getNodeType match { + case JsonNodeType.STRING => node.asText() + case JsonNodeType.NULL => null + case JsonNodeType.OBJECT => + node + .fields() + .asScala + .map(entry => { + entry.getKey -> jsonToScala(entry.getValue) + }) + .toMap + case JsonNodeType.ARRAY => + node.elements().asScala.map(entry => jsonToScala(entry)).toSeq + case JsonNodeType.BOOLEAN => node.asBoolean() + case JsonNodeType.NUMBER => node.numberValue() + case other => + throw new UnsupportedOperationException(s"Unsupported Type: ${other.name()}") + } + } + + private[snowpark] def mapToJson(map: Map[String, Any]): Option[String] = { + try { + Some(scalaToJson(map)) + } catch { + case ex: Exception => + logError(ex.getMessage) + None + } + } + + private def scalaToJson(input: Any): String = + input match { + case null => "null" + case str: String => s""""$str"""" + case _: Int | _: Short | _: Long | _: Byte | _: Double | _: Float | _: Boolean => + input.toString + case map: Map[String, _] => + map + .map { + case (key, value) => s"${scalaToJson(key)}:${scalaToJson(value)}" + } + .mkString("{", ",", "}") + case seq: Seq[_] => seq.map(scalaToJson).mkString("[", ",", "]") + case arr: Array[_] => scalaToJson(arr.toSeq) + case list: java.util.List[_] => scalaToJson(list.toArray) + case map: java.util.Map[String, _] => scalaToJson(map.asScala.toMap) + case _ => + throw new UnsupportedOperationException(s"Unsupported Type: ${input.getClass.getName}") + } } diff --git a/src/test/scala/com/snowflake/snowpark/UtilsSuite.scala b/src/test/scala/com/snowflake/snowpark/UtilsSuite.scala index 2067212f..14c26bc4 100644 --- a/src/test/scala/com/snowflake/snowpark/UtilsSuite.scala +++ b/src/test/scala/com/snowflake/snowpark/UtilsSuite.scala @@ -24,6 +24,7 @@ import java.lang.{ } import net.snowflake.client.jdbc.SnowflakeSQLException +import java.util import scala.collection.mutable.ArrayBuffer class UtilsSuite extends SNTestBase { @@ -672,6 +673,58 @@ class UtilsSuite extends SNTestBase { assert(Utils.quoteForOption("FALSE").equals("FALSE")) assert(Utils.quoteForOption("abc").equals("'abc'")) } + + test("Scala and Json format transformation") { + val javaHashMap = new util.HashMap[String, String]() { + { + put("one", "1") + put("two", "2") + put("three", "3") + } + } + val map = Map( + "nullKey" -> null, + "integerKey" -> 42, + "shortKey" -> 123.toShort, + "longKey" -> 1234567890L, + "byteKey" -> 123.toByte, + "doubleKey" -> 3.1415926, + "floatKey" -> 3.14F, + "boolKey" -> false, + "javaListKey" -> new util.ArrayList[String](util.Arrays.asList("a", "b")), + "javaMapKey" -> javaHashMap, + "seqKey" -> Seq(1, 2, 3), + "arrayKey" -> Array(1, 2, 3), + "seqOfStringKey" -> Seq("1", "2", "3"), + "stringKey" -> "stringValue", + "nestedMap" -> Map("insideKey" -> "stringValue", "insideList" -> Seq(1, 2, 3)), + "nestedList" -> Seq(1, Map("nestedKey" -> "nestedValue"), Array(1, 2, 3))) + val jsonString = Utils.mapToJson(map) + val expected_string = "{" + + "\"floatKey\":3.14," + + "\"javaMapKey\":{" + + "\"one\":\"1\"," + + "\"two\":\"2\"," + + "\"three\":\"3\"}," + + "\"integerKey\":42," + + "\"nullKey\":null," + + "\"longKey\":1234567890," + + "\"byteKey\":123," + + "\"seqKey\":[1,2,3]," + + "\"nestedMap\":{\"insideKey\":\"stringValue\",\"insideList\":[1,2,3]}," + + "\"stringKey\":\"stringValue\"," + + "\"doubleKey\":3.1415926," + + "\"seqOfStringKey\":[\"1\",\"2\",\"3\"]," + + "\"nestedList\":[1,{\"nestedKey\":\"nestedValue\"},[1,2,3]]," + + "\"javaListKey\":[\"a\",\"b\"]," + + "\"arrayKey\":[1,2,3]," + + "\"boolKey\":false," + + "\"shortKey\":123}" + val readMap = Utils.jsonToMap(jsonString.getOrElse("")) + val transformedString = Utils.mapToJson(readMap.getOrElse(Map())) + assert(jsonString.getOrElse("").equals(expected_string)) + assert(jsonString.equals(transformedString)) + } } object LoggingTester extends Logging { diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala index 4e8f6d58..15118fd3 100644 --- a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala @@ -440,6 +440,12 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled { checkSpanError("snow.snowpark.ClassA1", "functionB1", error) } + test("only emit span once in the nested actions") { + session.sql("select 1").count() + val l = testSpanExporter.getFinishedSpanItems + assert(l.size() == 1) + } + override def beforeAll: Unit = { super.beforeAll createStage(stageName1)