-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Support for PythonUDAF expression #937
base: dev
Are you sure you want to change the base?
Changes from 4 commits
6d651b6
cadb525
f9701a1
d70668a
297c7ad
8a9fec9
e542bf3
cfae1a8
948a344
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -297,10 +297,12 @@ object ExecInfo { | |
// check is the node has a dataset operations and if so change to not supported | ||
val rddCheckRes = RDDCheckHelper.isDatasetOrRDDPlan(nodeName, node.desc) | ||
val ds = dataSet || rddCheckRes.isRDD | ||
val containsPythonUDF = ExecHelper.isPythonUDF(node) | ||
val finalSupported = isSupported || containsPythonUDF | ||
|
||
// if the expression is RDD because of the node name, then we do not want to add the | ||
// unsupportedExpressions because it becomes bogus. | ||
val finalUnsupportedExpr = if (rddCheckRes.nodeDescRDD) { | ||
val finalUnsupportedExpr = if (rddCheckRes.nodeDescRDD || containsPythonUDF) { | ||
Seq.empty[UnsupportedExpr] | ||
} else { | ||
unsupportedExprs | ||
|
@@ -313,7 +315,7 @@ object ExecInfo { | |
duration, | ||
nodeId, | ||
opType, | ||
isSupported, | ||
finalSupported, | ||
children, | ||
stages, | ||
shouldRemove, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to make changes to the usage of variable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. Now |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -331,7 +331,7 @@ abstract class AppBase( | |
} | ||
} | ||
|
||
private val UDFRegex = ".*UDF.*" | ||
private val UDFRegex = ".*(?<!python)UDF.*" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this won't match |
||
|
||
private val potentialIssuesRegexMap = Map( | ||
UDFRegex -> "UDF", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -355,9 +355,13 @@ object RDDCheckHelper { | |
} | ||
|
||
|
||
object ExecHelper { | ||
object ExecHelper extends Logging { | ||
private val UDFRegExLookup = Set( | ||
".*UDF.*".r | ||
".*(?<!python)UDF.*".r | ||
) | ||
|
||
private val pythonUDFRegExLookUp = Set( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here as my previous comment. There are three regular expressions that look overlapping. |
||
".*pythonUDF.*".r | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does not take |
||
) | ||
|
||
// we don't want to mark the *InPandas and ArrowEvalPythonExec as unsupported with UDF | ||
|
@@ -385,6 +389,10 @@ object ExecHelper { | |
} | ||
} | ||
|
||
def isPythonUDF(node: SparkPlanGraphNode): Boolean = { | ||
pythonUDFRegExLookUp.exists(regEx => node.desc.matches(regEx.regex)) | ||
} | ||
|
||
def shouldBeRemoved(nodeName: String): Boolean = { | ||
execsToBeRemoved.contains(nodeName) | ||
} | ||
|
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -858,6 +858,21 @@ class SQLPlanParserSuite extends BaseTestSuite { | |
} | ||
} | ||
|
||
test("Python UDFs should be supported") { | ||
val eventLog = s"$qualLogDir/python_udf_eventlog" | ||
val pluginTypeChecker = new PluginTypeChecker() | ||
val app = createAppFromEventlog(eventLog) | ||
assert(app.sqlPlans.size == 1) | ||
val parsedPlans = app.sqlPlans.map { case (sqlID, plan) => | ||
SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app) | ||
} | ||
val execInfo = getAllExecsFromPlan(parsedPlans.toSeq) | ||
val projectExec = execInfo.filter(_.exec.contains("Project")) | ||
assertSizeAndSupported(2, projectExec) | ||
val batchEvalPythonExec = execInfo.filter(_.exec.contains("BatchEvalPython")) | ||
assertSizeAndSupported(1, batchEvalPythonExec) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is something I missing here. I cannot find anywhere in the code that we handle There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BatchEvalPython should not be supported. I will fix this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have we confirmed that for a fact? |
||
} | ||
|
||
test("Expression not supported in FilterExec") { | ||
TrampolineUtil.withTempDir { eventLogDir => | ||
val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is correct.
The code comment just above that change is explaining that we avoid parsing the node content when it is
rddCheckRes.nodeDescRDD
. But the new code tries to check if it is pythonUDF anyway which could lead to bogus results.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the confusion. I will address this in the next iteration.