From 04b7c1bf50e080646082592f23f59077f3bf458f Mon Sep 17 00:00:00 2001 From: duembgen Date: Wed, 28 Apr 2021 09:28:06 +0200 Subject: [PATCH] Reformatting --- flight_contol/fly_forward_turn_forward.py | 23 +- flight_contol/fly_hover_land.py | 18 +- flight_contol/fly_hover_land_buzz.py | 33 +-- flight_contol/test_buzzer.py | 81 +++--- flight_contol/test_flow_detect.py | 16 +- python/TimerTuning.py | 144 ++++++----- python/TimerTuning_raw.py | 38 ++- python/algos_basics.py | 132 +++++++--- python/algos_beamforming.py | 44 ++-- python/bin_selection.py | 297 +++++++++++++++++++--- python/bin_selection_research.py | 70 ++--- python/convert_memory_dump.py | 18 +- python/convert_memory_dump_dma.py | 16 +- python/file_parser.py | 90 +++++-- python/mic_array.py | 35 ++- python/play_and_record.py | 60 +++-- python/play_and_record_with_thrust.py | 39 +-- python/pwm_tuning.py | 69 ++--- python/reader_crtp.py | 283 ++++++++++++--------- python/serial_motors.py | 66 ++--- python/signals.py | 64 +++-- python/test_beamforming.py | 112 +++++--- python/test_signals.py | 17 +- 23 files changed, 1142 insertions(+), 623 deletions(-) diff --git a/flight_contol/fly_forward_turn_forward.py b/flight_contol/fly_forward_turn_forward.py index c78e81b2..6f9d3b1a 100644 --- a/flight_contol/fly_forward_turn_forward.py +++ b/flight_contol/fly_forward_turn_forward.py @@ -7,7 +7,7 @@ from cflib.positioning.motion_commander import MotionCommander -URI = 'radio://0/80/2M/E7E7E7E7E7' +URI = "radio://0/80/2M/E7E7E7E7E7" DEFAULT_HEIGHT = 0.5 is_deck_attached = False @@ -19,15 +19,15 @@ def move_linear_simple(scf): with MotionCommander(scf, default_height=DEFAULT_HEIGHT) as mc: print("Take off") time.sleep(1) - + print("Forward") mc.forward(0.25) time.sleep(1) - + print("Turn left") mc.turn_left(180) time.sleep(1) - + print("Forward") mc.forward(0.25) time.sleep(1) @@ -42,21 +42,22 @@ def param_deck_flow(name, value): print(type(value)) - if value == '1': + if value == "1": is_deck_attached = True - print('Deck is attached!') + print("Deck is attached!") else: is_deck_attached = False - print('Deck is NOT attached!') + print("Deck is NOT attached!") -if __name__ == '__main__': +if __name__ == "__main__": cflib.crtp.init_drivers(enable_debug_driver=False) - with SyncCrazyflie(URI, cf=Crazyflie(rw_cache='./cache')) as scf: + with SyncCrazyflie(URI, cf=Crazyflie(rw_cache="./cache")) as scf: - scf.cf.param.add_update_callback(group='deck', name='bcFlow2', - cb=param_deck_flow) + scf.cf.param.add_update_callback( + group="deck", name="bcFlow2", cb=param_deck_flow + ) time.sleep(1) if is_deck_attached: diff --git a/flight_contol/fly_hover_land.py b/flight_contol/fly_hover_land.py index 58c02108..9b7199c6 100644 --- a/flight_contol/fly_hover_land.py +++ b/flight_contol/fly_hover_land.py @@ -25,6 +25,7 @@ z_filtered_logs = np.empty(1) vz_logs = np.empty(1) + def initialization(scf): cf = scf.cf activate_high_level_commander(cf) @@ -34,6 +35,7 @@ def initialization(scf): time.sleep(2) return cf.high_level_commander + def low_pass(z, z_old, dt, fc): tau = 1 / (2 * np.pi * fc) alpha = dt / tau @@ -67,7 +69,7 @@ def log_func(scf): vz = (z - z_filtered_logs[-1]) / dt vz_logs = np.append(vz_logs, vz) z_filtered_logs = np.append(z_filtered_logs, z) - print('thrust', data["stabilizer.thrust"]) + print("thrust", data["stabilizer.thrust"]) def take_off(commander): @@ -76,13 +78,14 @@ def take_off(commander): commander.takeoff(height, 0.5) time.sleep(1.0) + def param_update_callback(name, value): - print('The crazyflie has parameter ' + name + ' set at number: ' + value) + print("The crazyflie has parameter " + name + " set at number: " + value) if __name__ == "__main__": - #cf_id = "E7E7E7E7E8" - #id = f"radio://0/80/2M/{cf_id}" + # cf_id = "E7E7E7E7E8" + # id = f"radio://0/80/2M/{cf_id}" cf_id = "E7E7E7E7E7" id = f"radio://0/70/2M/{cf_id}" @@ -95,7 +98,7 @@ def param_update_callback(name, value): time_start = time.process_time() take_off(commander) - + try: Thread(target=log_func, args=[scf]).start() @@ -105,17 +108,16 @@ def param_update_callback(name, value): commander.land(0.0, 1.0) time.sleep(2) commander.stop() - + print("flying") time.sleep(2) - + print("landing") commander.land(0.0, 1.0) commander.stop() time.sleep(2) - if PLOTTING: fig, axs = plt.subplots(2) axs[0].plot(z_ranger_logs) diff --git a/flight_contol/fly_hover_land_buzz.py b/flight_contol/fly_hover_land_buzz.py index 481efbdb..b876bd28 100644 --- a/flight_contol/fly_hover_land_buzz.py +++ b/flight_contol/fly_hover_land_buzz.py @@ -27,6 +27,7 @@ id = "radio://0/80/2M" is_deck_attached = False + def initialization(scf): cf = scf.cf activate_high_level_commander(cf) @@ -36,15 +37,17 @@ def initialization(scf): time.sleep(2) return cf.high_level_commander + def param_deck_flow(name, value): global is_deck_attached print(value) if value: is_deck_attached = True - print('Deck is attached!') + print("Deck is attached!") else: is_deck_attached = False - print('Deck is NOT attached!') + print("Deck is NOT attached!") + def low_pass(z, z_old, dt, fc): tau = 1 / (2 * np.pi * fc) @@ -88,17 +91,17 @@ def take_off(commander): commander.takeoff(height, 0.5) time.sleep(1.0) + def param_update_callback(name, value): - print('The crazyflie has parameter ' + name + ' set at number: ' + value) + print("The crazyflie has parameter " + name + " set at number: " + value) def simple_param_async(scf, groupstr, namestr, value): cf = scf.cf - full_name = groupstr + '.' + namestr + full_name = groupstr + "." + namestr - cf.param.add_update_callback(group=groupstr, name=namestr, - cb=param_update_callback) - #time.sleep(1) + cf.param.add_update_callback(group=groupstr, name=namestr, cb=param_update_callback) + # time.sleep(1) cf.param.set_value(full_name, value) @@ -112,7 +115,7 @@ def simple_param_async(scf, groupstr, namestr, value): time_start = time.process_time() take_off(commander) - + try: Thread(target=log_func, args=[scf]).start() @@ -122,23 +125,21 @@ def simple_param_async(scf, groupstr, namestr, value): commander.land(0.0, 1.0) time.sleep(2) commander.stop() - - simple_param_async(scf, 'sound', 'effect', 12) - simple_param_async(scf, 'sound', 'freq', 600) - simple_param_async(scf, 'sound', 'ratio', 0) - + simple_param_async(scf, "sound", "effect", 12) + simple_param_async(scf, "sound", "freq", 600) + simple_param_async(scf, "sound", "ratio", 0) + print("Flying") time.sleep(2) - + print("Landing") commander.land(0.0, 1.0) commander.stop() - simple_param_async(scf, 'sound', 'effect', 0) + simple_param_async(scf, "sound", "effect", 0) time.sleep(2) - if PLOTTING: fig, axs = plt.subplots(2) diff --git a/flight_contol/test_buzzer.py b/flight_contol/test_buzzer.py index 8f4b31fc..e46168dd 100644 --- a/flight_contol/test_buzzer.py +++ b/flight_contol/test_buzzer.py @@ -8,29 +8,28 @@ from cflib.crazyflie.syncLogger import SyncLogger # URI to the Crazyflie to connect to -uri = 'radio://0/80/2M/E7E7E7E7E7' +uri = "radio://0/80/2M/E7E7E7E7E7" # Only output errors from the logging framework logging.basicConfig(level=logging.ERROR) def param_update_callback(name, value): - print('The crazyflie has parameter ' + name + ' set at number: ' + value) + print("The crazyflie has parameter " + name + " set at number: " + value) def simple_param_async(scf, groupstr, namestr, value): cf = scf.cf - full_name = groupstr + '.' + namestr + full_name = groupstr + "." + namestr - cf.param.add_update_callback(group=groupstr, name=namestr, - cb=param_update_callback) - #time.sleep(1) + cf.param.add_update_callback(group=groupstr, name=namestr, cb=param_update_callback) + # time.sleep(1) cf.param.set_value(full_name, value) - def log_stab_callback(timestamp, data, logconf): - print('[%d][%s]: %s' % (timestamp, logconf.name, data)) + print("[%d][%s]: %s" % (timestamp, logconf.name, data)) + def simple_log_async(scf, logconf): cf = scf.cf @@ -39,6 +38,8 @@ def simple_log_async(scf, logconf): logconf.start() time.sleep(5) logconf.stop() + + def simple_log(scf, logconf): with SyncLogger(scf, lg_stab) as logger: @@ -49,62 +50,54 @@ def simple_log(scf, logconf): data = log_entry[1] logconf_name = log_entry[2] - print('[%d][%s]: %s' % (timestamp, logconf_name, data)) + print("[%d][%s]: %s" % (timestamp, logconf_name, data)) break + + def simple_connect(): print("Yeah, I'm connected! :D") time.sleep(3) print("Now I will disconnect :'(") -if __name__ == '__main__': + +if __name__ == "__main__": # Initialize the low-level drivers (don't list the debug drivers) cflib.crtp.init_drivers(enable_debug_driver=False) - lg_stab = LogConfig(name='Stabilizer', period_in_ms=10) - lg_stab.add_variable('stabilizer.roll', 'float') - lg_stab.add_variable('stabilizer.pitch', 'float') - lg_stab.add_variable('stabilizer.yaw', 'float') + lg_stab = LogConfig(name="Stabilizer", period_in_ms=10) + lg_stab.add_variable("stabilizer.roll", "float") + lg_stab.add_variable("stabilizer.pitch", "float") + lg_stab.add_variable("stabilizer.yaw", "float") - group = 'sound' - name = 'effect' + group = "sound" + name = "effect" - with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: - simple_param_async(scf, 'sound', 'effect', 0) + with SyncCrazyflie(uri, cf=Crazyflie(rw_cache="./cache")) as scf: + simple_param_async(scf, "sound", "effect", 0) time.sleep(2) - - - simple_param_async(scf, 'sound', 'effect', 12) - simple_param_async(scf, 'sound', 'freq', 2000) - simple_param_async(scf, 'sound', 'ratio', 1) + + simple_param_async(scf, "sound", "effect", 12) + simple_param_async(scf, "sound", "freq", 2000) + simple_param_async(scf, "sound", "ratio", 1) time.sleep(5) - simple_param_async(scf, 'sound', 'effect', 12) - simple_param_async(scf, 'sound', 'freq', 600) - simple_param_async(scf, 'sound', 'ratio', 0) + simple_param_async(scf, "sound", "effect", 12) + simple_param_async(scf, "sound", "freq", 600) + simple_param_async(scf, "sound", "ratio", 0) time.sleep(5) - simple_param_async(scf, 'sound', 'effect', 0) + simple_param_async(scf, "sound", "effect", 0) time.sleep(2) - simple_param_async(scf, 'sound', 'effect', 12) - simple_param_async(scf, 'sound', 'freq', 600) - simple_param_async(scf, 'sound', 'ratio', 10) + simple_param_async(scf, "sound", "effect", 12) + simple_param_async(scf, "sound", "freq", 600) + simple_param_async(scf, "sound", "ratio", 10) time.sleep(5) - simple_param_async(scf, 'sound', 'effect', 12) - simple_param_async(scf, 'sound', 'freq', 600) - simple_param_async(scf, 'sound', 'ratio', 0) + simple_param_async(scf, "sound", "effect", 12) + simple_param_async(scf, "sound", "freq", 600) + simple_param_async(scf, "sound", "ratio", 0) time.sleep(5) - simple_param_async(scf, 'sound', 'effect', 0) + simple_param_async(scf, "sound", "effect", 0) time.sleep(2) - - - - - - - - - - diff --git a/flight_contol/test_flow_detect.py b/flight_contol/test_flow_detect.py index 3077f9ae..198f4de5 100644 --- a/flight_contol/test_flow_detect.py +++ b/flight_contol/test_flow_detect.py @@ -6,28 +6,30 @@ from cflib.crazyflie.syncCrazyflie import SyncCrazyflie from cflib.positioning.motion_commander import MotionCommander -URI = 'radio://0/80/2M/E7E7E7E7E7' +URI = "radio://0/80/2M/E7E7E7E7E7" is_deck_attached = False logging.basicConfig(level=logging.ERROR) + def param_deck_flow(name, value): global is_deck_attached print(value) if value: is_deck_attached = True - print('Deck is attached!') + print("Deck is attached!") else: is_deck_attached = False - print('Deck is NOT attached!') + print("Deck is NOT attached!") -if __name__ == '__main__': +if __name__ == "__main__": cflib.crtp.init_drivers(enable_debug_driver=False) - with SyncCrazyflie(URI, cf=Crazyflie(rw_cache='./cache')) as scf: + with SyncCrazyflie(URI, cf=Crazyflie(rw_cache="./cache")) as scf: - scf.cf.param.add_update_callback(group='deck', name='bcFlow2', - cb=param_deck_flow) + scf.cf.param.add_update_callback( + group="deck", name="bcFlow2", cb=param_deck_flow + ) time.sleep(1) diff --git a/python/TimerTuning.py b/python/TimerTuning.py index a49369a9..c047abe6 100644 --- a/python/TimerTuning.py +++ b/python/TimerTuning.py @@ -1,8 +1,25 @@ import numpy as np import pandas as pd -TARGET_Fs = [3000, 3125, 3250, 3375, 3500, 3625, 3750, 3875, 4000, 4125, 4250, 4375, 4500, 4625, 4750, 4875] -#TARGET_Fs = { 9391, 9672, 9953, 10266, 10594, 10938, 11328, 11734, 12172, 12641, 13141, 13688, 14281, 14922, 15641} +TARGET_Fs = [ + 3000, + 3125, + 3250, + 3375, + 3500, + 3625, + 3750, + 3875, + 4000, + 4125, + 4250, + 4375, + 4500, + 4625, + 4750, + 4875, +] +# TARGET_Fs = { 9391, 9672, 9953, 10266, 10594, 10938, 11328, 11734, 12172, 12641, 13141, 13688, 14281, 14922, 15641} TARGET_F = 1188 # In Hz so 50.0 is 0.020 seconds period and 0.25 is 4 seconds period CLOCK_MCU = 84000000 TOLERANCE = 0.00018 @@ -32,7 +49,7 @@ def perfect_divisors(): def add_exact_period(prescaler): entries = [] arr = CLOCK_MCU / (TARGET_F * prescaler) - if (arr == int(arr)): + if arr == int(arr): entry = [prescaler, arr, TARGET_F, 0.0] entries.append(entry) return entries @@ -53,7 +70,7 @@ def possible_prescaler_value(): def close_divisor(psc, tolerance): arr = CLOCK_MCU / (TARGET_F * psc) error = abs_error(int(arr), arr) - #if error < tolerance and arr < 65536.0: + # if error < tolerance and arr < 65536.0: if error < tolerance and arr < 65536.0 and arr > 2 and (int(arr) % 2) != 0: h = hertz(CLOCK_MCU, psc, int(arr)) return psc, int(arr), h, error @@ -63,60 +80,65 @@ def close_divisor(psc, tolerance): # ------------------------------------------------------------------------ for TARGET_F in TARGET_Fs: - # Make a dataframe to hold results as we compute them - df = pd.DataFrame(columns=['PSC', 'ARR', 'F', 'ERROR'], dtype=np.double) - - # Get exact prescalars first. - exact_prescalers = perfect_divisors() - - exact_values = [] - for index in range(len(exact_prescalers)): - rows = add_exact_period(exact_prescalers[index]) - for rowindex in range(len(rows)): - df = df.append(pd.DataFrame(np.array(rows[rowindex]).reshape(1, 4), columns=df.columns)) - - # Get possible prescalers. - poss_prescalers = possible_prescaler_value() - close_prescalers = [] - for index in range(len(poss_prescalers)): - value = close_divisor(poss_prescalers[index], TOLERANCE) - if value is not None: - close_prescalers.append((value[0], value[1], value[2], value[3])) - df = df.append(pd.DataFrame(np.array(close_prescalers).reshape(len(close_prescalers), 4), columns=df.columns)) - - # Adjust PSC and ARR values by -1 to reflect the way you'd code them. - df['PSC'] = df['PSC'] - 1 - df['ARR'] = df['ARR'] - 1 - - # Sort first by errors (zeroes and lowest errors at top of list, and - # then by prescaler value (ascending). - df = df.sort_values(['ERROR', 'PSC']) - - # Make and populate column indicating if combination is exact. - #df['EXACT'] = pd.Series("?", index=df.index) - #df['EXACT'] = np.where(df['ERROR'] == 0.0, "YES", "NO") - - # Format for output. - df['PSC'] = df['PSC'].map('{:.0f}'.format) - df['ARR'] = df['ARR'].map('{:.0f}'.format) - df['F'] = df['F'].map('{:.6f}'.format) - df['ERROR'] = df['ERROR'].map('{:.10f}'.format) - - output = df.to_string() - - #print(output) - #print('these are the ', df.shape[0], ' total combination meeting your tolerance requirement') - - - freal = df.iloc[[0]]['F'].values[0] - psc = df.iloc[[0]]['PSC'].values[0] - arr = df.iloc[[0]]['ARR'].values[0] - err = float(df.iloc[[0]]['ERROR'].values[0])*1000000 - - - #print(f'f = {TARGET_F} PSC = {psc}, ARR = {arr}, err = {err:.0f}') - print(f'{{{TARGET_F}, {psc}, {arr}, {err:.0f}}},') - #print(output[4:]) - #print(output[1:200]) -# print() -exit(0) \ No newline at end of file + # Make a dataframe to hold results as we compute them + df = pd.DataFrame(columns=["PSC", "ARR", "F", "ERROR"], dtype=np.double) + + # Get exact prescalars first. + exact_prescalers = perfect_divisors() + + exact_values = [] + for index in range(len(exact_prescalers)): + rows = add_exact_period(exact_prescalers[index]) + for rowindex in range(len(rows)): + df = df.append( + pd.DataFrame(np.array(rows[rowindex]).reshape(1, 4), columns=df.columns) + ) + + # Get possible prescalers. + poss_prescalers = possible_prescaler_value() + close_prescalers = [] + for index in range(len(poss_prescalers)): + value = close_divisor(poss_prescalers[index], TOLERANCE) + if value is not None: + close_prescalers.append((value[0], value[1], value[2], value[3])) + df = df.append( + pd.DataFrame( + np.array(close_prescalers).reshape(len(close_prescalers), 4), + columns=df.columns, + ) + ) + + # Adjust PSC and ARR values by -1 to reflect the way you'd code them. + df["PSC"] = df["PSC"] - 1 + df["ARR"] = df["ARR"] - 1 + + # Sort first by errors (zeroes and lowest errors at top of list, and + # then by prescaler value (ascending). + df = df.sort_values(["ERROR", "PSC"]) + + # Make and populate column indicating if combination is exact. + # df['EXACT'] = pd.Series("?", index=df.index) + # df['EXACT'] = np.where(df['ERROR'] == 0.0, "YES", "NO") + + # Format for output. + df["PSC"] = df["PSC"].map("{:.0f}".format) + df["ARR"] = df["ARR"].map("{:.0f}".format) + df["F"] = df["F"].map("{:.6f}".format) + df["ERROR"] = df["ERROR"].map("{:.10f}".format) + + output = df.to_string() + + # print(output) + # print('these are the ', df.shape[0], ' total combination meeting your tolerance requirement') + + freal = df.iloc[[0]]["F"].values[0] + psc = df.iloc[[0]]["PSC"].values[0] + arr = df.iloc[[0]]["ARR"].values[0] + err = float(df.iloc[[0]]["ERROR"].values[0]) * 1000000 + + # print(f'f = {TARGET_F} PSC = {psc}, ARR = {arr}, err = {err:.0f}') + print(f"{{{TARGET_F}, {psc}, {arr}, {err:.0f}}},") + # print(output[4:]) + # print(output[1:200]) +# print() +exit(0) diff --git a/python/TimerTuning_raw.py b/python/TimerTuning_raw.py index bb47229b..231241f4 100644 --- a/python/TimerTuning_raw.py +++ b/python/TimerTuning_raw.py @@ -61,7 +61,7 @@ def close_divisor(psc, tolerance): # ------------------------------------------------------------------------ # Make a dataframe to hold results as we compute them -df = pd.DataFrame(columns=['PSC', 'ARR', 'F', 'ERROR'], dtype=np.double) +df = pd.DataFrame(columns=["PSC", "ARR", "F", "ERROR"], dtype=np.double) # Get exact prescalars first. exact_prescalers = perfect_divisors() @@ -69,7 +69,9 @@ def close_divisor(psc, tolerance): for index in range(len(exact_prescalers)): rows = add_exact_period(exact_prescalers[index]) for rowindex in range(len(rows)): - df = df.append(pd.DataFrame(np.array(rows[rowindex]).reshape(1, 4), columns=df.columns)) + df = df.append( + pd.DataFrame(np.array(rows[rowindex]).reshape(1, 4), columns=df.columns) + ) # Get possible prescalers. poss_prescalers = possible_prescaler_value() @@ -78,28 +80,36 @@ def close_divisor(psc, tolerance): value = close_divisor(poss_prescalers[index], TOLERANCE) if value is not None: close_prescalers.append((value[0], value[1], value[2], value[3])) -df = df.append(pd.DataFrame(np.array(close_prescalers).reshape(len(close_prescalers), 4), columns=df.columns)) +df = df.append( + pd.DataFrame( + np.array(close_prescalers).reshape(len(close_prescalers), 4), columns=df.columns + ) +) # Adjust PSC and ARR values by -1 to reflect the way you'd code them. -df['PSC'] = df['PSC'] - 1 -df['ARR'] = df['ARR'] - 1 +df["PSC"] = df["PSC"] - 1 +df["ARR"] = df["ARR"] - 1 # Sort first by errors (zeroes and lowest errors at top of list, and # then by prescaler value (ascending). -df = df.sort_values(['ERROR', 'PSC']) +df = df.sort_values(["ERROR", "PSC"]) # Make and populate column indicating if combination is exact. -df['EXACT'] = pd.Series("?", index=df.index) -df['EXACT'] = np.where(df['ERROR'] == 0.0, "YES", "NO") +df["EXACT"] = pd.Series("?", index=df.index) +df["EXACT"] = np.where(df["ERROR"] == 0.0, "YES", "NO") # Format for output. -df['PSC'] = df['PSC'].map('{:.0f}'.format) -df['ARR'] = df['ARR'].map('{:.0f}'.format) -df['F'] = df['F'].map('{:.6f}'.format) -df['ERROR'] = df['ERROR'].map('{:.10f}'.format) +df["PSC"] = df["PSC"].map("{:.0f}".format) +df["ARR"] = df["ARR"].map("{:.0f}".format) +df["F"] = df["F"].map("{:.6f}".format) +df["ERROR"] = df["ERROR"].map("{:.10f}".format) output = df.to_string() print(output) print() -print('these are the ', df.shape[0], ' total combination meeting your tolerance requirement') -exit(0) \ No newline at end of file +print( + "these are the ", + df.shape[0], + " total combination meeting your tolerance requirement", +) +exit(0) diff --git a/python/algos_basics.py b/python/algos_basics.py index 26b8e852..5bc7f749 100644 --- a/python/algos_basics.py +++ b/python/algos_basics.py @@ -13,9 +13,9 @@ def low_rank_inverse(low_rank_matrix, rank=1): u, s, vt = np.linalg.svd(low_rank_matrix, full_matrices=True) - #np.testing.assert_allclose(u.dot(np.diag(s)).dot(vt), low_rank_matrix) - s[rank:] = 0 - s[:rank] = 1/s[:rank] + # np.testing.assert_allclose(u.dot(np.diag(s)).dot(vt), low_rank_matrix) + s[rank:] = 0 + s[:rank] = 1 / s[:rank] s_mat = np.diag(s) inverse = vt.T @ s_mat @ u.T return inverse @@ -23,7 +23,12 @@ def low_rank_inverse(low_rank_matrix, rank=1): def get_mic_delays(mic_positions, azimuth, elevation=None): r0 = mic_positions[0] - return np.array([get_mic_delta(r0, r1, azimuth, elevation) / SPEED_OF_SOUND for r1 in mic_positions]) + return np.array( + [ + get_mic_delta(r0, r1, azimuth, elevation) / SPEED_OF_SOUND + for r1 in mic_positions + ] + ) def get_mic_delays_near(mic_positions, source): @@ -32,7 +37,9 @@ def get_mic_delays_near(mic_positions, source): :param source: source coordinates of shape (2,) """ r0 = mic_positions[0] - return np.array([get_mic_delta_near(r0, r1, source) / SPEED_OF_SOUND for r1 in mic_positions]) + return np.array( + [get_mic_delta_near(r0, r1, source) / SPEED_OF_SOUND for r1 in mic_positions] + ) def get_mic_delta(r0, r1, azimuth, elevation=None): @@ -46,13 +53,19 @@ def get_mic_delta(r0, r1, azimuth, elevation=None): assert len(r0) == 3 if elevation is None: raise ValueError("need to specify elevation for 3D doa.") - s = np.array((np.cos(azimuth) * np.cos(elevation), np.sin(azimuth) * np.cos(elevation), np.sin(elevation))) + s = np.array( + ( + np.cos(azimuth) * np.cos(elevation), + np.sin(azimuth) * np.cos(elevation), + np.sin(elevation), + ) + ) # 2d source elif len(r0) == 2: assert len(r0) == 2 s = np.array((np.cos(azimuth), np.sin(azimuth))) else: - raise ValueError(f'r0 has to be of length 2 or 3, but is {len(r0)}') + raise ValueError(f"r0 has to be of length 2 or 3, but is {len(r0)}") return np.inner(r0 - r1, s) @@ -76,13 +89,28 @@ def get_autocorrelation(signals, frequency_bins=None): """ num_mics = signals.shape[0] if signals.shape[0] > signals.shape[1]: - print("Warning: probably shape mismatch for signals (expecting num_mics in frist dimension)", signals.shape) - - signals_f = np.fft.rfft(signals, n=signals.shape[1], axis=1).T # num_samples x num_mics + print( + "Warning: probably shape mismatch for signals (expecting num_mics in frist dimension)", + signals.shape, + ) + + signals_f = np.fft.rfft( + signals, n=signals.shape[1], axis=1 + ).T # num_samples x num_mics if frequency_bins is None: - Rx = 1 / num_mics * signals_f[:, :, np.newaxis] @ signals_f[:, np.newaxis, :].conj() + Rx = ( + 1 + / num_mics + * signals_f[:, :, np.newaxis] + @ signals_f[:, np.newaxis, :].conj() + ) else: - Rx = 1 / num_mics * signals_f[frequency_bins, :, np.newaxis] @ signals_f[frequency_bins, np.newaxis, :].conj() + Rx = ( + 1 + / num_mics + * signals_f[frequency_bins, :, np.newaxis] + @ signals_f[frequency_bins, np.newaxis, :].conj() + ) return Rx @@ -91,27 +119,32 @@ def get_responses_DAS_old(Rx, mic_positions, omega=None, num_angles=1000): dimension = mic_positions.shape[1] if dimension == 3 and num_angles > 100: - print(f'Warning: number of angles {num_angles} quite high for 3 dimensions.') + print(f"Warning: number of angles {num_angles} quite high for 3 dimensions.") azimuth_array = np.linspace(0, 2 * np.pi, num_angles) if dimension == 3: - #eps = 1e-3 + # eps = 1e-3 eps = 0 # doing -1 here to not confuse dimensions. elevation_array = np.linspace(-np.pi + eps, np.pi - eps, num_angles - 1) - #elevation_array = [0] + # elevation_array = [0] else: elevation_array = np.array([0]) # can use anything as reference - #ref_mic = np.array([10, 10]) + # ref_mic = np.array([10, 10]) ref_mic = mic_positions[0] responses = [] for az in azimuth_array: for el in el_array: h = [ - np.exp(-1j * omega * get_mic_delta(ref_mic, mic_pos, az, el) / SPEED_OF_SOUND) + np.exp( + -1j + * omega + * get_mic_delta(ref_mic, mic_pos, az, el) + / SPEED_OF_SOUND + ) for mic_pos in mic_positions ] @@ -130,7 +163,7 @@ def get_responses_DAS_old(Rx, mic_positions, omega=None, num_angles=1000): dimension = mic_positions.shape[1] if dimension == 3 and num_angles > 100: - print(f'Warning: number of angles {num_angles} quite high for 3 dimensions.') + print(f"Warning: number of angles {num_angles} quite high for 3 dimensions.") azimuth_array = np.linspace(0, 2 * np.pi, num_angles) if dimension == 3: @@ -146,7 +179,12 @@ def get_responses_DAS_old(Rx, mic_positions, omega=None, num_angles=1000): for az in azimuth_array: for elevation in elevation_array: h = [ - np.exp(-1j * omega * get_mic_delta(ref_mic, mic_pos, az, elevation) / SPEED_OF_SOUND) + np.exp( + -1j + * omega + * get_mic_delta(ref_mic, mic_pos, az, elevation) + / SPEED_OF_SOUND + ) for mic_pos in mic_positions ] @@ -183,34 +221,64 @@ def get_doa_DAS(azimuth_array, responses, elevation_array=np.array([0])): if __name__ == "__main__": from mic_array import get_uniform_array + # uniform linear array baseline = 1 # in m mic_number = 5 dimension = 2 - mic_positions_2d = get_uniform_array(mic_number=mic_number, baseline=baseline, dimension=dimension) + mic_positions_2d = get_uniform_array( + mic_number=mic_number, baseline=baseline, dimension=dimension + ) # testing delays for pi_2 and 0 azimuth_array = np.linspace(0, 2 * np.pi, 360) deltas = np.array( - [get_mic_delta(mic_positions_2d[0], mic_positions_2d[1], az) / SPEED_OF_SOUND for az in azimuth_array]) - assert np.allclose(deltas[azimuth_array == np.pi / 2.0], 0), deltas[azimuth_array == np.pi / 2.0] - assert np.allclose(deltas[azimuth_array == -np.pi / 2.0], 0), deltas[azimuth_array == np.pi / 2.0] - assert np.allclose(deltas[azimuth_array == 0], np.max(deltas)), deltas[azimuth_array == 0] - assert np.allclose(deltas[azimuth_array == np.pi], np.min(deltas)), deltas[azimuth_array == np.pi] + [ + get_mic_delta(mic_positions_2d[0], mic_positions_2d[1], az) / SPEED_OF_SOUND + for az in azimuth_array + ] + ) + assert np.allclose(deltas[azimuth_array == np.pi / 2.0], 0), deltas[ + azimuth_array == np.pi / 2.0 + ] + assert np.allclose(deltas[azimuth_array == -np.pi / 2.0], 0), deltas[ + azimuth_array == np.pi / 2.0 + ] + assert np.allclose(deltas[azimuth_array == 0], np.max(deltas)), deltas[ + azimuth_array == 0 + ] + assert np.allclose(deltas[azimuth_array == np.pi], np.min(deltas)), deltas[ + azimuth_array == np.pi + ] # uniform linear array dimension = 3 - mic_positions_2d = get_uniform_array(mic_number=mic_number, baseline=baseline, dimension=dimension) + mic_positions_2d = get_uniform_array( + mic_number=mic_number, baseline=baseline, dimension=dimension + ) # testing delays 0, -pi and pi az = 1 elevation_array = np.linspace(-np.pi, np.pi, 360) deltas = np.array( - [get_mic_delta(mic_positions_2d[0], mic_positions_2d[1], az, el) / SPEED_OF_SOUND for el in elevation_array]) - assert np.allclose(deltas[elevation_array == np.pi / 2.0], 0), deltas[elevation_array == np.pi / 2.0] - assert np.allclose(deltas[elevation_array == -np.pi / 2.0], 0), deltas[elevation_array == np.pi / 2.0] - assert np.allclose(deltas[elevation_array == 0], np.max(deltas)), deltas[elevation_array == 0] - assert np.allclose(deltas[elevation_array == np.pi], np.min(deltas)), deltas[elevation_array == np.pi] + [ + get_mic_delta(mic_positions_2d[0], mic_positions_2d[1], az, el) + / SPEED_OF_SOUND + for el in elevation_array + ] + ) + assert np.allclose(deltas[elevation_array == np.pi / 2.0], 0), deltas[ + elevation_array == np.pi / 2.0 + ] + assert np.allclose(deltas[elevation_array == -np.pi / 2.0], 0), deltas[ + elevation_array == np.pi / 2.0 + ] + assert np.allclose(deltas[elevation_array == 0], np.max(deltas)), deltas[ + elevation_array == 0 + ] + assert np.allclose(deltas[elevation_array == np.pi], np.min(deltas)), deltas[ + elevation_array == np.pi + ] # make sure far and near field converge to the same delays for very far source. baseline = 1 @@ -222,4 +290,4 @@ def get_doa_DAS(azimuth_array, responses, elevation_array=np.array([0])): delta_near = get_mic_delta_near(mic_positions_2d[0], mic_positions_2d[1], source) assert abs(delta_far - delta_near) < 1e-5, (delta_far, delta_near) - print('all tests ok.') + print("all tests ok.") diff --git a/python/algos_beamforming.py b/python/algos_beamforming.py index 4681888b..d601cfe7 100644 --- a/python/algos_beamforming.py +++ b/python/algos_beamforming.py @@ -8,8 +8,9 @@ from algos_basics import get_autocorrelation, get_mic_delays, low_rank_inverse from constants import SPEED_OF_SOUND -INVERSE = 'pinv' -#INVERSE = 'low-rank' +INVERSE = "pinv" +# INVERSE = 'low-rank' + def get_lcmv_beamformer( Rx, frequencies, mic_positions, constraints, rcond=0, lamda=1e-3, elevation=None @@ -32,7 +33,7 @@ def get_lcmv_beamformer( print("Deprecation warning: use get_lcmv_beamformer_fast for better performance.") assert mic_positions.ndim == 2, "watch out, changed argument order!!" - + n_mics = Rx.shape[1] assert mic_positions.shape[0] == n_mics @@ -61,9 +62,9 @@ def get_lcmv_beamformer( # solve the optimization problem Rx_inv = np.linalg.pinv(Rx[j, :, :] + lamda * np.eye(Rx.shape[1]), rcond=rcond) - # (n_constraints x n_mics) (n_mics x n_mics) (n_mics x n_constraints) + # (n_constraints x n_mics) (n_mics x n_mics) (n_mics x n_constraints) # = n_constraints x n_constraints - big_mat = C.conj().T @ Rx_inv @ C + big_mat = C.conj().T @ Rx_inv @ C big_inv = np.linalg.pinv(big_mat, rcond=rcond) H[j, :] = Rx_inv @ C @ big_inv @ c condition_numbers.append(np.linalg.cond(Rx[j, :, :])) @@ -72,7 +73,14 @@ def get_lcmv_beamformer( def get_lcmv_beamformer_fast( - Rx, frequencies, mic_positions, constraints, rcond=0, lamda=1e-3, elevation=None, inverse=INVERSE + Rx, + frequencies, + mic_positions, + constraints, + rcond=0, + lamda=1e-3, + elevation=None, + inverse=INVERSE, ): """ @@ -94,24 +102,26 @@ def get_lcmv_beamformer_fast( # generate constraints C = np.zeros((len(frequencies), n_mics, 0), dtype=np.complex128) c = np.array([c[1] for c in constraints], dtype=np.complex128) - for theta,__ in constraints: + for theta, __ in constraints: delays = get_mic_delays(mic_positions, theta, elevation) - exponent = 2 * np.pi * delays[None, :] * frequencies[:, None] # n_freq x n_mics + exponent = 2 * np.pi * delays[None, :] * frequencies[:, None] # n_freq x n_mics C_row = np.exp(-1j * exponent) - C = np.concatenate([C, C_row[:, :, None]], axis=2) # n_freq x n_mics x n_constr + C = np.concatenate([C, C_row[:, :, None]], axis=2) # n_freq x n_mics x n_constr # solve the optimization problem - if inverse == 'pinv': - Rx_inv = np.linalg.pinv(Rx + lamda * np.eye(Rx.shape[1])[None, :, :], rcond=rcond) # n_freq x n_mics x n_mics + if inverse == "pinv": + Rx_inv = np.linalg.pinv( + Rx + lamda * np.eye(Rx.shape[1])[None, :, :], rcond=rcond + ) # n_freq x n_mics x n_mics - elif inverse == 'low-rank': + elif inverse == "low-rank": Rx_inv = np.empty(Rx.shape, dtype=np.complex) for i in range(Rx.shape[0]): - Rx_inv[i,...] = low_rank_inverse(Rx[i,...], rank=1) + Rx_inv[i, ...] = low_rank_inverse(Rx[i, ...], rank=1) # big_mat should have dimension n_freq x n_constr x n_constr - # (n_freq x n_constr x n_mics) (n_freq x n_mics x n_mics) = n_freq x n_constr x n_mics + # (n_freq x n_constr x n_mics) (n_freq x n_mics x n_mics) = n_freq x n_constr x n_mics # @ n_freq x n_mics x n_constr = n_freq x n_constr x n_constr - big_mat = np.transpose(C.conj(), (0, 2, 1)) @ Rx_inv @ C # + big_mat = np.transpose(C.conj(), (0, 2, 1)) @ Rx_inv @ C # big_inv = np.linalg.inv(big_mat) # n_freq x n_mics x n_mics @ # n_freq x n_mics x n_constr = n_freq x n_mics x n_constr H = Rx_inv @ C @ big_inv @ c @@ -156,9 +166,9 @@ def get_powers(H, Rx): n_mics = Rx.shape[1] assert H.shape[0] == n_frequencies assert H.shape[1] == n_mics - # (n_frequencies x 1 x n_mics) (n_frequencies x n_mics x n_mics) (n_frequencies x n_mics x 1) + # (n_frequencies x 1 x n_mics) (n_frequencies x n_mics x n_mics) (n_frequencies x n_mics x 1) # n_frequencies x 1 x n_mics (n_frequencies x n_mics x 1) - powers = np.abs(H.conj()[:, None, :] @ Rx @ H[:, :, None]).squeeze() + powers = np.abs(H.conj()[:, None, :] @ Rx @ H[:, :, None]).squeeze() return powers diff --git a/python/bin_selection.py b/python/bin_selection.py index 2312ec02..4a07daab 100644 --- a/python/bin_selection.py +++ b/python/bin_selection.py @@ -5,63 +5,282 @@ from crazyflie_description_py.parameters import N_BUFFER, FS, FFTSIZE -def get_frequencies(): - return np.fft.rfftfreq(N_BUFFER, 1/FS) +def get_frequencies(): + return np.fft.rfftfreq(N_BUFFER, 1 / FS) def generate_sweep(key): from crazyflie_description_py.parameters import SOUND_EFFECTS + freqs = get_frequencies() - if key == 'sweep_all': - t_sec = 0.5 # duration of each note in seconds + if key == "sweep_all": + t_sec = 0.5 # duration of each note in seconds bins = list(range(len(freqs))) - elif key == 'sweep_buzzer': + elif key == "sweep_buzzer": # obtained from WallAnalysis.ipynb t_sec = 1.0 - freqs_buzz = [1000.0, 1015.625, 1031.25, 1046.875, 1062.5, 1078.125, 1093.75, 1109.375, 1125.0, 1140.625, 1156.25, 1171.875, 1187.5, 1203.125, 1218.75, 1234.375, 1250.0, 1265.625, 1281.25, 1296.875, 1312.5, 1328.125, 1343.75, 1359.375, 1375.0, 1390.625, 1406.25, 1421.875, 1437.5, 1453.125, 1468.75, 1484.375, 1500.0, 1515.625, 1531.25, 1546.875, 1562.5, 1578.125, 1593.75, 1609.375, 1625.0, 1640.625, 1656.25, 1671.875, 1687.5, 1703.125, 1718.75, 1734.375, 1750.0, 1765.625, 1781.25, 1796.875, 1812.5, 1828.125, 1843.75, 1859.375, 1875.0, 1890.625, 1906.25, 1921.875, 1937.5, 1953.125, 1968.75, 1984.375, 2000.0, 2015.625, 2031.25, 2046.875, 2062.5, 2078.125, 2093.75, 2109.375, 2125.0, 2140.625, 2156.25, 2171.875, 2187.5, 2203.125, 2218.75, 2234.375, 2250.0, 2265.625, 2281.25, 2296.875, 2312.5, 2328.125, 2343.75, 2359.375, 2375.0, 2390.625, 2406.25, 2421.875, 2437.5, 2453.125, 2468.75, 2484.375, 2500.0, 2515.625, 2519.384765625, 2562.451171875, 2627.05078125, 2691.650390625, 2777.783203125, 2885.44921875, 3014.6484375, 3122.314453125, 3186.9140625, 3273.046875, 3380.712890625, 3466.845703125, 3531.4453125, 3574.51171875, 3617.578125, 3682.177734375, 3725.244140625, 3768.310546875, 3811.376953125, 3854.443359375, 3919.04296875, 3962.109375, 4005.17578125, 4048.2421875, 4112.841796875, 4155.908203125, 4220.5078125, 4263.57421875, 4328.173828125, 4371.240234375, 4435.83984375, 4500.439453125, 4565.0390625, 4629.638671875, 4694.23828125, 4758.837890625, 4823.4375, 4909.5703125, 4974.169921875, 5060.302734375, 5124.90234375, 5211.03515625, 5297.16796875, 5383.30078125, 5469.43359375, 5577.099609375, 5663.232421875, 5770.8984375, 5857.03125, 5964.697265625, 6072.36328125, 6201.5625, 6309.228515625, 6438.427734375, 6567.626953125, 6696.826171875, 6847.55859375, 6998.291015625, 7149.0234375, 7299.755859375, 7472.021484375, 7644.287109375, 7816.552734375, 8010.3515625, 8204.150390625, 8419.482421875, 8634.814453125, 8871.6796875, 9130.078125, 9388.4765625, 9668.408203125, 9948.33984375, 10271.337890625, 10594.3359375, 10938.8671875, 11326.46484375, 11735.595703125, 12166.259765625, 12639.990234375, 13135.25390625, 13695.1171875, 14276.513671875, 14922.509765625, 15633.10546875] + freqs_buzz = [ + 1000.0, + 1015.625, + 1031.25, + 1046.875, + 1062.5, + 1078.125, + 1093.75, + 1109.375, + 1125.0, + 1140.625, + 1156.25, + 1171.875, + 1187.5, + 1203.125, + 1218.75, + 1234.375, + 1250.0, + 1265.625, + 1281.25, + 1296.875, + 1312.5, + 1328.125, + 1343.75, + 1359.375, + 1375.0, + 1390.625, + 1406.25, + 1421.875, + 1437.5, + 1453.125, + 1468.75, + 1484.375, + 1500.0, + 1515.625, + 1531.25, + 1546.875, + 1562.5, + 1578.125, + 1593.75, + 1609.375, + 1625.0, + 1640.625, + 1656.25, + 1671.875, + 1687.5, + 1703.125, + 1718.75, + 1734.375, + 1750.0, + 1765.625, + 1781.25, + 1796.875, + 1812.5, + 1828.125, + 1843.75, + 1859.375, + 1875.0, + 1890.625, + 1906.25, + 1921.875, + 1937.5, + 1953.125, + 1968.75, + 1984.375, + 2000.0, + 2015.625, + 2031.25, + 2046.875, + 2062.5, + 2078.125, + 2093.75, + 2109.375, + 2125.0, + 2140.625, + 2156.25, + 2171.875, + 2187.5, + 2203.125, + 2218.75, + 2234.375, + 2250.0, + 2265.625, + 2281.25, + 2296.875, + 2312.5, + 2328.125, + 2343.75, + 2359.375, + 2375.0, + 2390.625, + 2406.25, + 2421.875, + 2437.5, + 2453.125, + 2468.75, + 2484.375, + 2500.0, + 2515.625, + 2519.384765625, + 2562.451171875, + 2627.05078125, + 2691.650390625, + 2777.783203125, + 2885.44921875, + 3014.6484375, + 3122.314453125, + 3186.9140625, + 3273.046875, + 3380.712890625, + 3466.845703125, + 3531.4453125, + 3574.51171875, + 3617.578125, + 3682.177734375, + 3725.244140625, + 3768.310546875, + 3811.376953125, + 3854.443359375, + 3919.04296875, + 3962.109375, + 4005.17578125, + 4048.2421875, + 4112.841796875, + 4155.908203125, + 4220.5078125, + 4263.57421875, + 4328.173828125, + 4371.240234375, + 4435.83984375, + 4500.439453125, + 4565.0390625, + 4629.638671875, + 4694.23828125, + 4758.837890625, + 4823.4375, + 4909.5703125, + 4974.169921875, + 5060.302734375, + 5124.90234375, + 5211.03515625, + 5297.16796875, + 5383.30078125, + 5469.43359375, + 5577.099609375, + 5663.232421875, + 5770.8984375, + 5857.03125, + 5964.697265625, + 6072.36328125, + 6201.5625, + 6309.228515625, + 6438.427734375, + 6567.626953125, + 6696.826171875, + 6847.55859375, + 6998.291015625, + 7149.0234375, + 7299.755859375, + 7472.021484375, + 7644.287109375, + 7816.552734375, + 8010.3515625, + 8204.150390625, + 8419.482421875, + 8634.814453125, + 8871.6796875, + 9130.078125, + 9388.4765625, + 9668.408203125, + 9948.33984375, + 10271.337890625, + 10594.3359375, + 10938.8671875, + 11326.46484375, + 11735.595703125, + 12166.259765625, + 12639.990234375, + 13135.25390625, + 13695.1171875, + 14276.513671875, + 14922.509765625, + 15633.10546875, + ] bins = [np.argmin(np.abs(freqs - f)) for f in freqs_buzz] - elif key == 'sweep_hard': - t_sec = 0.5 # duration of each note in seconds + elif key == "sweep_hard": + t_sec = 0.5 # duration of each note in seconds # obtained from WallAnalysis.ipynb - freqs_hard = [1171.875, 1234.375, 1390.625, 3015.625, 3125.0, 3531.25, 3687.5, 4156.25, 4265.625, 5125.0, 5218.75, 6437.5, 6687.5, 8000.0, 8421.875, 10937.5, 13140.625, 14921.875] + freqs_hard = [ + 1171.875, + 1234.375, + 1390.625, + 3015.625, + 3125.0, + 3531.25, + 3687.5, + 4156.25, + 4265.625, + 5125.0, + 5218.75, + 6437.5, + 6687.5, + 8000.0, + 8421.875, + 10937.5, + 13140.625, + 14921.875, + ] bins = [np.argmin(np.abs(freqs - f)) for f in freqs_hard] - elif key == 'sweep_slow': - t_sec = 2.5 # duration of each frequency + elif key == "sweep_slow": + t_sec = 2.5 # duration of each frequency freqs_hard = [1750, 2375, 3125, 3875] bins = [np.argmin(np.abs(freqs - f)) for f in freqs_hard] - elif key == 'sweep_fast': - t_sec = 0.5 # duration of each frequency + elif key == "sweep_fast": + t_sec = 0.5 # duration of each frequency freqs_hard = [1750, 2375, 3125, 3875] bins = [np.argmin(np.abs(freqs - f)) for f in freqs_hard] - elif key == 'sweep_short': - t_sec = 1.0 # duration of each frequency - min_freq, max_freq = SOUND_EFFECTS[key][1] - bins = select_frequencies(min_freq=min_freq, max_freq=max_freq, n_freqs=int(FFTSIZE//2)) + elif key == "sweep_short": + t_sec = 1.0 # duration of each frequency + min_freq, max_freq = SOUND_EFFECTS[key][1] + bins = select_frequencies( + min_freq=min_freq, max_freq=max_freq, n_freqs=int(FFTSIZE // 2) + ) else: - t_sec = 1.0 # duration of each note in seconds - min_freq, max_freq = SOUND_EFFECTS[key][1] + t_sec = 1.0 # duration of each note in seconds + min_freq, max_freq = SOUND_EFFECTS[key][1] bins = select_frequencies(min_freq=min_freq, max_freq=max_freq) return bins, t_sec -def select_frequencies(n_buffer=N_BUFFER, fs=FS, thrust=0, min_freq=100, max_freq=10000, filter_snr=False, buffer_f=None, delta_freq=100, ax=None, verbose=False, n_freqs=FFTSIZE): +def select_frequencies( + n_buffer=N_BUFFER, + fs=FS, + thrust=0, + min_freq=100, + max_freq=10000, + filter_snr=False, + buffer_f=None, + delta_freq=100, + ax=None, + verbose=False, + n_freqs=FFTSIZE, +): freq = get_frequencies() n_frequencies = len(freq) df = fs / n_buffer - np.testing.assert_allclose(freq, df*np.arange(n_buffer//2 + 1)) + np.testing.assert_allclose(freq, df * np.arange(n_buffer // 2 + 1)) min_index = int(round(min_freq * n_buffer / fs)) max_index = int(round(max_freq * n_buffer / fs)) min_idx = np.argmin(np.abs(freq - min_freq)) max_idx = np.argmin(np.abs(freq - max_freq)) - assert min_index == min_idx, (min_idx, min_index, min_index * df, freq[min_idx]) - assert max_index == max_idx, (max_idx, max_index, max_index * df, freq[max_idx]) + assert min_index == min_idx, (min_idx, min_index, min_index * df, freq[min_idx]) + assert max_index == max_idx, (max_idx, max_index, max_index * df, freq[max_idx]) assert freq[min_idx] == df * min_index assert freq[max_idx] == df * max_index - assert max_index < n_frequencies, f"given max frequency {max_freq}Hz too high for sampling frequency {fs}Hz" + assert ( + max_index < n_frequencies + ), f"given max frequency {max_freq}Hz too high for sampling frequency {fs}Hz" potential_indices = [] @@ -76,23 +295,23 @@ def select_frequencies(n_buffer=N_BUFFER, fs=FS, thrust=0, min_freq=100, max_fre # if this frequency is not in propellers, add it to potential bins. if thrust > 0: for j in range(len(prop_factors)): - if (abs((i * df) - (prop_factors[j] * prop_freq)) < delta_freq): + if abs((i * df) - (prop_factors[j] * prop_freq)) < delta_freq: if ax is not None: - ax.scatter(i * df, 0, color='red') + ax.scatter(i * df, 0, color="red") use_this = False break if use_this: potential_indices.append(i) if verbose: - print(f'selecting {FFTSIZE} from {len(potential_indices)}') + print(f"selecting {FFTSIZE} from {len(potential_indices)}") if not potential_indices: print(f"Warning: did not find any potential indices. using min_freq={min_freq}") potential_indices = [min_index] - # Select indices based on snr or uniformly. + # Select indices based on snr or uniformly. selected_indices = [] - if not filter_snr: # choose uniformly every k-th bin + if not filter_snr: # choose uniformly every k-th bin if len(potential_indices) > n_freqs: decimation = float(len(potential_indices) / n_freqs) for i in range(n_freqs): @@ -101,28 +320,32 @@ def select_frequencies(n_buffer=N_BUFFER, fs=FS, thrust=0, min_freq=100, max_fre else: selected_indices = potential_indices - else: # choose the highest K amplitude bins. + else: # choose the highest K amplitude bins. # C-like structure to be used in qsort. signals_amp_list = [] for i in potential_indices: sum_ = 0 - for j in range(buffer_f.shape[0]): # buffer_f is of shape n_mics x n_bins_ + for j in range(buffer_f.shape[0]): # buffer_f is of shape n_mics x n_bins_ sum_ += np.abs(buffer_f[j, i]) - struct = {'amplitude': sum_, 'index': i} + struct = {"amplitude": sum_, "index": i} signals_amp_list.append(struct) # below would be replaced by qsort - sorted_signals_amp_list = sorted(signals_amp_list, key=lambda elem: elem['amplitude'])[::-1] + sorted_signals_amp_list = sorted( + signals_amp_list, key=lambda elem: elem["amplitude"] + )[::-1] for i in range(n_freqs): - # thanks to - elements = sorted_signals_amp_list[i:i+1] - selected_indices += [e['index'] for e in elements] + # thanks to + elements = sorted_signals_amp_list[i : i + 1] + selected_indices += [e["index"] for e in elements] if len(selected_indices) < n_freqs: if verbose: print("Warning: found less indices than asked for. Filling with zeros") - selected_indices = np.r_[selected_indices, [0] * (n_freqs - len(selected_indices))] + selected_indices = np.r_[ + selected_indices, [0] * (n_freqs - len(selected_indices)) + ] assert len(selected_indices) == n_freqs, len(selected_indices) return selected_indices diff --git a/python/bin_selection_research.py b/python/bin_selection_research.py index 90b01aa0..7f82d6d1 100644 --- a/python/bin_selection_research.py +++ b/python/bin_selection_research.py @@ -41,10 +41,14 @@ ) +signals_props, signals_source, signals_all = read_recordings( + dir_names[1], + loudness=loudnesses[1], + gt_degrees=gt_degrees_list[1], + source=sources[1], +) -signals_props, signals_source, signals_all = read_recordings( dir_names[1], loudness=loudnesses[1], gt_degrees=gt_degrees_list[1], source=sources[1], ) - -#signals_props = np.transpose(signals_props) +# signals_props = np.transpose(signals_props) f = 32000 @@ -53,16 +57,20 @@ om_0 = 3.27258551 * np.sqrt(thrust) - 26.41814899 om_0s = om_0 * np.append([0.5, 1, 1.5], np.arange(2, 27, 1)) -print(f'Number of harmonics {len(om_0s)}') +print(f"Number of harmonics {len(om_0s)}") NSamples = 1024 for signal in signals_props: freq = np.fft.rfftfreq(NSamples, 1 / f) Y = np.fft.rfft(signal[0:NSamples]) / NSamples - #plt.plot(range(len(freq)), np.abs(Y) , label = f"{dir_names[1]} {loudnesses[1]} {gt_degrees_list[1]} {sources[1]}") - plt.plot(freq, np.abs(Y) , label = f"{dir_names[1]} {loudnesses[1]} {gt_degrees_list[1]} {sources[1]}") - -#print(freq) + # plt.plot(range(len(freq)), np.abs(Y) , label = f"{dir_names[1]} {loudnesses[1]} {gt_degrees_list[1]} {sources[1]}") + plt.plot( + freq, + np.abs(Y), + label=f"{dir_names[1]} {loudnesses[1]} {gt_degrees_list[1]} {sources[1]}", + ) + +# print(freq) bins_basic_candidates = np.ones(np.size(freq)) DELTA_F_PROP = 100 @@ -71,18 +79,20 @@ hi_pass_f = 100 lo_pass_f = 10000 -lo_pass_index = lo_pass_f/(f/NSamples) -hi_pass_index = hi_pass_f/(f/NSamples) +lo_pass_index = lo_pass_f / (f / NSamples) +hi_pass_index = hi_pass_f / (f / NSamples) -print(f/NSamples) +print(f / NSamples) -print(f'hi_pass_index = {hi_pass_index } ({hi_pass_f}), lo_pass_index = {lo_pass_index } ({lo_pass_f})') +print( + f"hi_pass_index = {hi_pass_index } ({hi_pass_f}), lo_pass_index = {lo_pass_index } ({lo_pass_f})" +) # Remove propeler sound +- delta f for i in range(np.size(freq)): in_prop_range = 0 for om_0_i in om_0s: - if( np.abs((freq[i] - om_0_i )) < DELTA_F_PROP): + if np.abs((freq[i] - om_0_i)) < DELTA_F_PROP: in_prop_range = 1 break if in_prop_range: @@ -90,41 +100,41 @@ # Remove Hi-pass and low-pass candidates for i in range(np.size(freq)): - if((freq[i] < hi_pass_f) | (freq[i] > lo_pass_f )): + if (freq[i] < hi_pass_f) | (freq[i] > lo_pass_f): bins_basic_candidates[i] = 0 for i in range(np.size(freq)): - print(f'i = {i}: bins_basic_candidates = {bins_basic_candidates[i]}') + print(f"i = {i}: bins_basic_candidates = {bins_basic_candidates[i]}") # Count candidate that are still available sum_candidate = 0 for i in range(np.size(bins_basic_candidates)): - if(bins_basic_candidates[i] == 1): + if bins_basic_candidates[i] == 1: sum_candidate += 1 -# Backup +# Backup bins_snr_avoid_prop = np.zeros(np.shape(bins_basic_candidates)) bins_uniforme_avoid_prop = np.zeros(np.shape(bins_basic_candidates)) ## samples uniformly among remaining candidates decimation = int(np.ceil(sum_candidate / FFTSIZE_SENT)) + 1 -print(f'candidates = {sum_candidate}, decimation = {decimation}') +print(f"candidates = {sum_candidate}, decimation = {decimation}") number_selected_candidates = 0 selected_bins = np.zeros(FFTSIZE_SENT) for i in range(np.size(freq)): - if((bins_basic_candidates[i] == 1)): - + if bins_basic_candidates[i] == 1: + sum = 0 # check if any of the previous decimation bits where 1 for j in range(decimation): - sum += bins_uniforme_avoid_prop[i - j] + sum += bins_uniforme_avoid_prop[i - j] # if previous samples where 0, then ok to place a one - if( (sum == 0) & (number_selected_candidates < FFTSIZE_SENT)): + if (sum == 0) & (number_selected_candidates < FFTSIZE_SENT): bins_uniforme_avoid_prop[i] = 1 - selected_bins [number_selected_candidates] = i + selected_bins[number_selected_candidates] = i number_selected_candidates += 1 else: bins_uniforme_avoid_prop[i] = 0 @@ -135,15 +145,15 @@ sum = 0 for j in range(len(bins_uniforme_avoid_prop)): - sum += bins_uniforme_avoid_prop[j] -print(f'Final chosen bins_uniforme_avoid_prop count is: {number_selected_candidates} ') + sum += bins_uniforme_avoid_prop[j] +print(f"Final chosen bins_uniforme_avoid_prop count is: {number_selected_candidates} ") -#plt.scatter(range(len(freq)), + 1000 * bins_basic_candidates, label = 'bins_basic_candidates') -#plt.scatter(range(len(freq)), + 600 * bins_uniforme_avoid_prop, label = 'bins_uniforme_avoid_prop') -plt.scatter(freq, + 1000 * bins_basic_candidates, label = 'bins_basic_candidates') -plt.scatter(freq, + 600 * bins_uniforme_avoid_prop, label = 'bins_uniforme_avoid_prop') +# plt.scatter(range(len(freq)), + 1000 * bins_basic_candidates, label = 'bins_basic_candidates') +# plt.scatter(range(len(freq)), + 600 * bins_uniforme_avoid_prop, label = 'bins_uniforme_avoid_prop') +plt.scatter(freq, +1000 * bins_basic_candidates, label="bins_basic_candidates") +plt.scatter(freq, +600 * bins_uniforme_avoid_prop, label="bins_uniforme_avoid_prop") plt.legend() -plt.autoscale(enable = True, axis = 'x', tight = True) +plt.autoscale(enable=True, axis="x", tight=True) plt.show() diff --git a/python/convert_memory_dump.py b/python/convert_memory_dump.py index ca8dcdae..b729f0a6 100644 --- a/python/convert_memory_dump.py +++ b/python/convert_memory_dump.py @@ -4,22 +4,22 @@ datadir = "recordings_14_7_20/external_source_and_propellers/20deg/" fs = 32000 -dt = 1/fs -left_2 = np.fromfile(datadir + 'left_2', np.int16) +dt = 1 / fs +left_2 = np.fromfile(datadir + "left_2", np.int16) np.save(datadir + "front_right", left_2) -left_3 = np.fromfile(datadir + 'left_3', np.int16) -np.save(datadir+"back_right",left_3) +left_3 = np.fromfile(datadir + "left_3", np.int16) +np.save(datadir + "back_right", left_3) -right_2 = np.fromfile(datadir+'right_2',np.int16) -np.save(datadir+"front_left",right_2) +right_2 = np.fromfile(datadir + "right_2", np.int16) +np.save(datadir + "front_left", right_2) -right_3 = np.fromfile(datadir+'right_3',np.int16) -np.save(datadir+"back_left",right_3) +right_3 = np.fromfile(datadir + "right_3", np.int16) +np.save(datadir + "back_left", right_3) # t = np.arange(0,right_3.size/fs,dt) # plt.plot(t, left_2) # plt.plot(t,right_2) # plt.plot(t,left_3) # plt.plot(t,right_3) -# plt.show() \ No newline at end of file +# plt.show() diff --git a/python/convert_memory_dump_dma.py b/python/convert_memory_dump_dma.py index 554a1b5e..5508232a 100644 --- a/python/convert_memory_dump_dma.py +++ b/python/convert_memory_dump_dma.py @@ -4,21 +4,21 @@ datadir = "recordings_14_7_20/test_2/" fs = 32000 -dt = 1/fs -dma_2 = np.fromfile(datadir+'dma_2',np.int16) +dt = 1 / fs +dma_2 = np.fromfile(datadir + "dma_2", np.int16) left2 = dma_2[0:-1:2] right2 = dma_2[1:-2:2] -np.save(datadir+"front_right",left2) -np.save(datadir+"front_left",right2) +np.save(datadir + "front_right", left2) +np.save(datadir + "front_left", right2) -dma_3 = np.fromfile(datadir+'dma_3',np.float16) +dma_3 = np.fromfile(datadir + "dma_3", np.float16) left3 = dma_3[0:-1:2] right3 = dma_3[1:-1:2] -np.save(datadir+"back_right",left3) -np.save(datadir+"back_left",right2) +np.save(datadir + "back_right", left3) +np.save(datadir + "back_left", right2) # t = np.arange(0,left2.size/fs,dt) # plt.plot(t, left2/65535,'.-') -# plt.show() \ No newline at end of file +# plt.show() diff --git a/python/file_parser.py b/python/file_parser.py index 890caa9f..37214d96 100644 --- a/python/file_parser.py +++ b/python/file_parser.py @@ -20,7 +20,7 @@ "front_right.npy", # 3, should be 3 ] # Note that these positions were not very accurate -MEMS_MIC_POSITIONS = get_square_array(baseline=0.11, delta=0) +MEMS_MIC_POSITIONS = get_square_array(baseline=0.11, delta=0) MEMS_SOURCE_DISTANCE = 1.925 # These positions were more accurate @@ -31,28 +31,54 @@ SIM_SOURCE_DISTANCE = 1.925 parameters = { - "recordings_9_7_20": {"Fs": 32000, "time_index": 100, "mic_positions": MEMS_MIC_POSITIONS, "source_distance": MEMS_SOURCE_DISTANCE}, - "recordings_14_7_20": {"Fs": 32000, "time_index": 100, "mic_positions": MEMS_MIC_POSITIONS, "source_distance": MEMS_SOURCE_DISTANCE}, - "recordings_16_7_20": {"Fs": 48000, "time_index": 42000, "mic_positions": MEAS_MIC_POSITIONS, "source_distance": MEAS_SOURCE_DISTANCE}, - "analytical": {"Fs": 44100, "time_index": 0, "mic_positions": SIM_MIC_POSITIONS, "source_distance": SIM_SOURCE_DISTANCE}, - "pyroomacoustics": {"Fs": 44100, "time_index": 400, "mic_positions": SIM_MIC_POSITIONS, "source_distance": SIM_SOURCE_DISTANCE} + "recordings_9_7_20": { + "Fs": 32000, + "time_index": 100, + "mic_positions": MEMS_MIC_POSITIONS, + "source_distance": MEMS_SOURCE_DISTANCE, + }, + "recordings_14_7_20": { + "Fs": 32000, + "time_index": 100, + "mic_positions": MEMS_MIC_POSITIONS, + "source_distance": MEMS_SOURCE_DISTANCE, + }, + "recordings_16_7_20": { + "Fs": 48000, + "time_index": 42000, + "mic_positions": MEAS_MIC_POSITIONS, + "source_distance": MEAS_SOURCE_DISTANCE, + }, + "analytical": { + "Fs": 44100, + "time_index": 0, + "mic_positions": SIM_MIC_POSITIONS, + "source_distance": SIM_SOURCE_DISTANCE, + }, + "pyroomacoustics": { + "Fs": 44100, + "time_index": 400, + "mic_positions": SIM_MIC_POSITIONS, + "source_distance": SIM_SOURCE_DISTANCE, + }, } - def read_all(dir_name, Fs=None, verbose=False, fnames=FILENAMES): n_times = None for i, fname in enumerate(fnames): fullname = f"{dir_name}/{fname}" - if fullname.split('.')[-1] == "npy": - # Unfortunately we cannot check that the sampling rate matches for + if fullname.split(".")[-1] == "npy": + # Unfortunately we cannot check that the sampling rate matches for # numpy arrays as it is not saved. signal0 = np.load(fullname) - elif fullname.split('.')[-1] == "wav": + elif fullname.split(".")[-1] == "wav": Fs_new, signal0 = read(fullname) # Check that the sampling rate matches for wav files. if Fs is not None: - assert Fs == Fs_new, f"Fs {Fs_new} of file {fullname} does not match {Fs}" + assert ( + Fs == Fs_new + ), f"Fs {Fs_new} of file {fullname} does not match {Fs}" Fs = Fs_new if verbose: print("read", fullname) @@ -70,7 +96,10 @@ def read_recording_9_7_20(loudness="high", gt_degrees=0, verbose=False, type_="p elif type_ == "source": dir_name = root_dir + f"recordings_9_7_20/200Hz/{loudness}_sound/without_prop/" elif type_ == "all": - dir_name = root_dir + f"recordings_9_7_20/200Hz/{loudness}_sound/with_prop/{gt_degrees}_deg/" + dir_name = ( + root_dir + + f"recordings_9_7_20/200Hz/{loudness}_sound/with_prop/{gt_degrees}_deg/" + ) Fs = parameters["recordings_9_7_20"] return read_all(dir_name, Fs, verbose) @@ -81,7 +110,10 @@ def read_recording_14_7_20(gt_degrees=0, verbose=False, type_="props"): elif type_ == "source": dir_name = root_dir + f"/recordings_14_7_20/external_source_only/" elif type_ == "all": - dir_name = root_dir + f"/recordings_14_7_20/external_source_and_propellers/{gt_degrees}deg/" + dir_name = ( + root_dir + + f"/recordings_14_7_20/external_source_and_propellers/{gt_degrees}deg/" + ) Fs = parameters["recordings_14_7_20"] return read_all(dir_name, Fs, verbose) @@ -89,7 +121,9 @@ def read_recording_14_7_20(gt_degrees=0, verbose=False, type_="props"): def read_recordings_9_7_20(loudness="high", gt_degrees=0, verbose=False): signals_props = read_recording_9_7_20(loudness, gt_degrees, verbose, type_="props") - signals_source = read_recording_9_7_20(loudness, gt_degrees, verbose, type_="source") + signals_source = read_recording_9_7_20( + loudness, gt_degrees, verbose, type_="source" + ) signals_all = read_recording_9_7_20(loudness, gt_degrees, verbose, type_="all") return signals_props, signals_source, signals_all @@ -117,14 +151,28 @@ def read_recording_16_7_20(fname, verbose): return signals -def read_recordings_16_7_20(loudness="high", gt_degrees=0, source="white_noise", verbose=False): +def read_recordings_16_7_20( + loudness="high", gt_degrees=0, source="white_noise", verbose=False +): file_props = root_dir + "/recordings_16_7_20/propellers_only/all.npy" if source == "white_noise": - file_source = root_dir + f"/recordings_16_7_20/white_noise/{loudness}/{gt_degrees}deg/wn_only.npy" - file_all = root_dir + f"/recordings_16_7_20/white_noise/{loudness}/{gt_degrees}deg/wn_and_props.npy" + file_source = ( + root_dir + + f"/recordings_16_7_20/white_noise/{loudness}/{gt_degrees}deg/wn_only.npy" + ) + file_all = ( + root_dir + + f"/recordings_16_7_20/white_noise/{loudness}/{gt_degrees}deg/wn_and_props.npy" + ) elif source == "200Hz": - file_source = root_dir + f"/recordings_16_7_20/200Hz/{loudness}/{gt_degrees}deg/200Hz_only.npy" - file_all = root_dir + f"/recordings_16_7_20/200Hz/{loudness}/{gt_degrees}deg/200Hz_and_props.npy" + file_source = ( + root_dir + + f"/recordings_16_7_20/200Hz/{loudness}/{gt_degrees}deg/200Hz_only.npy" + ) + file_all = ( + root_dir + + f"/recordings_16_7_20/200Hz/{loudness}/{gt_degrees}deg/200Hz_and_props.npy" + ) else: raise ValueError(source) @@ -132,9 +180,7 @@ def read_recordings_16_7_20(loudness="high", gt_degrees=0, source="white_noise", signals_source = read_recording_16_7_20(file_source, verbose) signals_all = read_recording_16_7_20(file_all, verbose) n_times = min( - signals_props.shape[1], - signals_all.shape[1], - signals_source.shape[1], + signals_props.shape[1], signals_all.shape[1], signals_source.shape[1], ) signals_props = signals_props[:, -n_times:] signals_source = signals_source[:, -n_times:] diff --git a/python/mic_array.py b/python/mic_array.py index 85c990a1..e3ebc53e 100644 --- a/python/mic_array.py +++ b/python/mic_array.py @@ -42,8 +42,14 @@ def get_tetrahedron_array(dimension=2, baseline=0.05): if not dimension in (2, 3): raise ValueError(dimension) - tetrahedron = np.array([[baseline * np.sqrt(3) / 3, 0, 0], [-baseline * np.sqrt(3) / 6, baseline / 2., 0], - [-baseline * np.sqrt(3) / 6, -baseline / 2., 0], [0, 0, baseline * np.sqrt(6) / 3]]) + tetrahedron = np.array( + [ + [baseline * np.sqrt(3) / 3, 0, 0], + [-baseline * np.sqrt(3) / 6, baseline / 2.0, 0], + [-baseline * np.sqrt(3) / 6, -baseline / 2.0, 0], + [0, 0, baseline * np.sqrt(6) / 3], + ] + ) assert tetrahedron.shape[0] == 4 assert tetrahedron.shape[1] == 3 if dimension == 2: @@ -56,9 +62,12 @@ def get_tetrahedron_array(dimension=2, baseline=0.05): def ambiguity_test(mic_positions, freq): delta = get_min_distance(mic_positions) lamda = SPEED_OF_SOUND / freq - print(f'signal wavelength/2: {lamda/2:.1e}, minimum distance between mics {delta:.1e}') print( - f'signal wavelength is {lamda/delta:.1f} times bigger than minimum mic distance. (Make sure it is more than 1)') + f"signal wavelength/2: {lamda/2:.1e}, minimum distance between mics {delta:.1e}" + ) + print( + f"signal wavelength is {lamda/delta:.1f} times bigger than minimum mic distance. (Make sure it is more than 1)" + ) if __name__ == "__main__": @@ -69,14 +78,22 @@ def ambiguity_test(mic_positions, freq): num_mics = 5 for dimension in [2, 3]: - mic_positions = get_uniform_array(num_mics, baseline=baseline, dimension=dimension) + mic_positions = get_uniform_array( + num_mics, baseline=baseline, dimension=dimension + ) min_dist = get_min_distance(mic_positions) min_dist_theo = baseline / (num_mics - 1) - assert abs(min_dist - min_dist_theo) < eps, f'{dimension}D test failed for uniform: {min_dist}!={min_dist_theo}' + assert ( + abs(min_dist - min_dist_theo) < eps + ), f"{dimension}D test failed for uniform: {min_dist}!={min_dist_theo}" mic_positions = get_tetrahedron_array(baseline=baseline, dimension=dimension) min_dist = get_min_distance(mic_positions) - assert abs(min_dist - baseline) < eps, f'{dimension}D test failed for tetra: {min_dist}!={baseline}' + assert ( + abs(min_dist - baseline) < eps + ), f"{dimension}D test failed for tetra: {min_dist}!={baseline}" max_dist = get_max_distance(mic_positions) - assert abs(max_dist - baseline) < eps, f'{dimension}D test failed for tetra: {max_dist}!={baseline}' - print('all tests ok.') + assert ( + abs(max_dist - baseline) < eps + ), f"{dimension}D test failed for tetra: {max_dist}!={baseline}" + print("all tests ok.") diff --git a/python/play_and_record.py b/python/play_and_record.py index e45e1d5a..8c777b3a 100644 --- a/python/play_and_record.py +++ b/python/play_and_record.py @@ -7,66 +7,74 @@ from signals import generate_signal, amplify_signal -FS = 44100 # sampling frequency in Hz -N_MICS = 1 # number of mics -DURATION = 10 # duration of recording in seconds -MIN_DB = -60 # loudness, dB +FS = 44100 # sampling frequency in Hz +N_MICS = 1 # number of mics +DURATION = 10 # duration of recording in seconds +MIN_DB = -60 # loudness, dB MAX_DB = -10 IN_FILE = None -FREQ = 4100 #440 # Hz -#SIGNAL_TYPE = "mono" +FREQ = 4100 # 440 # Hz +# SIGNAL_TYPE = "mono" SIGNAL_TYPE = "mono" -#SIGNAL_TYPE = "random" -#SIGNAL_TYPE = "random_linear" -#SIGNAL_TYPE = "mono_linear" -#SIGNAL_TYPE = "sweep" -#SIGNAL_TYPE = "real"; IN_FILE = "../data/propellers/44000.wav" +# SIGNAL_TYPE = "random" +# SIGNAL_TYPE = "random_linear" +# SIGNAL_TYPE = "mono_linear" +# SIGNAL_TYPE = "sweep" +# SIGNAL_TYPE = "real"; IN_FILE = "../data/propellers/44000.wav" def get_usb_soundcard_ubuntu(fs=FS, n_mics=N_MICS): import sounddevice as sd # Sound card selection: input, output - name = 'UAC-2' + name = "UAC-2" sd.default.device = name, name # os-specific - sd.default.samplerate = fs + sd.default.samplerate = fs sd.default.dtype = np.float32 sd.default.channels = n_mics, 1 return sd -if __name__ == '__main__': +if __name__ == "__main__": np.random.seed(1) - signal = generate_signal(FS, duration_sec=DURATION, signal_type=SIGNAL_TYPE, frequency_hz=FREQ, min_dB=MIN_DB, max_dB=MAX_DB, fname=IN_FILE) + signal = generate_signal( + FS, + duration_sec=DURATION, + signal_type=SIGNAL_TYPE, + frequency_hz=FREQ, + min_dB=MIN_DB, + max_dB=MAX_DB, + fname=IN_FILE, + ) out_file = f"{SIGNAL_TYPE}" sd = get_usb_soundcard_ubuntu() - print('using devices:') + print("using devices:") print(sd.query_devices()) try: - print('start recording') + print("start recording") recording = sd.playrec(signal, blocking=True) except: raise - #print('sleeping...') - #time.sleep(DURATION) - print('status (empty is ok):', sd.get_status()) + # print('sleeping...') + # time.sleep(DURATION) + print("status (empty is ok):", sd.get_status()) # convert 64 to 32 float recording_float32 = recording.astype(np.float32) - wavfile.write(f'{out_file}_scipy.wav', FS, recording_float32) + wavfile.write(f"{out_file}_scipy.wav", FS, recording_float32) - with wave.open(f'{out_file}.wav', 'w') as f: + with wave.open(f"{out_file}.wav", "w") as f: f.setnchannels(N_MICS) f.setframerate(FS) - f.setsampwidth(4) # number of bytes for int32 - recording_int32 = (recording_float32 * (2**31 - 1)).astype(np.int32) + f.setsampwidth(4) # number of bytes for int32 + recording_int32 = (recording_float32 * (2 ** 31 - 1)).astype(np.int32) f.writeframes(recording_int32) - np.save(f'{out_file}.npy', recording_float32) - print(f'wrote files as {out_file}.wav, {out_file}_scipy.wav and {out_file}.npy.') + np.save(f"{out_file}.npy", recording_float32) + print(f"wrote files as {out_file}.wav, {out_file}_scipy.wav and {out_file}.npy.") diff --git a/python/play_and_record_with_thrust.py b/python/play_and_record_with_thrust.py index 327aacf9..b7d8a0dd 100644 --- a/python/play_and_record_with_thrust.py +++ b/python/play_and_record_with_thrust.py @@ -14,36 +14,39 @@ # Sound card selection # sd.default.device = 'ASIO4ALL v2' # sd.default.device = 'Yamaha Steinberg USB ASIO' -sd.default.device = 'M Audio M-Track Eight ASIO' +sd.default.device = "M Audio M-Track Eight ASIO" fs = 48000 sd.default.samplerate = fs sd.default.channels = 4 -id = 'radio://0/69/2M' +id = "radio://0/69/2M" -def create_white_noise(fs,duration): +def create_white_noise(fs, duration): wn = np.random.rand(fs * duration) return wn -def create_sinus(fs,f,duration): - dt = 1/fs - t = np.arange(0, duration-dt, dt) + +def create_sinus(fs, f, duration): + dt = 1 / fs + t = np.arange(0, duration - dt, dt) sin = np.sin(2 * np.pi * f * t) return sin -def set_thrust(cf,thrust): - thrust_str = '%d' % thrust - cf.param.set_value('motorPowerSet.m4', thrust_str) - cf.param.set_value('motorPowerSet.m1', thrust_str) - cf.param.set_value('motorPowerSet.m2', thrust_str) - cf.param.set_value('motorPowerSet.m3', thrust_str) - cf.param.set_value('motorPowerSet.enable', '1') - -if __name__ == '__main__': - duration = 10 #sec - #signal = create_white_noise(fs, duration) + +def set_thrust(cf, thrust): + thrust_str = "%d" % thrust + cf.param.set_value("motorPowerSet.m4", thrust_str) + cf.param.set_value("motorPowerSet.m1", thrust_str) + cf.param.set_value("motorPowerSet.m2", thrust_str) + cf.param.set_value("motorPowerSet.m3", thrust_str) + cf.param.set_value("motorPowerSet.enable", "1") + + +if __name__ == "__main__": + duration = 10 # sec + # signal = create_white_noise(fs, duration) signal = create_sinus(fs, 200, duration) # Initialize the low-level drivers (don't list the debug drivers) @@ -56,7 +59,7 @@ def set_thrust(cf,thrust): # recording = sd.rec(duration * fs, blocking=True) #uncomment if no sound should be played recording = sd.playrec(signal, blocking=True) - cf.param.set_value('motorPowerSet.enable', '0') + cf.param.set_value("motorPowerSet.enable", "0") time.sleep(1) np.save(record_dir, recording) diff --git a/python/pwm_tuning.py b/python/pwm_tuning.py index 59d601a7..00fa57f3 100644 --- a/python/pwm_tuning.py +++ b/python/pwm_tuning.py @@ -2,7 +2,7 @@ import pandas as pd TARGET_F = 3250 # In Hz so 50.0 is 0.020 seconds period and 0.25 is 4 seconds period -CLOCK_MCU = 84000000 # corresponds to ca. 329500 * 255 +CLOCK_MCU = 84000000 # corresponds to ca. 329500 * 255 TOLERANCE = 0.0001 PERIOD = 255 PSC = 1 @@ -32,9 +32,10 @@ def add_exact_period(prescaler, target_f=TARGET_F): period = CLOCK_MCU / (target_f * prescaler) if period == int(period): return np.array([prescaler, period, target_f, 0.0]).reshape((1, 4)) - else: + else: return None + def possible_prescaler_value(target_f=TARGET_F): possible_prescalers = [] exact_prescalers = perfect_divisors() @@ -57,9 +58,10 @@ def close_divisor(psc, tolerance, target_f=TARGET_F): else: return None + def print_errors(): # Make a dataframe to hold results as we compute them - df = pd.DataFrame(columns=['PSC', 'ARR', 'F', 'ERROR'], dtype=np.double) + df = pd.DataFrame(columns=["PSC", "ARR", "F", "ERROR"], dtype=np.double) # Get exact prescalers first. exact_prescalers = perfect_divisors() @@ -77,60 +79,63 @@ def print_errors(): df = df.append(pd.DataFrame(row, columns=df.columns)) # Adjust PSC and ARR values by -1 to reflect the way you'd code them. - df['PSC'] = df['PSC'] - 1 - df['ARR'] = df['ARR'] - 1 + df["PSC"] = df["PSC"] - 1 + df["ARR"] = df["ARR"] - 1 # Sort first by errors (zeroes and lowest errors at top of list, and # then by prescaler value (ascending). - df = df.sort_values(['ERROR', 'PSC']) + df = df.sort_values(["ERROR", "PSC"]) # Make and populate column indicating if combination is exact. - df['EXACT'] = pd.Series("?", index=df.index) - df['EXACT'] = np.where(df['ERROR'] == 0.0, "YES", "NO") + df["EXACT"] = pd.Series("?", index=df.index) + df["EXACT"] = np.where(df["ERROR"] == 0.0, "YES", "NO") # Format for output. - df['PSC'] = df['PSC'].map('{:.0f}'.format) - df['ARR'] = df['ARR'].map('{:.0f}'.format) - df['F'] = df['F'].map('{:.6f}'.format) - df['ERROR'] = df['ERROR'].map('{:.10f}'.format) + df["PSC"] = df["PSC"].map("{:.0f}".format) + df["ARR"] = df["ARR"].map("{:.0f}".format) + df["F"] = df["F"].map("{:.6f}".format) + df["ERROR"] = df["ERROR"].map("{:.10f}".format) output = df.to_string() print(output) - print('\nthese are the ', df.shape[0], ' total combination meeting your tolerance requirement') + print( + "\nthese are the ", + df.shape[0], + " total combination meeting your tolerance requirement", + ) -def print_frequencies(period=PERIOD, fmin=0, fmax=np.inf, n_freqs=None): - df = pd.DataFrame(columns=['PSC', 'ARR', 'F', 'ERROR']) + +def print_frequencies(period=PERIOD, fmin=0, fmax=np.inf, n_freqs=None): + df = pd.DataFrame(columns=["PSC", "ARR", "F", "ERROR"]) prescalers = np.arange(1, MAX_INT, step=1) for psc in prescalers: f = get_frequency(psc, period) - if fmax >= f >= fmin: - df.loc[len(df), :] = dict( - PSC=psc, - ARR=period, - F=int(round(f)), - ERROR=0 - ) - df.sort_values('F', axis=0, inplace=True) + if fmax >= f >= fmin: + df.loc[len(df), :] = dict(PSC=psc, ARR=period, F=int(round(f)), ERROR=0) + df.sort_values("F", axis=0, inplace=True) n_total = len(df) if n_freqs is not None: - #keep_indices = np.linspace(0, n_total-1, n_freqs).astype(int) + # keep_indices = np.linspace(0, n_total-1, n_freqs).astype(int) keep_frequencies = np.linspace(fmin, fmax, n_freqs) - keep_indices = np.argmin(np.abs(keep_frequencies[None, :] - df.F[:, None]), axis=0) + keep_indices = np.argmin( + np.abs(keep_frequencies[None, :] - df.F[:, None]), axis=0 + ) df = df.iloc[keep_indices] - df.loc[:, 'ERROR'] = (keep_frequencies - df.F) + df.loc[:, "ERROR"] = keep_frequencies - df.F df = df.astype(np.int) return df + def print_correct_format(df): - print(r'freq_list_t freq_list_tim[] = {') + print(r"freq_list_t freq_list_tim[] = {") for i, row in df.iterrows(): - print('\t' + r'{' + f'{row.F}, {row.PSC}, {row.ARR}, {row.ERROR}' + r'},') - print(r'};') - + print("\t" + r"{" + f"{row.F}, {row.PSC}, {row.ARR}, {row.ERROR}" + r"},") + print(r"};") + -if __name__ == "__main__": - #print_errors() +if __name__ == "__main__": + # print_errors() df = print_frequencies(period=256, fmin=3000, fmax=4875, n_freqs=16) print(df) print_correct_format(df) diff --git a/python/reader_crtp.py b/python/reader_crtp.py index 598ef730..3752bdec 100644 --- a/python/reader_crtp.py +++ b/python/reader_crtp.py @@ -14,6 +14,7 @@ import cflib.crtp from cflib.crtp.crtpstack import CRTPPort + # TODO(FD) for now, below only works with modified cflib. # CRTP_PORT_AUDIO = CRTPPort.AUDIO CRTP_PORT_AUDIO = 0x09 @@ -23,99 +24,98 @@ # TODO(FD): figure out if below changes when the Crazyflie actually flies. # -# Tests have shown that when the flowdeck is attached to the Crazyflie and -# it is moved around (without flying), then +# Tests have shown that when the flowdeck is attached to the Crazyflie and +# it is moved around (without flying), then # yaw: -# - stabilizer.yaw, controller.yaw and stateEstimate.yaw are all the same. +# - stabilizer.yaw, controller.yaw and stateEstimate.yaw are all the same. # They are in degrees and clipped to -180, 180. -# - mag.x sometimes gives values similar to above 3, -# but sometimes it is constantly zero (should it only be used outside? +# - mag.x sometimes gives values similar to above 3, +# but sometimes it is constantly zero (should it only be used outside? # - gyro.z gives the raw yaw rate (or acceleration?). -# dx/dy: -# - motion.deltaX and motion.deltaY are in milimeters can be very noisy, especially when -# the ground is not textured. motion.deltaY points in "wrong" direction. -# - stateEstimateZ.vx and stateEstimateZ.vy are in mm/s and more stable +# dx/dy: +# - motion.deltaX and motion.deltaY are in milimeters can be very noisy, especially when +# the ground is not textured. motion.deltaY points in "wrong" direction. +# - stateEstimateZ.vx and stateEstimateZ.vy are in mm/s and more stable # but of course would need to be integrated for a position estimate. -# - stateEstimate.vx and stateEstimate.vy are in m/s and more stable -# but of course would need to be integrated for a position estimate. Note that they use a different +# - stateEstimate.vx and stateEstimate.vy are in m/s and more stable +# but of course would need to be integrated for a position estimate. Note that they use a different # reference frame than motion.deltaX, so they are inverted in the logger. -# - kalman.PX is in m/s and almost the same as stateEstimate.vx -# - kalman.X is in m and quite a smooth position estimate +# - kalman.PX is in m/s and almost the same as stateEstimate.vx +# - kalman.X is in m and quite a smooth position estimate # z: # - range.zrange gives the raw data in milimeters (uint16), which is quite accurate. -# - stateEstimateZ.z in milimeters (uint16), is more smooth but also overshoots a little bit, and even -# goes negative sometimes which is impossible. -# - stateEstimate.z in meters (float), from barometer. Surprisingly accurate. -# -# Format: +# - stateEstimateZ.z in milimeters (uint16), is more smooth but also overshoots a little bit, and even +# goes negative sometimes which is impossible. +# - stateEstimate.z in meters (float), from barometer. Surprisingly accurate. +# +# Format: # : (, , ) -# -# Note that all parameters are scaled so that they are given in degrees or meters. -# Also note that we change the coordinate systems so that the "front" on Crazyflie -# points in positive y direction, and the x points to the right. +# +# Note that all parameters are scaled so that they are given in degrees or meters. +# Also note that we change the coordinate systems so that the "front" on Crazyflie +# points in positive y direction, and the x points to the right. CHOSEN_LOGGERS = { - 'motion': { - 'yaw': ('stabilizer.yaw', 'float'), + "motion": { + "yaw": ("stabilizer.yaw", "float"), #'yaw_rate': ('gyro.z', 'float'), # one too many to respect size - 'dx': ('motion.deltaX', 'int16_t', 1000), - 'dy': ('motion.deltaY', 'int16_t', -1000), - 'vx': ('stateEstimate.vy', 'float', -1), - 'vy': ('stateEstimate.vx', 'float'), - 'x': ('kalman.stateY', 'float', -1), - 'y': ('kalman.stateX', 'float'), - 'z': ('range.zrange', 'uint16_t', 1000), + "dx": ("motion.deltaX", "int16_t", 1000), + "dy": ("motion.deltaY", "int16_t", -1000), + "vx": ("stateEstimate.vy", "float", -1), + "vy": ("stateEstimate.vx", "float"), + "x": ("kalman.stateY", "float", -1), + "y": ("kalman.stateX", "float"), + "z": ("range.zrange", "uint16_t", 1000), }, - 'status': { - 'vbat': ('pm.vbat', 'float'), + "status": {"vbat": ("pm.vbat", "float"),}, + "motors": { + "m1_pwm": ("pwm.m1_pwm", "uint32_t"), + "m2_pwm": ("pwm.m2_pwm", "uint32_t"), + "m3_pwm": ("pwm.m3_pwm", "uint32_t"), + "m4_pwm": ("pwm.m4_pwm", "uint32_t"), + "m1_thrust": ("audio.m1_thrust", "uint16_t"), + "m2_thrust": ("audio.m2_thrust", "uint16_t"), + "m3_thrust": ("audio.m3_thrust", "uint16_t"), + "m4_thrust": ("audio.m4_thrust", "uint16_t"), }, - 'motors': { - 'm1_pwm': ('pwm.m1_pwm', 'uint32_t'), - 'm2_pwm': ('pwm.m2_pwm', 'uint32_t'), - 'm3_pwm': ('pwm.m3_pwm', 'uint32_t'), - 'm4_pwm': ('pwm.m4_pwm', 'uint32_t'), - 'm1_thrust': ('audio.m1_thrust', 'uint16_t'), - 'm2_thrust': ('audio.m2_thrust', 'uint16_t'), - 'm3_thrust': ('audio.m3_thrust', 'uint16_t'), - 'm4_thrust': ('audio.m4_thrust', 'uint16_t'), - } } LOGGING_PERIODS_MS = { - 'motion': 10, - 'status': 1000, - 'motors': 100, + "motion": 10, + "status": 1000, + "motors": 100, } FFTSIZE = 32 N_MICS = 4 -CRTP_PAYLOAD = 29 # number of bytes per package +CRTP_PAYLOAD = 29 # number of bytes per package # audio signals data N_FRAMES_AUDIO = FFTSIZE * N_MICS * 2 # *2 for complex numbers AUDIO_DTYPE = np.float32 -N_FRAMES_FBINS = FFTSIZE +N_FRAMES_FBINS = FFTSIZE FBINS_DTYPE = np.uint16 -# the timestamp is sent in the end of the fbins messages, as an uint32. +# the timestamp is sent in the end of the fbins messages, as an uint32. N_BYTES_TIMESTAMP = 4 ALLOWED_DELTA_US = 1e6 -def test_logging_size(max_size=26): + +def test_logging_size(max_size=26): sizes = { - 'float': 4, - 'uint32_t': 4, - 'uint16_t': 2, - 'uint8_t': 1, - 'int32_t': 4, - 'int16_t': 2, - 'int8_t': 1, + "float": 4, + "uint32_t": 4, + "uint16_t": 2, + "uint8_t": 1, + "int32_t": 4, + "int16_t": 2, + "int8_t": 1, } for logger, log_dict in CHOSEN_LOGGERS.items(): size = 0 for key, vals in log_dict.items(): size += sizes[vals[1]] if size > max_size: - raise RuntimeError(f'logging config {logger} too big: {size}>{max_size}') + raise RuntimeError(f"logging config {logger} too big: {size}>{max_size}") class ArrayCRTP(object): @@ -126,7 +126,7 @@ def __init__(self, dtype, n_frames, name="array", extra_bytes=0): :param dtype: the type of data to be read (np.float32/np.uint16/etc.) :param name: name of this array (used for printing only) """ - self.name = name + self.name = name self.n_frames = n_frames self.n_bytes = n_frames * np.dtype(dtype).itemsize + extra_bytes self.n_packets_full, self.n_bytes_last = divmod(self.n_bytes, CRTP_PAYLOAD) @@ -146,16 +146,20 @@ def fill_array_from_crtp(self, packet, verbose=False): :returns: True if the array was filled, False if it is not full yet. """ if verbose and (self.name == "fbins"): - print(f"ReaderCRTP: filling fbins {self.packet_counter} (every second should be zero or one): {packet.datal}") + print( + f"ReaderCRTP: filling fbins {self.packet_counter} (every second should be zero or one): {packet.datal}" + ) elif verbose and (self.name == "audio"): - print(f"ReaderCRTP: filling audio {self.packet_counter} (first 6 floats): {packet.datal[:6*4]}") + print( + f"ReaderCRTP: filling audio {self.packet_counter} (first 6 floats): {packet.datal[:6*4]}" + ) # received all full packets, read remaining bytes if self.packet_counter == self.n_packets_full: self.bytes_array[ - self.packet_counter * CRTP_PAYLOAD: - self.packet_counter * CRTP_PAYLOAD + self.n_bytes_last - ] = packet.datal[:self.n_bytes_last] + self.packet_counter * CRTP_PAYLOAD : self.packet_counter * CRTP_PAYLOAD + + self.n_bytes_last + ] = packet.datal[: self.n_bytes_last] self.array = np.frombuffer(self.bytes_array, dtype=self.dtype) @@ -163,15 +167,17 @@ def fill_array_from_crtp(self, packet, verbose=False): self.packet_counter += 1 return True elif self.packet_counter < self.n_packets_full: - if (self.packet_counter == 0): + if self.packet_counter == 0: self.packet_start_time = time.time() - assert (self.packet_counter + 1)*CRTP_PAYLOAD < len(self.bytes_array), \ - f"{self.name}: index {self.packet_counter * CRTP_PAYLOAD} exceeds length {len(self.array)}" + assert (self.packet_counter + 1) * CRTP_PAYLOAD < len( + self.bytes_array + ), f"{self.name}: index {self.packet_counter * CRTP_PAYLOAD} exceeds length {len(self.array)}" self.bytes_array[ - self.packet_counter * CRTP_PAYLOAD: - (self.packet_counter + 1) * CRTP_PAYLOAD + self.packet_counter + * CRTP_PAYLOAD : (self.packet_counter + 1) + * CRTP_PAYLOAD ] = packet.datal self.packet_counter += 1 @@ -180,10 +186,14 @@ def fill_array_from_crtp(self, packet, verbose=False): print(f"unexpected packet: {self.packet_counter} > {self.n_packets_full}") def reset_array(self): - if (self.packet_counter != 0) and (self.packet_counter != self.n_packets_full + 1): - print(f"{self.name}: packet loss, received only {self.packet_counter}/{self.n_packets_full+1}.") + if (self.packet_counter != 0) and ( + self.packet_counter != self.n_packets_full + 1 + ): + print( + f"{self.name}: packet loss, received only {self.packet_counter}/{self.n_packets_full+1}." + ) success = False - else: + else: success = True self.packet_counter = 0 @@ -208,10 +218,17 @@ class ReaderCRTP(object): """ # TODO(FD) potentially replace with constant read in ROS - VELOCITY = 0.05 # 3 seconds for 15cm - BATTERY_OK = 3 #4.1 #3.83 - - def __init__(self, crazyflie, verbose=False, log_motion=False, log_status=True, log_motors=False): + VELOCITY = 0.05 # 3 seconds for 15cm + BATTERY_OK = 3 # 4.1 #3.83 + + def __init__( + self, + crazyflie, + verbose=False, + log_motion=False, + log_status=True, + log_motors=False, + ): test_logging_size() self.start_time = time.time() @@ -224,29 +241,26 @@ def __init__(self, crazyflie, verbose=False, log_motion=False, log_status=True, self.mc = MotionCommander(self.cf) self.logging_dicts = { - key: { - 'timestamp': None, - 'data': None, - 'published': True - } for key in ['motion', 'status', 'motors'] + key: {"timestamp": None, "data": None, "published": True} + for key in ["motion", "status", "motors"] } if log_motion: - self.init_log_config('motion') + self.init_log_config("motion") if log_status: - self.init_log_config('status') - if log_motors: - self.init_log_config('motors') + self.init_log_config("status") + if log_motors: + self.init_log_config("motors") self.cf.add_port_callback(CRTP_PORT_AUDIO, self.callback_audio) self.cf.add_port_callback(CRTPPort.CONSOLE, self.callback_console) # this data can be read and published by ROS nodes self.audio_dict = { - 'timestamp': None, - 'audio_timestamp': None, - 'signals_f_vect': None, - 'fbins': None, - 'published': True + "timestamp": None, + "audio_timestamp": None, + "signals_f_vect": None, + "fbins": None, + "published": True, } self.audio_array = ArrayCRTP(AUDIO_DTYPE, N_FRAMES_AUDIO, "audio") @@ -282,71 +296,82 @@ def callback_audio(self, packet): self.audio_array.reset_array() self.fbins_array.reset_array() - if self.frame_started and (packet.channel in [0, 1]): # channel is either 0 or 1: read audio data + if self.frame_started and ( + packet.channel in [0, 1] + ): # channel is either 0 or 1: read audio data self.audio_array.fill_array_from_crtp(packet, verbose=False) - elif self.frame_started and packet.channel == 2: # channel is 2: read fbins + elif self.frame_started and packet.channel == 2: # channel is 2: read fbins filled = self.fbins_array.fill_array_from_crtp(packet, verbose=False) - if filled: + if filled: # read the timestamp from the last packet timestamp_bytes = np.array( - packet.datal[self.fbins_array.n_bytes_last: - self.fbins_array.n_bytes_last + N_BYTES_TIMESTAMP], - dtype=np.uint8) + packet.datal[ + self.fbins_array.n_bytes_last : self.fbins_array.n_bytes_last + + N_BYTES_TIMESTAMP + ], + dtype=np.uint8, + ) # frombuffer returns array of length 1 - new_audio_timestamp = int(np.frombuffer(timestamp_bytes, dtype=np.uint32)[0]) + new_audio_timestamp = int( + np.frombuffer(timestamp_bytes, dtype=np.uint32)[0] + ) # reject faulty packages - if self.audio_timestamp and (new_audio_timestamp > self.audio_timestamp + ALLOWED_DELTA_US): - return + if self.audio_timestamp and ( + new_audio_timestamp > self.audio_timestamp + ALLOWED_DELTA_US + ): + return self.audio_timestamp = new_audio_timestamp - self.audio_dict['published'] = False - self.audio_dict['signals_f_vect'] = self.audio_array.array - self.audio_dict['fbins'] = self.fbins_array.array - self.audio_dict['audio_timestamp'] = self.audio_timestamp - self.audio_dict['timestamp'] = self.get_time_ms() - + self.audio_dict["published"] = False + self.audio_dict["signals_f_vect"] = self.audio_array.array + self.audio_dict["fbins"] = self.fbins_array.array + self.audio_dict["audio_timestamp"] = self.audio_timestamp + self.audio_dict["timestamp"] = self.get_time_ms() + if self.verbose: - packet_time = time.time() - self.audio_dict['timestamp'] - print(f"ReaderCRTP callback: time for all packets: {packet_time}s, current timestamp: {new_audio_timestamp}, update rate: {self.update_rate:.2f}") - self.update_rate = 1/(time.time() - self.last_packet_time) + packet_time = time.time() - self.audio_dict["timestamp"] + print( + f"ReaderCRTP callback: time for all packets: {packet_time}s, current timestamp: {new_audio_timestamp}, update rate: {self.update_rate:.2f}" + ) + self.update_rate = 1 / (time.time() - self.last_packet_time) self.last_packet_time = time.time() def callback_console(self, packet): - message = ''.join(chr(n) for n in packet.datal) - print(message, end='') + message = "".join(chr(n) for n in packet.datal) + print(message, end="") def callback_logging(self, timestamp, data, logconf): dict_to_fill = self.logging_dicts[logconf.name] - dict_to_fill['timestamp'] = self.get_time_ms() - dict_to_fill['published'] = False - dict_to_fill['data'] = {} + dict_to_fill["timestamp"] = self.get_time_ms() + dict_to_fill["published"] = False + dict_to_fill["data"] = {} for key, vals in CHOSEN_LOGGERS[logconf.name].items(): value = data[vals[0]] if len(vals) == 3: value /= vals[2] - dict_to_fill['data'][key] = value - #if self.verbose: + dict_to_fill["data"][key] = value + # if self.verbose: # print(f'ReaderCRTP {logconf.name} callback: {dict_to_fill["data"]}') def battery_ok(self): - battery = self.logging_dicts['status']['data']['vbat'] + battery = self.logging_dicts["status"]["data"]["vbat"] if battery is None: return True - elif battery <= self.BATTERY_OK: + elif battery <= self.BATTERY_OK: print(f"Warning: battery only at {battery}, not executing command") return False return True - def send_thrust_command(self, value, motor='all'): + def send_thrust_command(self, value, motor="all"): # current_value = np.mean([self.cf.param.values['motorPowerSet.m{i}'] for i in range(1, 5)]) if (value > 0) and not self.battery_ok(): return False - if motor == 'all': + if motor == "all": [self.cf.param.set_value(f"motorPowerSet.m{i}", value) for i in range(1, 5)] else: self.cf.param.set_value(f"motorPowerSet.{motor}", value) @@ -406,15 +431,21 @@ def send_audio_enable(self, value): if __name__ == "__main__": import argparse + verbose = True log_motors = True log_motion = True log_status = True cflib.crtp.init_drivers(enable_debug_driver=False) - parser = argparse.ArgumentParser(description='Read CRTP data from Crazyflie.') - parser.add_argument('id', metavar='ID', type=int, help='number of Crazyflie ("radio://0/[ID]0/2M")', - default=8) + parser = argparse.ArgumentParser(description="Read CRTP data from Crazyflie.") + parser.add_argument( + "id", + metavar="ID", + type=int, + help='number of Crazyflie ("radio://0/[ID]0/2M")', + default=8, + ) args = parser.parse_args() id = f"radio://0/{args.id}0/2M/E7E7E7E7E{args.id}" @@ -422,7 +453,13 @@ def send_audio_enable(self, value): with SyncCrazyflie(id) as scf: cf = scf.cf - reader_crtp = ReaderCRTP(cf, verbose=verbose, log_motion=log_motion, log_motors=log_motors, log_status=log_status) + reader_crtp = ReaderCRTP( + cf, + verbose=verbose, + log_motion=log_motion, + log_motors=log_motors, + log_status=log_status, + ) try: while True: diff --git a/python/serial_motors.py b/python/serial_motors.py index 30038577..e8d26d00 100644 --- a/python/serial_motors.py +++ b/python/serial_motors.py @@ -18,34 +18,34 @@ # (distance (cm), command, time (s)) move = { - 'forward': [ - (0.1, b"q", 2.0), + "forward": [ + (0.1, b"q", 2.0), (1, b"w", 5.0), (10, b"e", 34.0), - (50, b"r", DURATION_50) + (50, b"r", DURATION_50), ], - 'backward': [ - (0.1, b"a", 2.0), + "backward": [ + (0.1, b"a", 2.0), (1, b"s", 5.0), (10, b"d", 34.0), - (50, b"f", DURATION_50) - ] + (50, b"f", DURATION_50), + ], } turn = { - 'forward': [ - (27, b"p", 3), - (90, b"o", 8), - (360, b"i", DURATION_360) - ], - 'backward': [ - (27, b"l", 3), - (90, b"k", 8), - (360, b"j", DURATION_360) - ] + "forward": [(27, b"p", 3), (90, b"o", 8), (360, b"i", DURATION_360)], + "backward": [(27, b"l", 3), (90, b"k", 8), (360, b"j", DURATION_360)], } + class SerialMotors(object): - def __init__(self, port=SERIAL_PORT, baudrate=115200, verbose=False, current_angle=0, current_distance=0): + def __init__( + self, + port=SERIAL_PORT, + baudrate=115200, + verbose=False, + current_angle=0, + current_distance=0, + ): self.port = SERIAL_PORT self.serial = serial.Serial(port, baudrate) self.verbose = verbose @@ -53,7 +53,7 @@ def __init__(self, port=SERIAL_PORT, baudrate=115200, verbose=False, current_ang self.current_angle = current_angle self.current_distance = current_distance - # turn is by default non-blocking because when we do the 360 degrees we + # turn is by default non-blocking because when we do the 360 degrees we # want to recording DURING, not after, as for the others. def turn(self, angle_deg, blocking=True): if angle_deg > 0: @@ -66,11 +66,11 @@ def turn_to(self, angle_deg, blocking=True): self.turn(delta, blocking) def turn_forward(self, angle_deg, blocking=True): - self.move_in_chunks(turn['forward'], angle_deg, blocking=blocking) + self.move_in_chunks(turn["forward"], angle_deg, blocking=blocking) self.current_angle += angle_deg def turn_back(self, angle_deg, blocking=True): - self.move_in_chunks(turn['backward'], angle_deg, blocking=blocking) + self.move_in_chunks(turn["backward"], angle_deg, blocking=blocking) self.current_angle -= angle_deg def move(self, delta_cm, blocking=True): @@ -84,11 +84,11 @@ def move_to(self, distance_cm, blocking=True): self.move(delta, blocking) def move_forward(self, delta_cm, blocking=True): - self.move_in_chunks(move['forward'], delta_cm, blocking=blocking) + self.move_in_chunks(move["forward"], delta_cm, blocking=blocking) self.current_distance += delta_cm def move_back(self, delta_cm, blocking=True): - self.move_in_chunks(move['backward'], delta_cm, blocking=blocking) + self.move_in_chunks(move["backward"], delta_cm, blocking=blocking) self.current_distance -= delta_cm def move_in_chunks(self, commands, total, blocking=True): @@ -99,32 +99,32 @@ def move_in_chunks(self, commands, total, blocking=True): num_commands = int(leftover // partial) if self.verbose: - print(f'moving {leftover} in chunks of {partial}') + print(f"moving {leftover} in chunks of {partial}") if (num_commands > 1) and not blocking: if self.verbose: - print(f'cannot move by {leftover} in non-blocking mode.') + print(f"cannot move by {leftover} in non-blocking mode.") blocking = True for i in range(num_commands): if self.verbose: - print(f'running command {i+1}/{num_commands}') + print(f"running command {i+1}/{num_commands}") self.serial.write(command) if blocking: - time.sleep(time_s) # wait for linear movement to be done + time.sleep(time_s) # wait for linear movement to be done leftover = leftover % partial if leftover != 0: - print(f'Warning: not moving by last {leftover} cm.') + print(f"Warning: not moving by last {leftover} cm.") if __name__ == "__main__": sm = SerialMotors(verbose=True) - #sm.turn(27) - #sm.turn_back(27) - #sm.turn(180) + # sm.turn(27) + # sm.turn_back(27) + # sm.turn(180) sm.move_back(50) - #sm.turn_back(360) - #sm.move_back(10) + # sm.turn_back(360) + # sm.move_back(10) diff --git a/python/signals.py b/python/signals.py index 073f101d..44cd069e 100644 --- a/python/signals.py +++ b/python/signals.py @@ -16,19 +16,31 @@ MIN_FREQ = 200 MAX_FREQ = 7000 -def generate_signal_sweep(Fs, duration_sec, min_freq=MIN_FREQ, max_freq=MAX_FREQ, step=False, **kwargs): + +def generate_signal_sweep( + Fs, duration_sec, min_freq=MIN_FREQ, max_freq=MAX_FREQ, step=False, **kwargs +): num_samples = int(ceil(Fs * duration_sec)) if step: # stay at each frequency at least 10 buffers at audio_deck sampling rate. - duration_per_freq = 2048. * 5 / 32000 # seconds + duration_per_freq = 2048.0 * 5 / 32000 # seconds max_samples_per_frequency = int(ceil(Fs * duration_per_freq)) num_frequencies = int(ceil(num_samples / max_samples_per_frequency)) - frequencies = np.exp(np.linspace(np.log(min_freq), np.log(max_freq), num_frequencies)) - frequencies_repeat = np.repeat(frequencies, max_samples_per_frequency)[:num_samples] - times = np.tile(np.linspace(0, duration_per_freq, max_samples_per_frequency), num_frequencies)[:num_samples] + frequencies = np.exp( + np.linspace(np.log(min_freq), np.log(max_freq), num_frequencies) + ) + frequencies_repeat = np.repeat(frequencies, max_samples_per_frequency)[ + :num_samples + ] + times = np.tile( + np.linspace(0, duration_per_freq, max_samples_per_frequency), + num_frequencies, + )[:num_samples] else: - frequencies_repeat = np.exp(np.linspace(np.log(min_freq), np.log(max_freq), num_samples)) + frequencies_repeat = np.exp( + np.linspace(np.log(min_freq), np.log(max_freq), num_samples) + ) times = np.linspace(0, duration_sec, num_samples) return np.sin(2 * np.pi * np.multiply(frequencies_repeat, times)) @@ -36,7 +48,7 @@ def generate_signal_sweep(Fs, duration_sec, min_freq=MIN_FREQ, max_freq=MAX_FREQ def generate_signal_mono(Fs, duration_sec, frequency_hz=1000, **kwargs): num_samples = int(ceil(Fs * duration_sec)) times = np.linspace(0, duration_sec, num_samples) - phase_offset = kwargs.get('phase_offset', 0) + phase_offset = kwargs.get("phase_offset", 0) return np.sin(2 * np.pi * frequency_hz * times + phase_offset) @@ -62,21 +74,26 @@ def generate_signal_real(Fs, duration_sec, fname, **kwargs): if q < 1: raise NotImplementedError("Cannot upsample file.") from scipy.signal import resample + old_length = len(real_signal) num = int(old_length / q) - print(f"Warning: specified Fs({Fs}) mismatches file ({Fs_file}). Upsampling by {q}") + print( + f"Warning: specified Fs({Fs}) mismatches file ({Fs_file}). Upsampling by {q}" + ) real_signal = resample(real_signal, num=num) assert len(real_signal) * q == old_length, (old_length, q * len(real_signal)) if duration_sec is not None: num_samples = int(ceil(Fs * duration_sec)) if num_samples >= len(real_signal): - print(f"Warning: requested more samples than available ({num_samples}>{len(real_signal)})") + print( + f"Warning: requested more samples than available ({num_samples}>{len(real_signal)})" + ) return real_signal start_idx = np.random.choice(np.arange(len(real_signal) - num_samples)) print(", starting at", start_idx) - return real_signal[start_idx:start_idx + num_samples] + return real_signal[start_idx : start_idx + num_samples] print("\n") return real_signal @@ -86,7 +103,9 @@ def generate_signal_zero(Fs, duration_sec, **kwargs): return np.zeros(num_samples) -def generate_signal(Fs, duration_sec, signal_type="mono", min_dB=-50, max_dB=0, **kwargs): +def generate_signal( + Fs, duration_sec, signal_type="mono", min_dB=-50, max_dB=0, **kwargs +): if signal_type == "mono": signal = generate_signal_mono(Fs, duration_sec, **kwargs) signal = amplify_signal(signal, target_dB=max_dB) @@ -103,7 +122,6 @@ def generate_signal(Fs, duration_sec, signal_type="mono", min_dB=-50, max_dB=0, signal = generate_signal_sweep(Fs, duration_sec, **kwargs) signal = amplify_signal(signal, target_dB=max_dB, verbose=True) - elif signal_type == "random_linear": signal = generate_signal_random(Fs, duration_sec, **kwargs) signal = linear_increase(signal, min_dB, max_dB) @@ -118,7 +136,7 @@ def generate_signal(Fs, duration_sec, signal_type="mono", min_dB=-50, max_dB=0, else: raise ValueError(signal_type) - #if np.any(signal > 1.0): + # if np.any(signal > 1.0): # print(f"Warning: signal is higher than 1.0. This could lead to clipping at the soundcard! Range: {np.min(signal)} to {np.max(signal)}") return signal @@ -131,7 +149,7 @@ def get_power(signal, dB=True): the average energy is calculated. """ - power = np.linalg.norm(signal)**2 / signal.size + power = np.linalg.norm(signal) ** 2 / signal.size if dB: return 10 * np.log10(power) return power @@ -146,10 +164,10 @@ def amplify_signal(signal, change_dB=None, target_dB=None, verbose=False): elif target_dB is not None: change_dB = target_dB - power_dB - ratio_amplitudes = np.sqrt(10**(change_dB / 10.0)) + ratio_amplitudes = np.sqrt(10 ** (change_dB / 10.0)) if verbose: - print('current power', power_dB) - print('target power', target_dB) + print("current power", power_dB) + print("target power", target_dB) print("amplitudes ratio:", ratio_amplitudes) new_signal = ratio_amplitudes * signal @@ -160,8 +178,8 @@ def amplify_signal(signal, change_dB=None, target_dB=None, verbose=False): def linear_increase(signal, min_dB=-50, max_dB=0): curr_dB = get_power(signal) - gains_dB = np.linspace(min_dB, max_dB, len(signal)) - curr_dB - gains = 10**(gains_dB/20) + gains_dB = np.linspace(min_dB, max_dB, len(signal)) - curr_dB + gains = 10 ** (gains_dB / 20) return np.multiply(gains, signal) @@ -177,7 +195,7 @@ def evaluate(self, times, noise=0): """ :param t: array or list of time instances. """ - raise NotImplementedError('needs to be implemented by inheriting classes') + raise NotImplementedError("needs to be implemented by inheriting classes") def create_audio_sample(self, sound_duration=1, sampling_rate=AUDIO_SAMPLING_RATE): time_samples = np.arange(0, sound_duration, 1 / sampling_rate) @@ -194,7 +212,9 @@ def evaluate(self, times, noise=0): """ :param t: array or list of time instances. """ - signal = self.params['amplitude'] * np.sin(self.params['omega'] * np.array(times)) + signal = self.params["amplitude"] * np.sin( + self.params["omega"] * np.array(times) + ) if noise > 0: signal += np.random.normal(scale=noise, loc=0, size=signal.shape) return signal.flatten() @@ -208,5 +228,5 @@ def evaluate(self, times, noise=0): """ :param t: array or list of time instances. """ - signal = np.random.normal(scale=self.params['sigma'], loc=0, size=len(times)) + signal = np.random.normal(scale=self.params["sigma"], loc=0, size=len(times)) return signal.flatten() diff --git a/python/test_beamforming.py b/python/test_beamforming.py index f505cb93..44912844 100644 --- a/python/test_beamforming.py +++ b/python/test_beamforming.py @@ -29,13 +29,16 @@ def prepare(mic_positions, gt_azimuth=30 * np.pi / 180): frequencies = np.fft.rfftfreq(n=n_times, d=1 / Fs) frequencies_signal = [200, 300, 400, 500] # Hz - assert np.all([f in frequencies for f in frequencies_signal - ]), f"choose valid signal frequencies! {(frequencies, frequencies_signal)}" + assert np.all( + [f in frequencies for f in frequencies_signal] + ), f"choose valid signal frequencies! {(frequencies, frequencies_signal)}" delays = get_mic_delays(mic_positions, azimuth=gt_azimuth) signals = np.zeros((mic_positions.shape[0], n_times)) for freq in frequencies_signal: - new_signals = np.array([np.cos(2 * np.pi * freq * (times - delay) + offset) for delay in delays]) + new_signals = np.array( + [np.cos(2 * np.pi * freq * (times - delay) + offset) for delay in delays] + ) assert new_signals.shape == signals.shape signals += new_signals @@ -46,13 +49,18 @@ def prepare(mic_positions, gt_azimuth=30 * np.pi / 180): np.testing.assert_allclose(signals, signals_test, atol=1e-10) freq_bins = [int(f / Fs * n_times) for f in frequencies_signal] - assert all([freq_bins[i] == np.where(f == frequencies)[0][0] for i, f in enumerate(frequencies_signal)]) + assert all( + [ + freq_bins[i] == np.where(f == frequencies)[0][0] + for i, f in enumerate(frequencies_signal) + ] + ) Rx = get_autocorrelation(signals, freq_bins) return signals_f, Rx, frequencies_signal, freq_bins def test_beamforming(): - def plot(responses, title='DAS', ax=None): + def plot(responses, title="DAS", ax=None): if ax is None: fig, ax = plt.subplots() combination_sum = np.sum(responses, axis=1) @@ -61,8 +69,8 @@ def plot(responses, title='DAS', ax=None): ax.plot(azimuth_scan * 180 / np.pi, responses[:, j], label=f"{f:.0f} Hz") ax.plot(azimuth_scan * 180 / np.pi, combination_sum, label=f"sum") ax.plot(azimuth_scan * 180 / np.pi, combination_product, label=f"product") - ax.axvline(x=gt_azimuth * 180 / np.pi, label='source direction', ls=':') - ax.set_yscale('log') + ax.axvline(x=gt_azimuth * 180 / np.pi, label="source direction", ls=":") + ax.set_yscale("log") ax.set_title(title) ax.legend() @@ -70,47 +78,54 @@ def plot(responses, title='DAS', ax=None): baseline = 0.11 # meters mic_positions_dict = { - 'ULA': baseline * np.c_[[0, 0], [1, 0], [2, 0]].T, + "ULA": baseline * np.c_[[0, 0], [1, 0], [2, 0]].T, #'modified ULA': baseline * np.c_[[0, 1], [1, 0], [2, 0]].T, - 'square': get_square_array(baseline=baseline, delta=0) + "square": get_square_array(baseline=baseline, delta=0), } for mic_name, mic_positions in mic_positions_dict.items(): - print('mic_positions', mic_name) + print("mic_positions", mic_name) n_mics = mic_positions.shape[0] for deg in test_angles_deg: - print('source at', deg) + print("source at", deg) gt_azimuth = deg * np.pi / 180 # radiants - signals_f, Rx, frequencies_signal, freq_bins = prepare(mic_positions, gt_azimuth) + signals_f, Rx, frequencies_signal, freq_bins = prepare( + mic_positions, gt_azimuth + ) # TODO: when we do the full circle, LCMV places a high responses symmetrically to the zero, # and we get a wrong estimate. We might want to figure out why. - #azimuth_scan = np.linspace(0, 360, 361) * np.pi / 180 + # azimuth_scan = np.linspace(0, 360, 361) * np.pi / 180 azimuth_scan = np.linspace(0, 180, 181) * np.pi / 180 responses_das = np.zeros((len(azimuth_scan), len(frequencies_signal))) responses_lcmv = np.zeros((len(azimuth_scan), len(frequencies_signal))) responses_mvdr = np.zeros((len(azimuth_scan), len(frequencies_signal))) lamda = 1e-3 - lcmv_zero = gt_azimuth + (30 * np.pi / 180.) + lcmv_zero = gt_azimuth + (30 * np.pi / 180.0) for i, az in enumerate(azimuth_scan): constraints = [(az, 1)] - h_mvdr_vector, *_ = get_lcmv_beamformer(Rx, frequencies_signal, mic_positions, constraints, - lamda=lamda) # n_freqs x n_mics + h_mvdr_vector, *_ = get_lcmv_beamformer( + Rx, frequencies_signal, mic_positions, constraints, lamda=lamda + ) # n_freqs x n_mics delays = get_mic_delays(mic_positions, azimuth=az) if az != lcmv_zero: constraints = [(az, 1), (lcmv_zero, 0)] - h_lcmv_vector, *_ = get_lcmv_beamformer(Rx, frequencies_signal, mic_positions, - constraints) # n_freqs x n_mics + h_lcmv_vector, *_ = get_lcmv_beamformer( + Rx, frequencies_signal, mic_positions, constraints + ) # n_freqs x n_mics else: constraints = [(az, 1)] - h_lcmv_vector, *_ = get_lcmv_beamformer(Rx, frequencies_signal, mic_positions, - constraints) # n_freqs x n_mics + h_lcmv_vector, *_ = get_lcmv_beamformer( + Rx, frequencies_signal, mic_positions, constraints + ) # n_freqs x n_mics - h_das_vector = get_das_beamformer(az, frequencies_signal, mic_positions) # n_freqs x n_mics + h_das_vector = get_das_beamformer( + az, frequencies_signal, mic_positions + ) # n_freqs x n_mics for j, f in enumerate(frequencies_signal): freq_bin = freq_bins[j] @@ -126,15 +141,23 @@ def plot(responses, title='DAS', ax=None): responses_das[i, j] = power_y_das power_y_das_test = get_powers(h_das_vector, Rx)[j] - np.testing.assert_allclose(power_y_das, power_y_das_test, atol=1e-10) + np.testing.assert_allclose( + power_y_das, power_y_das_test, atol=1e-10 + ) - power_y_das_test2 = np.abs(h_das.conj().dot(signals_f[:, freq_bin]))**2 / n_mics - np.testing.assert_allclose(power_y_das, power_y_das_test2, atol=1e-10) + power_y_das_test2 = ( + np.abs(h_das.conj().dot(signals_f[:, freq_bin])) ** 2 / n_mics + ) + np.testing.assert_allclose( + power_y_das, power_y_das_test2, atol=1e-10 + ) # check that phase of h exactly compensates for delays in ground truth direction. if az == gt_azimuth: np.testing.assert_allclose( - np.unwrap(np.angle(h_das)) / (2 * np.pi * f), -delays.reshape(h_das.shape)) + np.unwrap(np.angle(h_das)) / (2 * np.pi * f), + -delays.reshape(h_das.shape), + ) # MVDR np.testing.assert_allclose(h_mvdr.conj().dot(a_vector), 1.0) @@ -142,27 +165,35 @@ def plot(responses, title='DAS', ax=None): responses_mvdr[i, j] = power_y_mvdr power_y_mvdr_test = get_powers(h_mvdr_vector, Rx)[j] - np.testing.assert_allclose(power_y_mvdr, power_y_mvdr_test, atol=1e-10) + np.testing.assert_allclose( + power_y_mvdr, power_y_mvdr_test, atol=1e-10 + ) # TODO is it normal that this atol has to be relatively high? - power_y_mvdr_test2 = np.abs(h_mvdr.conj().dot(signals_f[:, freq_bin]))**2 / n_mics - np.testing.assert_allclose(power_y_mvdr, power_y_mvdr_test2, atol=1e-5) + power_y_mvdr_test2 = ( + np.abs(h_mvdr.conj().dot(signals_f[:, freq_bin])) ** 2 / n_mics + ) + np.testing.assert_allclose( + power_y_mvdr, power_y_mvdr_test2, atol=1e-5 + ) # TODO not sure why this does not work # however it does also work, so there is just a scaling issue somewhere. Rx_inv = np.linalg.inv(Rx[j] + lamda * np.eye(n_mics)) - power_y_mvdr_test3 = 1 / np.abs(a_vector.conj().dot(Rx_inv).dot(a_vector)) - #np.testing.assert_allclose(power_y_mvdr, power_y_mvdr_test3, atol=1e-5) + power_y_mvdr_test3 = 1 / np.abs( + a_vector.conj().dot(Rx_inv).dot(a_vector) + ) + # np.testing.assert_allclose(power_y_mvdr, power_y_mvdr_test3, atol=1e-5) # LCMV - #np.testing.assert_allclose(np.abs(h_lcmv.conj().dot(a_vector)), 1.0) + # np.testing.assert_allclose(np.abs(h_lcmv.conj().dot(a_vector)), 1.0) power_y_lcmv = np.abs(h_lcmv.conj().dot(Rx[j]).dot(h_lcmv)) responses_lcmv[i, j] = power_y_lcmv # TODO reponses_lcmv does not always work, to be checked later. for responses, name in zip( - [responses_das, responses_mvdr], # responses_lcmv], - ['das', 'mvdr']): #, 'lcmv']): + [responses_das, responses_mvdr], ["das", "mvdr"] # responses_lcmv], + ): # , 'lcmv']): combination_sum = np.sum(responses, axis=1) combination_product = np.product(responses, axis=1) @@ -176,13 +207,18 @@ def plot(responses, title='DAS', ax=None): guess_sum = 2 * np.pi - guess_sum if guess_product > np.pi: guess_product = 2 * np.pi - guess_product - assert abs(guess_sum - gt_azimuth) < angle_res / 2.0, f"failed for {name}, {deg}" - assert abs(guess_product - gt_azimuth) < angle_res / 2.0, f"failed for {name}, {deg}" + assert ( + abs(guess_sum - gt_azimuth) < angle_res / 2.0 + ), f"failed for {name}, {deg}" + assert ( + abs(guess_product - gt_azimuth) < angle_res / 2.0 + ), f"failed for {name}, {deg}" if PLOTTING: import matplotlib.pylab as plt + fig, axs = plt.subplots(3, 1) - plot(responses_das, 'DAS', ax=axs[0]) - plot(responses_mvdr, 'MVDR', ax=axs[1]) - plot(responses_lcmv, 'LCMV', ax=axs[2]) + plot(responses_das, "DAS", ax=axs[0]) + plot(responses_mvdr, "MVDR", ax=axs[1]) + plot(responses_lcmv, "LCMV", ax=axs[2]) plt.show() diff --git a/python/test_signals.py b/python/test_signals.py index 203fd779..384f4507 100644 --- a/python/test_signals.py +++ b/python/test_signals.py @@ -13,15 +13,15 @@ def test_power(): # 1-dimensional signals sigma = 1e-3 signal = np.random.normal(scale=sigma, size=length) - power = sigma**2 + power = sigma ** 2 assert abs(power - get_power(signal, dB=False)) < 1e-1 a = 5 f = 200 times = np.arange(length) / (10 * f) signal = a * np.sin(2 * np.pi * f * times) - assert signal.shape == (length, ) - power = a**2 / 2 + assert signal.shape == (length,) + power = a ** 2 / 2 assert abs(power - get_power(signal, dB=False)) < 1e-1 # multi-dimensional signals @@ -29,13 +29,18 @@ def test_power(): sigma = 1e-3 signal = np.random.normal(scale=sigma, size=(num_mics, length)) - power = sigma**2 + power = sigma ** 2 assert abs(power - get_power(signal, dB=False)) < 1e-1 a = 5 f = 200 times = np.arange(length) / (10 * f) - signal = np.r_[[a * np.sin(2 * np.pi * f * times + offset) for offset in np.random.rand(num_mics)]] + signal = np.r_[ + [ + a * np.sin(2 * np.pi * f * times + offset) + for offset in np.random.rand(num_mics) + ] + ] assert signal.shape == (num_mics, length) - power = a**2 / 2 + power = a ** 2 / 2 assert abs(power - get_power(signal, dB=False)) < 1e-1