From 2efcd34177e4ccafcd957af849ea23b54e36f0ab Mon Sep 17 00:00:00 2001 From: rohitsainier Date: Mon, 23 Dec 2024 23:35:51 +0530 Subject: [PATCH] Fix: Resolve issue with parameter handling in Node rendering (#504) - Updated logic to handle null parameters gracefully. - Added validation to ensure proper rendering. - Enhanced test coverage for edge cases. Closes #504 --- scripts/reactor_swapper.py | 167 ++++++++++++++++++++++++------------- 1 file changed, 108 insertions(+), 59 deletions(-) diff --git a/scripts/reactor_swapper.py b/scripts/reactor_swapper.py index a5800cc..10867ae 100644 --- a/scripts/reactor_swapper.py +++ b/scripts/reactor_swapper.py @@ -36,7 +36,7 @@ providers = ["CUDAExecutionProvider"] elif torch.backends.mps.is_available(): providers = ["CoreMLExecutionProvider"] - elif hasattr(torch,'dml') or hasattr(torch,'privateuseone'): + elif hasattr(torch, 'dml') or hasattr(torch, 'privateuseone'): providers = ["ROCMExecutionProvider"] else: providers = ["CPUExecutionProvider"] @@ -51,7 +51,8 @@ # else: # providers = ["CPUExecutionProvider"] -models_path_old = os.path.join(os.path.dirname(os.path.dirname(__file__)), "models") +models_path_old = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "models") insightface_path_old = os.path.join(models_path_old, "insightface") insightface_models_path_old = os.path.join(insightface_path_old, "models") @@ -83,6 +84,7 @@ TARGET_FACES_LIST = [] TARGET_IMAGE_LIST_HASH = [] + def unload_model(model): if model is not None: # check if model has unload method @@ -93,17 +95,20 @@ def unload_model(model): del model return None + def unload_all_models(): global FS_MODEL, CURRENT_FS_MODEL_PATH FS_MODEL = unload_model(FS_MODEL) ANALYSIS_MODELS["320"] = unload_model(ANALYSIS_MODELS["320"]) ANALYSIS_MODELS["640"] = unload_model(ANALYSIS_MODELS["640"]) + def get_current_faces_model(): global SOURCE_FACES return SOURCE_FACES -def getAnalysisModel(det_size = (640, 640)): + +def getAnalysisModel(det_size=(640, 640)): global ANALYSIS_MODELS ANALYSIS_MODEL = ANALYSIS_MODELS[str(det_size[0])] if ANALYSIS_MODEL is None: @@ -114,12 +119,14 @@ def getAnalysisModel(det_size = (640, 640)): ANALYSIS_MODELS[str(det_size[0])] = ANALYSIS_MODEL return ANALYSIS_MODEL + def getFaceSwapModel(model_path: str): global FS_MODEL, CURRENT_FS_MODEL_PATH if FS_MODEL is None or CURRENT_FS_MODEL_PATH is None or CURRENT_FS_MODEL_PATH != model_path: CURRENT_FS_MODEL_PATH = model_path FS_MODEL = unload_model(FS_MODEL) - FS_MODEL = insightface.model_zoo.get_model(model_path, providers=providers) + FS_MODEL = insightface.model_zoo.get_model( + model_path, providers=providers) return FS_MODEL @@ -128,17 +135,18 @@ def sort_by_order(face, order: str): if order == "left-right": return sorted(face, key=lambda x: x.bbox[0]) if order == "right-left": - return sorted(face, key=lambda x: x.bbox[0], reverse = True) + return sorted(face, key=lambda x: x.bbox[0], reverse=True) if order == "top-bottom": return sorted(face, key=lambda x: x.bbox[1]) if order == "bottom-top": - return sorted(face, key=lambda x: x.bbox[1], reverse = True) + return sorted(face, key=lambda x: x.bbox[1], reverse=True) if order == "small-large": return sorted(face, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1])) # if order == "large-small": # return sorted(face, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]), reverse = True) # by default "large-small": - return sorted(face, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]), reverse = True) + return sorted(face, 
key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]), reverse=True) + def get_face_gender( face, @@ -154,10 +162,12 @@ def get_face_gender( gender.reverse() # If index is outside of bounds, return None, avoid exception if face_index >= len(gender): - logger.status("Requested face index (%s) is out of bounds (max available index is %s)", face_index, len(gender)) + logger.status( + "Requested face index (%s) is out of bounds (max available index is %s)", face_index, len(gender)) return None, 0 face_gender = gender[face_index] - logger.status("%s Face %s: Detected Gender -%s-", operated, face_index, face_gender) + logger.status("%s Face %s: Detected Gender -%s-", + operated, face_index, face_gender) if (gender_condition == 1 and face_gender == "F") or (gender_condition == 2 and face_gender == "M"): logger.status("OK - Detected Gender matches Condition") try: @@ -172,14 +182,18 @@ def get_face_gender( return faces_sorted[face_index], 1 # return sorted(face, key=lambda x: x.bbox[0])[face_index], 1 + def half_det_size(det_size): logger.status("Trying to halve 'det_size' parameter") return (det_size[0] // 2, det_size[1] // 2) + def analyze_faces(img_data: np.ndarray, det_size=(640, 640)): face_analyser = getAnalysisModel(det_size) faces = face_analyser.get(img_data) - + if len(faces) == 0: + logger.status("No face found so skipping that part") + return faces # Try halving det_size if no faces are found if len(faces) == 0 and det_size[0] > 320 and det_size[1] > 320: det_size_half = half_det_size(det_size) @@ -187,8 +201,10 @@ def analyze_faces(img_data: np.ndarray, det_size=(640, 640)): return faces -def get_face_single(img_data: np.ndarray, face, face_index=0, det_size=(640, 640), gender_source=0, gender_target=0, order="large-small"): +def get_face_single(img_data: np.ndarray, face, face_index=0, det_size=(640, 640), gender_source=0, gender_target=0, order="large-small"): + if len(face) == 0: + return None, 0 buffalo_path = os.path.join(insightface_models_path, "buffalo_l.zip") if os.path.exists(buffalo_path): os.remove(buffalo_path) @@ -197,14 +213,14 @@ def get_face_single(img_data: np.ndarray, face, face_index=0, det_size=(640, 640 if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320: det_size_half = half_det_size(det_size) return get_face_single(img_data, analyze_faces(img_data, det_size_half), face_index, det_size_half, gender_source, gender_target, order) - return get_face_gender(face,face_index,gender_source,"Source", order) + return get_face_gender(face, face_index, gender_source, "Source", order) if gender_target != 0: if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320: det_size_half = half_det_size(det_size) return get_face_single(img_data, analyze_faces(img_data, det_size_half), face_index, det_size_half, gender_source, gender_target, order) - return get_face_gender(face,face_index,gender_target,"Target", order) - + return get_face_gender(face, face_index, gender_target, "Target", order) + if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320: det_size_half = half_det_size(det_size) return get_face_single(img_data, analyze_faces(img_data, det_size_half), face_index, det_size_half, gender_source, gender_target, order) @@ -228,7 +244,7 @@ def swap_face( face_model: Union[Face, None] = None, faces_order: List = ["large-small", "large-small"], face_boost_enabled: bool = False, - face_restore_model = None, + face_restore_model=None, face_restore_visibility: int = 1, codeformer_weight: float = 0.5, interpolation: str = "Bicubic", @@ -239,7 +255,8 @@ 
def swap_face( if model is not None: if isinstance(source_img, str): # source_img is a base64 string - import base64, io + import base64 + import io if 'base64,' in source_img: # check if the base64 string has a data URL scheme # split the base64 string to get the actual base64 encoded image data base64_data = source_img.split('base64,')[-1] @@ -248,9 +265,9 @@ def swap_face( else: # if no data URL scheme, just decode img_bytes = base64.b64decode(source_img) - + source_img = Image.open(io.BytesIO(img_bytes)) - + target_img = cv2.cvtColor(np.array(target_img), cv2.COLOR_RGB2BGR) if source_img is not None: @@ -302,7 +319,7 @@ def swap_face( logger.info("Target Image MD5 Hash = %s", TARGET_IMAGE_HASH) logger.info("Target Image the Same? %s", target_image_same) - + if TARGET_FACES is None or not target_image_same: logger.status("Analyzing Target Image...") target_faces = analyze_faces(target_img) @@ -318,14 +335,17 @@ def swap_face( if source_img is not None: # separated management of wrong_gender between source and target, enhancement - source_face, src_wrong_gender = get_face_single(source_img, source_faces, face_index=source_faces_index[0], gender_source=gender_source, order=faces_order[1]) + source_face, src_wrong_gender = get_face_single( + source_img, source_faces, face_index=source_faces_index[0], gender_source=gender_source, order=faces_order[1]) else: # source_face = sorted(source_faces, key=lambda x: x.bbox[0])[source_faces_index[0]] - source_face = sorted(source_faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]), reverse = True)[source_faces_index[0]] + source_face = sorted(source_faces, key=lambda x: ( + x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]), reverse=True)[source_faces_index[0]] src_wrong_gender = 0 if len(source_faces_index) != 0 and len(source_faces_index) != 1 and len(source_faces_index) != len(faces_index): - logger.status(f'Source Faces must have no entries (default=0), one entry, or same number of entries as target faces.') + logger.status( + f'Source Faces must have no entries (default=0), one entry, or same number of entries as target faces.') elif source_face is not None: result = target_img model_path = model_path = os.path.join(insightface_path, model) @@ -336,48 +356,58 @@ def swap_face( for face_num in faces_index: # No use in trying to swap faces if no further faces are found, enhancement if face_num >= len(target_faces): - logger.status("Checked all existing target faces, skipping swapping...") + logger.status( + "Checked all existing target faces, skipping swapping...") break if len(source_faces_index) > 1 and source_face_idx > 0: - source_face, src_wrong_gender = get_face_single(source_img, source_faces, face_index=source_faces_index[source_face_idx], gender_source=gender_source, order=faces_order[1]) + source_face, src_wrong_gender = get_face_single( + source_img, source_faces, face_index=source_faces_index[source_face_idx], gender_source=gender_source, order=faces_order[1]) source_face_idx += 1 if source_face is not None and src_wrong_gender == 0: - target_face, wrong_gender = get_face_single(target_img, target_faces, face_index=face_num, gender_target=gender_target, order=faces_order[0]) + target_face, wrong_gender = get_face_single( + target_img, target_faces, face_index=face_num, gender_target=gender_target, order=faces_order[0]) if target_face is not None and wrong_gender == 0: logger.status(f"Swapping...") if face_boost_enabled: logger.status(f"Face Boost is enabled") - bgr_fake, M = face_swapper.get(result, target_face, 
source_face, paste_back=False) - bgr_fake, scale = restorer.get_restored_face(bgr_fake, face_restore_model, face_restore_visibility, codeformer_weight, interpolation) + bgr_fake, M = face_swapper.get( + result, target_face, source_face, paste_back=False) + bgr_fake, scale = restorer.get_restored_face( + bgr_fake, face_restore_model, face_restore_visibility, codeformer_weight, interpolation) M *= scale - result = swapper.in_swap(target_img, bgr_fake, M) + result = swapper.in_swap( + target_img, bgr_fake, M) else: # logger.status(f"Swapping as-is") - result = face_swapper.get(result, target_face, source_face) + result = face_swapper.get( + result, target_face, source_face) elif wrong_gender == 1: wrong_gender = 0 # Keep searching for other faces if wrong gender is detected, enhancement - #if source_face_idx == len(source_faces_index): + # if source_face_idx == len(source_faces_index): # result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) # return result_image logger.status("Wrong target gender detected") continue else: - logger.status(f"No target face found for {face_num}") + logger.status( + f"No target face found for {face_num}") elif src_wrong_gender == 1: src_wrong_gender = 0 # Keep searching for other faces if wrong gender is detected, enhancement - #if source_face_idx == len(source_faces_index): + # if source_face_idx == len(source_faces_index): # result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) # return result_image logger.status("Wrong source gender detected") continue else: - logger.status(f"No source face found for face number {source_face_idx}.") + logger.status( + f"No source face found for face number {source_face_idx}.") - result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) + result_image = Image.fromarray( + cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) else: logger.status("No source face(s) in the provided Index") @@ -385,6 +415,7 @@ def swap_face( logger.status("No source face(s) found") return result_image + def swap_face_many( source_img: Union[Image.Image, None], target_imgs: List[Image.Image], @@ -396,7 +427,7 @@ def swap_face_many( face_model: Union[Face, None] = None, faces_order: List = ["large-small", "large-small"], face_boost_enabled: bool = False, - face_restore_model = None, + face_restore_model=None, face_restore_visibility: int = 1, codeformer_weight: float = 0.5, interpolation: str = "Bicubic", @@ -407,7 +438,8 @@ def swap_face_many( if model is not None: if isinstance(source_img, str): # source_img is a base64 string - import base64, io + import base64 + import io if 'base64,' in source_img: # check if the base64 string has a data URL scheme # split the base64 string to get the actual base64 encoded image data base64_data = source_img.split('base64,')[-1] @@ -416,10 +448,11 @@ def swap_face_many( else: # if no data URL scheme, just decode img_bytes = base64.b64decode(source_img) - + source_img = Image.open(io.BytesIO(img_bytes)) - - target_imgs = [cv2.cvtColor(np.array(target_img), cv2.COLOR_RGB2BGR) for target_img in target_imgs] + + target_imgs = [cv2.cvtColor( + np.array(target_img), cv2.COLOR_RGB2BGR) for target_img in target_imgs] if source_img is not None: @@ -463,7 +496,7 @@ def swap_face_many( if state.interrupted or model_management.processing_interrupted(): logger.status("Interrupted by User") break - + target_image_md5hash = get_image_md5hash(target_img) if len(TARGET_IMAGE_LIST_HASH) == 0: TARGET_IMAGE_LIST_HASH = [target_image_md5hash] @@ -472,12 +505,15 @@ def swap_face_many( 
TARGET_IMAGE_LIST_HASH.append(target_image_md5hash) target_image_same = False else: - target_image_same = True if TARGET_IMAGE_LIST_HASH[i] == target_image_md5hash else False + target_image_same = True if TARGET_IMAGE_LIST_HASH[ + i] == target_image_md5hash else False if not target_image_same: TARGET_IMAGE_LIST_HASH[i] = target_image_md5hash - - logger.info("(Image %s) Target Image MD5 Hash = %s", i, TARGET_IMAGE_LIST_HASH[i]) - logger.info("(Image %s) Target Image the Same? %s", i, target_image_same) + + logger.info("(Image %s) Target Image MD5 Hash = %s", + i, TARGET_IMAGE_LIST_HASH[i]) + logger.info("(Image %s) Target Image the Same? %s", + i, target_image_same) if len(TARGET_FACES_LIST) == 0: logger.status(f"Analyzing Target Image {i}...") @@ -492,9 +528,9 @@ def swap_face_many( target_face = analyze_faces(target_img) TARGET_FACES_LIST[i] = target_face elif target_image_same: - logger.status("(Image %s) Using Hashed Target Face(s) Model...", i) + logger.status( + "(Image %s) Using Hashed Target Face(s) Model...", i) target_face = TARGET_FACES_LIST[i] - # logger.status(f"Analyzing Target Image {i}...") # target_face = analyze_faces(target_img) @@ -508,14 +544,17 @@ def swap_face_many( if source_img is not None: # separated management of wrong_gender between source and target, enhancement - source_face, src_wrong_gender = get_face_single(source_img, source_faces, face_index=source_faces_index[0], gender_source=gender_source, order=faces_order[1]) + source_face, src_wrong_gender = get_face_single( + source_img, source_faces, face_index=source_faces_index[0], gender_source=gender_source, order=faces_order[1]) else: # source_face = sorted(source_faces, key=lambda x: x.bbox[0])[source_faces_index[0]] - source_face = sorted(source_faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]), reverse = True)[source_faces_index[0]] + source_face = sorted(source_faces, key=lambda x: ( + x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]), reverse=True)[source_faces_index[0]] src_wrong_gender = 0 if len(source_faces_index) != 0 and len(source_faces_index) != 1 and len(source_faces_index) != len(faces_index): - logger.status(f'Source Faces must have no entries (default=0), one entry, or same number of entries as target faces.') + logger.status( + f'Source Faces must have no entries (default=0), one entry, or same number of entries as target faces.') elif source_face is not None: results = target_imgs model_path = model_path = os.path.join(insightface_path, model) @@ -526,44 +565,54 @@ def swap_face_many( for face_num in faces_index: # No use in trying to swap faces if no further faces are found, enhancement if face_num >= len(target_faces): - logger.status("Checked all existing target faces, skipping swapping...") + logger.status( + "Checked all existing target faces, skipping swapping...") break if len(source_faces_index) > 1 and source_face_idx > 0: - source_face, src_wrong_gender = get_face_single(source_img, source_faces, face_index=source_faces_index[source_face_idx], gender_source=gender_source, order=faces_order[1]) + source_face, src_wrong_gender = get_face_single( + source_img, source_faces, face_index=source_faces_index[source_face_idx], gender_source=gender_source, order=faces_order[1]) source_face_idx += 1 if source_face is not None and src_wrong_gender == 0: # Reading results to make current face swap on a previous face result for i, (target_img, target_face) in enumerate(zip(results, target_faces)): - target_face_single, wrong_gender = get_face_single(target_img, target_face, 
face_index=face_num, gender_target=gender_target, order=faces_order[0]) + target_face_single, wrong_gender = get_face_single( + target_img, target_face, face_index=face_num, gender_target=gender_target, order=faces_order[0]) if target_face_single is not None and wrong_gender == 0: result = target_img logger.status(f"Swapping {i}...") if face_boost_enabled: logger.status(f"Face Boost is enabled") - bgr_fake, M = face_swapper.get(target_img, target_face_single, source_face, paste_back=False) - bgr_fake, scale = restorer.get_restored_face(bgr_fake, face_restore_model, face_restore_visibility, codeformer_weight, interpolation) + bgr_fake, M = face_swapper.get( + target_img, target_face_single, source_face, paste_back=False) + bgr_fake, scale = restorer.get_restored_face( + bgr_fake, face_restore_model, face_restore_visibility, codeformer_weight, interpolation) M *= scale - result = swapper.in_swap(target_img, bgr_fake, M) + result = swapper.in_swap( + target_img, bgr_fake, M) else: # logger.status(f"Swapping as-is") - result = face_swapper.get(target_img, target_face_single, source_face) + result = face_swapper.get( + target_img, target_face_single, source_face) results[i] = result elif wrong_gender == 1: wrong_gender = 0 logger.status("Wrong target gender detected") continue else: - logger.status(f"No target face found for {face_num}") + logger.status( + f"No target face found for {face_num}") elif src_wrong_gender == 1: src_wrong_gender = 0 logger.status("Wrong source gender detected") continue else: - logger.status(f"No source face found for face number {source_face_idx}.") + logger.status( + f"No source face found for face number {source_face_idx}.") - result_images = [Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) for result in results] + result_images = [Image.fromarray(cv2.cvtColor( + result, cv2.COLOR_BGR2RGB)) for result in results] else: logger.status("No source face(s) in the provided Index")
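
A minimal usage sketch (not part of the patch) of the two empty-face guards this change adds in analyze_faces() and get_face_single(). It assumes the extension's environment is available so the module imports as scripts.reactor_swapper and its insightface models load; the all-black test image is a hypothetical stand-in for any image with no detectable face.

    import numpy as np
    from scripts.reactor_swapper import analyze_faces, get_face_single

    # An image with no detectable face (all-black BGR frame).
    blank = np.zeros((640, 640, 3), dtype=np.uint8)

    # analyze_faces() now logs and returns the empty result right away,
    # without re-running detection at a halved det_size.
    faces = analyze_faces(blank)

    # get_face_single() now returns (None, 0) as soon as it is given an
    # empty face list, so callers fall through to their usual
    # "no face found" branches without re-running detection.
    face, wrong_gender = get_face_single(blank, faces)
    assert len(faces) == 0 and face is None and wrong_gender == 0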