Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Userdata-only parser for Touhou 9.5 (Shoot the Bullet). #506

Merged
merged 5 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions project/thscoreboard/replays/constant_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from replays import models


def _Game():
return apps.apps.get_model("replays", "Game")


def _Route():
return apps.apps.get_model("replays", "Route")

Expand All @@ -20,6 +24,14 @@ def _Shot():
return apps.apps.get_model("replays", "Shot")


class UnknownGameError(Exception):
"""An ID for a constant row did not match any row in the database."""

def __init__(self, game_id: str):
super().__init__(self, game_id)
self.id = game_id


@dataclasses.dataclass(frozen=True)
class ReplayConstantModels:
game: models.Game
Expand All @@ -30,12 +42,28 @@ class ReplayConstantModels:
def GetModelInstancesForReplay(
replay_info: replay_parsing.ReplayInfo,
) -> models.ReplayConstantModels:
"""Get the constant model instances related to this replay."""
shot = (
_Shot()
.objects.select_related("game")
.get(game=replay_info.game, shot_id=replay_info.shot)
)
"""Get the constant model instances related to this replay.

Raises:
UnknownGameError: If the "game" field of the replay does not correspond
to a row in the database.
"""
try:
shot = (
_Shot()
.objects.select_related("game")
.get(game=replay_info.game, shot_id=replay_info.shot)
)
except _Shot().DoesNotExist:
# If the game also does not exist, we can raise an appropriate exception.
# If the game exists but the shot does not, either we have a bug or the
# replay is extremely bizarre.
try:
_ = _Game().objects.get(game_id=replay_info.game)
except _Game().DoesNotExist:
raise UnknownGameError(replay_info.game)
raise

if replay_info.route:
route = _Route().objects.get(game=shot.game, route_id=replay_info.route)
else:
Expand Down
1 change: 1 addition & 0 deletions project/thscoreboard/replays/game_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class GameIDs:
TH07 = "th07"
TH08 = "th08"
TH09 = "th09"
TH095 = "th095"
TH10 = "th10"
TH11 = "th11"
TH12 = "th12"
Expand Down
103 changes: 103 additions & 0 deletions project/thscoreboard/replays/kaitai_parsers/th095_encrypted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild

import kaitaistruct
from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO


if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9):
raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__))

class Th095Encrypted(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()

def _read(self):
self.header = Th095Encrypted.FileHeader(self._io, self, self._root)

class FileHeader(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()

def _read(self):
self.magic = self._io.read_bytes(4)
if not self.magic == b"\x74\x39\x35\x72":
raise kaitaistruct.ValidationNotEqualError(b"\x74\x39\x35\x72", self.magic, self._io, u"/types/file_header/seq/0")
self.unknown_1 = self._io.read_bytes(8)
self.userdata_offset = self._io.read_u4le()


class Userdata(KaitaiStruct):
def __init__(self, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self._read()

def _read(self):
self.magic_user = self._io.read_bytes(4)
if not self.magic_user == b"\x55\x53\x45\x52":
raise kaitaistruct.ValidationNotEqualError(b"\x55\x53\x45\x52", self.magic_user, self._io, u"/types/userdata/seq/0")
self.user_length = self._io.read_u4le()
self.unknown = self._io.read_bytes(4)
self.user_desc = []
i = 0
while True:
_ = self._io.read_u1()
self.user_desc.append(_)
if _ == 13:
break
i += 1
self.user_desc_term = (self._io.read_bytes_term(10, False, True, True)).decode(u"ASCII")
self.version = Th095Encrypted.UserdataField(u"Version", self._io, self, self._root)
self.username = Th095Encrypted.UserdataField(u"Name", self._io, self, self._root)
self.level = Th095Encrypted.UserdataField(u"Level", self._io, self, self._root)
self.scene = Th095Encrypted.UserdataField(u"Scene", self._io, self, self._root)
self.date = Th095Encrypted.UserdataField(u"Date", self._io, self, self._root)
self.score = Th095Encrypted.UserdataField(u"Score", self._io, self, self._root)
self.slowdown = Th095Encrypted.UserdataField(u"Slow Rate", self._io, self, self._root)


class UserdataField(KaitaiStruct):
def __init__(self, expected_name, _io, _parent=None, _root=None):
self._io = _io
self._parent = _parent
self._root = _root if _root else self
self.expected_name = expected_name
self._read()

def _read(self):
self.name = (self._io.read_bytes(len(self.expected_name))).decode(u"ASCII")
if not self.name == self.expected_name:
raise kaitaistruct.ValidationNotEqualError(self.expected_name, self.name, self._io, u"/types/userdata_field/seq/0")
self.name_value_separator_space = self._io.read_bytes(1)
if not self.name_value_separator_space == b"\x20":
raise kaitaistruct.ValidationNotEqualError(b"\x20", self.name_value_separator_space, self._io, u"/types/userdata_field/seq/1")
self.value_with_space = (self._io.read_bytes_term(10, False, True, True)).decode(u"ASCII")

@property
def value(self):
if hasattr(self, '_m_value'):
return self._m_value

self._m_value = (self.value_with_space)[0:(len(self.value_with_space) - 1)]
return getattr(self, '_m_value', None)


@property
def userdata(self):
if hasattr(self, '_m_userdata'):
return self._m_userdata

_pos = self._io.pos()
self._io.seek(self.header.userdata_offset)
self._m_userdata = Th095Encrypted.Userdata(self._io, self, self._root)
self._io.seek(_pos)
return getattr(self, '_m_userdata', None)


27 changes: 27 additions & 0 deletions project/thscoreboard/replays/replay_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .kaitai_parsers import th07
from .kaitai_parsers import th08
from .kaitai_parsers import th09
from .kaitai_parsers import th095_encrypted
from .kaitai_parsers import th10
from .kaitai_parsers import th11
from .kaitai_parsers import th12
Expand Down Expand Up @@ -90,7 +91,11 @@ class ReplayInfo:
name: str
replay_type: int
route: Optional[str] = None

spell_card_id: Optional[int] = None
scene_game_level: Optional[int] = None
scene_game_scene: Optional[int] = None

stages: List[ReplayStage] = dataclasses.field(default_factory=list)
slowdown: Optional[float] = None

Expand Down Expand Up @@ -446,6 +451,26 @@ def _Parse09(rep_raw):
return r


def _Parse095(rep_raw):
encrypted_replay = th095_encrypted.Th095Encrypted.from_bytes(rep_raw)

spell_level = int(encrypted_replay.userdata.level.value)
spell_scene = int(encrypted_replay.userdata.scene.value)

return ReplayInfo(
game=game_ids.GameIDs.TH095,
shot="Aya",
difficulty=0,
score=int(encrypted_replay.userdata.score.value),
timestamp=time.strptime(encrypted_replay.userdata.date.value, "%y/%m/%d %H:%M"),
name=encrypted_replay.userdata.username.value,
replay_type=game_ids.ReplayTypes.SPELL_PRACTICE,
scene_game_level=spell_level,
scene_game_scene=spell_scene,
slowdown=float(encrypted_replay.userdata.slowdown.value),
)


def _Parse10(rep_raw):
header = th_modern.ThModern.from_bytes(rep_raw)
comp_data = bytearray(header.main.comp_data)
Expand Down Expand Up @@ -1104,6 +1129,8 @@ def Parse(replay) -> ReplayInfo:
return _Parse08(replay)
elif gamecode == b"T9RP":
return _Parse09(replay)
elif gamecode == b"t95r":
return _Parse095(replay)
elif gamecode == b"t10r":
return _Parse10(replay)
elif gamecode == b"t11r":
Expand Down
Binary file not shown.
Binary file not shown.
10 changes: 10 additions & 0 deletions project/thscoreboard/replays/test_constant_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@ def testGetWithRoute(self):
self.assertEqual(constants.shot.shot_id, "Yukari")
self.assertIsNotNone(constants.route)
self.assertEqual(constants.route.route_id, "Final B")

def testGetUnknownGame(self):
replay_file_contents = test_replays.GetRaw("th10_normal")
replay_info = replay_parsing.Parse(replay_file_contents)
replay_info.game = "th5000"

with self.assertRaises(constant_helpers.UnknownGameError) as ctx:
constant_helpers.GetModelInstancesForReplay(replay_info)

self.assertEqual(ctx.exception.id, "th5000")
28 changes: 28 additions & 0 deletions project/thscoreboard/replays/test_replay_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,34 @@ def testPVP(self):
self.assertEqual(s.th09_p2_score, 0)


class Th095ReplayTestCase(unittest.TestCase):
def testOne(self):
r = ParseTestReplay("th95_3-1")
self.assertEqual(r.game, "th095")
self.assertEqual(r.score, 132030)
self.assertEqual(
r.timestamp,
datetime.datetime(2024, 4, 27, 16, 42, tzinfo=datetime.timezone.utc),
)
self.assertEqual(r.scene_game_level, 3)
self.assertEqual(r.scene_game_scene, 1)
self.assertEqual(r.name, "nrook3.1")
self.assertEqual(r.slowdown, 0.00)

def testTwo(self):
r = ParseTestReplay("th95_2-5")
self.assertEqual(r.game, "th095")
self.assertEqual(r.score, 128830)
self.assertEqual(
r.timestamp,
datetime.datetime(2024, 4, 27, 19, 54, tzinfo=datetime.timezone.utc),
)
self.assertEqual(r.scene_game_level, 2)
self.assertEqual(r.scene_game_scene, 5)
self.assertEqual(r.name, "nrook ")
self.assertEqual(r.slowdown, 0.00)


class Th10ReplayTestCase(unittest.TestCase):
def testNormal(self):
r = ParseTestReplay("th10_normal")
Expand Down
14 changes: 13 additions & 1 deletion project/thscoreboard/replays/views/create_replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from django.http import Http404
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.translation import gettext as _
from django.views.decorators import http as http_decorators
from django.contrib.auth import decorators as auth_decorators
from django.core.exceptions import ValidationError
Expand Down Expand Up @@ -41,10 +42,21 @@ def _HandleReplay(request, replay_bytes):
)

try:
return replay_parsing.Parse(replay_bytes)
replay_info = replay_parsing.Parse(replay_bytes)
except replay_parsing.Error as e:
raise ValidationError(str(e))

try:
constant_helpers.GetModelInstancesForReplay(replay_info)
except constant_helpers.UnknownGameError:
# It would be nice to actually specify the game here, but adding formatting parameters
# runs into issues with Django rendering.
raise ValidationError(
message=_("This game is not yet supported."),
)

return replay_info


@auth_decorators.login_required
@http_decorators.require_http_methods(["GET", "HEAD", "POST"])
Expand Down
71 changes: 71 additions & 0 deletions ref/threp-ksy/th095_encrypted.ksy
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
meta:
id: th095_encrypted
file-extension: rpy
endian: le
seq:
- id: header
type: file_header
instances:
userdata:
pos: header.userdata_offset
type: userdata
types:
file_header:
seq:
- id: magic
contents: t95r
# Probably includes version
- id: unknown_1
size: 8
- id: userdata_offset
type: u4
userdata:
seq:
- id: magic_user
contents: USER
- id: user_length
type: u4
- id: unknown
size: 4
- id: user_desc
type: u1
repeat: until
repeat-until: _ == 0xd
- id: user_desc_term
type: str
terminator: 0xa
encoding: ASCII
- id: version
type: userdata_field("Version")
- id: username
type: userdata_field("Name")
- id: level
type: userdata_field("Level")
- id: scene
type: userdata_field("Scene")
- id: date
type: userdata_field("Date")
- id: score
type: userdata_field("Score")
- id: slowdown
type: userdata_field("Slow Rate")
userdata_field:
params:
- id: expected_name
type: str
seq:
- id: name
type: str
size: expected_name.length
encoding: ASCII
valid: expected_name
- id: name_value_separator_space
contents: " "
- id: value_with_space
type: str
# Always ends with 0x0d0a; that is, space then LF
terminator: 0x0a
encoding: ASCII
instances:
value:
value: value_with_space.substring(0, value_with_space.length - 1)
Loading