From e685bc645e15d281ca9f8e2b049df400e6852267 Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Mon, 4 Nov 2024 16:54:03 -0800 Subject: [PATCH] linting --- src/aind_behavior_video_transformation/etl.py | 77 ++++++++++++------- .../transform_videos.py | 13 ++-- 2 files changed, 56 insertions(+), 34 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index cbf2590..f71f507 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -2,6 +2,7 @@ import logging import sys +from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from time import time from typing import List, Optional, Tuple, Union @@ -12,16 +13,16 @@ JobResponse, get_parser, ) -from concurrent.futures import ProcessPoolExecutor, as_completed from pydantic import Field from aind_behavior_video_transformation.transform_videos import ( - CompressionRequest, convert_video + CompressionRequest, + convert_video, ) - PathLike = Union[Path, str] + class BehaviorVideoJobSettings(BasicJobSettings): """ BehaviorJob settings. Inherits both fields input_source and @@ -42,7 +43,15 @@ class BehaviorVideoJobSettings(BasicJobSettings): "request" ), ) - video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm"] + video_extensions = [ + ".mp4", + ".avi", + ".mov", + ".mkv", + ".flv", + ".wmv", + ".webm", + ] class BehaviorVideoJob(GenericEtl[BehaviorVideoJobSettings]): @@ -74,19 +83,19 @@ def _format_output_directory(self) -> None: output_dir = Path(self.job_settings.output_directory.resolve()) output_dir.mkdir(parents=True, exist_ok=True) - for file in input_dir.rglob('*'): - if (file.is_file() and - not any(file.suffix.lower() == ext - for ext in self.job_settings.video_extensions)): + for file in input_dir.rglob("*"): + if file.is_file() and not any( + file.suffix.lower() == ext + for ext in self.job_settings.video_extensions + ): relative_path = file.relative_to(input_dir) target_link = output_dir / relative_path target_link.parent.mkdir(parents=True, exist_ok=True) target_link.symlink_to(file) - def _resolve_compression_requests( - self - ) -> List[Tuple[PathLike, CompressionRequest]]: + self, + ) -> List[Tuple[PathLike, CompressionRequest]]: """ Recursively traverses input directory and resolves CompressionRequest of all videos. @@ -98,7 +107,8 @@ def _resolve_compression_requests( # Define map: abs_path -> override CompressionRequest overrides = {} - for vid_path, comp_req in self.job_settings.video_specific_compression_requests: + comp_reqs = self.job_settings.video_specific_compression_requests + for vid_path, comp_req in comp_reqs: vid_path = Path(vid_path) abs_path = None if vid_path.is_absolute(): @@ -110,24 +120,30 @@ def _resolve_compression_requests( overrides[abs_path] = comp_req # Produce list of all (abs_path, CompressionRequest) pairs - path_comp_req_pairs = \ - [(file, self.job_settings.compression_requested) - for file in input_dir.rglob('*') - if (file.is_file() and any(file.suffix.lower() == ext - for ext in self.job_settings.video_extensions))] - path_comp_req_pairs = \ - [(file, overrides[file]) + path_comp_req_pairs = [ + (file, self.job_settings.compression_requested) + for file in input_dir.rglob("*") + if ( + file.is_file() + and any( + file.suffix.lower() == ext + for ext in self.job_settings.video_extensions + ) + ) + ] + path_comp_req_pairs = [ + (file, overrides[file]) for (file, _) in path_comp_req_pairs - if file in overrides] + if file in overrides + ] return path_comp_req_pairs - def _run_compression( self, path_comp_req_pairs: List[Tuple[PathLike, CompressionRequest]], - parallel=True - ) -> None: + parallel=True, + ) -> None: """ Runs CompressionRequests at the specified paths. """ @@ -146,19 +162,25 @@ def _run_compression( # Add to job buffer convert_video_params.append((vid_path, output_dir, arg_set)) - logging.info(f'Compressing {str(vid_path)} w/ {comp_req.compression_enum}') + logging.info( + f"Compressing {str(vid_path)} \ + w/ {comp_req.compression_enum}" + ) if parallel: # Dask implementation # import dask - # jobs = [dask.delayed(convert_video)(*params) for params in convert_video_params] + # jobs = [dask.delayed(convert_video)(*params) + # for params in convert_video_params] # dask.compute(*jobs) # This returns an error if any jobs fail # ProcessPool implementation num_jobs = len(convert_video_params) with ProcessPoolExecutor(max_workers=num_jobs) as executor: - jobs = [executor.submit(convert_video, *params) - for params in convert_video_params] + jobs = [ + executor.submit(convert_video, *params) + for params in convert_video_params + ] for job in as_completed(jobs): try: result = job.result() @@ -171,7 +193,6 @@ def _run_compression( for params in convert_video_params: convert_video(params) - def run_job(self) -> JobResponse: """ Main public method to run the compression job. diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index d1acce5..a97550f 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -175,7 +175,8 @@ def determine_ffmpeg_arg_set( # Resolve two levels of indirection here # FfmpegArgSet -> (FfmpegInputArgs, FfmpegOutputArgs) - # (FfmpegInputArgs, FfmpegOutputArgs) -> (in_args str, out_args str) + # (FfmpegInputArgs, FfmpegOutputArgs) + # -> (in_args str, out_args str) arg_set_enum = FfmpegArgSet[compression_preset.name].value arg_set = (arg_set_enum[0].value, arg_set_enum[1].value) @@ -183,10 +184,8 @@ def determine_ffmpeg_arg_set( def convert_video( - video_path: Path, - output_dir: Path, - arg_set: Optional[Tuple[str, str]] - ) -> Path: + video_path: Path, output_dir: Path, arg_set: Optional[Tuple[str, str]] +) -> Path: """ Converts a video to a specified format using ffmpeg. @@ -223,7 +222,9 @@ def convert_video( output_args = arg_set[1] ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] - ffmpeg_command.extend(["-threads", "8"]) # Use 8 threads per compression job + # Use 8 threads per compression job + ffmpeg_command.extend(["-threads", "8"]) + if input_args: ffmpeg_command.extend(shlex.split(input_args)) ffmpeg_command.extend(["-i", str(video_path)])