-
Notifications
You must be signed in to change notification settings - Fork 2
/
restrict.py
executable file
·1482 lines (1242 loc) · 53.5 KB
/
restrict.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
from textwrap import dedent, indent
# Colorized output relies on ANSI escape codes. Windows terminals don't
# interpret them unless the user jumps through additional hoops (3rd-party
# software or enabling VT100 emulation with Windows 10).
# colorama closes that gap by converting the escape codes into the
# appropriate win32 calls (only on Windows).
# Without colorama, colorized output stays enabled everywhere but on Windows.
try:
    import colorama
    colorama.init()
except ModuleNotFoundError:
    colors = os.name != "nt"
else:
    colors = True
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Global constants
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class ExitCodes:
    """Exit statuses used whenever execution doesn't end successfully."""

    # Required software is missing
    DEP = 1
    # The user specified an invalid option
    OPT = 2
    # Miscellaneous runtime error
    RUN = 3
    # Not all videos could be fit into the size range
    SIZE = 4
    # The user requested early termination (i.e. Ctrl+C)
    INT = 5
class Colors:
    """ANSI escape sequences used to colorize the terminal output."""

    # Reference: https://en.wikipedia.org/wiki/ANSI_escape_code#Colors
    HEADER = '\033[1;36m'   # bold cyan
    FILE = '\033[35m'       # magenta
    INFO = '\033[32m'       # green
    ERROR = '\033[31m'      # red
    WARNING = '\033[33m'    # yellow
    SUCCESS = '\033[32m'    # green
    DEFAULT = '\033[39m'    # default foreground color
    BOLD = '\033[1m'
    RESET = '\033[0m'

    def disable(self):
        """Turn off colorized output by blanking all codes on this instance."""
        names = ("HEADER", "FILE", "INFO", "ERROR", "WARNING",
                 "SUCCESS", "DEFAULT", "BOLD", "RESET")
        for name in names:
            # Instance attributes shadow the class-level escape codes
            setattr(self, name, '')
# Flag that gets raised once a file couldn't be fit into the size range
size_fail = False
# Objects holding the constants (exit codes and color escape codes)
status = ExitCodes()
fgcolors = Colors()
if not colors:
    # No ANSI support detected -> print everything without escape codes
    fgcolors.disable()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Classes
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class DefaultOptions:
    """Default values of the general options."""

    # Output verbosity
    #   0 -> quiet mode (only warnings and errors)
    #   1 -> default mode (0 + basic progress information)
    #   2 -> verbose mode (0 + 1 + FFmpeg options + size info)
    VERBOSITY = 1

    # Audio output on/off
    # When enabled, all available audio streams are converted
    # Has no effect on input without any audio stream
    AUDIO = False

    # Upper size limit (in MB)
    LIMIT = 3

    # User-defined FFmpeg filter string (https://ffmpeg.org/ffmpeg-filters.html)
    # Handed to FFmpeg as-is, so FFmpeg will throw errors if it is wrong
    F_USER = None

    # Number of passes for the 1st and 2nd bitrate mode
    # (CBR is always done with a single pass)
    #   1 -> only really useful for more consistent quality at very low
    #        bitrates or when converting GIFs while preserving transparency
    #   2 -> should always be preferred, since libvpx offers no effective
    #        bitrate control for single pass encodes
    PASSES = 2

    # Lower size limit, expressed as fraction of the upper limit
    # 0 disables it; >0.95 (or >0.9 with --crf) is discouraged
    UNDER = 0.75

    # Max. number of attempts per bitrate mode
    # Remaining attempts of a mode may be skipped, if the file size
    # doesn't change enough (<1%)
    ITERS = 3

    # Number of threads libvpx should use
    # FFmpeg discourages >16, but VP8 encoding doesn't scale well beyond 4-6
    THREADS = 1

    # Trim settings for the input video (same as FFmpeg's -ss and -to)
    # Must be given in seconds (int/float) or be passed to valid_time()
    # Negative time values aren't supported
    GLOBAL_START = None
    GLOBAL_END = None

    # Force 2 channels per audio stream
    # Avoids wasting bitrate on surround sound and allows libopus to be
    # used for otherwise unsupported channel configurations
    # IMPORTANT: mono input gets force-converted to stereo as well
    FORCE_STEREO = False

    # Restrict the output to 1 video + 1 audio stream (subtitles unaffected)
    BASIC_FORMAT = False

    # Subtitle output on/off
    # By default, input with image-based subtitles gets skipped
    SUBS = False

    # Fall back to MKV as container for input with image-based subtitles,
    # instead of skipping those files entirely
    # Has no effect while subtitle output is disabled
    MKV_FALLBACK = False

    # Map the input subtitles, but disable subtitle output
    # Prevents unnecessary soft subs in the output while hardsubbing
    BURN_SUBS = False

    # Video encoder
    #   libvpx     -> VP8
    #   libvpx-vp9 -> VP9
    # AV1 (via libaom or libsvt_av1) isn't supported
    V_CODEC = "libvpx"

    # Use CQ (constrained quality) instead of classic VBR
    CRF = False

    # Skip the 1st bitrate mode (VBR or CQ + min. quality)
    NO_QMAX = False

    # Skip the 3rd bitrate mode (CBR; also allowed to drop frames)
    NO_CBR = False

    # Bits per pixel threshold (steers downscaling and frame rate reduction)
    # Personal recommendations for VP8:
    #   < 0.01:  bad quality
    #   ~ 0.04:  med quality
    #   > 0.075: good quality
    BPP_THRESH = 0.075

    # Preserve the input transparency
    # Overrides any value of pix_fmt with "yuva420p"
    TRANSPARENCY = False

    # Pixel format
    # "ffmpeg -h encoder=libvpx(-vp9)" lists all supported values
    PIX_FMT = "yuv420p"

    # Min. height threshold for the automatic downscaling
    # No influence on input that is already below the threshold
    MIN_HEIGHT = 240

    # Max. height threshold for the automatic downscaling
    # Can be used to force-downscale, but has to be higher than min_height
    MAX_HEIGHT = None

    # Min. fps threshold for the automatic frame rate reduction
    # No influence on input that is already below the threshold
    MIN_FPS = 24

    # Max. fps threshold for the automatic frame rate reduction
    # Can be used to force a lower frame rate, but has to be higher
    # than min_fps
    MAX_FPS = None

    # Audio encoder
    #   libvorbis -> Vorbis
    #   libopus   -> Opus (fails on some surround sound configurations)
    #   opus      -> Opus (not tested, but it should work in theory)
    A_CODEC = "libvorbis"

    # Never copy audio (i.e. always reencode all audio)
    NO_COPY = False

    # Copy audio streams regardless of their bitrate
    # Unsupported audio formats, -ss or audio filters still lead to
    # reencoded audio streams
    FORCE_COPY = False

    # Min. threshold for the automatic audio bitrate selection
    # Refers to the bitrate per channel (e.g. 24 -> 48Kbps for stereo)
    MIN_AUDIO = 24

    # Max. threshold for the automatic audio bitrate selection
    # Refers to the bitrate per channel (e.g. 80 -> 160Kbps for stereo)
    # Has to be higher than min_audio and >6 (limit for libvorbis)
    MAX_AUDIO = None

    # Leave out user set filters during the 1st pass of a 2-pass encode
    # Useful for very demanding filters (e.g. nlmeans)
    # No influence on automatically used filters (scale and fps)
    NO_FILTER_FIRSTPASS = False

    # FFmpeg verbosity (ffmpeg -v/-loglevel)
    # The special value "stats" is a shortcut for "-v error -stats"
    FFMPEG_VERBOSITY = "stats"

    # Debug mode
    # FFmpeg commands only get printed, not executed
    DEBUG = False
class CustomArgumentParser(argparse.ArgumentParser):
    """ArgumentParser that replaces the generated help with a custom text."""

    def format_help(self):
        """Assemble and return the hand-written help text.

        Default values mentioned in the text are looked up on the parser
        itself via get_default().
        """
        default = self.get_default
        out_dir = default("out_dir")

        # dedent() strips the common indentation of the literal below
        return dedent(f"""\
            RestrictedWebM is a script to produce WebMs within a certain size range.
            Usage: {self.prog} [OPTIONS] INPUT [INPUT]...
            Input:
            Absolute or relative path to a video/image
            Common options:
            -h, --help show help
            -q, --quiet suppress non-error output
            -v, --verbose print verbose information
            -a, --audio enable audio output
            -s, --size SIZE limit max. output file size in MB (def: {default("limit")})
            -f, --filters FILTERS use custom ffmpeg filters
            -p, --passes {{1,2}} specify number of passes (def: {default("passes")})
            -u, --undershoot RATIO specify undershoot ratio (def: {default("under")})
            -i, --iterations ITER iterations for each bitrate mode (def: {default("iters")})
            -t, --threads THREADS enable multithreading
            -ss, --start TIME start encoding at the specified time
            -to, --end TIME end encoding at the specified time
            -fs, --force-stereo force stereo audio output
            -bf, --basic-format restrict output to one video/audio stream
            Subtitle options:
            --subtitles enable subtitle output
            --mkv-fallback allow usage of MKV for image-based subtitles
            --burn-subs discard soft subtitles after hardsubbing
            Advanced video options:
            --vp9 use VP9 instead of VP8
            --crf use constrained quality instead of VBR
            --no-qmax skip first bitrate mode (VBR with qmax)
            --no-cbr skip last bitrate mode (CBR with frame dropping)
            --bpp BPP set custom bpp threshold (def: {default("bpp_thresh")})
            --transparency preserve input transparency
            --pix-fmt FORMAT choose color space (def: {default("pix_fmt")})
            --min-height HEIGHT force min. output height (def: {default("min_height")})
            --max-height HEIGHT force max. output height
            --min-fps FPS force min. frame rate (def: {default("min_fps")})
            --max-fps FPS force max. frame rate
            Advanced audio options:
            --opus use and allow Opus as audio codec
            --no-copy disable stream copying
            --force-copy force-copy compatible (!) audio streams
            --min-audio RATE force min. channel bitrate in Kbps (def: {default("min_audio")})
            --max-audio RATE force max. channel bitrate in Kbps
            Misc. options:
            --no-filter-firstpass disable user filters during the first pass
            --ffmpeg-verbosity LEVEL change FFmpeg command verbosity (def: {default("ffmpeg_verbosity")})
            --debug only print ffmpeg commands
            All output will be saved in '{out_dir}/'.
            '{out_dir}/' is located in the same directory as the input.
            For more information visit:
            https://github.com/HelpSeeker/Restricted-WebM/wiki
            """)
class FileInfo:
    """Gathers information about output settings"""

    def __init__(self, in_path):
        """Initialize all properties.

        Probes ``in_path`` with ffprobe and derives paths, durations and the
        initial audio/video settings from the result. If the input fails the
        validity check, only the subtitle-/path-related attributes and
        ``valid`` are set.
        """
        # Subtitle-related
        self.image_subs = out_image_subs(in_path)

        # Path-related
        self.input = in_path
        self.dir = os.path.join(os.path.dirname(self.input), opts.out_dir)
        ext = f"{'mkv' if self.image_subs else 'webm'}"
        self.name = os.path.splitext(os.path.basename(self.input))[0]
        self.output = os.path.join(self.dir, f"{self.name}.{ext}")
        self.temp = os.path.join(self.dir, f"{self.name}_{opts.suffix}.{ext}")

        command = [
            "ffprobe", "-v", "error", "-show_format", "-show_streams",
            "-print_format", "json", self.input
        ]
        info = subprocess.run(command, stdout=subprocess.PIPE, check=False).stdout
        info = json.loads(info)

        # Check input file for basic validity
        # Needs to be done here as the following steps could already fail
        self.valid = self.is_valid(info)
        if not self.valid:
            return

        # Duration-related
        try:
            self.in_dur = float(info['format']['duration'])
        except KeyError:
            self.in_dur = self.brute_input_duration()
        self.out_dur = self.calc_output_duration()

        # Audio-related
        a_streams = [s for s in info['streams'] if s['codec_type'] == "audio"]
        self.a_rate, self.a_list, self.a_sample = self.audio_properties(a_streams)

        # Video-related
        self.v_rate = self.init_video_bitrate()
        # is_valid() guarantees that the first stream is the video stream
        v_stream = info['streams'][0]
        self.in_height, self.in_fps, self.ratio = self.video_properties(v_stream)
        self.out_height = self.in_height
        self.out_fps = self.in_fps

    def is_valid(self, info):
        """Test for basic pitfalls that would lead to FFmpeg failure.

        ``info`` is the parsed JSON output of ffprobe. Returns True if the
        input passed all checks; otherwise an error is printed and False
        is returned.
        """
        # 'streams' may be missing altogether (e.g. if ffprobe couldn't
        # read the input), which must not crash the validity check
        streams = [s['codec_type'] for s in info.get('streams', [])]

        if self.image_subs and not opts.mkv_fallback:
            err(f"{self.input}: "
                "Conversion of image-based subtitles not supported!")
            return False
        if not streams:
            err(f"{self.input}: "
                "No streams found!")
            return False
        if streams[0] != "video":
            err(f"{self.input}: "
                "Unsupported stream order (first stream not video)!")
            return False
        if "video" in streams[1:]:
            err(f"{self.input}: "
                "Files with more than one video stream not supported!")
            return False

        return True

    def brute_input_duration(self):
        """Brute-force detect input duration for GIFs and other images.

        Returns the duration in seconds (float) as reported by ffprobe
        for a throwaway encode of the input.
        """
        # Encodes input as AVC (fast) and reads duration from the output
        # Both commands must refer to the very same file, otherwise ffprobe
        # doesn't see the test encode
        test_file = f"{self.name}_{opts.suffix}.mkv"
        ffmpeg = [
            "ffmpeg", "-y", "-v", "error", "-i", self.input, "-map", "0:v",
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "51",
            test_file
        ]
        ffprobe = [
            "ffprobe", "-v", "error", "-show_format", "-show_streams",
            "-print_format", "json", test_file
        ]

        subprocess.run(ffmpeg, check=False)
        info = subprocess.run(ffprobe, stdout=subprocess.PIPE, check=False).stdout
        info = json.loads(info)

        return float(info['format']['duration'])

    def calc_output_duration(self):
        """Calculate output duration.

        Takes the global start/end trim settings into account, as long as
        they lie within the input duration.
        """
        if not (opts.global_start or opts.global_end):
            return self.in_dur

        start = 0
        end = self.in_dur
        # Trim values beyond the input duration are ignored
        if opts.global_start and opts.global_start < end:
            start = opts.global_start
        if opts.global_end and opts.global_end < end:
            end = opts.global_end

        return round(end - start, 3)

    def audio_properties(self, streams):
        """Gather various audio properties (e.g. bitrate, sample rate).

        ``streams`` is the list of audio stream entries from ffprobe.
        Returns a tuple (total bitrate, list of bitrates per stream,
        sample rate or None). Without audio output or without any audio
        stream the result is (0, [], None).
        """
        # No audio streams -> no channels, which would lead to a division
        # by zero in the formula below
        if not opts.audio or not streams:
            return (0, [], None)

        c_list = [int(s['channels']) if not opts.force_stereo else 2
                  for s in streams]
        if opts.basic_format:
            del c_list[1:]
        channels = sum(c_list)

        # Formula was originally based on my experience with stereo audio
        # for 4MB WebMs, but was later changed to accommodate more bitrates
        # and channels
        factor = opts.max_size*8 \
                 / (self.out_dur*channels*4*1000) \
                 / opts.a_factor
        c_rate = choose_audio_bitrate(factor)

        b_list = [c*c_rate for c in c_list]
        bitrate = sum(b_list)

        # Downsample necessary for lower bitrates with libvorbis
        if c_rate <= 6:
            sample = 8000
        elif c_rate <= 12:
            sample = 12000
        elif c_rate <= 16:
            sample = 24000
        else:
            sample = None

        return (bitrate, b_list, sample)

    def video_properties(self, stream):
        """Gather various video properties (e.g. height, fps).

        ``stream`` is the video stream entry from ffprobe. Returns a tuple
        (height, fps, aspect ratio as width/height).
        """
        # If user scale/fps filter -> test encode
        # Read the effect (i.e. changed resolution / frame rate) from output
        if opts.user_scale or opts.user_fps:
            test_file = f"{self.name}_{opts.suffix}.mkv"
            ffmpeg = [
                "ffmpeg", "-y", "-v", "error", "-i", self.input, "-vframes", "1",
                "-filter_complex", opts.f_user, test_file
            ]
            ffprobe = [
                "ffprobe", "-v", "error", "-show_format", "-show_streams",
                "-print_format", "json", test_file
            ]

            subprocess.run(ffmpeg, check=False)
            info = subprocess.run(ffprobe, stdout=subprocess.PIPE, check=False).stdout
            info = json.loads(info)
            stream = info['streams'][0]

        h = int(stream['height'])
        w = int(stream['width'])
        ratio = w/h

        try:
            num, den = stream['r_frame_rate'].split("/")
            fps = int(num)/int(den)
        except ValueError:
            # Not in "num/den" form -> keep the raw value
            fps = stream['r_frame_rate']

        return (h, fps, ratio)

    def init_video_bitrate(self):
        """Initialize video bitrate to theoretical value.

        Returns the bitrate that would exactly fill the max. size over the
        output duration (minus the audio bitrate), or the fallback bitrate
        if that value isn't positive.
        """
        bitrate = int(opts.max_size*8 / (self.out_dur*1000) - self.a_rate)
        if bitrate <= 0:
            bitrate = opts.fallback_bitrate

        return bitrate

    def update_video_bitrate(self, size):
        """Update video bitrate based on output file size.

        ``size`` is the size of the last attempt. The result is stored
        in ``self.v_rate``.
        """
        # Size ratio dictates overall bitrate change, but audio bitrate is const
        # (v+a) = (v_old+a) * (max/curr)
        # v = v_old * (max/curr) + (max/curr - 1) * a
        a_offset = int((opts.max_size/size - 1) * self.a_rate)
        new_rate = int(self.v_rate * opts.max_size/size + a_offset)
        min_rate = int(self.v_rate * opts.min_bitrate_ratio)

        # Force min. decrease (% of last bitrate)
        if min_rate < new_rate < self.v_rate:
            bitrate = min_rate
        else:
            bitrate = new_rate

        if bitrate <= 0:
            bitrate = opts.fallback_bitrate

        self.v_rate = bitrate

    def update_filters(self):
        """Update filter values based on the current video bitrate.

        Picks the output frame rate and height from the bits per pixel
        threshold, unless the user already set the respective filter.
        """
        if opts.user_scale and opts.user_fps:
            return

        # Perform frame rate drop
        if not opts.user_fps:
            possible_fps = [
                f for f in opts.fps_list
                if self.v_rate*1000 / (f*self.ratio*self.in_height**2)
                >= opts.bpp_thresh / 2
            ]
            if not possible_fps:
                possible_fps = [opts.min_fps]
            fps = possible_fps[0]

            # Enforce frame rate thresholds
            if fps < opts.min_fps:
                fps = opts.min_fps
            if opts.max_fps and fps > opts.max_fps:
                fps = opts.max_fps
            if fps > self.in_fps:
                fps = self.in_fps

            self.out_fps = fps

        # Perform downscale
        if not opts.user_scale:
            # NOTE: opts.height_reduction is presumably a negative step,
            # as the range counts down from the input height
            possible_heights = [
                h for h in range(self.in_height, 0, opts.height_reduction)
                if self.v_rate*1000 / (self.out_fps*self.ratio*h**2)
                >= opts.bpp_thresh
            ]
            if not possible_heights:
                possible_heights = [opts.min_height]
            height = possible_heights[0]

            # Enforce height thresholds
            if height < opts.min_height:
                height = opts.min_height
            if opts.max_height and height > opts.max_height:
                height = opts.max_height
            if height > self.in_height:
                height = self.in_height

            self.out_height = height
class ConvertibleFile:
    """Assembles FFmpeg settings"""

    def __init__(self, info):
        """Initialize all properties

        ``info`` is the FileInfo-like object describing the input. For
        invalid input only ``info`` and ``valid`` are set.
        """
        self.info = info
        self.valid = self.info.valid
        if not self.valid:
            return

        # Verbosity-related
        if opts.ffmpeg_verbosity == "stats":
            self.verbosity = ["-v", "error", "-stats"]
        else:
            self.verbosity = ["-v", opts.ffmpeg_verbosity]

        # Trim/input-related
        self.input = ["-i", self.info.input]
        if opts.global_start and opts.global_start < self.info.in_dur:
            # -ss goes in front of -i (i.e. acts as input option)
            self.input = ["-ss", str(opts.global_start)] + self.input
        if opts.global_end and opts.global_end <= self.info.in_dur:
            self.input.extend(["-t", str(self.info.out_dur)])

        # Map-related
        self.map = ["-map", "0:v"]
        if opts.subs or opts.burn_subs:
            self.map.extend(["-map", "0:s?"])
        if opts.audio:
            for i in range(len(self.info.a_list)):
                # Basic format only allows the first audio stream
                if opts.basic_format and i > 0:
                    continue
                self.map.extend(["-map", f"0:a:{i}?"])

        # Subtitle-related
        if opts.burn_subs:
            self.subs = ["-sn"]
        elif self.info.image_subs:
            self.subs = ["-c:s", "copy"]
        elif opts.subs:
            self.subs = ["-c:s", "webvtt"]
        else:
            self.subs = []

        # Audio-related
        self.audio = self.init_audio_flags()

        # These get updated before each encoding attempt
        self.video = []
        self.filter = []

    def init_audio_flags(self):
        """Initialize audio-related FFmpeg options.

        Returns the list of audio codec/bitrate options for all audio
        streams that are part of the output.
        """
        audio = []
        a_streams = [i for i in range(len(self.info.a_list))
                     if opts.audio and not (opts.basic_format and i > 0)]

        for s in a_streams:
            if audio_copy(self, s):
                audio.extend([f"-c:a:{s}", "copy"])
            elif opts.a_codec == "libopus" and opus_fallback(self.info.input, s):
                audio.extend([f"-c:a:{s}", opts.fallback_codec])
                audio.extend([f"-b:a:{s}", f"{self.info.a_list[s]}K"])
            else:
                audio.extend([f"-c:a:{s}", opts.a_codec])
                audio.extend([f"-b:a:{s}", f"{self.info.a_list[s]}K"])

        # -ac/-ar have no effect without audio encoding
        # there's no need for additional checks
        if opts.force_stereo:
            audio.extend(["-ac", "2"])
        if self.info.a_sample is not None:
            audio.extend(["-ar", str(self.info.a_sample)])

        return audio

    def update_video_flags(self, mode):
        """Update video-related FFmpeg options.

        ``mode`` is the bitrate mode (1 -> adds qmax, 3 -> CBR settings).
        The result is stored in ``self.video``.
        """
        self.video = ["-c:v", opts.v_codec]
        self.video.extend(["-deadline", "good"])
        # -cpu-used defined in call_ffmpeg, since it depends on the pass

        # TO-DO:
        # Test how strong temporal filtering influences high quality encodes
        # Figure out how/why alpha channel support cuts short GIFs during 2-pass
        self.video.extend(["-pix_fmt", opts.pix_fmt])

        if opts.pix_fmt == "yuva420p":
            self.video.extend(["-auto-alt-ref", "0"])
        else:
            self.video.extend(["-auto-alt-ref", "1",
                               "-lag-in-frames", "25",
                               "-arnr-maxframes", "15",
                               "-arnr-strength", "6"])

        self.video.extend(["-b:v", f"{self.info.v_rate}K"])

        if opts.crf and mode in (1, 2):
            self.video.extend(["-crf", str(opts.crf_value)])

        if mode == 1:
            self.video.extend(["-qmax", str(opts.min_quality)])
        elif mode == 3:
            self.video.extend(["-minrate:v", f"{self.info.v_rate}K",
                               "-maxrate:v", f"{self.info.v_rate}K",
                               "-bufsize", f"{self.info.v_rate*5}K",
                               "-skip_threshold", "100"])

        self.video.extend(["-threads", str(opts.threads)])

        # This check isn't necessary, but it avoids command bloat
        if opts.v_codec == "libvpx-vp9" and opts.threads > 1:
            self.video.extend(["-tile-columns", "6",
                               "-tile-rows", "2",
                               "-row-mt", "1"])

    def update_filters_flags(self):
        """Update filter-related FFmpeg options.

        Stores the scale and/or fps filter in ``self.filter``, depending
        on which output value lies below its input counterpart. Without
        any reduction the filter list is emptied.
        """
        f_scale = f"scale=-2:{self.info.out_height}:flags=lanczos"
        f_fps = f"fps={self.info.out_fps}"

        downscale = self.info.out_height < self.info.in_height
        drop_fps = self.info.out_fps < self.info.in_fps

        if downscale and drop_fps:
            self.filter = ["-vf", f"{f_scale},{f_fps}"]
        elif downscale:
            self.filter = ["-vf", f_scale]
        elif drop_fps:
            self.filter = ["-vf", f_fps]
        else:
            # Drop filters of an earlier attempt, otherwise a stale
            # scale/fps filter would still be applied
            self.filter = []

    def assemble_raw_command(self, ff_pass):
        """Assemble custom filter-applying FFmpeg command.

        Returns the command that applies the user filters and writes the
        result as Matroska stream to stdout.
        """
        video = ["-c:v", "copy"]
        if opts.global_start or opts.f_video:
            video = ["-c:v", "rawvideo"]

        audio = ["-c:a", "copy"]
        if opts.global_start or opts.f_audio:
            audio = ["-c:a", "pcm_s16le"]

        if opts.no_filter_firstpass and opts.passes == 2 and ff_pass == 1:
            filters = []
        else:
            filters = ["-filter_complex", opts.f_user]

        command = ["ffmpeg", "-y", "-v", "error"]
        command.extend(self.input)
        command.extend(self.map)
        command.extend(video)
        command.extend(audio)
        command.extend(["-c:s", "copy"])
        command.extend(filters)
        command.extend(["-strict", "-2", "-f", "matroska", "-"])

        return command

    def assemble_command(self, mode, ff_pass):
        """Assemble final FFmpeg command, which creates the output file.

        ``mode`` is the bitrate mode, ``ff_pass`` the current pass (1 or 2).
        Raises ValueError for any other pass during a 2-pass encode.
        """
        if opts.passes == 1 or mode == 3:
            output = ["-cpu-used", "0", self.info.temp]
        elif ff_pass == 1:
            output = ["-cpu-used", "5", "-passlogfile", self.info.name,
                      "-pass", "1", "-f", "null", "-"]
        elif ff_pass == 2:
            output = ["-cpu-used", "0", "-passlogfile", self.info.name,
                      "-pass", "2", self.info.temp]
        else:
            # Fail with a clear message instead of an unbound local
            raise ValueError(f"unsupported pass number: {ff_pass}")

        command = ["ffmpeg", "-y"]
        command.extend(self.verbosity)
        if opts.f_user:
            # Input gets piped in (see assemble_raw_command)
            command.extend(["-i", "-", "-map", "0"])
        else:
            command.extend(self.input)
            command.extend(self.map)
        command.extend(self.video)
        command.extend(self.audio)
        command.extend(self.subs)
        command.extend(self.filter)
        command.extend(output)

        return command
class FileConverter:
    """Handle the conversion process of a convertible file."""

    def __init__(self):
        """Initialize all properties."""
        self.curr_size = 0
        self.best_size = 0
        self.last_size = 0

        # Bitrate modes (not in technically accurate sense):
        #   1 -> VBR/CQ + qmax
        #   2 -> VBR/CQ
        #   3 -> CBR
        # Set first mode to use
        if opts.no_qmax:
            self.min_mode = 2
        else:
            self.min_mode = 1
        # Set last mode to use
        if opts.no_cbr:
            self.max_mode = 2
        else:
            self.max_mode = 3

        # Initialize mode tracker with the first mode
        self.mode = self.min_mode

    def update_size(self, video):
        """Update size information after an encoding attempt.

        Reads the size of the temporary output of ``video`` (or asks the
        user for it in debug mode) and promotes the temporary file to the
        final output, if it is the best attempt so far.
        """
        # Save size from previous attempt for later rel. difference check
        self.last_size = self.curr_size

        # Get current attempt size
        if opts.debug:
            try:
                user = float(input("\nOutput size in MB: "))
            except ValueError:
                # Use empty input as shortcut to end debug mode (simulate success)
                user = opts.limit
            self.curr_size = int(user*1024**2)
        else:
            self.curr_size = os.path.getsize(video.info.temp)

        # Test if current size is the best attempt yet
        # True, if
        #   -> first try (no best size yet)
        #   -> best try too large; smaller than best try (still tries to limit)
        #   -> best try ok; bigger than best try and smaller than max size
        if not self.best_size \
           or (self.curr_size < self.best_size
               and self.best_size > opts.max_size) \
           or (self.best_size < self.curr_size <= opts.max_size
               and self.best_size < opts.max_size):
            self.best_size = self.curr_size
            if not opts.debug:
                os.replace(video.info.temp, video.info.output)

    def size_info(self):
        """Fetch info text about the currently saved file sizes."""
        if self.best_size > opts.max_size:
            color = fgcolors.ERROR
        elif self.best_size < opts.min_size:
            color = fgcolors.WARNING
        else:
            color = fgcolors.SUCCESS

        info = indent(dedent(f"""
            Curr. size: {round(self.curr_size/1024**2, 2)} MB
            Last size: {round(self.last_size/1024**2, 2)} MB
            Best try: {color}{round(self.best_size/1024**2, 2)}{fgcolors.DEFAULT} MB
            """), " ")

        return info

    def skip_mode(self):
        """Check for insufficient file size change.

        Returns True if the relative difference between the current and the
        last size lies below the skip limit. Without a previous size there
        is nothing to compare against, so False is returned.
        """
        if not self.last_size:
            # Avoids a division by zero (no or empty previous attempt)
            return False

        diff = abs((self.curr_size-self.last_size) / self.last_size)

        return bool(diff < opts.skip_limit)

    def limit_size(self, video):
        """Limit output size to the given upper limit.

        Works through the bitrate modes (up to opts.iters attempts each)
        and returns as soon as the best attempt fits the max. size.
        """
        while self.mode <= self.max_mode:
            for i in range(1, opts.iters+1):
                # Reset bitrate for 1st attempt of a new mode
                if i == 1:
                    video.info.v_rate = video.info.init_video_bitrate()
                else:
                    video.info.update_video_bitrate(self.curr_size)

                video.info.update_filters()
                video.update_video_flags(self.mode)
                video.update_filters_flags()

                msg(f"Mode {fgcolors.INFO}{self.mode}{fgcolors.DEFAULT} (of 3) | "
                    f"Attempt {fgcolors.INFO}{i}{fgcolors.DEFAULT} (of {opts.iters}) | "
                    f"Height: {fgcolors.INFO}{video.info.out_height}{fgcolors.DEFAULT} | "
                    f"FPS: {fgcolors.INFO}{video.info.out_fps}{fgcolors.DEFAULT}")
                msg(indent(dedent(f"""
                    Video: {' '.join(video.video)}
                    Filters: {' '.join(video.filter)}
                    """), " "),
                    level=2)

                call_ffmpeg(video, self.mode)
                self.update_size(video)
                msg(self.size_info(), level=2)

                # Success has to be tested first; otherwise a successful
                # attempt with too little size change would still advance
                # the mode and trigger another (needless) encode
                if self.best_size <= opts.max_size:
                    return
                # Skip remaining iters, if change too small (default: <1%)
                if i > 1 and self.skip_mode():
                    break

            self.mode += 1

    def raise_size(self, video):
        """Raise output size above the given lower limit.

        Continues with the mode and bitrate limit_size() ended on and stops
        once the size lies within the size range or stops changing enough.
        """
        for i in range(1, opts.iters+1):
            # don't re-initialize; adjust the last bitrate from limit_size()
            video.info.update_video_bitrate(self.curr_size)

            video.info.update_filters()
            video.update_video_flags(self.mode)
            video.update_filters_flags()

            msg(f"Enhance Attempt {fgcolors.INFO}{i}{fgcolors.DEFAULT} (of {opts.iters}) | "
                f"Height: {fgcolors.INFO}{video.info.out_height}{fgcolors.DEFAULT} | "
                f"FPS: {fgcolors.INFO}{video.info.out_fps}{fgcolors.DEFAULT}")
            msg(indent(dedent(f"""
                Video: {' '.join(video.video)}
                Filters: {' '.join(video.filter)}
                """), " "),
                level=2)

            call_ffmpeg(video, self.mode)
            self.update_size(video)
            msg(self.size_info(), level=2)

            # Skip remaining iters, if change too small (default: <1%)
            if self.skip_mode():
                return
            if opts.min_size <= self.best_size <= opts.max_size:
                return

    def process(self, video):
        """Process (i.e. convert) a single input video.

        Sets the global ``size_fail`` flag, if the output couldn't be fit
        into the size range.
        """
        global size_fail

        self.limit_size(video)
        if self.best_size > opts.max_size:
            err(f"{video.info.output}: Still too large", color=fgcolors.WARNING)
            size_fail = True
            return
        if self.best_size >= opts.min_size:
            return

        self.raise_size(video)
        if self.best_size < opts.min_size:
            err(f"{video.info.output}: Still too small", color=fgcolors.WARNING)
            size_fail = True

    def reset(self):
        """Reset instance variables after conversion."""
        self.curr_size = 0
        self.best_size = 0
        self.last_size = 0
        self.mode = self.min_mode
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Functions
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def err(*args, level=0, color=fgcolors.ERROR, **kwargs):
    """Write a (colorized) message to stderr.

    The message is only printed, if ``level`` doesn't exceed the verbosity
    set in the options. Additional arguments are handed to print().
    """
    shown = level <= opts.verbosity
    if shown:
        sys.stderr.write(color)
        print(*args, file=sys.stderr, **kwargs)

    # Return both streams to their normal state
    for stream in (sys.stderr, sys.stdout):
        stream.write(fgcolors.RESET)
def msg(*args, level=1, color=fgcolors.DEFAULT, **kwargs):
    """Write a (colorized) message to stdout, if the verbosity permits it.

    Additional arguments are handed to print().
    """
    verbosity = opts.verbosity

    if level <= verbosity:
        if level < verbosity:
            # "Lower-level" info appears bold in more verbose modes
            sys.stdout.write(fgcolors.BOLD)
        sys.stdout.write(color)
        print(*args, **kwargs)

    # Return both streams to their normal state
    for stream in (sys.stderr, sys.stdout):
        stream.write(fgcolors.RESET)
def check_prereq():
    """Make sure that all required programs can be launched.

    Exits with the DEP status as soon as one of them is missing.
    """
    for tool in ("ffmpeg", "ffprobe"):
        try:
            # Only the ability to start the program matters; output and
            # exit status are irrelevant
            subprocess.run([tool],
                           stdout=subprocess.DEVNULL,
                           stderr=subprocess.DEVNULL,
                           check=False)
        except FileNotFoundError:
            # err() isn't usable yet, since opts isn't initialized
            print(f"Error: {tool} not found!", file=sys.stderr)
            sys.exit(status.DEP)
def positive_int(string):
    """Turn an argparse-provided string into an int greater than zero.

    Raises argparse.ArgumentTypeError for anything that isn't a
    positive integer.
    """
    message = f"invalid positive int value: {string}"

    try:
        number = int(string)
    except ValueError:
        raise argparse.ArgumentTypeError(message)

    if number <= 0:
        raise argparse.ArgumentTypeError(message)

    return number
def positive_float(string):
    """Turn an argparse-provided string into a float greater than zero.

    Raises argparse.ArgumentTypeError for anything that isn't a
    positive number.
    """
    message = f"invalid positive float value: {string}"

    try:
        number = float(string)
    except ValueError:
        raise argparse.ArgumentTypeError(message)

    if number <= 0:
        raise argparse.ArgumentTypeError(message)

    return number
def valid_time(string):
    """Convert string provided by argparse to time in seconds.

    The string is first validated by FFmpeg itself and then converted.
    Both the "[HH:]MM:SS[.m...]" and the "S+[.m...][s|ms|us]" form of
    FFmpeg's time duration syntax are handled.

    Returns the time in seconds as float.
    Raises argparse.ArgumentTypeError for strings rejected by FFmpeg,
    strings that can't be converted and times that aren't positive.
    """
    # Just test validity with FFmpeg (reasonable fast even for >1h durations)
    command = ["ffmpeg", "-v", "quiet",
               "-f", "lavfi", "-i", "anullsrc",
               "-t", string, "-c", "copy",
               "-f", "null", "-"]
    syntax_error = f"invalid FFmpeg time syntax: {string}"
    value_error = f"invalid positive time value: {string}"

    try:
        subprocess.check_call(command)
    except subprocess.CalledProcessError as exc:
        raise argparse.ArgumentTypeError(syntax_error) from exc

    # The sign applies to the whole time (e.g. "-0:30" means -30s), which
    # gets lost when the fields are converted individually
    if string.lstrip().startswith("-"):
        raise argparse.ArgumentTypeError(value_error)

    # Unit suffixes are only part of the plain seconds form
    text = string
    scale = 1.0
    if ":" not in text:
        # "ms"/"us" must be tested before "s"
        for suffix, factor in (("ms", 1e-3), ("us", 1e-6), ("s", 1.0)):
            if text.endswith(suffix):
                text = text[:-len(suffix)]
                scale = factor
                break

    # Split into h, m and s
    try:
        time = [float(t) for t in text.split(":")]
    except ValueError as exc:
        raise argparse.ArgumentTypeError(syntax_error) from exc

    if len(time) == 3:
        sec = 3600*time[0] + 60*time[1] + time[2]
    elif len(time) == 2:
        sec = 60*time[0] + time[1]
    elif len(time) == 1:
        sec = time[0] * scale
    else:
        raise argparse.ArgumentTypeError(syntax_error)

    if sec <= 0:
        raise argparse.ArgumentTypeError(value_error)

    return sec
def valid_file(string):
    """Turn an argparse-provided string into the absolute path of a file.

    Raises argparse.ArgumentTypeError, if the path doesn't exist or doesn't
    point to a regular file.
    """
    path = os.path.abspath(string)

    if os.path.isfile(path):
        return path

    if os.path.exists(path):
        problem = f"not a regular file: {path}"
    else:
        problem = f"file doesn't exist: {path}"

    raise argparse.ArgumentTypeError(problem)
def analyze_filters(filters):
"""Test user set filters"""
if not filters:
return (False, False)
# existing filters let copy fail
# only crude test; stream specifiers will let it fail as well
command = ["ffmpeg", "-v", "quiet",
"-f", "lavfi", "-i", "nullsrc",
"-f", "lavfi", "-i", "anullsrc",
"-t", "1", "-c:v", "copy",
"-filter_complex", filters,