Skip to content

Commit

Permalink
Add unidirectional TextST to TES+MUI conversion.
Browse files Browse the repository at this point in the history
  • Loading branch information
cubicibo committed Jun 18, 2023
1 parent 67cd632 commit fd47e5e
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 27 deletions.
12 changes: 10 additions & 2 deletions client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ def exit_msg(msg: str, is_error: bool = True) -> NoReturn:

# Command-line interface. At most one of --stream / --xes may be given.
parser = ArgumentParser()
group = parser.add_mutually_exclusive_group()
# "-s/--stream" is registered exactly once: argparse rejects a second
# add_argument() call that reuses an existing option string.
group.add_argument("-s", "--stream", type=str, help="Input (sup, mnu, textst) to convert to xES+MUI.", default='')
group.add_argument("-x", "--xes", type=str, help="Input xES to convert.", default='')

parser.add_argument("-m", "--mui", type=str, help="Input MUI associated to xES to convert.", default='')
# Explicit opt-in flag: the conversion code refuses TextST inputs without it.
parser.add_argument("-t", "--textst", help="Use if TextST.", action='store_true')

parser.add_argument('-v', '--version', action='version', version=f"(c) {__author__}, v{__version__}")
parser.add_argument("-o", "--output", type=str, required=True)
Expand Down Expand Up @@ -80,10 +82,16 @@ def exit_msg(msg: str, is_error: bool = True) -> NoReturn:
exit_msg("Desired output format is not xES? Exiting.")

print("Converting to xES+MUI...")
EsMuiStream.convert_to_esmui(args.stream, args.output, args.output + '.mui')
if (args.stream.lower().endswith('textst') or args.xes.lower().endswith('tes')) and not args.textst:
exit_msg("Is the conversion for TextST? Flag it as such if so. Exiting...")
elif args.textst:
EsMuiStream.convert_to_tesmui(args.stream, args.output, args.output + '.mui')
else:
EsMuiStream.convert_to_pesmui(args.stream, args.output, args.output + '.mui')
exit_msg("", is_error=False)
elif args.mui:
print("Converting from xES+MUI...")
assert args.textst is False and args.xes.lower().endswith('tes') is False, "Cannot convert TES to TextST at this time."
emf = EsMuiStream(args.mui, args.xes)
emf.convert_to_stream(args.output)
exit_msg("", is_error=False)
Expand Down
213 changes: 188 additions & 25 deletions scenaristream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,31 @@
from pathlib import Path
from enum import IntEnum, Enum
from struct import unpack, pack
from typing import Generator, Union, Optional
from typing import Generator, Union, Optional, Type

class MUIType(IntEnum):
    """Asset type identifier, written as the fourth byte of a MUI file."""
    # Each member is defined exactly once: an Enum body that reuses a
    # member name raises TypeError at class creation.
    VIDEO = 0x01
    AUDIO = 0x02
    GRAPHICS = 0x03
    TEXT = 0x04

class StreamHeader(Enum):
    """Magic byte sequences used to identify the kind of a raw stream file."""
    PG = b'PG'
    IG = b'IG'
    # NOTE(review): unverified. TextST should also be MUIType.GRAPHICS, but
    # what is the header? 'TS' is a guess and is likely incorrect.
    TS = b'TS'
    # Four-byte header found in front of TextST packets.
    MPEG_TS = b'\x00\x00\x01\xbf'

class GraphicSegment(IntEnum):
    """Segment type identifiers found in graphic (PGS/IGS) streams."""
    # Each member is defined exactly once (an Enum body cannot reuse a name).
    # TextST identifiers are intentionally absent here; they belong to
    # TextSegment.
    PDS = 0x14  # PGS+IGS
    ODS = 0x15  # PGS+IGS
    PCS = 0x16
    WDS = 0x17  # PGS
    ICS = 0x18  # IGS
    END = 0x80  # All


# Backward-compatible alias for the previous name of this enumeration.
BDGraphicSegment = GraphicSegment

class TextSegment(IntEnum):
    """Segment type identifiers found in TextST streams."""
    # NOTE(review): the END value is marked as uncertain by the author.
    END = 0x80
    STYLE = 0x81
    DIALOG = 0x82

#%% Raw stream format (tsMuxer, SUPer, avs2bdnxml)
class StreamFile:
Expand All @@ -68,10 +71,15 @@ def __init__(self, fp: Union[Path, str], **kwargs) -> None:
def get_header(self) -> StreamHeader:
    """
    Identify the stream type from the first bytes of the file.

    The first two bytes are looked up as a StreamHeader value. When that
    fails, the first four bytes are compared with the MPEG-TS header, in
    which case the file is assumed to be TextST.

    :return: The StreamHeader member matching the start of the file.
    :raises AssertionError: If the file starts with no known header.
    """
    with open(self.file, 'rb') as f:
        header = f.read(2)
        long_header = f.read(2)

    try:
        return StreamHeader(header)
    except ValueError:
        # Enum lookup by value raises ValueError for an unknown magic;
        # fall through to the four-byte check below.
        pass

    # Compare against the member's *value*: bytes never equal an Enum member.
    if header + long_header == StreamHeader.MPEG_TS.value:
        print("Found MPEG-TS header, assuming TextST.")
        return StreamHeader.MPEG_TS
    raise AssertionError("File contains garbage or unknown stream type.")

@property
Expand Down Expand Up @@ -127,18 +135,71 @@ def segments(self) -> list[bytes]:
return list(self.gen_segments())
####StreamFile

class TextSTFile(StreamFile):
    """
    Raw TextST stream where every segment is preceded by a four-byte M2TS
    packet header and a two-byte big-endian packet length.
    """
    def get_header(self) -> StreamHeader:
        """
        TextST files don't have clear formatting. This assumes the SubtitleEdit
        output format is the correct one, so we check for the M2TS packet header
        and the ID of the first segment to ensure this is TextST.

        :return: StreamHeader.MPEG_TS.
        :raises AssertionError: If the file does not start with the M2TS
            header followed by a style or dialog segment.
        """
        magic = StreamHeader.MPEG_TS.value
        # magic (4) + packet length (2) + type of the first segment (1)
        probe_len = len(magic) + 3
        with open(self.file, 'rb') as f:
            header = f.read(probe_len)

        if header[:len(magic)] != magic:
            raise AssertionError("M2TS packet header not found.")
        # A short read would make header[-1] point at the wrong byte.
        if len(header) < probe_len or header[-1] not in (TextSegment.STYLE, TextSegment.DIALOG):
            raise AssertionError("First segment is neither a TextST style nor a dialog segment.")
        return StreamHeader.MPEG_TS

    def gen_segments(self) -> Generator[bytes, None, None]:
        """
        Generator of segments. Stops when all segments in the
        file have been consumed. This is the parsing function.

        :yield: Every segment, in order, as they appear in the stream file,
            with the M2TS header and packet length stripped.
        :raises AssertionError: If a packet does not start with the M2TS
            header or its two length fields disagree.
        """
        MAGIC = self.get_header().value
        magic_len = len(MAGIC)
        header_len = magic_len + 2
        assert header_len == 6

        with open(self.file, 'rb') as f:
            buff = f.read(self.bytes_per_read)
            while buff:
                # More data is needed unless a whole packet gets consumed.
                renew = True
                # Only judge the magic once all of its bytes are buffered;
                # a shorter remainder at a read boundary is not garbage.
                if len(buff) >= magic_len and buff[:magic_len] != MAGIC:
                    raise AssertionError("Encountered garbage in stream.")
                if len(buff) >= header_len:
                    segment_length = unpack(">H", buff[header_len-2:header_len])[0]
                    packet_end = header_len + segment_length
                    if len(buff) >= packet_end:
                        # A segment holds at least its type (1) and length (2).
                        if segment_length < 3:
                            raise AssertionError("TextST segment is too short.")
                        #Sanity check, M2TS length should equal TextST one minus header
                        inner_length = unpack(">H", buff[header_len+1:header_len+3])[0]
                        if segment_length - 3 != inner_length:
                            raise AssertionError("M2TS and TextST segment lengths disagree.")
                        #Return packet with MPEG_TS header stripped.
                        yield buff[header_len:packet_end]
                        buff = buff[packet_end:]
                        renew = False

                if renew or not buff:
                    if not (new_data := f.read(self.bytes_per_read)):
                        break
                    buff = buff + new_data
            ####while
        ####with
        return



#%% Scenarist BD format parser
class EsMuiStream:
def __init__(self, mui_file: Union[str, Path], pes_file: Union[str, Path]) -> None:
if not os.path.exists(mui_file) or not os.path.exists(pes_file):
def __init__(self, mui_file: Union[str, Path], es_file: Union[str, Path]) -> None:
if not os.path.exists(mui_file) or not os.path.exists(es_file):
raise FileNotFoundError("Missing MUI or xES file.")

#MUI files are lightweight, read it all at once.
with open(mui_file, 'rb') as f:
self._mui_data = f.read()
self._pes_file = pes_file
self._es_file = es_file

assert self.type == MUIType.GRAPHICS, f"Not a MUI Graphics file, got '{self.type}'"
assert self.type in [MUIType.GRAPHICS, MUIType.TEXT], f"Not a support MUI file, got '{self.type}'"
assert self.__class__._mui_tail() == self._mui_data[-14:], "MUI tail signature not found."
self._mui_data = self._mui_data[:-14]

Expand Down Expand Up @@ -172,9 +233,39 @@ def encode_timestamps(pts: int, dts: int) -> bytes:
return payload

def gen_segments(self) -> Generator[bytes, None, None]:
    """
    Yield every segment of the elementary stream, dispatching on the MUI type.

    :yield: Segments from the graphics or the text parser.
    :raises AssertionError: If the MUI type is neither GRAPHICS nor TEXT.
    """
    if self.type == MUIType.GRAPHICS:
        yield from self._gen_segments_graphics()
    elif self.type == MUIType.TEXT:
        yield from self._gen_segments_text()
    else:
        raise AssertionError(f"Unhandled MUI type '{self.type}'.")

def _gen_segments_text(self) -> Generator[bytes, None, None]:
    """
    Yield every TextST segment of the TES file, in the order listed by the MUI.

    Each MUI record read here is 14 bytes: segment type (1), big-endian
    block length (4) and nine bytes that must all be null.

    :yield: The raw bytes of each segment, read from the TES file.
    :raises AssertionError: On a non-text asset, a truncated or invalid
        MUI record, an incomplete TES file, or a segment type mismatch.
    """
    if self.type != MUIType.TEXT:
        raise AssertionError("Not a Text asset.")

    valid_segments = frozenset(TextSegment)
    mui_data = self._mui_data
    record_len = 14
    index = 4  # the first four bytes of the MUI hold the asset type
    with open(self._es_file, 'rb') as tes:
        # Compare indices rather than slicing the remainder on every pass.
        while index < len(mui_data):
            record = mui_data[index:index + record_len]
            # Report truncation as AssertionError, like every other
            # integrity failure, instead of letting unpack() fail.
            if len(record) < record_len:
                raise AssertionError("Truncated record in TES.MUI.")
            index += record_len

            segment_type = record[0]
            if segment_type not in valid_segments:
                raise AssertionError(f"Unknown segment type {segment_type:#04x} in TES.MUI.")
            block_length = unpack(">I", record[1:5])[0]
            if record[5:] != b'\x00'*9:
                raise AssertionError("Encountered non-null timestamp in TES.MUI?!")

            segment_data = tes.read(block_length)
            if len(segment_data) < block_length:
                # Retry once in case of a short read.
                segment_data += tes.read(block_length-len(segment_data))
            if len(segment_data) != block_length:
                raise AssertionError("IO error or incomplete TES file.")
            # An empty block has no type byte to compare against.
            if not segment_data or segment_data[0] != segment_type:
                raise AssertionError("Segment type mismatch between MUI and TES.")
            yield segment_data
    return None

def _gen_segments_graphics(self) -> Generator[bytes, None, None]:
valid_segments = [pgst for pgst in GraphicSegment]
index = 4
with open(self._pes_file, 'rb') as pes:
assert self.type == MUIType.GRAPHICS
with open(self._es_file, 'rb') as pes:
while self._mui_data[index:]:
segment_type = self._mui_data[index]
assert segment_type in valid_segments
Expand All @@ -194,20 +285,37 @@ def gen_segments(self) -> Generator[bytes, None, None]:
def segments(self) -> list[bytes]:
    """Return every segment produced by gen_segments(), collected in a list."""
    return list(self.gen_segments())

def check_integrity(self) -> bool:
    """
    Parse the whole stream once to validate it.

    :return: False if parsing raised an AssertionError, True otherwise.
    """
    try:
        # Exhaust the generator; the segments themselves are discarded.
        for _ in self.gen_segments():
            pass
    except AssertionError:
        return False
    return True

@classmethod
def _mui_tail(cls) -> bytes:
return bytes([0xFF] + [0x00]*13)

@classmethod
def segment_writer(cls, es_file: Union[str, Path], mui_file: Optional[Union[str, Path]] = None) -> Generator[None, None, None]:
def segment_writer(cls,
es_file: Union[str, Path],
mui_file: Optional[Union[str, Path]] = None,
mui_type: MUIType = MUIType.GRAPHICS
) -> Generator[None, Type[bytes], None]:
"""
Write segments as they arrive to manage memory efficiently.
"""
if mui_file is None:
ext = '.' + ('MUI' if str(es_file).endswith('ES') else 'mui')
mui_file = str(es_file) + ext

assert mui_type == MUIType.GRAPHICS, f"'{MUIType(mui_type)}' not yet supported in segment_writer."

esf = open(es_file, 'wb')
mui = open(mui_file, 'wb')

mui.write(bytes([0x00, 0x00, 0x00, MUIType.GRAPHICS]))
mui.write(bytes([0x00, 0x00, 0x00, mui_type]))

try:
segment = yield
Expand All @@ -225,9 +333,64 @@ def segment_writer(cls, es_file: Union[str, Path], mui_file: Optional[Union[str,
yield None

@classmethod
def convert_to_tesmui(cls,
                      stream_file: Union[str, Path],
                      es_file: Union[str, Path],
                      mui_file: Optional[Union[str, Path]] = None
                      ) -> None:
    """
    Convert a TextST stream to a TES file and its MUI index.

    :param stream_file: Input TextST stream, parsed with TextSTFile.
    :param es_file: Output TES file.
    :param mui_file: Output MUI file. Defaults to es_file with '.MUI'
        appended if es_file ends with 'ES', '.mui' otherwise.

    Errors raised while converting segments are reported on stdout and
    not propagated.
    """
    # Offset added to both timestamps of a dialog segment
    # (600*90e3 according to the original author).
    pts_offset = 54000000

    def shift_pts(pts: bytes) -> bytes:
        """Add the offset to a big-endian timestamp, re-encoded on 5 bytes."""
        ticks = int.from_bytes(pts, 'big') + pts_offset
        # Keep the low 40 bits only, as the 5-byte field can hold no more.
        return (ticks & 0xFFFFFFFFFF).to_bytes(5, 'big')

    stream = TextSTFile(stream_file)

    if mui_file is None:
        ext = '.' + ('MUI' if str(es_file).endswith('ES') else 'mui')
        mui_file = str(es_file) + ext

    # Context managers close both files even if the second open() fails.
    with open(es_file, 'wb') as esf, open(mui_file, 'wb') as mui:
        mui.write(bytes([0x00, 0x00, 0x00, MUIType.TEXT]))

        # Pre-set so that an empty stream reports zero segments.
        sc = 0
        try:
            # Count from 1 so that sc is the number of converted segments.
            for sc, segment in enumerate(stream.gen_segments(), start=1):
                length = unpack(">H", segment[1:3])[0]
                #Write segment without length and timing data
                if segment[0] == TextSegment.STYLE:
                    #hack, SubtitleEdit includes number of dialog linked to style
                    #in length but Scenarist does not. SubtitleEdit may do something wrong.
                    esf.write(segment[0:1] + pack(">H", length-2) + segment[3:])
                elif segment[0] == TextSegment.DIALOG:
                    pts1 = shift_pts(segment[3:8])
                    pts2 = shift_pts(segment[8:13])
                    esf.write(segment[:3] + pts1 + pts2 + segment[13:])
                else:
                    raise AssertionError("Unknown segment found in TextST stream.")
                #Write header (segment type, length+3, mux_dts=0, mux_pts=0)
                mui.write(segment[0:1] + pack(">I", length+3) + b'\x00'*9)
            #write tail
            mui.write(cls._mui_tail())
            print(f"Converted {sc} segments.")
        except Exception as e:
            # Best effort: report the failure and still close both files.
            print(f"Critical error while writing TES+MUI: '{e}'")

@classmethod
def convert_to_pesmui(cls,
stream_file: Union[str, Path],
es_file: Union[str, Path],
mui_file: Optional[Union[str, Path]] = None,
) -> None:
"""
Convert a raw stream to a MuiFile.
Convert a graphic stream to a MuiFile.
"""
stream = StreamFile(stream_file)

Expand All @@ -251,7 +414,7 @@ def convert_to_esmui(cls, stream_file: Union[str, Path], es_file: Union[str, Pat
mui.write(bytes([0xFF] + [0x00]*13))
print(f"Converted {sc} segments.")
except Exception as e:
print(f"Critical error while writing ES+MUI: '{e}'")
print(f"Critical error while writing PES+MUI: '{e}'")
mui.close()
esf.close()

Expand Down

0 comments on commit fd47e5e

Please sign in to comment.