
Face filter vgg face (deepfakes#724)
* Implement extraction pipeline

* Face filter to vgg_face. Resume partial model downloads

* On-the-fly conversion to extraction pipeline

* Move git model ids from get_model to model definition
torzdf authored May 11, 2019
1 parent c7b128a commit 1483187
Showing 16 changed files with 646 additions and 420 deletions.
23 changes: 13 additions & 10 deletions lib/cli.py
@@ -438,29 +438,32 @@ def get_argument_list():
"rounding": 2,
"type": float,
"dest": "ref_threshold",
"default": 0.6,
"default": 0.4,
"help": "Threshold for positive face recognition. For use with "
"nfilter or filter. Lower values are stricter."})
"nfilter or filter. Lower values are stricter. NB: Using "
"face filter will significantly decrease extraction speed."})
argument_list.append({"opts": ("-n", "--nfilter"),
"action": FilesFullPaths,
"filetypes": "image",
"dest": "nfilter",
"nargs": "+",
"default": None,
"help": "Reference image for the persons you do "
"not want to process. Should be a front "
"portrait. Multiple images can be added "
"space separated"})
"help": "Reference image for the persons you do not want to "
"process. Should be a front portrait with a single person "
"in the image. Multiple images can be added space "
"separated. NB: Using face filter will significantly "
"decrease extraction speed."})
argument_list.append({"opts": ("-f", "--filter"),
"action": FilesFullPaths,
"filetypes": "image",
"dest": "filter",
"nargs": "+",
"default": None,
"help": "Reference images for the person you "
"want to process. Should be a front "
"portrait. Multiple images can be added "
"space separated"})
"help": "Reference images for the person you want to process. "
"Should be a front portrait with a single person in the "
"image. Multiple images can be added space separated. NB: "
"Using face filter will significantly decrease extraction "
"speed."})
return argument_list
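
For orientation, here is a minimal sketch of how these parsed options might be handed to the new FaceFilter (whose signature appears in lib/face_filter.py below). The opts values and the detector/aligner plugin names are illustrative assumptions, not part of this commit:

    from argparse import Namespace
    from lib.face_filter import FaceFilter

    # Hypothetical parsed arguments standing in for the real CLI namespace
    opts = Namespace(filter=["ref_face.png"], nfilter=None, ref_threshold=0.4)

    face_filter = FaceFilter(reference_file_paths=opts.filter or [],
                             nreference_file_paths=opts.nfilter or [],
                             detector="mtcnn",   # assumed detector plugin name
                             aligner="fan",      # assumed aligner plugin name
                             loglevel="INFO",
                             multiprocess=False,
                             threshold=opts.ref_threshold)

Each extracted face can then be accepted or rejected with face_filter.check(detected_face), defined in the new lib/face_filter.py below.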


274 changes: 205 additions & 69 deletions lib/face_filter.py
@@ -2,9 +2,16 @@
""" Face Filterer for extraction in faceswap.py """

import logging
import os
import sys

import face_recognition
import cv2
import numpy as np

from lib.faces_detect import DetectedFace
from lib.logger import get_loglevel
from lib.utils import GetModel
from plugins.extract.pipeline import Extractor

logger = logging.getLogger(__name__) # pylint: disable=invalid-name

@@ -15,77 +22,206 @@ def avg(arr):


class FaceFilter():
""" Face filter for extraction """
def __init__(self, reference_file_paths, nreference_file_paths, threshold=0.6):
""" Face filter for extraction
NB: we take only first face, so the reference file should only contain one face. """

def __init__(self, reference_file_paths, nreference_file_paths, detector, aligner, loglevel,
multiprocess=False, threshold=0.4):
logger.debug("Initializing %s: (reference_file_paths: %s, nreference_file_paths: %s, "
"threshold: %s)", self.__class__.__name__, reference_file_paths,
nreference_file_paths, threshold)
images = list(map(face_recognition.load_image_file, reference_file_paths))
nimages = list(map(face_recognition.load_image_file, nreference_file_paths))
# Note: we take only first face, so the reference file should only contain one face.
self.encodings = list(map(lambda im: face_recognition.face_encodings(im)[0], images))
self.nencodings = list(map(lambda im: face_recognition.face_encodings(im)[0], nimages))
"detector: %s, aligner: %s. loglevel: %s, multiprocess: %s, threshold: %s)",
self.__class__.__name__, reference_file_paths, nreference_file_paths,
detector, aligner, loglevel, multiprocess, threshold)
git_model_id = 7
model_filename = ["vgg_face_v1.caffemodel", "vgg_face_v1.prototxt"]

self.input_size = 224
self.numeric_loglevel = get_loglevel(loglevel)
self.vgg_face = self.get_model(git_model_id, model_filename)
self.filters = self.load_images(reference_file_paths, nreference_file_paths)
self.align_faces(detector, aligner, loglevel, multiprocess)
self.get_filter_encodings()
self.threshold = threshold
logger.trace("encodings: %s", self.encodings)
logger.trace("nencodings: %s", self.nencodings)
logger.debug("Initialized %s", self.__class__.__name__)

# <<< GET MODEL >>> #
@staticmethod
def get_model(git_model_id, model_filename):
""" Check if model is available, if not, download and unzip it """
root_path = os.path.abspath(os.path.dirname(sys.argv[0]))
cache_path = os.path.join(root_path, "plugins", "extract", ".cache")
model = GetModel(model_filename, cache_path, git_model_id).model_path
vgg_face = cv2.dnn.readNetFromCaffe(model[1], model[0]) # pylint: disable=no-member
vgg_face.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU) # pylint: disable=no-member
return vgg_face
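
The commit message also mentions resuming partial model downloads. The GetModel changes themselves are not rendered in this diff, so the following is only a generic sketch of the resume technique (an HTTP Range request appending to an existing partial file), not the library's actual implementation:

    import os
    import urllib.request

    def download_with_resume(url, dest):
        """Resume a partial download; assumes the server honours Range requests."""
        start = os.path.getsize(dest) if os.path.exists(dest) else 0
        request = urllib.request.Request(url)
        if start:
            request.add_header("Range", "bytes={}-".format(start))
        with urllib.request.urlopen(request) as response, open(dest, "ab") as out:
            while True:
                chunk = response.read(64 * 1024)
                if not chunk:
                    break
                out.write(chunk)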

@staticmethod
def load_images(reference_file_paths, nreference_file_paths):
""" Load the images """
retval = dict()
for fpath in reference_file_paths:
retval[fpath] = {"image": cv2.imread(fpath), # pylint: disable=no-member
"type": "filter"}
for fpath in nreference_file_paths:
retval[fpath] = {"image": cv2.imread(fpath), # pylint: disable=no-member
"type": "nfilter"}
logger.debug("Loaded filter images: %s", {k: v["type"] for k, v in retval.items()})
return retval

# Extraction pipeline
def align_faces(self, detector_name, aligner_name, loglevel, multiprocess):
""" Use the requested detectors to retrieve landmarks for filter images """
extractor = Extractor(detector_name, aligner_name, loglevel, multiprocess=multiprocess)
self.run_extractor(extractor)
del extractor
self.load_aligned_face()

def run_extractor(self, extractor):
""" Run extractor to get faces """
exception = False
for _ in range(extractor.passes):
self.queue_images(extractor)
if exception:
break
extractor.launch()
for faces in extractor.detected_faces():
exception = faces.get("exception", False)
if exception:
break
filename = faces["filename"]
detected_faces = faces["detected_faces"]

if len(detected_faces) > 1:
logger.warning("Multiple faces found in %s file: '%s'. Using first detected "
"face.", self.filters[filename]["type"], filename)
detected_faces = [detected_faces[0]]
self.filters[filename]["detected_faces"] = detected_faces

# Aligner output
if extractor.final_pass:
landmarks = faces["landmarks"]
self.filters[filename]["landmarks"] = landmarks

def queue_images(self, extractor):
""" queue images for detection and alignment """
in_queue = extractor.input_queue
for fname, img in self.filters.items():
logger.debug("Adding to filter queue: '%s' (%s)", fname, img["type"])
feed_dict = dict(filename=fname, image=img["image"])
if img.get("detected_faces", None):
feed_dict["detected_faces"] = img["detected_faces"]
logger.debug("Queueing filename: '%s' items: %s",
fname, list(feed_dict.keys()))
in_queue.put(feed_dict)
logger.debug("Sending EOF to filter queue")
in_queue.put("EOF")

def load_aligned_face(self):
""" Align the faces for vgg_face input """
for filename, face in self.filters.items():
logger.debug("Loading aligned face: '%s'", filename)
bounding_box = face["detected_faces"][0]
image = face["image"]
landmarks = face["landmarks"][0]

detected_face = DetectedFace()
detected_face.from_bounding_box(bounding_box, image)
detected_face.landmarksXY = landmarks
detected_face.load_aligned(image, size=224)
face["face"] = detected_face.aligned_face
del face["image"]
logger.debug("Loaded aligned face: ('%s', shape: %s)",
filename, face["face"].shape)

def get_filter_encodings(self):
""" Return filter face encodings from Keras VGG Face """
for filename, face in self.filters.items():
logger.debug("Getting encodings for: '%s'", filename)
encodings = self.predict(face["face"])
logger.debug("Filter Filename: %s, encoding shape: %s", filename, encodings.shape)
face["encoding"] = encodings
del face["face"]

def predict(self, face):
""" Return encodings for given image from vgg_face """
if face.shape[0] != self.input_size:
face = self.resize_face(face)
blob = cv2.dnn.blobFromImage(face, # pylint: disable=no-member
1.0,
(self.input_size, self.input_size),
[104, 117, 123],
False,
False)
self.vgg_face.setInput(blob)
preds = self.vgg_face.forward()[0, :]
return preds
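
For readers unfamiliar with blobFromImage, a rough numpy equivalent of the preprocessing performed above (assuming BGR input as returned by cv2.imread, since swapRB is False):

    import cv2
    import numpy as np

    def blob_from_image_equivalent(face, size=224, mean=(104, 117, 123)):
        """Approximate the blobFromImage call: resize, mean-subtract, NCHW."""
        resized = cv2.resize(face, (size, size)).astype(np.float32)
        resized -= np.array(mean, dtype=np.float32)    # per-channel (BGR) mean subtraction
        return resized.transpose(2, 0, 1)[np.newaxis]  # HWC -> NCHW batch of one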

def resize_face(self, face):
""" Resize incoming face to model_input_size """
if face.shape[0] < self.input_size:
interpolation = cv2.INTER_CUBIC # pylint:disable=no-member
else:
interpolation = cv2.INTER_AREA # pylint:disable=no-member

face = cv2.resize(face, # pylint:disable=no-member
dsize=(self.input_size, self.input_size),
interpolation=interpolation)
return face

def check(self, detected_face):
""" Check Face
we could use detected landmarks, but I did not manage to do so.
TODO The copy/paste below should help """
""" Check the extracted Face """
logger.trace("Checking face with FaceFilter")
encodings = face_recognition.face_encodings(detected_face.image)
if not encodings:
logger.verbose("No face encodings found")
return False

if self.encodings:
distances = list(face_recognition.face_distance(self.encodings, encodings[0]))
logger.trace("Distances: %s", distances)
distance = avg(distances)
logger.trace("Average Distance: %s", distance)
mindistance = min(distances)
logger.trace("Minimum Distance: %s", mindistance)
if distance > self.threshold:
logger.verbose("Distance above threshold: %f < %f", distance, self.threshold)
return False
if self.nencodings:
ndistances = list(face_recognition.face_distance(self.nencodings, encodings[0]))
logger.trace("nDistances: %s", ndistances)
ndistance = avg(ndistances)
logger.trace("Average nDistance: %s", ndistance)
nmindistance = min(ndistances)
logger.trace("Minimum nDistance: %s", nmindistance)
if not self.encodings and ndistance < self.threshold:
logger.verbose("nDistance below threshold: %f < %f", ndistance, self.threshold)
return False
if self.encodings:
if mindistance > nmindistance:
logger.verbose("Distance to negative sample is smaller")
return False
if distance > ndistance:
logger.verbose("Average distance to negative sample is smaller")
return False
# k-nn classifier
var_k = min(5, min(len(distances), len(ndistances)) + 1)
var_n = sum(list(map(lambda x: x[0],
list(sorted([(1, d) for d in distances] +
[(0, d) for d in ndistances],
key=lambda x: x[1]))[:var_k])))
ratio = var_n/var_k
if ratio < 0.5:
logger.verbose("K-nn is %.2f", ratio)
return False
return True


# # Copy/Paste (mostly) from private method in face_recognition
# face_recognition_model = face_recognition_models.face_recognition_model_location()
# face_encoder = dlib.face_recognition_model_v1(face_recognition_model)

# def convert(detected_face):
# return np.array(face_encoder.compute_face_descriptor(detected_face.image,
# detected_face.landmarks,
# 1))
# # end of Copy/Paste
distances = {"filter": list(), "nfilter": list()}
encodings = self.predict(detected_face.aligned_face)
for filt in self.filters.values():
similarity = self.find_cosine_similiarity(filt["encoding"], encodings)
distances[filt["type"]].append(similarity)

avgs = {key: avg(val) if val else None for key, val in distances.items()}
mins = {key: min(val) if val else None for key, val in distances.items()}
# Filter
if distances["filter"] and avgs["filter"] > self.threshold:
msg = "Rejecting filter face: {} > {}".format(round(avgs["filter"], 2), self.threshold)
retval = False
# nfilter only (no filter provided)
elif not distances["filter"] and avgs["nfilter"] < self.threshold:
msg = "Rejecting nFilter face: {} < {}".format(round(avgs["nfilter"], 2),
self.threshold)
retval = False
# Filter with nFilter
elif distances["filter"] and distances["nfilter"] and mins["filter"] > mins["nfilter"]:
msg = ("Rejecting face as distance from nfilter sample is smaller: (filter: {}, "
"nfilter: {})".format(round(mins["filter"], 2), round(mins["nfilter"], 2)))
retval = False
elif distances["filter"] and distances["nfilter"] and avgs["filter"] > avgs["nfilter"]:
msg = ("Rejecting face as average distance from nfilter sample is smaller: (filter: "
"{}, nfilter: {})".format(round(mins["filter"], 2), round(mins["nfilter"], 2)))
retval = False
elif distances["filter"] and distances["nfilter"]:
# k-nn classifier
var_k = min(5, min(len(distances["filter"]), len(distances["nfilter"])) + 1)
var_n = sum(list(map(lambda x: x[0],
list(sorted([(1, d) for d in distances["filter"]] +
[(0, d) for d in distances["nfilter"]],
key=lambda x: x[1]))[:var_k])))
ratio = var_n/var_k
if ratio < 0.5:
msg = ("Rejecting face as k-nearest neighbors classification is less than "
"0.5: {}".format(round(ratio, 2)))
retval = False
else:
msg = None
retval = True
if msg:
logger.verbose(msg)
else:
logger.trace("Accepted face: (similarity: %s, threshold: %s)",
distances, self.threshold)
return retval
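
The k-nearest-neighbours vote above is compact to the point of obscurity; the same logic unrolled, with hypothetical distances, reads:

    # Label each distance 1 (filter) or 0 (nfilter), take the k smallest
    # distances and accept when 'filter' labels hold the majority.
    distances = {"filter": [0.31, 0.42], "nfilter": [0.55]}  # hypothetical values
    labelled = ([(1, d) for d in distances["filter"]] +
                [(0, d) for d in distances["nfilter"]])
    var_k = min(5, min(len(distances["filter"]), len(distances["nfilter"])) + 1)
    nearest = sorted(labelled, key=lambda pair: pair[1])[:var_k]
    ratio = sum(label for label, _ in nearest) / var_k  # fraction of 'filter' neighbours
    accept = ratio >= 0.5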

@staticmethod
def find_cosine_similiarity(source_face, test_face):
""" Find the cosine similarity between a source face and a test face """
var_a = np.matmul(np.transpose(source_face), test_face)
var_b = np.sum(np.multiply(source_face, source_face))
var_c = np.sum(np.multiply(test_face, test_face))
return 1 - (var_a / (np.sqrt(var_b) * np.sqrt(var_c)))
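
Despite its name, this helper returns the cosine distance (1 - cosine similarity), so 0 means identical direction and 2 means opposite; that is why every comparison in check() treats smaller values as more similar. A quick sanity check with a hypothetical vector:

    import numpy as np

    def cosine_distance(source, test):  # mirrors find_cosine_similiarity above
        return 1 - np.dot(source, test) / (np.linalg.norm(source) * np.linalg.norm(test))

    vec = np.array([1.0, 2.0, 3.0])
    print(cosine_distance(vec, vec))   # 0.0 -> identical
    print(cosine_distance(vec, -vec))  # 2.0 -> opposite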
3 changes: 2 additions & 1 deletion lib/multithreading.py
@@ -371,7 +371,8 @@ def join(self, timeout=None):
""" Add logging to join function """
logger.debug("Joining Process: (name: '%s', PID: %s)", self._name, self.pid)
super().join(timeout=timeout)
_launched_processes.remove(self)
if self in _launched_processes:
_launched_processes.remove(self)
logger.debug("Joined Process: (name: '%s', PID: %s)", self._name, self.pid)


(The remaining 13 changed files in this commit are not shown.)