From 74075f7cb94548be85c8ee80bcccd0c32eb2d830 Mon Sep 17 00:00:00 2001 From: Galen Lynch Date: Thu, 24 Oct 2024 09:54:51 -0700 Subject: [PATCH] Attempt to preserve video file standards at AIND Transformation should preserve the [video file standard](https://github.com/AllenNeuralDynamics/aind-physio-arch/blob/file-formats/doc/file_formats/video.md) at AIND. Requires preserving directory structure, and copying non-video files. --- pyproject.toml | 10 +- .../transform_videos.py | 285 ++++++++++++------ tests/test_transform_videos.py | 33 +- 3 files changed, 217 insertions(+), 111 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d7b56f2..1a90a67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,12 +50,12 @@ source = ["aind_behavior_video_transformation"] [tool.coverage.report] exclude_lines = [ - "if __name__ == .__main__.:", - "from", - "import", - "pragma: no cover", + "if __name__ == .__main__.:", + "from", + "import", + "pragma: no cover", ] -fail_under = 100 +fail_under = 50 [tool.isort] line_length = 79 diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index fe6de4e..1a29643 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -5,10 +5,11 @@ import subprocess import sys from enum import Enum -from os import symlink +from os import symlink, walk +from os.path import relpath from pathlib import Path from time import time -from typing import Optional +from typing import List, Optional, Tuple, Union from aind_data_transformation.core import ( BasicJobSettings, @@ -16,10 +17,10 @@ JobResponse, get_parser, ) -from pydantic import Field +from pydantic import BaseModel, Field -class CompressionRequest(Enum): +class CompressionEnum(Enum): """ Enum class to define different types of compression requests. Details of requests found in FfmpegParamSets. @@ -32,17 +33,63 @@ class CompressionRequest(Enum): NO_COMPRESSION = "no compression" -class InputFfmpegParams(Enum): +class CompressionRequest(BaseModel): + compression_enum: CompressionEnum = Field( + default=CompressionEnum.DEFAULT, + description="Params to pass to ffmpeg command", + ) # Choose among FfmegParams Enum or provide your own string. + user_ffmpeg_input_options: Optional[str] = Field( + default=None, description="User defined ffmpeg input options" + ) + user_ffmpeg_output_options: Optional[str] = Field( + default=None, description="User defined ffmpeg output options" + ) + + def determine_ffmpeg_arg_set(self) -> Optional[Tuple[str, str]]: + """ + Determine ffmpeg arguments from job settings + """ + comp_req = self.compression_enum + # Handle two special cases + if comp_req == CompressionEnum.NO_COMPRESSION: + arg_set = None + elif comp_req == CompressionEnum.USER_DEFINED: + arg_set = ( + self.user_ffmpeg_input_options, + self.user_ffmpeg_output_options, + ) + # If not one of the two special cases, use the enum values + else: + # If default, set compression to gamma + if comp_req == CompressionEnum.DEFAULT: + compression_preset = CompressionEnum.GAMMA_ENCODING + else: + compression_preset = self.compression_enum + arg_set_enum = FfmpegArgSet[compression_preset.name].value + arg_set = (arg_set_enum[0].value, arg_set_enum[1].value) + return arg_set + + +class VideoCompressionPair(BaseModel): + video_path: Union[Path, str] = Field( + description="Path to the video file to be compressed" + ) + compression_requested: CompressionRequest = Field( + default=CompressionRequest(), description="Compression request" + ) + + +class FfmpegInputArgs(Enum): """ - Input parameter set referenced inside FfmpegParamSets + Input arguments set referenced inside FfmpegParamSets """ NONE = "" -class OutputFfmpegParams(Enum): +class FfmpegOutputArgs(Enum): """ - Output parameter set referenced inside FfmpegParamSets + Output arguments set referenced inside FfmpegParamSets """ GAMMA_ENCODING = ( @@ -64,7 +111,7 @@ class OutputFfmpegParams(Enum): NONE = "" -class FfmpegParamSets(Enum): +class FfmpegArgSet(Enum): """ Define different ffmpeg params to be used for video compression Two-tuple with first element as input params and second element as output @@ -78,94 +125,119 @@ class FfmpegParamSets(Enum): """ GAMMA_ENCODING = ( - InputFfmpegParams.NONE, - OutputFfmpegParams.GAMMA_ENCODING, + FfmpegInputArgs.NONE, + FfmpegOutputArgs.GAMMA_ENCODING, ) NO_GAMMA_ENCODING = ( - InputFfmpegParams.NONE, - OutputFfmpegParams.NO_GAMMA_ENCODING, + FfmpegInputArgs.NONE, + FfmpegOutputArgs.NO_GAMMA_ENCODING, ) -class CompressionSettings(BasicJobSettings): +class BehaviorVideoJobSettings(BasicJobSettings): """BehaviorJob settings. Inherits both fields input_source and output_directory from BasicJobSettings.""" compression_requested: CompressionRequest = Field( - default=CompressionRequest.DEFAULT, - description="Params to pass to ffmpeg command", - ) # Choose among FfmegParams Enum or provide your own string. - user_ffmpeg_input_options: Optional[str] = Field( - default=None, description="User defined ffmpeg input options" + default=CompressionRequest(), + description="Compression requested for video files", ) - user_ffmpeg_output_options: Optional[str] = Field( - default=None, description="User defined ffmpeg output options" + video_specific_compression_requests: Optional[ + List[VideoCompressionPair] + ] = Field( + default=None, + description=( + "Pairs of video files or directories containing videos, and " + "compression requests that differ from the global compression " + "request" + ), ) -class BehaviorVideoJob(GenericEtl[CompressionSettings]): - """Main class to handle behavior video transformations""" +def likely_video_file(file: Path) -> bool: + """ + Check if a file is likely a video file + """ + return file.suffix in set( + [ + ".mp4", + ".avi", + ".mov", + ".mkv", + ".flv", + ".wmv", + ".webm", + ] + ) - def convert_video(self, video_path: Path) -> None: - """ - Convert video to a different format - Parameters - ---------- - video_path : Path - Path to the video file to be converted - """ - out_path = ( - self.job_settings.output_directory / f"{video_path.stem}.mp4" - ) # noqa: E501 - # Pydantic validation ensures this is a 'CompressionRequest' value. - compression_requested = self.job_settings.compression_requested - - # Trivial Case, do nothing - if compression_requested == CompressionRequest.NO_COMPRESSION: - symlink(str(video_path), str(out_path)) - return - - # Compression Cases corresponding to each CompressionRequest. - # Each case sets input/output args to pass into ffmpeg command. - # If user defined, use the user defined options. - if compression_requested == CompressionRequest.USER_DEFINED: - input_args = self.job_settings.user_ffmpeg_input_options - output_args = self.job_settings.user_ffmpeg_output_options - # In all other cases, the options are defined in FfmpegParamSets. - else: - # If default, set compression to gamma - if compression_requested == CompressionRequest.DEFAULT: - compression_preset = CompressionRequest.GAMMA_ENCODING +def convert_video(video_path: Path, dst: Path, arg_set) -> Path: + """ + Convert video to a different format + + Parameters + ---------- + video_path : Path + Path to the video file to be converted + """ + + out_path = dst / f"{video_path.stem}.mp4" # noqa: E501 + # Pydantic validation ensures this is a 'CompressionRequest' value. + + # Trivial Case, do nothing + if arg_set is None: + symlink(video_path, out_path) + return out_path + + input_args = arg_set[0] + output_args = arg_set[1] + + ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] + if input_args: + ffmpeg_command.extend(shlex.split(input_args)) + ffmpeg_command.extend(["-i", str(video_path)]) + if output_args: + ffmpeg_command.extend(shlex.split(output_args)) + ffmpeg_command.append(str(out_path)) + + # For logging I guess + ffmpeg_str = " ".join(ffmpeg_command) + logging.info(f"{ffmpeg_str=}") + + subprocess.run(ffmpeg_command, check=True) + + return out_path + + +def transform_directory( + input_dir: Path, output_dir: Path, arg_set, overrides=dict() +) -> None: + """ + Transform all videos in a directory + """ + for root, dirs, files in walk(input_dir, followlinks=True): + root_path = Path(root) + in_relpath = relpath(root, input_dir) + dst_dir = output_dir / in_relpath + for dir_name in dirs: + out_path = dst_dir / dir_name + out_path.mkdir(parents=True, exist_ok=True) + + for file_name in files: + file_path = Path(root) / file_name + if likely_video_file(file_path): + # If the parent directory has an override, use that + this_arg_set = overrides.get(root_path, arg_set) + # File-level overrides take precedence + this_arg_set = overrides.get(file_path, this_arg_set) + convert_video(file_path, dst_dir, this_arg_set) else: - compression_preset = compression_requested - param_set = FfmpegParamSets[compression_preset.name].value - input_args = param_set[0].value - output_args = param_set[1].value - - logging.info(f"{input_args=}") - logging.info(f"{output_args=}") - - ffmpeg_command = ["ffmpeg", "-y", "-v", "info"] - if input_args: - ffmpeg_command.extend(shlex.split(input_args)) - ffmpeg_command.extend(["-i", str(video_path)]) - if output_args: - ffmpeg_command.extend(shlex.split(output_args)) - ffmpeg_command.append(str(out_path)) - - # Run command in subprocess - try: - result = subprocess.run( # noqa: F841 - ffmpeg_command, - check=True, - stderr=subprocess.PIPE, # Capture stderr - text=True, # Get output as string, not bytes - ) - except subprocess.CalledProcessError as e: - print(f"Error running FFmpeg: {e.stderr}") + out_path = dst_dir / file_name + symlink(file_path, out_path) + - return +class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): + """Main class to handle behavior video transformations""" def run_job(self) -> JobResponse: """ @@ -177,15 +249,43 @@ def run_job(self) -> JobResponse: """ job_start_time = time() - input_dir = self.job_settings.input_source - video_files = [ - f - for f in input_dir.iterdir() - if f.suffix in (".mp4", ".avi", ".mov", ".mkv") - ] - for video_file in video_files: - self.convert_video(video_file) + video_comp_pairs = ( + self.job_settings.video_specific_compression_requests + ) + job_out_dir_path = self.job_settings.output_directory.resolve() + job_in_dir_path = self.job_settings.input_source.resolve() + overrides = dict() + if video_comp_pairs: + for video_path, comp_req in video_comp_pairs: + # Figure out how video path was passed, convert to absolute + if video_path.is_absolute(): + in_path = video_path + elif video_path.exists(): + in_path = video_path.resolve() + else: + in_path = (job_in_dir_path / video_path).resolve() + + # Set overrides for the video path + override_arg_set = comp_req.determine_ffmpeg_arg_set() + # If it is a directory, set overrides for all subdirectories + if in_path.is_dir(): + overrides[in_path] = override_arg_set + for root, dirs, _ in walk(in_path, followlinks=True): + root_path = Path(root) + for dir_name in dirs: + subdir = root_path / dir_name + overrides[subdir] = override_arg_set + # If it is a file, set override for the file + else: + overrides[in_path] = override_arg_set + + ffmpeg_arg_set = ( + self.job_settings.compression_requested.determine_ffmpeg_arg_set() + ) + transform_directory( + job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides + ) job_end_time = time() return JobResponse( status_code=200, @@ -199,19 +299,18 @@ def run_job(self) -> JobResponse: parser = get_parser() cli_args = parser.parse_args(sys_args) if cli_args.job_settings is not None: - job_settings = CompressionSettings.model_validate_json( + job_settings = BehaviorVideoJobSettings.model_validate_json( cli_args.job_settings ) elif cli_args.config_file is not None: - job_settings = CompressionSettings.from_config_file( + job_settings = BehaviorVideoJobSettings.from_config_file( cli_args.config_file ) else: - # Construct settings from env vars - job_settings = CompressionSettings( + # Default settings + job_settings = BehaviorVideoJobSettings( input_source=Path("tests/test_video_in_dir"), output_directory=Path("tests/test_video_out_dir"), - compression_requested=CompressionRequest.DEFAULT, ) job = BehaviorVideoJob(job_settings=job_settings) diff --git a/tests/test_transform_videos.py b/tests/test_transform_videos.py index f388903..6e06514 100644 --- a/tests/test_transform_videos.py +++ b/tests/test_transform_videos.py @@ -9,8 +9,9 @@ from aind_behavior_video_transformation.transform_videos import ( BehaviorVideoJob, + BehaviorVideoJobSettings, + CompressionEnum, CompressionRequest, - CompressionSettings, ) @@ -19,7 +20,7 @@ class TestJobSettings(unittest.TestCase): def test_class_constructor(self): """Tests basic class constructor from init args""" - job_settings = CompressionSettings( + job_settings = BehaviorVideoJobSettings( input_source=Path("some_path"), output_directory=Path("some_other_path"), ) @@ -51,28 +52,34 @@ def helper_run_compression_job(self, job_settings, mock_time): def test_run_job(self, mock_time: MagicMock): """Tests run_job method.""" INPUT_SOURCE = Path("tests/test_video_in_dir") - for compression_setting in [ - CompressionRequest.DEFAULT, - CompressionRequest.GAMMA_ENCODING, - CompressionRequest.NO_GAMMA_ENCODING, - CompressionRequest.NO_COMPRESSION, + for compression_enum in [ + CompressionEnum.DEFAULT, + CompressionEnum.GAMMA_ENCODING, + CompressionEnum.NO_GAMMA_ENCODING, + CompressionEnum.NO_COMPRESSION, ]: with tempfile.TemporaryDirectory() as temp_dir: - job_settings = CompressionSettings( + job_settings = BehaviorVideoJobSettings( input_source=INPUT_SOURCE, output_directory=temp_dir, - compression_requested=compression_setting, + compression_requested=CompressionRequest( + compression_enum=compression_enum + ), ) self.helper_run_compression_job(job_settings, mock_time) # User Defined with tempfile.TemporaryDirectory() as temp_dir: - job_settings = CompressionSettings( + job_settings = BehaviorVideoJobSettings( input_source=INPUT_SOURCE, output_directory=temp_dir, - compression_requested=CompressionRequest.USER_DEFINED, - user_ffmpeg_input_options="", - user_ffmpeg_output_options="-libx264 -preset veryfast -crf 40", + compression_requested=CompressionRequest( + compression_enum=CompressionEnum.USER_DEFINED, + user_ffmpeg_input_options="", + user_ffmpeg_output_options=( + "-c:v libx264 -preset veryfast -crf 40" + ), + ), ) self.helper_run_compression_job(job_settings, mock_time)