Commit 8c5f018
* sort instances numerically by cluster ID and then batch ID
* Always merge results, even when not batching
* readability improvements
FabianJetzinger committed Aug 7, 2024
1 parent 7961c0c commit 8c5f018
Showing 1 changed file with 29 additions and 35 deletions.
isONform_parallel: 64 changes (29 additions & 35 deletions)
@@ -120,7 +120,6 @@ def split_cluster_in_batches_corrected(indir, outdir, tmp_work_dir, max_seqs):
     #print("FLIST",file_list)
     #iterate over the fastq_files
     for filepath in file_list:
-        smaller_than_max_seqs = False
         #print("FPATH",filepath)
         old_fastq_file=str(filepath.resolve())
         path_split=old_fastq_file.split("/")
@@ -135,16 +134,13 @@ def split_cluster_in_batches_corrected(indir, outdir, tmp_work_dir, max_seqs):
         #if we have more lines than max_seqs
         new_indir=os.path.join(indir,folder)
         #print(new_indir)
-        if not smaller_than_max_seqs:
-
-            num_lines = sum(1 for line in open(os.path.join(new_indir, fastq_file)))
-            #print("Number Lines", fastq_file, num_lines)
-            #we reset smaller_than_max_seqs as we now want to see if we really have more than max_seqs reads
-            smaller_than_max_seqs = False if num_lines > 4 * max_seqs else True
-        else:
-            smaller_than_max_seqs = True
+        num_lines = sum(1 for line in open(os.path.join(new_indir, fastq_file)))
+        #print("Number Lines", fastq_file, num_lines)

-        if not smaller_than_max_seqs:
+        # determine whether the file is larger than max_seqs
+        larger_than_max_seqs = num_lines > 4 * max_seqs
+        if larger_than_max_seqs:
             #print("Splitting",filepath)
             ext = fastq_file.rsplit('.', 1)[1]
             splitfile(new_indir, tmp_work_dir, fastq_file, 4 * max_seqs,cl_id,ext) # is fastq file
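
The threshold num_lines > 4 * max_seqs works because FASTQ stores every read as exactly four lines (header, sequence, separator, quality string). A minimal sketch of the same check factored into helpers; the function names are ours, not isONform's:

    import os

    def count_fastq_reads(path):
        # FASTQ holds each read in exactly 4 lines, so reads = lines / 4
        with open(path) as fh:
            num_lines = sum(1 for _ in fh)
        return num_lines // 4

    def needs_splitting(indir, fastq_file, max_seqs):
        # mirrors the committed check: split only if the file holds
        # more than max_seqs reads
        return count_fastq_reads(os.path.join(indir, fastq_file)) > max_seqs

Using a with-block also closes the file handle explicitly, which the inline open() in the diff leaves to the garbage collector.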
@@ -167,18 +163,17 @@ def split_cluster_in_batches_clust(indir, outdir, tmp_work_dir, max_seqs):
     file_list = list(pat.rglob('*.fastq'))
     # add split files to this indir
     for file_ in file_list:
-        smaller_than_max_seqs = False
         #for file_ in sorted(os.listdir(indir), key=lambda x: int(x.split('.')[0])):
         #fastq_path = os.fsdecode(file_)
         old_fastq_file = str(file_.resolve())
         fastq_file = old_fastq_file.split("/")[-1]
         #print("FASTQ",fastq_file)
-        if not smaller_than_max_seqs:
-            num_lines = sum(1 for line in open(os.path.join(indir, fastq_file)))
-            smaller_than_max_seqs = False if num_lines > 4 * max_seqs else True
-        else:
-            smaller_than_max_seqs = True
-        if not smaller_than_max_seqs:
+
+        num_lines = sum(1 for line in open(os.path.join(indir, fastq_file)))
+
+        # determine whether the file is larger than max_seqs
+        larger_than_max_seqs = num_lines > 4 * max_seqs
+        if larger_than_max_seqs:
             cl_id, ext = fastq_file.rsplit('.', 1)
             splitfile(indir, tmp_work_dir, fastq_file, 4 * max_seqs, cl_id, ext) # is fastq file
         else:
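
splitfile itself is defined elsewhere in the repository and is not part of this diff. For orientation only, a hypothetical sketch of such a splitter that writes <cl_id>_<batch_id>.<ext> chunks of at most max_lines lines into tmp_work_dir; the output naming is inferred from the batch-ID parsing later in this commit, not confirmed by the source:

    import os
    from itertools import islice

    def splitfile_sketch(indir, tmp_work_dir, fastq_file, max_lines, cl_id, ext):
        # hypothetical stand-in, not the project's actual splitfile
        with open(os.path.join(indir, fastq_file)) as fh:
            batch_id = 0
            while True:
                chunk = list(islice(fh, max_lines))  # max_lines = 4 * max_seqs
                if not chunk:
                    break
                out_name = "{}_{}.{}".format(cl_id, batch_id, ext)
                with open(os.path.join(tmp_work_dir, out_name), "w") as out:
                    out.writelines(chunk)
                batch_id += 1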
else:
Expand Down Expand Up @@ -216,7 +211,7 @@ def main(args):
tmp_work_dir = tempfile.mkdtemp()
#print("SPLITWRTBATCHES")

print("Temporary workdirektory:", tmp_work_dir)
print("Temporary workdirectory:", tmp_work_dir)
if args.clustered:
split_tmp_directory = split_cluster_in_batches_clust(directory, args.outfolder, tmp_work_dir,
args.max_seqs)
@@ -233,10 +228,10 @@ def main(args):
             read_fastq_file = os.fsdecode(file_)
             if read_fastq_file.endswith(".fastq"):
                 #print("True")
-                tmp_id= read_fastq_file.split(".")[0]
-                snd_tmp_id=tmp_id.split("_")
+                tmp_id = read_fastq_file.split(".")[0]
+                snd_tmp_id = tmp_id.split("_")
                 cl_id = snd_tmp_id[0]
-                batch_id=snd_tmp_id[1] if len(snd_tmp_id) > 1 else 0
+                batch_id = snd_tmp_id[1] if len(snd_tmp_id) > 1 else 0
                 outfolder = os.path.join(args.outfolder, cl_id)
                 #print(batch_id,cl_id)
                 #print(outfolder)
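
The filename stem encodes cluster and batch: a hypothetical 17_3.fastq yields cl_id "17" and batch_id "3", while an unsplit 17.fastq falls back to batch_id 0. The parsing from the diff, extracted for a quick check:

    def parse_batch_name(read_fastq_file):
        # "<cl_id>_<batch_id>.fastq" or "<cl_id>.fastq"
        tmp_id = read_fastq_file.split(".")[0]
        snd_tmp_id = tmp_id.split("_")
        cl_id = snd_tmp_id[0]
        batch_id = snd_tmp_id[1] if len(snd_tmp_id) > 1 else 0
        return cl_id, batch_id

    assert parse_batch_name("17_3.fastq") == ("17", "3")
    assert parse_batch_name("17.fastq") == ("17", 0)

Note the mixed types: batch_id is a string when present and the int 0 otherwise, which is why the sort fix below casts with int() rather than comparing raw values.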
@@ -259,11 +254,11 @@ def main(args):
                     "max_seqs": args.max_seqs, "parallel": True, "--slow": True, "delta_iso_len_3": args.delta_iso_len_3,
                     "delta_iso_len_5": args.delta_iso_len_5}
                 instances.append(
-                    (isONform_location, fastq_file_path, outfolder, batch_id, isONform_algorithm_params,cl_id))
+                    (isONform_location, fastq_file_path, outfolder, batch_id, isONform_algorithm_params, cl_id))
             else:
                 continue

-    instances.sort(key=lambda x: x[3]) # sorting on batch ids as strings
+    instances.sort(key=lambda x: (int(x[5]), int(x[3]))) # sorting on cluster_id and then batch_id numerically
     print("Printing instances")
     for t in instances:
         print(t)
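
The old key compared batch IDs as strings, so "10" ordered before "2". Casting both fields to int gives numeric order on cluster ID first, then batch ID. A small demonstration with padded stand-in tuples (only positions 3 and 5, batch_id and cl_id, matter):

    instances = [(None, None, None, "10", None, "2"),
                 (None, None, None, "2",  None, "2"),
                 (None, None, None, "1",  None, "10")]

    # old behaviour: lexicographic on the batch-id string
    print(sorted(b[3] for b in instances))  # ['1', '10', '2']

    # fixed: numeric on (cluster_id, batch_id)
    instances.sort(key=lambda x: (int(x[5]), int(x[3])))
    print([(x[5], x[3]) for x in instances])  # [('2', '2'), ('2', '10'), ('10', '1')]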
Expand Down Expand Up @@ -292,19 +287,18 @@ def main(args):
pool.join()
print("Time elapsed multiprocessing:", time() - start_multi)

if args.split_wrt_batches:
print("STILLSPLITWRTBATCHES")
file_handling = time()
if args.write_fastq:
write_fastq = True
else:
write_fastq = False
batch_merging_parallel.join_back_via_batch_merging(args.outfolder, args.delta, args.delta_len, args.delta_iso_len_3, args.delta_iso_len_5, args.max_seqs_to_spoa, args.iso_abundance, write_fastq, write_low_abundance)
Parallelization_side_functions.generate_full_output(args.outfolder, write_fastq, write_low_abundance)
Parallelization_side_functions.remove_folders(args.outfolder)
shutil.rmtree(split_directory)
print("Joined back batched files in:", time() - file_handling)
print("Finished full algo after :", time() - globstart)
print("Merging...")
file_handling = time()
if args.write_fastq:
write_fastq = True
else:
write_fastq = False
batch_merging_parallel.join_back_via_batch_merging(args.outfolder, args.delta, args.delta_len, args.delta_iso_len_3, args.delta_iso_len_5, args.max_seqs_to_spoa, args.iso_abundance, write_fastq, write_low_abundance)
Parallelization_side_functions.generate_full_output(args.outfolder, write_fastq, write_low_abundance)
Parallelization_side_functions.remove_folders(args.outfolder)
shutil.rmtree(split_directory)
print("Joined back batched files in:", time() - file_handling)
print("Finished full algo after :", time() - globstart)
return
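
One further simplification left on the table, assuming args.write_fastq is a plain argparse flag: the retained four-line if/else collapses to a single assignment.

    # equivalent to the if/else above when args.write_fastq is truthy/falsy
    write_fastq = bool(args.write_fastq)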

