From 8c5f01850687fc83836c1175f78389e95a926ebc Mon Sep 17 00:00:00 2001 From: Fabian Jetzinger Date: Wed, 7 Aug 2024 17:38:05 +0200 Subject: [PATCH] * sort instances numerically by cluster ID and then batch ID * Always merge results, even when not batching * readability improvements --- isONform_parallel | 64 +++++++++++++++++++++-------------------------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/isONform_parallel b/isONform_parallel index 2670d75..a6fc1e1 100755 --- a/isONform_parallel +++ b/isONform_parallel @@ -120,7 +120,6 @@ def split_cluster_in_batches_corrected(indir, outdir, tmp_work_dir, max_seqs): #print("FLIST",file_list) #iterate over the fastq_files for filepath in file_list: - smaller_than_max_seqs = False #print("FPATH",filepath) old_fastq_file=str(filepath.resolve()) path_split=old_fastq_file.split("/") @@ -135,16 +134,13 @@ def split_cluster_in_batches_corrected(indir, outdir, tmp_work_dir, max_seqs): #if we have more lines than max_seqs new_indir=os.path.join(indir,folder) #print(new_indir) - if not smaller_than_max_seqs: - num_lines = sum(1 for line in open(os.path.join(new_indir, fastq_file))) - #print("Number Lines", fastq_file, num_lines) - #we reset smaller_than_max_seqs as we now want to see if we really have more than max_seqs reads - smaller_than_max_seqs = False if num_lines > 4 * max_seqs else True - else: - smaller_than_max_seqs = True + num_lines = sum(1 for line in open(os.path.join(new_indir, fastq_file))) + #print("Number Lines", fastq_file, num_lines) - if not smaller_than_max_seqs: + # determine whether the file is larger than max_seqs + larger_than_max_seqs = num_lines > 4 * max_seqs + if larger_than_max_seqs: #print("Splitting",filepath) ext = fastq_file.rsplit('.', 1)[1] splitfile(new_indir, tmp_work_dir, fastq_file, 4 * max_seqs,cl_id,ext) # is fastq file @@ -167,18 +163,17 @@ def split_cluster_in_batches_clust(indir, outdir, tmp_work_dir, max_seqs): file_list = list(pat.rglob('*.fastq')) # add split fiels to this indir for file_ in file_list: - smaller_than_max_seqs = False #for file_ in sorted(os.listdir(indir), key=lambda x: int(x.split('.')[0])): #fastq_path = os.fsdecode(file_) old_fastq_file = str(file_.resolve()) fastq_file = old_fastq_file.split("/")[-1] #print("FASTQ",fastq_file) - if not smaller_than_max_seqs: - num_lines = sum(1 for line in open(os.path.join(indir, fastq_file))) - smaller_than_max_seqs = False if num_lines > 4 * max_seqs else True - else: - smaller_than_max_seqs = True - if not smaller_than_max_seqs: + + num_lines = sum(1 for line in open(os.path.join(indir, fastq_file))) + + # determine whether the file is larger than max_seqs + larger_than_max_seqs = num_lines > 4 * max_seqs + if larger_than_max_seqs: cl_id, ext = fastq_file.rsplit('.', 1) splitfile(indir, tmp_work_dir, fastq_file, 4 * max_seqs, cl_id, ext) # is fastq file else: @@ -216,7 +211,7 @@ def main(args): tmp_work_dir = tempfile.mkdtemp() #print("SPLITWRTBATCHES") - print("Temporary workdirektory:", tmp_work_dir) + print("Temporary workdirectory:", tmp_work_dir) if args.clustered: split_tmp_directory = split_cluster_in_batches_clust(directory, args.outfolder, tmp_work_dir, args.max_seqs) @@ -233,10 +228,10 @@ def main(args): read_fastq_file = os.fsdecode(file_) if read_fastq_file.endswith(".fastq"): #print("True") - tmp_id= read_fastq_file.split(".")[0] - snd_tmp_id=tmp_id.split("_") + tmp_id = read_fastq_file.split(".")[0] + snd_tmp_id = tmp_id.split("_") cl_id = snd_tmp_id[0] - batch_id=snd_tmp_id[1] if len(snd_tmp_id) > 1 else 0 + batch_id = snd_tmp_id[1] if len(snd_tmp_id) > 1 else 0 outfolder = os.path.join(args.outfolder, cl_id) #print(batch_id,cl_id) #print(outfolder) @@ -259,11 +254,11 @@ def main(args): "max_seqs": args.max_seqs, "parallel": True, "--slow": True, "delta_iso_len_3": args.delta_iso_len_3, "delta_iso_len_5": args.delta_iso_len_5} instances.append( - (isONform_location, fastq_file_path, outfolder, batch_id, isONform_algorithm_params,cl_id)) + (isONform_location, fastq_file_path, outfolder, batch_id, isONform_algorithm_params, cl_id)) else: continue - instances.sort(key=lambda x: x[3]) # sorting on batch ids as strings + instances.sort(key=lambda x: (int(x[5]), int(x[3]))) # sorting on cluster_id and then batch_id numerically print("Printing instances") for t in instances: print(t) @@ -292,19 +287,18 @@ def main(args): pool.join() print("Time elapsed multiprocessing:", time() - start_multi) - if args.split_wrt_batches: - print("STILLSPLITWRTBATCHES") - file_handling = time() - if args.write_fastq: - write_fastq = True - else: - write_fastq = False - batch_merging_parallel.join_back_via_batch_merging(args.outfolder, args.delta, args.delta_len, args.delta_iso_len_3, args.delta_iso_len_5, args.max_seqs_to_spoa, args.iso_abundance, write_fastq, write_low_abundance) - Parallelization_side_functions.generate_full_output(args.outfolder, write_fastq, write_low_abundance) - Parallelization_side_functions.remove_folders(args.outfolder) - shutil.rmtree(split_directory) - print("Joined back batched files in:", time() - file_handling) - print("Finished full algo after :", time() - globstart) + print("Merging...") + file_handling = time() + if args.write_fastq: + write_fastq = True + else: + write_fastq = False + batch_merging_parallel.join_back_via_batch_merging(args.outfolder, args.delta, args.delta_len, args.delta_iso_len_3, args.delta_iso_len_5, args.max_seqs_to_spoa, args.iso_abundance, write_fastq, write_low_abundance) + Parallelization_side_functions.generate_full_output(args.outfolder, write_fastq, write_low_abundance) + Parallelization_side_functions.remove_folders(args.outfolder) + shutil.rmtree(split_directory) + print("Joined back batched files in:", time() - file_handling) + print("Finished full algo after :", time() - globstart) return