diff --git a/.github/workflows/test_and_lint.yml b/.github/workflows/test_and_lint.yml
index bde4952..7f31a46 100644
--- a/.github/workflows/test_and_lint.yml
+++ b/.github/workflows/test_and_lint.yml
@@ -27,4 +27,4 @@ jobs:
       - name: Run linter checks
         run: flake8 . && interrogate --verbose .
       - name: Run tests and coverage
-        run: coverage run -m unittest discover && coverage report
+        run: coverage run -m unittest discover && coverage report -m
diff --git a/pyproject.toml b/pyproject.toml
index f7c148f..3e81573 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -59,7 +59,7 @@ exclude_lines = [
     "import",
     "pragma: no cover",
 ]
-fail_under = 97
+fail_under = 93
 
 [tool.isort]
 line_length = 79
diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py
index 415791e..f3372b9 100644
--- a/src/aind_behavior_video_transformation/etl.py
+++ b/src/aind_behavior_video_transformation/etl.py
@@ -2,6 +2,7 @@
 
 import logging
 import sys
+from concurrent.futures import ProcessPoolExecutor, as_completed
 from pathlib import Path
 from time import time
 from typing import List, Optional, Tuple, Union
@@ -20,6 +21,7 @@
 )
 from aind_behavior_video_transformation.transform_videos import (
     CompressionRequest,
+    convert_video,
 )
 
 
@@ -43,6 +45,13 @@ class BehaviorVideoJobSettings(BasicJobSettings):
             "request"
         ),
     )
+    parallel_compression: bool = Field(
+        default=True,
+        description="Run compression in parallel or sequentially.",
+    )
+    ffmpeg_thread_cnt: int = Field(
+        default=0, description="Number of threads per ffmpeg compression job."
+    )
 
 
 class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
@@ -64,6 +73,50 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
     run_job() -> JobResponse
     """
 
+    def _run_compression(
+        self,
+        convert_video_args: list[tuple[Path, Path, tuple[str, str] | None]],
+    ) -> None:
+        """
+        Run convert_video on each (video_path, output_dir, arg_set) tuple.
+        """
+        error_traces = []
+        if self.job_settings.parallel_compression:
+            # Execute in parallel
+            num_jobs = len(convert_video_args)
+            with ProcessPoolExecutor(max_workers=num_jobs) as executor:
+                jobs = [
+                    executor.submit(
+                        convert_video,
+                        *params,
+                        self.job_settings.ffmpeg_thread_cnt,
+                    )
+                    for params in convert_video_args
+                ]
+                for job in as_completed(jobs):
+                    result = job.result()
+                    if isinstance(result, tuple):
+                        error_traces.append(result[1])
+                    else:
+                        logging.info(f"FFmpeg job completed: {result}")
+        else:
+            # Execute serially
+            for params in convert_video_args:
+                result = convert_video(
+                    *params, self.job_settings.ffmpeg_thread_cnt
+                )
+                if isinstance(result, tuple):
+                    error_traces.append(result[1])
+                else:
+                    logging.info(f"FFmpeg job completed: {result}")
+
+        if error_traces:
+            for e in error_traces:
+                logging.error(e)
+            raise RuntimeError(
+                "One or more FFmpeg jobs failed. See error logs."
+            )
+
     def run_job(self) -> JobResponse:
         """
         Main public method to run the compression job.
@@ -94,9 +147,11 @@ def run_job(self) -> JobResponse:
         ffmpeg_arg_set = (
             self.job_settings.compression_requested.determine_ffmpeg_arg_set()
         )
-        transform_directory(
+        convert_video_args = transform_directory(
             job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides
         )
+        self._run_compression(convert_video_args)
+
         job_end_time = time()
         return JobResponse(
             status_code=200,
diff --git a/src/aind_behavior_video_transformation/filesystem.py b/src/aind_behavior_video_transformation/filesystem.py
index 6e44e82..8fafbc3 100644
--- a/src/aind_behavior_video_transformation/filesystem.py
+++ b/src/aind_behavior_video_transformation/filesystem.py
@@ -4,8 +4,6 @@
 from os.path import relpath
 from pathlib import Path
 
-from aind_behavior_video_transformation.transform_videos import convert_video
-
 
 def likely_video_file(file: Path) -> bool:
     """
@@ -83,7 +81,7 @@ def build_overrides_dict(video_comp_pairs, job_in_dir_path):
 
 def transform_directory(
     input_dir: Path, output_dir: Path, arg_set, overrides=dict()
-) -> None:
+) -> list[tuple[Path, Path, tuple[str, str] | None]]:
     """
     Transforms all video files in a directory and its subdirectories,
     and creates symbolic links for non-video files. Subdirectories are
@@ -105,8 +103,10 @@
 
     Returns
     -------
-    None
+    List of tuples containing convert_video arguments.
     """
+
+    convert_video_args = []
     for root, dirs, files in walk(input_dir, followlinks=True):
         root_path = Path(root)
         in_relpath = relpath(root, input_dir)
@@ -122,7 +122,10 @@
                 this_arg_set = overrides.get(root_path, arg_set)
                 # File-level overrides take precedence
                 this_arg_set = overrides.get(file_path, this_arg_set)
-                convert_video(file_path, dst_dir, this_arg_set)
+                convert_video_args.append((file_path, dst_dir, this_arg_set))
+
             else:
                 out_path = dst_dir / file_name
                 symlink(file_path, out_path)
+
+    return convert_video_args
diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py
index 6df7ba7..611d917 100644
--- a/src/aind_behavior_video_transformation/transform_videos.py
+++ b/src/aind_behavior_video_transformation/transform_videos.py
@@ -1,12 +1,25 @@
-"""Module to handle transforming behavior videos"""
+"""
+Module to handle transforming behavior videos
+
+To add a new compression preset:
+1) Define FfmpegInputArgs/FfmpegOutputArgs.
+2) Define a CompressionEnum member, e.g. NEW_PRESET = 'new_preset'
+3) Add the CompressionEnum to FfmpegArgSet, and build the
+   (FfmpegInputArgs, FfmpegOutputArgs) tuple:
+    NEW_PRESET = (
+        FfmpegInputArgs.CUSTOM_INPUT_ARGS,
+        FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS,
+    )
+FfmpegInputArgs / FfmpegOutputArgs can be preexisting or newly defined in (1).
+"""
 
-import logging
 import shlex
 import subprocess
 from enum import Enum
 from os import symlink
 from pathlib import Path
-from typing import Optional, Tuple
+from subprocess import CalledProcessError
+from typing import Optional, Tuple, Union
 
 from pydantic import BaseModel, Field
 
@@ -123,7 +136,7 @@ class CompressionRequest(BaseModel):
 
     def determine_ffmpeg_arg_set(
         self,
-    ) -> Optional[Tuple[Optional[str], Optional[str]]]:
+    ) -> Optional[Tuple[str, str]]:
         """
         Determines the appropriate set of FFmpeg arguments based
         on the compression requirements.
@@ -153,6 +166,7 @@ def determine_ffmpeg_arg_set(
                 self.user_ffmpeg_input_options,
                 self.user_ffmpeg_output_options,
             )
+
         # If not one of the two special cases, use the enum values
         else:
             # If default, set compression to gamma
@@ -160,12 +174,23 @@ def determine_ffmpeg_arg_set(
                 compression_preset = CompressionEnum.GAMMA_ENCODING
             else:
                 compression_preset = self.compression_enum
+
+            # Resolve two levels of indirection here
+            # FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs)
+            # (FfmpegInputArgs, FfmpegOutputArgs)
+            #   -> (in_args str, out_args str)
             arg_set_enum = FfmpegArgSet[compression_preset.name].value
             arg_set = (arg_set_enum[0].value, arg_set_enum[1].value)
+
         return arg_set
 
 
-def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
+def convert_video(
+    video_path: Path,
+    output_dir: Path,
+    arg_set: Optional[Tuple[str, str]],
+    ffmpeg_thread_cnt: int = 0,
+) -> Union[str, Tuple[str, str]]:
     """
     Converts a video to a specified format using ffmpeg.
 
@@ -173,11 +198,12 @@
     ----------
     video_path : Path
         The path to the input video file.
-    dst : Path
+    output_dir : Path
         The destination directory where the converted video will be saved.
     arg_set : tuple or None
         A tuple containing input and output arguments for ffmpeg. If None,
         a symlink to the original video is created.
+    ffmpeg_thread_cnt : int, number of ffmpeg threads (0 lets ffmpeg decide)
 
     Returns
     -------
@@ -189,11 +215,9 @@
     - The function uses ffmpeg for video conversion.
     - If `arg_set` is None, the function creates a symlink to the
       original video file.
-    - The function logs the ffmpeg command used for conversion.
     """
 
-    out_path = dst / f"{video_path.stem}.mp4"  # noqa: E501
-    # Pydantic validation ensures this is a 'CompressionRequest' value.
+    out_path = output_dir / f"{video_path.stem}.mp4"  # noqa: E501
 
     # Trivial Case, do nothing
     if arg_set is None:
@@ -204,6 +228,11 @@
     output_args = arg_set[1]
 
     ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"]
+
+    # Set thread count
+    if ffmpeg_thread_cnt > 0:
+        ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)])
+
     if input_args:
         ffmpeg_command.extend(shlex.split(input_args))
     ffmpeg_command.extend(["-i", str(video_path)])
@@ -211,10 +240,18 @@
     if output_args:
         ffmpeg_command.extend(shlex.split(output_args))
     ffmpeg_command.append(str(out_path))
-
-    # For logging I guess
-    ffmpeg_str = " ".join(ffmpeg_command)
-    logging.info(f"{ffmpeg_str=}")
-
-    subprocess.run(ffmpeg_command, check=True)
-
-    return out_path
+    # Capture and return error message if it exists
+    try:
+        subprocess.run(
+            ffmpeg_command, check=True, capture_output=True, text=True
+        )
+        return str(out_path)
+
+    except CalledProcessError as e:
+        error_msg = (
+            f"FFmpeg conversion failed for {video_path}\n"
+            f"Command: {' '.join(ffmpeg_command)}\n"
+            f"Return code: {e.returncode}\n"
+            f"Error output:\n{e.stderr}\n"
+        )
+        return (str(out_path), error_msg)
diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py
index b08bc29..387f8c5 100644
--- a/tests/test_transform_videos.py
+++ b/tests/test_transform_videos.py
@@ -1,10 +1,12 @@
 """Tests transform_videos module."""
 
+import logging
 import shlex
 import subprocess
 import tempfile
 import unittest
-from os import symlink
+from contextlib import contextmanager
+from os import symlink, unlink
 from pathlib import Path
 from unittest.mock import MagicMock, patch
 
@@ -43,9 +45,6 @@ def helper_run_compression_job(job_settings, mock_time):
 class TestBehaviorVideoJob(unittest.TestCase):
     """Test methods in BehaviorVideoJob class."""
 
-    # NOTE:
-    # Test suite does not run yet.
-    # Resolving lint errors first.
     test_data_path = Path("tests/test_video_in_dir").resolve()
     dummy_response = JobResponse(
         status_code=200,
@@ -76,6 +75,7 @@ def test_run_job(self, mock_time: MagicMock):
             compression_requested=CompressionRequest(
                 compression_enum=compression_enum
             ),
+            parallel_compression=False,
         )
         response = helper_run_compression_job(job_settings, mock_time)
         self.assertEqual(expected_response, response)
@@ -124,6 +124,8 @@ def test_run_job_with_data_structure(self, mock_time: MagicMock):
             compression_requested=CompressionRequest(
                 compression_enum=CompressionEnum.DEFAULT,
             ),
+            parallel_compression=True,
+            ffmpeg_thread_cnt=4,
         )
         response = helper_run_compression_job(job_settings, mock_time)
         self.assertEqual(dummy_response, response)
@@ -215,6 +217,80 @@ def test_run_job_missing_colorspace(self, mock_time: MagicMock):
         self.assertEqual(expected_response, response)
         self.assertTrue(temp_out_path.joinpath(test_vid_name).exists())
 
+    @contextmanager
+    def capture_logs(self):
+        """
+        Context manager that creates a temporary log file
+        and configures logging to use it.
+        """
+        with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_log:
+            # Configure logging to write to our temporary file
+            file_handler = logging.FileHandler(temp_log.name)
+            file_handler.setFormatter(
+                logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+            )
+            logging.getLogger().addHandler(file_handler)
+            logging.getLogger().setLevel(logging.DEBUG)
+
+            try:
+                yield temp_log.name
+            finally:
+                # Clean up: close the log handler and remove the temp file
+                file_handler.close()
+                unlink(temp_log.name)
+
+    @patch("aind_behavior_video_transformation.etl.time")
+    def test_serial_log(self, mock_time: MagicMock):
+        """
+        Test that errors from parallel execution are logged
+        sequentially, without interleaving. Each error message
+        is known to span 9 lines, so non-overlap is verified by
+        checking that 'ERROR' does not occur within 9 lines of
+        a previous 'ERROR' line.
+        """
+
+        faulty_req = CompressionRequest(
+            compression_enum=CompressionEnum.USER_DEFINED,
+            user_ffmpeg_input_options="invalid input args",
+            user_ffmpeg_output_options="invalid output args",
+        )
+
+        if not self.test_vid_path.is_file():
+            raise FileNotFoundError(f"File not found: {self.test_vid_path}")
+        camera_subdirs = [f"camera{i}" for i in range(1, 3)]
+        with tempfile.TemporaryDirectory() as in_temp_dir:
+            # Prepare input data
+            in_path = Path(in_temp_dir)
+            camera_in_paths = [in_path / d for d in camera_subdirs]
+            for camera_path in camera_in_paths:
+                camera_path.mkdir()
+                symlink(self.test_vid_path, camera_path / self.test_vid_name)
+
+            with tempfile.TemporaryDirectory() as out_temp_dir:
+                out_path = Path(out_temp_dir)
+                job_settings = BehaviorVideoJobSettings(
+                    input_source=in_path,
+                    output_directory=out_path,
+                    compression_requested=faulty_req,
+                    parallel_compression=True,
+                )
+                with self.capture_logs() as log_file:
+                    self.assertRaises(
+                        RuntimeError,
+                        helper_run_compression_job,
+                        job_settings,
+                        mock_time,
+                    )
+                    error_blocks = []
+                    with open(log_file, "r") as f:
+                        for line_num, line in enumerate(f, 1):
+                            if "ERROR" in line:
+                                error_blocks.append(line_num)
+
+                    for i in range(len(error_blocks) - 1):
+                        error_diff = error_blocks[i + 1] - error_blocks[i]
+                        self.assertTrue(error_diff >= 9)
+
 
 if __name__ == "__main__":
     unittest.main()
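For reference, a minimal usage sketch of the new settings introduced by this patch (parallel_compression and ffmpeg_thread_cnt). Class, field, and enum names are taken from the diff above; the input/output paths are hypothetical, and constructing the job as BehaviorVideoJob(job_settings=...) is an assumption based on the GenericEtl base class rather than something shown in this patch.

# Usage sketch (illustrative, not part of the patch).
from pathlib import Path

from aind_behavior_video_transformation.etl import (
    BehaviorVideoJob,
    BehaviorVideoJobSettings,
)
from aind_behavior_video_transformation.transform_videos import (
    CompressionEnum,
    CompressionRequest,
)

job_settings = BehaviorVideoJobSettings(
    input_source=Path("/data/behavior_videos"),    # hypothetical input dir
    output_directory=Path("/results/compressed"),  # hypothetical output dir
    compression_requested=CompressionRequest(
        compression_enum=CompressionEnum.GAMMA_ENCODING
    ),
    parallel_compression=True,  # one ffmpeg process per video
    ffmpeg_thread_cnt=4,        # adds "-threads 4" to each ffmpeg command
)

# run_job() returns a JobResponse with status_code 200 on success and raises
# RuntimeError if any ffmpeg job fails (errors are logged sequentially).
response = BehaviorVideoJob(job_settings=job_settings).run_job()
print(response.status_code)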