From 343392813338ae7b10b0a3bbb3b5a9a7da6e588d Mon Sep 17 00:00:00 2001 From: torzdf <36920800+torzdf@users.noreply.github.com> Date: Thu, 27 Aug 2020 16:49:57 +0100 Subject: [PATCH] Add Mouth and Eye Priority to Loss options (#1054) * Priority Training for Mouth and Eyes - Tensorflow * Use chosen loss function for area multipliers * loss multipliers for AMD * Fix mask multipliers for plaid and roll PenalizedMaskLoss into LossWrapper * losses_tf: roll PenalizedMaskLoss into LossWrapper --- lib/faces_detect.py | 117 ++++++++++++++++++++++- lib/model/losses_plaid.py | 164 ++++++++++++++------------------- lib/model/losses_tf.py | 154 +++++++++++++------------------ lib/training_data.py | 122 +++++++++++++++++++----- plugins/train/_config.py | 26 ++++++ plugins/train/model/_base.py | 67 +++++++++++--- plugins/train/trainer/_base.py | 134 +++++++++++++++++++-------- 7 files changed, 521 insertions(+), 263 deletions(-) diff --git a/lib/faces_detect.py b/lib/faces_detect.py index 9e3a370177..37eb3a92c7 100644 --- a/lib/faces_detect.py +++ b/lib/faces_detect.py @@ -147,6 +147,50 @@ def add_mask(self, name, mask, affine_matrix, interpolator, storage_size=128): fsmask.add(mask, affine_matrix, interpolator) self.mask[name] = fsmask + def get_landmark_mask(self, size, area, aligned=True, dilation=0, blur_kernel=0, as_zip=False): + """ Obtain a single channel mask based on the face's landmark points. + + Parameters + ---------- + size: int or tuple + The size of the aligned mask to retrieve. Should be an `int` if an aligned face is + being requested, or a ('height', 'width') shape tuple if a full frame is being + requested + area: ["mouth", "eyes"] + The type of mask to obtain. `face` is a full face mask the others are masks for those + specific areas + aligned: bool + ``True`` if the returned mask should be for an aligned face. ``False`` if a full frame + mask should be returned + dilation: int, optional + The amount of dilation to apply to the mask. `0` for none. Default: `0` + blur_kernel: int, optional + The kernel size for applying gaussian blur to apply to the mask. `0` for none. + Default: `0` + as_zip: bool, optional + ``True`` if the mask should be returned zipped otherwise ``False`` + + Returns + ------- + :class:`numpy.ndarray` or zipped array + The mask as a single channel image of the given :attr:`size` dimension. If + :attr:`as_zip` is ``True`` then the :class:`numpy.ndarray` will be contained within a + zipped container + """ + # TODO Face mask generation from landmarks + logger.trace("size: %s, area: %s, aligned: %s, dilation: %s, blur_kernel: %s, as_zip: %s", + size, area, aligned, dilation, blur_kernel, as_zip) + areas = dict(mouth=[slice(48, 60)], + eyes=[slice(36, 42), slice(42, 48)]) + if aligned and self.aligned.get("size") != size: + self.load_aligned(None, size=size, force=True) + size = (size, size) if aligned else size + landmarks = self.aligned_landmarks if aligned else self.landmarks_xy + points = [landmarks[zone] for zone in areas[area]] + mask = _LandmarksMask(size, points, dilation=dilation, blur_kernel=blur_kernel) + retval = mask.get(as_zip=as_zip) + return retval + def to_alignment(self): """ Return the detected face formatted for an alignments file @@ -511,6 +555,77 @@ def reference_interpolators(self): return get_matrix_scaling(self.reference_matrix) +class _LandmarksMask(): # pylint:disable=too-few-public-methods + """ Create a single channel mask from aligned landmark points. + + size: tuple + The (height, width) shape tuple that the mask should be returned as + points: list + A list of landmark points that correspond to the given shape tuple to create + the mask. Each item in the list should be a :class:`numpy.ndarray` that a filled + convex polygon will be created from + dilation: int, optional + The amount of dilation to apply to the mask. `0` for none. Default: `0` + blur_kernel: int, optional + The kernel size for applying gaussian blur to apply to the mask. `0` for none. Default: `0` + """ + def __init__(self, size, points, dilation=0, blur_kernel=0): + logger.trace("Initializing: %s: (size: %s, points: %s, dilation: %s, blur_kernel: %s)", + size, points, dilation, blur_kernel) + self._size = size + self._points = points + self._dilation = dilation + self._blur_kernel = blur_kernel + self._mask = None + logger.trace("Initialized: %s", self.__class__.__name__) + + def get(self, as_zip=False): + """ Obtain the mask. + + Parameters + ---------- + as_zip: bool, optional + ``True`` if the mask should be returned zipped otherwise ``False`` + + Returns + ------- + :class:`numpy.ndarray` or zipped array + The mask as a single channel image of the given :attr:`size` dimension. If + :attr:`as_zip` is ``True`` then the :class:`numpy.ndarray` will be contained within a + zipped container + """ + if not np.any(self._mask): + self._generate_mask() + retval = compress(self._mask) if as_zip else self._mask + logger.trace("as_zip: %s, retval type: %s", as_zip, type(retval)) + return retval + + def _generate_mask(self): + """ Generate the mask. + + Creates the mask applying any requested dilation and blurring and assigns to + :attr:`_mask` + + Returns + ------- + :class:`numpy.ndarray` + The mask as a single channel image of the given :attr:`size` dimension. + """ + mask = np.zeros((self._size) + (1, ), dtype="float32") + for landmarks in self._points: + lms = np.rint(landmarks).astype("int") + cv2.fillConvexPoly(mask, cv2.convexHull(lms), 1.0, lineType=cv2.LINE_AA) + if self._dilation != 0: + mask = cv2.dilate(mask, + cv2.getStructuringElement(cv2.MORPH_ELLIPSE, + (self._dilation, self._dilation)), + iterations=1) + if self._blur_kernel != 0: + mask = BlurMask("gaussian", mask, self._blur_kernel).blurred + logger.trace("mask: (shape: %s, dtype: %s)", mask.shape, mask.dtype) + self._mask = (mask * 255.0).astype("uint8") + + class Mask(): """ Face Mask information and convenience methods @@ -741,7 +856,7 @@ def _attr_name(dict_key): return retval -class BlurMask(): +class BlurMask(): # pylint:disable=too-few-public-methods """ Factory class to return the correct blur object for requested blur type. Works for square images only. Currently supports Gaussian and Normalized Box Filters. diff --git a/lib/model/losses_plaid.py b/lib/model/losses_plaid.py index f2ffd4a07d..edbb9673ee 100644 --- a/lib/model/losses_plaid.py +++ b/lib/model/losses_plaid.py @@ -138,7 +138,7 @@ def __call__(self, y_true, y_pred): denom = (K.square(u_true) + K.square(u_pred) + self.c_1) * ( var_pred + var_true + self.c_2) ssim /= denom # no need for clipping, c_1 + c_2 make the denorm non-zero - return K.mean((1.0 - ssim) / 2.0) + return (1.0 - ssim) / 2.0 @staticmethod def _preprocess_padding(padding): @@ -199,66 +199,6 @@ def extract_image_patches(self, input_tensor, k_sizes, s_sizes, return patches -class PenalizedLoss(): # pylint:disable=too-few-public-methods - """ Penalized Loss function. - - Applies the given loss function just to the masked area of the image. - - Parameters - ---------- - loss_func: function - The actual loss function to use - mask_prop: float, optional - The amount of mask propagation. Default: `1.0` - """ - def __init__(self, loss_func, mask_prop=1.0): - self._loss_func = loss_func - self._mask_prop = mask_prop - - def __call__(self, y_true, y_pred): - """ Apply the loss function to the masked area of the image. - - Parameters - ---------- - y_true: tensor or variable - The ground truth value. This should contain the mask in the 4th channel that will be - split off for penalizing. - y_pred: tensor or variable - The predicted value - - Returns - ------- - tensor - The Loss value - """ - mask = self._prepare_mask(K.expand_dims(y_true[..., -1], axis=-1)) - y_true = y_true[..., :-1] - n_true = y_true * mask - n_pred = y_pred * mask - if isinstance(self._loss_func, DSSIMObjective): - # Extract Image Patches in SSIM requires that y_pred be of a known shape, so - # specifically reshape the tensor. - n_pred = K.reshape(n_pred, K.int_shape(y_pred)) - return self._loss_func(n_true, n_pred) - - def _prepare_mask(self, mask): - """ Prepare the masks for calculating loss - - Parameters - ---------- - mask: :class:`numpy.ndarray` - The masks for the current batch - - Returns - ------- - tensor - The prepared mask for applying to loss - """ - mask_as_k_inv_prop = 1 - self._mask_prop - mask = (mask * self._mask_prop) + mask_as_k_inv_prop - return mask - - class GeneralizedLoss(): # pylint:disable=too-few-public-methods """ Generalized function used to return a large variety of mathematical loss functions. @@ -564,46 +504,33 @@ def _scharr_edges(cls, image, magnitude): class LossWrapper(): # pylint:disable=too-few-public-methods """ A wrapper class for multiple keras losses to enable multiple weighted loss functions on a - single output. - - Parameters - ---------- - loss_functions: list - A list of either a tuple of (:class:`keras.losses.Loss`, scalar weight) or just a - :class:`keras.losses.Loss` function. If just the loss function is passed, then the weight - is assumed to be 1.0 """ - def __init__(self, loss_functions): - logger.debug("Initializing: %s: (loss_functions: %s)", - self.__class__.__name__, loss_functions) + single output and masking. + """ + def __init__(self): + logger.debug("Initializing: %s", self.__class__.__name__) self._loss_functions = [] self._loss_weights = [] - self._compile_losses(loss_functions) + self._mask_channels = [] logger.debug("Initialized: %s", self.__class__.__name__) - def _compile_losses(self, loss_functions): - """ Splits the given loss_functions into the corresponding :attr:`_loss_functions' and - :attr:`_loss_weights' lists. - - Loss functions are compiled into :class:`keras.compile_utils.LossesContainer` objects + def add_loss(self, function, weight=1.0, mask_channel=-1): + """ Add the given loss function with the given weight to the loss function chain. Parameters ---------- - loss_functions: list - A list of either a tuple of (:class:`keras.losses.Loss`, scalar weight) or just a - :class:`keras.losses.Loss` function. If just the loss function is passed, then the - weight is assumed to be 1.0 """ - for loss_func in loss_functions: - if isinstance(loss_func, tuple): - assert len(loss_func) == 2, "Tuple loss functions should contain 2 items" - assert isinstance(loss_func[1], float), "weight should be a float" - func, weight = loss_func - else: - func = loss_func - weight = 1.0 - self._loss_functions.append(func) - self._loss_weights.append(weight) - logger.debug("Compiled losses: (functions: %s, weights: %s", - self._loss_functions, self._loss_weights) + function: :class:`keras.losses.Loss` + The loss function to add to the loss chain + weight: float, optional + The weighting to apply to the loss function. Default: `1.0` + mask_channel: int, optional + The channel in the `y_true` image that the mask exists in. Set to `-1` if there is no + mask for the given loss function. Default: `-1` + """ + logger.debug("Adding loss: (function: %s, weight: %s, mask_channel: %s)", + function, weight, mask_channel) + self._loss_functions.append(function) + self._loss_weights.append(weight) + self._mask_channels.append(mask_channel) def __call__(self, y_true, y_pred): """ Call the sub loss functions for the loss wrapper. @@ -623,6 +550,51 @@ def __call__(self, y_true, y_pred): The final loss value """ loss = 0.0 - for func, weight in zip(self._loss_functions, self._loss_weights): - loss += (K.mean(func(y_true, y_pred)) * weight) + for func, weight, mask_channel in zip(self._loss_functions, + self._loss_weights, + self._mask_channels): + logger.debug("Processing loss function: (func: %s, weight: %s, mask_channel: %s)", + func, weight, mask_channel) + n_true, n_pred = self._apply_mask(y_true, y_pred, mask_channel) + if isinstance(func, DSSIMObjective): + # Extract Image Patches in SSIM requires that y_pred be of a known shape, so + # specifically reshape the tensor. + n_pred = K.reshape(n_pred, K.int_shape(y_pred)) + this_loss = func(n_true, n_pred) + loss_dims = K.ndim(this_loss) + loss += (K.mean(this_loss, axis=list(range(1, loss_dims))) * weight) return loss + + @classmethod + def _apply_mask(cls, y_true, y_pred, mask_channel, mask_prop=1.0): + """ Apply the mask to the input y_true and y_pred. If a mask is not required then + return the unmasked inputs. + + Parameters + ---------- + y_true: tensor or variable + The ground truth value + y_pred: tensor or variable + The predicted value + mask_channel: int + The channel within y_true that the required mask resides in + mask_prop: float, optional + The amount of mask propagation. Default: `1.0` + + Returns + ------- + tuple + (n_true, n_pred): The ground truth and predicted value tensors with the mask applied + """ + if mask_channel == -1: + logger.debug("No mask to apply") + return y_true[..., :3], y_pred[..., :3] + + logger.debug("Applying mask from channel %s", mask_channel) + mask = K.expand_dims(y_true[..., mask_channel], axis=-1) + mask_as_k_inv_prop = 1 - mask_prop + mask = (mask * mask_prop) + mask_as_k_inv_prop + + n_true = y_true[..., :3] * mask + n_pred = y_pred * mask + return n_true, n_pred diff --git a/lib/model/losses_tf.py b/lib/model/losses_tf.py index c4e5ad78ee..cb5640b5b2 100644 --- a/lib/model/losses_tf.py +++ b/lib/model/losses_tf.py @@ -197,63 +197,6 @@ def extract_image_patches(self, input_tensor, k_sizes, s_sizes, return patches -class PenalizedLoss(tf.keras.losses.Loss): - """ Penalized Loss function. - - Applies the given loss function just to the masked area of the image. - - Parameters - ---------- - loss_func: function - The actual loss function to use - mask_prop: float, optional - The amount of mask propagation. Default: `1.0` - """ - def __init__(self, loss_func, mask_prop=1.0): - super().__init__(name="penalized_loss") - self._loss_func = compile_utils.LossesContainer(loss_func) - self._mask_prop = mask_prop - - def call(self, y_true, y_pred): - """ Apply the loss function to the masked area of the image. - - Parameters - ---------- - y_true: tensor or variable - The ground truth value. This should contain the mask in the 4th channel that will be - split off for penalizing. - y_pred: tensor or variable - The predicted value - - Returns - ------- - tensor - The Loss value - """ - mask = self._prepare_mask(K.expand_dims(y_true[..., -1], axis=-1)) - y_true = y_true[..., :-1] - n_true = K.concatenate([y_true[:, :, :, i:i+1] * mask for i in range(3)], axis=-1) - n_pred = K.concatenate([y_pred[:, :, :, i:i+1] * mask for i in range(3)], axis=-1) - return self._loss_func(n_true, n_pred) - - def _prepare_mask(self, mask): - """ Prepare the masks for calculating loss - - Parameters - ---------- - mask: :class:`numpy.ndarray` - The masks for the current batch - - Returns - ------- - tensor - The prepared mask for applying to loss - """ - mask_as_k_inv_prop = 1 - self._mask_prop - mask = (mask * self._mask_prop) + mask_as_k_inv_prop - return mask - - class GeneralizedLoss(tf.keras.losses.Loss): """ Generalized function used to return a large variety of mathematical loss functions. @@ -559,52 +502,42 @@ def _scharr_edges(cls, image, magnitude): class LossWrapper(tf.keras.losses.Loss): """ A wrapper class for multiple keras losses to enable multiple weighted loss functions on a single output. - - Parameters - ---------- - loss_functions: list - A list of either a tuple of (:class:`keras.losses.Loss`, scalar weight) or just a - :class:`keras.losses.Loss` function. If just the loss function is passed, then the weight - is assumed to be 1.0 """ - def __init__(self, loss_functions): - logger.debug("Initializing: %s: (loss_functions: %s)", - self.__class__.__name__, loss_functions) + """ + def __init__(self): + logger.debug("Initializing: %s", self.__class__.__name__) super().__init__(name="LossWrapper") self._loss_functions = [] self._loss_weights = [] - self._compile_losses(loss_functions) + self._mask_channels = [] logger.debug("Initialized: %s", self.__class__.__name__) - def _compile_losses(self, loss_functions): - """ Splits the given loss_functions into the corresponding :attr:`_loss_functions' and - :attr:`_loss_weights' lists. - - Loss functions are compiled into :class:`keras.compile_utils.LossesContainer` objects + def add_loss(self, function, weight=1.0, mask_channel=-1): + """ Add the given loss function with the given weight to the loss function chain. Parameters ---------- - loss_functions: list - A list of either a tuple of (:class:`keras.losses.Loss`, scalar weight) or just a - :class:`keras.losses.Loss` function. If just the loss function is passed, then the - weight is assumed to be 1.0 """ - for loss_func in loss_functions: - if isinstance(loss_func, tuple): - assert len(loss_func) == 2, "Tuple loss functions should contain 2 items" - assert isinstance(loss_func[1], float), "weight should be a float" - func, weight = loss_func - else: - func = loss_func - weight = 1.0 - self._loss_functions.append(compile_utils.LossesContainer(func)) - self._loss_weights.append(weight) - logger.debug("Compiled losses: (functions: %s, weights: %s", - self._loss_functions, self._loss_weights) + function: :class:`keras.losses.Loss` + The loss function to add to the loss chain + weight: float, optional + The weighting to apply to the loss function. Default: `1.0` + mask_channel: int, optional + The channel in the `y_true` image that the mask exists in. Set to `-1` if there is no + mask for the given loss function. Default: `-1` + """ + logger.debug("Adding loss: (function: %s, weight: %s, mask_channel: %s)", + function, weight, mask_channel) + self._loss_functions.append(compile_utils.LossesContainer(function)) + self._loss_weights.append(weight) + self._mask_channels.append(mask_channel) def call(self, y_true, y_pred): """ Call the sub loss functions for the loss wrapper. Weights are returned as the weighted sum of the chosen losses. + If a mask is being applied to the loss, then the appropriate mask is extracted from y_true + and added as the 4th channel being passed to the penalized loss function. + Parameters ---------- y_true: tensor or variable @@ -618,6 +551,45 @@ def call(self, y_true, y_pred): The final loss value """ loss = 0.0 - for func, weight in zip(self._loss_functions, self._loss_weights): - loss += (func(y_true, y_pred) * weight) + for func, weight, mask_channel in zip(self._loss_functions, + self._loss_weights, + self._mask_channels): + logger.debug("Processing loss function: (func: %s, weight: %s, mask_channel: %s)", + func, weight, mask_channel) + n_true, n_pred = self._apply_mask(y_true, y_pred, mask_channel) + loss += (func(n_true, n_pred) * weight) return loss + + @classmethod + def _apply_mask(cls, y_true, y_pred, mask_channel, mask_prop=1.0): + """ Apply the mask to the input y_true and y_pred. If a mask is not required then + return the unmasked inputs. + + Parameters + ---------- + y_true: tensor or variable + The ground truth value + y_pred: tensor or variable + The predicted value + mask_channel: int + The channel within y_true that the required mask resides in + mask_prop: float, optional + The amount of mask propagation. Default: `1.0` + + Returns + ------- + tuple + (n_true, n_pred): The ground truth and predicted value tensors with the mask applied + """ + if mask_channel == -1: + logger.debug("No mask to apply") + return y_true[..., :3], y_pred[..., :3] + + logger.debug("Applying mask from channel %s", mask_channel) + mask = K.expand_dims(y_true[..., mask_channel], axis=-1) + mask_as_k_inv_prop = 1 - mask_prop + mask = (mask * mask_prop) + mask_as_k_inv_prop + + n_true = K.concatenate([y_true[:, :, :, i:i+1] * mask for i in range(3)], axis=-1) + n_pred = K.concatenate([y_pred[:, :, :, i:i+1] * mask for i in range(3)], axis=-1) + return n_true, n_pred diff --git a/lib/training_data.py b/lib/training_data.py index e6bb5af9a6..0b8aa938c1 100644 --- a/lib/training_data.py +++ b/lib/training_data.py @@ -4,6 +4,7 @@ import logging from random import shuffle, choice +from zlib import decompress import numpy as np import cv2 @@ -16,7 +17,7 @@ logger = logging.getLogger(__name__) # pylint: disable=invalid-name -class TrainingDataGenerator(): +class TrainingDataGenerator(): # pylint:disable=too-few-public-methods """ A Training Data Generator for compiling data for feeding to a model. This class is called from :mod:`plugins.train.trainer._base` and launches a background @@ -54,6 +55,17 @@ class TrainingDataGenerator(): * **masks** (`dict`, `optional`). Required if :attr:`penalized_mask_loss` or \ :attr:`learn_mask` is ``True``. Returning dictionary has a key of **side** (`str`) the \ value of which is a `dict` of {**filename** (`str`): :class:`lib.faces_detect.Mask`}. + + * **masks_eye** (`dict`, `optional`). Required if config option "eye_multiplier" is \ + a value greater than 1. Returning dictionary has a key of **side** (`str`) the \ + value of which is a `dict` of {**filename** (`str`): :class:`bytes`} which is a zipped \ + eye mask. + + * **masks_mouth** (`dict`, `optional`). Required if config option "mouth_multiplier" is \ + a value greater than 1. Returning dictionary has a key of **side** (`str`) the \ + value of which is a `dict` of {**filename** (`str`): :class:`bytes`} which is a zipped \ + mouth mask. + config: dict The configuration `dict` generated from :file:`config.train.ini` containing the trainer \ plugin configuration options. @@ -74,7 +86,9 @@ def __init__(self, model_input_size, model_output_shapes, coverage_ratio, augmen self._no_flip = no_flip self._warp_to_landmarks = warp_to_landmarks self._landmarks = alignments.get("landmarks", None) - self._masks = alignments.get("masks", None) + self._masks = dict(masks=alignments.get("masks", None), + eyes=alignments.get("masks_eye", None), + mouths=alignments.get("masks_mouth", None)) self._nearest_landmarks = {} # Batchsize and processing class are set when this class is called by a feeder @@ -234,27 +248,71 @@ def _process_batch(self, filenames, side): side, {k: v.shape if isinstance(v, np.ndarray) else[i.shape for i in v] for k, v in processed.items()}) - return processed def _apply_mask(self, filenames, batch, side): """ Applies the mask to the 4th channel of the image. If masks are not being used - applies a dummy all ones mask """ - logger.trace("Input batch shape: %s, side: %s", batch.shape, side) - if self._masks is None: - logger.trace("Creating dummy masks. side: %s", side) - masks = np.ones_like(batch[..., :1], dtype=batch.dtype) - else: - logger.trace("Obtaining masks for batch. side: %s", side) - masks = np.array([self._masks[side][filename].mask - for filename, face in zip(filenames, batch)], dtype=batch.dtype) - masks = self._resize_masks(batch.shape[1], masks) + applies a dummy all ones mask. + + If the configuration options `eye_multiplier` and/or `mouth_multiplier` are greater than 1 + then these masks are applied to the final channels of the batch respectively. + + Parameters + ---------- + filenames: list + The list of filenames that correspond to this batch + batch: :class:`numpy.ndarray` + The batch of faces that have been loaded from disk + side: str + '"a"' or '"b"' the side that is being processed - logger.trace("masks shape: %s", masks.shape) - batch = np.concatenate((batch, masks), axis=-1) + Returns + ------- + :class:`numpy.ndarray` + The batch with masks applied to the final channels + """ + logger.trace("Input batch shape: %s, side: %s", batch.shape, side) + size = batch.shape[1] + for key in ("masks", "eyes", "mouths"): + item = self._masks[key] + if item is None and key != "masks": + continue + if item is None and key == "masks": + logger.trace("Creating dummy masks. side: %s", side) + masks = np.ones_like(batch[..., :1], dtype=batch.dtype) + else: + logger.trace("Obtaining masks for batch. (key: %s side: %s)", key, side) + masks = np.array([self._get_mask(item[side][filename], size) + for filename, face in zip(filenames, batch)], dtype=batch.dtype) + masks = self._resize_masks(size, masks) + + logger.trace("masks: (key: %s, shape: %s)", key, masks.shape) + batch = np.concatenate((batch, masks), axis=-1) logger.trace("Output batch shape: %s, side: %s", batch.shape, side) return batch + @classmethod + def _get_mask(cls, item, size): + """ Decompress zipped eye and mouth masks, or return the stored mask + + Parameters + ---------- + item: :class:`lib.faces_detect.Mask` or `bytes` + Either a stored face mask object or a zipped eye or mouth mask + size: int + The size of the stored eye or mouth mask for reshaping + + Returns + ------- + class:`numpy.ndarray` + The decompressed mask + """ + if isinstance(item, bytes): + retval = np.frombuffer(decompress(item), dtype="uint8").reshape(size, size, 1) + else: + retval = item.mask + return retval + @staticmethod def _resize_masks(target_size, masks): """ Resize the masks to the target size """ @@ -446,10 +504,13 @@ def get_targets(self, batch): Parameters ---------- batch: :class:`numpy.ndarray` - This should be a 4-dimensional array of training images in the format (`batchsize`, + This should be a 4+-dimensional array of training images in the format (`batchsize`, `height`, `width`, `channels`). Targets should be requested after performing image transformations but prior to performing warps. + The 4th channel should be the mask. Any channels above the 4th should be any additional + masks that are requested. + Returns ------- dict @@ -472,7 +533,7 @@ def get_targets(self, batch): for image in batch], dtype='float32') / 255. for size in self._output_sizes] logger.trace("Target image shapes: %s", - [tgt_images.shape[1:] for tgt_images in target_batch]) + [tgt_images.shape for tgt_images in target_batch]) retval = self._separate_target_mask(target_batch) logger.trace("Final targets: %s", @@ -484,16 +545,31 @@ def get_targets(self, batch): def _separate_target_mask(target_batch): """ Return the batch and the batch of final masks - Returns the targets as a list of 4-dimensional :class:`numpy.ndarray` s of shape - (`batchsize`, `height`, `width`, `3`). + Parameters + ---------- + target_batch: list + List of 4 dimension :class:`numpy.ndarray` objects resized the model outputs. + The 4th channel of the array contains the face mask, any additional channels after + this are additional masks (e.g. eye mask and mouth mask) - The target masks are returned as its own item and is the 4th channel of the final target - output. + Returns + ------- + dict: + The targets and the masks separated into their own items. The targets are a list of + 3 channel, 4 dimensional :class:`numpy.ndarray` objects sized for each output from the + model. The masks are a :class:`numpy.ndarray` of the final output size. Any additional + masks(e.g. eye and mouth masks) will be collated together into a :class:`numpy.ndarray` + of the final output size. The number of channels will be the number of additional + masks available """ logger.trace("target_batch shapes: %s", [tgt.shape for tgt in target_batch]) retval = dict(targets=[batch[..., :3] for batch in target_batch], - masks=[target_batch[-1][..., 3:]]) - logger.trace("returning: %s", {k: [tgt.shape for tgt in v] for k, v in retval.items()}) + masks=target_batch[-1][..., 3][..., None]) + if target_batch[-1].shape[-1] > 4: + retval["additional_masks"] = target_batch[-1][..., 4:] + logger.trace("returning: %s", {k: v.shape if isinstance(v, np.ndarray) else [tgt.shape + for tgt in v] + for k, v in retval.items()}) return retval # <<< COLOR AUGMENTATION >>> # diff --git a/plugins/train/_config.py b/plugins/train/_config.py index 9a4e4aeb9c..8468bcd3a0 100644 --- a/plugins/train/_config.py +++ b/plugins/train/_config.py @@ -264,6 +264,32 @@ def _set_loss(self): "\n\t 400 - Will give the penalty function 4x as much importance as the main " "loss function." "\n\t 0 - Disables L2 Regularization altogether.") + self.add_item( + section=section, + title="eye_multiplier", + datatype=int, + group="loss", + min_max=(1, 40), + rounding=1, + default=12, + info="The amount of priority to give to the eyes.\n\nThe value given here is as a " + "multiplier of the main loss score. For example:" + "\n\t 1 - The eyes will receive the same priority as the rest of the face. " + "\n\t 10 - The eyes will be given a score 10 times higher than the rest of the " + "face.") + self.add_item( + section=section, + title="mouth_multiplier", + datatype=int, + group="loss", + min_max=(1, 40), + rounding=1, + default=8, + info="The amount of priority to give to the mouth.\n\nThe value given here is as a " + "multiplier of the main loss score. For example:" + "\n\t 1 - The mouth will receive the same priority as the rest of the face. " + "\n\t 10 - The mouth will be given a score 10 times higher than the rest of the " + "face.") self.add_item( section=section, title="penalized_mask_loss", diff --git a/plugins/train/model/_base.py b/plugins/train/model/_base.py index 49f87f7819..87d48e4e9d 100644 --- a/plugins/train/model/_base.py +++ b/plugins/train/model/_base.py @@ -991,28 +991,69 @@ def _set_loss_functions(self, output_names): output_names: list The output names from the model """ - selected_loss = self._loss_dict[self._config["loss_function"]] + mask_channels = self._get_mask_channels() + face_loss = self._loss_dict[self._config["loss_function"]] + for name, output_name in zip(self._names, output_names): if name.startswith("mask"): loss_func = self._loss_dict[self._config["mask_loss_function"]] else: - if (self._config["loss_function"] in self._uses_l2_reg - and self._config["l2_reg_term"] != 0): - loss_funcs = [selected_loss, self._loss_dict["mse"]] - loss_weights = [1.0, self._config["l2_reg_term"] / 100.0] - else: - loss_funcs = [selected_loss] - loss_weights = [1.0] - - if self._config["penalized_mask_loss"]: - loss_funcs = [losses.PenalizedLoss(loss) for loss in loss_funcs] - - loss_func = losses.LossWrapper(loss_functions=list(zip(loss_funcs, loss_weights))) + loss_func = losses.LossWrapper() + loss_func.add_loss(face_loss, mask_channel=mask_channels[0]) + self._add_l2_regularization_term(loss_func, mask_channels[0]) + + mask_channel = 1 + for multiplier in ("eye_multiplier", "mouth_multiplier"): + if self._config[multiplier] > 1: + loss_func.add_loss(face_loss, + weight=self._config[multiplier] * 1.0, + mask_channel=mask_channels[mask_channel]) + self._add_l2_regularization_term(loss_func, mask_channel) + mask_channel += 1 logger.debug("%s: (output_name: '%s', function: %s)", name, output_name, loss_func) self._funcs[output_name] = loss_func logger.debug("functions: %s", self._funcs) + def _add_l2_regularization_term(self, loss_wrapper, mask_channel): + """ Check if an L2 Regularization term should be added and add to the loss function + wrapper. + + Parameters + ---------- + loss_wrapper: :class:`lib.model.losses.LossWrapper` + The wrapper loss function that holds the face losses + mask_channel: int + The channel that holds the mask in `y_true`, if a mask is used for the loss. + `-1` if the input is not masked + """ + if self._config["loss_function"] in self._uses_l2_reg and self._config["l2_reg_term"] > 0: + logger.debug("Adding L2 Regularization for Structural Loss") + loss_wrapper.add_loss(self._loss_dict["mse"], + weight=self._config["l2_reg_term"] / 100.0, + mask_channel=mask_channel) + + def _get_mask_channels(self): + """ Obtain the channels from the face targets that the masks reside in from the training + data generator. + + Returns + ------- + list: + A list of channel indices that contain the mask for the corresponding config item + """ + uses_masks = (self._config["penalized_mask_loss"], + self._config["eye_multiplier"] > 1, + self._config["mouth_multiplier"] > 1) + mask_channels = [-1 for _ in range(len(uses_masks))] + current_channel = 3 + for idx, mask_required in enumerate(uses_masks): + if mask_required: + mask_channels[idx] = current_channel + current_channel += 1 + logger.debug("uses_masks: %s, mask_channels: %s", uses_masks, mask_channels) + return mask_channels + class State(): """ Holds state information relating to the plugin's saved model. diff --git a/plugins/train/trainer/_base.py b/plugins/train/trainer/_base.py index 6e3ab7a997..92e5d9163a 100644 --- a/plugins/train/trainer/_base.py +++ b/plugins/train/trainer/_base.py @@ -101,12 +101,16 @@ def _get_alignments_data(self): Returns ------- dict: - Includes the key `landmarks` if landmarks are required for training and the key `masks` - if the masks are required for training. """ + Includes the key `landmarks` if landmarks are required for training, `masks` if masks + are required for training, `masks_eye` if eye masks are required and `masks_mouth` if + mouth masks are required. """ retval = dict() - get_masks = self._model.config["learn_mask"] or self._model.config["penalized_mask_loss"] - if not self._model.command_line_arguments.warp_to_landmarks and not get_masks: + if not any([self._model.config["learn_mask"], + self._model.config["penalized_mask_loss"], + self._model.config["eye_multiplier"] > 1, + self._model.config["mouth_multiplier"] > 1, + self._model.command_line_arguments.warp_to_landmarks]): return retval alignments = _TrainingAlignments(self._model, self._images) @@ -115,10 +119,17 @@ def _get_alignments_data(self): logger.debug("Adding landmarks to training opts dict") retval["landmarks"] = alignments.landmarks - if get_masks: + if self._model.config["learn_mask"] or self._model.config["penalized_mask_loss"]: logger.debug("Adding masks to training opts dict") retval["masks"] = alignments.masks - logger.debug(retval) + + if self._model.config["eye_multiplier"] > 1: + retval["masks_eye"] = alignments.masks_eye + + if self._model.config["mouth_multiplier"] > 1: + retval["masks_mouth"] = alignments.masks_mouth + + logger.debug({key: {k: len(v) for k, v in val.items()} for key, val in retval.items()}) return retval def _set_tensorboard(self): @@ -407,11 +418,13 @@ def get_batch(self): model_inputs = [] model_targets = [] for side in ("a", "b"): - side_inputs, side_targets = self._get_next(side) - if self._model.config["penalized_mask_loss"]: - side_targets = self._compile_masks(side_targets) - if not self._model.config["learn_mask"]: # Remove masks from the model targets - side_targets = side_targets[:-1] + batch = next(self._feeds[side]) + side_inputs = batch["feed"] + side_targets = self._compile_mask_targets(batch["targets"], + batch["masks"], + batch.get("additional_masks", None)) + if self._model.config["learn_mask"]: + side_targets = side_targets + [batch["masks"]] logger.trace("side: %s, input_shapes: %s, target_shapes: %s", side, [i.shape for i in side_inputs], [i.shape for i in side_targets]) if get_backend() == "amd": @@ -422,35 +435,21 @@ def get_batch(self): model_targets.append(side_targets) return model_inputs, model_targets - def _get_next(self, side): - """ Return the next batch from the :class:`lib.training_data.TrainingDataGenerator` for - this feeder ready for feeding into the model. - - Returns - ------- - model_inputs: list - A list of :class:`numpy.ndarray` for feeding into the model - model_targets: list - A list of :class:`numpy.ndarray` for comparing the output of the model - """ - logger.trace("Generating targets") - batch = next(self._feeds[side]) - targets_use_mask = (self._model.config["learn_mask"] - or self._model.config["penalized_mask_loss"]) - model_targets = batch["targets"] + batch["masks"] if targets_use_mask else batch["targets"] - return batch["feed"], model_targets - - @classmethod - def _compile_masks(cls, targets): - """ Compile the masks into the targets for penalized loss. + def _compile_mask_targets(self, targets, masks, additional_masks): + """ Compile the masks into the targets for penalized loss and for targeted learning. Penalized loss expects the target mask to be included for all outputs in the 4th channel - of the targets. The final output and final mask are always the last 2 outputs + of the targets. Any additional masks are placed into subsequent channels for extraction + by the relevant loss functions. Parameters ---------- targets: list The targets for the model, with the mask as the final entry in the list + masks: list + The masks for the model + additional_masks: list or ``None`` + Any additional masks for the model, or ``None`` if no additional masks are required Returns ------- @@ -458,15 +457,26 @@ def _compile_masks(cls, targets): The targets for the model with the mask compiled into the 4th channel. The original mask is still output as the final item in the list """ - masks = targets[-1] - for idx, tgt in enumerate(targets[:-1]): + if not self._model.config["penalized_mask_loss"] and additional_masks is None: + logger.trace("No masks to compile. Returning targets") + return targets + + if not self._model.config["penalized_mask_loss"] and additional_masks is not None: + masks = additional_masks + elif additional_masks is not None: + masks = np.concatenate((masks, additional_masks), axis=-1) + + for idx, tgt in enumerate(targets): tgt_dim = tgt.shape[1] if tgt_dim == masks.shape[1]: add_masks = masks else: add_masks = np.array([cv2.resize(mask, (tgt_dim, tgt_dim)) - for mask in masks])[..., None] + for mask in masks]) + if add_masks.ndim == 3: + add_masks = add_masks[..., None] targets[idx] = np.concatenate((tgt, add_masks), axis=-1) + logger.trace("masks added to targets: %s", [tgt.shape for tgt in targets]) return targets def generate_preview(self, do_preview): @@ -488,7 +498,7 @@ def generate_preview(self, do_preview): batch = next(self._display_feeds["preview"][side]) self._samples[side] = batch["samples"] self._target[side] = batch["targets"][-1] - self._masks[side] = batch["masks"][0] + self._masks[side] = batch["masks"] def compile_sample(self, batch_size, samples=None, images=None, masks=None): """ Compile the preview samples for display. @@ -547,7 +557,7 @@ def compile_timelapse_sample(self): batchsizes.append(len(batch["samples"])) samples[side] = batch["samples"] images[side] = batch["targets"][-1] - masks[side] = batch["masks"][0] + masks[side] = batch["masks"] batchsize = min(batchsizes) sample = self.compile_sample(batchsize, samples=samples, images=images, masks=masks) return sample @@ -1124,7 +1134,6 @@ def _get_masks(self, side, detected_faces): dict The face filenames as keys with the :class:`lib.faces_detect.Mask` as value. """ - masks = dict() for fhash, face in detected_faces.items(): mask = face.mask[self._config["mask_type"]] @@ -1134,6 +1143,53 @@ def _get_masks(self, side, detected_faces): masks[filename] = mask return masks + @property + def masks_eye(self): + """ dict: filename mapping to zip compressed eye masks for keys "a" and "b" """ + retval = {side: self._get_landmarks_masks(side, detected_faces, "eyes") + for side, detected_faces in self._detected_faces.items()} + return retval + + @property + def masks_mouth(self): + """ dict: filename mapping to zip compressed mouth masks for keys "a" and "b" """ + retval = {side: self._get_landmarks_masks(side, detected_faces, "mouth") + for side, detected_faces in self._detected_faces.items()} + return retval + + def _get_landmarks_masks(self, side, detected_faces, area): + """ Obtain the area landmarks masks for the given area. + + Parameters + ---------- + side: {"a" or "b"} + The side currently being processed + detected_faces: dict + Key is the hash of the face, value is the corresponding + :class:`lib.faces_detect.DetectedFace` object + area: {"eyes" or "mouth"} + The area of the face to obtain the mask for + + Returns + ------- + dict + The face filenames as keys with the zip compressed mask as value. + """ + logger.trace("side: %s, detected_faces: %s, area: %s", side, detected_faces, area) + masks = dict() + for fhash, face in detected_faces.items(): + mask = face.get_landmark_mask(self._training_size, + area, + aligned=True, + dilation=self._training_size // 32, + blur_kernel=self._training_size // 16, + as_zip=True) + for filename in self._hash_to_filenames(side, fhash): + masks[filename] = mask + logger.trace("side: %s, area: %s, masks: %s", + side, area, {key: type(val) for key, val in masks.items()}) + return masks + # Hashes for image folders @classmethod def _get_image_hashes(cls, image_list):