-
Notifications
You must be signed in to change notification settings - Fork 0
/
cableController.py
439 lines (321 loc) · 18.2 KB
/
cableController.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
# cableController.py
# shotmanager
#
# The cable movement controller.
# Runs as a DroneKit-Python script.
#
# Created by Jon Challinger and Will Silva on 1/21/2015.
# Copyright (c) 2016 3D Robotics. All rights reserved.
from catmullRom import CatmullRom
from vector3 import *
from numpy import linspace
import math
import threading
import itertools
# epsilon to detect if we've reached a target in meters
TARGET_EPSILON_M = 0.1
# Length of each segment that is assigned a maximum speed based on its maximum curvature
CURVATURE_MAP_RES = 1. # meters
def goldenSection(func, a, b, tol = 1e-5):
gr = 0.61803398875
c = b - gr * (b - a)
d = a + gr * (b - a)
fc = func(c)
fd = func(d)
while abs(c-d) > tol:
if fc < fd:
b = d
d = c
c = b - gr * (b - a)
fd = fc
fc = func(c)
else:
a = c
c = d
d = a + gr * (b - a)
fc = fd
fd = func(d)
return (b+a) / 2.
def constrain(val,minval,maxval):
if val < minval:
return minval
elif val > maxval:
return maxval
return val
class CableController():
def __init__(self, points, maxSpeed, minSpeed, tanAccelLim, normAccelLim, smoothStopP, maxAlt):
# Maximum tangential acceleration along the cable, m/s^2
self.tanAccelLim = tanAccelLim
# Maximum acceleration normal to the cable, m/s^2
self.normAccelLim = normAccelLim
# Smoothness of stops at the endpoints and at targets along the cable
self.smoothStopP = smoothStopP
# Maximum speed along the cable, m/s
self.maxSpeed = maxSpeed
# Minimum speed along the cable, m/s
self.minSpeed = minSpeed
# Minimum allowable position.z, meters (AKA max altitude), Convert Altitude (NEU) to NED
if maxAlt is not None:
self.posZLimit = -maxAlt
else:
self.posZLimit = None
# Input speed
self.desiredSpeed = 0.
# Current speed along the cable, m/s
self.speed = 0.
# Catmull-Rom spline with added virtual tangency control points at either end
self.spline = CatmullRom([points[0]*2 - points[1]]+points+[points[-1]*2 - points[-2]])
# Number of spline segments (should really come from CatmullRom)
self.numSegments = len(points)-1
# Current position in P domain, parameter normalized to cable total arc length
self.currentP = 1.0
# Target position in P domain
self.targetP = self.currentP
# Previously reached target, once set
self.prevReachedTarget = None
# Current segment, ranges from 0 to # of segments-1
self.currentSeg, self.currentU = self.spline.arclengthToNonDimensional(self.currentP)
# Current position as a Vector3, meters
self.position = self.spline.position(self.currentSeg, self.currentU)
# Current velocity as a Vector3, m/s
self.velocity = Vector3()
# Flag to indicate that the maximum altitude has been exceeded
self.maxAltExceeded = False
# Number of segments in curvature map
self.curvatureMapNumSegments = int(math.ceil(self.spline.totalArcLength/CURVATURE_MAP_RES))
# Number of joints in curvature map
self.curvatureMapNumJoints = self.curvatureMapNumSegments+1
# Curvature map joint positions in p domain
self.curvatureMapJointsP, self.curvatureMapSegLengthP = linspace(0., 1., self.curvatureMapNumJoints, retstep = True)
# Curvature map segment length in meters
self.curvatureMapSegLengthM = self.curvatureMapSegLengthP * self.spline.totalArcLength
# Non-dimensional curvature map joint position (cache)
self.curvatureMapJointsNonDimensional = [None for _ in range(self.curvatureMapNumJoints)]
# Speed limits for each curvature map segment (cache)
self.curvatureMapSpeedLimits = [None for _ in range(self.curvatureMapNumSegments)]
# Thread lock on curvature map segments
self.curvatureMapLocks = [threading.Lock() for _ in range(self.curvatureMapNumSegments)]
self.curvatureMapSegmentsComputedLock = threading.Lock()
# number of map segments that have been computed by the curvatureMapThread
self.curvatureMapSegmentsComputed = 0
# flag that indicates to the thread to die
self.poisonPill = False
# setup a worker thread to compute map segment maximum speeds
self.curvatureMapThread = threading.Thread(target=self._computeCurvatureMap)
self.curvatureMapThread.setDaemon(True)
# start the worker thread
self.curvatureMapThread.start()
def __del__(self):
self.poisonPill = True
self.curvatureMapThread.join(timeout = 2)
# Public interface:
def reachedTarget(self):
'''Return True if we've reached the target, else False'''
return abs(self.currentP - self.targetP) * self.spline.totalArcLength < TARGET_EPSILON_M
def setTargetP(self, targetP):
'''Interface to set a target P'''
self.targetP = targetP
def trackSpeed(self, speed):
'''Updates controller desired speed'''
self.desiredSpeed = speed
def update(self, dt):
'''Advances controller along cable by dt'''
# Speed always in direction of target
self.desiredSpeed = math.copysign(self.desiredSpeed, self.targetP - self.currentP)
self.speed = constrain(self._constrainSpeed(self.desiredSpeed), self.speed - self.tanAccelLim*dt, self.speed + self.tanAccelLim*dt)
self._traverse(dt)
def setCurrentP(self,p):
'''Sets the controller's current P position on the cable'''
self.currentP = p
self.currentSeg, self.currentU = self.spline.arclengthToNonDimensional(self.currentP)
def killCurvatureMapThread(self):
'''Sets poisonPill to True so the curvatureMapThread knows to die'''
self.poisonPill = True
# Internal functions:
def _computeCurvatureMap(self):
'''Computes curvature map, prioritizes map construction based on vehicle position and direction of motion'''
while True:
searchStart = self._getCurvatureMapSegment(self.currentP)
if self.speed > 0:
# Search ahead, then behind
for i in range(searchStart, self.curvatureMapNumSegments)+list(reversed(range(0, searchStart))):
if self._computeCurvatureMapSpeedLimit(i):
break
elif self.speed < 0:
# Search behind, then ahead
for i in list(reversed(range(0, searchStart+1)))+range(searchStart+1, self.curvatureMapNumSegments):
if self._computeCurvatureMapSpeedLimit(i):
break
else: # speed == 0
# Search alternately ahead and behind
searchList = [x for t in list(itertools.izip_longest(range(searchStart, self.curvatureMapNumSegments), reversed(range(0, searchStart)))) for x in t if x is not None]
for i in searchList:
if self._computeCurvatureMapSpeedLimit(i):
break
# if all map segments have been computed then quit the thread
with self.curvatureMapSegmentsComputedLock:
if self.curvatureMapSegmentsComputed == self.curvatureMapNumSegments:
self.poisonPill = True
if self.poisonPill:
break
def _computeCurvatureMapSpeedLimit(self, mapSeg):
'''Computes speed limit for the requested map segment'''
with self.curvatureMapLocks[mapSeg]:
# if the speed limit has already been computed for this map segment, then don't do any work
if self.curvatureMapSpeedLimits[mapSeg] is not None:
return False
# if non-dimensional parameter has not yet been created for the associated left joint, then create it
if self.curvatureMapJointsNonDimensional[mapSeg] is None:
self.curvatureMapJointsNonDimensional[mapSeg] = self.spline.arclengthToNonDimensional(self.curvatureMapJointsP[mapSeg])
# if non-dimensional parameter has not yet been created for the associated right joint, then create it
if self.curvatureMapJointsNonDimensional[mapSeg+1] is None:
self.curvatureMapJointsNonDimensional[mapSeg+1] = self.spline.arclengthToNonDimensional(self.curvatureMapJointsP[mapSeg+1])
# split returned non-dimensional parameter tuple (seg,u) into separate values
seg1, u1 = self.curvatureMapJointsNonDimensional[mapSeg]
seg2, u2 = self.curvatureMapJointsNonDimensional[mapSeg+1]
# returns arc length for current spline segment, or the larger of the two segments if our map segment spans across multiple spline segments
maxSegLen = max(self.spline.arcLengths[seg1:seg2+1]) # m
# run a golden section search to find the segment,u pair for the point of maximum curvature in the requested map segment
# (segment,u) are stored as segment+u, e.g. segment 1, u = 0.25 -> 1.25
maxCurvatureSegU = goldenSection(lambda x: -self.spline.curvature(int(x), x-int(x)), seg1+u1, seg2+u2, tol = 1e-1/maxSegLen)
# run a golden section search to find the segment,u pair for the point of minimum Z (aka max altitude)
minPosZSegU = goldenSection(lambda x: self.spline.position(int(x), x-int(x)).z, seg1+u1, seg2+u2, tol = 1e-1/maxSegLen)
# split segment+u into segment,u and evaluate curvature at this point
maxCurvature = self.spline.curvature(int(maxCurvatureSegU),maxCurvatureSegU-int(maxCurvatureSegU))
#split segment+u into segment,u and evalute position.z at this point
minPosZ = self.spline.position(int(minPosZSegU),minPosZSegU-int(minPosZSegU)).z #m
# this prevents the copter from traversing segments of the cable
# that are above its altitude limit
if self.posZLimit is not None and minPosZ < self.posZLimit:
self.maxAltExceeded = True
#this cable will breach the altitude limit, make the speed limit for this segment 0 to stop the vehicle
self.curvatureMapSpeedLimits[mapSeg] = 0.
else:
if maxCurvature != 0.:
# limit maxspeed by the max allowable normal acceleration at that point, bounded on the lower end by minSpeed
self.curvatureMapSpeedLimits[mapSeg] = max(math.sqrt(self.normAccelLim / maxCurvature), self.minSpeed)
else:
# if curvature is zero, means a straight segment
self.curvatureMapSpeedLimits[mapSeg] = self.maxSpeed
with self.curvatureMapSegmentsComputedLock:
self.curvatureMapSegmentsComputed += 1
return True
def _getCurvatureMapSpeedLimit(self, mapSeg):
'''Look up the speed limit for the requested map segment'''
# sanitize mapSeg
if mapSeg < 0 or mapSeg >= self.curvatureMapNumSegments:
return 0.
self._computeCurvatureMapSpeedLimit(mapSeg)
return self.curvatureMapSpeedLimits[mapSeg]
def _traverse(self, dt):
''' Advances the controller along the spline '''
spline_vel_unit = self.spline.velocity(self.currentSeg, self.currentU)
spline_vel_norm = spline_vel_unit.normalize()
# advances u by the amount specified by our speed and dt
self.currentU += self.speed * dt / spline_vel_norm
# handle traversing spline segments
if self.currentU > 1.:
if self.currentSeg < self.numSegments-1:
self.currentSeg += 1
self.currentU = 0. # NOTE: this truncates steps which cross spline joints
else:
self.currentU = 1.
elif self.currentU < 0.:
if self.currentSeg > 0:
self.currentSeg -= 1
self.currentU = 1. # NOTE: this truncates steps which cross spline joints
else:
self.currentU = 0.
# calculate our currentP
self.currentP = self.spline.nonDimensionalToArclength(self.currentSeg, self.currentU)[0]
# calculate our position and velocity commands
self.position = self.spline.position(self.currentSeg, self.currentU)
self.velocity = spline_vel_unit * self.speed
def _constrainSpeed(self, speed):
'''Looks ahead and behind current controller position and constrains to a speed limit'''
if speed > 0:
return min(self.maxSpeed, speed, self._getPosSpeedLimit(self.currentP))
elif speed < 0:
return max(-self.maxSpeed, speed, self._getNegSpeedLimit(self.currentP))
return speed
def _speedCurve(self, dist, speed):
'''Returns speed based on the sqrt function or a linear ramp (depending on dist)'''
linear_velocity = self.tanAccelLim / self.smoothStopP
linear_dist = linear_velocity / self.smoothStopP
if speed > linear_velocity:
return math.sqrt(2. * self.tanAccelLim * (speed**2/(2.*self.tanAccelLim) + dist))
else:
p1 = speed / self.smoothStopP
p2 = p1 + dist
if p2 > linear_dist:
return math.sqrt(2. * self.tanAccelLim * (p2 - 0.5*linear_dist))
else:
return p2 * self.smoothStopP
def _maxLookAheadDist(self):
'''Calculate how far it would take to come to a complete stop '''
linear_velocity = self.tanAccelLim / self.smoothStopP
linear_dist = linear_velocity / self.smoothStopP
if abs(self.speed) > linear_velocity:
return 0.5 * abs(self.speed)**2 / self.tanAccelLim + 0.5*linear_dist
else:
return abs(self.speed)/self.smoothStopP
def _getCurvatureMapSegment(self, p):
'''Get the curvature map segment index at the location p'''
return int(min(math.floor(p / self.curvatureMapSegLengthP),self.curvatureMapNumSegments-1))
def _getDistToCurvatureMapSegmentBegin(self, p1, idx):
'''Get distance from p1 to the beginning of the idx curvature map segment in meters'''
p2 = self.curvatureMapJointsP[idx]
return abs(p1-p2) * self.spline.totalArcLength
def _getDistToCurvatureMapSegmentEnd(self, p1, idx):
'''Get distance from p1 to the end of the idx curvature map segment in meters'''
p2 = self.curvatureMapJointsP[idx+1]
return abs(p1-p2) * self.spline.totalArcLength
def _getPosSpeedLimit(self, p):
'''Returns speed limit for a requested arc length normalized parameter, p, moving in the positive direction'''
# Identify our current curvature map segment
mapSeg = self._getCurvatureMapSegment(p)
# get speed limit for the upcoming curvature map segment
nextMapSegSpeed = self._getCurvatureMapSpeedLimit(mapSeg+1)
# get distance (in meters) from current position to start of next curvature map segment
nextMapSegDist = self._getDistToCurvatureMapSegmentEnd(p, mapSeg)
# set speed limit to the minimum of the current curvature map segment and the transition to the next curvature map segment speed
speedLimit = min(self._getCurvatureMapSpeedLimit(mapSeg), self._speedCurve(nextMapSegDist, nextMapSegSpeed)) # m/s
# loop through all remaining segments in that direction
for mapSeg in range(mapSeg+1,self.curvatureMapNumSegments):
# increment distance by another curvature map segment length
nextMapSegDist += self.curvatureMapSegLengthM
# if that distance is greater than the distance it would take to stop, then break to save time (no need to look ahead any further)
if nextMapSegDist > self._maxLookAheadDist():
break
# get curvature map seg speed at this next segment
nextMapSegSpeed = self._getCurvatureMapSpeedLimit(mapSeg+1) # NOTE: self.getCurvatureMapSpeedLimit(self.curvatureMapNumSegments) is 0
# limit us if the new map segment speed is slower than our current speed limit
speedLimit = min(speedLimit, self._speedCurve(nextMapSegDist, nextMapSegSpeed))
# if targetP is ahead of currentP then check for a speed limit to slow down at the target
if self.targetP >= self.currentP:
speedLimit = min(speedLimit, self._speedCurve(abs(self.targetP - self.currentP)*self.spline.totalArcLength, 0))
return speedLimit
def _getNegSpeedLimit(self, p):
'''Returns speed limit for a requested arc length normalized parameter, p, moving in the negative direction'''
# Identify our current curvature map segment
mapSeg = self._getCurvatureMapSegment(p)
# get speed limit for the previous curvature map segment
prevMapSegSpeed = self._getCurvatureMapSpeedLimit(mapSeg-1)
# get distance (in meters) from current position to start of previous curvature map segment
prevMapSegDist = self._getDistToCurvatureMapSegmentBegin(p, mapSeg)
# set speed limit to the minimum of the current curvature map segment and the transition to the previous curvature map segment speed
speedLimit = min(self._getCurvatureMapSpeedLimit(mapSeg), self._speedCurve(prevMapSegDist, prevMapSegSpeed)) # m/s
# loop through all remaining segments in that direction
for mapSeg in reversed(range(0,mapSeg)):
# increment distance by another curvature map segment length
prevMapSegDist += self.curvatureMapSegLengthM
# if that distance is greater than the distance it would take to stop, then break to save time (no need to look ahead any further)
if prevMapSegDist > self._maxLookAheadDist():
break
# get curvature map seg speed at this previous segment
prevMapSegSpeed = self._getCurvatureMapSpeedLimit(mapSeg-1) # NOTE: self.getCurvatureMapSpeedLimit(-1) is 0
# limit us if the new map segment speed is slower than our current speed limit
speedLimit = min(speedLimit, self._speedCurve(prevMapSegDist, prevMapSegSpeed))
if self.targetP <= self.currentP:
speedLimit = min(speedLimit, self._speedCurve(abs(self.targetP - self.currentP)*self.spline.totalArcLength, 0))
return -speedLimit