From c463618820f842a4fb22bcc235197d196b44d4e3 Mon Sep 17 00:00:00 2001 From: kelle Date: Wed, 24 Jul 2024 16:01:39 -0400 Subject: [PATCH] debugging and testing --- simple/utils/spectral_types.py | 56 +++++----- tests/test_spectral_type_utils.py | 178 ++++++++++++++++++++++++++++++ tests/test_utils.py | 121 -------------------- 3 files changed, 206 insertions(+), 149 deletions(-) create mode 100644 tests/test_spectral_type_utils.py diff --git a/simple/utils/spectral_types.py b/simple/utils/spectral_types.py index 33bbe5881..88c40d7af 100644 --- a/simple/utils/spectral_types.py +++ b/simple/utils/spectral_types.py @@ -16,8 +16,8 @@ "convert_spt_code_to_string", ] -logger = logging.getLogger("SIMPLE") -logger.setLevel(logging.INFO) # set default log level to INFO +logger = logging.getLogger("AstroDB") +logger.setLevel(logging.DEBUG) def ingest_spectral_type( @@ -66,7 +66,6 @@ def ingest_spectral_type( None """ - db_name = find_source_in_db(db, source) if len(db_name) != 1: @@ -75,11 +74,10 @@ def ingest_spectral_type( else: db_name = db_name[0] - adopted = False - old_adopted = None source_spt_data = ( db.query(db.SpectralTypes).filter(db.SpectralTypes.c.source == db_name).table() ) + logger.debug(f"Pre-existing Spectral Type data: \n {source_spt_data}") # Check for duplicates duplicate_check = ( @@ -93,6 +91,7 @@ def ingest_spectral_type( ) .count() ) + logger.debug(f"Duplicate check: {duplicate_check}") if duplicate_check > 0: msg = f"Spectral type already in the database: {db_name}, {regime}, {reference}" if raise_error: @@ -100,40 +99,40 @@ def ingest_spectral_type( raise AstroDBError(msg) else: logger.warning(msg) + else: + logger.debug(f"No duplicates found for : {db_name}, {regime}, {reference}") # set adopted flag - if source_spt_data is None or len(source_spt_data) == 0: + if len(source_spt_data) == 0: adopted = True - logger.debug("No Spectral Type data for this source in the database") + old_adopted = None + logger.debug( + "No Spectral Type data for this source in the database, setting adopted flag to True" + ) elif len(source_spt_data) > 0: # Spectral Type Data already exists - dupe_ind = source_spt_data["reference"] == reference - if sum(dupe_ind): - logger.debug(f"Duplicate measurement\n, {source_spt_data[dupe_ind]}") - else: - logger.debug("Another Spectral Type exists,") - if logger.level == 10: - source_spt_data.pprint_all() - adopted_ind = source_spt_data["adopted"] == 1 if sum(adopted_ind): old_adopted = source_spt_data[adopted_ind] - print("spt_error:", spectral_type_error) - print("source spt data:", source_spt_data["spectral_type_error"]) + logger.debug(f"Old adopted data: {old_adopted}") if ( - spectral_type_error is not None - and source_spt_data["spectral_type_error"] is not None + old_adopted["spectral_type_error"] is not None + and spectral_type_error is not None ): - if spectral_type_error < min(source_spt_data["spectral_type_error"]): + if spectral_type_error < min(old_adopted["spectral_type_error"]): adopted = True + else: + adopted = False logger.debug(f"The new spectral type's adopted flag is:, {adopted}") - else: - msg = f"Unexpected state while ingesting {source}" - if raise_error: - logger.error(msg) - raise RuntimeError + else: + adopted = True + logger.debug( + "No spectral type error found, setting adopted flag to True" + ) else: - logger.warning(msg) + adopted = True + old_adopted = None + logger.debug("No adopted data found, setting adopted flag to True") if spectral_type_code is None: spectral_type_code = convert_spt_string_to_code(spectral_type_string) @@ -170,6 +169,7 @@ def ingest_spectral_type( .where( and_( db.SpectralTypes.c.source == old_adopted["source"][0], + db.SpectralTypes.c.regime == old_adopted["regime"][0], db.SpectralTypes.c.reference == old_adopted["reference"][0], ) ) @@ -182,14 +182,14 @@ def ingest_spectral_type( .filter( and_( db.SpectralTypes.c.source == old_adopted["source"][0], + db.SpectralTypes.c.regime == old_adopted["regime"][0], db.SpectralTypes.c.reference == old_adopted["reference"][0], ) ) .table() ) logger.debug("Old adopted measurement unset") - if logger.level == 10: - old_adopted_data.pprint_all() + logger.debug(f"Old adopted data:\n {old_adopted_data}") except sqlalchemy.exc.IntegrityError as e: if ( db.query(db.Publications) diff --git a/tests/test_spectral_type_utils.py b/tests/test_spectral_type_utils.py new file mode 100644 index 000000000..8b42882ae --- /dev/null +++ b/tests/test_spectral_type_utils.py @@ -0,0 +1,178 @@ +import pytest +from astrodb_utils.utils import ( + AstroDBError, +) +from simple.utils.spectral_types import ( + convert_spt_string_to_code, + convert_spt_code_to_string, + ingest_spectral_type, +) + + +def test_convert_spt_string_to_code(): + # Test conversion of spectral types into numeric values + assert convert_spt_string_to_code("M5.6") == 65.6 + assert convert_spt_string_to_code("T0.1") == 80.1 + assert convert_spt_string_to_code("Y2pec") == 92 + + +def test_convert_spt_code_to_string(): + # Test conversion of spectral types into numeric values + assert convert_spt_code_to_string(65.6) == "M5.6" + assert convert_spt_code_to_string(80.1) == "T0.1" + assert convert_spt_code_to_string(92, decimals=0) == "Y2" + + +def test_ingest_spectral_type(temp_db): + spt_data1 = { + "source": "Fake 1", + "spectral_type": "M5.6", + "regime": "nir", + "reference": "Ref 1", + } + spt_data2 = { + "source": "Fake 2", + "spectral_type": "T0.1", + "regime": "nir", + "reference": "Ref 1", + } + spt_data3 = { + "source": "Fake 3", + "spectral_type": "Y2pec", + "regime": "nir", + "reference": "Ref 2", + } + for spt_data in [spt_data1, spt_data2, spt_data3]: + ingest_spectral_type( + temp_db, + source=spt_data["source"], + spectral_type_string=spt_data["spectral_type"], + spectral_type_error=1.0, + reference=spt_data["reference"], + regime=spt_data["regime"], + ) + results = ( + temp_db.query(temp_db.SpectralTypes) + .filter(temp_db.SpectralTypes.c.source == spt_data["source"]) + .table() + ) + assert len(results) == 1, f"Expecting this data: {spt_data} in \n {results}" + assert results["adopted"][0] == True # noqa: E712 + + assert ( + temp_db.query(temp_db.SpectralTypes) + .filter(temp_db.SpectralTypes.c.reference == "Ref 1") + .count() + == 2 + ) + results = ( + temp_db.query(temp_db.SpectralTypes) + .filter(temp_db.SpectralTypes.c.reference == "Ref 2") + .table() + ) + assert len(results) == 1 + assert results["source"][0] == "Fake 3" + assert results["spectral_type_string"][0] == "Y2pec" + assert results["spectral_type_code"][0] == [92] + + +def test_ingest_spectral_type_adopted(temp_db): + spt_data = { + "source": "Fake 1", + "spectral_type": "M5.0", + "spectral_type_error": 0.5, + "regime": "optical", + "reference": "Ref 1", + } + ingest_spectral_type( + temp_db, + source=spt_data["source"], + spectral_type_string=spt_data["spectral_type"], + spectral_type_error=spt_data["spectral_type_error"], + reference=spt_data["reference"], + regime=spt_data["regime"], + ) + results = ( + temp_db.query(temp_db.SpectralTypes) + .filter(temp_db.SpectralTypes.c.source == spt_data["source"]) + .table() + ) + assert len(results) == 2 + results = ( + temp_db.query(temp_db.SpectralTypes) + .filter( + temp_db.SpectralTypes.c.source == spt_data["source"], + temp_db.SpectralTypes.c.spectral_type_error == 0.5, + ) + .table() + ) + assert results["adopted"][0] == True # noqa: E712 + results = ( + temp_db.query(temp_db.SpectralTypes) + .filter( + temp_db.SpectralTypes.c.source == spt_data["source"], + temp_db.SpectralTypes.c.spectral_type_error == 1.0, + ) + .table() + ) + assert results["adopted"][0] == False # noqa: E712 + + +@pytest.mark.filterwarnings( + "ignore", message=".*identifier has incorrect format for catalog*" +) +def test_ingest_spectral_type_errors(temp_db): + # testing for publication error + spt_data4 = { + "source": "Fake 1", + "spectral_type": "M5.6", + "regime": "nir", + "reference": "Ref 1", + } + # spt_data5 = { + # "source": "Fake 2", + # "spectral_type": "T0.1", + # "regime": "nir", + # "reference": "Ref 1", + # } + # spt_data6 = { + # "source": "Fake 3", + # "spectral_type": "Y2pec", + # "regime": "nir", + # "reference": "Ref 4", + # } + + with pytest.raises(AstroDBError) as error_message: + ingest_spectral_type( + temp_db, + source="not a source", + spectral_type_string=spt_data4["spectral_type"], + reference=spt_data4["reference"], + regime=spt_data4["regime"], + ) + assert "No unique source match" in str(error_message.value) + + with pytest.raises(AstroDBError) as error_message: + ingest_spectral_type( + temp_db, + spt_data4["source"], + spectral_type_string=spt_data4["spectral_type"], + reference=spt_data4["reference"], + regime=spt_data4["regime"], + ) + assert "Spectral type already in the database" in str(error_message.value) + + with pytest.raises(AstroDBError) as error_message: + ingest_spectral_type( + temp_db, + spt_data4["source"], + spectral_type_string="M6", + reference="not a reference", + regime=spt_data4["regime"], + ) + assert "The publication does not exist in the database" in str(error_message.value) + + +def test_ingest_spectral_type_logger(temp_db): + # https://stackoverflow.com/questions/59875983/why-is-caplog-text-empty-even-though-the-function-im-testing-is-logging + pass diff --git a/tests/test_utils.py b/tests/test_utils.py index e4f9cff1b..9cf4d2642 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,11 +3,6 @@ from astrodb_utils.utils import ( AstroDBError, ) -from simple.utils.spectral_types import ( - convert_spt_string_to_code, - convert_spt_code_to_string, - ingest_spectral_type, -) from simple.utils.companions import ingest_companion_relationships @@ -58,122 +53,6 @@ def t_pm(): return t_pm -def test_convert_spt_string_to_code(): - # Test conversion of spectral types into numeric values - assert convert_spt_string_to_code("M5.6") == 65.6 - assert convert_spt_string_to_code("T0.1") == 80.1 - assert convert_spt_string_to_code("Y2pec") == 92 - - -def test_convert_spt_code_to_string(): - # Test conversion of spectral types into numeric values - assert convert_spt_code_to_string(65.6) == "M5.6" - assert convert_spt_code_to_string(80.1) == "T0.1" - assert convert_spt_code_to_string(92, decimals=0) == "Y2" - - -def test_ingest_spectral_type(temp_db): - spt_data1 = { - "source": "Fake 1", - "spectral_type": "M5.6", - "regime": "nir", - "reference": "Ref 1", - } - spt_data2 = { - "source": "Fake 2", - "spectral_type": "T0.1", - "regime": "nir", - "reference": "Ref 1", - } - spt_data3 = { - "source": "Fake 3", - "spectral_type": "Y2pec", - "regime": "nir", - "reference": "Ref 2", - } - for spt_data in [spt_data1, spt_data2, spt_data3]: - ingest_spectral_type( - temp_db, - source=spt_data["source"], - spectral_type_string=spt_data["spectral_type"], - reference=spt_data["reference"], - regime=spt_data["regime"], - ) - - assert ( - temp_db.query(temp_db.SpectralTypes) - .filter(temp_db.SpectralTypes.c.reference == "Ref 1") - .count() - == 2 - ) - results = ( - temp_db.query(temp_db.SpectralTypes) - .filter(temp_db.SpectralTypes.c.reference == "Ref 2") - .table() - ) - assert len(results) == 1 - assert results["source"][0] == "Fake 3" - assert results["spectral_type_string"][0] == "Y2pec" - assert results["spectral_type_code"][0] == [92] - - -def test_ingest_spectral_type_adopted(temp_db): - # TODO: write test for adopted spectral type - pass - - -def test_ingest_spectral_type_errors(temp_db): - # testing for publication error - spt_data4 = { - "source": "Fake 1", - "spectral_type": "M5.6", - "regime": "nir", - "reference": "Ref 1", - } - # spt_data5 = { - # "source": "Fake 2", - # "spectral_type": "T0.1", - # "regime": "nir", - # "reference": "Ref 1", - # } - # spt_data6 = { - # "source": "Fake 3", - # "spectral_type": "Y2pec", - # "regime": "nir", - # "reference": "Ref 4", - # } - - with pytest.raises(AstroDBError) as error_message: - ingest_spectral_type( - temp_db, - source="not a source", - spectral_type_string=spt_data4["spectral_type"], - reference=spt_data4["reference"], - regime=spt_data4["regime"], - ) - assert "No unique source match" in str(error_message.value) - - with pytest.raises(AstroDBError) as error_message: - ingest_spectral_type( - temp_db, - spt_data4["source"], - spectral_type_string=spt_data4["spectral_type"], - reference=spt_data4["reference"], - regime=spt_data4["regime"], - ) - assert "Spectral type already in the database" in str(error_message.value) - - with pytest.raises(AstroDBError) as error_message: - ingest_spectral_type( - temp_db, - spt_data4["source"], - spectral_type_string="M6", - reference="not a reference", - regime=spt_data4["regime"], - ) - assert "The publication does not exist in the database" in str(error_message.value) - - def test_companion_relationships(temp_db): # testing companion ingest # trying no companion