From d0e29ba7789d3946009c6352e1d63c30366fd803 Mon Sep 17 00:00:00 2001 From: Exu-112 <93949832+Exu-112@users.noreply.github.com> Date: Mon, 8 Jul 2024 14:35:20 -0400 Subject: [PATCH] Ingest parallax function and ingest data from Ultracool sheet (#528) * updated ingest_parallax to return flags * formatting fixes and made logger stream to stdout * improved tests * refactored keyword arguments --- .../ultracool_sheet/Ingest_Parallax.py | 130 ++++++++ .../Ingest_spitzer_photometry.py | 2 +- scripts/ingests/ultracool_sheet/references.py | 2 + simple/utils/astrometry.py | 286 +++++++++--------- tests/test_astrometry.py | 121 +++++++- 5 files changed, 386 insertions(+), 155 deletions(-) create mode 100644 scripts/ingests/ultracool_sheet/Ingest_Parallax.py diff --git a/scripts/ingests/ultracool_sheet/Ingest_Parallax.py b/scripts/ingests/ultracool_sheet/Ingest_Parallax.py new file mode 100644 index 000000000..a703329ea --- /dev/null +++ b/scripts/ingests/ultracool_sheet/Ingest_Parallax.py @@ -0,0 +1,130 @@ +from astrodb_utils import load_astrodb, find_source_in_db, AstroDBError +import sys + +sys.path.append(".") +import logging +from astropy.io import ascii +from simple.schema import Photometry +from simple.schema import REFERENCE_TABLES +from math import isnan +from simple.utils.astrometry import ingest_parallax +from scripts.ingests.ultracool_sheet.references import uc_ref_to_simple_ref + + +logger = logging.getLogger(__name__) + +# Logger setup +# This will stream all logger messages to the standard output and +# apply formatting for that +logger.propagate = False # prevents duplicated logging messages +LOGFORMAT = logging.Formatter( + "%(asctime)s %(levelname)s: %(message)s", datefmt="%m/%d/%Y %I:%M:%S%p" +) +ch = logging.StreamHandler(stream=sys.stdout) +ch.setFormatter(LOGFORMAT) +# To prevent duplicate handlers, only add if they haven't been set previously +if len(logger.handlers) == 0: + logger.addHandler(ch) +logger.setLevel(logging.INFO) + +DB_SAVE = False +RECREATE_DB = True +db = load_astrodb( + "SIMPLE.sqlite", recreatedb=RECREATE_DB, reference_tables=REFERENCE_TABLES +) + + +# Load Ultracool sheet +doc_id = "1i98ft8g5mzPp2DNno0kcz4B9nzMxdpyz5UquAVhz-U8" +sheet_id = "361525788" +link = ( + f"https://docs.google.com/spreadsheets/d/{doc_id}/export?format=csv&gid={sheet_id}" +) + +# read the csv data into an astropy table +uc_sheet_table = ascii.read( + link, + format="csv", + data_start=1, + header_start=0, + guess=False, + fast_reader=False, + delimiter=",", +) + +no_sources = 0 +multiple_sources = 0 +ingested = 0 +already_exists = 0 +no_data = 0 + +# Ingest loop +for source in uc_sheet_table: + if isnan(source["plx_lit"]): # skip if no data + no_data += 1 + continue + uc_sheet_name = source["name"] + match = find_source_in_db( + db, + uc_sheet_name, + ra=source["ra_j2000_formula"], + dec=source["dec_j2000_formula"], + ) + + if len(match) == 1: + # 1 Match found. INGEST! + simple_source = match[0] + logger.debug(f"Match found for {uc_sheet_name}: {simple_source}") + + try: + references = source["ref_plx_lit"].split(";") + reference = uc_ref_to_simple_ref(db, references[0]) + + comment = None + if len(references) > 1: + comment = f"other references: {uc_ref_to_simple_ref(db, references[1])}" + ingest_parallax( + db, + source=simple_source, + parallax_mas=source["plx_lit"], + parallax_err_mas=source["plxerr_lit"], + reference=reference, + comment=comment, + ) + ingested += 1 + except AstroDBError as e: + msg = "ingest failed with error: " + str(e) + if "Duplicate measurement exists" in str(e): + already_exists += 1 + else: + logger.warning(msg) + raise AstroDBError(msg) from e + + elif len(match) == 0: + no_sources += 1 + elif len(match) > 1: + multiple_sources += 1 + else: + msg = "Unexpected situation occured" + logger.error(msg) + raise AstroDBError(msg) + + +# 1108 data points in UC sheet in total +logger.info(f"ingested:{ingested}") # 1014 ingested +logger.info(f"already exists:{already_exists}") # skipped 6 due to preexisting data +logger.info(f"no sources:{no_sources}") # skipped 86 due to 0 matches +logger.info(f"multiple sources:{multiple_sources}") # skipped 2 due to multiple matches +logger.info(f"no data: {no_data}") # 2782 +logger.info( + f"data points tracked:{ingested+already_exists+no_sources+multiple_sources}" +) # 1108 +total = ingested + already_exists + no_sources + multiple_sources + no_data +logger.info(f"total: {total}") # 3890 + +if total != len(uc_sheet_table): + msg = "data points tracked inconsistent with UC sheet" + logger.error(msg) + raise AstroDBError(msg) +elif DB_SAVE: + db.save_database(directory="data/") diff --git a/scripts/ingests/ultracool_sheet/Ingest_spitzer_photometry.py b/scripts/ingests/ultracool_sheet/Ingest_spitzer_photometry.py index 6b481eca1..e698fed20 100644 --- a/scripts/ingests/ultracool_sheet/Ingest_spitzer_photometry.py +++ b/scripts/ingests/ultracool_sheet/Ingest_spitzer_photometry.py @@ -17,7 +17,7 @@ logger = logging.getLogger("AstroDB") logger.setLevel(logging.INFO) -DB_SAVE = True +DB_SAVE = False RECREATE_DB = True db = load_astrodb( "SIMPLE.sqlite", recreatedb=RECREATE_DB, reference_tables=REFERENCE_TABLES diff --git a/scripts/ingests/ultracool_sheet/references.py b/scripts/ingests/ultracool_sheet/references.py index 2c693aa01..150d2e3e0 100644 --- a/scripts/ingests/ultracool_sheet/references.py +++ b/scripts/ingests/ultracool_sheet/references.py @@ -27,6 +27,8 @@ def uc_ref_to_simple_ref(db, ref): + if ref == "Harr15": # Reference with no ADS. + return ref t = ( db.query(db.Publications) .filter(db.Publications.c.bibcode == uc_ref_to_ADS[ref]) diff --git a/simple/utils/astrometry.py b/simple/utils/astrometry.py index 7f3394a55..8872e8522 100644 --- a/simple/utils/astrometry.py +++ b/simple/utils/astrometry.py @@ -2,6 +2,7 @@ from typing import Optional, Union from sqlalchemy import and_ import sqlalchemy.exc +from simple.schema import Parallaxes from astropy.units import Quantity from astropy.table import Table from astrodbkit2.astrodb import Database @@ -9,176 +10,169 @@ __all__ = [ - "ingest_parallaxes", + "ingest_parallax", "ingest_proper_motions", ] logger = logging.getLogger("SIMPLE") -# PARALLAXES -def ingest_parallaxes(db, sources, plxs, plx_errs, plx_refs, comments=None): +def ingest_parallax( + db, + source: str = None, + parallax_mas: float = None, + parallax_err_mas: float = None, + reference: str = None, + comment: str = None, + raise_error: bool = True, +): """ Parameters ---------- db: astrodbkit2.astrodb.Database Database object - sources: str or list[str] - list of source names - plxs: float or list[float] - list of parallaxes corresponding to the sources - plx_errs: float or list[float] - list of parallaxes uncertainties - plx_refs: str or list[str] - list of references for the parallax data - comments: Optional[Union[List[str], str]] + source: str + source name + parallax: float + parallax of source in milliarcseconds + plx_err: float + parallax uncertainty in milliarcseconds + reference: str + reference for the parallax data + comment: str + comments + raise_error: bool + True: raises error when encountered + False: does not raise error, returns flags dictionary - Examples - ---------- - > ingest_parallaxes(db, my_sources, my_plx, my_plx_unc, my_plx_refs) + Returns + ------- + flags: dict + 'added' : bool - true if properly ingested + 'content' : dict - content attempted to ingest + 'message' : str - error message """ + # Search for existing parallax data and determine if this is the best + # If no previous measurement exists, set the new one to the Adopted measurement + flags = {"added": False, "content": {}, "message": ""} + adopted = False + has_old_adopted = False + source_plx_data: Table = ( + db.query(db.Parallaxes).filter(db.Parallaxes.c.source == source).table() + ) - if isinstance(sources, str): - n_sources = 1 - sources = [sources] - else: - n_sources = len(sources) - - # Convert single element input value to list - if isinstance(plx_refs, str): - plx_refs = [plx_refs] * n_sources - - if isinstance(comments, str): - comments = [comments] * n_sources - elif comments is None: - comments = [None] * n_sources - - input_float_values = [plxs, plx_errs] - for i, input_value in enumerate(input_float_values): - if isinstance(input_value, float): - input_value = [input_value] * n_sources - input_float_values[i] = input_value - plxs, plx_errs = input_float_values - - n_added = 0 - - # loop through sources with parallax data to ingest - for i, source in enumerate(sources): - db_name = find_source_in_db(db, source) - - if len(db_name) != 1: - msg = f"No unique source match for {source} in the database" - raise AstroDBError(msg) - else: - db_name = db_name[0] - - # Search for existing parallax data and determine if this is the best - # If no previous measurement exists, set the new one to the Adopted measurement - adopted = None - source_plx_data: Table = ( - db.query(db.Parallaxes).filter(db.Parallaxes.c.source == db_name).table() - ) - - if source_plx_data is None or len(source_plx_data) == 0: - # if there's no other measurements in the database, - # set new data Adopted = True - adopted = True - # old_adopted = None # not used - logger.debug("No other measurement") - elif len(source_plx_data) > 0: # Parallax data already exists - # check for duplicate measurement - dupe_ind = source_plx_data["reference"] == plx_refs[i] - if sum(dupe_ind): - logger.debug(f"Duplicate measurement\n, {source_plx_data[dupe_ind]}") - continue + if source_plx_data is None or len(source_plx_data) == 0: + # if there's no other measurements in the database, + # set new data Adopted = True + adopted = True + logger.debug("No other measurement") + elif len(source_plx_data) > 0: # Parallax data already exists + # check for duplicate measurement + dupe_ind = source_plx_data["reference"] == reference + if sum(dupe_ind): + logger.debug(f"Duplicate measurement\n, {source_plx_data[dupe_ind]}") + msg = "Duplicate measurement exists with same reference" + flags["message"] = msg + if raise_error: + raise AstroDBError(msg) else: - logger.debug("!!! Another parallax measurement exists,") - if logger.level == 10: - source_plx_data.pprint_all() - - # check for previous adopted measurement and find new adopted - adopted_ind = source_plx_data["adopted"] == 1 - if sum(adopted_ind): - old_adopted = source_plx_data[adopted_ind] - # if errors of new data are less than other measurements, - # set Adopted = True. - if plx_errs[i] < min(source_plx_data["parallax_error"]): - adopted = True - - # unset old adopted - if old_adopted: - with db.engine.connect() as conn: - conn.execute( - db.Parallaxes.update() - .where( - and_( - db.Parallaxes.c.source - == old_adopted["source"][0], - db.Parallaxes.c.reference - == old_adopted["reference"][0], - ) - ) - .values(adopted=False) - ) - conn.commit() - # check that adopted flag is successfully changed - old_adopted_data = ( - db.query(db.Parallaxes) - .filter( - and_( - db.Parallaxes.c.source == old_adopted["source"][0], - db.Parallaxes.c.reference - == old_adopted["reference"][0], - ) - ) - .table() - ) - logger.debug("Old adopted measurement unset") - if logger.level == 10: - old_adopted_data.pprint_all() - else: - adopted = False - logger.debug(f"The new measurement's adopted flag is:, {adopted}") + return flags else: - msg = "Unexpected state" - logger.error(msg) - raise RuntimeError(msg) + logger.debug("!!! Another parallax measurement exists,") + if logger.level == 10: + source_plx_data.pprint_all() + + # check for previous adopted measurement and find new adopted + adopted_ind = source_plx_data["adopted"] == 1 + if sum(adopted_ind): + has_old_adopted = source_plx_data[adopted_ind] + # if errors of new data are less than other measurements, + # set Adopted = True. + if parallax_err_mas < min(source_plx_data["parallax_error"]): + adopted = True + else: + adopted = False + logger.debug(f"The new measurement's adopted flag is:, {adopted}") + else: + msg = "Unexpected state" + logger.error(msg) + raise RuntimeError(msg) - # Construct data to be added - parallax_data = [ - { - "source": db_name, - "parallax": plxs[i], - "parallax_error": plx_errs[i], - "reference": plx_refs[i], - "adopted": adopted, - "comments": comments[i], - } - ] + # Construct data to be added + parallax_data = { + "source": source, + "parallax": parallax_mas, + "parallax_error": parallax_err_mas, + "reference": reference, + "adopted": adopted, + "comments": comment, + } + flags["content"] = parallax_data - logger.debug(f"{parallax_data}") + try: + plx_obj = Parallaxes(**parallax_data) + with db.session as session: + session.add(plx_obj) + session.commit() + logger.info(f" Photometry added to database: {parallax_data}\n") + flags["added"] = True - try: + # unset old adopted only after ingest is successful! + if has_old_adopted: with db.engine.connect() as conn: - conn.execute(db.Parallaxes.insert().values(parallax_data)) + conn.execute( + db.Parallaxes.update() + .where( + and_( + db.Parallaxes.c.source == has_old_adopted["source"][0], + db.Parallaxes.c.reference + == has_old_adopted["reference"][0], + ) + ) + .values(adopted=False) + ) conn.commit() - n_added += 1 - logger.info(f"Parallax added to database: \n " f"{parallax_data}") - except sqlalchemy.exc.IntegrityError: - msg = ( - "The source may not exist in Sources table.\n" - "The parallax reference may not exist in Publications table. " - "Add it with add_publication function. \n" - "The parallax measurement may be a duplicate." + # check that adopted flag is successfully changed + old_adopted_data = ( + db.query(db.Parallaxes) + .filter( + and_( + db.Parallaxes.c.source == has_old_adopted["source"][0], + db.Parallaxes.c.reference == has_old_adopted["reference"][0], + ) + ) + .table() ) - logger.error(msg) - raise AstroDBError(msg) - - logger.info(f"Total Parallaxes added to database: {n_added} \n") - - return + logger.debug("Old adopted measurement unset") + if logger.level == 10: + old_adopted_data.pprint_all() + + return flags + except sqlalchemy.exc.IntegrityError as error_msg: + flags["added"] = False + msg = "" + matching_sources = ( + db.query(db.Sources).filter(db.Sources.c.source == source).astropy() + ) + if len(matching_sources) == 0: + msg += f"Source '{source}' does not exist in Sources table. " + matching_refs = ( + db.query(db.Publications) + .filter(db.Publications.c.reference == reference) + .astropy() + ) + if len(matching_refs) == 0: + msg += f"Reference '{reference}' does not exist in Publications table. " + if raise_error: + raise AstroDBError( + f"Error during parallax ingest. {msg}Error caught: {error_msg}" + ) + else: + logger.warning(msg) + flags["message"] = msg + return flags # PROPER MOTIONS diff --git a/tests/test_astrometry.py b/tests/test_astrometry.py index ee0f15136..f0fced6b7 100644 --- a/tests/test_astrometry.py +++ b/tests/test_astrometry.py @@ -2,7 +2,7 @@ from astropy.table import Table from astrodb_utils import AstroDBError from simple.utils.astrometry import ( - ingest_parallaxes, + ingest_parallax, ingest_proper_motions, ingest_radial_velocity, ) @@ -15,6 +15,7 @@ def t_plx(): [ {"source": "Fake 1", "plx": 113.0, "plx_err": 0.3, "plx_ref": "Ref 1"}, {"source": "Fake 2", "plx": 145.0, "plx_err": 0.5, "plx_ref": "Ref 1"}, + {"source": "Fake 1", "plx": 113.0, "plx_err": 0.2, "plx_ref": "Ref 2"}, {"source": "Fake 3", "plx": 155.0, "plx_err": 0.6, "plx_ref": "Ref 2"}, ] ) @@ -68,9 +69,15 @@ def t_rv(): def test_ingest_parallaxes(temp_db, t_plx): # Test ingest of parallax data - ingest_parallaxes( - temp_db, t_plx["source"], t_plx["plx"], t_plx["plx_err"], t_plx["plx_ref"] - ) + + for row in t_plx: + ingest_parallax( + temp_db, + row["source"], + row["plx"], + row["plx_err"], + row["plx_ref"], + ) results = ( temp_db.query(temp_db.Parallaxes) @@ -78,15 +85,113 @@ def test_ingest_parallaxes(temp_db, t_plx): .table() ) assert len(results) == 2 + assert not results["adopted"][0] # 1st source with ref 1 should not be adopted results = ( temp_db.query(temp_db.Parallaxes) .filter(temp_db.Parallaxes.c.reference == "Ref 2") .table() ) - assert len(results) == 1 - assert results["source"][0] == "Fake 3" - assert results["parallax"][0] == 155 - assert results["parallax_error"][0] == 0.6 + assert len(results) == 2 + assert results["source"][1] == "Fake 3" + assert results["parallax"][1] == 155 + assert results["parallax_error"][1] == 0.6 + assert results["adopted"][0] # 1st source with ref 2 should be adopted + + +def test_parallax_exceptions(temp_db): + with pytest.raises(AstroDBError) as error_message: + ingest_parallax( + temp_db, + source="bad source", + parallax_mas=1, + parallax_err_mas=0, + reference="Ref 1", + ) + assert "does not exist in Sources table" in str(error_message.value) + + flags = ingest_parallax( + temp_db, + source="bad source", + parallax_mas=1, + parallax_err_mas=0, + reference="Ref 1", + comment="comment", + raise_error=False, + ) + assert flags == { + "added": False, + "content": { + "source": "bad source", + "parallax": 1, + "parallax_error": 0, + "reference": "Ref 1", + "adopted": True, + "comments": "comment", + }, + "message": "Source 'bad source' does not exist in Sources table. ", + } + + with pytest.raises(AstroDBError) as error_message: + ingest_parallax( + temp_db, + source="Fake 1", + parallax_mas=1, + parallax_err_mas=0, + reference="bad ref", + ) + assert "does not exist in Publications table" in str(error_message.value) + + flags = ingest_parallax( + temp_db, + source="Fake 1", + parallax_mas=1, + parallax_err_mas=0, + reference="bad ref", + comment="comment", + raise_error=False, + ) + assert flags == { + "added": False, + "content": { + "source": "Fake 1", + "parallax": 1, + "parallax_error": 0, + "reference": "bad ref", + "adopted": True, + "comments": "comment", + }, + "message": "Reference 'bad ref' does not exist in Publications table. ", + } + + ingest_parallax( + temp_db, source="Fake 2", parallax_mas=1, parallax_err_mas=1, reference="Ref 2" + ) + with pytest.raises(AstroDBError) as error_message: + ingest_parallax( + temp_db, + source="Fake 2", + parallax_mas=1, + parallax_err_mas=1, + reference="Ref 2", + ) + assert "Duplicate measurement exists with same reference" in str( + error_message.value + ) + + flags = ingest_parallax( + temp_db, + source="Fake 2", + parallax_mas=1, + parallax_err_mas=1, + reference="Ref 2", + comment="comment", + raise_error=False, + ) + assert flags == { + "added": False, + "content": {}, + "message": "Duplicate measurement exists with same reference", + } def test_ingest_proper_motions(temp_db, t_pm):