From f956e31681cef4d7e1a9e116779915096d1d83da Mon Sep 17 00:00:00 2001
From: Maximilian Mordig
Date: Sat, 24 Jun 2023 14:04:24 +0200
Subject: [PATCH] refactored readfish to make it work with the ONT simulator
 without a guppy basecall server

---
 ru/ru_gen.py      | 109 ++++++++++++++++++++++++++++------------------
 ru/unblock_all.py |   2 +-
 2 files changed, 67 insertions(+), 44 deletions(-)

diff --git a/ru/ru_gen.py b/ru/ru_gen.py
index 32473b7..7688a69 100644
--- a/ru/ru_gen.py
+++ b/ru/ru_gen.py
@@ -66,29 +66,29 @@
     "counter",
     "mode",
     "decision",
-    "condition",
-    "min_threshold",
-    "count_threshold",
-    "start_analysis",
-    "end_analysis",
-    "timestamp",
+    "condition",  # policy for decision
+    "min_threshold",  # <= min chunk threshold
+    "count_threshold",  # whether max chunk threshold reached
+    "start_analysis",  # when last chunk was started processing, undefined reference point, so only difference is valid!
+    "end_analysis",  # when last chunk was finished processing, undefined reference point, so only difference is valid!
+    "timestamp",  # time the decision was made, only sent once all chunks in batch are decided
 )
 
 
 def simple_analysis(
-    client,
+    client: RUClient,
     batch_size=512,
     throttle=0.1,
     unblock_duration=0.5,
-    cl=None,
-    pf=None,
+    chunk_logger=None,
+    paf_logger=None,
     live_toml_path=None,
     flowcell_size=512,
     dry_run=False,
     run_info=None,
     conditions=None,
     mapper=None,
-    caller_kwargs=None,
+    caller=None,
 ):
     """Analysis function
 
@@ -102,9 +102,9 @@ def simple_analysis(
         The number of seconds interval between requests to the ReadUntilClient
     unblock_duration : int or float
         Time, in seconds, to apply unblock voltage
-    cl : logging.Logger
+    chunk_logger : logging.Logger
         Log file to log chunk data to
-    pf : logging.Logger
+    paf_logger : logging.Logger
         Log file to log alignments to
     live_toml_path : str
         Path to a `live` TOML configuration file for ReadFish. If this exists when
@@ -118,7 +118,8 @@ def simple_analysis(
     conditions : list
        Experimental conditions as List of namedtuples.
     mapper : mappy.Aligner
-    caller_kwargs : dict
+    caller : Caller
+        guppy basecaller
 
     Returns
     -------
@@ -152,15 +153,6 @@ def simple_analysis(
         fh.write("# In the future this file may become a CSV file.\n")
         toml.dump(d, fh)
 
-    if caller_kwargs["host"].startswith("ipc"):
-        guppy_address = "{}/{}".format(caller_kwargs["host"], caller_kwargs["port"])
-    else:
-        guppy_address = "{}:{}".format(caller_kwargs["host"], caller_kwargs["port"])
-
-    caller = Caller(
-        address=guppy_address,
-        config=caller_kwargs["config_name"],
-    )
     # What if there is no reference or an empty MMI
 
     decisiontracker = DecisionTracker()
@@ -269,7 +261,7 @@ def simple_analysis(
             mode = ""
             exceeded_threshold = False
             below_threshold = False
-            log_decision = lambda: cl.debug(
+            log_decision = lambda: chunk_logger.debug(
                 l_string.format(
                     loop_counter,
                     r,
@@ -317,7 +309,7 @@ def simple_analysis(
 
                 hits = set()
                 for result in results:
-                    pf.debug("{}\t{}\t{}".format(read_id, seq_len, result))
+                    paf_logger.debug("{}\t{}\t{}".format(read_id, seq_len, result))
                     hits.add(result.ctg)
 
                 if hits & conditions[run_info[channel]].targets:
@@ -410,42 +402,71 @@ def simple_analysis(
                 Severity.INFO,
             )
 
+
+    send_message(client.connection, "ReadFish Client Stopped.", Severity.WARN)
+    logger.info("Finished analysis of reads as client stopped.")
+
+def get_guppy_basecaller(caller_kwargs):
+    if caller_kwargs["host"].startswith("ipc"):
+        guppy_address = "{}/{}".format(caller_kwargs["host"], caller_kwargs["port"])
     else:
-        send_message(client.connection, "ReadFish Client Stopped.", Severity.WARN)
-        caller.disconnect()
-        logger.info("Finished analysis of reads as client stopped.")
+        guppy_address = "{}:{}".format(caller_kwargs["host"], caller_kwargs["port"])
 
+    caller = Caller(
+        address=guppy_address,
+        config=caller_kwargs["config_name"],
+    )
+    return caller
 
 def main():
     sys.exit("This entry point is deprecated, please use 'readfish targets' instead")
 
-
-def run(parser, args):
-    if args.chunk_log is not None:
-        chunk_log_exists = Path(args.chunk_log).is_file()
-        chunk_logger = setup_logger("chunk_log", log_file=args.chunk_log)
+#todo1: refactor into setup_chunk_logger
+def get_chunk_logger(chunk_log_file, delete_existing=True):
+    # log line header to chunk log file if it doesn't exist
+
+    if chunk_log_file is not None:
+        if delete_existing:
+            Path(chunk_log_file).unlink(missing_ok=True)
+        chunk_log_exists = Path(chunk_log_file).is_file()
+        chunk_logger = setup_logger("chunk_log", log_file=chunk_log_file)
         if not chunk_log_exists:
             chunk_logger.debug("\t".join(CHUNK_LOG_FIELDS))
     else:
         chunk_logger = logging.getLogger("chunk_log")
         chunk_logger.disabled = True
-
-    if args.paf_log is not None:
-        paf_logger = setup_logger("paf_log", log_file=args.paf_log)
+
+    return chunk_logger
+
+def get_paf_logger(paf_log_file):
+    if paf_log_file is not None:
+        paf_logger = setup_logger("paf_log", log_file=paf_log_file)
     else:
         paf_logger = logging.getLogger("paf_log")
         paf_logger.disabled = True
+
+    return paf_logger
 
+def file_and_console_logger(name, log_file, log_format):
+    # creates a logger to a file if file is provided, always attaches a stream handler to output to console
     logger = setup_logger(
-        __name__,
-        log_format=args.log_format,
-        log_file=args.log_file,
+        name,
+        log_format=log_format,
+        log_file=log_file,
         level=logging.INFO,
     )
-    if args.log_file is not None:
+    if log_file is not None:
         h = logging.StreamHandler()
-        h.setFormatter(logging.Formatter(args.log_format))
+        h.setFormatter(logging.Formatter(log_format))
         logger.addHandler(h)
+
+    return logger
+
+def run(parser, args):
+    chunk_logger = get_chunk_logger(args.chunk_log)
+    paf_logger = get_paf_logger(args.paf_log)
+    logger = file_and_console_logger(__name__, args.log_file, args.log_format)
+
 
     logger.info(" ".join(sys.argv))
     print_args(args, logger=logger)
@@ -504,20 +525,22 @@ def run(parser, args):
     )
 
     try:
+        caller = get_guppy_basecaller(caller_kwargs)
         simple_analysis(
             read_until_client,
             unblock_duration=args.unblock_duration,
             throttle=args.throttle,
             batch_size=args.batch_size,
-            cl=chunk_logger,
-            pf=paf_logger,
+            chunk_logger=chunk_logger,
+            paf_logger=paf_logger,
             live_toml_path=live_toml,
             dry_run=args.dry_run,
             run_info=run_info,
             conditions=conditions,
             mapper=mapper,
-            caller_kwargs=caller_kwargs,
+            caller=caller,
         )
+        caller.disconnect()
     except KeyboardInterrupt:
         pass
     finally:
diff --git a/ru/unblock_all.py b/ru/unblock_all.py
index 29c1028..1a385b5 100644
--- a/ru/unblock_all.py
+++ b/ru/unblock_all.py
@@ -26,7 +26,7 @@
 
 def simple_analysis(
-    client, duration, batch_size=512, throttle=0.4, unblock_duration=0.1
+    client: RUClient, duration, batch_size=512, throttle=0.4, unblock_duration=0.1
 ):
     """Analysis function
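
With the basecaller construction pulled out of simple_analysis(), the basecaller becomes an injection point: run() builds a Guppy-backed Caller via get_guppy_basecaller(caller_kwargs), but any object exposing the same basecalling interface (for example one backed by the ONT simulator) can be passed instead. A minimal sketch of such a driver follows; the helper functions, the simple_analysis keyword arguments, and the caller.disconnect() call are taken from the patch above, while the wrapper's name, its parameters, and the ru.ru_gen import path are assumptions made for illustration.

    # Sketch of a driver that injects an externally constructed basecaller.
    # Assumptions: the import path ru.ru_gen, the wrapper name and its
    # parameters are illustrative; the imported helpers and the
    # simple_analysis keyword arguments come from the patch.
    from ru.ru_gen import (
        file_and_console_logger,
        get_chunk_logger,
        get_paf_logger,
        simple_analysis,
    )


    def run_with_caller(
        read_until_client, caller, mapper, run_info, conditions, live_toml, args
    ):
        """Run the analysis loop with a basecaller built outside this function.

        `caller` may be the Guppy client returned by
        get_guppy_basecaller(caller_kwargs) or any object with the same
        basecalling interface, e.g. one backed by the ONT simulator.
        """
        chunk_logger = get_chunk_logger(args.chunk_log)
        paf_logger = get_paf_logger(args.paf_log)
        logger = file_and_console_logger(__name__, args.log_file, args.log_format)
        logger.info("starting analysis with injected basecaller")

        simple_analysis(
            read_until_client,
            unblock_duration=args.unblock_duration,
            throttle=args.throttle,
            batch_size=args.batch_size,
            chunk_logger=chunk_logger,
            paf_logger=paf_logger,
            live_toml_path=live_toml,  # path to the run's live TOML, as in run()
            dry_run=args.dry_run,
            run_info=run_info,
            conditions=conditions,
            mapper=mapper,
            caller=caller,
        )
        # run() in the patch disconnects the caller once the analysis returns
        caller.disconnect()

The same entry point can wrap get_guppy_basecaller() to reproduce the behaviour of run(), so the Guppy and simulator paths differ only in how the caller object is built.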