Skip to content

Commit

Permalink
add parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
jwong-nd committed Nov 6, 2024
1 parent 8a423c7 commit ba9dced
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 16 deletions.
41 changes: 40 additions & 1 deletion src/aind_behavior_video_transformation/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from time import time
from typing import List, Optional, Tuple, Union
Expand All @@ -20,6 +21,7 @@
)
from aind_behavior_video_transformation.transform_videos import (
CompressionRequest,
convert_video
)


Expand All @@ -43,6 +45,14 @@ class BehaviorVideoJobSettings(BasicJobSettings):
"request"
),
)
parallel_compression: bool = Field(
default=True,
description="Run compression in parallel or sequentially.",
)
ffmpeg_thread_cnt: int = Field(
default=0,
description="Number of threads per ffmpeg compression job."
)


class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
Expand All @@ -64,6 +74,33 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
run_job() -> JobResponse
"""

def _run_compression(
    self,
    convert_video_args: list[tuple[Path, Path, tuple[str, str] | None]]
) -> None:
    """
    Runs the ffmpeg conversion for every collected video.

    Parameters
    ----------
    convert_video_args : list of tuple
        One ``(video_path, output_dir, arg_set)`` tuple per video. Each
        tuple is unpacked as the leading positional arguments of
        ``convert_video``; ``job_settings.ffmpeg_thread_cnt`` is appended
        as the final argument.

    Returns
    -------
    None

    Notes
    -----
    - When ``job_settings.parallel_compression`` is set, one worker
      process is created per video; otherwise videos are converted one
      after another in the calling process.
    - An exception raised by any conversion propagates to the caller.
    """

    # Nothing to convert. This guard also avoids the ValueError that
    # ProcessPoolExecutor raises when asked for zero workers.
    if not convert_video_args:
        return

    thread_cnt = self.job_settings.ffmpeg_thread_cnt

    if self.job_settings.parallel_compression:
        # ProcessPool implementation: one worker per video.
        num_jobs = len(convert_video_args)
        with ProcessPoolExecutor(max_workers=num_jobs) as executor:
            jobs = [
                executor.submit(convert_video, *params, thread_cnt)
                for params in convert_video_args
            ]
            for job in as_completed(jobs):
                # .result() re-raises any exception from the worker.
                result = job.result()
                logging.info("FFmpeg job completed: %s", result)

    else:
        # Execute serially
        for params in convert_video_args:
            result = convert_video(*params, thread_cnt)
            logging.info("FFmpeg job completed: %s", result)

def run_job(self) -> JobResponse:
"""
Main public method to run the compression job.
Expand Down Expand Up @@ -94,9 +131,11 @@ def run_job(self) -> JobResponse:
ffmpeg_arg_set = (
self.job_settings.compression_requested.determine_ffmpeg_arg_set()
)
transform_directory(
convert_video_args = transform_directory(
job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides
)
self._run_compression(convert_video_args)

job_end_time = time()
return JobResponse(
status_code=200,
Expand Down
11 changes: 8 additions & 3 deletions src/aind_behavior_video_transformation/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def build_overrides_dict(video_comp_pairs, job_in_dir_path):

def transform_directory(
input_dir: Path, output_dir: Path, arg_set, overrides=dict()
) -> None:
) -> list[tuple[Path, Path, tuple[str, str] | None]]:
"""
Transforms all video files in a directory and its subdirectories,
and creates symbolic links for non-video files. Subdirectories are
Expand All @@ -105,8 +105,10 @@ def transform_directory(
Returns
-------
None
List of tuples containing convert_video arguments.
"""

convert_video_args = []
for root, dirs, files in walk(input_dir, followlinks=True):
root_path = Path(root)
in_relpath = relpath(root, input_dir)
Expand All @@ -122,7 +124,10 @@ def transform_directory(
this_arg_set = overrides.get(root_path, arg_set)
# File-level overrides take precedence
this_arg_set = overrides.get(file_path, this_arg_set)
convert_video(file_path, dst_dir, this_arg_set)
convert_video_args.append((file_path, dst_dir, this_arg_set))

else:
out_path = dst_dir / file_name
symlink(file_path, out_path)

return convert_video_args
46 changes: 34 additions & 12 deletions src/aind_behavior_video_transformation/transform_videos.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
"""Module to handle transforming behavior videos"""
"""
Module to handle transforming behavior videos
To add a new compression preset:
1) Define a CompressionEnum member: CUSTOM_COMPRESSION = 'custom'
2) Define corresponding FfmpegInputArgs/FfmpegOutputArgs.
3) Add the CompressionEnum to FfmpegArgSet, and build
(FfmpegInputArgs, FfmpegOutputArgs) tuple:
'CUSTOM_COMPRESSION' = (
FfmpegInputArgs.CUSTOM_INPUT_ARGS,
FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS,
)
"""

import logging
import shlex
import subprocess
from enum import Enum
Expand Down Expand Up @@ -123,7 +134,7 @@ class CompressionRequest(BaseModel):

def determine_ffmpeg_arg_set(
self,
) -> Optional[Tuple[Optional[str], Optional[str]]]:
) -> Optional[Tuple[str, str]]:
"""
Determines the appropriate set of FFmpeg arguments based on the
compression requirements.
Expand Down Expand Up @@ -153,31 +164,44 @@ def determine_ffmpeg_arg_set(
self.user_ffmpeg_input_options,
self.user_ffmpeg_output_options,
)

# If not one of the two special cases, use the enum values
else:
# If default, set compression to gamma
if comp_req == CompressionEnum.DEFAULT:
compression_preset = CompressionEnum.GAMMA_ENCODING
else:
compression_preset = self.compression_enum

# Resolve two levels of indirection here
# FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs)
# (FfmpegInputArgs, FfmpegOutputArgs)
# -> (in_args str, out_args str)
arg_set_enum = FfmpegArgSet[compression_preset.name].value
arg_set = (arg_set_enum[0].value, arg_set_enum[1].value)

return arg_set


def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
def convert_video(
video_path: Path,
output_dir: Path,
arg_set: Optional[Tuple[str, str]],
ffmpeg_thread_cnt: int = 0
) -> Path:
"""
Converts a video to a specified format using ffmpeg.
Parameters
----------
video_path : Path
The path to the input video file.
dst : Path
output_dir : Path
The destination directory where the converted video will be saved.
arg_set : tuple or None
A tuple containing input and output arguments for ffmpeg. If None, a
symlink to the original video is created.
ffmpeg_thread_cnt : int
Number of threads passed to ffmpeg via the ``-threads`` option.
Returns
-------
Expand All @@ -189,11 +213,9 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
- The function uses ffmpeg for video conversion.
- If `arg_set` is None, the function creates a symlink to the original
video file.
- The function logs the ffmpeg command used for conversion.
"""

out_path = dst / f"{video_path.stem}.mp4" # noqa: E501
# Pydantic validation ensures this is a 'CompressionRequest' value.
out_path = output_dir / f"{video_path.stem}.mp4" # noqa: E501

# Trivial Case, do nothing
if arg_set is None:
Expand All @@ -204,17 +226,17 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
output_args = arg_set[1]

ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"]

# Set thread count
ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)])

if input_args:
ffmpeg_command.extend(shlex.split(input_args))
ffmpeg_command.extend(["-i", str(video_path)])
if output_args:
ffmpeg_command.extend(shlex.split(output_args))
ffmpeg_command.append(str(out_path))

# For logging I guess
ffmpeg_str = " ".join(ffmpeg_command)
logging.info(f"{ffmpeg_str=}")

subprocess.run(ffmpeg_command, check=True)

return out_path

0 comments on commit ba9dced

Please sign in to comment.