From 71d7d53fa05a0e5d74f26e3fd4d530a2240dbd93 Mon Sep 17 00:00:00 2001 From: Santiago Martinez Date: Wed, 25 Jan 2023 18:55:45 +0000 Subject: [PATCH] Added main functions to use as endpoints --- application.py | 4 +- batdetect2_gui/__init__.py | 0 batdetect2_gui/application.py | 54 ++++--------- batdetect2_gui/prepare_audio_files.py | 106 +++++++++++++++++++++++++ prepare_audio_files.py | 107 +------------------------- 5 files changed, 126 insertions(+), 145 deletions(-) create mode 100644 batdetect2_gui/__init__.py create mode 100644 batdetect2_gui/prepare_audio_files.py diff --git a/application.py b/application.py index 6333d54..52e0748 100644 --- a/application.py +++ b/application.py @@ -1,4 +1,4 @@ -from batdetect2_gui.application import application +from batdetect2_gui.application import main if __name__ == "__main__": - application.run(host="127.0.0.1", port=8000) + main() diff --git a/batdetect2_gui/__init__.py b/batdetect2_gui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/batdetect2_gui/application.py b/batdetect2_gui/application.py index 94ccbf6..d058c41 100755 --- a/batdetect2_gui/application.py +++ b/batdetect2_gui/application.py @@ -157,16 +157,11 @@ def submit_annotations(): dataset["annotations"][input_id]["issues"] = False # save to disk - save_annotation( - dataset["annotation_dir"], dataset["annotations"][input_id] - ) + save_annotation(dataset["annotation_dir"], dataset["annotations"][input_id]) # add the submitted classes to the list, then sort them and remove duplicates this_file_class_list = set( - [ - aa["class"] - for aa in dataset["annotations"][input_id]["annotation"] - ] + [aa["class"] for aa in dataset["annotations"][input_id]["annotation"]] ) sorted_classes = sorted( list( @@ -178,9 +173,7 @@ def submit_annotations(): # redirect to the next item to annotate file_id = dataset["file_names"].index(request.form["file_name"]) - next_id = select_file( - len(dataset["file_names"]), file_id=file_id, next_file=True - ) + next_id = select_file(len(dataset["file_names"]), file_id=file_id, next_file=True) url_str = ( "/annotate/?file_name=" + dataset["file_names"][next_id] @@ -207,10 +200,7 @@ def render_annotation_page(): if "name" not in session.keys(): initialize_session(default_file) - if ( - "dataset_id" in request.args - and request.args["dataset_id"] in datasets - ): + if "dataset_id" in request.args and request.args["dataset_id"] in datasets: session["dataset_id"] = request.args["dataset_id"] session.modified = True @@ -235,16 +225,10 @@ def render_annotation_page(): print("serving ", annotation["file_name"]) file_params, im_data, aud_data = get_data(annotation, use_cache=True) - next_file = dataset["file_names"][ - (cur_file + 1) % len(dataset["file_names"]) - ] - prev_file = dataset["file_names"][ - (cur_file - 1) % len(dataset["file_names"]) - ] + next_file = dataset["file_names"][(cur_file + 1) % len(dataset["file_names"])] + prev_file = dataset["file_names"][(cur_file - 1) % len(dataset["file_names"])] - annotations_sorted = [ - dataset["annotations"][ff] for ff in dataset["file_names"] - ] + annotations_sorted = [dataset["annotations"][ff] for ff in dataset["file_names"]] return render_template( "annotate.html", @@ -312,10 +296,7 @@ def render_file_list_page(): if "name" not in session.keys(): initialize_session() - if ( - "dataset_id" in request.args - and request.args["dataset_id"] in datasets - ): + if "dataset_id" in request.args and request.args["dataset_id"] in datasets: session["dataset_id"] = request.args["dataset_id"] session.modified = True @@ -439,9 +420,7 @@ def get_data(annotation, use_cache=True): """ # check if data has already been computed - if yes, return it dataset = datasets[session["dataset_id"]] - cache_key = gen_cache_key( - annotation["file_name"], get_spectrogram_params() - ) + cache_key = gen_cache_key(annotation["file_name"], get_spectrogram_params()) if use_cache: if cache_key in cache and cache[cache_key]["thread_lock"] is False: print(" using cached data for " + annotation["file_name"]) @@ -464,9 +443,7 @@ def get_data(annotation, use_cache=True): else: # if no, compute from scratch - file_params, im_data, aud_data = compute_data( - annotation, dataset["audio_dir"] - ) + file_params, im_data, aud_data = compute_data(annotation, dataset["audio_dir"]) return file_params, im_data, aud_data @@ -495,9 +472,7 @@ def cache_item(annotation): :return: """ audio_dir = datasets[session["dataset_id"]]["audio_dir"] - cache_key = gen_cache_key( - annotation["file_name"], get_spectrogram_params() - ) + cache_key = gen_cache_key(annotation["file_name"], get_spectrogram_params()) # initialise cache item cache[cache_key] = { @@ -567,8 +542,7 @@ def compute_cache_data(anns): # to save space, we remove the oldest items from the cache # size of cache is set in the config file cache_data = [ - {k: item[k] for k in ("cache_key", "created_at")} - for item in cache.values() + {k: item[k] for k in ("cache_key", "created_at")} for item in cache.values() ] cache_data = sorted(cache_data, key=lambda item: item["created_at"]) cache_size = max(config.CACHE_SIZE, 3) # want to keep the most recent ones @@ -770,3 +744,7 @@ def create_dataset(audio_dir, annotation_dir): datasets[dataset["id"]] = dataset return dataset["id"] + + +def main(): + application.run(host="127.0.0.1", port=8000) diff --git a/batdetect2_gui/prepare_audio_files.py b/batdetect2_gui/prepare_audio_files.py new file mode 100644 index 0000000..892815d --- /dev/null +++ b/batdetect2_gui/prepare_audio_files.py @@ -0,0 +1,106 @@ +""" +You can clip your files so that they are shorter using this script. +You need to specify the locations of the input files and where you want the +shorter files to be saved. + +There are additional settings that allow you to specify the output duration +and where in the file you start clipping from. +""" + +import argparse +import os +from pathlib import Path + +import numpy as np + +from batdetect2_gui import audio_utils as au +from batdetect2_gui import wavfile + + +def parse_args(): + info_str = ( + "\nScript that extracts smaller segment of audio from a larger file.\n" + + " Place the files that should be clipped into the input directory.\n" + ) + + print(info_str) + parser = argparse.ArgumentParser() + parser.add_argument( + "input_directory", + type=str, + help="Input directory containing the audio files", + ) + parser.add_argument( + "output_directory", + type=str, + help="Output directory the clipped audio files", + ) + parser.add_argument( + "--output_duration", + default=2.0, + type=float, + help="Length of output clipped file (default is 2 seconds)", + ) + parser.add_argument( + "--start_time", + type=float, + default=0.0, + help="Start time from which the audio file is clipped (deafult is 0.0)", + ) + parser.add_argument( + "--time_expansion_factor", + type=int, + default=1, + help="The time expansion factor used for all files (default is 1)", + ) + return vars(parser.parse_args()) + + +def main(): + args = parse_args() + + audio_files = list(Path(args["input_directory"]).rglob("*.wav")) + list( + Path(args["input_directory"]).rglob("*.WAV") + ) + ip_files = [os.path.join(aa.parent, aa.name) for aa in audio_files] + + print("Input directory : " + args["input_directory"]) + print("Output directory : " + args["output_directory"]) + print("Start time : {}".format(args["start_time"])) + print("Output duration : {}".format(args["output_duration"])) + print("Audio files found : {}".format(len(ip_files))) + + if len(ip_files) == 0: + return False + + if not os.path.isdir(os.path.dirname(args["output_directory"])): + os.makedirs(os.path.dirname(args["output_directory"])) + + for ii, ip_path in enumerate(ip_files): + sampling_rate, ip_audio = au.load_audio_file( + ip_path, args["time_expansion_factor"] + ) + + st_time = args["start_time"] + en_time = st_time + args["output_duration"] + st_samp = int(st_time * sampling_rate) + en_samp = np.minimum(int(en_time * sampling_rate), ip_audio.shape[0]) + + op_audio = np.zeros( + int(sampling_rate * args["output_duration"]), dtype=ip_audio.dtype + ) + op_audio[: en_samp - st_samp] = ip_audio[st_samp:en_samp] + + op_file = os.path.basename(ip_path).replace(" ", "_") + op_file_en = "__{:.2f}".format(st_time) + "_" + "{:.2f}".format(en_time) + op_file = op_file[:-4] + op_file_en + ".wav" + + op_path = os.path.join(args["output_directory"], op_file) + wavfile.write(op_path, sampling_rate, op_audio) + + print("\n{}\tIP: ".format(ii) + os.path.basename(ip_path)) + print("\tOP: " + os.path.basename(op_path)) + + +if __name__ == "__main__": + main() diff --git a/prepare_audio_files.py b/prepare_audio_files.py index a1077a0..58e06aa 100644 --- a/prepare_audio_files.py +++ b/prepare_audio_files.py @@ -1,107 +1,4 @@ -""" -You can clip your files so that they are shorter using this script. -You need to specify the locations of the input files and where you want the -shorter files to be saved. - -There are additional settings that allow you to specify the output duration -and where in the file you start clipping from. -""" - -import argparse -import os -from pathlib import Path - -import numpy as np - -from batdetect2_gui import audio_utils as au -from batdetect2_gui import wavfile - - -def main(args): - - audio_files = list(Path(args["input_directory"]).rglob("*.wav")) + list( - Path(args["input_directory"]).rglob("*.WAV") - ) - ip_files = [os.path.join(aa.parent, aa.name) for aa in audio_files] - - print("Input directory : " + args["input_directory"]) - print("Output directory : " + args["output_directory"]) - print("Start time : {}".format(args["start_time"])) - print("Output duration : {}".format(args["output_duration"])) - print("Audio files found : {}".format(len(ip_files))) - - if len(ip_files) == 0: - return False - - if not os.path.isdir(os.path.dirname(args["output_directory"])): - os.makedirs(os.path.dirname(args["output_directory"])) - - for ii, ip_path in enumerate(ip_files): - sampling_rate, ip_audio = au.load_audio_file( - ip_path, args["time_expansion_factor"] - ) - duration = ip_audio.shape[0] / sampling_rate - - st_time = args["start_time"] - en_time = st_time + args["output_duration"] - st_samp = int(st_time * sampling_rate) - en_samp = np.minimum(int(en_time * sampling_rate), ip_audio.shape[0]) - - op_audio = np.zeros( - int(sampling_rate * args["output_duration"]), dtype=ip_audio.dtype - ) - op_audio[: en_samp - st_samp] = ip_audio[st_samp:en_samp] - - op_file = os.path.basename(ip_path).replace(" ", "_") - op_file_en = ( - "__{:.2f}".format(st_time) + "_" + "{:.2f}".format(en_time) - ) - op_file = op_file[:-4] + op_file_en + ".wav" - - op_path = os.path.join(args["output_directory"], op_file) - wavfile.write(op_path, sampling_rate, op_audio) - - print("\n{}\tIP: ".format(ii) + os.path.basename(ip_path)) - print("\tOP: " + os.path.basename(op_path)) - +from batdetect2_gui.prepare_audio_files import main if __name__ == "__main__": - - info_str = ( - "\nScript that extracts smaller segment of audio from a larger file.\n" - + " Place the files that should be clipped into the input directory.\n" - ) - - print(info_str) - parser = argparse.ArgumentParser() - parser.add_argument( - "input_directory", - type=str, - help="Input directory containing the audio files", - ) - parser.add_argument( - "output_directory", - type=str, - help="Output directory the clipped audio files", - ) - parser.add_argument( - "--output_duration", - default=2.0, - type=float, - help="Length of output clipped file (default is 2 seconds)", - ) - parser.add_argument( - "--start_time", - type=float, - default=0.0, - help="Start time from which the audio file is clipped (deafult is 0.0)", - ) - parser.add_argument( - "--time_expansion_factor", - type=int, - default=1, - help="The time expansion factor used for all files (default is 1)", - ) - args = vars(parser.parse_args()) - - main(args) + main()