Skip to content

Commit

Permalink
uupdated ingest_parallax to return flags
Browse files Browse the repository at this point in the history
  • Loading branch information
Exu-112 committed Jul 8, 2024
1 parent e756682 commit 06b6738
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 179 deletions.
208 changes: 31 additions & 177 deletions simple/utils/astrometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,178 +10,13 @@


__all__ = [
"ingest_parallaxes",
"ingest_parallax",
"ingest_proper_motions",
]

logger = logging.getLogger("SIMPLE")


# PARALLAXES
def ingest_parallaxes(db, sources, plxs, plx_errs, plx_refs, comments=None):
"""
Parameters
----------
db: astrodbkit2.astrodb.Database
Database object
sources: str or list[str]
list of source names
plxs: float or list[float]
list of parallaxes corresponding to the sources
plx_errs: float or list[float]
list of parallaxes uncertainties
plx_refs: str or list[str]
list of references for the parallax data
comments: Optional[Union[List[str], str]]
Examples
----------
> ingest_parallaxes(db, my_sources, my_plx, my_plx_unc, my_plx_refs)
"""

if isinstance(sources, str):
n_sources = 1
sources = [sources]
else:
n_sources = len(sources)

# Convert single element input value to list
if isinstance(plx_refs, str):
plx_refs = [plx_refs] * n_sources

if isinstance(comments, str):
comments = [comments] * n_sources
elif comments is None:
comments = [None] * n_sources

input_float_values = [plxs, plx_errs]
for i, input_value in enumerate(input_float_values):
if isinstance(input_value, float):
input_value = [input_value] * n_sources
input_float_values[i] = input_value
plxs, plx_errs = input_float_values

n_added = 0

# loop through sources with parallax data to ingest
for i, source in enumerate(sources):
db_name = find_source_in_db(db, source)

if len(db_name) != 1:
msg = f"No unique source match for {source} in the database"
raise AstroDBError(msg)
else:
db_name = db_name[0]

# Search for existing parallax data and determine if this is the best
# If no previous measurement exists, set the new one to the Adopted measurement
adopted = None
source_plx_data: Table = (
db.query(db.Parallaxes).filter(db.Parallaxes.c.source == db_name).table()
)

if source_plx_data is None or len(source_plx_data) == 0:
# if there's no other measurements in the database,
# set new data Adopted = True
adopted = True
# old_adopted = None # not used
logger.debug("No other measurement")
elif len(source_plx_data) > 0: # Parallax data already exists
# check for duplicate measurement
dupe_ind = source_plx_data["reference"] == plx_refs[i]
if sum(dupe_ind):
logger.debug(f"Duplicate measurement\n, {source_plx_data[dupe_ind]}")
continue
else:
logger.debug("!!! Another parallax measurement exists,")
if logger.level == 10:
source_plx_data.pprint_all()

# check for previous adopted measurement and find new adopted
adopted_ind = source_plx_data["adopted"] == 1
if sum(adopted_ind):
old_adopted = source_plx_data[adopted_ind]
# if errors of new data are less than other measurements,
# set Adopted = True.
if plx_errs[i] < min(source_plx_data["parallax_error"]):
adopted = True

# unset old adopted
if old_adopted:
with db.engine.connect() as conn:
conn.execute(
db.Parallaxes.update()
.where(
and_(
db.Parallaxes.c.source
== old_adopted["source"][0],
db.Parallaxes.c.reference
== old_adopted["reference"][0],
)
)
.values(adopted=False)
)
conn.commit()
# check that adopted flag is successfully changed
old_adopted_data = (
db.query(db.Parallaxes)
.filter(
and_(
db.Parallaxes.c.source == old_adopted["source"][0],
db.Parallaxes.c.reference
== old_adopted["reference"][0],
)
)
.table()
)
logger.debug("Old adopted measurement unset")
if logger.level == 10:
old_adopted_data.pprint_all()
else:
adopted = False
logger.debug(f"The new measurement's adopted flag is:, {adopted}")
else:
msg = "Unexpected state"
logger.error(msg)
raise RuntimeError(msg)

# Construct data to be added
parallax_data = [
{
"source": db_name,
"parallax": plxs[i],
"parallax_error": plx_errs[i],
"reference": plx_refs[i],
"adopted": adopted,
"comments": comments[i],
}
]

logger.debug(f"{parallax_data}")

try:
with db.engine.connect() as conn:
conn.execute(db.Parallaxes.insert().values(parallax_data))
conn.commit()
n_added += 1
logger.info(f"Parallax added to database: \n " f"{parallax_data}")
except sqlalchemy.exc.IntegrityError:
msg = (
"The source may not exist in Sources table.\n"
"The parallax reference may not exist in Publications table. "
"Add it with add_publication function. \n"
"The parallax measurement may be a duplicate."
)
logger.error(msg)
raise AstroDBError(msg)

logger.info(f"Total Parallaxes added to database: {n_added} \n")

return


def ingest_parallax(
db,
source: str = None,
Expand All @@ -199,11 +34,11 @@ def ingest_parallax(
Database object
sources: str
source name
plxs: float
parallax: float
parallax corresponding to the source
plx_errs: float
plx_err: float
parallax uncertainties
plx_refs: str
reference: str
reference for the parallax data
comment: str
comments
Expand All @@ -213,6 +48,7 @@ def ingest_parallax(
"""
# Search for existing parallax data and determine if this is the best
# If no previous measurement exists, set the new one to the Adopted measurement
flags = {"added": False, "content": {}, "message": ""}
adopted = None
source_plx_data: Table = (
db.query(db.Parallaxes).filter(db.Parallaxes.c.source == source).table()
Expand All @@ -229,7 +65,11 @@ def ingest_parallax(
if sum(dupe_ind):
logger.debug(f"Duplicate measurement\n, {source_plx_data[dupe_ind]}")
msg = "Duplicate measurement exists with same reference"
raise AstroDBError(msg)
flags["message"] = msg
if raise_error:
raise AstroDBError(msg)
else:
return flags
else:
logger.debug("!!! Another parallax measurement exists,")
if logger.level == 10:
Expand Down Expand Up @@ -291,24 +131,38 @@ def ingest_parallax(
"adopted": adopted,
"comments": comment,
}
flags["content"] = parallax_data

try:
plx_obj = Parallaxes(**parallax_data)
with db.session as session:
session.add(plx_obj)
session.commit()
logger.info(f" Photometry added to database: {parallax_data}\n")
except sqlalchemy.exc.IntegrityError as e:

msg = (
"The source may not exist in Sources table.\n"
"The parallax reference may not exist in Publications table. "
"Add it with add_publication function. \n"
flags["added"] = True
return flags
except sqlalchemy.exc.IntegrityError as error_msg:
msg = ""
matching_sources = (
db.query(db.Sources).filter(db.Sources.c.source == source).astropy()
)
if len(matching_sources) == 0:
msg += f"Source '{source}' does not exist in Sources table. "
matching_refs = (
db.query(db.Publications)
.filter(db.Publications.c.reference == reference)
.astropy()
)
if len(matching_refs) == 0:
msg += f"Reference '{reference}' does not exist in Publications table. "
if raise_error:
raise AstroDBError(e)
raise AstroDBError(
f"Error during parallax ingest. {msg}Error caught: {error_msg}"
)
else:
logger.warning(msg)
flags["message"] = msg
return flags


# PROPER MOTIONS
Expand Down
4 changes: 2 additions & 2 deletions tests/test_astrometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ def test_ingest_parallaxes(temp_db, t_plx):
def test_parallax_exceptions(temp_db):
with pytest.raises(AstroDBError) as error_message:
ingest_parallax(temp_db, "bad source", 1, 1, "Ref 1")
assert "FOREIGN KEY constraint failed" in str(error_message.value)
assert "does not exist in Sources table" in str(error_message.value)

with pytest.raises(AstroDBError) as error_message:
ingest_parallax(temp_db, "Fake 1", 1, 1, "bad ref")
assert "FOREIGN KEY constraint failed" in str(error_message.value)
assert "does not exist in Publications table" in str(error_message.value)

ingest_parallax(temp_db, "Fake 2", 1, 1, "Ref 2")
with pytest.raises(AstroDBError) as error_message:
Expand Down

0 comments on commit 06b6738

Please sign in to comment.