-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathOverlayThread.py
141 lines (124 loc) · 5.26 KB
/
OverlayThread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: utf-8 -*-
"""
Created on Fri Jun 15, 2018
@author: Jack J Amend
Inherits from the Thread class. Takes the information from the analyze
queue. The information is then taken and an overlay frame is created to
identify the regions where the points are located.
"""
import queue
import threading
import cv2
import numpy as np
# Drawing colors for cv2.rectangle; only colors[2] is used below, to shade
# occupied grid cells. NOTE(review): OpenCV treats tuples as BGR, so
# (0, 0, 255) renders as red.
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
class OverlayThread(threading.Thread):
    """
    Consumer thread that turns tracked points into a grid overlay.

    Pulls (frame, tracks, lidar) tuples from the analyze queue, tallies
    which grid cells the track endpoints fall into, draws a translucent
    occupancy grid onto the frame, and pushes the result to the overlay
    queue for the main thread to process.
    """

    def __init__(self, analyze_q: queue.Queue, overlay_q: queue.Queue,
                 resolution, reduction, name=None):
        """
        Initializes an instance of an overlay thread. Checks list of points
        and creates a table that is then displayed as an overlay on the frame.

        :param analyze_q:
            queue that holds the frame, tracks, and lidar information.
        :param overlay_q:
            queue that contains the overlay frame, the table with the
            points, the scores for each zone, and the LiDAR data.
        :param resolution:
            a tuple of the number of pixels as height by width.
        :param reduction:
            the factor to reduce the frame size by (grid cells per axis).
        :param name:
            name of the thread.
        """
        super(OverlayThread, self).__init__(name=name)
        self.stop_request = threading.Event()
        self.analyze_q = analyze_q
        self.overlay_q = overlay_q
        self.reduction = reduction
        self.resolution = resolution
        # Size of one grid cell along each axis, in pixels.
        self.dx = resolution[0] // reduction
        self.dy = resolution[1] // reduction
        # Per-cell hit counts for the current window (reset by _find_zone).
        self.lookup = np.zeros((reduction, reduction))
        # Per-cell hit counts with exponential decay across the whole run.
        self.history = np.zeros((reduction, reduction))
        self.travel_zone = 1
        self.scores = []

    def run(self):
        """
        Consume the analyze queue until a stop is requested.

        For each (frame, tracks, lidar) item: build the overlay frame,
        publish (frame, lookup table, zone scores, lidar) on the overlay
        queue, and — when the lidar reports a danger zone — recompute the
        zone scores. The history table decays by 5% per processed frame.
        """
        while not self.stop_request.is_set():
            try:
                # Block with a short timeout instead of spinning on
                # empty()/get(), so an idle thread does not burn CPU.
                frame, tracks, lidar = self.analyze_q.get(timeout=0.05)
            except queue.Empty:
                continue
            _, danger_zone = lidar
            output = self._image_with_boxes(frame, tracks,
                                            show_image=False)
            self.overlay_q.put((output, self.lookup, self.scores, lidar))
            if danger_zone:
                self._find_zone()
            self.history *= .95

    def join(self, timeout=None):
        """
        Signal the thread to stop, then join it.

        :param timeout:
            time until timeout of attempting to join thread
        """
        self.stop_request.set()
        super(OverlayThread, self).join(timeout)

    def _image_with_boxes(self, image, tracks, show_image=True):
        """
        Return a copy of *image* with the occupancy grid drawn on it.

        Endpoints of each track are binned into the grid tables and the
        occupied cells are shaded. With no tracks, the frame is returned
        unmodified (previously this returned None, which pushed a None
        frame downstream via the overlay queue).
        """
        if len(tracks) <= 0:
            return image
        new_image = image.copy()
        # Bin the first/last point of every track into the grid tables.
        corner_coordinates = self._get_coordinates_of_corners(tracks)
        self._create_fill_in_array(corner_coordinates)
        self._overlay_image(new_image)
        if show_image:
            cv2.imshow('Show image option window', new_image)
        return new_image

    def _get_coordinates_of_corners(self, tracks, full_track=True):
        """
        Collect the first and last (x, y) point of each track.

        :param tracks: iterable of point sequences.
        :param full_track: when True, only tracks longer than two points
            contribute; shorter tracks are skipped.
        :return: flat list of (x, y) tuples.
        """
        coordinates = []
        for track in tracks:
            # Only the endpoints are used; intermediate path points are
            # ignored for now (may need the full path later).
            if full_track and len(track) > 2:
                x, y = track[0]
                coordinates.append((x, y))
                x, y = track[-1]
                coordinates.append((x, y))
        return coordinates

    def _create_fill_in_array(self, points):
        """
        Increment the grid cells that contain the given points.

        Point coordinates may be floats (e.g. optical-flow output), so
        sector indices are cast to int before indexing — float // int is
        a float, which numpy rejects as an index. Points outside the
        grid are ignored (the old code compared against a hard-coded 8,
        which only matched reduction == 8 and let sectors > 8 through).
        """
        cell_w, cell_h = self.dx, self.dy
        for px, py in points:
            x_sector = int(px // cell_w)
            y_sector = int(py // cell_h)
            if 0 <= x_sector < self.reduction and 0 <= y_sector < self.reduction:
                self.lookup[x_sector][y_sector] += 1
                self.history[x_sector][y_sector] += 1

    def _overlay_image(self, image):
        """
        Draw the grid and shade occupied cells onto *image* in place.

        Grid lines are drawn directly on the frame; occupied cells are
        filled on a copy which is then alpha-blended back so the shading
        is translucent.
        """
        overlay = image.copy()
        for (row, col), count in np.ndenumerate(self.lookup):
            # Cell corners in pixels; dx/dy were precomputed in __init__.
            top_x = row * self.dx
            top_y = col * self.dy
            bottom_x = top_x + self.dx
            bottom_y = top_y + self.dy
            cv2.rectangle(image, (top_x, top_y), (bottom_x, bottom_y),
                          (0, 0, 0), 1)  # grid lines
            if count:
                cv2.rectangle(overlay, (top_x, top_y),
                              (bottom_x, bottom_y), colors[2], -1)
        alpha = .4
        cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0, image)

    def _find_zone(self):
        """
        Score three horizontal bands of the grid and pick a travel zone.

        The lookup table is split into row bands [0:3], [3:5], [5:],
        each scored by its mean hit count. The travel zone is the less
        occupied of the two outer bands (id 0 or 2); the middle band is
        excluded. The lookup table is reset afterwards.
        """
        zones = np.split(self.lookup, [3, 5])
        self.scores = [np.sum(zone) / np.size(zone) for zone in zones]
        # Drop the middle band's score, argmin over the outer two, then
        # map index {0, 1} back to zone id {0, 2}.
        self.travel_zone = np.argmin(np.delete(self.scores, 1)) * 2
        self.lookup = np.zeros((self.reduction, self.reduction))