Skip to content

Commit

Permalink
[SPARK-42692][CONNECT] Implement Dataset.toJSON
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This pr aims to implement Dataset.toJSON.

### Why are the changes needed?
Add Spark connect jvm client api coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Add new test
- Manually checked Scala 2.13

Closes apache#40319 from LuciferYang/SPARK-42692.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
  • Loading branch information
LuciferYang authored and hvanhovell committed Mar 7, 2023
1 parent 67328de commit 51504e4
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveLongEnc
import org.apache.spark.sql.catalyst.expressions.RowOrdering
import org.apache.spark.sql.connect.client.SparkResult
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.types.{Metadata, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -2777,7 +2778,7 @@ class Dataset[T] private[sql] (
}

def toJSON: Dataset[String] = {
throw new UnsupportedOperationException("toJSON is not implemented.")
select(to_json(struct(col("*")))).as(StringEncoder)
}

private[sql] def analyze: proto.AnalyzePlanResponse = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,19 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
val otherPlan = spark.sql("select 1")
assert(plan.sameSemantics(otherPlan))
}

test("toJSON") {
val expected = Array(
"""{"b":0.0,"id":0,"d":"world","a":0}""",
"""{"b":0.1,"id":1,"d":"world","a":1}""",
"""{"b":0.2,"id":2,"d":"world","a":0}""")
val result = spark
.range(3)
.select(generateMyTypeColumns: _*)
.toJSON
.collect()
assert(result sameElements expected)
}
}

private[sql] case class MyType(id: Long, a: Double, b: Double)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ class PlanGenerationTestSuite
session.read.json(testDataPath.resolve("people.json").toString)
}

test("toJSON") {
complex.toJSON
}

test("read csv") {
session.read.csv(testDataPath.resolve("people.csv").toString)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [to_json(struct(id, id#0L, a, a#0, b, b#0, d, d#0, e, e#0, f, f#0, g, g#0), Some(America/Los_Angeles)) AS to_json(struct(id, a, b, d, e, f, g))#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "to_json",
"arguments": [{
"unresolvedFunction": {
"functionName": "struct",
"arguments": [{
"unresolvedStar": {
}
}]
}
}]
}
}]
}
}
Binary file not shown.

0 comments on commit 51504e4

Please sign in to comment.