[QUESTION] Pandas mutability causes different results compared to Spark and DuckDB #356
Hey @lukeb88, thanks for the well-written issue. You've obviously spent a lot of time learning FugueSQL. I can answer the first question immediately while I look into the second.

First, we need to break this expression down. The TL;DR here is that the assignment binds `transformed` to the `SELECT`, not to the `TRANSFORM`. Look at this. So when you changed it to:

```python
query = '''
transformed = SELECT * FROM df1 WHERE 1=1 TRANSFORM USING make_new_col
YIELD DATAFRAME AS partial_result
PRINT transformed
...
'''
```

there are actually 2 statements here: the `SELECT` and the `TRANSFORM`. If we write it out fully with the assignments made explicit, it becomes:

```python
query = '''
transformed = SELECT * FROM df1 WHERE 1=1
transformed2 = TRANSFORM transformed USING make_new_col
YIELD DATAFRAME AS partial_result
PRINT transformed2
'''
```

And you will find that `transformed2` contains the new `col3`, while `transformed` does not.
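A runnable sketch of the rewritten query (a minimal example, assuming the `fsql` entrypoint from `fugue_sql`; the sample data and the `res` name are illustrative, not from this thread):

```python
import pandas as pd
from fugue_sql import fsql

df1 = pd.DataFrame({'col1': [1, 2, 3], 'col2': [2, 3, 4]})

# schema: *, col3:int
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df

query = '''
transformed = SELECT * FROM df1 WHERE 1=1
transformed2 = TRANSFORM transformed USING make_new_col
YIELD DATAFRAME AS partial_result
PRINT transformed2
'''

# run on the default (Pandas-based) engine; "duckdb" or a SparkSession also work
res = fsql(query, df1=df1).run()
res['partial_result'].show()  # the TRANSFORM output, which includes col3
```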
For the second question, there is something we should fix. It is a bug for the behavior to be inconsistent across execution engines, but I can explain why this happens and give you a way to work around it for now. This has to do with Pandas being mutable and Spark/DuckDB being immutable. For example, the code below has a function that performs an operation but does not return anything.

```python
import pandas as pd

df1 = pd.DataFrame({'col1': [1, 2, 3], 'col2': [2, 3, 4]})

def make_new_col(df):
    df['col3'] = df['col1'] + df['col2']

make_new_col(df1)
df1.head()
```

Still, the output of `df1.head()` contains the new `col3`, because the function mutated `df1` in place.
But doing the same thing on PySpark:

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

df1 = pd.DataFrame({'col1': [1, 2, 3], 'col2': [2, 3, 4]})
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df1)

def make_new_col(sdf):
    sdf = sdf.withColumn('col3', col('col1') + col('col2'))

make_new_col(sdf)
sdf.show()
```

results in the original `sdf` without `col3`: Spark DataFrames are immutable, so `withColumn` returns a new DataFrame, which is simply discarded when the function exits.
So let's recall in FugueSQL that you had the following statement:

```sql
transformed = SELECT * FROM df1 WHERE 1=1 TRANSFORM USING make_new_col
```

And remember that `transformed` is bound to the `SELECT` output. Just to be clear, the anonymous `TRANSFORM` output is what gets yielded as `partial_result`. So when you do the later `JOIN` on the Pandas backend, `transformed` will have `col3` anyway, because `make_new_col` mutated the underlying Pandas DataFrame in place; on Spark and DuckDB it will not, which is why `res['result']` differs across engines.

I tested this on all three backends and the results are consistent with this explanation. Just let me know if you still see something else. You may get faster help on Slack too if you're willing to join. Appreciate you digging into FugueSQL!
One last thing I forgot to mention is that the function

```python
# schema: *, col3:int
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df
```

is the reason for the direct mutation of the Pandas DataFrame. You can change it to:

```python
# schema: *, col3:int
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df = df.assign(col3=df['col1'] + df['col2'])
    return df
```

which will create a copy of the DataFrame. That way, you will get consistent results across all three engines.

I know this function was lifted off the tutorials. That's because I am trying to write code more familiar to Pandas users even if it may not be best practice. I guess it's coming back to bite me. 😬
Hi @kvnkho, thank you for your clear and detailed answers! Everything is much clearer to me now.

My two cents on the problem raised by the mutability of Pandas and the immutability of Spark/DuckDB: I have only seen a little bit of Fugue's code, and obviously I don't know how it works under the hood. However, couldn't a deep copy of the dataframe be passed automatically to the Python functions when the Pandas engine is used, in a way that is completely transparent to the user? There would obviously be no need for this in the case of Spark/DuckDB.
P.S. Now that I understand better how it works (just to complete your answer in case anyone else runs into this), another possibility to keep the consistency across all three engines is sketched below.
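A minimal sketch of one such approach, assuming the idea is a defensive `copy()` inside the transformer (the snippet is an assumption, not this comment's original code):

```python
import pandas as pd

# schema: *, col3:int
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()  # work on a copy so the input is never mutated
    df['col3'] = df['col1'] + df['col2']
    return df
```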
@lukeb88 that is a very good question, and I have to say it is a painful decision. A deepcopy can be very slow, but without it we can't prevent users from mutating the input dataframe (if they don't follow good practice). Here our decision was in favor of performance, and we expect users to follow good practices. Given this context, what is your opinion about the choice?
@goodwanghan I understand that it is a difficult choice. Assuming the deepcopy can be done conditionally, only when Pandas has been chosen as the engine, I would probably go with the deepcopy, keeping in mind that it becomes a problem only with large datasets, and at that point it would probably not make sense to use Pandas as the engine anyway. I understand that you place emphasis on performance, but on the other hand Fugue is an interface, and perhaps it's even more important that it be 100% consistent across all engines...
@lukeb88, that is very well articulated. Ideally they shouldn't be using Pandas for larger datasets anyway. Thanks for that! And thanks for sharing your workaround example as well.
@lukeb88 I think that is very well said, and it also aligns with Fugue's priorities: consistency is more important than performance. I will create a PR to make the change, or if you are interested you can create your first PR for Fugue :)
The following code:
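A sketch of the setup (assembled from the fragments quoted in this thread; the exact sample data and the join condition are assumptions):

```python
import pandas as pd
from fugue_sql import fsql

df1 = pd.DataFrame({'col1': [1, 2, 3], 'col2': [2, 3, 4]})

# schema: *, col3:int
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df

query = '''
transformed = TRANSFORM df1 USING make_new_col
YIELD DATAFRAME AS partial_result

result = SELECT df1.*, transformed.col3
           FROM df1 INNER JOIN transformed ON df1.col1 = transformed.col1
YIELD DATAFRAME AS result
'''

res = fsql(query, df1=df1).run()  # or .run("duckdb") / .run(spark_session)
res['partial_result'].show()
res['result'].show()
```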
works in the same way using DuckDB, Pandas, or Spark as the engine, returning the same `partial_result` and `result` on every engine.
But if I change the first row of the SQL from:

```sql
transformed = TRANSFORM df1 USING make_new_col
```

to:

```sql
transformed = SELECT * FROM df1 WHERE 1=1 TRANSFORM USING make_new_col
```

I obtain 2 different solutions, one for Pandas and another one for DuckDB and Spark: with Pandas the results remain the same as above, while for the other engines `res['partial_result']` is still the same but `res['result']` is different. It seems that in the JOIN operation `transformed` was missing the `col3` generated by the `make_new_col` function.

Adding a `PRINT transformed` after the first yield (`YIELD DATAFRAME AS partial_result`), I see that, for both Pandas and Spark/DuckDB, `transformed` does not contain the new `col3`.

I don't understand 2 things at this point:

1. If `transformed` does not contain `col3`, what is wrong with `transformed = SELECT * FROM df1 WHERE 1=1 TRANSFORM USING make_new_col`?
2. If `transformed` does not contain `col3`, how is it possible that after the JOIN I obtain a `result` that also has `col3`?