-
Notifications
You must be signed in to change notification settings - Fork 1
/
blive_danmu.py
138 lines (123 loc) · 4.52 KB
/
blive_danmu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import json
import struct
import time
import threading
import zlib
from collections import namedtuple
import requests
import websocket
HEADER_STRUCT = struct.Struct('>I2H2I')
# 大端 >
# 四字节长度 I
# 2字节长度 H
HeaderTuple = namedtuple('HeaderTuple', ('pack_len', 'raw_header_size', 'ver', 'operation', 'seq_id'))
class Spider(object):
def __init__(self):
self.ws = websocket.create_connection('wss://broadcastlv.chat.bilibili.com:443/sub')
self.uid = 10
self.room_id = 68808
# 消息处理
def _handle_message(self, data):
offset = 0
while offset < len(data):
try:
header = HeaderTuple(*HEADER_STRUCT.unpack_from(data, offset))
except struct.error:
break
if header.operation == 5:
# 弹幕消息
body = data[offset + HEADER_STRUCT.size: offset + header.pack_len]
if header.ver == 2:
# zlib加密消息
body = zlib.decompress(body)
self._handle_message(body)
else:
# 正常json消息
try:
body = json.loads(body.decode('utf-8'))
# 弹幕
if body['cmd'] == 'DANMU_MSG':
info = body['info']
# info结构是数组,第二个值为弹幕内容,第三个为用户信息,包括uid和昵称,第四个为粉丝牌信息,其他不清楚
user = info[2][1]
msg = info[1]
print(user, ':', msg)
# 礼物
if body['cmd'] == 'SEND_GIFT':
data = body['data']
user = data['uname']
uid = data['uid']
num = data['num']
giftName = data['giftName']
action = data['action']
coin_type = data['coin_type']
price = data['price']
# coin_type silver-银瓜子 glod-金瓜子
print('感谢\t', user, '\t', action, '\t的', num, '个', giftName)
if body['cmd'] == 'INTERACT_WORD':
data = body['data']
print('欢迎\t{}({})\t进入直播间'.format(data['uname'], data['uid']))
except Exception as e:
print(e)
raise
else:
pass
offset += header.pack_len
# 封包
def make_packet(self, data, operation):
body = json.dumps(data).encode('utf-8')
header = HEADER_STRUCT.pack(
HEADER_STRUCT.size + len(body),
HEADER_STRUCT.size,
1,
operation,
1
)
return header + body
# 登录
def login(self):
auth_params = {
'uid': self.uid,
'roomid': self.room_id,
'protover': 2,
'type': 2,
'platform': 'web',
'clientver': '2.6.2',
}
try:
self.ws.send(self.make_packet(auth_params, 7))
print('登陆成功')
except Exception as e:
print('登录失败')
exit(1)
# 获取消息
def get_msg(self):
print('开始接收消息')
while True:
try:
msg_bytes = self.ws.recv()
self._handle_message(msg_bytes)
except Exception as e:
print(e)
exit(1)
# 心跳连接
def keep_alive(self):
"""
客户端每隔 30 秒发送心跳信息给弹幕服务器
"""
while True:
try:
print('心跳\n')
self.ws.send(self.make_packet({}, 2))
time.sleep(30)
except Exception as e:
print(e)
exit(1)
if __name__ == '__main__':
dm = Spider()
dm.login()
t1 = threading.Thread(target=dm.get_msg)
t2 = threading.Thread(target=dm.keep_alive)
t2.setDaemon(True)
t1.start()
t2.start()