From 3c9e90ad72ed2c527a59bf5950020f28508d240a Mon Sep 17 00:00:00 2001 From: jwong-nd Date: Mon, 4 Nov 2024 16:45:47 -0800 Subject: [PATCH] add parallelization --- src/aind_behavior_video_transformation/etl.py | 27 ++++++++++++++++--- .../transform_videos.py | 1 + 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/aind_behavior_video_transformation/etl.py b/src/aind_behavior_video_transformation/etl.py index 8746e93..cbf2590 100644 --- a/src/aind_behavior_video_transformation/etl.py +++ b/src/aind_behavior_video_transformation/etl.py @@ -12,6 +12,7 @@ JobResponse, get_parser, ) +from concurrent.futures import ProcessPoolExecutor, as_completed from pydantic import Field from aind_behavior_video_transformation.transform_videos import ( @@ -147,10 +148,28 @@ def _run_compression( convert_video_params.append((vid_path, output_dir, arg_set)) logging.info(f'Compressing {str(vid_path)} w/ {comp_req.compression_enum}') - # TODO: Parallelize this loop with Dask/Futures. - # Most important is no silent errors. - for params in convert_video_params: - convert_video(params) + if parallel: + # Dask implementation + # import dask + # jobs = [dask.delayed(convert_video)(*params) for params in convert_video_params] + # dask.compute(*jobs) # This returns an error if any jobs fail + + # ProcessPool implementation + num_jobs = len(convert_video_params) + with ProcessPoolExecutor(max_workers=num_jobs) as executor: + jobs = [executor.submit(convert_video, *params) + for params in convert_video_params] + for job in as_completed(jobs): + try: + result = job.result() + print("FFmpeg job completed:", result) + except Exception as e: + print("Error:", e) + + else: + # Execute Serially + for params in convert_video_params: + convert_video(params) def run_job(self) -> JobResponse: diff --git a/src/aind_behavior_video_transformation/transform_videos.py b/src/aind_behavior_video_transformation/transform_videos.py index 383ee13..d1acce5 100644 --- a/src/aind_behavior_video_transformation/transform_videos.py +++ b/src/aind_behavior_video_transformation/transform_videos.py @@ -223,6 +223,7 @@ def convert_video( output_args = arg_set[1] ffmpeg_command = ["ffmpeg", "-y", "-v", "warning", "-hide_banner"] + ffmpeg_command.extend(["-threads", "8"]) # Use 8 threads per compression job if input_args: ffmpeg_command.extend(shlex.split(input_args)) ffmpeg_command.extend(["-i", str(video_path)])