-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 69a744a
Showing
5 changed files
with
779 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
import argparse | ||
import csv | ||
import time | ||
|
||
import cv2 | ||
import numpy as np | ||
import tqdm | ||
from PIL import Image | ||
from face_detection import RetinaFace | ||
from torch.utils.data import DataLoader | ||
|
||
from utils.datasets import Datasets | ||
from utils.util import * | ||
|
||
|
||
def train(args):
    """Train the head-pose model and validate it after every epoch.

    The model comes from ``load_model(args, True)`` and is trained on the
    ``'300W_LP'`` split of ``Datasets`` with ``GeodesicLoss`` and Adam.
    After each epoch ``test(args, model)`` is run and its first return value
    is used to decide whether the current weights are the best so far.

    Args:
        args: parsed CLI namespace; reads ``data_dir``, ``batch_size``,
            ``lr``, ``epochs`` and ``save_dir``.

    Side effects:
        Writes ``step.csv`` (one row per epoch), ``best.pt`` and ``last.pt``
        under ``{args.save_dir}/weights``.
    """
    import os  # local import: this block cannot rely on a file-level `import os`

    weights_dir = f'{args.save_dir}/weights'
    # The CSV log and the checkpoints all live here; create it up front so
    # neither open() nor torch.save() fails on a fresh checkout.
    os.makedirs(weights_dir, exist_ok=True)

    model = load_model(args, True).cuda()
    dataset = Datasets(f'{args.data_dir}', '300W_LP', get_transforms(True), True)
    loader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True, num_workers=4)

    criterion = GeodesicLoss().cuda()
    optimizer = torch.optim.Adam(model.parameters(), args.lr)
    # Halve the learning rate after epochs 10 and 20.
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[10, 20], gamma=0.5)

    best_loss = float('inf')
    # newline='' is what the csv module expects from the file it writes to.
    with open(f'{weights_dir}/step.csv', 'w', newline='') as log:
        logger = csv.DictWriter(log, fieldnames=['epoch', 'Loss', 'Pitch', 'Yaw', 'Roll'])
        logger.writeheader()
        for epoch in range(args.epochs):
            print(('\n' + '%10s' * 3) % ('epoch', 'memory', 'loss'))
            p_bar = tqdm.tqdm(loader, total=len(loader))
            model.train()  # test() leaves the model in eval mode
            total_loss = 0
            for samples, labels in p_bar:
                samples = samples.cuda()
                labels = labels.cuda()
                optimizer.zero_grad()
                outputs = model(samples)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                memory = f'{torch.cuda.memory_reserved() / 1E9:.3g}G'
                s = ('%10s' * 2 + '%10.3g') % (f'{epoch + 1}/{args.epochs}', memory, loss.item())
                p_bar.set_description(s)

            # Guard against an empty loader so the epoch average cannot divide by zero.
            avg_loss = total_loss / max(len(loader), 1)
            val_loss, val_pitch, val_yaw, val_roll = test(args, model)
            val_loss = float(val_loss)
            # Step the scheduler exactly once per epoch; stepping twice would
            # make the milestones fire at half the intended epoch.
            scheduler.step()

            # 'Loss' is the mean training loss; the angle columns are validation errors.
            logger.writerow({'Pitch': str(f'{val_pitch:.3f}'),
                             'Yaw': str(f'{val_yaw:.3f}'),
                             'Roll': str(f'{val_roll:.3f}'),
                             'Loss': str(f'{avg_loss:.3f}'),
                             'epoch': str(epoch + 1).zfill(3)})
            log.flush()
            if val_loss < best_loss:
                best_loss = val_loss
                torch.save(model.state_dict(), f'{weights_dir}/best.pt')
                print(f'Epoch {epoch + 1}: New best model saved with val_loss: {best_loss:.3f}')

            torch.save(model.state_dict(), f'{weights_dir}/last.pt')

    torch.cuda.empty_cache()
    print('Training completed.')
|
||
|
||
def _angular_error_sum(gt, pred):
    """Return the batch sum of the smallest |gt - pred| over pred shifted by 0, +-360 and +-180."""
    candidates = torch.stack((torch.abs(gt - pred),
                              torch.abs(pred + 360 - gt),
                              torch.abs(pred - 360 - gt),
                              torch.abs(pred + 180 - gt),
                              torch.abs(pred - 180 - gt)))
    return torch.sum(torch.min(candidates, 0)[0])


@torch.no_grad()
def test(args, model=None):
    """Evaluate the model on the ``'AFLW2K'`` split and report per-axis errors.

    Args:
        args: parsed CLI namespace; reads ``data_dir`` (and whatever
            ``load_model`` needs when ``model`` is None).
        model: model to evaluate. When None, one is built with
            ``load_model(args, False)`` and moved to the GPU.

    Returns:
        ``(avg_error, pitch_error, yaw_error, roll_error)`` where the last
        three are the mean absolute errors in degrees over the whole dataset
        and ``avg_error`` is the mean of those three.

    Note:
        The model is switched to half precision and eval mode for the run and
        converted back to float before returning; it is left in eval mode.
    """
    dataset = Datasets(f'{args.data_dir}', 'AFLW2K', get_transforms(False), False)
    loader = DataLoader(dataset, batch_size=64)
    if model is None:
        model = load_model(args, False).cuda()
    model.half()
    model.eval()

    total, y_error, p_error, r_error = 0, 0.0, 0.0, 0.0
    for sample, label in tqdm.tqdm(loader, ('%10s' * 3) % ('Pitch', 'Yaw', 'Roll')):
        sample = sample.cuda()
        sample = sample.half()
        total += label.size(0)

        # Labels are scaled by 180/pi here, i.e. treated as radians and
        # converted to degrees; columns are read as pitch, yaw, roll.
        p_gt = label[:, 0].float() * 180 / np.pi
        y_gt = label[:, 1].float() * 180 / np.pi
        r_gt = label[:, 2].float() * 180 / np.pi

        output = model(sample)
        euler = compute_euler(output) * 180 / np.pi

        p_pred = euler[:, 0].cpu()
        y_pred = euler[:, 1].cpu()
        r_pred = euler[:, 2].cpu()

        p_error += _angular_error_sum(p_gt, p_pred)
        y_error += _angular_error_sum(y_gt, y_pred)
        r_error += _angular_error_sum(r_gt, r_pred)

    p_error, y_error, r_error = p_error / total, y_error / total, r_error / total
    # The per-axis errors are already per-sample means, so the overall
    # average only divides by the number of axes (not by `total` again).
    avg_error = (p_error + y_error + r_error) / 3
    print(('%10.3g' * 3) % (p_error, y_error, r_error))

    model.float()  # for training
    return avg_error, p_error, y_error, r_error
|
||
|
||
@torch.no_grad()
def inference(args):
    """Run live head-pose estimation on webcam 0 and record the annotated video.

    For every captured frame, faces are found with ``RetinaFace``; each face
    scoring at least 0.95 is cropped with a 20% margin, passed through the
    model, and its pose is drawn on the frame with ``plot_pose_cube``. The
    annotated frame is shown in a window and appended to
    ``{args.save_dir}/output.avi``. Press ESC to stop.

    Args:
        args: parsed CLI namespace; reads ``save_dir`` (and whatever
            ``load_model`` needs).

    Raises:
        IOError: if the webcam cannot be opened.
    """
    model = load_model(args, False).cuda()
    model.eval()
    detector = RetinaFace(0)

    cap = cv2.VideoCapture(0)
    # Check the camera before creating the output file so a missing webcam
    # does not leave an empty video behind.
    if not cap.isOpened():
        raise IOError("Cannot open webcam")

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(f'{args.save_dir}/output.avi', cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'), 25,
                          (frame_width, frame_height))

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # Camera disconnected or stream ended: frame is not usable.
                break

            faces = detector(frame)

            for box, landmarks, score in faces:
                # Skip low-confidence detections.
                if score < .95:
                    continue
                x_min = int(box[0])
                y_min = int(box[1])
                x_max = int(box[2])
                y_max = int(box[3])
                bbox_width = abs(x_max - x_min)
                bbox_height = abs(y_max - y_min)

                # Grow the box by 20%. The x margin uses the height and the
                # y margin uses the width, exactly as in the original code.
                # NOTE(review): this looks swapped - confirm it is intended.
                x_min = max(0, x_min - int(0.2 * bbox_height))
                y_min = max(0, y_min - int(0.2 * bbox_width))
                x_max = x_max + int(0.2 * bbox_height)
                y_max = y_max + int(0.2 * bbox_width)

                img = frame[y_min:y_max, x_min:x_max]
                if img.size == 0:
                    # Degenerate box: nothing to feed to the model.
                    continue
                # NOTE(review): OpenCV frames are BGR and convert('RGB') does
                # not swap channels - confirm the channel order matches what
                # the training pipeline used.
                img = Image.fromarray(img)
                img = img.convert('RGB')
                img = get_transforms(False)(img)

                img = torch.Tensor(img[None, :]).cuda()

                start = time.time()
                R_pred = model(img)
                end = time.time()
                print('Head pose estimation: %2f ms' % ((end - start) * 1000.))

                euler = compute_euler(R_pred) * 180 / np.pi
                p_pred_deg = euler[:, 0].cpu()
                y_pred_deg = euler[:, 1].cpu()
                r_pred_deg = euler[:, 2].cpu()

                plot_pose_cube(frame, y_pred_deg, p_pred_deg, r_pred_deg,
                               x_min + int(.5 * (x_max - x_min)),
                               y_min + int(.5 * (y_max - y_min)),
                               size=bbox_width)

            cv2.imshow("Demo", frame)
            out.write(frame)
            # Poll the keyboard once per frame; ESC (27) leaves the capture
            # loop itself rather than only the per-face loop.
            if (cv2.waitKey(5) & 0xFF) == 27:
                break
    finally:
        # Always release the camera, finalize the video file and close windows.
        cap.release()
        out.release()
        cv2.destroyAllWindows()
|
||
|
||
def main():
    """Parse the command line and run the requested modes.

    ``--train``, ``--test`` and ``--inference`` may be combined; they run in
    that order. When none of them is given, inference runs, which keeps the
    previous no-argument behavior.
    """
    parser = argparse.ArgumentParser(description='Head Pose Estimation')
    parser.add_argument('--model_name', type=str, default='RepVGG-A2')
    parser.add_argument('--data_dir', type=str, default='../../Datasets/HPE')
    parser.add_argument('--save-dir', type=str, default='./outputs')
    parser.add_argument('--epochs', type=int, default=100)
    parser.add_argument('--lr', type=float, default=0.0001)
    parser.add_argument('--batch-size', type=int, default=64)
    parser.add_argument('--train', action='store_true')
    parser.add_argument('--test', action='store_true')
    # A store_true flag with default=True can never be turned off, which made
    # the webcam demo start after every --train / --test run.
    parser.add_argument('--inference', action='store_true')

    args = parser.parse_args()
    if not (args.train or args.test or args.inference):
        # No mode selected: fall back to the demo.
        args.inference = True

    if args.train:
        train(args)
    if args.test:
        test(args)
    if args.inference:
        inference(args)
|
||
|
||
# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
import torch | ||
import torch.nn as nn | ||
from utils import util | ||
|
||
|
||
def conv_bn(inp, oup, kernel_size, stride, padding, groups=1):
    """Build a bias-free ``Conv2d`` followed by ``BatchNorm2d``.

    The returned ``nn.Sequential`` names its children ``'conv'`` and ``'bn'``.
    """
    named_layers = (
        ('conv', nn.Conv2d(inp, oup, kernel_size, stride, padding, groups=groups, bias=False)),
        ('bn', nn.BatchNorm2d(oup)),
    )
    block = nn.Sequential()
    for layer_name, layer in named_layers:
        block.add_module(layer_name, layer)
    return block
|
||
|
||
class RepVGGBlock(nn.Module):
    """RepVGG building block with a training form and a deploy form.

    With ``deploy=True`` the block holds a single biased 3x3 convolution
    (``rbr_reparam``). Otherwise it holds a 3x3 conv+BN branch
    (``rbr_dense``), a 1x1 conv+BN branch (``rbr_1x1``) and, when the input
    and output channel counts match and the stride is 1, a BatchNorm identity
    branch (``rbr_identity``); the branch outputs are summed. Both forms end
    with ReLU.
    """

    def __init__(self, inp, oup, k, s=1, p=0, d=1, gr=1, padding_mode='zeros', deploy=False):
        super().__init__()
        self.inp, self.groups, self.deploy = inp, gr, deploy
        self.nonlinearity = nn.ReLU()
        self.se = nn.Identity()

        # Only 3x3 kernels with padding 1 are supported.
        assert k == 3
        assert p == 1

        if deploy:
            self.rbr_reparam = nn.Conv2d(inp, oup, kernel_size=k, stride=s, padding=p,
                                         dilation=d, groups=gr, bias=True,
                                         padding_mode=padding_mode)
            return

        keeps_shape = oup == inp and s == 1
        self.rbr_identity = nn.BatchNorm2d(inp) if keeps_shape else None
        self.rbr_dense = conv_bn(inp, oup, k, s, p, groups=gr)
        # Padding for the 1x1 branch so its output lines up with the 3x3 branch.
        self.rbr_1x1 = conv_bn(inp, oup, 1, s, p - k // 2, groups=gr)

    def forward(self, x):
        # Deploy form: a single convolution.
        if hasattr(self, 'rbr_reparam'):
            fused = self.rbr_reparam(x)
            return self.nonlinearity(self.se(fused))

        shortcut = 0 if self.rbr_identity is None else self.rbr_identity(x)
        summed = self.rbr_dense(x) + self.rbr_1x1(x) + shortcut
        return self.nonlinearity(self.se(summed))
|
||
|
||
class RepVGG(nn.Module):
    """RepVGG classifier: a stem block, four stages, global pooling and a linear head.

    Args:
        layers: number of blocks in each of the four stages.
        width: four multipliers applied to the base channel counts
            64, 128, 256 and 512.
        num_cls: output size of the final linear layer.
        gr_map: optional mapping from 1-based block index to group count;
            blocks not listed use 1 group.
        deploy: forwarded to every ``RepVGGBlock``.
    """

    def __init__(self, layers, width=None, num_cls=1000, gr_map=None, deploy=False):
        super().__init__()
        self.deploy = deploy
        self.cur_layer_idx = 1
        self.gr_map = gr_map or dict()

        assert len(width) == 4
        assert 0 not in self.gr_map

        stage_channels = [int(base * width[i]) for i, base in enumerate((64, 128, 256, 512))]
        # The stem is capped at 64 channels.
        self.inp = min(64, stage_channels[0])

        self.stage0 = RepVGGBlock(3, self.inp, 3, 2, 1, deploy=self.deploy)
        for idx, channels in enumerate(stage_channels, start=1):
            setattr(self, f'stage{idx}', self._make_stage(channels, layers[idx - 1], stride=2))
        self.gap = nn.AdaptiveAvgPool2d(output_size=1)
        self.linear = nn.Linear(stage_channels[3], num_cls)

    def _make_stage(self, oup, layer, stride):
        """Stack ``layer`` blocks; only the first one uses ``stride``, the rest use 1."""
        blocks = []
        for block_stride in [stride] + [1] * (layer - 1):
            groups = self.gr_map.get(self.cur_layer_idx, 1)
            blocks.append(RepVGGBlock(self.inp, oup, 3, block_stride, p=1, gr=groups, deploy=self.deploy))
            # Later blocks in this stage (and the next stage) take `oup` channels in.
            self.inp = oup
            self.cur_layer_idx += 1
        return nn.Sequential(*blocks)

    def forward(self, x):
        out = x
        for stage in (self.stage0, self.stage1, self.stage2, self.stage3, self.stage4):
            out = stage(out)
        out = self.gap(out)
        flat = out.view(out.size(0), -1)
        return self.linear(flat)
|
||
|
||
def create_model(backbone_name, num_cls=1000):
    """Return a factory ``f(deploy)`` that builds the named RepVGG variant.

    The name is only checked when the factory is called; an unknown name
    makes the factory raise ``ValueError``.
    """
    groupwise_layers = range(2, 27, 2)
    g2_map = dict.fromkeys(groupwise_layers, 2)
    g4_map = dict.fromkeys(groupwise_layers, 4)

    a_depths = (2, 4, 14, 1)
    b_depths = (4, 6, 16, 1)

    def cfg(depths, widths, groups=None):
        # Each entry is (blocks per stage, width multipliers, group map).
        return list(depths), list(widths), groups

    net_configs = {
        'RepVGG-A0': cfg(a_depths, (0.75, 0.75, 0.75, 2.5)),
        'RepVGG-A1': cfg(a_depths, (1, 1, 1, 2.5)),
        'RepVGG-A2': cfg(a_depths, (1.5, 1.5, 1.5, 2.75)),
        'RepVGG-B0': cfg(b_depths, (1, 1, 1, 2.5)),
        'RepVGG-B1': cfg(b_depths, (2, 2, 2, 4)),
        'RepVGG-B1g2': cfg(b_depths, (2, 2, 2, 4), g2_map),
        'RepVGG-B1g4': cfg(b_depths, (2, 2, 2, 4), g4_map),
        'RepVGG-B2': cfg(b_depths, (2.5, 2.5, 2.5, 5)),
        'RepVGG-B2g2': cfg(b_depths, (2.5, 2.5, 2.5, 5), g2_map),
        'RepVGG-B2g4': cfg(b_depths, (2.5, 2.5, 2.5, 5), g4_map),
        'RepVGG-B3': cfg(b_depths, (3, 3, 3, 5)),
        'RepVGG-B3g2': cfg(b_depths, (3, 3, 3, 5), g2_map),
        'RepVGG-B3g4': cfg(b_depths, (3, 3, 3, 5), g4_map),
    }

    def model_constructor(deploy):
        if backbone_name not in net_configs:
            raise ValueError(f"Network {backbone_name} is not supported.")
        layers, width, gr_map = net_configs[backbone_name]
        return RepVGG(layers, width, num_cls, gr_map, deploy=deploy)

    return model_constructor
|
||
|
||
class HPE(nn.Module):
    """Head-pose network: RepVGG stages followed by a 6-output linear layer.

    Args:
        model_name: RepVGG variant name passed to ``create_model``.
        weight: path of the backbone checkpoint; only read when
            ``pretrained`` is true.
        deploy: forwarded to the backbone constructor.
        pretrained: when true, load ``weight`` into the backbone before its
            stages are reused.
    """

    def __init__(self, model_name, weight, deploy, pretrained=True):
        super().__init__()
        repvgg = create_model(model_name)
        backbone = repvgg(deploy)
        if pretrained:
            # map_location='cpu' lets a checkpoint saved on a GPU load on a
            # machine without one; load_state_dict copies the values into the
            # backbone's own parameters afterwards.
            # NOTE: torch.load unpickles the file - only load trusted checkpoints.
            checkpoint = torch.load(weight, map_location='cpu')
            if 'state_dict' in checkpoint:
                checkpoint = checkpoint['state_dict']
            # Drop the 'module.' fragment from the parameter names.
            ckpt = {k.replace('module.', ''): v for k, v in checkpoint.items()}
            backbone.load_state_dict(ckpt)

        # Reuse the backbone stages; its classification head is not kept.
        self.layer0 = backbone.stage0
        self.layer1 = backbone.stage1
        self.layer2 = backbone.stage2
        self.layer3 = backbone.stage3
        self.layer4 = backbone.stage4
        self.gap = nn.AdaptiveAvgPool2d(output_size=1)

        # The feature size is the out_channels of the last 3x3 (or fused)
        # convolution found in the final stage.
        last_channel = 0
        for n, m in self.layer4.named_modules():
            if ('rbr_dense' in n or 'rbr_reparam' in n) and isinstance(m, nn.Conv2d):
                last_channel = m.out_channels

        fea_dim = last_channel

        self.linear_reg = nn.Linear(fea_dim, 6)

    def forward(self, x):
        """Run the stages, pool, regress 6 values and return ``util.compute_rotation`` of them."""
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.gap(x)
        x = torch.flatten(x, 1)
        x = self.linear_reg(x)

        # NOTE(review): compute_rotation lives in utils.util and is not visible
        # here; presumably it maps the 6 values to a rotation matrix - confirm.
        return util.compute_rotation(x)
Oops, something went wrong.