Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ci tests #82

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions wfc3tools/calwf3.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,6 @@ def calwf3(input=None, output=None, printtime=False, save_tmp=False,
print("Unknown return code found!")
ec = return_code
raise RuntimeError("calwf3.e exited with code {}".format(ec))

# return 1 if pipeline runs without error
return 1
118 changes: 118 additions & 0 deletions wfc3tools/tests/test_calwf3.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import pytest
from wfc3tools import calwf3

import glob
import os
import shutil


def test_no_valid_input():
"""Run a very simple aliveness test."""
Expand All @@ -20,3 +24,117 @@ def test_version_print():
def cal():
calwf3(version=True)
cal()

class TestCALWF3:
# Test that the wrapper for calwf3 runs, and that input
# parameters to the function are appropriatley passed
# to calwf3.e

# test data (and reference files) should be on disk
# (figure out best way to do this, for now just have test data in repo)
# tried making dummy files that are generated and written out then deleted,
# but calwf3 crashes. there should be a way to do this but giving up for now.

def __init__(self):
self.data_dir = os.path.join(os.getcwd(), 'wfc3_data')
self.tmp_dir = os.path.join(os.getcwd(), 'tmp_dir')
if not os.path.isdir(self.tmp_dir):
os.mkdir(self.tmp_dir)

# cleanup just to be safe before running any tests
self.cleanup_dir()

def cleanup_dir(self):
# remove all files in temporary directory

for f in glob.glob(self.tmp_dir + '/*'):
os.remove(f)

def test_calwf3_input_single_raw(self):

# copy over a single raw file to temp directory
shutil.copy(glob.glob(os.path.join(self.data_dir, 'IR/SA/iev519qzq_raw.fits'))[0], self.tmp_dir)

# single filename
input_raw = os.path.join(self.tmp_dir, 'iev519qzq_raw.fits')
ret = calwf3(input_raw)
self.cleanup_dir()
assert ret == 1

def test_calwf3_input_list_raws(self):

# xfail this test! even though the documentation says this input type is
# possible it's not.

# copy over all raw files in asn to temp directory
for f in glob.glob(os.path.join(self.data_dir, 'IR/SA/*raw.fits')):
shutil.copy(f, self.tmp_dir)

input_raws = glob.glob(self.tmp_dir + '/*raw.fits')

ret = calwf3(input_raws)
self.cleanup_dir()

assert ret == 1

def test_calwf3_input_wildcard(self):

# also xfail this for now until its fixed, documentation advertises
# this as an option but it fails in calwf3.e

# copy over all raw files in asn to temp directory
for f in glob.glob(os.path.join(self.data_dir, 'IR/SA/*raw.fits')):
shutil.copy(f, self.tmp_dir)

ret = calwf3(self.tmp_dir + '/*raw.fits')
self.cleanup_dir()

assert ret == 1

def test_calwf3_input_asn(self):

# this test is failing in python, but i can run it on command line.
# is this a character limit issue?

# copy over all raw files in asn to temp directory
for f in glob.glob(os.path.join(self.data_dir, 'UVIS/SA/*raw.fits')):
shutil.copy(f, self.tmp_dir)

# and asn
shutil.copy(glob.glob(os.path.join(self.data_dir, 'UVIS/SA/*asn*'))[0], self.tmp_dir)

input_asn = os.path.join(self.tmp_dir, 'iev510050_asn.fits')
ret = calwf3(input_asn)
#self.cleanup_dir()

assert ret == 1

def test_calwf3_output_filename(self):
# this also needs to be fixed, skip/xfail for now.

# copy over a single raw file to temp directory
shutil.copy(glob.glob(os.path.join(self.data_dir, 'IR/SA/iev519qzq_raw.fits'))[0], self.tmp_dir)

# single filename
input_raw = os.path.join(self.tmp_dir, 'iev519qzq_raw.fits')

calwf3(input_raw, output='test_output')
self.cleanup_dir()

assert os.path.isfile(os.path.join(self.tmp_dir, 'test_output.fits'))


def test_calwf3_save_tmp(self):
# test that all the intermediate files are saved to disk
# when ``save_tmp=True``. for some reason, this is only working
# for UVIS.

# run on single IR raw
shutil.copy(glob.glob(os.path.join(self.data_dir, 'UVIS/FF/id5e01pfq_raw.fits'))[0], self.tmp_dir)
input_raw = os.path.join(self.tmp_dir, 'id5e01pfq_raw.fits')
ret = calwf3(input_raw, save_tmp=True)

# make sure expected temporary outputs were produced
expected_tmp_outputs = ['rac_tmp', 'blc_tmp', 'blv_tmp']
for f in expected_tmp_outputs:
assert(os.path.isfile(os.path.join(self.tmp_dir, f'id5e01pfq_{f}.fits')))
55 changes: 55 additions & 0 deletions wfc3tools/tests/test_pstack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from wfc3tools.pstack import pstack
from astropy.io import fits
import numpy as np


class TestPSTACK:

def __init__(self):
self.data_dir = os.path.join(os.getcwd(), 'wfc3_data')
self.tmp_dir = os.path.join(os.getcwd(), 'tmp_dir')
if not os.path.isdir(self.tmp_dir):
os.mkdir(self.tmp_dir)

# cleanup just to be safe before running any tests
self.cleanup_dir()

# copy test file once, no outputs created, clean up after all tests
shutil.copy(glob.glob(os.path.join(data_dir, 'IR/SA/iev519qyq_ima.fits'))[0], tmp_dir)
self.input_ima = os.path.join(tmp_dir, 'iev519qyq_ima.fits')

def cleanup_dir(self):
# remove all files in temporary directory

for f in glob.glob(self.tmp_dir + '/*'):
os.remove(f)


def test_pstack_basic():
# test basic functionality of pstack

with fits.open(self.input_ima, mode='update') as hdu:
for i in range(1, 17):
hdu["SCI", i].header['SAMPTIME'] = 1.0 # make all extimes 1s
hdu['SCI', i].data[0, 0] = 1. # and data values

xaxis, yaxis = pstack(self.input_ima, column=0, row=0, extname="sci")
assert(np.all(xaxis[:-1]) == 1.)
assert(np.all(yaxis[:-1]) == 1.)


def test_pstack_ext(self):
# test using 'ERR' or 'DQ' instead of 'SCI'

exts = ['ERR', 'DQ']

for ext in exts:
# set slice of image to 1 so stats are all just 1.
with fits.open(self.input_ima, mode='update') as hdu:
for i in range(1, 17):
hdu[ext, i].header['SAMPTIME'] = 1.0 # make all exptimes 1s
hdu[ext, i].data[:, :] = 1. # and all data values

xaxis, yaxis = pstack(self.input_ima, column=0, row=0, extname=ext)
assert(np.all(xaxis[:-1]) == 1.)
assert(np.all(yaxis[:-1]) == 1.)
91 changes: 91 additions & 0 deletions wfc3tools/tests/test_pstat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from wfc3tools.pstat import pstat
from astropy.io import fits
import numpy as np

import glob
import os
import shutil


class TestPSTAT:

def __init__(self):
self.data_dir = os.path.join(os.getcwd(), 'wfc3_data')
self.tmp_dir = os.path.join(os.getcwd(), 'tmp_dir')
if not os.path.isdir(self.tmp_dir):
os.mkdir(self.tmp_dir)

# cleanup just to be safe before running any tests
self.cleanup_dir()

# copy test file once, no outputs created, clean up after all tests
shutil.copy(glob.glob(os.path.join(data_dir, 'IR/SA/iev519qyq_ima.fits'))[0], tmp_dir)
self.input_ima = os.path.join(tmp_dir, 'iev519qyq_ima.fits')

def cleanup_dir(self):
# remove all files in temporary directory

for f in glob.glob(self.tmp_dir + '/*'):
os.remove(f)


def test_pstat_basic(self):
# test basic functionality of pstat

# set slice of image to 1 so stats are all just 1.
with fits.open(self.input_ima, mode='update') as hdu:
for i in range(1, 17):
hdu["SCI", i].header['SAMPTIME'] = 1.0 # make all exptimes 1s
hdu['SCI', i].data[:, :] = 1. # and all data values

xdata, ydata = pstat(self.input_ima)
assert(np.all(xaxis[:-1]) == 1.)
assert(np.all(yaxis[:-1]) == 1.)


def test_pstat_stat_modes(self):

# set slice of image to 1 so stats are all just 1.
with fits.open(self.input_ima, mode='update') as hdu:
for i in range(1, 17):
hdu["SCI", i].header['SAMPTIME'] = 1.0 # make all exptimes 1s
hdu['SCI', i].data[:, :] = 1. # and all data values

# test every stat
stats = ["mean", "midpt", "mode", "stddev", "min", "max"]
for stat in stats:
xaxis, yaxis = pstat(input_ima)
assert(np.all(xaxis[:-1]) == 1.)
assert(np.all(yaxis[:-1]) == 1.)


def test_pstat_slice(self):
# test using slices of data rather than whole array

slicee = (0, 5)

# set slice of image to 1 so stats are all just 1.
with fits.open(self.input_ima, mode='update') as hdu:
for i in range(1, 17):
hdu["SCI", i].header['SAMPTIME'] = 1.0 # make all exptimes 1s
hdu['SCI', i].data[0:5, 0:5] = 99. # and all data values
xaxis, yaxis = pstat(self.input_ima, row_slice=slicee, col_slice=slicee)
assert(np.all(xaxis[:-1]) == 99.)
assert(np.all(yaxis[:-1]) == 99.)


def test_pstat_ext(self):
# test using err/dq instead of sci

exts = ['ERR', 'DQ']

for ext in exts:
# set slice of image to 1 so stats are all just 1.
with fits.open(self.input_ima, mode='update') as hdu:
for i in range(1, 17):
hdu[ext, i].header['SAMPTIME'] = 1.0 # make all exptimes 1s
hdu[ext, i].data[:, :] = 1. # and all data values

xdata, ydata = pstat(self.input_ima, extname=ext)
assert(np.all(xaxis[:-1]) == 1.)
assert(np.all(yaxis[:-1]) == 1.)