Commit f956e31
refactored readfish to make it work with the ONT simulator without a guppy basecall server
maximilianmordig committed Jun 24, 2023
1 parent ab3b25a commit f956e31
Showing 2 changed files with 67 additions and 44 deletions.
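In short, the refactor inverts a dependency: simple_analysis no longer constructs a guppy Caller from caller_kwargs; the caller is built up front by a new helper, get_guppy_basecaller, and injected, so an object backed by the ONT simulator can stand in without a guppy basecall server. A minimal sketch of the resulting pattern — SimulatorCaller is hypothetical and not part of this commit; only disconnect() is known to be required from the diff below:

    # Sketch only. SimulatorCaller is a hypothetical stand-in illustrating
    # the injection point this commit creates.
    class SimulatorCaller:
        def disconnect(self):
            # no basecall server to tear down
            pass

    def start(read_until_client, use_simulator, caller_kwargs):
        if use_simulator:
            caller = SimulatorCaller()
        else:
            caller = get_guppy_basecaller(caller_kwargs)  # helper added below
        try:
            simple_analysis(read_until_client, caller=caller)
        finally:
            caller.disconnect()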
109 changes: 66 additions & 43 deletions ru/ru_gen.py
@@ -66,29 +66,29 @@
     "counter",
     "mode",
     "decision",
-    "condition",
-    "min_threshold",
-    "count_threshold",
-    "start_analysis",
-    "end_analysis",
-    "timestamp",
+    "condition",  # policy for the decision
+    "min_threshold",  # <= min chunk threshold
+    "count_threshold",  # whether the max chunk threshold was reached
+    "start_analysis",  # when processing of the last chunk started; the reference point is undefined, so only differences are meaningful
+    "end_analysis",  # when processing of the last chunk finished; the reference point is undefined, so only differences are meaningful
+    "timestamp",  # when the decision was made; only sent once all chunks in the batch are decided
 )


 def simple_analysis(
-    client,
+    client: RUClient,
     batch_size=512,
     throttle=0.1,
     unblock_duration=0.5,
-    cl=None,
-    pf=None,
+    chunk_logger=None,
+    paf_logger=None,
     live_toml_path=None,
     flowcell_size=512,
     dry_run=False,
     run_info=None,
     conditions=None,
     mapper=None,
-    caller_kwargs=None,
+    caller=None,
 ):
     """Analysis function
@@ -102,9 +102,9 @@ def simple_analysis(
         The number of seconds interval between requests to the ReadUntilClient
     unblock_duration : int or float
         Time, in seconds, to apply unblock voltage
-    cl : logging.Logger
+    chunk_logger : logging.Logger
         Log file to log chunk data to
-    pf : logging.Logger
+    paf_logger : logging.Logger
         Log file to log alignments to
     live_toml_path : str
         Path to a `live` TOML configuration file for ReadFish. If this exists when
@@ -118,7 +118,8 @@ def simple_analysis(
     conditions : list
         Experimental conditions as List of namedtuples.
     mapper : mappy.Aligner
-    caller_kwargs : dict
+    caller : Caller
+        guppy basecaller
     Returns
     -------
@@ -152,15 +153,6 @@ def simple_analysis(
             fh.write("# In the future this file may become a CSV file.\n")
             toml.dump(d, fh)

-    if caller_kwargs["host"].startswith("ipc"):
-        guppy_address = "{}/{}".format(caller_kwargs["host"], caller_kwargs["port"])
-    else:
-        guppy_address = "{}:{}".format(caller_kwargs["host"], caller_kwargs["port"])
-
-    caller = Caller(
-        address=guppy_address,
-        config=caller_kwargs["config_name"],
-    )
     # What if there is no reference or an empty MMI

     decisiontracker = DecisionTracker()
@@ -269,7 +261,7 @@ def simple_analysis(
                     mode = ""
                     exceeded_threshold = False
                     below_threshold = False
-                    log_decision = lambda: cl.debug(
+                    log_decision = lambda: chunk_logger.debug(
                         l_string.format(
                             loop_counter,
                             r,
@@ -317,7 +309,7 @@ def simple_analysis(

                 hits = set()
                 for result in results:
-                    pf.debug("{}\t{}\t{}".format(read_id, seq_len, result))
+                    paf_logger.debug("{}\t{}\t{}".format(read_id, seq_len, result))
                     hits.add(result.ctg)

                 if hits & conditions[run_info[channel]].targets:
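A note on the PAF log line above: mappy alignment objects stringify to roughly the PAF columns that follow the query name and length, which is presumably why read_id and seq_len are prepended here, yielding one PAF-like record per hit. A rough sketch with a made-up alignment string:

    # Sketch: how the paf_logger line composes a PAF-like record.
    # result_str stands in for str(result) from mappy; columns are illustrative.
    read_id = "0f2c74f1-9c0f-4ea5-8d4c-illustrative"
    seq_len = 412
    result_str = "33\t401\t+\tchr20\t64444167\t1002000\t1002368\t350\t368\t60"
    print("{}\t{}\t{}".format(read_id, seq_len, result_str))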
@@ -410,42 +402,71 @@ def simple_analysis(
                 Severity.INFO,
             )

-    send_message(client.connection, "ReadFish Client Stopped.", Severity.WARN)
-    caller.disconnect()
-    logger.info("Finished analysis of reads as client stopped.")
+    send_message(client.connection, "ReadFish Client Stopped.", Severity.WARN)
+    logger.info("Finished analysis of reads as client stopped.")
+
+
+def get_guppy_basecaller(caller_kwargs):
+    if caller_kwargs["host"].startswith("ipc"):
+        guppy_address = "{}/{}".format(caller_kwargs["host"], caller_kwargs["port"])
+    else:
+        guppy_address = "{}:{}".format(caller_kwargs["host"], caller_kwargs["port"])
+
+    caller = Caller(
+        address=guppy_address,
+        config=caller_kwargs["config_name"],
+    )
+    return caller


 def main():
     sys.exit("This entry point is deprecated, please use 'readfish targets' instead")


-def run(parser, args):
-    if args.chunk_log is not None:
-        chunk_log_exists = Path(args.chunk_log).is_file()
-        chunk_logger = setup_logger("chunk_log", log_file=args.chunk_log)
+# todo1: refactor into setup_chunk_logger
+def get_chunk_logger(chunk_log_file, delete_existing=True):
+    # write the header line to the chunk log file if the file does not already exist
+    if chunk_log_file is not None:
+        if delete_existing:
+            Path(chunk_log_file).unlink(missing_ok=True)
+        chunk_log_exists = Path(chunk_log_file).is_file()
+        chunk_logger = setup_logger("chunk_log", log_file=chunk_log_file)
         if not chunk_log_exists:
             chunk_logger.debug("\t".join(CHUNK_LOG_FIELDS))
     else:
         chunk_logger = logging.getLogger("chunk_log")
         chunk_logger.disabled = True

-    if args.paf_log is not None:
-        paf_logger = setup_logger("paf_log", log_file=args.paf_log)
+    return chunk_logger
+
+
+def get_paf_logger(paf_log_file):
+    if paf_log_file is not None:
+        paf_logger = setup_logger("paf_log", log_file=paf_log_file)
     else:
         paf_logger = logging.getLogger("paf_log")
         paf_logger.disabled = True

+    return paf_logger
+
+
+def file_and_console_logger(name, log_file, log_format):
+    # creates a file logger if a log file is provided; always attaches a stream
+    # handler so output also goes to the console
     logger = setup_logger(
-        __name__,
-        log_format=args.log_format,
-        log_file=args.log_file,
+        name,
+        log_format=log_format,
+        log_file=log_file,
         level=logging.INFO,
     )
-    if args.log_file is not None:
+    if log_file is not None:
         h = logging.StreamHandler()
-        h.setFormatter(logging.Formatter(args.log_format))
+        h.setFormatter(logging.Formatter(log_format))
         logger.addHandler(h)

+    return logger
+
+
+def run(parser, args):
+    chunk_logger = get_chunk_logger(args.chunk_log)
+    paf_logger = get_paf_logger(args.paf_log)
+    logger = file_and_console_logger(__name__, args.log_file, args.log_format)
+
     logger.info(" ".join(sys.argv))
     print_args(args, logger=logger)
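For reference, get_guppy_basecaller above produces two address shapes depending on the transport. A sketch with made-up caller_kwargs (host, port, and paths are assumptions, not from this diff):

    # Sketch: the two address formats built by get_guppy_basecaller.
    for kwargs in (
        {"host": "ipc:///tmp/.guppy", "port": 5555},  # IPC socket -> joined with "/"
        {"host": "127.0.0.1", "port": 5555},          # TCP host   -> joined with ":"
    ):
        if kwargs["host"].startswith("ipc"):
            address = "{}/{}".format(kwargs["host"], kwargs["port"])  # ipc:///tmp/.guppy/5555
        else:
            address = "{}:{}".format(kwargs["host"], kwargs["port"])  # 127.0.0.1:5555
        print(address)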
@@ -504,20 +525,22 @@ def run(parser, args):
     )

     try:
+        caller = get_guppy_basecaller(caller_kwargs)
         simple_analysis(
             read_until_client,
             unblock_duration=args.unblock_duration,
             throttle=args.throttle,
             batch_size=args.batch_size,
-            cl=chunk_logger,
-            pf=paf_logger,
+            chunk_logger=chunk_logger,
+            paf_logger=paf_logger,
             live_toml_path=live_toml,
             dry_run=args.dry_run,
             run_info=run_info,
             conditions=conditions,
             mapper=mapper,
-            caller_kwargs=caller_kwargs,
+            caller=caller,
         )
+        caller.disconnect()
     except KeyboardInterrupt:
         pass
     finally:
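One caveat on the hunk above: caller.disconnect() runs inside the try block, so a KeyboardInterrupt raised during simple_analysis skips it. If guaranteed teardown is wanted, the usual shape is a finally — a sketch of that alternative, not what the commit does:

    # Sketch: ensuring the basecaller connection is always closed.
    caller = get_guppy_basecaller(caller_kwargs)
    try:
        simple_analysis(
            read_until_client,
            caller=caller,
            # ... remaining keyword arguments as in the hunk above ...
        )
    except KeyboardInterrupt:
        pass
    finally:
        caller.disconnect()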
2 changes: 1 addition & 1 deletion ru/unblock_all.py
@@ -26,7 +26,7 @@


 def simple_analysis(
-    client, duration, batch_size=512, throttle=0.4, unblock_duration=0.1
+    client: RUClient, duration, batch_size=512, throttle=0.4, unblock_duration=0.1
 ):
     """Analysis function
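Aside on the new annotation: unless unblock_all.py already imports RUClient, the type hint needs one; a TYPE_CHECKING guard keeps that import out of the runtime path. A sketch — the import location of RUClient is an assumption, not confirmed by this diff:

    # Sketch, assuming RUClient can be imported from ru.ru_gen (unverified).
    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        from ru.ru_gen import RUClient  # evaluated by type checkers only

    def simple_analysis(
        client: "RUClient", duration, batch_size=512, throttle=0.4, unblock_duration=0.1
    ):
        ...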
