Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel Compression #16

Merged
merged 32 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7b09ed3
base commit, agg compression jobs to run in loop
jwong-nd Nov 4, 2024
3c9e90a
add parallelization
jwong-nd Nov 5, 2024
e685bc6
linting
jwong-nd Nov 5, 2024
1e11007
2/3 tests passing
jwong-nd Nov 5, 2024
c94377e
pass tests
jwong-nd Nov 5, 2024
8b9c38f
linting
jwong-nd Nov 5, 2024
e961950
expose parallel parameter
jwong-nd Nov 5, 2024
d721833
coverage threshold to 80
jwong-nd Nov 5, 2024
2084397
remove dask comment
jwong-nd Nov 5, 2024
7d5205d
expose ffmpeg thread parameter
jwong-nd Nov 5, 2024
8a423c7
Revert to main
jwong-nd Nov 6, 2024
ba9dced
add parallelization
jwong-nd Nov 6, 2024
a49d0db
linting
jwong-nd Nov 6, 2024
9ac8ecb
log lines missing test coverage
jwong-nd Nov 6, 2024
36e1c61
test serial compression branch
jwong-nd Nov 6, 2024
b70512f
add unit test for conert_video
jwong-nd Nov 6, 2024
b68b908
lint the test
jwong-nd Nov 6, 2024
cfc3aa1
resolve serial runtime error
jwong-nd Nov 6, 2024
30476fd
linting...
jwong-nd Nov 6, 2024
f7ea735
Documentation
jwong-nd Nov 7, 2024
b16850c
parallel=true in directory test
jwong-nd Nov 7, 2024
7d0566d
Remove redundant test, top level export
galenlynch Nov 8, 2024
8ce822f
Merge pull request #17 from AllenNeuralDynamics/pr-testing
jwong-nd Nov 8, 2024
5d70338
add serial logging to parallel errors
jwong-nd Nov 8, 2024
f5a8b30
linting
jwong-nd Nov 8, 2024
6f963b3
linting 2
jwong-nd Nov 8, 2024
f08332d
runtime error
jwong-nd Nov 8, 2024
7a793a2
maybe this works
jwong-nd Nov 8, 2024
f09ae14
try this
jwong-nd Nov 8, 2024
364508d
add serial log test
jwong-nd Nov 9, 2024
8b76a56
linting
jwong-nd Nov 9, 2024
cb08829
coverage threshold
jwong-nd Nov 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test_and_lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ jobs:
- name: Run linter checks
run: flake8 . && interrogate --verbose .
- name: Run tests and coverage
run: coverage run -m unittest discover && coverage report
run: coverage run -m unittest discover && coverage report -m
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ exclude_lines = [
"import",
"pragma: no cover",
]
fail_under = 97
fail_under = 93

[tool.isort]
line_length = 79
Expand Down
57 changes: 56 additions & 1 deletion src/aind_behavior_video_transformation/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from time import time
from typing import List, Optional, Tuple, Union
Expand All @@ -20,6 +21,7 @@
)
from aind_behavior_video_transformation.transform_videos import (
CompressionRequest,
convert_video,
)


Expand All @@ -43,6 +45,13 @@ class BehaviorVideoJobSettings(BasicJobSettings):
"request"
),
)
parallel_compression: bool = Field(
default=True,
description="Run compression in parallel or sequentially.",
)
ffmpeg_thread_cnt: int = Field(
default=0, description="Number of threads per ffmpeg compression job."
)


class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
Expand All @@ -64,6 +73,50 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
run_job() -> JobResponse
"""

def _run_compression(
self,
convert_video_args: list[tuple[Path, Path, tuple[str, str] | None]],
) -> None:
"""
Runs CompressionRequests at the specified paths.
"""
error_traces = []
if self.job_settings.parallel_compression:
# Execute in-parallel
num_jobs = len(convert_video_args)
with ProcessPoolExecutor(max_workers=num_jobs) as executor:
jobs = [
executor.submit(
convert_video,
*params,
self.job_settings.ffmpeg_thread_cnt,
)
for params in convert_video_args
]
for job in as_completed(jobs):
result = job.result()
if isinstance(result, tuple):
error_traces.append(result[1])
else:
logging.info(f"FFmpeg job completed: {result}")
else:
# Execute serially
for params in convert_video_args:
result = convert_video(
*params, self.job_settings.ffmpeg_thread_cnt
)
if isinstance(result, tuple):
error_traces.append(result[1])
else:
logging.info(f"FFmpeg job completed: {result}")

if error_traces:
for e in error_traces:
logging.error(e)
raise RuntimeError(
"One or more Ffmpeg jobs failed. See error logs."
)

def run_job(self) -> JobResponse:
"""
Main public method to run the compression job.
Expand Down Expand Up @@ -94,9 +147,11 @@ def run_job(self) -> JobResponse:
ffmpeg_arg_set = (
self.job_settings.compression_requested.determine_ffmpeg_arg_set()
)
transform_directory(
convert_video_args = transform_directory(
job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides
)
self._run_compression(convert_video_args)

job_end_time = time()
return JobResponse(
status_code=200,
Expand Down
13 changes: 8 additions & 5 deletions src/aind_behavior_video_transformation/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from os.path import relpath
from pathlib import Path

from aind_behavior_video_transformation.transform_videos import convert_video


def likely_video_file(file: Path) -> bool:
"""
Expand Down Expand Up @@ -83,7 +81,7 @@ def build_overrides_dict(video_comp_pairs, job_in_dir_path):

def transform_directory(
input_dir: Path, output_dir: Path, arg_set, overrides=dict()
) -> None:
) -> list[tuple[Path, Path, tuple[str, str] | None]]:
"""
Transforms all video files in a directory and its subdirectories,
and creates symbolic links for non-video files. Subdirectories are
Expand All @@ -105,8 +103,10 @@ def transform_directory(

Returns
-------
None
List of tuples containing convert_video arguments.
"""

convert_video_args = []
for root, dirs, files in walk(input_dir, followlinks=True):
root_path = Path(root)
in_relpath = relpath(root, input_dir)
Expand All @@ -122,7 +122,10 @@ def transform_directory(
this_arg_set = overrides.get(root_path, arg_set)
# File-level overrides take precedence
this_arg_set = overrides.get(file_path, this_arg_set)
convert_video(file_path, dst_dir, this_arg_set)
convert_video_args.append((file_path, dst_dir, this_arg_set))

else:
out_path = dst_dir / file_name
symlink(file_path, out_path)

return convert_video_args
69 changes: 53 additions & 16 deletions src/aind_behavior_video_transformation/transform_videos.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
"""Module to handle transforming behavior videos"""
"""
Module to handle transforming behavior videos

To add a new compression preset:
1) Define FfmpegInputArgs/FfmpegOutputArgs.
2) Define a CompressionEnum: 'NEW_PRESET = 'new_preset'
3) Add the CompressionEnum to FfmpegArgSet, and build
(FfmpegInputArgs, FfmpegOutputArgs) tuple:
'NEW_PRESET' = (
FfmpegInputArgs.CUSTOM_INPUT_ARGS,
FfmpegOutputArgs.CUSTOM_OUTPUT_ARGS,
)
FfmpegInputArgs / FfmpegOutputArgs can be prexisitng or newly-defined in (1)
"""

import logging
import shlex
import subprocess
from enum import Enum
from os import symlink
from pathlib import Path
from typing import Optional, Tuple
from subprocess import CalledProcessError
from typing import Optional, Tuple, Union

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -123,7 +136,7 @@ class CompressionRequest(BaseModel):

def determine_ffmpeg_arg_set(
self,
) -> Optional[Tuple[Optional[str], Optional[str]]]:
) -> Optional[Tuple[str, str]]:
"""
Determines the appropriate set of FFmpeg arguments based on the
compression requirements.
Expand Down Expand Up @@ -153,31 +166,44 @@ def determine_ffmpeg_arg_set(
self.user_ffmpeg_input_options,
self.user_ffmpeg_output_options,
)

# If not one of the two special cases, use the enum values
else:
# If default, set compression to gamma
if comp_req == CompressionEnum.DEFAULT:
compression_preset = CompressionEnum.GAMMA_ENCODING
else:
compression_preset = self.compression_enum

# Resolve two levels of indirection here
# FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs)
# (FfmpegInputArgs, FfmpegOutputArgs)
# -> (in_args str, out_args str)
arg_set_enum = FfmpegArgSet[compression_preset.name].value
arg_set = (arg_set_enum[0].value, arg_set_enum[1].value)

return arg_set


def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
def convert_video(
video_path: Path,
output_dir: Path,
arg_set: Optional[Tuple[str, str]],
ffmpeg_thread_cnt: int = 0,
) -> Union[str, Tuple[str, str]]:
"""
Converts a video to a specified format using ffmpeg.

Parameters
----------
video_path : Path
The path to the input video file.
dst : Path
output_dir : Path
The destination directory where the converted video will be saved.
arg_set : tuple or None
A tuple containing input and output arguments for ffmpeg. If None, a
symlink to the original video is created.
ffmpeg_thread_cnt : set number of ffmpeg threads

Returns
-------
Expand All @@ -189,11 +215,9 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
- The function uses ffmpeg for video conversion.
- If `arg_set` is None, the function creates a symlink to the original
video file.
- The function logs the ffmpeg command used for conversion.
"""

out_path = dst / f"{video_path.stem}.mp4" # noqa: E501
# Pydantic validation ensures this is a 'CompressionRequest' value.
out_path = output_dir / f"{video_path.stem}.mp4" # noqa: E501

# Trivial Case, do nothing
if arg_set is None:
Expand All @@ -204,17 +228,30 @@ def convert_video(video_path: Path, dst: Path, arg_set) -> Path:
output_args = arg_set[1]

ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"]

# Set thread count
if ffmpeg_thread_cnt > 0:
ffmpeg_command.extend(["-threads", str(ffmpeg_thread_cnt)])

if input_args:
ffmpeg_command.extend(shlex.split(input_args))
ffmpeg_command.extend(["-i", str(video_path)])
if output_args:
ffmpeg_command.extend(shlex.split(output_args))
ffmpeg_command.append(str(out_path))

# For logging I guess
ffmpeg_str = " ".join(ffmpeg_command)
logging.info(f"{ffmpeg_str=}")

subprocess.run(ffmpeg_command, check=True)

return out_path
# Capture and return error message if it exists
try:
subprocess.run(
ffmpeg_command, check=True, capture_output=True, text=True
)
return str(out_path)

except CalledProcessError as e:
error_msg = (
f"FFmpeg conversion failed for {video_path}\n"
f"Command: {' '.join(ffmpeg_command)}\n"
f"Return code: {e.returncode}\n"
f"Error output:\n{e.stderr}\n"
)
return (str(out_path), error_msg)
Loading