-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathneopixel_utility.py
423 lines (347 loc) · 16.6 KB
/
neopixel_utility.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
import sys
import logging
import random
import math
import ast
import collections
import uuid
from colour import Color
from neopixel import PrinterNeoPixel
GAMMA_TABLE_STEPS=100
Pattern = collections.namedtuple('Pattern','name function')
Animation = collections.namedtuple('Animation','name function')
class AnimationManager:
def __init__(self, generator, start, end, step, duration, utility, ignore_gamma=False, id=None):
self.generator = generator
self.start = start
self.end = end
self.step = step
self.duration = duration
self.utility = utility
self.reactor = self.utility.reactor
self.ignore_gamma = ignore_gamma
self.frame_counter = 0
if id:
self.id = id
else:
self.id = uuid.uuid1()
self.timer = None
def begin(self):
self.timer = self.reactor.register_timer(self.update)
self.endtime = self.reactor.monotonic() + self.duration
self.reactor.update_timer(self.timer, self.reactor.NOW)
def pause(self):
pass
def resume(self):
pass
def update(self, eventtime):
self.frame_counter += 1
if eventtime > self.endtime:
self.cleanup()
return self.reactor.NEVER
else:
next_event = self.reactor.monotonic() + self.step
state = next(self.generator)
logging.debug('[{frame},{targettime},{eventtime},{nexttime}] Update event for animation {id}' \
' to state {state} ....'.format(id=self.id, state=state[:4],
frame=self.frame_counter, targettime = self.timer.waketime,
eventtime=eventtime, nexttime=next_event))
self.utility.set_range_colours(self.start, self.end, state, self.ignore_gamma)
#self.reactor.update_timer(self.sample_timer, self.reactor.monotonic() + self.step)
return next_event
def cleanup(self):
# Get utility to remove this generator from the list
# Unregister timer from reactor
logging.debug('Completed running animation {0}. Dropping animation from queue.'.format(self.id))
# Cannot remove timer if this is within update as will be inside reactor for loop over timer list
# May need to make cleanup a periodic activity for the utility - either polled or done before insertion of new
# animations to the queue
# unregister_timer(self.timer)
self.utility.remove_animation(self)
class NeopixelUtility(PrinterNeoPixel):
def __init__(self, config):
PrinterNeoPixel.__init__(self, config)
name = config.get_name().split()[1]
self.gcode = self.printer.lookup_object('gcode')
self.reactor = self.printer.get_reactor()
self.gamma = config.get('gamma', 2.7)
self.gamma_adjust = config.getboolean('gamma_adjust', True)
self.gamma_table = self._gamma_table(GAMMA_TABLE_STEPS, self.gamma)
self.animations = []
self.gcode.register_mux_command(
"SET_LED_PATTERN", "LED", name,
self.cmd_SET_LED_PATTERN,
desc=self.cmd_SET_LED_PATTERN_help)
self.gcode.register_mux_command(
"SET_LED_ANIMATION", "LED", name,
self.cmd_SET_LED_ANIMATION,
desc=self.cmd_SET_LED_ANIMATION_help)
# Parameters:
# - SPEED Relative speed of animations to base ([0 to 10] -> default 1.)
# - TERMINATE Length of time to run before terminating (for looping animations)
# - RANGE Allow a subset of pixels to be set?
# RANGE=x,y: Select a subset of LEDs to apply effects to (index x to y inclusive)
# Should do an out of bounds check on this
# Patterns
# ========
# Random
# Gradient (DIRECTION: 1=*Ascending, 0=Descending)
# Custom (CUSTOM: Define the pattern to use)
# Animations (+ Animation Specific Parameters) / Separate into animations and allocations? What about Rider?
# Random
# Rainbow (repeat vs full range)
# March Direction, Speed, Steps
# Pattern Pattern
# Fade
# Pulse
# Solid Colour
# Rider Pattern
# Loading pattern (slowly fills up full lights - loadtofull? - needs acceleration option as well?
# Lightning
# Raindrops
cmd_SET_LED_PATTERN_help = "Set a static pattern for the LEDs"
def cmd_SET_LED_PATTERN(self, params):
#logging.debug(self.get_status(None)['color_data'])
pattern = params.get('PATTERN', 'Unknown')
limits = map(int,params.get('RANGE', '1,{0}'.format(self.chain_count)).split(','))
patterns = [
Pattern('random', self.__pattern_random),
Pattern('gradient', self.__pattern_gradient),
Pattern('custom', self.__pattern_custom)
]
pattern_list = map(str.lower,list(zip(*patterns))[0])
if pattern.lower() not in pattern_list:
pattern = 'random'
self.gcode.respond_info(
'Using random pattern. Please select a pattern using' \
' PATTERN= and pass one of the following' \
' patterns: {}'.format(', '.join(pattern_list)))
func = [x.function for x in patterns if x.name == pattern.lower()][0]
#logging.debug(pattern)
#logging.debug(func)
func(params, limits)
cmd_SET_LED_ANIMATION_help = "Start an animation"
def cmd_SET_LED_ANIMATION(self, params):
animation = params.get('ANIMATION', 'Unknown')
limits = map(int,params.get('RANGE', '1,{0}'.format(self.chain_count)).split(','))
animations = [
Animation('march', self.__animation_march),
Animation('strobe', self.__animation_strobe)
]
animation_list = map(str.lower,list(zip(*animations))[0])
if animation.lower() not in animation_list:
animation = 'march'
self.gcode.respond_info(
'Using march animation. Please select an animation using' \
' ANIMATION= and pass one of the following'\
' animations: {}'.format(', '.join(animation_list)))
func = [x.function for x in animations if x.name == animation.lower()][0]
#logging.debug(animation)
#logging.debug(func)
func(params, limits)
# Split speed from the normal rate? Make it a multiplier? Or just document
def __animation_march(self, params, limits):
ascending = params.get_int('ASCENDING', 1)
speed = params.get_float('SPEED', 0.1)
duration = params.get_float('DURATION',5.)
state = self.__get_status_range(limits[0], limits[1])
start = limits[0]
end = limits[1]
#chain_length = limits[1] - limits[0] + 1
state_generator = self.__animation_march_generator(state, ascending)
animation = AnimationManager(generator=state_generator, start=start,
end=end, step=speed, duration=duration, utility=self,
ignore_gamma=True)
self.animations.append(animation)
animation.begin()
def __animation_march_generator(self, state, ascending):
while True:
if ascending:
state = state[1:] + state[:1]
else:
state = state[-1:] + state[:-1]
yield state
def __animation_strobe(self, params, limits):
ascending = params.get_int('ASCENDING', 1)
speed = params.get_float('SPEED', 0.05)
duration = params.get_float('DURATION',5.)
colour_string = params.get('COLOUR','red')
try:
if colour_string.strip().startswith('rgb') and ('=' in colour_string):
colour = Color(rgb=ast.literal_eval(colour_string.split('=')[1]))
else:
colour = Color(colour_string)
except:
logging.debug('Exception: {0}'.format(sys.exc_info()[0]))
self.gcode.respond_info(
'Could not intepret {0} as a colour. Please check' \
' the documentation. Replacing entry with red'.format(colour_string))
colour = Color('red')
start = limits[0]
end = limits[1]
state_generator = self.__animation_strobe_generator(end - start + 1, colour, ascending)
animation = AnimationManager(generator=state_generator, start=start,
end=end, step=speed, duration=duration, utility=self,
ignore_gamma=True)
self.animations.append(animation)
animation.begin()
def __animation_strobe_generator(self, length, colour, ascending):
state = [Color('black')] * length
state[0] = colour
yield state
while True:
if ascending:
state = state[1:] + state[:1]
else:
state = state[-1:] + state[:-1]
yield state
def __pattern_gradient(self, params, limits):
ascending = params.get_int('ASCENDING', 1)
if ascending:
for i in range(1,self.chain_count):
linear_gradient = float(i) / self.chain_count
c = Color(rgb=(linear_gradient,linear_gradient,linear_gradient))
self._set_neopixels(*c.rgb, index=i, transmit=False)
self._set_neopixels(1.,1.,1.,index=self.chain_count)
else:
for i in range(self.chain_count,1,-1):
linear_gradient = float(self.chain_count - i + 1) / self.chain_count
c = Color(rgb=(linear_gradient,linear_gradient,linear_gradient))
self._set_neopixels(*c.grb, index=i, transmit=False)
self._set_neopixels(1.,1.,1.,index=1)
def __pattern_random(self, params, limits):
for i in range(limits[0],limits[1]):
self._set_neopixels(random.random(),random.random(),random.random(),index=i, transmit=False)
self._set_neopixels(1.,1.,1.,index=limits[1])
def __pattern_custom(self, params, limits):
custom = params.get('CUSTOM', '')
if custom == '':
self.gcode.respond_info(
'Please define a pattern using CUSTOM=. See the' \
' documentation for details. Defaulting to red|white|blue')
custom = 'red|white|blue'
colour_pattern = []
colour_pattern_strings = [x for x in custom.split('|')]
for string in colour_pattern_strings:
try:
if string.strip().startswith('rgb') and ('=' in string):
colour_pattern.append(Color(rgb=ast.literal_eval(string.split('=')[1])))
else:
colour_pattern.append(Color(string))
except:
logging.debug('Exception: {0}'.format(sys.exc_info()[0]))
self.gcode.respond_info(
'Could not intepret {0} as a colour. Please check' \
' the documentation. Replacing entry with white'.format(string))
colour_pattern.append(Color('white'))
pattern_length = len(colour_pattern)
if pattern_length == 0:
self.gcode.respond_info(
'Pattern is empty. Defaulting to red|white|blue')
colour_pattern = [Color('red'), Color('white'), Color('blue')]
pattern_length = 3
chain_length = limits[1] - limits[0] + 1
q, r = divmod(chain_length, pattern_length)
for i in range(q):
start = limits[0] + (i * pattern_length)
for j in range(pattern_length):
transmit = (i == q-1) and (r==0)
self._set_neopixels(*colour_pattern[j].rgb, index=(start+j), transmit=transmit)
if r > 0:
start = limits[0] + (q * pattern_length)
for i in range(r):
transmit = (i == r-1)
self._set_neopixels(*colour_pattern[i].rgb, index=(start+i), transmit=transmit)
def _gamma_lookup(self, number):
return self.gamma_table[int(round((GAMMA_TABLE_STEPS-1) * number))]
def _gamma_convert(self, colour):
return Color(rgb=map(self._gamma_lookup, colour.rgb))
def _gamma_table(self, nsteps, gamma):
gammaedUp = [math.pow(x, gamma) for x in range(nsteps)]
return [x/max(gammaedUp) for x in gammaedUp]
def _pause(self, time=0.):
eventtime = self.reactor.monotonic()
end = eventtime + time
while eventtime < end:
eventtime = self.reactor.pause(eventtime + .05)
def __get_status_range(self, start_index, end_index):
state = self.get_status(None)['color_data'][start_index - 1:end_index]
return self.__dicts_to_colors(state)
def __dicts_to_colors(self, dicts):
return [Color(rgb=(x['R'],x['G'],x['B'])) for x in dicts]
def set_range_colours(self, start, end, colours, ignore_gamma=False):
for i in range(end - start + 1):
index = start + i
c = colours[i]
if self.gamma_adjust and not ignore_gamma:
c = self._gamma_convert(c)
self.update_color_data(*c.rgb, white=0., index=index)
if index == end:
self.send_data()
def get_animation_by_id(self, id):
for animation in self.animations:
if animation == id:
return animation
return None
def remove_animation(self, animation):
index = self.animations.index(animation)
if index:
self.animations.pop(index)
else:
logging.debug("Could not find animation in queue to remove it: {0}".format(animation.id))
# Copied relevant parts from neopixels SET_LED cmd
def _set_neopixels(self, red, green, blue, white=0., index=None, transmit=True, ignore_gamma=False):
def reactor_bgfunc(print_time):
with self.mutex:
c = Color(rgb=(red,green,blue))
if self.gamma_adjust and not ignore_gamma:
c = self._gamma_convert(c)
#logging.info("Setting: {0} {1} {2}".format(red, green, blue))
self.update_color_data(*c.rgb, white=white, index=index)
if transmit:
self.send_data(print_time)
def lookahead_bgfunc(print_time):
self.reactor.register_callback(lambda et: reactor_bgfunc(print_time))
# No sync - just do it
lookahead_bgfunc(None)
def load_config_prefix(config):
return NeopixelUtility(config)
## Utility functions for the Colour library
def __limit(x):
return max(0., min(1., x))
def __coloradd(self, other):
if type(other) == Color:
return Color(rgb=(__limit(self.red + other.red),__limit(self.green + other.green), __limit( self.blue + other.blue)))
elif type(other) in [float, int]:
return Color(rgb=(__limit(self.red + other),__limit(self.green + other), __limit( self.blue + other)))
else:
raise TypeError('unsupported operand type(s) for +: {0} and {1}'.format(type(self), type(other)))
def __colorsub(self, other):
if type(other) == Color:
return Color(rgb=(__limit(self.red - other.red),__limit(self.green - other.green), __limit( self.blue - other.blue)))
elif type(other) in [float, int]:
return Color(rgb=(__limit(self.red - other),__limit(self.green - other), __limit( self.blue - other)))
else:
raise TypeError('unsupported operand type(s) for -: {0} and {1}'.format(type(self), type(other)))
def __rcolorsub(self, other):
if type(other) in [float, int]:
return Color(rgb=(__limit(other - self.red),__limit(other - self.green), __limit( other - self.blue)))
else:
raise TypeError('unsupported operand type(s) for -: {0} and {1}'.format(type(self), type(other)))
def __colormult(self, other):
if type(other) in [float, int]:
return Color(rgb=(__limit(other * self.red),__limit(other * self.green), __limit( other * self.blue)))
else:
raise TypeError('unsupported operand type(s) for *: {0} and {1}'.format(type(self), type(other)))
def __colordiv(self, other):
if type(other) in [float, int]:
return Color(rgb=(__limit(self.red / other),__limit(self.green / other), __limit(self.blue / other)))
else:
raise TypeError('unsupported operand type(s) for *: {0} and {1}'.format(type(self), type(other)))
Color.__add__ = __coloradd
Color.__radd__ = __coloradd
Color.__sub__ = __colorsub
Color.__rsub__ = __rcolorsub
Color.__mul__ = __colormult
Color.__rmul__ = __colormult
Color.__div__ = __colordiv