diff --git a/hearpreprocess/pipeline.py b/hearpreprocess/pipeline.py index bb99ae3a..15b32f95 100644 --- a/hearpreprocess/pipeline.py +++ b/hearpreprocess/pipeline.py @@ -25,6 +25,7 @@ diagnostics, download_file, new_basedir, + safecopy, str2int, ) @@ -665,9 +666,17 @@ def run(self): # duration # sample duration is specified in the task config. # The specified sample duration is in seconds - metadata = self.trim_event_metadata( - metadata, duration=self.task_config["sample_duration"] - ) + + # If the sample duration is set to None, no trimming of events will + # be done and the full audio file will be selected. This mode is + # only for special tasks and should not be generally used. + # Having all the audio files of the same length is more + # efficient for downstream pipelines + + if self.task_config["sample_duration"] is not None: + metadata = self.trim_event_metadata( + metadata, duration=self.task_config["sample_duration"] + ) else: raise ValueError( "%s embedding_type unknown" % self.task_config["embedding_type"] @@ -818,6 +827,14 @@ def run(self): # minutes or the timestamp embeddings will explode sample_duration = self.task_config["sample_duration"] max_split_duration = self.get_max_split_duration() + if sample_duration is None: + assert max_split_duration is None, ( + "If the sample duration is set to None i.e. orignal audio files " + "are being used without any trimming or padding, then the " + "max_split_duration should also be None, so that no " + "subsampling is done as the audio file length is not " + "consistent." 
+ ) # If max_split_duration is not None set the max_files so that # the total duration of all the audio files after subsampling @@ -919,15 +936,18 @@ def requires(self): def run(self): self.createsplit() - for audiofile in tqdm(list(self.requires()["corpus"].splitdir.iterdir())): newaudiofile = self.splitdir.joinpath(f"{audiofile.stem}.wav") - audio_util.trim_pad_wav( - str(audiofile), - str(newaudiofile), - duration=self.task_config["sample_duration"], - ) - + if self.task_config["sample_duration"] is not None: + audio_util.trim_pad_wav( + str(audiofile), + str(newaudiofile), + duration=self.task_config["sample_duration"], + ) + else: + # If the sample_duration is None, the file will be copied + # without any trimming or padding + safecopy(src=audiofile, dst=newaudiofile) self.mark_complete() diff --git a/hearpreprocess/sampler.py b/hearpreprocess/sampler.py index daf4c24d..fc09d148 100644 --- a/hearpreprocess/sampler.py +++ b/hearpreprocess/sampler.py @@ -12,13 +12,14 @@ it simple to scale across multiple dataset """ +import copy import logging import multiprocessing import random import shutil import tempfile from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Callable, Dict, Optional, Type from urllib.parse import urlparse import click @@ -28,6 +29,7 @@ import hearpreprocess.pipeline as pipeline import hearpreprocess.tfds_pipeline as tfds_pipeline import hearpreprocess.util.audio as audio_util +import hearpreprocess.util.luigi as luigi_util from hearpreprocess import dcase2016_task2, nsynth_pitch, speech_commands, spoken_digit from hearpreprocess.util.luigi import WorkTask @@ -54,26 +56,34 @@ # Note: Necessary key helps to select audios with the necessary keys in there name +# Note: The `get_download_and_extract_tasks` is the task specific function which +# returns the tasks to download and extract the dataset for the task. 
This is +# required here, because the sampling task needs to download and extract the +# tasks before actual sampling configs = { "dcase2016_task2": { "task_config": dcase2016_task2.generic_task_config, "audio_sample_size": 4, "necessary_keys": [], + "get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks, }, "nsynth_pitch": { "task_config": nsynth_pitch.generic_task_config, "audio_sample_size": 100, "necessary_keys": [], + "get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks, }, "speech_commands": { "task_config": speech_commands.generic_task_config, "audio_sample_size": 100, "necessary_keys": [], + "get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks, }, "spoken_digit": { "task_config": spoken_digit.generic_task_config, "audio_sample_size": 100, "necessary_keys": [], + "get_download_and_extract_tasks": tfds_pipeline.get_download_and_extract_tasks_tfds, # noqa: E501 }, # Add the sampler config for the secrets task if the secret task config was found. 
# Not available for participants @@ -85,19 +95,6 @@ class RandomSampleOriginalDataset(WorkTask): necessary_keys = luigi.ListParameter() audio_sample_size = luigi.IntParameter() - def requires(self): - # If this is a TensorFlow dataset then use the tfds pipeline - if "tfds_task_name" in self.task_config: - return tfds_pipeline.get_download_and_extract_tasks_tfds(self.task_config) - - return pipeline.get_download_and_extract_tasks(self.task_config) - - @staticmethod - def safecopy(src, dst): - # Make sure the parent destination directory exists - dst.parent.mkdir(parents=True, exist_ok=True) - shutil.copy2(src, dst) - @staticmethod def trimcopy_audio(src, tmp_dst, fin_dst, small_duration): """ @@ -167,7 +164,9 @@ def run(self): # Copy all the non audio files for file in tqdm(copy_files): - self.safecopy(src=copy_from.joinpath(file), dst=copy_to.joinpath(file)) + luigi_util.safecopy( + src=copy_from.joinpath(file), dst=copy_to.joinpath(file) + ) # Save all the audio after trimming them to small sample duration # The small sample duration(in seconds) is specified in the small @@ -189,6 +188,31 @@ def run(self): shutil.make_archive(copy_to, "zip", copy_to) +def get_sampler_task( + sampler_config: Dict[str, Any] +) -> Type[RandomSampleOriginalDataset]: + """ + Returns a task to do sampling after downloading the dataset with + download and extract tasks from the dataset specific + `get_download_and_extract_tasks` function + """ + _task_config: Dict[str, Any] = copy.deepcopy(sampler_config["task_config"]) + _task_config["mode"] = _task_config["default_mode"] + _get_download_and_extract_tasks: Callable = sampler_config[ + "get_download_and_extract_tasks" + ] + + class _RandomSampleOriginalDataset(RandomSampleOriginalDataset): + task_config = _task_config + audio_sample_size = sampler_config["audio_sample_size"] + necessary_keys = sampler_config["necessary_keys"] + + def requires(self): + return _get_download_and_extract_tasks(self.task_config) + + return 
_RandomSampleOriginalDataset + + @click.command() @click.argument("task") @click.option( @@ -202,15 +226,10 @@ def main(task: str, num_workers: Optional[int] = None): if num_workers is None: num_workers = multiprocessing.cpu_count() logger.info(f"Using {num_workers} workers") - config: Dict[str, Any] = configs[task] - default_config: str = config["task_config"]["default_mode"] - config["task_config"]["mode"] = default_config - sampler = RandomSampleOriginalDataset( - task_config=config["task_config"], - audio_sample_size=config["audio_sample_size"], - necessary_keys=config["necessary_keys"], - ) - pipeline.run(sampler, num_workers=num_workers) + + sampler_config: Dict[str, Any] = configs[task] + sampler = get_sampler_task(sampler_config) + pipeline.run(sampler(), num_workers=num_workers) if __name__ == "__main__": diff --git a/hearpreprocess/util/audio.py b/hearpreprocess/util/audio.py index 9de8017a..d49e8e1c 100644 --- a/hearpreprocess/util/audio.py +++ b/hearpreprocess/util/audio.py @@ -160,6 +160,9 @@ def get_audio_dir_stats( all_file_paths, ) ) + if len(audio_paths) == 0: + print("No audio files present in the folder") + return {} rng = random.Random(0) rng.shuffle(audio_paths) diff --git a/hearpreprocess/util/luigi.py b/hearpreprocess/util/luigi.py index 5c293058..af68e835 100644 --- a/hearpreprocess/util/luigi.py +++ b/hearpreprocess/util/luigi.py @@ -8,6 +8,7 @@ import os.path from functools import partial from pathlib import Path +import shutil import luigi import requests @@ -219,3 +220,13 @@ def str2int(s: str) -> int: https://stackoverflow.com/a/16008760/82733 """ return int(hashlib.sha1(s.encode("utf-8")).hexdigest(), 16) % (2 ** 32 - 1) + + +def safecopy(src, dst): + """ + Copies a file after checking if the parent destination directory exists. + If the parent does not exist, the parent directory will be made and the + file will be copied + """ + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src, dst) diff --git 
a/hearpreprocess/util/task_config.py b/hearpreprocess/util/task_config.py index 14d3bed2..d3af6daa 100644 --- a/hearpreprocess/util/task_config.py +++ b/hearpreprocess/util/task_config.py @@ -73,6 +73,17 @@ def validate_generic_task_config( This validator checks if the tfds task configuration is correctly defined + * sample_duration can also be set to `None`, rather than integer or + float, in which case, the audio files in the dataset will not + be trimmed or padded, rather the original file duration will be + retained in the output of the pipeline. + In this case, the max split duration should be set to + None and no subsampling can be done, as file durations are + not consistent. + However, this is only for specific tasks and should not be generally + used as it is not efficient for downstream pipelines, particularly + embedding generation in heareval + Args: task_config: Task config to be used with the pipeline ignore_extra_keys: Flag for ignoring extra keys in the task configuration. 
@@ -113,7 +124,9 @@ def validate_generic_task_config( "embedding_type": Or("scene", "event", str), "prediction_type": Or("multiclass", "multilabel", str), "split_mode": Or("trainvaltest", "presplit_kfold", "new_split_kfold"), - "sample_duration": Or(float, int), + # When the sample duration is None, the original audio is retained + # without any trimming and padding + "sample_duration": Or(float, int, None), "evaluation": Schema([str]), "default_mode": Or("5h", "50h", "full", str), } @@ -177,6 +190,12 @@ def validate_generic_task_config( ): object, } ) + # If the sample duration is set to None, the max_task_duration_by_split + # should also be None and no subsampling will be done + if task_config["sample_duration"] is None: + schema["max_task_duration_by_split"] = Schema( + {split: None for split in SPLITS} + ) elif split_mode in ["presplit_kfold", "new_split_kfold"]: assert ( @@ -203,6 +222,10 @@ def validate_generic_task_config( ): object, } ) + # If the sample duration is set to None, the max_task_duration_by_fold + # should also be None and no subsampling will be done + if task_config["sample_duration"] is None: + schema["max_task_duration_by_fold"] = None else: raise ValueError("Invalid split_mode")