Bug fix on PR #80 (#88)
* new changes

* Added test cases for the test_writer

* removed the debug statements

* removed the debug statements from test scripts

* added linting changes

* added new enhancement for detailed stats table

* added new test case for detailed stats version 3

* Added new test cases to writer and reader module

* Added two new columns to the rules df - query_dq_delimiter and enable_querydq_custom_output

* Fixed the test case for custom_query_output boolean variable

* Added new test cases in the writer and reader module

* Formatted the detailed write stats method

* Added comments to the newly added helper methods

* Added all the Docs to the md file

* Added docs to the .md files

* Added the doc to the md file with formatting

* updated the readme docs

* Added the fix for handling env alias bug #78

* removed comments

* removed all debugging statements

* clean up

* Adding comments and clean up activity

* Changes to the regex to handle spaces in the expression

* Bug fix to handle spaces in regex expression match

* Change the default delimiter $ to @ - Test case

* Clean up for regex

* Added new test case for row_dq detailed stats

* Updating the doc for the new default delimiter

* Updating the doc for the new default delimiter in Rules.md

---------

Co-authored-by: Vigneshwarr Venkatesan <[email protected]>
vigneshwarrvenkat and Vigneshwarr Venkatesan authored Apr 19, 2024
1 parent 002b742 commit ae6c1ba
Showing 10 changed files with 117 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docs/configurations/rules.md
@@ -64,7 +64,7 @@ Please find the different types of possible expectations
| Expect the maximum value in a column to fall within a specified range| expect_column_max_to_be_between |accuracy |```(select max([col_name]) from [table_name]) between [lower_bound] and [upper_bound]``` |
| Expect the minimum value in a column fall within a specified range | expect_column_min_to_be_between |accuracy | ```(select min([col_name]) from [table_name]) between [lower_bound] and [upper_bound]``` |
| Expect referential integrity | expect_referential_integrity_between_two_table_should_be_less_than_100 | accuracy | ```( select * from [table_a] left join [table_b] on [condition] where [table_b.column] is null) select count(*) from refrentail_check) < 100``` |
| Compare the source table and target table output | customer_missing_count_threshold | validity | The_alias_within_the_curly_bracket_is_added_to_the_expectation_which_gets_resolved_at_compile_time_with_alias_values```((select count(*) from ({source_f1}) a join ({source_f2}) b on a.customer_id = b.customer_id) - (select count(*) from ({target_f1}) a join ({target_f2}) b on source_column = target_column)) > ({target_f3})$source_f1$select column, count(*) from source_tbl group by column$source_f2$select column2, count(*) from table2 group by column2$target_f1$select column, count(*) from target_tbl group by column$target_f2$select column2, count(*) from target_tbl2 group by column2$target_f3$select count(*) from source_tbl ``` |
| Compare the source table and target table output (by default @ is the delimiter; can be overridden by the query_dq_delimiter attribute in the rules table)| customer_missing_count_threshold | validity | The_alias_within_the_curly_bracket_is_added_to_the_expectation_which_gets_resolved_at_compile_time_with_alias_values```((select count(*) from ({source_f1}) a join ({source_f2}) b on a.customer_id = b.customer_id) - (select count(*) from ({target_f1}) a join ({target_f2}) b on source_column = target_column)) > ({target_f3})@source_f1@select column, count(*) from source_tbl group by column@source_f2@select column2, count(*) from table2 group by column2@target_f1@select column, count(*) from target_tbl group by column@target_f2@select column2, count(*) from target_tbl2 group by column2@target_f3@select count(*) from source_tbl ``` |



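The @-delimited expectation format documented above interleaves the main expression with alias/sub-query pairs. A minimal sketch of how such a string could be split, using a hypothetical helper (not spark-expectations' actual parser):

```python
def split_custom_query(expectation: str, delimiter: str = "@"):
    """Split '<expression><d>alias1<d>query1<d>alias2<d>query2...' into the
    main expression and an alias -> sub-query map.
    Hypothetical helper for illustration only."""
    parts = expectation.split(delimiter)
    expression, rest = parts[0], parts[1:]
    if len(rest) % 2:
        raise ValueError("aliases and sub-queries must come in pairs")
    # pair up alternating alias/query entries
    return expression, dict(zip(rest[::2], rest[1::2]))

# Rule expectation taken from examples/base_setup.py in this commit:
expr, queries = split_custom_query(
    "({source_f1}) > 10@source_f1@select count(*) from order_source"
)
# expr    -> "({source_f1}) > 10"
# queries -> {"source_f1": "select count(*) from order_source"}
```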
4 changes: 2 additions & 2 deletions docs/getting-started/setup.md
@@ -56,7 +56,7 @@ create table if not exists `catalog`.`schema`.`{product}_rules` (
12. `is_active` true or false to indicate if the rule is active or not.
13. `enable_error_drop_alert` true or false. This determines if an alert notification should be sent out if row(s) is(are) dropped from the data set
14. `error_drop_threshold` Threshold for the alert notification that gets triggered when row(s) is(are) dropped from the data set
15. `query_dq_delimiter` segregate custom queries delimiter ex: $, @ etc
15. `query_dq_delimiter` segregate custom queries delimiter ex: $, @ etc. By default it is @. Users can override it with any other delimiter based on the need. The same delimiter mentioned here has to be used in the custom query.
16. `enable_querydq_custom_output` required custom query output in separate table

rule_type, enable_for_source_dq_validation and enable_for_target_dq_validation columns define source_agg_dq, target_agg_dq,source_query_dq and target_query_dq. please see the below definitions:
@@ -200,4 +200,4 @@ dq_time string, -- (23)!
20. `target_dq_error_row_count` Number of rows failed in the target dq
21. `target_dq_row_count` Number of rows of the target dq
22. `dq_date` Dq executed date
23. `dq_time` Dq executed timestamp
23. `dq_time` Dq executed timestamp
1 change: 1 addition & 0 deletions spark_expectations/config/user_config.py
@@ -68,6 +68,7 @@ class Constants:
querydq_output_custom_table_name = "spark.expectations.query.dq.custom.table_name"

# declare const variable for agg query dq detailed stats

se_agg_dq_expectation_regex_pattern = (
r"(\(.+?\)|\w+\(.+?\))(\s*[<>!=]+\s*.+|\s*between\s*.+)$"
)
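The regex added above splits an agg-dq expectation into its aggregate expression and its comparison clause, and the commit's "handle spaces" fixes mean it now matches with or without whitespace around the comparator. A quick check of its behavior on expectations appearing elsewhere in this commit:

```python
import re

# Pattern as added in spark_expectations/config/user_config.py
se_agg_dq_expectation_regex_pattern = (
    r"(\(.+?\)|\w+\(.+?\))(\s*[<>!=]+\s*.+|\s*between\s*.+)$"
)

# With spaces around the comparator (the corrected base_setup.py rule):
m = re.match(se_agg_dq_expectation_regex_pattern, "count(distinct ship_mode) <= 3")
# group(1) -> "count(distinct ship_mode)", group(2) -> " <= 3"

# Without spaces:
m2 = re.match(se_agg_dq_expectation_regex_pattern, "sum(sales)>10000")
# group(1) -> "sum(sales)", group(2) -> ">10000"
```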
2 changes: 1 addition & 1 deletion spark_expectations/examples/base_setup.py
@@ -40,7 +40,7 @@
,("your_product", "dq_spark_dev.customer_order", "query_dq", "order_count_validity", "*", "({source_f1}) > 10@source_f1@select count(*) from order_source", "ignore", "validity", "row count threshold", true, true, true, false, 0, "@", true)
,("your_product", "dq_spark_local.customer_order", "query_dq", "order_count_validity_check", "*", "(select count(*) from order_source) > 10", "ignore", "validity", "row count threshold", true, true, true, false, 0, null, true)
,("your_product", "dq_spark_{env}.customer_order", "query_dq", "product_category", "*", "(select count(distinct category) from {table}) < 5", "ignore", "validity", "distinct product category", true, true, true, false, 0,null, true)
,("your_product", "dq_spark_{env}.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode)<=3", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null)
,("your_product", "dq_spark_{env}.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode) <= 3", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null)
"""
2 changes: 2 additions & 0 deletions spark_expectations/examples/sample_dq_iceberg.py
@@ -42,6 +42,8 @@
user_config.se_enable_query_dq_detailed_result: True,
user_config.se_enable_agg_dq_detailed_result: True,
user_config.se_enable_error_table: True,
user_config.enable_query_dq_detailed_result: True,
user_config.enable_agg_dq_detailed_result: True,
user_config.se_dq_rules_params: {
"env": "local",
"table": "product",
2 changes: 1 addition & 1 deletion spark_expectations/sinks/utils/writer.py
@@ -175,7 +175,7 @@ def get_row_dq_detailed_stats(
_rule_expectations,
_rule_tag,
_rule_desc,
"fail",
"pass" if int(_dq_res["failed_row_count"]) == 0 else "fail",
None,
None,
(_input_count - int(_dq_res["failed_row_count"])),
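The writer.py change replaces a hard-coded "fail" status: a row_dq rule now reports "pass" whenever its failed_row_count is zero. A simplified stand-alone sketch of that one-line fix (hypothetical function name, plain dict instead of the internal result structure):

```python
def row_dq_detailed_status(dq_res: dict) -> str:
    """Status column for a row_dq rule in the detailed stats table:
    'pass' when no rows failed, 'fail' otherwise.
    Previously this value was hard-coded to 'fail'."""
    return "pass" if int(dq_res["failed_row_count"]) == 0 else "fail"

# failed_row_count arrives as a string in the summarised results
print(row_dq_detailed_status({"failed_row_count": "0"}))   # pass
print(row_dq_detailed_status({"failed_row_count": "10"}))  # fail
```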
2 changes: 1 addition & 1 deletion spark_expectations/utils/reader.py
@@ -159,7 +159,7 @@ def _process_rules_df(
_dq_query_delimiter = _row["query_dq_delimiter"]
column_map["enable_querydq_custom_output"] = True
else:
_dq_query_delimiter = "$"
_dq_query_delimiter = "@"
column_map["enable_querydq_custom_output"] = False

if ("enable_querydq_custom_output" in _row.keys()) and (
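The reader.py change moves the fallback delimiter from "$" to "@" when a rule row supplies none. A minimal sketch of that fallback, using a hypothetical helper and a plain dict in place of a Spark Row:

```python
DEFAULT_QUERY_DQ_DELIMITER = "@"  # was "$" before this commit

def resolve_query_dq_delimiter(row: dict) -> str:
    """Sketch of the fallback in reader._process_rules_df: use the
    rule's own query_dq_delimiter when set, otherwise default to '@'."""
    delimiter = row.get("query_dq_delimiter")
    return delimiter if delimiter else DEFAULT_QUERY_DQ_DELIMITER

# A rule can still opt back into the old '$' delimiter explicitly:
print(resolve_query_dq_delimiter({"query_dq_delimiter": "$"}))  # $
print(resolve_query_dq_delimiter({}))                           # @
```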
2 changes: 1 addition & 1 deletion tests/resources/product_rules_pipe.csv
@@ -10,7 +10,7 @@ product1|table2|agg_dq|rule8|column5|expectation8|ignore|tag8|description8|true|
product1|table2|agg_dq|rule9|column6|expectation9|ignore|tag9|description9|true|true|false|false|0|null|null
product1|table1|agg_dq|rule10|column7|expectation10|ignore|tag10|description10|false|true|true|false|0|null|null
product1|table2|agg_dq|rule11|column8|expectation11|ignore|tag11|description11|true|false|true|false|0|null|null
product1|table1|query_dq|rule13|column10|expectation13{source_f1}$source_f1$expectation13a|fail|tag13|description13|true|false|true|false|0|null|true
product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|null|true
product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|false
product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|null
product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|fal
2 changes: 1 addition & 1 deletion tests/resources/product_rules_pipe2.csv
@@ -5,6 +5,6 @@ product1|table1|row_dq|rule3|column3|expectation3|ignore|tag3|description3|true|
product1|table1|agg_dq|rule6|column3|expectation6|fail|tag6|description6|true|true|true|false|0|null|false
product1|table1|agg_dq|rule10|column7|expectation10|ignore|tag10|description10|false|true|true|false|0|null|false
product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|fase
product1|table1|query_dq|rule13|column10|expectation13{source_f1}$source_f1$expectation13a|fail|tag13|description13|true|false|true|false|0|null|true
product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|null|true
product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|false
product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|false
112 changes: 106 additions & 6 deletions tests/sinks/utils/test_writer.py
@@ -1647,7 +1647,7 @@ def test_write_error_stats(



@pytest.mark.parametrize("input_record, expected_result, writer_config", [
@pytest.mark.parametrize("input_record, expected_result,dq_check, writer_config", [
({
"input_count": 100,
@@ -1747,7 +1747,107 @@ def test_write_error_stats(
"target_dq_row_count": '4',
"source_expectations": "sum(sales)>10000",
}, None),
}, 'agg_dq',None),
({
"input_count": 100,
"error_count": 10,
"output_count": 90,
"rules_execution_settings_config":
{'row_dq': True, 'source_agg_dq': True, 'source_query_dq': True, 'target_agg_dq': True, 'target_query_dq': True},
"agg_dq_detailed_stats_status": True,
"source_agg_dq_status": "Passed",
"final_agg_dq_status": "Passed",
"query_dq_detailed_stats_status": False,
"source_query_dq_status": "Passed",
"final_query_dq_status": "Passed",
"row_dq_status": "Passed",
"summarised_row_dq_res" : [{'rule_type':"row_dq", "rule" : "sales_greater_than_zero", "description" : "sales value should be greater than zero", "failed_row_count": 1, "tag" :"validity", "action_if_failed" : "drop"}],
"run_id" : "product_1_01450932-d5c2-11ee-a9ca-88e9fe5a7109",
"input_count" : 5,
"dq_expectations" : {
'row_dq_rules': [{'product_id': 'your_product', 'table_name': 'dq_spark_local.customer_order', 'rule_type': 'row_dq', 'rule': 'sales_greater_than_zero', 'column_name': 'sales', 'expectation': 'sales > 2', 'action_if_failed': 'drop', 'enable_for_source_dq_validation': False, 'enable_for_target_dq_validation': True, 'tag': 'accuracy', 'description': 'sales value should be greater than zero', 'enable_error_drop_alert': False, 'error_drop_threshold': 0}],
},
"test_dq_detailed_stats_table":"test_dq_detailed_stats_table",
"test_querydq_output_custom_table_name": "test_querydq_output_custom_table_name",
"detailed_stats_table_writer_config" : {'mode': 'overwrite', "format": "delta", 'partitionBy': [], 'bucketBy': {}, 'sortBy': [],
'options': {"mergeSchema": "true"}},
"rowdq_detailed_stats" : [('product_1_01450932-d5c2-11ee-a9ca-88e9fe5a7109', 'product_1','dq_spark_local.customer_order', 'row_dq', 'sales_greater_than_zero', 'sales > 2', 'accuracy', 'sales value should be greater than zero', 'fail', None, None, None, 4,0,4),
],
"source_agg_dq_detailed_stats": [('product_1_01450932-d5c2-11ee-a9ca-88e9fe5a7109', 'product_1', 'dq_spark_local.customer_order', 'agg_dq', 'sum_of_sales', 'sum(sales)>10000', 'validity', 'regex format validation for quantity','fail', 1988, '>10000', 5,0,5),
],
"target_agg_dq_detailed_stats": [('product_1_01450932-d5c2-11ee-a9ca-88e9fe5a7109', 'product_1', 'dq_spark_local.customer_order', 'agg_dq', 'sum_of_sales', 'sum(sales)>10000', 'validity', 'regex format validation for quantity','fail', 1030, '>10000', 4,0,4),
],
"source_query_dq_detailed_stats": [('product_1_52fed65a-d670-11ee-8dfb-ae03267c3341', 'product_1', 'dq_spark_local.customer_order', 'query_dq', 'product_missing_count_threshold', '((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3', 'validity', 'row count threshold', 'pass',
1,
'<3', 5,0,5)
],
"target_query_dq_detailed_stats": [('product_1_52fed65a-d670-11ee-8dfb-ae03267c3341', 'product_1', 'dq_spark_local.customer_order', 'query_dq', 'product_missing_count_threshold', '((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3', 'validity', 'row count threshold', 'pass',
1,
'<3', 5,0,5)
],
"source_query_dq_output": [('your_product_96bb003e-e1cf-11ee-9a59-ae03267c3340',
'your_product', 'dq_spark_local.customer_order',
'product_missing_count_threshold', 'source_f1', '_source_dq',
{'source_f1': ['{"product_id":"FUR-TA-10000577","order_id":"US-2015-108966"}',
'{"product_id":"OFF-ST-10000760","order_id":"US-2015-108966"}',
'{"product_id":"FUR-CH-10000454","order_id":"CA-2016-152156"}',
'{"product_id":"FUR-BO-10001798","order_id":"CA-2016-152156"}',
'{"product_id":"OFF-LA-10000240","order_id":"CA-2016-138688"}']}, '2024-03-14 06:53:39'),
('your_product_96bb003e-e1cf-11ee-9a59-ae03267c3340',
'your_product', 'dq_spark_local.customer_order',
'product_missing_count_threshold', 'target_f1', '_source_dq',
{'target_f1': ['{"product_id":"FUR-TA-10000577","order_id":"US-2015-108966"}',
'{"product_id":"FUR-CH-10000454","order_id":"CA-2016-152156"}',
'{"product_id":"FUR-BO-10001798","order_id":"CA-2016-152156"}',
'{"product_id":"OFF-LA-10000240","order_id":"CA-2016-138688"}']}, '2024-03-14 06:53:39'),
],
"target_query_dq_output": [('your_product_96bb003e-e1cf-11ee-9a59-ae03267c3340',
'your_product', 'dq_spark_local.customer_order',
'product_missing_count_threshold', 'source_f1', '_target_dq',
{'source_f1': ['{"product_id":"FUR-TA-10000577","order_id":"US-2015-108966"}',
'{"product_id":"OFF-ST-10000760","order_id":"US-2015-108966"}',
'{"product_id":"FUR-CH-10000454","order_id":"CA-2016-152156"}',
'{"product_id":"FUR-BO-10001798","order_id":"CA-2016-152156"}',
'{"product_id":"OFF-LA-10000240","order_id":"CA-2016-138688"}']}, '2024-03-14 06:53:39'),
('your_product_96bb003e-e1cf-11ee-9a59-ae03267c3340',
'your_product', 'dq_spark_local.customer_order',
'product_missing_count_threshold', 'target_f1', '_target_dq',
{'target_f1': ['{"product_id":"FUR-TA-10000577","order_id":"US-2015-108966"}',
'{"product_id":"FUR-CH-10000454","order_id":"CA-2016-152156"}',
'{"product_id":"FUR-BO-10001798","order_id":"CA-2016-152156"}',
'{"product_id":"OFF-LA-10000240","order_id":"CA-2016-138688"}']}, '2024-03-14 06:53:39')
],
}, {
"product_id": "product1",
"table_name": "dq_spark_local.customer_order",
"rule":"sales_greater_than_zero",
"rule_type": "row_dq",
"source_expectations": "sales > 2",
"source_dq_status": "fail",
"source_dq_actual_result": None,
"source_dq_row_count": '5',
"target_expectations": None,
"target_dq_status": None,
"target_dq_actual_result": None,
"target_dq_row_count": None,
}, 'row_dq',None),
({
"input_count": 100,
"error_count": 10,
@@ -1828,7 +1928,7 @@ def test_write_error_stats(
"target_dq_actual_result": None,
"target_dq_row_count": None,
}, None),
}, 'query_dq',None),
({
"input_count": 100,
@@ -1924,13 +2024,13 @@ def test_write_detailed_stats(input_record,
"target_dq_row_count": 4,
}, {'mode': 'append', "format": "bigquery", 'partitionBy': [], 'bucketBy': {}, 'sortBy': [],
},'query_dq', {'mode': 'append', "format": "bigquery", 'partitionBy': [], 'bucketBy': {}, 'sortBy': [],
'options': {"mergeSchema": "true"}}),
])

def test_write_detailed_stats(input_record,
expected_result,
expected_result,dq_check,
writer_config,) -> None:
"""
This functions writes the detailed stats for all rule type into the detailed stats table
@@ -2000,7 +2100,7 @@ def test_write_detailed_stats(input_record,
setattr(_mock_context, 'get_se_streaming_stats_dict', {'se.enable.streaming': True})
_fixture_writer.write_detailed_stats()

stats_table = spark.sql("select * from test_dq_detailed_stats_table where rule_type in ('agg_dq','query_dq')")
stats_table = spark.sql(f"select * from test_dq_detailed_stats_table where rule_type = '{dq_check}'")
assert stats_table.count() == 1
row = stats_table.first()
assert row.product_id == expected_result.get("product_id")
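The test change threads a per-case dq_check value through the parametrization so the final query filters the stats table to a single rule type, rather than matching both 'agg_dq' and 'query_dq' for every case. A stand-alone sketch of that filtering with plain dicts (hypothetical names, no Spark):

```python
def filter_stats(rows, dq_check):
    """Plain-Python equivalent of the updated assertion query:
    select * from test_dq_detailed_stats_table where rule_type = '{dq_check}'."""
    return [r for r in rows if r["rule_type"] == dq_check]

stats = [
    {"rule_type": "agg_dq", "rule": "sum_of_sales"},
    {"rule_type": "row_dq", "rule": "sales_greater_than_zero"},
    {"rule_type": "query_dq", "rule": "product_missing_count_threshold"},
]

# Each parametrized case now asserts count == 1 for its own rule type only
print(len(filter_stats(stats, "row_dq")))  # 1
```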
