Skip to content

Commit

Permalink
Refactored coverage processor in to class hierarchy (#230)
Browse files Browse the repository at this point in the history
* Refactored coverage processor

* A new class hierarchy for processing coverage from different tools

* New interface representing CoverageReport and CoverageData

* A factory to create the appropriate coverage processor

* Added missing documentation

* increment patch version
  • Loading branch information
coderustic authored Jan 3, 2025
1 parent b4ef3fb commit 3496069
Show file tree
Hide file tree
Showing 11 changed files with 899 additions and 1,067 deletions.
14 changes: 7 additions & 7 deletions cover_agent/CoverAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,19 @@ def run_test_gen(self, failed_test_runs: List, language: str, test_framework: st

# Check if the desired coverage has been reached
failed_test_runs, language, test_framework, coverage_report = self.test_validator.get_coverage()
if self.test_validator.current_coverage >= (self.test_validator.desired_coverage / 100):
if self.test_validator.get_current_coverage() >= (self.test_validator.desired_coverage / 100):
break

# Log the final coverage
if self.test_validator.current_coverage >= (self.test_validator.desired_coverage / 100):
if self.test_validator.get_current_coverage() >= (self.test_validator.desired_coverage / 100):
self.logger.info(
f"Reached above target coverage of {self.test_validator.desired_coverage}% (Current Coverage: {round(self.test_validator.current_coverage * 100, 2)}%) in {iteration_count} iterations."
f"Reached above target coverage of {self.test_validator.desired_coverage}% (Current Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%) in {iteration_count} iterations."
)
elif iteration_count == self.args.max_iterations:
if self.args.diff_coverage:
failure_message = f"Reached maximum iteration limit without achieving desired diff coverage. Current Coverage: {round(self.test_validator.current_coverage * 100, 2)}%"
failure_message = f"Reached maximum iteration limit without achieving desired diff coverage. Current Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%"
else:
failure_message = f"Reached maximum iteration limit without achieving desired coverage. Current Coverage: {round(self.test_validator.current_coverage * 100, 2)}%"
failure_message = f"Reached maximum iteration limit without achieving desired coverage. Current Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%"
if self.args.strict_coverage:
# User requested strict coverage (similar to "--cov-fail-under in pytest-cov"). Fail with exist code 2.
self.logger.error(failure_message)
Expand All @@ -237,11 +237,11 @@ def run_test_gen(self, failed_test_runs: List, language: str, test_framework: st
def log_coverage(self):
if self.args.diff_coverage:
self.logger.info(
f"Current Diff Coverage: {round(self.test_validator.current_coverage * 100, 2)}%"
f"Current Diff Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%"
)
else:
self.logger.info(
f"Current Coverage: {round(self.test_validator.current_coverage * 100, 2)}%"
f"Current Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%"
)
self.logger.info(f"Desired Coverage: {self.test_validator.desired_coverage}%")

Expand Down
413 changes: 0 additions & 413 deletions cover_agent/CoverageProcessor.py

This file was deleted.

1 change: 0 additions & 1 deletion cover_agent/UnitTestGenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import re

from cover_agent.AICaller import AICaller
from cover_agent.CoverageProcessor import CoverageProcessor
from cover_agent.CustomLogger import CustomLogger
from cover_agent.FilePreprocessor import FilePreprocessor
from cover_agent.PromptBuilder import PromptBuilder
Expand Down
117 changes: 33 additions & 84 deletions cover_agent/UnitTestValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
import json
import logging
import os
import re

from cover_agent.AICaller import AICaller
from cover_agent.CoverageProcessor import CoverageProcessor
from cover_agent.CustomLogger import CustomLogger
from cover_agent.FilePreprocessor import FilePreprocessor
from cover_agent.PromptBuilder import PromptBuilder
from cover_agent.Runner import Runner
from cover_agent.settings.config_loader import get_settings
from cover_agent.utils import load_yaml

from cover_agent.coverage.processor import process_coverage, CoverageReport, CoverageData

class UnitTestValidator:
def __init__(
Expand Down Expand Up @@ -106,15 +104,6 @@ def __init__(
with open(self.source_file_path, "r") as f:
self.source_code = f.read()

# initialize the coverage processor
self.coverage_processor = CoverageProcessor(
file_path=self.code_coverage_report_path,
src_file_path=self.source_file_path,
coverage_type=self.coverage_type,
use_report_coverage_feature_flag=self.use_report_coverage_feature_flag,
diff_coverage_report_path=self.diff_cover_report_path,
)

def get_coverage(self):
"""
Run code coverage and build the prompt to be used for generating tests.
Expand All @@ -124,6 +113,9 @@ def get_coverage(self):
"""
# Run coverage and build the prompt
self.run_coverage()
# Run diff coverage if enabled
if self.diff_coverage:
self.generate_diff_coverage_report()
return self.failed_test_runs, self.language, self.testing_framework, self.code_coverage_report

def get_code_language(self, source_file_path: str) -> str:
Expand Down Expand Up @@ -294,14 +286,9 @@ def run_coverage(self):
), f'Fatal: Error running test command. Are you sure the command is correct? "{self.test_command}"\nExit code {exit_code}. \nStdout: \n{stdout} \nStderr: \n{stderr}'

try:
# Process the extracted coverage metrics
coverage, coverage_percentages = self.post_process_coverage_report(
time_of_test_command
)
self.current_coverage = coverage
self.last_coverage_percentages = coverage_percentages.copy()
self.current_coverage_report = self.post_process_coverage_report(time_of_test_command)
self.logger.info(
f"Initial coverage: {round(self.current_coverage * 100, 2)}%"
f"Initial coverage: {round(self.current_coverage_report.total_coverage * 100, 2)}%"
)

except AssertionError as error:
Expand Down Expand Up @@ -503,11 +490,9 @@ def validate_test(self, generated_test: dict):

# If test passed, check for coverage increase
try:
new_percentage_covered, new_coverage_percentages = self.post_process_coverage_report(
time_of_test_command
)
new_coverage_report = self.post_process_coverage_report(time_of_test_command)

if new_percentage_covered <= self.current_coverage:
if self.current_coverage_report is not None and new_coverage_report.total_coverage <= self.current_coverage_report.total_coverage:
# Coverage has not increased, rollback the test by removing it from the test file
with open(self.test_file_path, "w") as test_file:
test_file.write(original_content)
Expand Down Expand Up @@ -579,20 +564,20 @@ def validate_test(self, generated_test: dict):
additional_imports_lines
) # this is important, otherwise the next test will be inserted at the wrong line

for key in new_coverage_percentages:
if new_coverage_percentages[key] > self.last_coverage_percentages[key] and key == self.source_file_path.split("/")[-1]:
for key in new_coverage_report.file_coverage:
new_v: CoverageData = new_coverage_report.file_coverage[key]
old_v: CoverageData = self.current_coverage_report.file_coverage[key]
if new_v.coverage > old_v.coverage and key == self.source_file_path.split("/")[-1]:
self.logger.info(
f"Coverage for provided source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_coverage_percentages[key] * 100, 2)}"
f"Coverage for provided source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_v.coverage * 100, 2)}"
)
elif new_coverage_percentages[key] > self.last_coverage_percentages[key]:
elif new_v.coverage > old_v.coverage:
self.logger.info(
f"Coverage for non-source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_coverage_percentages[key] * 100, 2)}"
f"Coverage for non-source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_v.coverage * 100, 2)}"
)
self.current_coverage = new_percentage_covered
self.last_coverage_percentages = new_coverage_percentages.copy()

self.logger.info(
f"Test passed and coverage increased. Current coverage: {round(new_percentage_covered * 100, 2)}%"
f"Test passed and coverage increased. Current coverage: {round(new_coverage_report.total_coverage * 100, 2)}%"
)
return {
"status": "PASS",
Expand Down Expand Up @@ -689,59 +674,20 @@ def extract_error_message(self, fail_details):
logging.error(f"Error extracting error message: {e}")
return ""

def post_process_coverage_report(self, time_of_test_command):
coverage_percentages = {}
if self.use_report_coverage_feature_flag:
self.logger.info(
"Using the report coverage feature flag to process the coverage report"
)
file_coverage_dict = self.coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
total_lines_covered = 0
total_lines_missed = 0
total_lines = 0
for key in file_coverage_dict:
lines_covered, lines_missed, percentage_covered = (
file_coverage_dict[key]
)
total_lines_covered += len(lines_covered)
total_lines_missed += len(lines_missed)
total_lines += len(lines_covered) + len(lines_missed)
if key == self.source_file_path:
self.last_source_file_coverage = percentage_covered
if key not in coverage_percentages:
coverage_percentages[key] = 0
coverage_percentages[key] = percentage_covered
try:
percentage_covered = total_lines_covered / total_lines
except ZeroDivisionError:
self.logger.error(f"ZeroDivisionError: Attempting to perform total_lines_covered / total_lines: {total_lines_covered} / {total_lines}.")
percentage_covered = 0

self.logger.info(
f"Total lines covered: {total_lines_covered}, Total lines missed: {total_lines_missed}, Total lines: {total_lines}"
)
self.logger.info(
f"coverage: Percentage {round(percentage_covered * 100, 2)}%"
)
elif self.diff_coverage:
self.generate_diff_coverage_report()
lines_covered, lines_missed, percentage_covered = (
self.coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
)
self.code_coverage_report = f"Lines covered: {lines_covered}\nLines missed: {lines_missed}\nPercentage covered: {round(percentage_covered * 100, 2)}%"
else:
lines_covered, lines_missed, percentage_covered = (
self.coverage_processor.process_coverage_report(
time_of_test_command=time_of_test_command
)
)
self.code_coverage_report = f"Lines covered: {lines_covered}\nLines missed: {lines_missed}\nPercentage covered: {round(percentage_covered * 100, 2)}%"
return percentage_covered, coverage_percentages

def post_process_coverage_report(self, time_of_test_command: int):
report: CoverageReport = process_coverage(
tool_type=self.coverage_type,
time_of_test_command=time_of_test_command,
report_path=self.code_coverage_report_path,
src_file_path=self.source_file_path,
is_global_coverage_enabled=self.use_report_coverage_feature_flag,
file_pattern=None,
diff_coverage_report_path=self.diff_cover_report_path,
)
self.logger.info(
f"coverage: Percentage {round(report.total_coverage * 100, 2)}%"
)
return report

def generate_diff_coverage_report(self):
# Run the diff-cover command to generate a JSON diff coverage report
Expand All @@ -758,3 +704,6 @@ def generate_diff_coverage_report(self):
f'Fatal: Error running diff coverage command. Are you sure the command is correct? "{coverage_command}"'
f"\nExit code {exit_code}. \nStdout: \n{stdout} \nStderr: \n{stderr}"
)

def get_current_coverage(self):
return self.current_coverage_report.total_coverage
Loading

0 comments on commit 3496069

Please sign in to comment.