Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 [BUGFIX] For SAPRFCV2 adding whitespaces to match sub - queries #1100

Open
wants to merge 16 commits into
base: 2.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,6 @@ profiles.yaml

# AWS
.aws

# Rye
.rye/
65 changes: 41 additions & 24 deletions src/viadot/sources/sap_rfc.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ def __init__(

if rfc_unique_id is not None:
self.rfc_unique_id = list(set(rfc_unique_id))
self._rfc_unique_id_len = {}
else:
self.rfc_unique_id = rfc_unique_id

Expand Down Expand Up @@ -966,24 +967,24 @@ def query(self, sql: str, sep: str | None = None) -> None: # noqa: C901, PLR091
col_length_total = 0
if isinstance(self.rfc_unique_id[0], str):
character_limit = self.rfc_total_col_width_character_limit
for ref_column in self.rfc_unique_id:
col_length_reference_column = int(
for rfc_unique_col in self.rfc_unique_id:
rfc_unique_col_len = int(
self.call(
"DDIF_FIELDINFO_GET",
TABNAME=table_name,
FIELDNAME=ref_column,
FIELDNAME=rfc_unique_col,
)["DFIES_TAB"][0]["LENG"]
)
if col_length_reference_column > int(
if rfc_unique_col_len > int(
self.rfc_total_col_width_character_limit / 4
):
msg = f"{ref_column} can't be used as unique column, too large."
msg = f"{rfc_unique_col} can't be used as unique column, too large."
raise ValueError(msg)
local_limit = (
self.rfc_total_col_width_character_limit
- col_length_reference_column
self.rfc_total_col_width_character_limit - rfc_unique_col_len
)
character_limit = min(local_limit, character_limit)
self._rfc_unique_id_len[rfc_unique_col] = rfc_unique_col_len
else:
character_limit = self.rfc_total_col_width_character_limit

Expand All @@ -995,21 +996,21 @@ def query(self, sql: str, sep: str | None = None) -> None: # noqa: C901, PLR091
cols.append(col)
else:
if isinstance(self.rfc_unique_id[0], str) and all(
rfc_col not in cols for rfc_col in self.rfc_unique_id
rfc_unique_col not in cols for rfc_unique_col in self.rfc_unique_id
):
for rfc_col in self.rfc_unique_id:
if rfc_col not in cols:
cols.append(rfc_col)
for rfc_unique_col in self.rfc_unique_id:
if rfc_unique_col not in cols:
cols.append(rfc_unique_col)
lists_of_columns.append(cols)
cols = [col]
col_length_total = int(col_length)

if isinstance(self.rfc_unique_id[0], str) and all(
rfc_col not in cols for rfc_col in self.rfc_unique_id
rfc_unique_col not in cols for rfc_col in self.rfc_unique_id
):
for rfc_col in self.rfc_unique_id:
if rfc_col not in cols:
cols.append(rfc_col)
for rfc_unique_col in self.rfc_unique_id:
if rfc_unique_col not in cols:
cols.append(rfc_unique_col)
lists_of_columns.append(cols)

columns = lists_of_columns
Expand Down Expand Up @@ -1040,6 +1041,30 @@ def _get_alias(self, column: str) -> str:
def _get_client_side_filter_cols(self):
return [f[1].split()[0] for f in self.client_side_filters.items()]

def _adjust_whitespaces(self, df: pd.DataFrame) -> pd.DataFrame:
"""Adjust the number of whitespaces.

Add whitespace characters in each row of each unique column to achieve
equal length of values in these columns, ensuring proper merging of subqueries.

"""
for rfc_unique_col in self.rfc_unique_id:
# Check in SAP metadata what is the declared
# dtype characters amount
rfc_unique_column_len = self._rfc_unique_id_len[rfc_unique_col]
actual_length_of_field = df[rfc_unique_col].str.len()
# Check which rows have fewer characters
# than specified in the column data type.
rows_missing_whitespaces = actual_length_of_field < rfc_unique_column_len
if any(rows_missing_whitespaces):
# Check how many whitespaces are missing in each row.
logger.info(f"Adding whitespaces for {rfc_unique_col} column")
n_missing_whitespaces = rfc_unique_column_len - actual_length_of_field
df.loc[rows_missing_whitespaces, rfc_unique_col] += np.char.multiply(
" ", n_missing_whitespaces[rows_missing_whitespaces]
)
return df

# TODO: refactor to remove linter warnings and so this can be tested.
@add_viadot_metadata_columns
def to_df(self, tests: dict | None = None) -> pd.DataFrame: # noqa: C901, PLR0912, PLR0915
Expand Down Expand Up @@ -1117,7 +1142,6 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame: # noqa: C901, PLR09
record_key = "WA"
data_raw = np.array(response["DATA"])
del response

# If reference columns are provided, it's not necessary to remove
# any extra row.
if not isinstance(self.rfc_unique_id[0], str):
Expand All @@ -1126,22 +1150,15 @@ def to_df(self, tests: dict | None = None) -> pd.DataFrame: # noqa: C901, PLR09
)
else:
start = False

records = list(_gen_split(data_raw, sep, record_key))
del data_raw

if (
isinstance(self.rfc_unique_id[0], str)
and list(df.columns) != fields
):
df_tmp = pd.DataFrame(columns=fields)
df_tmp[fields] = records
# SAP adds whitespaces to the first extracted column value.
# If whitespace is in unique column, it must be removed to make
# a proper merge.
for col in self.rfc_unique_id:
df_tmp[col] = df_tmp[col].str.strip()
df[col] = df[col].str.strip()
df_tmp = self._adjust_whitespaces(df_tmp)
df = pd.merge(df, df_tmp, on=self.rfc_unique_id, how="outer")
elif not start:
df[fields] = records
Expand Down
13 changes: 13 additions & 0 deletions tests/unit/test_sap_rfc_2.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from collections import OrderedDict

from pandas import DataFrame

from viadot.utils import skip_test_on_missing_extra

from .test_sap_rfc import (
Expand Down Expand Up @@ -104,3 +106,14 @@ def test___build_pandas_filter_query():
sap._build_pandas_filter_query(sap.client_side_filters)
== "thirdlongcolname == 01234"
), sap._build_pandas_filter_query(sap.client_side_filters)


def test__adjust_whitespaces():
sap.rfc_unique_id = ["column1", "column2"]
sap._rfc_unique_id_len = {"column1": 5, "column2": 4}
data = {"column1": ["xyz ", "oiu"], "column2": ["yrt ", "lkj"]}
df = DataFrame(data)
df = sap._adjust_whitespaces(df)
col_values_len = df.applymap(lambda x: len(x))
check_if_length_match = col_values_len == sap._rfc_unique_id_len.values()
assert check_if_length_match.all().all()
Loading