Skip to content

Commit

Permalink
WIP: Attempt to preserve video file standards at AIND
Browse files Browse the repository at this point in the history
The transformation should preserve the [video file
standard](https://github.com/AllenNeuralDynamics/aind-physio-arch/blob/file-formats/doc/file_formats/video.md)
at AIND. This requires preserving the directory structure and copying non-video files.
  • Loading branch information
galenlynch committed Oct 24, 2024
1 parent a4e48d6 commit 794320f
Showing 1 changed file with 114 additions and 59 deletions.
173 changes: 114 additions & 59 deletions src/aind_behavior_video_transformation/transform_videos.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import subprocess
import sys
from enum import Enum
from os import symlink
from os import symlink, walk
from os.path import relpath
from pathlib import Path
from time import time
from typing import Optional
from typing import Optional, Tuple

from aind_data_transformation.core import (
BasicJobSettings,
Expand All @@ -32,17 +33,17 @@ class CompressionRequest(Enum):
NO_COMPRESSION = "no compression"


class InputFfmpegParams(Enum):
class FfmpegInputArgs(Enum):
"""
Input parameter set referenced inside FfmpegParamSets
Input arguments set referenced inside FfmpegParamSets
"""

NONE = ""


class OutputFfmpegParams(Enum):
class FfmpegOutputArgs(Enum):
"""
Output parameter set referenced inside FfmpegParamSets
Output arguments set referenced inside FfmpegParamSets
"""

GAMMA_ENCODING = (
Expand All @@ -64,7 +65,7 @@ class OutputFfmpegParams(Enum):
NONE = ""


class FfmpegParamSets(Enum):
class FfmpegArgSet(Enum):
"""
Define different ffmpeg params to be used for video compression
Two-tuple with first element as input params and second element as output
Expand All @@ -78,12 +79,12 @@ class FfmpegParamSets(Enum):
"""

GAMMA_ENCODING = (
InputFfmpegParams.NONE,
OutputFfmpegParams.GAMMA_ENCODING,
FfmpegInputArgs.NONE,
FfmpegOutputArgs.GAMMA_ENCODING,
)
NO_GAMMA_ENCODING = (
InputFfmpegParams.NONE,
OutputFfmpegParams.NO_GAMMA_ENCODING,
FfmpegInputArgs.NONE,
FfmpegOutputArgs.NO_GAMMA_ENCODING,
)


Expand All @@ -101,50 +102,76 @@ class CompressionSettings(BasicJobSettings):
user_ffmpeg_output_options: Optional[str] = Field(
default=None, description="User defined ffmpeg output options"
)
dirs_to_process: Optional[list[Path]] = Field(
default=None, description="Directories to process"
)
preserve_structure: Optional[bool] = Field(
default=True, description="Preserve directory structure"
)


# File suffixes (including the leading dot) that are treated as video files.
# Matching is case-sensitive, exactly as Path.suffix reports the suffix.
_VIDEO_SUFFIXES = frozenset(
    (
        ".mp4",
        ".avi",
        ".mov",
        ".mkv",
        ".flv",
        ".wmv",
        ".webm",
    )
)


def likely_video_file(file: Path) -> bool:
    """
    Check if a file is likely a video file

    The decision is based only on the file suffix; the file contents are
    never inspected and the path does not need to exist.

    Parameters
    ----------
    file : Path
        Path whose suffix is checked

    Returns
    -------
    bool
        True if the suffix is one of the known video suffixes
    """
    # set() accepts a single iterable, so the suffixes are collected into one
    # container up front instead of being passed as separate arguments.
    return file.suffix in _VIDEO_SUFFIXES


class BehaviorVideoJob(GenericEtl[CompressionSettings]):
"""Main class to handle behavior video transformations"""

def convert_video(self, video_path: Path) -> None:
def determine_ffmpeg_arg_set(self) -> Optional[Tuple[str, str]]:
    """
    Resolve the ffmpeg argument strings from the job settings

    Returns
    -------
    Optional[Tuple[str, str]]
        None when no compression is requested, otherwise a two-tuple of
        (input arguments, output arguments).
    """
    settings = self.job_settings
    requested = settings.compression_requested

    # Special case 1: no compression, so there are no ffmpeg arguments
    if requested == CompressionRequest.NO_COMPRESSION:
        return None

    # Special case 2: the user supplied the ffmpeg options directly
    if requested == CompressionRequest.USER_DEFINED:
        return (
            settings.user_ffmpeg_input_options,
            settings.user_ffmpeg_output_options,
        )

    # Every other request maps onto a preset looked up by name in
    # FfmpegArgSet; DEFAULT is treated as gamma encoding.
    preset = (
        CompressionRequest.GAMMA_ENCODING
        if requested == CompressionRequest.DEFAULT
        else requested
    )
    preset_members = FfmpegArgSet[preset.name].value
    return (preset_members[0].value, preset_members[1].value)

def convert_video(self, video_path: Path, dst: Path, arg_set) -> Path:
"""
Convert video to a different format
Parameters
----------
video_path : Path
Path to the video file to be converted
"""

out_path = (
self.job_settings.output_directory / f"{video_path.stem}.mp4"
) # noqa: E501
out_path = dst / f"{video_path.stem}.mp4" # noqa: E501
# Pydantic validation ensures this is a 'CompressionRequest' value.
compression_requested = self.job_settings.compression_requested

# Trivial Case, do nothing
if compression_requested == CompressionRequest.NO_COMPRESSION:
symlink(str(video_path), str(out_path))
return

# Compression Cases corresponding to each CompressionRequest.
# Each case sets input/output args to pass into ffmpeg command.
# If user defined, use the user defined options.
if compression_requested == CompressionRequest.USER_DEFINED:
input_args = self.job_settings.user_ffmpeg_input_options
output_args = self.job_settings.user_ffmpeg_output_options
# In all other cases, the options are defined in FfmpegParamSets.
else:
# If default, set compression to gamma
if compression_requested == CompressionRequest.DEFAULT:
compression_preset = CompressionRequest.GAMMA_ENCODING
else:
compression_preset = compression_requested
param_set = FfmpegParamSets[compression_preset.name].value
input_args = param_set[0].value
output_args = param_set[1].value
if arg_set is None:
symlink(video_path, out_path)
return out_path

logging.info(f"{input_args=}")
logging.info(f"{output_args=}")
input_args = arg_set[0]
output_args = arg_set[1]

ffmpeg_command = ["ffmpeg", "-y", "-v", "info"]
if input_args:
Expand All @@ -154,18 +181,33 @@ def convert_video(self, video_path: Path) -> None:
ffmpeg_command.extend(shlex.split(output_args))
ffmpeg_command.append(str(out_path))

# Run command in subprocess
try:
result = subprocess.run( # noqa: F841
ffmpeg_command,
check=True,
stderr=subprocess.PIPE, # Capture stderr
text=True, # Get output as string, not bytes
)
except subprocess.CalledProcessError as e:
print(f"Error running FFmpeg: {e.stderr}")
# For logging I guess
ffmpeg_str = " ".join(ffmpeg_command)
logging.info(f"{ffmpeg_str=}")

return
subprocess.run(ffmpeg_command, check=True)

return out_path

def transform_directory(
    self, input_dir: Path, output_dir: Path, arg_set
) -> None:
    """
    Transform all videos in a directory

    Walks ``input_dir`` recursively (following symlinked directories) and
    mirrors its sub-directory layout under ``output_dir``. Files that look
    like videos are passed to ``convert_video``; every other file is
    symlinked into the matching output directory.

    Parameters
    ----------
    input_dir : Path
        Root of the directory tree to process
    output_dir : Path
        Root under which the mirrored tree is written
    arg_set
        ffmpeg argument set, forwarded unchanged to ``convert_video``
    """
    for root, dirs, files in walk(input_dir, followlinks=True):
        # Use absolute source paths: a relative symlink target is resolved
        # against the directory that contains the link, not the current
        # working directory, so relative inputs would give dangling links.
        src_dir = Path(root).absolute()
        dst_dir = output_dir / relpath(root, input_dir)
        # The top-level destination may not exist yet when this method is
        # called; creating it here is a no-op for already existing ones.
        dst_dir.mkdir(parents=True, exist_ok=True)
        for dir_name in dirs:
            (dst_dir / dir_name).mkdir(parents=True, exist_ok=True)
        for file_name in files:
            file_path = src_dir / file_name
            if likely_video_file(file_path):
                self.convert_video(file_path, dst_dir, arg_set)
            else:
                symlink(file_path, dst_dir / file_name)

def run_job(self) -> JobResponse:
"""
Expand All @@ -177,14 +219,28 @@ def run_job(self) -> JobResponse:
"""
job_start_time = time()
input_dir = self.job_settings.input_source
video_files = [
f
for f in input_dir.iterdir()
if f.suffix in (".mp4", ".avi", ".mov", ".mkv")
]
for video_file in video_files:
self.convert_video(video_file)
ffmpeg_arg_set = self.determine_ffmpeg_arg_set()

input_dir_paths = self.job_settings.dirs_to_process
job_out_dir_path = self.job_settings.output_directory
job_in_dir_path = self.job_settings.input_source
if input_dir_paths:
for dir_path in input_dir_paths:
if self.job_settings.preserve_structure:
rel_dir_name = relpath(dir_path, job_in_dir_path)
output_dir_path = job_out_dir_path / rel_dir_name
else:
output_dir_path = job_out_dir_path / dir_path.name
output_dir_path.mkdir(parents=True, exist_ok=True)
self.transform_directory(
dir_path, output_dir_path, ffmpeg_arg_set
)
else:
self.transform_directory(
job_in_dir_path,
job_out_dir_path,
ffmpeg_arg_set,
)

job_end_time = time()
return JobResponse(
Expand All @@ -207,11 +263,10 @@ def run_job(self) -> JobResponse:
cli_args.config_file
)
else:
# Construct settings from env vars
# Default settings
job_settings = CompressionSettings(
input_source=Path("tests/test_video_in_dir"),
output_directory=Path("tests/test_video_out_dir"),
compression_requested=CompressionRequest.DEFAULT,
)

job = BehaviorVideoJob(job_settings=job_settings)
Expand Down

0 comments on commit 794320f

Please sign in to comment.