-
Notifications
You must be signed in to change notification settings - Fork 0
/
TwitterListener.py
58 lines (49 loc) · 1.83 KB
/
TwitterListener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import json
import sys
import boto3
from settings import *
from dateutil.parser import parse
# Name of the DynamoDB table that MyListener writes geotagged tweets to.
GEO_TABLE = 'twitter-geo'
# AWS region passed to boto3 when the DynamoDB resource is created.
AWS_REGION = 'eu-west-1'
class MyListener(StreamListener):
    """Stream listener that stores tweets carrying coordinates in DynamoDB.

    Every message received from the stream is decoded from JSON. Messages
    without a ``coordinates`` value are skipped (a ``.`` is written to
    stdout); messages with one are written to the ``GEO_TABLE`` table (an
    ``x`` is written to stdout when the write reports HTTP 200).
    """

    def __init__(self):
        """Create the DynamoDB table handle used by ``on_data``.

        Prints an error message and exits the process with status -1 if the
        handle cannot be created.
        """
        # Initialise the base listener's own state before adding ours.
        super(MyListener, self).__init__()
        try:
            # Resource creation is kept inside the ``try`` so that a failure
            # there is reported the same way as a failure to get the table.
            dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)
            self.table = dynamodb.Table(GEO_TABLE)
        except Exception as e:
            print('\nError connecting to database table: '
                  + self._describe_error(e))
            sys.exit(-1)

    @staticmethod
    def _describe_error(exc):
        """Return the exception's ``fmt`` attribute (if any) plus its args.

        The args are converted with ``str`` before joining so that an
        exception carrying non-string arguments cannot make the error
        reporting itself raise ``TypeError``.
        """
        prefix = exc.fmt if hasattr(exc, 'fmt') else ''
        return prefix + ','.join(str(arg) for arg in exc.args)

    def on_data(self, data):
        """Handle one raw stream message.

        Args:
            data: JSON text of a single stream message.

        Returns:
            Always ``True``, on both the skip path and the write path.
            NOTE(review): presumably this tells tweepy to keep the stream
            open - confirm against the tweepy version in use.
        """
        tweet = json.loads(data)

        # ``.get`` rather than indexing: a message that has no
        # ``coordinates`` key at all is skipped exactly like a tweet whose
        # coordinates are null, instead of raising KeyError.
        coordinates = tweet.get('coordinates')
        if not coordinates:
            sys.stdout.write('.')
            sys.stdout.flush()
            return True

        try:
            point = coordinates['coordinates']
            response = self.table.put_item(
                Item={
                    'id': tweet['id_str'],
                    # Stored as strings. NOTE(review): presumably
                    # longitude then latitude (GeoJSON order) - confirm.
                    'c0': str(point[0]),
                    'c1': str(point[1]),
                    'text': tweet['text'],
                    "created_at": parse(tweet['created_at']).isoformat(),
                }
            )
        except Exception as e:
            # Best effort: report the failure and keep consuming the stream.
            print('\nError adding item to database: '
                  + self._describe_error(e))
        else:
            status = response['ResponseMetadata']['HTTPStatusCode']
            if status == 200:
                sys.stdout.write('x')
                sys.stdout.flush()
        return True

    def on_error(self, status):
        """Print the error status code reported by the stream.

        Args:
            status: Integer status code.

        Returns:
            Always ``True``.
            NOTE(review): this applies to every status, including rate-limit
            responses; consider whether some codes should stop the stream.
        """
        print('status:%d' % status)
        return True
def run():
    """Authenticate with the credentials from ``settings`` and start streaming.

    Builds an ``OAuthHandler`` from the consumer key/secret and access
    token/secret, attaches a fresh ``MyListener`` to a ``Stream``, and
    filters the stream by a fixed bounding box.
    """
    # NOTE(review): presumably [west, south, east, north] in degrees -
    # confirm against the streaming API documentation.
    bounding_box = [-10.1, 35.9, 38.6, 63.9]

    credentials = OAuthHandler(consumer_key, consumer_secret)
    credentials.set_access_token(access_token, access_secret)

    Stream(credentials, MyListener()).filter(locations=bounding_box)