Commit

Adding range comparison functionality to agg dq Detailed Stats table (#86)

* initial commit

* added range comparison to agg dq detailed stats

* modified sample_dq_iceberg to run detailed stats

* removing print statement as it is not needed

* modified example in rules.md file
nishantsingh93 authored Apr 18, 2024
1 parent 19ed71a commit 002b742
Showing 9 changed files with 326 additions and 57 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
@@ -12,6 +12,7 @@ Thanks to the contributors who helped on this project apart from the authors
* [Samy Coenen](https://github.com/SamyCoenen)
* [Jagadapi Sivanaga Krishnam Raja Reddy](www.linkedin.com/in/jskrajareddy/)
* [Vigneshwarr Venkatesan](https://www.linkedin.com/in/vignesh15)
* [Nishant Singh](https://www.linkedin.com/in/singh-nishant/)

# Honorary Mentions
Thanks to the team below for invaluable insights and support throughout the initial release of this project
9 changes: 9 additions & 0 deletions docs/configurations/configure_rules.md
@@ -56,6 +56,15 @@ action_if_failed, tag, description, enable_for_source_dq_validation, enable_fo
--statistics table when the sum of the sales values falls below 10000
,('apla_nd', '`catalog`.`schema`.customer_order', 'agg_dq', 'sum_of_sales', 'sales', 'sum(sales)>10000', 'ignore',
'validity', 'sum of sales must be greater than 10000', true, true, true,false, 0,null, null)

--The aggregation rule is established on the 'sales' column and the metadata of the rule will be captured in the
--statistics table when the sum of the sales values falls between 1000 and 10000
,('apla_nd', '`catalog`.`schema`.customer_order', 'agg_dq', 'sum_of_sales_range_type1', 'sales', 'sum(sales) between 1000 and 10000', 'ignore',
'validity', 'sum of sales must be between 1000 and 10000', true, true, true)

--The aggregation rule is established on the 'sales' column and the metadata of the rule will be captured in the
--statistics table when the sum of the sales value is greater than 1000 and less than 10000
,('apla_nd', '`catalog`.`schema`.customer_order', 'agg_dq', 'sum_of_sales_range_type2', 'sales', 'sum(sales)>1000 and sum(sales)<10000', 'ignore', 'validity', 'sum of sales must be greater than 1000 and less than 10000', true, true, true)

--The aggregation rule is established on the 'ship_mode' column and the metadata of the rule will be captured in
--the statistics table when distinct ship_mode greater than 3 and enabled for only source data set
1 change: 1 addition & 0 deletions docs/configurations/rules.md
@@ -47,6 +47,7 @@ Please find the different types of possible expectations
| Expect the maximum value in a column to fall within a specified range | expect_column_max_to_be_between | accuracy | ```max([col_name]) between [lower_bound] and [upper_bound]``` |
| Expect the minimum value in a column to fall within a specified range | expect_column_min_to_be_between | accuracy | ```min([col_name]) between [lower_bound] and [upper_bound]``` |
| Expect row count of the dataset to fall within a certain range | expect_row_count_to_be_between | accuracy | ```count(*) between [lower_bound] and [upper_bound]``` |
| Expect row count of the dataset to fall within a certain range, expressed as two comparisons | expect_row_count_to_be_in_range | accuracy | ```count(*) > [lower_bound] and count(*) < [upper_bound]``` |
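
For example, with illustrative bounds, the new comparison style would be written as ```count(*) > 1000 and count(*) < 10000```, while the ```between``` style expresses the equivalent inclusive check as ```count(*) between 1000 and 10000```.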


#### Possible Query Data Quality Expectations
8 changes: 7 additions & 1 deletion spark_expectations/config/user_config.py
@@ -68,4 +68,10 @@ class Constants:
    querydq_output_custom_table_name = "spark.expectations.query.dq.custom.table_name"

    # declare const variable for agg query dq detailed stats
    se_agg_dq_expectation_regex_pattern = (
        r"(\(.+?\)|\w+\(.+?\))(\s*[<>!=]+\s*.+|\s*between\s*.+)$"
    )
    # declare const variable for range in agg query dq detailed stats
    se_agg_dq_expectation_range_regex_pattern = (
        r"(\w+\(\w+\)|\w+)(\s*[><]\s*\d+)\s+(and)\s+(\w+\(\w+\)|\w+)(\s*[><]\s*\d+)"
    )
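
As a quick illustration (the sample expectation strings below are hypothetical, not part of the commit), the groups each pattern captures can be checked standalone:

```python
import re

# main pattern: group 1 is the aggregate, group 2 the comparison/"between" tail
main = re.compile(r"(\(.+?\)|\w+\(.+?\))(\s*[<>!=]+\s*.+|\s*between\s*.+)$")
print(main.match("sum(sales) between 1000 and 10000").groups())
# ('sum(sales)', ' between 1000 and 10000')

# range pattern: groups 1, 2 and 5 are the ones actions.py consumes
rng = re.compile(
    r"(\w+\(\w+\)|\w+)(\s*[><]\s*\d+)\s+(and)\s+(\w+\(\w+\)|\w+)(\s*[><]\s*\d+)"
)
print(rng.match("sum(sales)>1000 and sum(sales)<10000").groups())
# ('sum(sales)', '>1000', 'and', 'sum(sales)', '<10000')
```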
2 changes: 2 additions & 0 deletions spark_expectations/examples/base_setup.py
@@ -31,6 +31,8 @@
,("your_product", "dq_spark_{env}.customer_order", "row_dq", "discount_threshold", "discount", "discount*100 < 60","drop", "validity", "discount should be less than 40", true, true, true, false, 0,null, null)
,("your_product", "dq_spark_{env}.customer_order", "row_dq", "ship_mode_in_set", "ship_mode", "lower(trim(ship_mode)) in('second class', 'standard class', 'standard class')", "drop", "validity", "ship_mode mode belongs in the sets", true, true, true, false, 0,null, null)
,("your_product", "dq_spark_{env}.customer_order", "row_dq", "profit_threshold", "profit", "profit>0", "ignore", "validity", "profit threshold should be greater tahn 0", false, true, true, true, 0,null, null)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales_range type 1", "sales", "sum(sales)>99 and sum(sales)<99999", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, true)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales_range type 2", "sales", "sum(sales) between 100 and 10000 ", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, true)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", "sales", "sum(sales)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_quantity", "quantity", "sum(quantity)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null)
,("your_product", "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", "*", "((select count(*) from ({source_f1}) a) - (select count(*) from ({target_f1}) b) ) < 3$source_f1$select distinct product_id,order_id from order_source$target_f1$select distinct product_id,order_id from order_target", "ignore", "validity", "row count threshold", true, true, true, false, 0,null, true)
2 changes: 1 addition & 1 deletion spark_expectations/examples/sample_dq_iceberg.py
@@ -107,7 +107,7 @@ def build_new() -> DataFrame:

spark.sql("use dq_spark_local")
spark.sql("select * from dq_spark_local.dq_stats").show(truncate=False)
spark.sql("select * from dq_spark_local.dq_stats_custom").show(truncate=False)
spark.sql("select * from dq_spark_local.dq_stats_detailed").show(truncate=False)
spark.sql("select * from dq_spark_local.dq_stats_querydq_output").show(
truncate=False
)
180 changes: 132 additions & 48 deletions spark_expectations/utils/actions.py
@@ -147,67 +147,151 @@ def agg_query_dq_detailed_result(
_dq_rule["rule_type"] == _context.get_agg_dq_rule_type_name
and _context.get_agg_dq_detailed_stats_status is True
):
_pattern = rf"{constant_config.se_agg_dq_expectation_regex_pattern}"
_re_compile = re.compile(_pattern)
_agg_dq_expectation_match = re.match(
_re_compile, _dq_rule["expectation"]
)
if not (
">" in _dq_rule["expectation"] and "<" in _dq_rule["expectation"]
):
_pattern = rf"{constant_config.se_agg_dq_expectation_regex_pattern}"
_re_compile = re.compile(_pattern)
_agg_dq_expectation_match = re.match(
_re_compile, _dq_rule["expectation"]
)
_agg_dq_expectation_expr = None
if _agg_dq_expectation_match:
_agg_dq_expectation_aggstring = _agg_dq_expectation_match.group(
1
)
_agg_dq_expectation_expr = _agg_dq_expectation_match.group(2)
_agg_dq_expectation_cond_expr = expr(
_agg_dq_expectation_aggstring
)

if _agg_dq_expectation_match:
_agg_dq_expectation_aggstring = _agg_dq_expectation_match.group(1)
_agg_dq_expectation_expr = _agg_dq_expectation_match.group(2)
_agg_dq_expectation_cond_expr = expr(_agg_dq_expectation_aggstring)
_agg_dq_actual_count_value = int(
df.agg(_agg_dq_expectation_cond_expr).collect()[0][0]
)

_agg_dq_actual_count_value = int(
df.agg(_agg_dq_expectation_cond_expr).collect()[0][0]
)
_agg_dq_expression_str = (
str(_agg_dq_actual_count_value) + _agg_dq_expectation_expr
)

_agg_dq_expression_str = (
str(_agg_dq_actual_count_value) + _agg_dq_expectation_expr
)
_agg_dq_expr_condition = []

_agg_dq_expr_condition = []
_agg_dq_expr_condition.append(
when(expr(_agg_dq_expression_str), True)
.otherwise(False)
.alias("agg_dq_aggregation_check")
)

_agg_dq_expr_condition.append(
when(expr(_agg_dq_expression_str), True)
.otherwise(False)
.alias("agg_dq_aggregation_check")
)
_df_agg_dq_expr_result = df.select(*_agg_dq_expr_condition)

_df_agg_dq_expr_result = df.select(*_agg_dq_expr_condition)
# status = "pass" if eval(_agg_dq_expression_str) else "fail"

# status = "pass" if eval(_agg_dq_expression_str) else "fail"
status = (
"pass"
if _df_agg_dq_expr_result.filter(
_df_agg_dq_expr_result["agg_dq_aggregation_check"]
).count()
> 0
else "fail"
)

if _source_dq_status:
row_count = _context.get_input_count
elif _target_dq_status:
row_count = _context.get_output_count
else:
row_count = None

actual_row_count = row_count if status == "pass" else None
error_row_count = 0 if status == "pass" else row_count

status = (
"pass"
if _df_agg_dq_expr_result.filter(
_df_agg_dq_expr_result["agg_dq_aggregation_check"]
).count()
> 0
else "fail"
actual_outcome = (
_agg_dq_actual_count_value
if (_agg_dq_actual_count_value is not None)
else None
)
expected_outcome = (
str(_agg_dq_expectation_expr)
if (_agg_dq_expectation_expr is not None)
else None
)
else:
pattern = (
rf"{constant_config.se_agg_dq_expectation_range_regex_pattern}"
)
matches = re.match(pattern, _dq_rule["expectation"])
result = None
if matches:
result = matches.groups()
_agg_dq_expectation_aggstring = result[0]
_agg_dq_expectation_expr_lowerbound = result[1]
_agg_dq_expectation_expr_upperbound = result[4]

_agg_dq_expectation_cond_expr = expr(
_agg_dq_expectation_aggstring
)

if _source_dq_status:
row_count = _context.get_input_count
elif _target_dq_status:
row_count = _context.get_output_count
else:
row_count = None
_agg_dq_actual_count_value = int(
df.agg(_agg_dq_expectation_cond_expr).collect()[0][0]
)

actual_row_count = row_count if status == "pass" else None
error_row_count = 0 if status == "pass" else row_count
_agg_dq_expression_str_lower = (
str(_agg_dq_actual_count_value)
+ _agg_dq_expectation_expr_lowerbound
)

actual_outcome = (
_agg_dq_actual_count_value
if (_agg_dq_actual_count_value is not None)
else None
)
expected_outcome = (
str(_agg_dq_expectation_expr)
if (_agg_dq_expectation_expr is not None)
else None
)
_agg_dq_expression_str_upper = (
str(_agg_dq_actual_count_value)
+ _agg_dq_expectation_expr_upperbound
)

_agg_dq_expr_condition = []

_agg_dq_expression_str = (
f"{str(_agg_dq_expression_str_lower)}"
" and "
f"{str(_agg_dq_expression_str_upper)}"
)

_agg_dq_expr_condition.append(
when(expr(_agg_dq_expression_str), True)
.otherwise(False)
.alias("agg_dq_aggregation_check")
)

_df_agg_dq_expr_result = df.select(*_agg_dq_expr_condition)

status = (
"pass"
if _df_agg_dq_expr_result.filter(
_df_agg_dq_expr_result["agg_dq_aggregation_check"]
).count()
> 0
else "fail"
)

if _source_dq_status:
row_count = _context.get_input_count
elif _target_dq_status:
row_count = _context.get_output_count
else:
row_count = None
actual_row_count = row_count if status == "pass" else None
error_row_count = 0 if status == "pass" else row_count

actual_outcome = (
_agg_dq_actual_count_value
if (_agg_dq_actual_count_value is not None)
else None
)

expected_outcome = (
_agg_dq_expression_str
if (
_agg_dq_expression_str_lower is not None
and _agg_dq_expression_str_upper is not None
)
else None
)
elif (
_dq_rule["rule_type"] == _context.get_query_dq_rule_type_name
and _context.get_query_dq_detailed_stats_status is True
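
A minimal standalone sketch of the evaluation strategy the new range branch uses (the local SparkSession and sample data are illustrative assumptions, not part of the commit): compute the aggregate once, splice the actual value into both captured bound comparisons, and let Spark evaluate the combined predicate via expr():

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when

spark = SparkSession.builder.master("local[1]").appName("range_demo").getOrCreate()
df = spark.createDataFrame([(1200,), (2300,)], ["sales"])

# compute the aggregate once, as actions.py does
actual = int(df.agg(expr("sum(sales)")).collect()[0][0])  # 3500

# splice the actual value into the bound comparisons captured by the
# range regex (">1000" and "<10000"), joined with "and"
predicate = f"{actual}>1000 and {actual}<10000"  # "3500>1000 and 3500<10000"

check = df.select(
    when(expr(predicate), True).otherwise(False).alias("agg_dq_aggregation_check")
)
status = "pass" if check.filter(check["agg_dq_aggregation_check"]).count() > 0 else "fail"
print(status)  # pass
```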