Skip to content

Commit

Permalink
[timestamps] from_video_file - Use mkvextract if the file is a mkv
Browse files Browse the repository at this point in the history
ffprobe can be slow for big mkv file (1 gb or more)
mkvextract is way faster, so if it is available, we use it
  • Loading branch information
moi15moi committed Dec 27, 2023
1 parent 04d30fa commit 6940066
Showing 1 changed file with 137 additions and 13 deletions.
150 changes: 137 additions & 13 deletions pyonfx/timestamps.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from decimal import Decimal
from fractions import Fraction
from io import StringIO, TextIOWrapper
from platform import system
from tempfile import gettempdir
from typing import Any, Callable, List, Optional, Tuple, Union


Expand Down Expand Up @@ -403,7 +405,9 @@ def from_timestamps_file(
To extract the timestamps file, you have 2 options:
- Open the video with Aegisub. "Video" --> "Save Timecodes File";
- Using [gMKVExtractGUI](https://sourceforge.net/projects/gmkvextractgui/) (warning: it will produce one timestamp too many at the end of the file, and you will need to manually remove it).
- Using [gMKVExtractGUI](https://sourceforge.net/projects/gmkvextractgui/)
Warning: it will produce one timestamp too many at the end of the file, and you will need to manually remove it
See: https://gitlab.com/mbunkus/mkvtoolnix/-/issues/3075
Parameters:
path_to_timestamps_file_or_content (str | os.PathLike[str]):
Expand Down Expand Up @@ -436,12 +440,12 @@ def from_timestamps_file(
)

@classmethod
def from_video_file(
def _from_ffprobe(
cls: Timestamps,
video_path: str,
index: Optional[int] = 0,
normalize: Optional[bool] = True,
rounding_method: Optional[RoundingMethod] = RoundingMethod.ROUND,
index: int,
normalize: bool,
rounding_method: RoundingMethod,
) -> Timestamps:
"""Create timestamps based on the ``video_path`` provided.
Expand Down Expand Up @@ -480,14 +484,6 @@ def get_timestamps(packets) -> Tuple[Fraction, Fraction, List[int]]:
if shutil.which("ffprobe") is None:
raise Exception("ffprobe is not in the environment variable.")

if not os.path.isfile(video_path):
raise FileNotFoundError(f'Invalid path for the video file: "{video_path}"')

# Getting video absolute path and checking for its existance
if not os.path.isabs(video_path):
dirname = os.path.dirname(os.path.abspath(sys.argv[0]))
video_path = os.path.join(dirname, video_path)

cmd = [
"ffprobe",
"-select_streams",
Expand Down Expand Up @@ -535,6 +531,134 @@ def get_timestamps(packets) -> Tuple[Fraction, Fraction, List[int]]:
last_frame_time=last_frame_time,
)

@staticmethod
def _from_mkvextract(
mkvextract_path: os.PathLike[str],
mkvmerge_path: os.PathLike[str],
video_path: str,
index: int,
normalize: bool,
rounding_method: RoundingMethod,
) -> Timestamps:
"""Create timestamps based on the ``video_path`` provided.
Note:
This method requires the ``mkvextract`` and ``mkvmerge`` programs to be available.
Parameters:
video_path (str): A Video path.
index (int, optional): Stream index of the video.
normalize (bool, optional): If True, it will shift the timestamps to make them start from 0. If false, the option does nothing.
rounding_method (RoundingMethod, optional): A rounding method. See the comment in Timestamps description about floor vs round.
Returns:
An Timestamps instance.
"""

cmd = [
mkvmerge_path,
video_path,
"-J",
]
mkvmerge_output = subprocess.run(cmd, capture_output=True, text=True)
if mkvmerge_output.returncode == 2:
raise ValueError(f"mkvmerge reported this error: {mkvmerge_output.stdout}")
mkvmerge_output_dict = json.loads(mkvmerge_output.stdout)

is_index_in_video = False
for track in mkvmerge_output_dict["tracks"]:
if track["id"] == index:
if track["type"] != "video":
raise ValueError(
f'The index {index} is not a video stream. It is an "{track["type"]}" stream.'
)
is_index_in_video = True
break

if not is_index_in_video:
raise ValueError(f"The index {index} is not in the file {video_path}.")

temp_dir = gettempdir()
timestamps_file_path = os.path.join(temp_dir, "temp_timestamps.txt")
cmd = [
mkvextract_path,
video_path,
"timestamps_v2",
"0:" + timestamps_file_path
]

mkvextract_output = subprocess.run(cmd, capture_output=True, text=True)
if mkvextract_output.returncode == 2:
raise ValueError(f"mkvextract reported this error: {mkvextract_output.stdout}")

with open(timestamps_file_path, "r") as file:
lines = file.readlines()

os.remove(timestamps_file_path)

# Ignore the last line due to this issue: https://gitlab.com/mbunkus/mkvtoolnix/-/issues/3075
content = "".join(lines[:-1])

return Timestamps.from_timestamps_file(
content,
normalize=normalize,
rounding_method=rounding_method
)

@classmethod
def from_video_file(
cls: Timestamps,
video_path: str,
index: Optional[int] = 0,
normalize: Optional[bool] = True,
rounding_method: Optional[RoundingMethod] = RoundingMethod.ROUND,
) -> Timestamps:
"""Create timestamps based on the ``video_path`` provided.
Note:
This method requires the ``ffprobe`` or ``mkvextract/mkvmerge`` (if your if file is a mkv) programs to be available.
Parameters:
video_path (str): A Video path.
index (int, optional): Stream index of the video.
normalize (bool, optional): If True, it will shift the timestamps to make them start from 0. If false, the option does nothing.
rounding_method (RoundingMethod, optional): A rounding method. See the comment in Timestamps description about floor vs round.
Returns:
An Timestamps instance.
"""

if not os.path.isfile(video_path):
raise FileNotFoundError(f'Invalid path for the video file: "{video_path}"')

# Getting video absolute path and checking for its existance
if not os.path.isabs(video_path):
dirname = os.path.dirname(os.path.abspath(sys.argv[0]))
video_path = os.path.join(dirname, video_path)

with open(video_path, "rb") as f:
# From https://en.wikipedia.org/wiki/List_of_file_signatures
is_mkv = f.read(4) == b"\x1a\x45\xdf\xa3"

mkvextract_path = None
if is_mkv:
# Verify if mkvextract and mkvmerge are installed
mkvextract_path = shutil.which("mkvextract")
mkvmerge_path = shutil.which("mkvmerge")
if mkvextract_path is None and system() == "Windows":
for possible_path in ["C:\\Program Files\\MKVToolNix\\mkvextract.exe", "C:\\Program Files (x86)\\MKVToolNix\\mkvextract.exe"]:
if os.path.isfile(os.path.join(possible_path)):
mkvextract_path = possible_path
break
if mkvmerge_path is None and system() == "Windows":
for possible_path in ["C:\\Program Files\\MKVToolNix\\mkvmerge.exe", "C:\\Program Files (x86)\\MKVToolNix\\mkvmerge.exe"]:
if os.path.isfile(os.path.join(possible_path)):
mkvmerge_path = possible_path
break

if is_mkv and mkvextract_path is not None and mkvmerge_path is not None:
return Timestamps._from_mkvextract(mkvextract_path, mkvmerge_path, video_path, index, normalize, rounding_method)
else:
return Timestamps._from_ffprobe(video_path, index, normalize, rounding_method)

@staticmethod
def normalize(
timestamps: List[int], last_frame_time: Fraction
Expand Down

0 comments on commit 6940066

Please sign in to comment.