diff --git a/cafy_pytest/cafy_gta.py b/cafy_pytest/cafy_gta.py new file mode 100644 index 0000000..d5db032 --- /dev/null +++ b/cafy_pytest/cafy_gta.py @@ -0,0 +1,260 @@ +import time +import pytest +from logger.cafylog import CafyLog +import os +import inspect +from jinja2 import Template +import functools +import sys +import inspect +import requests +import json +from .cafygta_config import CafyGTA_Configs + +class TimeCollectorPlugin: + def __init__(self): + self.original_sleep = time.sleep + self.granular_time_testcase_dict = dict() + self.test_case_name = None + self.total_sleep_time = 0 + self.total_set_command_time = 0 + self.total_get_command_time = 0 + + def update_granular_time_testcase_dict(self, current_test, event, method_name, elapsed_time ): + """ + granular_time_testcase_dict + param current_test: current_test + param event: command like set , get or time.sleep + param method_name: method name + param elapsed_time: total time for each command ie, set or get + """ + if current_test not in self.granular_time_testcase_dict: + self.granular_time_testcase_dict[current_test] = dict() + if event not in self.granular_time_testcase_dict[current_test]: + self.granular_time_testcase_dict[current_test][event] = dict() + if method_name not in self.granular_time_testcase_dict[current_test][event]: + self.granular_time_testcase_dict[current_test][event][method_name] = [] + self.granular_time_testcase_dict[current_test][event][method_name].append(float(elapsed_time)) + + def measure_time_for_set_or_get_methods(self, method, cls_name): + """ + Measure the time taken by set methods. + This method wraps set methods to measure their execution time. + param method (function): The set method to be measured. + Returns: function: The wrapped method. + """ + @functools.wraps(method) + def wrapper(*args, **kwargs): + start_time = time.perf_counter() + result = method(*args, **kwargs) + end_time = time.perf_counter() + elapsed_time = '%.2f' % (end_time - start_time) + # Update granular time at the test case level + current_test = self.test_case_name + if current_test not in self.granular_time_testcase_dict: + self.granular_time_testcase_dict[current_test] = dict() + method_name = method.__name__ + if method_name.startswith('set'): + self.update_granular_time_testcase_dict(current_test,'set_command', ".".join([cls_name, method.__name__]), elapsed_time) + elif method_name.startswith('get'): + self.update_granular_time_testcase_dict(current_test, 'get_command', ".".join([cls_name, method.__name__]), elapsed_time) + return result + return wrapper + + def measure_sleep_time(self, duration): + ''' + Method measure_sleep_time : it will measure the actual time taken by testcase method during sleep + param duration: duration or sleep time declared in TC fucntion's + return : Update the graunular time at test case level + ''' + caller_frame = inspect.currentframe().f_back + caller_method_name = caller_frame.f_code.co_name + caller_class = caller_frame.f_locals.get('self').__class__ + # Get the method of the caller's class + caller_method = getattr(caller_class, caller_method_name) + current_test = self.test_case_name + start_time = time.perf_counter() + self.original_sleep(duration) + end_time = time.perf_counter() + elapsed_time = '%.2f' % (end_time - start_time) + self.update_granular_time_testcase_dict(current_test, "sleep_time", ".".join([caller_class.__name__, caller_method.__name__,"time.sleep"]), elapsed_time) + + def patch_set_or_get_methods_for_test_instance(self, item): + """ + Perform setup and teardown actions for test cases. + This method performs setup and teardown actions for test cases,including monkey patching sleep and set methods. + param request: The test request. + Yields: None + """ + # To avoid maximum recursion depth error + if getattr(item.cls, '_decorated', None): + return + test_case_class = item.cls + if test_case_class: + # Get the module of the test case class + module = inspect.getmodule(test_case_class) + for class_name, class_obj in inspect.getmembers(module, inspect.isclass): + # Iterate over the attributes of the class + for method_name, method in inspect.getmembers(class_obj, inspect.isfunction): + # Check if the attribute is callable and its name starts with 'set' + if callable(method) and method_name.startswith('set') or method_name.startswith('get') : + #skipping appyling timer on setup method of testclass + if method_name == 'setup_method': + continue + else: + original_method = getattr(class_obj, method_name) + setattr(class_obj, method_name, self.measure_time_for_set_or_get_methods(original_method,class_name)) + if item.cls is not None: + setattr(item.cls, '_decorated', True) + + def pytest_runtest_protocol(self, item, nextitem): + ''' + Method pytest_runtest_protocol : it will Monkey patch sleep , subprocess run etc + Monkey patching used for modifying the behavior of built-in classes or functions, or adding instrumentation or logging to existing code. + param item : test case item + param nextitem : test case nextitem + return : None + ''' + # Monkey patch time.sleep + time.sleep = self.measure_sleep_time + # Monkey patch 'set' methods for all classes in the module + self.patch_set_or_get_methods_for_test_instance(item) + # get class name of the test case method + base_class_name = "" + if item.cls: + class_name = item.cls + base_classes = inspect.getmro(class_name) + base_class_name = base_classes[0].__name__ + if base_class_name: + self.test_case_name = f"{base_class_name}.{item.name}" + else: + self.test_case_name = f"{item.name}" + return None + + def update_CafyLog_gta_dict(self, current_test): + """ + Update the CafyLog Granular Time Accounting (GTA) dictionary with timing information for the current test + :param current_test:The name of the current test being executed + :return : none + """ + if current_test not in self.granular_time_testcase_dict: + self.granular_time_testcase_dict[current_test] = dict() + if 'set_command' not in self.granular_time_testcase_dict[current_test]: + self.granular_time_testcase_dict[current_test]['set_command'] = dict() + if hasattr(CafyLog,"gta_dict") and 'set_command' in CafyLog.gta_dict: + for key, value in CafyLog.gta_dict['set_command'].items(): + self.granular_time_testcase_dict[current_test]['set_command'][key] = value + if 'get_command' not in self.granular_time_testcase_dict[current_test]: + self.granular_time_testcase_dict[current_test]['get_command'] = dict() + if hasattr(CafyLog,"gta_dict") and 'get_command' in CafyLog.gta_dict: + for key, value in CafyLog.gta_dict['get_command'].items(): + self.granular_time_testcase_dict[current_test]['get_command'][key] = value + + def pytest_runtest_teardown(self, item, nextitem): + """ + Execute teardown actions after a test has been executed + :param item: The test item that was executed + :param nextitem: The next test item in the test suite + :return: None + """ + current_test = self.test_case_name + self.update_CafyLog_gta_dict(current_test) + CafyLog.gta_dict = {} + + def get_time_data(self,data, event): + ''' + get time data + this method will take the timings list for each sleep_time , set_command or get_command + and it will club into as sum and occurence + param data : timings data + ''' + tmp_dict = {} + for command, timings_list in data.items(): + if isinstance(timings_list, list): + total_sum = sum(timings_list) + length = len(timings_list) + if event == 'sleep_time': + self.total_sleep_time = self.total_sleep_time + total_sum + elif event == 'set_command': + self.total_set_command_time = self.total_set_command_time + total_sum + elif event == 'get_command': + self.total_get_command_time = self.total_get_command_time + total_sum + tmp_dict[command] = ["{:.2f}".format(total_sum), length] + else: + tmp_dict[command] = timings_list + return tmp_dict + + def collect_granular_time_accouting_report(self): + ''' + Method collect_granular_time_accouting_report : it will create report and save in cafy work dir + return : create report for time accounting in cafy work dir as granular_time_report.json + ''' + time_report = dict() + for test_case, events in self.granular_time_testcase_dict.items(): + time_report[test_case] = dict() + if 'sleep_time' in events: + time_report[test_case]['sleep_time'] = self.get_time_data(events["sleep_time"],'sleep_time') + else: + time_report[test_case]['sleep_time'] = {} + if 'set_command' in events: + time_report[test_case]['set_command'] = self.get_time_data(events["set_command"],'set_command') + else: + time_report[test_case]['set_command'] = {} + if 'get_command' in events: + time_report[test_case]['get_command'] = self.get_time_data(events["get_command"],'get_command') + else: + time_report[test_case]['get_command'] = {} + + time_report[test_case]['total_sleep_time'] = "{:.2f}".format(self.total_sleep_time) + time_report[test_case]['total_set_command_time'] = "{:.2f}".format(self.total_set_command_time) + time_report[test_case]['total_get_command_time'] = "{:.2f}".format(self.total_get_command_time) + time_report[test_case]['total_time'] = "{:.2f}".format(self.total_sleep_time+self.total_set_command_time+self.total_get_command_time) + self.total_sleep_time = 0 + self.total_set_command_time = 0 + self.total_get_command_time = 0 + return time_report + + def add_gta_data_into_db(self, time_report,run_id='local_run'): + ''' + add_gta_data_into_db + :param time_report: gta report json data + ''' + try: + URL = CafyGTA_Configs.get_gta_url() + API_KEY = CafyGTA_Configs.get_api_key() + data = dict() + data['run_id'] = run_id + data['gta'] = time_report + data_json = json.dumps(data) + headers = {'Content-Type': 'application/json', + 'Authorization': 'Bearer {}'.format(API_KEY)} + response = requests.post(URL, data=data_json, headers=headers) + if response.status_code == 200: + print('GTA data updated to Mongo db:', response.status_code) + else: + print('Failed to update GTA data to Mongo db, Status code:', response.status_code) + except Exception as e: + print(e) + + def pytest_terminal_summary(self, terminalreporter): + ''' + Method pytest_terminal_summary : terminal reporting + return : None + ''' + time_report = self.collect_granular_time_accouting_report() + # Create a Jinja2 environment and load the HTML template + CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) + template_file = os.path.join(CURRENT_DIR,"resources/gta_template.html") + with open(template_file) as html_src: + html_template = html_src.read() + template = Template(html_template) + html_content = template.render(dictionary_data=time_report) + # Define the path to the output HTML file + path=CafyLog.work_dir + html_file_path = os.path.join(path, 'granular_time_report.html') + #Update gta data into mongo db + run_id = os.environ.get("CAFY_RUN_ID", 'local_run') + self.add_gta_data_into_db(time_report,run_id) + # Write the HTML content to the output file + with open(html_file_path, 'w') as html_file: + html_file.write(html_content) \ No newline at end of file diff --git a/cafy_pytest/cafygta_config.py b/cafy_pytest/cafygta_config.py new file mode 100644 index 0000000..12c6ecc --- /dev/null +++ b/cafy_pytest/cafygta_config.py @@ -0,0 +1,16 @@ +class CafyGTA_Configs: + GTA_URL = 'https://cafy3.cisco.com:3500/api/gta' + API_KEY = 'f912594816b83760756f3cfdcb32f08bdf3f9b6fe46183323ec0aaf0e4afe25b' + @staticmethod + def get_gta_url(): + """ + return GTA_Url + """ + return CafyGTA_Configs.GTA_URL + + @staticmethod + def get_api_key(): + """ + return Api Key + """ + return CafyGTA_Configs.API_KEY \ No newline at end of file diff --git a/cafy_pytest/plugin.py b/cafy_pytest/plugin.py index ab442c8..31b7799 100644 --- a/cafy_pytest/plugin.py +++ b/cafy_pytest/plugin.py @@ -48,6 +48,7 @@ from utils.collectors.confest import Config from .cafy import Cafy +from .cafy_gta import TimeCollectorPlugin from .cafy_pdb import CafyPdb from .cafypdb_config import CafyPdb_Configs @@ -558,6 +559,7 @@ def pytest_configure(config): collection_list, cafypdb) config.pluginmanager.register(config._email) + config.pluginmanager.register(TimeCollectorPlugin()) #Write all.log path to terminal reporter = TerminalReporter(config, sys.stdout) @@ -585,6 +587,8 @@ def pytest_unconfigure(config): tmp_str_text = str(tmp_text) with open(os.path.join(CafyLog.work_dir, "env.txt"), "w") as f: f.write(tmp_str_text) + time_collector_plugin = config.pluginmanager.get_plugin(TimeCollectorPlugin) + config.pluginmanager.unregister(time_collector_plugin) except: pass diff --git a/cafy_pytest/resources/gta_template.html b/cafy_pytest/resources/gta_template.html new file mode 100644 index 0000000..5f4e03d --- /dev/null +++ b/cafy_pytest/resources/gta_template.html @@ -0,0 +1,99 @@ + + +
+ + + +Testcase | +Step | +Sleep Time | +Set Level Command Time | +Get Level Command Time | +Occurrence | +Total Execution Time | +
---|---|---|---|---|---|---|
{{key}} | ++ | + | + | + | + | |
+ | {{ time_key }} | +{{ time_values[0] if time_values else '-' }} | +- | +- | +{{ time_values[1] if time_values else '-' }} | +- | +
+ | {{ set_key }} | +- | +{{ set_value[0] if set_value else '-' }} | +- | +{{ set_value[1] if set_value else '-' }} | +- | +
+ | {{ get_key }} | +- | +- | +{{ get_value[0] if get_value else '-' }} | +{{ get_value[1] if get_value else '-' }} | +- | +
+ | Total | +{{ values["total_sleep_time"] }} | +{{ values["total_set_command_time"] }} | +{{ values["total_get_command_time"] }} | +- | +{{ values["total_time"] }} | +