Skip to content

Commit

Permalink
Added main functions to use as endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
mbsantiago committed Jan 25, 2023
1 parent 046b399 commit 71d7d53
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 145 deletions.
4 changes: 2 additions & 2 deletions application.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from batdetect2_gui.application import application
from batdetect2_gui.application import main

if __name__ == "__main__":
application.run(host="127.0.0.1", port=8000)
main()
Empty file added batdetect2_gui/__init__.py
Empty file.
54 changes: 16 additions & 38 deletions batdetect2_gui/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,11 @@ def submit_annotations():
dataset["annotations"][input_id]["issues"] = False

# save to disk
save_annotation(
dataset["annotation_dir"], dataset["annotations"][input_id]
)
save_annotation(dataset["annotation_dir"], dataset["annotations"][input_id])

# add the submitted classes to the list, then sort them and remove duplicates
this_file_class_list = set(
[
aa["class"]
for aa in dataset["annotations"][input_id]["annotation"]
]
[aa["class"] for aa in dataset["annotations"][input_id]["annotation"]]
)
sorted_classes = sorted(
list(
Expand All @@ -178,9 +173,7 @@ def submit_annotations():

# redirect to the next item to annotate
file_id = dataset["file_names"].index(request.form["file_name"])
next_id = select_file(
len(dataset["file_names"]), file_id=file_id, next_file=True
)
next_id = select_file(len(dataset["file_names"]), file_id=file_id, next_file=True)
url_str = (
"/annotate/?file_name="
+ dataset["file_names"][next_id]
Expand All @@ -207,10 +200,7 @@ def render_annotation_page():
if "name" not in session.keys():
initialize_session(default_file)

if (
"dataset_id" in request.args
and request.args["dataset_id"] in datasets
):
if "dataset_id" in request.args and request.args["dataset_id"] in datasets:
session["dataset_id"] = request.args["dataset_id"]
session.modified = True

Expand All @@ -235,16 +225,10 @@ def render_annotation_page():

print("serving ", annotation["file_name"])
file_params, im_data, aud_data = get_data(annotation, use_cache=True)
next_file = dataset["file_names"][
(cur_file + 1) % len(dataset["file_names"])
]
prev_file = dataset["file_names"][
(cur_file - 1) % len(dataset["file_names"])
]
next_file = dataset["file_names"][(cur_file + 1) % len(dataset["file_names"])]
prev_file = dataset["file_names"][(cur_file - 1) % len(dataset["file_names"])]

annotations_sorted = [
dataset["annotations"][ff] for ff in dataset["file_names"]
]
annotations_sorted = [dataset["annotations"][ff] for ff in dataset["file_names"]]

return render_template(
"annotate.html",
Expand Down Expand Up @@ -312,10 +296,7 @@ def render_file_list_page():
if "name" not in session.keys():
initialize_session()

if (
"dataset_id" in request.args
and request.args["dataset_id"] in datasets
):
if "dataset_id" in request.args and request.args["dataset_id"] in datasets:
session["dataset_id"] = request.args["dataset_id"]
session.modified = True

Expand Down Expand Up @@ -439,9 +420,7 @@ def get_data(annotation, use_cache=True):
"""
# check if data has already been computed - if yes, return it
dataset = datasets[session["dataset_id"]]
cache_key = gen_cache_key(
annotation["file_name"], get_spectrogram_params()
)
cache_key = gen_cache_key(annotation["file_name"], get_spectrogram_params())
if use_cache:
if cache_key in cache and cache[cache_key]["thread_lock"] is False:
print(" using cached data for " + annotation["file_name"])
Expand All @@ -464,9 +443,7 @@ def get_data(annotation, use_cache=True):

else:
# if no, compute from scratch
file_params, im_data, aud_data = compute_data(
annotation, dataset["audio_dir"]
)
file_params, im_data, aud_data = compute_data(annotation, dataset["audio_dir"])

return file_params, im_data, aud_data

Expand Down Expand Up @@ -495,9 +472,7 @@ def cache_item(annotation):
:return:
"""
audio_dir = datasets[session["dataset_id"]]["audio_dir"]
cache_key = gen_cache_key(
annotation["file_name"], get_spectrogram_params()
)
cache_key = gen_cache_key(annotation["file_name"], get_spectrogram_params())

# initialise cache item
cache[cache_key] = {
Expand Down Expand Up @@ -567,8 +542,7 @@ def compute_cache_data(anns):
# to save space, we remove the oldest items from the cache
# size of cache is set in the config file
cache_data = [
{k: item[k] for k in ("cache_key", "created_at")}
for item in cache.values()
{k: item[k] for k in ("cache_key", "created_at")} for item in cache.values()
]
cache_data = sorted(cache_data, key=lambda item: item["created_at"])
cache_size = max(config.CACHE_SIZE, 3) # want to keep the most recent ones
Expand Down Expand Up @@ -770,3 +744,7 @@ def create_dataset(audio_dir, annotation_dir):
datasets[dataset["id"]] = dataset

return dataset["id"]


def main(host="127.0.0.1", port=8000):
    """Run the annotation application's server.

    This is the entry point that the top-level ``application.py`` launcher
    imports and calls. Called without arguments it behaves exactly as
    before and serves on 127.0.0.1:8000.

    :param host: interface handed to ``application.run``.
    :param port: port handed to ``application.run``.
    """
    application.run(host=host, port=port)
106 changes: 106 additions & 0 deletions batdetect2_gui/prepare_audio_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
You can clip your files so that they are shorter using this script.
You need to specify the locations of the input files and where you want the
shorter files to be saved.
There are additional settings that allow you to specify the output duration
and where in the file you start clipping from.
"""

import argparse
import os
from pathlib import Path

import numpy as np

from batdetect2_gui import audio_utils as au
from batdetect2_gui import wavfile


def parse_args(argv=None):
    """Parse the command-line options of the audio clipping script.

    A short description of the script is printed to stdout before the
    arguments are parsed.

    :param argv: list of argument strings to parse. ``None`` (the default)
        lets argparse read them from ``sys.argv[1:]``, which is what the
        command-line entry point relies on.
    :return: dict with the keys ``input_directory``, ``output_directory``,
        ``output_duration``, ``start_time`` and ``time_expansion_factor``.
    """
    info_str = (
        "\nScript that extracts smaller segment of audio from a larger file.\n"
        + " Place the files that should be clipped into the input directory.\n"
    )

    print(info_str)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "input_directory",
        type=str,
        help="Input directory containing the audio files",
    )
    parser.add_argument(
        "output_directory",
        type=str,
        help="Output directory for the clipped audio files",
    )
    parser.add_argument(
        "--output_duration",
        default=2.0,
        type=float,
        help="Length of output clipped file (default is 2 seconds)",
    )
    parser.add_argument(
        "--start_time",
        type=float,
        default=0.0,
        help="Start time from which the audio file is clipped (default is 0.0)",
    )
    parser.add_argument(
        "--time_expansion_factor",
        type=int,
        default=1,
        help="The time expansion factor used for all files (default is 1)",
    )
    # vars() turns the argparse Namespace into a plain dict for the caller.
    return vars(parser.parse_args(argv))


def _clip_segment(ip_audio, sampling_rate, st_time, duration):
    """Return ``duration`` seconds of ``ip_audio`` from ``st_time``, zero padded."""
    # assumes ip_audio is a 1-D sample array - TODO confirm against
    # audio_utils.load_audio_file
    num_out = int(sampling_rate * duration)
    op_audio = np.zeros(num_out, dtype=ip_audio.dtype)

    st_samp = int(st_time * sampling_rate)
    # Derive the end from the start and the output length so that the copied
    # span can never be longer than the output buffer.
    en_samp = min(st_samp + num_out, ip_audio.shape[0])
    # A start time beyond the end of the recording copies nothing and leaves
    # the output as silence instead of raising a broadcast error.
    num_copy = max(en_samp - st_samp, 0)
    op_audio[:num_copy] = ip_audio[st_samp : st_samp + num_copy]
    return op_audio


def main():
    """Clip every wav file found under the input directory.

    The options come from ``parse_args()``. Each file found recursively
    under ``input_directory`` is loaded, ``output_duration`` seconds
    starting at ``start_time`` are extracted (zero padded when the file is
    too short) and the clip is written to ``output_directory`` as
    ``<name>__<start>_<end>.wav``, with spaces in the name replaced by
    underscores.

    :return: ``False`` when no audio files are found, otherwise ``None``.
    """
    args = parse_args()

    # Compare the extension case-insensitively in one pass: globbing for
    # "*.wav" and "*.WAV" separately lists every file twice where matching
    # is case-insensitive and misses mixed-case extensions such as ".Wav".
    audio_files = sorted(
        pp
        for pp in Path(args["input_directory"]).rglob("*")
        if pp.is_file() and pp.suffix.lower() == ".wav"
    )
    ip_files = [str(pp) for pp in audio_files]

    print("Input directory   : " + args["input_directory"])
    print("Output directory  : " + args["output_directory"])
    print("Start time        : {}".format(args["start_time"]))
    print("Output duration   : {}".format(args["output_duration"]))
    print("Audio files found : {}".format(len(ip_files)))

    if not ip_files:
        return False

    # Create the output directory itself. Creating only its parent fails
    # for a bare name such as "out" and leaves "out/clips" missing.
    os.makedirs(args["output_directory"], exist_ok=True)

    st_time = args["start_time"]
    en_time = st_time + args["output_duration"]
    op_file_en = "__{:.2f}".format(st_time) + "_" + "{:.2f}".format(en_time)

    for ii, ip_path in enumerate(ip_files):
        sampling_rate, ip_audio = au.load_audio_file(
            ip_path, args["time_expansion_factor"]
        )
        op_audio = _clip_segment(
            ip_audio, sampling_rate, st_time, args["output_duration"]
        )

        # NOTE(review): files with the same name in different sub-directories
        # map to the same output name and overwrite each other.
        ip_name = os.path.basename(ip_path).replace(" ", "_")
        op_file = os.path.splitext(ip_name)[0] + op_file_en + ".wav"

        op_path = os.path.join(args["output_directory"], op_file)
        wavfile.write(op_path, sampling_rate, op_audio)

        print("\n{}\tIP: ".format(ii) + os.path.basename(ip_path))
        print("\tOP: " + os.path.basename(op_path))


if __name__ == "__main__":
main()
107 changes: 2 additions & 105 deletions prepare_audio_files.py
Original file line number Diff line number Diff line change
@@ -1,107 +1,4 @@
"""
You can clip your files so that they are shorter using this script.
You need to specify the locations of the input files and where you want the
shorter files to be saved.
There are additional settings that allow you to specify the output duration
and where in the file you start clipping from.
"""

import argparse
import os
from pathlib import Path

import numpy as np

from batdetect2_gui import audio_utils as au
from batdetect2_gui import wavfile


def main(args):
    """Clip every wav file found under the input directory.

    Each file found recursively under ``args["input_directory"]`` is loaded,
    ``output_duration`` seconds starting at ``start_time`` are extracted
    (zero padded when the file is too short) and the clip is written to
    ``args["output_directory"]`` as ``<name>__<start>_<end>.wav``, with
    spaces in the name replaced by underscores.

    :param args: dict with the keys ``input_directory``,
        ``output_directory``, ``start_time``, ``output_duration`` and
        ``time_expansion_factor``.
    :return: ``False`` when no audio files are found, otherwise ``None``.
    """
    # Compare the extension case-insensitively in one pass: globbing for
    # "*.wav" and "*.WAV" separately lists every file twice where matching
    # is case-insensitive and misses mixed-case extensions such as ".Wav".
    audio_files = sorted(
        pp
        for pp in Path(args["input_directory"]).rglob("*")
        if pp.is_file() and pp.suffix.lower() == ".wav"
    )
    ip_files = [str(pp) for pp in audio_files]

    print("Input directory   : " + args["input_directory"])
    print("Output directory  : " + args["output_directory"])
    print("Start time        : {}".format(args["start_time"]))
    print("Output duration   : {}".format(args["output_duration"]))
    print("Audio files found : {}".format(len(ip_files)))

    if not ip_files:
        return False

    # Create the output directory itself. Creating only its parent fails
    # for a bare name such as "out" and leaves "out/clips" missing.
    os.makedirs(args["output_directory"], exist_ok=True)

    st_time = args["start_time"]
    en_time = st_time + args["output_duration"]
    op_file_en = "__{:.2f}".format(st_time) + "_" + "{:.2f}".format(en_time)

    for ii, ip_path in enumerate(ip_files):
        sampling_rate, ip_audio = au.load_audio_file(
            ip_path, args["time_expansion_factor"]
        )

        # assumes ip_audio is a 1-D sample array - TODO confirm against
        # audio_utils.load_audio_file
        num_out = int(sampling_rate * args["output_duration"])
        op_audio = np.zeros(num_out, dtype=ip_audio.dtype)

        st_samp = int(st_time * sampling_rate)
        # Derive the end from the start and the output length so the copied
        # span can never be longer than the output buffer.
        en_samp = min(st_samp + num_out, ip_audio.shape[0])
        # A start time beyond the end of the recording copies nothing and
        # leaves the clip as silence instead of raising a broadcast error.
        num_copy = max(en_samp - st_samp, 0)
        op_audio[:num_copy] = ip_audio[st_samp : st_samp + num_copy]

        ip_name = os.path.basename(ip_path).replace(" ", "_")
        op_file = os.path.splitext(ip_name)[0] + op_file_en + ".wav"

        op_path = os.path.join(args["output_directory"], op_file)
        wavfile.write(op_path, sampling_rate, op_audio)

        print("\n{}\tIP: ".format(ii) + os.path.basename(ip_path))
        print("\tOP: " + os.path.basename(op_path))

from batdetect2_gui.prepare_audio_files import main

if __name__ == "__main__":
    # ``main`` is imported from batdetect2_gui.prepare_audio_files, where it
    # takes no arguments, prints the usage banner and parses the command
    # line itself. Building a parser here and calling ``main(args)`` would
    # print the banner twice and then fail with a TypeError, so this wrapper
    # only delegates.
    main()

0 comments on commit 71d7d53

Please sign in to comment.