Skip to content

Commit

Permalink
speedup DP3
Browse files Browse the repository at this point in the history
  • Loading branch information
jurjen93 committed Oct 8, 2024
1 parent 6167bd0 commit 78fbc41
Showing 1 changed file with 110 additions and 65 deletions.
175 changes: 110 additions & 65 deletions subtract/subtract_with_wsclean.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ def predict(self, h5parm: str = None, facet_regions: str = None):


def run_DP3(self, phaseshift: str = None, freqavg: str = None,
timeres: str = None, concat: bool = None,
timeres: str = None, concat: bool = None, speedup_facet_subtract: bool = None,
applybeam: bool = None, applycal_h5: str = None, dirname: str = None):
"""
Run DP3 command
Expand Down Expand Up @@ -582,70 +582,114 @@ def run_DP3(self, phaseshift: str = None, freqavg: str = None,
command += ['ps.type=phaseshifter',
'ps.phasecenter=' + phasecenter]

# 2) APPLY BEAM
if applybeam:
steps.append('beam1')
command += ['beam1.type=applybeam',
'beam1.updateweights=True']
if applycal_h5 is not None and phaseshift is not None and dirname is not None:
H = tables.open_file(applycal_h5)
sources = H.root.sol000.source[:]
H.close()
dirs = [make_utf8(dir) for dir in sources['name']]
dir_idx = dirs.index(dirname)
ra, dec = sources['dir'][dir_idx]
dir = str(f"[{round(ra,5)},{round(dec,5)}]")
command += ['beam1.direction='+dir]
else:
command += ['beam1.direction=[]']

# 3) APPLYCAL
if applycal_h5 is not None:
# add fulljones solutions apply
if isfulljones(applycal_h5):
steps.append('ac')
command += ['ac.type=applycal',
'ac.parmdb=' + applycal_h5,
'ac.correction=fulljones',
'ac.soltab=[amplitude000,phase000]']
if phaseshift is not None and dirname is not None:
command += ['ac.direction=' + dirname]
# add non-fulljones solutions apply
else:
ac_count = 0
T = tables.open_file(applycal_h5)
for corr in T.root.sol000._v_groups.keys():
if 'phase' in corr or 'amp' in corr:
command += [f'ac{ac_count}.type=applycal',
f'ac{ac_count}.parmdb={applycal_h5}',
f'ac{ac_count}.correction={corr}']
if phaseshift is not None and dirname is not None:
command += [f'ac{ac_count}.direction=' + dirname]
steps.append(f'ac{ac_count}')
ac_count += 1
T.close()

# 4) APPLY BEAM (OPTIONAL IF APPLY BEAM USED FOR APPLYCAL)
if applybeam and applycal_h5 is not None and phaseshift is not None and dirname is not None:
steps.append('beam2')
command += ['beam2.type=applybeam',
'beam2.updateweights=True',
'beam2.direction=[]']

# 5) AVERAGING
if freqavg is not None or timeres is not None:
steps.append('avg')
command += ['avg.type=averager']
if freqavg is not None:
if str(freqavg).isdigit() or not str(freqavg)[-1].isalpha():
command += [f'avg.freqstep={int(freqavg)}']
if speedup_facet_subtract:
# 2) AVERAGING
if freqavg is not None or timeres is not None:
steps.append('avg')
command += ['avg.type=averager']
if freqavg is not None:
if str(freqavg).isdigit() or not str(freqavg)[-1].isalpha():
command += [f'avg.freqstep={int(freqavg)}']
else:
command += [f'avg.freqresolution={freqavg}']
if timeres is not None:
command += [f'avg.timeresolution={timeres}']

# 3) APPLY BEAM (OPTIONAL IF APPLY BEAM USED FOR APPLYCAL)
if applybeam and applycal_h5 is not None and phaseshift is not None and dirname is not None:
steps.append('beam2')
command += ['beam2.type=applybeam',
'beam2.updateweights=True',
'beam2.direction=[]']

# 4) APPLYCAL
if applycal_h5 is not None:
# add fulljones solutions apply
if isfulljones(applycal_h5):
steps.append('ac')
command += ['ac.type=applycal',
'ac.parmdb=' + applycal_h5,
'ac.correction=fulljones',
'ac.soltab=[amplitude000,phase000]']
if phaseshift is not None and dirname is not None:
command += ['ac.direction=' + dirname]
# add non-fulljones solutions apply
else:
ac_count = 0
T = tables.open_file(applycal_h5)
for corr in T.root.sol000._v_groups.keys():
if 'phase' in corr or 'amp' in corr:
command += [f'ac{ac_count}.type=applycal',
f'ac{ac_count}.parmdb={applycal_h5}',
f'ac{ac_count}.correction={corr}']
if phaseshift is not None and dirname is not None:
command += [f'ac{ac_count}.direction=' + dirname]
steps.append(f'ac{ac_count}')
ac_count += 1
T.close()

else:
# 2) APPLY BEAM
if applybeam:
steps.append('beam1')
command += ['beam1.type=applybeam',
'beam1.updateweights=True']
if applycal_h5 is not None and phaseshift is not None and dirname is not None:
H = tables.open_file(applycal_h5)
sources = H.root.sol000.source[:]
H.close()
dirs = [make_utf8(dir) for dir in sources['name']]
dir_idx = dirs.index(dirname)
ra, dec = sources['dir'][dir_idx]
dir = str(f"[{round(ra,5)},{round(dec,5)}]")
command += ['beam1.direction='+dir]
else:
command += [f'avg.freqresolution={freqavg}']
if timeres is not None:
# if str(timeres).isdigit():
# command += [f'avg.timestep={int(timeres)}']
# else:
command += [f'avg.timeresolution={timeres}']
command += ['beam1.direction=[]']

# 3) APPLYCAL
if applycal_h5 is not None:
# add fulljones solutions apply
if isfulljones(applycal_h5):
steps.append('ac')
command += ['ac.type=applycal',
'ac.parmdb=' + applycal_h5,
'ac.correction=fulljones',
'ac.soltab=[amplitude000,phase000]']
if phaseshift is not None and dirname is not None:
command += ['ac.direction=' + dirname]
# add non-fulljones solutions apply
else:
ac_count = 0
T = tables.open_file(applycal_h5)
for corr in T.root.sol000._v_groups.keys():
if 'phase' in corr or 'amp' in corr:
command += [f'ac{ac_count}.type=applycal',
f'ac{ac_count}.parmdb={applycal_h5}',
f'ac{ac_count}.correction={corr}']
if phaseshift is not None and dirname is not None:
command += [f'ac{ac_count}.direction=' + dirname]
steps.append(f'ac{ac_count}')
ac_count += 1
T.close()

# 4) APPLY BEAM (OPTIONAL IF APPLY BEAM USED FOR APPLYCAL)
if applybeam and applycal_h5 is not None and phaseshift is not None and dirname is not None:
steps.append('beam2')
command += ['beam2.type=applybeam',
'beam2.updateweights=True',
'beam2.direction=[]']

# 5) AVERAGING
if freqavg is not None or timeres is not None:
steps.append('avg')
command += ['avg.type=averager']
if freqavg is not None:
if str(freqavg).isdigit() or not str(freqavg)[-1].isalpha():
command += [f'avg.freqstep={int(freqavg)}']
else:
command += [f'avg.freqresolution={freqavg}']
if timeres is not None:
command += [f'avg.timeresolution={timeres}']

command += ['steps=' + str(steps).replace(" ", "").replace("\'", "")]

Expand Down Expand Up @@ -709,6 +753,7 @@ def parse_args():
parser.add_argument('--even_time_avg', action='store_true', help='(only if --forwidefield) only allow even time averaging (in case of stacking nights with different averaging)')
parser.add_argument('--inverse', action='store_true', help='instead of subtracting, you predict and add model data from a single facet')
parser.add_argument('--scratch_toil', action='store_true', help='Experts only: Run on scratch when using toil')
parser.add_argument('--speedup_facet_subtract', action='store_true', help='DP3 speedup for facet subtraction')

return parser.parse_args()

Expand Down Expand Up @@ -892,7 +937,7 @@ def main():

# run DP3
msout = subpred.run_DP3(phaseshift=phasecenter, freqavg=freqavg, timeres=timeres,
concat=args.concat, applybeam=args.applybeam,
concat=args.concat, applybeam=args.applybeam, speedup_facet_subtract=args.speedup_facet_subtract,
applycal_h5=applycalh5 if not args.scratch_toil else applycalh5.split('/')[-1], dirname=dirname)

if args.scratch_toil:
Expand Down

0 comments on commit 78fbc41

Please sign in to comment.