From 1e7fea47e3c545a045918d73d51c20803fc4ffce Mon Sep 17 00:00:00 2001 From: Kelle Cruz Date: Fri, 14 Jun 2024 14:54:34 -0400 Subject: [PATCH] spectra convert utils update (#512) --- simple/utils/spectra_convert.py | 145 +++++++++++++++++--------------- 1 file changed, 77 insertions(+), 68 deletions(-) diff --git a/simple/utils/spectra_convert.py b/simple/utils/spectra_convert.py index 718b8e631..7b38fa5eb 100644 --- a/simple/utils/spectra_convert.py +++ b/simple/utils/spectra_convert.py @@ -10,56 +10,16 @@ logger = logging.getLogger("SIMPLE") -def convert_to_fits(spectrum_info_all, spectrum_table, header): - # TODO: add docstring with expected keywords - # TODO: add error handling for expected keywords +def compile_header(wavelength_data, **original_header_dict): + """Creates a header from a dictionary of values. - object_name = header["OBJECT"] - - header["HISTORY"] = "File made with the SIMPLE convert_to_fits.py function" + wavelength_data: an array of wavelengths - wavelength = spectrum_table["wavelength"] - flux = spectrum_table["flux"] - flux_unc = spectrum_table["flux_uncertainty"] + original_header_dict: a dictionary of values to be included in the header - # header = compile_header(wavelength, **spectrum_info_all) - - spectrum_data_out = Table( - { - "wavelength": wavelength * u.um, - "flux": flux * u.Jy, - "flux_uncertainty": flux_unc * u.Jy, - } - ) + Returns: a fits header dictionary - # Make the HDUs - hdu1 = fits.BinTableHDU(data=spectrum_data_out) - hdu1.header["EXTNAME"] = "SPECTRUM" - hdu1.header.set("OBJECT", object_name, "Object Name") - hdu0 = fits.PrimaryHDU(header=header) - - # Write the MEF with the header and the data - spectrum_mef = fits.HDUList([hdu0, hdu1]) # hdu0 is header and hdu1 is data - - fits_filename = ( - spectrum_info_all["fits_data_dir"] - + object_name - + "_" - + header["DATE-OBS"] - + ".fits" - ) - try: - spectrum_mef.writeto(fits_filename, overwrite=True, output_verify="exception") - # TODO: think about overwrite - logger.info(f"Wrote {fits_filename}") - except: - raise - - return - - -def compile_header(wavelength_data, **spectra_data_info): - """Creates a header from a dictionary of values.""" + """ required_keywords = [ "VOCLASS", @@ -81,7 +41,7 @@ def compile_header(wavelength_data, **spectra_data_info): "observatory", ] - keywords_given = list(spectra_data_info.keys()) + keywords_given = list(original_header_dict.keys()) for key in keywords_given: if key not in required_keywords: @@ -89,7 +49,7 @@ def compile_header(wavelength_data, **spectra_data_info): else: pass - history = spectra_data_info["history"] + history = original_header_dict["history"] comment = "To read this file in with specutils use Spectrum1D.read() with format = 'tabular-fits'" @@ -97,26 +57,26 @@ def compile_header(wavelength_data, **spectra_data_info): header.set("EXTNAME", "PRIMARY", "name of this extension") try: - header.set("VOCLASS", spectra_data_info["voclass"], "VO Data Model") + header.set("VOCLASS", original_header_dict["voclass"], "VO Data Model") except KeyError: logging.warning("No VO Data Model") # Target Data try: header.set( - "OBJECT", spectra_data_info["object_name"], "Name of observed object" + "OBJECT", original_header_dict["object_name"], "Name of observed object" ) except KeyError: logging.warning("No object name") try: - ra = spectra_data_info["RA"] + ra = original_header_dict["RA"] header.set("RA", ra, "[deg] Pointing position") except KeyError: logging.warning("No RA") ra = None try: - dec = spectra_data_info["dec"] + dec = original_header_dict["dec"] header.set("DEC", dec, "[deg] Pointing position") except KeyError: logging.warning("No DEC") @@ -124,8 +84,8 @@ def compile_header(wavelength_data, **spectra_data_info): try: time = ( - Time(to_datetime(spectra_data_info["start_time"])).jd - + Time(to_datetime(spectra_data_info["stop_time"])).jd + Time(to_datetime(original_header_dict["start_time"])).jd + + Time(to_datetime(original_header_dict["stop_time"])).jd ) / 2 header.set("TMID", time, "[d] MJD mid expsoure") except KeyError: @@ -133,40 +93,40 @@ def compile_header(wavelength_data, **spectra_data_info): logging.warning("No time") try: - exposure_time = spectra_data_info["exposure_time"] + exposure_time = original_header_dict["exposure_time"] header.set("TELAPSE", exposure_time, "[s] Total elapsed time") except KeyError: exposure_time = None logging.warning("No exposure time") try: - time_start = Time(to_datetime(spectra_data_info["start_time"])).jd + time_start = Time(to_datetime(original_header_dict["start_time"])).jd header.set("TSTART", time_start, "[d] MJD start time") except KeyError: time_start = None try: - time_stop = Time(to_datetime(spectra_data_info["stop_time"])).jd + time_stop = Time(to_datetime(original_header_dict["stop_time"])).jd header.set("TSTOP", time_stop, "[d] MJD stop time") except KeyError: time_stop = None try: - obs_location = spectra_data_info["observatory"] + obs_location = original_header_dict["observatory"] header.set("OBSERVAT", obs_location, "name of observatory") except KeyError: obs_location = None logging.warning("No observatory") try: - telescope = spectra_data_info["telescope"] + telescope = original_header_dict["telescope"] header.set("TELESCOP", telescope, "name of telescope") except KeyError: telescope = None logging.warning("No telescope") try: - instrument = spectra_data_info["instrument"] + instrument = original_header_dict["instrument"] header.set("INSTRUME", instrument, "name of instrument") except KeyError: instrument = None @@ -180,7 +140,7 @@ def compile_header(wavelength_data, **spectra_data_info): w_mid = ((w_max + w_min) / 2).astype(np.single) try: - bandpass = spectra_data_info["bandpass"] + bandpass = original_header_dict["bandpass"] header.set("SPECBAND", bandpass, "SED.bandpass") except KeyError: bandpass = None @@ -196,14 +156,14 @@ def compile_header(wavelength_data, **spectra_data_info): header.set("TDMAX1", w_max, f"[{w_units}] Ending wavelength") try: - aperture = spectra_data_info["aperture"] + aperture = original_header_dict["aperture"] header.set("APERTURE", aperture, "[arcsec] slit width") except KeyError: aperture = None logging.warning("No aperature") try: - obs_date = to_datetime(spectra_data_info["obs_date"]).strftime("%Y-%m-%d") + obs_date = to_datetime(original_header_dict["obs_date"]).strftime("%Y-%m-%d") header.set("DATE-OBS", obs_date, "date of the observation") except KeyError: print(KeyError) @@ -212,30 +172,30 @@ def compile_header(wavelength_data, **spectra_data_info): # Publication Information try: - title = spectra_data_info["title"] # trim so header wraps nicely + title = original_header_dict["title"] # trim so header wraps nicely header.set("TITLE", title, "Data set title") except KeyError: pass logging.warning("No title") try: - header.set("AUTHOR", spectra_data_info["author"], "Authors of the data") + header.set("AUTHOR", original_header_dict["author"], "Authors of the data") except KeyError: pass logging.warning("No author") try: - header.set("VOREF", spectra_data_info["bibcode"], "Bibcode of dataset") + header.set("VOREF", original_header_dict["bibcode"], "Bibcode of dataset") except KeyError: pass try: - header.set("REFERENC", spectra_data_info["doi"], "DOI of dataset") + header.set("REFERENC", original_header_dict["doi"], "DOI of dataset") except KeyError: pass try: - header.set("VOPUB", spectra_data_info["VOPUB"], "VO Publisher") + header.set("VOPUB", original_header_dict["VOPUB"], "VO Publisher") except KeyError: logging.warning("No VO Publisher") @@ -252,3 +212,52 @@ def compile_header(wavelength_data, **spectra_data_info): header.set("DATE", date.today().strftime("%Y-%m-%d"), "Date of file creation") return header + + +def convert_to_fits( + wavelength=None, + flux=None, + flux_unc=None, + header: dict = None, + out_directory: str = ".", +): + """Converts a spectrum to a fits file. + # TODO: add typehits for astropy quantities + + wavelength: an array of wavelengths with units + flux: an array of fluxes with units + flux_unc: an array of flux uncertainties with units + header: a dictionary of header values made with compile_header function + + """ + + object_name = header["OBJECT"] + + header["HISTORY"] = "File made with the simple.spectra.convert_to_fits.py function" + + spectrum_data_out = Table( + { + "wavelength": wavelength, + "flux": flux, + "flux_uncertainty": flux_unc, + } + ) + + # Make the HDUs + hdu1 = fits.BinTableHDU(data=spectrum_data_out) + hdu1.header["EXTNAME"] = "SPECTRUM" + hdu1.header.set("OBJECT", object_name, "Object Name") + hdu0 = fits.PrimaryHDU(header=header) + + # Write the MEF with the header and the data + spectrum_mef = fits.HDUList([hdu0, hdu1]) # hdu0 is header and hdu1 is data + + fits_filename = out_directory + object_name + "_" + header["DATE-OBS"] + ".fits" + try: + spectrum_mef.writeto(fits_filename, overwrite=True, output_verify="exception") + # TODO: think about overwrite + logger.info(f"Wrote {fits_filename}") + except: + raise + + return