diff --git a/project/thscoreboard/replays/constant_helpers.py b/project/thscoreboard/replays/constant_helpers.py index b68468ff..ae8f720b 100644 --- a/project/thscoreboard/replays/constant_helpers.py +++ b/project/thscoreboard/replays/constant_helpers.py @@ -12,6 +12,10 @@ from replays import models +def _Game(): + return apps.apps.get_model("replays", "Game") + + def _Route(): return apps.apps.get_model("replays", "Route") @@ -20,6 +24,14 @@ def _Shot(): return apps.apps.get_model("replays", "Shot") +class UnknownGameError(Exception): + """An ID for a constant row did not match any row in the database.""" + + def __init__(self, game_id: str): + super().__init__(self, game_id) + self.id = game_id + + @dataclasses.dataclass(frozen=True) class ReplayConstantModels: game: models.Game @@ -30,12 +42,28 @@ class ReplayConstantModels: def GetModelInstancesForReplay( replay_info: replay_parsing.ReplayInfo, ) -> models.ReplayConstantModels: - """Get the constant model instances related to this replay.""" - shot = ( - _Shot() - .objects.select_related("game") - .get(game=replay_info.game, shot_id=replay_info.shot) - ) + """Get the constant model instances related to this replay. + + Raises: + UnknownGameError: If the "game" field of the replay does not correspond + to a row in the database. + """ + try: + shot = ( + _Shot() + .objects.select_related("game") + .get(game=replay_info.game, shot_id=replay_info.shot) + ) + except _Shot().DoesNotExist: + # If the game also does not exist, we can raise an appropriate exception. + # If the game exists but the shot does not, either we have a bug or the + # replay is extremely bizarre. + try: + _ = _Game().objects.get(game_id=replay_info.game) + except _Game().DoesNotExist: + raise UnknownGameError(replay_info.game) + raise + if replay_info.route: route = _Route().objects.get(game=shot.game, route_id=replay_info.route) else: diff --git a/project/thscoreboard/replays/game_ids.py b/project/thscoreboard/replays/game_ids.py index 3d31a964..bac6492b 100644 --- a/project/thscoreboard/replays/game_ids.py +++ b/project/thscoreboard/replays/game_ids.py @@ -18,6 +18,7 @@ class GameIDs: TH07 = "th07" TH08 = "th08" TH09 = "th09" + TH095 = "th095" TH10 = "th10" TH11 = "th11" TH12 = "th12" diff --git a/project/thscoreboard/replays/kaitai_parsers/th095_encrypted.py b/project/thscoreboard/replays/kaitai_parsers/th095_encrypted.py new file mode 100644 index 00000000..9f3a758c --- /dev/null +++ b/project/thscoreboard/replays/kaitai_parsers/th095_encrypted.py @@ -0,0 +1,103 @@ +# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild + +import kaitaistruct +from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO + + +if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9): + raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__)) + +class Th095Encrypted(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.header = Th095Encrypted.FileHeader(self._io, self, self._root) + + class FileHeader(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.magic = self._io.read_bytes(4) + if not self.magic == b"\x74\x39\x35\x72": + raise kaitaistruct.ValidationNotEqualError(b"\x74\x39\x35\x72", self.magic, self._io, u"/types/file_header/seq/0") + self.unknown_1 = self._io.read_bytes(8) + self.userdata_offset = self._io.read_u4le() + + + class Userdata(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.magic_user = self._io.read_bytes(4) + if not self.magic_user == b"\x55\x53\x45\x52": + raise kaitaistruct.ValidationNotEqualError(b"\x55\x53\x45\x52", self.magic_user, self._io, u"/types/userdata/seq/0") + self.user_length = self._io.read_u4le() + self.unknown = self._io.read_bytes(4) + self.user_desc = [] + i = 0 + while True: + _ = self._io.read_u1() + self.user_desc.append(_) + if _ == 13: + break + i += 1 + self.user_desc_term = (self._io.read_bytes_term(10, False, True, True)).decode(u"ASCII") + self.version = Th095Encrypted.UserdataField(u"Version", self._io, self, self._root) + self.username = Th095Encrypted.UserdataField(u"Name", self._io, self, self._root) + self.level = Th095Encrypted.UserdataField(u"Level", self._io, self, self._root) + self.scene = Th095Encrypted.UserdataField(u"Scene", self._io, self, self._root) + self.date = Th095Encrypted.UserdataField(u"Date", self._io, self, self._root) + self.score = Th095Encrypted.UserdataField(u"Score", self._io, self, self._root) + self.slowdown = Th095Encrypted.UserdataField(u"Slow Rate", self._io, self, self._root) + + + class UserdataField(KaitaiStruct): + def __init__(self, expected_name, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self.expected_name = expected_name + self._read() + + def _read(self): + self.name = (self._io.read_bytes(len(self.expected_name))).decode(u"ASCII") + if not self.name == self.expected_name: + raise kaitaistruct.ValidationNotEqualError(self.expected_name, self.name, self._io, u"/types/userdata_field/seq/0") + self.name_value_separator_space = self._io.read_bytes(1) + if not self.name_value_separator_space == b"\x20": + raise kaitaistruct.ValidationNotEqualError(b"\x20", self.name_value_separator_space, self._io, u"/types/userdata_field/seq/1") + self.value_with_space = (self._io.read_bytes_term(10, False, True, True)).decode(u"ASCII") + + @property + def value(self): + if hasattr(self, '_m_value'): + return self._m_value + + self._m_value = (self.value_with_space)[0:(len(self.value_with_space) - 1)] + return getattr(self, '_m_value', None) + + + @property + def userdata(self): + if hasattr(self, '_m_userdata'): + return self._m_userdata + + _pos = self._io.pos() + self._io.seek(self.header.userdata_offset) + self._m_userdata = Th095Encrypted.Userdata(self._io, self, self._root) + self._io.seek(_pos) + return getattr(self, '_m_userdata', None) + + diff --git a/project/thscoreboard/replays/replay_parsing.py b/project/thscoreboard/replays/replay_parsing.py index 94cc09dc..08b38419 100644 --- a/project/thscoreboard/replays/replay_parsing.py +++ b/project/thscoreboard/replays/replay_parsing.py @@ -9,6 +9,7 @@ from .kaitai_parsers import th07 from .kaitai_parsers import th08 from .kaitai_parsers import th09 +from .kaitai_parsers import th095_encrypted from .kaitai_parsers import th10 from .kaitai_parsers import th11 from .kaitai_parsers import th12 @@ -90,7 +91,11 @@ class ReplayInfo: name: str replay_type: int route: Optional[str] = None + spell_card_id: Optional[int] = None + scene_game_level: Optional[int] = None + scene_game_scene: Optional[int] = None + stages: List[ReplayStage] = dataclasses.field(default_factory=list) slowdown: Optional[float] = None @@ -446,6 +451,26 @@ def _Parse09(rep_raw): return r +def _Parse095(rep_raw): + encrypted_replay = th095_encrypted.Th095Encrypted.from_bytes(rep_raw) + + spell_level = int(encrypted_replay.userdata.level.value) + spell_scene = int(encrypted_replay.userdata.scene.value) + + return ReplayInfo( + game=game_ids.GameIDs.TH095, + shot="Aya", + difficulty=0, + score=int(encrypted_replay.userdata.score.value), + timestamp=time.strptime(encrypted_replay.userdata.date.value, "%y/%m/%d %H:%M"), + name=encrypted_replay.userdata.username.value, + replay_type=game_ids.ReplayTypes.SPELL_PRACTICE, + scene_game_level=spell_level, + scene_game_scene=spell_scene, + slowdown=float(encrypted_replay.userdata.slowdown.value), + ) + + def _Parse10(rep_raw): header = th_modern.ThModern.from_bytes(rep_raw) comp_data = bytearray(header.main.comp_data) @@ -1104,6 +1129,8 @@ def Parse(replay) -> ReplayInfo: return _Parse08(replay) elif gamecode == b"T9RP": return _Parse09(replay) + elif gamecode == b"t95r": + return _Parse095(replay) elif gamecode == b"t10r": return _Parse10(replay) elif gamecode == b"t11r": diff --git a/project/thscoreboard/replays/replays_for_tests/th95_2-5.rpy b/project/thscoreboard/replays/replays_for_tests/th95_2-5.rpy new file mode 100644 index 00000000..1bf9d102 Binary files /dev/null and b/project/thscoreboard/replays/replays_for_tests/th95_2-5.rpy differ diff --git a/project/thscoreboard/replays/replays_for_tests/th95_3-1.rpy b/project/thscoreboard/replays/replays_for_tests/th95_3-1.rpy new file mode 100644 index 00000000..c28eabed Binary files /dev/null and b/project/thscoreboard/replays/replays_for_tests/th95_3-1.rpy differ diff --git a/project/thscoreboard/replays/test_constant_helpers.py b/project/thscoreboard/replays/test_constant_helpers.py index ebaedc96..9d55c2ae 100644 --- a/project/thscoreboard/replays/test_constant_helpers.py +++ b/project/thscoreboard/replays/test_constant_helpers.py @@ -28,3 +28,13 @@ def testGetWithRoute(self): self.assertEqual(constants.shot.shot_id, "Yukari") self.assertIsNotNone(constants.route) self.assertEqual(constants.route.route_id, "Final B") + + def testGetUnknownGame(self): + replay_file_contents = test_replays.GetRaw("th10_normal") + replay_info = replay_parsing.Parse(replay_file_contents) + replay_info.game = "th5000" + + with self.assertRaises(constant_helpers.UnknownGameError) as ctx: + constant_helpers.GetModelInstancesForReplay(replay_info) + + self.assertEqual(ctx.exception.id, "th5000") diff --git a/project/thscoreboard/replays/test_replay_parsing.py b/project/thscoreboard/replays/test_replay_parsing.py index 289a2523..2281a53f 100644 --- a/project/thscoreboard/replays/test_replay_parsing.py +++ b/project/thscoreboard/replays/test_replay_parsing.py @@ -209,6 +209,34 @@ def testPVP(self): self.assertEqual(s.th09_p2_score, 0) +class Th095ReplayTestCase(unittest.TestCase): + def testOne(self): + r = ParseTestReplay("th95_3-1") + self.assertEqual(r.game, "th095") + self.assertEqual(r.score, 132030) + self.assertEqual( + r.timestamp, + datetime.datetime(2024, 4, 27, 16, 42, tzinfo=datetime.timezone.utc), + ) + self.assertEqual(r.scene_game_level, 3) + self.assertEqual(r.scene_game_scene, 1) + self.assertEqual(r.name, "nrook3.1") + self.assertEqual(r.slowdown, 0.00) + + def testTwo(self): + r = ParseTestReplay("th95_2-5") + self.assertEqual(r.game, "th095") + self.assertEqual(r.score, 128830) + self.assertEqual( + r.timestamp, + datetime.datetime(2024, 4, 27, 19, 54, tzinfo=datetime.timezone.utc), + ) + self.assertEqual(r.scene_game_level, 2) + self.assertEqual(r.scene_game_scene, 5) + self.assertEqual(r.name, "nrook ") + self.assertEqual(r.slowdown, 0.00) + + class Th10ReplayTestCase(unittest.TestCase): def testNormal(self): r = ParseTestReplay("th10_normal") diff --git a/project/thscoreboard/replays/views/create_replay.py b/project/thscoreboard/replays/views/create_replay.py index 8455fba6..8389dfd9 100644 --- a/project/thscoreboard/replays/views/create_replay.py +++ b/project/thscoreboard/replays/views/create_replay.py @@ -2,6 +2,7 @@ from django.http import Http404 from django.shortcuts import get_object_or_404, redirect, render +from django.utils.translation import gettext as _ from django.views.decorators import http as http_decorators from django.contrib.auth import decorators as auth_decorators from django.core.exceptions import ValidationError @@ -41,10 +42,21 @@ def _HandleReplay(request, replay_bytes): ) try: - return replay_parsing.Parse(replay_bytes) + replay_info = replay_parsing.Parse(replay_bytes) except replay_parsing.Error as e: raise ValidationError(str(e)) + try: + constant_helpers.GetModelInstancesForReplay(replay_info) + except constant_helpers.UnknownGameError: + # It would be nice to actually specify the game here, but adding formatting parameters + # runs into issues with Django rendering. + raise ValidationError( + message=_("This game is not yet supported."), + ) + + return replay_info + @auth_decorators.login_required @http_decorators.require_http_methods(["GET", "HEAD", "POST"]) diff --git a/ref/threp-ksy/th095_encrypted.ksy b/ref/threp-ksy/th095_encrypted.ksy new file mode 100644 index 00000000..e8707adf --- /dev/null +++ b/ref/threp-ksy/th095_encrypted.ksy @@ -0,0 +1,71 @@ +meta: + id: th095_encrypted + file-extension: rpy + endian: le +seq: + - id: header + type: file_header +instances: + userdata: + pos: header.userdata_offset + type: userdata +types: + file_header: + seq: + - id: magic + contents: t95r + # Probably includes version + - id: unknown_1 + size: 8 + - id: userdata_offset + type: u4 + userdata: + seq: + - id: magic_user + contents: USER + - id: user_length + type: u4 + - id: unknown + size: 4 + - id: user_desc + type: u1 + repeat: until + repeat-until: _ == 0xd + - id: user_desc_term + type: str + terminator: 0xa + encoding: ASCII + - id: version + type: userdata_field("Version") + - id: username + type: userdata_field("Name") + - id: level + type: userdata_field("Level") + - id: scene + type: userdata_field("Scene") + - id: date + type: userdata_field("Date") + - id: score + type: userdata_field("Score") + - id: slowdown + type: userdata_field("Slow Rate") + userdata_field: + params: + - id: expected_name + type: str + seq: + - id: name + type: str + size: expected_name.length + encoding: ASCII + valid: expected_name + - id: name_value_separator_space + contents: " " + - id: value_with_space + type: str + # Always ends with 0x0d0a; that is, space then LF + terminator: 0x0a + encoding: ASCII + instances: + value: + value: value_with_space.substring(0, value_with_space.length - 1)