Skip to content

Commit

Permalink
Rewrite Geetest logic entirely and add stoken cookie refresh function (
Browse files Browse the repository at this point in the history
  • Loading branch information
jokelbaf authored Jan 13, 2024
1 parent f0dbef5 commit 3b30133
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 134 deletions.
2 changes: 1 addition & 1 deletion genshin/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async def lineups(client: genshin.Client, scenario: typing.Optional[str]) -> Non
click.echo(f"{click.style('Characters:', bold=True)}")
for group, characters in enumerate(lineup.characters, 1):
if len(lineup.characters) > 1:
click.echo(f"- Group {group }:")
click.echo(f"- Group {group}:")
for character in characters:
click.echo(f" - {character.name} ({character.role})")

Expand Down
240 changes: 215 additions & 25 deletions genshin/client/components/geetest/client.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,63 @@
"""Geetest client component."""
import base64
import json
import typing

import aiohttp
import aiohttp.web
import yarl

from genshin import errors
from genshin import constants, errors
from genshin.client import routes
from genshin.client.components import base
from genshin.utility import ds as ds_utility
from genshin.utility import geetest as geetest_utility

from . import server

__all__ = ["GeetestClient"]


WEB_LOGIN_URL = yarl.URL("https://sg-public-api.hoyolab.com/account/ma-passport/api/webLoginByPassword")


class GeetestClient(base.BaseClient):
"""Geetest client component."""

async def login_with_geetest(
self, account: str, password: str, session_id: str, geetest: typing.Dict[str, str]
) -> typing.Mapping[str, str]:
"""Login with a password and a solved geetest.
async def web_login(
self,
account: str,
password: str,
*,
tokenType: typing.Optional[int] = 6,
geetest: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> typing.Dict[str, typing.Any]:
"""Login with a password using web endpoint.
Token type is a bitfield of cookie_token, ltoken, stoken.
Returns either data from aigis header or cookies.
"""
headers = {**geetest_utility.WEB_LOGIN_HEADERS}
if geetest:
mmt_data = geetest["data"]
session_id = geetest["session_id"]
headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data)

payload = {
"account": geetest_utility.encrypt_geetest_password(account),
"password": geetest_utility.encrypt_geetest_password(password),
"token_type": 6,
"account": geetest_utility.encrypt_geetest_credentials(account),
"password": geetest_utility.encrypt_geetest_credentials(password),
"token_type": tokenType,
}

# we do not want to use the previous cookie manager sessions

async with aiohttp.ClientSession() as session:
async with session.post(
WEB_LOGIN_URL,
routes.WEB_LOGIN_URL.get_url(),
json=payload,
headers={
**geetest_utility.HEADERS,
"x-rpc-aigis": f"{session_id};{base64.b64encode(json.dumps(geetest).encode()).decode()}",
},
headers=headers,
) as r:
data = await r.json()
cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}

if data["retcode"] == -3101:
# Captcha triggered
aigis = json.loads(r.headers["x-rpc-aigis"])
aigis["data"] = json.loads(aigis["data"])
return aigis

if not data["data"]:
errors.raise_for_retcode(data)

Expand All @@ -59,9 +68,190 @@ async def login_with_geetest(

return cookies

async def login_with_password(self, account: str, password: str, *, port: int = 5000) -> typing.Mapping[str, str]:
"""Login with a password.
async def app_login(
self,
account: str,
password: str,
*,
geetest: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> typing.Dict[str, typing.Any]:
"""Login with a password using HoYoLab app endpoint.
Returns data from aigis header or action_ticket or cookies.
"""
headers = {
**geetest_utility.APP_LOGIN_HEADERS,
"ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["app_login"]),
}
if geetest:
mmt_data = geetest["data"]
session_id = geetest["session_id"]
headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data)

payload = {
"account": geetest_utility.encrypt_geetest_credentials(account),
"password": geetest_utility.encrypt_geetest_credentials(password),
}

async with aiohttp.ClientSession() as session:
async with session.post(
routes.APP_LOGIN_URL.get_url(),
json=payload,
headers=headers,
) as r:
data = await r.json()

if data["retcode"] == -3101:
# Captcha triggered
aigis = json.loads(r.headers["x-rpc-aigis"])
aigis["data"] = json.loads(aigis["data"])
return aigis

if data["retcode"] == -3239:
# Email verification required
return json.loads(r.headers["x-rpc-verify"])

if not data["data"]:
errors.raise_for_retcode(data)

cookies = {
"stoken": data["data"]["token"]["token"],
"ltuid_v2": data["data"]["user_info"]["aid"],
"ltmid_v2": data["data"]["user_info"]["mid"],
"account_id_v2": data["data"]["user_info"]["aid"],
"account_mid_v2": data["data"]["user_info"]["mid"],
}
self.set_cookies(cookies)

return cookies

async def send_verification_email(
self,
ticket: typing.Dict[str, typing.Any],
*,
geetest: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> typing.Union[None, typing.Dict[str, typing.Any]]:
"""Send verification email.
This will start a webserver.
Returns None if success, aigis headers (mmt/aigis) otherwise.
"""
return await server.login_with_app(self, account, password, port=port)
headers = {**geetest_utility.EMAIL_SEND_HEADERS}
if geetest:
mmt_data = geetest["data"]
session_id = geetest["session_id"]
headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data)

async with aiohttp.ClientSession() as session:
async with session.post(
routes.SEND_VERIFICATION_CODE_URL.get_url(),
json={
"action_type": "verify_for_component",
"action_ticket": ticket["verify_str"]["ticket"],
},
headers=headers,
) as r:
data = await r.json()

if data["retcode"] == -3101:
# Captcha triggered
aigis = json.loads(r.headers["x-rpc-aigis"])
aigis["data"] = json.loads(aigis["data"])
return aigis

if data["retcode"] != 0:
errors.raise_for_retcode(data)

return None

async def verify_email(self, code: str, ticket: typing.Dict[str, typing.Any]) -> None:
"""Verify email."""
async with aiohttp.ClientSession() as session:
async with session.post(
routes.VERIFY_EMAIL_URL.get_url(),
json={
"action_type": "verify_for_component",
"action_ticket": ticket["verify_str"]["ticket"],
"email_captcha": code,
"verify_method": 2,
},
headers=geetest_utility.EMAIL_VERIFY_HEADERS,
) as r:
data = await r.json()

if data["retcode"] != 0:
errors.raise_for_retcode(data)

return None

async def login_with_password(
self,
account: str,
password: str,
*,
port: int = 5000,
tokenType: typing.Optional[int] = 6,
geetest_solver: typing.Optional[
typing.Callable[[typing.Dict[str, typing.Any]], typing.Awaitable[typing.Dict[str, typing.Any]]]
] = None,
) -> typing.Dict[str, str]:
"""Login with a password via web endpoint.
Note that this will start a webserver if captcha is
triggered and `geetest_solver` is not passed.
"""
result = await self.web_login(account, password, tokenType=tokenType)

if "session_id" not in result:
# Captcha not triggered
return result

if geetest_solver:
geetest = await geetest_solver(result)
else:
geetest = await server.solve_geetest(result, port=port)

return await self.web_login(account, password, tokenType=tokenType, geetest=geetest)

async def login_with_app_password(
self,
account: str,
password: str,
*,
port: int = 5000,
geetest_solver: typing.Optional[
typing.Callable[[typing.Dict[str, typing.Any]], typing.Awaitable[typing.Dict[str, typing.Any]]]
] = None,
) -> typing.Dict[str, str]:
"""Login with a password via HoYoLab app endpoint.
Note that this will start a webserver if either of the
following happens:
1. Captcha is triggered and `geetest_solver` is not passed.
2. Email verification is triggered (can happen if you
first login with a new device).
"""
result = await self.app_login(account, password)

if "session_id" in result:
# Captcha triggered
if geetest_solver:
geetest = await geetest_solver(result)
else:
geetest = await server.solve_geetest(result, port=port)

result = await self.app_login(account, password, geetest=geetest)

if "risk_ticket" in result:
# Email verification required
mmt = await self.send_verification_email(result)
if mmt:
if geetest_solver:
geetest = await geetest_solver(mmt)
else:
geetest = await server.solve_geetest(mmt, port=port)

await server.verify_email(self, result, port=port)
result = await self.app_login(account, password)

return result
Loading

0 comments on commit 3b30133

Please sign in to comment.