SNOW-1800374: adding support for options and partition_by to dataframewriter (#2841)

1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.

   Fixes SNOW-1800374

2. Fill out the following pre-review checklist:

   - [x] I am adding a new automated test(s) to verify correctness of my new code
   - [ ] If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
   - [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes.
   - [x] I acknowledge that I have ensured my changes to be thread-safe. Follow the link for more information: [Thread-safe Developer Guidelines](https://github.com/snowflakedb/snowpark-python/blob/main/CONTRIBUTING.md#thread-safe-development)

3. Please describe how your code solves the related issue.

   This change adds `options` and `partition_by` fields to the `SpDataframeWrite` AST message and updates `DataFrameWriter.option` and `DataFrameWriter.partition_by` to record their arguments on that message when `_emit_ast` is enabled, so writer options and the partition expression are captured in the AST.
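
   A minimal usage sketch of the calls this change begins capturing (the stage path and sample data are hypothetical, and the session is assumed to be already configured):

   ```python
   from snowflake.snowpark import Session
   from snowflake.snowpark.functions import col

   # Assumes connection parameters are available from a default configuration.
   session = Session.builder.create()
   df = session.create_dataframe([(1, "us"), (2, "eu")], schema=["id", "region"])

   # Both option() and partition_by() are now recorded on the writer's
   # SpDataframeWrite AST entity instead of being tracked client-side only.
   (
       df.write.mode("overwrite")
       .option("compression", "gzip")
       .partition_by(col("region"))
       .csv("@my_stage/out")
   )
   ```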
sfc-gh-batur authored Jan 14, 2025
1 parent 01b8f33 commit 2f06485
Showing 3 changed files with 566 additions and 107 deletions.
70 changes: 36 additions & 34 deletions src/snowflake/snowpark/_internal/proto/ast.proto
@@ -89,7 +89,7 @@ message PythonTimeZone {
int64 offset_seconds = 2;
}

-// sp-type.ir:70
+// sp-type.ir:71
message SpCallable {
int64 id = 1;
string name = 2;
@@ -136,71 +136,71 @@ message SpDataType {
}
}

-// sp-type.ir:27
+// sp-type.ir:28
message SpArrayType {
bool structured = 1;
SpDataType ty = 2;
}

-// sp-type.ir:31
+// sp-type.ir:32
message SpColumnIdentifier {
string name = 1;
}

-// sp-type.ir:33
+// sp-type.ir:34
message SpDecimalType {
int64 precision = 1;
int64 scale = 2;
}

-// sp-type.ir:40
+// sp-type.ir:41
message SpMapType {
SpDataType key_ty = 1;
bool structured = 2;
SpDataType value_ty = 3;
}

-// sp-type.ir:43
+// sp-type.ir:44
message SpStringType {
google.protobuf.Int64Value length = 1;
}

-// sp-type.ir:44
+// sp-type.ir:45
message SpStructField {
SpColumnIdentifier column_identifier = 1;
SpDataType data_type = 2;
bool nullable = 3;
}

-// sp-type.ir:45
+// sp-type.ir:46
message SpStructType {
repeated SpStructField fields = 1;
bool structured = 2;
}

-// sp-type.ir:47
+// sp-type.ir:48
message SpTimestampType {
SpTimestampTimeZone time_zone = 1;
}

-// sp-type.ir:49
+// sp-type.ir:50
message SpVectorType {
int64 dimension = 1;
SpDataType ty = 2;
}

-// sp-type.ir:51
+// sp-type.ir:52
message SpPandasSeriesType {
SpDataType el_ty = 1;
}

-// sp-type.ir:52
+// sp-type.ir:53
message SpPandasDataFrameType {
repeated string col_names = 1;
repeated SpDataType col_types = 2;
}

-// sp-type.ir:59
+// sp-type.ir:60
message SpDataframeData {
oneof sealed_value {
SpDataframeData_List sp_dataframe_data__list = 1;
@@ -209,35 +209,35 @@ message SpDataframeData {
}
}

-// sp-type.ir:60
+// sp-type.ir:61
message SpDataframeData_List {
repeated Expr vs = 1;
}

-// sp-type.ir:61
+// sp-type.ir:62
message SpDataframeData_Tuple {
repeated Expr vs = 1;
}

-// sp-type.ir:62
+// sp-type.ir:63
message SpDataframeData_Pandas {
StagedPandasDataframe v = 1;
}

-// sp-type.ir:65
+// sp-type.ir:66
message SpDataframeSchema {
oneof sealed_value {
SpDataframeSchema_List sp_dataframe_schema__list = 1;
SpDataframeSchema_Struct sp_dataframe_schema__struct = 2;
}
}

-// sp-type.ir:66
+// sp-type.ir:67
message SpDataframeSchema_List {
repeated string vs = 1;
}

-// sp-type.ir:67
+// sp-type.ir:68
message SpDataframeSchema_Struct {
SpStructType v = 1;
}
@@ -292,20 +292,20 @@ message SpNullOrder {
}
}

-// sp-type.ir:82
+// sp-type.ir:83
message SpPivotValue {
oneof sealed_value {
SpPivotValue_Dataframe sp_pivot_value__dataframe = 1;
SpPivotValue_Expr sp_pivot_value__expr = 2;
}
}

-// sp-type.ir:83
+// sp-type.ir:84
message SpPivotValue_Expr {
Expr v = 1;
}

-// sp-type.ir:84
+// sp-type.ir:85
message SpPivotValue_Dataframe {
SpDataframeRef v = 1;
}
@@ -363,7 +363,7 @@ message SrcPosition {
int64 start_line = 5;
}

-// sp-type.ir:55
+// sp-type.ir:56
message StagedPandasDataframe {
SpNameRef temp_table = 1;
}
@@ -1585,7 +1585,7 @@ message SpDataframeApply {
SrcPosition src = 3;
}

-// sp-df-io.ir:185
+// sp-df-io.ir:187
message SpDataframeCacheResult {
SpDataframeExpr df = 1;
SrcPosition src = 2;
@@ -1610,7 +1610,7 @@ message SpDataframeCollect {
repeated Tuple_String_String statement_params = 7;
}

-// sp-df-io.ir:167
+// sp-df-io.ir:169
message SpDataframeCopyIntoTable {
repeated Tuple_String_Expr copy_options = 1;
SpDataframeExpr df = 2;
@@ -1634,7 +1634,7 @@ message SpDataframeCount {
repeated Tuple_String_String statement_params = 4;
}

-// sp-df-io.ir:151
+// sp-df-io.ir:153
message SpDataframeCreateOrReplaceDynamicTable {
List_Expr clustering_keys = 1;
google.protobuf.StringValue comment = 2;
@@ -1652,7 +1652,7 @@ message SpDataframeCreateOrReplaceDynamicTable {
string warehouse = 14;
}

-// sp-df-io.ir:143
+// sp-df-io.ir:145
message SpDataframeCreateOrReplaceView {
google.protobuf.StringValue comment = 1;
SpDataframeExpr df = 2;
@@ -2248,8 +2248,10 @@ message SpDataframeWithColumns {
// sp-df-io.ir:84
message SpDataframeWrite {
SpDataframeExpr df = 1;
-SpSaveMode save_mode = 2;
-SrcPosition src = 3;
+repeated Tuple_String_Expr options = 2;
+Expr partition_by = 3;
+SpSaveMode save_mode = 4;
+SrcPosition src = 5;
}

message SpDataframeWriter {
@@ -2609,7 +2611,7 @@ message SpWindowSpecRowsBetween {
message SpWindowType {
}

-// sp-df-io.ir:133
+// sp-df-io.ir:135
message SpWriteCopyIntoLocation {
bool block = 1;
repeated Tuple_String_Expr copy_options = 2;
@@ -2624,7 +2626,7 @@ message SpWriteCopyIntoLocation {
repeated Tuple_String_String statement_params = 11;
}

-// sp-df-io.ir:100
+// sp-df-io.ir:102
message SpWriteCsv {
bool block = 1;
repeated Tuple_String_Expr copy_options = 2;
@@ -2646,7 +2648,7 @@ message SpWriteFile {
}
}

-// sp-df-io.ir:104
+// sp-df-io.ir:106
message SpWriteJson {
bool block = 1;
repeated Tuple_String_Expr copy_options = 2;
@@ -2676,7 +2678,7 @@ message SpWritePandas {
string table_type = 13;
}

-// sp-df-io.ir:108
+// sp-df-io.ir:110
message SpWriteParquet {
bool block = 1;
repeated Tuple_String_Expr copy_options = 2;
@@ -2689,7 +2691,7 @@ message SpWriteParquet {
repeated Tuple_String_String statement_params = 9;
}

-// sp-df-io.ir:112
+// sp-df-io.ir:114
message SpWriteTable {
bool block = 1;
google.protobuf.BoolValue change_tracking = 2;
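Before the Python changes below, a quick look at how the two new `SpDataframeWrite` fields are meant to be filled in. This is a minimal sketch mirroring the field-population pattern `dataframe_writer.py` uses in this PR; the generated-module import path is an assumption:

```python
# NOTE: the generated protobuf module path is an assumption.
from snowflake.snowpark._internal.proto.generated import ast_pb2
from snowflake.snowpark._internal.ast.utils import (
    build_expr_from_snowpark_column_or_python_val,
    build_expr_from_snowpark_column_or_sql_str,
)

write_ast = ast_pb2.SpDataframeWrite()

# repeated Tuple_String_Expr options = 2; each option is a (name, Expr) pair.
t = write_ast.options.add()
t._1 = "compression"
build_expr_from_snowpark_column_or_python_val(t._2, "gzip")

# Expr partition_by = 3; a single serialized partition expression.
build_expr_from_snowpark_column_or_sql_str(write_ast.partition_by, "region")
```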
24 changes: 22 additions & 2 deletions src/snowflake/snowpark/dataframe_writer.py
@@ -16,6 +16,8 @@
)
from snowflake.snowpark._internal.ast.utils import (
    build_expr_from_snowpark_column_or_col_name,
+   build_expr_from_snowpark_column_or_sql_str,
+   build_expr_from_snowpark_column_or_python_val,
    debug_check_missing_ast,
    fill_sp_save_mode,
    fill_sp_write_file,
@@ -130,19 +132,37 @@ def mode(self, save_mode: str, _emit_ast: bool = True) -> "DataFrameWriter":

        return self

-   def partition_by(self, expr: ColumnOrSqlExpr) -> "DataFrameWriter":
+   def partition_by(
+       self, expr: ColumnOrSqlExpr, _emit_ast: bool = True
+   ) -> "DataFrameWriter":
        """Specifies an expression used to partition the unloaded table rows into separate files. It can be a
        :class:`Column`, a column name, or a SQL expression.
        """
        self._partition_by = expr

+       # Update AST if it exists.
+       if _emit_ast:
+           if self._ast_stmt is not None:
+               build_expr_from_snowpark_column_or_sql_str(
+                   self._ast_stmt.expr.sp_dataframe_write.partition_by, expr
+               )

        return self

-   def option(self, key: str, value: Any) -> "DataFrameWriter":
+   def option(self, key: str, value: Any, _emit_ast: bool = True) -> "DataFrameWriter":
        """Depending on the ``file_format_type`` specified, you can include more format specific options.
        Use the options documented in the `Format Type Options <https://docs.snowflake.com/en/sql-reference/sql/copy-into-location.html#format-type-options-formattypeoptions>`__.
        """
        aliased_key = get_aliased_option_name(key, WRITER_OPTIONS_ALIAS_MAP)
        self._cur_options[aliased_key] = value

+       # Update AST if it exists.
+       if _emit_ast:
+           if self._ast_stmt is not None:
+               t = self._ast_stmt.expr.sp_dataframe_write.options.add()
+               t._1 = aliased_key
+               build_expr_from_snowpark_column_or_python_val(t._2, value)

        return self

    def options(self, configs: Optional[Dict] = None, **kwargs) -> "DataFrameWriter":
(Diff for the remaining changed file did not load.)
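
Per the `partition_by` docstring, the argument may be a `Column`, a column name, or a SQL expression string; each form is serialized into the AST by `build_expr_from_snowpark_column_or_sql_str`. A short sketch reusing `df` from the example in the description (stage paths hypothetical):

```python
from snowflake.snowpark.functions import col

# A Column expression, a bare column name, and a SQL expression string are
# all accepted and captured in the AST the same way.
df.write.partition_by(col("region")).csv("@my_stage/by_region")
df.write.partition_by("region").csv("@my_stage/by_region")
df.write.partition_by("left(region, 1)").csv("@my_stage/by_prefix")
```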
