diff --git a/EmiyaEngine.py b/EmiyaEngine.py index 67fb7f6..62035af 100644 --- a/EmiyaEngine.py +++ b/EmiyaEngine.py @@ -9,10 +9,12 @@ # 只要蘊藏著想成為真物的意志,偽物就比真物還要來得真實 +import math import numpy as np import scipy import librosa import resampy +import random import multiprocessing @@ -27,8 +29,9 @@ class EmiyaEngine: mid_signal_larray = '' mid_signal_rarray = '' mid_signal_carray = '' - mid_process_ponit = 32 - mid_src_point = 2048 + # 各进程处理参数 + proc_mid_sr_rate = 32 + proc_fft_length = 2048 # 输出用参数 output_file_path = '' output_sr = 0 @@ -91,12 +94,85 @@ def process_leader(self): self.save_file() def process_core(self, signal_piece, index): + # SRC 预处理 + proc_mid_data = resampy.resample(signal_piece, + self.input_sr, + self.input_sr * self.proc_mid_sr_rate, + filter='kaiser_fast') + # 加抖动处理 - - return [signal_piece, index] + # SRC 后处理 + proc_fin_piece = resampy.resample(proc_mid_data, + self.input_sr * self.proc_mid_sr_rate, + self.output_sr, + filter='kaiser_fast') + return [proc_fin_piece, index] + + def find_threshold_point(self, input_fft): + # 鉴定频谱基本参数 + amp_fft = abs(input_fft[range(self.proc_fft_length // 2)]) + # Step0. 找出基波幅度 + base_amp_freq = amp_fft.max() + # Step1. 找出接续的阈值 + threshold_hit = 1.0e-11 # 方差判定阈值 + fin_threshold_point = 0 # 最后的阈值点 + find_range = int((self.proc_fft_length / 2) - 1) # 搜索的范围 + start_find_pos = round(2000 / (self.input_sr / (self.proc_fft_length / 2))) # 从2K频点附近开始寻找,加快速度 + start_flag = True # 循环用的启动Flag + loop_count = 0 # 循环计数器 + legal_freq = (self.input_sr / 2) - 500 # 判定结果合法的阈值频率 + forward_freq = 3000 # 前向修正频率 + order_freq = (self.input_sr / 2) - 6000 # 钦定频率 + # Rev.1: 检查接续点是否符合常理 + while start_flag or fin_threshold_point > round(legal_freq / (self.input_sr / (self.proc_fft_length / 2))): + start_flag = False + if (fin_threshold_point * (self.input_sr / (self.proc_fft_length / 2))) > int(self.input_sr / 2): + threshold_hit *= 2 + for i in range(start_find_pos, find_range): + if i + 5 > find_range: + break + # 计算连续五个采样*3 的方差,与阈值比较,判断频谱消失的位置 + if np.var(amp_fft[i:i + 4]) < threshold_hit and \ + np.var(amp_fft[i + 1:i + 5]) < threshold_hit: + # 定位到当前位置的前500Hz位置 + fin_threshold_point = i - round(forward_freq / (self.input_sr / (self.proc_fft_length / 2))) + break + # 错误超过5把就强行钦定频率 + loop_count += 1 + if loop_count > 5: + fin_threshold_point = round( + order_freq / (self.input_sr / (self.proc_fft_length / 2))) + break + return base_amp_freq, fin_threshold_point + + def generate_jitter(self, input_fft, fin_threshold_point, base_amp_freq): + # 构造抖动 + if fin_threshold_point <= 0: + return input_fft + for i in range(fin_threshold_point, self.proc_fft_length - fin_threshold_point): + # Rev.0: 调整生成概率,频率越高概率越低 + # Rev.1: 加入幅值判定,幅度越大概率越大 + gen_possible = abs((self.proc_fft_length / 2) - i) / ((self.proc_fft_length / + 2) - fin_threshold_point) * (base_amp_freq / 0.22) + if random.randint(0, 1000000) < 800000 * gen_possible: # 0<=x<=10 + real_value = abs(input_fft.real[i]) + base_jitter_min = real_value * 0.5 * (1 - gen_possible) + base_jitter_max = real_value * 6 * gen_possible + amp_jitter_min = base_amp_freq * real_value * 0.5 + amp_jitter_max = base_amp_freq * real_value * 2 + amp_jitter_prefix = - \ + 1 if random.randint(0, 100000) < 50000 else 1 + jitter_prefix = - \ + 1 if random.randint(0, 100000) < 50000 else 1 + delta_jitter_value = random.uniform( + base_jitter_min, base_jitter_max) + amp_jitter_prefix * random.uniform(amp_jitter_min, amp_jitter_max) + input_fft.real[ + i] += jitter_prefix * delta_jitter_value + return input_fft def save_file(self): - librosa.output.write_wav(self.output_file_path, self.mid_signal_carray, self.input_sr) + librosa.output.write_wav(self.output_file_path, self.mid_signal_carray, self.output_sr) + if __name__ == "__main__":