Skip to content

Commit

Permalink
[VideoStreamAv] Implement multithreaded decoding. #213
Browse files Browse the repository at this point in the history
  • Loading branch information
Breakthrough committed Mar 12, 2022
1 parent 32658a0 commit 3b28522
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 91 deletions.
4 changes: 2 additions & 2 deletions scenedetect/backends/opencv.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self, path_or_device: Union[bytes, str, int], framerate: Optional[f
framerate: If set, overrides the detected framerate.
Raises:
IOError: file could not be found or access was denied
OSError: file could not be found or access was denied
VideoOpenFailure: video could not be opened (may be corrupted)
ValueError: specified framerate is invalid
"""
Expand Down Expand Up @@ -237,7 +237,7 @@ def _open_capture(self, framerate: Optional[float] = None):
if not self._is_device and not ('%' in self._path_or_device
or '://' in self._path_or_device):
if not os.path.exists(self._path_or_device):
raise IOError('Video file not found.')
raise OSError('Video file not found.')

cap = cv2.VideoCapture(self._path_or_device)
if not cap.isOpened():
Expand Down
192 changes: 107 additions & 85 deletions scenedetect/backends/pyav.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"""

from logging import getLogger
from typing import BinaryIO, Optional, Tuple, Union
from typing import AnyStr, BinaryIO, Optional, Tuple, Union

import av
from numpy import ndarray
Expand All @@ -32,56 +32,57 @@
class VideoStreamAv(VideoStream):
"""PyAV `av.InputContainer` backend."""

def __init__(self,
path_or_io: Union[str, bytes, BinaryIO],
framerate: Optional[float] = None,
name: Optional[str] = None):
def __init__(
self,
path_or_io: Union[AnyStr, BinaryIO],
framerate: Optional[float] = None,
name: Optional[str] = None,
threading_mode: str = 'AUTO',
):
"""Open a video by path.
Arguments:
path_or_io: Path to the video, or a file-like object.
framerate: If set, overrides the detected framerate.
name: Overrides the `name` property derived from the video path.
Should be set if `path_or_io` is a file-like object.
name: Overrides the `name` property derived from the video path. Should be set if
`path_or_io` is a file-like object.
threading_mode: The PyAV video stream `thread_type`. See av.codec.context.ThreadType
for valid threading modes ('AUTO', 'FRAME', 'NONE', and 'SLICE'). If this mode is
'AUTO' or 'FRAME' and not all frames have been decoded, the video will be reopened
if it is seekable, and the remaining frames will be decoded in single-threaded mode.
Default is 'AUTO' for performance (there will be a slight pause near the end).
Raises:
IOError: file could not be found or access was denied
OSError: file could not be found or access was denied
VideoOpenFailure: video could not be opened (may be corrupted)
ValueError: specified framerate is invalid
"""
# TODO: Investigate why setting the video stream threading mode to 'AUTO' / 'FRAME'
# causes decoding to stop early, e.g. adding the following:
#
# self._container.streams.video[0].thread_type = 'AUTO' # Go faster!
#
# As a workaround, we can re-open the video without threading, and continue decoding from
# where the multithreaded version left off. That could be as simple as re-initializing
# self._container and retrying the read() call.
#
# The 'FRAME' threading method provides a significant speed boost (~400 FPS vs
# 240 FPS without), so this seems like a worth-while tradeoff. The OpenCV backend
# gets around 350 FPS for comparison.

# TODO(#258): See if setting self._container.discard_corrupt = True affects anything.
# TODO(#258): See what self._container.discard_corrupt = True does with corrupt videos.
super().__init__()

self._path: Union[str, bytes] = ''
# Ensure specified framerate is valid if set.
if framerate is not None and framerate < MAX_FPS_DELTA:
raise ValueError('Specified framerate (%f) is invalid!' % framerate)

self._name: Union[str, bytes] = '' if name is None else name
self._io: Optional[BinaryIO] = None
self._duration_frames: int = 0
self._frame = None

if isinstance(path_or_io, (str, bytes)):
self._path = path_or_io
if not self._name:
self._name = get_file_name(self.path, include_extension=False)
else:
self._io = path_or_io
self._reopened = True

try:
self._container = av.open(self._path if self._path else self._io)
except av.error.FileNotFoundError as ex:
raise IOError from ex
if isinstance(path_or_io, (str, bytes)):
self._path = path_or_io
self._io = open(path_or_io, 'rb')
if not self._name:
self._name = get_file_name(self.path, include_extension=False)
else:
self._io = path_or_io

self._container = av.open(self._io)
if threading_mode is not None:
self._video_stream.thread_type = threading_mode
self._reopened = False
except OSError:
raise
except Exception as ex:
raise VideoOpenFailure(str(ex)) from ex

Expand All @@ -95,57 +96,12 @@ def __init__(self,
raise FrameRateUnavailable()
self._frame_rate: float = frame_rate
else:
# Ensure specified framerate is valid.
if framerate < MAX_FPS_DELTA:
raise ValueError('Specified framerate (%f) is invalid!' % framerate)
assert framerate >= MAX_FPS_DELTA
self._frame_rate: float = framerate

# Calculate duration in terms of number of frames once we have set the framerate.
self._duration_frames = self._get_duration()

#
# Backend-Specific Methods/Properties
#

@property
def _video_stream(self):
"""PyAV `av.video.stream.VideoStream` being used."""
return self._container.streams.video[0]

@property
def _codec_context(self):
"""PyAV `av.codec.context.CodecContext` associated with the `video_stream`."""
return self._video_stream.codec_context

def _get_duration(self) -> int:
"""Get video duration as number of frames based on the video and set framerate."""
# See https://pyav.org/docs/develop/api/time.html for details on how ffmpeg/PyAV
# handle time calculations internally and which time base to use.
assert self.frame_rate is not None, "Frame rate must be set before calling _get_duration!"
# See if we can obtain the number of frames directly from the stream itself.
if self._video_stream.frames > 0:
return self._video_stream.frames
# Calculate based on the reported container duration.
duration_sec = None
container = self._video_stream.container
if container.duration is not None and container.duration > 0:
# Containers use AV_TIME_BASE as the time base.
duration_sec = float(self._video_stream.container.duration / av.time_base)
# Lastly, if that calculation fails, try to calculate it based on the stream duration.
if duration_sec is None or duration_sec < MAX_FPS_DELTA:
if self._video_stream.duration is None:
logger.warning('Video duration unavailable.')
return 0
# Streams use stream `time_base` as the time base.
time_base = self._video_stream.time_base
if time_base.denominator == 0:
logger.warning(
'Unable to calculate video duration: time_base (%s) has zero denominator!',
str(time_base))
return 0
duration_sec = float(self._video_stream.duration / time_base)
return round(duration_sec * self.frame_rate)

#
# VideoStream Methods/Properties
#
Expand All @@ -166,9 +122,7 @@ def name(self) -> Union[bytes, str]:
@property
def is_seekable(self) -> bool:
"""True if seek() is allowed, False otherwise."""
if not self._io is None and not self._io.seekable():
return False
return self._container.format.seek_to_pts
return self._io.seekable()

@property
def frame_size(self) -> Tuple[int, int]:
Expand Down Expand Up @@ -249,7 +203,8 @@ def seek(self, target: Union[FrameTimecode, float, int]) -> None:
if not beginning:
self.read(decode=False, advance=True)
while self.position < target:
self.read(decode=False, advance=True)
if self.read(decode=False, advance=True) is False:
break

def reset(self):
""" Close and re-open the VideoStream (should be equivalent to calling `seek(0)`). """
Expand Down Expand Up @@ -279,10 +234,77 @@ def read(self, decode: bool = True, advance: bool = True) -> Union[ndarray, bool
self._frame = next(self._container.decode(video=0))
except av.error.EOFError:
self._frame = last_frame
if self._handle_eof():
return self.read(decode, advance=True)
return False
except StopIteration:
return False
has_advanced = True
if decode:
return self._frame.to_ndarray(format='bgr24')
return has_advanced

#
# Private Methods/Properties
#

@property
def _video_stream(self):
"""PyAV `av.video.stream.VideoStream` being used."""
return self._container.streams.video[0]

@property
def _codec_context(self):
"""PyAV `av.codec.context.CodecContext` associated with the `video_stream`."""
return self._video_stream.codec_context

def _get_duration(self) -> int:
"""Get video duration as number of frames based on the video and set framerate."""
# See https://pyav.org/docs/develop/api/time.html for details on how ffmpeg/PyAV
# handle time calculations internally and which time base to use.
assert self.frame_rate is not None, "Frame rate must be set before calling _get_duration!"
# See if we can obtain the number of frames directly from the stream itself.
if self._video_stream.frames > 0:
return self._video_stream.frames
# Calculate based on the reported container duration.
duration_sec = None
container = self._video_stream.container
if container.duration is not None and container.duration > 0:
# Containers use AV_TIME_BASE as the time base.
duration_sec = float(self._video_stream.container.duration / av.time_base)
# Lastly, if that calculation fails, try to calculate it based on the stream duration.
if duration_sec is None or duration_sec < MAX_FPS_DELTA:
if self._video_stream.duration is None:
logger.warning('Video duration unavailable.')
return 0
# Streams use stream `time_base` as the time base.
time_base = self._video_stream.time_base
if time_base.denominator == 0:
logger.warning(
'Unable to calculate video duration: time_base (%s) has zero denominator!',
str(time_base))
return 0
duration_sec = float(self._video_stream.duration / time_base)
return round(duration_sec * self.frame_rate)

def _handle_eof(self):
"""Fix issue where if thread_type is 'AUTO' the whole video is sometimes not decoded.
Re-open video if the threading mode is AUTO and we didn't decode all of the frames."""
# Don't re-open the video if we already did, or if we already decoded all the frames.
if self._reopened or self.frame_number >= self.duration:
return False
# Don't re-open the video if we can't seek or aren't in AUTO/FRAME thread_type mode.
if not self.is_seekable or not self._video_stream.thread_type in ('AUTO', 'FRAME'):
return False
last_frame = self.frame_number
orig_pos = self._io.tell()
try:
self._io.seek(0)
container = av.open(self._io)
except:
self._io.seek(orig_pos)
raise
self._container = container
self.seek(last_frame)
return True
4 changes: 2 additions & 2 deletions scenedetect/cli/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def handle_options(
self._open_video_stream(
input_path=input_path,
framerate=framerate,
backend=self.config.get_value("global", "backend", ignore_default=True))
backend=self.config.get_value("global", "backend", backend, ignore_default=True))

self.output_directory = output if output else self.config.get_value("global", "output")
if self.output_directory:
Expand Down Expand Up @@ -756,7 +756,7 @@ def _open_video_stream(self, input_path: AnyStr, framerate: Optional[float],
'Failed to open input video%s: %s' %
(' using %s backend' % backend if backend else '', str(ex)),
param_hint='-i/--input') from ex
except IOError as ex:
except OSError as ex:
raise click.BadParameter('Input error:\n\n\t%s\n' % str(ex), param_hint='-i/--input')

def _open_stats_file(self, file_path: str):
Expand Down
2 changes: 1 addition & 1 deletion scenedetect/stats_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def save_to_csv(self,
Raises:
ValueError: If both path and file are specified.
IOError: If `path` cannot be opened or a write failure occurs.
OSError: If `path` cannot be opened or a write failure occurs.
"""

if path is not None and file is not None:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_video_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def test_seek(self, vs_type: Type[VideoStream], test_video: VideoParameters):

def test_invalid_path(vs_type: Type[VideoStream]):
"""Ensure correct exception is thrown if the path does not exist."""
with pytest.raises(IOError):
with pytest.raises(OSError):
_ = vs_type('this_path_should_not_exist.mp4')


Expand Down

0 comments on commit 3b28522

Please sign in to comment.