Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel Compression #16

Merged
merged 32 commits into from
Nov 11, 2024
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7b09ed3
base commit, agg compression jobs to run in loop
jwong-nd Nov 4, 2024
3c9e90a
add parallelization
jwong-nd Nov 5, 2024
e685bc6
linting
jwong-nd Nov 5, 2024
1e11007
2/3 tests passing
jwong-nd Nov 5, 2024
c94377e
pass tests
jwong-nd Nov 5, 2024
8b9c38f
linting
jwong-nd Nov 5, 2024
e961950
expose parallel parameter
jwong-nd Nov 5, 2024
d721833
coverage threshold to 80
jwong-nd Nov 5, 2024
2084397
remove dask comment
jwong-nd Nov 5, 2024
7d5205d
expose ffmpeg thread parameter
jwong-nd Nov 5, 2024
8a423c7
Revert to main
jwong-nd Nov 6, 2024
ba9dced
add parallelization
jwong-nd Nov 6, 2024
a49d0db
linting
jwong-nd Nov 6, 2024
9ac8ecb
log lines missing test coverage
jwong-nd Nov 6, 2024
36e1c61
test serial compression branch
jwong-nd Nov 6, 2024
b70512f
add unit test for convert_video
jwong-nd Nov 6, 2024
b68b908
lint the test
jwong-nd Nov 6, 2024
cfc3aa1
resolve serial runtime error
jwong-nd Nov 6, 2024
30476fd
linting...
jwong-nd Nov 6, 2024
f7ea735
Documentation
jwong-nd Nov 7, 2024
b16850c
parallel=true in directory test
jwong-nd Nov 7, 2024
7d0566d
Remove redundant test, top level export
galenlynch Nov 8, 2024
8ce822f
Merge pull request #17 from AllenNeuralDynamics/pr-testing
jwong-nd Nov 8, 2024
5d70338
add serial logging to parallel errors
jwong-nd Nov 8, 2024
f5a8b30
linting
jwong-nd Nov 8, 2024
6f963b3
linting 2
jwong-nd Nov 8, 2024
f08332d
runtime error
jwong-nd Nov 8, 2024
7a793a2
maybe this works
jwong-nd Nov 8, 2024
f09ae14
try this
jwong-nd Nov 8, 2024
364508d
add serial log test
jwong-nd Nov 9, 2024
8b76a56
linting
jwong-nd Nov 9, 2024
cb08829
coverage threshold
jwong-nd Nov 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add serial logging to parallel errors
jwong-nd committed Nov 8, 2024
commit 5d7033871f677b482143e9efae0e80197aff80f4
28 changes: 19 additions & 9 deletions src/aind_behavior_video_transformation/etl.py
Original file line number Diff line number Diff line change
@@ -80,9 +80,9 @@ def _run_compression(
"""
Runs CompressionRequests at the specified paths.
"""

error_traces = []
if self.job_settings.parallel_compression:
# ProcessPool implementation
# Execute in-parallel
num_jobs = len(convert_video_args)
with ProcessPoolExecutor(max_workers=num_jobs) as executor:
jobs = [
@@ -94,15 +94,25 @@ def _run_compression(
for params in convert_video_args
]
for job in as_completed(jobs):
result = job.result()
logging.info("FFmpeg job completed:", result)

out_path, error = job.result()
if error:
error_traces.append(error)
else:
logging.info(f"FFmpeg job completed: {out_path}")
else:
# Execute Serially
# Execute serially
for params in convert_video_args:
out_path = convert_video(*params,
self.job_settings.ffmpeg_thread_cnt)
logging.info(f"FFmpeg job completed: {out_path}")
out_path, error = convert_video(*params,
self.job_settings.ffmpeg_thread_cnt)
if error:
error_traces.append(error)
else:
logging.info(f"FFmpeg job completed: {out_path}")

if error_traces:
for e in error_traces:
logging.error(e)
raise RuntimeError('One or more Ffmpeg jobs failed. See error logs.')

def run_job(self) -> JobResponse:
"""
28 changes: 23 additions & 5 deletions src/aind_behavior_video_transformation/transform_videos.py
Original file line number Diff line number Diff line change
@@ -15,14 +15,14 @@

import shlex
import subprocess
from subprocess import CalledProcessError
from enum import Enum
from os import symlink
from pathlib import Path
from typing import Optional, Tuple

from pydantic import BaseModel, Field


class CompressionEnum(Enum):
"""
Enum class to define different types of compression requests.
@@ -189,7 +189,7 @@ def convert_video(
output_dir: Path,
arg_set: Optional[Tuple[str, str]],
ffmpeg_thread_cnt: int = 0,
) -> Path:
) -> tuple[Path, Optional[str]]:
"""
Converts a video to a specified format using ffmpeg.

@@ -239,6 +239,24 @@ def convert_video(
ffmpeg_command.extend(shlex.split(output_args))
ffmpeg_command.append(str(out_path))

subprocess.run(ffmpeg_command, check=True)

return out_path
# Capture and return error message if it exists
import tempfile
with tempfile.NamedTemporaryFile(mode='w+', suffix='.log') as stdout_file, \
tempfile.NamedTemporaryFile(mode='w+', suffix='.log') as stderr_file:
try:
subprocess.run(
ffmpeg_command,
check=True,
capture_output=True,
text=True
)
return out_path, None

except CalledProcessError as e:
error_msg = (
f"FFmpeg conversion failed for {video_path}\n"
f"Command: {' '.join(ffmpeg_command)}\n"
f"Return code: {e.returncode}\n"
f"Error output:\n{e.stderr}\n"
)
return (out_path, error_msg)