diff --git a/app.py b/app.py index 7205952..3bec32e 100755 --- a/app.py +++ b/app.py @@ -1,68 +1,88 @@ -"""Define the Flask-based server app basics """ +"""Main module for the CRISPRme web application. + +This module sets up the Flask and Dash applications, configures the server, and initializes the web application. It also defines stylesheets, caching, and various application settings. + +Attributes: + WEBADDRESS (str): The web address for accessing the application. + IPADDRESS (str): The IP address and port for local access to the web application. + URL (str): The server URL. + external_stylesheets (list): List of external stylesheets for the web application. + server (Flask): The Flask server instance. + app (Dash): The Dash application instance. + app_directory (str): The directory of the current application. + current_working_directory (str): The current working directory. + operators (list): List of filtering operators used for querying tables. + ONLINE (bool): Flag indicating if the application is online or offline. + DISPLAY_OFFLINE (str): CSS display property for offline mode. + DISPLAY_ONLINE (str): CSS display property for online mode. + pool_executor (ProcessPoolExecutor): Executor for running multiple jobs + concurrently. + CACHE_CONFIG (dict): Configuration settings for caching. + cache (Cache): Cache instance for the application. +""" from flask_caching import Cache import dash_bootstrap_components as dbc -import concurrent.futures +import concurrent.futures import flask import dash import sys import os - +WEBADDRESS = "http://crisprme.di.univr.it" IPADDRESS = "127.0.0.1:8080" -def __start_message() -> None: - """ (PRIVATE) - Write server start message to stderr. +def _start_message() -> None: + """Prints a startup message to the standard error stream. - ... + This function outputs a message indicating that the server has started and + provides the URL to access the web application. - Parameters - ---------- - None + Args: + None - Returns - ------- - None + Returns: + None """ sys.stderr.write("SERVER STARTED\n") - sys.stderr.write(f"GO TO {IPADDRESS} TO USE THE WEB APP\n\n") + sys.stderr.write(f"GO TO http://{IPADDRESS} TO USE THE WEB APP\n\n") URL = "" # server URL external_stylesheets = [ - "https://codepen.io/chriddyp/pen/bWLwgP.css", dbc.themes.BOOTSTRAP + "https://codepen.io/chriddyp/pen/bWLwgP.css", + dbc.themes.BOOTSTRAP, ] # CSS stylesheet used to style the website server = flask.Flask(__name__) # define flask app.server app = dash.Dash( - __name__, - external_stylesheets=external_stylesheets, - suppress_callback_exceptions=True, - server=server + __name__, + external_stylesheets=external_stylesheets, + suppress_callback_exceptions=True, + server=server, # type: ignore ) # initialize server app app_directory = os.path.dirname(os.path.realpath(__file__)) # current location -__start_message() # print server start message -current_working_directory = os.getcwd() + '/' # This for files TODO: remove -app.title = "CRISPRme" # assign flask app name +_start_message() # print server start message +current_working_directory = os.getcwd() + "/" # This for files TODO: remove +app.title = "CRISPRme" # type: ignore - assign flask app name # necessary if update element in a callback generated in another callback # app.config['suppress_callback_exceptions'] = True app.css.config.serve_locally = True app.scripts.config.serve_locally = True # define filtering operators used when querying tables operators = [ - ["ge ", ">="], - ["le ", "<="], - ["lt ", "<"], - ["gt ", ">"], + ["ge ", ">="], + ["le ", "<="], + ["lt ", "<"], + ["gt ", ">"], ["ne ", "!="], ["eq ", "="], - ["contains "] -] -ONLINE = False # NOTE change to True for online version, False for offline + ["contains "], +] +ONLINE = True # NOTE change to True for online version, False for offline DISPLAY_OFFLINE = "" DISPLAY_ONLINE = "" if ONLINE: @@ -75,8 +95,7 @@ def __start_message() -> None: CACHE_CONFIG = { # try 'filesystem' if you don't want to setup redis "CACHE_TYPE": "filesystem", - "CACHE_DIR": ("Cache") # os.environ.get('REDIS_URL', 'localhost:6379') + "CACHE_DIR": ("Cache"), # os.environ.get('REDIS_URL', 'localhost:6379') } cache = Cache() # initialize cache cache.init_app(app.server, config=CACHE_CONFIG) # start web-app - diff --git a/index.py b/index.py index 94655d0..ed470e7 100755 --- a/index.py +++ b/index.py @@ -1,19 +1,39 @@ #!/usr/bin/env python -"""The script creates the index page, constituting the back bone of CRISPRme web -interface. The web interface can also be created locally, to provide an easy-to-use -GUI to submit not released or private data and perform the off-targets search. +"""Main module for the CRISPRme web application. + +This module initializes the web application, sets up the server, and defines the +layout and callbacks for page navigation. It handles the routing of different pages +based on the URL and manages the server's running mode. + +Attributes: + MODEFILE (str): The filename for storing the running mode. + HOST (str): The host address for the server. + PORTWEB (int): The port number for the website. + PORTLOCAL (int): The port number for the local server. + server (Flask): The Flask server instance. + navbar (html.Div): The navigation bar component of the web application. + app.layout (html.Div): The layout structure of the Dash application. """ + from pages import ( - main_page, - navbar_creation, - results_page, - load_page, - history_page, - help_page, - contacts_page + main_page, + navbar_creation, + results_page, + load_page, + history_page, + help_page, + contacts_page, +) +from app import ( + URL, + IPADDRESS, + WEBADDRESS, + ONLINE, + app, + current_working_directory, + cache, ) -from app import app, URL, current_working_directory, cache from utils import check_directories from dash.dependencies import Input, Output, State @@ -38,9 +58,9 @@ app.layout = html.Div( [ navbar, - dcc.Location(id='url', refresh=False), - html.Div(id='page-content'), - html.P(id='signal', style={'visibility': 'hidden'}) + dcc.Location(id="url", refresh=False), + html.Div(id="page-content"), + html.P(id="signal", style={"visibility": "hidden"}), ] ) @@ -49,27 +69,27 @@ @app.callback( [Output("page-content", "children"), Output("job-link", "children")], [Input("url", "href"), Input("url", "pathname"), Input("url", "search")], - [State("url", "hash")] + [State("url", "hash")], ) def change_page(href: str, path: str, search: str, hash_guide: str) -> Tuple: - """The function switches between the selected pages. - - ... - - Parameters - ---------- - href : str - URL - path : str - Current path - search : str - Current search - hash_guide : str - Guide hash - - Returns - ------- - Tuple + """Handles page changes based on the current URL and its components. + + This callback function updates the content of the web application based on + the provided URL parameters. It determines which page to display and returns + the corresponding content and link based on the path and search parameters. + + Args: + href (str): The full URL of the current page. + path (str): The path component of the URL. + search (str): The query string of the URL. + hash_guide (str): The hash component of the URL. + + Returns: + Tuple: A tuple containing the children for the page content and the job + link. + + Raises: + TypeError: If any of the input parameters are not of type str. """ if not isinstance(href, str): @@ -81,99 +101,86 @@ def change_page(href: str, path: str, search: str, hash_guide: str) -> Tuple: if not isinstance(hash_guide, str): raise TypeError(f"Expected {str.__name__}, got {type(hash_guide).__name__}") if path == "/load": # show loading page - return ( - load_page.load_page(), - os.path.join("".join(href.split("/")[:-1]), "/load", search) - ) + # define url to display on load page to check on job status + # if online show the webaddress, show the ip address otherwise + job_loading_url = WEBADDRESS if ONLINE else IPADDRESS + return (load_page.load_page(), f"{job_loading_url}/load{search}") if path == "/result": # display results page - job_id = search.split("=")[-1] + job_id = search.split("=")[-1] # recover job id from url if not hash_guide or hash_guide is None: return results_page.result_page(job_id), os.path.join(URL, "load", search) - elif "new" in hash_guide: # TODO: change name to guide page + elif "new" in hash_guide: # targets table tab return ( results_page.guidePagev3(job_id, hash_guide.split("#")[1]), - os.path.join(URL, "load", search) + os.path.join(URL, "load", search), ) - elif "-Sample-" in hash_guide: + elif "-Sample-" in hash_guide: # sample tab return ( results_page.sample_page(job_id, hash_guide.split("#")[1]), - os.path.join(URL, "load", search) + os.path.join(URL, "load", search), ) - elif "-Pos-" in hash_guide: + elif "-Pos-" in hash_guide: # genomic region tab return ( results_page.cluster_page(job_id, hash_guide.split("#")[1]), - os.path.join(URL, "load", search) + os.path.join(URL, "load", search), ) return results_page.result_page(job_id), os.path.join(URL, "load", search) - if path == "/user-guide": # display manual page return help_page.helpPage(), os.path.join(URL, "load", search) if path == "/contacts": # display contacts page return contacts_page.contact_page(), os.path.join(URL, "load", search) if path == "/history": # display results history page return history_page.history_page(), os.path.join(URL, "load", search) - if path == "/index": # display main page - return main_page.index_page(), "/index" - return main_page.index_page(), "/index" + return main_page.index_page(), "/index" # display main page def index(): - """The function creates the index page, managing the whole CRISPRme web - interface. The webpage displays four main pages (accessible by the user): - - Home page - - Manual page - - Contacts page - - History page (accessible only locally) + """Starts the CRISPRme web application server. - The webpage can be created locally, using the appropriate command line - arguments. + This function checks the directory structure for consistency, determines the + running mode (local or website), and starts the Dash application server + accordingly. It also handles the creation of a mode file to track the current + running mode and clears the cache before starting the server. - ... + Args: + None - Parameters - ---------- - None + Returns: + None - Returns - ------- - None + Raises: + OSError: If there is an issue writing the mode file. """ # check CRISPRme directory tree consistency check_directories(current_working_directory) - # check if debug mode is active # TODO: replace using argparse in crisprme.py - debug = False - if "--debug" in sys.argv[1:]: - debug = True - # check if local server or website - website = False - if "--website" in sys.argv[1:]: - website = True - # keep track of running mode - try: - handle = open(os.path.join(current_working_directory, MODEFILE), mode="w") - if website: # 'server' mode - handle.write("server") - else: # 'local' mode - handle.write("local") - except OSError: - raise OSError(f"An error occurred while writing {MODEFILE}") - finally: - handle.close() - if website: + debug = "--debug" in sys.argv[1:] # check if debug mode is active + website = "--website" in sys.argv[1:] # check if local server or website + try: # keep track of the running mode (debugging purposes) + modefname = os.path.join(current_working_directory, MODEFILE) + with open(modefname, mode="w") as outfile: + mode = "server" if website else "local" + outfile.write(mode) + except IOError as e: + raise OSError("Cannot write mode file") from e + if website: # online web-interface running app.run_server( - host=HOST, port=PORTWEB, debug=debug, dev_tools_ui=debug, dev_tools_props_check=debug + host=HOST, + port=PORTWEB, # type: ignore + debug=debug, + dev_tools_ui=debug, + dev_tools_props_check=debug, ) - cache.clear() # clear cache once server is closed else: # local web-interface running app.run_server( - host=HOST, port=PORTLOCAL, debug=debug, dev_tools_ui=debug, dev_tools_props_check=debug - ) - app.run_server( - host=HOST, port=PORTWEB, debug=debug, dev_tools_ui=debug, dev_tools_props_check=debug + host=HOST, + port=PORTLOCAL, # type: ignore + debug=debug, + dev_tools_ui=debug, + dev_tools_props_check=debug, ) - cache.clear() # clear cache once server is closed + cache.clear() # clear cache once server is closed if __name__ == "__main__": diff --git a/pages/__init__.py b/pages/__init__.py deleted file mode 100755 index e69de29..0000000 diff --git a/pages/main_page.py b/pages/main_page.py index 886a06e..e10119d 100755 --- a/pages/main_page.py +++ b/pages/main_page.py @@ -2,8 +2,6 @@ The main webpage reads the input data and manages the analysis. """ - -import re from seq_script import extract_seq, convert_pam from .pages_utils import ( ANNOTATIONS_DIR, @@ -94,14 +92,12 @@ def split_filter_part(filter_part: str) -> Tuple: """ if not isinstance(filter_part, str): - raise TypeError( - f"Expected {str.__name__}, got {type(filter_part).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(filter_part).__name__}") for operator_type in operators: for operator in operator_type: if operator in filter_part: name_part, value_part = filter_part.split(operator, 1) - name = name_part[(name_part.find( - "{") + 1): name_part.rfind("}")] + name = name_part[(name_part.find("{") + 1) : name_part.rfind("}")] value_part = value_part.strip() v0 = value_part[0] if v0 == value_part[-1] and v0 in ("'", '"', "`"): @@ -131,7 +127,7 @@ def split_filter_part(filter_part: str) -> Tuple: Output("be-window-start", "value"), Output("be-window-stop", "value"), Output("be-nts", "value"), - Output("radio-base_editor", "value") + Output("radio-base_editor", "value"), ], [Input("load-example-button", "n_clicks")], ) @@ -163,7 +159,7 @@ def load_example_data(load_button_click: int) -> List[str]: "4", # start window in base editor "8", # stop window in base editor "A", # nt to check in base editor - "Y" # base editor radio button to yes + "Y", # base editor radio button to yes ] @@ -296,20 +292,17 @@ def change_url( raise TypeError(f"Expected {str.__name__}, got {type(href).__name__}") if nuclease is not None: if not isinstance(nuclease, str): - raise TypeError( - f"Expected {str.__name__}, got {type(nuclease).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(nuclease).__name__}") if genome_selected is not None: if not isinstance(genome_selected, str): raise TypeError( f"Expected {str.__name__}, got {type(genome_selected).__name__}" ) if not isinstance(ref_var, list): - raise TypeError( - f"Expected {list.__name__}, got {type(ref_var).__name__}") + raise TypeError(f"Expected {list.__name__}, got {type(ref_var).__name__}") if pam is not None: if not isinstance(pam, str): - raise TypeError( - f"Exepcted {str.__name__}, got {type(pam).__name__}") + raise TypeError(f"Exepcted {str.__name__}, got {type(pam).__name__}") if text_guides is not None: if not isinstance(text_guides, str): raise TypeError( @@ -323,16 +316,13 @@ def change_url( # raise TypeError(f"Expected {str.__name__}, got {type(dna).__name__}") if adv_opts is not None: if not isinstance(adv_opts, list): - raise TypeError( - f"Expected {list.__name__}, got {type(adv_opts).__name__}") + raise TypeError(f"Expected {list.__name__}, got {type(adv_opts).__name__}") if dest_email is not None: if not isinstance(dest_email, str): - raise TypeError( - f"Expected {str.__name__}, got {type(dest_email).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(dest_email).__name__}") if job_name is not None: if not isinstance(job_name, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_name).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_name).__name__}") if n is None: raise PreventUpdate # do not update the page # job start @@ -371,8 +361,7 @@ def change_url( d for d in os.listdir(os.path.join(current_working_directory, RESULTS_DIR)) if ( - os.path.isdir(os.path.join( - current_working_directory, RESULTS_DIR, d)) + os.path.isdir(os.path.join(current_working_directory, RESULTS_DIR, d)) and not d.startswith(".") # avoid hidden files/directories ) ] @@ -416,10 +405,8 @@ def change_url( ".bed", ] ) - annotation_dir = os.path.join( - current_working_directory, ANNOTATIONS_DIR) - annotation_tmp = os.path.join( - annotation_dir, f"ann_tmp_{job_id}.bed") + annotation_dir = os.path.join(current_working_directory, ANNOTATIONS_DIR) + annotation_tmp = os.path.join(annotation_dir, f"ann_tmp_{job_id}.bed") cmd = f"cp {os.path.join(annotation_dir, annotation_name)} {annotation_tmp}" code = subprocess.call(cmd, shell=True) if code != 0: @@ -550,8 +537,8 @@ def change_url( for seqname_and_seq in text_guides.split(">"): if not seqname_and_seq: continue - seqname = seqname_and_seq[:seqname_and_seq.find("\n")] - seq = seqname_and_seq[seqname_and_seq.find("\n"):] + seqname = seqname_and_seq[: seqname_and_seq.find("\n")] + seq = seqname_and_seq[seqname_and_seq.find("\n") :] seq = seq.strip() # remove endline if "chr" in seq: for line in seq.split("\n"): @@ -569,8 +556,9 @@ def change_url( ) else: seq_read = "".join(seq.split()).strip() - guides.extend(convert_pam.getGuides(seq_read, pam_char, - guide_seqlen, pam_begin)) + guides.extend( + convert_pam.getGuides(seq_read, pam_char, guide_seqlen, pam_begin) + ) guides = list(set(guides)) # remove potential duplicate guides # create new guides dataset if not guides: @@ -617,8 +605,7 @@ def change_url( ) else: text_guides = ( - text_guides.replace( - "\n", "N" * pam_len + "\n") + "N" * pam_len + text_guides.replace("\n", "N" * pam_len + "\n") + "N" * pam_len ) handle_guides.write(text_guides) except OSError as e: @@ -632,15 +619,15 @@ def change_url( if dna > rna: max_bulges = dna # base editing - if be_start is None or not bool(be_start) or radio_be_value == 'N': + if be_start is None or not bool(be_start) or radio_be_value == "N": be_start = 1 else: be_start = int(be_start) - if be_stop is None or not bool(be_stop) or radio_be_value == 'N': + if be_stop is None or not bool(be_stop) or radio_be_value == "N": be_stop = 0 else: be_stop = int(be_stop) - if be_nt is None or not bool(be_nt) or radio_be_value == 'N': + if be_nt is None or not bool(be_nt) or radio_be_value == "N": be_nt = "none" else: assert be_nt in DNA_ALPHABET @@ -693,16 +680,14 @@ def change_url( d for d in os.listdir(os.path.join(current_working_directory, RESULTS_DIR)) if ( - os.path.isdir(os.path.join( - current_working_directory, RESULTS_DIR, d)) + os.path.isdir(os.path.join(current_working_directory, RESULTS_DIR, d)) and not d.startswith(".") # ignore hidden directories ) ] computed_results_dirs.remove(job_id) # remove current job results for res_dir in computed_results_dirs: if os.path.exists( - os.path.join(current_working_directory, - RESULTS_DIR, res_dir, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, res_dir, PARAMS_FILE) ): if filecmp.cmp( os.path.join( @@ -712,29 +697,35 @@ def change_url( ): try: # old job guides - guides_old = open( - os.path.join( - current_working_directory, - RESULTS_DIR, - res_dir, - GUIDES_FILE, + guides_old = ( + open( + os.path.join( + current_working_directory, + RESULTS_DIR, + res_dir, + GUIDES_FILE, + ) ) - ).read().split("\n") + .read() + .split("\n") + ) # current job guides - guides_current = open( - os.path.join( - current_working_directory, - RESULTS_DIR, - job_id, - GUIDES_FILE, + guides_current = ( + open( + os.path.join( + current_working_directory, + RESULTS_DIR, + job_id, + GUIDES_FILE, + ) ) - ).read().split("\n") + .read() + .split("\n") + ) except OSError as e: raise e - if ( - collections.Counter(guides_old) == collections.Counter( - guides_current - ) + if collections.Counter(guides_old) == collections.Counter( + guides_current ): if os.path.exists( os.path.join( @@ -885,8 +876,7 @@ def change_url( cmd = f"rm -r {current_job_dir}" code = subprocess.call(cmd, shell=True) if code != 0: - raise ValueError( - f"An error occurred while running {cmd}") + raise ValueError(f"An error occurred while running {cmd}") return "/load", f"?job={res_dir}" else: # log file not found @@ -896,7 +886,7 @@ def change_url( current_working_directory, RESULTS_DIR, res_dir, - QUEUE_FILE + QUEUE_FILE, ) ): if send_email: @@ -918,10 +908,8 @@ def change_url( ), mode="a+", ) as handle_email: - handle_email.write( - "--OTHEREMAIL--") - handle_email.write( - f"{dest_email}\n") + handle_email.write("--OTHEREMAIL--") + handle_email.write(f"{dest_email}\n") handle_email.write( f"{''.join(href.split('/')[:-1])}/load?job={job_id}\n" ) @@ -948,8 +936,7 @@ def change_url( ), mode="w+", ) as handle_email: - handle_email.write( - f"{dest_email}\n") + handle_email.write(f"{dest_email}\n") handle_email.write( f"{''.join(href.split('/')[:-1])}/load?job={job_id}\n" ) @@ -983,13 +970,10 @@ def change_url( annotation = os.path.join( current_working_directory, ANNOTATIONS_DIR, annotation_name ) - pam_file = os.path.join( - current_working_directory, PAMS_DIR, f"{pam}.txt" - ) + pam_file = os.path.join(current_working_directory, PAMS_DIR, f"{pam}.txt") samples_ids = os.path.join(result_dir, SAMPLES_FILE_LIST) postprocess = os.path.join(app_directory, POSTPROCESS_DIR) - gencode = os.path.join(current_working_directory, - ANNOTATIONS_DIR, gencode_name) + gencode = os.path.join(current_working_directory, ANNOTATIONS_DIR, gencode_name) log_verbose = os.path.join(result_dir, "log_verbose.txt") log_error = os.path.join(result_dir, "log_error.txt") assert isinstance(dna, int) @@ -1082,8 +1066,7 @@ def check_input( raise TypeError(f"Expected {int.__name__}, got {type(n).__name__}") if is_open is not None: if not isinstance(is_open, bool): - raise TypeError( - f"Expected {bool.__name__}, got {type(is_open).__name__}") + raise TypeError(f"Expected {bool.__name__}, got {type(is_open).__name__}") print("Check input for JOB") if n is None: raise PreventUpdate # do not check data --> no trigger @@ -1139,8 +1122,7 @@ def check_input( elif guide_type != "GS": text_guides = text_guides.strip() if not all( - [len(g) == len(text_guides.split("\n")[0]) - for g in text_guides.split("\n")] + [len(g) == len(text_guides.split("\n")[0]) for g in text_guides.split("\n")] ): text_guides = select_same_len_guides(text_guides) # check PAM @@ -1156,7 +1138,7 @@ def check_input( pam_begin = True else: end_idx = index_pam_value - pam_char = pam_char.split()[0][(end_idx * (-1)):] + pam_char = pam_char.split()[0][(end_idx * (-1)) :] pam_begin = False except OSError as e: raise e @@ -1167,7 +1149,7 @@ def check_input( if not seqname_and_seq: continue seqname = seqname_and_seq[: seqname_and_seq.find("\n")] - seq = seqname_and_seq[seqname_and_seq.find("\n"):] + seq = seqname_and_seq[seqname_and_seq.find("\n") :] seq = seq.strip() if "chr" in seq: for line in seq.split("\n"): @@ -1250,9 +1232,7 @@ def check_input( if len(text_guides.split("\n")) > 1000000000: text_guides = "\n".join(text_guides.split("\n")[:1000000000]).strip() if no_guides: - text_update = { - "width": "300px", "height": "30px", "border": "1px solid red" - } + text_update = {"width": "300px", "height": "30px", "border": "1px solid red"} update_style = True miss_input_list.append( str( @@ -1351,8 +1331,7 @@ def is_email_valid(email: str) -> Dict[str, str]: """ if email is not None: if not isinstance(email, str): - raise TypeError( - f"Expected {str.__name__}, got {type(email).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(email).__name__}") if email is None: raise PreventUpdate # do not do anything if ("@" in email) and (len(email.split("@")) == 2): @@ -1439,8 +1418,7 @@ def change_disabled_vcf_dropdown(checklist_value: List) -> Tuple[bool, str]: @app.callback( - [Output("annotation-dropdown", "disabled"), - Output("annotation-dropdown", "value")], + [Output("annotation-dropdown", "disabled"), Output("annotation-dropdown", "value")], [Input("checklist-annotations", "value")], ) def change_disabled_annotation_dropdown(checklist_value: List) -> Tuple[bool, str]: @@ -1484,8 +1462,7 @@ def select_cas_pam_dropdown(casprot: str) -> List: """ if not isinstance(casprot, str): - raise TypeError( - f"Expected {str.__name__}, got {type(casprot).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(casprot).__name__}") available_pams = get_available_PAM() options = [ {"label": pam["label"], "value": pam["value"]} @@ -1513,12 +1490,10 @@ def change_placeholder_guide_textbox(guide_type: str) -> List: """ if not isinstance(guide_type, str): - raise TypeError( - f"Expected {str.__name__}, got {type(guide_type).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(guide_type).__name__}") place_holder_text = "" if guide_type == "IP": # individual spacers - place_holder_text = str( - "GAGTCCGAGCAGAAGAAGAA\n" "CCATCGGTGGCCGTTTGCCC") + place_holder_text = str("GAGTCCGAGCAGAAGAAGAA\n" "CCATCGGTGGCCGTTTGCCC") elif guide_type == "GS": # genomic sequences place_holder_text = str( ">sequence1\n" @@ -1535,8 +1510,7 @@ def change_placeholder_guide_textbox(guide_type: str) -> List: # change variants options @app.callback( - [Output("checklist-variants", "options"), - Output("vcf-dropdown", "options")], + [Output("checklist-variants", "options"), Output("vcf-dropdown", "options")], [Input("available-genome", "value")], ) def change_variants_checklist_state(genome_value: str) -> List: @@ -1635,7 +1609,7 @@ def index_page() -> html.Div: id="warning-list", ), dbc.ModalFooter( - dbc.Button("Close", id="close",className="modal-button") + dbc.Button("Close", id="close", className="modal-button") ), ], id="modal", @@ -1657,8 +1631,7 @@ def index_page() -> html.Div: ), dcc.Textarea( id="text-guides", - placeholder=str( - "GAGTCCGAGCAGAAGAAGAA\n" "CCATCGGTGGCCGTTTGCCC"), + placeholder=str("GAGTCCGAGCAGAAGAAGAA\n" "CCATCGGTGGCCGTTTGCCC"), style={"width": "300px", "height": "30px"}, ), dbc.FormText( @@ -1813,8 +1786,7 @@ def index_page() -> html.Div: [ html.Div( html.H4("Base editing?"), - style={"display": "inline-block", - "margin-right": "20px"} + style={"display": "inline-block", "margin-right": "20px"}, ), html.Div( dcc.RadioItems( @@ -1823,9 +1795,14 @@ def index_page() -> html.Div: {"label": "Yes", "value": "Y"}, {"label": "No", "value": "N"}, ], - value="N", labelStyle={"margin-right": "5px", "display": "inline-block"}), - style={"display": "inline-block"} - ) + value="N", + labelStyle={ + "margin-right": "5px", + "display": "inline-block", + }, + ), + style={"display": "inline-block"}, + ), ] ), html.Div( @@ -1839,8 +1816,7 @@ def index_page() -> html.Div: style={"width": "60px"}, ), ], - style={"display": "inline-block", - "margin-right": "20px"}, + style={"display": "inline-block", "margin-right": "20px"}, ), html.Div( # BE window stop dropdown [ @@ -1851,8 +1827,7 @@ def index_page() -> html.Div: style={"width": "60px"}, ), ], - style={"display": "inline-block", - "margin-right": "20px"}, + style={"display": "inline-block", "margin-right": "20px"}, ), html.Div( # BE nucleotides dropdown [ @@ -1864,12 +1839,12 @@ def index_page() -> html.Div: style={"width": "60px"}, ), ], - style={"display": "inline-block", - "margin-right": "20px"}, - ) + style={"display": "inline-block", "margin-right": "20px"}, + ), ], - id="div-base-editor-dropdowns", style={"display": "none"} - ) + id="div-base-editor-dropdowns", + style={"display": "none"}, + ), ], style={"margin-top": "10%"}, ) @@ -2053,8 +2028,7 @@ def index_page() -> html.Div: @app.callback( - Output("div-base-editor-dropdowns", "style"), - [Input("radio-base_editor", "value")] + Output("div-base-editor-dropdowns", "style"), [Input("radio-base_editor", "value")] ) def update_visibility_base_editor_dropdowns(radio_value: str) -> Dict: """Update visibilyt of base editing dropdowns. @@ -2077,8 +2051,7 @@ def update_visibility_base_editor_dropdowns(radio_value: str) -> Dict: @app.callback( - [Output("be-window-start", "options"), - Output("be-window-stop", "options")], + [Output("be-window-start", "options"), Output("be-window-stop", "options")], [Input("text-guides", "value")], [State("radio-guide", "value"), State("available-genome", "value")], ) @@ -2111,9 +2084,7 @@ def update_base_editing_dropdown( f"Expected {str.__name__}, got {type(text_guides).__name__}" ) if not isinstance(guide_type, str): - raise TypeError( - f"Expected {str.__name__}, got {type(guide_type).__name__}" - ) + raise TypeError(f"Expected {str.__name__}, got {type(guide_type).__name__}") dropdown_options = [{"label": "", "value": ""}] if text_guides is None: return dropdown_options, dropdown_options @@ -2124,8 +2095,8 @@ def update_base_editing_dropdown( for seqname_and_seq in text_guides.split(">"): if not seqname_and_seq: continue - seqname = seqname_and_seq[:seqname_and_seq.find("\n")] - seq = seqname_and_seq[seqname_and_seq.find("\n"):].strip() + seqname = seqname_and_seq[: seqname_and_seq.find("\n")] + seq = seqname_and_seq[seqname_and_seq.find("\n") :].strip() if "chr" in seq: # BED regions for line in seq.split("\n"): if not line: @@ -2142,16 +2113,11 @@ def update_base_editing_dropdown( guides.append(seq_read) guides = "\n".join(list(set(guides))) if not all( - [ - len(guide) == len(guides.split("\n")[0]) - for guide in guides.split("\n") - ] + [len(guide) == len(guides.split("\n")[0]) for guide in guides.split("\n")] ): guides = select_same_len_guides(guides) guides = guides.split("\n") - dropdown_options = [ - {"label": i, "value": i} for i in range(1, len(guides[0]) + 1) - ] + dropdown_options = [{"label": i, "value": i} for i in range(1, len(guides[0]) + 1)] return dropdown_options, dropdown_options @@ -2174,8 +2140,7 @@ def check_mail_address(mail_address: str) -> bool: return False assert mail_address is not None if not isinstance(mail_address, str): - raise TypeError( - f"Expected {str.__name__}, got {type(mail_address).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(mail_address).__name__}") mail_address_fields = mail_address.split("@") if len(mail_address_fields) > 1 and bool(mail_address_fields[-1]): return True diff --git a/pages/pages_utils.py b/pages/pages_utils.py index dc50bbe..bd71cdb 100755 --- a/pages/pages_utils.py +++ b/pages/pages_utils.py @@ -2,7 +2,6 @@ webpages. """ - from app import current_working_directory, operators from typing import Dict, List, Optional, Tuple @@ -442,18 +441,18 @@ def get_query_column(filter_criterion: str) -> Dict[str, str]: "sort": "", "samples": "", } - if filter_criterion == FILTERING_CRITERIA[0]: - for key in query_columns.keys(): + if filter_criterion == FILTERING_CRITERIA[0]: # fewest mm+bulges + for key in query_columns: query_columns[key] = "_".join([query_columns[key], f"({MMBULGES_FILTER})"]) query_columns["sort"] = TOTAL_FEWEST_COLUMN query_columns["samples"] = SAMPLES_FEWEST_COLUMN - elif filter_criterion == FILTERING_CRITERIA[1]: - for key in query_columns.keys(): + elif filter_criterion == FILTERING_CRITERIA[1]: # cfd + for key in query_columns: query_columns[key] = "_".join([query_columns[key], f"({CFD_FILTER})"]) query_columns["sort"] = CFD_COLUMN query_columns["samples"] = SAMPLES_COLUMN - elif filter_criterion == FILTERING_CRITERIA[2]: - for key in query_columns.keys(): + elif filter_criterion == FILTERING_CRITERIA[2]: # crista + for key in query_columns: query_columns[key] = "_".join([query_columns[key], f"({CRISTA_FILTER})"]) query_columns["sort"] = CRISTA_COLUMN query_columns["samples"] = SAMPLES_CRISTA_COLUMN @@ -595,26 +594,28 @@ def generate_table( [ html.Tr( [ - html.Td( - html.A( - dataframe.loc[i, col], - href="".join( - [ - "result?job=", - f"{job_id}#{guide}new", - dataframe.loc[i, "Bulge Type"], - str(dataframe.loc[i, "Bulge Size"]), - str(dataframe.loc[i, "Mismatches"]), - ] + ( + html.Td( + html.A( + dataframe.loc[i, col], + href="".join( + [ + "result?job=", + f"{job_id}#{guide}new", + dataframe.loc[i, "Bulge Type"], + str(dataframe.loc[i, "Bulge Size"]), + str(dataframe.loc[i, "Mismatches"]), + ] + ), + target="_blank", ), - target="_blank", - ), - style={"vertical-align": "middle", "text-align": "center"}, - ) - if col == "" - else html.Td( - dataframe.iloc[i][col], - style={"vertical-align": "middle", "text-align": "center"}, + style={"vertical-align": "middle", "text-align": "center"}, + ) + if col == "" + else html.Td( + dataframe.iloc[i][col], + style={"vertical-align": "middle", "text-align": "center"}, + ) ) for col in dataframe.columns ] @@ -689,24 +690,28 @@ def generate_table_samples( [ html.Tr( [ - html.Td( - html.A( - dataframe.iloc[i + (page - 1) * max_rows][col], - href="".join( - [ - "result?job=", - job_id, - "#", - guide, - "-Sample-", - dataframe.iloc[i + (page - 1) * max_rows]["Sample"], - ] - ), - target="_blank", + ( + html.Td( + html.A( + dataframe.iloc[i + (page - 1) * max_rows][col], + href="".join( + [ + "result?job=", + job_id, + "#", + guide, + "-Sample-", + dataframe.iloc[i + (page - 1) * max_rows][ + "Sample" + ], + ] + ), + target="_blank", + ) ) + if col == "" + else html.Td(dataframe.iloc[i + (page - 1) * max_rows][col]) ) - if col == "" - else html.Td(dataframe.iloc[i + (page - 1) * max_rows][col]) for col in dataframe.columns ] ) diff --git a/pages/results_page.py b/pages/results_page.py index c4db73d..1166a88 100755 --- a/pages/results_page.py +++ b/pages/results_page.py @@ -83,6 +83,7 @@ import dash_table import sqlite3 import flask +import errno import re import os @@ -118,20 +119,29 @@ def result_page(job_id: str) -> html.Div: # check input function arguments if not isinstance(job_id, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_id).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_id).__name__}") # start result page creation code value = job_id - job_directory = os.path.join( - current_working_directory, "Results", f"{job_id}") + job_directory = os.path.join(current_working_directory, "Results", f"{job_id}") # check existance and zip integrated file - integrated_file_name = glob(os.path.join( - current_working_directory, "Results", f"{job_id}", "*integrated*"))[0] # take the first list element + integrated_file_name = glob( + os.path.join(current_working_directory, "Results", f"{job_id}", "*integrated*") + )[ + 0 + ] # take the first list element assert isinstance(integrated_file_name, str) integrated_file_name_zip = integrated_file_name.replace("tsv", "zip") # check existence and zip alt_merge file - alt_merge_file_name = glob(os.path.join(current_working_directory, "Results", - f"{job_id}", "*all_results_with_alternative_alignments*"))[0] # take the first list element + alt_merge_file_name = glob( + os.path.join( + current_working_directory, + "Results", + f"{job_id}", + "*all_results_with_alternative_alignments*", + ) + )[ + 0 + ] # take the first list element assert isinstance(alt_merge_file_name, str) alt_merge_file_name_zip = alt_merge_file_name.replace("tsv", "zip") # check job directory existence to avoid crush @@ -153,8 +163,7 @@ def result_page(job_id: str) -> html.Div: # Load mismatches try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, value, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, value, PARAMS_FILE) ) as p: all_params = p.read() real_genome_name = ( @@ -170,8 +179,7 @@ def result_page(job_id: str) -> html.Div: "\t" )[-1] genome_type_f = ( - next(s for s in all_params.split( - "\n") if "Genome_selected" in s) + next(s for s in all_params.split("\n") if "Genome_selected" in s) ).split("\t")[-1] ref_comp = ( next(s for s in all_params.split("\n") if "Ref_comp" in s) @@ -298,8 +306,7 @@ def result_page(job_id: str) -> html.Div: "Warning: Some guides have too many targets! ", html.A( "Click here", - href=os.path.join( - URL, DATA_DIR, job_id, "guides_error.txt"), + href=os.path.join(URL, DATA_DIR, job_id, "guides_error.txt"), className="alert-link", ), " to view them", @@ -370,8 +377,7 @@ def result_page(job_id: str) -> html.Div: current_working_directory, RESULTS_DIR, job_id, - ".".join( - [job_id, "general_table.txt"]), + ".".join([job_id, "general_table.txt"]), ), style={"display": "none"}, id="div-info-general-table", @@ -547,8 +553,7 @@ def result_page(job_id: str) -> html.Div: dcc.Tab( label="Query Genomic Region", value="tab-summary-by-position" ), - dcc.Tab(label="Graphical Reports", - value="tab-summary-graphical"), + dcc.Tab(label="Graphical Reports", value="tab-summary-graphical"), ], ) ) @@ -569,8 +574,7 @@ def result_page(job_id: str) -> html.Div: ), dbc.Collapse( dbc.Card( - dbc.CardBody( - html.Div(id="content-collapse-population")) + dbc.CardBody(html.Div(id="content-collapse-population")) ), id="collapse-populations", ), @@ -590,13 +594,11 @@ def result_page(job_id: str) -> html.Div: label="Summary by Mismatches/Bulges", value="tab-summary-by-guide", ), - dcc.Tab(label="Summary by Sample", - value="tab-summary-by-sample"), + dcc.Tab(label="Summary by Sample", value="tab-summary-by-sample"), dcc.Tab( label="Query Genomic Region", value="tab-summary-by-position" ), - dcc.Tab(label="Graphical Reports", - value="tab-summary-graphical"), + dcc.Tab(label="Graphical Reports", value="tab-summary-graphical"), dcc.Tab( label="Personal Risk Cards", value="tab-graphical-sample-card" ), @@ -646,8 +648,7 @@ def sendto_write_json(filter_criterion: str, search: str) -> None: if not filter_criterion in FILTERING_CRITERIA: raise ValueError(f"Forbidden filtering criterion ({filter_criterion})") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") job_id = search.split("=")[-1] write_json(filter_criterion, job_id) @@ -656,6 +657,7 @@ def sendto_write_json(filter_criterion: str, search: str) -> None: # Download links generation and actions definition # + # Generate download link summary_by_sample @app.callback( [ @@ -687,11 +689,9 @@ def download_link_sample( """ if not isinstance(file_to_load, str): - raise TypeError( - f"Expected {str.__name__}, got {type(file_to_load).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(file_to_load).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if n is None: raise PreventUpdate # nothing to do job_id = search.split("=")[-1] @@ -699,8 +699,7 @@ def download_link_sample( file_to_load = file_to_load.strip().split("/")[-1] # print(file_to_load) if os.path.exists( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, file_to_load) + os.path.join(current_working_directory, RESULTS_DIR, job_id, file_to_load) ): return ( html.A( @@ -744,19 +743,16 @@ def download_general_table( """ if not isinstance(file_to_load, str): - raise TypeError( - f"Expected {str.__name__}, got {type(file_to_load).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(file_to_load).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if n is None: raise PreventUpdate job_id = search.split("=")[-1] file_to_load = file_to_load.split("/")[-1] # print(file_to_load) if os.path.exists( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, file_to_load) + os.path.join(current_working_directory, RESULTS_DIR, job_id, file_to_load) ): return ( html.A( @@ -800,19 +796,16 @@ def download_general_table( """ if not isinstance(file_to_load, str): - raise TypeError( - f"Expected {str.__name__}, got {type(file_to_load).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(file_to_load).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if n is None: raise PreventUpdate job_id = search.split("=")[-1] file_to_load = file_to_load.split("/")[-1] # print(file_to_load) if os.path.exists( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, file_to_load) + os.path.join(current_working_directory, RESULTS_DIR, job_id, file_to_load) ): return ( html.A( @@ -824,6 +817,7 @@ def download_general_table( ) return "Generating download link, Please wait...", False + # download alt_merge results @@ -857,19 +851,16 @@ def download_general_table( """ if not isinstance(file_to_load, str): - raise TypeError( - f"Expected {str.__name__}, got {type(file_to_load).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(file_to_load).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if n is None: raise PreventUpdate job_id = search.split("=")[-1] file_to_load = file_to_load.split("/")[-1] # print(file_to_load) if os.path.exists( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, file_to_load) + os.path.join(current_working_directory, RESULTS_DIR, job_id, file_to_load) ): return ( html.A( @@ -913,18 +904,15 @@ def download_link_sample( """ if not isinstance(file_to_load, str): - raise TypeError( - f"Expected {str.__name__}, got {type(file_to_load).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(file_to_load).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if n is None: raise PreventUpdate job_id = search.split("=")[-1] file_to_load = ".".join([file_to_load, "zip"]) if os.path.exists( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, file_to_load) + os.path.join(current_working_directory, RESULTS_DIR, job_id, file_to_load) ): return ( html.A( @@ -968,18 +956,15 @@ def downloadLinkGuide( """ if not isinstance(file_to_load, str): - raise TypeError( - f"Expected {str.__name__}, got {type(file_to_load).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(file_to_load).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if n is None: raise PreventUpdate job_id = search.split("=")[-1] file_to_load = ".".join([file_to_load, "zip"]) if os.path.exists( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, file_to_load) + os.path.join(current_working_directory, RESULTS_DIR, job_id, file_to_load) ): return ( html.A( @@ -1059,36 +1044,30 @@ def update_iupac_decomposition_table_cluster( """ if not isinstance(page_current, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_current).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_current).__name__}") if not isinstance(page_size, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_size).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_size).__name__}") if not isinstance(filter_criterion, str): raise TypeError( f"Expected {str.__name__}, got {type(filter_criterion).__name__}" ) if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if not isinstance(hash_term, str): - raise TypeError( - f"Expected {str.__name__}, got {type(hash_term).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(hash_term).__name__}") job_id = search.split("=")[-1] hash_term = hash_term.split("#")[1] guide = hash_term[: hash_term.find("-Pos-")] - chr_pos = hash_term[(hash_term.find("-Pos-") + 5):] + chr_pos = hash_term[(hash_term.find("-Pos-") + 5) :] chromosome = chr_pos.split("-")[0] position = chr_pos.split("-")[1] try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, PARAMS_FILE) ) as handle: all_params = handle.read() genome_type_f = ( - next(s for s in all_params.split( - "\n") if "Genome_selected" in s) + next(s for s in all_params.split("\n") if "Genome_selected" in s) ).split("\t")[-1] ref_comp = ( next(s for s in all_params.split("\n") if "Ref_comp" in s) @@ -1108,8 +1087,7 @@ def update_iupac_decomposition_table_cluster( ) # load data and cache the data table (in pd.DataFrame) df_cached = global_store_general( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, decomp_fname) + os.path.join(current_working_directory, RESULTS_DIR, job_id, decomp_fname) ) if df_cached is None: # nothing to display and do not update the page raise PreventUpdate @@ -1123,16 +1101,14 @@ def update_iupac_decomposition_table_cluster( getattr(df_cached[col_name], operator)(filter_value) ] elif operator == "contains": - df_cached = df_cached.loc[df_cached[col_name].str.contains( - filter_value)] + df_cached = df_cached.loc[df_cached[col_name].str.contains(filter_value)] elif operator == "datestartswith": # this is a simplification of the front-end filtering logic, # only works with complete fields in standard format - df_cached = df_cached.loc[df_cached[col_name].str.startswith( - filter_value)] + df_cached = df_cached.loc[df_cached[col_name].str.startswith(filter_value)] # Calculate sample count data_to_send = df_cached.iloc[ - page_current * page_size: (page_current + 1) * page_size + page_current * page_size : (page_current + 1) * page_size ].to_dict("records") return data_to_send @@ -1184,44 +1160,35 @@ def update_table_cluster( """ if not isinstance(page_current, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_current).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_current).__name__}") if not isinstance(page_size, int): - raise TypeError( - f"Exepcted {int.__name__}, got {type(page_size).__name__}") + raise TypeError(f"Exepcted {int.__name__}, got {type(page_size).__name__}") if not isinstance(sort_by, list): - raise TypeError( - f"Expected {list.__name__}, got {type(sort_by).__name__}") + raise TypeError(f"Expected {list.__name__}, got {type(sort_by).__name__}") if not isinstance(filter_criterion, str): raise TypeError( f"Exepcted {str.__name__}, got {type(filter_criterion).__name__}" ) if not isinstance(hide_reference, str): - raise TypeError( - f"Expected {str.__name__}, got {type(hide_reference).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(hide_reference).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if not isinstance(hash_term, str): - raise TypeError( - f"Expected {str.__name__}, got {type(hash_term).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(hash_term).__name__}") job_id = search.split("=")[-1] - job_directory = os.path.join( - current_working_directory, RESULTS_DIR, job_id) + job_directory = os.path.join(current_working_directory, RESULTS_DIR, job_id) hash_term = hash_term.split("#")[1] guide = hash_term[: hash_term.find("-Pos-")] - chr_pos = hash_term[hash_term.find("-Pos-") + 5:] + chr_pos = hash_term[hash_term.find("-Pos-") + 5 :] chromosome = chr_pos.split("-")[0] position = chr_pos.split("-")[1] try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, PARAMS_FILE) ) as handle: all_params = handle.read() genome_type_f = ( - next(s for s in all_params.split( - "\n") if "Genome_selected" in s) + next(s for s in all_params.split("\n") if "Genome_selected" in s) ).split("\t")[-1] ref_comp = ( next(s for s in all_params.split("\n") if "Ref_comp" in s) @@ -1237,8 +1204,7 @@ def update_table_cluster( guide_fname = job_id + "." + chromosome + "_" + position + "." + guide + ".txt" # cache guide data table df_cached = global_store_general( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, guide_fname) + os.path.join(current_working_directory, RESULTS_DIR, job_id, guide_fname) ) if df_cached is None: # empty file -> nothing cached and nothing to do raise PreventUpdate @@ -1248,8 +1214,7 @@ def update_table_cluster( df_cached.rename(columns=COL_BOTH_RENAME, inplace=True) # drop unused columns if "hide-ref" in hide_reference or genome_type == "var": - df_cached.drop( - df_cached[(df_cached["Samples"] == "n")].index, inplace=True) + df_cached.drop(df_cached[(df_cached["Samples"] == "n")].index, inplace=True) # hide reference data if "hide-cluster" in hide_reference: df_cached = df_cached.head(1) @@ -1261,13 +1226,11 @@ def update_table_cluster( getattr(df_cached[col_name], operator)(filter_value) ] elif operator == "contains": - df_cached = df_cached.loc[df_cached[col_name].str.contains( - filter_value)] + df_cached = df_cached.loc[df_cached[col_name].str.contains(filter_value)] elif operator == "datestartswith": # this is a simplification of the front-end filtering logic, # only works with complete fields in standard format - df_cached = df_cached.loc[df_cached[col_name].str.startswith( - filter_value)] + df_cached = df_cached.loc[df_cached[col_name].str.startswith(filter_value)] # sort data table by the defined columns if bool(sort_by): df_cached = df_cached.sort_values( @@ -1280,7 +1243,7 @@ def update_table_cluster( ) # Calculate sample count data_to_send = df_cached.iloc[ - (page_current * page_size): ((page_current + 1) * page_size) + (page_current * page_size) : ((page_current + 1) * page_size) ].to_dict("records") if genome_type != "ref": ( @@ -1335,21 +1298,18 @@ def cluster_page(job_id: str, hash_term: str) -> html.Div: """ if not isinstance(job_id, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_id).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_id).__name__}") if not isinstance(hash_term, str): - raise TypeError( - f"Expected {str.__name__}, got {type(hash_term).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(hash_term).__name__}") guide = hash_term[: hash_term.find("-Pos-")] - chr_pos = hash_term[(hash_term.find("-Pos-") + 5):] + chr_pos = hash_term[(hash_term.find("-Pos-") + 5) :] chromosome = chr_pos.split("-")[0] position = chr_pos.split("-")[1] if not os.path.isdir(os.path.join(current_working_directory, RESULTS_DIR, job_id)): return html.Div(dbc.Alert("The selected result does not exist", color="danger")) try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, PARAMS_FILE) ) as handle_params: params = handle_params.read() genome_type_f = ( @@ -1368,8 +1328,7 @@ def cluster_page(job_id: str, hash_term: str) -> html.Div: if "True" in ref_comp: genome_type = "both" style_hide_reference = {} - value_hide_reference = ["hide-ref", - "hide-cluster"] # hide reference data + value_hide_reference = ["hide-ref", "hide-cluster"] # hide reference data # begin page body construction final_list = [] # HTML page handler assert isinstance(chromosome, str) @@ -1550,8 +1509,7 @@ def cluster_page(job_id: str, hash_term: str) -> html.Div: "Generating download link, Please wait...", id="download-link-sumbyposition", ), - dcc.Interval(interval=5 * 1000, - id="interval-sumbyposition"), + dcc.Interval(interval=5 * 1000, id="interval-sumbyposition"), ] ), ] @@ -1730,11 +1688,9 @@ def global_get_sample_targets( """ if not isinstance(job_id, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_id).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_id).__name__}") if not isinstance(sample, str): - raise TypeError( - f"Expected {str.__name__}, got {type(sample).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(sample).__name__}") if not isinstance(guide, str): raise TypeError(f"Expected {str.__name__}, got {type(guide).__name__}") if not isinstance(page, int): @@ -1766,8 +1722,7 @@ def global_get_sample_targets( # callback to update the samples table @app.callback( - [Output("table-sample-target", "data"), - Output("table-sample-target", "columns")], + [Output("table-sample-target", "data"), Output("table-sample-target", "columns")], [ Input("table-sample-target", "page_current"), Input("table-sample-target", "page_size"), @@ -1810,30 +1765,25 @@ def update_table_sample( """ if not isinstance(page_current, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_current).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_current).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if not isinstance(hash_term, str): - raise TypeError( - f"Expected {str.__name__}, got {type(hash_term).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(hash_term).__name__}") job_id = search.split("=")[-1] filter_criterion = read_json(job_id) # recover filter criterion assert isinstance(filter_criterion, str) assert filter_criterion in FILTERING_CRITERIA hash_term = hash_term.split("#")[1] guide = hash_term[: hash_term.find("-Sample-")] - sample = str(hash_term[hash_term.rfind("-") + 1:]) + sample = str(hash_term[hash_term.rfind("-") + 1 :]) try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, PARAMS_FILE) ) as handle: all_params = handle.read() genome_type_f = ( - next(s for s in all_params.split( - "\n") if "Genome_selected" in s) + next(s for s in all_params.split("\n") if "Genome_selected" in s) ).split("\t")[-1] ref_comp = ( next(s for s in all_params.split("\n") if "Ref_comp" in s) @@ -1901,19 +1851,16 @@ def sample_page(job_id: str, hash_term: str) -> html.Div: """ if not isinstance(job_id, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_id).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_id).__name__}") if not isinstance(hash_term, str): - raise TypeError( - f"Expected {str.__name__}, got {type(hash_term).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(hash_term).__name__}") guide = hash_term[: hash_term.find("-Sample-")] - sample = str(hash_term[(hash_term.rfind("-") + 1):]) + sample = str(hash_term[(hash_term.rfind("-") + 1) :]) if not os.path.isdir(os.path.join(current_working_directory, RESULTS_DIR, job_id)): return html.Div(dbc.Alert("The selected result does not exist", color="danger")) try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, PARAMS_FILE) ) as handle_params: params = handle_params.read() genome_type_f = ( @@ -1944,21 +1891,18 @@ def sample_page(job_id: str, hash_term: str) -> html.Div: "Generating download link, Please wait...", id="download-link-sumbysample", ), - dcc.Interval(interval=(5 * 1000), - id="interval-sumbysample"), + dcc.Interval(interval=(5 * 1000), id="interval-sumbysample"), ] ), ] ) ) # header file - header = os.path.join(current_working_directory, - RESULTS_DIR, job_id, "header.txt") + header = os.path.join(current_working_directory, RESULTS_DIR, job_id, "header.txt") # file_to_grep = current_working_directory + 'Results/' + \ # job_id + '/.' + job_id + '.bestMerge.txt' integrated_fname = glob( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, "*integrated*") + os.path.join(current_working_directory, RESULTS_DIR, job_id, "*integrated*") )[ 0 ] # take the first element @@ -1978,8 +1922,7 @@ def sample_page(job_id: str, hash_term: str) -> html.Div: job_id, f"{job_id}.{sample}.{guide}.personal_targets.tsv", ) - integrated_sample_personal_zip = integrated_sample_personal.replace( - "tsv", "zip") + integrated_sample_personal_zip = integrated_sample_personal.replace("tsv", "zip") final_list.append( html.Div( f"{job_id}.{sample}.{guide}.personal_targets", @@ -2096,8 +2039,7 @@ def global_store_general(path_file_to_load: str) -> pd.DataFrame: # make sure file to cache is not empty if os.path.getsize(path_file_to_load) > 0: # TSV format -> sep="\t" - df = pd.read_csv(path_file_to_load, sep="\t", - index_col=False, na_filter=False) + df = pd.read_csv(path_file_to_load, sep="\t", index_col=False, na_filter=False) else: df = None # empty file, no need for caching return df @@ -2107,6 +2049,7 @@ def global_store_general(path_file_to_load: str) -> pd.DataFrame: # Summary by Mismatches/Bulges tab # + # Update primary table of 'Show targets' of Summary by Mismatches/Bulges @app.callback( [ @@ -2171,29 +2114,24 @@ def update_table_subset( """ if not isinstance(page_current, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_current).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_current).__name__}") if not isinstance(page_size, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_size).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_size).__name__}") if not isinstance(hide_reference, list): raise TypeError( f"Expected {list.__name__}, got {type(hide_reference).__name__}" ) if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if not isinstance(hash_guide, str): - raise TypeError( - f"Expected {str.__name__}, got {type(hash_guide).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(hash_guide).__name__}") # recover job identifier job_id = search.split("=")[-1] # recover the filtering criterion from drop-down bar filter_criterion = read_json(job_id) try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, PARAMS_FILE) ) as handle_params: params = handle_params.read() genome_type_f = ( @@ -2216,7 +2154,7 @@ def update_table_subset( filtering_expressions = filter_term.split(" && ") # filtering_expressions.append(['{crRNA} = ' + guide]) # recover guide, mismatches and bulges - guide = hash_guide[1: hash_guide.find("new")] + guide = hash_guide[1 : hash_guide.find("new")] mms = hash_guide[-1:] bulge_s = hash_guide[-2:-1] if "DNA" in hash_guide: @@ -2231,8 +2169,7 @@ def update_table_subset( job_id, bulge_t, bulge_s, mms, guide, page_current ) else: - result = global_store_subset( - job_id, bulge_t, bulge_s, mms, guide, page_current) + result = global_store_subset(job_id, bulge_t, bulge_s, mms, guide, page_current) drop_cols = drop_columns(result, filter_criterion) result.drop(drop_cols, axis=1, inplace=True) # name of target file filtered with bul-type, mm and bul @@ -2283,8 +2220,7 @@ def guidePagev3(job_id, hash): ref_comp = (next(s for s in all_params.split("\n") if "Ref_comp" in s)).split( "\t" )[-1] - pam = (next(s for s in all_params.split( - "\n") if "Pam" in s)).split("\t")[-1] + pam = (next(s for s in all_params.split("\n") if "Pam" in s)).split("\t")[-1] job_directory = current_working_directory + "Results/" + job_id + "/" genome_type = "ref" @@ -2305,15 +2241,13 @@ def guidePagev3(job_id, hash): if pam_at_start: final_list.append( html.H3( - "Selected Guide: " + - str(pam) + str(guide).replace("N", "") + add_header + "Selected Guide: " + str(pam) + str(guide).replace("N", "") + add_header ) ) else: final_list.append( html.H3( - "Selected Guide: " + - str(guide).replace("N", "") + str(pam) + add_header + "Selected Guide: " + str(guide).replace("N", "") + str(pam) + add_header ) ) final_list.append( @@ -2322,8 +2256,7 @@ def guidePagev3(job_id, hash): # 'Select a row to view the target IUPAC character scomposition. The rows highlighted in red indicates that the target was found only in the genome with variants.', "List of Targets found for the selected guide.", dcc.Checklist( - options=[ - {"label": "Hide Reference Targets", "value": "hide-ref"}], + options=[{"label": "Hide Reference Targets", "value": "hide-ref"}], id="hide-reference-targets", value=value_hide_reference, style=style_hide_reference, @@ -2334,8 +2267,7 @@ def guidePagev3(job_id, hash): "Generating download link, Please wait...", id="download-link-sumbyguide", ), - dcc.Interval(interval=5 * 1000, - id="interval-sumbyguide"), + dcc.Interval(interval=5 * 1000, id="interval-sumbyguide"), ] ), ] @@ -2380,8 +2312,7 @@ def guidePagev3(job_id, hash): ) ) - path_db = glob(current_working_directory + - "Results/" + job_id + "/.*.db")[0] + path_db = glob(current_working_directory + "Results/" + job_id + "/.*.db")[0] path_db = str(path_db) conn = sqlite3.connect(path_db) c = conn.cursor() @@ -2479,14 +2410,11 @@ def global_store_subset_no_ref( """ if not isinstance(job_id, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_id).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_id).__name__}") if not isinstance(bulge_t, str): - raise TypeError( - f"Expected {str.__name__}, got {type(bulge_t).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(bulge_t).__name__}") if not isinstance(bulge_s, str): - raise TypeError( - f"Expected {str.__name__}, got {type(bulge_s).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(bulge_s).__name__}") if not isinstance(mms, str): raise TypeError(f"Expected {str.__name__}, got {type(mms).__name__}") if not isinstance(guide, str): @@ -2560,14 +2488,11 @@ def global_store_subset( """ if not isinstance(job_id, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_id).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_id).__name__}") if not isinstance(bulge_t, str): - raise TypeError( - f"Expected {str.__name__}, got {type(bulge_t).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(bulge_t).__name__}") if not isinstance(bulge_s, str): - raise TypeError( - f"Expected {str.__name__}, got {type(bulge_s).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(bulge_s).__name__}") if not isinstance(mms, str): raise TypeError(f"Expected {str.__name__}, got {type(mms).__name__}") if not isinstance(guide, str): @@ -2648,11 +2573,9 @@ def load_distribution_populations( """ if not isinstance(sel_cel, list): - raise TypeError( - f"Expected {list.__name__}, got {type(sel_cel).__name__}") + raise TypeError(f"Expected {list.__name__}, got {type(sel_cel).__name__}") if not isinstance(job_id, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_id).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_id).__name__}") if sel_cel is None or not sel_cel or not all_guides: raise PreventUpdate # do not do anything # get the guide @@ -2660,8 +2583,7 @@ def load_distribution_populations( job_id = job_id.split("=")[-1] # job identifier try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, PARAMS_FILE) ) as handle_params: all_params = handle_params.read() mms = (next(s for s in all_params.split("\n") if "Mismatches" in s)).split( @@ -2785,6 +2707,7 @@ def toggle_collapse_distribution_populations(n, is_open): # Custom Ranking tab # + # trigger guides table construction @app.callback( [ @@ -2838,17 +2761,13 @@ def update_table_general_profile( """ if not isinstance(page_current, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_current).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_current).__name__}") if not isinstance(page_size, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_size).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_size).__name__}") if not isinstance(sort_by, list): - raise TypeError( - f"Expected {list.__name__}, got {type(sort_by).__name__}") + raise TypeError(f"Expected {list.__name__}, got {type(sort_by).__name__}") if not isinstance(filter_term, str): - raise TypeError( - f"Expected {str.__name__}, got {type(filter_term).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(filter_term).__name__}") if not isinstance(filter_criterion, str): raise TypeError( f"Expected {str.__name__}, got {type(filter_criterion).__name__}" @@ -2856,14 +2775,12 @@ def update_table_general_profile( if filter_criterion not in FILTERING_CRITERIA: raise ValueError(f"Forbidden filter criterion ({filter_criterion})") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") # recover job identifier job_id = search.split("=")[-1] try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, PARAMS_FILE) ) as handle_params: params = handle_params.read() genome_type_f = ( @@ -2894,8 +2811,7 @@ def update_table_general_profile( # Get error guides error_guides = [] if os.path.exists( - os.path.join(current_working_directory, RESULTS_DIR, - job_id, "guides_error.txt") + os.path.join(current_working_directory, RESULTS_DIR, job_id, "guides_error.txt") ): try: with open( @@ -2910,8 +2826,7 @@ def update_table_general_profile( # Get guide from .guide.txt try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, GUIDES_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, GUIDES_FILE) ) as handle_guides: guides = handle_guides.read().strip().split("\n") guides.sort() @@ -2950,9 +2865,11 @@ def update_table_general_profile( ] # acfd = [int(round((100/(100 + x))*100)) for x in acfd] acfd = [ - float("{:.3f}".format(x * 100)) - if x < 1 and x >= 0 - else "CFD score not available" + ( + float("{:.3f}".format(x * 100)) + if x < 1 and x >= 0 + else "CFD score not available" + ) for x in acfd ] df = [] @@ -2992,8 +2909,7 @@ def update_table_general_profile( # append CFD to table table_to_file.append(f"Filter_criterion: {filter_criterion}") table_to_file.append("\t\t\t\tMismatches") - table_to_file.append( - data_general_count_copy.to_string(index=False)) + table_to_file.append(data_general_count_copy.to_string(index=False)) if genome_type == "both": data_guides["Doench 2016"] = doench[i] else: @@ -3012,8 +2928,7 @@ def update_table_general_profile( if j == 1: data_guides["Total"].append( "\t".join( - ["REFERENCE", str( - sum(data_general_count.iloc[j, :]))] + ["REFERENCE", str(sum(data_general_count.iloc[j, :]))] ) ) elif j == 2: @@ -3024,8 +2939,7 @@ def update_table_general_profile( elif j == 4: data_guides["Total"].append( "\t\t".join( - ["VARIANT", str( - sum(data_general_count.iloc[j, :]))] + ["VARIANT", str(sum(data_general_count.iloc[j, :]))] ) ) else: @@ -3037,16 +2951,14 @@ def update_table_general_profile( if j == 1: data_guides["Total"].append( "\t".join( - ["REFERENCE", str( - sum(data_general_count.iloc[j, :]))] + ["REFERENCE", str(sum(data_general_count.iloc[j, :]))] ) ) data_guides["Total"].append("\t") elif j == 3: data_guides["Total"].append( "\t\t".join( - ["VARIANT", str( - sum(data_general_count.iloc[j, :]))] + ["VARIANT", str(sum(data_general_count.iloc[j, :]))] ) ) else: @@ -3058,16 +2970,14 @@ def update_table_general_profile( if j == 0: data_guides["Total"].append( "\t".join( - ["REFERENCE", str( - sum(data_general_count.iloc[j, :]))] + ["REFERENCE", str(sum(data_general_count.iloc[j, :]))] ) ) data_guides["Total"].append("\t") elif j == 1: data_guides["Total"].append( "\t\t".join( - ["VARIANT", str( - sum(data_general_count.iloc[j, :]))] + ["VARIANT", str(sum(data_general_count.iloc[j, :]))] ) ) else: @@ -3075,8 +2985,7 @@ def update_table_general_profile( if j == len(data_guides["# Bulges"].split("\n")) // 2: data_guides["Total"].append( "\t".join( - ["REFERENCE", str( - sum(data_general_count.iloc[j, :]))] + ["REFERENCE", str(sum(data_general_count.iloc[j, :]))] ) ) else: @@ -3091,7 +3000,7 @@ def update_table_general_profile( else: for j in range(mms + 1): tmp = list(data_general_count.iloc[:, j].values.astype(str)) - tmp = tmp[:max_bulges+1] + tmp = tmp[: max_bulges + 1] # tmp.insert(len(tmp)//2, "") data_guides[str(j) + "MM"] = "\n".join(tmp) data_guides["Total"] = "\n".join(data_guides["Total"]) @@ -3109,8 +3018,9 @@ def update_table_general_profile( finally: outfile.close() # zip integrated results - integrated_fname = glob(os.path.join( - current_working_directory, RESULTS_DIR, job_id, "*integrated*"))[0] + integrated_fname = glob( + os.path.join(current_working_directory, RESULTS_DIR, job_id, "*integrated*") + )[0] assert isinstance(integrated_fname, str) # integrated_file = integrated_fname # zip integrated file @@ -3121,8 +3031,14 @@ def update_table_general_profile( if code != 0: raise ValueError(f"An error occurred while running {cmd}") # zip alt_merge results - alt_merge_fname = glob(os.path.join(current_working_directory, RESULTS_DIR, - job_id, "*all_results_with_alternative_alignments*"))[0] + alt_merge_fname = glob( + os.path.join( + current_working_directory, + RESULTS_DIR, + job_id, + "*all_results_with_alternative_alignments*", + ) + )[0] assert isinstance(alt_merge_fname, str) # integrated_file = alt_merge_fname # zip integrated file @@ -3135,11 +3051,9 @@ def update_table_general_profile( # score checking if "NO SCORES" not in all_scores: try: - dff = dff.sort_values(["CFD", "Doench 2016"], - ascending=[False, False]) + dff = dff.sort_values(["CFD", "Doench 2016"], ascending=[False, False]) except: # for BOTH - dff = dff.sort_values(["CFD", "Enriched"], - ascending=[False, False]) + dff = dff.sort_values(["CFD", "Enriched"], ascending=[False, False]) else: try: dff = dff.sort_values("On-Targets Reference", ascending=True) @@ -3167,7 +3081,7 @@ def update_table_general_profile( ) # Calculate sample count data_to_send = dff.iloc[ - page_current * page_size: (page_current + 1) * page_size + page_current * page_size : (page_current + 1) * page_size ].to_dict("records") return data_to_send, [{"row": 0, "column": 0}] @@ -3216,6 +3130,7 @@ def color_selected_row(sel_cel: List, all_guides: List) -> List: # Query genomic region tab # + # trigger filtering table by genomic coordinates @app.callback( [ @@ -3285,8 +3200,7 @@ def filter_position_table( if not filter_criterion in FILTERING_CRITERIA: raise ValueError(f"Forbidden filtering criterion ({filter_criterion})") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if sel_cel is None: raise PreventUpdate if n is None: @@ -3449,11 +3363,9 @@ def update_position_filter( if not isinstance(chrom, str): raise TypeError(f"Expected {str.__name__}, got {type(chrom).__name__}") if not isinstance(pos_start, str): - raise TypeError( - f"Expected {str.__name__}, got {type(pos_start).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(pos_start).__name__}") if not isinstance(pos_end, str): - raise TypeError( - f"Expected {str.__name__}, got {type(pos_end).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(pos_end).__name__}") if n is None: # no click -> no page update raise PreventUpdate if pos_start == "": @@ -3468,6 +3380,7 @@ def update_position_filter( # Summary by Sample tab # + # View next/prev page on sample table @app.callback( [ @@ -3536,24 +3449,19 @@ def filter_sample_table( if n_prev is not None: if not isinstance(n_prev, int): - raise TypeError( - f"Expected {int.__name__}, got {type(n_prev).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(n_prev).__name__}") if n_next is not None: if not isinstance(n_next, int): - raise TypeError( - f"Expected {int.__name__}, got {type(n_next).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(n_next).__name__}") if not isinstance(filter_q, str): - raise TypeError( - f"Expected {str.__name__}, got {type(filter_q).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(filter_q).__name__}") if n is not None: if not isinstance(n, int): raise TypeError(f"Expected {int.__name__}, got {type(n).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if not isinstance(current_page, str): - raise TypeError( - f"Expected {str.__name__}, got {type(current_page).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(current_page).__name__}") if sel_cel is None: raise PreventUpdate # do not do anything if n_prev is None and n_next is None and n is None: @@ -3580,16 +3488,14 @@ def filter_sample_table( btn_sample_section = [n, n_prev, n_next] # get job identifier job_id = search.split("=")[-1] - job_directory = os.path.join( - current_working_directory, RESULTS_DIR, job_id) + job_directory = os.path.join(current_working_directory, RESULTS_DIR, job_id) population_1000gp = associateSample.loadSampleAssociation( os.path.join(job_directory, SAMPLES_ID_FILE) )[2] # read CRISPRme run parameters try: with open( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, PARAMS_FILE) + os.path.join(current_working_directory, RESULTS_DIR, job_id, PARAMS_FILE) ) as handle_params: params = handle_params.read() genome_type_f = ( @@ -3693,14 +3599,12 @@ def filter_sample_table( if pop is None or pop == "": df.drop( df[ - (~(df["Population"].isin( - population_1000gp[sup_pop]))) + (~(df["Population"].isin(population_1000gp[sup_pop]))) ].index, inplace=True, ) else: - df.drop(df[(df["Population"] != pop)].index, - inplace=True) + df.drop(df[(df["Population"] != pop)].index, inplace=True) else: df.drop(df[(df["Sample"] != sample)].index, inplace=True) if ((current_page - 1) * 10) > len(df): @@ -3732,21 +3636,18 @@ def filter_sample_table( if pop is None or pop == "": df.drop( df[ - (~(df["Population"].isin( - population_1000gp[sup_pop]))) + (~(df["Population"].isin(population_1000gp[sup_pop]))) ].index, inplace=True, ) else: - df.drop(df[(df["Population"] != pop)].index, - inplace=True) + df.drop(df[(df["Population"] != pop)].index, inplace=True) else: df.drop(df[(df["Sample"] != sample)].index, inplace=True) max_page = len(df.index) max_page = math.floor(max_page / 10) + 1 return ( - generate_table_samples(df, "table-samples", - current_page, guide, job_id), + generate_table_samples(df, "table-samples", current_page, guide, job_id), f"{current_page}/{max_page}", ) @@ -3795,12 +3696,10 @@ def update_sample_filter( ) if population is not None: if not isinstance(population, str): - raise TypeError( - f"Expected {str.__name__}, got {type(population).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(population).__name__}") if sample is not None: if not isinstance(sample, str): - raise TypeError( - f"Expected {str.__name__}, got {type(sample).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(sample).__name__}") if n is None: raise PreventUpdate # prevent page updates when at least one filter element is none @@ -3838,16 +3737,13 @@ def update_sample_drop(pop: str, search: str) -> Tuple[List, None]: if pop is not None: if not isinstance(pop, str): - raise TypeError( - f"Expected {str.__name__}, got {type(pop).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(pop).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if pop is None or pop == "": return [], None # no update required job_id = search.split("=")[-1] - job_directory = os.path.join( - current_working_directory, RESULTS_DIR, job_id) + job_directory = os.path.join(current_working_directory, RESULTS_DIR, job_id) pop_dict = associateSample.loadSampleAssociation( os.path.join(job_directory, SAMPLES_ID_FILE) )[3] @@ -3883,16 +3779,13 @@ def update_population_drop(superpop: str, search: str) -> Tuple[Dict, None]: if superpop is not None: if not isinstance(superpop, str): - raise TypeError( - f"Expected {str.__name__}, got {type(superpop).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(superpop).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if superpop is None or superpop == "": raise PreventUpdate # no update required job_id = search.split("=")[-1] - job_directory = os.path.join( - current_working_directory, RESULTS_DIR, job_id) + job_directory = os.path.join(current_working_directory, RESULTS_DIR, job_id) population_1000gp = associateSample.loadSampleAssociation( os.path.join(job_directory, SAMPLES_ID_FILE) )[2] @@ -3919,14 +3812,11 @@ def check_existance_sample(job_directory: str, job_id: str, sample: str) -> bool """ if not isinstance(job_directory, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_directory).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_directory).__name__}") if not isinstance(job_id, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_id).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_id).__name__}") if not isinstance(sample, str): - raise TypeError( - f"Expected {str.__name__}, got {type(sample).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(sample).__name__}") dataset = pd.read_csv( os.path.join(job_directory, job_id, SAMPLES_ID_FILE), sep="\t", na_filter=False ) @@ -3940,6 +3830,7 @@ def check_existance_sample(job_directory: str, job_id: str, sample: str) -> bool # Graphical Reports tab # + # Select figures on mms value, sample value @app.callback( [ @@ -3993,12 +3884,10 @@ def update_images_tabs( if filter_criterion not in FILTERING_CRITERIA: raise ValueError(f"Forbidden filtering criterion ({filter_criterion})") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") bulge = 0 job_id = search.split("=")[-1] - job_directory = os.path.join( - current_working_directory, RESULTS_DIR, job_id) + job_directory = os.path.join(current_working_directory, RESULTS_DIR, job_id) guide = all_guides[int(sel_cel[0]["row"])]["Guide"] # define plot containers # radar_chart_images = list() @@ -4051,8 +3940,7 @@ def update_images_tabs( except: population_barplots = [ html.Div( - html.H2( - "No result found for this combination of mismatches and bulges") + html.H2("No result found for this combination of mismatches and bulges") ) ] # radar chart @@ -4138,7 +4026,6 @@ def update_images_tabs( State("url", "search"), ], ) -# FUNCTION TO GENERATE SAMPLE CARD, UPDATE WITH FILTER DROPDOWN def generate_sample_card( n: int, filter_criterion: str, @@ -4176,127 +4063,181 @@ def generate_sample_card( Sample card webpage """ - if n is not None: - if not isinstance(n, int): - raise TypeError(f"Expected {int.__name__}, got {type(n).__name__}") + if n is not None and not isinstance(n, int): + raise TypeError(f"Expected {int.__name__}, got {type(n).__name__}") if not isinstance(filter_criterion, str): raise TypeError( f"Expected {str.__name__}, got {type(filter_criterion).__name__}" ) - if not filter_criterion in FILTERING_CRITERIA: + if filter_criterion not in FILTERING_CRITERIA: raise ValueError(f"Forbidden filtering criterion ({filter_criterion})") if not isinstance(sample, str): - raise TypeError( - f"Expected {str.__name__}, got {type(sample).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(sample).__name__}") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if n is None: raise PreventUpdate # do not do anything # recover guide guide = all_guides[int(sel_cel[0]["row"])]["Guide"] # recover job id job_id = search.split("=")[-1] - job_directory = os.path.join( - current_working_directory, RESULTS_DIR, job_id) + job_directory = os.path.join(current_working_directory, RESULTS_DIR, job_id) # read summary by sample data samples_summary = pd.read_csv( os.path.join( job_directory, f"{job_id}.summary_by_samples.{guide}_{filter_criterion}.txt" ), sep="\t", - skiprows=2, - index_col=0, - header=None, - na_filter=False, + skiprows=2, # skip first two rows (guide + header) + index_col=0, # use sample ids as index + header=None, # header has been skipped + na_filter=False, # keep nan values ) - # try to cast sample to int - try: - sample_idx = int(sample) - except: - sample_idx = sample # maybe already int? - personal = samples_summary.loc[sample_idx, 4] # personal data in 4th col - pam_creation = samples_summary.loc[sample_idx, 7] # pam in 7th col - integrated_fname = glob(os.path.join(job_directory, "*integrated*"))[0] - assert isinstance(integrated_fname, str) - # integrated file with personal targets - integrated_personal = os.path.join( - job_directory, f"{job_id}.{sample}.{guide}.personal_targets.txt" + # select the number of personal targets and pam creation events for sample + # these data are in the 4th and 7th column of the summary by sample files, + # respectively + targets_sample_num = samples_summary.loc[sample, 4] + pam_creation_sample_num = samples_summary.loc[sample, 7] + # output filenames to store personal and private targets for sample + targets_personal_fname = os.path.join( + job_directory, f"{job_id}.{sample}.{guide}.personal_targets.tsv" ) - # integrated file with private targets - integrated_private = os.path.join( + targets_private_fname = os.path.join( job_directory, f"{job_id}.{sample}.{guide}.private_targets.tsv" ) - # path to database - db_path = glob( - os.path.join(current_working_directory, RESULTS_DIR, job_id, ".*.db") - )[0] - assert isinstance(db_path, str) - if not os.path.isfile(db_path): - raise FileNotFoundError(f"Unable to locate {db_path}") - # open connection to database - conn = sqlite3.connect(db_path) - c = conn.cursor() - # get query columns - query_cols = get_query_column(filter_criterion) - # perform the query - result_personal = pd.read_sql_query( - "SELECT * FROM final_table WHERE \"{}\"='{}' AND \"{}\" LIKE '%{}%'".format( - GUIDE_COLUMN, guide, query_cols["samples"], sample - ), - conn, - ) - # sort personal targets data - order = False - if filter_criterion == "fewest": - order = True - result_personal = result_personal.sort_values( - [query_cols["sort"]], ascending=order) - # extract sample private targets - result_private = result_personal[result_personal[query_cols["samples"]] == sample] - conn.commit() - conn.close() # close connection to db - # store personal and private targets - result_personal.to_csv(integrated_personal, sep="\t", index=False) - result_private.to_csv(integrated_private, sep="\t", index=False) - # zip target files - integrated_private_zip = integrated_private.replace("tsv", "zip") - cmd = f"zip -j {integrated_private_zip} {integrated_private}" - code = subprocess.call(cmd, shell=True) - if code != 0: - raise ValueError(f'An error occurred while running "{cmd}"') - # plot images in personal card tab - # TODO: avoid calling scripts, use functions instead - os.system( - f"python {app_directory}/PostProcess/CRISPRme_plots_personal.py {integrated_personal} {current_working_directory}/Results/{job_id}/imgs/ {guide}.{sample}.personal > /dev/null 2>&1" + imgsdir = os.path.join(job_directory, IMGS_DIR) + plotfname = os.path.join( + imgsdir, "CRISPRme_{}_top_1000_log_for_main_text_{}.{}.{}.png" ) - os.system( - f"python {app_directory}/PostProcess/CRISPRme_plots_personal.py {integrated_private} {current_working_directory}/Results/{job_id}/imgs/ {guide}.{sample}.private > /dev/null 2>&1" + if os.path.isfile(targets_private_fname) and ( + all( + os.path.isfile(plotfname.format(c, guide, sample, "personal")) + for c in FILTERING_CRITERIA + ) + and all( + os.path.isfile(plotfname.format(c, guide, sample, "private")) + for c in FILTERING_CRITERIA + ) + ): # cached private targets for sample + targets_private = pd.read_csv(targets_private_fname, sep="\t") + # sort private targets according the filtering criterion + order = filter_criterion == FILTERING_CRITERIA[0] + criterion_cname = get_query_column(filter_criterion)["sort"] + targets_private = targets_private.sort_values( + [criterion_cname], ascending=order + ) + else: # first query for this sample + # sql database to be used for queries + db_path = os.path.join(job_directory, f".{job_id}.db") + if not os.path.isfile(db_path): + raise FileNotFoundError(f"Cannot locate database file {db_path}") + try: + conn = sqlite3.connect(db_path) # connect to database + c = conn.cursor() # sql database cursor to execute queries + query_cols = get_query_column(filter_criterion) # get query columns + # recover colname for sample column and filtering/sorting criterion + samples_cname, criterion_cname = query_cols["samples"], query_cols["sort"] + # define and perform sql database to retrieve sample targets + sqlquery = ( + f"SELECT * FROM final_table WHERE \"{GUIDE_COLUMN}\"='{guide}' " + f"AND \"{samples_cname}\" LIKE '%{sample}%'" + ) + targets_personal = pd.read_sql_query(sqlquery, conn) # perform query + except sqlite3.Error as e: + raise sqlite3.Error(f"Database error ({db_path})") from e + except KeyError as e: + raise KeyError( + "Key error - possibly missing column in query columns" + ) from e + except Exception as e: + # sourcery skip: raise-specific-error + raise Exception( + f"Unexpected error while querying {db_path} for sample {sample}" + ) from e + finally: + if "conn" in locals(): # close connection to database + conn.commit() + conn.close() + if targets_personal.shape[0] != targets_sample_num: + raise ValueError( + f"Mismatching sample targets number (expected: {targets_sample_num}, got: {targets_personal.shape[0]})" + ) + # sort personal targets - sort in ascending order if fewest is the criterion + # top targets have less mm+bulges + order = filter_criterion == FILTERING_CRITERIA[0] + targets_personal = targets_personal.sort_values( + [criterion_cname], ascending=order + ) + # retrieve sample private targets from personal targets + targets_private = targets_personal[targets_personal[samples_cname] == sample] + # plot images to be displayed in the personal risk card tab + try: + targets_personal.to_csv(targets_personal_fname, sep="\t", index=False) + targets_private.to_csv(targets_private_fname, sep="\t", index=False) + except FileNotFoundError as e: + raise FileNotFoundError(f"File path not found {e.filename}") from e + except PermissionError as e: + raise PermissionError(f"Permission denied: {e.filename}") from e + except OSError as e: + if e.errno == errno.ENOSPC: + raise OSError("No space left on device") from e + else: + raise OSError(f"Cannot write {e.filename}") from e + except Exception as e: + # sourcery skip: raise-specific-error + raise Exception( + "An unexpected error occurred while saving personal and integrated targets" + ) from e + # compute sample's personal and private targets with lolliplots + crisprme_plots_personal = ( + f"python {app_directory}/PostProcess/CRISPRme_plots_personal.py" + ) + crisprme_plots_personal_cmd = ( + f"{crisprme_plots_personal} {targets_personal_fname} {imgsdir}/" + ) + code = subprocess.call( + f"{crisprme_plots_personal_cmd} {guide}.{sample}.personal", shell=True + ) + if code != 0: + raise subprocess.SubprocessError( + f"Failed personal targets plot generation on sample {sample}" + ) + code = subprocess.call( + f"{crisprme_plots_personal_cmd} {guide}.{sample}.private", shell=True + ) + if code != 0: + raise subprocess.SubprocessError( + f"Failed private targets plot generation on sample {sample}" + ) + # compress private targets file for download + targets_private_zip = os.path.join( + job_directory, + os.path.basename(f"{os.path.splitext(targets_private_fname)[0]}.zip"), ) - cmd = f"rm -rf {integrated_personal}" - code = subprocess.call(cmd, shell=True) - if code != 0: - raise ValueError(f'An error occurred while running "{cmd}"') - # recover final results table - results_table = pd.DataFrame( - [[len(result_personal.index), pam_creation, len(result_private.index)]], - columns=["Personal", "PAM Creation", "Private"], + if not os.path.isfile(targets_private_zip): + code = subprocess.call( + f"zip -j {targets_private_zip} {targets_private_fname}", shell=True + ) + if code != 0: + raise subprocess.SubprocessError( + f"Failed to compress {targets_private_fname}" + ) + # compute targets stats for sample -> displayed on top of the page + sample_stats = pd.DataFrame( + { + "Personal": [targets_sample_num], + "PAM creation": [pam_creation_sample_num], + "Private": [targets_private.shape[0]], + } ).astype(str) - # read the private targets file, if not created, pass - try: - ans = result_private - except: - pass - # put images for personal and private in HTML + ans = targets_private # load private targets for sample; targets are displayed + # put images for personal and private targets in HTML try: image_personal_top = "data:image/png;base64,{}".format( base64.b64encode( open( os.path.join( - current_working_directory, - RESULTS_DIR, - job_id, - IMGS_DIR, + imgsdir, f"CRISPRme_{filter_criterion}_top_1000_log_for_main_text_{guide}.{sample}.personal.png", ), mode="rb", @@ -4307,10 +4248,7 @@ def generate_sample_card( base64.b64encode( open( os.path.join( - current_working_directory, - RESULTS_DIR, - job_id, - IMGS_DIR, + imgsdir, f"CRISPRme_{filter_criterion}_top_1000_log_for_main_text_{guide}.{sample}.private.png", ), mode="rb", @@ -4318,7 +4256,7 @@ def generate_sample_card( ).decode() ) except: - raise ValueError("Personal and Private Lollipop plots not generated") + raise ValueError("Personal and Private Lolliplots not found") # recover filtering criterion selected via drop-down bar filter_criterion = read_json(job_id) assert filter_criterion in FILTERING_CRITERIA @@ -4333,8 +4271,7 @@ def generate_sample_card( ), False, [ - html.P( - f"Top 100 Personal Targets ordered by {filter_criterion}"), + html.P(f"Top 100 Personal Targets ordered by {filter_criterion}"), html.A( html.Img( src=image_personal_top, @@ -4346,8 +4283,7 @@ def generate_sample_card( ), ], [ - html.P( - f"Top 100 Private Targets ordered by {filter_criterion}"), + html.P(f"Top 100 Private Targets ordered by {filter_criterion}"), html.A( html.Img( src=image_private_top, @@ -4361,8 +4297,8 @@ def generate_sample_card( dash_table.DataTable( css=[{"selector": ".row", "rule": "margin: 0"}], id="results-table", - columns=[{"name": i, "id": i} for i in results_table.columns], - data=results_table.to_dict("records"), + columns=[{"name": i, "id": i} for i in sample_stats.columns], + data=sample_stats.to_dict("records"), style_cell_conditional=[ { "if": {"column_id": "Variant_samples_(highest_CFD)"}, @@ -4393,10 +4329,7 @@ def generate_sample_card( dash_table.DataTable( css=[{"selector": ".row", "rule": "margin: 0"}], id="results-table-risk", - columns=[ - {"name": i, "id": i, "hideable": True} - for count, i in enumerate(ans.columns) - ], + columns=[{"name": i, "id": i, "hideable": True} for i in ans.columns], data=ans.to_dict("records"), style_cell_conditional=[ { @@ -4440,8 +4373,7 @@ def generate_sample_card( ), True, [ - html.P( - f"Top 100 Personal Targets ordered by {filter_criterion}"), + html.P(f"Top 100 Personal Targets ordered by {filter_criterion}"), html.A( html.Img( src=image_personal_top, @@ -4453,8 +4385,7 @@ def generate_sample_card( ), ], [ - html.P( - f"Top 100 Private Targets ordered by {filter_criterion}"), + html.P(f"Top 100 Private Targets ordered by {filter_criterion}"), html.A( html.Img( src=image_private_top, @@ -4494,18 +4425,18 @@ def generate_sample_card( "overflow": "hidden", }, ], - columns=[{"name": i, "id": i} for i in results_table.columns], - data=results_table.to_dict("records"), + columns=[{"name": i, "id": i} for i in sample_stats.columns], + data=sample_stats.to_dict("records"), ), [], ] - assert isinstance(out_1, list) return out_1 # ------------------------------------------------------------------------------ # main page layout + # update the main content table @app.callback( Output("div-tab-content", "children"), @@ -4555,29 +4486,24 @@ def update_content_tab( if value is not None: if not isinstance(value, str): - raise TypeError( - f"Expected {str.__name__}, got {type(value).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(value).__name__}") if not isinstance(filter_criterion, str): raise TypeError( f"Expected {str.__name__}, got {type(filter_criterion).__name__}" ) if filter_criterion not in FILTERING_CRITERIA: - raise ValueError( - f"Forbidden filtering criterion selected ({filter_criterion})") + raise ValueError(f"Forbidden filtering criterion selected ({filter_criterion})") if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if not isinstance(genome_type, str): - raise TypeError( - f"Expected {str.__name__}, got {type(genome_type).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(genome_type).__name__}") if value is None or sel_cel is None or not sel_cel or not all_guides: raise PreventUpdate # do not do anything # recover current guide guide = all_guides[int(sel_cel[0]["row"])]["Guide"] # recover job ID job_id = search.split("=")[-1] - job_directory = os.path.join( - current_working_directory, RESULTS_DIR, job_id) + job_directory = os.path.join(current_working_directory, RESULTS_DIR, job_id) # read parameters file try: with open(os.path.join(job_directory, PARAMS_FILE)) as handle_params: @@ -4591,8 +4517,7 @@ def update_content_tab( max_bulges = ( next(s for s in params.split("\n") if "Max_bulges" in s) ).split("\t")[-1] - pam = (next(s for s in params.split("\n") - if "Pam" in s)).split("\t")[-1] + pam = (next(s for s in params.split("\n") if "Pam" in s)).split("\t")[-1] nuclease = (next(s for s in params.split("\n") if "Nuclease" in s)).split( "\t" )[-1] @@ -4611,8 +4536,7 @@ def update_content_tab( else: CFD_notification = html.Div("", hidden=True) if nuclease != CAS9 and filter_criterion != FILTERING_CRITERIA[0]: - raise ValueError( - f"Wrong filtering criterion selected for nuclease {nuclease}") + raise ValueError(f"Wrong filtering criterion selected for nuclease {nuclease}") # PAM(s) pam_at_start = False assert isinstance(guide, str) @@ -4716,15 +4640,13 @@ def update_content_tab( samples_summary = samples_summary.sort_values( "Targets in Sample", ascending=False ) - more_info_col = ["Show Targets" for _ in range( - samples_summary.shape[0])] + more_info_col = ["Show Targets" for _ in range(samples_summary.shape[0])] samples_summary[""] = more_info_col population_1000gp = associateSample.loadSampleAssociation( os.path.join(job_directory, SAMPLES_ID_FILE) )[2] - super_populations = [{"label": i, "value": i} - for i in population_1000gp.keys()] + super_populations = [{"label": i, "value": i} for i in population_1000gp.keys()] populations = [] for pop in population_1000gp.keys(): for i in population_1000gp[pop]: @@ -4824,8 +4746,7 @@ def update_content_tab( ) max_page = samples_summary.shape[0] max_page = math.floor(max_page / 10) + 1 - fl.append(html.Div(f"1/{max_page}", - id="div-current-page-table-samples")) + fl.append(html.Div(f"1/{max_page}", id="div-current-page-table-samples")) return fl elif value == "tab-summary-by-position": # Show Summary by position table (Query Genomic regions tab) @@ -4853,8 +4774,7 @@ def update_content_tab( onlyfile = [ f for f in os.listdir( - os.path.join(current_working_directory, - "Genomes", genome_selected) + os.path.join(current_working_directory, "Genomes", genome_selected) ) if ( os.path.isfile( @@ -4891,8 +4811,7 @@ def update_content_tab( ] ) chr_file += chr_file_unset - chr_file = [{"label": chr_name, "value": chr_name} - for chr_name in chr_file] + chr_file = [{"label": chr_name, "value": chr_name} for chr_name in chr_file] # TODO: insert failsafe if no chromosome is found fl.append( html.Div( @@ -4926,8 +4845,7 @@ def update_content_tab( ), dbc.Col( html.Div( - html.Button( - "Filter", id="button-filter-position") + html.Button("Filter", id="button-filter-position") ) ), html.Br(), @@ -4946,11 +4864,9 @@ def update_content_tab( ) ) fl.append(html.Br()) - fl.append( - html.Div(style={"text-align": "center"}, id="div-table-position")) + fl.append(html.Div(style={"text-align": "center"}, id="div-table-position")) max_page = 1 # maximum one single page - fl.append(html.Div(f"1/{max_page}", - id="div-current-page-table-position")) + fl.append(html.Div(f"1/{max_page}", id="div-current-page-table-position")) fl.append( html.Div( f"{mms}-{max_bulges}", @@ -4999,13 +4915,11 @@ def update_content_tab( ), dbc.Col( html.Div( - html.Button( - "Generate", id="button-sample-card") + html.Button("Generate", id="button-sample-card") ) ), dbc.Col( - html.Div( - id="download-link-personal-card", hidden=True) + html.Div(id="download-link-personal-card", hidden=True) ), ] ), @@ -5120,8 +5034,7 @@ def update_content_tab( html.Div( [ html.H4("Group by"), - dcc.RadioItems( - id="order", value="CFD_score"), + dcc.RadioItems(id="order", value="CFD_score"), ] ), width=3, @@ -5162,8 +5075,7 @@ def update_content_tab( html.Div( [ html.H6("Max"), - dcc.Dropdown( - id="maxdrop"), + dcc.Dropdown(id="maxdrop"), ] ) ] @@ -5181,10 +5093,8 @@ def update_content_tab( dcc.RadioItems( id="Radio-asc-1", options=[ - {"label": " Ascending", - "value": "ASC"}, - {"label": " Descending", - "value": "DESC"}, + {"label": " Ascending", "value": "ASC"}, + {"label": " Descending", "value": "DESC"}, ], value="DESC", labelStyle={ @@ -5474,8 +5384,7 @@ def update_content_tab( html.Div( [ CFD_notification, - dbc.Row(dbc.Col(top1000_image, width={ - "size": 10, "offset": 2})), + dbc.Row(dbc.Col(top1000_image, width={"size": 10, "offset": 2})), dbc.Row(total_buttons, justify="center"), html.Br(), ] @@ -5486,8 +5395,7 @@ def update_content_tab( ) populations_barplots = dbc.Col(html.Div(id="div-population-barplot")) if genome_type != "ref": - graph_summary_both = [ - populations_barplots, radar_chart_encode_gencode] + graph_summary_both = [populations_barplots, radar_chart_encode_gencode] else: graph_summary_both = [radar_chart_encode_gencode] fl.append(html.Div([dbc.Row(graph_summary_both)])) @@ -5545,8 +5453,7 @@ def global_store(job_id: str) -> pd.DataFrame: if job_id is not None: if not isinstance(job_id, str): - raise TypeError( - f"Expected {str.__name__}, got {type(job_id).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(job_id).__name__}") if job_id is None: return "" # nothing to return target = [ @@ -5572,8 +5479,7 @@ def global_store(job_id: str) -> pd.DataFrame: and f.endswith("targets.txt") ] targets_summary = pd.read_csv( - os.path.join(current_working_directory, - RESULTS_DIR, job_id, target[0]), + os.path.join(current_working_directory, RESULTS_DIR, job_id, target[0]), sep="\t", usecols=range(0, 38), na_filter=False, @@ -5646,31 +5552,24 @@ def update_table( """ if not isinstance(page_current, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_current).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_current).__name__}") if not isinstance(page_size, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_size).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_size).__name__}") if not isinstance(sort_by, list): - raise TypeError( - f"Expected {list.__name__}, got {type(sort_by).__name__}") + raise TypeError(f"Expected {list.__name__}, got {type(sort_by).__name__}") if not isinstance(filter_term, str): - raise TypeError( - f"Expected {str.__name__}, got {type(filter_term).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(filter_term).__name__}") if search is not None: if not isinstance(search, str): - raise TypeError( - f"Expected {str.__name__}, got {type(search).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(search).__name__}") if not isinstance(hash_guide, str): - raise TypeError( - f"Expected {str.__name__}, got {type(hash_guide).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(hash_guide).__name__}") if search is None: raise PreventUpdate # do not do anything # recover job ID job_id = search.split("=")[-1] # recover job directory - job_directory = os.path.join( - current_working_directory, RESULTS_DIR, job_id) + job_directory = os.path.join(current_working_directory, RESULTS_DIR, job_id) # recover guide guide = hash_guide.split("#")[1] filtering_expressions = filter_term.split(" && ") @@ -5729,13 +5628,14 @@ def update_table( "No results were found with the given parameters", color="warning" ) return df_filtered.iloc[ - page_current * page_size: (page_current + 1) * page_size + page_current * page_size : (page_current + 1) * page_size ].to_dict("records") # ------------------------------------------------------------------------------ # Callbacks for querying part + # Return the table with the query's result @app.callback( # [Output('live_table', 'data'), @@ -5819,21 +5719,17 @@ def update_output( """ if not isinstance(n_clicks, int): - raise TypeError( - f"Expected {int.__name__}, got {type(n_clicks).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(n_clicks).__name__}") if not isinstance(page_current, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_current).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_current).__name__}") if not isinstance(filter_target_value, str): raise TypeError( f"Expected {str.__name__}, got {type(filter_target_value).__name__}" ) if not isinstance(page_size, int): - raise TypeError( - f"Expected {int.__name__}, got {type(page_size).__name__}") + raise TypeError(f"Expected {int.__name__}, got {type(page_size).__name__}") if not isinstance(target, str): - raise TypeError( - f"Expected {str.__name__}, got {type(target).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(target).__name__}") # prevent update on None inputs if radio_order is None or ( order_drop is None and thresh_drop is None and asc1 is None @@ -6136,13 +6032,11 @@ def maxdrop(thresh_drop: str, order: str) -> List: ) if order is not None: if not isinstance(order, str): - raise TypeError( - f"Expected {str.__name__}, got {type(order).__name__}") + raise TypeError(f"Expected {str.__name__}, got {type(order).__name__}") if order == "Mismatches": if thresh_drop: start_value = int(thresh_drop) - data = [{"label": str(i), "value": str(i)} - for i in range(start_value, 7)] + data = [{"label": str(i), "value": str(i)} for i in range(start_value, 7)] else: data = [] elif order == "CFD_score": @@ -6167,15 +6061,13 @@ def maxdrop(thresh_drop: str, order: str) -> List: elif order == "Bulges": if thresh_drop: start_value = int(thresh_drop) - data = [{"label": str(i), "value": str(i)} - for i in range(start_value, 3)] + data = [{"label": str(i), "value": str(i)} for i in range(start_value, 3)] else: data = [] elif order == "Mismatches+bulges": if thresh_drop: start_value = int(thresh_drop) - data = [{"label": str(i), "value": str(i)} - for i in range(start_value, 9)] + data = [{"label": str(i), "value": str(i)} for i in range(start_value, 9)] else: data = [] else: