Skip to content

Commit

Permalink
Fix input to final dataframe dq rules (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
phanikumarvemuri authored Aug 14, 2023
1 parent 0268543 commit 78c4759
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
4 changes: 2 additions & 2 deletions spark_expectations/core/expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
_,
status,
) = func_process(
_df,
_row_dq_df,
self._context.get_agg_dq_rule_type_name,
final_agg_dq_flag=True,
error_count=_error_count,
Expand Down Expand Up @@ -510,7 +510,7 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
_,
status,
) = func_process(
_df,
_row_dq_df,
self._context.get_query_dq_rule_type_name,
final_query_dq_flag=True,
error_count=_error_count,
Expand Down
31 changes: 23 additions & 8 deletions tests/core/test_expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,7 @@ def fixture_create_stats_table():
# row meets all row_dq_expectations
{"col1": 3, "col2": "c", "col3": 6},
# row meets all row_dq_expectations
{"col1": 2, "col2": "d", "col3": 7}
]
),
{ # expectations rules
Expand Down Expand Up @@ -1340,12 +1341,25 @@ def fixture_create_stats_table():
"rule_type": "agg_dq",
"rule": "stddev_col3_threshold",
"column_name": "col3",
"expectation": "stddev(col3) > 0",
"expectation": "stddev(col3) > 1",
"enable_for_source_dq_validation": True,
"enable_for_target_dq_validation": False,
"action_if_failed": "fail",
"tag": "validity",
"description": "stddev of col3 value must be greater than one"
},
{
"product_id": "product1",
"target_table_name": "dq_spark.test_table",
"rule_type": "agg_dq",
"rule": "stddev_col3_threshold",
"column_name": "col3",
"expectation": "stddev(col3) < 1",
"enable_for_source_dq_validation": False,
"enable_for_target_dq_validation": True,
"action_if_failed": "fail",
"tag": "validity",
"description": "avg of col3 value must be greater than 0"
"description": "avg of col3 value must be less than one"
}
],
"target_table_name": "dq_spark.test_final_table"
Expand All @@ -1361,11 +1375,12 @@ def fixture_create_stats_table():
False, # source_query_dq
False, # final_query_dq
spark.createDataFrame([ # expected_output
{"col1": 3, "col2": "c", "col3": 6}
{"col1": 3, "col2": "c", "col3": 6},
{"col1": 2, "col2": "d", "col3": 7}
]), # expected result
3, # input count
2, # error count
1, # output count
4, # input count
3, # error count
2, # output count
[{"description": "avg of col1 value must be greater than 4",
"rule": "avg_col1_threshold",
"rule_type": "agg_dq", "action_if_failed": "ignore", "tag": "validity"}],
Expand All @@ -1376,10 +1391,10 @@ def fixture_create_stats_table():
# final_agg_result
None, # source_query_dq_res
None, # final_query_dq_res
{"rules": {"num_dq_rules": 4, "num_row_dq_rules": 2},
{"rules": {"num_dq_rules": 5, "num_row_dq_rules": 2},
"query_dq_rules": {"num_final_query_dq_rules": 0, "num_source_query_dq_rules": 0,
"num_query_dq_rules": 0}, # dq_rules
"agg_dq_rules": {"num_source_agg_dq_rules": 2, "num_agg_dq_rules": 2,
"agg_dq_rules": {"num_source_agg_dq_rules": 2, "num_agg_dq_rules": 3,
"num_final_agg_dq_rules": 2}},
{"row_dq_status": "Passed", "source_agg_dq_status": "Passed",
"final_agg_dq_status": "Passed", "run_status": "Passed",
Expand Down

0 comments on commit 78c4759

Please sign in to comment.