[SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions #42272

Closed

Changes from 1 commit
169 changes: 169 additions & 0 deletions examples/src/main/python/sql/udtf.py
@@ -0,0 +1,169 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A simple example demonstrating Python UDTFs in Spark
Run with:
./bin/spark-submit examples/src/main/python/sql/udtf.py
"""

# NOTE that this file is imported in user guide in PySpark documentation.

Comment (Contributor): nit: "user guide" -> "User Guides" to follow the official documentation name? Also, maybe adding a doc link (https://spark.apache.org/docs/latest/api/python/user_guide/index.html) would be helpful.

Reply (Author): Yup, it's on the user guide page. I will add a screenshot in the PR description.

# The code is referenced via line numbers. See also the `literalinclude` directive in Sphinx.
import pandas as pd
from typing import Iterator, Any

from pyspark.sql import SparkSession
from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version

# Python UDTFs use Arrow by default.
require_minimum_pandas_version()
require_minimum_pyarrow_version()


def python_udtf_simple_example(spark: SparkSession):

from pyspark.sql.functions import lit, udtf

class SimpleUDTF:
def eval(self, x: int, y: int):
yield x + y, x - y

# Now, create a Python UDTF using the defined class and specify a return type
func = udtf(SimpleUDTF, returnType="c1: int, c2: int")

func(lit(1), lit(2)).show()
# +---+---+
# | c1| c2|
# +---+---+
# | 3| -1|
# +---+---+


def python_udtf_registration(spark: SparkSession):

from pyspark.sql.functions import udtf

# Use the decorator to define the UDTF.
@udtf(returnType="c1: int, c2: int")
class PlusOne:
def eval(self, x: int):
yield x, x + 1

# Register the UDTF
spark.udtf.register("plus_one", PlusOne)

# Use the UDTF in SQL
spark.sql("SELECT * FROM plus_one(1)").show()
# +---+---+
# | c1| c2|
# +---+---+
# | 1| 2|
# +---+---+

# Use the UDTF in SQL with lateral join
spark.sql("SELECT * FROM VALUES (0, 1), (1, 2) t(x, y), LATERAL plus_one(x)").show()
# +---+---+---+---+
# | x| y| c1| c2|
# +---+---+---+---+
# | 0| 1| 0| 1|
# | 1| 2| 1| 2|
# +---+---+---+---+


def python_udtf_terminate_example(spark: SparkSession):

from pyspark.sql.functions import udtf

@udtf(returnType="cnt: int")
class CountUDTF:
def __init__(self):
self.count = 0

def eval(self, x):
self.count += 1

def terminate(self):
yield self.count,

Comment (Contributor): qq: should we always yield the data as a tuple for UDTF?

Reply (Author): Yes, each element corresponds to one column in the output schema.
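
For illustration, a minimal sketch of that tuple-per-row contract (a hypothetical example, not part of this diff):

    from pyspark.sql.functions import lit, udtf

    @udtf(returnType="word: string, length: int")
    class WordInfo:
        def eval(self, s: str):
            # One yielded tuple per output row; one element per schema column.
            yield s, len(s)
            # A single-column schema would still need a tuple: yield (value,)

    WordInfo(lit("spark")).show()
    # +-----+------+
    # | word|length|
    # +-----+------+
    # |spark|     5|
    # +-----+------+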



spark.udtf.register("count_udtf", CountUDTF)
    spark.sql("SELECT * FROM range(0, 10, 1, 1), LATERAL count_udtf(id)").show()
# +---+---+
# | id|cnt|
# +---+---+
# | 9| 10|
# +---+---+


def python_udtf_calendar_example(spark: SparkSession):

import datetime
    from pyspark.sql.functions import lit, udtf

@udtf(returnType="date: string, year: int, month: int, day: int, day_of_week: string")
class Calendar:
def eval(self, start_date: str, end_date: str):
start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date()
end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d").date()
delta = end_date - start_date
dates = [start_date + datetime.timedelta(days=i) for i in range(delta.days + 1)]
for date in dates:
yield (
date.strftime("%Y-%m-%d"),
date.year,
date.month,
date.day,
date.strftime("%A"),
)

Calendar(lit("2023-01-01"), lit("2023-01-10")).show()
# +----------+----+-----+---+-----------+
# | date|year|month|day|day_of_week|
# +----------+----+-----+---+-----------+
# |2023-01-01|2023| 1| 1| Sunday|
# |2023-01-02|2023| 1| 2| Monday|
# |2023-01-03|2023| 1| 3| Tuesday|
# |2023-01-04|2023| 1| 4| Wednesday|
# |2023-01-05|2023| 1| 5| Thursday|
# |2023-01-06|2023| 1| 6| Friday|
# |2023-01-07|2023| 1| 7| Saturday|
# |2023-01-08|2023| 1| 8| Sunday|
# |2023-01-09|2023| 1| 9| Monday|
# |2023-01-10|2023| 1| 10| Tuesday|
# +----------+----+-----+---+-----------+


if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python UDTF example") \
.getOrCreate()

print("Running simple Python UDTF example")
python_udtf_simple_example(spark)

print("Running Python UDTF registration example")
python_udtf_registration(spark)

print("Running Python UDTF terminate example")
python_udtf_terminate_example(spark)

print("Running Python UDTF calendar example")
python_udtf_calendar_example(spark)

spark.stop()
1 change: 1 addition & 0 deletions python/docs/source/user_guide/sql/index.rst
@@ -24,4 +24,5 @@ Spark SQL
:maxdepth: 2

arrow_pandas
python_udtf

140 changes: 140 additions & 0 deletions python/docs/source/user_guide/sql/python_udtf.rst
@@ -0,0 +1,140 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

===========================================
Python User-defined Table Functions (UDTFs)
===========================================

Spark 3.5 introduces a new type of user-defined fucntion: Python user-defined table functions (UDTFs),

Comment (Contributor): typo: "fucntion" -> "function"

which take zero or more arguments and return a set of rows.

Comment (Contributor): Suggested change —
Old: which take zero or more arguments and return a set of rows.
New: wherein each invocation appears in the FROM clause and returns an entire relation as output instead of a single result value. Every UDTF call accepts zero or more arguments, each comprising either a scalar constant expression or a separate input relation.


Implementing a Python UDTF
--------------------------

.. currentmodule:: pyspark.sql.functions

To implement a Python UDTF, you can implement this class:

Comment (Contributor): Suggested change —
Old: To implement a Python UDTF, you can implement this class:
New: To implement a Python UDTF, you can define a class implementing these methods:


.. code-block:: python

class PythonUDTF:

def __init__(self) -> None:
"""
Initialize the user-defined table function (UDTF).

This method is optional to implement and is called once when the UDTF is
instantiated. Use it to perform any initialization required for the UDTF.

Comment (Contributor): Can we also describe the UDTF class instance's lifetime here? For example, any class fields assigned here will be available for subsequent eval method call(s) to consume (either just one eval call for a UDTF call accepting only scalar constant arg(s), or several eval calls for a UDTF call accepting an input relation arg).

Comment (Member): Also, should we mention that it should be a default constructor which doesn't accept any extra arguments?
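
For illustration, a sketch of that lifetime (a hypothetical example, not part of this diff): fields assigned in the no-argument constructor persist across eval calls on one UDTF instance and remain visible in terminate.

    from pyspark.sql.functions import udtf

    @udtf(returnType="total: int")
    class RunningTotal:
        def __init__(self):      # must take no extra arguments
            self.total = 0       # state shared by subsequent eval calls

        def eval(self, x: int):
            self.total += x      # accumulated across input rows

        def terminate(self):
            yield self.total,    # the state is still available here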

"""
...

        def eval(self, *args: Any) -> Iterator[Any]:
            """
Evaluate the function using the given input arguments.

Reply (Author): I am thinking about this too, but I found it difficult to explain in words. The interface is the same as scalar UDFs, so I think Spark users should be able to figure it out. I can provide more examples.

Comment (Contributor, @dtenedor, Aug 3, 2023): 👍 more examples should be helpful. Maybe we could also add:

    The arguments provided to the UDTF call map to the values in this *args list,
    in order. Each provided scalar expression maps to exactly one value in this
    *args list. Each provided TABLE argument of N columns maps to exactly N
    values in this *args list, in the order of the columns as they appear in the
    table.
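
A sketch of the scalar side of that mapping (a hypothetical example, not part of this diff; assumes an active SparkSession named `spark`):

    from pyspark.sql.functions import udtf

    @udtf(returnType="s: string")
    class Repeat:
        def eval(self, word: str, times: int):
            # SELECT * FROM repeat_udtf('hi', 3) binds 'hi' -> word and 3 -> times.
            yield word * times,

    spark.udtf.register("repeat_udtf", Repeat)
    spark.sql("SELECT * FROM repeat_udtf('hi', 3)").show()
    # +------+
    # |     s|
    # +------+
    # |hihihi|
    # +------+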


This method is required to implement.

Args:

Comment (Contributor): I'm not sure if we should follow numpydoc style here, since we follow it in the overall PySpark code base. WDYT @HyukjinKwon?

Reply (Member): Yeah, we should follow numpydoc style, I think.

*args: Arbitrary positional arguments representing the input
to the UDTF.

Yields:
tuple: A tuple representing a single row in the UDTF result relation.
Yield thisas many times as needed to produce multiple rows.

Comment (Contributor): typo?: "thisas" -> "this as"


Note:
- The result must be a tuple.
- UDTFs do not accept keyword arguments on the calling side.
- Use "yield" to produce one row at a time for the UDTF result relation,
or use "return" to produce multiple rows for the UDTF result relation at once.

Example:
def eval(self, x: int, y: int):
yield x + y, x - y

Comment (Contributor): Can we also add an example with a combination of scalar constant arguments and a relation input argument, to show how the mapping from the provided SQL arguments to the Python *args works? Could we include a SQL query and its results with each example as well?


Reply (Author): Sure! I will add a simple one here and a more complex one in the example section below.

"""
...

def terminate(self) -> Iterator[Any]:
"""
Called when the UDTF has processed all rows in a partition.

Comment (Contributor): We haven't really precisely defined what comprises a partition yet. Should we define it using the definitions from #42100 and #42174? Alternatively, if these docs are targeting Spark 3.5 but those PRs are only going into master, we could simply define a partition here as either (1) just one eval call with the provided scalar argument(s), if any, or (2) several eval calls with an undefined subset of the rows from the input relation. Then we can expand it later.


This method is optional to implement and is useful for performing any
cleanup or finalization operations after the UDTF has processed all rows.
You can also yield additional rows if needed.

Yields:
tuple: A tuple representing a single row in the UDTF result relation.
Yield this if you want to return additional rows during termination.

Example:
def terminate(self):
yield "done", None
"""
...
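
The ``eval`` docstring above notes that rows can be produced with either ``yield`` or ``return``; a sketch of the two styles (hypothetical class names, not part of this diff):

.. code-block:: python

    from pyspark.sql.functions import udtf

    @udtf(returnType="a: int")
    class YieldRows:
        def eval(self, n: int):
            for i in range(n):
                yield i,                       # one row at a time

    @udtf(returnType="a: int")
    class ReturnRows:
        def eval(self, n: int):
            return [(i,) for i in range(n)]    # all rows at once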


The return type of the UDTF must be either a ``StructType`` or a DDL string, both of which
define the schema of the UDTF output.
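
For example, the following two definitions are equivalent (a sketch; the ``StructType`` form is spelled out for comparison):

.. code-block:: python

    from pyspark.sql.functions import udtf
    from pyspark.sql.types import IntegerType, StructField, StructType

    # Schema as a DDL string.
    @udtf(returnType="c1: int, c2: int")
    class PlusMinus:
        def eval(self, x: int, y: int):
            yield x + y, x - y

    # The same schema as an explicit StructType.
    schema = StructType([
        StructField("c1", IntegerType()),
        StructField("c2", IntegerType()),
    ])

    @udtf(returnType=schema)
    class PlusMinusStruct:
        def eval(self, x: int, y: int):
            yield x + y, x - y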

Here's a simple example of a UDTF implementation:

.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
:language: python
:lines: 39-53
:dedent: 4


For more detailed usage, please see :func:`udtf`.


Registering and Using Python UDTFs in SQL
-----------------------------------------

Python UDTFs can also be registered and used in SQL queries.

.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
:language: python
:lines: 58-84
:dedent: 4


Apache Arrow
------------
Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
data between JVM and Python processes. Apache Arrow is enabled by default for Python UDTFs.
You can set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false`` to disable the Arrow optimization.
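
For instance, to disable it for the current session (assuming an active ``spark`` session):

.. code-block:: python

    # Fall back to the non-Arrow code path for Python UDTFs.
    spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "false")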

For more details, please see `Apache Arrow in PySpark <../arrow_pandas.rst>`_.


More Examples
-------------

A Python UDTF with ``__init__`` and ``terminate``:

.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
:language: python
:lines: 89-109
:dedent: 4


A Python UDTF to generate a list of dates:

.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
:language: python
:lines: 114-148
:dedent: 4