diff --git a/functions/allure_report/allure_report/make_allure_report.py b/functions/allure_report/allure_report/make_allure_report.py
index 0ab0e44..0f4f56c 100644
--- a/functions/allure_report/allure_report/make_allure_report.py
+++ b/functions/allure_report/allure_report/make_allure_report.py
@@ -1,8 +1,9 @@
 from mapper import create_json_report
 import os
-
+from loguru import logger
 
 def handler(event, context):
+    logger.info("Starting Allure report generation")
     qa_bucket = os.environ['BUCKET']
     reports_web = os.environ['REPORTS_WEB']
     report = event['report'].get('Payload')
@@ -12,5 +13,6 @@ def handler(event, context):
     link, key = create_json_report(suite, reports_web, folder_key, validate_id)
     os.system("chmod +x generate_report.sh")
     os.system(f"sh generate_report.sh {key} {qa_bucket}")
+    logger.info("Allure report generation is finished")
 
     return link
diff --git a/functions/allure_report/allure_report/mapper.py b/functions/allure_report/allure_report/mapper.py
index 4a99a00..f0e8967 100644
--- a/functions/allure_report/allure_report/mapper.py
+++ b/functions/allure_report/allure_report/mapper.py
@@ -10,7 +10,7 @@
 import awswrangler as wr
 import boto3
 import re
-
+from loguru import logger
 qa_bucket = os.environ['BUCKET']
 s3 = boto3.resource('s3')
 bucket = s3.Bucket(qa_bucket)
@@ -30,13 +30,16 @@ def get_test_human_name(file):
     for key, value in params.items():
         if type(value) == list:
             if key == 'value_set':
+                logger.debug("param key is value_set")
                 for i in value:
                     new_params[f"v__{str(value.index(i))}"] = i
             elif key == 'column_set':
+                logger.debug("param key is column_set")
                 for i in value:
                     new_params[f"column_list_{str(value.index(i))}"] = i
             else:
+                logger.debug("param key is another list type")
                 for i in value:
                     new_params[f"{str(key)}_{str(value.index(i))}"] = i
 
     if new_params:
diff --git a/functions/allure_report/requirements.txt b/functions/allure_report/requirements.txt
index f3ed878..3daa174 100644
--- a/functions/allure_report/requirements.txt
+++ b/functions/allure_report/requirements.txt
@@ -11,3 +11,4 @@ boto==2.49.0
 pyarrow==3.0.0
 fastparquet==0.8.1
 awswrangler==2.12.1
+loguru==0.7.0
diff --git a/functions/data_test/data_test/Expectation_report_new.py b/functions/data_test/data_test/Expectation_report_new.py
index ddbfcbd..315d457 100644
--- a/functions/data_test/data_test/Expectation_report_new.py
+++ b/functions/data_test/data_test/Expectation_report_new.py
@@ -9,7 +9,7 @@
 from ydata_profiling.config import Settings
 from ydata_profiling.model.handler import Handler
 import re
-
+from loguru import logger
 
 class ExpectationsReportNew:
     config: Settings
@@ -51,6 +51,7 @@ def to_expectation_suite(
         try:
             import great_expectations as ge
         except ImportError:
+            logger.error("Please install great expectations before using the expectation functionality")
             raise ImportError(
                 "Please install great expectations before using the expectation functionality"
             )
@@ -66,7 +67,9 @@ def to_expectation_suite(
         new_column_in_mapping = {}
         try:
             mapping_schema = mapping_config[suite_name]
+            logger.debug(f"mapping schema was found for {suite_name}")
         except (KeyError, TypeError):
+            logger.debug(f"mapping schema was not found for {suite_name}")
             mapping_schema = None
 
         data_asset = data_context.get_datasource(
@@ -74,19 +77,23 @@ def to_expectation_suite(
         batch_request = data_asset.build_batch_request()
 
         if reuse_suite:
+            logger.debug("reuse_suite is enabled")
             if use_old_suite:
+                logger.debug("use_old_suite is enabled")
                 suite_old = data_context.get_expectation_suite(
                     f"{suite_name}_{old_suite_name}")
                 data_context.add_or_update_expectation_suite(
                     expectations=suite_old.expectations,
                     expectation_suite_name=f"{suite_name}_{run_name}")
             else:
+                logger.debug("use_old_suite is not enabled")
                 schema_list = list(mapping_schema.keys())
                 dict_keys = [
                     i for i in mapping_schema if isinstance(
                         mapping_schema[i], dict)]
                 if not dict_keys:
+                    logger.debug("dict keys were not found in mapping schema")
                     suite_old = data_context.get_expectation_suite(
                         f"{suite_name}_{old_suite_name}")
                     schema_list.append("_nocolumn")
@@ -98,6 +105,7 @@ def to_expectation_suite(
                             new_column_in_mapping.update(
                                 {key: mapping_schema[key]})
                     if new_column_in_mapping_keys:
+                        logger.debug("new_column_in_mapping_keys were found")
                         schema_list = [
                             x for x in schema_list if x not in new_column_in_mapping_keys and x not in ignored_columns]
                         old_schema_list = list(
@@ -164,6 +172,7 @@ def to_expectation_suite(
                         expectations=suite.expectations,
                         expectation_suite_name=f"{suite_name}_{run_name}")
                 else:  # if we have nested tables
+                    logger.debug("dict keys were found in mapping schema")
                     r = re.compile("new_col_added")
                     new_column_in_mapping_keys = list(
                         filter(r.match, schema_list))
@@ -288,7 +297,7 @@ def to_expectation_suite(
                 expectations=suite.expectations,
                 expectation_suite_name=f"{suite_name}_{run_name}")
         else:
-
+            logger.debug("creating a new suite")
             suite = data_context.add_or_update_expectation_suite(
                 expectation_suite_name=f"{suite_name}_{run_name}"
             )
diff --git a/functions/data_test/data_test/data_source_factory.py b/functions/data_test/data_test/data_source_factory.py
index c7278b0..9014642 100644
--- a/functions/data_test/data_test/data_source_factory.py
+++ b/functions/data_test/data_test/data_source_factory.py
@@ -2,7 +2,7 @@
 import awswrangler as wr
 import os
 import json
-
+from loguru import logger
 
 ENV = os.environ['ENVIRONMENT']
 
@@ -40,6 +40,7 @@ def __init__(self, extension):
         self.extension = extension
 
     def read(self, source):
+        logger.info("DataSource is S3")
         if self.extension == 'csv':
             return wr.s3.read_csv(path=source), source
         elif self.extension == 'parquet':
@@ -67,6 +68,7 @@ def read(self, source):
             ctas_approach=False,
             s3_output=f"s3://{self.qa_bucket_name}/athena_results/"
         )
+        logger.info("DataSource is Athena")
         return final_df, source
 
 
@@ -78,31 +80,41 @@ def __init__(self, qa_bucket_name, run_name, table_name, coverage_config):
         self.coverage_config = coverage_config
 
     def read(self, source):
+        logger.info("DataSource is Redshift")
        final_df = wr.s3.read_parquet(path=source, ignore_index=True)
         final_df.columns = final_df.columns.str.lower()
         if 'redshift' in self.run_name:
             redshift_db = os.environ['REDSHIFT_DB']
             redshift_secret = os.environ['REDSHIFT_SECRET']
+            logger.debug("self.run_name contains redshift")
             try:
                 sort_keys_config = json.loads(wr.s3.read_json(
                     path=f"s3://{self.qa_bucket_name}/test_configs/sort_keys.json").to_json())
                 sort_key = list(map(str.lower,
                                     sort_keys_config[self.table_name]["sortKey"]))
+                logger.debug(f"sort_key was found for {self.table_name} in sort_keys.json")
             except KeyError:
                 sort_key = ['update_dt']
+                logger.warning(f"sort_key was not found for {self.table_name} in sort_keys.json and set to update_dt")
             try:
                 target_table = self.coverage_config["targetTable"]
+                logger.debug("targetTable was found in test_coverage.json")
             except (KeyError, IndexError, TypeError) as e:
                 target_table = None
+                logger.warning("targetTable was not found in test_coverage.json and set to None")
             if target_table:
                 table_name = target_table
+                logger.debug("targetTable exists")
             con = wr.redshift.connect(
                 secret_id=redshift_secret, dbname=redshift_db)
             try:
                 nunique = final_df.nunique()[sort_key][0]
+                logger.debug("nunique returned multiple values")
             except (KeyError, IndexError) as e:
                 nunique = final_df.nunique()[sort_key]
+                logger.debug("nunique returned a single value")
             if nunique > 1:
+                logger.debug("nunique > 1")
                 min_key = final_df[sort_key].min()
                 max_key = final_df[sort_key].max()
                 if not isinstance(
@@ -112,8 +124,10 @@ def read(self, source):
                         str):
                     min_key = final_df[sort_key].min()[0]
                     max_key = final_df[sort_key].max()[0]
+                    logger.debug("min and max keys are not str")
                 sql_query = f"SELECT * FROM public.{self.table_name} WHERE {sort_key[0]} between \\'{min_key}\\' and \\'{max_key}\\'"
             else:
+                logger.debug("min and max keys are str")
                 key = final_df[sort_key].values[0]
                 if not isinstance(key, str):
                     key = str(key[0])
@@ -141,6 +155,7 @@ def __init__(self, qa_bucket_name, run_name, table_name):
         self.table_name = table_name
 
     def read(self, source):
+        path = source
         columns_to_drop = [
             '_hoodie_commit_time',
             '_hoodie_commit_seqno',
@@ -156,9 +171,13 @@ def read(self, source):
             pyarrow_additional_kwargs=parquet_args)
         try:
             primary_key = pk_config[self.table_name][0]
+            logger.debug(f"pk key for {self.table_name} was found in pks.json")
         except KeyError:
+            logger.error(f"pk key for {self.table_name} was not found in pks.json, so the transform step "
+                         "can't be processed for this file")
             raise KeyError('File not found in config')
         if 'transform' in self.run_name:
+            logger.debug("self.run_name contains transform")
             database_name = f"{ENV}_{self.table_name.split('.')[0]}"
             athena_table = self.table_name.split('.')[-1]
             keys = df.groupby(primary_key)['dms_load_at'].max().tolist()
@@ -172,7 +191,9 @@ def read(self, source):
                 keys)].reset_index(drop=True)
             try:
                 path = final_df['_hoodie_commit_time'].iloc[0]
+                logger.debug("Keys from CDC were found in the HUDI table")
             except IndexError:
+                logger.error("Keys from CDC were not found in the HUDI table")
                 raise IndexError('Keys from CDC not found in HUDI table')
             final_df = final_df.drop(
                 columns_to_drop,
@@ -180,11 +201,13 @@ def read(self, source):
                 drop=True)
             return final_df, path
         else:
+            logger.debug("self.run_name does not contain transform")
             keys = df.groupby(primary_key)['dms_load_at'].max().tolist()
             final_df = df[df['dms_load_at'].isin(keys)].reset_index(drop=True)
             final_df.columns = final_df.columns.str.lower()
             try:
                 final_df = final_df.drop('op', axis=1).reset_index(drop=True)
+                logger.debug("Op column exists")
             except KeyError:
-                print('Op column not exist')
+                logger.warning("Op column does not exist")
             return final_df, path
diff --git a/functions/data_test/data_test/data_test.py b/functions/data_test/data_test/data_test.py
index ad4ccc6..154cd90 100755
--- a/functions/data_test/data_test/data_test.py
+++ b/functions/data_test/data_test/data_test.py
@@ -5,25 +5,30 @@
 import awswrangler as wr
 import json
 from datasource import prepare_final_ds, get_source_name, get_file_extension
-
+from loguru import logger
 
 def handler(event, context):
+    logger.info("Starting data test")
     if os.environ['ENVIRONMENT'] == 'local':
         endpoint_url = (f"http://{os.environ['S3_HOST']}:"
                         f"{os.environ['S3_PORT']}")
         s3 = boto3.resource("s3", endpoint_url=endpoint_url)
+        logger.debug("ENVIRONMENT is local")
     else:
         s3 = boto3.resource("s3")
+        logger.debug("ENVIRONMENT is cloud")
     cloudfront = os.environ['REPORTS_WEB']
     qa_bucket_name = os.environ['BUCKET']
     run_name = event['run_name']
     if 'engine' in event:
         engine = event['engine']
+        logger.debug("engine was found in the input")
     else:
         config_file = wr.s3.read_json(
             path=f"s3://{qa_bucket_name}/test_configs/pipeline.json").to_json()
         pipeline_config = json.loads(config_file)
         engine = pipeline_config[run_name]["engine"]
+        logger.debug("engine was not found in the input, so it was taken from the config file")
     source_root = event["source_root"]
     source_input = event["source_data"]
     coverage_config = json.loads(
@@ -34,22 +39,30 @@ def handler(event, context):
         .read().decode("utf-8"))
     if not isinstance(source_input, list):
         source = [source_input]
+        logger.debug("source_input is not a list")
     else:
         source = source_input
+        logger.debug("source_input is a list")
     if event.get('table'):
         source_name = event['table']
+        logger.debug("input contains table name")
     else:
         source_extension = get_file_extension(source[0])
         source_name = get_source_name(source[0], source_extension)
+        logger.debug("input does not contain table name")
     suite_name = f"{source_name}_{run_name}"
     try:
         source_covered = coverage_config[suite_name]['complexSuite']
+        logger.debug(f"complexSuite param for {suite_name} was found in test_coverage.json")
     except (IndexError, KeyError):
         source_covered = False
+        logger.warning(f"complexSuite param for {suite_name} was not found in test_coverage.json and set to False")
     try:
         suite_coverage_config = coverage_config[suite_name]
+        logger.debug(f"{suite_name} was found in test_coverage.json")
     except (IndexError, KeyError):
         suite_coverage_config = None
+        logger.warning(f"{suite_name} was not found in test_coverage.json and set to None")
 
     final_ds, path = prepare_final_ds(source, engine, source_root, run_name,
                                       source_name, suite_coverage_config)
@@ -73,4 +86,5 @@ def handler(event, context):
         "run_name": run_name,
         "validate_id": validate_id
     }
+    logger.info("Data test is finished successfully")
     return report
diff --git a/functions/data_test/data_test/datasource.py b/functions/data_test/data_test/datasource.py
index 9013d99..8c36ffd 100644
--- a/functions/data_test/data_test/datasource.py
+++ b/functions/data_test/data_test/datasource.py
@@ -2,7 +2,7 @@
 import re
 import pathlib
 from data_source_factory import DataSourceFactory
-
+from loguru import logger
 
 qa_bucket_name = os.environ['BUCKET']
 
@@ -14,7 +14,9 @@ def concat_source_list(source, source_engine):
 
 
 def get_file_extension(source):
-    return pathlib.Path(source).suffix[1:]
+    extension = pathlib.Path(source).suffix[1:]
+    logger.debug(f"file extension is {extension}")
+    return extension
 
 
 def read_source(source, engine, extension, run_name, table_name=None,
@@ -33,18 +35,22 @@ def get_source_name(source, extension):
 
 def prepare_final_ds(source, engine, source_engine, run_name, source_name=None,
                      coverage_config=None):
+    logger.info(f"Engine is {engine}")
     path = source
     if engine == 's3':
         source = concat_source_list(source, source_engine)
         source_extension = get_file_extension(source[0])
         df, path = read_source(source, engine, source_extension, run_name)
+        logger.debug("going through s3 branch")
     elif engine == 'hudi':
         source = concat_source_list(source, source_engine)
         source_extension = get_file_extension(source[0])
         df, path = read_source(
             source, engine, source_extension, run_name, source_name)
+        logger.debug("going through hudi branch")
     else:
         source = concat_source_list(source, source_engine)
         df, path = read_source(source, engine, None, run_name,
                                source_name, coverage_config)
+        logger.debug("going through default branch")
     return df, path
diff --git a/functions/data_test/data_test/profiling.py b/functions/data_test/data_test/profiling.py
index 0fb59b6..cd4e58a 100755
--- a/functions/data_test/data_test/profiling.py
+++ b/functions/data_test/data_test/profiling.py
@@ -15,6 +15,7 @@
     S3StoreBackendDefaults)
 import yaml
 from scipy.stats import t
+from loguru import logger
 
 DEFAULT_CONFIG_FILE_PATH = "great_expectations/great_expectations.yml"
 
@@ -76,6 +77,7 @@ def expectations_quantile(name, summary, batch, *args):
 def expectations_z_score(name, summary, batch, *args):
     threshold = calculate_z_score(summary)
     if threshold and threshold == threshold:
+        logger.debug("threshold is not None and not NaN")
         batch.expect_column_value_z_scores_to_be_less_than(
             column=name, threshold=threshold, double_sided=False)
     return name, summary, batch
@@ -253,6 +255,7 @@ def calculate_q_ranges(summary):
 
 def profile_data(df, suite_name, cloudfront, datasource_root, source_covered,
                  mapping_config, run_name):
+    logger.info("Starting profiling")
     qa_bucket = s3.Bucket(qa_bucket_name)
     config = change_ge_config(datasource_root)
     context_ge = EphemeralDataContext(project_config=config)
@@ -261,26 +264,33 @@ def profile_data(df, suite_name, cloudfront, datasource_root, source_covered,
     try:
         profile = ProfileReport(df, title=f"{suite_name} Profiling Report",
                                 minimal=True, pool_size=1)
+        logger.info("profiling in minimal mode")
     except TypeError:
         profile = ProfileReport(df, title=f"{suite_name} Profiling Report",
                                 pool_size=1)
+        logger.warning("profiling in default mode")
     try:
         report = profile.to_html()
+        logger.debug("profiling converted to html successfully")
     except ValueError:
         profile.config.vars.text.words = False
         report = profile.to_html()
+        logger.warning("profiling had problems with text.words, so words were disabled")
 
     if not source_covered:
+        logger.debug("suite is not covered")
         try:
             pipeline_config = json.loads(wr.s3.read_json(
                 path=f"s3://{qa_bucket_name}/test_configs/pipeline.json").to_json())
             reuse_suite = pipeline_config[run_name]['reuse_suite']
             use_old_suite_only = pipeline_config[run_name]['use_old_suite_only']
             old_suite_name = pipeline_config[run_name]['old_suite_name']
+            logger.debug("all params were found in the pipeline config")
         except KeyError:
             reuse_suite = False
             use_old_suite_only = False
             old_suite_name = None
+            logger.warning("some params were not found in the pipeline config; defaults were used")
         ExpectationsReport.to_expectation_suite = ExpectationsReportNew.to_expectation_suite
         suite = profile.to_expectation_suite(
             data_context=context_ge,
@@ -301,4 +311,5 @@ def profile_data(df, suite_name, cloudfront, datasource_root, source_covered,
     qa_bucket.put_object(Key=f"{folder}{suite_name}_profiling.html",
                          Body=report, ContentType='text/html')
     profile_link = f"{cloudfront}/{folder}{suite_name}_profiling.html"
+    logger.info("Profiling is finished")
     return profile_link, date_time, context_ge, data_asset
diff --git a/functions/data_test/data_test/suite_run.py b/functions/data_test/data_test/suite_run.py
index a278665..49bb5b9 100644
--- a/functions/data_test/data_test/suite_run.py
+++ b/functions/data_test/data_test/suite_run.py
@@ -1,10 +1,12 @@
 from pathlib import Path
 from great_expectations.data_context import EphemeralDataContext
 from great_expectations.checkpoint import SimpleCheckpoint
+from loguru import logger
 
 BASE_DIR = Path(__file__).resolve().parent
 
 def validate_data(file, suite_name, saved_context, data_asset):
+    logger.info("Starting suite run")
     context_ge = saved_context
     expectation_suite_name = suite_name
     batch_request = data_asset.build_batch_request()
@@ -25,7 +27,7 @@ def validate_data(file, suite_name, saved_context, data_asset):
     results = checkpoint.run(result_format="SUMMARY", run_name=suite_name)
     validation_result_identifier = results.list_validation_result_identifiers()[
         0]
-
+    logger.info("Suite run is finished")
     if not results['success']:
         context_ge.build_data_docs(
             site_names='s3_site',
@@ -33,4 +35,5 @@ def validate_data(file, suite_name, saved_context, data_asset):
         )
     result = str(validation_result_identifier).replace(
         'ValidationResultIdentifier::', '')
+    logger.info("Building of data docs")
     return result
diff --git a/functions/data_test/requirements.txt b/functions/data_test/requirements.txt
index eab4527..fe82cfd 100755
--- a/functions/data_test/requirements.txt
+++ b/functions/data_test/requirements.txt
@@ -8,3 +8,4 @@ fastparquet==0.8.1
 awswrangler==2.19.0
 ydata-profiling==4.2.0
 jinja2==3.0.3
+loguru==0.7.0
diff --git a/functions/report_push/report_push/jira_events.py b/functions/report_push/report_push/jira_events.py
index 6ccf0ed..97f8fac 100644
--- a/functions/report_push/report_push/jira_events.py
+++ b/functions/report_push/report_push/jira_events.py
@@ -6,7 +6,7 @@
 API_PASSWORD = os.getenv("DATAQA_JIRA_PASSWORD")
 
 options = {'server': API_URL}
-
+from loguru import logger
 
 def auth_in_jira():
     global jira
@@ -25,7 +25,7 @@ def open_bug(table_name: str, fail_step: str, description: str,
         elif summary == str(single_issue.fields.summary) and str(
                 single_issue.fields.status) != 'Open':
             ticket_exist = True
-            print(f'Will be reopen bug with name[{summary}]')
+            logger.debug(f"Will reopen bug with name [{summary}]")
             jira.transition_issue(single_issue.key, transition='Re-Open')
             break
     if not ticket_exist:
@@ -41,7 +41,7 @@ def get_all_issues(jira_project_key):
 
 def create_new_bug(description, replaced_allure_links, summary,
                    jira_project_key):
-    print(f'Will be created bug with name[{summary}]')
+    logger.debug(f"Will create bug with name [{summary}]")
     jira.create_issue(
         fields={
             "project": {"key": jira_project_key},
diff --git a/functions/report_push/report_push/push_data_report.py b/functions/report_push/report_push/push_data_report.py
index 0c9aacb..09e5968 100644
--- a/functions/report_push/report_push/push_data_report.py
+++ b/functions/report_push/report_push/push_data_report.py
@@ -5,6 +5,7 @@
 import awswrangler as wr
 import random
 from jira_events import auth_in_jira, get_all_issues, open_bug
+from loguru import logger
 
 s3 = boto3.resource('s3')
 sns = boto3.client('sns')
@@ -19,6 +20,7 @@
 
 
 def handler(event, context):
+    logger.info("Starting to push notifications and metadata")
     replaced_allure_links = event['links'].get('Payload')
     report = event['report'].get('Payload')
     profiling_link = (report.get('profiling'))
@@ -76,16 +78,19 @@ def handler(event, context):
         path=f"s3://{qa_bucket}/test_configs/pipeline.json").to_json())
     try:
         autobug = pipeline_config[run_name]['autobug']
+        logger.debug("autobug param was found in pipeline.json")
     except KeyError:
         autobug = False
-        print(f"Can't find autobug param for {run_name}")
+        logger.warning(f"Can't find autobug param for {run_name}")
     try:
         only_failed = pipeline_config[run_name]["only_failed"]
+        logger.debug("only_failed param was found in pipeline.json")
     except KeyError:
         only_failed = True
-        print(f"Can't find only_failed param for {run_name}")
+        logger.warning(f"Can't find only_failed param for {run_name}")
 
     if autobug and failed:
+        logger.debug("autobug is enabled and there are failed tests")
         jira_project_key = os.environ['JIRA_PROJECT_KEY']
         auth_in_jira()
         created_bug_count, bug_name = create_jira_bugs_from_allure_result(
@@ -110,6 +115,8 @@ def handler(event, context):
         sns_bugs_topic,
         only_failed)
 
+    logger.info("Finished pushing notifications and metadata")
+
     return report
 
 
@@ -119,6 +126,7 @@ def handler(event, context):
         replaced_allure_links,
         suite,
         jira_project_key):
+    logger.info("Starting to create bugs in Jira")
     created_bug_count = 0
     bug_name = []
     all_result_files = bucket.objects.filter(
@@ -142,6 +150,7 @@ def create_jira_bugs_from_allure_result(
                 issues,
                 jira_project_key,
             ))
+    logger.info("Finished creating bugs in Jira")
     return created_bug_count, bug_name
 
 
@@ -157,6 +166,7 @@ def push_sns_message(
         passed,
         sns_bugs_topic,
         only_failed):
+    logger.info("Starting to push SNS messages")
     message_structure = 'json'
     allure_link = f"http://{replaced_allure_links}"
     if created_bug_count > 0:
@@ -189,6 +199,7 @@ def push_sns_message(
     sns.publish(TopicArn=sns_bugs_topic,
                 Message=sns_message,
                 MessageStructure=message_structure)
+    logger.info("Finished pushing SNS messages")
 
 
 def push_cloudwatch_metrics(suite,
@@ -196,6 +207,7 @@ def push_cloudwatch_metrics(suite,
                             failed,
                             created_bug_count,
                             cloudwatch):
+    logger.info("Starting to push CloudWatch metrics")
     if created_bug_count > 0:
         metric_data = {
             'MetricName': 'bug_created_count',
@@ -234,3 +246,4 @@ def push_cloudwatch_metrics(suite,
             metric_data,
         ]
     )
+    logger.info("Finished pushing CloudWatch metrics")
diff --git a/functions/report_push/requirements.txt b/functions/report_push/requirements.txt
index 7ac3ad7..019826f 100644
--- a/functions/report_push/requirements.txt
+++ b/functions/report_push/requirements.txt
@@ -5,3 +5,4 @@ s3fs==0.4.2
 python-dateutil==2.8.2
 awswrangler==2.19.0
 jira==3.2.0
+loguru==0.7.0