diff --git a/genshin-dev/lint-requirements.txt b/genshin-dev/lint-requirements.txt index 4073601f..ede3eb4e 100644 --- a/genshin-dev/lint-requirements.txt +++ b/genshin-dev/lint-requirements.txt @@ -1,13 +1 @@ -flake8 - -flake8-annotations-complexity # complex annotation -flake8-black # runs black -flake8-builtins # builtin shadowing -flake8-docstrings # proper formatting and grammar in docstrings -flake8-isort # runs isort -flake8-mutable # mutable default argument detection -flake8-pep3101 # new-style format strings only -flake8-print # complain about print statements in code -flake8-pytest-style # pytest checks -flake8-raise # exception raising -flake8-requirements # requirements.txt check +ruff \ No newline at end of file diff --git a/genshin-dev/reformat-requirements.txt b/genshin-dev/reformat-requirements.txt index bee49858..1cf5c13f 100644 --- a/genshin-dev/reformat-requirements.txt +++ b/genshin-dev/reformat-requirements.txt @@ -1,3 +1,3 @@ black -isort -sort-all +ruff +sort-all \ No newline at end of file diff --git a/genshin/__main__.py b/genshin/__main__.py index 2f7e8f3a..fd047c9a 100644 --- a/genshin/__main__.py +++ b/genshin/__main__.py @@ -68,8 +68,10 @@ async def accounts(client: genshin.Client) -> None: genshin_group: click.Group = click.Group("genshin", help="Genshin-related commands.") honkai_group: click.Group = click.Group("honkai", help="Honkai-related commands.") +starrail_group: click.Group = click.Group("starrail", help="StarRail-related commands.") cli.add_command(genshin_group) cli.add_command(honkai_group) +cli.add_command(starrail_group) @honkai_group.command("stats") @@ -199,6 +201,33 @@ async def genshin_notes(client: genshin.Client, uid: typing.Optional[int]) -> No click.echo(f" - {expedition.status}") +@starrail_group.command("notes") +@click.argument("uid", type=int, default=None, required=False) +@client_command +async def starrail_notes(client: genshin.Client, uid: typing.Optional[int]) -> None: + """Show real-Time starrail notes.""" + click.echo("Real-Time notes.") + + data = await client.get_starrail_notes(uid) + + click.echo(f"{click.style('TB power:', bold=True)} {data.current_stamina}/{data.max_stamina}", nl=False) + click.echo(f" (Full in {data.stamina_recover_time})" if data.stamina_recover_time > datetime.timedelta(0) else "") + click.echo(f"{click.style('Reserved TB power:', bold=True)} {data.current_reserve_stamina}/2400") + click.echo(f"{click.style('Daily training:', bold=True)} {data.current_train_score}/{data.max_train_score}") + click.echo(f"{click.style('Simulated Universe:', bold=True)} {data.current_rogue_score}/{data.max_rogue_score}") + click.echo( + f"{click.style('Echo of War:', bold=True)} {data.remaining_weekly_discounts}/{data.max_weekly_discounts}" + ) + + click.echo(f"\n{click.style('Assignments:', bold=True)} {data.accepted_epedition_num}/{data.total_expedition_num}") + for expedition in data.expeditions: + if expedition.remaining_time > datetime.timedelta(0): + remaining = f"{expedition.remaining_time} remaining" + click.echo(f" - {expedition.name} | {remaining}") + else: + click.echo(f" - {expedition.name} | Finished") + + @cli.command() @click.option("--scenario", help="Scenario ID or name to use (eg '12-3').", type=str, default=None) @client_command diff --git a/genshin/client/components/gacha.py b/genshin/client/components/gacha.py index fe6cad62..19fc90b0 100644 --- a/genshin/client/components/gacha.py +++ b/genshin/client/components/gacha.py @@ -222,7 +222,7 @@ async def _get_banner_details( server = "prod_official_asia" if game == types.Game.STARRAIL else "os_asia" data = await self.request_webstatic( - f"/{region}/gacha_info/{server}/{banner_id}/{lang}.json", + f"/gacha_info/{region}/{server}/{banner_id}/{lang}.json", cache=client_cache.cache_key("banner", endpoint="details", banner=banner_id, lang=lang), ) return models.BannerDetails(**data, banner_id=banner_id) @@ -240,12 +240,19 @@ async def get_genshin_banner_ids(self) -> typing.Sequence[str]: Uses the current cn banners. """ + + def process_gacha(data: typing.Mapping[str, typing.Any]) -> str: + # Temporary fix for 4.5 chronicled wish + if data["gacha_type"] == 500: + return "8b10b48c52dd6870f92d72e9963b44bb8968ed2f" + return data["gacha_id"] + data = await self.request_webstatic( - "hk4e/gacha_info/cn_gf01/gacha/list.json", + "gacha_info/hk4e/cn_gf01/gacha/list.json", region=types.Region.CHINESE, cache=client_cache.cache_key("banner", endpoint="ids"), ) - return [i["gacha_id"] for i in data["data"]["list"]] + return list(map(process_gacha, data["data"]["list"])) async def get_banner_details( self, diff --git a/genshin/client/components/geetest/client.py b/genshin/client/components/geetest/client.py index 036fa4b4..98f86979 100644 --- a/genshin/client/components/geetest/client.py +++ b/genshin/client/components/geetest/client.py @@ -1,14 +1,26 @@ """Geetest client component.""" +import asyncio import json +import logging +import random import typing +from string import ascii_letters, digits import aiohttp import aiohttp.web +import qrcode +import qrcode.image.pil +from qrcode.constants import ERROR_CORRECT_L from genshin import constants, errors from genshin.client import routes from genshin.client.components import base +from genshin.client.manager.cookie import ( + fetch_cookie_token_by_game_token, + fetch_stoken_by_game_token, +) +from genshin.models.miyoushe.qrcode import QRCodeCheckResult, QRCodeCreationResult, QRCodeStatus from genshin.utility import ds as ds_utility from genshin.utility import geetest as geetest_utility @@ -16,11 +28,13 @@ __all__ = ["GeetestClient"] +LOGGER_ = logging.getLogger(__name__) + class GeetestClient(base.BaseClient): """Geetest client component.""" - async def web_login( + async def _web_login( self, account: str, password: str, @@ -39,8 +53,8 @@ async def web_login( headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) payload = { - "account": geetest_utility.encrypt_geetest_credentials(account), - "password": geetest_utility.encrypt_geetest_credentials(password), + "account": geetest_utility.encrypt_geetest_credentials(account, self._region), + "password": geetest_utility.encrypt_geetest_credentials(password, self._region), "token_type": tokenType, } @@ -69,12 +83,61 @@ async def web_login( return cookies - async def app_login( + async def _cn_login_by_password( + self, + account: str, + password: str, + *, + geetest: typing.Optional[typing.Dict[str, typing.Any]] = None, + ) -> typing.Dict[str, typing.Any]: + """ + Login with account and password using Miyoushe loginByPassword endpoint. + + Returns data from aigis header or cookies. + """ + headers = { + **geetest_utility.CN_LOGIN_HEADERS, + "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), + } + if geetest: + mmt_data = geetest["data"] + session_id = geetest["session_id"] + headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) + + payload = { + "account": geetest_utility.encrypt_geetest_credentials(account, self._region), + "password": geetest_utility.encrypt_geetest_credentials(password, self._region), + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.CN_WEB_LOGIN_URL.get_url(), + json=payload, + headers=headers, + ) as r: + data = await r.json() + + if data["retcode"] == -3102: + # Captcha triggered + aigis = json.loads(r.headers["x-rpc-aigis"]) + aigis["data"] = json.loads(aigis["data"]) + return aigis + + if not data["data"]: + errors.raise_for_retcode(data) + + cookies = {cookie.key: cookie.value for cookie in r.cookies.values()} + + self.set_cookies(cookies) + return cookies + + async def _app_login( self, account: str, password: str, *, geetest: typing.Optional[typing.Dict[str, typing.Any]] = None, + ticket: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> typing.Dict[str, typing.Any]: """Login with a password using HoYoLab app endpoint. @@ -89,9 +152,13 @@ async def app_login( session_id = geetest["session_id"] headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) + if ticket: + ticket["verify_str"] = json.dumps(ticket["verify_str"]) + headers["x-rpc-verify"] = json.dumps(ticket) + payload = { - "account": geetest_utility.encrypt_geetest_credentials(account), - "password": geetest_utility.encrypt_geetest_credentials(password), + "account": geetest_utility.encrypt_geetest_credentials(account, self._region), + "password": geetest_utility.encrypt_geetest_credentials(password, self._region), } async with aiohttp.ClientSession() as session: @@ -110,7 +177,9 @@ async def app_login( if data["retcode"] == -3239: # Email verification required - return json.loads(r.headers["x-rpc-verify"]) + verify = json.loads(r.headers["x-rpc-verify"]) + verify["verify_str"] = json.loads(verify["verify_str"]) + return verify if not data["data"]: errors.raise_for_retcode(data) @@ -126,7 +195,7 @@ async def app_login( return cookies - async def send_verification_email( + async def _send_verification_email( self, ticket: typing.Dict[str, typing.Any], *, @@ -164,7 +233,7 @@ async def send_verification_email( return None - async def verify_email(self, code: str, ticket: typing.Dict[str, typing.Any]) -> None: + async def _verify_email(self, code: str, ticket: typing.Dict[str, typing.Any]) -> None: """Verify email.""" async with aiohttp.ClientSession() as session: async with session.post( @@ -184,6 +253,128 @@ async def verify_email(self, code: str, ticket: typing.Dict[str, typing.Any]) -> return None + async def _send_mobile_otp( + self, + mobile: str, + *, + geetest: typing.Optional[typing.Dict[str, typing.Any]] = None, + ) -> typing.Dict[str, typing.Any] | None: + """Attempt to send OTP to the provided mobile number. + + May return aigis headers if captcha is triggered, None otherwise. + """ + headers = { + **geetest_utility.CN_LOGIN_HEADERS, + "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), + } + if geetest: + mmt_data = geetest["data"] + session_id = geetest["session_id"] + headers["x-rpc-aigis"] = geetest_utility.get_aigis_header(session_id, mmt_data) + + payload = { + "mobile": geetest_utility.encrypt_geetest_credentials(mobile, self._region), + "area_code": geetest_utility.encrypt_geetest_credentials("+86", self._region), + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.MOBILE_OTP_URL.get_url(), + json=payload, + headers=headers, + ) as r: + data = await r.json() + + if data["retcode"] == -3101: + # Captcha triggered + aigis = json.loads(r.headers["x-rpc-aigis"]) + aigis["data"] = json.loads(aigis["data"]) + return aigis + + if not data["data"]: + errors.raise_for_retcode(data) + + return None + + async def _login_with_mobile_otp(self, mobile: str, otp: str) -> typing.Dict[str, typing.Any]: + """Login with OTP and mobile number. + + Returns cookies if OTP matches the one sent, raises an error otherwise. + """ + headers = { + **geetest_utility.CN_LOGIN_HEADERS, + "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]), + } + + payload = { + "mobile": geetest_utility.encrypt_geetest_credentials(mobile, self._region), + "area_code": geetest_utility.encrypt_geetest_credentials("+86", self._region), + "captcha": otp, + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.MOBILE_LOGIN_URL.get_url(), + json=payload, + headers=headers, + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + cookies = {cookie.key: cookie.value for cookie in r.cookies.values()} + self.set_cookies(cookies) + + return cookies + + async def _create_qrcode(self) -> QRCodeCreationResult: + """Create a QR code for login.""" + device_id = "".join(random.choices(ascii_letters + digits, k=64)) + app_id = "8" + payload = { + "app_id": app_id, + "device": device_id, + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.CREATE_QRCODE_URL.get_url(), + json=payload, + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + url: str = data["data"]["url"] + return QRCodeCreationResult( + app_id=app_id, + ticket=url.split("ticket=")[1], + device_id=device_id, + url=url, + ) + + async def _check_qrcode(self, app_id: str, device_id: str, ticket: str) -> QRCodeCheckResult: + """Check the status of a QR code login.""" + payload = { + "app_id": app_id, + "device": device_id, + "ticket": ticket, + } + + async with aiohttp.ClientSession() as session: + async with session.post( + routes.CHECK_QRCODE_URL.get_url(), + json=payload, + ) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + return QRCodeCheckResult(**data["data"]) + async def login_with_password( self, account: str, @@ -192,7 +383,10 @@ async def login_with_password( port: int = 5000, tokenType: typing.Optional[int] = 6, geetest_solver: typing.Optional[ - typing.Callable[[typing.Dict[str, typing.Any]], typing.Awaitable[typing.Dict[str, typing.Any]]] + typing.Callable[ + [typing.Dict[str, typing.Any]], + typing.Awaitable[typing.Dict[str, typing.Any]], + ] ] = None, ) -> typing.Dict[str, str]: """Login with a password via web endpoint. @@ -200,7 +394,7 @@ async def login_with_password( Note that this will start a webserver if captcha is triggered and `geetest_solver` is not passed. """ - result = await self.web_login(account, password, tokenType=tokenType) + result = await self._web_login(account, password, tokenType=tokenType) if "session_id" not in result: # Captcha not triggered @@ -211,7 +405,76 @@ async def login_with_password( else: geetest = await server.solve_geetest(result, port=port) - return await self.web_login(account, password, tokenType=tokenType, geetest=geetest) + return await self._web_login(account, password, tokenType=tokenType, geetest=geetest) + + async def cn_login_by_password( + self, + account: str, + password: str, + *, + port: int = 5000, + geetest_solver: typing.Optional[ + typing.Callable[ + [typing.Dict[str, typing.Any]], + typing.Awaitable[typing.Dict[str, typing.Any]], + ] + ] = None, + ) -> typing.Dict[str, str]: + """Login with a password via Miyoushe loginByPassword endpoint. + + Note that this will start a webserver if captcha is triggered and `geetest_solver` is not passed. + """ + result = await self._cn_login_by_password(account, password) + + if "session_id" not in result: + # Captcha not triggered + return result + + if geetest_solver: + geetest = await geetest_solver(result) + else: + geetest = await server.solve_geetest(result, port=port) + + return await self._cn_login_by_password(account, password, geetest=geetest) + + async def check_mobile_number_validity(self, mobile: str) -> bool: + """Check if a mobile number is valid (it's registered on Miyoushe). + + Returns True if the mobile number is valid, False otherwise. + """ + async with aiohttp.ClientSession() as session: + async with session.get( + routes.CHECK_MOBILE_VALIDITY_URL.get_url(), + params={"mobile": mobile}, + ) as r: + data = await r.json() + + return data["data"]["status"] != data["data"]["is_registable"] + + async def login_with_mobile_number( + self, + mobile: str, + ) -> typing.Dict[str, str]: + """Login with mobile number, returns cookies. + + Only works for Chinese region (Miyoushe) users, do not include area code (+86) in the mobile number. + Steps: + 1. Sends OTP to the provided mobile number. + 1-1. If captcha is triggered, prompts the user to solve it. + 2. Lets user enter the OTP. + 3. Logs in with the OTP. + 4. Returns cookies. + """ + result = await self._send_mobile_otp(mobile) + + if result is not None and "session_id" in result: + # Captcha triggered + geetest = await server.solve_geetest(result) + await self._send_mobile_otp(mobile, geetest=geetest) + + otp = await server.enter_otp() + cookies = await self._login_with_mobile_otp(mobile, otp) + return cookies async def login_with_app_password( self, @@ -220,7 +483,10 @@ async def login_with_app_password( *, port: int = 5000, geetest_solver: typing.Optional[ - typing.Callable[[typing.Dict[str, typing.Any]], typing.Awaitable[typing.Dict[str, typing.Any]]] + typing.Callable[ + [typing.Dict[str, typing.Any]], + typing.Awaitable[typing.Dict[str, typing.Any]], + ] ] = None, ) -> typing.Dict[str, str]: """Login with a password via HoYoLab app endpoint. @@ -232,7 +498,7 @@ async def login_with_app_password( 2. Email verification is triggered (can happen if you first login with a new device). """ - result = await self.app_login(account, password) + result = await self._app_login(account, password) if "session_id" in result: # Captcha triggered @@ -241,18 +507,60 @@ async def login_with_app_password( else: geetest = await server.solve_geetest(result, port=port) - result = await self.app_login(account, password, geetest=geetest) + result = await self._app_login(account, password, geetest=geetest) if "risk_ticket" in result: # Email verification required - mmt = await self.send_verification_email(result) + mmt = await self._send_verification_email(result) if mmt: if geetest_solver: geetest = await geetest_solver(mmt) else: geetest = await server.solve_geetest(mmt, port=port) + await self._send_verification_email(result, geetest=geetest) + await server.verify_email(self, result, port=port) - result = await self.app_login(account, password) + result = await self._app_login(account, password, ticket=result) return result + + async def login_with_qrcode(self) -> typing.Dict[str, str]: + """Login with QR code, only available for Miyoushe users. + + Returns cookies. + """ + creation_result = await self._create_qrcode() + qrcode_: qrcode.image.pil.PilImage = qrcode.make(creation_result.url, error_correction=ERROR_CORRECT_L) # type: ignore + qrcode_.show() + + scanned = False + while True: + check_result = await self._check_qrcode( + creation_result.app_id, creation_result.device_id, creation_result.ticket + ) + if check_result.status == QRCodeStatus.SCANNED and not scanned: + LOGGER_.info("QR code scanned") + scanned = True + elif check_result.status == QRCodeStatus.CONFIRMED: + LOGGER_.info("QR code login confirmed") + break + + await asyncio.sleep(2) + + raw_data = check_result.payload.raw + assert raw_data is not None + + cookie_token = await fetch_cookie_token_by_game_token( + game_token=raw_data.game_token, account_id=raw_data.account_id + ) + stoken = await fetch_stoken_by_game_token(game_token=raw_data.game_token, account_id=int(raw_data.account_id)) + + cookies = { + "stoken_v2": stoken.token, + "stuid": stoken.aid, + "mid": stoken.mid, + "cookie_token": cookie_token, + } + self.set_cookies(cookies) + return cookies diff --git a/genshin/client/components/geetest/server.py b/genshin/client/components/geetest/server.py index a8bb641b..0aa0081d 100644 --- a/genshin/client/components/geetest/server.py +++ b/genshin/client/components/geetest/server.py @@ -11,13 +11,10 @@ from . import client -__all__ = ["get_page", "launch_webapp", "solve_geetest", "verify_email"] +__all__ = ["PAGES", "launch_webapp", "solve_geetest", "verify_email"] - -def get_page(page: typing.Literal["captcha", "verify-email"]) -> str: - """Get the HTML page.""" - return ( - """ +PAGES: typing.Final[typing.Dict[typing.Literal["captcha", "verify-email", "enter-otp"], str]] = { + "captcha": """ @@ -55,9 +52,28 @@ def get_page(page: typing.Literal["captcha", "verify-email"]) -> str: ); - """ - if page == "captcha" - else """ + """, + "verify-email": """ + + + + + + + + + """, + "enter-otp": """ @@ -65,7 +81,7 @@ def get_page(page: typing.Literal["captcha", "verify-email"]) -> str: - """ - ) + """, +} GT_URL = "https://raw.githubusercontent.com/GeeTeam/gt3-node-sdk/master/demo/static/libs/gt.js" async def launch_webapp( - page: typing.Literal["captcha", "verify-email"], + page: typing.Literal["captcha", "verify-email", "enter-otp"], *, port: int = 5000, mmt: typing.Optional[typing.Dict[str, typing.Any]] = None, @@ -95,11 +111,15 @@ async def launch_webapp( @routes.get("/captcha") async def captcha(request: web.Request) -> web.StreamResponse: - return web.Response(body=get_page("captcha"), content_type="text/html") + return web.Response(body=PAGES["captcha"], content_type="text/html") @routes.get("/verify-email") async def verify_email(request: web.Request) -> web.StreamResponse: - return web.Response(body=get_page("verify-email"), content_type="text/html") + return web.Response(body=PAGES["verify-email"], content_type="text/html") + + @routes.get("/enter-otp") + async def enter_otp(request: web.Request) -> web.StreamResponse: + return web.Response(body=PAGES["enter-otp"], content_type="text/html") @routes.get("/gt.js") async def gt(request: web.Request) -> web.StreamResponse: @@ -137,6 +157,7 @@ async def send_data_endpoint(request: web.Request) -> web.Response: finally: await asyncio.sleep(0.3) await runner.shutdown() + await runner.cleanup() return data @@ -160,4 +181,12 @@ async def verify_email( data = await launch_webapp("verify-email", port=port) code = data["code"] - return await client.verify_email(code, ticket) + return await client._verify_email(code, ticket) + + +async def enter_otp(port: int = 5000) -> str: + """Lets user enter the OTP.""" + # The enter-otp page is the same as verify-email page. + data = await launch_webapp("enter-otp", port=port) + code = data["code"] + return code diff --git a/genshin/client/manager/cookie.py b/genshin/client/manager/cookie.py index a2b59d0c..76779f4d 100644 --- a/genshin/client/manager/cookie.py +++ b/genshin/client/manager/cookie.py @@ -18,7 +18,10 @@ from __future__ import annotations +import random import typing +import uuid +from string import ascii_letters, digits import aiohttp import aiohttp.typedefs @@ -26,16 +29,33 @@ from genshin import constants, errors, types from genshin.client import routes from genshin.client.manager import managers +from genshin.models.miyoushe.cookie import StokenResult from genshin.utility import ds as ds_utility __all__ = [ "complete_cookies", + "fetch_cookie_token_by_game_token", "fetch_cookie_token_info", "fetch_cookie_with_cookie", "fetch_cookie_with_stoken_v2", + "fetch_stoken_by_game_token", "refresh_cookie_token", ] +STOKEN_BY_GAME_TOKEN_HEADERS = { + "x-rpc-app_version": "2.41.0", + "x-rpc-aigis": "", + "Content-Type": "application/json", + "Accept": "application/json", + "x-rpc-game_biz": "bbs_cn", + "x-rpc-sys_version": "11", + "x-rpc-device_name": "GenshinUid_login_device_lulu", + "x-rpc-device_model": "GenshinUid_login_device_lulu", + "x-rpc-app_id": "bll8iq97cem8", + "x-rpc-client_type": "2", + "User-Agent": "okhttp/4.8.0", +} + async def fetch_cookie_with_cookie( cookies: managers.CookieOrHeader, @@ -166,3 +186,45 @@ async def complete_cookies( cookies = await refresh_cookie_token(cookies, region=region) # type: ignore[assignment] return cookies + + +async def fetch_cookie_token_by_game_token(*, game_token: str, account_id: str) -> str: + """Fetch cookie token by game token, which is obtained by scanning a QR code.""" + url = routes.GET_COOKIE_TOKEN_BY_GAME_TOKEN_URL.get_url() + params = { + "game_token": game_token, + "account_id": account_id, + } + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + return data["data"]["cookie_token"] + + +async def fetch_stoken_by_game_token(*, game_token: str, account_id: int) -> StokenResult: + """Fetch cookie token by game token, which is obtained by scanning a QR code.""" + url = routes.GET_STOKEN_BY_GAME_TOKEN_URL.get_url() + payload = { + "account_id": account_id, + "game_token": game_token, + } + headers = { + "DS": ds_utility.generate_passport_ds(body=payload), + "x-rpc-device_id": uuid.uuid4().hex, + "x-rpc-device_fp": "".join(random.choices(ascii_letters + digits, k=13)), + **STOKEN_BY_GAME_TOKEN_HEADERS, + } + + async with aiohttp.ClientSession() as session: + async with session.post(url, json=payload, headers=headers) as r: + data = await r.json() + + if not data["data"]: + errors.raise_for_retcode(data) + + return StokenResult(**data["data"]) diff --git a/genshin/client/routes.py b/genshin/client/routes.py index 5a26f3fd..9114f676 100644 --- a/genshin/client/routes.py +++ b/genshin/client/routes.py @@ -13,10 +13,15 @@ "BBS_REFERER_URL", "BBS_URL", "CALCULATOR_URL", + "CHECK_QRCODE_URL", + "CN_WEB_LOGIN_URL", "COMMUNITY_URL", "COOKIE_V2_REFRESH_URL", + "CREATE_QRCODE_URL", "DETAIL_LEDGER_URL", "GACHA_URL", + "GET_COOKIE_TOKEN_BY_GAME_TOKEN_URL", + "GET_STOKEN_BY_GAME_TOKEN_URL", "HK4E_URL", "INFO_LEDGER_URL", "LINEUP_URL", @@ -97,8 +102,8 @@ def get_url(self, region: types.Region, game: types.Game) -> yarl.URL: WEBSTATIC_URL = InternationalRoute( - "https://webstatic-sea.hoyoverse.com/", - "https://webstatic.mihoyo.com/", + "https://operation-webstatic.hoyoverse.com/", + "https://operation-webstatic.mihoyo.com/", ) WEBAPI_URL = InternationalRoute( @@ -212,16 +217,26 @@ def get_url(self, region: types.Region, game: types.Game) -> yarl.URL: ) MI18N = dict( - bbs="https://webstatic-sea.mihoyo.com/admin/mi18n/bbs_cn/m11241040191111/m11241040191111-{lang}.json", + bbs="https://fastcdn.hoyoverse.com/mi18n/bbs_oversea/m11241040191111/m11241040191111-{lang}.json", inquiry="https://mi18n-os.hoyoverse.com/webstatic/admin/mi18n/hk4e_global/m02251421001311/m02251421001311-{lang}.json", ) COOKIE_V2_REFRESH_URL = Route("https://sg-public-api.hoyoverse.com/account/ma-passport/token/getBySToken") +GET_COOKIE_TOKEN_BY_GAME_TOKEN_URL = Route("https://api-takumi.mihoyo.com/auth/api/getCookieAccountInfoByGameToken") +GET_STOKEN_BY_GAME_TOKEN_URL = Route("https://passport-api.mihoyo.com/account/ma-cn-session/app/getTokenByGameToken") WEB_LOGIN_URL = Route("https://sg-public-api.hoyolab.com/account/ma-passport/api/webLoginByPassword") APP_LOGIN_URL = Route("https://sg-public-api.hoyoverse.com/account/ma-passport/api/appLoginByPassword") +CN_WEB_LOGIN_URL = Route("https://passport-api.miyoushe.com/account/ma-cn-passport/web/loginByPassword") SEND_VERIFICATION_CODE_URL = Route( "https://sg-public-api.hoyoverse.com/account/ma-verifier/api/createEmailCaptchaByActionTicket" ) VERIFY_EMAIL_URL = Route("https://sg-public-api.hoyoverse.com/account/ma-verifier/api/verifyActionTicketPartly") + +CHECK_MOBILE_VALIDITY_URL = Route("https://webapi.account.mihoyo.com/Api/is_mobile_registrable") +MOBILE_OTP_URL = Route("https://passport-api.miyoushe.com/account/ma-cn-verifier/verifier/createLoginCaptcha") +MOBILE_LOGIN_URL = Route("https://passport-api.miyoushe.com/account/ma-cn-passport/web/loginByMobileCaptcha") + +CREATE_QRCODE_URL = Route("https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/fetch") +CHECK_QRCODE_URL = Route("https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/query") diff --git a/genshin/constants.py b/genshin/constants.py index 2d86a6ff..64e42e05 100644 --- a/genshin/constants.py +++ b/genshin/constants.py @@ -29,5 +29,6 @@ types.Region.CHINESE: "xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs", "app_login": "IZPgfb0dRPtBeLuFkdDznSZ6f4wWt6y2", "cn_signin": "9nQiU3AV0rJSIBWgdynfoGMGKaklfbM7", + "cn_passport": "JwYDpKvLj6MrMqqYU6jTKF17KNO2PXoS", } """Dynamic Secret Salts.""" diff --git a/genshin/errors.py b/genshin/errors.py index 1ea5e1e2..f62ade2f 100644 --- a/genshin/errors.py +++ b/genshin/errors.py @@ -31,7 +31,11 @@ class GenshinException(Exception): original: str = "" msg: str = "" - def __init__(self, response: typing.Mapping[str, typing.Any] = {}, msg: typing.Optional[str] = None) -> None: + def __init__( + self, + response: typing.Mapping[str, typing.Any] = {}, + msg: typing.Optional[str] = None, + ) -> None: self.retcode = response.get("retcode", self.retcode) self.original = response.get("message", "") self.msg = msg or self.msg or self.original @@ -176,6 +180,12 @@ class AccountHasLocked(GenshinException): msg = "Account has been locked because exceeded password limit. Please wait 20 minute and try again" +class WrongOTP(GenshinException): + """Wrong OTP code.""" + + msg = "The provided OTP code is wrong." + + _TGE = typing.Type[GenshinException] _errors: typing.Dict[int, typing.Union[_TGE, str, typing.Tuple[_TGE, typing.Optional[str]]]] = { # misc hoyolab @@ -189,7 +199,10 @@ class AccountHasLocked(GenshinException): # database game record 10101: TooManyRequests, 10102: DataNotPublic, - 10103: (InvalidCookies, "Cookies are valid but do not have a hoyolab account bound to them."), + 10103: ( + InvalidCookies, + "Cookies are valid but do not have a hoyolab account bound to them.", + ), 10104: "Cannot view real-time notes of other users.", # calculator -500001: "Invalid fields in calculation.", @@ -210,7 +223,10 @@ class AccountHasLocked(GenshinException): -2016: RedemptionCooldown, -2017: RedemptionClaimed, -2018: RedemptionClaimed, - -2021: (RedemptionException, "Cannot claim codes for accounts with adventure rank lower than 10."), + -2021: ( + RedemptionException, + "Cannot claim codes for accounts with adventure rank lower than 10.", + ), # rewards -5003: AlreadyClaimed, # chinese @@ -219,6 +235,7 @@ class AccountHasLocked(GenshinException): # account -3208: AccountLoginFail, -3202: AccountHasLocked, + -3205: WrongOTP, } ERRORS: typing.Dict[int, typing.Tuple[_TGE, typing.Optional[str]]] = { diff --git a/genshin/models/__init__.py b/genshin/models/__init__.py index 35edc5e7..38da6131 100644 --- a/genshin/models/__init__.py +++ b/genshin/models/__init__.py @@ -3,5 +3,6 @@ from .genshin import * from .honkai import * from .hoyolab import * +from .miyoushe import * from .model import * from .starrail import * diff --git a/genshin/models/genshin/character.py b/genshin/models/genshin/character.py index 065cc14b..4720d009 100644 --- a/genshin/models/genshin/character.py +++ b/genshin/models/genshin/character.py @@ -4,6 +4,8 @@ import re import typing +from genshin.utility import deprecation + if typing.TYPE_CHECKING: import pydantic.v1 as pydantic else: @@ -20,7 +22,7 @@ _LOGGER = logging.getLogger(__name__) -ICON_BASE = "https://upload-os-bbs.mihoyo.com/game_record/genshin/" +ICON_BASE = "https://enka.network/ui/" def _parse_icon(icon: typing.Union[str, int]) -> str: @@ -86,7 +88,14 @@ def _get_db_char( constants.CHARACTER_NAMES[lang][char.id] = char return char - return constants.DBChar(id or 0, icon_name, name or icon_name, element or "Anemo", rarity or 5, guessed=True) + return constants.DBChar( + id or 0, + icon_name, + name or icon_name, + element or "Anemo", + rarity or 5, + guessed=True, + ) if name: for char in constants.CHARACTER_NAMES[lang].values(): @@ -115,7 +124,7 @@ def __autocomplete(cls, values: typing.Dict[str, typing.Any]) -> typing.Dict[str id, name, icon, element, rarity = (values.get(x) for x in ("id", "name", "icon", "element", "rarity")) char = _get_db_char(id, name, icon, element, rarity, lang=values["lang"]) - icon = _create_icon(char.icon_name, "character_icon/UI_AvatarIcon_{}") + icon = _create_icon(char.icon_name, "UI_AvatarIcon_{}") values["id"] = char.id values["name"] = char.name @@ -143,16 +152,21 @@ def __autocomplete(cls, values: typing.Dict[str, typing.Any]) -> typing.Dict[str return values @property + @deprecation.deprecated("gacha_art") def image(self) -> str: - return _create_icon(self.icon, "character_image/UI_AvatarIcon_{}@2x") + return _create_icon(self.icon, "UI_Gacha_AvatarImg_{}") + + @property + def gacha_art(self) -> str: + return _create_icon(self.icon, "UI_Gacha_AvatarImg_{}") @property def side_icon(self) -> str: - return _create_icon(self.icon, "character_side_icon/UI_AvatarIcon_Side_{}") + return _create_icon(self.icon, "UI_AvatarIcon_Side_{}") @property def card_icon(self) -> str: - return _create_icon(self.icon, "character_card_icon/UI_AvatarIcon_{}_Card") + return _create_icon(self.icon, "UI_AvatarIcon_{}_Card") @property def traveler_name(self) -> str: diff --git a/genshin/models/genshin/chronicle/abyss.py b/genshin/models/genshin/chronicle/abyss.py index 829fcd7d..159c2f8f 100644 --- a/genshin/models/genshin/chronicle/abyss.py +++ b/genshin/models/genshin/chronicle/abyss.py @@ -47,13 +47,13 @@ class CharacterRanks(APIModel): most_played: typing.Sequence[AbyssRankCharacter] = Aliased("reveal_rank", default=[], mi18n="bbs/go_fight_count") most_kills: typing.Sequence[AbyssRankCharacter] = Aliased("defeat_rank", default=[], mi18n="bbs/max_rout_count") strongest_strike: typing.Sequence[AbyssRankCharacter] = Aliased("damage_rank", default=[], mi18n="bbs/powerful_attack") - most_damage_taken: typing.Sequence[AbyssRankCharacter] = Aliased("take_damage_rank", default=[], mi18n="bbs/receive_max_damage") - most_bursts_used: typing.Sequence[AbyssRankCharacter] = Aliased("energy_skill_rank", default=[], mi18n="bbs/element_break_count") - most_skills_used: typing.Sequence[AbyssRankCharacter] = Aliased("normal_skill_rank", default=[], mi18n="bbs/element_skill_use_count") + most_damage_taken: typing.Sequence[AbyssRankCharacter] = Aliased("take_damage_rank", default=[], mi18n="bbs/receive_max_damage") # noqa: E501 + most_bursts_used: typing.Sequence[AbyssRankCharacter] = Aliased("energy_skill_rank", default=[], mi18n="bbs/element_break_count") # noqa: E501 + most_skills_used: typing.Sequence[AbyssRankCharacter] = Aliased("normal_skill_rank", default=[], mi18n="bbs/element_skill_use_count") # noqa: E501 # fmt: on def as_dict(self, lang: typing.Optional[str] = None) -> typing.Mapping[str, typing.Any]: - """Helper function which turns fields into properly named ones""" + """Turn fields into properly named ones.""" return { self._get_mi18n(field, lang or self.lang): getattr(self, field.name) for field in self.__fields__.values() diff --git a/genshin/models/genshin/chronicle/stats.py b/genshin/models/genshin/chronicle/stats.py index fb4d9e75..0734a886 100644 --- a/genshin/models/genshin/chronicle/stats.py +++ b/genshin/models/genshin/chronicle/stats.py @@ -53,7 +53,7 @@ class Stats(APIModel): # fmt: on def as_dict(self, lang: typing.Optional[str] = None) -> typing.Mapping[str, typing.Any]: - """Helper function which turns fields into properly named ones""" + """Turn fields into properly named ones.""" return { self._get_mi18n(field, lang or self.lang): getattr(self, field.name) for field in self.__fields__.values() diff --git a/genshin/models/genshin/daily.py b/genshin/models/genshin/daily.py index 1b2e2443..113eaee6 100644 --- a/genshin/models/genshin/daily.py +++ b/genshin/models/genshin/daily.py @@ -1,6 +1,5 @@ """Daily reward models.""" -import calendar import datetime import typing @@ -19,8 +18,7 @@ class DailyRewardInfo(typing.NamedTuple): def missed_rewards(self) -> int: cn_timezone = datetime.timezone(datetime.timedelta(hours=8)) now = datetime.datetime.now(cn_timezone) - month_days = calendar.monthrange(now.year, now.month)[1] - return month_days - self.claimed_rewards + return now.day - self.claimed_rewards class DailyReward(APIModel): diff --git a/genshin/models/genshin/lineup.py b/genshin/models/genshin/lineup.py index 6ae71946..76f3865e 100644 --- a/genshin/models/genshin/lineup.py +++ b/genshin/models/genshin/lineup.py @@ -296,7 +296,7 @@ def __parse_characters(cls, value: typing.Any) -> typing.Any: if isinstance(value[0], typing.Sequence): return value - return [[character for character in group["group"]] for group in value] + return [list(group["group"]) for group in value] class Lineup(LineupPreview): diff --git a/genshin/models/honkai/chronicle/stats.py b/genshin/models/honkai/chronicle/stats.py index b7f57490..1bb6745d 100644 --- a/genshin/models/honkai/chronicle/stats.py +++ b/genshin/models/honkai/chronicle/stats.py @@ -24,7 +24,7 @@ def _model_to_dict(model: APIModel, lang: str = "en-us") -> typing.Mapping[str, typing.Any]: - """Helper function which turns fields into properly named ones""" + """Turn fields into properly named ones.""" ret: typing.Dict[str, typing.Any] = {} for field in model.__fields__.values(): if not field.field_info.extra.get("mi18n"): @@ -123,7 +123,7 @@ class OldAbyssStats(APIModel): raw_tier: int = Aliased("latest_area", mi18n="bbs/settled_level") raw_latest_rank: typing.Optional[int] = Aliased("latest_level", mi18n="bbs/rank") # TODO: Add proper key - latest_type: str = Aliased( mi18n="bbs/latest_type") + latest_type: str = Aliased( mi18n="bbs/latest_type") # fmt: on @pydantic.validator("raw_q_singularis_rank", "raw_dirac_sea_rank", "raw_latest_rank", pre=True) @@ -240,6 +240,7 @@ def __pack_gamemode_stats(cls, values: typing.Dict[str, typing.Any]) -> typing.D return values def as_dict(self, lang: str = "en-us") -> typing.Mapping[str, typing.Any]: + """Turn fields into properly named ones.""" return _model_to_dict(self, lang) diff --git a/genshin/models/miyoushe/__init__.py b/genshin/models/miyoushe/__init__.py new file mode 100644 index 00000000..f34841ca --- /dev/null +++ b/genshin/models/miyoushe/__init__.py @@ -0,0 +1,4 @@ +"""Miyoushe models.""" + +from .cookie import * +from .qrcode import * diff --git a/genshin/models/miyoushe/cookie.py b/genshin/models/miyoushe/cookie.py new file mode 100644 index 00000000..4cca01cf --- /dev/null +++ b/genshin/models/miyoushe/cookie.py @@ -0,0 +1,23 @@ +"""Miyoushe Cookie Models""" + +import typing + +from pydantic import BaseModel, model_validator + +__all__ = ("StokenResult",) + + +class StokenResult(BaseModel): + """Stoken result.""" + + aid: str + mid: str + token: str + + @model_validator(mode="before") + def _transform_result(cls, values: typing.Dict[str, typing.Any]) -> typing.Dict[str, typing.Any]: + return { + "aid": values["user_info"]["aid"], + "mid": values["user_info"]["mid"], + "token": values["token"]["token"], + } diff --git a/genshin/models/miyoushe/qrcode.py b/genshin/models/miyoushe/qrcode.py new file mode 100644 index 00000000..5ce6d0a6 --- /dev/null +++ b/genshin/models/miyoushe/qrcode.py @@ -0,0 +1,54 @@ +"""Miyoushe QR Code Models""" + +import json +from enum import Enum + +from pydantic import BaseModel, Field, field_validator + +__all__ = ("QRCodeCheckResult", "QRCodeCreationResult", "QRCodePayload", "QRCodeRawData", "QRCodeStatus") + + +class QRCodeStatus(Enum): + """QR code check status.""" + + INIT = "Init" + SCANNED = "Scanned" + CONFIRMED = "Confirmed" + + +class QRCodeRawData(BaseModel): + """QR code raw data.""" + + account_id: str = Field(alias="uid") + """Miyoushe account id.""" + game_token: str = Field(alias="token") + + +class QRCodePayload(BaseModel): + """QR code check result payload.""" + + proto: str + raw: QRCodeRawData | None + ext: str + + @field_validator("raw", mode="before") + def _convert_raw_data(cls, value: str | None) -> QRCodeRawData | None: + if value: + return QRCodeRawData(**json.loads(value)) + return None + + +class QRCodeCheckResult(BaseModel): + """QR code check result.""" + + status: QRCodeStatus = Field(alias="stat") + payload: QRCodePayload + + +class QRCodeCreationResult(BaseModel): + """QR code creation result.""" + + app_id: str + ticket: str + device_id: str + url: str diff --git a/genshin/models/starrail/chronicle/challenge.py b/genshin/models/starrail/chronicle/challenge.py index 1441f190..17afd98f 100644 --- a/genshin/models/starrail/chronicle/challenge.py +++ b/genshin/models/starrail/chronicle/challenge.py @@ -1,6 +1,14 @@ """Starrail chronicle challenge.""" -from typing import List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +if TYPE_CHECKING: + import pydantic.v1 as pydantic +else: + try: + import pydantic.v1 as pydantic + except ImportError: + import pydantic from genshin.models.model import Aliased, APIModel from genshin.models.starrail.character import FloorCharacter diff --git a/genshin/models/starrail/chronicle/characters.py b/genshin/models/starrail/chronicle/characters.py index f6d3bde3..cda87544 100644 --- a/genshin/models/starrail/chronicle/characters.py +++ b/genshin/models/starrail/chronicle/characters.py @@ -1,14 +1,30 @@ """Starrail chronicle character.""" -from typing import List, Optional +import typing +from typing import Any, Mapping, Optional, Sequence + +if typing.TYPE_CHECKING: + import pydantic.v1 as pydantic +else: + try: + import pydantic.v1 as pydantic + except ImportError: + import pydantic from genshin.models.model import APIModel from .. import character __all__ = [ + "CharacterProperty", + "ModifyRelicProperty", + "PropertyInfo", "Rank", + "RecommendProperty", "Relic", + "RelicProperty", + "Skill", + "SkillStage", "StarRailDetailCharacter", "StarRailDetailCharacters", "StarRailEquipment", @@ -24,6 +40,29 @@ class StarRailEquipment(APIModel): name: str desc: str icon: str + rarity: int + wiki: str + + +class PropertyInfo(APIModel): + """Relic property info.""" + + property_type: int + name: str + icon: str + property_name_relic: str + property_name_filter: str + + +class RelicProperty(APIModel): + """Relic property.""" + + property_type: int + value: str + times: int + preferred: bool + recommended: bool + info: PropertyInfo class Relic(APIModel): @@ -36,6 +75,9 @@ class Relic(APIModel): desc: str icon: str rarity: int + wiki: str + main_property: RelicProperty + properties: Sequence[RelicProperty] class Rank(APIModel): @@ -49,17 +91,119 @@ class Rank(APIModel): is_unlocked: bool +class CharacterProperty(APIModel): + """Base character property.""" + + property_type: int + base: str + add: str + final: str + preferred: bool + recommended: bool + info: PropertyInfo + + +class SkillStage(APIModel): + """Character skill stage.""" + + name: str + desc: str + level: int + remake: str + item_url: str + is_activated: bool + is_rank_work: bool + + +class Skill(APIModel): + """Character skill.""" + + point_id: str + point_type: int + item_url: str + level: int + is_activated: bool + is_rank_work: bool + pre_point: str + anchor: str + remake: str + skill_stages: Sequence[SkillStage] + + +class RecommendProperty(APIModel): + """Character recommended and preferred properties.""" + + recommend_relic_properties: Sequence[int] + custom_relic_properties: Sequence[int] + is_custom_property_valid: bool + + class StarRailDetailCharacter(character.StarRailPartialCharacter): """StarRail character with equipment and relics.""" image: str equip: Optional[StarRailEquipment] - relics: List[Relic] - ornaments: List[Relic] - ranks: List[Rank] + relics: Sequence[Relic] + ornaments: Sequence[Relic] + ranks: Sequence[Rank] + properties: Sequence[CharacterProperty] + base_type: int + figure_path: str + skills: Sequence[Skill] + + +class ModifyRelicProperty(APIModel): + """Modify relic property.""" + + property_type: int + modify_property_type: int class StarRailDetailCharacters(APIModel): """StarRail characters.""" - avatar_list: List[StarRailDetailCharacter] + avatar_list: Sequence[StarRailDetailCharacter] + equip_wiki: Mapping[str, str] + relic_wiki: Mapping[str, str] + property_info: Mapping[str, PropertyInfo] + recommend_property: Mapping[str, RecommendProperty] + relic_properties: Sequence[ModifyRelicProperty] + + @pydantic.root_validator(pre=True) + def __fill_additional_fields(cls, values: Mapping[str, Any]) -> Mapping[str, Any]: + """Fill additional fields for convenience.""" + characters = values.get("avatar_list", []) + props_info = values.get("property_info", {}) + rec_props = values.get("recommend_property", {}) + equip_wiki = values.get("equip_wiki", {}) + relic_wiki = values.get("relic_wiki", {}) + + for char in characters: + char_id = str(char["id"]) + char_rec_props = rec_props[char_id]["recommend_relic_properties"] + char_custom_props = rec_props[char_id]["custom_relic_properties"] + + for relic in char["relics"] + char["ornaments"]: + prop_type = relic["main_property"]["property_type"] + relic["main_property"]["info"] = props_info[str(prop_type)] + relic["main_property"]["recommended"] = prop_type in char_rec_props + relic["main_property"]["preferred"] = prop_type in char_custom_props + + for prop in relic["properties"]: + prop_type = prop["property_type"] + prop["recommended"] = prop_type in char_rec_props + prop["preferred"] = prop_type in char_custom_props + prop["info"] = props_info[str(prop_type)] + + relic["wiki"] = relic_wiki.get(str(relic["id"]), "") + + for prop in char["properties"]: + prop_type = prop["property_type"] + prop["recommended"] = prop_type in char_rec_props + prop["preferred"] = prop_type in char_custom_props + prop["info"] = props_info[str(prop_type)] + + if char["equip"]: + char["equip"]["wiki"] = equip_wiki.get(str(char["equip"]["id"]), "") + + return values diff --git a/genshin/models/starrail/chronicle/notes.py b/genshin/models/starrail/chronicle/notes.py index d3fa8f8e..ca09e147 100644 --- a/genshin/models/starrail/chronicle/notes.py +++ b/genshin/models/starrail/chronicle/notes.py @@ -15,6 +15,7 @@ class StarRailExpedition(APIModel): status: typing.Literal["Ongoing", "Finished"] remaining_time: datetime.timedelta name: str + item_url: str @property def finished(self) -> bool: diff --git a/genshin/utility/ds.py b/genshin/utility/ds.py index 1cd169f5..adae34c3 100644 --- a/genshin/utility/ds.py +++ b/genshin/utility/ds.py @@ -9,7 +9,7 @@ from genshin import constants, types -__all__ = ["generate_cn_dynamic_secret", "generate_dynamic_secret", "get_ds_headers"] +__all__ = ["generate_cn_dynamic_secret", "generate_dynamic_secret", "generate_passport_ds", "get_ds_headers"] def generate_dynamic_secret(salt: str = constants.DS_SALT[types.Region.OVERSEAS]) -> str: @@ -59,3 +59,14 @@ def get_ds_headers( else: raise TypeError(f"{region!r} is not a valid region.") return ds_headers + + +def generate_passport_ds(body: typing.Mapping[str, typing.Any]) -> str: + """Create a dynamic secret for Miyoushe passport API.""" + salt = constants.DS_SALT["cn_passport"] + t = int(time.time()) + r = "".join(random.sample(string.ascii_letters, 6)) + b = json.dumps(body) + h = hashlib.md5(f"salt={salt}&t={t}&r={r}&b={b}&q=".encode()).hexdigest() + result = f"{t},{r},{h}" + return result diff --git a/genshin/utility/extdb.py b/genshin/utility/extdb.py index 52fb5ac6..b3058cd3 100644 --- a/genshin/utility/extdb.py +++ b/genshin/utility/extdb.py @@ -2,6 +2,7 @@ import asyncio import json +import logging import time import typing import warnings @@ -20,6 +21,8 @@ "update_characters_genshindata", ) +LOGGER_ = logging.getLogger(__name__) + CACHE_FILE = fs.get_tempdir() / "characters.json" if CACHE_FILE.exists() and time.time() - CACHE_FILE.stat().st_mtime < 7 * 24 * 60 * 60: @@ -170,8 +173,10 @@ async def update_characters_enka(langs: typing.Sequence[str] = ()) -> None: continue # traveler element for short_lang, loc in locs.items(): + if (lang := ENKA_LANG_MAP.get(short_lang)) is None: + continue update_character_name( - lang=ENKA_LANG_MAP[short_lang], + lang=lang, id=int(strid), icon_name=char["SideIconName"][len("UI_AvatarIcon_Side_") :], # noqa: E203 name=loc[str(char["NameTextMapHash"])], @@ -235,7 +240,7 @@ async def update_characters_any( try: await updator(langs) except Exception: - continue + LOGGER_.exception("Failed to update characters with %s", updator.__name__) else: return diff --git a/genshin/utility/geetest.py b/genshin/utility/geetest.py index 300ba2ae..fe388afb 100644 --- a/genshin/utility/geetest.py +++ b/genshin/utility/geetest.py @@ -4,11 +4,13 @@ import json import typing +from ..types import Region + __all__ = ["encrypt_geetest_credentials"] # RSA key is the same for app and web login -LOGIN_KEY_CERT = b""" +OS_LOGIN_KEY_CERT = b""" -----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4PMS2JVMwBsOIrYWRluY wEiFZL7Aphtm9z5Eu/anzJ09nB00uhW+ScrDWFECPwpQto/GlOJYCUwVM/raQpAj @@ -20,13 +22,18 @@ -----END PUBLIC KEY----- """ +CN_LOGIN_KEY_CERT = b""" +-----BEGIN PUBLIC KEY----- +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDDvekdPMHN3AYhm/vktJT+YJr7 +cI5DcsNKqdsx5DZX0gDuWFuIjzdwButrIYPNmRJ1G8ybDIF7oDW2eEpm5sMbL9zs +9ExXCdvqrn51qELbqj0XxtMTIpaCHFSI50PfPpTFV9Xt/hmyVwokoOXFlAEgCn+Q +CgGs52bFoYMtyi+xEQIDAQAB +-----END PUBLIC KEY----- +""" + WEB_LOGIN_HEADERS = { "x-rpc-app_id": "c9oqaq3s3gu8", "x-rpc-client_type": "4", - "x-rpc-sdk_version": "2.14.1", - "x-rpc-game_biz": "bbs_oversea", - "x-rpc-source": "v2.webLogin", - "x-rpc-referrer": "https://www.hoyolab.com", # If not equal account.hoyolab.com It's will return retcode 1200 [Unauthorized] "Origin": "https://account.hoyolab.com", "Referer": "https://account.hoyolab.com/", @@ -34,28 +41,42 @@ APP_LOGIN_HEADERS = { "x-rpc-app_id": "c9oqaq3s3gu8", - "x-rpc-app_version": "2.47.0", "x-rpc-client_type": "2", - "x-rpc-sdk_version": "2.22.0", - "x-rpc-game_biz": "bbs_oversea", - "Origin": "https://account.hoyoverse.com", - "Referer": "https://account.hoyoverse.com/", + # Passing "x-rpc-device_id" header will trigger email verification + # (unless the device_id is already verified). + # + # For some reason, without this header, email verification is not triggered. + # "x-rpc-device_id": "1c33337bd45c1bfs", +} + +CN_LOGIN_HEADERS = { + "x-rpc-app_id": "bll8iq97cem8", + "x-rpc-client_type": "4", + "Origin": "https://user.miyoushe.com", + "Referer": "https://user.miyoushe.com/", + "x-rpc-source": "v2.webLogin", + "x-rpc-mi_referrer": "https://user.miyoushe.com/login-platform/index.html?app_id=bll8iq97cem8&theme=&token_type=4&game_biz=bbs_cn&message_origin=https%253A%252F%252Fwww.miyoushe.com&succ_back_type=message%253Alogin-platform%253Alogin-success&fail_back_type=message%253Alogin-platform%253Alogin-fail&ux_mode=popup&iframe_level=1#/login/password", # noqa: E501 + "x-rpc-device_id": "586f2440-856a-4243-8076-2b0a12314197", } EMAIL_SEND_HEADERS = { "x-rpc-app_id": "c9oqaq3s3gu8", + "x-rpc-client_type": "2", } EMAIL_VERIFY_HEADERS = { "x-rpc-app_id": "c9oqaq3s3gu8", + "x-rpc-client_type": "2", } -def encrypt_geetest_credentials(text: str) -> str: +def encrypt_geetest_credentials(text: str, region: Region = Region.OVERSEAS) -> str: """Encrypt text for geetest.""" import rsa - public_key = rsa.PublicKey.load_pkcs1_openssl_pem(LOGIN_KEY_CERT) + public_key = rsa.PublicKey.load_pkcs1_openssl_pem( + OS_LOGIN_KEY_CERT if region is Region.OVERSEAS else CN_LOGIN_KEY_CERT + ) crypto = rsa.encrypt(text.encode("utf-8"), public_key) return base64.b64encode(crypto).decode("utf-8") diff --git a/genshin/utility/uid.py b/genshin/utility/uid.py index c074be42..d8e9193b 100644 --- a/genshin/utility/uid.py +++ b/genshin/utility/uid.py @@ -15,22 +15,42 @@ "recognize_starrail_server", ] -UID_RANGE: typing.Mapping[types.Game, typing.Mapping[types.Region, typing.Sequence[int]]] = { +UID_RANGE: typing.Mapping[types.Game, typing.Mapping[types.Region, typing.Sequence[str]]] = { types.Game.GENSHIN: { - types.Region.OVERSEAS: (6, 7, 8, 9), - types.Region.CHINESE: (1, 2, 5), + types.Region.OVERSEAS: ("6", "7", "8", "18", "9"), + types.Region.CHINESE: ("1", "2", "3", "5"), }, types.Game.STARRAIL: { - types.Region.OVERSEAS: (6, 7, 8, 9), - types.Region.CHINESE: (1, 2, 5), + types.Region.OVERSEAS: ("6", "7", "8", "9"), + types.Region.CHINESE: ("1", "2", "5"), }, types.Game.HONKAI: { - types.Region.OVERSEAS: (1, 2), - types.Region.CHINESE: (3, 4), + types.Region.OVERSEAS: ("1", "2"), + types.Region.CHINESE: ("3", "4"), }, } """Mapping of games and regions to their respective UID ranges.""" +GENSHIN_SERVER_RANGE: typing.Mapping[str, typing.Sequence[str]] = { + "cn_gf01": ("1", "2", "3"), + "cn_qd01": ("5",), + "os_usa": ("6",), + "os_euro": ("7",), + "os_asia": ("8", "18"), + "os_cht": ("9",), +} +"""Mapping of Genshin servers to their respective UID ranges.""" + +STARRAIL_SERVER_RANGE: typing.Mapping[str, typing.Sequence[str]] = { + "prod_gf_cn": ("1", "2"), + "prod_qd_cn": ("5",), + "prod_official_usa": ("6",), + "prod_official_eur": ("7",), + "prod_official_asia": ("8",), + "prod_official_cht": ("9",), +} +"""Mapping of Star Rail servers to their respective UID ranges.""" + def create_short_lang_code(lang: str) -> str: """Create an alternative short lang code.""" @@ -39,18 +59,9 @@ def create_short_lang_code(lang: str) -> str: def recognize_genshin_server(uid: int) -> str: """Recognize which server a Genshin UID is from.""" - server = { - "1": "cn_gf01", - "2": "cn_gf01", - "5": "cn_qd01", - "6": "os_usa", - "7": "os_euro", - "8": "os_asia", - "9": "os_cht", - }.get(str(uid)[0]) - - if server: - return server + for server_name, digits in GENSHIN_SERVER_RANGE.items(): + if str(uid)[:-8] in digits: + return server_name raise ValueError(f"UID {uid} isn't associated with any server") @@ -95,18 +106,9 @@ def recognize_honkai_server(uid: int) -> str: def recognize_starrail_server(uid: int) -> str: """Recognize which server a Star Rail UID is from.""" - server = { - "1": "prod_gf_cn", - "2": "prod_gf_cn", - "5": "prod_qd_cn", - "6": "prod_official_usa", - "7": "prod_official_eur", - "8": "prod_official_asia", - "9": "prod_official_cht", - }.get(str(uid)[0]) - - if server: - return server + for server, digits in STARRAIL_SERVER_RANGE.items(): + if str(uid)[:-8] in digits: + return server raise ValueError(f"UID {uid} isn't associated with any server") @@ -128,10 +130,8 @@ def recognize_game(uid: int, region: types.Region) -> typing.Optional[types.Game if len(str(uid)) == 8: return types.Game.HONKAI - first = int(str(uid)[0]) - for game, digits in UID_RANGE.items(): - if first in digits[region]: + if str(uid)[:-8] in digits[region]: return game return None @@ -139,10 +139,8 @@ def recognize_game(uid: int, region: types.Region) -> typing.Optional[types.Game def recognize_region(uid: int, game: types.Game) -> typing.Optional[types.Region]: """Recognize the region of a uid.""" - first = int(str(uid)[0]) - for region, digits in UID_RANGE[game].items(): - if first in digits: + if str(uid)[:-8] in digits: return region return None diff --git a/noxfile.py b/noxfile.py index 550a0401..3da55b37 100644 --- a/noxfile.py +++ b/noxfile.py @@ -31,7 +31,7 @@ def install_requirements(session: nox.Session, *requirements: str, literal: bool """Install requirements.""" if not literal and all(requirement.isalpha() for requirement in requirements): files = ["requirements.txt"] + [f"./genshin-dev/{requirement}-requirements.txt" for requirement in requirements] - requirements = ("pip",) + tuple(arg for file in files for arg in ("-r", file)) + requirements = ("pip", *tuple(arg for file in files for arg in ("-r", file))) session.install("--upgrade", *requirements, silent=not isverbose()) @@ -52,16 +52,15 @@ def docs(session: nox.Session) -> None: def lint(session: nox.Session) -> None: """Run this project's modules against the pre-defined flake8 linters.""" install_requirements(session, "lint") - session.run("flake8", "--version") - session.run("flake8", *GENERAL_TARGETS, *verbose_args()) + session.run("ruff", "check", *GENERAL_TARGETS, *verbose_args()) @nox.session() def reformat(session: nox.Session) -> None: """Reformat this project's modules to fit the standard style.""" install_requirements(session, "reformat") - session.run("black", *GENERAL_TARGETS, *verbose_args()) - session.run("isort", *GENERAL_TARGETS, *verbose_args()) + session.run("python", "-m", "black", *GENERAL_TARGETS, *verbose_args()) + session.run("python", "-m", "ruff", "check", "--fix-only", "--fixable", "ALL", *GENERAL_TARGETS, *verbose_args()) session.log("sort-all") LOGGER.disabled = True diff --git a/pyproject.toml b/pyproject.toml index 804aafe2..96634f51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,10 +4,72 @@ build-backend = "setuptools.build_meta" [tool.black] line-length = 120 -target-version = ["py39"] -[tool.isort] -profile = "black" +[tool.ruff] +line-length = 120 +target-version = "py38" + +[tool.ruff.lint] +select = [ + "A", + "C4", + "C9", + "D", + "E", + "F", + "S", + "W", + "T20", + "PT", + "RSE" +] +exclude = ["tests", "test.py"] + +# A001, A002, A003: `id` variable/parameter/attribute +# C408: dict() with keyword arguments +# D101: Missing docstring in public module +# D105: Missing docstring in magic method +# D106: Missing docstring Model.Config +# D400: First line should end with a period +# D419: Docstring is empty +# PT007: Wrong values type in `@pytest.mark.parametrize` expected `list` of `tuple` +# PT018: Assertion should be broken down into multiple parts +# S101: Use of assert for type checking +# S303: Use of md5 +# S311: Use of pseudo-random generators +# S324: Use of md5 without usedforsecurity=False (3.9+) +ignore = [ + "A001", "A002", "A003", + "C408", + "D100", "D105", "D106", "D400", "D419", + "PT007", "PT018", + "S101", "S303", "S311", "S324", +] + +# auto-fixing too intrusive +# F401: Unused import +# F841: Unused variable +# B007: Unused loop variable +unfixable = ["F401", "F841", "B007"] + +[tool.ruff.lint.per-file-ignores] +# F401: unused import. +# F403: cannot detect unused vars if we use starred import +# D10*: docstrings +# S10*: hardcoded passwords +# F841: unused variable +"**/__init__.py" = ["F401", "F403"] +"tests/**" = ["D10", "S10", "F841"] + +[tool.ruff.lint.mccabe] +max-complexity = 16 + +[tool.ruff.lint.pycodestyle] +max-line-length = 130 + +[tool.ruff.lint.pydocstyle] +convention = "numpy" +ignore-decorators = ["property"] [tool.pytest.ini_options] asyncio_mode = "auto" diff --git a/requirements.txt b/requirements.txt index 95cbe366..fa1c601e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ browser-cookie3 rsa aioredis click +qrcode[pil] \ No newline at end of file diff --git a/setup.py b/setup.py index 96e87d66..c8cb0974 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,10 @@ """Run setuptools.""" + from setuptools import find_packages, setup setup( name="genshin", - version="1.6.2", + version="1.6.3", author="thesadru", author_email="thesadru@gmail.com", description="An API wrapper for Genshin Impact.", @@ -17,9 +18,9 @@ python_requires=">=3.8", install_requires=["aiohttp", "pydantic"], extras_require={ - "all": ["browser-cookie3", "rsa", "click"], + "all": ["browser-cookie3", "rsa", "click", "qrcode[pil]"], "cookies": ["browser-cookie3"], - "geetest": ["rsa"], + "geetest": ["rsa", "qrcode[pil]"], "cli": ["click"], }, include_package_data=True,