-
Notifications
You must be signed in to change notification settings - Fork 1
/
custom_pid.py
167 lines (148 loc) · 6.48 KB
/
custom_pid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# coding: utf-8
from time import time
from typing import Union
from crappy.blocks.block import Block
class PID(Block):
    """A PID corrector.

    A PID will continuously adjust its output based on the target value and
    the actual measured value, to try to actually reach the target.

    In addition to the usual behavior, the integral term is reset to zero
    each time the value read under ``cycle_label`` on the feedback link
    changes.
    """

    def __init__(self,
                 kp: float,
                 ki: float = 0,
                 kd: float = 0,
                 freq: float = 500,
                 out_max: float = float('inf'),
                 out_min: float = -float('inf'),
                 target_label: str = 'cmd',
                 input_label: str = 'V',
                 time_label: str = 't(s)',
                 labels: list = None,
                 reverse: bool = False,
                 i_limit: Union[float, tuple] = 1,
                 send_terms: bool = False,
                 cycle_label: str = 'cycle') -> None:
        """Sets the args and initializes the parent class.

        Args:
          kp (:obj:`float`): `P` gain.
          ki (:obj:`float`): `I` gain. It is multiplied by `kp` internally
            (see :meth:`set_k`).
          kd (:obj:`float`): `D` gain. It is multiplied by `kp` internally
            (see :meth:`set_k`).
          freq (:obj:`float`, optional): The block will loop at this
            frequency.
          out_max (:obj:`float`, optional): A value the output can never be
            superior to. :obj:`None` means no upper bound.
          out_min (:obj:`float`, optional): A value the output can never be
            inferior to. :obj:`None` means no lower bound.
          target_label (:obj:`str`, optional): The label of the setpoint.
          input_label (:obj:`str`, optional): The reading of the actual value
            to be compared with the setpoint.
          time_label (:obj:`str`, optional): The label of the time.
          labels (:obj:`list`, optional): The labels of the output of the
            block. It must contain two :obj:`str` : the time label and the
            actual output. The given list is copied, never modified.
          reverse (:obj:`bool`, optional): To reverse the retro-action.
          i_limit (:obj:`tuple`, optional): To avoid over-integration. If
            given as a :obj:`tuple` (or :obj:`list`) of two values, they will
            be the boundaries for the `I` term. If given as a single
            :obj:`float` the boundaries will be:
            ::

              i_limit * out_min, i_limit * out_max

          send_terms (:obj:`bool`, optional): To get the weight of each term
            in the output value. It will add
            ``['p_term', 'i_term', 'd_term']`` to the labels. This is
            particularly useful to tweak the gains.
          cycle_label (:obj:`str`, optional): The label, read on the feedback
            link, whose change of value resets the `I` term. Set it to
            :obj:`None` to disable this reset.
        """

        super().__init__()
        self.niceness = -10
        self.freq = freq
        self.out_max = out_max
        self.out_min = out_min
        self.target_label = target_label
        self.input_label = input_label
        self.time_label = time_label
        self.cycle_label = cycle_label
        # Work on a copy: the labels may be extended below and the list given
        # by the caller must not be altered as a side effect.
        self.labels = ['t(s)', 'pid'] if labels is None else list(labels)
        self.reverse = reverse
        self.send_terms = send_terms
        # set_k() reads self.reverse, so it has to be called after it is set.
        self.set_k(kp, ki, kd)
        self.i_term = 0
        self.last_val = 0
        if self.send_terms:
            self.labels.extend(['p_term', 'i_term', 'd_term'])

        if isinstance(i_limit, (tuple, list)):
            self.i_limit = tuple(i_limit)
        else:
            self.i_limit = (self._scale_bound(i_limit, self.out_min),
                            self._scale_bound(i_limit, self.out_max))
        assert len(self.i_limit) == 2, "Invalid i_limit arg!"
        self.current_cycle = 0

    @staticmethod
    def _scale_bound(factor: float, bound: float) -> float:
        """Returns `factor * bound`, keeping `None` and zero factors sane."""

        if bound is None:
            return None
        # 0 * inf would be nan, and a nan bound silently disables clamping.
        if factor == 0:
            return 0.0
        return factor * bound

    def begin(self) -> None:
        """Identifies the input links and sends a first zero output.

        The first data of every input link is awaited. The link carrying the
        target label and the one carrying both the input and time labels are
        identified, and the initial target, time and feedback are stored.

        Raises:
          AssertionError: If no link carries the target label, if no link
            carries both the input and time labels, or if there are links
            that are neither the target nor the feedback link.
        """

        data = [inp.recv_last(True) for inp in self.inputs]

        # Reset before searching so that a previous call cannot leave stale
        # link ids behind.
        self.target_link_id = None
        self.feedback_link_id = None
        for i, r in enumerate(data):
            if self.target_label in r:
                self.target_link_id = i
            if self.input_label in r and self.time_label in r:
                self.feedback_link_id = i

        assert self.target_link_id is not None, \
            "[PID] Error: no link containing" \
            " target label {}".format(self.target_label)
        assert self.feedback_link_id is not None, \
            "[PID] Error: no link containing input label {} " \
            "and time label {}".format(self.input_label, self.time_label)
        assert set(range(len(self.inputs))) == {self.target_link_id,
                                                self.feedback_link_id}, \
            "[PID] Error: useless link(s)! Make sure PID block does not " \
            "have extra inputs"

        self.last_target = data[self.target_link_id][self.target_label]
        self.last_t = data[self.feedback_link_id][self.time_label]
        # The D term is computed on the measurement only (the setpoint is
        # ignored), so the last measurement is what has to be remembered.
        self.last_val = data[self.feedback_link_id][self.input_label]

        if self.send_terms:
            self.send([self.last_t, 0, 0, 0, 0])
        else:
            self.send([self.last_t, 0])

    def clamp(self, v: float, limits: tuple = None) -> float:
        """Restricts a value to the given boundaries.

        Args:
          v (:obj:`float`): The value to clamp.
          limits (:obj:`tuple`, optional): The ``(min, max)`` boundaries. If
            not given, ``(out_min, out_max)`` is used. Any boundary set to
            :obj:`None` is ignored.

        Returns:
          The clamped value.
        """

        if limits is None:
            mini, maxi = self.out_min, self.out_max
        else:
            mini, maxi = limits

        # The upper bound is applied first, then the lower one.
        if maxi is not None:
            v = min(v, maxi)
        if mini is not None:
            v = max(v, mini)
        return v

    def set_k(self, kp: float, ki: float = 0, kd: float = 0) -> None:
        """Sets the gains, taking the `reverse` setting into account.

        The stored `I` and `D` gains are the given ones multiplied by `kp`.

        Args:
          kp (:obj:`float`): `P` gain.
          ki (:obj:`float`, optional): `I` gain, before scaling by `kp`.
          kd (:obj:`float`, optional): `D` gain, before scaling by `kp`.
        """

        s = -1 if self.reverse else 1
        self.kp = s * kp
        self.ki = s * kp * ki
        self.kd = s * kp * kd

    def loop(self) -> None:
        """Reads the latest feedback and target, and sends the new output.

        Nothing is sent if the time of the received feedback is not strictly
        greater than the one of the previously processed feedback.
        """

        data = self.inputs[self.feedback_link_id].recv_last(True)
        t = data[self.time_label]
        dt = t - self.last_t
        if dt <= 0:
            return

        # Start integrating from scratch whenever the cycle value changes.
        if self.cycle_label is not None:
            cycle = data[self.cycle_label]
            if cycle != self.current_cycle:
                self.i_term = 0
                self.current_cycle = cycle

        feedback = data[self.input_label]
        if self.feedback_link_id == self.target_link_id:
            target = data[self.target_label]
        else:
            target_data = self.inputs[self.target_link_id].recv_last()
            if target_data is None:
                # No new setpoint received: keep using the previous one.
                target = self.last_target
            else:
                target = target_data[self.target_label]
                self.last_target = target

        diff = target - feedback
        p_term = self.kp * diff
        self.last_t = t

        # Derivative on the measurement rather than on the error, to avoid
        # the derivative kick when the setpoint changes abruptly.
        d_term = -self.kd * (feedback - self.last_val) / dt
        self.last_val = feedback

        self.i_term += self.ki * diff * dt
        # clamp() leaves in-range values untouched and ignores None bounds.
        out = self.clamp(p_term + self.i_term + d_term)
        self.i_term = self.clamp(self.i_term, self.i_limit)

        if self.send_terms:
            self.send([time() - self.t0, out, p_term, self.i_term, d_term])
        else:
            self.send([time() - self.t0, out])