Parallel Compression #16
I will review this soon.
Can you please split this PR into the following distinct PRs:
- a PR that is just related to parallelization
- a PR that moves functions into class methods
- a PR that changes the logic of testing for overrides etc
Too many things are happening in this PR, and because you've moved functions around it's hard to tell what's changed.
I think this is pretty close. I haven't had a chance to run the PR yet, but did you confirm that the information from each process, printed by ffmpeg to stderr, is being combined correctly? I would have thought that you would need to redirect the stderr of each subprocess if you were invoking them in parallel, so that you could log it or print it back out when the future completes.
It wasn't clear to me why you exported convert_video at the top level (in the __init__ file), or why you added another test of it. Each of the other tests should also test convert_video.
]
for job in as_completed(jobs):
    result = job.result()
    logging.info("FFmpeg job completed:", result)
I am worried that this will garble the stderr info output of each call. Here the result is simply the filename of the output. What does the output look like? I would think that each subprocess would still print to stderr, but they would all do it at the same time.
This is the output of a working call (same input directory as test_run_job_with_data_structure):
"""
2024-11-07 11:04:40,762 - INFO - FFmpeg job completed: /private/var/folders/jp/tzg4yhb959s7g46rlqjjl2qh0000gp/T/tmpxq15v823/camera2/clip.mp4
2024-11-07 11:04:40,792 - INFO - FFmpeg job completed: /private/var/folders/jp/tzg4yhb959s7g46rlqjjl2qh0000gp/T/tmpxq15v823/camera1/clip.mp4
"""
This is the output if I set the requested compression to:
"""
faulty_req = CompressionRequest(
    compression_enum=CompressionEnum.USER_DEFINED,
    user_ffmpeg_input_options="invalid user input args",
    user_ffmpeg_output_options="invalid user output args",
)
"""
Nothing is logged and I believe stderr is forwarded to stdout:
"""
[AVFormatContext @ 0x11df07720] Unable to choose an output format for 'invalid'; use a standard extension for the filename or specify the format manually.
[AVFormatContext @ 0x12e006c50] [out#0 @ 0x11df12660] Unable to choose an output format for 'invalid'; use a standard extension for the filename or specify the format manually.
Error initializing the muxer for invalid: Invalid argument
Error opening output file invalid.
[out#0 @ 0x12e011b70] Error opening output files: Invalid argument
Error initializing the muxer for invalid: Invalid argument
Error opening output file invalid.
Error opening output files: Invalid argument
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/Users/jonathan.wong/miniconda3/envs/compression/lib/python3.11/concurrent/futures/process.py", line 261, in process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jonathan.wong/Projects/aind-behavior-video-transformation/src/aind_behavior_video_transformation/transform_videos.py", line 242, in convert_video
subprocess.run(ffmpeg_command, check=True)
File "/Users/jonathan.wong/miniconda3/envs/compression/lib/python3.11/subprocess.py", line 571, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['ffmpeg', '-y', '-v', 'warning', '-hide_banner', 'invalid', 'user', 'input', 'args', '-i', '/private/var/folders/jp/tzg4yhb959s7g46rlqjjl2qh0000gp/T/tmp7ogiojea/camera2/clip.mp4', 'invalid', 'user', 'output', 'args', '/private/var/folders/jp/tzg4yhb959s7g46rlqjjl2qh0000gp/T/tmpxj4mbyd/camera2/clip.mp4']' returned non-zero exit status 234.
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/jonathan.wong/Projects/aind-behavior-video-transformation/src/aind_behavior_video_transformation/etl.py", line 234, in <module>
job_response = job.run_job()
^^^^^^^^^^^^^
File "/Users/jonathan.wong/Projects/aind-behavior-video-transformation/src/aind_behavior_video_transformation/etl.py", line 140, in run_job
self._run_compression(convert_video_args)
File "/Users/jonathan.wong/Projects/aind-behavior-video-transformation/src/aind_behavior_video_transformation/etl.py", line 97, in _run_compression
result = job.result()
^^^^^^^^^^^^
File "/Users/jonathan.wong/miniconda3/envs/compression/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/jonathan.wong/miniconda3/envs/compression/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self.exception
subprocess.CalledProcessError: Command '['ffmpeg', '-y', '-v', 'warning', '-hide_banner', 'invalid', 'user', 'input', 'args', '-i', '/private/var/folders/jp/tzg4yhb959s7g46rlqjjl2qh0000gp/T/tmp7ogiojea/camera2/clip.mp4', 'invalid', 'user', 'output', 'args', '/private/var/folders/jp/tzg4yhb959s7g46rlqjjl2qh0000gp/T/tmpxj4mbyd/camera2/clip.mp4']' returned non-zero exit status 234.
"""
Is this garbled? It appears that this raises an error for any failed process.
It looks like the two outputs are being interleaved in a way that makes it hard to follow, for example:
[AVFormatContext @ 0x11df07720] Unable to choose an output format for 'invalid'; use a standard extension for the filename or specify the format manually.
[AVFormatContext @ 0x12e006c50] [out#0 @ 0x11df12660] Unable to choose an output format for 'invalid'; use a standard extension for the filename or specify the format manually.
IMO it would be better if the output of each process was captured and printed to stdout or stderr in a block, so you can follow the messages from each subprocess.
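A minimal sketch of that suggestion, not the PR's actual code: each worker captures its subprocess's stderr with `capture_output=True` and returns it, and the parent logs it as one block when the future completes. The names `run_and_capture`, `run_all`, and `fake_job` are hypothetical. A thread pool stands in for the process pool seen in the traceback above, and child Python processes stand in for ffmpeg so the sketch runs without it.

```python
import logging
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_and_capture(command):
    """Run one command and return (command, captured stderr text)."""
    # capture_output=True pipes stdout/stderr back to this process instead
    # of letting the child write straight to the shared terminal.
    proc = subprocess.run(command, capture_output=True, text=True, check=True)
    return command, proc.stderr


def run_all(commands, max_workers=2):
    """Run commands concurrently; return each command's stderr as one block."""
    blocks = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        jobs = [executor.submit(run_and_capture, cmd) for cmd in commands]
        for job in as_completed(jobs):
            command, stderr_text = job.result()
            # One log record per finished job keeps its lines together.
            logging.info("Job completed: %s\n%s", " ".join(command), stderr_text)
            blocks.append(stderr_text)
    return blocks


def fake_job(tag):
    """Hypothetical stand-in for an ffmpeg call: writes 3 lines to stderr."""
    code = f"import sys\nfor i in range(3): sys.stderr.write('{tag} line %d\\n' % i)"
    return [sys.executable, "-c", code]
```

Calling `run_all([fake_job("A"), fake_job("B")])` yields two blocks, each holding only the lines of one child, regardless of which job finishes first.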
# For logging I guess
ffmpeg_str = " ".join(ffmpeg_command)
logging.info(f"{ffmpeg_str=}")

subprocess.run(ffmpeg_command, check=True)
I think you might need to make sure the stderr output of this subprocess call is routed correctly if you're running many subprocesses at the same time. Right now the stderr of the subprocess goes to stderr.
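One way to route it, sketched under assumptions rather than taken from the PR: passing `capture_output=True` to `subprocess.run` keeps the child's stderr off the shared terminal, and with `check=True` the raised `CalledProcessError` carries the captured text in its `stderr` attribute, so a failure can be logged as a single block. `run_logged` is a hypothetical name, and a child Python process stands in for ffmpeg.

```python
import logging
import subprocess
import sys


def run_logged(command):
    """Run a command and log its captured stderr as one block.

    Returns True on success and False if the command exits non-zero.
    """
    try:
        proc = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as err:
        # err.stderr holds whatever the child wrote before exiting non-zero.
        logging.error(
            "Command failed (exit %d): %s\n%s",
            err.returncode, " ".join(command), err.stderr,
        )
        return False
    if proc.stderr:
        # A successful run may still have emitted warnings on stderr.
        logging.warning("Command stderr: %s\n%s", " ".join(command), proc.stderr)
    return True
```

Whether to swallow the failure (as here) or re-raise after logging is a design choice; re-raising would preserve the current behavior where one failed job aborts the run.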
Remove redundant test, top level export
Coverage threshold decreased to 93 because the coverage library does not register the error-handling code as run, even though it is. Extra info about logging update:
I see, so if any job fails then the relevant errors will be in the log? This looks great! I will give it a once-over one last time tomorrow.
Looks great!