From 54f3a5e62101df33d312d6aa294a38665e0d5f22 Mon Sep 17 00:00:00 2001 From: Sg4Dylan Date: Sun, 1 Jul 2018 18:59:37 +0800 Subject: [PATCH] Init copyband version --- EmiyaEngine.py | 349 +++++++++---------------------------------------- README.md | 45 +------ 2 files changed, 70 insertions(+), 324 deletions(-) diff --git a/EmiyaEngine.py b/EmiyaEngine.py index b42cca7..1ae0e64 100644 --- a/EmiyaEngine.py +++ b/EmiyaEngine.py @@ -2,296 +2,73 @@ # -*- coding: utf-8 -*- # Emiya Engine -# Version: Alpha.1 Rev.1 +# Version: Alpha.3 Copyband aka DSEE HX # Author: Sg4Dylan - -# Created: 02/26/2017 +# Created: 07/01/2018 # 真正重要的東西,只用眼睛是看不見的, # 只要蘊藏著想成為真物的意志,偽物就比真物還要來得真實 - -import math import numpy as np -import scipy +import scipy.signal as signal import librosa import resampy -import random -import multiprocessing - - -class EmiyaEngine: - - # 输入参数 - input_file_path = '' - input_sr = 0 - input_signal_array = '' - # 中间处理参数 - cpu_thread = 0 - mid_signal_larray = '' - mid_signal_rarray = '' - mid_signal_carray = '' - # 各进程处理参数 - proc_mid_sr_rate = 16 - proc_fft_length = 2048 - # 输出用参数 - output_file_path = '' - output_sr = 0 - split_size = 500 - # 时域处理模式=Akko 频域处理=None - mode="Akko" - - def __init__(self, input=None, output=None, output_sr=96000, split_size=500, cpu_thread=None): - # 导入参数 - if not input or not output: - print("Missing parameters.") - return - # 读取 CPU 线程数 - if not cpu_thread: - self.cpu_thread = multiprocessing.cpu_count() - self.input_file_path = input - self.output_file_path = output - self.output_sr = output_sr - self.split_size = split_size - # 初始化程序 - self.init_engine() - # 启动处理 - self.process_leader() - - def init_engine(self): - # 将输入文件转为 numpy 数组 - self.input_signal_array, self.input_sr = librosa.load( - self.input_file_path, sr=None, mono=False) - print("Load signal complete. ChannelCount: %s SampleRate: %s Hz" % ( - str(len(self.input_signal_array)), str(self.input_sr))) - # CAUTION - if self.mode == "Akko": - print("You are processing in AKKO mode.") - - def process_leader(self): - # 左右声道 - for channel_index in range(2): - # 生成进程池 - pool = multiprocessing.Pool(processes=self.cpu_thread) - # 初始化结果字典 - raw_result_dict = {} - result_dict = {} - # SRC 预处理 - pre_src_data = resampy.resample(self.input_signal_array[channel_index], - self.input_sr, - self.input_sr * self.proc_mid_sr_rate, - filter='kaiser_fast') - # 分割时间域 - splited_input_signal = np.array_split(pre_src_data, self.cpu_thread) - for i in range(self.cpu_thread): - # 下发分片 - raw_result_dict[i] = pool.apply_async(self.process_core, (splited_input_signal[i], i, )) - # raw_result_dict[i] = self.process_core(splited_input_signal[i], i) - pool.close() - pool.join() - # 拼合分片 - temp_array = np.array([()]) - # 理清顺序 - for i in range(self.cpu_thread): - result_dict[raw_result_dict[i].get()[1]] = raw_result_dict[i].get()[0] - # result_dict[raw_result_dict[i][1]] = raw_result_dict[i][0] - # 追加分片 - for i in range(self.cpu_thread): - temp_array = np.append(temp_array, result_dict[i]) - print("Whole length: before -> %s after -> %s" % (len(pre_src_data), len(temp_array))) - # SRC 后处理 - temp_src_array = resampy.resample(temp_array, - self.input_sr * self.proc_mid_sr_rate, - self.output_sr, - filter='kaiser_fast') - # temp_src_array = temp_array - # 合并到各声道 - if channel_index == 0: - self.mid_signal_larray = temp_src_array - else: - self.mid_signal_rarray = temp_src_array - # 拼合左右声道 - self.mid_signal_carray = np.array([self.mid_signal_larray, self.mid_signal_rarray]) - # 保存文件 - self.save_file() - - def process_core(self, signal_piece, index): - # 总长及分割数目 - this_whole_length = len(signal_piece) - this_div_count = round(this_whole_length/self.proc_fft_length) - # 输出数组 - this_output = np.array([()]) - # 输出临时数组 - this_temp_block = np.array([()]) - this_temp_count = 0 - # 各分片运算 - if self.mode == "Akko": - # 是否第一次执行 - is_loop_once = True - # 前一次的数值 - pre_value = 0 - # 前一次操作的数值 - pre_opt = 0 - # Akko 系数: 直接影响频谱图的美观程度 - sv_l = 0.02 - sv_h = 0.55 - # 实际操作 - for i in range(this_whole_length): - # 构造抖动值 - this_value = signal_piece[i] - linear_jitter = 0 - if pre_value < this_value: - linear_jitter = random.uniform(this_value*-sv_l, this_value*sv_h) - else: - linear_jitter = random.uniform(this_value*sv_h, this_value*-sv_l) - # 应用抖动 - if pre_opt*linear_jitter > 0: - signal_piece[i] = this_value + linear_jitter - elif pre_opt*linear_jitter < 0: - signal_piece[i] = this_value - linear_jitter - else: - pass - # 第一次操作特殊化处理 - if is_loop_once: - linear_jitter = random.uniform(this_value*-sv_h, this_value*sv_h) - signal_piece[i] = this_value + linear_jitter - is_loop_once = False - # 保存到上一次记录 - pre_value = this_value - pre_opt = linear_jitter - - this_output = signal_piece - else: - for i in range(this_div_count): - # 起始点 - this_start_pos = i * self.proc_fft_length - this_end_pos = i * self.proc_fft_length + self.proc_fft_length - # 当前分片 - this_proc_piece = signal_piece[this_start_pos:this_end_pos] - # 待修正尾部指示及尾部补齐长度 - this_suffix_flag = False - this_suffix_length = 0 - # 真尾部指示 - this_tail_flag = False - if i == (this_div_count - 1): - this_tail_flag = True - # 尾部判定 - if len(this_proc_piece) != 2048: - this_suffix_flag = True - # 尾部补齐及长度记录 - while len(this_proc_piece) != 2048: - this_proc_piece = np.append(this_proc_piece, [0]) - this_suffix_length += 1 - # FFT - this_proc_piece_fft = np.fft.fft(this_proc_piece, self.proc_fft_length) / (self.proc_fft_length) - # 计算接续点及最大幅值 - this_freq_thd, this_base_amp, this_threshold_point = self.find_threshold_point(this_proc_piece_fft*2) - # print("Max Amp -> %s Threshold point -> %s" % (this_base_freq, this_threshold_point)) - # 加抖动 - this_proc_piece_fft = self.generate_jitter(this_proc_piece_fft, this_freq_thd, this_base_amp, this_threshold_point) - # IFFT - this_proc_piece_ifft = np.fft.ifft(this_proc_piece_fft, n=self.proc_fft_length) - # 输出分片实部 - this_proc_piece = this_proc_piece_ifft.real - # 计算最终输出长度 (尾部需要排除掉补零部分) - this_append_length = self.proc_fft_length - if this_suffix_flag: - this_append_length -= this_suffix_length - # 直接追加到目标输出返回数组 - # this_output = np.append(this_output, this_proc_piece[0:this_append_length]) - # 使用缓冲区再输出返回 (内存消耗和上边的方法差不多,但是能稍微抵消掉 numpy 对大数组拼接的问题) - this_temp_block = np.append(this_temp_block, this_proc_piece[0:this_append_length]) - this_temp_count += 1 - # 尾部判定追加 - if this_temp_count > self.split_size or this_tail_flag: - this_output = np.append(this_output, this_temp_block) - this_temp_block = np.array([()]) - this_temp_count = 0 - - return [this_output, index] - - def find_threshold_point(self, input_fft): - # 鉴定频谱基本参数 - amp_fft = abs(input_fft[range(self.proc_fft_length // 2)]) - # Step0. 计算基波幅度及该幅度所在的位置 - base_amp = amp_fft.max() - base_amp_freq = np.argmax(amp_fft) - # Step0.1. 计算次峰对应的位置 - secondary_array = amp_fft - secondary_array[base_amp_freq] = 0 - secondary_amp = secondary_array.max() - secondary_amp_freq = np.argmax(secondary_array) + 1 - # Step1. 找出接续点的搜索范围 - fft_resolution = (self.proc_mid_sr_rate * self.input_sr / 2) / (self.proc_fft_length / 2) - hit_start = base_amp_freq * 2 - hit_end = int((self.input_sr / 2) / fft_resolution) - # Step1.1 计算保护阈值频率所在位置 - freq_thd = secondary_amp_freq * 2 - # Setp2. 找出接续点 - threshold_hit = 8.0e-10 - fin_threshold_point = 0 - for hit_pos in range(hit_start, hit_end): - if hit_pos+6>1023: - break - if np.var(amp_fft[hit_pos:hit_pos+4]) < threshold_hit and \ - np.var(amp_fft[hit_pos+1:hit_pos+5]) < threshold_hit and \ - np.var(amp_fft[hit_pos+2:hit_pos+6]) < threshold_hit: - fin_threshold_point = hit_pos - 3 - break - # Setp3. 检查接续点 - # Step3.1 检查最高电平是否达标 - threshold_amp = 7.45e-8 - if base_amp < threshold_amp: - fin_threshold_point = 0 - else: - # Step3.2 检查目标频率是否小于等于 0 - if fin_threshold_point <= 0: - if base_amp_freq != 0: - fin_threshold_point = hit_start * 3 # 钦定三次谐波位置 - else: - fin_threshold_point = secondary_amp_freq * 2 # 钦定二次谐波位置 - # 打印 DEBUG - print("AMP-> %s THD-> %s START-> %s END-> %s HIT_POINT-> %s " % (base_amp, freq_thd, hit_start, hit_end, fin_threshold_point)) - return freq_thd, base_amp, fin_threshold_point - - def generate_jitter(self, input_fft, freq_thd, base_amp, fin_threshold_point): - # 判定点为0时做忽略处理 - if fin_threshold_point == 0: - return input_fft - # 插入抖动 - for i in range(fin_threshold_point, self.proc_fft_length - fin_threshold_point): - # 生成概率,频率越高概率越低 - if fin_threshold_point >= freq_thd: - gen_possible = abs((self.proc_fft_length / 2) - i) / ((self.proc_fft_length / 2) - fin_threshold_point) - else: - if i > (self.proc_fft_length / 2): - gen_possible = (i - (self.proc_fft_length - freq_thd)) / (freq_thd - fin_threshold_point) - else: - gen_possible = (freq_thd - i) / (freq_thd - fin_threshold_point) - if random.randint(0, 1000000) < 800000 * gen_possible: # 0<=x<=10 - # 基础范围倍率 - base_jitter_rate = [0.15,1.75] # 适用于乐器纯音乐、古典乐、轻音乐(频谱不是特别亮) - # base_jitter_rate = [0.7,2.5] # 适用于流行乐、电子乐、摇滚乐(频谱亮且充满理论范围) - # 计算基础抖动范围 - base_jitter_delta = abs(input_fft.real[i]) - base_jitter_min = base_jitter_delta * base_jitter_rate[0] * (1 - gen_possible) - base_jitter_max = base_jitter_delta * base_jitter_rate[1] * gen_possible - # 根据基波电平的额外抖动 - amp_jitter_min = base_amp * base_jitter_delta * 0.05 - amp_jitter_max = base_amp * base_jitter_delta * 1.5 - # 随机正负号添加 - amp_jitter_prefix = -1 if random.randint(0, 100000) < 50000 else 1 - jitter_prefix = - 1 if random.randint(0, 100000) < 50000 else 1 - # 加入抖动 - delta_jitter_value = random.uniform(base_jitter_min, base_jitter_max) + amp_jitter_prefix * random.uniform(amp_jitter_min, amp_jitter_max) - input_fft.real[i] += jitter_prefix * delta_jitter_value - return input_fft - - def save_file(self): - librosa.output.write_wav(self.output_file_path, self.mid_signal_carray, self.output_sr) - - -if __name__ == "__main__": - - multiprocessing.freeze_support() - # 输入文件路径, 输出文件路径 - EmiyaEngine("demi.mp3","demi-output.wav") - +from tqdm import tqdm + +# 待处理文件位置 +file_path = 'Input.mp3' +# 欲输出采样率 +output_sr = 48000 +# 中间处理采样率倍数 +mid_sr_rate = 1 +# HPF 截止频率, 调制频率, 增益 +# (请在观察过原始音频频谱后修改,必须手工修改后使用) +harmonic_hpfc,harmonic_sft,harmonic_gain = 3000,4200,0.9 +percussive_hpfc,percussive_stf,percussive_gain = 2000,4000,1.5 + +def hpd_n_shift(data, lpf, sft, gain): + # 高通滤波 + b,a = signal.butter(3,lpf/(sr/2),'high') + data = librosa.stft(signal.filtfilt(b,a,librosa.istft(data))) + # 拷贝频谱 + for i in tqdm(range(data.shape[1]),unit='Segment',ascii=True): + shift = sft + shift_point = round(shift/(sr/data.shape[0])) + # 调制 + for p in reversed(range(len(chan[:,i]))): + data[:,i][p] = data[:,i][p-shift_point] + # 高通滤波 + data = librosa.stft(signal.filtfilt(b,a,librosa.istft(data))) + data *= gain + return data + +# 加载音频 +print('Opening file...') +y, sr = librosa.load(file_path,mono=False,sr=None) # offset=40,duration=5, +print('Resampling to HiRes...') +y = resampy.resample(y, sr, output_sr * mid_sr_rate, filter='kaiser_fast') +# 产生 STFT 谱 +print('Generating STFT data...') +stft_list = [librosa.stft(chan) for chan in y] +# 显示基本信息 +print(f'InputSr: {sr}, OutputSr: {output_sr}, Shape: {stft_list[0].shape}') + +# 谐波增强模式 +print('Processing...') +for chan in stft_list: + print('Generating HPSS data...') + D_harmonic,D_percussive = librosa.decompose.hpss(chan, margin=4) + print('...') + D_harmonic = hpd_n_shift(D_harmonic,harmonic_hpfc,harmonic_sft,harmonic_gain) + D_percussive = hpd_n_shift(D_percussive,percussive_hpfc,percussive_stf,percussive_gain) + chan += D_harmonic + chan += D_percussive + +# 合并输出 +print('Generating output file...') +istft_list = [librosa.istft(chan) for chan in stft_list] +final_data = resampy.resample(np.array(istft_list), + output_sr * mid_sr_rate, + output_sr, + filter='kaiser_fast') +print('Writing wave...') +librosa.output.write_wav('eh_output.wav', final_data, output_sr) diff --git a/README.md b/README.md index e0c0097..6be64aa 100644 --- a/README.md +++ b/README.md @@ -4,44 +4,13 @@ > "只要蘊藏著想成為真物的意志,偽物就比真物還要來得真實。" Emiya Engine 是一个用来丰富音频频谱的脚本。可以将频谱变得好看那么一点。 -原理是使用 FFT (快速傅立叶变换) 将音频信号采样转到频域,在频域上为空白的频谱加上与时域幅值相称的微小抖动。 -###敬告: -你当前处于 dev 分支,本分支下的程序还处于测试开发阶段,使用前请三思。 +### 使用说明 +当前分支使用的技术类似于大法的 DSEE HX,透过频带拷贝增强效果。 +效果虽好,但需要使用者具有一定的观察能力。 +请务必阅读完程序内参数注释再使用。 +不同于 master 分支或 dev 分支,当前分支版本伴随的噪声更小。 -### 当前版本: -Alpha.1 Rev.1 -~~终于 Alpha.1 了~~ +### 特别提醒 +~~请不要使用这个脚本制造 `'HiRes'` 逗玄学家玩~~ -### 使用注意 -新架构已采用多进程并行处理。在数据未处理完成前,数据将占用大量内存。 -没有 8GB 及更大物理内存的机器,请尝试降低 proc_mid_sr_rate (不低于2), -或者初始化类加入 cpu_thread 参数指定使用的进程数量 (不包含主进程)。 -作者听歌去了,觉得有意思就点个 star 吧。 - -### 关于 Akko 模式 -~~首先要说,《Little Witch Academia》真好看。~~ -这个模式下,程序在时域(并不是原来的频域)动态加随机抖动。 -具体的配置值只有抖动的上下限两个值,具体地在 134L 及 135L (Alpha.1 Rev.1), -欢迎调戏使用 - -### 已经做的: - - - 重构程序结构 - - 改成多进程处理结构 - -### 接下来做的(咕咕咕咕咕): - - - 测试消除掉可以听见的噪音 - - 构建 GUI 界面 - -### 当前改进思路(已经完成): - -提高因采样点数不足而下降的 FFT 精度: -对低采样点数做时域重采样。 - -设置灵活的输出采样率: -处理过程中生成目标采样率。 - -多进程处理结构: -分割时域处理后等待同步合并。 \ No newline at end of file