From f04edcce536586d9fc6acd47d9464d4af8d0c5cd Mon Sep 17 00:00:00 2001 From: RichieHakim Date: Tue, 2 Apr 2024 01:46:57 -0400 Subject: [PATCH] Add timeout functionality and file openability check --- bnpm/misc.py | 19 +++++++++++++ bnpm/path_helpers.py | 64 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/bnpm/misc.py b/bnpm/misc.py index ff2739d..c827280 100644 --- a/bnpm/misc.py +++ b/bnpm/misc.py @@ -493,6 +493,25 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self.stack.__exit__(exc_type, exc_value, traceback) + +class TimeoutException(Exception): + pass +@contextmanager +def time_limit(seconds): + """ + Wrapper to set a time limit for a block of code, after which a + TimeoutException is raised. + """ + import signal + def signal_handler(signum, frame): + raise TimeoutException("Timed out") + signal.signal(signal.SIGALRM, signal_handler) + signal.alarm(seconds) + try: + yield + finally: + signal.alarm(0) + ######################################################### ############ INTRA-MODULE HELPER FUNCTIONS ############## diff --git a/bnpm/path_helpers.py b/bnpm/path_helpers.py index b8923de..b9e26a7 100644 --- a/bnpm/path_helpers.py +++ b/bnpm/path_helpers.py @@ -8,6 +8,8 @@ from pathlib import Path import re +from . import misc + def mkdir(directory, parents=True, exist_ok=True): ''' Create a directory if it doesn't exist. @@ -344,3 +346,65 @@ def _finder(regexs, parts): break return date + + +def check_files_openable(dir_outer, depth=2, time_limit_per_file=1, verbose=False): + """ + Check if files within an outer directory are able to be opened. \n + RH 2024 + + Args: + dir_outer (str): + Path to the outer directory + depth (int): + Maximum depth of subdirectories to search. \n + Depth=0 means only the outer directory. \n + Default is 2. + time_limit_per_file (int): + Time limit in seconds for checking if a file can be opened. + verbose (bool): + Whether to print the files that can't be opened. \n + Default is False. + + Returns: + (dict): + Dictionary with keys as the file paths and values as booleans + indicating whether the file can be opened. + """ + import os + + def check_file_openable(file_path): + """ + Check if a file can be opened. + """ + try: + with misc.time_limit(time_limit_per_file): + with open(file_path, 'rb') as f: + ## Read a maximum of 1024 bytes. If file is smaller, read the whole file. + f.read(1024) + print(f"File {file_path} can be opened.") if verbose > 1 else None + return True + except misc.TimeoutException: + print(f"File {file_path} took too long to open.") if verbose > 1 else None + return False + except Exception as e: + print(f"File {file_path} could not be opened: {e}") if verbose > 0 else None + return False + + def walk_files(dir_outer, depth): + """ + Walk through files in a directory. + """ + files = [] + for root, dirs, filenames in os.walk(dir_outer): + if depth < 0: + break + for filename in filenames: + files.append(os.path.join(root, filename)) + depth -= 1 + return files + + files = walk_files(dir_outer, depth) + file_openable = {file: check_file_openable(file) for file in files} + return file_openable + \ No newline at end of file