diff --git a/globals.py b/globals.py index 0dff356..e993d52 100644 --- a/globals.py +++ b/globals.py @@ -10,6 +10,8 @@ from loguru import logger from sentry_sdk.integrations.loguru import LoggingLevels, LoguruIntegration +import inquirer + logger.remove(handler_id=0) handler_id = logger.add( sys.stderr, @@ -22,9 +24,13 @@ elif os.path.exists("do-not-upload-error"): sample_rate=0 else: - logger.info("可选的错误上传:您是否选择上传可能遇到的错误以帮助我们改善脚本?(Y/n)") - is_upload_error = input() - if is_upload_error.lower() == "n": + is_upload_error = inquirer.prompt([ + inquirer.Confirm( + 'upload', + message="可选的错误上传:您是否选择上传可能遇到的错误以帮助我们改善脚本?", + ), + ]) + if not is_upload_error: logger.info("已选择不上传错误") sample_rate=0 with open("do-not-upload-error", "w") as f: @@ -63,13 +69,18 @@ class HygException(Exception): def load_config(): # 判断是否存在config.json if os.path.exists("config.json"): - is_use_config = input( - "已存在上一次的配置文件,是否沿用全部或只沿用登录信息(包括风控信息)?(Y/l/n)" - ) - if is_use_config == "n": + is_use_config = inquirer.prompt([ + inquirer.List( + 'use_config', + message="已存在上一次的配置文件,是否沿用全部或只沿用登录信息(包括风控信息)?", + choices=['全部', '只沿用登录信息', '不沿用'], + default='全部', + ), + ]) + if is_use_config["use_config"] == "不沿用": logger.info("重新配置") config = {} - elif is_use_config == "l": + elif is_use_config["use_config"] == "只沿用登录信息": logger.info("只沿用登录信息") with open("config.json", "r", encoding="utf-8") as f: config = {} @@ -83,7 +94,7 @@ def load_config(): logger.error("读取cookie失败,重新配置") config = {} else: - if is_use_config.lower() == "y": + if is_use_config["use_config"].lower() == "全部": logger.info("使用上次的配置文件") else: logger.info("已默认使用上次的配置文件") diff --git a/login.py b/login.py index 6fdb0b6..405c55d 100644 --- a/login.py +++ b/login.py @@ -1,5 +1,5 @@ import base64 -import json +import inquirer import time import qrcode @@ -76,15 +76,10 @@ def verify_code_login(session, headers): gt = captcha["data"]["geetest"]["gt"] challenge = captcha["data"]["geetest"]["challenge"] token = captcha["data"]["token"] - tel = input("请输入手机号(非大陆手机号请添加国家号,如+1 4438888888): ").split( - " " - ) - if len(tel) == 1: - cid = "+86" - tel = tel[0] - else: - cid = tel[0] - tel = tel[1] + tel = inquirer.prompt([ + inquirer.Text("tel", message="请输入手机号(仅支持大陆手机号)", validate=lambda _, x: len(x) == 11 and x.isdigit()) + ])["tel"] + cid = "86" logger.info("请稍后,正在执行自动验证...") cap_data = _verify(gt, challenge, token) while cap_data == False: @@ -99,6 +94,7 @@ def verify_code_login(session, headers): cap_data = _verify(gt, challenge, token) logger.success("验证完成") data = { + "source": "main-fe-header", "cid": cid, "tel": tel, "token": token, @@ -106,6 +102,7 @@ def verify_code_login(session, headers): "validate": cap_data["validate"], "seccode": cap_data["seccode"] + "|jordan", } + print(data) # https://passport.bilibili.com/x/passport-login/web/sms/send send = session.post( "https://passport.bilibili.com/x/passport-login/web/sms/send", @@ -119,7 +116,9 @@ def verify_code_login(session, headers): logger.success("验证码发送成功") send_token = send["data"]["captcha_key"] while True: - code = input("请输入验证码: ") + code = inquirer.prompt([ + inquirer.Text("code", message="请输入验证码", validate=lambda _, x: len(x) == 6 and x.isdigit()) + ])["code"] # https://passport.bilibili.com/x/passport-login/web/login/sms data = {"cid": cid, "tel": tel, "captcha_key": send_token, "code": code} login = session.post( @@ -140,10 +139,12 @@ def password_login(session, headers): from Crypto.PublicKey import RSA # https://passport.bilibili.com/x/passport-login/web/key - username = input("请输入用户名(通常为手机号): ") - import getpass - - password = getpass.getpass("请输入密码:") + username = inquirer.prompt([ + inquirer.Text("username", message="请输入用户名(通常为手机号)") + ])["username"] + password = inquirer.prompt([ + inquirer.Password("password", message="请输入密码") + ])["password"] captcha = session.get( "https://passport.bilibili.com/x/passport-login/captcha", headers=headers ).json() @@ -253,7 +254,9 @@ def password_login(session, headers): logger.success("验证码发送成功") send_token = send["data"]["captcha_key"] while True: - code = input("请输入验证码: ") + code = inquirer.prompt([ + inquirer.Text("code", message="请输入验证码", validate=lambda _, x: len(x) == 6 and x.isdigit()) + ])["code"] data = { "type": "loginTelCheck", "tmp_code": tmp_token, @@ -293,17 +296,23 @@ def interactive_login(): session = requests.session() session.get("https://www.bilibili.com/", headers=headers) - logger.info( - "请选择登录方式\n1. cookie登录\n2. 扫码登录\n3. 用户名密码登录\n4. 验证码登录" - ) - method = input("请输入数字: ") - if method == "1": - cookie_str = input("请输入cookie: ") - elif method == "2": + method = inquirer.prompt([ + inquirer.List( + 'method', + message="请选择登录方式", + choices=['Cookie登录', '扫码登录', '用户名密码登录', '验证码登录'], + default='扫码登录', + ), + ])["method"] + if method == "Cookie登录": + cookie_str = inquirer.prompt([ + inquirer.Text("cookie", message="请输入cookie") + ]) + elif method == "扫码登录": cookie_str = qr_login(session, headers) - elif method == "3": + elif method == "用户名密码登录": cookie_str = password_login(session, headers) - elif method == "4": + elif method == "验证码登录": cookie_str = verify_code_login(session, headers) else: logger.error("暂不支持此方式") diff --git a/main.py b/main.py index 974a7fd..48643e7 100644 --- a/main.py +++ b/main.py @@ -2,7 +2,7 @@ # Copyright (c) 2023 ZianTT import json import os -import threading +import inquirer import time import kdl @@ -41,22 +41,10 @@ def run(hyg): logger.warning("未开放购票") elif status == 3: logger.warning("已停售") - if not "ignore" in vars(): - ignore = input( - "当前状态可能无法抢票,请确认是否继续抢票,按回车继续" - ) elif status == 5: logger.warning("不可售") - if not "ignore" in vars(): - ignore = input( - "当前状态可能无法抢票,请确认是否继续抢票,按回车继续" - ) elif status == 102: logger.warning("已结束") - if not "ignore" in vars(): - ignore = input( - "当前状态可能无法抢票,请确认是否继续抢票,按回车继续" - ) while True: if last_reset + 60 > time.time(): hyg.session.close() @@ -124,51 +112,57 @@ def main(): session = requests.Session() if "mode" not in config: - mode_str = input("是否直接抢票(不进行检测)y/N:") - if mode_str.lower() == "y": + mode_str = inquirer.prompt([ + inquirer.List( + 'mode', + message="是否直接抢票(不进行检测)", + choices=['是', '否'], + default='否', + ), + ])["mode"] + if mode_str == "是": config["mode"] = True else: config["mode"] = False if "co_delay" not in config: - co_delay = input( - "请输入创建订单时间间隔(该选项影响412风控概率,单开建议使用0)(秒):" - ) - try: - config["co_delay"] = float(co_delay) - except: - logger.warning("未设置时间间隔,默认为0") - config["co_delay"] = 0 - while config["co_delay"] < 0: - logger.error("时间间隔过短") - config["co_delay"] = float( - input( - "请输入创建订单时间间隔(该选项影响412风控概率,单开建议使用0)(秒):" - ) - ) + co_delay = inquirer.prompt([ + inquirer.Text( + 'co_delay', + message="请输入创建订单时间间隔(该选项影响412风控概率,单开建议使用0)(秒)", + validate=lambda _, x: x.replace(".", "").isdigit() and float(x) >= 0, + default='0', + ), + ])["co_delay"] + config["co_delay"] = float(co_delay) if "status_delay" not in config and not config["mode"]: - try: - config["status_delay"] = float( - input("请输入票务信息检测时间间隔(该选项影响412风控概率)(秒):") - ) - except: - logger.warning("未设置时间间隔,默认为0.2") - config["status_delay"] = 0.2 - while config["status_delay"] < 0: - logger.error("时间间隔过短") - config["status_delay"] = float( - input("请输入票务信息检测时间间隔(该选项影响412风控概率)(秒):") - ) + status_delay = inquirer.prompt([ + inquirer.Text( + 'status_delay', + message="请输入票务信息检测时间间隔(该选项影响412风控概率)(秒)", + validate=lambda _, x: x.replace(".", "").isdigit() and float(x) >= 0, + default='0.2', + ), + ])["status_delay"] + config["status_delay"] = float(status_delay) if "proxy" not in config: - choice = input("是否使用代理?y/N:") - if choice.lower() == "y": - while True: - config["proxy_auth"] = input("请输入代理认证信息: ").split(" ") - if len(config["proxy_auth"]) != 3: - logger.error("代理认证信息错误") - continue - config["proxy"] = True - break + choice = inquirer.prompt([ + inquirer.List( + 'proxy', + message="是否使用代理", + choices=['是', '否'], + default='否', + ), + ])["proxy"] + if choice == "是": + config["proxy_auth"] = inquirer.prompt([ + inquirer.Text( + 'proxy_auth', + message="请输入代理认证信息: ", + validate=lambda _, x: len(x.split(" ")) == 3, + ), + ])["proxy_auth"].split(" ") + config["proxy"] = True else: config["proxy"] = False kdl_client = None @@ -185,8 +179,15 @@ def main(): + kdl_client.tps_current_ip(sign_type="hmacsha1") ) if "again" not in config: - choice = input("是否允许重复下单?Y/n:") - if choice.lower() == "n": + choice = inquirer.prompt([ + inquirer.List( + 'again', + message="是否允许重复下单?", + choices=['是', '否'], + default='是', + ), + ])["again"] + if choice == "否": config["again"] = False else: config["again"] = True @@ -207,7 +208,13 @@ def main(): ) if len(common_project_id) == 0: logger.info("暂无") - config["project_id"] = input("请输入项目id:") + config["project_id"] = inquirer.prompt([ + inquirer.Text( + 'project_id', + message="请输入项目id", + validate=lambda _, x: x.isdigit(), + ), + ])["project_id"] url = ( "https://show.bilibili.com/api/ticket/project/getV2?version=134&id=" + config["project_id"] @@ -237,34 +244,21 @@ def main(): config["id_bind"] = response["data"]["id_bind"] config["is_paper_ticket"] = response["data"]["has_paper_ticket"] screens = response["data"]["screen_list"] - for i in range(len(screens)): - logger.info(str(i) + ". " + screens[i]["name"]) - while True: - try: - screen_id = int(input("请输入场次序号:")) - if screen_id >= len(screens): - raise ValueError - break - except ValueError: - logger.error("序号错误") - tickets = screens[int(screen_id)]["ticket_list"] # type: ignore - for i in range(len(tickets)): - logger.info( - str(i) - + ". " - + tickets[i]["desc"] - + " " - + str(tickets[i]["price"] / 100) - + "元" - ) - while True: - try: - sku_id = int(input("请输入票档序号:")) - if sku_id >= len(tickets): - raise ValueError - break - except ValueError: - logger.error("序号错误") + screen_id = inquirer.prompt([ + inquirer.List( + 'screen_id', + message="请选择场次", + choices=[f"{i}. {screens[i]['name']}" for i in range(len(screens))], + ), + ])["screen_id"].split(".")[0] + tickets = screens[int(screen_id)]["ticket_list"] + sku_id = inquirer.prompt([ + inquirer.List( + 'sku_id', + message="请选择票档", + choices=[f"{i}. {tickets[i]['desc']} {tickets[i]['price'] / 100}元" for i in range(len(tickets))], + ), + ])["sku_id"].split(".")[0] config["screen_id"] = str(screens[int(screen_id)]["id"]) config["sku_id"] = str(tickets[int(sku_id)]["id"]) config["pay_money"] = str(tickets[int(sku_id)]["price"]) @@ -288,18 +282,13 @@ def main(): if len(addr_list) == 0: logger.error("没有收货地址,请先添加收货地址") else: - for i in range(len(addr_list)): - logger.info( - f"{str(i)}. {addr_list[i]['prov']+addr_list[i]['city']+addr_list[i]['area']+addr_list[i]['addr']} {addr_list[i]['name']} {addr_list[i]['phone']}" - ) - while True: - try: - addr_index = int(input("请选择收货地址序号:")) - if addr_index >= len(addr_list): - raise ValueError - break - except ValueError: - logger.error("序号错误") + addr_index = inquirer.prompt([ + inquirer.List( + 'addr_index', + message="请选择收货地址", + choices=[f"{i}. {addr_list[i]['prov']+addr_list[i]['city']+addr_list[i]['area']+addr_list[i]['addr']} {addr_list[i]['name']} {addr_list[i]['phone']}" for i in range(len(addr_list))], + ), + ])["addr_index"].split(".")[0] addr = addr_list[addr_index] config["deliver_info"] = json.dumps( { @@ -338,28 +327,37 @@ def main(): while True: try: if multiselect: - for i in range(len(buyer_infos)): - logger.info( - f"{str(i)}. {buyer_infos[i]['name']} {buyer_infos[i]['personal_id']} {buyer_infos[i]['tel']}" - ) - buyerids = input( - "请选择购票人序号(多人用空格隔开):" - ).split(" ") + buyerids = inquirer.prompt([ + inquirer.Checkbox( + 'buyerids', + message="请选择购票人", + choices=[f"{i}. {buyer_infos[i]['name']} {buyer_infos[i]['personal_id']} {buyer_infos[i]['tel']}" for i in range(len(buyer_infos))], + ), + ])["buyerids"] + buyerids = [int(i.split(".")[0]) for i in buyerids] config["buyer_info"] = [] for select in buyerids: config["buyer_info"].append( buyer_infos[int(select)] - ) # type: ignore - # type: ignore + ) logger.info( "已选择购票人" + buyer_infos[int(select)]["name"] ) else: - for i in range(len(buyer_infos)): - logger.info( - f"{str(i)}. {buyer_infos[i]['name']} {buyer_infos[i]['personal_id']} {buyer_infos[i]['tel']}" - ) - index = int(input("请选择购票人序号:")) + index = inquirer.prompt([ + inquirer.Select( + 'index', + message="请选择购票人", + choices=[f"{i}. {buyer_infos[i]['name']} {buyer_infos[i]['personal_id']} {buyer_infos[i]['tel']}" for i in range(len(buyer_infos))], + ), + ])["index"].split(".")[0] + index = inquirer.prompt([ + inquirer.Text( + 'index', + message="请选择购票人序号", + validate=lambda _, x: x.isdigit(), + ), + ])["index"] config["buyer_info"].append(buyer_infos[index]) logger.info("已选择购票人" + buyer_infos[index]["name"]) break @@ -372,14 +370,27 @@ def main(): "buyer" not in config or "tel" not in config ): logger.info("请添加联系人信息") - config["buyer"] = input("联系人姓名:") - while True: - config["tel"] = input("联系人手机号:") - if len(config["tel"]) == 11: - break - logger.error("手机号长度错误") + config["buyer"] = inquirer.prompt([ + inquirer.Text( + 'buyer', + message="联系人姓名", + ), + ])["buyer"] + config["tel"] = inquirer.prompt([ + inquirer.Text( + 'tel', + message="联系人手机号", + validate=lambda _, x: len(x) == 11, + ), + ])["tel"] if "count" not in config: - config["count"] = input("请输入票数:") + config["count"] = inquirer.prompt([ + inquirer.Text( + 'count', + message="请输入票数", + validate=lambda _, x: x.isdigit(), + ), + ])["count"] if config["is_paper_ticket"]: if config["express_fee"] == 0: config["all_price"] = int(config["pay_money"]) * int( diff --git a/requirements.txt b/requirements.txt index 71eaa17..a52deb7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,5 @@ requests sentry-sdk kdl pycryptodome -bili_ticket_gt_python==0.2.3 \ No newline at end of file +bili_ticket_gt_python==0.2.3 +inquirer \ No newline at end of file