#!/usr/bin/env python3
import itertools as it, operator as op, functools as ft, datetime as dt, pathlib as pl
import os, sys, re, logging, math, contextlib, signal, json, tempfile, shutil
import asyncio, asyncio.subprocess as sp
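# Minimal {}-style (str.format) logging shim - LogMessage defers formatting
# until a record is actually emitted, and LogStyleAdapter lets log calls pass
# positional/keyword args directly, e.g. log.debug('done in {:.1f}s', td).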
class LogMessage:
def __init__(self, fmt, a, k): self.fmt, self.a, self.k = fmt, a, k
def __str__(self): return self.fmt.format(*self.a, **self.k) if self.a or self.k else self.fmt
class LogStyleAdapter(logging.LoggerAdapter):
def __init__(self, logger, extra=None):
super(LogStyleAdapter, self).__init__(logger, extra or {})
def log(self, level, msg, *args, **kws):
if not self.isEnabledFor(level): return
log_kws = {} if 'exc_info' not in kws else dict(exc_info=kws.pop('exc_info'))
msg, kws = self.process(msg, kws)
self.logger._log(level, LogMessage(msg, args, kws), (), log_kws)
get_logger = lambda name: LogStyleAdapter(logging.getLogger(name))
err_fmt = lambda err: f'[{err.__class__.__name__}] {err}'
_td_days = dict(
y=365.25, yr=365.25, year=365.25,
mo=30.5, month=30.5, w=7, week=7, d=1, day=1 )
_td_s = dict( h=3600, hr=3600, hour=3600,
m=60, min=60, minute=60, s=1, sec=1, second=1, ms=1e-3 )
_td_usort = lambda d: sorted(
d.items(), key=lambda kv: (kv[1], len(kv[0])), reverse=True )
_td_re = re.compile(r'(?i)^[-+]?\s*' + ''.join(
fr'(?P<{k}>\d+(?:\.\d+)?{k}\s*)?' for k, v in
[*_td_usort(_td_days), *_td_usort(_td_s)] ) + '$')
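# parse_pos_spec() turns a position/duration spec into float seconds, accepting
# plain numbers, [[hh:]mm:]ss and the unit-suffixed forms matched by _td_re above,
# e.g. parse_pos_spec('90') == parse_pos_spec('01:30') == 90.0,
# parse_pos_spec('1h30m') == 5400.0.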
def parse_pos_spec(pos):
if not pos: return
try: return float(pos)
except ValueError: pass
if ':' in pos:
mins, secs = pos.rsplit(':', 1)
try: hrs, mins = mins.rsplit(':', 1)
except ValueError: hrs = 0
return sum( a*b for a, b in
zip([3600, 60, 1], map(float, [hrs, mins, secs])) )
else:
if not ((m := _td_re.search(pos)) and any(m.groups())):
raise ValueError(pos) from None
delta = 0
for dm, units in enumerate([_td_s, _td_days]):
for k, v in units.items():
if not m.group(k): continue
				delta += v * (86400 if dm else 1) * float(''.join(
					filter('.0123456789'.__contains__, m.group(k)) ) or 1)
return delta
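# td_repr() is the reverse-direction helper - renders seconds (or a timedelta)
# as a short human-readable string, e.g. td_repr(3725) -> '1h 2m', td_repr(45) -> '45s'.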
def td_repr( ts, ts0=None, units_max=2, units_res=None,
_units=dict( h=3600, m=60, s=1,
y=365.25*86400, mo=30.5*86400, w=7*86400, d=1*86400 ) ):
delta = ts if ts0 is None else (ts - ts0)
	if isinstance(delta, dt.timedelta): delta = delta.total_seconds()
res, s, n_last = list(), abs(delta), units_max - 1
units = sorted(_units.items(), key=lambda v: v[1], reverse=True)
for unit, unit_s in units:
val = math.floor(s / unit_s)
if not val:
if units_res == unit: break
continue
if len(res) == n_last or units_res == unit:
val, n_last = round(s / unit_s), True
res.append(f'{val:.0f}{unit}')
if n_last is True: break
s -= val * unit_s
return ' '.join(res) if res else '<1s'
async def aio_proc_close(proc, timeout=2.0):
try:
proc.terminate()
try: await asyncio.wait_for(proc.wait(), timeout)
except TimeoutError: proc.kill()
except OSError: pass
finally: await proc.wait()
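# Async wrapper around ffmpeg/ffprobe subprocesses - runs the command, collects
# output, and (when progress_func is set) parses the key=value lines that
# ffmpeg emits with "-progress pipe:1" to report position via progress_func.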
class FFProc:
proc = finished = None
stream_reader_limit = 32 * 2**10
prefix, crop_mark = '', r'... ¯\_(ツ)_/¯'
def __init__( self, loop, src, cmd,
stream_len=None, progress_func=None, prefix=None, limit=None, **kws ):
self.loop, self.src, self.cmd, self.kws = loop, src, cmd, kws
self.progress_func, self.stream_len = progress_func, stream_len
self.term_width = shutil.get_terminal_size()[0] - 2
if prefix: self.prefix = prefix
self.line_max_len = self.kws['limit'] = limit or self.stream_reader_limit
self.cmd_repr = ' '.join((v if len(str(v).split()) == 1 else repr(v)) for v in cmd)
async def __aenter__(self):
await self.run(wait=False)
return self
async def __aexit__(self, *err):
if self.finished and not self.finished.done():
self.finished.cancel()
with contextlib.suppress(asyncio.CancelledError): await self.finished
async def run(self, wait=True):
assert not self.proc
log.debug('[{!r}] running: {}', self.src, self.cmd_repr)
self.kws.setdefault('stdin', sp.DEVNULL)
if self.progress_func: self.kws['stdout'] = sp.PIPE
self.proc = await asyncio.create_subprocess_exec(*self.cmd, **self.kws)
for k in 'stdin', 'stdout', 'stderr': setattr(self, k, getattr(self.proc, k))
self.finished = self.loop.create_task(self.wait())
if wait: await self.finished
return self
async def wait(self):
progress_task = None
if self.progress_func and self.proc.stdout:
progress_task = self.loop.create_task(self.print_progress())
try:
await self.proc.wait()
if progress_task: await progress_task
if self.proc.returncode != 0:
cmd_repr = '' if not self.cmd_repr else f': {self.cmd_repr}'
raise AudioConvError( f'Command for src {self.src!r}'
f' exited with non-zero status ({self.proc.returncode}){cmd_repr}' )
finally:
if progress_task and not progress_task.done():
progress_task.cancel()
with contextlib.suppress(asyncio.CancelledError): await progress_task
if self.proc.returncode is None: await aio_proc_close(self.proc)
async def readline(self, stream=None, sep=b'\n',
decode_enc='utf-8', decode_err='replace', crop_to=8 * 2**10, crop_mark=None):
'''Similar to StreamReader.readline, but also:
- Decodes all lines (with `decode_enc` and `decode_err` opts).
- Lines that are longer than `crop_to` (after decoding)
are cropped to that limit and `crop_mark` is appended.
- StreamReader `limit` is not an issue -
line tails above that are cropped-out and discarded.'''
if not stream: stream = self.stdout
elif isinstance(stream, str): stream = getattr(self, stream)
assert crop_to <= self.line_max_len
crop_mark = crop_mark or self.crop_mark
try: line = await stream.readuntil(sep)
except asyncio.IncompleteReadError as err: line = err.partial
except asyncio.LimitOverrunError:
line = await stream.read(self.line_max_len)
while True:
try: await stream.readuntil(sep)
except asyncio.IncompleteReadError: break
except asyncio.LimitOverrunError: await stream.read(self.line_max_len)
else: break
line = line.decode(decode_enc, decode_err)
if len(line) > crop_to: line = line[:crop_to] + crop_mark
return line
def print_progress_prefix(self, msg):
prefix_max_len = self.term_width - len(msg)
if prefix_max_len >= len(self.prefix) + 10:
prefix_len, prefix = prefix_max_len - len(self.prefix) - 6, repr(self.src)
if len(prefix) > prefix_len: prefix = prefix[:prefix_len]
prefix = ' '.join(filter(None, [f'[{prefix}]', self.prefix]))
msg = f'{prefix}: {msg}'
elif prefix_max_len > 3: msg = '{}: {}'.format(self.prefix[:prefix_max_len-2], msg)
else: msg = msg[:self.term_width]
return msg
async def print_progress(self):
report, pos_str = dict(), lambda n:\
'{:02d}:{:02d}:{:02d}'.format(*map(int, [n//3600, (n%3600)//60, n%60]))
print_progress = ft.partial(self.progress_func, (self.src, self.prefix))
try:
while True:
line = await self.readline()
if not line: return
k, v = line.split('=', 1)
report[k] = v
if k != 'progress': continue
try: pos = int(report.get('out_time_ms', 0)) / 1_000_000
except: pos = -1 # assuming "N/A" value is "seek"
if not pos: continue
elif pos < 0: msg = 'seek'
else:
msg = ( pos_str(pos) if not self.stream_len else
'{: 5.1f}% - {} / {}'.format(
min(1, pos / self.stream_len) * 100,
pos_str(pos), pos_str(self.stream_len) )[1:] )
msg = self.print_progress_prefix(msg)
print_progress(msg)
finally: print_progress()
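# Same wrapper, but parses the "[download] ..." lines that youtube-dl
# prints with --newline instead of ffmpeg's key=value progress output.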
class YTDLProc(FFProc):
prefix = 'ytdl'
async def print_progress(self):
print_progress = ft.partial(self.progress_func, (self.src, self.prefix))
try:
while True:
line = await self.readline()
if not line: return
if not line.startswith('[download]'): continue
msg = self.print_progress_prefix(f'{line[10:].strip()}')
print_progress(msg)
finally: print_progress()
class AudioConvError(Exception): pass
class AudioConvJob:
__slots__ = ( 'name src dst offset length'
' tmp_file ffmpeg can_copy_stream' ).split()
def __init__(self, *args, **kws):
for k,v in it.chain(zip(self.__slots__, args), kws.items()): setattr(self, k, v)
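# Path-wrapper types tracking per-source state through the pipeline:
# AudioConvPath - plain path, AudioConvSubResult - intermediate file (e.g. a
# youtube-dl download), AudioConvProbed - adds ffprobe metadata (cleanup flag,
# duration, codec), AudioConvChunk - one slice of a source split by -c/--chunks,
# AudioConvChunkDir - the directory collecting those chunks.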
class AudioConvPath:
def __init__(self, src):
self.p = pl.Path(src) if not isinstance(src, AudioConvPath) else src.p
def _repr(self, ext=''): return f'<{self.__class__.__name__} [ {self.p} ]>'
def __repr__(self): return self._repr()
class AudioConvSubResult(AudioConvPath): pass
class AudioConvProbed(AudioConvSubResult):
@classmethod
def ensure(cls, src):
self = cls(src)
if isinstance(src, cls):
for k in 'cleanup duration codec label'.split(): setattr(self, k, getattr(src, k))
return self
def __init__(self, src, cleanup=False, duration=None, codec=None, label=None):
super().__init__(src)
self.label = label or self.p.name
self.cleanup, self.duration, self.codec = cleanup, duration, codec
def _repr(self, ext=''):
return ( f'<{self.__class__.__name__} {self.label!r}'
f' [del={self.cleanup} d={self.duration} c={self.codec}{ext}]>' )
class AudioConvChunk(AudioConvProbed):
def __init__(self, src, dst, start, duration=None, codec=None, label=None):
super().__init__(src, False, duration, codec, label)
self.dst, self.start = dst, start
def _repr(self, ext=''): return super()._repr(f' ss={self.start}')
class AudioConvChunkDir(AudioConvPath):
def __init__(self, path, src, chunks, cleanup=False):
super().__init__(path)
self.src, self.chunks, self.cleanup = src, chunks, cleanup
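# Top-level converter: feeds each source through the ytdl-fetch -> ffprobe ->
# chunk-split -> ffmpeg-encode stages (see the processing-sequence comment in
# conv_list() below), running up to max_parallel jobs concurrently.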
class AudioConv:
@classmethod
async def run_async(cls, *args, **kws):
async with cls(*args, **kws) as self: return await self.run()
def __init__(self, src_list, dst_name_tpl=None,
start=None, length=None,
chunks=None, chunks_beep=False, chunks_fn_tpl=None,
probe=None, inplace=False, remove_src=False,
loudnorm=False, loudnorm_opts=None, ytdl_opts=None, max_parallel=None ):
self.loop = asyncio.get_running_loop()
self.src_list, self.dst_name_tpl = src_list, dst_name_tpl or ''
self.max_parallel = max_parallel or os.cpu_count()
self.start, self.length, self.probe = start, length, probe
self.chunks, self.chunks_beep = chunks, chunks_beep
self.chunks_fn_tpl = chunks_fn_tpl or '{n:03d}__{name}.ogg'
self.inplace, self.remove_src = inplace, remove_src
self.loudnorm, self.loudnorm_opts = loudnorm, loudnorm_opts or ''
self.ytdl_opts = ytdl_opts or list()
self.status_lines, self.chunk_dirs = dict(), set()
async def __aenter__(self):
self.conv_jobs = dict()
self.status_line_task = self.chunks_beep_task = None
self.success, self.src_done, self.exit_sig = False, dict(), None
if self.chunks_beep and not self.probe:
self.chunks_beep_task = self.loop.create_task(self.chunks_beep_path())
return self
async def __aexit__(self, *err):
if self.conv_jobs: await self.conv_cleanup()
if self.status_line_task:
self.status_line_task.cancel()
await self.status_line_task
self.status_line_task = None
if self.chunks_beep:
if self.chunks_beep_task:
self.chunks_beep_task.cancel()
with contextlib.suppress(asyncio.CancelledError): await self.chunks_beep_task
beep, self.chunks_beep = self.chunks_beep, None
if isinstance(beep, pl.Path): beep.unlink(missing_ok=True)
if self.chunk_dirs:
for cd in self.chunk_dirs:
chunk_files = set(chunk.dst for chunk in cd.chunks)
if chunk_files != self.src_done.get(cd.src.p, set()):
for p in chunk_files: p.unlink(missing_ok=True)
with contextlib.suppress(OSError): cd.p.rmdir()
if cd.cleanup: cd.src.p.unlink(missing_ok=True)
self.chunk_dirs.clear()
async def run(self):
self.conv = self.loop.create_task(self.conv_list())
self.status_line_task = self.loop.create_task(self.status_line_cycle())
def sig_handler(sig):
self.exit_sig = sig
self.conv.cancel()
for sig in 'int', 'term':
self.loop.add_signal_handler(
getattr(signal, f'SIG{sig.upper()}'), ft.partial(sig_handler, sig) )
with contextlib.suppress(asyncio.CancelledError): await self.conv
return self.success
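	# Status output is multiplexed onto one terminal line via '\r', cycling
	# through active jobs; a line ending in '\n' is treated as final for its
	# key - printed permanently and dropped from the rotation.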
async def status_line_cycle(self, interval=1.0):
status_queue, term_width = list(), shutil.get_terminal_size()[0] - 2
term_pad = lambda line: (
(line + ' '*max(0, term_width - len(line)))\
if len(line) < term_width else line[:term_width] )
while True:
if self.status_lines:
if not status_queue: status_queue.extend(sorted(self.status_lines))
k = status_queue.pop()
try: line, end = self.status_lines[k], ''
except KeyError: continue
if line.endswith('\n'): # final status for this key
line, end = line.rstrip('\n'), '\n'
del self.status_lines[k]
print('\r' + term_pad(line), end=end, flush=True)
try: await asyncio.sleep(interval)
except asyncio.CancelledError:
for k in it.chain(status_queue, list(self.status_lines)):
line = self.status_lines.pop(k, None)
if line: print('\r' + term_pad(line.rstrip('\n')), end='\n', flush=True)
break
def status_line_set(self, key, line=None):
if line: self.status_lines[key] = line
elif key in self.status_lines: self.status_lines[key] += '\n'
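	# Generates a short beep/static sample once via ffmpeg's lavfi aevalsrc
	# source and caches the resulting .ogg path, using an asyncio.Future so
	# concurrent chunk jobs wait for the same file instead of re-generating it.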
async def chunks_beep_path(self):
if self.chunks_beep is True:
self.chunks_beep = fut = asyncio.Future()
proc, p_dir = None, os.environ.get('XDG_RUNTIME_DIR') or tempfile.gettempdir()
try:
with tempfile.NamedTemporaryFile( dir=p_dir,
prefix='toogg.chunks-beep.', suffix='.tmp.ogg', delete=False ) as tmp:
p = pl.Path(tmp.name)
proc = await asyncio.create_subprocess_exec(
*'ffmpeg -v error -hide_banner -y -f lavfi -i'.split(),
"aevalsrc='sin(PI/t^3)':s=44100:d=0.4,afade=in:st=0:d=0.2,afade=out:st=0.2:d=0.2",
*'-c:a libvorbis -ac:a 2 -f ogg'.split(), p )
await proc.wait()
if proc.returncode != 0 or not p.stat().st_size > 0:
raise AudioConvError('ffmpeg failed to generate chunks-beep audio file')
p_new = pl.Path(str(p).rsplit('.', 2)[0] + '.ogg')
p.rename(p_new)
self.chunks_beep = p_new
fut.set_result(p_new)
except Exception as err:
self.chunks_beep = err
fut.set_exception(err)
raise
finally:
if proc: await aio_proc_close(proc)
p.unlink(missing_ok=True)
return self.chunks_beep
elif isinstance(self.chunks_beep, asyncio.Future): return await self.chunks_beep
elif isinstance(self.chunks_beep, Exception): raise self.chunks_beep
else: return self.chunks_beep
async def conv_cleanup(self, *task_ids, raise_errors=False):
if not task_ids: task_ids = list(self.conv_jobs.keys())
for task_id in task_ids:
(conv := self.conv_jobs.pop(task_id)).cancel()
try: await conv
except asyncio.CancelledError: pass
except Exception as err:
if self.exit_sig: log.debug('conv-job interrupted/failed: {}', err_fmt(err)); continue
log.exception('conv-job failed: {}', err_fmt(err))
if raise_errors: raise
async def conv_list(self):
src_stack = list(reversed(self.src_list))
if len(src_set := set(src_stack)) != len(src_stack):
src_dup = '\n'.join(sorted(
f' {src}' for src in src_set if src_stack.count(src) > 1 ))
raise AudioConvError(f'Duplicate source paths detected:\n{src_dup}')
# Deduplication of dst filenames
dst_name_aliases, dst_name_map = dict(), dict()
def dst_name_base(p): # returns name without extension, incl. for dirs
if not (fn := dst_name_aliases.get(ps := str(p))):
fn = os.path.basename(ps).rsplit('.', 1)[0]
fn = dst_name_aliases[ps] = self.dst_name_tpl.format(name=fn) or fn
return fn
for p in sorted(self.src_list, key=lambda p: (os.path.basename(p), str(p))):
dst_name_map.setdefault(dst_name_base(ps := str(p)), list()).append(ps)
for dst_name, pss in dst_name_map.items():
if len(pss) == 1: continue
nf = str(len(str(len(pss))))
for n, ps in enumerate(pss, 1):
if '.' not in dst_name: fn, ext = dst_name, ''
				else: fn, ext = dst_name.rsplit('.', 1); ext = f'.{ext}'
dst_name_aliases[ps] = ('{}.{:0'+nf+'d}{}').format(fn, n, ext)
while True:
# Processing sequence via src_stack:
# str or pl.Path or AudioConvPath
# if str (ytdl url): [ -(conv_ytdl)-> AudioConvSubResult ]
# -(conv_probe)-> AudioConvProbed
# if chunks: [ -> N x AudioConvChunk ]
# -(conv_src)-> dst
# All AudioConvPath subtypes have .p path + metadata.
while src_stack and len(self.conv_jobs) < self.max_parallel:
src, start, length = src_stack.pop(), self.start, self.length
if isinstance(src, pl.Path): src = AudioConvPath(src)
if not isinstance(src, AudioConvPath): conv = self.conv_ytdl(src, self.ytdl_opts)
elif not isinstance(src, AudioConvProbed):
conv = self.conv_probe(src, self.start, self.length)
elif self.probe:
print(f'{src.label} [{src.codec}] :: {td_repr(src.duration)}')
conv = None
elif self.chunks and not isinstance(src, AudioConvChunk):
if not src.duration:
raise AudioConvError( 'Cannot split src'
f' into chunks due to unknown duration: {src}' )
src_chunks, chunk_len = list(), self.chunks
dst_name = dst_name_base(src.p)
start, length = start or 0, length or src.duration
length_total = length
dst_dir = ( dst_name if not self.inplace
else (str(src.p.parent.resolve()).strip(os.sep) + f'/{dst_name}') )
dst_dir = AudioConvChunkDir(dst_dir, src, src_chunks, cleanup=src.cleanup)
dst_dir.p.mkdir(parents=True, exist_ok=True)
while length > 0:
n = len(src_chunks) + 1
dst_file = self.chunks_fn_tpl.format(n=n, name=dst_name)
src_chunks.append(AudioConvChunk(
src, dst_dir.p / dst_file, start, min(chunk_len, length), src.codec, dst_file ))
start += chunk_len; length -= chunk_len
self.chunk_dirs.add(dst_dir)
src_stack.extend(reversed(src_chunks))
print( f'Splitting {src.label!r}: {len(src_chunks)} chunks'
f' ({td_repr(chunk_len)} each), {td_repr(length_total)} total' )
continue
else:
if isinstance(src, AudioConvChunk):
dst, start, length = src.dst, src.start, src.duration
else:
dst_base = dst_name_base(src.p)
if self.inplace: dst_base = str(src.p.parent.resolve()).strip(os.sep) + f'/{dst_base}'
if (dst := pl.Path(f'{dst_base}.ogg')).resolve() == src.p.resolve():
for n in it.chain([''], range(2**30)):
								if not (dst := pl.Path(f'{dst_base}.copy{n}.ogg')).exists(): break
else: raise AudioConvError(f'Failed to generate copy-path for dst: {dst}')
dst.parent.mkdir(parents=True, exist_ok=True)
conv = self.conv_src(src, dst, start, length)
if conv:
log.debug('Scheduling new conv-job for src: {}', src)
conv = self.loop.create_task(conv)
conv.task_src, conv.task_id = src, id(conv)
self.conv_jobs[conv.task_id] = conv
if not self.conv_jobs: break
done, pending = await asyncio.wait(
self.conv_jobs.values(), return_when=asyncio.FIRST_COMPLETED )
for res in done:
dst_sub = isinstance(dst_path := await res, AudioConvSubResult)
log.debug( 'conv-job done (sub={},'
					' dst-file={!r}) for src: {!r}', int(dst_sub), dst_path, res.task_src )
await self.conv_cleanup(res.task_id, raise_errors=True)
if dst_sub: src_stack.append(dst_path) # keep processing it
else: self.src_done.setdefault(res.task_src.p, set()).add(dst_path)
if not self.probe:
assert len(self.src_done) == len(self.src_list)
assert all((len(self.src_done[cd.src.p]) == len(cd.chunks)) for cd in self.chunk_dirs)
if self.remove_src:
for src, dst_set in self.src_done.items():
if len(dst_set) == 1:
dst = list(dst_set)[0]
if dst.resolve() == src.resolve(): src = None
elif ( self.inplace
and dst.parent.resolve() == src.parent.resolve()
and src.name.lower().endswith('.ogg') ):
os.rename(dst, src) # special case for .copyX.ogg files in the same dir
src = None
if src: src.unlink()
self.success = True
conv_ytdl_name_len_max = 100
def conv_ytdl_fix_name(self, name, len_max, len_slug=20):
if len(name) <= len_max: return name
name, ext = name.rsplit('.', 1) if '.' in name else (name, None)
		if ext and len(ext) >= len_slug: name, ext = f'{name}.{ext}', None
name_list = name.split()
for n, slug in sorted(enumerate(name_list), key=lambda v: -len(v[1])):
if len(slug) > len_slug: name_list[n] = slug[:len_slug-3] + '---'
else: break
name = ' '.join(name_list)
if len(name) > len_max: name = name[:len_max]
if ext: name = f'{name}.{ext}'
assert len(name) < len_max + len_slug, [len(name), name]
return name
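	# Two-step youtube-dl flow: "--get-filename" first to learn (and, if needed,
	# shorten) the output filename, then the actual download with -o set to it.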
async def conv_ytdl(self, url, ytdl_opts=None):
if not ytdl_opts:
# Preference should be for something with vorbis-encoded audio, but YT
# audio-only dls can also be slower due to throttling, or give 403 errors
ytdl_opts = ['-f', '43/18/480p/bestaudio[ext=ogg]/bestaudio[ext=webm]/bestaudio/best']
ytdl_opts = ['youtube-dl', '--newline'] + ytdl_opts
async with YTDLProc( self.loop, url,
ytdl_opts + ['--get-filename', url],
stdin=sp.DEVNULL, stdout=sp.PIPE ) as proc:
src = (await proc.stdout.read()).strip().decode()
await proc.finished
if not src or '\n' in src:
raise AudioConvError(f'Weird output from "youtube-dl --get-filename": {src!r}')
src = pl.Path(src)
if len(src.name) > self.conv_ytdl_name_len_max:
src = src.parent / self.conv_ytdl_fix_name(src.name, self.conv_ytdl_name_len_max)
src, src_part = AudioConvPath(src), AudioConvPath(src.parent / f'{src.name}.part')
ytdl_opts.extend(['-o', str(src.p).replace('%', '%%')])
try:
async with YTDLProc( self.loop, src.p.name,
ytdl_opts + [url], progress_func=self.status_line_set ) as proc:
await proc.finished
finally: src_part.p.unlink(missing_ok=True)
return AudioConvSubResult(src)
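	# Probes duration/codec/channels via "ffprobe -print_format json" - these
	# drive progress percentages, vorbis stream-copy detection and chunk splitting.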
async def conv_probe(self, src, start=None, length=None):
src_name, src_cleanup = src.p.name, isinstance(src, AudioConvSubResult)
async with FFProc( self.loop, src_name,
[ 'ffprobe', '-v', 'fatal', '-show_entries',
'stream=codec_type,codec_name,channels,duration:format=duration',
'-print_format', 'json', str(src.p) ], stdout=sp.PIPE ) as proc:
src_info = (await proc.stdout.read()).decode()
await proc.finished
src_duration = src_codec = src_chans = None
try:
src_info = json.loads(src_info)
for stream in src_info['streams']:
if stream.get('codec_type') != 'audio': continue
src_duration = float(stream.get('duration', 0))
src_codec, src_chans = stream.get('codec_name'), stream.get('channels')
break
if not src_duration:
src_duration = float(src_info['format']['duration'])
except Exception as err:
log.error( '[{!r}] ffprobe failed, progress'
				' info won\'t be available: {}', src_name, err_fmt(err) )
if src_duration:
if start: src_duration = max(0, src_duration - start)
if length: src_duration = min(src_duration, length)
if src_codec: src_codec = f'{src_codec}:{src_chans or 0}'
log.debug('[{!r}] Detected audio stream duration: {:.1f}s', src_name, src_duration)
log.debug('[{!r}] Detected audio stream codec: {!r}', src_name, src_codec)
return AudioConvProbed(src, src_cleanup, src_duration, src_codec)
async def conv_src(self, src, dst, start=None, length=None):
paths, src = {dst}, AudioConvProbed.ensure(src)
if src.cleanup: paths.add(src.p)
def tmp_file(ext): paths.add(p := f'{dst}.{ext}'); return p
job = AudioConvJob(src.label, src.p, dst, start, length, tmp_file)
ffmpeg_cmd = 'ffmpeg -v error -hide_banner -y -progress pipe:1'.split()
job.ffmpeg = lambda pre, cmd_ext, **kws: FFProc(
self.loop, job.name, ffmpeg_cmd + cmd_ext, stream_len=src.duration,
prefix=pre, progress_func=self.status_line_set, **kws )
job.can_copy_stream = src.codec == 'vorbis:2'
conv_func = ( self.conv_src_simple
if not self.loudnorm else self.conv_src_loudnorm )
try:
await conv_func(job)
paths.remove(dst)
finally:
for p in paths:
with contextlib.suppress(OSError): os.unlink(p)
return dst
def _conv_offset_opts(self, job):
offset_opts = list()
if job.offset: offset_opts.extend(['-ss', str(job.offset)])
if job.length: offset_opts.extend(['-t', str(job.length)])
return offset_opts
async def conv_src_simple(self, job):
input_opts = ['-i', str(job.src)]
coding_opts = ['-vn'] + self._conv_offset_opts(job)
if job.can_copy_stream and not self.chunks_beep: coding_opts += ['-c:a', 'copy']
else:
if self.chunks_beep:
input_opts += ['-i', str(await self.chunks_beep_path())]
offset = job.offset and int(job.offset * 1000)
offset = 'acopy' if not offset else f'adelay={offset}|{offset}'
coding_opts += [ '-filter_complex',
f'[1:a]{offset}[beep];[0:a][beep]amix=inputs=2:duration=longest[out]',
'-map', '[out]' ]
coding_opts += ['-c:a', 'libvorbis', '-ac:a', '2']
await job.ffmpeg('to-ogg', input_opts + coding_opts + ['-f', 'ogg', str(job.dst)]).run()
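	# Two-pass loudnorm: decode to wav, run the filter once with print_format=json
	# to measure input loudness, then encode with the measured_* values fed back
	# into the filter for the final pass.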
async def conv_src_loudnorm(self, job):
await job.ffmpeg( 'to-wav [1/3]', ['-i', str(job.src)] +
self._conv_offset_opts(job) + ['-f', 'wav', job.tmp_file('src.wav')] ).run()
ffmpeg_env = os.environ.copy()
ffmpeg_env['NO_COLOR'] = '1'
opts = self.loudnorm_opts.strip(':')
if opts: opts += ':'
async with job.ffmpeg( 'loudnorm-info [2/3]',
[ '-v', 'info', '-i', job.tmp_file('src.wav'),
'-af', f'loudnorm={opts}print_format=json', '-f', 'null', '/dev/null' ],
env=ffmpeg_env, stderr=sp.PIPE ) as proc:
norm_info, ffmpeg_stderr = list(), list()
while True:
line = await proc.readline('stderr')
ffmpeg_stderr.append(line.rstrip())
if not line: break
line = line.strip()
if norm_info:
assert not line.endswith(proc.crop_mark)
norm_info.append(line)
if line == '}': break
if re.search(r'^\[Parsed_loudnorm_0 @ 0x[0-f]+\]$', line): norm_info.append('')
norm_info = ''.join(norm_info)
try: await proc.finished
except:
log.error(
'ffmpeg stderr:\n--------------------\n{}\n--------------------',
'\n'.join(ffmpeg_stderr).rstrip() )
raise
log.debug('[{!r}] detected audio normalization info: {!r}', job.name, norm_info)
if not norm_info:
raise AudioConvError( 'ffmpeg failed to produce'
f' normalization info json (source file: {job.name!r})' )
norm_info = json.loads(norm_info)
with open(job.tmp_file('loudnorm.json'), 'w') as dst:
json.dump(norm_info, dst, sort_keys=True, indent=2, separators=(',', ': '))
opts_ext = (
'measured_I={}:measured_TP={}:'
'measured_LRA={}:measured_thresh={}:offset={}'
).format(*op.itemgetter(
'input_i', 'input_tp', 'input_lra', 'input_thresh', 'target_offset' )(norm_info))
loudnorm = f'loudnorm={opts}{opts_ext}'
input_opts = ['-i', job.tmp_file('src.wav')]
if self.chunks_beep:
input_opts += ['-i', str(await self.chunks_beep_path())]
filter_opts = [ '-filter_complex',
f'[0:a][1:a]amix=inputs=2:duration=longest,{loudnorm}[out]', '-map', '[out]' ]
else: filter_opts = ['-af', loudnorm]
await job.ffmpeg( 'to-ogg [3/3]', input_opts + filter_opts +
['-c:a', 'libvorbis', '-ac:a', '2', '-ar', '48k', '-f', 'ogg', str(job.dst)] ).run()
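# Example invocation (assuming default options), normalizing volume and
# splitting output into 10-minute chunks with a beep added to each:
#   toogg --loudnorm -c 10:00 -b podcast.mp3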
def main(args=None):
import argparse, textwrap
dd = lambda text: re.sub( r' \t+', ' ',
textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
usage='%(prog)s [options] src [src ...]', description=dd('''
Convert source file(s) to audio, normalize it and encode to ogg/vorbis.
Optionally splits files too, adds beeps
to split chunks, uses ffmpeg "loudnorm" filter, etc.'''))
group = parser.add_argument_group('Source/destination')
group.add_argument('src', nargs='+', help=dd('''
File(s) or URL(s) to convert.
		Files that both don't exist and have ":" in the name
will be treated as URLs (to be processed with youtube-dl first).'''))
group.add_argument('-x', '--remove-src', action='store_true',
help='Remove source file(s) on success of the whole operation.')
group.add_argument('-i', '--inplace', action='store_true', help=dd('''
Recursively (following symlinks) replace source files
with .ogg ones in the same dir(s) where they are found.
All "src" args must be paths.
Does not imply --remove-src (creates copies next to src files), unless specified.'''))
group.add_argument('-F', '--inplace-name-filter',
action='append', metavar='glob', help=dd('''
Case-insensitive (!!!) shell-style glob/fnmatch
pattern(s) to match against each file basename for --inplace conversion.
All files that do not match will be ignored.
Can be specified multiple times, combined via "any match" logic.
Remember to quote pattern(s) in shells. Example: toogg -i -F '*.flac' somedir'''))
group.add_argument('-T', '--dst-dir', metavar='path', help=dd('''
Existing path to store resulting files in. Defaults to current one (unless --inplace).
With --inplace also specified, full dst realpath (!!!) will be re-created under this dir.'''))
group.add_argument('-y', '--ytdl-opts', action='append', metavar='opts', help=dd('''
Extra opts for youtube-dl command.
Will determine the name for resulting file from youtube-dl -o option.
Default is to override -f option for suitable audio formats,
but leave -o unspecified (i.e. use default or config file).
Will be split on spaces, unless option is used multiple times.'''))
group.add_argument('-n', '--name', metavar='tpl', help=dd('''
Template to rename resulting file(s), instead of default "{name}".
Names are deduplicated with number-suffix when multiple sources are used.
Substituted keys: "name" - source filename without extension.'''))
group = parser.add_argument_group('Volume normalization')
group.add_argument('--loudnorm', action='store_true', help=dd('''
Use ffmpeg "loudnorm" filter to have sane volume level.
It takes quite a while to process longer files with it.
		Likely not available in ffmpeg builds older than 3.1 (2016-06-27).
ffmpeg docs link: https://ffmpeg.org/ffmpeg-all.html#loudnorm'''))
group.add_argument('-o', '--loudnorm-opts', metavar='ffmpeg-af-opts', help=dd('''
String of options to pass to loudnorm filter, same as they
would appear on ffmpeg command line. Example: I=-16:TP=-1.5:LRA=11'''))
group = parser.add_argument_group('Other ffmpeg options')
group.add_argument('-s', '--start', metavar='[[hh:]mm:]ss[.ms]', help=dd('''
Convert only part of src file(s), starting from specified timestamp.
Passed to ffmpeg -ss option, see "Time duration" in ffmpeg-utils(1) manpage.
Aside from ffmpeg format, short N[y/mo/w/d/h/m/s]+ format with
unit suffixes can also be used, for example: 131.721s, 1h10m, 229s 600ms.'''))
group.add_argument('-l', '--length', metavar='[[hh:]mm:]ss[.ms]', help=dd('''
Convert only specified length of src file(s).
Can be combined with -s/--start, same value format as that.'''))
group.add_argument('-c', '--chunks',
nargs='?', metavar='[[hh:]mm:]ss[.ms]', const='15:00', help=dd('''
Create dst directory instead of file with
output pre-split into chunks of specified length.
Can be used to enable faster parallel encoding.
Length argument is optional and defaults to 15:00 (15min).'''))
group.add_argument('-C', '--chunks-fn', metavar='tpl', default='{n:03d}__{name}.ogg',
help='Filename template of each chunk-file. Default: %(default)s')
group.add_argument('-b', '--chunks-beep', action='store_true',
	help='Prepend some static + beep sound to each chunk-file, to make track-switching audible.')
group = parser.add_argument_group('Debug/misc')
group.add_argument('-j', '--max-parallel', type=int, metavar='n', help=dd('''
Max number of processing jobs to run in parallel.
		Default (or 0) uses the number of available cpu threads.'''))
group.add_argument('-p', '--probe', action='store_true',
help='Only probe and print length for each file, without converting anything.')
group.add_argument('-d', '--debug', action='store_true', help='Verbose operation mode.')
opts = parser.parse_args(sys.argv[1:] if args is None else args)
global log
logging.basicConfig(level=logging.DEBUG if opts.debug else logging.WARNING)
log = get_logger('main')
src_list = list()
for src in opts.src:
if ':' in src and not os.path.exists(src): src_list.append(src); continue # urls
try: src_list.append(pl.Path(src).resolve(strict=True))
except FileNotFoundError: parser.error(f'Source path missing/inaccessible: {src}')
if opts.inplace:
name_filter = opts.inplace_name_filter or list()
if name_filter:
import fnmatch
name_filter = list(ft.partial(fnmatch.fnmatch, pat=pat) for pat in name_filter)
src_list, src_list_old = list(), src_list
for p in src_list_old:
if not isinstance(p, pl.Path):
parser.error(f'Source arg is not a path (with --inplace specified): {p}')
if p.is_dir():
for root, dirs, files in p.walk(follow_symlinks=True):
for fn in files:
if name_filter and not any(m(fn) for m in name_filter):
log.debug('Skipping file due to fnmatch=false: {}', root / fn)
continue
src_list.append(pl.Path(root / fn))
else: src_list.append(p)
os.chdir(opts.dst_dir or '/')
elif opts.dst_dir: os.chdir(opts.dst_dir)
start, length, chunks = map(
parse_pos_spec, [opts.start, opts.length, opts.chunks] )
ytdl_opts = opts.ytdl_opts or list()
if len(ytdl_opts) == 1: ytdl_opts = ytdl_opts[0].strip().split()
success = asyncio.run(AudioConv.run_async(
src_list, dst_name_tpl=opts.name, start=start, length=length,
chunks=chunks, chunks_fn_tpl=opts.chunks_fn,
chunks_beep=bool(chunks and opts.chunks_beep),
max_parallel=opts.max_parallel, probe=opts.probe, inplace=opts.inplace,
remove_src=opts.remove_src, ytdl_opts=ytdl_opts,
loudnorm=opts.loudnorm, loudnorm_opts=opts.loudnorm_opts ))
return int(not success)
if __name__ == '__main__': sys.exit(main())