Merge pull request #113 from provectus/DQ-53
[MAINTENANCE] Added logging instead of print
bvolodarskiy authored Aug 15, 2023
2 parents f2d5a7d + 0abb891 commit e54c92b
Showing 13 changed files with 102 additions and 15 deletions.
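
The change applied across all 13 files follows one pattern: import loguru's ready-made `logger` object and replace bare `print` calls with leveled calls (`logger.info` for lifecycle messages at the start and end of a handler, `logger.debug` for branch tracing, `logger.warning` for recoverable fallbacks, `logger.error` right before a raised exception). A minimal, self-contained sketch of that pattern follows; the function and sample config below are illustrative, not code from this repository.

from loguru import logger

# Hypothetical example, not code from this PR: the same print -> logger
# substitution that the diff applies to the data-test Lambdas.
def lookup_sort_key(sort_keys_config, table_name):
    try:
        sort_key = [k.lower() for k in sort_keys_config[table_name]["sortKey"]]
        logger.debug(f"sort_key was found for {table_name}")  # branch tracing
    except KeyError:
        sort_key = ["update_dt"]
        logger.warning(f"sort_key was not found for {table_name}, falling back to update_dt")
    return sort_key

if __name__ == "__main__":
    logger.info("Starting sort key lookup")  # lifecycle message, was a print before
    logger.debug(f"resolved to {lookup_sort_key({'users': {'sortKey': ['Update_DT']}}, 'orders')}")
    logger.info("Sort key lookup is finished")
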
4 changes: 3 additions & 1 deletion functions/allure_report/allure_report/make_allure_report.py
@@ -1,8 +1,9 @@
from mapper import create_json_report
import os

from loguru import logger

def handler(event, context):
logger.info("Starting making of allure report")
qa_bucket = os.environ['BUCKET']
reports_web = os.environ['REPORTS_WEB']
report = event['report'].get('Payload')
@@ -12,5 +13,6 @@ def handler(event, context):
link, key = create_json_report(suite, reports_web, folder_key, validate_id)
os.system("chmod +x generate_report.sh")
os.system(f"sh generate_report.sh {key} {qa_bucket}")
logger.info("Making of allure report is finished")

return link
5 changes: 4 additions & 1 deletion functions/allure_report/allure_report/mapper.py
@@ -10,7 +10,7 @@
import awswrangler as wr
import boto3
import re

from loguru import logger
qa_bucket = os.environ['BUCKET']
s3 = boto3.resource('s3')
bucket = s3.Bucket(qa_bucket)
@@ -30,13 +30,16 @@ def get_test_human_name(file):
for key, value in params.items():
if type(value) == list:
if key == 'value_set':
logger.debug("key in param is value_set")
for i in value:
new_params[f"v__{str(value.index(i))}"] = i
elif key == 'column_set':
logger.debug("key in param is column_set")
for i in value:
new_params[f"column_list_{str(value.index(i))}"] = i
else:
for i in value:
logger.debug("key in param is other")
new_params[f"{str(key)}_{str(value.index(i))}"] = i

if new_params:
1 change: 1 addition & 0 deletions functions/allure_report/requirements.txt
@@ -11,3 +11,4 @@ boto==2.49.0
pyarrow==3.0.0
fastparquet==0.8.1
awswrangler==2.12.1
loguru==0.7.0
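
loguru==0.7.0 is the only new dependency. Out of the box it needs no configuration: importing `logger` gives a single sink on stderr at DEBUG level, which is what the calls added in this PR rely on. If the many `debug` traces ever need to be silenced in a deployed environment, a small setup like the one below would be enough; the LOG_LEVEL variable here is an assumption, not something this repository defines.

import os
import sys

from loguru import logger

# Hypothetical setup, not part of this PR: replace the default DEBUG sink
# with one whose threshold comes from an (assumed) LOG_LEVEL variable.
logger.remove()  # drop loguru's default stderr sink
logger.add(sys.stderr, level=os.environ.get("LOG_LEVEL", "INFO"))

logger.debug("suppressed unless LOG_LEVEL=DEBUG")
logger.info("emitted at the INFO default")
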
13 changes: 11 additions & 2 deletions functions/data_test/data_test/Expectation_report_new.py
@@ -9,7 +9,7 @@
from ydata_profiling.config import Settings
from ydata_profiling.model.handler import Handler
import re

from loguru import logger

class ExpectationsReportNew:
config: Settings
@@ -51,6 +51,7 @@ def to_expectation_suite(
try:
import great_expectations as ge
except ImportError:
logger.error("Please install great expectations before using the expectation functionality")
raise ImportError(
"Please install great expectations before using the expectation functionality"
)
@@ -66,27 +67,33 @@ def to_expectation_suite(
new_column_in_mapping = {}
try:
mapping_schema = mapping_config[suite_name]
logger.debug(f"mapping schema was found for {suite_name}")
except (KeyError, TypeError):
logger.debug(f"mapping schema was not found for {suite_name}")
mapping_schema = None

data_asset = data_context.get_datasource(
"cloud").get_asset(f"{suite_name}_{run_name}")
batch_request = data_asset.build_batch_request()

if reuse_suite:
logger.debug(f"reuse suite branch")
if use_old_suite:
logger.debug(f"use old suite branch")
suite_old = data_context.get_expectation_suite(
f"{suite_name}_{old_suite_name}")
data_context.add_or_update_expectation_suite(
expectations=suite_old.expectations,
expectation_suite_name=f"{suite_name}_{run_name}")
else:
logger.debug(f"not use old suite branch")
schema_list = list(mapping_schema.keys())
dict_keys = [
i for i in mapping_schema if isinstance(
mapping_schema[i], dict)]

if not dict_keys:
logger.debug(f"dict keys were not found")
suite_old = data_context.get_expectation_suite(
f"{suite_name}_{old_suite_name}")
schema_list.append("_nocolumn")
@@ -98,6 +105,7 @@ def to_expectation_suite(
new_column_in_mapping.update(
{key: mapping_schema[key]})
if new_column_in_mapping_keys:
logger.debug(f"new_column_in_mapping_keys were found")
schema_list = [
x for x in schema_list if x not in new_column_in_mapping_keys and x not in ignored_columns]
old_schema_list = list(
@@ -164,6 +172,7 @@ def to_expectation_suite(
expectations=suite.expectations,
expectation_suite_name=f"{suite_name}_{run_name}")
else: # if we have nested tables
logger.debug(f"dict keys were found")
r = re.compile("new_col_added")
new_column_in_mapping_keys = list(
filter(r.match, schema_list))
@@ -288,7 +297,7 @@ def to_expectation_suite(
expectations=suite.expectations,
expectation_suite_name=f"{suite_name}_{run_name}")
else:

logger.debug(f"new suite branch")
suite = data_context.add_or_update_expectation_suite(
expectation_suite_name=f"{suite_name}_{run_name}"
)
27 changes: 25 additions & 2 deletions functions/data_test/data_test/data_source_factory.py
@@ -2,7 +2,7 @@
import awswrangler as wr
import os
import json

from loguru import logger
ENV = os.environ['ENVIRONMENT']


@@ -40,6 +40,7 @@ def __init__(self, extension):
self.extension = extension

def read(self, source):
logger.info("DataSource is s3")
if self.extension == 'csv':
return wr.s3.read_csv(path=source), source
elif self.extension == 'parquet':
@@ -67,6 +68,7 @@ def read(self, source):
ctas_approach=False,
s3_output=f"s3://{self.qa_bucket_name}/athena_results/"
)
logger.info("DataSource is Athena")
return final_df, source


@@ -78,31 +80,41 @@ def __init__(self, qa_bucket_name, run_name, table_name, coverage_config):
self.coverage_config = coverage_config

def read(self, source):
logger.info("DataSource is Redshift")
final_df = wr.s3.read_parquet(path=source, ignore_index=True)
final_df.columns = final_df.columns.str.lower()
if 'redshift' in self.run_name:
redshift_db = os.environ['REDSHIFT_DB']
redshift_secret = os.environ['REDSHIFT_SECRET']
logger.debug("self.run_name contains redshift")
try:
sort_keys_config = json.loads(wr.s3.read_json(
path=f"s3://{self.qa_bucket_name}/test_configs/sort_keys.json").to_json())
sort_key = list(map(str.lower,
sort_keys_config[self.table_name]["sortKey"]))
logger.debug(f"sort_key was found for {self.table_name} at sort_keys.json")
except KeyError:
sort_key = ['update_dt']
logger.warning(f"sort_key was not found for {self.table_name} at sort_keys.json and set to update_dt")
try:
target_table = self.coverage_config["targetTable"]
logger.debug("targetTable was found at test_coverage.json")
except (KeyError, IndexError, TypeError) as e:
target_table = None
logger.warning("targetTable was not found at test_coverage.json and set to None")
if target_table:
table_name = target_table
logger.debug("targetTable is exist")
con = wr.redshift.connect(
secret_id=redshift_secret, dbname=redshift_db)
try:
nunique = final_df.nunique()[sort_key][0]
logger.debug("nunique is appear multiple time")
except (KeyError, IndexError) as e:
nunique = final_df.nunique()[sort_key]
logger.debug("nunique is appear once time")
if nunique > 1:
logger.debug("nunique>1")
min_key = final_df[sort_key].min()
max_key = final_df[sort_key].max()
if not isinstance(
@@ -112,8 +124,10 @@ def read(self, source):
str):
min_key = final_df[sort_key].min()[0]
max_key = final_df[sort_key].max()[0]
logger.debug("min and max key are not str")
sql_query = f"SELECT * FROM public.{self.table_name} WHERE {sort_key[0]} between \\'{min_key}\\' and \\'{max_key}\\'"
else:
logger.debug("min and max key are str")
key = final_df[sort_key].values[0]
if not isinstance(key, str):
key = str(key[0])
@@ -141,6 +155,7 @@ def __init__(self, qa_bucket_name, run_name, table_name):
self.table_name = table_name

def read(self, source):
path = source
columns_to_drop = [
'_hoodie_commit_time',
'_hoodie_commit_seqno',
@@ -156,9 +171,13 @@ def read(self, source):
pyarrow_additional_kwargs=parquet_args)
try:
primary_key = pk_config[self.table_name][0]
logger.debug(f"pk key for {self.table_name} was found at pks.json")
except KeyError:
logger.error(f"pk key for {self.table_name} was not found at pks.json and we can't process "
f"transform part for this file")
raise KeyError('File not found in config')
if 'transform' in self.run_name:
logger.debug("self.run_name contains transform")
database_name = f"{ENV}_{self.table_name.split('.')[0]}"
athena_table = self.table_name.split('.')[-1]
keys = df.groupby(primary_key)['dms_load_at'].max().tolist()
@@ -172,19 +191,23 @@ def read(self, source):
keys)].reset_index(drop=True)
try:
path = final_df['_hoodie_commit_time'].iloc[0]
logger.debug("Keys from CDC was found at HUDI table")
except IndexError:
logger.error("Keys from CDC was not found at HUDI table")
raise IndexError('Keys from CDC not found in HUDI table')
final_df = final_df.drop(
columns_to_drop,
axis=1).reset_index(
drop=True)
return final_df, path
else:
logger.debug("self.run_name not contains transform")
keys = df.groupby(primary_key)['dms_load_at'].max().tolist()
final_df = df[df['dms_load_at'].isin(keys)].reset_index(drop=True)
final_df.columns = final_df.columns.str.lower()
try:
final_df = final_df.drop('op', axis=1).reset_index(drop=True)
logger.debug("Op column is exist")
except KeyError:
print('Op column not exist')
logger.warning("Op column is not exist")
return final_df, path
16 changes: 15 additions & 1 deletion functions/data_test/data_test/data_test.py
@@ -5,25 +5,30 @@
import awswrangler as wr
import json
from datasource import prepare_final_ds, get_source_name, get_file_extension

from loguru import logger

def handler(event, context):
logger.info("Starting data test")
if os.environ['ENVIRONMENT'] == 'local':
endpoint_url = (f"http://{os.environ['S3_HOST']}:"
f"{os.environ['S3_PORT']}")
s3 = boto3.resource("s3", endpoint_url=endpoint_url)
logger.debug("ENVIRONMENT is local")
else:
s3 = boto3.resource("s3")
logger.debug("ENVIRONMENT is cloud")
cloudfront = os.environ['REPORTS_WEB']
qa_bucket_name = os.environ['BUCKET']
run_name = event['run_name']
if 'engine' in event:
engine = event['engine']
logger.debug("engine was found at input")
else:
config_file = wr.s3.read_json(
path=f"s3://{qa_bucket_name}/test_configs/pipeline.json").to_json()
pipeline_config = json.loads(config_file)
engine = pipeline_config[run_name]["engine"]
logger.debug("engine was not found at input, but in confing file")
source_root = event["source_root"]
source_input = event["source_data"]
coverage_config = json.loads(
@@ -34,22 +39,30 @@ def handler(event, context):
.read().decode("utf-8"))
if not isinstance(source_input, list):
source = [source_input]
logger.debug("source_input is not list")
else:
source = source_input
logger.debug("source_input is list")
if event.get('table'):
source_name = event['table']
logger.debug("input contains table name")
else:
source_extension = get_file_extension(source[0])
source_name = get_source_name(source[0], source_extension)
logger.debug("input not contains table name")
suite_name = f"{source_name}_{run_name}"
try:
source_covered = coverage_config[suite_name]['complexSuite']
logger.debug(f"complexSuite param for {suite_name} was found at test_coverage.json")
except (IndexError, KeyError):
source_covered = False
logger.warning(f"complexSuite param for {suite_name} was not found at test_coverage.json and set to False")
try:
suite_coverage_config = coverage_config[suite_name]
logger.debug(f" {suite_name} was found at test_coverage.json")
except (IndexError, KeyError):
suite_coverage_config = None
logger.warning(f" {suite_name} was not found at test_coverage.json and set to None")

final_ds, path = prepare_final_ds(source, engine, source_root, run_name,
source_name, suite_coverage_config)
@@ -73,4 +86,5 @@ def handler(event, context):
"run_name": run_name,
"validate_id": validate_id
}
logger.info("Data test is finished successfully")
return report
10 changes: 8 additions & 2 deletions functions/data_test/data_test/datasource.py
@@ -2,7 +2,7 @@
import re
import pathlib
from data_source_factory import DataSourceFactory

from loguru import logger
qa_bucket_name = os.environ['BUCKET']


@@ -14,7 +14,9 @@ def concat_source_list(source, source_engine):


def get_file_extension(source):
return pathlib.Path(source).suffix[1:]
extension = pathlib.Path(source).suffix[1:]
logger.debug(f"file extension is {extension}")
return extension


def read_source(source, engine, extension, run_name, table_name=None,
Expand All @@ -33,18 +35,22 @@ def get_source_name(source, extension):

def prepare_final_ds(source, engine, source_engine, run_name, source_name=None,
coverage_config=None):
logger.info(f"Engine is {engine}")
path = source
if engine == 's3':
source = concat_source_list(source, source_engine)
source_extension = get_file_extension(source[0])
df, path = read_source(source, engine, source_extension, run_name)
logger.debug("going through s3 branch")
elif engine == 'hudi':
source = concat_source_list(source, source_engine)
source_extension = get_file_extension(source[0])
df, path = read_source(
source, engine, source_extension, run_name, source_name)
logger.debug("going through hudi branch")
else:
source = concat_source_list(source, source_engine)
df, path = read_source(source, engine, None,
run_name, source_name, coverage_config)
logger.debug("going through default branch")
return df, path
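
One thing the diff leaves untouched is the logging done by the libraries these modules call (boto3, awswrangler, great_expectations), which still goes through the standard logging module and therefore bypasses loguru's sinks. If those records should be unified with the new logger calls, loguru's documented interception recipe can be installed once at startup; the sketch below is not part of this PR, and where it would live is an assumption.

import logging

from loguru import logger


class InterceptHandler(logging.Handler):
    """Forward standard-library log records (boto3, awswrangler, ...) to loguru."""

    def emit(self, record):
        # Reuse the stdlib level name when loguru knows it, else fall back to the numeric level.
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno
        # depth makes loguru report the original call site instead of this handler.
        logger.opt(depth=6, exception=record.exc_info).log(level, record.getMessage())


# Route every stdlib logger through the handler above; level=0 forwards all records.
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
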