detect_webcam.py
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # suppress TensorFlow warnings; must be set before tensorflow is imported

import datetime
import time
from multiprocessing import Queue, Pool
import multiprocessing

import cv2
import tensorflow as tf

from utils import detector_utils as detector_utils
from utils import recognizer_utils as recognizer_utils
from utils.detector_utils import WebcamVideoStream
ACCURACY_GAP = 60
frame_processed = 0
score_thresh = 0.5
num_hands = 1
num_workers = 2
queue_size = 3
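# Parameter notes (inferred from how the values are used below): score_thresh
# is the minimum detection confidence for a box to count as a hand; num_workers
# is the number of detector processes, each loading its own copy of the frozen
# graph; queue_size bounds how many items can sit in each queue between the
# capture loop and the workers; ACCURACY_GAP is the window, in frames, over
# which the hit rate of the target gesture ('5') is printed.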
# Create a worker process that loads the frozen graph and runs detection on
# frames from an input queue, putting the results on an output queue.
def worker(input_q, output_q, cap_params, frame_processed):
    print(">> loading frozen model for worker")
    # load_inference_graph() already returns a session for the graph, so no
    # second session is created here (the original re-created one and leaked
    # the first).
    detection_graph, sess = detector_utils.load_inference_graph()
    while True:
        frame = input_q.get()
        if frame is not None:
            boxes, scores = detector_utils.detect_objects(
                frame, detection_graph, sess)
            # draw bounding boxes on the frame in place
            boxes_to_recog, scores_to_show = detector_utils.draw_box_on_image(
                cap_params['num_hands_detect'], cap_params['score_thresh'],
                scores, boxes, cap_params['im_width'], cap_params['im_height'],
                frame)
            # crop the region of interest around the detected hand
            b_have_hand, img_roi, img_extend = recognizer_utils.drawBoxOfROI(
                scores_to_show, boxes_to_recog, 0.2, 0.8,
                cap_params['im_width'], cap_params['im_height'], frame)
            # classify the gesture inside the ROI
            img_roi, str_gesture = recognizer_utils.processROI(
                b_have_hand, img_roi, img_extend)
            # push the annotated frame, the ROI crop, and the gesture label
            output_q.put(frame)
            output_q.put(img_roi)
            output_q.put(str_gesture)
            frame_processed += 1  # per-process counter; not visible to the parent
        else:
            # propagate the poison pill and shut this worker down cleanly
            output_q.put(frame)
            break
    sess.close()
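# Queue protocol note: for each frame taken from input_q, a worker pushes
# exactly three items onto output_q in order (annotated frame, ROI crop,
# gesture string), and the main loop below performs three matching get()
# calls. As written, the main loop enqueues one frame and then blocks on
# those three gets, so only one frame is ever in flight at a time; the
# triples therefore cannot interleave across workers, but it also means the
# pool adds little real parallelism beyond preloading the model per process.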
if __name__ == '__main__':
    input_q = Queue(maxsize=queue_size)
    output_q = Queue(maxsize=queue_size)

    video_capture = WebcamVideoStream(
        src=0, width=600, height=400).start()

    cap_params = {}
    frame_processed = 0
    cap_params['im_width'], cap_params['im_height'] = video_capture.size()
    cap_params['score_thresh'] = score_thresh
    cap_params['num_hands_detect'] = num_hands
    print(cap_params)

    # spin up workers to parallelize detection
    pool = Pool(num_workers, worker,
                (input_q, output_q, cap_params, frame_processed))

    start_time = datetime.datetime.now()
    num_frames = 0
    num_gesture_count = 0
    fps = 0
    index = 0

    cv2.namedWindow('Multi-Threaded Detection', cv2.WINDOW_NORMAL)
    cv2.namedWindow('ROI', cv2.WINDOW_AUTOSIZE)

    # ---------------- main loop start ---------------- #
    while True:
        try:
            frame = video_capture.read()
            frame = cv2.flip(frame, 1)  # mirror the image for a natural view
            if frame is not None:
                input_q.put(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                output_frame = output_q.get()
                output_roi = output_q.get()
                str_gesture = output_q.get()  # gesture classification result
                num_frames += 1
                num_frames_mod = num_frames % ACCURACY_GAP
                if output_frame is not None:
                    output_frame = cv2.cvtColor(output_frame, cv2.COLOR_RGB2BGR)
                if output_roi is not None:
                    output_roi = cv2.cvtColor(output_roi, cv2.COLOR_RGB2BGR)
                cv2.imshow('Multi-Threaded Detection', output_frame)
                cv2.imshow('ROI', output_roi)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                if str_gesture == '5':
                    num_gesture_count += 1
                if num_frames_mod == ACCURACY_GAP - 1:
                    print("Accuracy:", num_gesture_count / ACCURACY_GAP)
                    num_gesture_count = 0
            else:
                break
        except Exception as e:
            print('Exception:', e)
    # ---------------- main loop end ---------------- #
    elapsed_time = (datetime.datetime.now() - start_time).total_seconds()
    fps = num_frames / elapsed_time
    print("fps", fps)

    video_capture.stop()
    cv2.destroyAllWindows()
    pool.terminate()
    pool.join()
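# ---------------------------------------------------------------------------
# For reference, a minimal sketch of the WebcamVideoStream interface this
# script assumes (start/read/size/stop wrapping cv2.VideoCapture on a
# background thread). The actual implementation lives in
# utils/detector_utils.py and may differ; this is an illustration only, so it
# is left commented out.
#
# import threading
#
# class WebcamVideoStream:
#     def __init__(self, src=0, width=600, height=400):
#         self.stream = cv2.VideoCapture(src)
#         self.stream.set(cv2.CAP_PROP_FRAME_WIDTH, width)
#         self.stream.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
#         self.grabbed, self.frame = self.stream.read()
#         self.stopped = False
#
#     def start(self):
#         # grab frames continuously on a daemon thread
#         threading.Thread(target=self.update, daemon=True).start()
#         return self
#
#     def update(self):
#         while not self.stopped:
#             self.grabbed, self.frame = self.stream.read()
#
#     def read(self):
#         return self.frame  # latest frame captured by the background thread
#
#     def size(self):
#         return (self.stream.get(cv2.CAP_PROP_FRAME_WIDTH),
#                 self.stream.get(cv2.CAP_PROP_FRAME_HEIGHT))
#
#     def stop(self):
#         self.stopped = True
#         self.stream.release()
# ---------------------------------------------------------------------------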