-
Notifications
You must be signed in to change notification settings - Fork 74
/
Copy pathutils.py
219 lines (177 loc) · 5.76 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import os
import shutil
import numpy as np
import librosa
import cv2
import subprocess as sp
from threading import Timer
def warpgrid(bs, HO, WO, warp=True):
# meshgrid
x = np.linspace(-1, 1, WO)
y = np.linspace(-1, 1, HO)
xv, yv = np.meshgrid(x, y)
grid = np.zeros((bs, HO, WO, 2))
grid_x = xv
if warp:
grid_y = (np.power(21, (yv+1)/2) - 11) / 10
else:
grid_y = np.log(yv * 10 + 11) / np.log(21) * 2 - 1
grid[:, :, :, 0] = grid_x
grid[:, :, :, 1] = grid_y
grid = grid.astype(np.float32)
return grid
def makedirs(path, remove=False):
if os.path.isdir(path):
if remove:
shutil.rmtree(path)
print('removed existing directory...')
else:
return
os.makedirs(path)
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self):
self.initialized = False
self.val = None
self.avg = None
self.sum = None
self.count = None
def initialize(self, val, weight):
self.val = val
self.avg = val
self.sum = val*weight
self.count = weight
self.initialized = True
def update(self, val, weight=1):
val = np.asarray(val)
if not self.initialized:
self.initialize(val, weight)
else:
self.add(val, weight)
def add(self, val, weight):
self.val = val
self.sum += val * weight
self.count += weight
self.avg = self.sum / self.count
def value(self):
if self.val is None:
return 0.
else:
return self.val.tolist()
def average(self):
if self.avg is None:
return 0.
else:
return self.avg.tolist()
def recover_rgb(img):
for t, m, s in zip(img,
[0.485, 0.456, 0.406],
[0.229, 0.224, 0.225]):
t.mul_(s).add_(m)
img = (img.numpy().transpose((1, 2, 0)) * 255).astype(np.uint8)
return img
def magnitude2heatmap(mag, log=True, scale=200.):
if log:
mag = np.log10(mag + 1.)
mag *= scale
mag[mag > 255] = 255
mag = mag.astype(np.uint8)
mag_color = cv2.applyColorMap(mag, cv2.COLORMAP_JET)
mag_color = mag_color[:, :, ::-1]
return mag_color
def istft_reconstruction(mag, phase, hop_length=256):
spec = mag.astype(np.complex) * np.exp(1j*phase)
wav = librosa.istft(spec, hop_length=hop_length)
return np.clip(wav, -1., 1.)
class VideoWriter:
""" Combine numpy frames into video using ffmpeg
Arguments:
filename: name of the output video
fps: frame per second
shape: shape of video frame
Properties:
add_frame(frame):
add a frame to the video
add_frames(frames):
add multiple frames to the video
release():
release writing pipe
"""
def __init__(self, filename, fps, shape):
self.file = filename
self.fps = fps
self.shape = shape
# video codec
ext = filename.split('.')[-1]
if ext == "mp4":
self.vcodec = "h264"
else:
raise RuntimeError("Video codec not supoorted.")
# video writing pipe
cmd = [
"ffmpeg",
"-y", # overwrite existing file
"-f", "rawvideo", # file format
"-s", "{}x{}".format(shape[1], shape[0]), # size of one frame
"-pix_fmt", "rgb24", # 3 channels
"-r", str(self.fps), # frames per second
"-i", "-", # input comes from a pipe
"-an", # not to expect any audio
"-vcodec", self.vcodec, # video codec
"-pix_fmt", "yuv420p", # output video in yuv420p
self.file]
self.pipe = sp.Popen(cmd, stdin=sp.PIPE, stderr=sp.PIPE, bufsize=10**9)
def release(self):
self.pipe.stdin.close()
def add_frame(self, frame):
assert len(frame.shape) == 3
assert frame.shape[0] == self.shape[0]
assert frame.shape[1] == self.shape[1]
try:
self.pipe.stdin.write(frame.tostring())
except:
_, ffmpeg_error = self.pipe.communicate()
print(ffmpeg_error)
def add_frames(self, frames):
for frame in frames:
self.add_frame(frame)
def kill_proc(proc):
proc.kill()
print('Process running overtime! Killed.')
def run_proc_timeout(proc, timeout_sec):
# kill_proc = lambda p: p.kill()
timer = Timer(timeout_sec, kill_proc, [proc])
try:
timer.start()
proc.communicate()
finally:
timer.cancel()
def combine_video_audio(src_video, src_audio, dst_video, verbose=False):
try:
cmd = ["ffmpeg", "-y",
"-loglevel", "quiet",
"-i", src_video,
"-i", src_audio,
"-c:v", "copy",
"-c:a", "aac",
"-strict", "experimental",
dst_video]
proc = sp.Popen(cmd)
run_proc_timeout(proc, 10.)
if verbose:
print('Processed:{}'.format(dst_video))
except Exception as e:
print('Error:[{}] {}'.format(dst_video, e))
# save video to the disk using ffmpeg
def save_video(path, tensor, fps=25):
assert tensor.ndim == 4, 'video should be in 4D numpy array'
L, H, W, C = tensor.shape
writer = VideoWriter(
path,
fps=fps,
shape=[H, W])
for t in range(L):
writer.add_frame(tensor[t])
writer.release()
def save_audio(path, audio_numpy, sr):
librosa.output.write_wav(path, audio_numpy, sr)