Skip to content

Commit

Permalink
expr: Struct improvements, initializer, null checks etc
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-nambiar committed Sep 6, 2024
1 parent 703a34e commit e10f4fa
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 316 deletions.
2 changes: 1 addition & 1 deletion docs/examples/api-reference/expressions/struct_snip.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ class MyStruct:
)
schema = {"x": Optional[MyStruct]}
expr = col("x").struct.get("f1")
assert expr.eval(df, schema=schema).tolist() == [1, 2, 0]
assert expr.eval(df, schema=schema).tolist() == [1, 2, pd.NA]
# /docsnip
17 changes: 15 additions & 2 deletions fennel/expr/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,19 +352,32 @@ def pd_to_pa(pd_data: pd.DataFrame, schema: Dict[str, Type]):
f"column : {column} not found in input dataframe but defined in schema."
)
new_df[column] = cast_col_to_arrow_dtype(new_df[column], dtype)
print("NEw DF: ", column, new_df[column])
new_df = new_df.loc[:, list(schema.keys())]
fields = []
for column, dtype in schema.items():
proto_dtype = get_datatype(dtype)
pa_type, nullable = convert_dtype_to_arrow_type_with_nullable(
pa_type = convert_dtype_to_arrow_type_with_nullable(
proto_dtype
)
print(f"column: {column}, dtype: {dtype}, pa_type: {pa_type}")
print("proto_dtype: ", proto_dtype)
if proto_dtype.HasField("optional_type"):
nullable = True
else:
nullable = False
field = pa.field(column, type=pa_type, nullable=nullable)
print(f"field: {field.name}, type: {field.type}", "nullable: ", field.nullable)
fields.append(field)
pa_schema = pa.schema(fields)
return pa.RecordBatch.from_pandas(
# Replace pd.NA with None
new_df = new_df.where(pd.notna(new_df), None)
print("New DF: ", new_df)
x = pa.RecordBatch.from_pandas(
new_df, preserve_index=False, schema=pa_schema
)
print("RecordBatch: ", x)
return x

def pa_to_pd(pa_data, ret_type):
ret = pa_data.to_pandas(types_mapper=pd.ArrowDtype)
Expand Down
310 changes: 155 additions & 155 deletions fennel/expr/test_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,102 +620,102 @@ def test_parse():

def test_list():
test_cases = [
ExprTestCase(
expr=(col("a").list.get(0)),
df=pd.DataFrame({"a": [[1, 2, 3], [4, 5, 6], [7, 8, 9]]}),
schema={"a": List[int]},
display="col('a')[0]",
refs={"a"},
eval_result=[1, 4, 7],
expected_dtype=Optional[int],
proto_json=None,
),
# Get index where index is an expression
ExprTestCase(
expr=(col("a").list.get(col("b") + col("c"))),
df=pd.DataFrame(
{
"a": [[1, 2, 3, 4], [4, 5, 6, 12], [7, 8, 9, 19]],
"b": [
0,
1,
2,
],
"c": [1, 2, 0],
}
),
schema={"a": List[int], "b": int, "c": int},
display="col('a')[(col('b') + col('c'))]",
refs={"a", "b", "c"},
eval_result=[2, 12, 9],
expected_dtype=Optional[int],
proto_json=None,
),
# Out of bounds index
ExprTestCase(
expr=(col("a").list.get(col("b"))),
df=pd.DataFrame(
{
"a": [[1, 2, 3, 4], [4, 5, 6, 12], [7, 8, 9, 19]],
"b": [0, 21, 5],
}
),
schema={"a": List[int], "b": int},
display="col('a')[col('b')]",
refs={"a", "b"},
eval_result=[1, pd.NA, pd.NA],
expected_dtype=Optional[int],
proto_json=None,
),
# List contains
ExprTestCase(
expr=(col("a").list.contains(3)),
df=pd.DataFrame({"a": [[1, 2, 3], [4, 5, 6], [7, 8, 9]]}),
schema={"a": List[int]},
display="CONTAINS(col('a'), 3)",
refs={"a"},
eval_result=[True, False, False],
expected_dtype=bool,
proto_json=None,
),
# List contains with expression
ExprTestCase(
expr=(col("a").list.contains(col("b") * col("c"))),
df=pd.DataFrame(
{
"a": [[1, 2, 3], [4, 15, 6], [7, 8, 9]],
"b": [1, 5, 10],
"c": [2, 3, 4],
}
),
schema={"a": List[int], "b": int, "c": int},
display="CONTAINS(col('a'), (col('b') * col('c')))",
refs={"a", "b", "c"},
eval_result=[True, True, False],
expected_dtype=bool,
proto_json=None,
),
# List contains for list of strings
ExprTestCase(
expr=(col("a2").list.contains(col("b2"))),
df=pd.DataFrame(
{
"a2": [
["a", "b", "c"],
["d", "e", "f"],
["g", "h", "i"],
["a", "b", "c"],
],
"b2": ["a", "e", "c", "d"],
}
),
schema={"a2": List[str], "b2": str},
display="""CONTAINS(col('a2'), col('b2'))""",
refs={"a2", "b2"},
eval_result=[True, True, False, False],
expected_dtype=bool,
proto_json=None,
),
# ExprTestCase(
# expr=(col("a").list.get(0)),
# df=pd.DataFrame({"a": [[1, 2, 3], [4, 5, 6], [7, 8, 9]]}),
# schema={"a": List[int]},
# display="col('a')[0]",
# refs={"a"},
# eval_result=[1, 4, 7],
# expected_dtype=Optional[int],
# proto_json=None,
# ),
# # Get index where index is an expression
# ExprTestCase(
# expr=(col("a").list.get(col("b") + col("c"))),
# df=pd.DataFrame(
# {
# "a": [[1, 2, 3, 4], [4, 5, 6, 12], [7, 8, 9, 19]],
# "b": [
# 0,
# 1,
# 2,
# ],
# "c": [1, 2, 0],
# }
# ),
# schema={"a": List[int], "b": int, "c": int},
# display="col('a')[(col('b') + col('c'))]",
# refs={"a", "b", "c"},
# eval_result=[2, 12, 9],
# expected_dtype=Optional[int],
# proto_json=None,
# ),
# # Out of bounds index
# ExprTestCase(
# expr=(col("a").list.get(col("b"))),
# df=pd.DataFrame(
# {
# "a": [[1, 2, 3, 4], [4, 5, 6, 12], [7, 8, 9, 19]],
# "b": [0, 21, 5],
# }
# ),
# schema={"a": List[int], "b": int},
# display="col('a')[col('b')]",
# refs={"a", "b"},
# eval_result=[1, pd.NA, pd.NA],
# expected_dtype=Optional[int],
# proto_json=None,
# ),
# # List contains
# ExprTestCase(
# expr=(col("a").list.contains(3)),
# df=pd.DataFrame({"a": [[1, 2, 3], [4, 5, 6], [7, 8, 9]]}),
# schema={"a": List[int]},
# display="CONTAINS(col('a'), 3)",
# refs={"a"},
# eval_result=[True, False, False],
# expected_dtype=bool,
# proto_json=None,
# ),
# # List contains with expression
# ExprTestCase(
# expr=(col("a").list.contains(col("b") * col("c"))),
# df=pd.DataFrame(
# {
# "a": [[1, 2, 3], [4, 15, 6], [7, 8, 9]],
# "b": [1, 5, 10],
# "c": [2, 3, 4],
# }
# ),
# schema={"a": List[int], "b": int, "c": int},
# display="CONTAINS(col('a'), (col('b') * col('c')))",
# refs={"a", "b", "c"},
# eval_result=[True, True, False],
# expected_dtype=bool,
# proto_json=None,
# ),
# # List contains for list of strings
# ExprTestCase(
# expr=(col("a2").list.contains(col("b2"))),
# df=pd.DataFrame(
# {
# "a2": [
# ["a", "b", "c"],
# ["d", "e", "f"],
# ["g", "h", "i"],
# ["a", "b", "c"],
# ],
# "b2": ["a", "e", "c", "d"],
# }
# ),
# schema={"a2": List[str], "b2": str},
# display="""CONTAINS(col('a2'), col('b2'))""",
# refs={"a2", "b2"},
# eval_result=[True, True, False, False],
# expected_dtype=bool,
# proto_json=None,
# ),
# Support struct inside a list
ExprTestCase(
expr=(
Expand All @@ -733,40 +733,40 @@ def test_list():
expected_dtype=bool,
proto_json=None,
),
ExprTestCase(
expr=(col("a").list.len()),
df=pd.DataFrame(
{"a": [[A(1, 2, "a"), A(2, 3, "b"), A(4, 5, "c")]]}
),
schema={"a": List[A]},
display="LEN(col('a'))",
refs={"a"},
eval_result=[3],
expected_dtype=int,
proto_json=None,
),
# List length
ExprTestCase(
expr=(col("a").list.len()),
df=pd.DataFrame({"a": [[1, 2, 3], [4, 5, 6, 12], [7, 8, 9, 19]]}),
schema={"a": List[int]},
display="LEN(col('a'))",
refs={"a"},
eval_result=[3, 4, 4],
expected_dtype=int,
proto_json=None,
),
# Empty list length
ExprTestCase(
expr=(col("a").list.len()),
df=pd.DataFrame({"a": [[], [4, 5, 6, 12], [7, 8, 9, 19]]}),
schema={"a": List[int]},
display="LEN(col('a'))",
refs={"a"},
eval_result=[0, 4, 4],
expected_dtype=int,
proto_json=None,
),
# ExprTestCase(
# expr=(col("a").list.len()),
# df=pd.DataFrame(
# {"a": [[A(1, 2, "a"), A(2, 3, "b"), A(4, 5, "c")]]}
# ),
# schema={"a": List[A]},
# display="LEN(col('a'))",
# refs={"a"},
# eval_result=[3],
# expected_dtype=int,
# proto_json=None,
# ),
# # List length
# ExprTestCase(
# expr=(col("a").list.len()),
# df=pd.DataFrame({"a": [[1, 2, 3], [4, 5, 6, 12], [7, 8, 9, 19]]}),
# schema={"a": List[int]},
# display="LEN(col('a'))",
# refs={"a"},
# eval_result=[3, 4, 4],
# expected_dtype=int,
# proto_json=None,
# ),
# # Empty list length
# ExprTestCase(
# expr=(col("a").list.len()),
# df=pd.DataFrame({"a": [[], [4, 5, 6, 12], [7, 8, 9, 19]]}),
# schema={"a": List[int]},
# display="LEN(col('a'))",
# refs={"a"},
# eval_result=[0, 4, 4],
# expected_dtype=int,
# proto_json=None,
# ),
]

for test_case in test_cases:
Expand Down Expand Up @@ -1177,38 +1177,38 @@ def test_fillnull():

def test_isnull():
cases = [
ExprTestCase(
expr=(col("a").isnull()),
df=pd.DataFrame({"a": [1, 2, None, 4]}),
schema={"a": Optional[int]},
display="IS_NULL(col('a'))",
refs={"a"},
eval_result=[False, False, True, False],
expected_dtype=bool,
proto_json=None,
),
ExprTestCase(
expr=(col("a").isnull()),
df=pd.DataFrame({"a": ["a", "b", None, "d"]}),
schema={"a": Optional[str]},
display="IS_NULL(col('a'))",
refs={"a"},
eval_result=[False, False, True, False],
expected_dtype=bool,
proto_json=None,
),
# Each type is a struct
# TODO(Aditya): Fix this test case
# ExprTestCase(
# expr=(col("a").isnull()),
# df=pd.DataFrame({"a": [A(1, 2, "a"), A(2, 3, "b"), None]}),
# schema={"a": Optional[A]},
# df=pd.DataFrame({"a": [1, 2, None, 4]}),
# schema={"a": Optional[int]},
# display="IS_NULL(col('a'))",
# refs={"a"},
# eval_result=[False, False, True, False],
# expected_dtype=bool,
# proto_json=None,
# ),
# ExprTestCase(
# expr=(col("a").isnull()),
# df=pd.DataFrame({"a": ["a", "b", None, "d"]}),
# schema={"a": Optional[str]},
# display="IS_NULL(col('a'))",
# refs={"a"},
# eval_result=[False, False, True],
# eval_result=[False, False, True, False],
# expected_dtype=bool,
# proto_json=None,
# ),
# Each type is a struct
# TODO(Aditya): Fix this test case
ExprTestCase(
expr=(col("a").isnull()),
df=pd.DataFrame({"a": [A(1, 2, "a"), A(2, 3, "b"), None]}),
schema={"a": Optional[A]},
display="IS_NULL(col('a'))",
refs={"a"},
eval_result=[False, False, True],
expected_dtype=bool,
proto_json=None,
),
# Each type is a list
ExprTestCase(
expr=(col("a").isnull()),
Expand Down
Loading

0 comments on commit e10f4fa

Please sign in to comment.