From 21c7f464b3309511337d3d7e95bdac0973a779c1 Mon Sep 17 00:00:00 2001 From: f18326186224 <f18326186224@gmail.com> Date: Fri, 24 May 2024 19:21:10 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E9=98=BF=E9=87=8C=E4=BA=91=E8=B0=83?= =?UTF-8?q?=E7=94=A8api=E5=A4=B1=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- auto_check_in.py | 245 ++++++++++++++++++++++------------------------- main.py | 33 +------ 2 files changed, 118 insertions(+), 160 deletions(-) diff --git a/auto_check_in.py b/auto_check_in.py index 89d21a9..bb25d1a 100644 --- a/auto_check_in.py +++ b/auto_check_in.py @@ -20,151 +20,138 @@ def show_qrcode(qr_link: str): - # 将qr_link生成二维码 - logger.info('正在生成二维码') - qr_img = qrcode.make(qr_link) - qr_img.get_image() - qr_img_path = tempfile.mktemp() - qr_img.save(qr_img_path) - qr_data = open(qr_img_path, 'rb').read() - logger.info('二维码生成成功') - bot.send_photo(chat_id=os.environ['TG_CHAT_ID'], - photo=qr_data, caption='请扫码登录阿里云盘') + # 将qr_link生成二维码 + logger.info('正在生成二维码') + qr_img = qrcode.make(qr_link) + qr_img.get_image() + qr_img_path = tempfile.mktemp() + qr_img.save(qr_img_path) + qr_data = open(qr_img_path, 'rb').read() + logger.info('二维码生成成功') + bot.send_photo(chat_id=os.environ['TG_CHAT_ID'], + photo=qr_data, caption='请扫码登录阿里云盘') def format_date(): - """获取今天的字符串格式化日期 格式为: 2021-09-01 + """获取今天的字符串格式化日期 格式为: 2021-09-01 - Args: - date (datetime.date): 日期 + Args: + date (datetime.date): 日期 - Returns: - [str]: 格式化后的日期 - """ - return datetime.date.today().strftime('%Y-%m-%d') + Returns: + [str]: 格式化后的日期 + """ + return datetime.date.today().strftime('%Y-%m-%d') def days_between(old_date: str): - """计算字符串日期与今天的天数差 格式为: 2021-09-01 + """计算字符串日期与今天的天数差 格式为: 2021-09-01 - Args: - old_date (str): _description_ - """ - old_date = time.strptime(old_date, '%Y-%m-%d') - old_date = time.mktime(old_date) - now = time.time() - days = (now - old_date) / (24 * 60 * 60) - return days + Args: + old_date (str): _description_ + """ + old_date = time.strptime(old_date, '%Y-%m-%d') + old_date = time.mktime(old_date) + now = time.time() + days = (now - old_date) / (24 * 60 * 60) + return days def sign_in(refresh_token: str, bot: TeleBot): - tg_content = "" - - if refresh_token != "": - logger.info('阿里云盘自动签到开始') - response_data = aliyundriveAutoCheckin.get_token(refresh_token.strip()) - if isinstance(response_data, str): - tg_content += response_data - - access_token = response_data.get('access_token') - user_name = response_data.get("user_name") - - if access_token is None: - tg_content += f"令牌错误: 请检查您的令牌值。\n" - - response_data = aliyundriveAutoCheckin.sign_in(access_token) - if isinstance(response_data, str): - tg_content += response_data - - signin_count = response_data['result']['signInCount'] - tg_content += f"账号: {user_name} - 成功签到, 本月累计签到: {signin_count}天\n" - - response_data = aliyundriveAutoCheckin.get_reward( - access_token, signin_count) - if isinstance(response_data, str): - tg_content += response_data - - tg_content += f"本次签到的奖励: {response_data['result']['name']}, {response_data['result']['description']}\n" - - # bot.send_message(chat_id=os.environ['TG_CHAT_ID'], text=tg_content) - logger.info('阿里云盘自动签到成功') + tg_content = "" + + if refresh_token != "": + logger.info('阿里云盘自动签到开始') + response_data = aliyundriveAutoCheckin.get_token(refresh_token.strip()) + if isinstance(response_data, str): + tg_content += response_data + + access_token = response_data.get('access_token') + user_name = response_data.get("user_name") + + if access_token is None: + tg_content += f"令牌错误: 请检查您的令牌值。\n" + + response_data = aliyundriveAutoCheckin.sign_in(access_token) + if isinstance(response_data, str): + tg_content += response_data + + signin_count = response_data['result']['signInCount'] + tg_content += f"账号: {user_name} - 成功签到, 本月累计签到: {signin_count}天\n" + + response_data = aliyundriveAutoCheckin.get_reward( + access_token, signin_count) + if isinstance(response_data, str): + tg_content += response_data + + tg_content += f"本次签到的奖励: {response_data['result']['name']}, {response_data['result']['description']}\n" + + # bot.send_message(chat_id=os.environ['TG_CHAT_ID'], text=tg_content) + logger.info('阿里云盘自动签到成功') # 准备aligo需要的配置文件 def prepare_for_aligo(base64_userdata: str): - subprocess.call('mkdir -p /home/runner/.aligo', shell=True) - aligo_config_folder = Path.home().joinpath('.aligo') / 'aligo.json' - try: - aligo_config_str = base64.b64decode( - base64_userdata).decode(encoding='utf-8') - aligo_config: dict = json.loads(aligo_config_str) - refresh_token = aligo_config['refresh_token'] - device_id = aligo_config['device_id'] - x_device_id = aligo_config['x_device_id'] - aligo = Aligo(refresh_token=refresh_token, re_login=False) - # 更新session的x-device-id - aligo._session.headers.update({'x-device-id': x_device_id, 'x-signature': aligo._auth._X_SIGNATURE}) - aligo._auth.token.device_id = device_id - aligo._auth.token.x_device_id = x_device_id - # 上次更新日期 - if 'last_updated' not in aligo_config: - aligo_config['last_updated'] = aligo_config['expire_time'].split('T')[ - 0] - aligo_config_str = json.dumps(aligo_config) - aligo_config_str = base64.b64encode(aligo_config_str.encode( - encoding='utf-8')).decode(encoding='utf-8') - os.system( - f'echo "aligo_token={aligo_config_str}" >> "$GITHUB_OUTPUT"') - else: - last_updated = aligo_config['last_updated'] - if days_between(last_updated) >= 25: - # 超过25天 刷新凭证 - # 登录成功后 将配置信息base64编码更新到github的secrets中 - new_aligo_config = json.loads( - aligo_config_folder.read_text(encoding='utf8')) - # 更新上次更新日期 - new_aligo_config['last_updated'] = format_date() - new_aligo_config_str = json.dumps(new_aligo_config) - new_aligo_config_str = base64.b64encode( - aligo_config_str.encode(encoding='utf-8')).decode(encoding='utf-8') - os.system( - f'echo "aligo_token={new_aligo_config_str}" >> "$GITHUB_OUTPUT"') - sign_in(refresh_token, bot) - return aligo - except Exception as e: - logger.info(f'登录失败:{e},重新通过扫码登录') - # 登录失败 重新通过扫码登录 - if aligo_config_folder.exists(): - aligo_config_folder.unlink() - aligo = Aligo(show=show_qrcode) - bot.send_message(chat_id=os.environ['TG_CHAT_ID'], text='阿里云盘登录成功!') - aligo_config = json.loads( - aligo_config_folder.read_text(encoding='utf8')) - aligo_config['last_updated'] = format_date() - device_id = aligo_config['device_id'] - x_device_id = aligo_config['x_device_id'] - # 更新session的x-device-id - aligo._session.headers.update({'x-device-id': x_device_id, 'x-signature': aligo._auth._X_SIGNATURE}) - aligo._auth.token.device_id = device_id - aligo._auth.token.x_device_id = x_device_id - # 将配置信息base64编码更新到github的secrets中 - aligo_config_str = json.dumps(aligo_config) - aligo_config_str = base64.b64encode(aligo_config_str.encode( - encoding='utf-8')).decode(encoding='utf-8') - # 执行linux命令 - os.system(f'echo "aligo_token={aligo_config_str}" >> "$GITHUB_OUTPUT"') - # 签到 - refresh_token = aligo_config['refresh_token'] - sign_in(refresh_token, bot) - return aligo + subprocess.call('mkdir -p /home/runner/.aligo', shell=True) + aligo_config_folder = Path.home().joinpath('.aligo') / 'aligo.json' + try: + aligo_config_str = base64.b64decode( + base64_userdata).decode(encoding='utf-8') + aligo_config: dict = json.loads(aligo_config_str) + refresh_token = aligo_config['refresh_token'] + device_id = aligo_config['device_id'] + x_device_id = aligo_config['x_device_id'] + aligo = Aligo(refresh_token=refresh_token, re_login=False) + # 更新session的x-device-id + aligo._session.headers.update({'x-device-id': x_device_id, 'x-signature': aligo._auth._X_SIGNATURE}) + aligo._auth.token.device_id = device_id + aligo._auth.token.x_device_id = x_device_id + + # 登录成功后 将配置信息base64编码更新到github的secrets中 + new_aligo_config = json.loads( + aligo_config_folder.read_text(encoding='utf8')) + new_aligo_config_str = json.dumps(new_aligo_config) + new_aligo_config_code = base64.b64encode( + new_aligo_config_str.encode(encoding='utf-8')).decode(encoding='utf-8') + os.system( + f'echo "aligo_token={new_aligo_config_code}" >> "$GITHUB_OUTPUT"') + + sign_in(refresh_token, bot) + return aligo + except Exception as e: + logger.info(f'登录失败:{e},重新通过扫码登录') + # 登录失败 重新通过扫码登录 + if aligo_config_folder.exists(): + aligo_config_folder.unlink() + aligo = Aligo(show=show_qrcode) + bot.send_message(chat_id=os.environ['TG_CHAT_ID'], text='阿里云盘登录成功!') + aligo_config = json.loads( + aligo_config_folder.read_text(encoding='utf8')) + aligo_config['last_updated'] = format_date() + device_id = aligo_config['device_id'] + x_device_id = aligo_config['x_device_id'] + # 更新session的x-device-id + aligo._session.headers.update({'x-device-id': x_device_id, 'x-signature': aligo._auth._X_SIGNATURE}) + aligo._auth.token.device_id = device_id + aligo._auth.token.x_device_id = x_device_id + # 将配置信息base64编码更新到github的secrets中 + aligo_config_str = json.dumps(aligo_config) + aligo_config_code = base64.b64encode(aligo_config_str.encode( + encoding='utf-8')).decode(encoding='utf-8') + # 执行linux命令 + os.system(f'echo "aligo_token={aligo_config_code}" >> "$GITHUB_OUTPUT"') + # 签到 + refresh_token = aligo_config['refresh_token'] + sign_in(refresh_token, bot) + return aligo if __name__ == '__main__': - try: - # Aligo的配置文件aligo.json的base64字符串 - base64_userdata = sys.argv[1] - aligo = prepare_for_aligo(base64_userdata) - except: - # 本地环境直接扫码 - logger.info(f'本地环境直接扫码') - aligo = Aligo() + try: + # Aligo的配置文件aligo.json的base64字符串 + base64_userdata = sys.argv[1] + aligo = prepare_for_aligo(base64_userdata) + except: + # 本地环境直接扫码 + logger.info(f'本地环境直接扫码') + aligo = Aligo() diff --git a/main.py b/main.py index c3e5174..4767fa2 100644 --- a/main.py +++ b/main.py @@ -99,33 +99,6 @@ def prepare_for_aligo(base64_userdata: str): device_id = aligo_config['device_id'] x_device_id = aligo_config['x_device_id'] aligo = Aligo(refresh_token=refresh_token, re_login=False) - # 更新session的x-device-id - aligo._session.headers.update({'x-device-id': x_device_id, 'x-signature': aligo._auth._X_SIGNATURE}) - aligo._auth.token.device_id = device_id - aligo._auth.token.x_device_id = x_device_id - # 上次更新日期 - if 'last_updated' not in aligo_config: - aligo_config['last_updated'] = aligo_config['expire_time'].split('T')[ - 0] - aligo_config_str = json.dumps(aligo_config) - aligo_config_str = base64.b64encode(aligo_config_str.encode( - encoding='utf-8')).decode(encoding='utf-8') - os.system( - f'echo "aligo_token={aligo_config_str}" >> "$GITHUB_OUTPUT"') - else: - last_updated = aligo_config['last_updated'] - if days_between(last_updated) >= 25: - # 超过25天 刷新凭证 - # 登录成功后 将配置信息base64编码更新到github的secrets中 - new_aligo_config = json.loads( - aligo_config_folder.read_text(encoding='utf8')) - # 更新上次更新日期 - new_aligo_config['last_updated'] = format_date() - json.dumps(new_aligo_config) - new_aligo_config_str = base64.b64encode( - aligo_config_str.encode(encoding='utf-8')).decode(encoding='utf-8') - os.system( - f'echo "aligo_token={new_aligo_config_str}" >> "$GITHUB_OUTPUT"') return aligo except Exception as e: logger.info(f'登录失败:{e},重新通过扫码登录') @@ -145,12 +118,10 @@ def prepare_for_aligo(base64_userdata: str): aligo._session.headers.update({'x-device-id': x_device_id, 'x-signature': aligo._auth._X_SIGNATURE}) # 将配置信息base64编码更新到github的secrets中 aligo_config_str = json.dumps(aligo_config) - aligo_config_str = base64.b64encode(aligo_config_str.encode( + aligo_config_code = base64.b64encode(aligo_config_str.encode( encoding='utf-8')).decode(encoding='utf-8') # 执行linux命令 - os.system(f'echo "aligo_token={aligo_config_str}" >> "$GITHUB_OUTPUT"') - # 签到 - refresh_token = aligo_config['refresh_token'] + os.system(f'echo "aligo_token={aligo_config_code}" >> "$GITHUB_OUTPUT"') return aligo