DetectorLoader.py
import time
import torch
import numpy as np
import torchvision.transforms as transforms

from queue import Queue
from threading import Thread

from Detection.Models import Darknet
from Detection.Utils import non_max_suppression, ResizePadding


class TinyYOLOv3_onecls(object):
    """Load a trained Tiny-YOLOv3 single-class (person) detection model.

    Args:
        input_size: (int) Size of the input image; must be divisible by 32. Default: 416.
        config_file: (str) Path to the YOLO model structure config file.
        weight_file: (str) Path to the trained weights file.
        nms: (float) Non-Maximum Suppression overlap threshold.
        conf_thres: (float) Minimum confidence threshold; predicted bboxes scoring
            below it are discarded.
        device: (str) Device to load the model on: 'cpu' or 'cuda'.
    """
    def __init__(self,
                 input_size=416,
                 config_file='Models/yolo-tiny-onecls/yolov3-tiny-onecls.cfg',
                 weight_file='Models/yolo-tiny-onecls/best-model.pth',
                 nms=0.2,
                 conf_thres=0.45,
                 device='cuda'):
        self.input_size = input_size
        self.model = Darknet(config_file).to(device)
        # map_location lets CUDA-trained weights load on a CPU-only machine.
        self.model.load_state_dict(torch.load(weight_file, map_location=device))
        self.model.eval()
        self.device = device

        self.nms = nms
        self.conf_thres = conf_thres

        self.resize_fn = ResizePadding(input_size, input_size)
        self.transf_fn = transforms.ToTensor()
    def detect(self, image, need_resize=True, expand_bb=5):
        """Feed an image forward through the model.

        Args:
            image: (numpy array) A single RGB image to run detection on.
            need_resize: (bool) Resize the image to input_size before the forward
                pass; returned bboxes are scaled back to the original image size.
            expand_bb: (int) Number of pixels to expand each box boundary by.

        Returns:
            (torch.float32) One row per detected object:
                [top, left, bottom, right, bbox_score, class_score, class]
            Returns `None` if nothing is detected.
        """
        image_size = (self.input_size, self.input_size)
        if need_resize:
            image_size = image.shape[:2]
            image = self.resize_fn(image)

        image = self.transf_fn(image)[None, ...]
        scf = torch.min(self.input_size / torch.FloatTensor([image_size]), 1)[0]

        detected = self.model(image.to(self.device))
        detected = non_max_suppression(detected, self.conf_thres, self.nms)[0]
        if detected is not None:
            # Undo the letterbox padding, then rescale coordinates to the
            # original image size.
            detected[:, [0, 2]] -= (self.input_size - scf * image_size[1]) / 2
            detected[:, [1, 3]] -= (self.input_size - scf * image_size[0]) / 2
            detected[:, 0:4] /= scf

            # Expand each box by `expand_bb` pixels, clamped to the image bounds.
            detected[:, 0:2] = np.maximum(0, detected[:, 0:2] - expand_bb)
            detected[:, 2:4] = np.minimum(image_size[::-1], detected[:, 2:4] + expand_bb)

        return detected
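
# A hedged usage sketch for `detect` (illustrative only; the cv2 loading and
# 'sample.jpg' path are assumptions, not part of this file). Kept as a comment
# so the module stays import-safe:
#
#     import cv2
#     detector = TinyYOLOv3_onecls(device='cpu')
#     frame = cv2.cvtColor(cv2.imread('sample.jpg'), cv2.COLOR_BGR2RGB)
#     bboxes = detector.detect(frame)  # (N, 7) torch tensor, or None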


class ThreadDetection(object):
    """Run the detection model in a background thread, buffering
    (image, detections) pairs in a queue."""
    def __init__(self,
                 dataloader,
                 model,
                 queue_size=256):
        self.model = model
        self.dataloader = dataloader

        self.stopped = False
        self.Q = Queue(maxsize=queue_size)

    def start(self):
        # Note: the original assigned `Thread(...).start()` to `t`, which binds
        # `None`; construct the thread first, then start it.
        t = Thread(target=self.update, args=(), daemon=True)
        t.start()
        return self
    def update(self):
        while True:
            if self.stopped:
                return

            images = self.dataloader.getitem()
            outputs = self.model.detect(images)

            # Back off briefly when the queue is full so the consumer can drain it.
            if self.Q.full():
                time.sleep(2)
            self.Q.put((images, outputs))

    def getitem(self):
        return self.Q.get()

    def stop(self):
        self.stopped = True

    def __len__(self):
        return self.Q.qsize()
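

# Minimal end-to-end sketch (an addition for illustration, not part of the
# original file): wires a hypothetical single-frame loader into ThreadDetection.
# `_SingleImageLoader` and the zero-filled frame are assumptions; real callers
# feed frames from a camera loader exposing the same `getitem()` method.
if __name__ == '__main__':
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    detect_model = TinyYOLOv3_onecls(device=device)

    class _SingleImageLoader(object):
        """Hypothetical stand-in for a camera loader: serves one RGB frame."""
        def __init__(self, frame):
            self.frame = frame

        def getitem(self):
            return self.frame

    dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8)  # placeholder RGB frame
    worker = ThreadDetection(_SingleImageLoader(dummy_frame), detect_model).start()

    images, outputs = worker.getitem()  # blocks until the worker queues a result
    print('detections:', None if outputs is None else tuple(outputs.shape))
    worker.stop()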