Skip to content

Commit

Permalink
Initial implementation of Vizier search for CatalogData
Browse files Browse the repository at this point in the history
  • Loading branch information
mwcraig committed Nov 26, 2023
1 parent cf5191e commit e08aee7
Show file tree
Hide file tree
Showing 2 changed files with 296 additions and 1 deletion.
215 changes: 215 additions & 0 deletions stellarphot/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from astropy.time import Time
from astropy.units import Quantity

from astroquery.vizier import Vizier

import pandas as pd
from pydantic import BaseModel, root_validator

import numpy as np
Expand Down Expand Up @@ -773,6 +776,218 @@ def catalog_name(self):
def catalog_source(self):
return self.meta['catalog_source']

@staticmethod
def _tidy_vizier_catalog(data, mag_column_regex, color_column_regex):
"""
Transform a Vizier catalog with magnitudes into tidy structure. Or
at least tidier -- this only handles changing magnitude and color
columns into tidy format.
Parameters
----------
data : `astropy.table.Table`
Table of catalog information. There are no restrictions on the columns.
mag_column_regex : str
Regular expression to match magnitude columns.
color_column_regex : str
Regular expression to match color columns.
Returns
-------
`astropy.table.Table`
Table with magnitude and color columns in tidy format. All other
columns are preserved as they were in the input data.
"""

mag_re = re.compile(mag_column_regex)
color_re = re.compile(color_column_regex)

# Find all the magnitude and color columns
mag_match = [mag_re.match(col) for col in data.colnames]
color_match = [color_re.match(col) for col in data.colnames]

# create a single list of all the matches
matches = [m_match if m_match else c_match
for m_match, c_match in zip(mag_match, color_match)]

# The passband should be the first group match.
passbands = [match[1] for match in matches if match]

# The original column names for those that match
orig_cols = [match.string for match in matches if match]

# Each magnitude column should have an error column whose name
# is the magnitude column name with 'e_' prepended. While prepending
# is what pandas will need to transform the data, many non-magnitude
# columns will also start ``e_`` and we don't want to change those,
# so we will rename the error columns too.
mag_err_cols = [f'e_{col}' for col in orig_cols]

# Dictionary to update the magnitude column names. The prepended
# part could be anything, but the choice below is unlikely to be
# used in a column name in a real catalog.
mag_col_prepend = 'magstphot'
mag_col_map = {orig_col: f'{mag_col_prepend}_{passband}' for orig_col, passband
in zip(orig_cols, passbands)}

# Dictionary to update the magnitude error column names. The
# prepended part could be anything, but the choice below is
# unlikely to be used in a column name in a real catalog.
mag_err_col_prepend = 'errorstphot'
mag_err_col_map = {orig_col: f'{mag_err_col_prepend}_{passband}' for orig_col,
passband in zip(mag_err_cols, passbands)}

# All columns except those we have renamed should be preserved, so make
# a list of them for us in wide_to_long.
id_columns = set(data.colnames) - set(orig_cols) - set(mag_err_cols)

# Make the input data into a Pandas DataFrame
df = data.to_pandas()

# Rename the magnitude and magnitude error columns
df.rename(columns=mag_col_map, inplace=True)
df.rename(columns=mag_err_col_map, inplace=True)

# Make the DataFrame tidy
df = pd.wide_to_long(df, stubnames=[mag_col_prepend, mag_err_col_prepend],
i=id_columns, j='passband', sep='_', suffix='.*')

# Make the magnitude and error column names more sensible
df.rename(columns={mag_col_prepend: 'mag',
mag_err_col_prepend: 'mag_error'},
inplace=True)
# Reset the index, which is otherwise a multi-index of the id columns.
df = df.reset_index()

# Convert back to an astropy table
return Table.from_pandas(df)

@classmethod
def from_vizier(cls,
header_or_center,
desired_catalog,
radius=0.5 * u.degree,
clip_by_frame=True,
padding=100,
colname_map=None,
mag_column_regex=r'^([a-zA-Z]+|[a-zA-Z]+-[a-zA-Z]+)_?mag$',
color_column_regex=r'^([a-zA-Z]+-[a-zA-Z]+)$',
prepare_catalog=None):
"""
Return the items from catalog that are within the search radius and
(optionally) within the field of view of a frame.
Parameters
----------
header_or_center : FITS header or `astropy.coordinates.SkyCoord`
Either a FITS header with WCS information or a `SkyCoord` object.
The center of the frame or the input coordinate is the center
of the cone search.
desired_catalog : str
Vizier name of the catalog to be searched.
ra_column : str, optional
Name of the column in the catalog that contains the RA values.
Default is 'RAJ2000'.
dec_column : str, optional
Name of the column in the catalog that contains the Dec values.
Default is 'DEJ2000'.
radius : float, optional
Radius, in degrees, around which to search. Default is 0.5.
clip_by_frame : bool, optional
If ``True``, only return items that are within the field of view
of the frame. Default is ``True``.
padding : int, optional
Coordinates need to be at least this many pixels in from the edge
of the frame to be considered in the field of view. Default value
is 100.
colname_map : dict, optional
Dictionary mapping column names in the catalog to column names in
a `stellarphot.CatalogData` object. Default is ``None``.
mag_column_regex : str, optional
Regular expression to match magnitude columns. See notes below for
more information about the default value.
color_column_regex : str, optional
Regular expression to match color columns. See notes below for
more information about the default value.
prepare_catalog : callable, optional
Function to call on the catalog after it is retrieved from Vizier.
Returns
-------
`astropy.table.Table`
Table of catalog information for stars in the field of view.
"""

if isinstance(header_or_center, SkyCoord):
# Center was passed in, just use it.
center = header_or_center
if clip_by_frame:
raise ValueError('To clip entries by frame you must use '
'a WCS as the first argument.')
else:
# Find the center of the frame
shape = (header_or_center['NAXIS2'], header_or_center['NAXIS1'])
center = header_or_center.wcs.pixel_to_world(shape[1] / 2,
shape[0] / 2)

# Get catalog via cone search
Vizier.ROW_LIMIT = -1 # Set row_limit to have no limit
cat = Vizier.query_region(center, radius=radius, catalog=desired_catalog)

# Vizier always returns list even if there is only one element. Grab that
# element.
cat = cat[0]

if prepare_catalog is not None:
final_cat = prepare_catalog(cat)
else:
final_cat = CatalogData._tidy_vizier_catalog(cat, mag_column_regex,
color_column_regex)

# Since we go through pandas, we lose the units, so we need to add them back
# in.
#
# We need to swap the key/values on the input map to get the old column names
# as values.
invert_map = {v: k for k, v in colname_map.items()}
final_cat[invert_map['ra']].unit = u.deg
final_cat[invert_map['dec']].unit = u.deg

# Make the CatalogData object....
cat = cls(input_data=final_cat,
colname_map=colname_map,
catalog_name=desired_catalog,
catalog_source='Vizier')

# ...and now that the column names are standardized, clip by frame if
# desired.
if clip_by_frame:
cat_coords = SkyCoord(ra=cat['ra'], dec=cat['dec'])
x, y = header_or_center.wcs.all_world2pix(cat_coords.ra, cat_coords.dec, 0)
in_x = (x >= padding) & (x <= header_or_center.wcs.pixel_shape[0] - padding)
in_y = (y >= padding) & (y <= header_or_center.wcs.pixel_shape[1] - padding)
in_fov = in_x & in_y
cat = cat[in_fov]

return cat


class SourceListData(BaseEnhancedTable):
"""
Expand Down
82 changes: 81 additions & 1 deletion stellarphot/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
from astropy.table import Table, Column
from astropy.time import Time
from astropy.io import ascii
from astropy.coordinates import EarthLocation
from astropy.coordinates import EarthLocation, SkyCoord
from astropy.utils.data import get_pkg_data_filename
from pydantic import ValidationError

from stellarphot.core import (Camera, BaseEnhancedTable, PhotometryData,
CatalogData, SourceListData)

Expand Down Expand Up @@ -487,6 +488,85 @@ def test_catalog_recursive():
catalog_source="Vizier", colname_map=vsx_colname_map)


def test_tidy_vizier_catalog():
# Test just the part of the code that converts the table returned by Vizier
# into a table that can be used by CatalogData.
apass_input = Table.read(get_pkg_data_filename('data/test_apass_subset.ecsv'))

result = CatalogData._tidy_vizier_catalog(apass_input,
r'^([a-zA-Z]+|[a-zA-Z]+-[a-zA-Z]+)_?mag$',
r'^([a-zA-Z]+-[a-zA-Z]+)$')
assert len(result) == 6

# Check some column names
assert 'passband' in result.colnames
assert 'mag' in result.colnames
assert 'mag_error' in result.colnames

# Spot check a couple of values
one_star = 16572870
one_Vmag = 13.399
one_Vmag_error = 0.075

just_one = result[(result['recno'] == one_star) & (result['passband'] == 'V')]
assert np.abs(just_one['mag'][0] - one_Vmag) < 1e-6
assert np.abs(just_one['mag_error'][0] - one_Vmag_error) < 1e-6


def test_catalog_from_vizier_search_apass():
# Nothing special about this point...
sc = SkyCoord(ra=0, dec=0, unit='deg')

# Small enough radius to get only one star
radius = 0.03 * u.deg

apass_colnames = {
'recno': 'id', # There is no APASS ID, this is the one generated by Vizier
'RAJ2000': 'ra',
'DEJ2000': 'dec',
}

apass = CatalogData.from_vizier(sc, 'II/336/apass9',
radius=radius,
colname_map=apass_colnames,
clip_by_frame=False)
assert len(apass) == 6

# Got the values below from Vizier on 2023-11-28
assert apass['id'][0] == 17672748

just_V = apass[apass['passband'] == 'V']
assert np.abs(just_V['mag'][0] - 15.559) < 1e-6


def test_catalog_from_vizier_search_vsx():
# Do a cone search with a small enough radius to return exaclty one star,
# DQ Psc, which happens to already be in the test data.
coordinate = SkyCoord(ra=359.94371 * u.deg, dec=-0.2801 * u.deg)
vsx_map = dict(
Name='id',
RAJ2000='ra',
DEJ2000='dec',
)

# This one is easier -- it already has the passband in a column name.
# We'll use the maximum magnitude as the magnitude column.
def prepare_cat(cat):
cat.rename_column('max', 'mag')
cat.rename_column('n_max', 'passband')
return cat
my_cat = CatalogData.from_vizier(coordinate,
'B/vsx/vsx',
radius=0.1*u.arcmin,
clip_by_frame=False,
colname_map=vsx_map,
prepare_catalog=prepare_cat)

assert my_cat['id'][0] == 'DQ Psc'
assert my_cat['passband'][0] == 'Hp'
assert my_cat['Type'][0] == 'SRB'


# Load test apertures
test_sl_data = ascii.read(get_pkg_data_filename('data/test_sourcelist.ecsv'),
format='ecsv',
Expand Down

0 comments on commit e08aee7

Please sign in to comment.