Skip to content

Commit

Permalink
feat: Add CN password login
Browse files Browse the repository at this point in the history
  • Loading branch information
seriaati committed Mar 25, 2024
1 parent f23467a commit df80d1c
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 9 deletions.
104 changes: 98 additions & 6 deletions genshin/client/components/geetest/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import typing
from http.cookies import SimpleCookie

import aiohttp
import aiohttp.web
Expand Down Expand Up @@ -39,8 +40,8 @@ async def _web_login(
headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data)

payload = {
"account": geetest_utility.encrypt_geetest_credentials(account),
"password": geetest_utility.encrypt_geetest_credentials(password),
"account": geetest_utility.encrypt_geetest_credentials(account, self._region),
"password": geetest_utility.encrypt_geetest_credentials(password, self._region),
"token_type": tokenType,
}

Expand Down Expand Up @@ -69,6 +70,60 @@ async def _web_login(

return cookies

async def _cn_login_by_password(
self,
account: str,
password: str,
*,
geetest: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> typing.Dict[str, typing.Any]:
"""
Login with account and password using Miyoushe loginByPassword endpoint.
Returns data from aigis header or cookies.
"""
headers = {
**geetest_utility.CN_LOGIN_HEADERS,
"ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
}
if geetest:
mmt_data = geetest["data"]
session_id = geetest["session_id"]
headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data)

payload = {
"account": geetest_utility.encrypt_geetest_credentials(account, self._region),
"password": geetest_utility.encrypt_geetest_credentials(password, self._region),
}

async with aiohttp.ClientSession() as session:
async with session.post(
routes.CN_LOGIN_URL.get_url(),
json=payload,
headers=headers,
) as r:
data = await r.json()

if data["retcode"] == -3102:
# Captcha triggered
aigis = json.loads(r.headers["x-rpc-aigis"])
aigis["data"] = json.loads(aigis["data"])
return aigis

if not data["data"]:
errors.raise_for_retcode(data)

# Parse cookies from headers
cookies: dict[str, str] = {}
for data in r.headers.items():
if data[0] == "Set-Cookie":
cookie_parser = SimpleCookie()
cookie_parser.load(data[1])
cookies.update({key: morsel.value for key, morsel in cookie_parser.items()})

self.set_cookies(cookies)
return cookies

async def _app_login(
self,
account: str,
Expand All @@ -95,8 +150,8 @@ async def _app_login(
headers["x-rpc-verify"] = json.dumps(ticket)

payload = {
"account": geetest_utility.encrypt_geetest_credentials(account),
"password": geetest_utility.encrypt_geetest_credentials(password),
"account": geetest_utility.encrypt_geetest_credentials(account, self._region),
"password": geetest_utility.encrypt_geetest_credentials(password, self._region),
}

async with aiohttp.ClientSession() as session:
Expand Down Expand Up @@ -199,7 +254,10 @@ async def login_with_password(
port: int = 5000,
tokenType: typing.Optional[int] = 6,
geetest_solver: typing.Optional[
typing.Callable[[typing.Dict[str, typing.Any]], typing.Awaitable[typing.Dict[str, typing.Any]]]
typing.Callable[
[typing.Dict[str, typing.Any]],
typing.Awaitable[typing.Dict[str, typing.Any]],
]
] = None,
) -> typing.Dict[str, str]:
"""Login with a password via web endpoint.
Expand All @@ -220,14 +278,48 @@ async def login_with_password(

return await self._web_login(account, password, tokenType=tokenType, geetest=geetest)

async def cn_login_by_password(
self,
account: str,
password: str,
*,
port: int = 5000,
geetest_solver: typing.Optional[
typing.Callable[
[typing.Dict[str, typing.Any]],
typing.Awaitable[typing.Dict[str, typing.Any]],
]
] = None,
) -> typing.Dict[str, str]:
"""Login with a password via Miyoushe loginByPassword endpoint.
Note that this will start a webserver if captcha is
triggered and `geetest_solver` is not passed.
"""
result = await self._cn_login_by_password(account, password)

if "session_id" not in result:
# Captcha not triggered
return result

if geetest_solver:
geetest = await geetest_solver(result)
else:
geetest = await server.solve_geetest(result, port=port)

return await self._cn_login_by_password(account, password, geetest=geetest)

async def login_with_app_password(
self,
account: str,
password: str,
*,
port: int = 5000,
geetest_solver: typing.Optional[
typing.Callable[[typing.Dict[str, typing.Any]], typing.Awaitable[typing.Dict[str, typing.Any]]]
typing.Callable[
[typing.Dict[str, typing.Any]],
typing.Awaitable[typing.Dict[str, typing.Any]],
]
] = None,
) -> typing.Dict[str, str]:
"""Login with a password via HoYoLab app endpoint.
Expand Down
2 changes: 2 additions & 0 deletions genshin/client/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"BBS_REFERER_URL",
"BBS_URL",
"CALCULATOR_URL",
"CN_LOGIN_URL",
"COMMUNITY_URL",
"COOKIE_V2_REFRESH_URL",
"DETAIL_LEDGER_URL",
Expand Down Expand Up @@ -217,6 +218,7 @@ def get_url(self, region: types.Region, game: types.Game) -> yarl.URL:

WEB_LOGIN_URL = Route("https://sg-public-api.hoyolab.com/account/ma-passport/api/webLoginByPassword")
APP_LOGIN_URL = Route("https://sg-public-api.hoyoverse.com/account/ma-passport/api/appLoginByPassword")
CN_LOGIN_URL = Route("https://passport-api.miyoushe.com/account/ma-cn-passport/web/loginByPassword")

SEND_VERIFICATION_CODE_URL = Route(
"https://sg-public-api.hoyoverse.com/account/ma-verifier/api/createEmailCaptchaByActionTicket"
Expand Down
29 changes: 26 additions & 3 deletions genshin/utility/geetest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import json
import typing

from ..types import Region

__all__ = ["encrypt_geetest_credentials"]


# RSA key is the same for app and web login
LOGIN_KEY_CERT = b"""
OS_LOGIN_KEY_CERT = b"""
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4PMS2JVMwBsOIrYWRluY
wEiFZL7Aphtm9z5Eu/anzJ09nB00uhW+ScrDWFECPwpQto/GlOJYCUwVM/raQpAj
Expand All @@ -20,6 +22,15 @@
-----END PUBLIC KEY-----
"""

CN_LOGIN_KEY_CERT = b"""
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDDvekdPMHN3AYhm/vktJT+YJr7
cI5DcsNKqdsx5DZX0gDuWFuIjzdwButrIYPNmRJ1G8ybDIF7oDW2eEpm5sMbL9zs
9ExXCdvqrn51qELbqj0XxtMTIpaCHFSI50PfPpTFV9Xt/hmyVwokoOXFlAEgCn+Q
CgGs52bFoYMtyi+xEQIDAQAB
-----END PUBLIC KEY-----
"""

WEB_LOGIN_HEADERS = {
"x-rpc-app_id": "c9oqaq3s3gu8",
"x-rpc-client_type": "4",
Expand All @@ -38,6 +49,16 @@
# "x-rpc-device_id": "1c33337bd45c1bfs",
}

CN_LOGIN_HEADERS = {
"x-rpc-app_id": "bll8iq97cem8",
"x-rpc-client_type": "4",
"Origin": "https://user.miyoushe.com",
"Referer": "https://user.miyoushe.com/",
"x-rpc-source": "v2.webLogin",
"x-rpc-mi_referrer": "https://user.miyoushe.com/login-platform/index.html?app_id=bll8iq97cem8&theme=&token_type=4&game_biz=bbs_cn&message_origin=https%253A%252F%252Fwww.miyoushe.com&succ_back_type=message%253Alogin-platform%253Alogin-success&fail_back_type=message%253Alogin-platform%253Alogin-fail&ux_mode=popup&iframe_level=1#/login/password", # noqa: E501
"x-rpc-device_id": "586f2440-856a-4243-8076-2b0a12314197",
}

EMAIL_SEND_HEADERS = {
"x-rpc-app_id": "c9oqaq3s3gu8",
"x-rpc-client_type": "2",
Expand All @@ -49,11 +70,13 @@
}


def encrypt_geetest_credentials(text: str) -> str:
def encrypt_geetest_credentials(text: str, region: Region) -> str:
"""Encrypt text for geetest."""
import rsa

public_key = rsa.PublicKey.load_pkcs1_openssl_pem(LOGIN_KEY_CERT)
public_key = rsa.PublicKey.load_pkcs1_openssl_pem(
OS_LOGIN_KEY_CERT if region is Region.OVERSEAS else CN_LOGIN_KEY_CERT
)
crypto = rsa.encrypt(text.encode("utf-8"), public_key)
return base64.b64encode(crypto).decode("utf-8")

Expand Down

0 comments on commit df80d1c

Please sign in to comment.