forked from mylovedonotcry/bienaoccc_hym
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchina_unicom_game.py
289 lines (267 loc) · 11.7 KB
/
china_unicom_game.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
#!/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# @Author : github@limoruirui https://github.com/limoruirui by院长修改
# @Time : 2023/1/27 21:03
# cron "" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('某通畅游');
# -------------------------------
"""
1 脚本仅供学习交流使用, 请在下载后24h内删除
2. 活动入口: 某通app首页-5g新通信-某通畅游
3. 暂时先弄抓包版本 后续再加短信验证码获取token_online
4. 环境变量 UNICOM_GAME_ACCOUNT_INFO 格式 某通手机号#appid#token_online
抓包得到appid和token_online,不要名称和等号,只要其内容值即可
appid可抓包获取 安卓也有不抓包的方法 自行搜索
token_online 抓包获取 搜索 mobileService/onLine 切换账号可触发此数据包 此数据包包含 token_online和appid
login文件夹内有短信验证码登录(经过几天的反馈测试 不一定能用 有人可以有人不行 暂未知道原因) 抄自小一佬 github@https://github.com/xream 感谢
默认不添加新crontab 需要手动新建 task yuanter_misaka/login/unicom_login.py
5. 特别说明
i.第一次运行会因为没有积分而无法进行积分抽奖 可再运行一次或者等第二天再抽奖即可 目前场次不多 不会每天都抽
ii. 兑换话费已写 但未调用 有需要自行修改调用
"""
from time import sleep
from requests import post, get
from random import randint
from tools.tool import get_environ, timestamp
from tools.send_msg import push
from uuid import uuid4
from tools.notify import send
from tools.ql_api import get_cookie
import threading
msg_str = "活动入口: 联通app首页-5g新通信-联通畅游\n\n"
class CUG:
def __init__(self, phone: str, appid: str, token_online: str):
self.phone_num = phone.rstrip("\n")
self.appId = appid.rstrip("\n")
self.token_online = token_online.rstrip("\n")
default_ua = f"Mozilla/5.0 (Linux; Android {randint(8, 13)}; SM-S908U Build/TP1A.220810.014; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/{randint(95, 108)}.0.5359.128 Mobile Safari/537.36; unicom{{version:[email protected]{randint(0, 6)}00,desmobile:{self.phone_num}}};devicetype{{deviceBrand:,deviceModel:}};{{yw_code:}}"
self.run_ua = get_environ(key="UNICOM_USERAGENT", default=default_ua, output=False)
self.deviceId = uuid4().hex
self.msg = ""
def get_ecsToken(self):
url = "https://m.client.10010.com/mobileService/onLine.htm"
body = f"reqtime={timestamp()}&netWay=Wifi&version=android%4010.0100&deviceId={self.deviceId}&token_online={self.token_online}&provinceChanel=general&appId={self.appId}&deviceModel=SM-S908U&step=bindlist&androidId={uuid4().hex[8:24]}&deviceBrand=&flushkey=1"
headers = {
"content-type": "application/x-www-form-urlencoded",
"user-agent": self.run_ua
}
try:
data = post(url, headers=headers, data=body).json()
print(data)
self.ecs_token = data["ecs_token"]
# print(self.ecs_token)
except Exception as e:
print(f"\n账号{self.phone_num}获取token请求出现错误,请检查CK是否正确或已失效,跳过该账号\n" )
exit(0)
def login(self):
url = "https://game.wostore.cn/api/app//user/v2/login"
body = {
"identityType": "esToken",
"code": self.ecs_token
}
headers = {
"pragma": "no-cache",
"cache-control": "no-cache",
"accept": "application/json",
"content-type": "application/json;charset=utf-8",
"user-agent": self.run_ua,
"channelid": "GAMELTAPP_90006",
"device": "5",
"origin": "https://web.wostore.cn",
"x-requested-with": "com.sinovatech.unicom.ui",
"sec-fetch-site": "same-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"referer": "https://web.wostore.cn/",
"accept-encoding": "gzip, deflate",
"accept-language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = ""
try:
data = post(url, headers=headers, json=body).json()
self.access_token = data["data"]["access_token"]
self.headers = {
"pragma": "no-cache",
"cache-control": "no-cache",
"accept": "application/json, text/plain, */*",
"authorization": self.access_token,
"user-agent": self.run_ua,
"origin": "https://web.wostore.cn",
"x-requested-with": "com.sinovatech.unicom.ui",
"sec-fetch-site": "same-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"referer": "https://web.wostore.cn/",
"accept-encoding": "gzip, deflate",
"accept-language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
except Exception as e:
print(f"\n账号{self.phone_num}登录请求出现错误,请检查CK是否正确或已失效,跳过该账号\n" )
exit(0)
def check_in(self):
url = "https://game.wostore.cn/api/app/user/v2/signIn"
data = get(url, headers=self.headers).json()
print(data)
def lotter(self):
url = "https://game.wostore.cn/api/app/user/v2/benefit/lottery?id=1"
data = get(url, headers=self.headers).json()
print(data)
def pay_lotter(self, lotter_id):
if lotter_id is None:
return
url = f"https://game.wostore.cn/api/app/user/v2/lottery/join?id={lotter_id}"
data = get(url, headers=self.headers).json()
print(data)
def get_task(self):
"""
receiveStatus 2 为已完成
:return:
"""
url = "https://game.wostore.cn/api/app/user/v2/task/list"
data = get(url, headers=self.headers).json()
all_task_info = {}
print(data)
for task_info in data["data"]:
if task_info["receiveStatus"] == 2:
print(f"任务{task_info['taskName']}已完成 跳过领取")
continue
elif task_info["receiveStatus"] == 0:
print(f"任务{task_info['taskName']}已完成 跳过领取")
continue
all_task_info[task_info["id"]] = task_info["productId"]
return all_task_info
def finish_task(self, task_id, productId):
url = f"https://game.wostore.cn/api/app/user/v2/task/receive?productId={productId}&taskId={task_id}"
data = get(url, headers=self.headers).json()
print(data)
sleep(3)
def play_game(self):
url = "https://game.wostore.cn/api/app/user/v2/play/save"
body = {
"cpGameId": f"1500019{randint(900, 999)}"
}
data = post(url, headers=self.headers, json=body).json()
print(data)
# 兑换5元话费,兑换其他话费,自行抓包替换productId
def exchange(self):
def get_exchange():
url = "https://game.wostore.cn/api/app/game/v2/shop/getToken"
data = get(url, headers=self.headers).json()
# print(data)
return data["data"]
url = f"https://game.wostore.cn/api/app/shop/order/product/order/mall?Authorization={get_exchange()}"
body = {
"productId": "803779159738716160"
}
data = post(url, headers=self.headers, json=body).json()
print(data)
try:
print(f"账号{self.phone_num}---执行兑换5元话费结果为:{data['msg']}\n\n")
return f"账号{self.phone_num}---执行兑换5元话费结果为:{data['msg']}\n\n"
except Exception as e:
print(f"账号{self.phone_num}出现错误,请求失败结果: " )
print_now(data)
def init(self):
"""
初始化活动及查询积分
:return:
"""
url = "https://game.wostore.cn/api/app/user/v2/getMemberInfo"
data = get(url, headers=self.headers).json()
# print(data)
return data["data"]["userIntegral"]
def get_pay_lotter_list(self):
"""
status
0: 可参与
1: 已参与
2: 已结束
:return:
"""
def get_shopToken():
url = "https://game.wostore.cn/api/app/game/v2/shop/getToken"
data = get(url, headers=self.headers).json()
# print(data)
return data["data"]
url = f"https://game.wostore.cn/api/app/shop/business/lottery/available?Authorization={get_shopToken()}"
headers = {
"pragma": "no-cache",
"cache-control": "no-cache",
"accept": "application/json",
"user-agent": self.run_ua,
"channelid": "GAMELTAPP_90006",
"device": "5",
"origin": "https://web.wostore.cn",
"x-requested-with": "com.sinovatech.unicom.ui",
"sec-fetch-site": "same-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"referer": "https://web.wostore.cn/",
"accept-encoding": "gzip, deflate",
"accept-language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
}
data = get(url, headers=headers).json()
# print(data)
for lotter_info in data["data"]["list"]:
if lotter_info["status"] == 0 and lotter_info["points"] <= 30:
return lotter_info["id"]
print("当前抽奖轮次已全部参加或没有符合条件的场次 跳过")
return None
def main(self):
global msg_str #声明我们在函数内部使用的是在函数外部定义的全局变量msg_str
self.get_ecsToken()
self.login()
old_score = self.init()
self.check_in()
self.lotter()
self.pay_lotter(self.get_pay_lotter_list())
self.play_game()
[self.finish_task(task_id, productId) for task_id, productId in self.get_task().items()]
# 新增调用原作者写好的兑换奖品方法
exchange_result = self.exchange()
sleep(5)
now_score = self.init()
today_score = now_score - old_score
self.msg += f"账号{self.phone_num}---本次运行获得{today_score}分, 当前共有{now_score}分\n{exchange_result}"
msg_str += f"账号{self.phone_num}---本次运行获得{today_score}分, 当前共有{now_score}分\n{exchange_result}"
# push("某通畅游", self.msg)
def start(unicom_game_info):
if unicom_game_info == "":
exit(0)
cug = CUG(*unicom_game_info.split("#"))
cug.main()
print("\n")
print("\n")
if __name__ == '__main__':
"""读取环境变量"""
l = []
user_map = []
cklist = get_cookie("UNICOM_GAME_ACCOUNT_INFO")
for i in range(len(cklist)):
#以&分割开的ck
split1 = cklist[i].split("&")
split2 = cklist[i].split("\n")
if len(split1)>1:
for j in range(len(split1)):
user_map.append(split1[j])
elif len(split2)>1:
for j in range(len(split2)):
user_map.append(split2[j])
else:
user_map.append(cklist[i])
for i in range(len(user_map)):
unicom_game_info=""
unicom_game_info = user_map[i].split("&")[0]
if unicom_game_info == "":
print("当前账号未填写CK 跳过")
print("\n")
continue
print('开始执行第{}个账号:{}'.format((i+1),unicom_game_info.split("#")[0]))
p = threading.Thread(target=start,args=(unicom_game_info, ))
l.append(p)
p.start()
print("\n")
for i in l:
i.join()
send("联通畅游",msg_str)