Skip to content

Commit

Permalink
Reverting error_df optimization to fix #101 (#104)
Browse files: browse the repository at this point in the history
Co-authored-by: Arthur Shing <[email protected]>
  • Loading branch information
shinga and Arthur Shing authored Jul 16, 2024
1 parent d3edcde commit d860ae5
Showing 1 changed file with 1 addition and 56 deletions.
57 changes: 1 addition & 56 deletions spark_expectations/sinks/utils/writer.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -15,8 +15,6 @@
col,
split,
current_date,
monotonically_increasing_id,
coalesce,
)
from pyspark.sql.types import StructType
from spark_expectations import _log
Expand Down Expand Up @@ -879,19 +877,6 @@ def write_error_records_final(
) -> Tuple[int, DataFrame]:
try:
_log.info("_write_error_records_final started")
df = df.withColumn("sequence_number", monotonically_increasing_id())

df_seq = df

df = df.select(
"sequence_number",
*[
dq_column
for dq_column in df.columns
if dq_column.startswith(f"{rule_type}")
],
)
df.cache()

failed_records = [
f"size({dq_column}) != 0"
Expand Down Expand Up @@ -929,26 +914,7 @@ def write_error_records_final(
lit(self._context.get_run_date),
)
)
error_df_seq = df.filter(f"size(meta_{rule_type}_results) != 0")

error_df = df_seq.join(
error_df_seq,
df_seq.sequence_number == error_df_seq.sequence_number,
"inner",
)

# sequence number column removing from the data frame
error_df_columns = [
dq_column
for dq_column in error_df.columns
if (
dq_column.startswith("sequence_number")
or dq_column.startswith(rule_type)
)
is False
]

error_df = error_df.select(error_df_columns)
error_df = df.filter(f"size(meta_{rule_type}_results) != 0")
self._context.print_dataframe_with_debugger(error_df)

print(
Expand All @@ -965,27 +931,6 @@ def write_error_records_final(
# if _error_count > 0:
self.generate_summarized_row_dq_res(error_df, rule_type)

# sequence number adding to dataframe for passing to action function
df = df_seq.join(
error_df_seq,
df_seq.sequence_number == error_df_seq.sequence_number,
"left",
).withColumn(
f"meta_{rule_type}_results",
coalesce(col(f"meta_{rule_type}_results"), array()),
)

df = (
df.select(error_df_columns)
.withColumn(
self._context.get_run_id_name, lit(self._context.get_run_id)
)
.withColumn(
self._context.get_run_date_time_name,
lit(self._context.get_run_date),
)
)

_log.info("_write_error_records_final ended")
return _error_count, df

Expand Down

0 comments on commit d860ae5

Please sign in to comment.