Skip to content

Commit

Permalink
feat: Refactor anomaly test output (#112)
Browse files Browse the repository at this point in the history
* refactor: Use normalized score as default for anomaly inference

* docs: Update changelog

* build: Upgrade version
  • Loading branch information
lorenzomammana authored Apr 18, 2024
1 parent 1c04008 commit a79d63a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 22 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
# Changelog
All notable changes to this project will be documented in this file.

### [2.1.1]

#### Updated

- Anomaly test task now exports results based on the normalized anomaly scores instead of the raw scores. The normalized anomaly scores and the optimal threshold are computed based on the training threshold of the model.

### [2.1.0]

#### Updated
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "quadra"
version = "2.1.0"
version = "2.1.1"
description = "Deep Learning experiment orchestration library"
authors = [
"Federico Belotti <[email protected]>",
Expand Down
2 changes: 1 addition & 1 deletion quadra/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.1.0"
__version__ = "2.1.1"


def get_version():
Expand Down
46 changes: 26 additions & 20 deletions quadra/tasks/anomaly.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,22 @@ def generate_report(self) -> None:
if len(self.report_path) > 0:
os.makedirs(self.report_path, exist_ok=True)

# TODO: We currently don't use anomaly detection for segmentation, so the pixel threshold handling is not
# properly implemented, and we output only a single threshold.
training_threshold = self.model_data[f"{self.training_threshold_type}_threshold"]
optimal_threshold = self.metadata["threshold"]

normalized_optimal_threshold = cast(float, normalize_anomaly_score(optimal_threshold, training_threshold))

os.makedirs(os.path.join(self.report_path, "predictions"), exist_ok=True)
os.makedirs(os.path.join(self.report_path, "heatmaps"), exist_ok=True)

anomaly_scores = self.metadata["anomaly_scores"].cpu().numpy()
anomaly_scores = normalize_anomaly_score(anomaly_scores, training_threshold)

if not isinstance(anomaly_scores, np.ndarray):
raise ValueError("Anomaly scores must be a numpy array")

good_scores = anomaly_scores[np.where(np.array(self.metadata["image_labels"]) == 0)]
defect_scores = anomaly_scores[np.where(np.array(self.metadata["image_labels"]) == 1)]

Expand All @@ -447,25 +459,18 @@ def generate_report(self) -> None:
np.where((anomaly_scores >= defect_scores.min()) & (anomaly_scores <= good_scores.max()))[0]
)

plot_cumulative_histogram(good_scores, defect_scores, self.metadata["threshold"], self.report_path)
plot_cumulative_histogram(good_scores, defect_scores, normalized_optimal_threshold, self.report_path)

json_output = {
"observations": [],
"threshold": np.round(self.metadata["threshold"], 3),
"threshold": np.round(normalized_optimal_threshold, 3),
"unnormalized_threshold": np.round(optimal_threshold, 3),
"f1_score": np.round(self.metadata["optimal_f1"], 3),
"metrics": {
"overlapping_scores": count_overlapping_scores,
},
}

min_anomaly_score = self.metadata["anomaly_scores"].min().item()
max_anomaly_score = self.metadata["anomaly_scores"].max().item()

if min_anomaly_score == max_anomaly_score:
# Handle the case where all anomaly scores are the same, skip normalization
min_anomaly_score = 0
max_anomaly_score = 1

tg, fb, fg, tb = 0, 0, 0, 0

mask_area = None
Expand All @@ -478,12 +483,17 @@ def generate_report(self) -> None:
if hasattr(self.datamodule, "crop_area") and self.datamodule.crop_area is not None:
crop_area = self.datamodule.crop_area

anomaly_maps = normalize_anomaly_score(self.metadata["anomaly_maps"], training_threshold)

if not isinstance(anomaly_maps, torch.Tensor):
raise ValueError("Anomaly maps must be a tensor")

for img_path, gt_label, anomaly_score, anomaly_map in tqdm(
zip(
self.metadata["image_paths"],
self.metadata["image_labels"],
self.metadata["anomaly_scores"],
self.metadata["anomaly_maps"],
anomaly_scores,
anomaly_maps,
),
total=len(self.metadata["image_paths"]),
):
Expand All @@ -494,11 +504,10 @@ def generate_report(self) -> None:
if crop_area is not None:
img = img[crop_area[1] : crop_area[3], crop_area[0] : crop_area[2]]

output_mask = (anomaly_map >= self.metadata["threshold"]).cpu().numpy().squeeze().astype(np.uint8)
output_mask = (anomaly_map >= normalized_optimal_threshold).cpu().numpy().squeeze().astype(np.uint8)
output_mask_label = os.path.basename(os.path.dirname(img_path))
output_mask_name = os.path.splitext(os.path.basename(img_path))[0] + ".png"
pred_label = int(anomaly_score >= self.metadata["threshold"])
anomaly_confidence = normalize_anomaly_score(anomaly_score.item(), threshold=self.metadata["threshold"])
pred_label = int(anomaly_score >= normalized_optimal_threshold)

json_output["observations"].append(
{
Expand All @@ -510,7 +519,6 @@ def generate_report(self) -> None:
"prediction_heatmap": os.path.join("heatmaps", output_mask_label, output_mask_name),
"is_correct": pred_label == gt_label if gt_label != -1 else True,
"anomaly_score": f"{anomaly_score.item():.3f}",
"anomaly_confidence": f"{anomaly_confidence:.3f}",
}
)

Expand All @@ -530,12 +538,10 @@ def generate_report(self) -> None:
cv2.imwrite(os.path.join(output_prediction_folder, output_mask_name), output_mask)

# Normalize the map and rescale it to 0-1 range
# In this case we are saying that the anomaly map is in the range [50, 150]
# In this case we are saying that the anomaly map is in the range [normalized_th - 50, normalized_th + 50]
# This allows a stronger color for the anomalies and a lighter one for truly normal regions
# It's also independent of the max or min anomaly score!
normalized_map: MapOrValue = (normalize_anomaly_score(anomaly_map, self.metadata["threshold"]) - 50.0) / (
150.0 - 50.0
)
normalized_map: MapOrValue = (anomaly_map - (normalized_optimal_threshold - 50)) / 100

if isinstance(normalized_map, torch.Tensor):
normalized_map = normalized_map.cpu().numpy().squeeze()
Expand Down

0 comments on commit a79d63a

Please sign in to comment.