diff --git a/EmiyaEngine.py b/EmiyaEngine.py index 29334b0..44216a6 100644 --- a/EmiyaEngine.py +++ b/EmiyaEngine.py @@ -36,6 +36,8 @@ class EmiyaEngine: output_file_path = '' output_sr = 0 split_size = 500 + # 时域处理模式=Akko 频域处理=None + mode="Akko" def __init__(self, input=None, output=None, output_sr=96000, split_size=500, cpu_thread=None): # 导入参数 @@ -60,6 +62,9 @@ def init_engine(self): self.input_file_path, sr=None, mono=False) print("Load signal complete. ChannelCount: %s SampleRate: %s Hz" % ( str(len(self.input_signal_array)), str(self.input_sr))) + # CAUTION + if self.mode == "Akko": + print("You are processing in AKKO mode.") def process_leader(self): # 左右声道 @@ -118,51 +123,88 @@ def process_core(self, signal_piece, index): this_temp_block = np.array([()]) this_temp_count = 0 # 各分片运算 - for i in range(this_div_count): - # 起始点 - this_start_pos = i * self.proc_fft_length - this_end_pos = i * self.proc_fft_length + self.proc_fft_length - # 当前分片 - this_proc_piece = signal_piece[this_start_pos:this_end_pos] - # 待修正尾部指示及尾部补齐长度 - this_suffix_flag = False - this_suffix_length = 0 - # 真尾部指示 - this_tail_flag = False - if i == (this_div_count - 1): - this_tail_flag = True - # 尾部判定 - if len(this_proc_piece) != 2048: - this_suffix_flag = True - # 尾部补齐及长度记录 - while len(this_proc_piece) != 2048: - this_proc_piece = np.append(this_proc_piece, [0]) - this_suffix_length += 1 - # FFT - this_proc_piece_fft = np.fft.fft(this_proc_piece, self.proc_fft_length) / (self.proc_fft_length) - # 计算接续点及最大幅值 - this_freq_thd, this_base_amp, this_threshold_point = self.find_threshold_point(this_proc_piece_fft*2) - # print("Max Amp -> %s Threshold point -> %s" % (this_base_freq, this_threshold_point)) - # 加抖动 - this_proc_piece_fft = self.generate_jitter(this_proc_piece_fft, this_freq_thd, this_base_amp, this_threshold_point) - # IFFT - this_proc_piece_ifft = np.fft.ifft(this_proc_piece_fft, n=self.proc_fft_length) - # 输出分片实部 - this_proc_piece = this_proc_piece_ifft.real - # 计算最终输出长度 (尾部需要排除掉补零部分) - this_append_length = self.proc_fft_length - if this_suffix_flag: - this_append_length -= this_suffix_length - # 直接追加到目标输出返回数组 - # this_output = np.append(this_output, this_proc_piece[0:this_append_length]) - # 使用缓冲区再输出返回 (内存消耗和上边的方法差不多,但是能稍微抵消掉 numpy 对大数组拼接的问题) - this_temp_block = np.append(this_temp_block, this_proc_piece[0:this_append_length]) - this_temp_count += 1 - # 尾部判定追加 - if this_temp_count > self.split_size or this_tail_flag: - this_output = np.append(this_output, this_temp_block) - this_temp_block = np.array([()]) - this_temp_count = 0 + if self.mode == "Akko": + # 是否第一次执行 + is_loop_once = True + # 前一次的数值 + pre_value = 0 + # 前一次操作的数值 + pre_opt = 0 + # Akko 系数: 直接影响频谱图的美观程度 + sv_l = 0.02 + sv_h = 0.55 + # 实际操作 + for i in range(this_whole_length): + # 构造抖动值 + this_value = signal_piece[i] + linear_jitter = 0 + if pre_value < this_value: + linear_jitter = random.uniform(this_value*-sv_l, this_value*sv_h) + else: + linear_jitter = random.uniform(this_value*sv_h, this_value*-sv_l) + # 应用抖动 + if pre_opt*linear_jitter > 0: + signal_piece[i] = this_value + linear_jitter + elif pre_opt*linear_jitter < 0: + signal_piece[i] = this_value - linear_jitter + else: + pass + # 第一次操作特殊化处理 + if is_loop_once: + linear_jitter = random.uniform(this_value*-sv_h, this_value*sv_h) + signal_piece[i] = this_value + linear_jitter + is_loop_once = False + # 保存到上一次记录 + pre_value = this_value + pre_opt = linear_jitter + + this_output = signal_piece + else: + for i in range(this_div_count): + # 起始点 + this_start_pos = i * self.proc_fft_length + this_end_pos = i * self.proc_fft_length + self.proc_fft_length + # 当前分片 + this_proc_piece = signal_piece[this_start_pos:this_end_pos] + # 待修正尾部指示及尾部补齐长度 + this_suffix_flag = False + this_suffix_length = 0 + # 真尾部指示 + this_tail_flag = False + if i == (this_div_count - 1): + this_tail_flag = True + # 尾部判定 + if len(this_proc_piece) != 2048: + this_suffix_flag = True + # 尾部补齐及长度记录 + while len(this_proc_piece) != 2048: + this_proc_piece = np.append(this_proc_piece, [0]) + this_suffix_length += 1 + # FFT + this_proc_piece_fft = np.fft.fft(this_proc_piece, self.proc_fft_length) / (self.proc_fft_length) + # 计算接续点及最大幅值 + this_freq_thd, this_base_amp, this_threshold_point = self.find_threshold_point(this_proc_piece_fft*2) + # print("Max Amp -> %s Threshold point -> %s" % (this_base_freq, this_threshold_point)) + # 加抖动 + this_proc_piece_fft = self.generate_jitter(this_proc_piece_fft, this_freq_thd, this_base_amp, this_threshold_point) + # IFFT + this_proc_piece_ifft = np.fft.ifft(this_proc_piece_fft, n=self.proc_fft_length) + # 输出分片实部 + this_proc_piece = this_proc_piece_ifft.real + # 计算最终输出长度 (尾部需要排除掉补零部分) + this_append_length = self.proc_fft_length + if this_suffix_flag: + this_append_length -= this_suffix_length + # 直接追加到目标输出返回数组 + # this_output = np.append(this_output, this_proc_piece[0:this_append_length]) + # 使用缓冲区再输出返回 (内存消耗和上边的方法差不多,但是能稍微抵消掉 numpy 对大数组拼接的问题) + this_temp_block = np.append(this_temp_block, this_proc_piece[0:this_append_length]) + this_temp_count += 1 + # 尾部判定追加 + if this_temp_count > self.split_size or this_tail_flag: + this_output = np.append(this_output, this_temp_block) + this_temp_block = np.array([()]) + this_temp_count = 0 return [this_output, index]