Skip to content

Commit

Permalink
debugging and testing
Browse files Browse the repository at this point in the history
  • Loading branch information
kelle committed Jul 24, 2024
1 parent f9d441a commit c463618
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 149 deletions.
56 changes: 28 additions & 28 deletions simple/utils/spectral_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
"convert_spt_code_to_string",
]

logger = logging.getLogger("SIMPLE")
logger.setLevel(logging.INFO) # set default log level to INFO
logger = logging.getLogger("AstroDB")
logger.setLevel(logging.DEBUG)


def ingest_spectral_type(
Expand Down Expand Up @@ -66,7 +66,6 @@ def ingest_spectral_type(
None
"""

db_name = find_source_in_db(db, source)

if len(db_name) != 1:
Expand All @@ -75,11 +74,10 @@ def ingest_spectral_type(
else:
db_name = db_name[0]

adopted = False
old_adopted = None
source_spt_data = (
db.query(db.SpectralTypes).filter(db.SpectralTypes.c.source == db_name).table()
)
logger.debug(f"Pre-existing Spectral Type data: \n {source_spt_data}")

# Check for duplicates
duplicate_check = (
Expand All @@ -93,47 +91,48 @@ def ingest_spectral_type(
)
.count()
)
logger.debug(f"Duplicate check: {duplicate_check}")
if duplicate_check > 0:
msg = f"Spectral type already in the database: {db_name}, {regime}, {reference}"
if raise_error:
logger.error(msg)
raise AstroDBError(msg)
else:
logger.warning(msg)
else:
logger.debug(f"No duplicates found for : {db_name}, {regime}, {reference}")

# set adopted flag
if source_spt_data is None or len(source_spt_data) == 0:
if len(source_spt_data) == 0:
adopted = True
logger.debug("No Spectral Type data for this source in the database")
old_adopted = None
logger.debug(
"No Spectral Type data for this source in the database, setting adopted flag to True"
)
elif len(source_spt_data) > 0:
# Spectral Type Data already exists
dupe_ind = source_spt_data["reference"] == reference
if sum(dupe_ind):
logger.debug(f"Duplicate measurement\n, {source_spt_data[dupe_ind]}")
else:
logger.debug("Another Spectral Type exists,")
if logger.level == 10:
source_spt_data.pprint_all()

adopted_ind = source_spt_data["adopted"] == 1
if sum(adopted_ind):
old_adopted = source_spt_data[adopted_ind]
print("spt_error:", spectral_type_error)
print("source spt data:", source_spt_data["spectral_type_error"])
logger.debug(f"Old adopted data: {old_adopted}")
if (
spectral_type_error is not None
and source_spt_data["spectral_type_error"] is not None
old_adopted["spectral_type_error"] is not None
and spectral_type_error is not None
):
if spectral_type_error < min(source_spt_data["spectral_type_error"]):
if spectral_type_error < min(old_adopted["spectral_type_error"]):
adopted = True
else:
adopted = False
logger.debug(f"The new spectral type's adopted flag is:, {adopted}")
else:
msg = f"Unexpected state while ingesting {source}"
if raise_error:
logger.error(msg)
raise RuntimeError
else:
adopted = True
logger.debug(
"No spectral type error found, setting adopted flag to True"
)
else:
logger.warning(msg)
adopted = True
old_adopted = None
logger.debug("No adopted data found, setting adopted flag to True")

if spectral_type_code is None:
spectral_type_code = convert_spt_string_to_code(spectral_type_string)
Expand Down Expand Up @@ -170,6 +169,7 @@ def ingest_spectral_type(
.where(
and_(
db.SpectralTypes.c.source == old_adopted["source"][0],
db.SpectralTypes.c.regime == old_adopted["regime"][0],
db.SpectralTypes.c.reference == old_adopted["reference"][0],
)
)
Expand All @@ -182,14 +182,14 @@ def ingest_spectral_type(
.filter(
and_(
db.SpectralTypes.c.source == old_adopted["source"][0],
db.SpectralTypes.c.regime == old_adopted["regime"][0],
db.SpectralTypes.c.reference == old_adopted["reference"][0],
)
)
.table()
)
logger.debug("Old adopted measurement unset")
if logger.level == 10:
old_adopted_data.pprint_all()
logger.debug(f"Old adopted data:\n {old_adopted_data}")
except sqlalchemy.exc.IntegrityError as e:
if (
db.query(db.Publications)
Expand Down
178 changes: 178 additions & 0 deletions tests/test_spectral_type_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import pytest
from astrodb_utils.utils import (
AstroDBError,
)
from simple.utils.spectral_types import (
convert_spt_string_to_code,
convert_spt_code_to_string,
ingest_spectral_type,
)


def test_convert_spt_string_to_code():
# Test conversion of spectral types into numeric values
assert convert_spt_string_to_code("M5.6") == 65.6
assert convert_spt_string_to_code("T0.1") == 80.1
assert convert_spt_string_to_code("Y2pec") == 92


def test_convert_spt_code_to_string():
# Test conversion of spectral types into numeric values
assert convert_spt_code_to_string(65.6) == "M5.6"
assert convert_spt_code_to_string(80.1) == "T0.1"
assert convert_spt_code_to_string(92, decimals=0) == "Y2"


def test_ingest_spectral_type(temp_db):
spt_data1 = {
"source": "Fake 1",
"spectral_type": "M5.6",
"regime": "nir",
"reference": "Ref 1",
}
spt_data2 = {
"source": "Fake 2",
"spectral_type": "T0.1",
"regime": "nir",
"reference": "Ref 1",
}
spt_data3 = {
"source": "Fake 3",
"spectral_type": "Y2pec",
"regime": "nir",
"reference": "Ref 2",
}
for spt_data in [spt_data1, spt_data2, spt_data3]:
ingest_spectral_type(
temp_db,
source=spt_data["source"],
spectral_type_string=spt_data["spectral_type"],
spectral_type_error=1.0,
reference=spt_data["reference"],
regime=spt_data["regime"],
)
results = (
temp_db.query(temp_db.SpectralTypes)
.filter(temp_db.SpectralTypes.c.source == spt_data["source"])
.table()
)
assert len(results) == 1, f"Expecting this data: {spt_data} in \n {results}"
assert results["adopted"][0] == True # noqa: E712

assert (
temp_db.query(temp_db.SpectralTypes)
.filter(temp_db.SpectralTypes.c.reference == "Ref 1")
.count()
== 2
)
results = (
temp_db.query(temp_db.SpectralTypes)
.filter(temp_db.SpectralTypes.c.reference == "Ref 2")
.table()
)
assert len(results) == 1
assert results["source"][0] == "Fake 3"
assert results["spectral_type_string"][0] == "Y2pec"
assert results["spectral_type_code"][0] == [92]


def test_ingest_spectral_type_adopted(temp_db):
spt_data = {
"source": "Fake 1",
"spectral_type": "M5.0",
"spectral_type_error": 0.5,
"regime": "optical",
"reference": "Ref 1",
}
ingest_spectral_type(
temp_db,
source=spt_data["source"],
spectral_type_string=spt_data["spectral_type"],
spectral_type_error=spt_data["spectral_type_error"],
reference=spt_data["reference"],
regime=spt_data["regime"],
)
results = (
temp_db.query(temp_db.SpectralTypes)
.filter(temp_db.SpectralTypes.c.source == spt_data["source"])
.table()
)
assert len(results) == 2
results = (
temp_db.query(temp_db.SpectralTypes)
.filter(
temp_db.SpectralTypes.c.source == spt_data["source"],
temp_db.SpectralTypes.c.spectral_type_error == 0.5,
)
.table()
)
assert results["adopted"][0] == True # noqa: E712
results = (
temp_db.query(temp_db.SpectralTypes)
.filter(
temp_db.SpectralTypes.c.source == spt_data["source"],
temp_db.SpectralTypes.c.spectral_type_error == 1.0,
)
.table()
)
assert results["adopted"][0] == False # noqa: E712


@pytest.mark.filterwarnings(
"ignore", message=".*identifier has incorrect format for catalog*"
)
def test_ingest_spectral_type_errors(temp_db):
# testing for publication error
spt_data4 = {
"source": "Fake 1",
"spectral_type": "M5.6",
"regime": "nir",
"reference": "Ref 1",
}
# spt_data5 = {
# "source": "Fake 2",
# "spectral_type": "T0.1",
# "regime": "nir",
# "reference": "Ref 1",
# }
# spt_data6 = {
# "source": "Fake 3",
# "spectral_type": "Y2pec",
# "regime": "nir",
# "reference": "Ref 4",
# }

with pytest.raises(AstroDBError) as error_message:
ingest_spectral_type(
temp_db,
source="not a source",
spectral_type_string=spt_data4["spectral_type"],
reference=spt_data4["reference"],
regime=spt_data4["regime"],
)
assert "No unique source match" in str(error_message.value)

with pytest.raises(AstroDBError) as error_message:
ingest_spectral_type(
temp_db,
spt_data4["source"],
spectral_type_string=spt_data4["spectral_type"],
reference=spt_data4["reference"],
regime=spt_data4["regime"],
)
assert "Spectral type already in the database" in str(error_message.value)

with pytest.raises(AstroDBError) as error_message:
ingest_spectral_type(
temp_db,
spt_data4["source"],
spectral_type_string="M6",
reference="not a reference",
regime=spt_data4["regime"],
)
assert "The publication does not exist in the database" in str(error_message.value)


def test_ingest_spectral_type_logger(temp_db):
# https://stackoverflow.com/questions/59875983/why-is-caplog-text-empty-even-though-the-function-im-testing-is-logging
pass
Loading

0 comments on commit c463618

Please sign in to comment.