[SPARK-40434][SS][PYTHON][FOLLOWUP] Address review comments
### What changes were proposed in this pull request?

This PR addresses the review comments from the last round of review by HyukjinKwon in apache#37893.

### Why are the changes needed?

Better documentation and removal of unnecessary code.

### Does this PR introduce _any_ user-facing change?

Slight documentation change.

### How was this patch tested?

N/A

Closes apache#37964 from HeartSaVioR/SPARK-40434-followup.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR committed Sep 22, 2022
1 parent c4a0360 commit e5b4b32
Showing 2 changed files with 7 additions and 20 deletions.
13 changes: 7 additions & 6 deletions python/pyspark/sql/pandas/group_ops.py
@@ -231,9 +231,9 @@ def applyInPandasWithState(
per-group state. The result Dataset will represent the flattened record returned by the
function.
- For a streaming Dataset, the function will be invoked first for all input groups and then
- for all timed out states where the input data is set to be empty. Updates to each group's
- state will be saved across invocations.
+ For a streaming :class:`DataFrame`, the function will be invoked first for all input groups
+ and then for all timed out states where the input data is set to be empty. Updates to each
+ group's state will be saved across invocations.
The function should take parameters (key, Iterator[`pandas.DataFrame`], state) and
return another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
@@ -257,10 +257,10 @@ def applyInPandasWithState(
user-defined state. The value of the state will be presented as a tuple, as well as the
update should be performed with the tuple. The corresponding Python types for
:class:`DataType` are supported. Please refer to the page
- https://spark.apache.org/docs/latest/sql-ref-datatypes.html (python tab).
+ https://spark.apache.org/docs/latest/sql-ref-datatypes.html (Python tab).
- The size of each DataFrame in both the input and output can be arbitrary. The number of
- DataFrames in both the input and output can also be arbitrary.
+ The size of each `pandas.DataFrame` in both the input and output can be arbitrary. The
+ number of `pandas.DataFrame` in both the input and output can also be arbitrary.
.. versionadded:: 3.4.0
@@ -294,6 +294,7 @@ def applyInPandasWithState(
... total_len += len(pdf)
... state.update((total_len,))
... yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total_len)]})
+ ...
>>> df.groupby("id").applyInPandasWithState(
... count_fn, outputStructType="id long, countAsString string",
... stateStructType="len long", outputMode="Update",
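For context on the docstring change above, here is a minimal sketch of how `applyInPandasWithState` is typically wired into a streaming query; the rate source, the `id`/`value` columns, and the body of `count_fn` are illustrative assumptions, not code from this commit.

```python
from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

spark = SparkSession.builder.appName("applyInPandasWithStateSketch").getOrCreate()

# Hypothetical streaming input with an `id` grouping column.
df = (
    spark.readStream.format("rate")
    .load()
    .selectExpr("value % 10 AS id", "value")
)


def count_fn(
    key: Tuple[int], pdf_iter: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
    # State is read and written as a tuple matching `stateStructType`.
    (total_len,) = state.get if state.exists else (0,)
    for pdf in pdf_iter:
        total_len += len(pdf)
    state.update((total_len,))
    yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total_len)]})


result = df.groupBy("id").applyInPandasWithState(
    count_fn,
    outputStructType="id long, countAsString string",
    stateStructType="len long",
    outputMode="Update",
    timeoutConf=GroupStateTimeout.NoTimeout,
)

# Write the per-group counts to the console sink for inspection.
query = result.writeStream.outputMode("update").format("console").start()
```

When a timeout other than `NoTimeout` is configured, the function is additionally invoked for timed-out groups with an empty iterator and `state.hasTimedOut` set, matching the behavior described in the docstring above.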
14 changes: 0 additions & 14 deletions ArrowWriter.scala
@@ -98,16 +98,6 @@ class ArrowWriter(val root: VectorSchemaRoot, fields: Array[ArrowFieldWriter]) {
count += 1
}

- def sizeInBytes(): Int = {
-   var i = 0
-   var bytes = 0
-   while (i < fields.size) {
-     bytes += fields(i).getSizeInBytes()
-     i += 1
-   }
-   bytes
- }

def finish(): Unit = {
root.setRowCount(count)
fields.foreach(_.finish())
@@ -142,10 +132,6 @@ private[arrow] abstract class ArrowFieldWriter {
count += 1
}

- def getSizeInBytes(): Int = {
-   valueVector.getBufferSizeFor(count)
- }

def finish(): Unit = {
valueVector.setValueCount(count)
}
