
SNOW-1015703: Memory Leak when running queries with many PARTITION clauses #1859

Closed

willgraf opened this issue Jan 22, 2024 · 5 comments

@willgraf

willgraf commented Jan 22, 2024

Python version

Python 3.9.18 (main, Aug 25 2023, 13:20:04) [GCC 9.4.0]

Operating system and processor architecture

Linux-5.10.199-190.747.amzn2.x86_64-x86_64-with-glibc2.31

Installed packages

amqp==5.2.0
anyio==4.2.0
asgiref==3.7.2
asn1crypto==1.5.1
async-timeout==4.0.3
attrs==23.2.0
aws-encryption-sdk==3.1.1
backoff==2.2.1
billiard==3.6.4.0
boto3==1.34.24
botocore==1.34.24
celery==5.2.7
certifi==2023.11.17
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.3.0
cryptography==39.0.2
Deprecated==1.2.14
distlib==0.3.8
exceptiongroup==1.2.0
fastapi==0.92.0
filelock==3.13.1
googleapis-common-protos==1.62.0
greenlet==3.0.3
grpcio==1.60.0
gunicorn==20.1.0
h11==0.14.0
idna==3.6
importlib-metadata==6.11.0
Jinja2==3.1.3
jmespath==1.0.1
kombu==5.3.5
MarkupSafe==2.1.4
newrelic==7.16.0.178
numpy==1.26.3
oauthlib==3.2.2
packaging==23.2
pandas==1.3.5
pipenv==2023.11.15
platformdirs==3.11.0
prompt-toolkit==3.0.43
protobuf==4.25.2
pycparser==2.21
pydantic==1.8.2
PyJWT==2.5.0
pyodbc==4.0.39
pyOpenSSL==23.2.0
python-dateutil==2.8.2
pytz==2023.3.post1
PyYAML==5.4.1
redis==5.0.1
requests==2.31.0
requests-oauthlib==1.3.1
s3transfer==0.10.0
six==1.16.0
sniffio==1.3.0
snowflake-connector-python==3.5.0
sortedcontainers==2.4.0
sql_metadata==2.10.0
SQLAlchemy==1.4.51
sqlparse==0.4.4
starlette==0.25.0
tomlkit==0.12.3
types-cryptography==3.3.23.2
typing_extensions==4.4.0
urllib3==1.26.18
uvicorn==0.17.6
vine==5.1.0
virtualenv==20.25.0
wcwidth==0.2.13
wrapt==1.16.0
zipp==3.17.0

What did you do?

Summary

This service uses snowflake-connector-python in a Celery worker to execute a Snowflake query and extract the results to an S3 bucket. Simple queries (SELECT * FROM table) behave as expected. However, queries with several PARTITION BY clauses cause a memory leak, and the process is eventually OOM-killed.
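For context, here is a minimal sketch of the worker pattern described above; the Celery app name, broker URL, task name, and S3 key layout are all hypothetical placeholders, not the service's actual code:

import csv
import io

import boto3
import snowflake.connector
from celery import Celery

app = Celery("extract_worker", broker="redis://localhost:6379/0")  # hypothetical broker URL

CONNECTION_PARAMS = {}  # placeholder; real credentials are elided in this report


@app.task
def extract_to_s3(query, bucket, key_prefix, batch_size=10000):
    """Run a Snowflake query and upload each fetched batch to S3 as a CSV object."""
    s3 = boto3.client("s3")
    connection = snowflake.connector.connect(**CONNECTION_PARAMS)
    try:
        curs = connection.cursor()
        curs.execute(query)
        batch_no = 0
        data = curs.fetchmany(batch_size)
        while data:
            buffer = io.StringIO()
            csv.writer(buffer).writerows(data)
            s3.put_object(
                Bucket=bucket,
                Key=f"{key_prefix}/part-{batch_no:05d}.csv",
                Body=buffer.getvalue().encode("utf-8"),
            )
            batch_no += 1
            data = curs.fetchmany(batch_size)
    finally:
        connection.close()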

Steps to reproduce

I generated a mock dataset with 10M rows via a Python worksheet

# The Snowpark package is required for Python Worksheets. 
# You can add more packages by selecting them using the Packages control and then importing them.

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
import numpy as np

from random import randrange
from datetime import timedelta, datetime

# 2 years, pre aggregate
def random_date(x,y, start=datetime(2021,1,1), end=datetime(2023,1,1)):
    """
    This function will return a random datetime between two datetime 
    objects.
    """
    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    random_second = randrange(int_delta)
    return (start + timedelta(seconds=random_second))
    
def create_df():
    max_rows = 10000000
    float_column_count = 13
    max_float = 10
    rng = np.random.default_rng(seed=42)
    float_list = [f'float{i}' for i in range(float_column_count)]
    float_df = pd.DataFrame(rng.random((max_rows, float_column_count)), columns=float_list)
    float_df = float_df*max_float
    dt_arr = np.fromfunction(np.vectorize(random_date), (max_rows, 1), dtype=float)
    int_column_count = 3
    max_int = 10
    int_column_list = [f'int{i}' for i in range(int_column_count)]
    
    int_df = pd.DataFrame(np.random.randint(0, max_int, (max_rows, int_column_count)), columns=int_column_list)
    dt_df = pd.DataFrame(dt_arr, columns=['date'])
    segment_basename = 'segment_no_'
    num_labels = 5
    segment_labels = [f'{segment_basename}{i}' for i in range(num_labels)]
    segment_labels = np.random.choice(segment_labels, (max_rows, 1))
    user_max_id = 20000

    user_df = pd.DataFrame(np.random.randint(0, user_max_id, (max_rows, 1)), columns=["userid"])

    labels_df = pd.DataFrame(segment_labels, columns=['segment'])
    stitched_df = pd.concat([dt_df, user_df, labels_df, int_df], axis=1)
    with_floats = pd.concat([stitched_df, float_df], axis=1)
    return with_floats
    
def main(session: snowpark.Session): 
    # Your code goes here, inside the "main" handler.
    
    # Return value will appear in the Results tab.
    session.connection.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "test_table(date date, userid integer, segment string, int0 integer, int1 integer, int2 integer, float0 float, float1 float, float2 float, float3 float, float4 float, float5 float, float6 float, float7 float, float8 float, float9 float, float10 float, float11 float, float12 float)"
    )
    df = create_df()
    # write_pandas auto-creates test_table2, which is the table the query below reads from
    write_pandas(session.connection, df, "test_table2", auto_create_table=True)
    return df

Then I run the query and repeatedly call fetchmany until all results have been fetched:

query = """
SELECT
  TO_DATE("date"::string) as realdate
  ,"userid"
  ,"segment"
  ,count(distinct "int0") as cat1
  ,sum("int1") as cat2
  ,sum("int2") as cat3
  ,case when cat2 = 0 then 0 else cat3/cat2 end as cat4
  ,avg(cat1) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 6 PRECEDING AND CURRENT ROW) as seven_day_avg_cat1
  ,avg(cat2) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 6 PRECEDING AND CURRENT ROW) as seven_day_avg_cat2
  ,avg(cat3) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 6 PRECEDING AND CURRENT ROW) as seven_day_avg_cat3
  ,avg(cat4) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 6 PRECEDING AND CURRENT ROW) as seven_day_avg_cat4
  ,avg(cat1) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 13 PRECEDING AND CURRENT ROW) as two_week_avg_cat1
  ,avg(cat2) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 13 PRECEDING AND CURRENT ROW) as two_week_avg_cat2
  ,avg(cat3) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 13 PRECEDING AND CURRENT ROW) as two_week_avg_cat3
  ,avg(cat4) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 13 PRECEDING AND CURRENT ROW) as two_week_avg_cat4
  ,avg(cat1) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 30 PRECEDING AND CURRENT ROW) as month_avg_cat1
  ,avg(cat2) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 30 PRECEDING AND CURRENT ROW) as month_avg_cat2
  ,avg(cat3) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 30 PRECEDING AND CURRENT ROW) as month_avg_cat3
  ,avg(cat4) OVER ( PARTITION BY "userid", "userid" ORDER BY TO_DATE("date"::string) ROWS between 30 PRECEDING AND CURRENT ROW) as month_avg_cat4
  FROM "test_table2"
group by 1, 2, 3
order by 1,2,3
;
"""
import time

import snowflake.connector

connection = snowflake.connector.connect(...)
curs = connection.cursor()

curs.execute_async(query)
query_id = curs.sfqid

# poll until the asynchronous query has finished running
while connection.is_still_running(connection.get_query_status(query_id)):
    time.sleep(3)

curs.get_results_from_sfqid(query_id)

size = 10000
data = curs.fetchmany(size)
while data:
    yield data  # this loop runs inside a generator function in the worker
    data = curs.fetchmany(size)

Versioning

I found that this behavior was introduced in version 3.3.0 and persists through 3.6.0; v3.2.1 works as expected.

What did you expect to see?

I expected the memory footprint to match the SELECT * case and did NOT expect the shape of the query to change the memory footprint of the container running snowflake-connector-python.

The PARTITION BY query uses nearly 8 GB of RAM, while the vanilla SELECT query uses less than 1 GB.
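For reference, peak memory can be measured directly in the worker process with only the standard library; a minimal sketch (the 8 GB / <1 GB numbers above came from container metrics, not this snippet):

import resource


def peak_rss_mib():
    """Peak resident set size of this process; ru_maxrss is reported in KiB on Linux."""
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024


curs.execute(query)  # same cursor and PARTITION BY query as in the reproduction above
rows = 0
data = curs.fetchmany(10000)
while data:
    rows += len(data)
    data = curs.fetchmany(10000)
print(f"fetched {rows} rows, peak RSS ~{peak_rss_mib():.0f} MiB")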

Can you set logging to DEBUG and collect the logs?

No response

github-actions bot changed the title from "Memory Leak when running queries with many PARTITION clauses" to "SNOW-1015703: Memory Leak when running queries with many PARTITION clauses" on Jan 22, 2024
@sfc-gh-yixie
Collaborator

@willgraf So the problem you observed is that the Python connector used more memory when the SQL had a PARTITION BY? The SQL runs on the Snowflake server, so in theory it shouldn't affect the client's memory footprint unless the returned result sets have different sizes. Is the other SQL select * from test_table2?

@willgraf
Author

Yes, the other SQL that uses <1GB of RAM is select * from test_table2, and the query with PARTITION BY uses nearly 8GB of RAM. I've reproduced this behavior in versions 3.3 through 3.6 (but 3.2.1 behaves as expected). Additionally, the amount of memory seems to scale with the number of PARTITION BY clauses.

I was also surprised by this behavior, as I would have thought memory usage would be independent of the query, since the server runs the query.

@sfc-gh-aling
Collaborator

It might be related to our nanoarrow C extension change -- starting from v3.3.0 we switched to using nanoarrow as our underlying data converter. I will take a closer look at it.

@sfc-gh-aling
Collaborator

sfc-gh-aling commented Jan 30, 2024

I think I have found the leaking code -- it's inside the decimal data extraction in the Cython extension module. avg(cat1) OVER (PARTITION BY ...) returns a column of DECIMAL data type, which also explains why the memory usage "scales with the number of PARTITION BY statements".

I have a PR to fix it: #1867
I'm running some stress tests to make sure it's no longer leaking. We will do a patch release once we confirm the fix.

thanks for the report!
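For anyone else narrowing this down, the DECIMAL explanation can be confirmed from the client side by inspecting cursor.description; a minimal sketch, assuming the same cursor and query as in the reproduction above (FIELD_ID_TO_NAME maps the connector's numeric type codes to names such as FIXED and REAL):

from snowflake.connector.constants import FIELD_ID_TO_NAME

curs.execute(query)  # the PARTITION BY query from the report
for column in curs.description:
    # the window-function averages come back as FIXED (Snowflake NUMBER/DECIMAL) columns
    print(column.name, FIELD_ID_TO_NAME[column.type_code])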

@sfc-gh-aling
Collaborator

Hey @willgraf, we have released v3.7.1, which includes the fix. Please try it out and let me know if it works.
I'm closing the ticket, but feel free to reopen it if you still encounter the issue.
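After upgrading, it may be worth double-checking which connector version the worker environment actually resolved; a minimal standard-library sketch (assumes a plain X.Y.Z release string):

from importlib.metadata import version

installed = version("snowflake-connector-python")
print("snowflake-connector-python", installed)
# the fix shipped in 3.7.1; compare numerically rather than lexicographically
assert tuple(int(p) for p in installed.split(".")[:3]) >= (3, 7, 1)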
