forked from somelab/SoMeToolkit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tweetstream.py
119 lines (104 loc) · 3.71 KB
/
tweetstream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#
# swiped from https://github.com/salathegroup/mkondo/tree/master/mkondo
#
import tweepy
import httplib
from socket import timeout
from socket import error as socket_error
from time import sleep
import logging
import logging.config
class CompliantStream(tweepy.Stream):
    ''' This class extends Tweepy's Stream class by adding HTTP and TCP/IP back-off
    (according to Twitter's guidelines).

    HTTP-level failures (non-200 responses, incomplete reads) back off
    exponentially between ``min_http_delay`` and ``max_http_delay`` seconds;
    socket-level failures (timeouts, socket errors) back off linearly up to
    ``max_tcp_ip_delay`` seconds.

    NOTE(review): ``_run`` relies on ``scheme``, ``host``, ``url``, ``headers``,
    ``parameters``, ``body``, ``listener`` and ``_read_loop``, none of which are
    defined in this class -- presumably they come from ``tweepy.Stream``;
    confirm against the tweepy version in use.
    '''

    def __init__(self, auth, listener, retry_count, logger, min_http_delay=10, max_http_delay=240,
                 min_tcp_ip_delay=0.5, max_tcp_ip_delay=16, **options):
        ''' Store the back-off configuration and initialise the underlying stream.

        auth             -- auth handler; its ``apply_auth`` is called before each request
        listener         -- stream listener; its ``on_error`` / ``on_timeout`` are consulted
        retry_count      -- consecutive errors tolerated before giving up
                            (a falsy value disables the limit)
        logger           -- logger object used for all messages of this stream
        min_http_delay   -- base delay (seconds) of the exponential HTTP back-off
        max_http_delay   -- upper bound (seconds) of the HTTP back-off
        min_tcp_ip_delay -- step (seconds) of the linear TCP/IP back-off
        max_tcp_ip_delay -- upper bound (seconds) of the TCP/IP back-off
        options          -- forwarded unchanged to ``tweepy.Stream.__init__``
        '''
        self.logger = logger
        self.logger.info('COMPLIENT STREAM: Initializing complient stream...')
        self.min_http_delay = min_http_delay
        self.max_http_delay = max_http_delay
        self.min_tcp_ip_delay = min_tcp_ip_delay
        self.max_tcp_ip_delay = max_tcp_ip_delay
        self.running = False
        self.retry_count = retry_count
        self.auth = auth
        # Twitter sends a keep-alive every 30 seconds; allow a couple of
        # seconds more before the socket read is treated as timed out.
        self.twitter_keepalive = 30 + 2.0
        tweepy.Stream.__init__(self, auth, listener, secure=True, **options)

    def _run(self):
        ''' Connect to the stream and keep reading, reconnecting with back-off.

        Loops while ``self.running`` is true. The loop ends when the listener
        returns False from ``on_error`` / ``on_timeout``, when ``retry_count``
        is exceeded, or when an unexpected exception occurs. On exit
        ``self.running`` is reset to False and the connection is closed, even
        when an exception propagates.

        Raises: any unexpected exception, re-raised after it has been logged.
        '''
        url = "%s://%s%s" % (self.scheme, self.host, self.url)
        error_counter = 0
        conn = None
        try:
            while self.running:
                if self.retry_count and error_counter > self.retry_count:
                    # quit if error count greater than retry count
                    break
                try:
                    if self.scheme == "http":
                        conn = httplib.HTTPConnection(self.host)
                    else:
                        conn = httplib.HTTPSConnection(self.host)
                    self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
                    conn.connect()
                    # A read that blocks longer than the keep-alive interval
                    # means the connection is dead.
                    conn.sock.settimeout(self.twitter_keepalive)
                    conn.request('POST', self.url, self.body, headers=self.headers)
                    resp = conn.getresponse()
                    if resp.status != 200:
                        if self.listener.on_error(resp.status) is False:
                            break
                        error_counter += 1
                        # Release the failed connection before sleeping; a
                        # fresh one is opened on the next iteration.
                        conn.close()
                        # HTTP delay is based on error count, since we have
                        # exponential back-off
                        sleep(self.get_http_delay(error_counter))
                    else:
                        # Successful connection: reset the back-off state.
                        error_counter = 0
                        self._read_loop(resp)
                except (timeout, socket_error) as err:
                    if self.listener.on_timeout() == False:
                        break
                    if self.running is False:
                        break
                    # conn is still None if the failure happened before a
                    # connection object was created.
                    if conn is not None:
                        conn.close()
                    error_counter += 1
                    self.logger.exception(err)
                    sleep(self.get_tcp_ip_delay(error_counter))
                except httplib.IncompleteRead:
                    self.logger.exception('COMPLIANT STREAM: Incomplete Read.')
                    # We assume there are connection issues at the other end,
                    # so we'll try again in a little bit.
                    error_counter += 1
                    if conn is not None:
                        conn.close()
                    # HTTP delay is based on error count, since we have
                    # exponential back-off
                    http_delay = self.get_http_delay(error_counter)
                    self.logger.info('COMPLIANT STREAM: HTTP Delay. Sleeping for: %s', http_delay)
                    sleep(http_delay)
                except Exception as exception:
                    # any other exception is fatal, so kill the loop; the
                    # finally clause below performs the cleanup.
                    self.logger.exception('Unexpected exception: %s' % exception)
                    raise
        finally:
            # cleanup
            self.running = False
            if conn is not None:
                conn.close()

    def get_http_delay(self, error_count):
        ''' Exponential back-off, based on the number of times we've failed (error_count).

        Returns ``min_http_delay * 2 ** error_count`` seconds, capped at
        ``max_http_delay``.
        '''
        delay = self.min_http_delay * (2.0 ** error_count)
        return min(delay, self.max_http_delay)

    def get_tcp_ip_delay(self, error_count):
        ''' Linear back-off, based on the number of times we've failed (error_count).

        Returns ``min_tcp_ip_delay * error_count`` seconds, capped at
        ``max_tcp_ip_delay``.
        '''
        delay = float(self.min_tcp_ip_delay * error_count)
        return min(delay, self.max_tcp_ip_delay)