diff --git a/client.py b/client.py index 97a89ba..1e79b78 100644 --- a/client.py +++ b/client.py @@ -43,10 +43,12 @@ def exit_msg(msg: str, is_error: bool = True) -> NoReturn: parser = ArgumentParser() group = parser.add_mutually_exclusive_group() - group.add_argument("-s", "--stream", type=str, help="Input (sup, mnu) to convert to xES+MUI.", default='') + group.add_argument("-s", "--stream", type=str, help="Input (sup, mnu, textst) to convert to xES+MUI.", default='') group.add_argument("-x", "--xes", type=str, help="Input xES to convert.", default='') parser.add_argument("-m", "--mui", type=str, help="Input MUI associated to xES to convert.", default='') + parser.add_argument("-t", "--textst", help="Use if TextST.", action='store_true') + parser.add_argument('-v', '--version', action='version', version=f"(c) {__author__}, v{__version__}") parser.add_argument("-o", "--output", type=str, required=True) @@ -80,10 +82,16 @@ def exit_msg(msg: str, is_error: bool = True) -> NoReturn: exit_msg("Desired output format is not xES? Exiting.") print("Converting to xES+MUI...") - EsMuiStream.convert_to_esmui(args.stream, args.output, args.output + '.mui') + if (args.stream.lower().endswith('textst') or args.xes.lower().endswith('tes')) and not args.textst: + exit_msg("Is the conversion for TextST? Flag it as such if so. Exiting...") + elif args.textst: + EsMuiStream.convert_to_tesmui(args.stream, args.output, args.output + '.mui') + else: + EsMuiStream.convert_to_pesmui(args.stream, args.output, args.output + '.mui') exit_msg("", is_error=False) elif args.mui: print("Converting from xES+MUI...") + assert args.textst is False and args.xes.lower().endswith('tes') is False, "Cannot convert TES to TextST at this time." emf = EsMuiStream(args.mui, args.xes) emf.convert_to_stream(args.output) exit_msg("", is_error=False) diff --git a/scenaristream/__init__.py b/scenaristream/__init__.py index 43b6a65..3934fb5 100644 --- a/scenaristream/__init__.py +++ b/scenaristream/__init__.py @@ -31,28 +31,31 @@ from pathlib import Path from enum import IntEnum, Enum from struct import unpack, pack -from typing import Generator, Union, Optional +from typing import Generator, Union, Optional, Type class MUIType(IntEnum): - VIDEO = 0x01 - AUDIO = 0x02 - GRAPHICS=0x03 + VIDEO = 0x01 + AUDIO = 0x02 + GRAPHICS = 0x03 + TEXT = 0x04 -#TextST should also be MUIType.GRAPHICS, but what is the header? 'TS'? class StreamHeader(Enum): PG = b'PG' IG = b'IG' - TS = b'TS' #!!! This is likely incorrect, see above comment !!! + MPEG_TS = bytes([0x00, 0x00, 0x01, 0xBF]) -class BDGraphicSegment(IntEnum): - PDS = 0x14 - ODS = 0x15 +class GraphicSegment(IntEnum): + PDS = 0x14 #PGS+IGS + ODS = 0x15 #PGS+IGS PCS = 0x16 - WDS = 0x17 + WDS = 0x17 #PGS ICS = 0x18 #IGS - END = 0x80 - TDIALOG = 0x81 #TextST - TSTYLE = 0x82 #TextST + END = 0x80 #All + +class TextSegment(IntEnum): + END = 0x80 #? + STYLE = 0x81 + DIALOG = 0x82 #%% Raw stream format (tsMuxer, SUPer, avs2bdnxml) class StreamFile: @@ -68,10 +71,15 @@ def __init__(self, fp: Union[Path, str], **kwargs) -> None: def get_header(self) -> StreamHeader: with open(self.file, 'rb') as f: header = f.read(2) + long_header = f.read(2) try: header = StreamHeader(header) except: - raise AssertionError("File contains garbage or unknown stream type.") + if header + long_header == StreamHeader.MPEG_TS: + print("Found MPEG-TS header, assuming TextST.") + header = StreamHeader.MPEG_TS + else: + raise AssertionError("File contains garbage or unknown stream type.") return header @property @@ -127,18 +135,71 @@ def segments(self) -> list[bytes]: return list(self.gen_segments()) ####StreamFile +class TextSTFile(StreamFile): + def get_header(self) -> StreamHeader: + """ + TextST files don't have clear formatting. This assume SubtitleEdit output format + is the correct one, so we check for M2TS packet header and the ID of the first + segment to ensure this is TextST. + """ + with open(self.file, 'rb') as f: + header = f.read(7) + assert header[:4] == StreamHeader.MPEG_TS.value + assert header[-1] in [TextSegment.STYLE, TextSegment.DIALOG] + return StreamHeader.MPEG_TS + + def gen_segments(self) -> Generator[bytes, None, None]: + """ + Generator of segments. Stops when all segments in the + file have been consumed. This is the parsing function. + + :yield: Every segment, in order, as they appear in the stream file. + """ + MAGIC = self.get_header().value + header_len = len(MAGIC) + 2 + assert header_len == 6 + + with open(self.file, 'rb') as f: + buff = f.read(self.bytes_per_read) + while buff: + renew = False + if len(buff) >= 2: + assert buff[:4] == MAGIC, "Encountered garbage in stream." + if len(buff) >= header_len: + segment_length = unpack(">H", buff[header_len-2:header_len])[0] + if len(buff) >= segment_length+header_len: + #Sanity check, M2TS length should equal TextST one minus header + assert segment_length-3 == unpack(">H", buff[header_len+1:header_len+3])[0] + #Return packet with MPEG_TS header stripped. + yield buff[header_len:segment_length+header_len] + buff = buff[segment_length+header_len:] + else: + renew = True + else: + renew = True + + if renew or not buff: + if not (new_data := f.read(self.bytes_per_read)): + break + buff = buff + new_data + ####while + ####with + return + + + #%% Scenarist BD format parser class EsMuiStream: - def __init__(self, mui_file: Union[str, Path], pes_file: Union[str, Path]) -> None: - if not os.path.exists(mui_file) or not os.path.exists(pes_file): + def __init__(self, mui_file: Union[str, Path], es_file: Union[str, Path]) -> None: + if not os.path.exists(mui_file) or not os.path.exists(es_file): raise FileNotFoundError("Missing MUI or xES file.") #MUI files are lightweight, read it all at once. with open(mui_file, 'rb') as f: self._mui_data = f.read() - self._pes_file = pes_file + self._es_file = es_file - assert self.type == MUIType.GRAPHICS, f"Not a MUI Graphics file, got '{self.type}'" + assert self.type in [MUIType.GRAPHICS, MUIType.TEXT], f"Not a support MUI file, got '{self.type}'" assert self.__class__._mui_tail() == self._mui_data[-14:], "MUI tail signature not found." self._mui_data = self._mui_data[:-14] @@ -172,9 +233,39 @@ def encode_timestamps(pts: int, dts: int) -> bytes: return payload def gen_segments(self) -> Generator[bytes, None, None]: - valid_segments = [pgst for pgst in BDGraphicSegment] + if self.type == MUIType.GRAPHICS: + yield from self._gen_segments_graphics() + elif self.type == MUIType.TEXT: + yield from self._gen_segments_text() + else: + raise AssertionError(f"Unhandled MUI type '{self.type}'.") + + def _gen_segments_text(self) -> Generator[bytes, None, None]: + valid_segments = [tseg for tseg in TextSegment] + index = 4 + assert self.type == MUIType.TEXT, "Not a Text asset." + with open(self._es_file, 'rb') as tes: + while self._mui_data[index:]: + segment_type = self._mui_data[index] + assert segment_type in valid_segments + + index += 1 + block_length = unpack(">I", self._mui_data[index:(index:=index+4)])[0] + + assert self._mui_data[index:(index:=index+9)] == b'\x00'*9, "Encountered non-null timestamp in TES.MUI?!" + segment_data = tes.read(block_length) + if len(segment_data) < block_length: + segment_data += tes.read(block_length-len(segment_data)) + assert len(segment_data) == block_length, "IO error or incomplete TES file." + assert segment_data[0] == segment_type, "Segment type mismatch between MUI and TES." + yield segment_data + return None + + def _gen_segments_graphics(self) -> Generator[bytes, None, None]: + valid_segments = [pgst for pgst in GraphicSegment] index = 4 - with open(self._pes_file, 'rb') as pes: + assert self.type == MUIType.GRAPHICS + with open(self._es_file, 'rb') as pes: while self._mui_data[index:]: segment_type = self._mui_data[index] assert segment_type in valid_segments @@ -194,20 +285,37 @@ def gen_segments(self) -> Generator[bytes, None, None]: def segments(self) -> list[bytes]: return [seg for seg in self.gen_segments()] + def check_integrity(self) -> bool: + try: + for seg in self.gen_segments(): ... + except AssertionError as e: + return False + else: + return True + @classmethod def _mui_tail(cls) -> bytes: return bytes([0xFF] + [0x00]*13) @classmethod - def segment_writer(cls, es_file: Union[str, Path], mui_file: Optional[Union[str, Path]] = None) -> Generator[None, None, None]: + def segment_writer(cls, + es_file: Union[str, Path], + mui_file: Optional[Union[str, Path]] = None, + mui_type: MUIType = MUIType.GRAPHICS + ) -> Generator[None, Type[bytes], None]: + """ + Write segments as they arrive to manage memory efficiently. + """ if mui_file is None: ext = '.' + ('MUI' if str(es_file).endswith('ES') else 'mui') mui_file = str(es_file) + ext + assert mui_type == MUIType.GRAPHICS, f"'{MUIType(mui_type)}' not yet supported in segment_writer." + esf = open(es_file, 'wb') mui = open(mui_file, 'wb') - mui.write(bytes([0x00, 0x00, 0x00, MUIType.GRAPHICS])) + mui.write(bytes([0x00, 0x00, 0x00, mui_type])) try: segment = yield @@ -225,9 +333,64 @@ def segment_writer(cls, es_file: Union[str, Path], mui_file: Optional[Union[str, yield None @classmethod - def convert_to_esmui(cls, stream_file: Union[str, Path], es_file: Union[str, Path], mui_file: Optional[Union[str, Path]] = None) -> None: + def convert_to_tesmui(cls, + stream_file: Union[str, Path], + es_file: Union[str, Path], + mui_file: Optional[Union[str, Path]] = None + ) -> None: + stream = TextSTFile(stream_file) + + def shift_pts(pts: bytes): + ticks = 0 + for byte in pts: + ticks = (ticks << 8) + byte + return ticks + 54000000 #600*90e3 + + def encode_pts(pts: int) -> bytes: + return bytes([(pts >> (8*(4-k))) & 0xFF for k in range(5)]) + + if mui_file is None: + ext = '.' + ('MUI' if str(es_file).endswith('ES') else 'mui') + mui_file = str(es_file) + ext + + esf = open(es_file, 'wb') + mui = open(mui_file, 'wb') + + mui.write(bytes([0x00, 0x00, 0x00, MUIType.TEXT])) + + try: + for sc, segment in enumerate(stream.gen_segments()): + length = unpack(">H", segment[1:3])[0] + #Write segment without length and timing data + if segment[0] == TextSegment.STYLE: + #hack, SubtitleEdit includes number of dialog linked to style + #in length but Scenarist does not. SubtitleEdit may do something wrong. + ts_length = length-2 + esf.write(segment[0:1] + bytes([ts_length >> 8, ts_length & 0xFF]) + segment[3:]) + elif segment[0] == TextSegment.DIALOG: + pts1 = encode_pts(shift_pts(segment[3:8])) + pts2 = encode_pts(shift_pts(segment[8:13])) + esf.write(segment[:3] + pts1 + pts2 + segment[13:]) + else: + raise AssertionError("Unknown segment found in TextST stream.") + #Write header (segment type, length+3, mux_dts=0, mux_pts=0) + mui.write(segment[0:1] + pack(">I", length+3) + b'\x00'*9) + #write tail + mui.write(bytes([0xFF] + [0x00]*13)) + print(f"Converted {sc} segments.") + except Exception as e: + print(f"Critical error while writing PES+MUI: '{e}'") + mui.close() + esf.close() + + @classmethod + def convert_to_pesmui(cls, + stream_file: Union[str, Path], + es_file: Union[str, Path], + mui_file: Optional[Union[str, Path]] = None, + ) -> None: """ - Convert a raw stream to a MuiFile. + Convert a graphic stream to a MuiFile. """ stream = StreamFile(stream_file) @@ -251,7 +414,7 @@ def convert_to_esmui(cls, stream_file: Union[str, Path], es_file: Union[str, Pat mui.write(bytes([0xFF] + [0x00]*13)) print(f"Converted {sc} segments.") except Exception as e: - print(f"Critical error while writing ES+MUI: '{e}'") + print(f"Critical error while writing PES+MUI: '{e}'") mui.close() esf.close()