From 7b09ed3f1f86b7f49e609878e6635ada9d01b615 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Mon, 4 Nov 2024 15:27:18 -0800 Subject: [PATCH 01/31] base commit, agg compression jobs to run in loop --- src/aind_behavior_video_transformation/etl.py | 117 +++++++++++++--- .../filesystem.py | 128 ------------------ .../transform_videos.py | 39 ++++-- tests/test_transform_videos.py | 3 - 4 files changed, 125 insertions(+), 162 deletions(-) delete mode 100644 src/aind_behavior_video_transformation/filesystem.py diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 415791e..8746e93 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -14,15 +14,13 @@ ) from pydantic import Field -from aind_behavior_video_transformation.filesystem import ( - build_overrides_dict, - transform_directory, -) from aind_behavior_video_transformation.transform_videos import ( - CompressionRequest, + CompressionRequest, convert_video ) +PathLike = Union[Path, str] + class BehaviorVideoJobSettings(BasicJobSettings): """ BehaviorJob settings. Inherits both fields input_source and @@ -34,7 +32,7 @@ class BehaviorVideoJobSettings(BasicJobSettings): description="Compression requested for video files", ) video_specific_compression_requests: Optional[ - List[Tuple[Union[Path, str], CompressionRequest]] + List[Tuple[PathLike, CompressionRequest]] ] = Field( default=None, description=( @@ -43,6 +41,7 @@ class BehaviorVideoJobSettings(BasicJobSettings): "request" ), ) + video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm"] class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): @@ -64,6 +63,96 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): run_job() -> JobResponse """ + def _format_output_directory(self) -> None: + """ + Recurisively copies (symlink) non-video files + from input directory to output directory + perserving filesysem structure. 
+ """ + input_dir = Path(self.job_settings.input_source.resolve()) + output_dir = Path(self.job_settings.output_directory.resolve()) + output_dir.mkdir(parents=True, exist_ok=True) + + for file in input_dir.rglob('*'): + if (file.is_file() and + not any(file.suffix.lower() == ext + for ext in self.job_settings.video_extensions)): + relative_path = file.relative_to(input_dir) + target_link = output_dir / relative_path + target_link.parent.mkdir(parents=True, exist_ok=True) + target_link.symlink_to(file) + + + def _resolve_compression_requests( + self + ) -> List[Tuple[PathLike, CompressionRequest]]: + """ + Recursively traverses input directory and + resolves CompressionRequest of all videos. + + Sets 'compression_requested' as the default, unless + overrided in 'video_specific_compression_requests'. + """ + input_dir = Path(self.job_settings.input_source.resolve()) + + # Define map: abs_path -> override CompressionRequest + overrides = {} + for vid_path, comp_req in self.job_settings.video_specific_compression_requests: + vid_path = Path(vid_path) + abs_path = None + if vid_path.is_absolute(): + abs_path = vid_path + elif vid_path.exists(): + abs_path = vid_path.resolve() + else: + abs_path = (input_dir / vid_path).resolve() + overrides[abs_path] = comp_req + + # Produce list of all (abs_path, CompressionRequest) pairs + path_comp_req_pairs = \ + [(file, self.job_settings.compression_requested) + for file in input_dir.rglob('*') + if (file.is_file() and any(file.suffix.lower() == ext + for ext in self.job_settings.video_extensions))] + path_comp_req_pairs = \ + [(file, overrides[file]) + for (file, _) in path_comp_req_pairs + if file in overrides] + + return path_comp_req_pairs + + + def _run_compression( + self, + path_comp_req_pairs: List[Tuple[PathLike, CompressionRequest]], + parallel=True + ) -> None: + """ + Runs CompressionRequests at the specified paths. 
+ """ + input_dir = Path(self.job_settings.input_source.resolve()) + output_dir = Path(self.job_settings.output_directory.resolve()) + + convert_video_params = [] + for vid_path, comp_req in path_comp_req_pairs: + # Resolve destination + relative_path = vid_path.relative_to(input_dir) + output_path = output_dir / relative_path + output_dir = output_path.parent + + # Resolve compression params + arg_set = comp_req.determine_ffmpeg_arg_set() + + # Add to job buffer + convert_video_params.append((vid_path, output_dir, arg_set)) + logging.info(f'Compressing {str(vid_path)} w/ {comp_req.compression_enum}') + + # TODO: Parallelize this loop with Dask/Futures. + # Most important is no silent errors. + for params in convert_video_params: + convert_video(params) + + def run_job(self) -> JobResponse: """ Main public method to run the compression job. @@ -83,20 +172,10 @@ def run_job(self) -> JobResponse: """ job_start_time = time() - video_comp_pairs = ( - self.job_settings.video_specific_compression_requests - ) - job_out_dir_path = self.job_settings.output_directory.resolve() - Path(job_out_dir_path).mkdir(exist_ok=True) - job_in_dir_path = self.job_settings.input_source.resolve() - overrides = build_overrides_dict(video_comp_pairs, job_in_dir_path) + self._format_output_directory() + path_comp_req_pairs = self._resolve_compression_requests() + self._run_compression(path_comp_req_pairs) - ffmpeg_arg_set = ( - self.job_settings.compression_requested.determine_ffmpeg_arg_set() - ) - transform_directory( - job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides - ) job_end_time = time() return JobResponse( status_code=200, diff --git a/src/aind_behavior_video_transformation/filesystem.py b/src/aind_behavior_video_transformation/filesystem.py deleted file mode 100644 index 6e44e82..0000000 --- a/src/aind_behavior_video_transformation/filesystem.py +++ /dev/null @@ -1,128 +0,0 @@ -"""Module for handling file discovery to transform videos.""" - -from os import symlink, walk 
-from os.path import relpath -from pathlib import Path - -from aind_behavior_video_transformation.transform_videos import convert_video - - -def likely_video_file(file: Path) -> bool: - """ - Check if a file is likely a video file based on its suffix. - - Parameters - ---------- - file : Path - The file path to check. - - Returns - ------- - bool - True if the file suffix indicates it is a video file, False otherwise. - """ - return file.suffix in set( - [ - ".mp4", - ".avi", - ".mov", - ".mkv", - ".flv", - ".wmv", - ".webm", - ] - ) - - -def build_overrides_dict(video_comp_pairs, job_in_dir_path): - """ - Builds a dictionary of override arguments for video paths. - - Parameters - ---------- - video_comp_pairs : list of tuple - A list of tuples where each tuple contains a video file name and a - corresponding CompressionRequest object. - job_in_dir_path : Path - The base directory path where the job input files are located. - - Returns - ------- - dict - A dictionary where keys are video paths (either files or directories) - and values are the override argument sets determined by the - CompressionRequest objects. 
- """ - overrides = dict() - if video_comp_pairs: - for video_name, comp_req in video_comp_pairs: - video_path = Path(video_name) - # Figure out how video path was passed, convert to absolute - if video_path.is_absolute(): - in_path = video_path - elif video_path.exists(): - in_path = video_path.resolve() - else: - in_path = (job_in_dir_path / video_path).resolve() - # Set overrides for the video path - override_arg_set = comp_req.determine_ffmpeg_arg_set() - # If it is a directory, set overrides for all subdirectories - if in_path.is_dir(): - overrides[in_path] = override_arg_set - for root, dirs, _ in walk(in_path, followlinks=True): - root_path = Path(root) - for dir_name in dirs: - subdir = root_path / dir_name - overrides[subdir] = override_arg_set - # If it is a file, set override for the file - else: - overrides[in_path] = override_arg_set - - return overrides - - -def transform_directory( - input_dir: Path, output_dir: Path, arg_set, overrides=dict() -) -> None: - """ - Transforms all video files in a directory and its subdirectories, - and creates symbolic links for non-video files. Subdirectories are - created as needed. - - Parameters - ---------- - input_dir : Path - The directory containing the input files. - output_dir : Path - The directory where the transformed files and symbolic links will be - saved. - arg_set : Any - The set of arguments to be used for video transformation. - overrides : dict, optional - A dictionary containing overrides for specific directories or files. - Keys are Paths and values are argument sets. Default is an empty - dictionary. 
- - Returns - ------- - None - """ - for root, dirs, files in walk(input_dir, followlinks=True): - root_path = Path(root) - in_relpath = relpath(root, input_dir) - dst_dir = output_dir / in_relpath - for dir_name in dirs: - out_path = dst_dir / dir_name - out_path.mkdir(parents=True, exist_ok=True) - - for file_name in files: - file_path = Path(root) / file_name - if likely_video_file(file_path): - # If the parent directory has an override, use that - this_arg_set = overrides.get(root_path, arg_set) - # File-level overrides take precedence - this_arg_set = overrides.get(file_path, this_arg_set) - convert_video(file_path, dst_dir, this_arg_set) - else: - out_path = dst_dir / file_name - symlink(file_path, out_path) diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index 6df7ba7..383ee13 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -1,6 +1,17 @@ -"""Module to handle transforming behavior videos""" +""" +Module to handle transforming behavior videos + +To add a new compression preset: +1) Define a CompressionEnum: 'CUSTOM_COMPRESSION = 'custom' +2) Define corresponding FfmpegInputArgs/FfmpegOutputArgs. +3) Add the CompressionEnum to FfmpegArgSet, and build + (FfmpegInputArgs, FfmpegOutputArgs) tuple: + 'CUSTOM_COMPRESSION' = ( + FfmpegInputArgs.CUSTOM_INPUT_ARGS, + FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS, + ) +""" -import logging import shlex import subprocess from enum import Enum @@ -123,7 +134,7 @@ class CompressionRequest(BaseModel): def determine_ffmpeg_arg_set( self, - ) -> Optional[Tuple[Optional[str], Optional[str]]]: + ) -> Optional[Tuple[str, str]]: """ Determines the appropriate set of FFmpeg arguments based on the compression requirements. 
@@ -153,6 +164,7 @@ def determine_ffmpeg_arg_set( self.user_ffmpeg_input_options, self.user_ffmpeg_output_options, ) + # If not one of the two special cases, use the enum values else: # If default, set compression to gamma @@ -160,12 +172,21 @@ def determine_ffmpeg_arg_set( compression_preset = CompressionEnum.GAMMA_ENCODING else: compression_preset = self.compression_enum + + # Resolve two levels of indirection here + # FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs) + # (FfmpegInputArgs, FfmpegOutputArgs) -> (in_args str, out_args str) arg_set_enum = FfmpegArgSet[compression_preset.name].value arg_set = (arg_set_enum[0].value, arg_set_enum[1].value) + return arg_set -def convert_video(video_path: Path, dst: Path, arg_set) -> Path: +def convert_video( + video_path: Path, + output_dir: Path, + arg_set: Optional[Tuple[str, str]] + ) -> Path: """ Converts a video to a specified format using ffmpeg. @@ -173,7 +194,7 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: ---------- video_path : Path The path to the input video file. - dst : Path + output_dir : Path The destination directory where the converted video will be saved. arg_set : tuple or None A tuple containing input and output arguments for ffmpeg. If None, a @@ -189,11 +210,9 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: - The function uses ffmpeg for video conversion. - If `arg_set` is None, the function creates a symlink to the original video file. - - The function logs the ffmpeg command used for conversion. """ - out_path = dst / f"{video_path.stem}.mp4" # noqa: E501 - # Pydantic validation ensures this is a 'CompressionRequest' value. 
+ out_path = output_dir / f"{video_path.stem}.mp4" # noqa: E501 # Trivial Case, do nothing if arg_set is None: @@ -211,10 +230,6 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: ffmpeg_command.extend(shlex.split(output_args)) ffmpeg_command.append(str(out_path)) - # For logging I guess - ffmpeg_str = " ".join(ffmpeg_command) - logging.info(f"{ffmpeg_str=}") - subprocess.run(ffmpeg_command, check=True) return out_path diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index b08bc29..ce72e02 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -43,9 +43,6 @@ def helper_run_compression_job(job_settings, mock_time): class TestBehaviorVideoJob(unittest.TestCase): """Test methods in BehaviorVideoJob class.""" - # NOTE: - # Test suite does not run yet. - # Resolving lint errors first. test_data_path = Path("tests/test_video_in_dir").resolve() dummy_response = JobResponse( status_code=200, From 3c9e90ad72ed2c527a59bf5950020f28508d240a Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Mon, 4 Nov 2024 16:45:47 -0800 Subject: [PATCH 02/31] add parallelization --- src/aind_behavior_video_transformation/etl.py | 27 ++++++++++++++++--- .../transform_videos.py | 1 + 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 8746e93..cbf2590 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -12,6 +12,7 @@ JobResponse, get_parser, ) +from concurrent.futures import ProcessPoolExecutor, as_completed from pydantic import Field from aind_behavior_video_transformation.transform_videos import ( @@ -147,10 +148,28 @@ def _run_compression( convert_video_params.append((vid_path, output_dir, arg_set)) logging.info(f'Compressing {str(vid_path)} w/ {comp_req.compression_enum}') - # TODO: Parallelize this loop with Dask/Futures. 
- # Most important is no silent errors. - for params in convert_video_params: - convert_video(params) + if parallel: + # Dask implementation + # import dask + # jobs = [dask.delayed(convert_video)(*params) for params in convert_video_params] + # dask.compute(*jobs) # This returns an error if any jobs fail + + # ProcessPool implementation + num_jobs = len(convert_video_params) + with ProcessPoolExecutor(max_workers=num_jobs) as executor: + jobs = [executor.submit(convert_video, *params) + for params in convert_video_params] + for job in as_completed(jobs): + try: + result = job.result() + print("FFmpeg job completed:", result) + except Exception as e: + print("Error:", e) + + else: + # Execute Serially + for params in convert_video_params: + convert_video(params) def run_job(self) -> JobResponse: diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index 383ee13..d1acce5 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -223,6 +223,7 @@ def convert_video( output_args = arg_set[1] ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] + ffmpeg_command.extend(["-threads", "8"]) # Use 8 threads per compression job if input_args: ffmpeg_command.extend(shlex.split(input_args)) ffmpeg_command.extend(["-i", str(video_path)]) From e685bc645e15d281ca9f8e2b049df400e6852267 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Mon, 4 Nov 2024 16:54:03 -0800 Subject: [PATCH 03/31] linting --- src/aind_behavior_video_transformation/etl.py | 77 ++++++++++++------- .../transform_videos.py | 13 ++-- 2 files changed, 56 insertions(+), 34 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index cbf2590..f71f507 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -2,6 +2,7 @@ import 
logging import sys +from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from time import time from typing import List, Optional, Tuple, Union @@ -12,16 +13,16 @@ JobResponse, get_parser, ) -from concurrent.futures import ProcessPoolExecutor, as_completed from pydantic import Field from aind_behavior_video_transformation.transform_videos import ( - CompressionRequest, convert_video + CompressionRequest, + convert_video, ) - PathLike = Union[Path, str] + class BehaviorVideoJobSettings(BasicJobSettings): """ BehaviorJob settings. Inherits both fields input_source and @@ -42,7 +43,15 @@ class BehaviorVideoJobSettings(BasicJobSettings): "request" ), ) - video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm"] + video_extensions = [ + ".mp4", + ".avi", + ".mov", + ".mkv", + ".flv", + ".wmv", + ".webm", + ] class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): @@ -74,19 +83,19 @@ def _format_output_directory(self) -> None: output_dir = Path(self.job_settings.output_directory.resolve()) output_dir.mkdir(parents=True, exist_ok=True) - for file in input_dir.rglob('*'): - if (file.is_file() and - not any(file.suffix.lower() == ext - for ext in self.job_settings.video_extensions)): + for file in input_dir.rglob("*"): + if file.is_file() and not any( + file.suffix.lower() == ext + for ext in self.job_settings.video_extensions + ): relative_path = file.relative_to(input_dir) target_link = output_dir / relative_path target_link.parent.mkdir(parents=True, exist_ok=True) target_link.symlink_to(file) - def _resolve_compression_requests( - self - ) -> List[Tuple[PathLike, CompressionRequest]]: + self, + ) -> List[Tuple[PathLike, CompressionRequest]]: """ Recursively traverses input directory and resolves CompressionRequest of all videos. 
@@ -98,7 +107,8 @@ def _resolve_compression_requests( # Define map: abs_path -> override CompressionRequest overrides = {} - for vid_path, comp_req in self.job_settings.video_specific_compression_requests: + comp_reqs = self.job_settings.video_specific_compression_requests + for vid_path, comp_req in comp_reqs: vid_path = Path(vid_path) abs_path = None if vid_path.is_absolute(): @@ -110,24 +120,30 @@ def _resolve_compression_requests( overrides[abs_path] = comp_req # Produce list of all (abs_path, CompressionRequest) pairs - path_comp_req_pairs = \ - [(file, self.job_settings.compression_requested) - for file in input_dir.rglob('*') - if (file.is_file() and any(file.suffix.lower() == ext - for ext in self.job_settings.video_extensions))] - path_comp_req_pairs = \ - [(file, overrides[file]) + path_comp_req_pairs = [ + (file, self.job_settings.compression_requested) + for file in input_dir.rglob("*") + if ( + file.is_file() + and any( + file.suffix.lower() == ext + for ext in self.job_settings.video_extensions + ) + ) + ] + path_comp_req_pairs = [ + (file, overrides[file]) for (file, _) in path_comp_req_pairs - if file in overrides] + if file in overrides + ] return path_comp_req_pairs - def _run_compression( self, path_comp_req_pairs: List[Tuple[PathLike, CompressionRequest]], - parallel=True - ) -> None: + parallel=True, + ) -> None: """ Runs CompressionRequests at the specified paths. 
""" @@ -146,19 +162,25 @@ def _run_compression( # Add to job buffer convert_video_params.append((vid_path, output_dir, arg_set)) - logging.info(f'Compressing {str(vid_path)} w/ {comp_req.compression_enum}') + logging.info( + f"Compressing {str(vid_path)} \ + w/ {comp_req.compression_enum}" + ) if parallel: # Dask implementation # import dask - # jobs = [dask.delayed(convert_video)(*params) for params in convert_video_params] + # jobs = [dask.delayed(convert_video)(*params) + # for params in convert_video_params] # dask.compute(*jobs) # This returns an error if any jobs fail # ProcessPool implementation num_jobs = len(convert_video_params) with ProcessPoolExecutor(max_workers=num_jobs) as executor: - jobs = [executor.submit(convert_video, *params) - for params in convert_video_params] + jobs = [ + executor.submit(convert_video, *params) + for params in convert_video_params + ] for job in as_completed(jobs): try: result = job.result() @@ -171,7 +193,6 @@ def _run_compression( for params in convert_video_params: convert_video(params) - def run_job(self) -> JobResponse: """ Main public method to run the compression job. 
diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index d1acce5..a97550f 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -175,7 +175,8 @@ def determine_ffmpeg_arg_set( # Resolve two levels of indirection here # FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs) - # (FfmpegInputArgs, FfmpegOutputArgs) -> (in_args str, out_args str) + # (FfmpegInputArgs, FfmpegOutputArgs) + # -> (in_args str, out_args str) arg_set_enum = FfmpegArgSet[compression_preset.name].value arg_set = (arg_set_enum[0].value, arg_set_enum[1].value) @@ -183,10 +184,8 @@ def determine_ffmpeg_arg_set( def convert_video( - video_path: Path, - output_dir: Path, - arg_set: Optional[Tuple[str, str]] - ) -> Path: + video_path: Path, output_dir: Path, arg_set: Optional[Tuple[str, str]] +) -> Path: """ Converts a video to a specified format using ffmpeg. 
@@ -223,7 +222,9 @@ def convert_video( output_args = arg_set[1] ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] - ffmpeg_command.extend(["-threads", "8"]) # Use 8 threads per compression job + # Use 8 threads per compression job + ffmpeg_command.extend(["-threads", "8"]) + if input_args: ffmpeg_command.extend(shlex.split(input_args)) ffmpeg_command.extend(["-i", str(video_path)]) From 1e110077944d4e83de8aa484a7f960ce2fc306c3 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Mon, 4 Nov 2024 17:27:54 -0800 Subject: [PATCH 04/31] 2/3 tests passing --- src/aind_behavior_video_transformation/etl.py | 46 +++++++++---------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index f71f507..0de1369 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -43,7 +43,7 @@ class BehaviorVideoJobSettings(BasicJobSettings): "request" ), ) - video_extensions = [ + video_extensions: List[str] = [ ".mp4", ".avi", ".mov", @@ -84,9 +84,8 @@ def _format_output_directory(self) -> None: output_dir.mkdir(parents=True, exist_ok=True) for file in input_dir.rglob("*"): - if file.is_file() and not any( - file.suffix.lower() == ext - for ext in self.job_settings.video_extensions + if (file.is_file() and not + (file.suffix.lower() in self.job_settings.video_extensions) ): relative_path = file.relative_to(input_dir) target_link = output_dir / relative_path @@ -108,34 +107,29 @@ def _resolve_compression_requests( # Define map: abs_path -> override CompressionRequest overrides = {} comp_reqs = self.job_settings.video_specific_compression_requests - for vid_path, comp_req in comp_reqs: - vid_path = Path(vid_path) - abs_path = None - if vid_path.is_absolute(): - abs_path = vid_path - elif vid_path.exists(): - abs_path = vid_path.resolve() - else: - abs_path = (input_dir / vid_path).resolve() - overrides[abs_path] = comp_req 
+ if comp_reqs: + for vid_path, comp_req in comp_reqs: + vid_path = Path(vid_path) + abs_path = None + if vid_path.is_absolute(): + abs_path = vid_path + elif vid_path.exists(): + abs_path = vid_path.resolve() + else: + abs_path = (input_dir / vid_path).resolve() + overrides[abs_path] = comp_req # Produce list of all (abs_path, CompressionRequest) pairs path_comp_req_pairs = [ (file, self.job_settings.compression_requested) for file in input_dir.rglob("*") - if ( - file.is_file() - and any( - file.suffix.lower() == ext - for ext in self.job_settings.video_extensions - ) + if (file.is_file() + and (file.suffix.lower() in self.job_settings.video_extensions) ) ] - path_comp_req_pairs = [ - (file, overrides[file]) - for (file, _) in path_comp_req_pairs - if file in overrides - ] + for file, override in overrides.items(): + path_comp_req_pairs.pop((file, self.job_settings.compression_requested)) + path_comp_req_pairs.append((file, override)) return path_comp_req_pairs @@ -223,6 +217,8 @@ def run_job(self) -> JobResponse: data=None, ) +# Resolve last error tomorrow, requires reading the test code +# to see what it is expecting. 
if __name__ == "__main__": sys_args = sys.argv[1:] From c94377ece05ee8d3a48fb023be913a7061001f4d Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Tue, 5 Nov 2024 12:47:48 -0800 Subject: [PATCH 05/31] pass tests --- src/aind_behavior_video_transformation/etl.py | 77 ++++++++++++------- tests/test_transform_videos.py | 4 +- 2 files changed, 50 insertions(+), 31 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 0de1369..59aa96d 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -104,34 +104,51 @@ def _resolve_compression_requests( """ input_dir = Path(self.job_settings.input_source.resolve()) - # Define map: abs_path -> override CompressionRequest - overrides = {} - comp_reqs = self.job_settings.video_specific_compression_requests - if comp_reqs: - for vid_path, comp_req in comp_reqs: - vid_path = Path(vid_path) - abs_path = None - if vid_path.is_absolute(): - abs_path = vid_path - elif vid_path.exists(): - abs_path = vid_path.resolve() - else: - abs_path = (input_dir / vid_path).resolve() - overrides[abs_path] = comp_req - - # Produce list of all (abs_path, CompressionRequest) pairs - path_comp_req_pairs = [ - (file, self.job_settings.compression_requested) + # Set all videos to global compression setting + path_req_pairs: List[Tuple[Path, CompressionRequest]] = [ + (file.resolve(), self.job_settings.compression_requested) for file in input_dir.rglob("*") if (file.is_file() and (file.suffix.lower() in self.job_settings.video_extensions) ) ] - for file, override in overrides.items(): - path_comp_req_pairs.pop((file, self.job_settings.compression_requested)) - path_comp_req_pairs.append((file, override)) - return path_comp_req_pairs + # Apply overrides if present. 
+ comp_reqs = self.job_settings.video_specific_compression_requests + if comp_reqs: + # Define map: override abs_vid_path -> override CompressionRequest + overrides: dict[Path, CompressionRequest] = {} + for override_path, override_req in comp_reqs: + override_path = Path(override_path) + + # Case 1: Override Path is a file. + if override_path.is_file(): + override_path = override_path.resolve() + overrides[override_path] = override_req + + # Case 2: Override Path is a subdirectory (relative/absolute) + else: + if not override_path.is_absolute(): + override_path = (input_dir / override_path) + for override_file in override_path.rglob("*"): + if (override_file.is_file() and + (override_file.suffix.lower() + in self.job_settings.video_extensions) + ): + override_file = override_file.resolve() + overrides[override_file] = override_req + + # Filter path_req_pairs by overrides. + output_pairs: List[Tuple[Path, CompressionRequest]] = [] + for vid_path, default_req in path_req_pairs: + for override_path, override_req in overrides.items(): + if vid_path.samefile(override_path): + output_pairs.append((override_path, override_req)) + else: + output_pairs.append((vid_path, default_req)) + path_req_pairs = output_pairs + + return path_req_pairs def _run_compression( self, @@ -149,13 +166,13 @@ def _run_compression( # Resolve destination relative_path = vid_path.relative_to(input_dir) output_path = output_dir / relative_path - output_dir = output_path.parent + output_vid_dir = output_path.parent # Resolve compression params arg_set = comp_req.determine_ffmpeg_arg_set() # Add to job buffer - convert_video_params.append((vid_path, output_dir, arg_set)) + convert_video_params.append((vid_path, output_vid_dir, arg_set)) logging.info( f"Compressing {str(vid_path)} \ w/ {comp_req.compression_enum}" @@ -178,9 +195,9 @@ def _run_compression( for job in as_completed(jobs): try: result = job.result() - print("FFmpeg job completed:", result) + logging.info("FFmpeg job completed:", result) 
except Exception as e: - print("Error:", e) + logging.info("Error:", e) else: # Execute Serially @@ -217,8 +234,6 @@ def run_job(self) -> JobResponse: data=None, ) -# Resolve last error tomorrow, requires reading the test code -# to see what it is expecting. if __name__ == "__main__": sys_args = sys.argv[1:] @@ -242,5 +257,9 @@ def run_job(self) -> JobResponse: job = BehaviorVideoJob(job_settings=job_settings) job_response = job.run_job() print(job_response.status_code) - logging.info(job_response.model_dump_json()) + + +# TODO: +# Expose parallel parameter. +# Linting \ No newline at end of file diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index ce72e02..a746cba 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -1,10 +1,10 @@ """Tests transform_videos module.""" import shlex +import shutil import subprocess import tempfile import unittest -from os import symlink from pathlib import Path from unittest.mock import MagicMock, patch @@ -110,7 +110,7 @@ def test_run_job_with_data_structure(self, mock_time: MagicMock): camera_in_paths = [in_path / d for d in camera_subdirs] for camera_path in camera_in_paths: camera_path.mkdir() - symlink(test_vid_path, camera_path / test_vid_name) + shutil.copy(str(test_vid_path), str(camera_path / test_vid_name)) open(camera_path / metadata_file, "w").close() with tempfile.TemporaryDirectory() as out_temp_dir: From 8b9c38ff98fe4178bc7cc7a9305dfed4aed5a15a Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Tue, 5 Nov 2024 14:15:48 -0800 Subject: [PATCH 06/31] linting --- src/aind_behavior_video_transformation/etl.py | 17 ++++++++--------- tests/test_transform_videos.py | 4 +++- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 59aa96d..890729a 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -84,8 
+84,8 @@ def _format_output_directory(self) -> None: output_dir.mkdir(parents=True, exist_ok=True) for file in input_dir.rglob("*"): - if (file.is_file() and not - (file.suffix.lower() in self.job_settings.video_extensions) + if file.is_file() and not ( + file.suffix.lower() in self.job_settings.video_extensions ): relative_path = file.relative_to(input_dir) target_link = output_dir / relative_path @@ -108,7 +108,8 @@ def _resolve_compression_requests( path_req_pairs: List[Tuple[Path, CompressionRequest]] = [ (file.resolve(), self.job_settings.compression_requested) for file in input_dir.rglob("*") - if (file.is_file() + if ( + file.is_file() and (file.suffix.lower() in self.job_settings.video_extensions) ) ] @@ -129,11 +130,11 @@ def _resolve_compression_requests( # Case 2: Override Path is a subdirectory (relative/absolute) else: if not override_path.is_absolute(): - override_path = (input_dir / override_path) + override_path = input_dir / override_path for override_file in override_path.rglob("*"): - if (override_file.is_file() and - (override_file.suffix.lower() - in self.job_settings.video_extensions) + if override_file.is_file() and ( + override_file.suffix.lower() + in self.job_settings.video_extensions ): override_file = override_file.resolve() overrides[override_file] = override_req @@ -259,7 +260,5 @@ def run_job(self) -> JobResponse: print(job_response.status_code) logging.info(job_response.model_dump_json()) - # TODO: # Expose parallel parameter. 
-# Linting \ No newline at end of file diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index a746cba..8d31332 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -110,7 +110,9 @@ def test_run_job_with_data_structure(self, mock_time: MagicMock): camera_in_paths = [in_path / d for d in camera_subdirs] for camera_path in camera_in_paths: camera_path.mkdir() - shutil.copy(str(test_vid_path), str(camera_path / test_vid_name)) + shutil.copy( + str(test_vid_path), str(camera_path / test_vid_name) + ) open(camera_path / metadata_file, "w").close() with tempfile.TemporaryDirectory() as out_temp_dir: From e9619503cc82ef996f6ae0fbb416e189d7b81e5a Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Tue, 5 Nov 2024 14:22:05 -0800 Subject: [PATCH 07/31] expose parallel parameter --- src/aind_behavior_video_transformation/etl.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 890729a..2340199 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -43,6 +43,10 @@ class BehaviorVideoJobSettings(BasicJobSettings): "request" ), ) + parallel_compression: bool = Field( + default=True, + description="Run compression in parallel or sequentially.", + ) video_extensions: List[str] = [ ".mp4", ".avi", @@ -154,7 +158,6 @@ def _resolve_compression_requests( def _run_compression( self, path_comp_req_pairs: List[Tuple[PathLike, CompressionRequest]], - parallel=True, ) -> None: """ Runs CompressionRequests at the specified paths. 
@@ -179,7 +182,7 @@ def _run_compression( w/ {comp_req.compression_enum}" ) - if parallel: + if self.job_settings.parallel_compression: # Dask implementation # import dask # jobs = [dask.delayed(convert_video)(*params) @@ -259,6 +262,3 @@ def run_job(self) -> JobResponse: job_response = job.run_job() print(job_response.status_code) logging.info(job_response.model_dump_json()) - -# TODO: -# Expose parallel parameter. From d721833cff585592730a9d01c8e6c782eea82587 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Tue, 5 Nov 2024 14:22:20 -0800 Subject: [PATCH 08/31] coverage threshold to 80 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f7c148f..b691508 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ exclude_lines = [ "import", "pragma: no cover", ] -fail_under = 97 +fail_under = 80 [tool.isort] line_length = 79 From 20843973224391ab3ee2c0a7957354b33e7eada8 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Tue, 5 Nov 2024 14:55:32 -0800 Subject: [PATCH 09/31] remove dask comment --- src/aind_behavior_video_transformation/etl.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 2340199..e615b0e 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -183,12 +183,6 @@ def _run_compression( ) if self.job_settings.parallel_compression: - # Dask implementation - # import dask - # jobs = [dask.delayed(convert_video)(*params) - # for params in convert_video_params] - # dask.compute(*jobs) # This returns an error if any jobs fail - # ProcessPool implementation num_jobs = len(convert_video_params) with ProcessPoolExecutor(max_workers=num_jobs) as executor: From 7d5205d0353b45e29bf8909530f061569b3a8479 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Tue, 5 Nov 2024 15:14:10 -0800 Subject: [PATCH 10/31] expose ffmpeg thread parameter 
--- src/aind_behavior_video_transformation/etl.py | 20 +++++++++++++++++-- .../transform_videos.py | 10 +++++++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index e615b0e..2c54dc9 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -47,6 +47,10 @@ class BehaviorVideoJobSettings(BasicJobSettings): default=True, description="Run compression in parallel or sequentially.", ) + ffmpeg_thread_cnt: int = Field( + default=8, + description="Number of threads per ffmpeg compression job." + ) video_extensions: List[str] = [ ".mp4", ".avi", @@ -187,7 +191,8 @@ def _run_compression( num_jobs = len(convert_video_params) with ProcessPoolExecutor(max_workers=num_jobs) as executor: jobs = [ - executor.submit(convert_video, *params) + executor.submit(convert_video, *params, + self.job_settings.ffmpeg_thread_cnt) for params in convert_video_params ] for job in as_completed(jobs): @@ -200,7 +205,11 @@ def _run_compression( else: # Execute Serially for params in convert_video_params: - convert_video(params) + try: + convert_video(*params, self.job_settings.ffmpeg_thread_cnt) + logging.info("FFmpeg job completed:", result) + except Exception as e: + logging.info("Error:", e) def run_job(self) -> JobResponse: """ @@ -256,3 +265,10 @@ def run_job(self) -> JobResponse: job_response = job.run_job() print(job_response.status_code) logging.info(job_response.model_dump_json()) + +# TODO: +# - Expose number of thread parameter +# - Add unit tests for helper methods +# - Unit test convert_video + +# Will push this in at the end and call it a day. 
diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index a97550f..cbf6b80 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -184,7 +184,10 @@ def determine_ffmpeg_arg_set( def convert_video( - video_path: Path, output_dir: Path, arg_set: Optional[Tuple[str, str]] + video_path: Path, + output_dir: Path, + arg_set: Optional[Tuple[str, str]], + ffmpeg_thread_cnt: int = 8 ) -> Path: """ Converts a video to a specified format using ffmpeg. @@ -222,8 +225,9 @@ def convert_video( output_args = arg_set[1] ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] - # Use 8 threads per compression job - ffmpeg_command.extend(["-threads", "8"]) + + # Set thread count + ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)]) if input_args: ffmpeg_command.extend(shlex.split(input_args)) From 8a423c75aa5a53d180527fa76fda75a683fe0253 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Wed, 6 Nov 2024 09:49:10 -0800 Subject: [PATCH 11/31] Revert to main --- pyproject.toml | 2 +- src/aind_behavior_video_transformation/etl.py | 181 ++---------------- .../filesystem.py | 128 +++++++++++++ .../transform_videos.py | 45 ++--- tests/test_transform_videos.py | 9 +- 5 files changed, 165 insertions(+), 200 deletions(-) create mode 100644 src/aind_behavior_video_transformation/filesystem.py diff --git a/pyproject.toml b/pyproject.toml index b691508..f7c148f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ exclude_lines = [ "import", "pragma: no cover", ] -fail_under = 80 +fail_under = 97 [tool.isort] line_length = 79 diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 2c54dc9..415791e 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -2,7 +2,6 @@ import logging import sys 
-from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from time import time from typing import List, Optional, Tuple, Union @@ -15,13 +14,14 @@ ) from pydantic import Field +from aind_behavior_video_transformation.filesystem import ( + build_overrides_dict, + transform_directory, +) from aind_behavior_video_transformation.transform_videos import ( CompressionRequest, - convert_video, ) -PathLike = Union[Path, str] - class BehaviorVideoJobSettings(BasicJobSettings): """ @@ -34,7 +34,7 @@ class BehaviorVideoJobSettings(BasicJobSettings): description="Compression requested for video files", ) video_specific_compression_requests: Optional[ - List[Tuple[PathLike, CompressionRequest]] + List[Tuple[Union[Path, str], CompressionRequest]] ] = Field( default=None, description=( @@ -43,23 +43,6 @@ class BehaviorVideoJobSettings(BasicJobSettings): "request" ), ) - parallel_compression: bool = Field( - default=True, - description="Run compression in parallel or sequentially.", - ) - ffmpeg_thread_cnt: int = Field( - default=8, - description="Number of threads per ffmpeg compression job." - ) - video_extensions: List[str] = [ - ".mp4", - ".avi", - ".mov", - ".mkv", - ".flv", - ".wmv", - ".webm", - ] class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): @@ -81,136 +64,6 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): run_job() -> JobResponse """ - def _format_output_directory(self) -> None: - """ - Recurisively copies (symlink) non-video files - from input directory to output directory - perserving filesysem structure. 
- """ - input_dir = Path(self.job_settings.input_source.resolve()) - output_dir = Path(self.job_settings.output_directory.resolve()) - output_dir.mkdir(parents=True, exist_ok=True) - - for file in input_dir.rglob("*"): - if file.is_file() and not ( - file.suffix.lower() in self.job_settings.video_extensions - ): - relative_path = file.relative_to(input_dir) - target_link = output_dir / relative_path - target_link.parent.mkdir(parents=True, exist_ok=True) - target_link.symlink_to(file) - - def _resolve_compression_requests( - self, - ) -> List[Tuple[PathLike, CompressionRequest]]: - """ - Recursively traverses input directory and - resolves CompressionRequest of all videos. - - Sets 'compression_requested' as the default, unless - overrided in 'video_specific_compression_requests'. - """ - input_dir = Path(self.job_settings.input_source.resolve()) - - # Set all videos to global compression setting - path_req_pairs: List[Tuple[Path, CompressionRequest]] = [ - (file.resolve(), self.job_settings.compression_requested) - for file in input_dir.rglob("*") - if ( - file.is_file() - and (file.suffix.lower() in self.job_settings.video_extensions) - ) - ] - - # Apply overrides if present. - comp_reqs = self.job_settings.video_specific_compression_requests - if comp_reqs: - # Define map: override abs_vid_path -> override CompressionRequest - overrides: dict[Path, CompressionRequest] = {} - for override_path, override_req in comp_reqs: - override_path = Path(override_path) - - # Case 1: Override Path is a file. 
- if override_path.is_file(): - override_path = override_path.resolve() - overrides[override_path] = override_req - - # Case 2: Override Path is a subdirectory (relative/absolute) - else: - if not override_path.is_absolute(): - override_path = input_dir / override_path - for override_file in override_path.rglob("*"): - if override_file.is_file() and ( - override_file.suffix.lower() - in self.job_settings.video_extensions - ): - override_file = override_file.resolve() - overrides[override_file] = override_req - - # Filter path_req_pairs by overrides. - output_pairs: List[Tuple[Path, CompressionRequest]] = [] - for vid_path, default_req in path_req_pairs: - for override_path, override_req in overrides.items(): - if vid_path.samefile(override_path): - output_pairs.append((override_path, override_req)) - else: - output_pairs.append((vid_path, default_req)) - path_req_pairs = output_pairs - - return path_req_pairs - - def _run_compression( - self, - path_comp_req_pairs: List[Tuple[PathLike, CompressionRequest]], - ) -> None: - """ - Runs CompressionRequests at the specified paths. 
- """ - input_dir = Path(self.job_settings.input_source.resolve()) - output_dir = Path(self.job_settings.output_directory.resolve()) - - convert_video_params = [] - for vid_path, comp_req in path_comp_req_pairs: - # Resolve destination - relative_path = vid_path.relative_to(input_dir) - output_path = output_dir / relative_path - output_vid_dir = output_path.parent - - # Resolve compression params - arg_set = comp_req.determine_ffmpeg_arg_set() - - # Add to job buffer - convert_video_params.append((vid_path, output_vid_dir, arg_set)) - logging.info( - f"Compressing {str(vid_path)} \ - w/ {comp_req.compression_enum}" - ) - - if self.job_settings.parallel_compression: - # ProcessPool implementation - num_jobs = len(convert_video_params) - with ProcessPoolExecutor(max_workers=num_jobs) as executor: - jobs = [ - executor.submit(convert_video, *params, - self.job_settings.ffmpeg_thread_cnt) - for params in convert_video_params - ] - for job in as_completed(jobs): - try: - result = job.result() - logging.info("FFmpeg job completed:", result) - except Exception as e: - logging.info("Error:", e) - - else: - # Execute Serially - for params in convert_video_params: - try: - convert_video(*params, self.job_settings.ffmpeg_thread_cnt) - logging.info("FFmpeg job completed:", result) - except Exception as e: - logging.info("Error:", e) - def run_job(self) -> JobResponse: """ Main public method to run the compression job. 
@@ -230,10 +83,20 @@ def run_job(self) -> JobResponse: """ job_start_time = time() - self._format_output_directory() - path_comp_req_pairs = self._resolve_compression_requests() - self._run_compression(path_comp_req_pairs) + video_comp_pairs = ( + self.job_settings.video_specific_compression_requests + ) + job_out_dir_path = self.job_settings.output_directory.resolve() + Path(job_out_dir_path).mkdir(exist_ok=True) + job_in_dir_path = self.job_settings.input_source.resolve() + overrides = build_overrides_dict(video_comp_pairs, job_in_dir_path) + ffmpeg_arg_set = ( + self.job_settings.compression_requested.determine_ffmpeg_arg_set() + ) + transform_directory( + job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides + ) job_end_time = time() return JobResponse( status_code=200, @@ -264,11 +127,5 @@ def run_job(self) -> JobResponse: job = BehaviorVideoJob(job_settings=job_settings) job_response = job.run_job() print(job_response.status_code) - logging.info(job_response.model_dump_json()) -# TODO: -# - Expose number of thread parameter -# - Add unit tests for helper methods -# - Unit test convert_video - -# Will push this in at the end and call it a day. + logging.info(job_response.model_dump_json()) diff --git a/src/aind_behavior_video_transformation/filesystem.py b/src/aind_behavior_video_transformation/filesystem.py new file mode 100644 index 0000000..6e44e82 --- /dev/null +++ b/src/aind_behavior_video_transformation/filesystem.py @@ -0,0 +1,128 @@ +"""Module for handling file discovery to transform videos.""" + +from os import symlink, walk +from os.path import relpath +from pathlib import Path + +from aind_behavior_video_transformation.transform_videos import convert_video + + +def likely_video_file(file: Path) -> bool: + """ + Check if a file is likely a video file based on its suffix. + + Parameters + ---------- + file : Path + The file path to check. + + Returns + ------- + bool + True if the file suffix indicates it is a video file, False otherwise. 
+ """ + return file.suffix in set( + [ + ".mp4", + ".avi", + ".mov", + ".mkv", + ".flv", + ".wmv", + ".webm", + ] + ) + + +def build_overrides_dict(video_comp_pairs, job_in_dir_path): + """ + Builds a dictionary of override arguments for video paths. + + Parameters + ---------- + video_comp_pairs : list of tuple + A list of tuples where each tuple contains a video file name and a + corresponding CompressionRequest object. + job_in_dir_path : Path + The base directory path where the job input files are located. + + Returns + ------- + dict + A dictionary where keys are video paths (either files or directories) + and values are the override argument sets determined by the + CompressionRequest objects. + """ + overrides = dict() + if video_comp_pairs: + for video_name, comp_req in video_comp_pairs: + video_path = Path(video_name) + # Figure out how video path was passed, convert to absolute + if video_path.is_absolute(): + in_path = video_path + elif video_path.exists(): + in_path = video_path.resolve() + else: + in_path = (job_in_dir_path / video_path).resolve() + # Set overrides for the video path + override_arg_set = comp_req.determine_ffmpeg_arg_set() + # If it is a directory, set overrides for all subdirectories + if in_path.is_dir(): + overrides[in_path] = override_arg_set + for root, dirs, _ in walk(in_path, followlinks=True): + root_path = Path(root) + for dir_name in dirs: + subdir = root_path / dir_name + overrides[subdir] = override_arg_set + # If it is a file, set override for the file + else: + overrides[in_path] = override_arg_set + + return overrides + + +def transform_directory( + input_dir: Path, output_dir: Path, arg_set, overrides=dict() +) -> None: + """ + Transforms all video files in a directory and its subdirectories, + and creates symbolic links for non-video files. Subdirectories are + created as needed. + + Parameters + ---------- + input_dir : Path + The directory containing the input files. 
+ output_dir : Path + The directory where the transformed files and symbolic links will be + saved. + arg_set : Any + The set of arguments to be used for video transformation. + overrides : dict, optional + A dictionary containing overrides for specific directories or files. + Keys are Paths and values are argument sets. Default is an empty + dictionary. + + Returns + ------- + None + """ + for root, dirs, files in walk(input_dir, followlinks=True): + root_path = Path(root) + in_relpath = relpath(root, input_dir) + dst_dir = output_dir / in_relpath + for dir_name in dirs: + out_path = dst_dir / dir_name + out_path.mkdir(parents=True, exist_ok=True) + + for file_name in files: + file_path = Path(root) / file_name + if likely_video_file(file_path): + # If the parent directory has an override, use that + this_arg_set = overrides.get(root_path, arg_set) + # File-level overrides take precedence + this_arg_set = overrides.get(file_path, this_arg_set) + convert_video(file_path, dst_dir, this_arg_set) + else: + out_path = dst_dir / file_name + symlink(file_path, out_path) diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index cbf6b80..6df7ba7 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -1,17 +1,6 @@ -""" -Module to handle transforming behavior videos - -To add a new compression preset: -1) Define a CompressionEnum: 'CUSTOM_COMPRESSION = 'custom' -2) Define corresponding FfmpegInputArgs/FfmpegOutputArgs. 
-3) Add the CompressionEnum to FfmpegArgSet, and build - (FfmpegInputArgs, FfmpegOutputArgs) tuple: - 'CUSTOM_COMPRESSION' = ( - FfmpegInputArgs.CUSTOM_INPUT_ARGS, - FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS, - ) -""" +"""Module to handle transforming behavior videos""" +import logging import shlex import subprocess from enum import Enum @@ -134,7 +123,7 @@ class CompressionRequest(BaseModel): def determine_ffmpeg_arg_set( self, - ) -> Optional[Tuple[str, str]]: + ) -> Optional[Tuple[Optional[str], Optional[str]]]: """ Determines the appropriate set of FFmpeg arguments based on the compression requirements. @@ -164,7 +153,6 @@ def determine_ffmpeg_arg_set( self.user_ffmpeg_input_options, self.user_ffmpeg_output_options, ) - # If not one of the two special cases, use the enum values else: # If default, set compression to gamma @@ -172,23 +160,12 @@ def determine_ffmpeg_arg_set( compression_preset = CompressionEnum.GAMMA_ENCODING else: compression_preset = self.compression_enum - - # Resolve two levels of indirection here - # FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs) - # (FfmpegInputArgs, FfmpegOutputArgs) - # -> (in_args str, out_args str) arg_set_enum = FfmpegArgSet[compression_preset.name].value arg_set = (arg_set_enum[0].value, arg_set_enum[1].value) - return arg_set -def convert_video( - video_path: Path, - output_dir: Path, - arg_set: Optional[Tuple[str, str]], - ffmpeg_thread_cnt: int = 8 -) -> Path: +def convert_video(video_path: Path, dst: Path, arg_set) -> Path: """ Converts a video to a specified format using ffmpeg. @@ -196,7 +173,7 @@ def convert_video( ---------- video_path : Path The path to the input video file. - output_dir : Path + dst : Path The destination directory where the converted video will be saved. arg_set : tuple or None A tuple containing input and output arguments for ffmpeg. If None, a @@ -212,9 +189,11 @@ def convert_video( - The function uses ffmpeg for video conversion. 
- If `arg_set` is None, the function creates a symlink to the original video file. + - The function logs the ffmpeg command used for conversion. """ - out_path = output_dir / f"{video_path.stem}.mp4" # noqa: E501 + out_path = dst / f"{video_path.stem}.mp4" # noqa: E501 + # Pydantic validation ensures this is a 'CompressionRequest' value. # Trivial Case, do nothing if arg_set is None: @@ -225,10 +204,6 @@ def convert_video( output_args = arg_set[1] ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] - - # Set thread count - ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)]) - if input_args: ffmpeg_command.extend(shlex.split(input_args)) ffmpeg_command.extend(["-i", str(video_path)]) @@ -236,6 +211,10 @@ def convert_video( ffmpeg_command.extend(shlex.split(output_args)) ffmpeg_command.append(str(out_path)) + # For logging I guess + ffmpeg_str = " ".join(ffmpeg_command) + logging.info(f"{ffmpeg_str=}") + subprocess.run(ffmpeg_command, check=True) return out_path diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index 8d31332..b08bc29 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -1,10 +1,10 @@ """Tests transform_videos module.""" import shlex -import shutil import subprocess import tempfile import unittest +from os import symlink from pathlib import Path from unittest.mock import MagicMock, patch @@ -43,6 +43,9 @@ def helper_run_compression_job(job_settings, mock_time): class TestBehaviorVideoJob(unittest.TestCase): """Test methods in BehaviorVideoJob class.""" + # NOTE: + # Test suite does not run yet. + # Resolving lint errors first. 
test_data_path = Path("tests/test_video_in_dir").resolve() dummy_response = JobResponse( status_code=200, @@ -110,9 +113,7 @@ def test_run_job_with_data_structure(self, mock_time: MagicMock): camera_in_paths = [in_path / d for d in camera_subdirs] for camera_path in camera_in_paths: camera_path.mkdir() - shutil.copy( - str(test_vid_path), str(camera_path / test_vid_name) - ) + symlink(test_vid_path, camera_path / test_vid_name) open(camera_path / metadata_file, "w").close() with tempfile.TemporaryDirectory() as out_temp_dir: From ba9dced0c597e5e32a0ef30182a7326a107ce9ee Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Wed, 6 Nov 2024 10:02:15 -0800 Subject: [PATCH 12/31] add parallelization --- src/aind_behavior_video_transformation/etl.py | 41 ++++++++++++++++- .../filesystem.py | 11 +++-- .../transform_videos.py | 46 ++++++++++++++----- 3 files changed, 82 insertions(+), 16 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 415791e..f09839a 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -2,6 +2,7 @@ import logging import sys +from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from time import time from typing import List, Optional, Tuple, Union @@ -20,6 +21,7 @@ ) from aind_behavior_video_transformation.transform_videos import ( CompressionRequest, + convert_video ) @@ -43,6 +45,14 @@ class BehaviorVideoJobSettings(BasicJobSettings): "request" ), ) + parallel_compression: bool = Field( + default=True, + description="Run compression in parallel or sequentially.", + ) + ffmpeg_thread_cnt: int = Field( + default=0, + description="Number of threads per ffmpeg compression job." 
+ ) class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): @@ -64,6 +74,33 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): run_job() -> JobResponse """ + def _run_compression( + self, + convert_video_args: list[tuple[Path, Path, tuple[str, str] | None]] + ) -> None: + """ + Runs CompressionRequests at the specified paths. + """ + + if self.job_settings.parallel_compression: + # ProcessPool implementation + num_jobs = len(convert_video_args) + with ProcessPoolExecutor(max_workers=num_jobs) as executor: + jobs = [ + executor.submit(convert_video, *params, + self.job_settings.ffmpeg_thread_cnt) + for params in convert_video_args + ] + for job in as_completed(jobs): + result = job.result() + logging.info("FFmpeg job completed:", result) + + else: + # Execute Serially + for params in convert_video_args: + convert_video(*params, self.job_settings.ffmpeg_thread_cnt) + logging.info("FFmpeg job completed:", result) + def run_job(self) -> JobResponse: """ Main public method to run the compression job. 
@@ -94,9 +131,11 @@ def run_job(self) -> JobResponse: ffmpeg_arg_set = ( self.job_settings.compression_requested.determine_ffmpeg_arg_set() ) - transform_directory( + convert_video_args = transform_directory( job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides ) + self._run_compression(convert_video_args) + job_end_time = time() return JobResponse( status_code=200, diff --git a/src/aind_behavior_video_transformation/filesystem.py b/src/aind_behavior_video_transformation/filesystem.py index 6e44e82..6e2098b 100644 --- a/src/aind_behavior_video_transformation/filesystem.py +++ b/src/aind_behavior_video_transformation/filesystem.py @@ -83,7 +83,7 @@ def build_overrides_dict(video_comp_pairs, job_in_dir_path): def transform_directory( input_dir: Path, output_dir: Path, arg_set, overrides=dict() -) -> None: +) -> list[tuple[Path, Path, tuple[str, str] | None]]: """ Transforms all video files in a directory and its subdirectories, and creates symbolic links for non-video files. Subdirectories are @@ -105,8 +105,10 @@ def transform_directory( Returns ------- - None + List of tuples containing convert_video arguments. 
""" + + convert_video_args = [] for root, dirs, files in walk(input_dir, followlinks=True): root_path = Path(root) in_relpath = relpath(root, input_dir) @@ -122,7 +124,10 @@ def transform_directory( this_arg_set = overrides.get(root_path, arg_set) # File-level overrides take precedence this_arg_set = overrides.get(file_path, this_arg_set) - convert_video(file_path, dst_dir, this_arg_set) + convert_video_args.append((file_path, dst_dir, this_arg_set)) + else: out_path = dst_dir / file_name symlink(file_path, out_path) + + return convert_video_args \ No newline at end of file diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index 6df7ba7..5421e66 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -1,6 +1,17 @@ -"""Module to handle transforming behavior videos""" +""" +Module to handle transforming behavior videos + +To add a new compression preset: +1) Define a CompressionEnum: 'CUSTOM_COMPRESSION = 'custom' +2) Define corresponding FfmpegInputArgs/FfmpegOutputArgs. +3) Add the CompressionEnum to FfmpegArgSet, and build + (FfmpegInputArgs, FfmpegOutputArgs) tuple: + 'CUSTOM_COMPRESSION' = ( + FfmpegInputArgs.CUSTOM_INPUT_ARGS, + FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS, + ) +""" -import logging import shlex import subprocess from enum import Enum @@ -123,7 +134,7 @@ class CompressionRequest(BaseModel): def determine_ffmpeg_arg_set( self, - ) -> Optional[Tuple[Optional[str], Optional[str]]]: + ) -> Optional[Tuple[str, str]]: """ Determines the appropriate set of FFmpeg arguments based on the compression requirements. 
@@ -153,6 +164,7 @@ def determine_ffmpeg_arg_set( self.user_ffmpeg_input_options, self.user_ffmpeg_output_options, ) + # If not one of the two special cases, use the enum values else: # If default, set compression to gamma @@ -160,12 +172,23 @@ def determine_ffmpeg_arg_set( compression_preset = CompressionEnum.GAMMA_ENCODING else: compression_preset = self.compression_enum + + # Resolve two levels of indirection here + # FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs) + # (FfmpegInputArgs, FfmpegOutputArgs) + # -> (in_args str, out_args str) arg_set_enum = FfmpegArgSet[compression_preset.name].value arg_set = (arg_set_enum[0].value, arg_set_enum[1].value) + return arg_set -def convert_video(video_path: Path, dst: Path, arg_set) -> Path: +def convert_video( + video_path: Path, + output_dir: Path, + arg_set: Optional[Tuple[str, str]], + ffmpeg_thread_cnt: int = 0 +) -> Path: """ Converts a video to a specified format using ffmpeg. @@ -173,11 +196,12 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: ---------- video_path : Path The path to the input video file. - dst : Path + output_dir : Path The destination directory where the converted video will be saved. arg_set : tuple or None A tuple containing input and output arguments for ffmpeg. If None, a symlink to the original video is created. + ffmpeg_thread_cnt : set number of ffmpeg threads Returns ------- @@ -189,11 +213,9 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: - The function uses ffmpeg for video conversion. - If `arg_set` is None, the function creates a symlink to the original video file. - - The function logs the ffmpeg command used for conversion. """ - out_path = dst / f"{video_path.stem}.mp4" # noqa: E501 - # Pydantic validation ensures this is a 'CompressionRequest' value. 
+ out_path = output_dir / f"{video_path.stem}.mp4" # noqa: E501 # Trivial Case, do nothing if arg_set is None: @@ -204,6 +226,10 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: output_args = arg_set[1] ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] + + # Set thread count + ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)]) + if input_args: ffmpeg_command.extend(shlex.split(input_args)) ffmpeg_command.extend(["-i", str(video_path)]) @@ -211,10 +237,6 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: ffmpeg_command.extend(shlex.split(output_args)) ffmpeg_command.append(str(out_path)) - # For logging I guess - ffmpeg_str = " ".join(ffmpeg_command) - logging.info(f"{ffmpeg_str=}") - subprocess.run(ffmpeg_command, check=True) return out_path From a49d0db5d8816939afb5d9f2e319f1590299fc96 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Wed, 6 Nov 2024 11:37:29 -0800 Subject: [PATCH 13/31] linting --- src/aind_behavior_video_transformation/etl.py | 14 ++++++++------ .../filesystem.py | 4 +--- .../transform_videos.py | 2 +- tests/test_transform_videos.py | 3 --- 4 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index f09839a..ca1609b 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -21,7 +21,7 @@ ) from aind_behavior_video_transformation.transform_videos import ( CompressionRequest, - convert_video + convert_video, ) @@ -50,8 +50,7 @@ class BehaviorVideoJobSettings(BasicJobSettings): description="Run compression in parallel or sequentially.", ) ffmpeg_thread_cnt: int = Field( - default=0, - description="Number of threads per ffmpeg compression job." + default=0, description="Number of threads per ffmpeg compression job." 
) @@ -76,7 +75,7 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): def _run_compression( self, - convert_video_args: list[tuple[Path, Path, tuple[str, str] | None]] + convert_video_args: list[tuple[Path, Path, tuple[str, str] | None]], ) -> None: """ Runs CompressionRequests at the specified paths. @@ -87,8 +86,11 @@ def _run_compression( num_jobs = len(convert_video_args) with ProcessPoolExecutor(max_workers=num_jobs) as executor: jobs = [ - executor.submit(convert_video, *params, - self.job_settings.ffmpeg_thread_cnt) + executor.submit( + convert_video, + *params, + self.job_settings.ffmpeg_thread_cnt, + ) for params in convert_video_args ] for job in as_completed(jobs): diff --git a/src/aind_behavior_video_transformation/filesystem.py b/src/aind_behavior_video_transformation/filesystem.py index 6e2098b..8fafbc3 100644 --- a/src/aind_behavior_video_transformation/filesystem.py +++ b/src/aind_behavior_video_transformation/filesystem.py @@ -4,8 +4,6 @@ from os.path import relpath from pathlib import Path -from aind_behavior_video_transformation.transform_videos import convert_video - def likely_video_file(file: Path) -> bool: """ @@ -130,4 +128,4 @@ def transform_directory( out_path = dst_dir / file_name symlink(file_path, out_path) - return convert_video_args \ No newline at end of file + return convert_video_args diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index 5421e66..0c46207 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -187,7 +187,7 @@ def convert_video( video_path: Path, output_dir: Path, arg_set: Optional[Tuple[str, str]], - ffmpeg_thread_cnt: int = 0 + ffmpeg_thread_cnt: int = 0, ) -> Path: """ Converts a video to a specified format using ffmpeg. 
diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index b08bc29..ce72e02 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -43,9 +43,6 @@ def helper_run_compression_job(job_settings, mock_time): class TestBehaviorVideoJob(unittest.TestCase): """Test methods in BehaviorVideoJob class.""" - # NOTE: - # Test suite does not run yet. - # Resolving lint errors first. test_data_path = Path("tests/test_video_in_dir").resolve() dummy_response = JobResponse( status_code=200, From 9ac8ecb025cbf10eb79db924176d88f26671ecb9 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Wed, 6 Nov 2024 11:47:54 -0800 Subject: [PATCH 14/31] log lines missing test coverage --- .github/workflows/test_and_lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_and_lint.yml b/.github/workflows/test_and_lint.yml index bde4952..7f31a46 100644 --- a/.github/workflows/test_and_lint.yml +++ b/.github/workflows/test_and_lint.yml @@ -27,4 +27,4 @@ jobs: - name: Run linter checks run: flake8 . && interrogate --verbose . 
- name: Run tests and coverage - run: coverage run -m unittest discover && coverage report + run: coverage run -m unittest discover && coverage report -m From 36e1c61d60a607401f79eda19d1c94216bb7494b Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Wed, 6 Nov 2024 11:54:37 -0800 Subject: [PATCH 15/31] test serial compression branch --- tests/test_transform_videos.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index ce72e02..7f76351 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -73,6 +73,7 @@ def test_run_job(self, mock_time: MagicMock): compression_requested=CompressionRequest( compression_enum=compression_enum ), + parallel_compression=False ) response = helper_run_compression_job(job_settings, mock_time) self.assertEqual(expected_response, response) From b70512f9ae11e7e35438dc08933e402fe39933e0 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Wed, 6 Nov 2024 12:01:24 -0800 Subject: [PATCH 16/31] add unit test for conert_video --- .../__init__.py | 1 + tests/test_transform_videos.py | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/src/aind_behavior_video_transformation/__init__.py b/src/aind_behavior_video_transformation/__init__.py index fd0e5ab..62a9247 100644 --- a/src/aind_behavior_video_transformation/__init__.py +++ b/src/aind_behavior_video_transformation/__init__.py @@ -9,4 +9,5 @@ from aind_behavior_video_transformation.transform_videos import ( # noqa F401 CompressionEnum, CompressionRequest, + convert_video ) diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index 7f76351..dfa5653 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -15,6 +15,7 @@ BehaviorVideoJobSettings, CompressionEnum, CompressionRequest, + convert_video ) @@ -52,6 +53,28 @@ class TestBehaviorVideoJob(unittest.TestCase): test_vid_name = "clip.mp4" test_vid_path = test_data_path / test_vid_name + 
@patch("aind_behavior_video_transformation.etl.time") + def test_convert_video(self, mock_time: MagicMock): + """Unit test convert video.""" + + # Equivalent to CompressionEnum.GAMMA_ENCODING + arg_set = ("", "-vf " + '"scale=out_color_matrix=bt709:out_range=full:sws_dither=none,' + "format=yuv420p10le,colorspace=ispace=bt709:all=bt709:dither=none," + 'scale=out_range=tv:sws_dither=none,format=yuv420p" -c:v libx264 ' + "-preset veryslow -crf 18 -pix_fmt yuv420p " + '-metadata author="Allen Institute for Neural Dyamics" ' + "-movflags +faststart+write_colr") + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + compressed_out_path = convert_video(self.test_vid_path, + temp_path, + arg_set) + + out_path = temp_path / self.test_vid_name + + self.assertTrue(str(out_path) == str(compressed_out_path)) + @patch("aind_behavior_video_transformation.etl.time") def test_run_job(self, mock_time: MagicMock): """Tests run_job method.""" From b68b9089fb005539b906a6b264dce76009857dc7 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Wed, 6 Nov 2024 12:04:08 -0800 Subject: [PATCH 17/31] lint the test --- .../__init__.py | 2 +- tests/test_transform_videos.py | 27 ++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/aind_behavior_video_transformation/__init__.py b/src/aind_behavior_video_transformation/__init__.py index 62a9247..9b4339a 100644 --- a/src/aind_behavior_video_transformation/__init__.py +++ b/src/aind_behavior_video_transformation/__init__.py @@ -9,5 +9,5 @@ from aind_behavior_video_transformation.transform_videos import ( # noqa F401 CompressionEnum, CompressionRequest, - convert_video + convert_video, ) diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index dfa5653..cc7e905 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -15,7 +15,7 @@ BehaviorVideoJobSettings, CompressionEnum, CompressionRequest, - convert_video + convert_video, ) @@ -58,18 +58,21 
@@ def test_convert_video(self, mock_time: MagicMock): """Unit test convert video.""" # Equivalent to CompressionEnum.GAMMA_ENCODING - arg_set = ("", "-vf " - '"scale=out_color_matrix=bt709:out_range=full:sws_dither=none,' - "format=yuv420p10le,colorspace=ispace=bt709:all=bt709:dither=none," - 'scale=out_range=tv:sws_dither=none,format=yuv420p" -c:v libx264 ' - "-preset veryslow -crf 18 -pix_fmt yuv420p " - '-metadata author="Allen Institute for Neural Dyamics" ' - "-movflags +faststart+write_colr") + arg_set = ( + "", + "-vf " + '"scale=out_color_matrix=bt709:out_range=full:sws_dither=none,' + "format=yuv420p10le,colorspace=ispace=bt709:all=bt709:dither=none," + 'scale=out_range=tv:sws_dither=none,format=yuv420p" -c:v libx264 ' + "-preset veryslow -crf 18 -pix_fmt yuv420p " + '-metadata author="Allen Institute for Neural Dyamics" ' + "-movflags +faststart+write_colr", + ) with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) - compressed_out_path = convert_video(self.test_vid_path, - temp_path, - arg_set) + compressed_out_path = convert_video( + self.test_vid_path, temp_path, arg_set + ) out_path = temp_path / self.test_vid_name @@ -96,7 +99,7 @@ def test_run_job(self, mock_time: MagicMock): compression_requested=CompressionRequest( compression_enum=compression_enum ), - parallel_compression=False + parallel_compression=False, ) response = helper_run_compression_job(job_settings, mock_time) self.assertEqual(expected_response, response) From cfc3aa12ccf8ea5da9022bf3467f9909f247bc6c Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Wed, 6 Nov 2024 12:11:14 -0800 Subject: [PATCH 18/31] resolve serial runtime error --- src/aind_behavior_video_transformation/etl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index ca1609b..46da479 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ 
b/src/aind_behavior_video_transformation/etl.py @@ -100,8 +100,8 @@ def _run_compression( else: # Execute Serially for params in convert_video_args: - convert_video(*params, self.job_settings.ffmpeg_thread_cnt) - logging.info("FFmpeg job completed:", result) + out_path = convert_video(*params, self.job_settings.ffmpeg_thread_cnt) + logging.info(f"FFmpeg job completed: {out_path}") def run_job(self) -> JobResponse: """ From 30476fd79c499c412cffd048cfab1a4f7b1f2a61 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Wed, 6 Nov 2024 12:12:39 -0800 Subject: [PATCH 19/31] linting... --- src/aind_behavior_video_transformation/etl.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 46da479..356d33f 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -100,7 +100,8 @@ def _run_compression( else: # Execute Serially for params in convert_video_args: - out_path = convert_video(*params, self.job_settings.ffmpeg_thread_cnt) + out_path = convert_video(*params, + self.job_settings.ffmpeg_thread_cnt) logging.info(f"FFmpeg job completed: {out_path}") def run_job(self) -> JobResponse: From f7ea735687899b1ce9e5b8e041b111239839d368 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Thu, 7 Nov 2024 10:30:13 -0800 Subject: [PATCH 20/31] Documentation --- .../transform_videos.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index 0c46207..f8b0e6a 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -2,14 +2,15 @@ Module to handle transforming behavior videos To add a new compression preset: -1) Define a CompressionEnum: 'CUSTOM_COMPRESSION = 'custom' -2) Define corresponding 
FfmpegInputArgs/FfmpegOutputArgs. +1) Define FfmpegInputArgs/FfmpegOutputArgs. +2) Define a CompressionEnum: 'NEW_PRESET = 'new_preset' 3) Add the CompressionEnum to FfmpegArgSet, and build (FfmpegInputArgs, FfmpegOutputArgs) tuple: - 'CUSTOM_COMPRESSION' = ( + 'NEW_PRESET' = ( FfmpegInputArgs.CUSTOM_INPUT_ARGS, FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS, ) +FfmpegInputArgs / FfmpegOutputArgs can be preexisting or newly-defined in (1) """ import shlex @@ -228,7 +229,8 @@ def convert_video( ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] # Set thread count - ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)]) + if ffmpeg_thread_cnt > 0: + ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)]) if input_args: ffmpeg_command.extend(shlex.split(input_args)) From b16850ce54ea31d2cb05cef6dcf78212c8eea7ce Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Thu, 7 Nov 2024 10:30:42 -0800 Subject: [PATCH 21/31] parallel=true in directory test --- tests/test_transform_videos.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index cc7e905..edf69ea 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -148,6 +148,7 @@ def test_run_job_with_data_structure(self, mock_time: MagicMock): compression_requested=CompressionRequest( compression_enum=CompressionEnum.DEFAULT, ), + parallel_compression=True, ) response = helper_run_compression_job(job_settings, mock_time) self.assertEqual(dummy_response, response) From 7d0566d57615af1078fdccd65c1be1168dbdcb31 Mon Sep 17 00:00:00 2001 From: Galen Lynch <glynch@alleninstitute.org> Date: Thu, 7 Nov 2024 16:01:34 -0800 Subject: [PATCH 22/31] Remove redundant test, top level export --- .../__init__.py | 1 - tests/test_transform_videos.py | 26 ------------------- 2 files changed, 27 deletions(-) diff --git a/src/aind_behavior_video_transformation/__init__.py b/src/aind_behavior_video_transformation/__init__.py index 9b4339a..fd0e5ab 100644 --- 
a/src/aind_behavior_video_transformation/__init__.py +++ b/src/aind_behavior_video_transformation/__init__.py @@ -9,5 +9,4 @@ from aind_behavior_video_transformation.transform_videos import ( # noqa F401 CompressionEnum, CompressionRequest, - convert_video, ) diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index edf69ea..fa06498 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -15,7 +15,6 @@ BehaviorVideoJobSettings, CompressionEnum, CompressionRequest, - convert_video, ) @@ -53,31 +52,6 @@ class TestBehaviorVideoJob(unittest.TestCase): test_vid_name = "clip.mp4" test_vid_path = test_data_path / test_vid_name - @patch("aind_behavior_video_transformation.etl.time") - def test_convert_video(self, mock_time: MagicMock): - """Unit test convert video.""" - - # Equivalent to CompressionEnum.GAMMA_ENCODING - arg_set = ( - "", - "-vf " - '"scale=out_color_matrix=bt709:out_range=full:sws_dither=none,' - "format=yuv420p10le,colorspace=ispace=bt709:all=bt709:dither=none," - 'scale=out_range=tv:sws_dither=none,format=yuv420p" -c:v libx264 ' - "-preset veryslow -crf 18 -pix_fmt yuv420p " - '-metadata author="Allen Institute for Neural Dyamics" ' - "-movflags +faststart+write_colr", - ) - with tempfile.TemporaryDirectory() as temp_dir: - temp_path = Path(temp_dir) - compressed_out_path = convert_video( - self.test_vid_path, temp_path, arg_set - ) - - out_path = temp_path / self.test_vid_name - - self.assertTrue(str(out_path) == str(compressed_out_path)) - @patch("aind_behavior_video_transformation.etl.time") def test_run_job(self, mock_time: MagicMock): """Tests run_job method.""" From 5d7033871f677b482143e9efae0e80197aff80f4 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Fri, 8 Nov 2024 13:49:12 -0800 Subject: [PATCH 23/31] add serial logging to parallel errors --- src/aind_behavior_video_transformation/etl.py | 28 +++++++++++++------ .../transform_videos.py | 28 +++++++++++++++---- 2 files changed, 42 
insertions(+), 14 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 356d33f..80621ce 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -80,9 +80,9 @@ def _run_compression( """ Runs CompressionRequests at the specified paths. """ - + error_traces = [] if self.job_settings.parallel_compression: - # ProcessPool implementation + # Execute in-parallel num_jobs = len(convert_video_args) with ProcessPoolExecutor(max_workers=num_jobs) as executor: jobs = [ @@ -94,15 +94,25 @@ def _run_compression( for params in convert_video_args ] for job in as_completed(jobs): - result = job.result() - logging.info("FFmpeg job completed:", result) - + out_path, error = job.result() + if error: + error_traces.append(error) + else: + logging.info(f"FFmpeg job completed: {out_path}") else: - # Execute Serially + # Execute serially for params in convert_video_args: - out_path = convert_video(*params, - self.job_settings.ffmpeg_thread_cnt) - logging.info(f"FFmpeg job completed: {out_path}") + out_path, error = convert_video(*params, + self.job_settings.ffmpeg_thread_cnt) + if error: + error_traces.append(error) + else: + logging.info(f"FFmpeg job completed: {out_path}") + + if error_traces: + for e in error_traces: + logging.error(e) + raise RuntimeError('One or more Ffmpeg jobs failed. 
See error logs.') def run_job(self) -> JobResponse: """ diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index f8b0e6a..d667a97 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -15,6 +15,7 @@ import shlex import subprocess +from subprocess import CalledProcessError from enum import Enum from os import symlink from pathlib import Path @@ -22,7 +23,6 @@ from pydantic import BaseModel, Field - class CompressionEnum(Enum): """ Enum class to define different types of compression requests. @@ -189,7 +189,7 @@ def convert_video( output_dir: Path, arg_set: Optional[Tuple[str, str]], ffmpeg_thread_cnt: int = 0, -) -> Path: +) -> tuple[Path, Optional[str]]: """ Converts a video to a specified format using ffmpeg. @@ -239,6 +239,24 @@ def convert_video( ffmpeg_command.extend(shlex.split(output_args)) ffmpeg_command.append(str(out_path)) - subprocess.run(ffmpeg_command, check=True) - - return out_path + # Capture and return error message if it exists + import tempfile + with tempfile.NamedTemporaryFile(mode='w+', suffix='.log') as stdout_file, \ + tempfile.NamedTemporaryFile(mode='w+', suffix='.log') as stderr_file: + try: + subprocess.run( + ffmpeg_command, + check=True, + capture_output=True, + text=True + ) + return out_path, None + + except CalledProcessError as e: + error_msg = ( + f"FFmpeg conversion failed for {video_path}\n" + f"Command: {' '.join(ffmpeg_command)}\n" + f"Return code: {e.returncode}\n" + f"Error output:\n{e.stderr}\n" + ) + return (out_path, error_msg) From f5a8b309b38c46c7ba069fc5c48345ee116c1527 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Fri, 8 Nov 2024 13:54:43 -0800 Subject: [PATCH 24/31] linting --- src/aind_behavior_video_transformation/etl.py | 11 +++--- .../transform_videos.py | 35 ++++++++----------- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git 
a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 80621ce..aef6b7d 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -102,17 +102,20 @@ def _run_compression( else: # Execute serially for params in convert_video_args: - out_path, error = convert_video(*params, - self.job_settings.ffmpeg_thread_cnt) + out_path, error = convert_video( + *params, self.job_settings.ffmpeg_thread_cnt + ) if error: - error_traces.append(error) + error_traces.append(error) else: logging.info(f"FFmpeg job completed: {out_path}") if error_traces: for e in error_traces: logging.error(e) - raise RuntimeError('One or more Ffmpeg jobs failed. See error logs.') + raise RuntimeError( + "One or more Ffmpeg jobs failed. See error logs." + ) def run_job(self) -> JobResponse: """ diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index d667a97..60563b3 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -23,6 +23,7 @@ from pydantic import BaseModel, Field + class CompressionEnum(Enum): """ Enum class to define different types of compression requests. 
@@ -240,23 +241,17 @@ def convert_video( ffmpeg_command.append(str(out_path)) # Capture and return error message if it exists - import tempfile - with tempfile.NamedTemporaryFile(mode='w+', suffix='.log') as stdout_file, \ - tempfile.NamedTemporaryFile(mode='w+', suffix='.log') as stderr_file: - try: - subprocess.run( - ffmpeg_command, - check=True, - capture_output=True, - text=True - ) - return out_path, None - - except CalledProcessError as e: - error_msg = ( - f"FFmpeg conversion failed for {video_path}\n" - f"Command: {' '.join(ffmpeg_command)}\n" - f"Return code: {e.returncode}\n" - f"Error output:\n{e.stderr}\n" - ) - return (out_path, error_msg) + try: + subprocess.run( + ffmpeg_command, check=True, capture_output=True, text=True + ) + return out_path, None + + except CalledProcessError as e: + error_msg = ( + f"FFmpeg conversion failed for {video_path}\n" + f"Command: {' '.join(ffmpeg_command)}\n" + f"Return code: {e.returncode}\n" + f"Error output:\n{e.stderr}\n" + ) + return (out_path, error_msg) From 6f963b3ea86cb8eef2b5a086fe02c4c098afabbc Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Fri, 8 Nov 2024 13:55:25 -0800 Subject: [PATCH 25/31] linting 2 --- src/aind_behavior_video_transformation/transform_videos.py | 2 +- tests/test_transform_videos.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index 60563b3..1004682 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -15,10 +15,10 @@ import shlex import subprocess -from subprocess import CalledProcessError from enum import Enum from os import symlink from pathlib import Path +from subprocess import CalledProcessError from typing import Optional, Tuple from pydantic import BaseModel, Field diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index 
fa06498..b0db13d 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -123,6 +123,7 @@ def test_run_job_with_data_structure(self, mock_time: MagicMock): compression_enum=CompressionEnum.DEFAULT, ), parallel_compression=True, + ffmpeg_thread_cnt=4, ) response = helper_run_compression_job(job_settings, mock_time) self.assertEqual(dummy_response, response) From f08332d76f3c843c2c852c0e55ff80e878fa9eaa Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Fri, 8 Nov 2024 14:13:46 -0800 Subject: [PATCH 26/31] runtime error --- src/aind_behavior_video_transformation/transform_videos.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index 1004682..a46f5ff 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -190,7 +190,7 @@ def convert_video( output_dir: Path, arg_set: Optional[Tuple[str, str]], ffmpeg_thread_cnt: int = 0, -) -> tuple[Path, Optional[str]]: +) -> tuple[Path, str]: """ Converts a video to a specified format using ffmpeg. 
@@ -245,7 +245,7 @@ def convert_video( subprocess.run( ffmpeg_command, check=True, capture_output=True, text=True ) - return out_path, None + return (out_path, "") except CalledProcessError as e: error_msg = ( From 7a793a2b0e6fd59bb03f37c495f5b1575cff2c30 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Fri, 8 Nov 2024 14:20:05 -0800 Subject: [PATCH 27/31] maybe this works --- src/aind_behavior_video_transformation/transform_videos.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index a46f5ff..df00037 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -190,7 +190,7 @@ def convert_video( output_dir: Path, arg_set: Optional[Tuple[str, str]], ffmpeg_thread_cnt: int = 0, -) -> tuple[Path, str]: +) -> tuple[str, Optional[str]]: """ Converts a video to a specified format using ffmpeg. 
@@ -245,7 +245,7 @@ def convert_video( subprocess.run( ffmpeg_command, check=True, capture_output=True, text=True ) - return (out_path, "") + return (str(out_path), None) except CalledProcessError as e: error_msg = ( @@ -254,4 +254,4 @@ def convert_video( f"Return code: {e.returncode}\n" f"Error output:\n{e.stderr}\n" ) - return (out_path, error_msg) + return (str(out_path), error_msg) From f09ae1472d89ebc9b15a018e741662a19bcb0ec5 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Fri, 8 Nov 2024 14:30:08 -0800 Subject: [PATCH 28/31] try this --- src/aind_behavior_video_transformation/etl.py | 16 ++++++++-------- .../transform_videos.py | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index aef6b7d..f3372b9 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -94,21 +94,21 @@ def _run_compression( for params in convert_video_args ] for job in as_completed(jobs): - out_path, error = job.result() - if error: - error_traces.append(error) + result = job.result() + if isinstance(result, tuple): + error_traces.append(result[1]) else: - logging.info(f"FFmpeg job completed: {out_path}") + logging.info(f"FFmpeg job completed: {result}") else: # Execute serially for params in convert_video_args: - out_path, error = convert_video( + result = convert_video( *params, self.job_settings.ffmpeg_thread_cnt ) - if error: - error_traces.append(error) + if isinstance(result, tuple): + error_traces.append(result[1]) else: - logging.info(f"FFmpeg job completed: {out_path}") + logging.info(f"FFmpeg job completed: {result}") if error_traces: for e in error_traces: diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index df00037..611d917 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ 
b/src/aind_behavior_video_transformation/transform_videos.py @@ -19,7 +19,7 @@ from os import symlink from pathlib import Path from subprocess import CalledProcessError -from typing import Optional, Tuple +from typing import Optional, Tuple, Union from pydantic import BaseModel, Field @@ -190,7 +190,7 @@ def convert_video( output_dir: Path, arg_set: Optional[Tuple[str, str]], ffmpeg_thread_cnt: int = 0, -) -> tuple[str, Optional[str]]: +) -> Union[str, Tuple[str, str]]: """ Converts a video to a specified format using ffmpeg. @@ -245,7 +245,7 @@ def convert_video( subprocess.run( ffmpeg_command, check=True, capture_output=True, text=True ) - return (str(out_path), None) + return str(out_path) except CalledProcessError as e: error_msg = ( From 364508daa1751eef1d951d25965b2b8366b68c67 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Fri, 8 Nov 2024 16:33:16 -0800 Subject: [PATCH 29/31] add serial log test --- tests/test_transform_videos.py | 77 +++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index b0db13d..548551b 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -1,10 +1,14 @@ """Tests transform_videos module.""" +import logging +import tempfile +from contextlib import contextmanager + import shlex import subprocess import tempfile import unittest -from os import symlink +from os import symlink, unlink from pathlib import Path from unittest.mock import MagicMock, patch @@ -215,6 +219,77 @@ def test_run_job_missing_colorspace(self, mock_time: MagicMock): self.assertEqual(expected_response, response) self.assertTrue(temp_out_path.joinpath(test_vid_name).exists()) + @contextmanager + def capture_logs(self): + """ + Context manager that creates a temporary log file + and configures logging to use it. 
+ """ + with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_log: + # Configure logging to write to our temporary file + file_handler = logging.FileHandler(temp_log.name) + file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logging.getLogger().addHandler(file_handler) + logging.getLogger().setLevel(logging.DEBUG) + + try: + yield temp_log.name + finally: + # Clean up: restore original handlers and remove temp file + file_handler.close() + unlink(temp_log.name) + + @patch("aind_behavior_video_transformation.etl.time") + def test_serial_log(self, mock_time: MagicMock): + """ + Test errors in parallel execution are logged + sequentially. The length of the error message + is known to be 9 lines. So non-overlap is tested + by checking 'ERROR' does not occur within 9 lines + of each instance of 'ERROR'. + """ + + faulty_req = CompressionRequest( + compression_enum=CompressionEnum.USER_DEFINED, + user_ffmpeg_input_options = "invalid input args", + user_ffmpeg_output_options = "invalid output args", + ) + + if not self.test_vid_path.is_file(): + raise FileNotFoundError(f"File not found: {self.test_vid_path}") + camera_subdirs = [f"camera{i}" for i in range(1, 3)] + with tempfile.TemporaryDirectory() as in_temp_dir: + # Prepare input data + in_path = Path(in_temp_dir) + camera_in_paths = [in_path / d for d in camera_subdirs] + for camera_path in camera_in_paths: + camera_path.mkdir() + symlink(self.test_vid_path, camera_path / self.test_vid_name) + + with tempfile.TemporaryDirectory() as out_temp_dir: + out_path = Path(out_temp_dir) + job_settings = BehaviorVideoJobSettings( + input_source=in_path, + output_directory=out_path, + compression_requested=faulty_req, + parallel_compression=True, + ) + with self.capture_logs() as log_file: + self.assertRaises( + RuntimeError, + helper_run_compression_job, + job_settings, + mock_time + ) + error_blocks = [] + with open(log_file, 'r') as f: + for line_num, line in enumerate(f, 
1): + if 'ERROR' in line: + error_blocks.append(line_num) + + for i in range(len(error_blocks) - 1): + error_diff = error_blocks[i + 1] - error_blocks[i] + self.assertTrue(error_diff >= 9) if __name__ == "__main__": unittest.main() From 8b76a563e768046e9c562d882cfb23381fa7a4ef Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Fri, 8 Nov 2024 16:36:36 -0800 Subject: [PATCH 30/31] linting --- tests/test_transform_videos.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index 548551b..387f8c5 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -1,13 +1,11 @@ """Tests transform_videos module.""" import logging -import tempfile -from contextlib import contextmanager - import shlex import subprocess import tempfile import unittest +from contextlib import contextmanager from os import symlink, unlink from pathlib import Path from unittest.mock import MagicMock, patch @@ -225,10 +223,12 @@ def capture_logs(self): Context manager that creates a temporary log file and configures logging to use it. 
""" - with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_log: + with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_log: # Configure logging to write to our temporary file file_handler = logging.FileHandler(temp_log.name) - file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + file_handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + ) logging.getLogger().addHandler(file_handler) logging.getLogger().setLevel(logging.DEBUG) @@ -251,8 +251,8 @@ def test_serial_log(self, mock_time: MagicMock): faulty_req = CompressionRequest( compression_enum=CompressionEnum.USER_DEFINED, - user_ffmpeg_input_options = "invalid input args", - user_ffmpeg_output_options = "invalid output args", + user_ffmpeg_input_options="invalid input args", + user_ffmpeg_output_options="invalid output args", ) if not self.test_vid_path.is_file(): @@ -279,17 +279,18 @@ def test_serial_log(self, mock_time: MagicMock): RuntimeError, helper_run_compression_job, job_settings, - mock_time + mock_time, ) error_blocks = [] - with open(log_file, 'r') as f: + with open(log_file, "r") as f: for line_num, line in enumerate(f, 1): - if 'ERROR' in line: + if "ERROR" in line: error_blocks.append(line_num) for i in range(len(error_blocks) - 1): error_diff = error_blocks[i + 1] - error_blocks[i] self.assertTrue(error_diff >= 9) + if __name__ == "__main__": unittest.main() From cb088293218159ea33a3820c8a07ffe342f9fc1c Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Fri, 8 Nov 2024 16:42:12 -0800 Subject: [PATCH 31/31] coverage threshold --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f7c148f..3e81573 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ exclude_lines = [ "import", "pragma: no cover", ] -fail_under = 97 +fail_under = 93 [tool.isort] line_length = 79