-
Notifications
You must be signed in to change notification settings - Fork 0
/
preprocess.py
133 lines (95 loc) · 5.38 KB
/
preprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import argparse
import os
import numpy as np
import SimpleITK as sitk
from utils.logger import logger
from utils.filemanager import create_directory_if_not_exists, get_paths, check_paths
from utils.preprocess import bilateral_filter_3d, clahe_3d
from utils.dataset import segment_body, min_max_normalization
def export(args, sample_name, filename_full, final_processed_sitk):
    '''
    Write a processed volume to the experiment output folder as int16 NIfTI.

    The file is saved as <args.exp_output>/<sample_name>/<filename_full>.nii.gz.
    The per-sample folder is passed to create_directory_if_not_exists() before
    the image is written.

    Args:
        args (argparse.Namespace): parsed command-line arguments; only the
            `exp_output` attribute is read here
        sample_name (str): sample name (e.g. copd1, copd2, ...)
        filename_full (str): file name without extension
            (e.g. copd1_eBHCT, copd1_iBHCT, ...)
        final_processed_sitk (sitk.Image): processed image to save

    Returns:
        None
    '''
    # the image is always stored with a 16-bit signed integer pixel type
    image_int16 = sitk.Cast(final_processed_sitk, sitk.sitkInt16)

    logger.info(">> Saving the final processed image...")

    # every sample gets its own sub-folder inside the experiment output
    sample_dir = os.path.join(args.exp_output, sample_name)
    create_directory_if_not_exists(sample_dir)

    target_file = os.path.join(sample_dir, f"{filename_full}.nii.gz")
    sitk.WriteImage(image_int16, target_file)
if __name__ == "__main__":
    # Script entry point: for every exhale/inhale volume found under
    # --dataset_path, segment the body, min-max normalize the intensities using
    # the body mask, and export the result under
    # <output_path>/<experiment_name>/<split_name>/<sample_name>/.

    # optional arguments from the command line (all of them have defaults)
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset_path', type=str, default='dataset/train', help='root dir for nifti data')
    parser.add_argument('--experiment_name', type=str, default='preprocessing1', help='experiment name')
    parser.add_argument('--output_path', type=str, default='dataset_processed', help='root dir for output scripts')

    # parse the arguments
    args = parser.parse_args()

    # The split name is the last component of dataset_path. Windows separators
    # are normalized and trailing slashes dropped first; otherwise a path such
    # as "dataset/train/" would yield an empty split name and the outputs would
    # be written directly into the experiment folder.
    split_name = args.dataset_path.replace('\\', '/').rstrip('/').split('/')[-1]

    # create experiment output; export() reads this location from args.exp_output
    args.exp_output = os.path.join(args.output_path, args.experiment_name, split_name)
    create_directory_if_not_exists(args.exp_output)

    # get the exhale and inhale volumes and validate them
    exhale_volumes = get_paths(args, "eBHCT")
    inhale_volumes = get_paths(args, "iBHCT")
    check_paths(args, exhale_volumes, "exhale volumes")
    check_paths(args, inhale_volumes, "inhale volumes")

    # logs
    logger.warning("This script doesn't copy the landmarks, the lung masks (segmentation), or the dataset json to the output folder. Please copy them manually.")
    logger.info(f"Experiment name: {args.experiment_name}")
    logger.info(f"Split name: {split_name}")
    logger.info(f"Output path: {args.exp_output}")

    for sample_path in exhale_volumes + inhale_volumes:
        # Derive the names from the file name only. Separators are normalized
        # the same way as for dataset_path so that backslash-separated paths do
        # not leak directory components into sample_name.
        file_name = sample_path.replace('\\', '/').split('/')[-1]
        sample_name = file_name.split('_')[0]    # copd1, copd2, ...
        filename_full = file_name.split('.')[0]  # copd1_eBHCT, copd1_iBHCT, ...

        logger.info(f"\nProcessing {sample_name} - {filename_full}")

        # read the sample
        sample_sitk = sitk.ReadImage(sample_path)
        sample_image = sitk.GetArrayFromImage(sample_sitk)

        # copd2 uses its own body-segmentation threshold; every other sample
        # uses the common value
        threshold = 430 if sample_name == 'copd2' else 700
        logger.info(f"thresh: {threshold}")

        # note that the gantry and black background are still present and we
        # need to remove them: segment the body and remove the gantry.
        # Only the third returned value (largest_masks) is needed here.
        _, _, largest_masks, _ = segment_body(sample_image, threshold=threshold)

        # invert the largest masks so that the result can be used as a mask
        # focused on the body (per the original author's note, largest_masks
        # represents anything except the body -- TODO confirm in utils.dataset)
        largest_masks_sitk = sitk.GetImageFromArray(largest_masks)
        largest_masks_sitk.CopyInformation(sample_sitk)
        largest_masks_inverted_sitk = sitk.Not(largest_masks_sitk)
        largest_masks_inverted_image = sitk.GetArrayFromImage(largest_masks_inverted_sitk)

        # normalize the image using min-max normalization, passing the inverted
        # mask so the gantry and black background are excluded
        logger.info(">> Normalizing using min-max...")
        normalized_image = min_max_normalization(sample_image, largest_masks_inverted_image).astype(np.int16)

        # wrap the array back into an image carrying the geometry metadata
        # copied from the input volume
        normalized_image_sitk = sitk.GetImageFromArray(normalized_image)
        normalized_image_sitk.CopyInformation(sample_sitk)

        # only export the normalized image
        export(args, sample_name, filename_full, normalized_image_sitk)

        # NOTE: the optional denoising / contrast-enhancement steps below are
        # currently disabled; they are kept for reference.
        #
        # # denoising the image using bilateral filter
        # logger.info(">> Denoising using bilateral filter...")
        # domain_sigma = 2.0
        # range_sigma = 50.0
        # filtered_sitk = bilateral_filter_3d(normalized_image_sitk, domain_sigma, range_sigma)
        # filtered_image = sitk.GetArrayFromImage(filtered_sitk)
        #
        # # contrast enhancement using adaptive histogram equalization
        # logger.info(">> Contrast enhancement using CLAHE...")
        # final_processed_sitk = clahe_3d(filtered_sitk, clip_limit=0.01)
        #
        # # export the final processed image to the output folder
        # export(args, sample_name, filename_full, final_processed_sitk)