From f02059068f85b0b03f54db8000b2299bbf97cde6 Mon Sep 17 00:00:00 2001 From: Spiros Maggioros Date: Tue, 19 Nov 2024 01:07:27 +0200 Subject: [PATCH] Fixed pre-commit for all files | Fixed return types and types for some functions with test cases --- merge_ROI_demo_and_test.py | 97 ----------------------- spare_scores/data_prep.py | 6 +- spare_scores/util.py | 4 +- tests/conftest.py | 25 ------ tests/unit/test_data_prep.py | 36 +++++---- tests/unit/test_spare_scores.py | 134 +++++++++++++------------------- tests/unit/test_util.py | 28 +++---- 7 files changed, 96 insertions(+), 234 deletions(-) delete mode 100644 merge_ROI_demo_and_test.py delete mode 100644 tests/conftest.py diff --git a/merge_ROI_demo_and_test.py b/merge_ROI_demo_and_test.py deleted file mode 100644 index 7f85d8d..0000000 --- a/merge_ROI_demo_and_test.py +++ /dev/null @@ -1,97 +0,0 @@ -import argparse - -import pandas as pd - -from spare_scores import spare_test - - -def merge_and_test( - roi_file, - demographic_file, - model_path, - key_var, - output_file, - spare_column_name, - verbose, - logs_file, -): - - # Read the input CSV files into pandas DataFrames - df_roi = pd.read_csv(roi_file) - df_demographic = pd.read_csv(demographic_file) - - # Merge the DataFrames based on the desired column - if not key_var: - key_var = df_roi.columns[0] - merged_df = pd.merge(df_roi, df_demographic, on=key_var) - - # Call the spare_test function from the spare_scores package - result = spare_test( - merged_df, - model_path, - key_var, - output_file, - spare_column_name, - verbose, - logs_file, - ) - - return result - - -if __name__ == "__main__": - # Example usage: - # python merge_ROI_demo_and_test.py -i spare_scores/data/example_data_ROIs.csv \ - # -d spare_scores/data/example_data_demographics.csv \ - # -m spare_scores/mdl/mdl_SPARE_AD_hMUSE_single.pkl.gz \ - # -kv ID \ - # -o zzz_output.csv \ - # -l zzz_logs.txt \ - # -sv SPARE_score - # # Create an ArgumentParser object - parser = argparse.ArgumentParser(description="Spare Scores Analysis") - - # Define the command-line arguments - parser.add_argument("-i", "--input", required=True, help="Input ROI CSV file") - parser.add_argument( - "-d", "--demographic", required=True, help="Input demographic CSV file" - ) - parser.add_argument("-m", "--model", required=True, help="Model for spare_train") - parser.add_argument( - "-kv", - "--key_var", - required=False, - default="", - help="The key variable of the dataset.", - ) - parser.add_argument( - "-o", "--output", required=False, default="", help="Output CSV file" - ) - parser.add_argument( - "-l", "--logs", required=False, default="", help="Output logs file" - ) - parser.add_argument( - "-sv", - "--spare_var", - required=False, - default="SPARE_score", - help="Column for calculated spare score", - ) - parser.add_argument( - "-v", "--verbose", required=False, default=1, help="Type of logging messages." - ) - - # Parse the command-line arguments - args = parser.parse_args() - - # Call the merge_and_test function with the provided arguments - merge_and_test( - args.input, - args.demographic, - args.model, - args.key_var, - args.output, - args.spare_var, - args.verbose, - args.logs, - ) diff --git a/spare_scores/data_prep.py b/spare_scores/data_prep.py index 93d59ad..0549a52 100644 --- a/spare_scores/data_prep.py +++ b/spare_scores/data_prep.py @@ -1,7 +1,7 @@ import logging import os import random -from typing import Any, Tuple, Union +from typing import Any, Optional, Tuple, Union import numpy as np import pandas as pd @@ -16,7 +16,7 @@ def check_train( to_predict: str, verbose: int = 1, # this needs to be removed(non used) pos_group: str = "", -) -> Union[str, Tuple[pd.DataFrame, list, str]]: +) -> Union[Tuple[pd.DataFrame, list, str], str]: """ Checks training dataframe for errors. @@ -221,7 +221,7 @@ def smart_unique( def age_sex_match( df1: pd.DataFrame, df2: Union[pd.DataFrame, None] = None, - to_match: str = "", + to_match: Optional[str] = "", p_threshold: float = 0.15, verbose: int = 1, age_out_percentage: float = 20, diff --git a/spare_scores/util.py b/spare_scores/util.py index 9e47915..ba0d48c 100644 --- a/spare_scores/util.py +++ b/spare_scores/util.py @@ -2,7 +2,7 @@ import logging import os import pickle -from typing import Any, Union +from typing import Any, Optional, Union import numpy as np import pandas as pd @@ -43,7 +43,7 @@ def add_file_extension(filename: str, extension: str) -> str: return filename -def check_file_exists(filename: str, logger: Any) -> Any: +def check_file_exists(filename: Optional[str], logger: Any) -> Any: """ Checks if file exists diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index 4d4aec1..0000000 --- a/tests/conftest.py +++ /dev/null @@ -1,25 +0,0 @@ -import gzip -import pickle -from pathlib import Path - -import pandas as pd -import pytest - - -@pytest.fixture -def df_fixture(): - # Load the sample data from the fixture - data_path = Path(__file__).resolve().parent / "fixtures" / "sample_data.csv" - data_path = str(data_path) - return pd.read_csv(data_path) - - -@pytest.fixture -def model_fixture(): - # Load the sample model from the fixture - # This model was created using this package based on the above (randomly - # generated data) - model_path = Path(__file__).resolve().parent / "fixtures" / "sample_model.pkl.gz" - with gzip.open(model_path, "rb") as f: - model = pickle.load(f) - return model diff --git a/tests/unit/test_data_prep.py b/tests/unit/test_data_prep.py index 8f07ac0..e63d8e4 100644 --- a/tests/unit/test_data_prep.py +++ b/tests/unit/test_data_prep.py @@ -16,15 +16,19 @@ class CheckDataPrep(unittest.TestCase): - def test_check_train(self): + def test_check_train(self) -> None: # Test case 1: Valid input dataframe and predictors self.df_fixture = load_df("../fixtures/sample_data.csv") predictors = ["ROI1", "ROI2", "ROI3"] to_predict = "Sex" pos_group = "M" - filtered_df, filtered_predictors, mdl_type = check_train( + result = check_train( self.df_fixture, predictors, to_predict, pos_group=pos_group ) + if isinstance(result, str): + self.fail("check_train returned an error") + else: + filtered_df, filtered_predictors, mdl_type = result self.assertTrue( filtered_df.equals(self.df_fixture) ) # Check if filtered dataframe is the same as the input dataframe @@ -42,7 +46,9 @@ def test_check_train(self): predictors = ["Var1", "Var2"] to_predict = "ToPredict" pos_group = "1" - res = check_train(df_missing_columns, predictors, to_predict, pos_group) + res = check_train( + df_missing_columns, predictors, to_predict, pos_group=pos_group + ) self.assertTrue(res == "Variable to predict is not in the input dataframe.") # Test case 3: Predictor not in input dataframe @@ -57,10 +63,10 @@ def test_check_train(self): predictors = ["Var1", "Var2"] # Var2 is not in the input dataframe to_predict = "ToPredict" pos_group = "1" - res = check_train(df, predictors, to_predict, pos_group) + res = check_train(df, predictors, to_predict, pos_group=pos_group) self.assertTrue(res == "Not all predictors exist in the input dataframe.") - def test_check_test(self): + def test_check_test(self) -> None: # Test case 1: Valid input dataframe and meta_data df = pd.DataFrame( { @@ -121,9 +127,9 @@ def test_check_test(self): ), } res = check_test(df_age_outside_range, meta_data) - self.assertTrue(res[1] == None) + self.assertTrue(res[1] is None) - def test_smart_unique(self): + def test_smart_unique(self) -> None: # test case 1: testing smart_unique with df2=None, to_predict=None self.df_fixture = load_df("../fixtures/sample_data.csv") result = smart_unique(self.df_fixture, None) @@ -146,8 +152,8 @@ def test_smart_unique(self): # test case 3: testing smart_unique with variance and no duplicate ID's. df2=None self.df_fixture = load_df("../fixtures/sample_data.csv") - result = smart_unique(self.df_fixture, None, "ROI1") - self.assertTrue(result.equals(self.df_fixture)) + result_df: pd.DataFrame = smart_unique(self.df_fixture, None, "ROI1") + self.assertTrue(result_df.equals(self.df_fixture)) # test case 4: testing smart_unique with variance and duplicate ID's. df2=None self.df_fixture = pd.DataFrame(data=df) @@ -161,7 +167,7 @@ def test_smart_unique(self): "ROI2": 0.73, } self.df_fixture = self.df_fixture._append(new_row, ignore_index=True) - result = smart_unique(self.df_fixture, None, "ROI1") + result_df_2: pd.DataFrame = smart_unique(self.df_fixture, None, "ROI1") correct_df = { "Id": [1.0, 2.0, 3.0, 4.0, 5.0, float("nan")], "ScanID": [ @@ -186,7 +192,7 @@ def test_smart_unique(self): ], } correct_df = pd.DataFrame(data=correct_df) - self.assertTrue(result.equals(correct_df)) + self.assertTrue(result_df_2.equals(correct_df)) # test case 5: testing df2 != None and no_df2=False df1 = { @@ -199,10 +205,10 @@ def test_smart_unique(self): self.df_fixture1 = pd.DataFrame(data=df1) self.df_fixture2 = pd.DataFrame(data=df2) - result = smart_unique(self.df_fixture1, self.df_fixture2, to_predict=None) + result = smart_unique(self.df_fixture1, self.df_fixture2, to_predict="") self.assertTrue(result == (self.df_fixture1, self.df_fixture2)) - def test_age_sex_match(self): + def test_age_sex_match(self) -> None: # test case 1: testing df2=None and to_match=None self.df_fixture = load_df("../fixtures/sample_data.csv") result = age_sex_match(self.df_fixture, None) @@ -265,7 +271,7 @@ def test_age_sex_match(self): print(result) self.assertTrue(result.equals(correct_df)) - def test_logging_basic_config(self): + def test_logging_basic_config(self) -> None: logging_level = { 0: logging.WARNING, 1: logging.INFO, @@ -291,5 +297,5 @@ def test_logging_basic_config(self): self.assertTrue(os.path.exists("test_data_prep.py")) self.assertTrue(result == logging.getLogger()) - def test_convert_cat_variables(self): + def test_convert_cat_variables(self) -> None: pass diff --git a/tests/unit/test_spare_scores.py b/tests/unit/test_spare_scores.py index bcbf974..3b8e0ec 100644 --- a/tests/unit/test_spare_scores.py +++ b/tests/unit/test_spare_scores.py @@ -1,22 +1,25 @@ +import os import unittest from pathlib import Path + import numpy as np import pandas as pd -import os + from spare_scores.data_prep import check_test -from spare_scores.util import load_df, load_model from spare_scores.mlp_torch import MLPDataset from spare_scores.spare import spare_test, spare_train +from spare_scores.util import load_df, load_model + class CheckMLPDataset(unittest.TestCase): - def test_len(self): + def test_len(self) -> None: # test case 1: testing length self.X = np.array([1, 2, 3, 4, 5, 6, 7, 8]) self.Y = np.array([1, 2, 3, 4, 5, 6, 7, 8]) self.Dataset = MLPDataset(self.X, self.Y) self.assertTrue(len(self.Dataset) == 8) - def test_idx(self): + def test_idx(self) -> None: # test case 2: testing getter self.X = np.array([1, 2, 3, 4, 5, 6, 7, 8]) self.Y = np.array([1, 2, 3, 4, 5, 6, 7, 8]) @@ -24,15 +27,16 @@ def test_idx(self): self.assertTrue(self.Dataset[0] == (1, 1)) self.assertTrue(self.Dataset[len(self.Dataset) - 1] == (8, 8)) + class CheckSpareScores(unittest.TestCase): - def test_spare_test_SVM(self): + def test_spare_test_SVM(self) -> None: self.df_fixture = load_df("../fixtures/sample_data.csv") self.model_fixture = load_model("../fixtures/sample_model.pkl.gz") # Test case 1: Test with df result = spare_test(self.df_fixture, self.model_fixture) - status_code, status, result = ( + _, status, result = ( result["status_code"], result["status"], result["data"], @@ -46,8 +50,8 @@ def test_spare_test_SVM(self): filepath = ( Path(__file__).resolve().parent.parent / "fixtures" / "sample_data.csv" ) - filepath = str(filepath) - result = spare_test(filepath, self.model_fixture) + filepath_str = str(filepath) + result = spare_test(filepath_str, self.model_fixture) status, result = result["status"], result["data"] self.assertTrue(status == "OK") self.assertTrue(isinstance(result, pd.DataFrame)) @@ -69,7 +73,7 @@ def test_spare_test_SVM(self): ) self.assertTrue(result == ["ROI1"]) - def test_spare_train_MLP(self): + def test_spare_train_MLP(self) -> None: self.df_fixture = load_df("../fixtures/sample_data.csv") self.model_fixture = load_model("../fixtures/sample_model.pkl.gz") # Test case 1: Testing spare_train with MLP model @@ -105,7 +109,7 @@ def test_spare_train_MLP(self): self.df_fixture, "ROI1", model_type="MLP", - data_vars = [ + data_vars=[ "ROI2", "ROI3", "ROI4", @@ -114,8 +118,8 @@ def test_spare_train_MLP(self): "ROI7", "ROI8", "ROI9", - "ROI10" - ] + "ROI10", + ], ) status, result_data = result["status"], result["data"] metadata = result_data[1] @@ -124,7 +128,7 @@ def test_spare_train_MLP(self): self.assertTrue(metadata["kernel"] == "linear") # self.assertTrue(metadata["to_predict"] == "to_predict") - def test_spare_train_MLPTorch(self): + def test_spare_train_MLPTorch(self) -> None: self.df_fixture = load_df("../fixtures/sample_data.csv") self.model_fixture = load_model("../fixtures/sample_model.pkl.gz") # Test case 1: testing training an MLPTorch model @@ -162,7 +166,7 @@ def test_spare_train_MLPTorch(self): self.df_fixture, "ROI1", model_type="MLPTorch", - data_vars = [ + data_vars=[ "ROI2", "ROI3", "ROI4", @@ -172,7 +176,7 @@ def test_spare_train_MLPTorch(self): "ROI8", "ROI9", "ROI10", - ] + ], ) status, result_data = result["status"], result["data"] metadata = result_data[1] @@ -181,7 +185,7 @@ def test_spare_train_MLPTorch(self): self.assertTrue(metadata["kernel"] == "linear") # self.assertTrue(metadata["to_predict"] == "to_predict") - def test_spare_train_SVM(self): + def test_spare_train_SVM(self) -> None: self.df_fixture = load_df("../fixtures/sample_data.csv") self.model_fixture = load_model("../fixtures/sample_model.pkl.gz") @@ -222,7 +226,7 @@ def test_spare_train_SVM(self): result = spare_train( self.df_fixture, "ROI1", - data_vars = [ + data_vars=[ "ROI2", "ROI3", "ROI4", @@ -231,8 +235,8 @@ def test_spare_train_SVM(self): "ROI7", "ROI8", "ROI9", - "ROI10" - ] + "ROI10", + ], ) status, result_data = result["status"], result["data"] metadata = result_data[1] @@ -241,24 +245,16 @@ def test_spare_train_SVM(self): self.assertTrue(metadata["kernel"] == "linear") # self.assertTrue(metadata["to_predict"] == "to_predict") - def test_spare_train_SVM_None(self): + def test_spare_train_SVM_None(self) -> None: self.df_fixture = load_df("../fixtures/sample_data.csv") # Test case 1: Training with no data vars - result = spare_train( - self.df_fixture, - "Age" - ) + result = spare_train(self.df_fixture, "Age") self.assertTrue(result is not None) - - def test_spare_train_SVM2(self): + def test_spare_train_SVM2(self) -> None: self.df_fixture = load_df("../fixtures/sample_data.csv") # Test case 1: Test overwrites - result = spare_train( - self.df_fixture, - "Age", - output="test_util.py" - ) + result = spare_train(self.df_fixture, "Age", output="test_util.py") self.assertTrue(result["status_code"] == 2) # Test case 2: Train with non existing output file @@ -277,12 +273,12 @@ def test_spare_train_SVM2(self): "ROI9", "ROI10", ], - output="results" + output="results", ) - self.assertTrue(os.path.isfile("results.pkl.gz") == True) + self.assertTrue(os.path.isfile("results.pkl.gz") is True) os.remove("results.pkl.gz") - def test_spare_train_non_existing_model(self): + def test_spare_train_non_existing_model(self) -> None: self.df_fixture = load_df("../fixtures/sample_data.csv") # Test case 1: training with non existing model type result = spare_train( @@ -304,13 +300,13 @@ def test_spare_train_non_existing_model(self): ) self.assertTrue(result["status_code"] == 2) - def test_spare_test_exceptions(self): + def test_spare_test_exceptions(self) -> None: self.df_fixture = load_df("../fixtures/sample_data.csv") self.model_fixture = load_model("../fixtures/sample_model.pkl.gz") # Test case 1: Test with existing output path - if(not os.path.isfile("output.csv")): - f = open("output.csv", "x") + if not os.path.isfile("output.csv"): + _ = open("output.csv", "x") result = spare_test(self.df_fixture, self.model_fixture, output="output") self.assertTrue(result["status_code"] == 0) os.remove("output.csv") @@ -319,18 +315,15 @@ def test_spare_test_exceptions(self): data = { "Var1": [x for x in range(100)], "Var2": [x for x in range(100)], - "label": [x**2 for x in range(100)] + "label": [x**2 for x in range(100)], } self.df_fixture = pd.DataFrame(data=data) - meta_data = { - "predictors": "Not_existing" - } + meta_data = {"predictors": "Not_existing"} err, cols_not_found = check_test(self.df_fixture, meta_data) self.assertTrue(len(err) != 0) self.assertTrue(cols_not_found is not None) - - def test_spare_train_regression_error(self): + def test_spare_train_regression_error(self) -> None: self.df_fixture = load_df("../fixtures/sample_data.csv") # Test case 1: testing with non-integer like as predictor result = spare_train( @@ -347,65 +340,50 @@ def test_spare_train_regression_error(self): "ROI8", "ROI9", "ROI10", - ] + ], ) self.assertTrue(result["status_code"] == 2) - self.assertTrue(result["status"] == "Dataset check failed before training was initiated.") + self.assertTrue( + result["status"] == "Dataset check failed before training was initiated." + ) # Test case 2: testing with a too-small dataset data = { - "Var1": [1,2,3,4,5], - "Var2": [2,4,6,8,10], - "label": [1.5,2.4,3.2,4.5,5.5] + "Var1": [1, 2, 3, 4, 5], + "Var2": [2, 4, 6, 8, 10], + "label": [1.5, 2.4, 3.2, 4.5, 5.5], } self.df_fixture = pd.DataFrame(data=data) - result = spare_train( - self.df_fixture, - "label", - data_vars=[ - "Var1", - "Var2" - ] - ) + result = spare_train(self.df_fixture, "label", data_vars=["Var1", "Var2"]) self.assertTrue(result["status_code"] == 2) - self.assertTrue(result["status"] == "Dataset check failed before training was initiated.") + self.assertTrue( + result["status"] == "Dataset check failed before training was initiated." + ) # Test case 3: testing with a label that has to variance data = { - "Var1": [1,2,3,4,5], - "Var2": [2,4,6,8,10], - "label": [1,1,1,1,1] + "Var1": [1, 2, 3, 4, 5], + "Var2": [2, 4, 6, 8, 10], + "label": [1, 1, 1, 1, 1], } self.df_fixture = pd.DataFrame(data=data) - result = spare_train( - self.df_fixture, - "label", - data_vars=[ - "Var1", - "Var2" - ] - ) + result = spare_train(self.df_fixture, "label", data_vars=["Var1", "Var2"]) self.assertTrue(result["status_code"] == 2) - self.assertTrue(result["status"] == "Dataset check failed before training was initiated.") + self.assertTrue( + result["status"] == "Dataset check failed before training was initiated." + ) # Test case 4: testing with a dataset that may be too small data = { "Var1": [x for x in range(80)], "Var2": [x for x in range(80)], "Var3": [x for x in range(80)], - "label": [x*2 for x in range(80)] + "label": [x * 2 for x in range(80)], } self.df_fixture = pd.DataFrame(data=data) - result = spare_train( - self.df_fixture, - "label", - data_vars=[ - "Var1", - "Var2" - ] - ) + result = spare_train(self.df_fixture, "label", data_vars=["Var1", "Var2"]) self.assertTrue(result is not None) diff --git a/tests/unit/test_util.py b/tests/unit/test_util.py index 7512b3d..800298d 100644 --- a/tests/unit/test_util.py +++ b/tests/unit/test_util.py @@ -20,15 +20,15 @@ class CheckSpareScoresUtil(unittest.TestCase): - def test_load_model(self): + def test_load_model(self) -> None: self.model_fixture = load_model("../../tests/fixtures/sample_model.pkl.gz") # Test case 1: Load a model filepath = ( Path(__file__).resolve().parent.parent / "fixtures" / "sample_model.pkl.gz" ) - filepath = str(filepath) - result = load_model(filepath) + str_filepath = str(filepath) + result = load_model(str_filepath) self.assertTrue(result[1]["mdl_type"] == self.model_fixture[1]["mdl_type"]) self.assertTrue(result[1]["kernel"] == self.model_fixture[1]["kernel"]) self.assertTrue(result[1]["predictors"] == self.model_fixture[1]["predictors"]) @@ -38,7 +38,7 @@ def test_load_model(self): == self.model_fixture[1]["categorical_var_map"] ) - def test_expspace(self): + def test_expspace(self) -> None: # Test case 1: span = [0, 2] span = [0, 2] expected_result = np.array([1.0, 2.71828183, 7.3890561]) @@ -56,7 +56,7 @@ def test_expspace(self): expected_result = np.array([0.13533528, 0.36787944, 1.0, 2.71828183]) self.assertTrue(np.allclose(expspace(span), expected_result)) - def test_check_file_exists(self): + def test_check_file_exists(self) -> None: # test case 1: filename=None logger = logging.getLogger(__name__) result = check_file_exists(None, logger) @@ -71,7 +71,7 @@ def test_check_file_exists(self): err_msg = "The output filename test_util.py, corresponds to an existing file, interrupting execution to avoid overwrite." self.assertTrue(result == err_msg) - def test_save_file(self): + def test_save_file(self) -> None: # test case 1: testing training output file that don't exist result = pd.DataFrame( data={ @@ -95,7 +95,7 @@ def test_save_file(self): self.assertTrue(os.path.exists(output + ".csv")) os.remove(output + ".csv") - def test_is_unique_identifier(self): + def test_is_unique_identifier(self) -> None: # test case 1: testing with a unique identifier df = { "ID": [0, 1, 2, 3, 4], @@ -117,7 +117,7 @@ def test_is_unique_identifier(self): self.df_fixture = pd.DataFrame(data=df) self.assertFalse(is_unique_identifier(self.df_fixture, ["Var1", "Var2"])) - def test_load_examples(self): + def test_load_examples(self) -> None: # test case 1: testing loading example csv file_name = "example_data.csv" result = load_examples(file_name) @@ -133,7 +133,7 @@ def test_load_examples(self): result = load_examples(file_name) self.assertTrue(result is None) - def test_convert_to_number_if_possible(self): + def test_convert_to_number_if_possible(self) -> None: # test case 1: valid convertion to integer num = "254" self.assertTrue(convert_to_number_if_possible(num) == 254) @@ -142,14 +142,14 @@ def test_convert_to_number_if_possible(self): num = "CBICA" self.assertTrue(convert_to_number_if_possible(num) == num) - def test_load_df(self): + def test_load_df(self) -> None: # Test case 1: Input is a string (CSV file path) filepath = ( Path(__file__).resolve().parent.parent / "fixtures" / "sample_data.csv" ) - filepath = str(filepath) - expected_df = pd.read_csv(filepath, low_memory=False) - self.assertTrue(load_df(filepath).equals(expected_df)) + new_filepath = str(filepath) + expected_df = pd.read_csv(new_filepath, low_memory=False) + self.assertTrue(load_df(new_filepath).equals(expected_df)) # Test case 2: Input is already a DataFrame input_df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}) @@ -166,7 +166,7 @@ def test_load_df(self): expected_df = input_df.copy() self.assertTrue(load_df(input_df).equals(expected_df)) - def test_add_file_extension(self): + def test_add_file_extension(self) -> None: # Test case 1: File extension already present filename = "myfile.txt" extension = ".txt"