Merge pull request #14 from YeChen-IDM/analyzer_pipeline
Add analyzers to snakemake pipeline, download work item and suite features
MAmbrose-IDM authored Jun 1, 2022
2 parents 9b97c3a + ff978ce commit 8efcc90
Showing 11 changed files with 486 additions and 157 deletions.
48 changes: 48 additions & 0 deletions simulations/add_suite.py
@@ -0,0 +1,48 @@
from COMPS.Data import Experiment
from idmtools.core.platform_factory import Platform
from idmtools.entities import Suite
from simulations.load_inputs import load_sites
import simulations.manifest as manifest
from simulations.helpers import get_comps_id_filename


def add_suite(sites: list, suite_name: str = 'Malaria Model Validation Suite') -> object:
"""
Add all experiments to the suite of the first experiment; if the first experiment doesn't belong to any suite, add them
to a new suite.
Args:
sites (): list of site names
suite_name (): str for suite name
Returns:
suite_id
"""
platform = Platform(manifest.platform_name)
first_exp_found = True
for site in sites:
exp_id_file = get_comps_id_filename(site, level=0)
with open(exp_id_file, "r") as f:
exp_id = f.readline().rstrip()
exp = Experiment.get(exp_id)
if first_exp_found:
first_exp_found = False
if exp.suite_id:
suite_id = exp.suite_id
print(f"Found existing suite {suite_id} for the 1st experiment, will save to this suite instead.")
continue
else:
suite = Suite(name=suite_name)
suite.update_tags({'name': suite_name})
platform.create_items([suite])
suite_id = suite.id
print(f"Try to add all experiments to suite.id={suite_id}:")
exp.suite_id = suite_id
exp.save()
with open(manifest.suite_id_file, "w") as file:
file.write(str(suite_id))
return suite_id


if __name__ == '__main__':
all_sites, _, _ = load_sites()
add_suite(all_sites)
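An illustrative usage sketch (not part of the diff): calling add_suite for a couple of hypothetical sites, assuming each one already has an experiment id recorded under ./COMPS_ID/ by run_sims.py.

from simulations.add_suite import add_suite

# Both sites must already have a ./COMPS_ID/<site>_exp_submit file holding their experiment id.
suite_id = add_suite(['example_site_a', 'example_site_b'], suite_name='My Validation Suite')
print(f"Suite id (also written to manifest.suite_id_file): {suite_id}")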
56 changes: 56 additions & 0 deletions simulations/download_wi.py
@@ -0,0 +1,56 @@
import argparse
import simulations.params as params
import simulations.manifest as manifest
from simulations.helpers import get_comps_id_filename, get_suite_id

from idmtools.core.platform_factory import Platform
from idmtools_platform_comps.utils.download.download import DownloadWorkItem
from idmtools.core import ItemType


def download_output(site: str, platform: Platform = None) -> bool:
"""
Download the output csv files from the analyzer work item for the given site into the local output folder.
Args:
site (): site name
platform (): idmtools Platform on which to run the download work item
Returns: status of the download work item
"""
if not platform:
platform = Platform(manifest.platform_name)
analyzer_id_file = get_comps_id_filename(site, level=2)
with open(analyzer_id_file, 'r') as id_file:
wi_id = id_file.readline()
wi_id = platform.get_item(wi_id, item_type=ItemType.WORKFLOW_ITEM)
dl_wi = DownloadWorkItem(
output_path="output",
delete_after_download=False,
extract_after_download=True,
zip_name=f"{site}.zip",
file_patterns=[f"{site}/**"]
)
dl_wi.related_work_items = [wi_id]
suite_id = get_suite_id()
dl_wi.tags['Suite'] = suite_id
dl_wi.run(wait_until_done=True, platform=platform)

# Check result
if not dl_wi.succeeded:
print(f"Download work item {dl_wi.uid} failed.\n")
else:
print(f"Download work item {dl_wi.uid} succeeded.")
download_id_file = get_comps_id_filename(site, level=3)
with open(download_id_file, 'w') as id_file:
id_file.write(dl_wi.uid.hex)

return dl_wi.succeeded


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Process site name')
parser.add_argument('--site', '-s', type=str, help='site name',
default=params.sites[0])  # not sure if we want to make this a required argument
args = parser.parse_args()
download_output(args.site)
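A sketch (not from the diff) of reusing one Platform to download outputs for several sites in sequence, assuming each site already has an analyzer work item id under ./COMPS_ID/; the site names are hypothetical.

from idmtools.core.platform_factory import Platform

import simulations.manifest as manifest
from simulations.download_wi import download_output

platform = Platform(manifest.platform_name)
for site in ['example_site_a', 'example_site_b']:
    ok = download_output(site, platform=platform)  # extracts <site>.zip into the local output folder
    print(f"{site}: {'succeeded' if ok else 'failed'}")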
29 changes: 20 additions & 9 deletions simulations/generate_site_rules.py
@@ -1,42 +1,53 @@
from simulations.load_inputs import load_sites
from simulations.helpers import get_comps_id_filename

sites, nSims, script_names = load_sites()


def generate_rule(site, n, script_name="run_sims.py"):

exp_id_file = get_comps_id_filename(site=site)
analyzer_id_file = get_comps_id_filename(site=site, level=2)
download_id_file = get_comps_id_filename(site=site, level=3)
rule = f"""
rule {site}_run_sim:
input:
output: "COMPS_ID/{site}_COMPS_ID_submit"
output: '{exp_id_file}'
priority: 50
run:
shell(get_command("{site}", script="{script_name}", n={n}))
shell(get_command(script="{script_name}", site="{site}", n={n}))
rule {site}_analyzer:
input: "COMPS_ID/{site}_COMPS_ID_submit"
output: "COMPS_ID/{site}_COMPS_ID_done"
input: '{exp_id_file}'
output: '{analyzer_id_file}'
run:
shell(get_command(script="run_analyzers.py", site="{site}"))
rule {site}_download:
input: '{analyzer_id_file}'
output: '{download_id_file}'
run:
shell(get_command("{site}", script="wait_for_experiment.py"))
shell(get_command(script="download_wi.py", site="{site}"))
"""
return rule


def run(snakefile='snakefile_bak'):
with open(snakefile, 'r') as file:
snakefile_str = file.read()
snakefile_str = delete_old_rules(sites, nSims, script_names, snakefile_str)
write_rules(sites, nSims, script_names, snakefile_str, 'snakefile')
snakefile_str = delete_old_rules(snakefile_str)
write_rules(snakefile_str, 'snakefile')


def delete_old_rules(sites, nSims, script_names, snakefile_str):
def delete_old_rules(snakefile_str):
for site, n, script_name in zip(sites, nSims, script_names):
if f'rule {site}' in snakefile_str:
rule = generate_rule(site, n, script_name)
snakefile_str = snakefile_str.replace(rule, '')
return snakefile_str


def write_rules(sites, nSims, script_names, snakefile_str, snakefile):
def write_rules(snakefile_str, snakefile):
new_rules = []
for site, n, script_name in zip(sites, nSims, script_names):
if f'rule {site}' not in snakefile_str:
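For illustration (not part of the commit), this is roughly the rule text generate_rule would emit for a hypothetical site 'example_site' with n=10 and the default script, given comps_id_folder = './COMPS_ID/' from manifest.py; the indentation shown is conventional Snakemake layout.

rule example_site_run_sim:
    input:
    output: './COMPS_ID/example_site_exp_submit'
    priority: 50
    run:
        shell(get_command(script="run_sims.py", site="example_site", n=10))
rule example_site_analyzer:
    input: './COMPS_ID/example_site_exp_submit'
    output: './COMPS_ID/example_site_analyzers'
    run:
        shell(get_command(script="run_analyzers.py", site="example_site"))
rule example_site_download:
    input: './COMPS_ID/example_site_analyzers'
    output: './COMPS_ID/example_site_download'
    run:
        shell(get_command(script="download_wi.py", site="example_site"))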
32 changes: 30 additions & 2 deletions simulations/helpers.py
@@ -157,8 +157,7 @@ def set_simulation_scenario(simulation, site, csv_path):


set_simulation_scenario_for_matched_site = partial(set_simulation_scenario, csv_path=manifest.simulation_coordinator_path)
# TODO: update csv filename in manifest.py and next line
set_simulation_scenario_for_characteristic_site = partial(set_simulation_scenario, csv_path=manifest.simulation_coordinator_path)
set_simulation_scenario_for_characteristic_site = partial(set_simulation_scenario, csv_path=manifest.sweep_sim_coordinator_path)


def build_standard_campaign_object(manifest):
@@ -335,3 +334,32 @@ def build_demog():
return demog


def get_comps_id_filename(site: str, level: int = 0):
folder_name = manifest.comps_id_folder
if level == 0:
return folder_name + site + '_exp_submit'
elif level == 1:
return folder_name + site + '_exp_done'
elif level == 2:
return folder_name + site + '_analyzers'
else:
return folder_name + site + '_download'


def load_coordinator_df(characteristic=False, set_index=True):
csv_file = manifest.sweep_sim_coordinator_path if characteristic else manifest.simulation_coordinator_path
coord_df = pd.read_csv(csv_file)
if set_index:
coord_df = coord_df.set_index('site')
return coord_df


def get_suite_id():
if os.path.exists(manifest.suite_id_file):
with open(manifest.suite_id_file, 'r') as id_file:
suite_id = id_file.readline()
return suite_id
else:
return 0
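For reference (a sketch, not part of the diff), these are the id-tracking filenames get_comps_id_filename resolves for a hypothetical site, assuming comps_id_folder = './COMPS_ID/' as set in manifest.py:

get_comps_id_filename('example_site', level=0)  # './COMPS_ID/example_site_exp_submit'  - experiment submitted
get_comps_id_filename('example_site', level=1)  # './COMPS_ID/example_site_exp_done'    - experiment finished
get_comps_id_filename('example_site', level=2)  # './COMPS_ID/example_site_analyzers'   - analyzer work item
get_comps_id_filename('example_site', level=3)  # './COMPS_ID/example_site_download'    - download work item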


4 changes: 2 additions & 2 deletions simulations/load_inputs.py
@@ -1,12 +1,12 @@
import pandas as pd
import simulations.manifest as manifest
from simulations.helpers import load_coordinator_df


def load_sites():
skipped_sites = list()

coord_df = pd.read_csv(manifest.simulation_coordinator_path)
coord_df = coord_df.set_index('site')
coord_df = load_coordinator_df(characteristic=False, set_index=True)
unfiltered_sites = coord_df.index.tolist()
for site in unfiltered_sites:
eir_df = pd.read_csv(manifest.input_files_path / coord_df.at[site, 'EIR_filepath'])
7 changes: 4 additions & 3 deletions simulations/manifest.py
@@ -12,13 +12,15 @@
# Create 'Assets' directory or change to a path you prefer. idmtools will upload files found here.
assets_input_dir = CURRENT_DIR / "Assets"
plugins_folder = CURRENT_DIR / "download/reporter_plugins"
analyzed_ouptut_path = PROJECT_DIR / 'EMOD_validation_recalibration/simulation_output'
analyzed_ouptut_path = PROJECT_DIR / "EMOD_validation_recalibration/simulation_output"
comps_id_folder = "./COMPS_ID/"
suite_id_file = comps_id_folder + 'Suite'

# TODO: remove following lines
input_files_path = PROJECT_DIR / "simulation_inputs"
asset_path = input_files_path / "demographics_files/demographics_vital_1000.json"
# todo: add path for the csv file for site charatistic sweeps
simulation_coordinator_path = input_files_path / "simulation_coordinator.csv"
sweep_sim_coordinator_path = input_files_path / "sweep_sim_coordinator.csv"
intervention_visualizer_path = CURRENT_DIR / "download/index.html"

my_ep4_assets = None
@@ -29,4 +31,3 @@
priority = 'Normal'
node_group_private = 'idm_48cores'
node_group = 'idm_abcd'

simulations/run_analyzers.py
@@ -1,6 +1,7 @@
# use the coordinator csv to determine which analyzers should be run for each site
import pandas as pd
import argparse
import simulations.params as params
import simulations.manifest as manifest
from simulations.helpers import get_comps_id_filename, load_coordinator_df, get_suite_id

from idmtools.core.platform_factory import Platform
from idmtools.analysis.platform_anaylsis import PlatformAnalysis
@@ -9,33 +10,30 @@
from simulations.analyzers.InfectiousnessByParDensAgeAnalyzer import InfectiousnessByParDensAgeAnalyzer
from simulations.analyzers.PatientReportAnalyzer import PatientAnalyzer
from simulations.analyzers.MonthlySummaryReportAnalyzer import MonthlySummaryReportAnalyzer
from simulations.wait_for_experiment import check_experiment


# TODO: replace manual specification of exp ids once we are using a single thread to track experiments
exp_name_id = {
# 'validation_navrongo_2000': '71cb33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_sotuba_1999': '0bcc33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_dongubougou_1999': '12cc33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_sugungum_1970': '0ccc33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_ebolakounou_1997': '6ccb33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_dielmo_1990': '6ecb33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_chonyi_1999': '70cb33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_ngerenya_1999': '68cb33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_ndiop_1993': '69cb33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_matsari_1970': '6dcb33c9-e5cc-ec11-a9f8-b88303911bc1',
# 'validation_rafin_marke_1970': '6fcb33c9-e5cc-ec11-a9f8-b88303911bc1',
'validation_laye_2007': 'ee4df6e2-5ed5-ec11-a9f8-b88303911bc1',
'validation_dapelogo_2007': 'f24df6e2-5ed5-ec11-a9f8-b88303911bc1', # 6bcb33c9-e5cc-ec11-a9f8-b88303911bc1
# 'validation_koundou_1997': '67cb33c9-e5cc-ec11-a9f8-b88303911bc1',
}
if __name__ == "__main__":
platform = Platform('CALCULON')
coord_df = pd.read_csv(manifest.simulation_coordinator_path)
coord_df = coord_df.set_index('site')
for expt_name, id in exp_name_id.items():
site = expt_name.replace('validation_', '')


def run_analyzers(site: str, characteristic: bool = False) -> (bool, str):
"""
Wait for the experiment to finish, then run the relevant analyzers for the given site on COMPS with SSMT.
Args:
site ():
characteristic ():
Returns: If the experiment succeeded, returns the analyzer work item status and id;
if not, returns the experiment status and id.
"""
platform = Platform(manifest.platform_name)
comps_id_file = get_comps_id_filename(site=site)
with open(comps_id_file, 'r') as id_file:
exp_id = id_file.readline()
# Wait for experiment to be done
if check_experiment(site, platform):
coord_df = load_coordinator_df(characteristic=characteristic, set_index=True)

# for expt_name, id in exp_name_id.items():
#     site = expt_name.replace('validation_', '')
report_start_day = int(coord_df.at[site, 'report_start_day'])
# determine the analyzers to run for each site
analyzers = []
@@ -45,20 +43,20 @@
analyzers.append(ParDensAgeAnalyzer)
analyzer_args.append({'expt_name': site,
'sweep_variables': ['Run_Number', 'Site'],
'start_year': int(report_start_day/365),
'end_year': int(coord_df.at[site, 'simulation_duration']/365)})
'start_year': int(report_start_day / 365),
'end_year': int(coord_df.at[site, 'simulation_duration'] / 365)})
if coord_df.at[site, 'infectiousness_to_mosquitos']:
analyzers.append(InfectiousnessByParDensAgeAnalyzer)
analyzer_args.append({'expt_name': site,
'sweep_variables': ['Run_Number', 'Site'],
'start_year': int(report_start_day/365),
'end_year': int(coord_df.at[site, 'simulation_duration']/365)})
'start_year': int(report_start_day / 365),
'end_year': int(coord_df.at[site, 'simulation_duration'] / 365)})
if coord_df.at[site, 'age_prevalence']:
analyzers.append(MonthlySummaryReportAnalyzer)
analyzer_args.append({'expt_name': site,
'sweep_variables': ['Run_Number', 'Site'],
'start_year': int(report_start_day / 365),
'end_year': int(coord_df.at[site, 'simulation_duration']/365)})
'end_year': int(coord_df.at[site, 'simulation_duration'] / 365)})
if coord_df.at[site, 'include_AnnualMalariaSummaryReport']:
analyzers.append(AnnualSummaryReportAnalyzer)
analyzer_args.append({'expt_name': site,
@@ -68,11 +66,33 @@
analyzer_args.append({'expt_name': site,
'start_report_day': report_start_day})

analysis = PlatformAnalysis(platform=platform, experiment_ids=[id],
analysis = PlatformAnalysis(platform=platform, experiment_ids=[exp_id],
analyzers=analyzers,
analyzers_args=analyzer_args,
analysis_name=site)

suite_id = get_suite_id()
analysis.tags = {'Suite': suite_id}
analysis.analyze(check_status=True)

wi = analysis.get_work_item()
print(wi)
analyzers_id_file = get_comps_id_filename(site=site, level=2)

if wi.succeeded:
print(f"Analyzer work item {wi.uid} succeeded.\n")
with open(analyzers_id_file, 'w') as id_file:
id_file.write(wi.uid.hex)
else:
print(f"Analyzer work item {wi.uid} failed.")

return wi.succeeded, wi.uid
else:
return False, exp_id


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Process site name')
parser.add_argument('--site', '-s', type=str, help='site name',
default=params.sites[0])  # not sure if we want to make this a required argument
args = parser.parse_args()
run_analyzers(args.site)
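A minimal sketch (not from the diff) of chaining the two new stages for one site outside of Snakemake, assuming the site's experiment id file already exists; the site name is hypothetical.

from simulations.run_analyzers import run_analyzers
from simulations.download_wi import download_output

succeeded, item_id = run_analyzers('example_site', characteristic=False)
if succeeded:
    # on success this writes ./COMPS_ID/example_site_download and extracts the outputs locally
    download_output('example_site')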
5 changes: 3 additions & 2 deletions simulations/run_sims.py
@@ -62,14 +62,15 @@ def general_sim(site=None, nSims=1, characteristic=False, priority=manifest.prio
builder.add_sweep_definition(set_simulation_scenario_for_matched_site, [site])

# create experiment from builder
print( f"Prompting for COMPS creds if necessary..." )
print(f"Prompting for COMPS creds if necessary...")
experiment = Experiment.from_builder(builder, task, name=exp_name)

# The last step is to call run() on the ExperimentManager to run the simulations.
experiment.run(wait_until_done=False, platform=platform)

# Save experiment id to file
with open(f"COMPS_ID/{site}_COMPS_ID_submit", "w") as fd:
comps_id_file = get_comps_id_filename(site=site)
with open(comps_id_file, "w") as fd:
fd.write(experiment.uid.hex)
print()
print(experiment.uid.hex)
