Skip to content

Commit

Permalink
Revert to main
Browse files Browse the repository at this point in the history
  • Loading branch information
jwong-nd committed Nov 6, 2024
1 parent 7d5205d commit 8a423c7
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 200 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ exclude_lines = [
"import",
"pragma: no cover",
]
fail_under = 80
fail_under = 97

[tool.isort]
line_length = 79
Expand Down
181 changes: 19 additions & 162 deletions src/aind_behavior_video_transformation/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import logging
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from time import time
from typing import List, Optional, Tuple, Union
Expand All @@ -15,13 +14,14 @@
)
from pydantic import Field

from aind_behavior_video_transformation.filesystem import (
build_overrides_dict,
transform_directory,
)
from aind_behavior_video_transformation.transform_videos import (
CompressionRequest,
convert_video,
)

PathLike = Union[Path, str]


class BehaviorVideoJobSettings(BasicJobSettings):
"""
Expand All @@ -34,7 +34,7 @@ class BehaviorVideoJobSettings(BasicJobSettings):
description="Compression requested for video files",
)
video_specific_compression_requests: Optional[
List[Tuple[PathLike, CompressionRequest]]
List[Tuple[Union[Path, str], CompressionRequest]]
] = Field(
default=None,
description=(
Expand All @@ -43,23 +43,6 @@ class BehaviorVideoJobSettings(BasicJobSettings):
"request"
),
)
parallel_compression: bool = Field(
default=True,
description="Run compression in parallel or sequentially.",
)
ffmpeg_thread_cnt: int = Field(
default=8,
description="Number of threads per ffmpeg compression job."
)
video_extensions: List[str] = [
".mp4",
".avi",
".mov",
".mkv",
".flv",
".wmv",
".webm",
]


class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
Expand All @@ -81,136 +64,6 @@ class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]):
run_job() -> JobResponse
"""

def _format_output_directory(self) -> None:
"""
Recurisively copies (symlink) non-video files
from input directory to output directory
perserving filesysem structure.
"""
input_dir = Path(self.job_settings.input_source.resolve())
output_dir = Path(self.job_settings.output_directory.resolve())
output_dir.mkdir(parents=True, exist_ok=True)

for file in input_dir.rglob("*"):
if file.is_file() and not (
file.suffix.lower() in self.job_settings.video_extensions
):
relative_path = file.relative_to(input_dir)
target_link = output_dir / relative_path
target_link.parent.mkdir(parents=True, exist_ok=True)
target_link.symlink_to(file)

def _resolve_compression_requests(
self,
) -> List[Tuple[PathLike, CompressionRequest]]:
"""
Recursively traverses input directory and
resolves CompressionRequest of all videos.
Sets 'compression_requested' as the default, unless
overrided in 'video_specific_compression_requests'.
"""
input_dir = Path(self.job_settings.input_source.resolve())

# Set all videos to global compression setting
path_req_pairs: List[Tuple[Path, CompressionRequest]] = [
(file.resolve(), self.job_settings.compression_requested)
for file in input_dir.rglob("*")
if (
file.is_file()
and (file.suffix.lower() in self.job_settings.video_extensions)
)
]

# Apply overrides if present.
comp_reqs = self.job_settings.video_specific_compression_requests
if comp_reqs:
# Define map: override abs_vid_path -> override CompressionRequest
overrides: dict[Path, CompressionRequest] = {}
for override_path, override_req in comp_reqs:
override_path = Path(override_path)

# Case 1: Override Path is a file.
if override_path.is_file():
override_path = override_path.resolve()
overrides[override_path] = override_req

# Case 2: Override Path is a subdirectory (relative/absolute)
else:
if not override_path.is_absolute():
override_path = input_dir / override_path
for override_file in override_path.rglob("*"):
if override_file.is_file() and (
override_file.suffix.lower()
in self.job_settings.video_extensions
):
override_file = override_file.resolve()
overrides[override_file] = override_req

# Filter path_req_pairs by overrides.
output_pairs: List[Tuple[Path, CompressionRequest]] = []
for vid_path, default_req in path_req_pairs:
for override_path, override_req in overrides.items():
if vid_path.samefile(override_path):
output_pairs.append((override_path, override_req))
else:
output_pairs.append((vid_path, default_req))
path_req_pairs = output_pairs

return path_req_pairs

def _run_compression(
self,
path_comp_req_pairs: List[Tuple[PathLike, CompressionRequest]],
) -> None:
"""
Runs CompressionRequests at the specified paths.
"""
input_dir = Path(self.job_settings.input_source.resolve())
output_dir = Path(self.job_settings.output_directory.resolve())

convert_video_params = []
for vid_path, comp_req in path_comp_req_pairs:
# Resolve destination
relative_path = vid_path.relative_to(input_dir)
output_path = output_dir / relative_path
output_vid_dir = output_path.parent

# Resolve compression params
arg_set = comp_req.determine_ffmpeg_arg_set()

# Add to job buffer
convert_video_params.append((vid_path, output_vid_dir, arg_set))
logging.info(
f"Compressing {str(vid_path)} \
w/ {comp_req.compression_enum}"
)

if self.job_settings.parallel_compression:
# ProcessPool implementation
num_jobs = len(convert_video_params)
with ProcessPoolExecutor(max_workers=num_jobs) as executor:
jobs = [
executor.submit(convert_video, *params,
self.job_settings.ffmpeg_thread_cnt)
for params in convert_video_params
]
for job in as_completed(jobs):
try:
result = job.result()
logging.info("FFmpeg job completed:", result)
except Exception as e:
logging.info("Error:", e)

else:
# Execute Serially
for params in convert_video_params:
try:
convert_video(*params, self.job_settings.ffmpeg_thread_cnt)
logging.info("FFmpeg job completed:", result)
except Exception as e:
logging.info("Error:", e)

def run_job(self) -> JobResponse:
"""
Main public method to run the compression job.
Expand All @@ -230,10 +83,20 @@ def run_job(self) -> JobResponse:
"""
job_start_time = time()

self._format_output_directory()
path_comp_req_pairs = self._resolve_compression_requests()
self._run_compression(path_comp_req_pairs)
video_comp_pairs = (
self.job_settings.video_specific_compression_requests
)
job_out_dir_path = self.job_settings.output_directory.resolve()
Path(job_out_dir_path).mkdir(exist_ok=True)
job_in_dir_path = self.job_settings.input_source.resolve()
overrides = build_overrides_dict(video_comp_pairs, job_in_dir_path)

ffmpeg_arg_set = (
self.job_settings.compression_requested.determine_ffmpeg_arg_set()
)
transform_directory(
job_in_dir_path, job_out_dir_path, ffmpeg_arg_set, overrides
)
job_end_time = time()
return JobResponse(
status_code=200,
Expand Down Expand Up @@ -264,11 +127,5 @@ def run_job(self) -> JobResponse:
job = BehaviorVideoJob(job_settings=job_settings)
job_response = job.run_job()
print(job_response.status_code)
logging.info(job_response.model_dump_json())

# TODO:
# - Expose number of thread parameter
# - Add unit tests for helper methods
# - Unit test convert_video

# Will push this in at the end and call it a day.
logging.info(job_response.model_dump_json())
128 changes: 128 additions & 0 deletions src/aind_behavior_video_transformation/filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Module for handling file discovery to transform videos."""

from os import symlink, walk
from os.path import relpath
from pathlib import Path

from aind_behavior_video_transformation.transform_videos import convert_video


def likely_video_file(file: Path) -> bool:
"""
Check if a file is likely a video file based on its suffix.
Parameters
----------
file : Path
The file path to check.
Returns
-------
bool
True if the file suffix indicates it is a video file, False otherwise.
"""
return file.suffix in set(
[
".mp4",
".avi",
".mov",
".mkv",
".flv",
".wmv",
".webm",
]
)


def build_overrides_dict(video_comp_pairs, job_in_dir_path):
"""
Builds a dictionary of override arguments for video paths.
Parameters
----------
video_comp_pairs : list of tuple
A list of tuples where each tuple contains a video file name and a
corresponding CompressionRequest object.
job_in_dir_path : Path
The base directory path where the job input files are located.
Returns
-------
dict
A dictionary where keys are video paths (either files or directories)
and values are the override argument sets determined by the
CompressionRequest objects.
"""
overrides = dict()
if video_comp_pairs:
for video_name, comp_req in video_comp_pairs:
video_path = Path(video_name)
# Figure out how video path was passed, convert to absolute
if video_path.is_absolute():
in_path = video_path
elif video_path.exists():
in_path = video_path.resolve()
else:
in_path = (job_in_dir_path / video_path).resolve()
# Set overrides for the video path
override_arg_set = comp_req.determine_ffmpeg_arg_set()
# If it is a directory, set overrides for all subdirectories
if in_path.is_dir():
overrides[in_path] = override_arg_set
for root, dirs, _ in walk(in_path, followlinks=True):
root_path = Path(root)
for dir_name in dirs:
subdir = root_path / dir_name
overrides[subdir] = override_arg_set
# If it is a file, set override for the file
else:
overrides[in_path] = override_arg_set

return overrides


def transform_directory(
input_dir: Path, output_dir: Path, arg_set, overrides=dict()
) -> None:
"""
Transforms all video files in a directory and its subdirectories,
and creates symbolic links for non-video files. Subdirectories are
created as needed.
Parameters
----------
input_dir : Path
The directory containing the input files.
output_dir : Path
The directory where the transformed files and symbolic links will be
saved.
arg_set : Any
The set of arguments to be used for video transformation.
overrides : dict, optional
A dictionary containing overrides for specific directories or files.
Keys are Paths and values are argument sets. Default is an empty
dictionary.
Returns
-------
None
"""
for root, dirs, files in walk(input_dir, followlinks=True):
root_path = Path(root)
in_relpath = relpath(root, input_dir)
dst_dir = output_dir / in_relpath
for dir_name in dirs:
out_path = dst_dir / dir_name
out_path.mkdir(parents=True, exist_ok=True)

for file_name in files:
file_path = Path(root) / file_name
if likely_video_file(file_path):
# If the parent directory has an override, use that
this_arg_set = overrides.get(root_path, arg_set)
# File-level overrides take precedence
this_arg_set = overrides.get(file_path, this_arg_set)
convert_video(file_path, dst_dir, this_arg_set)
else:
out_path = dst_dir / file_name
symlink(file_path, out_path)
Loading

0 comments on commit 8a423c7

Please sign in to comment.