diff --git a/genshin/__main__.py b/genshin/__main__.py index fd047c9a..4e560898 100644 --- a/genshin/__main__.py +++ b/genshin/__main__.py @@ -338,8 +338,8 @@ def authkey() -> None: async def login(account: str, password: str, port: int) -> None: """Login with a password.""" client = genshin.Client() - cookies = await client.login_with_password(account, password, port=port) - cookies = await genshin.complete_cookies(cookies) + result = await client.os_login_with_password(account, password, port=port) + cookies = await genshin.complete_cookies(result.model_dump()) base: http.cookies.BaseCookie[str] = http.cookies.BaseCookie(cookies) click.echo(f"Your cookies are: {click.style(base.output(header='', sep=';'), bold=True)}") diff --git a/genshin/client/clients.py b/genshin/client/clients.py index 87e8aa9d..5162547d 100644 --- a/genshin/client/clients.py +++ b/genshin/client/clients.py @@ -6,7 +6,7 @@ daily, diary, gacha, - geetest, + auth, hoyolab, lineup, teapot, @@ -28,6 +28,6 @@ class Client( wiki.WikiClient, gacha.WishClient, transaction.TransactionClient, - geetest.GeetestClient, + auth.AuthClient, ): """A simple HTTP client for API endpoints.""" diff --git a/genshin/client/components/auth/__init__.py b/genshin/client/components/auth/__init__.py new file mode 100644 index 00000000..7e9ded02 --- /dev/null +++ b/genshin/client/components/auth/__init__.py @@ -0,0 +1,10 @@ +"""Auth-related utility. + +Credits to: +- JokelBaf - https://github.com/jokelbaf +- Seria - https://github.com/seriaati +- M-307 - https://github.com/mrwan200 +- gsuid_core - https://github.com/Genshin-bots/gsuid_core +""" + +from .client import * diff --git a/genshin/client/components/auth/client.py b/genshin/client/components/auth/client.py new file mode 100644 index 00000000..0bf323b4 --- /dev/null +++ b/genshin/client/components/auth/client.py @@ -0,0 +1,290 @@ +"""Main auth client.""" + +import asyncio +import logging +import typing + +import aiohttp +import qrcode +from qrcode import constants as qrcode_constants +from qrcode.image.pil import PilImage + +from genshin import errors, types +from genshin.client import routes +from genshin.client.components import base +from genshin.client.manager import managers +from genshin.client.manager.cookie import fetch_cookie_token_with_game_token, fetch_stoken_with_game_token +from genshin.models.auth.cookie import ( + AppLoginResult, + CNWebLoginResult, + GameLoginResult, + MobileLoginResult, + QRLoginResult, + WebLoginResult, +) +from genshin.models.auth.geetest import MMT, RiskyCheckMMT, RiskyCheckMMTResult, SessionMMT, SessionMMTResult +from genshin.models.auth.qrcode import QRCodeStatus +from genshin.models.auth.verification import ActionTicket +from genshin.types import Game +from genshin.utility import auth as auth_utility +from genshin.utility import ds as ds_utility + +from . import server, subclients + +__all__ = ["AuthClient"] + +LOGGER_ = logging.getLogger(__name__) + + +class AuthClient(subclients.AppAuthClient, subclients.WebAuthClient, subclients.GameAuthClient): + """Auth client component.""" + + @base.region_specific(types.Region.OVERSEAS) + async def os_login_with_password( + self, + account: str, + password: str, + *, + port: int = 5000, + token_type: typing.Optional[int] = 6, + geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None, + ) -> WebLoginResult: + """Login with a password via web endpoint. + + Note that this will start a webserver if captcha is + triggered and `geetest_solver` is not passed. + """ + result = await self._os_web_login(account, password, token_type=token_type) + + if not isinstance(result, SessionMMT): + # Captcha not triggered + return result + + if geetest_solver: + mmt_result = await geetest_solver(result) + else: + mmt_result = await server.solve_geetest(result, port=port) + + return await self._os_web_login(account, password, token_type=token_type, mmt_result=mmt_result) + + @base.region_specific(types.Region.CHINESE) + async def cn_login_with_password( + self, + account: str, + password: str, + *, + port: int = 5000, + geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None, + ) -> CNWebLoginResult: + """Login with a password via Miyoushe loginByPassword endpoint. + + Note that this will start a webserver if captcha is + triggered and `geetest_solver` is not passed. + """ + result = await self._cn_web_login(account, password) + + if not isinstance(result, SessionMMT): + # Captcha not triggered + return result + + if geetest_solver: + mmt_result = await geetest_solver(result) + else: + mmt_result = await server.solve_geetest(result, port=port) + + return await self._cn_web_login(account, password, mmt_result=mmt_result) + + @base.region_specific(types.Region.OVERSEAS) + async def check_mobile_number_validity(self, mobile: str) -> bool: + """Check if a mobile number is valid (it's registered on Miyoushe). + + Returns True if the mobile number is valid, False otherwise. + """ + async with aiohttp.ClientSession() as session: + async with session.get( + routes.CHECK_MOBILE_VALIDITY_URL.get_url(), + params={"mobile": mobile}, + ) as r: + data = await r.json() + + return data["data"]["status"] != data["data"]["is_registable"] + + @base.region_specific(types.Region.CHINESE) + async def login_with_mobile_number( + self, + mobile: str, + *, + port: int = 5000, + ) -> MobileLoginResult: + """Login with mobile number, returns cookies. + + Only works for Chinese region (Miyoushe) users, do not include + area code (+86) in the mobile number. + + Steps: + 1. Sends OTP to the provided mobile number. + 2. If captcha is triggered, prompts the user to solve it. + 3. Lets user enter the OTP. + 4. Logs in with the OTP. + 5. Returns cookies. + """ + result = await self._send_mobile_otp(mobile) + + if isinstance(result, SessionMMT): + # Captcha triggered + mmt_result = await server.solve_geetest(result, port=port) + await self._send_mobile_otp(mobile, mmt_result=mmt_result) + + otp = await server.enter_code(port=port) + return await self._login_with_mobile_otp(mobile, otp) + + @base.region_specific(types.Region.OVERSEAS) + async def login_with_app_password( + self, + account: str, + password: str, + *, + port: int = 5000, + geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None, + ) -> AppLoginResult: + """Login with a password via HoYoLab app endpoint. + + Note that this will start a webserver if either of the + following happens: + + 1. Captcha is triggered and `geetest_solver` is not passed. + 2. Email verification is triggered (can happen if you + first login with a new device). + """ + result = await self._app_login(account, password) + + if isinstance(result, SessionMMT): + # Captcha triggered + if geetest_solver: + mmt_result = await geetest_solver(result) + else: + mmt_result = await server.solve_geetest(result, port=port) + + result = await self._app_login(account, password, mmt_result=mmt_result) + + if isinstance(result, ActionTicket): + # Email verification required + mmt = await self._send_verification_email(result) + if mmt: + if geetest_solver: + mmt_result = await geetest_solver(mmt) + else: + mmt_result = await server.solve_geetest(mmt, port=port) + + await asyncio.sleep(2) # Add delay to prevent [-3206] + await self._send_verification_email(result, mmt_result=mmt_result) + + code = await server.enter_code(port=port) + await self._verify_email(code, result) + + result = await self._app_login(account, password, ticket=result) + + return result + + @base.region_specific(types.Region.CHINESE) + async def login_with_qrcode(self) -> QRLoginResult: + """Login with QR code, only available for Miyoushe users.""" + creation_result = await self._create_qrcode() + qrc: PilImage = qrcode.make( # type: ignore + creation_result.url, error_correction=qrcode_constants.ERROR_CORRECT_L + ) + qrc.show() + + scanned = False + while True: + check_result = await self._check_qrcode( + creation_result.app_id, creation_result.device_id, creation_result.ticket + ) + if check_result.status == QRCodeStatus.SCANNED and not scanned: + LOGGER_.info("QR code scanned") + scanned = True + elif check_result.status == QRCodeStatus.CONFIRMED: + LOGGER_.info("QR code login confirmed") + break + + await asyncio.sleep(2) + + raw_data = check_result.payload.raw + assert raw_data is not None + + cookie_token = await fetch_cookie_token_with_game_token( + game_token=raw_data.game_token, account_id=raw_data.account_id + ) + stoken = await fetch_stoken_with_game_token(game_token=raw_data.game_token, account_id=int(raw_data.account_id)) + + cookies = { + "stoken": stoken.token, + "stuid": stoken.aid, + "mid": stoken.mid, + "cookie_token": cookie_token, + } + self.set_cookies(cookies) + return QRLoginResult(**cookies) + + @base.region_specific(types.Region.CHINESE) + @managers.no_multi + async def create_mmt(self) -> MMT: + """Create a geetest challenge.""" + is_genshin = self.game is Game.GENSHIN + headers = { + "DS": ds_utility.generate_create_geetest_ds(), + "x-rpc-challenge_game": "2" if is_genshin else "6", + "x-rpc-page": "v4.1.5-ys_#ys" if is_genshin else "v1.4.1-rpg_#/rpg", + "x-rpc-tool-verison": "v4.1.5-ys" if is_genshin else "v1.4.1-rpg", + **auth_utility.CREATE_MMT_HEADERS, + } + + assert isinstance(self.cookie_manager, managers.CookieManager) + async with self.cookie_manager.create_session() as session: + async with session.get( + routes.CREATE_MMT_URL.get_url(), headers=headers, cookies=self.cookie_manager.cookies + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + return MMT(**data["data"]) + + @base.region_specific(types.Region.OVERSEAS) + async def os_game_login( + self, + account: str, + password: str, + *, + port: int = 5000, + geetest_solver: typing.Optional[typing.Callable[[RiskyCheckMMT], typing.Awaitable[RiskyCheckMMTResult]]] = None, + ) -> GameLoginResult: + """Perform a login to the game.""" + password = auth_utility.encrypt_geetest_credentials(password, 2) + result = await self._shield_login(account, password) + + if isinstance(result, RiskyCheckMMT): + if geetest_solver: + mmt_result = await geetest_solver(result) + else: + mmt_result = await server.solve_geetest(result, port=port) + + result = await self._shield_login(account, password, mmt_result=mmt_result) + + if not result.device_grant_required: + return await self._os_game_login(result.account.uid, result.account.token) + + mmt = await self._send_game_verification_email(result.account.device_grant_ticket) + if mmt: + if geetest_solver: + mmt_result = await geetest_solver(mmt) + else: + mmt_result = await server.solve_geetest(mmt, port=port) + + await self._send_game_verification_email(result.account.device_grant_ticket, mmt_result=mmt_result) + + code = await server.enter_code() + verification_result = await self._verify_game_email(code, result.account.device_grant_ticket) + + return await self._os_game_login(result.account.uid, verification_result.game_token) diff --git a/genshin/client/components/auth/server.py b/genshin/client/components/auth/server.py new file mode 100644 index 00000000..9cebb265 --- /dev/null +++ b/genshin/client/components/auth/server.py @@ -0,0 +1,310 @@ +"""Aiohttp webserver used for captcha solving and verification.""" + +from __future__ import annotations + +import asyncio +import typing +import webbrowser + +import aiohttp +from aiohttp import web + +from genshin.models.auth.geetest import ( + MMT, + MMTv4, + RiskyCheckMMT, + MMTResult, + MMTv4Result, + SessionMMT, + SessionMMTv4, + SessionMMTResult, + SessionMMTv4Result, + RiskyCheckMMTResult, +) +from genshin.utility import auth as auth_utility + +__all__ = ["PAGES", "enter_code", "launch_webapp", "solve_geetest"] + +PAGES: typing.Final[typing.Dict[typing.Literal["captcha", "captcha-v4", "enter-code"], str]] = { + "captcha": """ + + + + + + + """, + "enter-code": """ + + + + + + + + + """, +} + + +GT_V3_URL = "https://static.geetest.com/static/js/gt.0.5.0.js" +GT_V4_URL = "https://static.geetest.com/v4/gt4.js" + + +@typing.overload +async def launch_webapp( + page: typing.Literal["captcha"], + *, + mmt: typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT], + lang: str = ..., + api_server: str = ..., + proxy_geetest: bool = ..., + port: int = ..., +) -> typing.Union[MMTResult, MMTv4Result, SessionMMTResult, SessionMMTv4Result, RiskyCheckMMTResult]: ... +@typing.overload +async def launch_webapp( + page: typing.Literal["enter-code"], + *, + mmt: None = ..., + lang: None = ..., + api_server: None = ..., + proxy_geetest: None = ..., + port: int = ..., +) -> str: ... +async def launch_webapp( + page: typing.Literal["captcha", "enter-code"], + *, + mmt: typing.Optional[typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT]] = None, + lang: typing.Optional[str] = None, + api_server: typing.Optional[str] = None, + proxy_geetest: typing.Optional[bool] = None, + port: int = 5000, +) -> typing.Union[MMTResult, MMTv4Result, SessionMMTResult, SessionMMTv4Result, RiskyCheckMMTResult, str]: + """Create and run a webapp to solve captcha or enter a verification code.""" + routes = web.RouteTableDef() + future: asyncio.Future[typing.Any] = asyncio.Future() + + @routes.get("/") + async def index(request: web.Request) -> web.StreamResponse: + body = PAGES[page] + body = body.replace("{gt_version}", "4" if isinstance(mmt, MMTv4) else "3") + body = body.replace("{api_server}", api_server or "api-na.geetest.com") + body = body.replace("{proxy_geetest}", str(proxy_geetest or False).lower()) + body = body.replace("{lang}", lang or "en") + return web.Response(body=body, content_type="text/html") + + @routes.get("/gt/{version}.js") + async def gt(request: web.Request) -> web.StreamResponse: + version = request.match_info.get("version", "v3") + gt_url = GT_V4_URL if version == "v4" else GT_V3_URL + + async with aiohttp.ClientSession() as session: + r = await session.get(gt_url) + content = await r.read() + + return web.Response(body=content, content_type="text/javascript") + + @routes.get("/mmt") + async def mmt_endpoint(request: web.Request) -> web.Response: + return web.json_response(mmt.model_dump() if mmt else {}) + + @routes.post("/send-data") + async def send_data_endpoint(request: web.Request) -> web.Response: + result = await request.json() + if "code" in result: + result = result["code"] + else: + if isinstance(mmt, RiskyCheckMMT): + result = RiskyCheckMMTResult(**result) + elif isinstance(mmt, SessionMMT): + result = SessionMMTResult(**result) + elif isinstance(mmt, SessionMMTv4): + result = SessionMMTv4Result(**result) + elif isinstance(mmt, MMT): + result = MMTResult(**result) + elif isinstance(mmt, MMTv4): + result = MMTv4Result(**result) + + future.set_result(result) + return web.Response(status=204) + + @routes.get("/proxy") + async def proxy(request: web.Request) -> web.Response: + params = dict(request.query) + url = params.pop("url", None) + if not url: + return web.Response(status=400) + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as r: + content = await r.read() + + return web.Response(body=content, status=r.status, content_type="text/javascript") + + app = web.Application() + app.add_routes(routes) + + runner = web.AppRunner(app) + await runner.setup() + + site = web.TCPSite(runner, host="localhost", port=port) + print(f"Opening http://localhost:{port} in browser...") # noqa + webbrowser.open_new_tab(f"http://localhost:{port}") + + await site.start() + + try: + data = await future + finally: + await asyncio.sleep(0.3) + await runner.shutdown() + await runner.cleanup() + + return data + + +@typing.overload +async def solve_geetest( + mmt: RiskyCheckMMT, + *, + lang: str = ..., + api_server: str = ..., + proxy_geetest: bool = ..., + port: int = ..., +) -> RiskyCheckMMTResult: ... +@typing.overload +async def solve_geetest( + mmt: SessionMMT, + *, + lang: str = ..., + api_server: str = ..., + proxy_geetest: bool = ..., + port: int = ..., +) -> SessionMMTResult: ... +@typing.overload +async def solve_geetest( + mmt: MMT, + *, + lang: str = ..., + api_server: str = ..., + proxy_geetest: bool = ..., + port: int = ..., +) -> MMTResult: ... +@typing.overload +async def solve_geetest( + mmt: SessionMMTv4, + *, + lang: str = ..., + api_server: str = ..., + proxy_geetest: bool = ..., + port: int = ..., +) -> SessionMMTv4Result: ... +@typing.overload +async def solve_geetest( + mmt: MMTv4, + *, + lang: str = ..., + api_server: str = ..., + proxy_geetest: bool = ..., + port: int = ..., +) -> MMTv4Result: ... +async def solve_geetest( + mmt: typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT], + *, + lang: str = "en-us", + api_server: str = "api-na.geetest.com", + proxy_geetest: bool = False, + port: int = 5000, +) -> typing.Union[MMTResult, MMTv4Result, SessionMMTResult, SessionMMTv4Result, RiskyCheckMMTResult]: + """Start a web server and manually solve geetest captcha.""" + lang = auth_utility.lang_to_geetest_lang(lang) + return await launch_webapp( + "captcha", + mmt=mmt, + lang=lang, + api_server=api_server, + proxy_geetest=proxy_geetest, + port=port, + ) + + +async def enter_code(*, port: int = 5000) -> str: + """Get email or phone number verification code from user.""" + return await launch_webapp("enter-code", port=port) diff --git a/genshin/client/components/auth/subclients/__init__.py b/genshin/client/components/auth/subclients/__init__.py new file mode 100644 index 00000000..566790b5 --- /dev/null +++ b/genshin/client/components/auth/subclients/__init__.py @@ -0,0 +1,5 @@ +"""Sub clients for AuthClient. Separated by functionality.""" + +from .app import * +from .game import * +from .web import * diff --git a/genshin/client/components/auth/subclients/app.py b/genshin/client/components/auth/subclients/app.py new file mode 100644 index 00000000..446f0196 --- /dev/null +++ b/genshin/client/components/auth/subclients/app.py @@ -0,0 +1,221 @@ +"""App sub client for AuthClient. + +Covers HoYoLAB and Miyoushe app auth endpoints. +""" + +import json +import random +import typing +from string import ascii_letters, digits + +import aiohttp + +from genshin import constants, errors +from genshin.client import routes +from genshin.client.components import base +from genshin.models.auth.cookie import AppLoginResult +from genshin.models.auth.geetest import SessionMMT, SessionMMTResult +from genshin.models.auth.qrcode import QRCodeCheckResult, QRCodeCreationResult +from genshin.models.auth.verification import ActionTicket +from genshin.utility import auth as auth_utility +from genshin.utility import ds as ds_utility + +__all__ = ["AppAuthClient"] + + +class AppAuthClient(base.BaseClient): + """App sub client for AuthClient.""" + + @typing.overload + async def _app_login( # noqa: D102 missing docstring in overload? + self, + account: str, + password: str, + *, + mmt_result: SessionMMTResult, + ticket: None = ..., + ) -> typing.Union[AppLoginResult, ActionTicket]: ... + + @typing.overload + async def _app_login( # noqa: D102 missing docstring in overload? + self, + account: str, + password: str, + *, + mmt_result: None = ..., + ticket: ActionTicket, + ) -> AppLoginResult: ... + + @typing.overload + async def _app_login( # noqa: D102 missing docstring in overload? + self, + account: str, + password: str, + *, + mmt_result: None = ..., + ticket: None = ..., + ) -> typing.Union[AppLoginResult, SessionMMT, ActionTicket]: ... + + async def _app_login( + self, + account: str, + password: str, + *, + mmt_result: typing.Optional[SessionMMTResult] = None, + ticket: typing.Optional[ActionTicket] = None, + ) -> typing.Union[AppLoginResult, SessionMMT, ActionTicket]: + """Login with a password using HoYoLab app endpoint. + + Returns + ------- + - Cookies if login is successful. + - SessionMMT if captcha is triggered. + - ActionTicket if email verification is required. + """ + headers = { + **auth_utility.APP_LOGIN_HEADERS, + "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["app_login"]), + } + if mmt_result: + headers["x-rpc-aigis"] = mmt_result.to_aigis_header() + + if ticket: + headers["x-rpc-verify"] = ticket.to_rpc_verify_header() + + payload = { + "account": auth_utility.encrypt_geetest_credentials(account, 1), + "password": auth_utility.encrypt_geetest_credentials(password, 1), + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.APP_LOGIN_URL.get_url(), + json=payload, + headers=headers, + ) as r: + data = await r.json() + + if data["retcode"] == -3101: + # Captcha triggered + aigis = json.loads(r.headers["x-rpc-aigis"]) + return SessionMMT(**aigis) + + if data["retcode"] == -3239: + # Email verification required + action_ticket = json.loads(r.headers["x-rpc-verify"]) + return ActionTicket(**action_ticket) + + if not data["data"]: + errors.raise_for_retcode(data) + + cookies = { + "stoken": data["data"]["token"]["token"], + "ltuid_v2": data["data"]["user_info"]["aid"], + "ltmid_v2": data["data"]["user_info"]["mid"], + "account_id_v2": data["data"]["user_info"]["aid"], + "account_mid_v2": data["data"]["user_info"]["mid"], + } + self.set_cookies(cookies) + + return AppLoginResult(**cookies) + + async def _send_verification_email( + self, + ticket: ActionTicket, + *, + mmt_result: typing.Optional[SessionMMTResult] = None, + ) -> typing.Union[None, SessionMMT]: + """Send verification email. + + Returns None if success, SessionMMT data if geetest triggered. + """ + headers = {**auth_utility.EMAIL_SEND_HEADERS} + if mmt_result: + headers["x-rpc-aigis"] = mmt_result.to_aigis_header() + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.SEND_VERIFICATION_CODE_URL.get_url(), + json={ + "action_type": "verify_for_component", + "action_ticket": ticket.verify_str.ticket, + }, + headers=headers, + ) as r: + data = await r.json() + + if data["retcode"] == -3101: + # Captcha triggered + aigis = json.loads(r.headers["x-rpc-aigis"]) + return SessionMMT(**aigis) + + if data["retcode"] != 0: + errors.raise_for_retcode(data) + + return None + + async def _verify_email(self, code: str, ticket: ActionTicket) -> None: + """Verify email.""" + async with aiohttp.ClientSession() as session: + async with session.post( + routes.VERIFY_EMAIL_URL.get_url(), + json={ + "action_type": "verify_for_component", + "action_ticket": ticket.verify_str.ticket, + "email_captcha": code, + "verify_method": 2, + }, + headers=auth_utility.EMAIL_VERIFY_HEADERS, + ) as r: + data = await r.json() + + if data["retcode"] != 0: + errors.raise_for_retcode(data) + + return None + + async def _create_qrcode(self) -> QRCodeCreationResult: + """Create a QR code for login.""" + if self.default_game is None: + raise RuntimeError("No default game set.") + + app_id = constants.APP_IDS[self.default_game][self.region] + device_id = "".join(random.choices(ascii_letters + digits, k=64)) + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.CREATE_QRCODE_URL.get_url(), + json={"app_id": app_id, "device": device_id}, + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + url: str = data["data"]["url"] + return QRCodeCreationResult( + app_id=app_id, + ticket=url.split("ticket=")[1], + device_id=device_id, + url=url, + ) + + async def _check_qrcode(self, app_id: str, device_id: str, ticket: str) -> QRCodeCheckResult: + """Check the status of a QR code login.""" + payload = { + "app_id": app_id, + "device": device_id, + "ticket": ticket, + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.CHECK_QRCODE_URL.get_url(), + json=payload, + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + return QRCodeCheckResult(**data["data"]) diff --git a/genshin/client/components/auth/subclients/game.py b/genshin/client/components/auth/subclients/game.py new file mode 100644 index 00000000..03406963 --- /dev/null +++ b/genshin/client/components/auth/subclients/game.py @@ -0,0 +1,186 @@ +"""Game sub client for AuthClient. + +Covers OS and CN game auth endpoints. +""" + +import json +import typing + +import aiohttp + +from genshin import constants, errors, types +from genshin.client import routes +from genshin.client.components import base +from genshin.models.auth.geetest import RiskyCheckMMT, RiskyCheckResult, RiskyCheckMMTResult +from genshin.models.auth.responses import ShieldLoginResponse +from genshin.models.auth.cookie import DeviceGrantResult, GameLoginResult +from genshin.utility import auth as auth_utility + +__all__ = ["GameAuthClient"] + + +class GameAuthClient(base.BaseClient): + """Game sub client for AuthClient.""" + + async def _risky_check( + self, action_type: str, api_name: str, *, username: typing.Optional[str] = None + ) -> RiskyCheckResult: + """Check if the given action (endpoint) is risky (whether captcha verification is required).""" + payload = {"action_type": action_type, "api_name": api_name} + if username: + payload["username"] = username + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.GAME_RISKY_CHECK_URL.get_url(self.region), json=payload, headers=auth_utility.RISKY_CHECK_HEADERS + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + return RiskyCheckResult(**data["data"]) + + @typing.overload + async def _shield_login( # noqa: D102 missing docstring in overload? + self, + account: str, + password: str, + *, + mmt_result: RiskyCheckMMTResult, + ) -> ShieldLoginResponse: ... + + @typing.overload + async def _shield_login( # noqa: D102 missing docstring in overload? + self, + account: str, + password: str, + *, + mmt_result: None = ..., + ) -> typing.Union[ShieldLoginResponse, RiskyCheckMMT]: ... + + async def _shield_login( + self, account: str, password: str, *, mmt_result: typing.Optional[RiskyCheckMMTResult] = None + ) -> typing.Union[ShieldLoginResponse, RiskyCheckMMT]: + """Log in with the given account and password. + + Returns MMT if geetest verification is required. + """ + if self.default_game is None: + raise ValueError("No default game set.") + + headers = auth_utility.SHIELD_LOGIN_HEADERS.copy() + if mmt_result: + headers["x-rpc-risky"] = mmt_result.to_rpc_risky() + else: + # Check if geetest is required + check_result = await self._risky_check("login", "/shield/api/login", username=account) + if check_result.mmt: + return check_result.to_mmt() + else: + headers["x-rpc-risky"] = auth_utility.generate_risky_header(check_result.id) + + payload = {"account": account, "password": password, "is_crypto": True} + async with aiohttp.ClientSession() as session: + async with session.post( + routes.SHIELD_LOGIN_URL.get_url(self.region, self.default_game), json=payload, headers=headers + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + return ShieldLoginResponse(**data["data"]) + + @typing.overload + async def _send_game_verification_email( # noqa: D102 missing docstring in overload? + self, + action_ticket: str, + *, + mmt_result: RiskyCheckMMTResult, + ) -> None: ... + + @typing.overload + async def _send_game_verification_email( # noqa: D102 missing docstring in overload? + self, + action_ticket: str, + *, + mmt_result: None = ..., + ) -> typing.Union[None, RiskyCheckMMT]: ... + + async def _send_game_verification_email( + self, action_ticket: str, *, mmt_result: typing.Optional[RiskyCheckMMTResult] = None + ) -> typing.Union[None, RiskyCheckMMT]: + """Send email verification code. + + Returns `None` if success, `RiskyCheckMMT` if geetest verification is required. + """ + headers = auth_utility.GRANT_TICKET_HEADERS.copy() + if mmt_result: + headers["x-rpc-risky"] = mmt_result.to_rpc_risky() + else: + # Check if geetest is required + check_result = await self._risky_check("device_grant", "/device/api/preGrantByTicket") + if check_result.mmt: + return check_result.to_mmt() + else: + headers["x-rpc-risky"] = auth_utility.generate_risky_header(check_result.id) + + payload = { + "way": "Way_Email", + "action_ticket": action_ticket, + "device": { + "device_model": "iPhone15,4", + "device_id": auth_utility.DEVICE_ID, + "client": 1, + "device_name": "iPhone", + }, + } + async with aiohttp.ClientSession() as session: + async with session.post( + routes.PRE_GRANT_TICKET_URL.get_url(self.region), json=payload, headers=headers + ) as r: + data = await r.json() + + if data["retcode"] != 0: + errors.raise_for_retcode(data) + + return None + + async def _verify_game_email(self, code: str, action_ticket: str) -> DeviceGrantResult: + """Verify the email code.""" + payload = {"code": code, "ticket": action_ticket} + async with aiohttp.ClientSession() as session: + async with session.post( + routes.DEVICE_GRANT_URL.get_url(self.region), json=payload, headers=auth_utility.GRANT_TICKET_HEADERS + ) as r: + data = await r.json() + + return DeviceGrantResult(**data["data"]) + + @base.region_specific(types.Region.OVERSEAS) + async def _os_game_login(self, uid: str, game_token: str) -> GameLoginResult: + """Log in to the game.""" + if self.default_game is None: + raise ValueError("No default game set.") + + payload = { + "channel_id": 1, + "device": auth_utility.DEVICE_ID, + "app_id": constants.APP_IDS[self.default_game][self.region], + } + payload["data"] = json.dumps({"uid": uid, "token": game_token, "guest": False}) + payload["sign"] = auth_utility.generate_sign(payload, constants.APP_KEYS[self.default_game][self.region]) + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.GAME_LOGIN_URL.get_url(self.region, self.default_game), + json=payload, + headers=auth_utility.GAME_LOGIN_HEADERS, + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + return GameLoginResult(**data["data"]) diff --git a/genshin/client/components/auth/subclients/web.py b/genshin/client/components/auth/subclients/web.py new file mode 100644 index 00000000..76201b2c --- /dev/null +++ b/genshin/client/components/auth/subclients/web.py @@ -0,0 +1,225 @@ +"""Web sub client for AuthClient. + +Covers HoYoLAB and Miyoushe web auth endpoints. +""" + +import json +import typing + +import aiohttp + +from genshin import constants, errors, types +from genshin.client import routes +from genshin.client.components import base +from genshin.models.auth.cookie import CNWebLoginResult, MobileLoginResult, WebLoginResult +from genshin.models.auth.geetest import SessionMMT, SessionMMTResult +from genshin.utility import auth as auth_utility +from genshin.utility import ds as ds_utility + +__all__ = ["WebAuthClient"] + + +class WebAuthClient(base.BaseClient): + """Web sub client for AuthClient.""" + + @typing.overload + async def _os_web_login( # noqa: D102 missing docstring in overload? + self, + account: str, + password: str, + *, + token_type: typing.Optional[int] = ..., + mmt_result: SessionMMTResult, + ) -> WebLoginResult: ... + + @typing.overload + async def _os_web_login( # noqa: D102 missing docstring in overload? + self, + account: str, + password: str, + *, + token_type: typing.Optional[int] = ..., + mmt_result: None = ..., + ) -> typing.Union[SessionMMT, WebLoginResult]: ... + + @base.region_specific(types.Region.OVERSEAS) + async def _os_web_login( + self, + account: str, + password: str, + *, + token_type: typing.Optional[int] = 6, + mmt_result: typing.Optional[SessionMMTResult] = None, + ) -> typing.Union[SessionMMT, WebLoginResult]: + """Login with a password using web endpoint. + + Returns either data from aigis header or cookies. + """ + headers = {**auth_utility.WEB_LOGIN_HEADERS} + if mmt_result: + headers["x-rpc-aigis"] = mmt_result.to_aigis_header() + + payload = { + "account": auth_utility.encrypt_geetest_credentials(account, 1), + "password": auth_utility.encrypt_geetest_credentials(password, 1), + "token_type": token_type, + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.WEB_LOGIN_URL.get_url(), + json=payload, + headers=headers, + ) as r: + data = await r.json() + cookies = {cookie.key: cookie.value for cookie in r.cookies.values()} + + if data["retcode"] == -3101: + # Captcha triggered + aigis = json.loads(r.headers["x-rpc-aigis"]) + return SessionMMT(**aigis) + + if not data["data"]: + errors.raise_for_retcode(data) + + if data["data"].get("stoken"): + cookies["stoken"] = data["data"]["stoken"] + + self.set_cookies(cookies) + return WebLoginResult(**cookies) + + @typing.overload + async def _cn_web_login( # noqa: D102 missing docstring in overload? + self, + account: str, + password: str, + *, + mmt_result: SessionMMTResult, + ) -> CNWebLoginResult: ... + + @typing.overload + async def _cn_web_login( # noqa: D102 missing docstring in overload? + self, + account: str, + password: str, + *, + mmt_result: None = ..., + ) -> typing.Union[SessionMMT, CNWebLoginResult]: ... + + @base.region_specific(types.Region.CHINESE) + async def _cn_web_login( + self, + account: str, + password: str, + *, + mmt_result: typing.Optional[SessionMMTResult] = None, + ) -> typing.Union[SessionMMT, CNWebLoginResult]: + """ + Login with account and password using Miyoushe loginByPassword endpoint. + + Returns data from aigis header or cookies. + """ + headers = { + **auth_utility.CN_LOGIN_HEADERS, + "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), + } + if mmt_result: + headers["x-rpc-aigis"] = mmt_result.to_aigis_header() + + payload = { + "account": auth_utility.encrypt_geetest_credentials(account, 2), + "password": auth_utility.encrypt_geetest_credentials(password, 2), + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.CN_WEB_LOGIN_URL.get_url(), + json=payload, + headers=headers, + ) as r: + data = await r.json() + + if data["retcode"] == -3102: + # Captcha triggered + aigis = json.loads(r.headers["x-rpc-aigis"]) + return SessionMMT(**aigis) + + if not data["data"]: + errors.raise_for_retcode(data) + + cookies = {cookie.key: cookie.value for cookie in r.cookies.values()} + self.set_cookies(cookies) + + return CNWebLoginResult(**cookies) + + async def _send_mobile_otp( + self, + mobile: str, + *, + mmt_result: typing.Optional[SessionMMTResult] = None, + ) -> typing.Union[None, SessionMMT]: + """Attempt to send OTP to the provided mobile number. + + May return aigis headers if captcha is triggered, None otherwise. + """ + headers = { + **auth_utility.CN_LOGIN_HEADERS, + "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), + } + if mmt_result: + headers["x-rpc-aigis"] = mmt_result.to_aigis_header() + + payload = { + "mobile": auth_utility.encrypt_geetest_credentials(mobile, 2), + "area_code": auth_utility.encrypt_geetest_credentials("+86", 2), + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.MOBILE_OTP_URL.get_url(), + json=payload, + headers=headers, + ) as r: + data = await r.json() + + if data["retcode"] == -3101: + # Captcha triggered + aigis = json.loads(r.headers["x-rpc-aigis"]) + return SessionMMT(**aigis) + + if not data["data"]: + errors.raise_for_retcode(data) + + return None + + async def _login_with_mobile_otp(self, mobile: str, otp: str) -> MobileLoginResult: + """Login with OTP and mobile number. + + Returns cookies if OTP matches the one sent, raises an error otherwise. + """ + headers = { + **auth_utility.CN_LOGIN_HEADERS, + "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), + } + + payload = { + "mobile": auth_utility.encrypt_geetest_credentials(mobile, 2), + "area_code": auth_utility.encrypt_geetest_credentials("+86", 2), + "captcha": otp, + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.MOBILE_LOGIN_URL.get_url(), + json=payload, + headers=headers, + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + cookies = {cookie.key: cookie.value for cookie in r.cookies.values()} + self.set_cookies(cookies) + + return MobileLoginResult(**cookies) diff --git a/genshin/client/components/base.py b/genshin/client/components/base.py index 49673cf9..90b15860 100644 --- a/genshin/client/components/base.py +++ b/genshin/client/components/base.py @@ -3,6 +3,7 @@ import abc import asyncio import base64 +import functools import json import logging import os @@ -24,6 +25,11 @@ __all__ = ["BaseClient"] +T = typing.TypeVar("T") +CallableT = typing.TypeVar("CallableT", bound="typing.Callable[..., object]") +AsyncCallableT = typing.TypeVar("AsyncCallableT", bound="typing.Callable[..., typing.Awaitable[object]]") + + class BaseClient(abc.ABC): """Base ABC Client.""" @@ -534,3 +540,22 @@ async def update_mi18n(self, langs: typing.Iterable[str] = constants.LANGS, *, f coros.append(self._fetch_mi18n(key, lang, force=force)) await asyncio.gather(*coros) + + +def region_specific(region: types.Region) -> typing.Callable[[AsyncCallableT], AsyncCallableT]: + """Prevent function to be ran with unsupported regions.""" + + def decorator(func: AsyncCallableT) -> AsyncCallableT: + @functools.wraps(func) + async def wrapper(self: typing.Any, *args: typing.Any, **kwargs: typing.Any) -> typing.Any: + + if not hasattr(self, "region"): + raise TypeError("Cannot use @region_specific on a plain function.") + if region != self.region: + raise RuntimeError("The method can only be used with client region set to " + region) + + return await func(self, *args, **kwargs) + + return typing.cast("AsyncCallableT", wrapper) + + return decorator diff --git a/genshin/client/components/geetest/__init__.py b/genshin/client/components/geetest/__init__.py deleted file mode 100644 index 3871455b..00000000 --- a/genshin/client/components/geetest/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Geetest captcha handler. - -Credits to M-307 - https://github.com/mrwan200 -""" - -from .client import * diff --git a/genshin/client/components/geetest/client.py b/genshin/client/components/geetest/client.py deleted file mode 100644 index dbc9245c..00000000 --- a/genshin/client/components/geetest/client.py +++ /dev/null @@ -1,563 +0,0 @@ -"""Geetest client component.""" - -import asyncio -import json -import logging -import random -import typing -from string import ascii_letters, digits - -import aiohttp -import aiohttp.web -import qrcode -import qrcode.image.pil -from qrcode.constants import ERROR_CORRECT_L - -from genshin import constants, errors -from genshin.client import routes -from genshin.client.components import base -from genshin.client.manager.cookie import fetch_cookie_token_by_game_token, fetch_stoken_by_game_token -from genshin.models.miyoushe.qrcode import QRCodeCheckResult, QRCodeCreationResult, QRCodeStatus -from genshin.utility import ds as ds_utility -from genshin.utility import geetest as geetest_utility - -from . import server - -__all__ = ["GeetestClient"] - -LOGGER_ = logging.getLogger(__name__) - - -class GeetestClient(base.BaseClient): - """Geetest client component.""" - - async def _web_login( - self, - account: str, - password: str, - *, - tokenType: typing.Optional[int] = 6, - geetest: typing.Optional[typing.Dict[str, typing.Any]] = None, - ) -> typing.Dict[str, typing.Any]: - """Login with a password using web endpoint. - - Returns either data from aigis header or cookies. - """ - headers = {**geetest_utility.WEB_LOGIN_HEADERS} - if geetest: - mmt_data = geetest["data"] - session_id = geetest["session_id"] - headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) - - payload = { - "account": geetest_utility.encrypt_geetest_credentials(account, self._region), - "password": geetest_utility.encrypt_geetest_credentials(password, self._region), - "token_type": tokenType, - } - - async with aiohttp.ClientSession() as session: - async with session.post( - routes.WEB_LOGIN_URL.get_url(), - json=payload, - headers=headers, - ) as r: - data = await r.json() - cookies = {cookie.key: cookie.value for cookie in r.cookies.values()} - - if data["retcode"] == -3101: - # Captcha triggered - aigis = json.loads(r.headers["x-rpc-aigis"]) - aigis["data"] = json.loads(aigis["data"]) - return aigis - - if not data["data"]: - errors.raise_for_retcode(data) - - if data["data"].get("stoken"): - cookies["stoken"] = data["data"]["stoken"] - - self.set_cookies(cookies) - - return cookies - - async def _cn_login_by_password( - self, - account: str, - password: str, - *, - geetest: typing.Optional[typing.Dict[str, typing.Any]] = None, - ) -> typing.Dict[str, typing.Any]: - """ - Login with account and password using Miyoushe loginByPassword endpoint. - - Returns data from aigis header or cookies. - """ - headers = { - **geetest_utility.CN_LOGIN_HEADERS, - "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), - } - if geetest: - mmt_data = geetest["data"] - session_id = geetest["session_id"] - headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) - - payload = { - "account": geetest_utility.encrypt_geetest_credentials(account, self._region), - "password": geetest_utility.encrypt_geetest_credentials(password, self._region), - } - - async with aiohttp.ClientSession() as session: - async with session.post( - routes.CN_WEB_LOGIN_URL.get_url(), - json=payload, - headers=headers, - ) as r: - data = await r.json() - - if data["retcode"] == -3102: - # Captcha triggered - aigis = json.loads(r.headers["x-rpc-aigis"]) - aigis["data"] = json.loads(aigis["data"]) - return aigis - - if not data["data"]: - errors.raise_for_retcode(data) - - cookies = {cookie.key: cookie.value for cookie in r.cookies.values()} - - self.set_cookies(cookies) - return cookies - - async def _app_login( - self, - account: str, - password: str, - *, - geetest: typing.Optional[typing.Dict[str, typing.Any]] = None, - ticket: typing.Optional[typing.Dict[str, typing.Any]] = None, - ) -> typing.Dict[str, typing.Any]: - """Login with a password using HoYoLab app endpoint. - - Returns data from aigis header or action_ticket or cookies. - """ - headers = { - **geetest_utility.APP_LOGIN_HEADERS, - "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["app_login"]), - } - if geetest: - mmt_data = geetest["data"] - session_id = geetest["session_id"] - headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) - - if ticket: - ticket["verify_str"] = json.dumps(ticket["verify_str"]) - headers["x-rpc-verify"] = json.dumps(ticket) - - payload = { - "account": geetest_utility.encrypt_geetest_credentials(account, self._region), - "password": geetest_utility.encrypt_geetest_credentials(password, self._region), - } - - async with aiohttp.ClientSession() as session: - async with session.post( - routes.APP_LOGIN_URL.get_url(), - json=payload, - headers=headers, - ) as r: - data = await r.json() - - if data["retcode"] == -3101: - # Captcha triggered - aigis = json.loads(r.headers["x-rpc-aigis"]) - aigis["data"] = json.loads(aigis["data"]) - return aigis - - if data["retcode"] == -3239: - # Email verification required - verify = json.loads(r.headers["x-rpc-verify"]) - verify["verify_str"] = json.loads(verify["verify_str"]) - return verify - - if not data["data"]: - errors.raise_for_retcode(data) - - cookies = { - "stoken": data["data"]["token"]["token"], - "ltuid_v2": data["data"]["user_info"]["aid"], - "ltmid_v2": data["data"]["user_info"]["mid"], - "account_id_v2": data["data"]["user_info"]["aid"], - "account_mid_v2": data["data"]["user_info"]["mid"], - } - self.set_cookies(cookies) - - return cookies - - async def _send_verification_email( - self, - ticket: typing.Dict[str, typing.Any], - *, - geetest: typing.Optional[typing.Dict[str, typing.Any]] = None, - ) -> typing.Union[None, typing.Dict[str, typing.Any]]: - """Send verification email. - - Returns None if success, aigis headers (mmt/aigis) otherwise. - """ - headers = {**geetest_utility.EMAIL_SEND_HEADERS} - if geetest: - mmt_data = geetest["data"] - session_id = geetest["session_id"] - headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) - - async with aiohttp.ClientSession() as session: - async with session.post( - routes.SEND_VERIFICATION_CODE_URL.get_url(), - json={ - "action_type": "verify_for_component", - "action_ticket": ticket["verify_str"]["ticket"], - }, - headers=headers, - ) as r: - data = await r.json() - - if data["retcode"] == -3101: - # Captcha triggered - aigis = json.loads(r.headers["x-rpc-aigis"]) - aigis["data"] = json.loads(aigis["data"]) - return aigis - - if data["retcode"] != 0: - errors.raise_for_retcode(data) - - return None - - async def _verify_email(self, code: str, ticket: typing.Dict[str, typing.Any]) -> None: - """Verify email.""" - async with aiohttp.ClientSession() as session: - async with session.post( - routes.VERIFY_EMAIL_URL.get_url(), - json={ - "action_type": "verify_for_component", - "action_ticket": ticket["verify_str"]["ticket"], - "email_captcha": code, - "verify_method": 2, - }, - headers=geetest_utility.EMAIL_VERIFY_HEADERS, - ) as r: - data = await r.json() - - if data["retcode"] != 0: - errors.raise_for_retcode(data) - - return None - - async def _send_mobile_otp( - self, - mobile: str, - *, - geetest: typing.Optional[typing.Dict[str, typing.Any]] = None, - ) -> typing.Dict[str, typing.Any] | None: - """Attempt to send OTP to the provided mobile number. - - May return aigis headers if captcha is triggered, None otherwise. - """ - headers = { - **geetest_utility.CN_LOGIN_HEADERS, - "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), - } - if geetest: - mmt_data = geetest["data"] - session_id = geetest["session_id"] - headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) - - payload = { - "mobile": geetest_utility.encrypt_geetest_credentials(mobile, self._region), - "area_code": geetest_utility.encrypt_geetest_credentials("+86", self._region), - } - - async with aiohttp.ClientSession() as session: - async with session.post( - routes.MOBILE_OTP_URL.get_url(), - json=payload, - headers=headers, - ) as r: - data = await r.json() - - if data["retcode"] == -3101: - # Captcha triggered - aigis = json.loads(r.headers["x-rpc-aigis"]) - aigis["data"] = json.loads(aigis["data"]) - return aigis - - if not data["data"]: - errors.raise_for_retcode(data) - - return None - - async def _login_with_mobile_otp(self, mobile: str, otp: str) -> typing.Dict[str, typing.Any]: - """Login with OTP and mobile number. - - Returns cookies if OTP matches the one sent, raises an error otherwise. - """ - headers = { - **geetest_utility.CN_LOGIN_HEADERS, - "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), - } - - payload = { - "mobile": geetest_utility.encrypt_geetest_credentials(mobile, self._region), - "area_code": geetest_utility.encrypt_geetest_credentials("+86", self._region), - "captcha": otp, - } - - async with aiohttp.ClientSession() as session: - async with session.post( - routes.MOBILE_LOGIN_URL.get_url(), - json=payload, - headers=headers, - ) as r: - data = await r.json() - - if not data["data"]: - errors.raise_for_retcode(data) - - cookies = {cookie.key: cookie.value for cookie in r.cookies.values()} - self.set_cookies(cookies) - - return cookies - - async def _create_qrcode(self) -> QRCodeCreationResult: - """Create a QR code for login.""" - device_id = "".join(random.choices(ascii_letters + digits, k=64)) - app_id = "8" - payload = { - "app_id": app_id, - "device": device_id, - } - - async with aiohttp.ClientSession() as session: - async with session.post( - routes.CREATE_QRCODE_URL.get_url(), - json=payload, - ) as r: - data = await r.json() - - if not data["data"]: - errors.raise_for_retcode(data) - - url: str = data["data"]["url"] - return QRCodeCreationResult( - app_id=app_id, - ticket=url.split("ticket=")[1], - device_id=device_id, - url=url, - ) - - async def _check_qrcode(self, app_id: str, device_id: str, ticket: str) -> QRCodeCheckResult: - """Check the status of a QR code login.""" - payload = { - "app_id": app_id, - "device": device_id, - "ticket": ticket, - } - - async with aiohttp.ClientSession() as session: - async with session.post( - routes.CHECK_QRCODE_URL.get_url(), - json=payload, - ) as r: - data = await r.json() - - if not data["data"]: - errors.raise_for_retcode(data) - - return QRCodeCheckResult(**data["data"]) - - async def login_with_password( - self, - account: str, - password: str, - *, - port: int = 5000, - tokenType: typing.Optional[int] = 6, - geetest_solver: typing.Optional[ - typing.Callable[ - [typing.Dict[str, typing.Any]], - typing.Awaitable[typing.Dict[str, typing.Any]], - ] - ] = None, - ) -> typing.Dict[str, str]: - """Login with a password via web endpoint. - - Note that this will start a webserver if captcha is - triggered and `geetest_solver` is not passed. - """ - result = await self._web_login(account, password, tokenType=tokenType) - - if "session_id" not in result: - # Captcha not triggered - return result - - if geetest_solver: - geetest = await geetest_solver(result) - else: - geetest = await server.solve_geetest(result, port=port) - - return await self._web_login(account, password, tokenType=tokenType, geetest=geetest) - - async def cn_login_by_password( - self, - account: str, - password: str, - *, - port: int = 5000, - geetest_solver: typing.Optional[ - typing.Callable[ - [typing.Dict[str, typing.Any]], - typing.Awaitable[typing.Dict[str, typing.Any]], - ] - ] = None, - ) -> typing.Dict[str, str]: - """Login with a password via Miyoushe loginByPassword endpoint. - - Note that this will start a webserver if captcha is triggered and `geetest_solver` is not passed. - """ - result = await self._cn_login_by_password(account, password) - - if "session_id" not in result: - # Captcha not triggered - return result - - if geetest_solver: - geetest = await geetest_solver(result) - else: - geetest = await server.solve_geetest(result, port=port) - - return await self._cn_login_by_password(account, password, geetest=geetest) - - async def check_mobile_number_validity(self, mobile: str) -> bool: - """Check if a mobile number is valid (it's registered on Miyoushe). - - Returns True if the mobile number is valid, False otherwise. - """ - async with aiohttp.ClientSession() as session: - async with session.get( - routes.CHECK_MOBILE_VALIDITY_URL.get_url(), - params={"mobile": mobile}, - ) as r: - data = await r.json() - - return data["data"]["status"] != data["data"]["is_registable"] - - async def login_with_mobile_number( - self, - mobile: str, - ) -> typing.Dict[str, str]: - """Login with mobile number, returns cookies. - - Only works for Chinese region (Miyoushe) users, do not include area code (+86) in the mobile number. - Steps: - 1. Sends OTP to the provided mobile number. - 1-1. If captcha is triggered, prompts the user to solve it. - 2. Lets user enter the OTP. - 3. Logs in with the OTP. - 4. Returns cookies. - """ - result = await self._send_mobile_otp(mobile) - - if result is not None and "session_id" in result: - # Captcha triggered - geetest = await server.solve_geetest(result) - await self._send_mobile_otp(mobile, geetest=geetest) - - otp = await server.enter_otp() - cookies = await self._login_with_mobile_otp(mobile, otp) - return cookies - - async def login_with_app_password( - self, - account: str, - password: str, - *, - port: int = 5000, - geetest_solver: typing.Optional[ - typing.Callable[ - [typing.Dict[str, typing.Any]], - typing.Awaitable[typing.Dict[str, typing.Any]], - ] - ] = None, - ) -> typing.Dict[str, str]: - """Login with a password via HoYoLab app endpoint. - - Note that this will start a webserver if either of the - following happens: - - 1. Captcha is triggered and `geetest_solver` is not passed. - 2. Email verification is triggered (can happen if you - first login with a new device). - """ - result = await self._app_login(account, password) - - if "session_id" in result: - # Captcha triggered - if geetest_solver: - geetest = await geetest_solver(result) - else: - geetest = await server.solve_geetest(result, port=port) - - result = await self._app_login(account, password, geetest=geetest) - - if "risk_ticket" in result: - # Email verification required - mmt = await self._send_verification_email(result) - if mmt: - if geetest_solver: - geetest = await geetest_solver(mmt) - else: - geetest = await server.solve_geetest(mmt, port=port) - - await self._send_verification_email(result, geetest=geetest) - - await server.verify_email(self, result, port=port) - result = await self._app_login(account, password, ticket=result) - - return result - - async def login_with_qrcode(self) -> typing.Dict[str, str]: - """Login with QR code, only available for Miyoushe users. - - Returns cookies. - """ - creation_result = await self._create_qrcode() - qrcode_: qrcode.image.pil.PilImage = qrcode.make(creation_result.url, error_correction=ERROR_CORRECT_L) # type: ignore - qrcode_.show() - - scanned = False - while True: - check_result = await self._check_qrcode( - creation_result.app_id, creation_result.device_id, creation_result.ticket - ) - if check_result.status == QRCodeStatus.SCANNED and not scanned: - LOGGER_.info("QR code scanned") - scanned = True - elif check_result.status == QRCodeStatus.CONFIRMED: - LOGGER_.info("QR code login confirmed") - break - - await asyncio.sleep(2) - - raw_data = check_result.payload.raw - assert raw_data is not None - - cookie_token = await fetch_cookie_token_by_game_token( - game_token=raw_data.game_token, account_id=raw_data.account_id - ) - stoken = await fetch_stoken_by_game_token(game_token=raw_data.game_token, account_id=int(raw_data.account_id)) - - cookies = { - "stoken_v2": stoken.token, - "stuid": stoken.aid, - "mid": stoken.mid, - "cookie_token": cookie_token, - } - self.set_cookies(cookies) - return cookies diff --git a/genshin/client/components/geetest/server.py b/genshin/client/components/geetest/server.py deleted file mode 100644 index 0aa0081d..00000000 --- a/genshin/client/components/geetest/server.py +++ /dev/null @@ -1,192 +0,0 @@ -"""Aiohttp webserver used for captcha solving and email verification.""" - -from __future__ import annotations - -import asyncio -import typing -import webbrowser - -import aiohttp -from aiohttp import web - -from . import client - -__all__ = ["PAGES", "launch_webapp", "solve_geetest", "verify_email"] - -PAGES: typing.Final[typing.Dict[typing.Literal["captcha", "verify-email", "enter-otp"], str]] = { - "captcha": """ - - - - - - - """, - "verify-email": """ - - - - - - - - - """, - "enter-otp": """ - - - - - - - - - """, -} - - -GT_URL = "https://raw.githubusercontent.com/GeeTeam/gt3-node-sdk/master/demo/static/libs/gt.js" - - -async def launch_webapp( - page: typing.Literal["captcha", "verify-email", "enter-otp"], - *, - port: int = 5000, - mmt: typing.Optional[typing.Dict[str, typing.Any]] = None, -) -> typing.Any: - """Create and run a webapp to solve captcha or send verification code.""" - routes = web.RouteTableDef() - future: asyncio.Future[typing.Any] = asyncio.Future() - - @routes.get("/captcha") - async def captcha(request: web.Request) -> web.StreamResponse: - return web.Response(body=PAGES["captcha"], content_type="text/html") - - @routes.get("/verify-email") - async def verify_email(request: web.Request) -> web.StreamResponse: - return web.Response(body=PAGES["verify-email"], content_type="text/html") - - @routes.get("/enter-otp") - async def enter_otp(request: web.Request) -> web.StreamResponse: - return web.Response(body=PAGES["enter-otp"], content_type="text/html") - - @routes.get("/gt.js") - async def gt(request: web.Request) -> web.StreamResponse: - async with aiohttp.ClientSession() as session: - r = await session.get(GT_URL) - content = await r.read() - - return web.Response(body=content, content_type="text/javascript") - - @routes.get("/mmt") - async def mmt_endpoint(request: web.Request) -> web.Response: - return web.json_response(mmt) - - @routes.post("/send-data") - async def send_data_endpoint(request: web.Request) -> web.Response: - body = await request.json() - future.set_result(body) - - return web.Response(status=204) - - app = web.Application() - app.add_routes(routes) - - runner = web.AppRunner(app) - await runner.setup() - - site = web.TCPSite(runner, host="localhost", port=port) - print(f"Opening http://localhost:{port}/{page} in browser...") # noqa - webbrowser.open_new_tab(f"http://localhost:{port}/{page}") - - await site.start() - - try: - data = await future - finally: - await asyncio.sleep(0.3) - await runner.shutdown() - await runner.cleanup() - - return data - - -async def solve_geetest( - mmt: typing.Dict[str, typing.Any], - *, - port: int = 5000, -) -> typing.Dict[str, typing.Any]: - """Solve a geetest captcha manually.""" - return await launch_webapp("captcha", port=port, mmt=mmt) - - -async def verify_email( - client: client.GeetestClient, - ticket: typing.Dict[str, typing.Any], - *, - port: int = 5000, -) -> None: - """Verify email to login via HoYoLab app endpoint.""" - data = await launch_webapp("verify-email", port=port) - code = data["code"] - - return await client._verify_email(code, ticket) - - -async def enter_otp(port: int = 5000) -> str: - """Lets user enter the OTP.""" - # The enter-otp page is the same as verify-email page. - data = await launch_webapp("enter-otp", port=port) - code = data["code"] - return code diff --git a/genshin/client/manager/cookie.py b/genshin/client/manager/cookie.py index 76779f4d..ef3af82e 100644 --- a/genshin/client/manager/cookie.py +++ b/genshin/client/manager/cookie.py @@ -1,6 +1,6 @@ """Cookie completion. -Available convertions: +Available conversions: - fetch_cookie_with_cookie - cookie_token -> cookie_token @@ -14,6 +14,10 @@ - fetch_cookie_with_stoken_v2 - stoken (v2) + mid -> ltoken_v2 (token_type=2) - stoken (v2) + mid -> cookie_token_v2 (token_type=4) +- fetch_cookie_token_with_game_token + - game_token -> cookie_token +- fetch_stoken_with_game_token + - game_token -> stoken """ from __future__ import annotations @@ -29,33 +33,19 @@ from genshin import constants, errors, types from genshin.client import routes from genshin.client.manager import managers -from genshin.models.miyoushe.cookie import StokenResult +from genshin.models.auth.cookie import StokenResult from genshin.utility import ds as ds_utility __all__ = [ "complete_cookies", - "fetch_cookie_token_by_game_token", "fetch_cookie_token_info", + "fetch_cookie_token_with_game_token", "fetch_cookie_with_cookie", "fetch_cookie_with_stoken_v2", - "fetch_stoken_by_game_token", + "fetch_stoken_with_game_token", "refresh_cookie_token", ] -STOKEN_BY_GAME_TOKEN_HEADERS = { - "x-rpc-app_version": "2.41.0", - "x-rpc-aigis": "", - "Content-Type": "application/json", - "Accept": "application/json", - "x-rpc-game_biz": "bbs_cn", - "x-rpc-sys_version": "11", - "x-rpc-device_name": "GenshinUid_login_device_lulu", - "x-rpc-device_model": "GenshinUid_login_device_lulu", - "x-rpc-app_id": "bll8iq97cem8", - "x-rpc-client_type": "2", - "User-Agent": "okhttp/4.8.0", -} - async def fetch_cookie_with_cookie( cookies: managers.CookieOrHeader, @@ -188,8 +178,8 @@ async def complete_cookies( return cookies -async def fetch_cookie_token_by_game_token(*, game_token: str, account_id: str) -> str: - """Fetch cookie token by game token, which is obtained by scanning a QR code.""" +async def fetch_cookie_token_with_game_token(*, game_token: str, account_id: str) -> str: + """Fetch cookie token with game token, which can be obtained by scanning a QR code.""" url = routes.GET_COOKIE_TOKEN_BY_GAME_TOKEN_URL.get_url() params = { "game_token": game_token, @@ -206,8 +196,8 @@ async def fetch_cookie_token_by_game_token(*, game_token: str, account_id: str) return data["data"]["cookie_token"] -async def fetch_stoken_by_game_token(*, game_token: str, account_id: int) -> StokenResult: - """Fetch cookie token by game token, which is obtained by scanning a QR code.""" +async def fetch_stoken_with_game_token(*, game_token: str, account_id: int) -> StokenResult: + """Fetch cookie token with game token, which can be obtained by scanning a QR code.""" url = routes.GET_STOKEN_BY_GAME_TOKEN_URL.get_url() payload = { "account_id": account_id, @@ -217,7 +207,7 @@ async def fetch_stoken_by_game_token(*, game_token: str, account_id: int) -> Sto "DS": ds_utility.generate_passport_ds(body=payload), "x-rpc-device_id": uuid.uuid4().hex, "x-rpc-device_fp": "".join(random.choices(ascii_letters + digits, k=13)), - **STOKEN_BY_GAME_TOKEN_HEADERS, + "x-rpc-app_id": "bll8iq97cem8", } async with aiohttp.ClientSession() as session: diff --git a/genshin/client/manager/managers.py b/genshin/client/manager/managers.py index 0e33324b..7b3b2704 100644 --- a/genshin/client/manager/managers.py +++ b/genshin/client/manager/managers.py @@ -17,6 +17,8 @@ from genshin.client import ratelimit from genshin.utility import fs as fs_utility +from ...constants import MIYOUSHE_GEETEST_RETCODES + _LOGGER = logging.getLogger(__name__) __all__ = [ @@ -153,6 +155,9 @@ async def _request( errors.check_for_geetest(data) + if data["retcode"] in MIYOUSHE_GEETEST_RETCODES: + raise errors.MiyousheGeetestError(data, {k: morsel.value for k, morsel in response.cookies.items()}) + if data["retcode"] == 0: return data["data"] diff --git a/genshin/client/routes.py b/genshin/client/routes.py index a24d048c..5cf3560a 100644 --- a/genshin/client/routes.py +++ b/genshin/client/routes.py @@ -17,6 +17,7 @@ "CN_WEB_LOGIN_URL", "COMMUNITY_URL", "COOKIE_V2_REFRESH_URL", + "CREATE_MMT_URL", "CREATE_QRCODE_URL", "DETAIL_LEDGER_URL", "GACHA_URL", @@ -237,3 +238,48 @@ def get_url(self, region: types.Region, game: types.Game) -> yarl.URL: CREATE_QRCODE_URL = Route("https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/fetch") CHECK_QRCODE_URL = Route("https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/query") + +CREATE_MMT_URL = Route( + "https://api-takumi-record.mihoyo.com/game_record/app/card/wapi/createVerification?is_high=false" +) + +GAME_RISKY_CHECK_URL = InternationalRoute( + overseas="https://api-account-os.hoyoverse.com/account/risky/api/check", + chinese="https://gameapi-account.mihoyo.com/account/risky/api/check", +) + +SHIELD_LOGIN_URL = GameRoute( + overseas=dict( + genshin="https://hk4e-sdk-os.hoyoverse.com/hk4e_global/mdk/shield/api/login", + honkai3rd="https://bh3-sdk-os.hoyoverse.com/bh3_os/mdk/shield/api/login", + hkrpg="https://hkrpg-sdk-os.hoyoverse.com/hkrpg_global/mdk/shield/api/login", + ), + chinese=dict( + genshin="https://hk4e-sdk.mihoyo.com/hk4e_cn/mdk/shield/api/login", + honkai3rd="https://api-sdk.mihoyo.com/bh3_cn/mdk/shield/api/login", + hkrpg="https://hkrpg-sdk.mihoyo.com/hkrpg_cn/mdk/shield/api/login", + ), +) + +PRE_GRANT_TICKET_URL = InternationalRoute( + overseas="https://api-account-os.hoyoverse.com/account/device/api/preGrantByTicket", + chinese="https://gameapi-account.mihoyo.com/account/device/api/preGrantByTicket", +) + +DEVICE_GRANT_URL = InternationalRoute( + overseas="https://api-account-os.hoyoverse.com/account/device/api/grant", + chinese="https://gameapi-account.mihoyo.com/account/device/api/grant", +) + +GAME_LOGIN_URL = GameRoute( + overseas=dict( + genshin="https://hk4e-sdk-os.hoyoverse.com/hk4e_global/combo/granter/login/v2/login", + honkai3rd="https://bh3-sdk-os.hoyoverse.com/bh3_os/combo/granter/login/v2/login", + hkrpg="https://hkrpg-sdk-os.hoyoverse.com/hkrpg_global/combo/granter/login/v2/login", + ), + chinese=dict( + genshin="https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/granter/login/v2/login", + honkai3rd="https://api-sdk.mihoyo.com/bh3_cn/combo/granter/login/v2/login", + hkrpg="https://hkrpg-sdk.mihoyo.com/hkrpg_cn/combo/granter/login/v2/login", + ), +) diff --git a/genshin/constants.py b/genshin/constants.py index 64e42e05..fe5d3793 100644 --- a/genshin/constants.py +++ b/genshin/constants.py @@ -32,3 +32,38 @@ "cn_passport": "JwYDpKvLj6MrMqqYU6jTKF17KNO2PXoS", } """Dynamic Secret Salts.""" + +MIYOUSHE_GEETEST_RETCODES = {10035, 5003, 10041, 1034} +"""API error codes that indicate a Geetest was triggered during this Miyoushe API request.""" + +APP_KEYS = { + types.Game.GENSHIN: { + types.Region.OVERSEAS: "6a4c78fe0356ba4673b8071127b28123", + types.Region.CHINESE: "d0d3a7342df2026a70f650b907800111", + }, + types.Game.STARRAIL: { + types.Region.OVERSEAS: "d74818dabd4182d4fbac7f8df1622648", + types.Region.CHINESE: "4650f3a396d34d576c3d65df26415394", + }, + types.Game.HONKAI: { + types.Region.OVERSEAS: "243187699ab762b682a2a2e50ba02285", + types.Region.CHINESE: "0ebc517adb1b62c6b408df153331f9aa", + }, +} +"""App keys used for game login.""" + +APP_IDS = { + types.Game.GENSHIN: { + types.Region.OVERSEAS: "4", + types.Region.CHINESE: "4", + }, + types.Game.STARRAIL: { + types.Region.OVERSEAS: "11", + types.Region.CHINESE: "8", + }, + types.Game.HONKAI: { + types.Region.OVERSEAS: "8", + types.Region.CHINESE: "1", + }, +} +"""App IDs used for game login.""" diff --git a/genshin/errors.py b/genshin/errors.py index f62ade2f..0d293b59 100644 --- a/genshin/errors.py +++ b/genshin/errors.py @@ -14,6 +14,7 @@ "GenshinException", "InvalidAuthkey", "InvalidCookies", + "MiyousheGeetestError", "RedemptionClaimed", "RedemptionCooldown", "RedemptionException", @@ -186,6 +187,20 @@ class WrongOTP(GenshinException): msg = "The provided OTP code is wrong." +class MiyousheGeetestError(GenshinException): + """Geetest triggered during Miyoushe API request.""" + + def __init__( + self, + response: typing.Dict[str, typing.Any], + cookies: typing.Mapping[str, str], + ) -> None: + self.cookies = cookies + super().__init__(response) + + msg = "Geetest triggered during Miyoushe API request." + + _TGE = typing.Type[GenshinException] _errors: typing.Dict[int, typing.Union[_TGE, str, typing.Tuple[_TGE, typing.Optional[str]]]] = { # misc hoyolab diff --git a/genshin/models/__init__.py b/genshin/models/__init__.py index 38da6131..d7e16971 100644 --- a/genshin/models/__init__.py +++ b/genshin/models/__init__.py @@ -3,6 +3,6 @@ from .genshin import * from .honkai import * from .hoyolab import * -from .miyoushe import * +from .auth import * from .model import * from .starrail import * diff --git a/genshin/models/auth/__init__.py b/genshin/models/auth/__init__.py new file mode 100644 index 00000000..53f1a208 --- /dev/null +++ b/genshin/models/auth/__init__.py @@ -0,0 +1,6 @@ +"""Auth-related models.""" + +from .cookie import * +from .geetest import * +from .qrcode import * +from .verification import * diff --git a/genshin/models/auth/cookie.py b/genshin/models/auth/cookie.py new file mode 100644 index 00000000..77f2ad07 --- /dev/null +++ b/genshin/models/auth/cookie.py @@ -0,0 +1,133 @@ +"""Cookie-related models""" + +import typing + +import pydantic + +__all__ = [ + "AppLoginResult", + "DeviceGrantResult", + "GameLoginResult", + "QRLoginResult", + "StokenResult", + "WebLoginResult", +] + + +class StokenResult(pydantic.BaseModel): + """Result of fetching `stoken` with `fetch_stoken_by_game_token`.""" + + aid: str + mid: str + token: str + + @pydantic.model_validator(mode="before") + def _transform_result(cls, values: typing.Dict[str, typing.Any]) -> typing.Dict[str, typing.Any]: + return { + "aid": values["user_info"]["aid"], + "mid": values["user_info"]["mid"], + "token": values["token"]["token"], + } + + +class CookieLoginResult(pydantic.BaseModel): + """Base model for cookie login result.""" + + def to_str(self) -> str: + """Convert the login cookies to a string.""" + return "; ".join(f"{key}={value}" for key, value in self.model_dump().items()) + + def to_dict(self) -> typing.Dict[str, str]: + """Convert the login cookies to a dictionary.""" + return self.model_dump() + + +class QRLoginResult(CookieLoginResult): + """QR code login cookies. + + Returned by `client.login_with_qrcode`. + """ + + stoken: str + stuid: str + mid: str + cookie_token: str + + +class AppLoginResult(CookieLoginResult): + """App login cookies. + + Returned by `client.login_with_app_password`. + """ + + stoken: str + ltuid_v2: str + ltmid_v2: str + account_id_v2: str + account_mid_v2: str + + +class WebLoginResult(CookieLoginResult): + """Web login cookies. + + Returned by `client.login_with_password`. + """ + + cookie_token_v2: str + account_mid_v2: str + account_id_v2: str + ltoken_v2: str + ltmid_v2: str + ltuid_v2: str + + +class CNWebLoginResult(CookieLoginResult): + """Web login cookies. + + Returned by `client.cn_login_with_password`. + """ + + cookie_token_v2: str + account_mid_v2: str + account_id_v2: str + ltoken_v2: str + ltmid_v2: str + ltuid_v2: str + + +class MobileLoginResult(CookieLoginResult): + """Mobile number login cookies. + + Returned by `client.login_with_mobile_number`. + """ + + cookie_token_v2: str + account_mid_v2: str + account_id_v2: str + ltoken_v2: str + ltmid_v2: str + + +class DeviceGrantResult(pydantic.BaseModel): + """Cookies returned by the device grant endpoint.""" + + game_token: str + login_ticket: typing.Optional[str] = None + + @pydantic.model_validator(mode="before") + def _str_to_none(cls, data: typing.Dict[str, typing.Union[str, None]]) -> typing.Dict[str, typing.Union[str, None]]: + """Convert empty strings to `None`.""" + for key in data: + if data[key] == "" or data[key] == "None": + data[key] = None + return data + + +class GameLoginResult(pydantic.BaseModel): + """Game login result.""" + + combo_id: str + open_id: str + combo_token: str + heartbeat: bool + account_type: int diff --git a/genshin/models/auth/geetest.py b/genshin/models/auth/geetest.py new file mode 100644 index 00000000..389eb991 --- /dev/null +++ b/genshin/models/auth/geetest.py @@ -0,0 +1,167 @@ +"""Geetest-related models""" + +import enum +import json +import typing + +import pydantic + +from genshin.utility import auth as auth_utility + +__all__ = [ + "BaseMMT", + "BaseMMTResult", + "BaseSessionMMTResult", + "MMT", + "MMTResult", + "MMTv4", + "MMTv4Result", + "RiskyCheckMMT", + "RiskyCheckMMTResult", + "SessionMMT", + "SessionMMTResult", + "SessionMMTv4", + "SessionMMTv4Result", +] + + +class BaseMMT(pydantic.BaseModel): + """Base Geetest verification data model.""" + + new_captcha: int + success: int + + @pydantic.model_validator(mode="before") + def __parse_data(cls, data: typing.Dict[str, typing.Any]) -> typing.Dict[str, typing.Any]: + """Parse the data if it was provided in a raw format.""" + if "data" in data: + # Assume the data is aigis header and parse it + session_id = data["session_id"] + data = data["data"] + if isinstance(data, str): + data = json.loads(data) + + data["session_id"] = session_id + + return data + + +class MMT(BaseMMT): + """Geetest verification data.""" + + challenge: str + gt: str + + +class SessionMMT(MMT): + """Session-based geetest verification data.""" + + session_id: str + + def get_mmt(self) -> MMT: + """Get the base MMT data.""" + return MMT(**self.model_dump(exclude={"session_id"})) + + +class MMTv4(BaseMMT): + """Geetest verification data (V4).""" + + captcha_id: str = pydantic.Field(alias="gt") + risk_type: str + + +class SessionMMTv4(MMTv4): + """Session-based geetest verification data (V4).""" + + session_id: str + + def get_mmt(self) -> MMTv4: + """Get the base MMTv4 data.""" + return MMTv4(**self.model_dump(exclude={"session_id"})) + + +class RiskyCheckMMT(MMT): + """MMT returned by the risky check endpoint.""" + + check_id: str + + +class BaseMMTResult(pydantic.BaseModel): + """Base Geetest verification result model.""" + + def get_data(self) -> typing.Dict[str, typing.Any]: + """Get the base MMT result data. + + This method acts as `model_dump` but excludes the `session_id` field. + """ + return self.model_dump(exclude={"session_id"}) + + +class BaseSessionMMTResult(BaseMMTResult): + """Base session-based Geetest verification result model.""" + + session_id: str + + def to_aigis_header(self) -> str: + """Convert the result to `x-rpc-aigis` header.""" + return auth_utility.get_aigis_header(self.session_id, self.get_data()) + + +class MMTResult(BaseMMTResult): + """Geetest verification result.""" + + geetest_challenge: str + geetest_validate: str + geetest_seccode: str + + +class SessionMMTResult(MMTResult, BaseSessionMMTResult): + """Session-based geetest verification result.""" + + +class MMTv4Result(BaseMMTResult): + """Geetest verification result (V4).""" + + captcha_id: str + lot_number: str + pass_token: str + gen_time: str + captcha_output: str + + +class SessionMMTv4Result(MMTv4Result, BaseSessionMMTResult): + """Session-based geetest verification result (V4).""" + + +class RiskyCheckMMTResult(MMTResult): + """Risky check MMT result.""" + + check_id: str + + def to_rpc_risky(self) -> str: + """Convert the MMT result to a RPC risky header.""" + return auth_utility.generate_risky_header(self.check_id, self.geetest_challenge, self.geetest_validate) + + +class RiskyCheckAction(str, enum.Enum): + """Risky check action returned by the API.""" + + ACTION_NONE = "ACTION_NONE" + """No action required.""" + ACTION_GEETEST = "ACTION_GEETEST" + """Geetest verification required.""" + + +class RiskyCheckResult(pydantic.BaseModel): + """Model for the risky check result.""" + + id: str + action: RiskyCheckAction + mmt: typing.Optional[MMT] = pydantic.Field(alias="geetest") + + def to_mmt(self) -> RiskyCheckMMT: + """Convert the check result to a `RiskyCheckMMT` object.""" + if self.mmt is None: + raise ValueError("The check result does not contain a MMT object.") + + return RiskyCheckMMT(**self.mmt.model_dump(), check_id=self.id) diff --git a/genshin/models/auth/qrcode.py b/genshin/models/auth/qrcode.py new file mode 100644 index 00000000..5c2768cb --- /dev/null +++ b/genshin/models/auth/qrcode.py @@ -0,0 +1,55 @@ +"""Miyoushe QR Code Models""" + +import enum +import json +import typing + +import pydantic + +__all__ = ["QRCodeCheckResult", "QRCodeCreationResult", "QRCodePayload", "QRCodeRawData", "QRCodeStatus"] + + +class QRCodeStatus(enum.Enum): + """QR code check status.""" + + INIT = "Init" + SCANNED = "Scanned" + CONFIRMED = "Confirmed" + + +class QRCodeRawData(pydantic.BaseModel): + """QR code raw data.""" + + account_id: str = pydantic.Field(alias="uid") + """Miyoushe account id.""" + game_token: str = pydantic.Field(alias="token") + + +class QRCodePayload(pydantic.BaseModel): + """QR code check result payload.""" + + proto: str + ext: str + raw: typing.Optional[QRCodeRawData] = None + + @pydantic.field_validator("raw", mode="before") + def _convert_raw_data(cls, value: typing.Optional[str] = None) -> typing.Union[QRCodeRawData, None]: + if value: + return QRCodeRawData(**json.loads(value)) + return None + + +class QRCodeCheckResult(pydantic.BaseModel): + """QR code check result.""" + + status: QRCodeStatus = pydantic.Field(alias="stat") + payload: QRCodePayload + + +class QRCodeCreationResult(pydantic.BaseModel): + """QR code creation result.""" + + app_id: str + ticket: str + device_id: str + url: str diff --git a/genshin/models/auth/responses.py b/genshin/models/auth/responses.py new file mode 100644 index 00000000..31b99fb1 --- /dev/null +++ b/genshin/models/auth/responses.py @@ -0,0 +1,47 @@ +"""Auth endpoints responses models.""" + +import typing + +import pydantic + +__all__ = ["Account", "ShieldLoginResponse"] + + +class Account(pydantic.BaseModel): + """Account data returned by the shield login endpoint.""" + + uid: str + name: str + email: str + mobile: str + is_email_verify: str + realname: str + identity_card: str + token: str + safe_mobile: str + facebook_name: str + google_name: str + twitter_name: str + game_center_name: str + apple_name: str + sony_name: str + tap_name: str + country: str + reactivate_ticket: str + area_code: str + device_grant_ticket: str + steam_name: str + unmasked_email: str + unmasked_email_type: int + cx_name: typing.Optional[str] = None + + +class ShieldLoginResponse(pydantic.BaseModel): + """Response model for the shield login endpoint.""" + + account: Account + device_grant_required: bool + safe_moblie_required: bool + realperson_required: bool + reactivate_required: bool + realname_operation: str diff --git a/genshin/models/auth/verification.py b/genshin/models/auth/verification.py new file mode 100644 index 00000000..5d51a573 --- /dev/null +++ b/genshin/models/auth/verification.py @@ -0,0 +1,40 @@ +"""Email verification -related models""" + +import json +import typing + +import pydantic + +__all__ = [ + "ActionTicket", + "VerifyStrategy", +] + + +class VerifyStrategy(pydantic.BaseModel): + """Verification strategy.""" + + ticket: str + verify_type: str + + +class ActionTicket(pydantic.BaseModel): + """Action ticket. Can be used to verify email addresses.""" + + risk_ticket: str + verify_str: VerifyStrategy + + @pydantic.model_validator(mode="before") + def __parse_data(cls, data: typing.Dict[str, typing.Any]) -> typing.Dict[str, typing.Any]: + """Parse the data if it was provided in a raw format.""" + verify_str = data["verify_str"] + if isinstance(verify_str, str): + data["verify_str"] = json.loads(verify_str) + + return data + + def to_rpc_verify_header(self) -> str: + """Convert the action ticket to `x-rpc-verify` header.""" + ticket = self.model_dump() + ticket["verify_str"] = json.dumps(ticket["verify_str"]) + return json.dumps(ticket) diff --git a/genshin/models/miyoushe/__init__.py b/genshin/models/miyoushe/__init__.py deleted file mode 100644 index f34841ca..00000000 --- a/genshin/models/miyoushe/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Miyoushe models.""" - -from .cookie import * -from .qrcode import * diff --git a/genshin/models/miyoushe/cookie.py b/genshin/models/miyoushe/cookie.py deleted file mode 100644 index 4cca01cf..00000000 --- a/genshin/models/miyoushe/cookie.py +++ /dev/null @@ -1,23 +0,0 @@ -"""Miyoushe Cookie Models""" - -import typing - -from pydantic import BaseModel, model_validator - -__all__ = ("StokenResult",) - - -class StokenResult(BaseModel): - """Stoken result.""" - - aid: str - mid: str - token: str - - @model_validator(mode="before") - def _transform_result(cls, values: typing.Dict[str, typing.Any]) -> typing.Dict[str, typing.Any]: - return { - "aid": values["user_info"]["aid"], - "mid": values["user_info"]["mid"], - "token": values["token"]["token"], - } diff --git a/genshin/models/miyoushe/qrcode.py b/genshin/models/miyoushe/qrcode.py deleted file mode 100644 index 5ce6d0a6..00000000 --- a/genshin/models/miyoushe/qrcode.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Miyoushe QR Code Models""" - -import json -from enum import Enum - -from pydantic import BaseModel, Field, field_validator - -__all__ = ("QRCodeCheckResult", "QRCodeCreationResult", "QRCodePayload", "QRCodeRawData", "QRCodeStatus") - - -class QRCodeStatus(Enum): - """QR code check status.""" - - INIT = "Init" - SCANNED = "Scanned" - CONFIRMED = "Confirmed" - - -class QRCodeRawData(BaseModel): - """QR code raw data.""" - - account_id: str = Field(alias="uid") - """Miyoushe account id.""" - game_token: str = Field(alias="token") - - -class QRCodePayload(BaseModel): - """QR code check result payload.""" - - proto: str - raw: QRCodeRawData | None - ext: str - - @field_validator("raw", mode="before") - def _convert_raw_data(cls, value: str | None) -> QRCodeRawData | None: - if value: - return QRCodeRawData(**json.loads(value)) - return None - - -class QRCodeCheckResult(BaseModel): - """QR code check result.""" - - status: QRCodeStatus = Field(alias="stat") - payload: QRCodePayload - - -class QRCodeCreationResult(BaseModel): - """QR code creation result.""" - - app_id: str - ticket: str - device_id: str - url: str diff --git a/genshin/utility/__init__.py b/genshin/utility/__init__.py index cd1e07c3..fd4b76bc 100644 --- a/genshin/utility/__init__.py +++ b/genshin/utility/__init__.py @@ -1,6 +1,6 @@ """Utilities for genshin.py.""" -from . import geetest +from .auth import * from .concurrency import * from .ds import * from .extdb import * diff --git a/genshin/utility/auth.py b/genshin/utility/auth.py new file mode 100644 index 00000000..83591656 --- /dev/null +++ b/genshin/utility/auth.py @@ -0,0 +1,159 @@ +"""Auth utilities.""" + +import base64 +import hmac +import json +import typing +from hashlib import sha256 + +from genshin import constants + +__all__ = ["encrypt_geetest_credentials", "generate_sign"] + + +# RSA key used for OS app/web login +LOGIN_KEY_TYPE_1 = b""" +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4PMS2JVMwBsOIrYWRluY +wEiFZL7Aphtm9z5Eu/anzJ09nB00uhW+ScrDWFECPwpQto/GlOJYCUwVM/raQpAj +/xvcjK5tNVzzK94mhk+j9RiQ+aWHaTXmOgurhxSp3YbwlRDvOgcq5yPiTz0+kSeK +ZJcGeJ95bvJ+hJ/UMP0Zx2qB5PElZmiKvfiNqVUk8A8oxLJdBB5eCpqWV6CUqDKQ +KSQP4sM0mZvQ1Sr4UcACVcYgYnCbTZMWhJTWkrNXqI8TMomekgny3y+d6NX/cFa6 +6jozFIF4HCX5aW8bp8C8vq2tFvFbleQ/Q3CU56EWWKMrOcpmFtRmC18s9biZBVR/ +8QIDAQAB +-----END PUBLIC KEY----- +""" + +# RSA key used for CN app/game and game login +LOGIN_KEY_TYPE_2 = b""" +-----BEGIN PUBLIC KEY----- +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDDvekdPMHN3AYhm/vktJT+YJr7 +cI5DcsNKqdsx5DZX0gDuWFuIjzdwButrIYPNmRJ1G8ybDIF7oDW2eEpm5sMbL9zs +9ExXCdvqrn51qELbqj0XxtMTIpaCHFSI50PfPpTFV9Xt/hmyVwokoOXFlAEgCn+Q +CgGs52bFoYMtyi+xEQIDAQAB +-----END PUBLIC KEY----- +""" + +WEB_LOGIN_HEADERS = { + "x-rpc-app_id": "c9oqaq3s3gu8", + "x-rpc-client_type": "4", + # If not equal account.hoyolab.com It's will return retcode 1200 [Unauthorized] + "Origin": "https://account.hoyolab.com", + "Referer": "https://account.hoyolab.com/", +} + +APP_LOGIN_HEADERS = { + "x-rpc-app_id": "c9oqaq3s3gu8", + "x-rpc-client_type": "2", + # Passing "x-rpc-device_id" header will trigger email verification + # (unless the device_id is already verified). + # + # For some reason, without this header, email verification is not triggered. + # "x-rpc-device_id": "1c33337bd45c1bfs", +} + +CN_LOGIN_HEADERS = { + "x-rpc-app_id": "bll8iq97cem8", + "x-rpc-client_type": "4", + "x-rpc-source": "v2.webLogin", + "x-rpc-device_id": "586f2440-856a-4243-8076-2b0a12314197", +} + +EMAIL_SEND_HEADERS = { + "x-rpc-app_id": "c9oqaq3s3gu8", + "x-rpc-client_type": "2", +} + +EMAIL_VERIFY_HEADERS = { + "x-rpc-app_id": "c9oqaq3s3gu8", + "x-rpc-client_type": "2", +} + +CREATE_MMT_HEADERS = { + "x-rpc-app_version": "2.60.1", + "x-rpc-client_type": "5", +} + +DEVICE_ID = "D6AF5103-D297-4A01-B86A-87F87DS5723E" + +RISKY_CHECK_HEADERS = { + "x-rpc-client_type": "1", + "x-rpc-channel_id": "1", + "x-rpc-game_biz": "hkrpg_global", +} + +SHIELD_LOGIN_HEADERS = { + "x-rpc-client_type": "1", + "x-rpc-channel_id": "1", + "x-rpc-game_biz": "hkrpg_global", + "x-rpc-device_id": DEVICE_ID, +} + +GRANT_TICKET_HEADERS = { + "x-rpc-client_type": "1", + "x-rpc-channel_id": "1", + "x-rpc-game_biz": "hkrpg_global", + "x-rpc-device_id": DEVICE_ID, + "x-rpc-language": "ru", +} + +GAME_LOGIN_HEADERS = { + "x-rpc-client_type": "1", + "x-rpc-channel_id": "1", + "x-rpc-game_biz": "hkrpg_global", + "x-rpc-device_id": DEVICE_ID, +} + +GEETEST_LANGS = { + "简体中文": "zh-cn", + "繁體中文": "zh-tw", + "Deutsch": "de", + "English": "en", + "Español": "es", + "Français": "fr", + "Indonesia": "id", + "Italiano": "it", + "日本語": "ja", + "한국어": "ko", + "Português": "pt-pt", + "Pусский": "ru", + "ภาษาไทย": "th", + "Tiếng Việt": "vi", + "Türkçe": "tr", +} + + +def lang_to_geetest_lang(lang: str) -> str: + """Convert `client.lang` to geetest lang.""" + return GEETEST_LANGS.get(constants.LANGS.get(lang, "en-us"), "en") + + +def encrypt_geetest_credentials(text: str, key_type: typing.Literal[1, 2]) -> str: + """Encrypt text for geetest.""" + import rsa + + public_key = rsa.PublicKey.load_pkcs1_openssl_pem(LOGIN_KEY_TYPE_1 if key_type == 1 else LOGIN_KEY_TYPE_2) + crypto = rsa.encrypt(text.encode("utf-8"), public_key) + return base64.b64encode(crypto).decode("utf-8") + + +def get_aigis_header(session_id: str, mmt_data: typing.Dict[str, typing.Any]) -> str: + """Get aigis header.""" + return f"{session_id};{base64.b64encode(json.dumps(mmt_data).encode()).decode()}" + + +def generate_sign(data: typing.Dict[str, typing.Any], key: str) -> str: + """Generate a sign for the given `data` and `app_key`.""" + string = "" + for k in sorted(data.keys()): + string += k + "=" + str(data[k]) + "&" + return hmac.new(key.encode(), string[:-1].encode(), sha256).hexdigest() + + +def generate_risky_header( + check_id: str, + challenge: str = "", + validate: str = "", +) -> str: + """Generate risky header for geetest verification.""" + return f"id={check_id};c={challenge};s={validate}|jordan;v={validate}" diff --git a/genshin/utility/ds.py b/genshin/utility/ds.py index adae34c3..18bd619c 100644 --- a/genshin/utility/ds.py +++ b/genshin/utility/ds.py @@ -9,7 +9,13 @@ from genshin import constants, types -__all__ = ["generate_cn_dynamic_secret", "generate_dynamic_secret", "generate_passport_ds", "get_ds_headers"] +__all__ = [ + "generate_cn_dynamic_secret", + "generate_create_geetest_ds", + "generate_dynamic_secret", + "generate_passport_ds", + "get_ds_headers", +] def generate_dynamic_secret(salt: str = constants.DS_SALT[types.Region.OVERSEAS]) -> str: @@ -70,3 +76,12 @@ def generate_passport_ds(body: typing.Mapping[str, typing.Any]) -> str: h = hashlib.md5(f"salt={salt}&t={t}&r={r}&b={b}&q=".encode()).hexdigest() result = f"{t},{r},{h}" return result + + +def generate_create_geetest_ds() -> str: + """Create a dynamic secret for Miyoushe createVerification API endpoint.""" + salt = constants.DS_SALT[types.Region.CHINESE] + t = int(time.time()) + r = random.randint(100000, 200000) + h = hashlib.md5(f"salt={salt}&t={t}&r={r}&b=&q=is_high=false".encode()).hexdigest() + return f"{t},{r},{h}" diff --git a/genshin/utility/geetest.py b/genshin/utility/geetest.py deleted file mode 100644 index fe388afb..00000000 --- a/genshin/utility/geetest.py +++ /dev/null @@ -1,86 +0,0 @@ -"""Geetest utilities.""" - -import base64 -import json -import typing - -from ..types import Region - -__all__ = ["encrypt_geetest_credentials"] - - -# RSA key is the same for app and web login -OS_LOGIN_KEY_CERT = b""" ------BEGIN PUBLIC KEY----- -MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4PMS2JVMwBsOIrYWRluY -wEiFZL7Aphtm9z5Eu/anzJ09nB00uhW+ScrDWFECPwpQto/GlOJYCUwVM/raQpAj -/xvcjK5tNVzzK94mhk+j9RiQ+aWHaTXmOgurhxSp3YbwlRDvOgcq5yPiTz0+kSeK -ZJcGeJ95bvJ+hJ/UMP0Zx2qB5PElZmiKvfiNqVUk8A8oxLJdBB5eCpqWV6CUqDKQ -KSQP4sM0mZvQ1Sr4UcACVcYgYnCbTZMWhJTWkrNXqI8TMomekgny3y+d6NX/cFa6 -6jozFIF4HCX5aW8bp8C8vq2tFvFbleQ/Q3CU56EWWKMrOcpmFtRmC18s9biZBVR/ -8QIDAQAB ------END PUBLIC KEY----- -""" - -CN_LOGIN_KEY_CERT = b""" ------BEGIN PUBLIC KEY----- -MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDDvekdPMHN3AYhm/vktJT+YJr7 -cI5DcsNKqdsx5DZX0gDuWFuIjzdwButrIYPNmRJ1G8ybDIF7oDW2eEpm5sMbL9zs -9ExXCdvqrn51qELbqj0XxtMTIpaCHFSI50PfPpTFV9Xt/hmyVwokoOXFlAEgCn+Q -CgGs52bFoYMtyi+xEQIDAQAB ------END PUBLIC KEY----- -""" - -WEB_LOGIN_HEADERS = { - "x-rpc-app_id": "c9oqaq3s3gu8", - "x-rpc-client_type": "4", - # If not equal account.hoyolab.com It's will return retcode 1200 [Unauthorized] - "Origin": "https://account.hoyolab.com", - "Referer": "https://account.hoyolab.com/", -} - -APP_LOGIN_HEADERS = { - "x-rpc-app_id": "c9oqaq3s3gu8", - "x-rpc-client_type": "2", - # Passing "x-rpc-device_id" header will trigger email verification - # (unless the device_id is already verified). - # - # For some reason, without this header, email verification is not triggered. - # "x-rpc-device_id": "1c33337bd45c1bfs", -} - -CN_LOGIN_HEADERS = { - "x-rpc-app_id": "bll8iq97cem8", - "x-rpc-client_type": "4", - "Origin": "https://user.miyoushe.com", - "Referer": "https://user.miyoushe.com/", - "x-rpc-source": "v2.webLogin", - "x-rpc-mi_referrer": "https://user.miyoushe.com/login-platform/index.html?app_id=bll8iq97cem8&theme=&token_type=4&game_biz=bbs_cn&message_origin=https%253A%252F%252Fwww.miyoushe.com&succ_back_type=message%253Alogin-platform%253Alogin-success&fail_back_type=message%253Alogin-platform%253Alogin-fail&ux_mode=popup&iframe_level=1#/login/password", # noqa: E501 - "x-rpc-device_id": "586f2440-856a-4243-8076-2b0a12314197", -} - -EMAIL_SEND_HEADERS = { - "x-rpc-app_id": "c9oqaq3s3gu8", - "x-rpc-client_type": "2", -} - -EMAIL_VERIFY_HEADERS = { - "x-rpc-app_id": "c9oqaq3s3gu8", - "x-rpc-client_type": "2", -} - - -def encrypt_geetest_credentials(text: str, region: Region = Region.OVERSEAS) -> str: - """Encrypt text for geetest.""" - import rsa - - public_key = rsa.PublicKey.load_pkcs1_openssl_pem( - OS_LOGIN_KEY_CERT if region is Region.OVERSEAS else CN_LOGIN_KEY_CERT - ) - crypto = rsa.encrypt(text.encode("utf-8"), public_key) - return base64.b64encode(crypto).decode("utf-8") - - -def get_aigis_header(session_id: str, mmt_data: typing.Dict[str, typing.Any]) -> str: - """Get aigis header.""" - return f"{session_id};{base64.b64encode(json.dumps(mmt_data).encode()).decode()}" diff --git a/setup.py b/setup.py index c8cb0974..ab7b352d 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ extras_require={ "all": ["browser-cookie3", "rsa", "click", "qrcode[pil]"], "cookies": ["browser-cookie3"], - "geetest": ["rsa", "qrcode[pil]"], + "auth": ["rsa", "qrcode[pil]"], "cli": ["click"], }, include_package_data=True,