Skip to content

Commit

Permalink
Parallel Compression (#16)
Browse files Browse the repository at this point in the history
Allow for parallel compression of videos.

Closes #14

---------

Co-authored-by: Galen Lynch <[email protected]>
  • Loading branch information
jwong-nd and galenlynch authored Nov 11, 2024
1 parent 7d5d6e9 commit b233655
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_and_lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ jobs:
- name: Run linter checks
run: flake8 . && interrogate --verbose .
- name: Run tests and coverage
run: coverage run -m unittest discover && coverage report
run: coverage run -m unittest discover && coverage report -m
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ exclude_lines = [
"import",
"pragma: no cover",
]
fail_under = 97
fail_under = 93

[tool.isort]
line_length = 79
Expand Down
57 changes: 56 additions & 1 deletion src/aind_behavior_video_transformation/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from time import time
from typing import List, Optional, Tuple, Union
Expand All @@ -20,6 +21,7 @@
)
from aind_behavior_video_transformation.transform_videos import (
CompressionRequest,
convert_video,
)


Expand All @@ -43,6 +45,13 @@ class BehaviorVideoJobSettings(BasicJobSettings):
"request"
),
)
parallel_compression: bool = Field(
default=True,
description="Run compression in parallel or sequentially.",
)
ffmpeg_thread_cnt: int = Field(
default=0, description="Number of threads per ffmpeg compression job."
)


class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
Expand All @@ -64,6 +73,50 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
run_job() -> JobResponse
"""

def _run_compression(
    self,
    convert_video_args: list[tuple[Path, Path, tuple[str, str] | None]],
) -> None:
    """
    Run one ``convert_video`` call per entry of ``convert_video_args``.

    Depending on ``job_settings.parallel_compression`` the calls are
    either submitted to a process pool (one worker per video) or executed
    one after another in the current process.

    Parameters
    ----------
    convert_video_args : list of tuple
        Positional arguments for ``convert_video``; each tuple is
        unpacked and ``job_settings.ffmpeg_thread_cnt`` is appended as
        the final argument.

    Raises
    ------
    RuntimeError
        If at least one call returned a tuple, which is treated as a
        failure report. The second element of every such tuple is logged
        at error level before raising.
    """
    # Nothing to convert. Returning early also avoids building a
    # ProcessPoolExecutor with max_workers=0, which raises ValueError.
    if not convert_video_args:
        return

    thread_cnt = self.job_settings.ffmpeg_thread_cnt
    error_traces = []

    def record(result) -> None:
        """Collect the error text of a failed job or log a success."""
        if isinstance(result, tuple):
            error_traces.append(result[1])
        else:
            logging.info(f"FFmpeg job completed: {result}")

    if self.job_settings.parallel_compression:
        # Execute in parallel, one worker process per video.
        num_jobs = len(convert_video_args)
        with ProcessPoolExecutor(max_workers=num_jobs) as executor:
            jobs = [
                executor.submit(convert_video, *params, thread_cnt)
                for params in convert_video_args
            ]
            # Handle results in completion order so that each success
            # is logged as soon as its job finishes.
            for job in as_completed(jobs):
                record(job.result())
    else:
        # Execute serially.
        for params in convert_video_args:
            record(convert_video(*params, thread_cnt))

    if error_traces:
        for trace in error_traces:
            logging.error(trace)
        raise RuntimeError(
            "One or more Ffmpeg jobs failed. See error logs."
        )

def run_job(self) -> JobResponse:
"""
Main public method to run the compression job.
Expand Down Expand Up @@ -94,9 +147,11 @@ def run_job(self) -> JobResponse:
ffmpeg_arg_set = (
self.job_settings.compression_requested.determine_ffmpeg_arg_set()
)
transform_directory(
convert_video_args = transform_directory(
job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides
)
self._run_compression(convert_video_args)

job_end_time = time()
return JobResponse(
status_code=200,
Expand Down
13 changes: 8 additions & 5 deletions src/aind_behavior_video_transformation/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from os.path import relpath
from pathlib import Path

from aind_behavior_video_transformation.transform_videos import convert_video


def likely_video_file(file: Path) -> bool:
"""
Expand Down Expand Up @@ -83,7 +81,7 @@ def build_overrides_dict(video_comp_pairs, job_in_dir_path):

def transform_directory(
input_dir: Path, output_dir: Path, arg_set, overrides=dict()
) -> None:
) -> list[tuple[Path, Path, tuple[str, str] | None]]:
"""
Transforms all video files in a directory and its subdirectories,
and creates symbolic links for non-video files. Subdirectories are
Expand All @@ -105,8 +103,10 @@ def transform_directory(
Returns
-------
None
List of tuples containing convert_video arguments.
"""

convert_video_args = []
for root, dirs, files in walk(input_dir, followlinks=True):
root_path = Path(root)
in_relpath = relpath(root, input_dir)
Expand All @@ -122,7 +122,10 @@ def transform_directory(
this_arg_set = overrides.get(root_path, arg_set)
# File-level overrides take precedence
this_arg_set = overrides.get(file_path, this_arg_set)
convert_video(file_path, dst_dir, this_arg_set)
convert_video_args.append((file_path, dst_dir, this_arg_set))

else:
out_path = dst_dir / file_name
symlink(file_path, out_path)

return convert_video_args
69 changes: 53 additions & 16 deletions src/aind_behavior_video_transformation/transform_videos.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
"""Module to handle transforming behavior videos"""
"""
Module to handle transforming behavior videos
To add a new compression preset:
1) Define FfmpegInputArgs/FfmpegOutputArgs.
2) Define a CompressionEnum: NEW_PRESET = 'new_preset'
3) Add the CompressionEnum to FfmpegArgSet, and build
(FfmpegInputArgs, FfmpegOutputArgs) tuple:
NEW_PRESET = (
FfmpegInputArgs.CUSTOM_INPUT_ARGS,
FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS,
)
FfmpegInputArgs / FfmpegOutputArgs can be preexisting or newly defined in (1).
"""

import logging
import shlex
import subprocess
from enum import Enum
from os import symlink
from pathlib import Path
from typing import Optional, Tuple
from subprocess import CalledProcessError
from typing import Optional, Tuple, Union

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -123,7 +136,7 @@ class CompressionRequest(BaseModel):

def determine_ffmpeg_arg_set(
self,
) -> Optional[Tuple[Optional[str], Optional[str]]]:
) -> Optional[Tuple[str, str]]:
"""
Determines the appropriate set of FFmpeg arguments based on the
compression requirements.
Expand Down Expand Up @@ -153,31 +166,44 @@ def determine_ffmpeg_arg_set(
self.user_ffmpeg_input_options,
self.user_ffmpeg_output_options,
)

# If not one of the two special cases, use the enum values
else:
# If default, set compression to gamma
if comp_req == CompressionEnum.DEFAULT:
compression_preset = CompressionEnum.GAMMA_ENCODING
else:
compression_preset = self.compression_enum

# Resolve two levels of indirection here
# FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs)
# (FfmpegInputArgs, FfmpegOutputArgs)
# -> (in_args str, out_args str)
arg_set_enum = FfmpegArgSet[compression_preset.name].value
arg_set = (arg_set_enum[0].value, arg_set_enum[1].value)

return arg_set


def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
def convert_video(
video_path: Path,
output_dir: Path,
arg_set: Optional[Tuple[str, str]],
ffmpeg_thread_cnt: int = 0,
) -> Union[str, Tuple[str, str]]:
"""
Converts a video to a specified format using ffmpeg.
Parameters
----------
video_path : Path
The path to the input video file.
dst : Path
output_dir : Path
The destination directory where the converted video will be saved.
arg_set : tuple or None
A tuple containing input and output arguments for ffmpeg. If None, a
symlink to the original video is created.
ffmpeg_thread_cnt : int
Number of threads passed to ffmpeg via ``-threads``. The option is only
added when the value is greater than 0.
Returns
-------
Expand All @@ -189,11 +215,9 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
- The function uses ffmpeg for video conversion.
- If `arg_set` is None, the function creates a symlink to the original
video file.
- The function logs the ffmpeg command used for conversion.
"""

out_path = dst / f"{video_path.stem}.mp4" # noqa: E501
# Pydantic validation ensures this is a 'CompressionRequest' value.
out_path = output_dir / f"{video_path.stem}.mp4" # noqa: E501

# Trivial Case, do nothing
if arg_set is None:
Expand All @@ -204,17 +228,30 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
output_args = arg_set[1]

ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"]

# Set thread count
if ffmpeg_thread_cnt > 0:
ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)])

if input_args:
ffmpeg_command.extend(shlex.split(input_args))
ffmpeg_command.extend(["-i", str(video_path)])
if output_args:
ffmpeg_command.extend(shlex.split(output_args))
ffmpeg_command.append(str(out_path))

# For logging I guess
ffmpeg_str = " ".join(ffmpeg_command)
logging.info(f"{ffmpeg_str=}")

subprocess.run(ffmpeg_command, check=True)

return out_path
# Capture and return error message if it exists
try:
subprocess.run(
ffmpeg_command, check=True, capture_output=True, text=True
)
return str(out_path)

except CalledProcessError as e:
error_msg = (
f"FFmpeg conversion failed for {video_path}\n"
f"Command: {' '.join(ffmpeg_command)}\n"
f"Return code: {e.returncode}\n"
f"Error output:\n{e.stderr}\n"
)
return (str(out_path), error_msg)
Loading

0 comments on commit b233655

Please sign in to comment.