forked from caktux/pytrader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubnub_light.py
222 lines (195 loc) · 7.92 KB
/
pubnub_light.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
"""pubnub light API (only subscribe, not publish)"""
# Copyright (c) 2013 Bernd Kreuss <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
import base64
from Crypto.Cipher import AES
import gzip
import hashlib
import io
import json
import socket
import ssl
import uuid
class SocketClosedException(Exception):
    """Signal that a socket read returned no data because the socket closed.

    This is the normal outcome of calling hup() while another thread is
    blocked reading; the reading thread should catch it and then decide
    whether to retry or to terminate.
    """
class PubNub(object):  # pylint: disable=R0902
    """Simple subscribe-only pubnub client that tries to stay connected.

    The client talks HTTP over a plain socket (instead of urllib2) so that
    a blocking read() can be interrupted immediately by closing the socket
    from another thread. It supports multiplexing (several channels in one
    subscription), SSL and gzip compressed responses.

    All data read from or written to the socket is handled as bytes, so
    the class works on Python 2.7 as well as on Python 3.
    """

    # Host used both for the TCP connection and for the HTTP Host header.
    _HOST = "pubsub.pubnub.com"

    def __init__(self):
        self.sock = None            # socket of the current long poll
        self.uuid = uuid.uuid4()    # client id sent with every request
        self.timestamp = 0          # time token returned by last response
        self.connected = False
        self.sub = ""
        self.chan = ""
        self.auth = ""
        self.cipher = ""
        self.use_ssl = False

    #pylint: disable=R0913
    def subscribe(self, sub, chan, auth="", cipher="", use_ssl=False):
        """Set the subscription parameters. This is needed after __init__().

        sub is the subscribe key, chan is a string containing a channel
        name or a comma separated list of multiple channels; it replaces
        all previously set subscriptions. auth is sent as the auth query
        parameter, a non-empty cipher enables message decryption and
        use_ssl selects an SSL connection on port 443.
        """
        self.sub = sub
        self.chan = chan
        self.auth = auth
        self.cipher = cipher
        self.use_ssl = use_ssl

        # force disconnect of currently active longpoll.
        self.hup()

    def read(self):
        """Read (blocking) and return a list of messages.

        Each element of the list is a tuple (channel, msg) where channel
        is the name of the channel the message came from and msg is the
        payload. An empty list is returned when the response carried no
        messages. Right after subscribe() you should enter a loop over
        this blocking call.

        Any exception (for example caused by hup(), by subscribe() or by
        a network or protocol error) marks the client as disconnected,
        closes the socket and is re-raised, so the caller can decide
        whether to re-enter the loop or to terminate. ValueError is raised
        for a response whose body length cannot be determined.
        """
        try:
            if not self.connected:
                self._connect()

            (length, encoding, chunked) = self._send_request()
            if chunked:
                data = self._read_chunked()
            elif length is None:
                raise ValueError(
                    "response has neither Content-Length "
                    "nor chunked transfer encoding")
            else:
                data = self._read_num_bytes(length)

            if encoding == "gzip":
                data = self._unzip(data)

            data = json.loads(data.decode("utf-8"))
            self.timestamp = int(data[1])

            if not data[0]:
                return []

            if self.cipher:
                msg_list = [self._decrypt(m) for m in data[0]]
            else:
                msg_list = data[0]

            if len(data) > 2:
                # multiplexed response: third element names the channel
                # of every message, comma separated.
                chan_list = data[2].split(",")
            else:
                chan_list = [self.chan] * len(msg_list)

            # materialize, zip() is only an iterator on Python 3
            return list(zip(chan_list, msg_list))

        except BaseException:
            self.connected = False
            # Closing happens in a helper (its own frame) and never
            # raises, so the bare raise below always re-raises the
            # original error.
            self._close_socket()
            raise

    def hup(self):
        """Close the socket and force a blocking read() to exit.

        The thread that is blocked in read() will see an exception and
        then has the opportunity to decide whether to re-enter read()
        (because only new subscription parameters were set) or to
        terminate. Calling this without a socket, or more than once, is
        harmless.
        """
        if self.sock:
            self.connected = False
            self._close_socket(shutdown=True)

    def _close_socket(self, shutdown=False):
        """Close self.sock (optionally shutting it down first).

        Socket errors are ignored on purpose: the socket may never have
        been connected or may already be closed, and this is only used
        for cleanup.
        """
        sock = self.sock
        if sock is None:
            return
        if shutdown:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except socket.error:
                pass
        try:
            sock.close()
        except socket.error:
            pass

    def _connect(self):
        """Connect and set self.connected, raise an exception on error.

        This method is used internally, read() invokes it automatically
        when necessary. With use_ssl the connection goes to port 443 and
        the server certificate is verified against the host name,
        otherwise plain TCP on port 80 is used.
        """
        # self.sock is assigned before connect() so that hup() can also
        # interrupt a connection attempt that is still in progress.
        self.sock = socket.socket()
        port = 80
        if self.use_ssl:
            context = ssl.create_default_context()
            self.sock = context.wrap_socket(
                self.sock, server_hostname=self._HOST)
            port = 443
        self.sock.connect((self._HOST, port))
        self.connected = True

    def _send_request(self):
        """Send the http request, read the response header and return the
        response header info tuple (see: _read_response_header)."""
        headers = [
            "GET /subscribe/%s/%s/0/%i?uuid=%s&auth=%s HTTP/1.1" \
                % (self.sub, self.chan, self.timestamp, self.uuid, self.auth),
            "Accept-Encoding: gzip",
            "Host: %s" % self._HOST,
            "Connection: keep-alive"]
        request = "%s\r\n\r\n" % "\r\n".join(headers)
        if not isinstance(request, bytes):
            request = request.encode("utf-8")
        # sendall() because send() may transmit only part of the buffer
        self.sock.sendall(request)
        return self._read_response_header()

    def _read_response_header(self):
        """Read the http response header and return a tuple containing
        the values (length, encoding, chunked) which are needed to
        correctly read and interpret the rest of the response.

        length is the Content-Length as int or None if the header is
        absent, encoding is the lower-cased Content-Encoding (default
        "identity") and chunked tells whether the Transfer-Encoding is
        chunked. Header names are matched case-insensitively; the status
        line is not evaluated.
        """
        length = None
        encoding = "identity"
        chunked = False

        while True:
            line = self._read_line()
            if not line:
                # empty line terminates the header
                break
            name, _, value = line.decode("latin-1").partition(":")
            name = name.strip().lower()
            value = value.strip()
            if name == "content-length":
                length = int(value)
            elif name == "content-encoding":
                encoding = value.lower()
            elif name == "transfer-encoding":
                if "chunked" in value.lower():
                    chunked = True

        return (length, encoding, chunked)

    def _read_line(self):
        """Read one line from the socket until and including CRLF, return
        the stripped line (bytes) or raise SocketClosedException if the
        socket was closed."""
        line = bytearray()
        # One byte at a time: there is no read buffer, so nothing beyond
        # the end of the line may be consumed here.
        while not line.endswith(b"\r\n"):
            char = self.sock.recv(1)
            if not char:
                raise SocketClosedException
            line += char
        return bytes(line).strip()

    def _read_num_bytes(self, num):
        """Read (blocking) exactly num bytes from the socket and return
        them, raise SocketClosedException if the socket is closed."""
        chunks = []
        remaining = num
        while remaining > 0:
            chunk = self.sock.recv(remaining)
            if not chunk:
                raise SocketClosedException
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _read_chunked(self):
        """Read a body sent with chunked transfer encoding and return the
        concatenated chunk data (bytes)."""
        chunks = []
        while True:
            # the size may be followed by ";extension", which is ignored
            size_field = self._read_line().split(b";", 1)[0]
            size = int(size_field.decode("ascii"), 16)
            if not size:
                break
            chunks.append(self._read_num_bytes(size))
            self._read_num_bytes(2) # CRLF after the chunk data
        # After the zero-sized last chunk: skip optional trailer lines up
        # to and including the terminating empty line.
        while self._read_line():
            pass
        return b"".join(chunks)

    #pylint: disable=R0201
    def _unzip(self, data):
        """Unzip the gzip content encoding, return the decompressed bytes."""
        with io.BytesIO(data) as buf:
            with gzip.GzipFile(fileobj=buf) as unzipped:
                return unzipped.read()

    def _decrypt(self, msg):
        """Decrypt a single pubnub message and return the decoded JSON.

        msg is the base64 encoded ciphertext. The AES key is the first 32
        characters of the hex encoded SHA-256 digest of self.cipher, the
        mode is CBC with a fixed IV.
        """
        # pylint: disable=E1101
        cipher = self.cipher
        if not isinstance(cipher, bytes):
            cipher = cipher.encode("utf-8")
        key = hashlib.sha256(cipher).hexdigest()[0:32].encode("ascii")
        aes = AES.new(key, AES.MODE_CBC, b"0123456789012345")
        decrypted = aes.decrypt(base64.b64decode(msg))
        # the value of the last byte is the number of padding bytes;
        # bytearray indexing yields an int on Python 2 and 3 alike.
        pad = bytearray(decrypted)[-1]
        return json.loads(decrypted[0:-pad].decode("utf-8"))