
[SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions #42272

Closed

Conversation

@allisonwang-db (Contributor) commented Aug 1, 2023:

What changes were proposed in this pull request?

This PR adds a user guide for Python user-defined table functions (UDTFs) introduced in Spark 3.5.
[Screenshot 2023-08-04 at 14:46:13: the built user guide page]

Why are the changes needed?

To help users write Python UDTFs.

Does this PR introduce any user-facing change?

No

How was this patch tested?

docs test

@itholic (Contributor) left a comment:

Otherwise looks pretty fine to me

./bin/spark-submit examples/src/main/python/sql/udtf.py
"""

# NOTE that this file is imported in user guide in PySpark documentation.
Contributor:

nit: "user guide" -> "User Guides" to follow official documentation name?

Also, maybe adding a doc link (https://spark.apache.org/docs/latest/api/python/user_guide/index.html) would be helpful?

Contributor (Author):

Yup it's on the user guide page. I will add a screenshot in the PR description.

        self.count += 1

    def terminate(self):
        yield self.count,
Contributor:

qq: should we always yield the data as a tuple for UDTF?

Contributor (Author):

Yes, each element corresponds to one column in the output schema.
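
For instance, a minimal sketch (not from the PR; the class name, schema, and input are illustrative, using the Spark 3.5 udtf decorator) of how each tuple element lines up with one output column:

from pyspark.sql.functions import lit, udtf

# Two columns in the schema, so every yielded tuple needs two elements.
@udtf(returnType="word: string, length: int")
class WordStats:
    def eval(self, text: str):
        for word in text.split():
            yield word, len(word)  # one element per output column

WordStats(lit("hello spark")).show()
# +-----+------+
# | word|length|
# +-----+------+
# |hello|     5|
# |spark|     5|
# +-----+------+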

Python User-defined Table Functions (UDTFs)
===========================================

Spark 3.5 introduces a new type of user-defined fucntion: Python user-defined table functions (UDTFs),
Contributor:

typo: "fucntion" -> "function"


Yields:
tuple: A tuple representing a single row in the UDTF result relation.
Yield thisas many times as needed to produce multiple rows.
Contributor:

typo?: "thisas" -> "this as"


This method is required to implement.

Args:
Contributor:

I'm not sure whether we should follow numpydoc style here, since we follow it across the overall PySpark code base. WDYT @HyukjinKwon?

Member:

yeah should follow numpy doc style I think

@itholic (Contributor) commented Aug 1, 2023:

I think attaching a screen capture (or something visible from the built documentation) in the PR description would be great!

@allisonwang-db (Contributor, Author):

cc @dtenedor @ueshin

@dtenedor (Contributor) left a comment:

These docs look great, thanks Allison for working on this!

===========================================

Spark 3.5 introduces a new type of user-defined fucntion: Python user-defined table functions (UDTFs),
which take zero or more arguments and return a set of rows.
Contributor:

Suggested change:
- which take zero or more arguments and return a set of rows.
+ wherein each invocation appears in the FROM clause and returns an entire
+ relation as output instead of a single result value. Every UDTF call accepts
+ zero or more arguments each comprising either a scalar constant expression or
+ a separate input relation.


.. currentmodule:: pyspark.sql.functions

To implement a Python UDTF, you can implement this class:
Contributor:

Suggested change:
- To implement a Python UDTF, you can implement this class:
+ To implement a Python UDTF, you can define a class implementing these methods:
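
For reference, the overall shape being described — a sketch assuming the interface discussed in this PR, with placeholder bodies:

from typing import Any, Iterator

class MyUDTF:
    def __init__(self) -> None:
        # Optional: one-time setup; must not require extra constructor arguments.
        pass

    def eval(self, *args: Any) -> Iterator[Any]:
        # Required: called for each input row; yield one tuple per output row.
        yield tuple(args)

    def terminate(self) -> Iterator[Any]:
        # Optional: called once per partition after the last eval() call;
        # yield here to emit any final rows.
        yield from ()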

Initialize the user-defined table function (UDTF).

This method is optional to implement and is called once when the UDTF is
instantiated. Use it to perform any initialization required for the UDTF.
Contributor:

Can we also describe the UDTF class instance's lifetime here? For example, any class fields assigned here will be available for subsequent eval method call(s) to consume (either just one eval call for a UDTF call accepting only scalar constant arg(s) or several eval calls for a UDTF call accepting an input relation arg).

Member:

Also should mention that it should be a default constructor which doesn’t accept any extra arguments?
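
Putting both points together, a hedged sketch (names are illustrative): the constructor takes no extra arguments, and state it assigns is consumed by later eval/terminate calls on the same instance:

from pyspark.sql.functions import udtf

@udtf(returnType="count: int")
class CountRows:
    def __init__(self):        # default constructor: no extra arguments
        self.count = 0         # state visible to subsequent eval() calls

    def eval(self, row):
        self.count += 1        # consumes the state set in __init__

    def terminate(self):
        yield self.count,      # emitted once, after all rows in the partition

spark.udtf.register("count_rows", CountRows)  # assumes an active SparkSession `spark`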


def eval(self, *args: Any) -> Iterator[Any]:
    """
    Evaluate the function using the given input arguments.
Contributor (Author):

I am thinking about this too, but I found it difficult to explain in words. The interface is the same as scalar UDFs so I think Spark users should be able to figure it out. I can provide more examples.

@dtenedor (Contributor) commented Aug 3, 2023:

👍 more examples should be helpful. Maybe we could also add:

The arguments provided to the UDTF call map to the values in this *args list,
in order. Each provided scalar expression maps to exactly one value in this
*args list. Each provided TABLE argument of N columns maps to exactly N
values in this *args list, in the order of the columns as they appear in the
table.


Example:
    def eval(self, x: int, y: int):
        yield x + y, x - y
Contributor:

can we also add an example with a combination of scalar constant arguments and a relation input argument, to show how the mapping from provided SQL arguments to the python *args works? Could we include a SQL query and its results with each example as well?

Contributor (Author):

Sure! I will add a simple one here and a more complex one in the example section below.
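
Roughly what such an example might look like (a sketch; the function name and query are illustrative, and `spark` is assumed to be an active SparkSession):

from pyspark.sql.functions import udtf

@udtf(returnType="sum: int, diff: int")
class SumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y  # one output row with two columns per input pair

spark.udtf.register("sum_diff", SumDiff)
spark.sql("SELECT * FROM sum_diff(3, 1)").show()
# +---+----+
# |sum|diff|
# +---+----+
# |  4|   2|
# +---+----+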


def terminate(self) -> Iterator[Any]:
    """
    Called when the UDTF has processed all rows in a partition.
Contributor:

We haven't really precisely defined what comprises a partition yet. Should we define it using the definitions from #42100 and #42174? Alternatively if these docs are targeting Spark 3.5 but those PRs are only going into master, we could simply define a partition here as either (1) just one eval call with the provided scalar argument(s), if any, or (2) several eval calls with an undefined subset of the rows from the input relation. Then we can expand it later.



@allisonwang-db (Contributor, Author):

@ueshin @dtenedor @itholic @allanf-db @dstrodtman-db I've addressed the comments; PTAL thanks!

@johnayoub:

@allisonwang-db would we be able to use this feature to return a dataframe? I think this would be extremely useful, especially since some functions such as dropDuplicates have no equivalent in SQL, and wrapping them would be helpful.

@allisonwang-db (Contributor, Author):

@johnayoub A Python UDTF is a table-valued function and it returns a dataframe. However, I don't think you can use dataframe functions like dropDuplicates directly inside the UDTF.
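
To illustrate the distinction, a hedged sketch of deduplication done with plain Python state inside a UDTF rather than with dropDuplicates (names and the input column are illustrative; each UDTF instance only deduplicates the rows it sees):

from pyspark.sql import Row
from pyspark.sql.functions import udtf

@udtf(returnType="value: string")
class DedupUDTF:
    def __init__(self):
        self.seen = set()  # ordinary Python state, not a DataFrame API call

    def eval(self, row: Row):
        if row["value"] not in self.seen:
            self.seen.add(row["value"])
            yield row["value"],  # emit each distinct value once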


The arguments provided to the UDTF call are mapped to the values in the
`*args` list sequentially. Each provided scalar expression maps to exactly
one value in this `*args` list. Each provided TABLE argument of N columns


@dtenedor Here's the line

Contributor:

@allisonwang-db it turns out this part about TABLE arguments is wrong (I think I suggested it before, sorry). Instead of:

Each provided TABLE argument of N columns
maps to exactly N values in this `*args` list, in the order of the columns
as they appear in the table.

it should be something like

Each provided TABLE argument maps to a pyspark.sql.Row object containing
the columns in the order they appear in the provided input relation.

Contributor (Author):

Updated!
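
Under the corrected semantics, an eval sketch (schema and names are illustrative; `spark` is assumed to be an active SparkSession) receives the whole input row as one Row object:

from pyspark.sql import Row
from pyspark.sql.functions import udtf

@udtf(returnType="id: int, doubled: int")
class DoubleIt:
    def eval(self, row: Row):
        # The TABLE argument arrives as a single Row, not N separate values.
        yield row["id"], row["id"] * 2

spark.udtf.register("double_it", DoubleIt)
spark.sql("SELECT * FROM double_it(TABLE(SELECT id FROM range(3)))").show()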

@dtenedor (Contributor) left a comment:

Thanks @allisonwang-db for putting in the work to get this drafted; the documentation will be very useful for Spark users!

------
tuple
    A tuple representing a single row in the UDTF result relation.
    Yield this if you want to return additional rows during termination.
Contributor:

should we mention here the tricky detail that you have to include a trailing comma when yielding a row of just one value (here and above in the eval description)?

Contributor (Author):

Yea let me add an example
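
The detail in question, sketched (class and schema are illustrative):

from pyspark.sql.functions import udtf

@udtf(returnType="count: int")
class SingleColumn:
    def eval(self):
        yield 42,    # trailing comma makes this a 1-tuple: one row, one column
        yield (42,)  # equivalent and more explicit
        # yield 42   # a bare value is not a tuple and does not match the contract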



The return type of the UDTF defines the schema of the table it outputs.
It must be either a ``StructType`` or a DDL string representing a struct type.
Contributor:

should we put an example with this DDL string as well? It looks useful :)

Contributor (Author):

Will do. All the examples below are actually using DDL strings, but I couldn't find any documentation on this. cc @HyukjinKwon do you know if we have documentation on DDL strings of pyspark types?
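
The two spellings side by side (a sketch; the DDL string form matches what the examples use, and the StructType form is the programmatic equivalent):

from pyspark.sql.functions import udtf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

@udtf(returnType="word: string, length: int")  # DDL string for a struct type
class DdlSchema:
    def eval(self, text: str):
        yield text, len(text)

schema = StructType([
    StructField("word", StringType()),
    StructField("length", IntegerType()),
])

@udtf(returnType=schema)  # equivalent explicit StructType
class StructSchema:
    def eval(self, text: str):
        yield text, len(text)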

Advanced Features
-----------------

TABLE input argument
Contributor:

I would recommend proposing this as the primary way of passing relation arguments, rather than in the "additional features" section, since this syntax conforms to the SQL standard.

One way is to just swap the LATERAL syntax to this "advanced features" section instead.

Contributor (Author):

Moved. But we might need to improve it in the future (SPARK-44746)


spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
Contributor:

this is good, let's also add an example just passing a table by name directly as well, e.g. TABLE(t)?

Contributor (Author):

We can follow up in SPARK-44746
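
For context, a self-contained version of the filter example referenced above; the register and SQL lines come from the diff, while the class body is an assumed predicate consistent with them (`spark` is an active SparkSession):

from pyspark.sql import Row
from pyspark.sql.functions import udtf

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:      # assumed predicate: keep ids greater than 5
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)
spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+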

@allisonwang-db (Contributor, Author):

I've addressed all comments. cc @HyukjinKwon we should merge this soon for Spark 3.5. I can create follow-up PRs if there are additional comments. Thanks!

@HyukjinKwon (Member):

Merged to master and branch-3.5.

HyukjinKwon pushed a commit that referenced this pull request Sep 8, 2023
…ble functions

### What changes were proposed in this pull request?

This PR adds a user guide for Python user-defined table functions (UDTFs) introduced in Spark 3.5.
[Screenshot 2023-08-04 at 14:46:13: https://github.com/apache/spark/assets/66282705/11f5dc5e-681b-4677-a466-1a23c0b8dd01]

### Why are the changes needed?

To help users write Python UDTFs.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

docs test

Closes #42272 from allisonwang-db/spark-44508-udtf-user-guide.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit aaf413c)
Signed-off-by: Hyukjin Kwon <[email protected]>