question2.zpln
{
"paragraphs": [
{
"text": "val dataPathRoot = \"file:///data/\"\nval dataPath = dataPathRoot+\"dataset/*\"\nval df = spark.sparkContext.wholeTextFiles(dataPath).toDF\nval output = df\n.select((substring_index(col(\"_1\"), \"/\", -1)).cast(\"Int\").alias(\"docId\"), explode(split((regexp_replace(col(\"_2\"), \"[^a-zA-Z\\\\s]\", \"\")),\"\\\\s+\")).alias(\"w\"))\n.select(col(\"docId\"), (lower(regexp_replace(col(\"w\"),\"^[^a-zA-Z]+\",\"\"))).alias(\"w\"))\n.filter(!(col(\"w\")===\"\" || col(\"w\").contains(\"--\") || col(\"w\")===\"-\"))\n.groupBy(\"w\").agg(concat_ws(\" \",sort_array(collect_set(\"docId\"))).alias(\"docList\"))\n.orderBy(\"w\")\n.repartition(1)\noutput.write.mode(\"overwrite\").option(\"header\",\"false\").csv(dataPathRoot+\"output\")",
"user": "anonymous",
"dateUpdated": "2020-10-11T09:11:01+0000",
"config": {
"editorSetting": {
"language": "scala",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"colWidth": 12,
"editorMode": "ace/mode/scala",
"fontSize": 9,
"results": {},
"enabled": true
},
"settings": {
"params": {},
"forms": {}
},
"results": {
"code": "SUCCESS",
"msg": [
{
"type": "TEXT",
"data": "\u001b[1m\u001b[34mdataPathRoot\u001b[0m: \u001b[1m\u001b[32mString\u001b[0m = file:///data/\n\u001b[1m\u001b[34mdataPath\u001b[0m: \u001b[1m\u001b[32mString\u001b[0m = file:///data/dataset/*\n\u001b[1m\u001b[34mdf\u001b[0m: \u001b[1m\u001b[32morg.apache.spark.sql.DataFrame\u001b[0m = [_1: string, _2: string]\n\u001b[1m\u001b[34moutput\u001b[0m: \u001b[1m\u001b[32morg.apache.spark.sql.Dataset[org.apache.spark.sql.Row]\u001b[0m = [w: string, docList: string]\n"
}
]
},
"apps": [],
"runtimeInfos": {
"jobUrl": {
"propertyName": "jobUrl",
"label": "SPARK JOB",
"tooltip": "View in Spark web UI",
"group": "spark",
"values": [
{
"jobUrl": "http://f16139b6ba49:4040/jobs/job?id=0",
"$$hashKey": "object:473"
},
{
"jobUrl": "http://f16139b6ba49:4040/jobs/job?id=1",
"$$hashKey": "object:474"
}
],
"interpreterSettingId": "spark"
}
},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1602407419624_-1388632396",
"id": "paragraph_1602356060652_-357909338",
"dateCreated": "2020-10-11T09:10:19+0000",
"status": "FINISHED",
"focus": true,
"$$hashKey": "object:223",
"dateFinished": "2020-10-11T09:11:34+0000",
"dateStarted": "2020-10-11T09:11:01+0000"
},
{
"text": "output.explain(true)",
"user": "anonymous",
"dateUpdated": "2020-10-11T09:11:43+0000",
"config": {
"editorSetting": {
"language": "scala",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"colWidth": 12,
"editorMode": "ace/mode/scala",
"fontSize": 9,
"results": {},
"enabled": true
},
"settings": {
"params": {},
"forms": {}
},
"results": {
"code": "SUCCESS",
"msg": [
{
"type": "TEXT",
"data": "== Parsed Logical Plan ==\nRepartition 1, true\n+- Sort [w#13 ASC NULLS FIRST], true\n +- Aggregate [w#13], [w#13, concat_ws( , cast(sort_array(collect_set(docId#8, 0, 0), true) as array<string>)) AS docList#19]\n +- Filter NOT (((w#13 = ) || Contains(w#13, --)) || (w#13 = -))\n +- Project [docId#8, lower(regexp_replace(w#10, ^[^a-zA-Z]+, )) AS w#13]\n +- Project [cast(substring_index(_1#3, /, -1) as int) AS docId#8, w#10]\n +- Generate explode(split(regexp_replace(_2#4, [^a-zA-Z\\s], ), \\s+)), false, [w#10]\n +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]\n +- ExternalRDD [obj#2]\n\n== Analyzed Logical Plan ==\nw: string, docList: string\nRepartition 1, true\n+- Sort [w#13 ASC NULLS FIRST], true\n +- Aggregate [w#13], [w#13, concat_ws( , cast(sort_array(collect_set(docId#8, 0, 0), true) as array<string>)) AS docList#19]\n +- Filter NOT (((w#13 = ) || Contains(w#13, --)) || (w#13 = -))\n +- Project [docId#8, lower(regexp_replace(w#10, ^[^a-zA-Z]+, )) AS w#13]\n +- Project [cast(substring_index(_1#3, /, -1) as int) AS docId#8, w#10]\n +- Generate explode(split(regexp_replace(_2#4, [^a-zA-Z\\s], ), \\s+)), false, [w#10]\n +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]\n +- ExternalRDD [obj#2]\n\n== Optimized Logical Plan ==\nRepartition 1, true\n+- Sort [w#13 ASC NULLS FIRST], true\n +- Aggregate [w#13], [w#13, concat_ws( , cast(sort_array(collect_set(docId#8, 0, 0), true) as array<string>)) AS docList#19]\n +- Project [cast(substring_index(_1#3, /, -1) as int) AS docId#8, lower(regexp_replace(w#10, ^[^a-zA-Z]+, )) AS w#13]\n +- Filter ((NOT (lower(regexp_replace(w#10, ^[^a-zA-Z]+, )) = ) && NOT Contains(lower(regexp_replace(w#10, ^[^a-zA-Z]+, )), --)) && NOT (lower(regexp_replace(w#10, ^[^a-zA-Z]+, )) = -))\n +- Generate explode(split(regexp_replace(_2#4, [^a-zA-Z\\s], ), \\s+)), [1], false, [w#10]\n +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]\n +- ExternalRDD [obj#2]\n\n== Physical Plan ==\nExchange RoundRobinPartitioning(1)\n+- *(3) Sort [w#13 ASC NULLS FIRST], true, 0\n +- Exchange rangepartitioning(w#13 ASC NULLS FIRST, 200)\n +- ObjectHashAggregate(keys=[w#13], functions=[collect_set(docId#8, 0, 0)], output=[w#13, docList#19])\n +- Exchange hashpartitioning(w#13, 200)\n +- ObjectHashAggregate(keys=[w#13], functions=[partial_collect_set(docId#8, 0, 0)], output=[w#13, buf#25])\n +- *(2) Project [cast(substring_index(_1#3, /, -1) as int) AS docId#8, lower(regexp_replace(w#10, ^[^a-zA-Z]+, )) AS w#13]\n +- *(2) Filter ((NOT (lower(regexp_replace(w#10, ^[^a-zA-Z]+, )) = ) && NOT Contains(lower(regexp_replace(w#10, ^[^a-zA-Z]+, )), --)) && NOT (lower(regexp_replace(w#10, 
^[^a-zA-Z]+, )) = -))\n +- Generate explode(split(regexp_replace(_2#4, [^a-zA-Z\\s], ), \\s+)), [_1#3], false, [w#10]\n +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]\n +- Scan file:///data/dataset/*[obj#2]\n"
}
]
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1602407419625_-519910376",
"id": "paragraph_1602358047477_2053527356",
"dateCreated": "2020-10-11T09:10:19+0000",
"status": "FINISHED",
"$$hashKey": "object:224",
"dateFinished": "2020-10-11T09:11:43+0000",
"dateStarted": "2020-10-11T09:11:43+0000"
},
{
"user": "anonymous",
"config": {
"colWidth": 12,
"fontSize": 9,
"enabled": true,
"results": {},
"editorSetting": {
"language": "scala",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"editorMode": "ace/mode/scala"
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1602407503492_294177231",
"id": "paragraph_1602407503492_294177231",
"dateCreated": "2020-10-11T09:11:43+0000",
"status": "READY",
"focus": true,
"$$hashKey": "object:483"
}
],
"name": "question2",
"id": "2FQBK9RWG",
"defaultInterpreterGroup": "spark",
"version": "0.9.0-preview1",
"noteParams": {},
"noteForms": {},
"angularObjects": {},
"config": {
"isZeppelinNotebookCronEnable": false,
"looknfeel": "default",
"personalizedMode": "false"
},
"info": {},
"path": "/question2"
}
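
For reference outside Zeppelin, below is a minimal standalone sketch of the inverted-index job that the first paragraph runs. It is an illustration, not the notebook's stored code: the InvertedIndex object name and the local[*] master are assumptions, and it presumes the same file:///data/dataset/* layout in which each file's name is a numeric document id (otherwise the cast to Int yields null).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object InvertedIndex {
  def main(args: Array[String]): Unit = {
    // Assumed session setup; in the notebook, `spark` is provided by the interpreter.
    val spark = SparkSession.builder()
      .appName("InvertedIndex")
      .master("local[*]") // assumption: run locally on all cores
      .getOrCreate()
    import spark.implicits._

    val dataPathRoot = "file:///data/"
    val dataPath = dataPathRoot + "dataset/*"

    // wholeTextFiles yields (path, content) pairs, which toDF names _1 and _2.
    val df = spark.sparkContext.wholeTextFiles(dataPath).toDF

    val output = df
      .select(
        // docId = the file name (text after the last "/"), cast to Int.
        substring_index(col("_1"), "/", -1).cast("Int").alias("docId"),
        // Remove every character that is not a letter or whitespace,
        // then split on runs of whitespace: one token per row.
        explode(split(regexp_replace(col("_2"), "[^a-zA-Z\\s]", ""), "\\s+")).alias("w"))
      // Strip any leading non-letters and lower-case each token.
      .select(col("docId"), lower(regexp_replace(col("w"), "^[^a-zA-Z]+", "")).alias("w"))
      // Drop empty tokens and dash artifacts.
      .filter(!(col("w") === "" || col("w").contains("--") || col("w") === "-"))
      // One row per word: its de-duplicated, sorted list of document ids.
      .groupBy("w").agg(concat_ws(" ", sort_array(collect_set("docId"))).alias("docList"))
      .orderBy("w")
      .repartition(1) // coalesce to a single output file

    output.write.mode("overwrite").option("header", "false").csv(dataPathRoot + "output")
    spark.stop()
  }
}

The physical plan captured in the second paragraph shows what this shape costs at runtime: three exchanges in total, a hash partitioning for the collect_set aggregate, a range partitioning for the orderBy, and a final RoundRobinPartitioning(1) exchange for repartition(1).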