Skip to content

Commit

Permalink
Add an option to run tasks so they raise an error during the run (#890)
Browse files Browse the repository at this point in the history
* bugfix: with some new signatures, need to remove duplicates

* add an option to raise within tasks

* typo
  • Loading branch information
oliche authored Dec 2, 2024
1 parent 136d6e0 commit 1382460
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
2 changes: 1 addition & 1 deletion ibllib/oneibl/data_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ def getData(self, one=None):
one = one or self.one
session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True)
dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']]
return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs)
return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates()

def getOutputFiles(self):
"""
Expand Down
8 changes: 6 additions & 2 deletions ibllib/pipes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ class Task(abc.ABC):
force = False # whether to re-download missing input files on local server if not present
job_size = 'small' # either 'small' or 'large', defines whether task should be run as part of the large or small job services
env = None # the environment name within which to run the task (NB: the env is not activated automatically!)
on_error = 'continue' # whether to raise an exception on error ('raise') or report the error and continue ('continue')

def __init__(self, session_path, parents=None, taskid=None, one=None,
machine=None, clobber=True, location='server', scratch_folder=None, **kwargs):
machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs):
"""
Base task class
:param session_path: session path
Expand All @@ -129,6 +130,7 @@ def __init__(self, session_path, parents=None, taskid=None, one=None,
:param scratch_folder: optional: Path where to write intermediate temporary data
:param args: running arguments
"""
self.on_error = on_error
self.taskid = taskid
self.one = one
self.session_path = session_path
Expand Down Expand Up @@ -263,10 +265,12 @@ def run(self, **kwargs):
self.outputs = outputs if not self.outputs else self.outputs # ensure None if no inputs registered
else:
self.outputs.extend(ensure_list(outputs)) # Add output files to list of inputs to register
except Exception:
except Exception as e:
_logger.error(traceback.format_exc())
_logger.info(f'Job {self.__class__} errored')
self.status = -1
if self.on_error == 'raise':
raise e

self.time_elapsed_secs = time.time() - start_time
# log the outputs
Expand Down

0 comments on commit 1382460

Please sign in to comment.