diff --git a/pyproject.toml b/pyproject.toml index b691508..f7c148f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ exclude_lines = [ "import", "pragma: no cover", ] -fail_under = 80 +fail_under = 97 [tool.isort] line_length = 79 diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 2c54dc9..415791e 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -2,7 +2,6 @@ import logging import sys -from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from time import time from typing import List, Optional, Tuple, Union @@ -15,13 +14,14 @@ ) from pydantic import Field +from aind_behavior_video_transformation.filesystem import ( + build_overrides_dict, + transform_directory, +) from aind_behavior_video_transformation.transform_videos import ( CompressionRequest, - convert_video, ) -PathLike = Union[Path, str] - class BehaviorVideoJobSettings(BasicJobSettings): """ @@ -34,7 +34,7 @@ class BehaviorVideoJobSettings(BasicJobSettings): description="Compression requested for video files", ) video_specific_compression_requests: Optional[ - List[Tuple[PathLike, CompressionRequest]] + List[Tuple[Union[Path, str], CompressionRequest]] ] = Field( default=None, description=( @@ -43,23 +43,6 @@ class BehaviorVideoJobSettings(BasicJobSettings): "request" ), ) - parallel_compression: bool = Field( - default=True, - description="Run compression in parallel or sequentially.", - ) - ffmpeg_thread_cnt: int = Field( - default=8, - description="Number of threads per ffmpeg compression job." - ) - video_extensions: List[str] = [ - ".mp4", - ".avi", - ".mov", - ".mkv", - ".flv", - ".wmv", - ".webm", - ] class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): @@ -81,136 +64,6 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): run_job() -> JobResponse """ - def _format_output_directory(self) -> None: - """ - Recurisively copies (symlink) non-video files - from input directory to output directory - perserving filesysem structure. - """ - input_dir = Path(self.job_settings.input_source.resolve()) - output_dir = Path(self.job_settings.output_directory.resolve()) - output_dir.mkdir(parents=True, exist_ok=True) - - for file in input_dir.rglob("*"): - if file.is_file() and not ( - file.suffix.lower() in self.job_settings.video_extensions - ): - relative_path = file.relative_to(input_dir) - target_link = output_dir / relative_path - target_link.parent.mkdir(parents=True, exist_ok=True) - target_link.symlink_to(file) - - def _resolve_compression_requests( - self, - ) -> List[Tuple[PathLike, CompressionRequest]]: - """ - Recursively traverses input directory and - resolves CompressionRequest of all videos. - - Sets 'compression_requested' as the default, unless - overrided in 'video_specific_compression_requests'. - """ - input_dir = Path(self.job_settings.input_source.resolve()) - - # Set all videos to global compression setting - path_req_pairs: List[Tuple[Path, CompressionRequest]] = [ - (file.resolve(), self.job_settings.compression_requested) - for file in input_dir.rglob("*") - if ( - file.is_file() - and (file.suffix.lower() in self.job_settings.video_extensions) - ) - ] - - # Apply overrides if present. - comp_reqs = self.job_settings.video_specific_compression_requests - if comp_reqs: - # Define map: override abs_vid_path -> override CompressionRequest - overrides: dict[Path, CompressionRequest] = {} - for override_path, override_req in comp_reqs: - override_path = Path(override_path) - - # Case 1: Override Path is a file. - if override_path.is_file(): - override_path = override_path.resolve() - overrides[override_path] = override_req - - # Case 2: Override Path is a subdirectory (relative/absolute) - else: - if not override_path.is_absolute(): - override_path = input_dir / override_path - for override_file in override_path.rglob("*"): - if override_file.is_file() and ( - override_file.suffix.lower() - in self.job_settings.video_extensions - ): - override_file = override_file.resolve() - overrides[override_file] = override_req - - # Filter path_req_pairs by overrides. - output_pairs: List[Tuple[Path, CompressionRequest]] = [] - for vid_path, default_req in path_req_pairs: - for override_path, override_req in overrides.items(): - if vid_path.samefile(override_path): - output_pairs.append((override_path, override_req)) - else: - output_pairs.append((vid_path, default_req)) - path_req_pairs = output_pairs - - return path_req_pairs - - def _run_compression( - self, - path_comp_req_pairs: List[Tuple[PathLike, CompressionRequest]], - ) -> None: - """ - Runs CompressionRequests at the specified paths. - """ - input_dir = Path(self.job_settings.input_source.resolve()) - output_dir = Path(self.job_settings.output_directory.resolve()) - - convert_video_params = [] - for vid_path, comp_req in path_comp_req_pairs: - # Resolve destination - relative_path = vid_path.relative_to(input_dir) - output_path = output_dir / relative_path - output_vid_dir = output_path.parent - - # Resolve compression params - arg_set = comp_req.determine_ffmpeg_arg_set() - - # Add to job buffer - convert_video_params.append((vid_path, output_vid_dir, arg_set)) - logging.info( - f"Compressing {str(vid_path)} \ - w/ {comp_req.compression_enum}" - ) - - if self.job_settings.parallel_compression: - # ProcessPool implementation - num_jobs = len(convert_video_params) - with ProcessPoolExecutor(max_workers=num_jobs) as executor: - jobs = [ - executor.submit(convert_video, *params, - self.job_settings.ffmpeg_thread_cnt) - for params in convert_video_params - ] - for job in as_completed(jobs): - try: - result = job.result() - logging.info("FFmpeg job completed:", result) - except Exception as e: - logging.info("Error:", e) - - else: - # Execute Serially - for params in convert_video_params: - try: - convert_video(*params, self.job_settings.ffmpeg_thread_cnt) - logging.info("FFmpeg job completed:", result) - except Exception as e: - logging.info("Error:", e) - def run_job(self) -> JobResponse: """ Main public method to run the compression job. @@ -230,10 +83,20 @@ def run_job(self) -> JobResponse: """ job_start_time = time() - self._format_output_directory() - path_comp_req_pairs = self._resolve_compression_requests() - self._run_compression(path_comp_req_pairs) + video_comp_pairs = ( + self.job_settings.video_specific_compression_requests + ) + job_out_dir_path = self.job_settings.output_directory.resolve() + Path(job_out_dir_path).mkdir(exist_ok=True) + job_in_dir_path = self.job_settings.input_source.resolve() + overrides = build_overrides_dict(video_comp_pairs, job_in_dir_path) + ffmpeg_arg_set = ( + self.job_settings.compression_requested.determine_ffmpeg_arg_set() + ) + transform_directory( + job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides + ) job_end_time = time() return JobResponse( status_code=200, @@ -264,11 +127,5 @@ def run_job(self) -> JobResponse: job = BehaviorVideoJob(job_settings=job_settings) job_response = job.run_job() print(job_response.status_code) - logging.info(job_response.model_dump_json()) -# TODO: -# - Expose number of thread parameter -# - Add unit tests for helper methods -# - Unit test convert_video - -# Will push this in at the end and call it a day. + logging.info(job_response.model_dump_json()) diff --git a/src/aind_behavior_video_transformation/filesystem.py b/src/aind_behavior_video_transformation/filesystem.py new file mode 100644 index 0000000..6e44e82 --- /dev/null +++ b/src/aind_behavior_video_transformation/filesystem.py @@ -0,0 +1,128 @@ +"""Module for handling file discovery to transform videos.""" + +from os import symlink, walk +from os.path import relpath +from pathlib import Path + +from aind_behavior_video_transformation.transform_videos import convert_video + + +def likely_video_file(file: Path) -> bool: + """ + Check if a file is likely a video file based on its suffix. + + Parameters + ---------- + file : Path + The file path to check. + + Returns + ------- + bool + True if the file suffix indicates it is a video file, False otherwise. + """ + return file.suffix in set( + [ + ".mp4", + ".avi", + ".mov", + ".mkv", + ".flv", + ".wmv", + ".webm", + ] + ) + + +def build_overrides_dict(video_comp_pairs, job_in_dir_path): + """ + Builds a dictionary of override arguments for video paths. + + Parameters + ---------- + video_comp_pairs : list of tuple + A list of tuples where each tuple contains a video file name and a + corresponding CompressionRequest object. + job_in_dir_path : Path + The base directory path where the job input files are located. + + Returns + ------- + dict + A dictionary where keys are video paths (either files or directories) + and values are the override argument sets determined by the + CompressionRequest objects. + """ + overrides = dict() + if video_comp_pairs: + for video_name, comp_req in video_comp_pairs: + video_path = Path(video_name) + # Figure out how video path was passed, convert to absolute + if video_path.is_absolute(): + in_path = video_path + elif video_path.exists(): + in_path = video_path.resolve() + else: + in_path = (job_in_dir_path / video_path).resolve() + # Set overrides for the video path + override_arg_set = comp_req.determine_ffmpeg_arg_set() + # If it is a directory, set overrides for all subdirectories + if in_path.is_dir(): + overrides[in_path] = override_arg_set + for root, dirs, _ in walk(in_path, followlinks=True): + root_path = Path(root) + for dir_name in dirs: + subdir = root_path / dir_name + overrides[subdir] = override_arg_set + # If it is a file, set override for the file + else: + overrides[in_path] = override_arg_set + + return overrides + + +def transform_directory( + input_dir: Path, output_dir: Path, arg_set, overrides=dict() +) -> None: + """ + Transforms all video files in a directory and its subdirectories, + and creates symbolic links for non-video files. Subdirectories are + created as needed. + + Parameters + ---------- + input_dir : Path + The directory containing the input files. + output_dir : Path + The directory where the transformed files and symbolic links will be + saved. + arg_set : Any + The set of arguments to be used for video transformation. + overrides : dict, optional + A dictionary containing overrides for specific directories or files. + Keys are Paths and values are argument sets. Default is an empty + dictionary. + + Returns + ------- + None + """ + for root, dirs, files in walk(input_dir, followlinks=True): + root_path = Path(root) + in_relpath = relpath(root, input_dir) + dst_dir = output_dir / in_relpath + for dir_name in dirs: + out_path = dst_dir / dir_name + out_path.mkdir(parents=True, exist_ok=True) + + for file_name in files: + file_path = Path(root) / file_name + if likely_video_file(file_path): + # If the parent directory has an override, use that + this_arg_set = overrides.get(root_path, arg_set) + # File-level overrides take precedence + this_arg_set = overrides.get(file_path, this_arg_set) + convert_video(file_path, dst_dir, this_arg_set) + else: + out_path = dst_dir / file_name + symlink(file_path, out_path) diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index cbf6b80..6df7ba7 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -1,17 +1,6 @@ -""" -Module to handle transforming behavior videos - -To add a new compression preset: -1) Define a CompressionEnum: 'CUSTOM_COMPRESSION = 'custom' -2) Define corresponding FfmpegInputArgs/FfmpegOutputArgs. -3) Add the CompressionEnum to FfmpegArgSet, and build - (FfmpegInputArgs, FfmpegOutputArgs) tuple: - 'CUSTOM_COMPRESSION' = ( - FfmpegInputArgs.CUSTOM_INPUT_ARGS, - FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS, - ) -""" +"""Module to handle transforming behavior videos""" +import logging import shlex import subprocess from enum import Enum @@ -134,7 +123,7 @@ class CompressionRequest(BaseModel): def determine_ffmpeg_arg_set( self, - ) -> Optional[Tuple[str, str]]: + ) -> Optional[Tuple[Optional[str], Optional[str]]]: """ Determines the appropriate set of FFmpeg arguments based on the compression requirements. @@ -164,7 +153,6 @@ def determine_ffmpeg_arg_set( self.user_ffmpeg_input_options, self.user_ffmpeg_output_options, ) - # If not one of the two special cases, use the enum values else: # If default, set compression to gamma @@ -172,23 +160,12 @@ def determine_ffmpeg_arg_set( compression_preset = CompressionEnum.GAMMA_ENCODING else: compression_preset = self.compression_enum - - # Resolve two levels of indirection here - # FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs) - # (FfmpegInputArgs, FfmpegOutputArgs) - # -> (in_args str, out_args str) arg_set_enum = FfmpegArgSet[compression_preset.name].value arg_set = (arg_set_enum[0].value, arg_set_enum[1].value) - return arg_set -def convert_video( - video_path: Path, - output_dir: Path, - arg_set: Optional[Tuple[str, str]], - ffmpeg_thread_cnt: int = 8 -) -> Path: +def convert_video(video_path: Path, dst: Path, arg_set) -> Path: """ Converts a video to a specified format using ffmpeg. @@ -196,7 +173,7 @@ def convert_video( ---------- video_path : Path The path to the input video file. - output_dir : Path + dst : Path The destination directory where the converted video will be saved. arg_set : tuple or None A tuple containing input and output arguments for ffmpeg. If None, a @@ -212,9 +189,11 @@ def convert_video( - The function uses ffmpeg for video conversion. - If `arg_set` is None, the function creates a symlink to the original video file. + - The function logs the ffmpeg command used for conversion. """ - out_path = output_dir / f"{video_path.stem}.mp4" # noqa: E501 + out_path = dst / f"{video_path.stem}.mp4" # noqa: E501 + # Pydantic validation ensures this is a 'CompressionRequest' value. # Trivial Case, do nothing if arg_set is None: @@ -225,10 +204,6 @@ def convert_video( output_args = arg_set[1] ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] - - # Set thread count - ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)]) - if input_args: ffmpeg_command.extend(shlex.split(input_args)) ffmpeg_command.extend(["-i", str(video_path)]) @@ -236,6 +211,10 @@ def convert_video( ffmpeg_command.extend(shlex.split(output_args)) ffmpeg_command.append(str(out_path)) + # For logging I guess + ffmpeg_str = " ".join(ffmpeg_command) + logging.info(f"{ffmpeg_str=}") + subprocess.run(ffmpeg_command, check=True) return out_path diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index 8d31332..b08bc29 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -1,10 +1,10 @@ """Tests transform_videos module.""" import shlex -import shutil import subprocess import tempfile import unittest +from os import symlink from pathlib import Path from unittest.mock import MagicMock, patch @@ -43,6 +43,9 @@ def helper_run_compression_job(job_settings, mock_time): class TestBehaviorVideoJob(unittest.TestCase): """Test methods in BehaviorVideoJob class.""" + # NOTE: + # Test suite does not run yet. + # Resolving lint errors first. test_data_path = Path("tests/test_video_in_dir").resolve() dummy_response = JobResponse( status_code=200, @@ -110,9 +113,7 @@ def test_run_job_with_data_structure(self, mock_time: MagicMock): camera_in_paths = [in_path / d for d in camera_subdirs] for camera_path in camera_in_paths: camera_path.mkdir() - shutil.copy( - str(test_vid_path), str(camera_path / test_vid_name) - ) + symlink(test_vid_path, camera_path / test_vid_name) open(camera_path / metadata_file, "w").close() with tempfile.TemporaryDirectory() as out_temp_dir: