diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 415791e..f09839a 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -2,6 +2,7 @@ import logging import sys +from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from time import time from typing import List, Optional, Tuple, Union @@ -20,6 +21,7 @@ ) from aind_behavior_video_transformation.transform_videos import ( CompressionRequest, + convert_video ) @@ -43,6 +45,14 @@ class BehaviorVideoJobSettings(BasicJobSettings): "request" ), ) + parallel_compression: bool = Field( + default=True, + description="Run compression in parallel or sequentially.", + ) + ffmpeg_thread_cnt: int = Field( + default=0, + description="Number of threads per ffmpeg compression job." + ) class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): @@ -64,6 +74,33 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): run_job() -> JobResponse """ + def _run_compression( + self, + convert_video_args: list[tuple[Path, Path, tuple[str, str] | None]] + ) -> None: + """ + Runs CompressionRequests at the specified paths. + """ + + if self.job_settings.parallel_compression: + # ProcessPool implementation + num_jobs = len(convert_video_args) + with ProcessPoolExecutor(max_workers=num_jobs) as executor: + jobs = [ + executor.submit(convert_video, *params, + self.job_settings.ffmpeg_thread_cnt) + for params in convert_video_args + ] + for job in as_completed(jobs): + result = job.result() + logging.info("FFmpeg job completed:", result) + + else: + # Execute Serially + for params in convert_video_args: + convert_video(*params, self.job_settings.ffmpeg_thread_cnt) + logging.info("FFmpeg job completed:", result) + def run_job(self) -> JobResponse: """ Main public method to run the compression job. @@ -94,9 +131,11 @@ def run_job(self) -> JobResponse: ffmpeg_arg_set = ( self.job_settings.compression_requested.determine_ffmpeg_arg_set() ) - transform_directory( + convert_video_args = transform_directory( job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides ) + self._run_compression(convert_video_args) + job_end_time = time() return JobResponse( status_code=200, diff --git a/src/aind_behavior_video_transformation/filesystem.py b/src/aind_behavior_video_transformation/filesystem.py index 6e44e82..6e2098b 100644 --- a/src/aind_behavior_video_transformation/filesystem.py +++ b/src/aind_behavior_video_transformation/filesystem.py @@ -83,7 +83,7 @@ def build_overrides_dict(video_comp_pairs, job_in_dir_path): def transform_directory( input_dir: Path, output_dir: Path, arg_set, overrides=dict() -) -> None: +) -> list[tuple[Path, Path, tuple[str, str] | None]]: """ Transforms all video files in a directory and its subdirectories, and creates symbolic links for non-video files. Subdirectories are @@ -105,8 +105,10 @@ def transform_directory( Returns ------- - None + List of tuples containing convert_video arguments. """ + + convert_video_args = [] for root, dirs, files in walk(input_dir, followlinks=True): root_path = Path(root) in_relpath = relpath(root, input_dir) @@ -122,7 +124,10 @@ def transform_directory( this_arg_set = overrides.get(root_path, arg_set) # File-level overrides take precedence this_arg_set = overrides.get(file_path, this_arg_set) - convert_video(file_path, dst_dir, this_arg_set) + convert_video_args.append((file_path, dst_dir, this_arg_set)) + else: out_path = dst_dir / file_name symlink(file_path, out_path) + + return convert_video_args \ No newline at end of file diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index 6df7ba7..5421e66 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -1,6 +1,17 @@ -"""Module to handle transforming behavior videos""" +""" +Module to handle transforming behavior videos + +To add a new compression preset: +1) Define a CompressionEnum: 'CUSTOM_COMPRESSION = 'custom' +2) Define corresponding FfmpegInputArgs/FfmpegOutputArgs. +3) Add the CompressionEnum to FfmpegArgSet, and build + (FfmpegInputArgs, FfmpegOutputArgs) tuple: + 'CUSTOM_COMPRESSION' = ( + FfmpegInputArgs.CUSTOM_INPUT_ARGS, + FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS, + ) +""" -import logging import shlex import subprocess from enum import Enum @@ -123,7 +134,7 @@ class CompressionRequest(BaseModel): def determine_ffmpeg_arg_set( self, - ) -> Optional[Tuple[Optional[str], Optional[str]]]: + ) -> Optional[Tuple[str, str]]: """ Determines the appropriate set of FFmpeg arguments based on the compression requirements. @@ -153,6 +164,7 @@ def determine_ffmpeg_arg_set( self.user_ffmpeg_input_options, self.user_ffmpeg_output_options, ) + # If not one of the two special cases, use the enum values else: # If default, set compression to gamma @@ -160,12 +172,23 @@ def determine_ffmpeg_arg_set( compression_preset = CompressionEnum.GAMMA_ENCODING else: compression_preset = self.compression_enum + + # Resolve two levels of indirection here + # FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs) + # (FfmpegInputArgs, FfmpegOutputArgs) + # -> (in_args str, out_args str) arg_set_enum = FfmpegArgSet[compression_preset.name].value arg_set = (arg_set_enum[0].value, arg_set_enum[1].value) + return arg_set -def convert_video(video_path: Path, dst: Path, arg_set) -> Path: +def convert_video( + video_path: Path, + output_dir: Path, + arg_set: Optional[Tuple[str, str]], + ffmpeg_thread_cnt: int = 0 +) -> Path: """ Converts a video to a specified format using ffmpeg. @@ -173,11 +196,12 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: ---------- video_path : Path The path to the input video file. - dst : Path + output_dir : Path The destination directory where the converted video will be saved. arg_set : tuple or None A tuple containing input and output arguments for ffmpeg. If None, a symlink to the original video is created. + ffmpeg_thread_cnt : set number of ffmpeg threads Returns ------- @@ -189,11 +213,9 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: - The function uses ffmpeg for video conversion. - If `arg_set` is None, the function creates a symlink to the original video file. - - The function logs the ffmpeg command used for conversion. """ - out_path = dst / f"{video_path.stem}.mp4" # noqa: E501 - # Pydantic validation ensures this is a 'CompressionRequest' value. + out_path = output_dir / f"{video_path.stem}.mp4" # noqa: E501 # Trivial Case, do nothing if arg_set is None: @@ -204,6 +226,10 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: output_args = arg_set[1] ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] + + # Set thread count + ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)]) + if input_args: ffmpeg_command.extend(shlex.split(input_args)) ffmpeg_command.extend(["-i", str(video_path)]) @@ -211,10 +237,6 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path: ffmpeg_command.extend(shlex.split(output_args)) ffmpeg_command.append(str(out_path)) - # For logging I guess - ffmpeg_str = " ".join(ffmpeg_command) - logging.info(f"{ffmpeg_str=}") - subprocess.run(ffmpeg_command, check=True) return out_path