
Pyarrow writer not encoding correct URL for partitions in delta table #2978

Open
gprashmi opened this issue Nov 5, 2024 · 22 comments
Labels: bug Something isn't working

gprashmi commented Nov 5, 2024

Environment

Delta-rs version: 0.19.0

What happened:
We write data to a Delta table using delta-rs with the PyArrow engine, with DayHour as the partition column.

deltalake.write_deltalake(
    table_or_uri=delta_table_path,
    data=df,
    partition_by=[dayhour_partition_column],
    schema_mode='overwrite',
    mode="append",
    storage_options={"AWS_S3_ALLOW_UNSAFE_RENAME": "true"},
)

I then ran the OPTIMIZE command on the Delta table using the Spark SQL query below:

optimize_query = f"""
OPTIMIZE delta.`s3_table_path`
ZORDER BY (col1, col2)
"""
spark.sql(optimize_query)

After optimize, it creates partition directories with spaces and does not properly encode the partition URLs, i.e. the new partition URLs contain spaces (*.zstd.parquet files), as shown in the screenshot below.

[screenshot: partition paths with unencoded spaces]

@ion-elgreco Can you please let me know how we can run optimize.compact without ending up with partitions containing spaces?

A similar issue was raised in June (#2634), where it was said to be fixed in version 0.18.3, but I still see the same issue when I optimize now. To clarify, I use the PyArrow engine, not Rust, in case that is what breaks the partitions.

gprashmi added the bug label Nov 5, 2024

gprashmi commented Nov 5, 2024

I tried to install version 0.18.3, but it is not available, so I installed 0.19.0 and tried to optimize. The table was written with version 0.17.4.

ERROR: Could not find a version that satisfies the requirement deltalake==0.18.3 (from versions: 0.2.0, 0.2.1, 0.3.0, 0.4.0, 0.4.1, 0.4.4, 0.4.5, 0.4.6, 0.4.7, 0.4.8, 0.5.0, 0.5.1, 0.5.2, 0.5.3, 0.5.4, 0.5.5, 0.5.6, 0.5.7, 0.5.8, 0.6.0, 0.6.1, 0.6.2, 0.6.3, 0.6.4, 0.7.0, 0.8.0, 0.8.1, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 0.12.0, 0.13.0, 0.14.0, 0.15.0, 0.15.1, 0.15.2, 0.15.3, 0.16.0, 0.16.1, 0.16.2, 0.16.3, 0.16.4, 0.17.0, 0.17.1, 0.17.2, 0.17.3, 0.17.4, 0.18.0, 0.18.1, 0.18.2, 0.19.0, 0.19.1, 0.19.2, 0.20.0, 0.20.1, 0.20.2, 0.21.0)
ERROR: No matching distribution found for deltalake==0.18.3


gprashmi commented Nov 6, 2024

@ion-elgreco @rtyler

ion-elgreco (Collaborator)

Please use the latest version; this is already resolved.


gprashmi commented Nov 6, 2024

@ion-elgreco
@ion-elgreco I used the latest version, 0.21.0, to both write the table and optimize/vacuum it. Even then I still see the same encoding; please find the details below:

deltalake = "0.21.0"

I even tried to optimize without Z-order:

optimize_query = f"""
OPTIMIZE delta.`{s3_table_path}`
"""
spark.sql(optimize_query)

vacuum_query = f"""
VACUUM delta.`{s3_table_path}`
RETAIN 168 HOURS
"""

# Execute the VACUUM command
spark.sql(vacuum_query)

But I still see partitions like this after optimize/vacuum:

[screenshot: partition paths with unencoded spaces]


gprashmi commented Nov 6, 2024

With optimize, I also do not see the number of partition files reduced; in fact, with the new partitions (with spaces) the file count has increased. Is this expected?


gprashmi commented Nov 6, 2024

@ion-elgreco It only works sometimes; since broken partitions are created, optimize sometimes fails with the error below:

pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 989746.0 failed 4 times, most recent failure: Lost task 0.3 in stage 989746.0 (TID 9353124) (10.13.14.133 executor 6): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] User defined function (CanonicalPathFunction: (string) => string) failed due to: java.net.URISyntaxException: Illegal character in path at index 18: DayHour=2024-11-06 22%253A00%253A00/part-00000-eac21980-5584-4515-9a80-bfe806ab5ceb.c000.snappy.parquet. SQLSTATE: 39000

ion-elgreco (Collaborator)

Try recreating the table with the latest version.


gprashmi commented Nov 7, 2024

@ion-elgreco Yes, as I mentioned in the previous comment, both the write to the table and the optimize/vacuum were done with the latest version (0.21.0), and it still breaks due to spaces in the partitions.

When I write to the table there are no spaces in the partition paths, but after optimize the spaces appear. My partition column is DayHour, with values like 2024-1-09 21:00:00; are the spaces introduced during optimize because of this? Should we avoid having date and hour together in a partition column? Is there an alternative we can use?
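For intuition on where the `%253A` in the earlier stack trace can come from, here is a stdlib illustration of double percent-encoding (this is only an illustration, not a claim about delta-rs or Spark internals):

```python
from urllib.parse import quote

value = "2024-11-06 22:00:00"

# One round of percent-encoding escapes both the space and the colons.
once = quote(value)
print(once)   # 2024-11-06%2022%3A00%3A00

# Encoding the already-encoded value escapes the '%' itself, yielding %25..,
# which is the %253A pattern seen in the failed-UDF error above.
twice = quote(once)
print(twice)  # 2024-11-06%252022%253A00%253A00
```

A path like `DayHour=2024-11-06 22%253A00%253A00` mixes a raw space with a double-encoded colon, which is consistent with two writers applying different encoding rules to the same partition value.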

thomasfrederikhoeck (Contributor)

@gprashmi are you on Windows by any chance?


gprashmi commented Nov 11, 2024

@thomasfrederikhoeck I have a Windows laptop, but I run these in a Kubeflow experiment on a Databricks cluster.


thomasfrederikhoeck commented Nov 12, 2024

Okay. I asked because a similar issue (apache/arrow-rs#5592) has been fixed upstream, but I don't think object_store has been upgraded in delta-rs, so the fix is not part of delta-rs yet.

thomasfrederikhoeck (Contributor)

Yeah, delta-rs uses object-store=0.10.1, and the fix was added in object-store=0.10.2:

https://github.com/delta-io/delta-rs/blob/7a3b3ec38ce1004eab1998669a5a80f8e61c5589/Cargo.toml#L44C1-L45C1

thomasfrederikhoeck (Contributor)

I guess once this is fixed, it should also fix #2843.


gprashmi commented Nov 12, 2024

@thomasfrederikhoeck Thank you for the update. Can you please let me know when delta-rs will be updated to object-store=0.10.2?

@ion-elgreco Based on the comment from @thomasfrederikhoeck, it looks like this would be fixed once delta-rs uses the updated object-store=0.10.2. Can you please let me know if updating delta-rs to the latest object-store version is planned?

ion-elgreco (Collaborator)

> @thomasfrederikhoeck thank you for the update. Can you please let me know when would the delta-rs be updated to have the object-store=0.10.2?
>
> @ion-elgreco Based on the comment from @thomasfrederikhoeck it looks like this would be fixed when delta-rs uses the updated object-store=0.10.2 version. Can you please let me know if this is in plan to have the delta-rs updated to latest object-store version?

Feel free to create a PR for it

thomasfrederikhoeck (Contributor)

Maybe fixed by #2994

thomasfrederikhoeck (Contributor)

I'm not 100% sure this fixes this case, so maybe leave it open, @ion-elgreco?


gprashmi commented Nov 15, 2024

@thomasfrederikhoeck @ion-elgreco This did not fix the issue. I built delta-rs as a Python package with object_store = 0.10.2 in Cargo.toml and tested the Delta write, optimize, and vacuum. It still shows spaces in the URL.

Sample code to reproduce:

import deltalake
import pandas as pd
import pyarrow as pa

# delta_table_path: S3 URI of the Delta table (elided here)

# Dummy data
initial_data = {
    'dayhour': ['2024-10-09 19:00:00', '2024-10-10 20:00:00'],
    'value1': [10, 20],
    'value2': [1.5, 2.5]
}
initial_df = pd.DataFrame(initial_data)

initial_df['dayhour'] = pd.to_datetime(initial_df['dayhour'])

# Define the schema for the Delta Lake table
schema = pa.schema([
    pa.field('dayhour', pa.timestamp('us')),
    pa.field('value1', pa.int32()),
    pa.field('value2', pa.float32())
])

# Initialize the Delta table with the schema
deltalake.write_deltalake(
    table_or_uri=delta_table_path,
    data=initial_df,
    schema=schema,
    partition_by=['dayhour'],
    schema_mode='overwrite',
    mode="overwrite",
    storage_options={"AWS_S3_ALLOW_UNSAFE_RENAME": "true"},
)

optimize_query = f"""
OPTIMIZE delta.`{delta_table_path}`
"""
spark.sql(optimize_query)

vacuum_query = f"""
VACUUM delta.`{delta_table_path}`
RETAIN 168 HOURS
"""
spark.sql(vacuum_query)

This resulted in spaces in the URL encoding after optimize, as below:

Before optimize:
[screenshot: encoded partition paths]

After optimize:
[screenshot: partition paths containing spaces]

Can you please re-open this ticket? I am unable to reopen it from my end.
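As a possible workaround until the encoding behavior is consistent, one could partition on a derived string column that contains no spaces or colons. A minimal sketch (the helper name and the `%Y-%m-%d-%H` format are illustrative assumptions, not from this thread):

```python
from datetime import datetime

def partition_key(ts: datetime) -> str:
    # Space- and colon-free partition value, e.g. "2024-10-09-19";
    # such values need no percent-encoding in object-store paths.
    return ts.strftime("%Y-%m-%d-%H")

print(partition_key(datetime(2024, 10, 9, 19, 0, 0)))  # 2024-10-09-19
```

You would add this as a derived column (e.g. a hypothetical `dayhour_part`) and pass `partition_by=['dayhour_part']` to `write_deltalake`, keeping the original timestamp column for queries.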

@ion-elgreco ion-elgreco reopened this Nov 15, 2024

gprashmi commented Nov 15, 2024

@ion-elgreco Thank you for re-opening. So I guess the updated version of object_store did not help optimize here. Please let me know if there are any other suggestions/alternatives we can use?
@thomasfrederikhoeck


thomasfrederikhoeck commented Nov 15, 2024

Edit:
@gprashmi So this indicates that Spark encodes the colon but not the space, right? And you are on AWS, right?

gprashmi (Author)

@thomasfrederikhoeck I think it encodes the colon, but in some URLs not the space between date and hour in the dayhour column. And yes, on AWS.

thomasfrederikhoeck (Contributor)

@gprashmi Does Spark encode the space in all instances (write/merge/etc.)?
