# danmu-big-gift.py
import json
import time
import threading
from datetime import datetime
import pymysql
import requests
import websocket
class Spider(object):
    """Collect Douyu danmu (live-chat) events for one room and store them in MySQL.

    Constructing the spider opens the MySQL connection, connects to the danmu
    WebSocket proxy and downloads the gift id -> gift name mapping. Typical
    use: call ``login()`` and ``join_group()`` once, then run ``get_msg()``
    and ``keep_alive()``, each on its own thread.
    """

    # Seconds to wait for each HTTP request issued by get_gift_dict().
    HTTP_TIMEOUT = 10
    # Seconds between two heartbeat packets.
    HEARTBEAT_INTERVAL = 45
    # Four bytes that follow the two length fields of every outgoing packet:
    # 0x02b1 (689) little-endian, then two zero bytes.
    # NOTE(review): presumably the "client -> server" message type plus the
    # encryption/reserved bytes of the Douyu protocol - confirm against its spec.
    _CLIENT_HEADER = bytes([0xb1, 0x02, 0x00, 0x00])
    # JSONP wrapper used by the gift configuration endpoints.
    _JSONP_PREFIX = 'DYConfigCallback('

    def __init__(self, room_id='74751'):
        """Open the database and WebSocket connections and load the gift map.

        :param room_id: Douyu room to watch; defaults to the room that used to
            be hard-coded in this file.
        """
        self.room_id = str(room_id)
        # NOTE(review): database credentials are hard-coded here; consider
        # moving them to configuration.
        self.conn = pymysql.connect(host='127.0.0.1', user='user', password='password', db='house', charset='utf8mb4')
        self.cursor = self.conn.cursor()
        self.ws = websocket.create_connection('wss://danmuproxy.douyu.com:8506')
        self.gift_dict = self.get_gift_dict()
        # Kept for backward compatibility with code that reads this attribute.
        self.gift_dict_keys = self.gift_dict.keys()

    # Encode a request message
    def dy_encode(self, msg):
        """Frame the text *msg* as one outgoing packet and return it as bytes.

        Layout: length (4 bytes, little-endian), the same length repeated,
        ``_CLIENT_HEADER``, the UTF-8 body, and a terminating NUL byte. The
        length counts everything after the first length field, i.e.
        ``4 + 4 + len(body) + 1``. It is measured in *bytes*, so non-ASCII
        text is framed correctly.

        :raises AttributeError: if *msg* is not a ``str``.
        """
        msg_byte = msg.encode('utf-8')
        len_byte = (len(msg_byte) + 9).to_bytes(4, 'little')
        return len_byte + len_byte + self._CLIENT_HEADER + msg_byte + b'\x00'

    # Decode a response message
    def dy_decode(self, msg_byte):
        """Split one received frame into the text bodies of its packets.

        A frame may contain several packets back to back. Each packet starts
        with a 4-byte little-endian length that counts the bytes following
        it; the body starts 12 bytes into the packet and ends just before the
        final byte. Undecodable UTF-8 sequences are dropped.

        :param msg_byte: raw frame as ``bytes``/``bytearray``.
        :return: list of decoded bodies; empty for an empty or non-binary frame.
        """
        if not isinstance(msg_byte, (bytes, bytearray)):
            # e.g. a text frame delivered as str: nothing to decode.
            return []
        msg = []
        pos = 0
        total = len(msg_byte)
        while pos < total:
            content_length = int.from_bytes(msg_byte[pos:pos + 4], byteorder='little')
            body = msg_byte[pos + 12:pos + 3 + content_length]
            msg.append(body.decode(encoding='utf-8', errors='ignore'))
            # Always advances by at least 4 bytes, so the loop terminates.
            pos += 4 + content_length
        return msg

    # Parse a response message
    def parse_msg(self, raw_msg):
        """Parse a ``key@=value/key@=value/`` body into a dict of strings.

        The text after the last ``/`` is ignored. Each remaining item is
        split on its first ``@=`` *before* the escapes are undone (``@S`` ->
        ``/``, then ``@A`` -> ``@``), so a value that contains an escaped
        ``@=`` is kept whole. Items without ``@=`` are skipped.
        """
        res = {}
        for attr in raw_msg.split('/')[0:-1]:
            key, sep, value = attr.partition('@=')
            if not sep:
                continue
            res[self._unescape(key)] = self._unescape(value)
        return res

    @staticmethod
    def _unescape(text):
        """Undo the ``@S`` (slash) and ``@A`` (at-sign) escapes, in that order."""
        return text.replace('@S', '/').replace('@A', '@')

    # Login
    def login(self):
        """Send the login request for ``self.room_id``.

        Protocol outline kept from the original notes:
        1. the client sends a login request to the danmu server;
        2. after the login-success message the client sends the join-group
           request.

        NOTE(review): this method only sends the request; the success message
        it prints does not wait for the server's reply.

        :raises SystemExit: if the request cannot be sent.
        """
        login_msg = ('type@=loginreq/roomid@=%s/dfl@=/username@=380054864=/uid@=380054864'
                     '/ver@=20190610/aver@=218101901/ct@=0/' % self.room_id)
        try:
            self.ws.send(self.dy_encode(login_msg))
            print('登陆成功')
        except Exception as e:
            print(e)
            raise SystemExit(1)

    # Join a group; -9999 is the "mass danmu" group
    def join_group(self):
        """Send the join-group request (group ``-9999``) for ``self.room_id``.

        :raises SystemExit: if the request cannot be sent.
        """
        join_group_msg = 'type@=joingroup/rid@=%s/gid@=-9999/' % self.room_id
        try:
            self.ws.send(self.dy_encode(join_group_msg))
            print('加入分组成功')
        except Exception as e:
            print(e)
            raise SystemExit(1)

    # Receive messages
    def get_msg(self):
        """Receive frames forever and dispatch every packet by its ``type``.

        All packets of a frame are handled, not only the first one. Packets
        with an unknown or missing ``type`` are ignored. When the connection
        is closed the loop ends by raising ``SystemExit``; any other error is
        appended to ``error.txt`` (one JSON line per failed frame) and the
        loop continues.
        """
        print('开始接收消息')
        handlers = {
            'chatmsg': self.parse_chatmsg,          # chat message
            'newblackres': self.parse_newblackres,  # mute
            'dgb': self.parse_gift,                 # small gift
            'tsboxb': self.parse_big_gift,          # big gift
        }
        while True:
            # Bound before the try so the error handler can always log it.
            msg_arr = []
            try:
                msg_bytes = self.ws.recv()
                msg_arr = self.dy_decode(msg_bytes)
                for raw_msg in msg_arr:
                    msg = self.parse_msg(raw_msg)
                    handler = handlers.get(msg.get('type'))
                    if handler is not None:
                        handler(msg)
            except (ConnectionError, websocket.WebSocketConnectionClosedException):
                # websocket-client reports a closed socket with its own
                # exception type, which is not a builtin ConnectionError.
                raise SystemExit(1)
            except Exception as e:
                with open('error.txt', 'a+') as f:
                    f.write(json.dumps(msg_arr) + '\n')
                print(e)
                print(msg_arr)

    # Extract a chat message
    def parse_chatmsg(self, msg):
        """Build a chat row from a ``chatmsg`` packet and store it.

        A packet missing a required field is reported and skipped.
        """
        try:
            content = msg['txt']
            # Historical sanitising from when SQL was built by string
            # formatting; kept so stored content stays consistent with
            # existing rows.
            for char in ('\\', '\'', '"'):
                content = content.replace(char, '')
            item = {
                'nickname': msg['nn'],      # nickname
                'content': content,         # text
                'level': msg['level'],      # user level
                'fan_card': msg['bnn'],     # fan badge
                'send_time': datetime.now(),
                'fan_level': msg['bl'],     # fan badge level
            }
        except KeyError as e:
            self._report_missing_field(e, msg)
            return
        self.add_dm_sql(item)

    # Extract a mute event
    def parse_newblackres(self, msg):
        """Build a mute row from a ``newblackres`` packet and store it.

        A packet missing a required field is reported and skipped.
        """
        try:
            item = {
                'snic': msg['snic'],        # nickname of the user who muted
                'dnic': msg['dnic'],        # nickname of the muted user
                'otype': msg['otype'],      # operator type: 1 room admin, 2 streamer, 3 super admin
                'endtime': msg['endtime'],  # mute end time
            }
        except KeyError as e:
            self._report_missing_field(e, msg)
            return
        self.add_jy_sql(item)

    # Extract a small gift
    def parse_gift(self, msg):
        """Build a gift row from a ``dgb`` packet and store it.

        The gift name is looked up in ``self.gift_dict`` by ``gfid`` and
        falls back to '未知' for unknown ids. A packet missing a required
        field is reported and skipped.
        """
        try:
            item = {
                'nickname': msg['nn'],                                  # nickname
                'uid': msg['uid'],                                      # user id
                'gift_id': msg['gfid'],                                 # gift id
                'gift_name': self.gift_dict.get(msg['gfid'], '未知'),    # gift name
                'level': msg['level'],                                  # user level
                'gift_count': msg['gfcnt'],                             # number of gifts
                'fan_card': msg['bnn'],                                 # fan badge
                'fan_level': msg['bl'],                                 # fan badge level
                'fan_card_room_id': msg['brid'],                        # fan badge room id
                'gift_from': msg['from'],                               # gift source, 2 = backpack
                'send_time': datetime.now(),
            }
        except KeyError as e:
            self._report_missing_field(e, msg)
            return
        self.add_gift_sql(item)

    # Extract a big gift
    def parse_big_gift(self, msg):
        """Build a big-gift row from a ``tsboxb`` packet and store it.

        The gift name is looked up in ``self.gift_dict`` by ``rpt`` and falls
        back to '未知'. A packet missing a required field is reported and
        skipped.
        """
        try:
            item = {
                'nickname': msg['snk'],                                 # nickname
                'gift_name': self.gift_dict.get(msg['rpt'], '未知'),     # gift name
                'gift_from': msg['from'],                               # gift source, 2 = backpack
                'send_time': datetime.now(),
            }
        except KeyError as e:
            self._report_missing_field(e, msg)
            return
        self.add_big_gift_sql(item)

    @staticmethod
    def _report_missing_field(error, msg):
        """Print the missing key and the packet it was expected in."""
        print(error)
        print(msg)

    # Save a chat message to the database
    def add_dm_sql(self, obj):
        """Insert one chat row (see ``parse_chatmsg`` for the keys) into ``douyu_danmu``."""
        sql = ("insert into douyu_danmu (nickname,content,user_level,fan_card,fan_level,send_time) "
               "values (%s,%s,%s,%s,%s,%s)")
        self._insert_row(sql, obj, ('nickname', 'content', 'level', 'fan_card', 'fan_level', 'send_time'))

    # Save a mute event to the database
    def add_jy_sql(self, obj):
        """Insert one mute row into ``douyu_jinyan``.

        ``obj['endtime']`` is converted with ``int()`` and
        ``datetime.fromtimestamp()`` before it is stored; a row whose end
        time cannot be converted is reported and skipped.
        """
        try:
            row = dict(obj, endtime=datetime.fromtimestamp(int(obj['endtime'])))
        except (KeyError, TypeError, ValueError, OverflowError, OSError) as e:
            print(e)
            return
        sql = "insert into douyu_jinyan (snic,dnic,otype,endtime) values (%s,%s,%s,%s)"
        self._insert_row(sql, row, ('snic', 'dnic', 'otype', 'endtime'))

    # Save a small gift to the database
    def add_gift_sql(self, obj):
        """Insert one gift row (see ``parse_gift`` for the keys) into ``douyu_gift``."""
        sql = ("insert into douyu_gift (nickname,uid,user_level,fan_card,fan_level,fan_room_id,"
               "gift_id,gift_name,gift_count,gift_from,send_time) "
               "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        self._insert_row(sql, obj, ('nickname', 'uid', 'level', 'fan_card', 'fan_level', 'fan_card_room_id',
                                    'gift_id', 'gift_name', 'gift_count', 'gift_from', 'send_time'))

    # Save a big gift to the database
    def add_big_gift_sql(self, obj):
        """Insert one big-gift row (see ``parse_big_gift`` for the keys) into ``douyu_biggift``."""
        sql = "insert into douyu_biggift (nickname,gift_name,gift_from,send_time) values (%s,%s,%s,%s)"
        self._insert_row(sql, obj, ('nickname', 'gift_name', 'gift_from', 'send_time'))

    def _insert_row(self, sql, obj, fields):
        """Run *sql* with the values of *fields* taken from *obj*, then commit.

        Values are passed to the driver as query parameters instead of being
        formatted into the SQL text, so quotes in nicknames or chat content
        cannot alter the statement. Saving stays best effort: a missing field
        or a database error is printed and the row is dropped. After a
        database error the connection is pinged with ``reconnect=True`` so a
        lost connection is re-established for the next row.
        """
        try:
            params = tuple(obj[name] for name in fields)
        except KeyError as e:
            print(e)
            return
        try:
            self.cursor.execute(sql, params)
            self.conn.commit()
        except Exception as e:
            print(e)
            try:
                # Check whether the connection dropped; reconnect if it did.
                self.conn.ping(reconnect=True)
            except Exception as ping_error:
                print(ping_error)

    # Heartbeat
    def keep_alive(self):
        """Send a heartbeat packet every ``HEARTBEAT_INTERVAL`` seconds, forever.

        The loop ends by raising ``SystemExit`` when the connection is
        closed. Any other send error is printed and the next heartbeat is
        attempted after the usual interval, so a failing send never turns
        into a busy loop.
        """
        print('建立初始心跳连接...')
        heartbeat = self.dy_encode('type@=mrkl/')
        while True:
            try:
                self.ws.send(heartbeat)
            except (ConnectionError, websocket.WebSocketConnectionClosedException):
                raise SystemExit(1)
            except Exception as e:
                print(e)
            time.sleep(self.HEARTBEAT_INTERVAL)

    # Gift id mapping
    def get_gift_dict(self):
        """Download the gift catalogues and return a ``{gift id: gift name}`` dict.

        Three sources are merged, later ones overriding earlier ones: the
        room's own gift list, the prop gift configuration and gift template
        20003. The last two are JSONP responses and are unwrapped first.

        :raises requests.RequestException: if a download fails or times out.
        """
        room_info = requests.get('https://www.douyu.com/betard/%s' % self.room_id,
                                 timeout=self.HTTP_TIMEOUT).json()
        prop_text = requests.get(
            'https://webconf.douyucdn.cn/resource/common/prop_gift_list/prop_gift_config.json',
            timeout=self.HTTP_TIMEOUT).text
        template_text = requests.get(
            'https://webconf.douyucdn.cn/resource/common/gift/gift_template/20003.json',
            timeout=self.HTTP_TIMEOUT).text

        room_gifts = room_info['room_gift']['gift']
        prop_gifts = json.loads(self._strip_jsonp(prop_text))['data']
        template_gifts = json.loads(self._strip_jsonp(template_text))['data']

        gift_json = {}
        for gift_id, gift in room_gifts.items():
            gift_json[gift_id] = gift['name']
            # Some gifts carry a sub-entry of their own.
            effect = gift.get('effect')
            if effect:
                # NOTE(review): the key is stored as delivered; if
                # treasure_type is numeric it will not match the string ids
                # found in packets - confirm against a live response.
                gift_json[effect['treasure_type']] = effect['name']
        for gift_id, gift in prop_gifts.items():
            gift_json[gift_id] = gift['name']
        for gift in template_gifts:
            gift_json[str(gift['id'])] = gift['name']
        return gift_json

    @classmethod
    def _strip_jsonp(cls, text):
        """Return the JSON payload of a ``DYConfigCallback(...);`` response.

        Removes exactly the wrapper (prefix, optional trailing ``;`` and the
        closing parenthesis) rather than stripping character sets, so
        payloads such as ``null`` survive intact. Text without the prefix is
        returned with surrounding whitespace removed.
        """
        text = text.strip()
        if not text.startswith(cls._JSONP_PREFIX):
            return text
        text = text[len(cls._JSONP_PREFIX):].rstrip(';').rstrip()
        if text.endswith(')'):
            text = text[:-1]
        return text
if __name__ == '__main__':
    # Script entry point: connect, log in, join the danmu group, then run the
    # receiver and the heartbeat on separate threads.
    print('启动程序...', time.strftime('%Y-%m-%d %H:%M:%S'))
    dm = Spider()
    dm.login()
    dm.join_group()
    # The receiver is a normal thread and keeps the process alive; the
    # heartbeat is a daemon thread, so it ends together with the process.
    # daemon=True replaces the deprecated Thread.setDaemon().
    receiver = threading.Thread(target=dm.get_msg)
    heartbeat = threading.Thread(target=dm.keep_alive, daemon=True)
    receiver.start()
    heartbeat.start()