diff --git a/ibllib/oneibl/data_handlers.py b/ibllib/oneibl/data_handlers.py index 2ccc0960d..a0b085150 100644 --- a/ibllib/oneibl/data_handlers.py +++ b/ibllib/oneibl/data_handlers.py @@ -563,7 +563,7 @@ def getData(self, one=None): one = one or self.one session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True) dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']] - return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs) + return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates() def getOutputFiles(self): """ diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index d6f7fe1cd..660c6502d 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -112,9 +112,10 @@ class Task(abc.ABC): force = False # whether to re-download missing input files on local server if not present job_size = 'small' # either 'small' or 'large', defines whether task should be run as part of the large or small job services env = None # the environment name within which to run the task (NB: the env is not activated automatically!) + on_error = 'continue' # whether to raise an exception on error ('raise') or report the error and continue ('continue') def __init__(self, session_path, parents=None, taskid=None, one=None, - machine=None, clobber=True, location='server', scratch_folder=None, **kwargs): + machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs): """ Base task class :param session_path: session path @@ -129,6 +130,7 @@ def __init__(self, session_path, parents=None, taskid=None, one=None, :param scratch_folder: optional: Path where to write intermediate temporary data :param args: running arguments """ + self.on_error = on_error self.taskid = taskid self.one = one self.session_path = session_path @@ -263,10 +265,12 @@ def run(self, **kwargs): self.outputs = outputs if not self.outputs else self.outputs # ensure None if no inputs registered else: self.outputs.extend(ensure_list(outputs)) # Add output files to list of inputs to register - except Exception: + except Exception as e: _logger.error(traceback.format_exc()) _logger.info(f'Job {self.__class__} errored') self.status = -1 + if self.on_error == 'raise': + raise e self.time_elapsed_secs = time.time() - start_time # log the outputs