Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline Modifications for variable Audio and Sampler Generalisation to different Download Tasks #110

Merged
merged 12 commits into from
Nov 18, 2021
40 changes: 30 additions & 10 deletions hearpreprocess/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
diagnostics,
download_file,
new_basedir,
safecopy,
str2int,
)

Expand Down Expand Up @@ -665,9 +666,17 @@ def run(self):
# duration
# sample duration is specified in the task config.
# The specified sample duration is in seconds
metadata = self.trim_event_metadata(
metadata, duration=self.task_config["sample_duration"]
)

# If the sample duration is set to None, no trimming of events will
# be done and the full audio file will be selected. This mode is
# only for special tasks and should not be generally used.
# Having all the audio files of the same length is more
# efficient for downstream pipelines

if self.task_config["sample_duration"] is not None:
khumairraj marked this conversation as resolved.
Show resolved Hide resolved
metadata = self.trim_event_metadata(
metadata, duration=self.task_config["sample_duration"]
)
else:
raise ValueError(
"%s embedding_type unknown" % self.task_config["embedding_type"]
Expand Down Expand Up @@ -818,6 +827,14 @@ def run(self):
# minutes or the timestamp embeddings will explode
sample_duration = self.task_config["sample_duration"]
max_split_duration = self.get_max_split_duration()
if sample_duration is None:
assert max_split_duration is None, (
"If the sample duration is set to None i.e. orignal audio files "
"are being used without any trimming or padding, then the "
"max_split_duration should also be None, so that no "
"subsampling is done as the audio file length is not "
"consistent."
)

# If max_split_duration is not None set the max_files so that
# the total duration of all the audio files after subsampling
Expand Down Expand Up @@ -919,15 +936,18 @@ def requires(self):

def run(self):
self.createsplit()

for audiofile in tqdm(list(self.requires()["corpus"].splitdir.iterdir())):
newaudiofile = self.splitdir.joinpath(f"{audiofile.stem}.wav")
audio_util.trim_pad_wav(
str(audiofile),
str(newaudiofile),
duration=self.task_config["sample_duration"],
)

if self.task_config["sample_duration"] is not None:
audio_util.trim_pad_wav(
str(audiofile),
str(newaudiofile),
duration=self.task_config["sample_duration"],
)
else:
# If the sample_duration is None, the file will be copied
# without any trimming or padding
safecopy(src=audiofile, dst=newaudiofile)
self.mark_complete()


Expand Down
67 changes: 43 additions & 24 deletions hearpreprocess/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
it simple to scale across multiple datasets
"""

import copy
import logging
import multiprocessing
import random
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Callable, Dict, Optional, Type
from urllib.parse import urlparse

import click
Expand All @@ -28,6 +29,7 @@
import hearpreprocess.pipeline as pipeline
import hearpreprocess.tfds_pipeline as tfds_pipeline
import hearpreprocess.util.audio as audio_util
import hearpreprocess.util.luigi as luigi_util
from hearpreprocess import dcase2016_task2, nsynth_pitch, speech_commands, spoken_digit
from hearpreprocess.util.luigi import WorkTask

Expand All @@ -54,26 +56,34 @@


# Note: Necessary keys help to select audio files with the necessary keys in their name
# Note: `get_download_and_extract_tasks` is the task-specific function which
# returns the tasks to download and extract the dataset for the task. This is
# required here, because the sampling task needs to run the download and
# extract tasks before the actual sampling
configs = {
"dcase2016_task2": {
"task_config": dcase2016_task2.generic_task_config,
"audio_sample_size": 4,
"necessary_keys": [],
"get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks,
turian marked this conversation as resolved.
Show resolved Hide resolved
},
"nsynth_pitch": {
"task_config": nsynth_pitch.generic_task_config,
"audio_sample_size": 100,
"necessary_keys": [],
"get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks,
},
"speech_commands": {
"task_config": speech_commands.generic_task_config,
"audio_sample_size": 100,
"necessary_keys": [],
"get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks,
},
"spoken_digit": {
"task_config": spoken_digit.generic_task_config,
"audio_sample_size": 100,
"necessary_keys": [],
"get_download_and_extract_tasks": tfds_pipeline.get_download_and_extract_tasks_tfds, # noqa: E501
},
# Add the sampler config for the secrets task if the secret task config was found.
# Not available for participants
Expand All @@ -85,19 +95,6 @@ class RandomSampleOriginalDataset(WorkTask):
necessary_keys = luigi.ListParameter()
audio_sample_size = luigi.IntParameter()

def requires(self):
# If this is a TensorFlow dataset then use the tfds pipeline
if "tfds_task_name" in self.task_config:
return tfds_pipeline.get_download_and_extract_tasks_tfds(self.task_config)

return pipeline.get_download_and_extract_tasks(self.task_config)

@staticmethod
def safecopy(src, dst):
# Make sure the parent destination directory exists
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)

@staticmethod
def trimcopy_audio(src, tmp_dst, fin_dst, small_duration):
"""
Expand Down Expand Up @@ -167,7 +164,9 @@ def run(self):

# Copy all the non audio files
for file in tqdm(copy_files):
self.safecopy(src=copy_from.joinpath(file), dst=copy_to.joinpath(file))
luigi_util.safecopy(
src=copy_from.joinpath(file), dst=copy_to.joinpath(file)
)

# Save all the audio after trimming them to small sample duration
# The small sample duration(in seconds) is specified in the small
Expand All @@ -189,6 +188,31 @@ def run(self):
shutil.make_archive(copy_to, "zip", copy_to)


def get_sampler_task(
sampler_config: Dict[str, Any]
) -> Type[RandomSampleOriginalDataset]:
"""
Returns a task to do sampling after downloading the dataset with
download and extract tasks from the dataset specific
`get_download_and_extract_tasks` function
"""
_task_config: Dict[str, Any] = copy.deepcopy(sampler_config["task_config"])
_task_config["mode"] = _task_config["default_mode"]
_get_download_and_extract_tasks: Callable = sampler_config[
"get_download_and_extract_tasks"
]

class _RandomSampleOriginalDataset(RandomSampleOriginalDataset):
task_config = _task_config
audio_sample_size = sampler_config["audio_sample_size"]
necessary_keys = sampler_config["necessary_keys"]

def requires(self):
return _get_download_and_extract_tasks(self.task_config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hrmmm really? I mean this is the sampler so it's not core code, it's just for testing, but this seems wrong and smells bad.

requires() in luigi is supposed to return a Luigi task. Not do work. run() is where work should occur. Otherwise you might have weird luigi bugs that are hard to debug

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_get_download_and_extract_tasks returns the tasks which will download and extract. So technically, the requires is still returning a list of tasks. This is how our main pipeline is working where we use this function to build the task and then pass it in the ExtractMetadata as luigiParameter and put it in the requires. As discussed in the previous comment, the main reason to do this here, is that the _get_download_and_extract_tasks is different for different tasks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


return _RandomSampleOriginalDataset


@click.command()
@click.argument("task")
@click.option(
Expand All @@ -202,15 +226,10 @@ def main(task: str, num_workers: Optional[int] = None):
if num_workers is None:
num_workers = multiprocessing.cpu_count()
logger.info(f"Using {num_workers} workers")
config: Dict[str, Any] = configs[task]
default_config: str = config["task_config"]["default_mode"]
config["task_config"]["mode"] = default_config
sampler = RandomSampleOriginalDataset(
task_config=config["task_config"],
audio_sample_size=config["audio_sample_size"],
necessary_keys=config["necessary_keys"],
)
pipeline.run(sampler, num_workers=num_workers)

sampler_config: Dict[str, Any] = configs[task]
sampler = get_sampler_task(sampler_config)
pipeline.run(sampler(), num_workers=num_workers)


if __name__ == "__main__":
Expand Down
3 changes: 3 additions & 0 deletions hearpreprocess/util/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def get_audio_dir_stats(
all_file_paths,
)
)
if len(audio_paths) == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait why would this happen? Why is this okay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no audio in the downloaded directory, we should return nothing. This can happen, for example, if a downloaded directory has just metadata files; we still need to put that in the requires of ExtractMetadata so that the task can actually run. This function runs on all the downloaded directories in the requires. So this is not a problem, as we still have another assert below: if audio files are found and the stats are not calculated, the assert will throw; if no audio files are found, as in this case, it will return the empty dict.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

print("No audio files present in the folder")
return {}
rng = random.Random(0)
rng.shuffle(audio_paths)

Expand Down
11 changes: 11 additions & 0 deletions hearpreprocess/util/luigi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os.path
from functools import partial
from pathlib import Path
import shutil

import luigi
import requests
Expand Down Expand Up @@ -219,3 +220,13 @@ def str2int(s: str) -> int:
https://stackoverflow.com/a/16008760/82733
"""
return int(hashlib.sha1(s.encode("utf-8")).hexdigest(), 16) % (2 ** 32 - 1)


def safecopy(src, dst):
    """
    Copy the file `src` to `dst`, creating the parent directory of the
    destination first if it does not exist yet.

    Args:
        src: Path of the file to copy (string or path-like object).
        dst: Path of the destination file (string or path-like object).
    """
    # Normalise to a Path so that plain string destinations are accepted
    # as well; an existing Path instance is passed through unchanged.
    dst = Path(dst)
    # exist_ok=True makes this a no-op when the parent directory already
    # exists.
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dst)
25 changes: 24 additions & 1 deletion hearpreprocess/util/task_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ def validate_generic_task_config(
This validator checks if the tfds task configuration is correctly
defined
* sample_duration can also be set to `None`, rather than integer or
float, in which case, the audio files in the dataset will not
be trimmed or padded, rather the original file duration will be
retained in the output of the pipeline.
In this case, the max split duration should be set to
None and no subsampling can be done, as file durations are
not consistent.
However, this is only for specific tasks and should not be generally
used as it is not efficient for downstream pipelines, particularly
embedding generation in heareval
Args:
task_config: Task config to be used with the pipeline
ignore_extra_keys: Flag for ignoring extra keys in the task configuration.
Expand Down Expand Up @@ -113,7 +124,9 @@ def validate_generic_task_config(
"embedding_type": Or("scene", "event", str),
"prediction_type": Or("multiclass", "multilabel", str),
"split_mode": Or("trainvaltest", "presplit_kfold", "new_split_kfold"),
"sample_duration": Or(float, int),
# When the sample duration is None, the original audio is retained
# without any trimming and padding
"sample_duration": Or(float, int, None),
"evaluation": Schema([str]),
"default_mode": Or("5h", "50h", "full", str),
}
Expand Down Expand Up @@ -177,6 +190,12 @@ def validate_generic_task_config(
): object,
}
)
# If the sample duration is set to None, the max_task_duration_by_split
# should also be None and no subsampling will be done
if task_config["sample_duration"] is None:
schema["max_task_duration_by_split"] = Schema(
{split: None for split in SPLITS}
)
elif split_mode in ["presplit_kfold", "new_split_kfold"]:

assert (
Expand All @@ -203,6 +222,10 @@ def validate_generic_task_config(
): object,
}
)
# If the sample duration is set to None, the max_task_duration_by_fold
# should also be None and no subsampling will be done
if task_config["sample_duration"] is None:
schema["max_task_duration_by_fold"] = None
else:
raise ValueError("Invalid split_mode")

Expand Down