Skip to content

Commit

Permalink
Merge pull request #110 from neuralaudio/pipeline_changes
Browse files Browse the repository at this point in the history
Pipeline Modifications for variable Audio and Sampler Generalisation to different Download Tasks
  • Loading branch information
turian authored Nov 18, 2021
2 parents 25fcba9 + 8a56ace commit 943439c
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 35 deletions.
40 changes: 30 additions & 10 deletions hearpreprocess/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
diagnostics,
download_file,
new_basedir,
safecopy,
str2int,
)

Expand Down Expand Up @@ -665,9 +666,17 @@ def run(self):
# duration
# sample duration is specified in the task config.
# The specified sample duration is in seconds
metadata = self.trim_event_metadata(
metadata, duration=self.task_config["sample_duration"]
)

# If the sample duration is set to None, no trimming of events will
# be done and the full audio file will be selected. This mode is
# only for special tasks and should not be generally used.
# Having all the audio files of the same length is more
# efficient for downstream pipelines

if self.task_config["sample_duration"] is not None:
metadata = self.trim_event_metadata(
metadata, duration=self.task_config["sample_duration"]
)
else:
raise ValueError(
"%s embedding_type unknown" % self.task_config["embedding_type"]
Expand Down Expand Up @@ -818,6 +827,14 @@ def run(self):
# minutes or the timestamp embeddings will explode
sample_duration = self.task_config["sample_duration"]
max_split_duration = self.get_max_split_duration()
if sample_duration is None:
assert max_split_duration is None, (
"If the sample duration is set to None i.e. orignal audio files "
"are being used without any trimming or padding, then the "
"max_split_duration should also be None, so that no "
"subsampling is done as the audio file length is not "
"consistent."
)

# If max_split_duration is not None set the max_files so that
# the total duration of all the audio files after subsampling
Expand Down Expand Up @@ -919,15 +936,18 @@ def requires(self):

def run(self):
self.createsplit()

for audiofile in tqdm(list(self.requires()["corpus"].splitdir.iterdir())):
newaudiofile = self.splitdir.joinpath(f"{audiofile.stem}.wav")
audio_util.trim_pad_wav(
str(audiofile),
str(newaudiofile),
duration=self.task_config["sample_duration"],
)

if self.task_config["sample_duration"] is not None:
audio_util.trim_pad_wav(
str(audiofile),
str(newaudiofile),
duration=self.task_config["sample_duration"],
)
else:
# If the sample_duration is None, the file will be copied
# without any trimming or padding
safecopy(src=audiofile, dst=newaudiofile)
self.mark_complete()


Expand Down
67 changes: 43 additions & 24 deletions hearpreprocess/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
it simple to scale across multiple dataset
"""

import copy
import logging
import multiprocessing
import random
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Callable, Dict, Optional, Type
from urllib.parse import urlparse

import click
Expand All @@ -28,6 +29,7 @@
import hearpreprocess.pipeline as pipeline
import hearpreprocess.tfds_pipeline as tfds_pipeline
import hearpreprocess.util.audio as audio_util
import hearpreprocess.util.luigi as luigi_util
from hearpreprocess import dcase2016_task2, nsynth_pitch, speech_commands, spoken_digit
from hearpreprocess.util.luigi import WorkTask

Expand All @@ -54,26 +56,34 @@


# Note: Necessary key helps to select audios with the necessary keys in there name
# Note: The `get_download_and_extract_tasks` is the task specific function which
# returns the tasks to download and extract the dataset for the task. This is
# requried here, because the sampling task needs to download and extract the
# tasks before actual sampling
configs = {
"dcase2016_task2": {
"task_config": dcase2016_task2.generic_task_config,
"audio_sample_size": 4,
"necessary_keys": [],
"get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks,
},
"nsynth_pitch": {
"task_config": nsynth_pitch.generic_task_config,
"audio_sample_size": 100,
"necessary_keys": [],
"get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks,
},
"speech_commands": {
"task_config": speech_commands.generic_task_config,
"audio_sample_size": 100,
"necessary_keys": [],
"get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks,
},
"spoken_digit": {
"task_config": spoken_digit.generic_task_config,
"audio_sample_size": 100,
"necessary_keys": [],
"get_download_and_extract_tasks": tfds_pipeline.get_download_and_extract_tasks_tfds, # noqa: E501
},
# Add the sampler config for the secrets task if the secret task config was found.
# Not available for participants
Expand All @@ -85,19 +95,6 @@ class RandomSampleOriginalDataset(WorkTask):
necessary_keys = luigi.ListParameter()
audio_sample_size = luigi.IntParameter()

def requires(self):
# If this is a TensorFlow dataset then use the tfds pipeline
if "tfds_task_name" in self.task_config:
return tfds_pipeline.get_download_and_extract_tasks_tfds(self.task_config)

return pipeline.get_download_and_extract_tasks(self.task_config)

@staticmethod
def safecopy(src, dst):
# Make sure the parent destination directory exists
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)

@staticmethod
def trimcopy_audio(src, tmp_dst, fin_dst, small_duration):
"""
Expand Down Expand Up @@ -167,7 +164,9 @@ def run(self):

# Copy all the non audio files
for file in tqdm(copy_files):
self.safecopy(src=copy_from.joinpath(file), dst=copy_to.joinpath(file))
luigi_util.safecopy(
src=copy_from.joinpath(file), dst=copy_to.joinpath(file)
)

# Save all the audio after trimming them to small sample duration
# The small sample duration(in seconds) is specified in the small
Expand All @@ -189,6 +188,31 @@ def run(self):
shutil.make_archive(copy_to, "zip", copy_to)


def get_sampler_task(
sampler_config: Dict[str, Any]
) -> Type[RandomSampleOriginalDataset]:
"""
Returns a task to do sampling after downloading the dataset with
download and extract tasks from the dataset specific
`get_download_and_extract_tasks` function
"""
_task_config: Dict[str, Any] = copy.deepcopy(sampler_config["task_config"])
_task_config["mode"] = _task_config["default_mode"]
_get_download_and_extract_tasks: Callable = sampler_config[
"get_download_and_extract_tasks"
]

class _RandomSampleOriginalDataset(RandomSampleOriginalDataset):
task_config = _task_config
audio_sample_size = sampler_config["audio_sample_size"]
necessary_keys = sampler_config["necessary_keys"]

def requires(self):
return _get_download_and_extract_tasks(self.task_config)

return _RandomSampleOriginalDataset


@click.command()
@click.argument("task")
@click.option(
Expand All @@ -202,15 +226,10 @@ def main(task: str, num_workers: Optional[int] = None):
if num_workers is None:
num_workers = multiprocessing.cpu_count()
logger.info(f"Using {num_workers} workers")
config: Dict[str, Any] = configs[task]
default_config: str = config["task_config"]["default_mode"]
config["task_config"]["mode"] = default_config
sampler = RandomSampleOriginalDataset(
task_config=config["task_config"],
audio_sample_size=config["audio_sample_size"],
necessary_keys=config["necessary_keys"],
)
pipeline.run(sampler, num_workers=num_workers)

sampler_config: Dict[str, Any] = configs[task]
sampler = get_sampler_task(sampler_config)
pipeline.run(sampler(), num_workers=num_workers)


if __name__ == "__main__":
Expand Down
3 changes: 3 additions & 0 deletions hearpreprocess/util/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def get_audio_dir_stats(
all_file_paths,
)
)
if len(audio_paths) == 0:
print("No audio files present in the folder")
return {}
rng = random.Random(0)
rng.shuffle(audio_paths)

Expand Down
11 changes: 11 additions & 0 deletions hearpreprocess/util/luigi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os.path
from functools import partial
from pathlib import Path
import shutil

import luigi
import requests
Expand Down Expand Up @@ -219,3 +220,13 @@ def str2int(s: str) -> int:
https://stackoverflow.com/a/16008760/82733
"""
return int(hashlib.sha1(s.encode("utf-8")).hexdigest(), 16) % (2 ** 32 - 1)


def safecopy(src, dst):
"""
Copies a file after checking if the parent destination directory exists
If the parent doesnot exists, the parent directory will be made and the
file will be copied
"""
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
25 changes: 24 additions & 1 deletion hearpreprocess/util/task_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ def validate_generic_task_config(
This validator checks if the tfds task configuration is correctly
defined
* sample_duration can also be set to `None`, rather than integer or
float, in which case, the audio files in the dataset will not
be trimmed or padded, rather the original file duration will be
retained in the output of the pipeline.
In this case, the max split duration should be set to
None and no subsampling can be done, as file durations are
not consistent.
However, this is only for specific tasks and should not be generally
used as it is not efficient for downstream pipelines, particularly
embedding generation in heareval
Args:
task_config: Task config to be used with the pipeline
ignore_extra_keys: Flag for ignoring extra keys in the task configuration.
Expand Down Expand Up @@ -113,7 +124,9 @@ def validate_generic_task_config(
"embedding_type": Or("scene", "event", str),
"prediction_type": Or("multiclass", "multilabel", str),
"split_mode": Or("trainvaltest", "presplit_kfold", "new_split_kfold"),
"sample_duration": Or(float, int),
# When the sample duration is None, the original audio is retained
# without any trimming and padding
"sample_duration": Or(float, int, None),
"evaluation": Schema([str]),
"default_mode": Or("5h", "50h", "full", str),
}
Expand Down Expand Up @@ -177,6 +190,12 @@ def validate_generic_task_config(
): object,
}
)
# If the sample duration is set to None, the max_task_duration_by_split
# should also be None and no subsampling will be done
if task_config["sample_duration"] is None:
schema["max_task_duration_by_split"] = Schema(
{split: None for split in SPLITS}
)
elif split_mode in ["presplit_kfold", "new_split_kfold"]:

assert (
Expand All @@ -203,6 +222,10 @@ def validate_generic_task_config(
): object,
}
)
# If the sample duration is set to None, the max_task_duration_by_fold
# should also be None and no subsampling will be done
if task_config["sample_duration"] is None:
schema["max_task_duration_by_fold"] = None
else:
raise ValueError("Invalid split_mode")

Expand Down

0 comments on commit 943439c

Please sign in to comment.