[MAINTENANCE] Added logging instead of print #113

Merged - 4 commits - Aug 15, 2023

Changes from all commits
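The diff replaces ad-hoc print statements with loguru calls across the Lambda handlers and data-source classes. As a rough sketch of the pattern being applied (the handler body below is illustrative only, not code from this repository), each module imports the ready-made logger object and emits leveled messages instead of printing:

from loguru import logger  # loguru ships a preconfigured logger; no handler setup is required

def handler(event, context):
    # INFO for the start/end of the Lambda run, DEBUG for branch decisions,
    # WARNING/ERROR for fallbacks and failures, mirroring the levels used in this PR.
    logger.info("Starting data test")
    if event.get("table"):
        logger.debug("input contains table name")
    else:
        logger.warning("table name not found in input; deriving it from the source path")
    logger.info("Data test finished successfully")
    return {"status": "ok"}

Both stdout and stderr end up in CloudWatch Logs for a Lambda, but the loguru lines carry a level and timestamp, which the old print output lacked.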
4 changes: 3 additions & 1 deletion functions/allure_report/allure_report/make_allure_report.py
@@ -1,8 +1,9 @@
from mapper import create_json_report
import os

from loguru import logger

def handler(event, context):
logger.info("Starting making of allure report")
qa_bucket = os.environ['BUCKET']
reports_web = os.environ['REPORTS_WEB']
report = event['report'].get('Payload')
@@ -12,5 +13,6 @@ def handler(event, context):
link, key = create_json_report(suite, reports_web, folder_key, validate_id)
os.system("chmod +x generate_report.sh")
os.system(f"sh generate_report.sh {key} {qa_bucket}")
logger.info("Making of allure report is finished")

return link
5 changes: 4 additions & 1 deletion functions/allure_report/allure_report/mapper.py
@@ -10,7 +10,7 @@
import awswrangler as wr
import boto3
import re

from loguru import logger
qa_bucket = os.environ['BUCKET']
s3 = boto3.resource('s3')
bucket = s3.Bucket(qa_bucket)
@@ -30,13 +30,16 @@ def get_test_human_name(file):
for key, value in params.items():
if type(value) == list:
if key == 'value_set':
logger.debug("key in param is value_set")
for i in value:
new_params[f"v__{str(value.index(i))}"] = i
elif key == 'column_set':
logger.debug("key in param is column_set")
for i in value:
new_params[f"column_list_{str(value.index(i))}"] = i
else:
for i in value:
logger.debug("key in param is other")
new_params[f"{str(key)}_{str(value.index(i))}"] = i

if new_params:
1 change: 1 addition & 0 deletions functions/allure_report/requirements.txt
@@ -11,3 +11,4 @@ boto==2.49.0
pyarrow==3.0.0
fastparquet==0.8.1
awswrangler==2.12.1
loguru==0.7.0
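loguru 0.7.0 is pinned here and works with zero configuration, which is presumably why the handlers above only import logger and call it. If per-environment verbosity ever becomes necessary, one possible approach (hypothetical, not part of this PR) is to rebuild the default sink from an environment variable:

import os
import sys
from loguru import logger

# Drop the default stderr handler and re-add it with a configurable level.
# LOG_LEVEL is a hypothetical variable name; this project does not define it.
logger.remove()
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "INFO"))

logger.debug("emitted only when LOG_LEVEL=DEBUG")
logger.info("emitted at the default INFO level and above")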
13 changes: 11 additions & 2 deletions functions/data_test/data_test/Expectation_report_new.py
@@ -9,7 +9,7 @@
from ydata_profiling.config import Settings
from ydata_profiling.model.handler import Handler
import re

from loguru import logger

class ExpectationsReportNew:
config: Settings
@@ -51,6 +51,7 @@ def to_expectation_suite(
try:
import great_expectations as ge
except ImportError:
logger.error("Please install great expectations before using the expectation functionality")
raise ImportError(
"Please install great expectations before using the expectation functionality"
)
@@ -66,27 +67,33 @@ def to_expectation_suite(
new_column_in_mapping = {}
try:
mapping_schema = mapping_config[suite_name]
logger.debug(f"mapping schema was found for {suite_name}")
except (KeyError, TypeError):
logger.debug(f"mapping schema was not found for {suite_name}")
mapping_schema = None

data_asset = data_context.get_datasource(
"cloud").get_asset(f"{suite_name}_{run_name}")
batch_request = data_asset.build_batch_request()

if reuse_suite:
logger.debug(f"reuse suite branch")
if use_old_suite:
logger.debug(f"use old suite branch")
suite_old = data_context.get_expectation_suite(
f"{suite_name}_{old_suite_name}")
data_context.add_or_update_expectation_suite(
expectations=suite_old.expectations,
expectation_suite_name=f"{suite_name}_{run_name}")
else:
logger.debug(f"not use old suite branch")
schema_list = list(mapping_schema.keys())
dict_keys = [
i for i in mapping_schema if isinstance(
mapping_schema[i], dict)]

if not dict_keys:
logger.debug(f"dict keys were not found")
suite_old = data_context.get_expectation_suite(
f"{suite_name}_{old_suite_name}")
schema_list.append("_nocolumn")
@@ -98,6 +105,7 @@ def to_expectation_suite(
new_column_in_mapping.update(
{key: mapping_schema[key]})
if new_column_in_mapping_keys:
logger.debug(f"new_column_in_mapping_keys were found")
schema_list = [
x for x in schema_list if x not in new_column_in_mapping_keys and x not in ignored_columns]
old_schema_list = list(
@@ -164,6 +172,7 @@ def to_expectation_suite(
expectations=suite.expectations,
expectation_suite_name=f"{suite_name}_{run_name}")
else: # if we have nested tables
logger.debug(f"dict keys were found")
r = re.compile("new_col_added")
new_column_in_mapping_keys = list(
filter(r.match, schema_list))
@@ -288,7 +297,7 @@ def to_expectation_suite(
expectations=suite.expectations,
expectation_suite_name=f"{suite_name}_{run_name}")
else:

logger.debug(f"new suite branch")
suite = data_context.add_or_update_expectation_suite(
expectation_suite_name=f"{suite_name}_{run_name}"
)
27 changes: 25 additions & 2 deletions functions/data_test/data_test/data_source_factory.py
@@ -2,7 +2,7 @@
import awswrangler as wr
import os
import json

from loguru import logger
ENV = os.environ['ENVIRONMENT']


@@ -40,6 +40,7 @@ def __init__(self, extension):
self.extension = extension

def read(self, source):
logger.info("DataSource is s3")
if self.extension == 'csv':
return wr.s3.read_csv(path=source), source
elif self.extension == 'parquet':
@@ -67,6 +68,7 @@ def read(self, source):
ctas_approach=False,
s3_output=f"s3://{self.qa_bucket_name}/athena_results/"
)
logger.info("DataSource is Athena")
return final_df, source


@@ -78,31 +80,41 @@ def __init__(self, qa_bucket_name, run_name, table_name, coverage_config):
self.coverage_config = coverage_config

def read(self, source):
logger.info("DataSource is Redshift")
final_df = wr.s3.read_parquet(path=source, ignore_index=True)
final_df.columns = final_df.columns.str.lower()
if 'redshift' in self.run_name:
redshift_db = os.environ['REDSHIFT_DB']
redshift_secret = os.environ['REDSHIFT_SECRET']
logger.debug("self.run_name contains redshift")
try:
sort_keys_config = json.loads(wr.s3.read_json(
path=f"s3://{self.qa_bucket_name}/test_configs/sort_keys.json").to_json())
sort_key = list(map(str.lower,
sort_keys_config[self.table_name]["sortKey"]))
logger.debug(f"sort_key was found for {self.table_name} at sort_keys.json")
except KeyError:
sort_key = ['update_dt']
logger.warning(f"sort_key was not found for {self.table_name} at sort_keys.json and set to update_dt")
try:
target_table = self.coverage_config["targetTable"]
logger.debug("targetTable was found at test_coverage.json")
except (KeyError, IndexError, TypeError) as e:
target_table = None
logger.warning("targetTable was not found at test_coverage.json and set to None")
if target_table:
table_name = target_table
logger.debug("targetTable is exist")
con = wr.redshift.connect(
secret_id=redshift_secret, dbname=redshift_db)
try:
nunique = final_df.nunique()[sort_key][0]
logger.debug("nunique is appear multiple time")
except (KeyError, IndexError) as e:
nunique = final_df.nunique()[sort_key]
logger.debug("nunique is appear once time")
if nunique > 1:
logger.debug("nunique>1")
min_key = final_df[sort_key].min()
max_key = final_df[sort_key].max()
if not isinstance(
@@ -112,8 +124,10 @@ def read(self, source):
str):
min_key = final_df[sort_key].min()[0]
max_key = final_df[sort_key].max()[0]
logger.debug("min and max key are not str")
sql_query = f"SELECT * FROM public.{self.table_name} WHERE {sort_key[0]} between \\'{min_key}\\' and \\'{max_key}\\'"
else:
logger.debug("min and max key are str")
key = final_df[sort_key].values[0]
if not isinstance(key, str):
key = str(key[0])
@@ -141,6 +155,7 @@ def __init__(self, qa_bucket_name, run_name, table_name):
self.table_name = table_name

def read(self, source):
path = source
columns_to_drop = [
'_hoodie_commit_time',
'_hoodie_commit_seqno',
@@ -156,9 +171,13 @@ def read(self, source):
pyarrow_additional_kwargs=parquet_args)
try:
primary_key = pk_config[self.table_name][0]
logger.debug(f"pk key for {self.table_name} was found at pks.json")
except KeyError:
logger.error(f"pk key for {self.table_name} was not found at pks.json and we can't process "
f"transform part for this file")
raise KeyError('File not found in config')
if 'transform' in self.run_name:
logger.debug("self.run_name contains transform")
database_name = f"{ENV}_{self.table_name.split('.')[0]}"
athena_table = self.table_name.split('.')[-1]
keys = df.groupby(primary_key)['dms_load_at'].max().tolist()
@@ -172,19 +191,23 @@ def read(self, source):
keys)].reset_index(drop=True)
try:
path = final_df['_hoodie_commit_time'].iloc[0]
logger.debug("Keys from CDC was found at HUDI table")
except IndexError:
logger.error("Keys from CDC was not found at HUDI table")
raise IndexError('Keys from CDC not found in HUDI table')
final_df = final_df.drop(
columns_to_drop,
axis=1).reset_index(
drop=True)
return final_df, path
else:
logger.debug("self.run_name not contains transform")
keys = df.groupby(primary_key)['dms_load_at'].max().tolist()
final_df = df[df['dms_load_at'].isin(keys)].reset_index(drop=True)
final_df.columns = final_df.columns.str.lower()
try:
final_df = final_df.drop('op', axis=1).reset_index(drop=True)
logger.debug("Op column is exist")
except KeyError:
print('Op column not exist')
logger.warning("Op column is not exist")
return final_df, path
16 changes: 15 additions & 1 deletion functions/data_test/data_test/data_test.py
@@ -5,25 +5,30 @@
import awswrangler as wr
import json
from datasource import prepare_final_ds, get_source_name, get_file_extension

from loguru import logger

def handler(event, context):
logger.info("Starting data test")
if os.environ['ENVIRONMENT'] == 'local':
endpoint_url = (f"http://{os.environ['S3_HOST']}:"
f"{os.environ['S3_PORT']}")
s3 = boto3.resource("s3", endpoint_url=endpoint_url)
logger.debug("ENVIRONMENT is local")
else:
s3 = boto3.resource("s3")
logger.debug("ENVIRONMENT is cloud")
cloudfront = os.environ['REPORTS_WEB']
qa_bucket_name = os.environ['BUCKET']
run_name = event['run_name']
if 'engine' in event:
engine = event['engine']
logger.debug("engine was found in input")
else:
config_file = wr.s3.read_json(
path=f"s3://{qa_bucket_name}/test_configs/pipeline.json").to_json()
pipeline_config = json.loads(config_file)
engine = pipeline_config[run_name]["engine"]
logger.debug("engine was not found in input, but in the config file")
source_root = event["source_root"]
source_input = event["source_data"]
coverage_config = json.loads(
@@ -34,22 +39,30 @@ def handler(event, context):
.read().decode("utf-8"))
if not isinstance(source_input, list):
source = [source_input]
logger.debug("source_input is not list")
else:
source = source_input
logger.debug("source_input is list")
if event.get('table'):
source_name = event['table']
logger.debug("input contains table name")
else:
source_extension = get_file_extension(source[0])
source_name = get_source_name(source[0], source_extension)
logger.debug("input not contains table name")
suite_name = f"{source_name}_{run_name}"
try:
source_covered = coverage_config[suite_name]['complexSuite']
logger.debug(f"complexSuite param for {suite_name} was found at test_coverage.json")
except (IndexError, KeyError):
source_covered = False
logger.warning(f"complexSuite param for {suite_name} was not found at test_coverage.json and set to False")
try:
suite_coverage_config = coverage_config[suite_name]
logger.debug(f" {suite_name} was found at test_coverage.json")
except (IndexError, KeyError):
suite_coverage_config = None
logger.warning(f" {suite_name} was not found at test_coverage.json and set to None")

final_ds, path = prepare_final_ds(source, engine, source_root, run_name,
source_name, suite_coverage_config)
@@ -73,4 +86,5 @@ def handler(event, context):
"run_name": run_name,
"validate_id": validate_id
}
logger.info("Data test is finished successfully")
return report
10 changes: 8 additions & 2 deletions functions/data_test/data_test/datasource.py
@@ -2,7 +2,7 @@
import re
import pathlib
from data_source_factory import DataSourceFactory

from loguru import logger
qa_bucket_name = os.environ['BUCKET']


@@ -14,7 +14,9 @@ def concat_source_list(source, source_engine):


def get_file_extension(source):
return pathlib.Path(source).suffix[1:]
extension = pathlib.Path(source).suffix[1:]
logger.debug(f"file extension is {extension}")
return extension


def read_source(source, engine, extension, run_name, table_name=None,
@@ -33,18 +35,22 @@ def get_source_name(source, extension):

def prepare_final_ds(source, engine, source_engine, run_name, source_name=None,
coverage_config=None):
logger.info(f"Engine is {engine}")
path = source
if engine == 's3':
source = concat_source_list(source, source_engine)
source_extension = get_file_extension(source[0])
df, path = read_source(source, engine, source_extension, run_name)
logger.debug("going through s3 branch")
elif engine == 'hudi':
source = concat_source_list(source, source_engine)
source_extension = get_file_extension(source[0])
df, path = read_source(
source, engine, source_extension, run_name, source_name)
logger.debug("going through hudi branch")
else:
source = concat_source_list(source, source_engine)
df, path = read_source(source, engine, None,
run_name, source_name, coverage_config)
logger.debug("going through default branch")
return df, path