From ae6c1ba808f4f0662a50351fb9cbaf3ca77aadf6 Mon Sep 17 00:00:00 2001
From: vigneshwarrvenkat
Date: Fri, 19 Apr 2024 07:20:09 -0700
Subject: [PATCH] Bug fix on PR-80 (#88)

* new changes
* Added test cases for the test_writer
* removed the debug statements
* removed the debug statements from test scripts
* added linting changes
* added new enhancement for detailed stats table
* added new test case for detailed stats version 3
* Added new test cases to writer and reader module
* Added two new columns to the rules df - query_dq_delimiter and enable_querydq_custom_output
* Fixed the test case for custom_query_output boolean variable
* Added new test cases in the writer and reader module
* Formatted the detailed write stats method
* Added comments to the newly added helper methods
* Added all the Docs to the md file
* Added docs to the .md files
* Added the doc to the md file with formatting
* updated the readme docs
* Added the fix for handling env alias bug #https://github.com/Nike-Inc/spark-expectations/issues/78
* removed comments
* removed all debugging statements
* clean up
* Adding comments and clean up activity
* Changes to the regex to handle spaces in the expression
* Bug fix to handle spaces in regex expression match
* Change the default delimiter $ to @ - Test case
* Clean up for regex
* Added new test case for row_dq detailed stats
* Updating the doc for the new default delimiter
* Updating the doc for the new default delimiter in Rules.md

---------

Co-authored-by: Vigneshwarr Venkatesan
---
 docs/configurations/rules.md              |   2 +-
 docs/getting-started/setup.md             |   4 +-
 spark_expectations/config/user_config.py  |   1 +
 spark_expectations/examples/base_setup.py |   2 +-
 .../examples/sample_dq_iceberg.py          |   2 +
 spark_expectations/sinks/utils/writer.py  |   2 +-
 spark_expectations/utils/reader.py        |   2 +-
 tests/resources/product_rules_pipe.csv    |   2 +-
 tests/resources/product_rules_pipe2.csv   |   2 +-
 tests/sinks/utils/test_writer.py          | 112 +++++++++++++++++-
 10 files changed, 117 insertions(+), 14 deletions(-)

diff --git a/docs/configurations/rules.md b/docs/configurations/rules.md
index 4726238..f89a1bb 100644
--- a/docs/configurations/rules.md
+++ b/docs/configurations/rules.md
@@ -64,7 +64,7 @@ Please find the different types of possible expectations
 | Expect the maximum value in a column to fall within a specified range | expect_column_max_to_be_between | accuracy | ```(select max([col_name]) from [table_name]) between [lower_bound] and [upper_bound]``` |
 | Expect the minimum value in a column to fall within a specified range | expect_column_min_to_be_between | accuracy | ```(select min([col_name]) from [table_name]) between [lower_bound] and [upper_bound]``` |
 | Expect referential integrity | expect_referential_integrity_between_two_table_should_be_less_than_100 | accuracy | ```(select * from [table_a] left join [table_b] on [condition] where [table_b.column] is null) select count(*) from referential_check) < 100``` |
-| Compare the source table and target table output | customer_missing_count_threshold | validity | The alias within the curly brackets is added to the expectation and is resolved at compile time with the alias values ```((select count(*) from ({source_f1}) a join ({source_f2}) b on a.customer_id = b.customer_id) - (select count(*) from ({target_f1}) a join ({target_f2}) b on source_column = target_column)) > ({target_f3})$source_f1$select column, count(*) from source_tbl group by column$source_f2$select column2, count(*) from table2 group by column2$target_f1$select column, count(*) from target_tbl group by column$target_f2$select column2, count(*) from target_tbl2 group by column2$target_f3$select count(*) from source_tbl``` |
+| Compare the source table and target table output (by default `@` is the delimiter; it can be overridden via the `query_dq_delimiter` attribute in the rules table) | customer_missing_count_threshold | validity | The alias within the curly brackets is added to the expectation and is resolved at compile time with the alias values ```((select count(*) from ({source_f1}) a join ({source_f2}) b on a.customer_id = b.customer_id) - (select count(*) from ({target_f1}) a join ({target_f2}) b on source_column = target_column)) > ({target_f3})@source_f1@select column, count(*) from source_tbl group by column@source_f2@select column2, count(*) from table2 group by column2@target_f1@select column, count(*) from target_tbl group by column@target_f2@select column2, count(*) from target_tbl2 group by column2@target_f3@select count(*) from source_tbl``` |
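The delimited expectation in the row above follows a simple alternating alias/query layout. A minimal sketch of how such a string can be split, assuming that layout — this is not the library's actual parser, and `split_querydq_expectation` is a hypothetical helper name used only for illustration:

```python
# Hypothetical helper (not the library's parser): split a delimited query_dq
# expectation into the main expectation and its named sub-queries.
def split_querydq_expectation(raw: str, delimiter: str = "@") -> tuple[str, dict]:
    parts = raw.split(delimiter)
    # parts[0] is the expectation; the remainder alternates alias, query, alias, query, ...
    return parts[0], dict(zip(parts[1::2], parts[2::2]))

expectation, sub_queries = split_querydq_expectation(
    "({source_f1}) > 10@source_f1@select count(*) from order_source"
)
# expectation -> "({source_f1}) > 10"
# sub_queries -> {"source_f1": "select count(*) from order_source"}
# Each {alias} placeholder in the expectation is then resolved with its sub-query.
```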
diff --git a/docs/getting-started/setup.md b/docs/getting-started/setup.md
index 84d9e11..2c43f39 100644
--- a/docs/getting-started/setup.md
+++ b/docs/getting-started/setup.md
@@ -56,7 +56,7 @@ create table if not exists `catalog`.`schema`.`{product}_rules` (
 12. `is_active` true or false to indicate if the rule is active or not.
 13. `enable_error_drop_alert` true or false. This determines if an alert notification should be sent out if row(s) is(are) dropped from the data set
 14. `error_drop_threshold` Threshold for the alert notification that gets triggered when row(s) is(are) dropped from the data set
-15. `query_dq_delimiter` segregate custom queries delimiter ex: $, @ etc
+15. `query_dq_delimiter` delimiter used to segregate the custom queries, e.g. `$` or `@`. The default is `@`; users can override it with any other delimiter as needed, and the delimiter declared here must also be used in the custom query.
 16. `enable_querydq_custom_output` set to true if the custom query output is required in a separate table
 
 rule_type, enable_for_source_dq_validation and enable_for_target_dq_validation columns define source_agg_dq, target_agg_dq, source_query_dq and target_query_dq. please see the below definitions:
@@ -200,4 +200,4 @@ dq_time string, -- (23)!
 20. `target_dq_error_row_count` Number of rows failed in the target dq
 21. `target_dq_row_count` Number of rows of the target dq
 22. `dq_date` Dq executed date
-23. `dq_time` Dq executed timestamp
\ No newline at end of file
+23. `dq_time` Dq executed timestamp
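To make columns 15 and 16 concrete, here is an illustrative rule row carrying the two new fields. The dict literal is for explanation only — spark-expectations reads these fields from the rules table, not from a Python literal like this:

```python
# Illustrative only: a query_dq rule showing the two new columns described above.
rule_row = {
    "product_id": "your_product",
    "table_name": "dq_spark_local.customer_order",
    "rule_type": "query_dq",
    "rule": "order_count_validity",
    "expectation": "({source_f1}) > 10@source_f1@select count(*) from order_source",
    "query_dq_delimiter": "@",             # when null, the reader falls back to "@"
    "enable_querydq_custom_output": True,  # persist sub-query output to a separate table
}
```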
diff --git a/spark_expectations/config/user_config.py b/spark_expectations/config/user_config.py
index 137ce32..af17c40 100644
--- a/spark_expectations/config/user_config.py
+++ b/spark_expectations/config/user_config.py
@@ -68,6 +68,7 @@ class Constants:
     querydq_output_custom_table_name = "spark.expectations.query.dq.custom.table_name"
 
     # declare const variable for agg query dq detailed stats
+    se_agg_dq_expectation_regex_pattern = (
+        r"(\(.+?\)|\w+\(.+?\))(\s*[<>!=]+\s*.+|\s*between\s*.+)$"
+    )
diff --git a/spark_expectations/examples/base_setup.py b/spark_expectations/examples/base_setup.py
index f9c6f44..89081b0 100644
--- a/spark_expectations/examples/base_setup.py
+++ b/spark_expectations/examples/base_setup.py
@@ -40,7 +40,7 @@
     ,("your_product", "dq_spark_dev.customer_order", "query_dq", "order_count_validity", "*", "({source_f1}) > 10@source_f1@select count(*) from order_source", "ignore", "validity", "row count threshold", true, true, true, false, 0, "@", true)
     ,("your_product", "dq_spark_local.customer_order", "query_dq", "order_count_validity_check", "*", "(select count(*) from order_source) > 10", "ignore", "validity", "row count threshold", true, true, true, false, 0, null, true)
     ,("your_product", "dq_spark_{env}.customer_order", "query_dq", "product_category", "*", "(select count(distinct category) from {table}) < 5", "ignore", "validity", "distinct product category", true, true, true, false, 0, null, true)
-    ,("your_product", "dq_spark_{env}.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode)<=3", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, null)
+    ,("your_product", "dq_spark_{env}.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode) <= 3", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, null)
 """
diff --git a/spark_expectations/examples/sample_dq_iceberg.py b/spark_expectations/examples/sample_dq_iceberg.py
index d0cb127..b7859ac 100644
--- a/spark_expectations/examples/sample_dq_iceberg.py
+++ b/spark_expectations/examples/sample_dq_iceberg.py
@@ -42,6 +42,8 @@
     user_config.se_enable_query_dq_detailed_result: True,
     user_config.se_enable_agg_dq_detailed_result: True,
     user_config.se_enable_error_table: True,
+    user_config.enable_query_dq_detailed_result: True,
+    user_config.enable_agg_dq_detailed_result: True,
     user_config.se_dq_rules_params: {
         "env": "local",
         "table": "product",
diff --git a/spark_expectations/sinks/utils/writer.py b/spark_expectations/sinks/utils/writer.py
index 07d445f..bd3787c 100644
--- a/spark_expectations/sinks/utils/writer.py
+++ b/spark_expectations/sinks/utils/writer.py
@@ -175,7 +175,7 @@ def get_row_dq_detailed_stats(
                         _rule_expectations,
                         _rule_tag,
                         _rule_desc,
-                        "fail",
+                        "pass" if int(_dq_res["failed_row_count"]) == 0 else "fail",
                         None,
                         None,
                         (_input_count - int(_dq_res["failed_row_count"])),
diff --git a/spark_expectations/utils/reader.py b/spark_expectations/utils/reader.py
index 158358a..8bef122 100644
--- a/spark_expectations/utils/reader.py
+++ b/spark_expectations/utils/reader.py
@@ -159,7 +159,7 @@ def _process_rules_df(
                 _dq_query_delimiter = _row["query_dq_delimiter"]
                 column_map["enable_querydq_custom_output"] = True
             else:
-                _dq_query_delimiter = "$"
+                _dq_query_delimiter = "@"
                 column_map["enable_querydq_custom_output"] = False
 
             if ("enable_querydq_custom_output" in _row.keys()) and (
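The new `se_agg_dq_expectation_regex_pattern` added to user_config.py above splits an agg_dq expectation into its aggregate expression and comparison clause, including expressions containing spaces (one of the fixes this patch lists). A standalone check, illustrative rather than taken from the library's test suite:

```python
import re

# The pattern introduced in user_config.py above, exercised against two
# expectations that appear elsewhere in this patch.
SE_AGG_DQ_PATTERN = r"(\(.+?\)|\w+\(.+?\))(\s*[<>!=]+\s*.+|\s*between\s*.+)$"

for expectation in ["sum(sales)>10000", "count(distinct ship_mode) <= 3"]:
    match = re.match(SE_AGG_DQ_PATTERN, expectation)
    if match:
        expression, comparison = match.groups()
        print(f"{expression!r} | {comparison!r}")
# 'sum(sales)' | '>10000'
# 'count(distinct ship_mode)' | ' <= 3'
```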
diff --git a/tests/resources/product_rules_pipe.csv b/tests/resources/product_rules_pipe.csv
index a9c4357..4a841af 100644
--- a/tests/resources/product_rules_pipe.csv
+++ b/tests/resources/product_rules_pipe.csv
@@ -10,7 +10,7 @@ product1|table2|agg_dq|rule8|column5|expectation8|ignore|tag8|description8|true|
 product1|table2|agg_dq|rule9|column6|expectation9|ignore|tag9|description9|true|true|false|false|0|null|null
 product1|table1|agg_dq|rule10|column7|expectation10|ignore|tag10|description10|false|true|true|false|0|null|null
 product1|table2|agg_dq|rule11|column8|expectation11|ignore|tag11|description11|true|false|true|false|0|null|null
-product1|table1|query_dq|rule13|column10|expectation13{source_f1}$source_f1$expectation13a|fail|tag13|description13|true|false|true|false|0|null|true
+product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|null|true
 product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|false
 product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|null
 product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|fal
diff --git a/tests/resources/product_rules_pipe2.csv b/tests/resources/product_rules_pipe2.csv
index 97e1425..2647f3a 100644
--- a/tests/resources/product_rules_pipe2.csv
+++ b/tests/resources/product_rules_pipe2.csv
@@ -5,6 +5,6 @@ product1|table1|row_dq|rule3|column3|expectation3|ignore|tag3|description3|true|
 product1|table1|agg_dq|rule6|column3|expectation6|fail|tag6|description6|true|true|true|false|0|null|false
 product1|table1|agg_dq|rule10|column7|expectation10|ignore|tag10|description10|false|true|true|false|0|null|false
 product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|fase
-product1|table1|query_dq|rule13|column10|expectation13{source_f1}$source_f1$expectation13a|fail|tag13|description13|true|false|true|false|0|null|true
+product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|null|true
 product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|false
 product1|table1|query_dq|rule13|column10|expectation13{source_f1}@source_f1@expectation13a|fail|tag13|description13|true|false|true|false|0|@|false
\ No newline at end of file
diff --git a/tests/sinks/utils/test_writer.py b/tests/sinks/utils/test_writer.py
index 9e5e04c..0cc9d30 100644
--- a/tests/sinks/utils/test_writer.py
+++ b/tests/sinks/utils/test_writer.py
@@ -1647,7 +1647,7 @@ def test_write_error_stats(
 
 
 
-@pytest.mark.parametrize("input_record, expected_result, writer_config", [
+@pytest.mark.parametrize("input_record, expected_result, dq_check, writer_config", [
 
     ({
         "input_count": 100,
@@ -1747,7 +1747,107 @@ def test_write_error_stats(
         "target_dq_row_count": '4',
         "source_expectations": "sum(sales)>10000",
 
-    }, None),
+    }, 'agg_dq', None),
+
+    ({
+        "input_count": 100,
+        "error_count": 10,
+        "output_count": 90,
+        "rules_execution_settings_config":
+            {'row_dq': True, 'source_agg_dq': True, 'source_query_dq': True, 'target_agg_dq': True, 'target_query_dq': True},
+        "agg_dq_detailed_stats_status": True,
+        "source_agg_dq_status": "Passed",
+        "final_agg_dq_status": "Passed",
+        "query_dq_detailed_stats_status": False,
+        "source_query_dq_status": "Passed",
+        "final_query_dq_status": "Passed",
+        "row_dq_status": "Passed",
+        "summarised_row_dq_res": [{'rule_type': "row_dq", "rule": "sales_greater_than_zero", "description": "sales value should be greater than zero", "failed_row_count": 1, "tag": "validity", "action_if_failed": "drop"}],
+        "run_id": "product_1_01450932-d5c2-11ee-a9ca-88e9fe5a7109",
+        "input_count": 5,
+        "dq_expectations": {
+            'row_dq_rules': [{'product_id': 'your_product', 'table_name': 'dq_spark_local.customer_order', 'rule_type': 'row_dq', 'rule': 'sales_greater_than_zero', 'column_name': 'sales', 'expectation': 'sales > 2', 'action_if_failed': 'drop', 'enable_for_source_dq_validation': False, 'enable_for_target_dq_validation': True, 'tag': 'accuracy', 'description': 'sales value should be greater than zero', 'enable_error_drop_alert': False, 'error_drop_threshold': 0}],
+        },
+        "test_dq_detailed_stats_table": "test_dq_detailed_stats_table",
+
+        "test_querydq_output_custom_table_name": "test_querydq_output_custom_table_name",
+
+        "detailed_stats_table_writer_config": {'mode': 'overwrite', "format": "delta", 'partitionBy': [], 'bucketBy': {}, 'sortBy': [],
+                                               'options': {"mergeSchema": "true"}},
+
+        "rowdq_detailed_stats": [('product_1_01450932-d5c2-11ee-a9ca-88e9fe5a7109', 'product_1', 'dq_spark_local.customer_order', 'row_dq', 'sales_greater_than_zero', 'sales > 2', 'accuracy', 'sales value should be greater than zero', 'fail', None, None, None, 4, 0, 4),
+                                 ],
+
+        "source_agg_dq_detailed_stats": [('product_1_01450932-d5c2-11ee-a9ca-88e9fe5a7109', 'product_1', 'dq_spark_local.customer_order', 'agg_dq', 'sum_of_sales', 'sum(sales)>10000', 'validity', 'regex format validation for quantity', 'fail', 1988, '>10000', 5, 0, 5),
+                                         ],
+
+        "target_agg_dq_detailed_stats": [('product_1_01450932-d5c2-11ee-a9ca-88e9fe5a7109', 'product_1', 'dq_spark_local.customer_order', 'agg_dq', 'sum_of_sales', 'sum(sales)>10000', 'validity', 'regex format validation for quantity', 'fail', 1030, '>10000', 4, 0, 4),
+                                         ],
+
+        "source_query_dq_detailed_stats": [('product_1_52fed65a-d670-11ee-8dfb-ae03267c3341', 'product_1', 'dq_spark_local.customer_order', 'query_dq', 'product_missing_count_threshold', '((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3', 'validity', 'row count threshold', 'pass',
+                                            1,
+                                            '<3', 5, 0, 5)
+                                           ],
+
+        "target_query_dq_detailed_stats": [('product_1_52fed65a-d670-11ee-8dfb-ae03267c3341', 'product_1', 'dq_spark_local.customer_order', 'query_dq', 'product_missing_count_threshold', '((select count(*) from (select distinct product_id,order_id from order_source) a) - (select count(*) from (select distinct product_id,order_id from order_target) b) ) < 3', 'validity', 'row count threshold', 'pass',
+                                            1,
+                                            '<3', 5, 0, 5)
+                                           ],
+
+        "source_query_dq_output": [('your_product_96bb003e-e1cf-11ee-9a59-ae03267c3340',
+                                    'your_product', 'dq_spark_local.customer_order',
+                                    'product_missing_count_threshold', 'source_f1', '_source_dq',
+                                    {'source_f1': ['{"product_id":"FUR-TA-10000577","order_id":"US-2015-108966"}',
+                                                   '{"product_id":"OFF-ST-10000760","order_id":"US-2015-108966"}',
+                                                   '{"product_id":"FUR-CH-10000454","order_id":"CA-2016-152156"}',
+                                                   '{"product_id":"FUR-BO-10001798","order_id":"CA-2016-152156"}',
+                                                   '{"product_id":"OFF-LA-10000240","order_id":"CA-2016-138688"}']}, '2024-03-14 06:53:39'),
+                                   ('your_product_96bb003e-e1cf-11ee-9a59-ae03267c3340',
+                                    'your_product', 'dq_spark_local.customer_order',
+                                    'product_missing_count_threshold', 'target_f1', '_source_dq',
+                                    {'target_f1': ['{"product_id":"FUR-TA-10000577","order_id":"US-2015-108966"}',
+                                                   '{"product_id":"FUR-CH-10000454","order_id":"CA-2016-152156"}',
+                                                   '{"product_id":"FUR-BO-10001798","order_id":"CA-2016-152156"}',
+                                                   '{"product_id":"OFF-LA-10000240","order_id":"CA-2016-138688"}']}, '2024-03-14 06:53:39'),
+                                   ],
+        "target_query_dq_output": [('your_product_96bb003e-e1cf-11ee-9a59-ae03267c3340',
+                                    'your_product', 'dq_spark_local.customer_order',
+                                    'product_missing_count_threshold', 'source_f1', '_target_dq',
+                                    {'source_f1': ['{"product_id":"FUR-TA-10000577","order_id":"US-2015-108966"}',
+                                                   '{"product_id":"OFF-ST-10000760","order_id":"US-2015-108966"}',
+                                                   '{"product_id":"FUR-CH-10000454","order_id":"CA-2016-152156"}',
+                                                   '{"product_id":"FUR-BO-10001798","order_id":"CA-2016-152156"}',
+                                                   '{"product_id":"OFF-LA-10000240","order_id":"CA-2016-138688"}']}, '2024-03-14 06:53:39'),
+                                   ('your_product_96bb003e-e1cf-11ee-9a59-ae03267c3340',
+                                    'your_product', 'dq_spark_local.customer_order',
+                                    'product_missing_count_threshold', 'target_f1', '_target_dq',
+                                    {'target_f1': ['{"product_id":"FUR-TA-10000577","order_id":"US-2015-108966"}',
+                                                   '{"product_id":"FUR-CH-10000454","order_id":"CA-2016-152156"}',
+                                                   '{"product_id":"FUR-BO-10001798","order_id":"CA-2016-152156"}',
+                                                   '{"product_id":"OFF-LA-10000240","order_id":"CA-2016-138688"}']}, '2024-03-14 06:53:39')
+                                   ],
+
+    }, {
+        "product_id": "product1",
+        "table_name": "dq_spark_local.customer_order",
+        "rule": "sales_greater_than_zero",
+        "rule_type": "row_dq",
+        "source_expectations": "sales > 2",
+        "source_dq_status": "fail",
+        "source_dq_actual_result": None,
+        "source_dq_row_count": '5',
+        "target_expectations": None,
+        "target_dq_status": None,
+        "target_dq_actual_result": None,
+        "target_dq_row_count": None,
+
+    }, 'row_dq', None),
+
     ({
         "input_count": 100,
         "error_count": 10,
@@ -1828,7 +1928,7 @@ def test_write_error_stats(
         "target_dq_actual_result": None,
         "target_dq_row_count": None,
 
-    }, None),
+    }, 'query_dq', None),
 
     ({
         "input_count": 100,
@@ -1924,13 +2024,13 @@ def test_write_error_stats(
 
         "target_dq_row_count": 4,
 
 
-    }, {'mode': 'append', "format": "bigquery", 'partitionBy': [], 'bucketBy': {}, 'sortBy': [],
+    }, 'query_dq', {'mode': 'append', "format": "bigquery", 'partitionBy': [], 'bucketBy': {}, 'sortBy': [],
         'options': {"mergeSchema": "true"}}),

 ])
 def test_write_detailed_stats(input_record,
-                              expected_result,
+                              expected_result, dq_check,
                               writer_config,) -> None:
     """
     This function writes the detailed stats for all rule types into the detailed stats table
@@ -2000,7 +2100,7 @@ def test_write_detailed_stats(input_record,
     setattr(_mock_context, 'get_se_streaming_stats_dict', {'se.enable.streaming': True})
 
     _fixture_writer.write_detailed_stats()
-    stats_table = spark.sql("select * from test_dq_detailed_stats_table where rule_type in ('agg_dq','query_dq')")
+    stats_table = spark.sql(f"select * from test_dq_detailed_stats_table where rule_type = '{dq_check}'")
     assert stats_table.count() == 1
     row = stats_table.first()
     assert row.product_id == expected_result.get("product_id")
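The writer.py change this test exercises can be restated on its own: a row_dq rule's detailed-stats status is now derived from the failed-row count rather than hard-coded to "fail". A minimal sketch with an illustrative function name (the library computes this inline, not via a helper):

```python
# Illustrative restatement of the new status logic in get_row_dq_detailed_stats.
def row_dq_status(dq_res: dict) -> str:
    """A rule passes only when no rows failed it."""
    return "pass" if int(dq_res["failed_row_count"]) == 0 else "fail"

assert row_dq_status({"failed_row_count": "0"}) == "pass"
assert row_dq_status({"failed_row_count": 1}) == "fail"  # matches the new row_dq fixture above
```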