-
Notifications
You must be signed in to change notification settings - Fork 2
/
PTZ_Abstractions.py
446 lines (311 loc) · 13 KB
/
PTZ_Abstractions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
"""
PTZ cameras are controlled via python Abstractions.
OpenAI: GYM Abstractions which are part of the Scene-Controller exposing Eagle Capbilitis to
simulate the evolution of the environment.
Assumes: This file assumes that the virtual worlds is running on 41451 port.
This file simulate random PTZ actions in the scenes to show-case usage of python wrappers.
"""
#Assign which cuda GPU to use
import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
#print the python env being used.
#make sure python environment is all set up.
import sys
print(sys.executable)
import gym
import sys
import multiprocessing
import os.path as osp
import gym
from collections import defaultdict
import numpy as np
from gym import spaces
from PIL import Image
import airsim
import cv2
import numpy as np
import os
import time
import pprint
from airsim.types import Pose
from airsim import Vector3r, Quaternionr, Pose
import random
import _thread
import matplotlib.pyplot as plt
import math
class PTZEnv(gym.Env):
    """Gym environment wrapping an AirSim PTZ (pan-tilt-zoom) camera.

    The camera is attached to vehicle "Car0" and observes target vehicle
    "Car1". Observations are 120x120 grayscale images; actions are a
    MultiDiscrete triple (pan, tilt, zoom), each with 3 choices
    (increase / decrease / no-op). Reward encourages keeping Car1
    centered and large in the frame.

    Assumes an AirSim server is listening on port 41451 + env_id.
    """
    # camera starting pose: 8 meters above the vehicle (AirSim NED: negative z is up)
    camera_height = -8.0
    # camera initial field of view, in degrees
    initial_fov = 60
    initial_angleh = 0       # initial pan, degrees
    initial_anglev = -10.0   # initial tilt, degrees
    # state space size: 120*120 image
    screen_height = 120
    screen_width = 120
    captured_image = 120
    seg_img = 120
    # camera "0" is attached to Car0: this is the PTZ camera
    camera_name = "0"
    vehicle_name = "Car0"
    camera_pos = None
    # current camera values, degrees
    angleh = initial_angleh
    anglev = initial_anglev
    fov = initial_fov
    # change allowed in pan/tilt on each step, degrees
    delta = 2.0
    # penalty applied whenever the camera pan/tilt is changed
    penalty_movement = 0.01
    curr_captured_cv_image = None
    curr_detections_engine = None

    def __init__(self, start_airsim=True, env_id=0):
        """Create the environment.

        start_airsim -- when True, connect to the AirSim server and
                        initialize the scene immediately.
        env_id       -- offset added to the base port 41451, so several
                        environments can run against separate servers.
        """
        # actions -> PTZ: 3 choices for pan, 3 for tilt, 3 for zoom
        self.action_space = spaces.MultiDiscrete([3, 3, 3])
        # observation: single-channel image from the AirSim simulator
        self.observation_space = spaces.Box(
            low=0, high=255,
            shape=(self.screen_height, self.screen_width, 1), dtype=np.uint8)
        self.steps = 0
        # records if the camera was modified during the last step
        self.modified_cam = False
        # one past state, reused when image collection fails
        self.past_image_state = None
        # True once the target car has left the camera's view
        self.outside_car = False
        self.port = 41451 + env_id
        if start_airsim:
            self.client = airsim.CarClient(port=self.port)
            self.initializeEnv()
            # separate client used only for object-detection queries
            self.client_det = airsim.CarClient(port=self.port)
            self.client_det.simAddDetectionFilterMeshName(
                camera_name=self.camera_name, image_type=0,
                mesh_name="Car1", vehicle_name=self.vehicle_name)
        self.reward_x = 0
        self.reward_y = 0
        self.reward_object_size = 0
        self.curr_seg_image = None
        # set this to an appropriate value to keep the training step delay
        # within 25ms - 30ms; depends on the compute of your machine.
        # Monitor the FPS of stable_baselines3 during training.
        self.delay_to_introduce = 0.0 / 1000.0

    def clamp(self, num, min_value, max_value):
        """Clamp num to the inclusive range [min_value, max_value]."""
        return max(min(num, max_value), min_value)

    def initializeEnv(self):
        """Reset the simulator and randomize the scene.

        Places Car1 at a random offset/orientation relative to Car0,
        restores the camera pan/tilt to the initial values, and sets a
        random field of view in [40, 60] degrees.
        """
        client = self.client
        client.reset()
        # randomly initialize the Car1 position relative to Car0
        car1_pos = Pose()
        car1_pos.position.x_val = random.randint(-5, 5)
        car1_pos.position.y_val = random.randint(-5, 5)
        car1_pos.position.z_val = 0
        # 0.0174533 converts degrees to radians
        random_orientation = random.randint(0, 360) * 0.0174533
        car1_pos.orientation = airsim.to_quaternion(0, 0, random_orientation)
        client.simSetObjectPose("Car1", car1_pos)
        # internal camera settings
        self.angleh = self.initial_angleh
        self.anglev = self.initial_anglev
        self.fov = self.initial_fov
        # set the correct camera pose (57.2958 degrees per radian)
        self.camera_pos = Pose()
        self.camera_pos.position.x_val = 0
        self.camera_pos.position.y_val = 0
        self.camera_pos.position.z_val = self.camera_height
        tilt = self.anglev / 57.2958
        pan = self.angleh / 57.2958
        self.camera_pos.orientation = airsim.to_quaternion(tilt, 0, pan)
        success = client.simSetCameraPose(
            self.camera_name, self.camera_pos, vehicle_name=self.vehicle_name)
        # start each episode with a random zoom level
        fov_to_set = random.randint(40, 60)
        self.fov = fov_to_set
        client.simSetCameraFov(self.camera_name, fov_to_set,
                               vehicle_name=self.vehicle_name)
        # get the exact world position of the camera from the engine
        car0_pos = client.simGetObjectPose("Car0").position
        self.camera_pos_pix = client.simGetCameraInfo(
            self.camera_name, self.vehicle_name).pose.position + car0_pos

    def set_camera(self, fov=90, angleh=0.0, anglev=0.0):
        """Push fov (degrees) and pan/tilt angles (degrees) to the camera."""
        tilt = anglev / 57.2958   # degrees -> radians
        pan = angleh / 57.2958
        self.camera_pos.orientation = airsim.to_quaternion(tilt, 0, pan)
        self.client.simSetCameraPose(self.camera_name, self.camera_pos,
                                     vehicle_name=self.vehicle_name)
        self.client.simSetCameraFov(self.camera_name, fov,
                                    vehicle_name=self.vehicle_name)

    def reset(self):
        """Gym API: reset the scene and return the initial observation."""
        self.steps = 0
        self.outside_car = False
        self.initializeEnv()
        # the next state
        image_data = self.get_image()
        image = Image.frombytes('RGB', (image_data.width, image_data.height),
                                image_data.image_data_uint8, 'raw', 'RGB', 0, 1)
        # NOTE(review): compares target height to source width — harmless
        # here because both screen dimensions are 120, but confirm if the
        # observation ever becomes non-square.
        if self.screen_height != image_data.width:
            image = image.resize((self.screen_height, self.screen_width), resample=2)
        image = np.array(image)
        image_gray = self.convert_rgb_to_gray(image)
        image_gray = image_gray.reshape(self.screen_height, self.screen_width, 1)
        state = image_gray
        return state

    def convert_rgb_to_gray(self, observation):
        """Convert an HxWx3 RGB array to grayscale (ITU-R 601 weights)."""
        r, g, b = observation[:, :, 0], observation[:, :, 1], observation[:, :, 2]
        observation = 0.2989 * r + 0.5870 * g + 0.1140 * b
        return observation

    def step(self, action):
        """Gym API: apply one PTZ action.

        Returns (next_state, reward, done, info). The episode ends after
        2000 steps or when the target car leaves the camera view. A small
        penalty is subtracted whenever the pan/tilt was changed.
        """
        # initialize next state, reward, done flag
        self.next_state = None
        self.reward = None
        self.done = False
        self.steps += 1
        self.reward_x = 0
        self.reward_y = 0
        self.reward_object_size = 0
        modified_cam = False
        try:
            # send action to the camera
            self.reward, self.next_state, modified_cam = self.send_action(action)
        except Exception as e:
            # best-effort recovery: reuse the last good state and re-init
            self.reward = 0.0  # some reward for failure in the system
            self.next_state = self.past_image_state
            self.initializeEnv()
            print('Except in the step(self, action)')
            print(e)
        # was ==; >= is safer should the counter ever skip past 2000
        if self.steps >= 2000:
            self.done = True
            print('completed episode:2000')
        if self.outside_car == True:
            self.done = True
        if modified_cam:
            self.reward = self.reward - self.penalty_movement
        return self.next_state, self.reward, self.done, {}

    def outside(self, Car1_pix):
        """Return True when Car1's pixel offset lies outside the image half-width."""
        width = (self.captured_image / 2.0)
        vx, vy = abs(Car1_pix[0]), abs(Car1_pix[1])
        if vx > width or vy > width:
            return True
        return False

    def calculate_object_detection_reward(self):
        """Query the detection engine and compute the tracking reward.

        Reward = box_size * centering_x * centering_y, scaled down by 0.3
        for each box edge touching the frame border. Retries the query a
        few times because the engine may transiently fail even when the
        car is in view; after all retries fail, marks the car as outside
        and returns -10.
        """
        reward = 0
        num_of_tries = 5
        # this is needed, as sometimes the engine may fail even when Car is in FOV
        for i in range(num_of_tries):
            car1 = self.client_det.simGetDetections(
                camera_name=self.camera_name, image_type=0)
            self.curr_detections_engine = car1
            if car1:
                car1 = car1[0]
                x1 = int(car1.box2D.min.x_val)
                y1 = int(car1.box2D.min.y_val)
                x2 = int(car1.box2D.max.x_val)
                y2 = int(car1.box2D.max.y_val)
                x_center = (x1 + x2) / 2.0
                y_center = (y1 + y2) / 2.0
                # reward components peak when the box center is at the
                # image center
                width = self.screen_height / 2.0
                x_distance = abs(width - x_center)
                y_distance = abs(width - y_center)
                self.reward_x = (width - x_distance) / width
                self.reward_y = (width - y_distance) / width
                size_of_box = (y2 - y1) * (x2 - x1) / (self.screen_height * self.screen_height)
                self.reward_object_size = size_of_box
                reward = self.reward_object_size * self.reward_x * self.reward_y
                # damp the reward when the box is clipped by a frame edge
                if x1 == 0 or y1 == 0:
                    reward = reward * 0.3
                if x2 == 0 or y2 == 0:
                    reward = reward * 0.3
                if x1 == self.screen_height or y1 == self.screen_height:
                    reward = reward * 0.3
                if x2 == self.screen_height or y2 == self.screen_height:
                    reward = reward * 0.3
                return reward
        # no detection after all retries: the car left the field of view
        self.outside_car = True
        reward = -10
        print('done:', self.steps)
        return reward

    def draw_bounding_box(self):
        """Return the last captured frame with the detection box drawn on it."""
        response = self.curr_captured_cv_image
        # np.frombuffer replaces the deprecated np.fromstring
        img1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8)  # get numpy array
        img_rgb = img1d.reshape(response.height, response.width, 3)
        cars = self.curr_detections_engine
        if cars:
            car = cars[0]
            x1 = int(car.box2D.min.x_val)
            y1 = int(car.box2D.min.y_val)
            x2 = int(car.box2D.max.x_val)
            y2 = int(car.box2D.max.y_val)
            cv2.rectangle(img_rgb, (x1, y1), (x2, y2), (255, 0, 0), 2)
            print('bounding: (x1,y1,x2,y2): ', (x1, y1, x2, y2))
        return img_rgb

    def send_action(self, action):
        """Apply a (pan, tilt, zoom) action and capture the next state.

        Returns (reward, state, modified_cam) where modified_cam is True
        when pan or tilt changed (zoom alone carries no movement penalty;
        that appears intentional in the original design).
        """
        pan_a = action[0]
        tilt_a = action[1]
        zoom_a = action[2]
        self.modified_cam = False
        # pan: 0 -> +delta, 1 -> -delta, 2 -> no change
        if pan_a == 0:
            self.angleh += self.delta
            self.modified_cam = True
        elif pan_a == 1:
            self.angleh -= self.delta
            self.modified_cam = True
        # tilt: same encoding
        if tilt_a == 0:
            self.anglev += self.delta
            self.modified_cam = True
        elif tilt_a == 1:
            self.anglev -= self.delta
            self.modified_cam = True
        # zoom adjusts the fov by 1 degree per step
        if zoom_a == 0:
            self.fov += 1.0
        elif zoom_a == 1:
            self.fov -= 1.0
        # send the action to the camera, keeping values in range
        self.fov = self.clamp(self.fov, 5, 90)
        self.anglev = self.clamp(self.anglev, -120, 120)
        self.angleh = self.clamp(self.angleh, -120, 120)
        if self.delay_to_introduce > 0.0:
            time.sleep(self.delay_to_introduce)
        self.set_camera(fov=self.fov, angleh=self.angleh, anglev=self.anglev)
        reward = 0
        # calculate the state
        try:
            image_data = self.get_image()
            image = Image.frombytes('RGB', (image_data.width, image_data.height),
                                    image_data.image_data_uint8, 'raw', 'RGB', 0, 1)
            if self.screen_height != image_data.width:
                image = image.resize((self.screen_height, self.screen_width), resample=2)
            image = np.array(image)
            image_gray = self.convert_rgb_to_gray(image)
            image_gray = image_gray.reshape(self.screen_height, self.screen_width, 1)
            state = image_gray
            t1 = time.time()
            reward = self.calculate_object_detection_reward()
            t2 = time.time()
            # keep track of the last successful state
            self.past_image_state = state
        except Exception as e:
            state = self.past_image_state
            print('Except in: send_action(self,action)')
            print(e)
        return reward, state, self.modified_cam

    def get_image(self):
        """Capture one uncompressed RGB scene image and cache the response."""
        responses = self.client.simGetImages(
            [airsim.ImageRequest(self.camera_name, airsim.ImageType.Scene, False, False)],
            self.vehicle_name)  # scene vision image in uncompressed RGB array
        response = responses[0]
        self.curr_captured_cv_image = response
        return response

    def get_reward_image(self):
        """Capture a segmentation image and return it as an HxWx3 uint8 array."""
        # FIX(review): original referenced self.client_reward, which is never
        # assigned anywhere in the class; self.client is the live connection.
        responses = self.client.simGetImages(
            [airsim.ImageRequest(self.camera_name, airsim.ImageType.Segmentation, False, False)],
            self.vehicle_name)  # scene segmentation image
        response = responses[0]
        img1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8)  # get numpy array
        img_rgb = img1d.reshape(response.height, response.width, 3)
        return img_rgb

    def visualize_response_CV2(self, response):
        """Display an AirSim image response in an OpenCV window."""
        img1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8)
        # FIX(review): original called img1d.reshheightape(...), a typo that
        # would raise AttributeError; the intended call is reshape.
        img_rgb = img1d.reshape(response.height, response.width, 3)
        cv2.imshow(self.vehicle_name, img_rgb)
        cv2.waitKey(1)
# Play the environment using random actions for the PTZ camera.
env = PTZEnv()
no_of_episodes = 10  # fixed typo: was "no_of_eposides"
for i in range(no_of_episodes):
    done = False
    next_state = env.reset()
    while not done:
        action = env.action_space.sample()
        print('action:', action)
        t1 = time.time()
        next_state, reward, done, _ = env.step(action)
        t2 = time.time()
        print('time taken:', t2 - t1)
        print(i, 'reward is:', reward, env.outside_car, env.steps)
    print('****************** Doing Reset *********************')
    # FIX(review): removed the redundant env.reset() here — reset() is
    # already called at the top of every episode, so the extra call only
    # performed a second, wasted scene reset per episode.