From df80d1c6221e66165a8ba738efc3bdf059c1d2ec Mon Sep 17 00:00:00 2001 From: seriaati Date: Mon, 25 Mar 2024 16:52:08 +0800 Subject: [PATCH] feat: Add CN password login --- genshin/client/components/geetest/client.py | 104 ++++++++++++++++++-- genshin/client/routes.py | 2 + genshin/utility/geetest.py | 29 +++++- 3 files changed, 126 insertions(+), 9 deletions(-) diff --git a/genshin/client/components/geetest/client.py b/genshin/client/components/geetest/client.py index d5d794bd..282b81d4 100644 --- a/genshin/client/components/geetest/client.py +++ b/genshin/client/components/geetest/client.py @@ -2,6 +2,7 @@ import json import typing +from http.cookies import SimpleCookie import aiohttp import aiohttp.web @@ -39,8 +40,8 @@ async def _web_login( headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) payload = { - "account": geetest_utility.encrypt_geetest_credentials(account), - "password": geetest_utility.encrypt_geetest_credentials(password), + "account": geetest_utility.encrypt_geetest_credentials(account, self._region), + "password": geetest_utility.encrypt_geetest_credentials(password, self._region), "token_type": tokenType, } @@ -69,6 +70,60 @@ async def _web_login( return cookies + async def _cn_login_by_password( + self, + account: str, + password: str, + *, + geetest: typing.Optional[typing.Dict[str, typing.Any]] = None, + ) -> typing.Dict[str, typing.Any]: + """ + Login with account and password using Miyoushe loginByPassword endpoint. + + Returns data from aigis header or cookies. + """ + headers = { + **geetest_utility.CN_LOGIN_HEADERS, + "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), + } + if geetest: + mmt_data = geetest["data"] + session_id = geetest["session_id"] + headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) + + payload = { + "account": geetest_utility.encrypt_geetest_credentials(account, self._region), + "password": geetest_utility.encrypt_geetest_credentials(password, self._region), + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.CN_LOGIN_URL.get_url(), + json=payload, + headers=headers, + ) as r: + data = await r.json() + + if data["retcode"] == -3102: + # Captcha triggered + aigis = json.loads(r.headers["x-rpc-aigis"]) + aigis["data"] = json.loads(aigis["data"]) + return aigis + + if not data["data"]: + errors.raise_for_retcode(data) + + # Parse cookies from headers + cookies: dict[str, str] = {} + for data in r.headers.items(): + if data[0] == "Set-Cookie": + cookie_parser = SimpleCookie() + cookie_parser.load(data[1]) + cookies.update({key: morsel.value for key, morsel in cookie_parser.items()}) + + self.set_cookies(cookies) + return cookies + async def _app_login( self, account: str, @@ -95,8 +150,8 @@ async def _app_login( headers["x-rpc-verify"] = json.dumps(ticket) payload = { - "account": geetest_utility.encrypt_geetest_credentials(account), - "password": geetest_utility.encrypt_geetest_credentials(password), + "account": geetest_utility.encrypt_geetest_credentials(account, self._region), + "password": geetest_utility.encrypt_geetest_credentials(password, self._region), } async with aiohttp.ClientSession() as session: @@ -199,7 +254,10 @@ async def login_with_password( port: int = 5000, tokenType: typing.Optional[int] = 6, geetest_solver: typing.Optional[ - typing.Callable[[typing.Dict[str, typing.Any]], typing.Awaitable[typing.Dict[str, typing.Any]]] + typing.Callable[ + [typing.Dict[str, typing.Any]], + typing.Awaitable[typing.Dict[str, typing.Any]], + ] ] = None, ) -> typing.Dict[str, str]: """Login with a password via web endpoint. @@ -220,6 +278,37 @@ async def login_with_password( return await self._web_login(account, password, tokenType=tokenType, geetest=geetest) + async def cn_login_by_password( + self, + account: str, + password: str, + *, + port: int = 5000, + geetest_solver: typing.Optional[ + typing.Callable[ + [typing.Dict[str, typing.Any]], + typing.Awaitable[typing.Dict[str, typing.Any]], + ] + ] = None, + ) -> typing.Dict[str, str]: + """Login with a password via Miyoushe loginByPassword endpoint. + + Note that this will start a webserver if captcha is + triggered and `geetest_solver` is not passed. + """ + result = await self._cn_login_by_password(account, password) + + if "session_id" not in result: + # Captcha not triggered + return result + + if geetest_solver: + geetest = await geetest_solver(result) + else: + geetest = await server.solve_geetest(result, port=port) + + return await self._cn_login_by_password(account, password, geetest=geetest) + async def login_with_app_password( self, account: str, @@ -227,7 +316,10 @@ async def login_with_app_password( *, port: int = 5000, geetest_solver: typing.Optional[ - typing.Callable[[typing.Dict[str, typing.Any]], typing.Awaitable[typing.Dict[str, typing.Any]]] + typing.Callable[ + [typing.Dict[str, typing.Any]], + typing.Awaitable[typing.Dict[str, typing.Any]], + ] ] = None, ) -> typing.Dict[str, str]: """Login with a password via HoYoLab app endpoint. diff --git a/genshin/client/routes.py b/genshin/client/routes.py index f61dea89..ca103bf3 100644 --- a/genshin/client/routes.py +++ b/genshin/client/routes.py @@ -13,6 +13,7 @@ "BBS_REFERER_URL", "BBS_URL", "CALCULATOR_URL", + "CN_LOGIN_URL", "COMMUNITY_URL", "COOKIE_V2_REFRESH_URL", "DETAIL_LEDGER_URL", @@ -217,6 +218,7 @@ def get_url(self, region: types.Region, game: types.Game) -> yarl.URL: WEB_LOGIN_URL = Route("https://sg-public-api.hoyolab.com/account/ma-passport/api/webLoginByPassword") APP_LOGIN_URL = Route("https://sg-public-api.hoyoverse.com/account/ma-passport/api/appLoginByPassword") +CN_LOGIN_URL = Route("https://passport-api.miyoushe.com/account/ma-cn-passport/web/loginByPassword") SEND_VERIFICATION_CODE_URL = Route( "https://sg-public-api.hoyoverse.com/account/ma-verifier/api/createEmailCaptchaByActionTicket" diff --git a/genshin/utility/geetest.py b/genshin/utility/geetest.py index ad8ed8bd..9d697a63 100644 --- a/genshin/utility/geetest.py +++ b/genshin/utility/geetest.py @@ -4,11 +4,13 @@ import json import typing +from ..types import Region + __all__ = ["encrypt_geetest_credentials"] # RSA key is the same for app and web login -LOGIN_KEY_CERT = b""" +OS_LOGIN_KEY_CERT = b""" -----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4PMS2JVMwBsOIrYWRluY wEiFZL7Aphtm9z5Eu/anzJ09nB00uhW+ScrDWFECPwpQto/GlOJYCUwVM/raQpAj @@ -20,6 +22,15 @@ -----END PUBLIC KEY----- """ +CN_LOGIN_KEY_CERT = b""" +-----BEGIN PUBLIC KEY----- +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDDvekdPMHN3AYhm/vktJT+YJr7 +cI5DcsNKqdsx5DZX0gDuWFuIjzdwButrIYPNmRJ1G8ybDIF7oDW2eEpm5sMbL9zs +9ExXCdvqrn51qELbqj0XxtMTIpaCHFSI50PfPpTFV9Xt/hmyVwokoOXFlAEgCn+Q +CgGs52bFoYMtyi+xEQIDAQAB +-----END PUBLIC KEY----- +""" + WEB_LOGIN_HEADERS = { "x-rpc-app_id": "c9oqaq3s3gu8", "x-rpc-client_type": "4", @@ -38,6 +49,16 @@ # "x-rpc-device_id": "1c33337bd45c1bfs", } +CN_LOGIN_HEADERS = { + "x-rpc-app_id": "bll8iq97cem8", + "x-rpc-client_type": "4", + "Origin": "https://user.miyoushe.com", + "Referer": "https://user.miyoushe.com/", + "x-rpc-source": "v2.webLogin", + "x-rpc-mi_referrer": "https://user.miyoushe.com/login-platform/index.html?app_id=bll8iq97cem8&theme=&token_type=4&game_biz=bbs_cn&message_origin=https%253A%252F%252Fwww.miyoushe.com&succ_back_type=message%253Alogin-platform%253Alogin-success&fail_back_type=message%253Alogin-platform%253Alogin-fail&ux_mode=popup&iframe_level=1#/login/password", # noqa: E501 + "x-rpc-device_id": "586f2440-856a-4243-8076-2b0a12314197", +} + EMAIL_SEND_HEADERS = { "x-rpc-app_id": "c9oqaq3s3gu8", "x-rpc-client_type": "2", @@ -49,11 +70,13 @@ } -def encrypt_geetest_credentials(text: str) -> str: +def encrypt_geetest_credentials(text: str, region: Region) -> str: """Encrypt text for geetest.""" import rsa - public_key = rsa.PublicKey.load_pkcs1_openssl_pem(LOGIN_KEY_CERT) + public_key = rsa.PublicKey.load_pkcs1_openssl_pem( + OS_LOGIN_KEY_CERT if region is Region.OVERSEAS else CN_LOGIN_KEY_CERT + ) crypto = rsa.encrypt(text.encode("utf-8"), public_key) return base64.b64encode(crypto).decode("utf-8")