Skip to content

Commit

Permalink
Refactoring, added tests, working for old and new miseq
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomáš Houfek committed Oct 3, 2024
1 parent f380eda commit 8fff7a4
Show file tree
Hide file tree
Showing 1,217 changed files with 430 additions and 192 deletions.
2 changes: 1 addition & 1 deletion compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ services:
run:
build: .
volumes:
- /home/houfek/Work/MMCI/sequencing_pipeline/data-catalogue-playground/muni-sc/MiSEQ/:/PseudonymizedRuns
- /home/houfek/Work/MMCI/sequencing_pipeline/data-catalogue-playground/muni-sc/PseudonymizedRunes:/PseudonymizedRuns
- /home/houfek/Work/MMCI/sequencing_pipeline/data-catalogue-playground/muni-sc/OrganisedRuns/:/OrganisedRuns
- /home/houfek/Work/MMCI/sequencing_pipeline/data-catalogue-playground/muni-sc/Patients/:/Patients
command: bash -c "python main.py
Expand Down
40 changes: 2 additions & 38 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,5 @@
import argparse
import os
import shutil
import logging
from datetime import datetime
from organiser.organise_run import RunOrganiser
from organiser.helpers.file_helpers import create_dictionary_if_not_exist


def _create_important_folders_if_not_exist(organisation_folder):
    """Run ``create_dictionary_if_not_exist`` for each bookkeeping sub-folder.

    The sub-folders are ``logs``, ``backups`` and ``errors``, all located
    directly under *organisation_folder*.
    """
    for sub_folder in ("logs", "backups", "errors"):
        create_dictionary_if_not_exist(os.path.join(organisation_folder, sub_folder))


def run(path_to_runs_for_processing, path_to_organisation_folder, path_to_patient_organisation_folder):
    """Organise every entry of the runs folder and file the originals away.

    Each entry of *path_to_runs_for_processing* is handed to ``RunOrganiser``.
    An entry that raises ``FileNotFoundError`` is moved to the ``errors``
    sub-folder of *path_to_organisation_folder*; every other entry is moved
    to its ``backups`` sub-folder. Progress is logged to a timestamped file
    in the ``logs`` sub-folder.
    """
    _create_important_folders_if_not_exist(path_to_organisation_folder)

    log_name = datetime.now().strftime('%d_%m_%Y-%H_%M.log')
    log_file = os.path.join(path_to_organisation_folder, "logs", log_name)
    logging.basicConfig(filename=log_file, encoding='utf-8', level=logging.INFO)

    for run_name in os.listdir(path_to_runs_for_processing):
        source = os.path.join(path_to_runs_for_processing, run_name)
        logging.info(f"Organising: {run_name}")
        try:
            organiser = RunOrganiser(path_to_runs_for_processing, run_name,
                                     path_to_organisation_folder,
                                     path_to_patient_organisation_folder)
            organiser()
        except FileNotFoundError as e:
            logging.error(f"Run {run_name} is missing some data\nError:\n{e}")
            shutil.move(source, os.path.join(path_to_organisation_folder, "errors", run_name))
        else:
            # Only reached when organising succeeded; the move itself is
            # deliberately outside the guarded section.
            shutil.move(source, os.path.join(path_to_organisation_folder, "backups", run_name))
            logging.info("File moved into backups")

    logging.info("Done!")
from organiser.process.processor import Processor


if __name__ == "__main__":
Expand All @@ -48,4 +12,4 @@ def run(path_to_runs_for_processing, path_to_organisation_folder, path_to_patien
parser.add_argument("-p", "--patients", type=str, required=True, help="Path to a patient folder")
args = parser.parse_args()

run(args.runs, args.output, args.patients)
Processor(args.runs, args.output, args.patients).process_runs()
File renamed without changes.
64 changes: 64 additions & 0 deletions organiser/process/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
import shutil
import logging
import sys
from datetime import datetime
from organiser.run_organisers.old_miseq_organise_run import OldMiseqRunOrganiser
from organiser.run_organisers.new_miseq_organise_run import NewMiseqOrganiseRun
from organiser.run_organisers.organise_run import OrganiseRun
from organiser.helpers.file_helpers import create_dictionary_if_not_exist


class Processor:
    """Organise every pseudonymized run found in a folder.

    For each run directory the matching organiser (old or new MiSeq layout)
    is chosen and executed. Afterwards the original run directory is moved
    to the ``backups`` sub-folder of the organised-files folder, or to its
    ``errors`` sub-folder when the organiser raised ``FileNotFoundError``.
    """

    # Entry names in the runs folder that are never treated as runs.
    _RESERVED_NAMES = ("backups", "logs", "errors")
    # Top-level entries whose presence marks a run as the new MiSeq layout.
    _NEW_MISEQ_MARKERS = ("Alignment_1", "SoftwareVersionsFile")

    def __init__(self, pseudnymized_runs_folder, folder_for_organised_files, patient_folder):
        """Remember the three folders the processor works with.

        :param pseudnymized_runs_folder: folder whose entries are the runs to organise
        :param folder_for_organised_files: destination root; also holds logs/backups/errors
        :param patient_folder: folder passed on to the organisers for patient data
        """
        # NOTE: the misspelt attribute name is kept on purpose, code outside
        # this class may already read it.
        self.psedunymized_runs_folder = pseudnymized_runs_folder
        self.organised_files_folder = folder_for_organised_files
        self.patient_folder = patient_folder

    def process_runs(self):
        """Organise all runs in the runs folder, logging to file and stdout."""
        self._create_important_folders_if_not_exist()
        logging.basicConfig(filename=os.path.join(self.organised_files_folder, "logs",
                                                  datetime.now().strftime('%d_%m_%Y-%H_%M.log')),
                            encoding='utf-8',
                            level=logging.INFO)
        self._add_stdout_handler_once()

        for run in os.listdir(self.psedunymized_runs_folder):
            if run in self._RESERVED_NAMES:
                continue
            if not os.path.isdir(os.path.join(self.psedunymized_runs_folder, run)):
                # A stray file cannot be listed as a run and would otherwise
                # abort the whole batch with NotADirectoryError.
                logging.warning(f"Skipping {run}: not a run directory")
                continue
            logging.info(f"Organising: {run}")
            organiser = self._get_correct_organiser(run)
            self._try_organise_run(run, organiser)

        logging.info("Done!")

    @staticmethod
    def _add_stdout_handler_once():
        """Mirror root-logger output to stdout without stacking duplicate handlers."""
        root = logging.getLogger()
        already_attached = any(
            isinstance(handler, logging.StreamHandler)
            and getattr(handler, "stream", None) is sys.stdout
            for handler in root.handlers
        )
        if not already_attached:
            root.addHandler(logging.StreamHandler(sys.stdout))

    def _try_organise_run(self, run, organiser) -> bool:
        """Run *organiser* and move the original run folder away.

        :param run: name of the run directory inside the runs folder
        :param organiser: object exposing ``organise_run()``
        :return: ``True`` when organised (run moved to ``backups``),
                 ``False`` on ``FileNotFoundError`` (run moved to ``errors``)
        """
        try:
            organiser.organise_run()
        except FileNotFoundError as e:
            logging.error(f"Run {run} is missing some data\nError:\n{e}")
            shutil.move(os.path.join(self.psedunymized_runs_folder, run),
                        os.path.join(self.organised_files_folder, "errors", run))
            return False

        shutil.move(os.path.join(self.psedunymized_runs_folder, run),
                    os.path.join(self.organised_files_folder, "backups", run))
        logging.info(f"Run {run} moved into backups")
        return True

    def _create_important_folders_if_not_exist(self):
        """Call ``create_dictionary_if_not_exist`` for the logs, backups and errors folders."""
        for sub_folder in ("logs", "backups", "errors"):
            create_dictionary_if_not_exist(os.path.join(self.organised_files_folder, sub_folder))

    @classmethod
    def _is_new_miseq_layout(cls, entries) -> bool:
        """Return ``True`` when *entries* (names in a run folder) contain a new-MiSeq marker."""
        return any(marker in entries for marker in cls._NEW_MISEQ_MARKERS)

    def _get_correct_organiser(self, run_path) -> "OrganiseRun":
        """Build the organiser matching the layout of the run named *run_path*.

        :raises OSError: when the run folder cannot be listed
        """
        full_run_path = os.path.join(self.psedunymized_runs_folder, run_path)

        # List the run folder once and test both markers against that listing.
        if self._is_new_miseq_layout(os.listdir(full_run_path)):
            logging.info(f"{run_path} processed as New Miseq")
            return NewMiseqOrganiseRun(self.psedunymized_runs_folder, run_path,
                                       self.organised_files_folder, self.patient_folder)

        logging.info(f"{run_path} processed as Old Miseq")
        return OldMiseqRunOrganiser(self.psedunymized_runs_folder, run_path,
                                    self.organised_files_folder, self.patient_folder)
File renamed without changes.
61 changes: 61 additions & 0 deletions organiser/run_organisers/new_miseq_organise_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
from pathlib import Path

from organiser.helpers.file_helpers import copy_folder_if_exists, copy_if_exists
from .old_miseq_organise_run import OldMiseqRunOrganiser


class NewMiseqOrganiseRun(OldMiseqRunOrganiser):
    """Organiser for runs that keep their data under ``Alignment_1``.

    It reuses the base class for everything except where FASTQ files are
    read from and which general files and folders are copied.
    """

    def organise_run(self):
        """Organise the run and return the path of its organised folder.

        The run is placed under ``<organised_runs>/<year>/MiSEQ/<run name>``.
        NOTE(review): this appears to duplicate the base-class implementation;
        confirm and consider dropping the override.
        """
        y = self._get_file_year()
        machine = "MiSEQ"
        folder_for_run_path = os.path.join(self.organised_runs, y, machine)
        Path(folder_for_run_path).mkdir(parents=True, exist_ok=True)
        self._create_sample_dirs(folder_for_run_path)
        self._create_general_file(folder_for_run_path)
        self._create_patient_files_if_clinical_data_exist()
        return os.path.join(folder_for_run_path, self.file)

    def _collect_data_for_pseudo_number(self, new_folder, pseudo_number):
        """Copy the FASTQ files and analysis data of one sample.

        Every file in ``Alignment_1/Fastq`` whose name contains
        *pseudo_number* is copied into ``<new_folder>/FASTQ``.

        :raises FileNotFoundError: when the run has no ``Alignment_1/Fastq`` folder
        """
        fastq_folder = os.path.join(self.pseudo_run, self.file, "Alignment_1", "Fastq")
        new_fastq_folder = os.path.join(new_folder, "FASTQ")
        Path(new_fastq_folder).mkdir(parents=True, exist_ok=True)

        for file in os.listdir(fastq_folder):
            if pseudo_number in file:
                copy_if_exists(os.path.join(fastq_folder, file),
                               os.path.join(new_fastq_folder, file))

        self._collect_analysis(new_folder, pseudo_number)

    def _create_general_file(self, new_file_path):
        """Create the run's folder under *new_file_path* and copy the run-level files and folders."""
        general_file_path = os.path.join(self.pseudo_run, self.file)
        new_general_file_path = os.path.join(new_file_path, self.file)
        Path(new_general_file_path).mkdir(parents=True, exist_ok=True)

        self._copy_important_files(general_file_path, new_general_file_path)
        self._copy_important_folders(general_file_path, new_general_file_path)

    def _copy_important_files(self, old_path, new_path):
        """Copy the run-level files from *old_path* directly into *new_path*.

        Files that live under ``Alignment_1`` in the source are flattened:
        only their base name is used for the destination.
        """
        files_to_move = [
            os.path.join("Alignment_1", "AnalysisLog.txt"),
            os.path.join("Alignment_1", "CompletedJobInfo.xml"),
            "RunParameters.xml",
            "RunInfo.xml",
            "SampleSheet.csv",
            "GenerateFASTQRunStatistics.xml"
        ]
        for file in files_to_move:
            old_file_path = os.path.join(old_path, file)
            new_file_path = os.path.join(new_path, os.path.basename(file))
            copy_if_exists(old_file_path, new_file_path)

    def _copy_important_folders(self, old_path, new_path):
        """Copy the run-level folders, renaming ``Alignment_1`` to ``Alignment``."""
        folder_paths = [("Alignment_1", "Alignment"),
                        ("catalog_info_per_pred_number", "catalog_info_per_pred_number")]
        for old, new in folder_paths:
            old_folder_path = os.path.join(old_path, old)
            new_folder_path = os.path.join(new_path, new)
            copy_folder_if_exists(old_folder_path, new_folder_path)
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import json
import logging

from .organise_run import OrganiseRun
from organiser.helpers.file_helpers import copy_folder_if_exists, copy_if_exists


class RunOrganiser:
class OldMiseqRunOrganiser(OrganiseRun):

def __init__(self, path_to_pseudonymized_runs_folder, name_of_single_run,
path_to_oragnised_storage, path_to_patients):
Expand All @@ -17,82 +18,56 @@ def __init__(self, path_to_pseudonymized_runs_folder, name_of_single_run,
self.organised_runs = path_to_oragnised_storage
self.organised_patients = path_to_patients

def __call__(self):
return self.organise_run()

def organise_run(self):
y, machine, run_number = self._split_file_to_parts(self.file)
run_path = os.path.join(self.organised_runs, y, machine, run_number)
Path(run_path).mkdir(parents=True, exist_ok=True)
self._create_sample_dirs(run_path, self.file)
self._create_general_file(run_path, self.file)
self._create_patient_files_if_clinical_data_exist(self.file)
return os.path.join(y, machine, run_number, self.file)

def _split_file_to_parts(self, filename):
splitted_filename = filename.split("_")
y = self._get_file_year()
machine = "MiSEQ"
folder_for_run_path = os.path.join(self.organised_runs, y, machine)
Path(folder_for_run_path).mkdir(parents=True, exist_ok=True)
self._create_sample_dirs(folder_for_run_path)
self._create_general_file(folder_for_run_path)
self._create_patient_files_if_clinical_data_exist()
return os.path.join(folder_for_run_path, self.file)

def _get_file_year(self):
splitted_filename = self.file.split("_")
year = splitted_filename[0][:2]
if splitted_filename[1][0] == "M":
machine = "MiSEQ"
run_number = splitted_filename[1][1:]
else:
machine = "NextSEQ"
run_number = splitted_filename[1][2:]

return f"20{year}", machine, run_number

def _create_sample_dirs(self, run_path, filename):
sample_sheet_path = os.path.join(self.pseudo_run, filename, "SampleSheet.csv")
return f"20{year}"

def _create_sample_dirs(self, run_samples_path):
sample_sheet_path = os.path.join(self.pseudo_run, self.file, "SampleSheet.csv")
pseudo_numbers = self._get_pseudo_numbers(sample_sheet_path)
run_path = os.path.join(run_path, filename, "Samples")
Path(run_path).mkdir(parents=True, exist_ok=True)
run_samples_path = os.path.join(run_samples_path, self.file, "Samples")
Path(run_samples_path).mkdir(parents=True, exist_ok=True)

for pseudo_number in pseudo_numbers:
new_pseudo_folder = os.path.join(run_path, pseudo_number)
new_pseudo_folder = os.path.join(run_samples_path, pseudo_number)
Path(new_pseudo_folder).mkdir(parents=True, exist_ok=True)
self._collect_data_for_pseudo_number(filename, new_pseudo_folder, pseudo_number)
self._collect_data_for_pseudo_number(new_pseudo_folder, pseudo_number)

def _create_general_file(self, new_file_path, file):
general_file_path = os.path.join(self.pseudo_run, file)
new_general_file_path = os.path.join(new_file_path, file)
def _create_general_file(self, new_file_path):
general_file_path = os.path.join(self.pseudo_run, self.file)
new_general_file_path = os.path.join(new_file_path, self.file)
Path(new_general_file_path).mkdir(parents=True, exist_ok=True)

run_parameters = os.path.join(general_file_path, "runParameters.xml")
new_run_parameters = os.path.join(new_general_file_path, "runParameters.xml")
copy_if_exists(run_parameters, new_run_parameters)

run_info = os.path.join(general_file_path, "RunInfo.xml")
new_run_info = os.path.join(new_general_file_path, "RunInfo.xml")
copy_if_exists(run_info, new_run_info)

completed_job = os.path.join(general_file_path, "CompletedJobInfo.xml")
new_completed_job = os.path.join(new_general_file_path, "CompletedJobInfo.xml")
copy_if_exists(completed_job, new_completed_job)
self._copy_important_files(general_file_path, new_general_file_path)
self._copy_important_folders(general_file_path, new_general_file_path)

generate_statistics = os.path.join(general_file_path, "GenerateFASTQRunStatistics.xml")
new_generate_statistics = os.path.join(new_general_file_path, "GenerateFASTQRunStatistics.xml")
copy_if_exists(generate_statistics, new_generate_statistics)
def _copy_important_files(self, old_path, new_path):
files_to_move = ["runParameters.xml", "RunInfo.xml", "CompletedJobInfo.xml",
"GenerateFASTQRunStatistics.xml", "AnalysisLog.txt", "SampleSheet.csv"]

analysis_log = os.path.join(general_file_path, "AnalysisLog.txt")
new_analysis_log = os.path.join(new_general_file_path, "AnalysisLog.txt")
copy_if_exists(analysis_log, new_analysis_log)
for file in files_to_move:
run_parameters = os.path.join(old_path, file)
new_run_parameters = os.path.join(new_path, file)
copy_if_exists(run_parameters, new_run_parameters)

sample_sheet = os.path.join(general_file_path, "SampleSheet.csv")
new_sample_sheet = os.path.join(new_general_file_path, "SampleSheet.csv")
copy_if_exists(sample_sheet, new_sample_sheet)

data_intensities_basecalls_alignments = os.path.join(general_file_path, "Data",
"Intensities", "BaseCalls", "Alignment")
new_data_intenstisities_basecalls_alignment = os.path.join(new_general_file_path, "Data",
"Intensities", "BaseCalls")
Path(new_data_intenstisities_basecalls_alignment).mkdir(parents=True, exist_ok=True)
copy_folder_if_exists(
data_intensities_basecalls_alignments,
os.path.join(new_data_intenstisities_basecalls_alignment, "Alignment"))

catalog_info_per_pac = os.path.join(general_file_path, "catalog_info_per_pred_number")
new_catalog_info_per_pac = os.path.join(new_general_file_path, "catalog_info_per_pred_number")
copy_folder_if_exists(catalog_info_per_pac, new_catalog_info_per_pac)
def _copy_important_folders(self, old_path, new_path):
folder_paths = [os.path.join("Data", "Intensities", "BaseCalls", "Alignment"),
"catalog_info_per_pred_number"]
for folder in folder_paths:
old_folder_path = os.path.join(old_path, folder)
new_folder_path = os.path.join(new_path, os.path.basename(folder))
copy_folder_if_exists(old_folder_path, new_folder_path)

def _get_pseudo_numbers(self, sample_sheet_path):
df = pd.read_csv(sample_sheet_path, delimiter=",",
Expand All @@ -104,17 +79,21 @@ def _get_pseudo_numbers(self, sample_sheet_path):

return pseudo_numbers

def _collect_data_for_pseudo_number(self, filename, new_folder, pseudo_number):
basecalls = os.path.join(self.pseudo_run, filename, "Data", "Intensities", "BaseCalls")
def _collect_data_for_pseudo_number(self, new_folder, pseudo_number):
basecalls = os.path.join(self.pseudo_run, self.file, "Data", "Intensities", "BaseCalls")

new_fastq_folder = os.path.join(new_folder, "FASTQ")
Path(new_fastq_folder).mkdir(parents=True, exist_ok=True)

for file in os.listdir(basecalls):
if pseudo_number in file:
copy_if_exists(os.path.join(basecalls, file), os.path.join(new_fastq_folder, file))
copy_if_exists(os.path.join(basecalls, file),
os.path.join(new_fastq_folder, file))

self._collect_analysis(new_folder, pseudo_number)

# analysis
analysis = os.path.join(self.pseudo_run, filename, "Analysis")
def _collect_analysis(self, new_folder, pseudo_number):
analysis = os.path.join(self.pseudo_run, self.file, "Analysis")
if os.path.exists(analysis):
new_analysis = os.path.join(new_folder, "Analysis")
Path(new_analysis).mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -165,8 +144,8 @@ def _get_convert(self, path, new_path):
if file.endswith("RemoveDuplicates.log"):
shutil.copy2(os.path.join(path, file), os.path.join(new_path, file))

def _create_patient_files_if_clinical_data_exist(self, run_file):
clinical_info_path = os.path.join(self.pseudo_run, run_file, "catalog_info_per_pred_number")
def _create_patient_files_if_clinical_data_exist(self):
clinical_info_path = os.path.join(self.pseudo_run, self.file, "catalog_info_per_pred_number")

if not os.path.exists(clinical_info_path):
return
Expand Down
8 changes: 8 additions & 0 deletions organiser/run_organisers/organise_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from abc import ABC, abstractmethod


class OrganiseRun(ABC):
    """Abstract interface for objects that can organise one run."""

    @abstractmethod
    def organise_run(self):
        """Organise the run; concrete subclasses must provide the behaviour."""
Loading

0 comments on commit 8fff7a4

Please sign in to comment.