From 144215496a122e8c3dfa556abcb889a1059f36de Mon Sep 17 00:00:00 2001
From: Rany
Date: Fri, 22 Nov 2024 22:57:54 +0200
Subject: [PATCH] Cleanup subtitle related code and make it easier to use
 SubMaker (#329)

Also don't output subtitles to STDERR by default.

Signed-off-by: rany
---
Note: in SubMaker.feed a cue starts at msg["offset"] and ends at
msg["offset"] + msg["duration"]. Both values are divided by 10 before being
passed to timedelta(microseconds=...), i.e. they are in units of 0.1 us.

 examples/streaming_with_subtitles.py |  2 +-
 src/edge_tts/submaker.py             | 23 +++++++++-----
 src/edge_tts/util.py                 | 46 +++++++++++++++++-----------
 3 files changed, 44 insertions(+), 27 deletions(-)

diff --git a/examples/streaming_with_subtitles.py b/examples/streaming_with_subtitles.py
index 52e91bb..766ba67 100644
--- a/examples/streaming_with_subtitles.py
+++ b/examples/streaming_with_subtitles.py
@@ -26,7 +26,7 @@ async def amain() -> None:
             if chunk["type"] == "audio":
                 file.write(chunk["data"])
             elif chunk["type"] == "WordBoundary":
-                submaker.add_cue((chunk["offset"], chunk["duration"]), chunk["text"])
+                submaker.feed(chunk)
 
     with open(SRT_FILE, "w", encoding="utf-8") as file:
         file.write(submaker.get_srt())
diff --git a/src/edge_tts/submaker.py b/src/edge_tts/submaker.py
index 303689c..cfcfc03 100644
--- a/src/edge_tts/submaker.py
+++ b/src/edge_tts/submaker.py
@@ -1,9 +1,11 @@
 """SubMaker module is used to generate subtitles from WordBoundary events."""
 
-from typing import List, Tuple
+from typing import List
 
 import srt  # type: ignore
 
+from .typing import TTSChunk
+
 
 class SubMaker:
     """
@@ -13,23 +15,25 @@ class SubMaker:
     def __init__(self) -> None:
         self.cues: List[srt.Subtitle] = []  # type: ignore
 
-    def add_cue(self, timestamp: Tuple[float, float], text: str) -> None:
+    def feed(self, msg: TTSChunk) -> None:
         """
-        Add a cue to the SubMaker object.
+        Feed a WordBoundary message to the SubMaker object.
 
         Args:
-            timestamp (tuple): The offset and duration of the subtitle.
-            text (str): The text of the subtitle.
+            msg (dict): The WordBoundary message.
 
         Returns:
             None
         """
+        if msg["type"] != "WordBoundary":
+            raise ValueError("Invalid message type, expected 'WordBoundary'")
+
         self.cues.append(
             srt.Subtitle(
                 index=len(self.cues) + 1,
-                start=srt.timedelta(microseconds=timestamp[0] / 10),
-                end=srt.timedelta(microseconds=sum(timestamp) / 10),
-                content=text,
+                start=srt.timedelta(microseconds=msg["offset"] / 10),
+                end=srt.timedelta(microseconds=(msg["offset"] + msg["duration"]) / 10),
+                content=msg["text"],
             )
         )
 
@@ -41,3 +45,6 @@ def get_srt(self) -> str:
             str: The SRT formatted subtitles.
         """
         return srt.compose(self.cues)  # type: ignore
+
+    def __str__(self) -> str:
+        return self.get_srt()
diff --git a/src/edge_tts/util.py b/src/edge_tts/util.py
index 9781f56..f16ee21 100644
--- a/src/edge_tts/util.py
+++ b/src/edge_tts/util.py
@@ -3,8 +3,7 @@
 import argparse
 import asyncio
 import sys
-from io import TextIOWrapper
-from typing import Any, TextIO, Union
+from typing import Any, Optional, TextIO
 
 from tabulate import tabulate
 
@@ -45,31 +44,42 @@ async def _run_tts(args: Any) -> None:
         print("\nOperation canceled.", file=sys.stderr)
         return
 
-    tts: Communicate = Communicate(
+    communicate = Communicate(
         args.text,
         args.voice,
-        proxy=args.proxy,
         rate=args.rate,
         volume=args.volume,
         pitch=args.pitch,
+        proxy=args.proxy,
     )
-    subs: SubMaker = SubMaker()
-    with (
-        open(args.write_media, "wb") if args.write_media else sys.stdout.buffer
-    ) as audio_file:
-        async for chunk in tts.stream():
+    submaker = SubMaker()
+    try:
+        audio_file = (
+            open(args.write_media, "wb")
+            if args.write_media is not None and args.write_media != "-"
+            else sys.stdout.buffer
+        )
+        sub_file: Optional[TextIO] = (
+            open(args.write_subtitles, "w", encoding="utf-8")
+            if args.write_subtitles is not None and args.write_subtitles != "-"
+            else None
+        )
+        if sub_file is None and args.write_subtitles == "-":
+            sub_file = sys.stderr
+
+        async for chunk in communicate.stream():
             if chunk["type"] == "audio":
                 audio_file.write(chunk["data"])
             elif chunk["type"] == "WordBoundary":
-                subs.add_cue((chunk["offset"], chunk["duration"]), chunk["text"])
-
-    sub_file: Union[TextIOWrapper, TextIO] = (
-        open(args.write_subtitles, "w", encoding="utf-8")
-        if args.write_subtitles
-        else sys.stderr
-    )
-    with sub_file:
-        sub_file.write(subs.get_srt())
+                submaker.feed(chunk)
+
+        if sub_file is not None:
+            sub_file.write(submaker.get_srt())
+    finally:
+        if audio_file is not sys.stdout.buffer:
+            audio_file.close()
+        if sub_file is not None and sub_file is not sys.stderr:
+            sub_file.close()
 
 
 async def amain() -> None: