-
Notifications
You must be signed in to change notification settings - Fork 2
/
PGStore.py
60 lines (56 loc) · 2.53 KB
/
PGStore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# PGStore.py
import sys
import datetime
from Store import Store
import simplejson as json
class TweetStore(Store):
    """Buffer streamed tweets and bulk-insert them into the ``tweets`` table.

    Messages are collected with :meth:`store`; as soon as ``commit_limit``
    messages are buffered they are written in one batch by :meth:`commit`.

    NOTE(review): ``self.project``, ``self.db``, ``self.message_cache`` and
    ``self.commit_limit`` are never assigned in this class; presumably
    ``Store.__init__`` provides them -- confirm against ``Store``.
    """

    # One INSERT statement shared by both kinds of row. Only the expression
    # that feeds the ``geo`` column differs, so it is the single template slot.
    _INSERT_TEMPLATE = """ INSERT INTO tweets(source_id, username, content, raw_data,
    sent_at, from_user_id, geo, location, iso_language_code, retweet_count,
    to_user, to_user_id, project_id
    ) VALUES (%s,%s,%s,%s,%s,%s,{geo},%s,%s,%s,%s,%s,%s)
    """
    # Rows without a location bind ``geo`` directly (the bound value is None).
    _NON_GEO_QUERY = _INSERT_TEMPLATE.format(geo="%s")
    # Rows with a location bind a "POINT(x y)" string that the database
    # converts through ST_GeomFromText.
    _GEO_QUERY = _INSERT_TEMPLATE.format(geo="ST_GeomFromText(%s)")

    def __init__(self, project):
        """Initialise the base store and start the streaming clock.

        Args:
            project: passed unchanged to ``Store.__init__``.
        """
        # The base initialiser is called explicitly rather than through
        # super(); NOTE(review): Store may be an old-style class -- confirm
        # before switching to super().
        Store.__init__(self, project)
        self.start_time = datetime.datetime.now()
        self.counter = 0

    def _to_row(self, message, geo_value):
        """Return the parameter tuple for one message.

        The tuple order matches the column list of ``_INSERT_TEMPLATE``.

        Args:
            message: decoded tweet dictionary.
            geo_value: value bound to the ``geo`` column -- ``None`` or a
                "POINT(x y)" string.
        """
        user = message["user"]
        return (
            message["id_str"], user["screen_name"],
            message["text"], json.dumps(message),
            message["created_at"], user["id_str"],
            geo_value, user["location"], user["lang"],
            message["retweet_count"],
            # ``to_user`` takes the screen name and ``to_user_id`` the id,
            # mirroring the ``username`` / ``from_user_id`` pair above.
            message["in_reply_to_screen_name"],
            message["in_reply_to_user_id_str"],
            self.project.id,
        )

    def commit(self):
        """Insert every buffered message, commit, and reset the buffer.

        Messages whose ``geo`` field is ``None`` are inserted as-is; the
        others get a "POINT(x y)" string built from the first two entries of
        ``message["coordinates"]["coordinates"]``.

        Raises:
            Any exception raised by the database driver is propagated. The
            cursor is closed first, and the buffer, counter and start time
            are left untouched in that case.
        """
        print("committing for project %s after %s seconds of streaming ... " % (
            self.project.name, datetime.datetime.now() - self.start_time))

        commit_start = datetime.datetime.now()
        plain_rows = []
        geo_rows = []
        for message in self.message_cache:
            if message["geo"] is None:
                plain_rows.append(self._to_row(message, None))
                continue
            # NOTE(review): the test is on "geo" but the point is read from
            # "coordinates"; this assumes both are always set together.
            point = message["coordinates"]["coordinates"]
            geo_rows.append(
                self._to_row(message, "POINT(%s %s)" % (point[0], point[1])))

        cursor = self.db.cursor()
        try:
            # Skip the round trip entirely when a batch is empty.
            if plain_rows:
                cursor.executemany(self._NON_GEO_QUERY, plain_rows)
            if geo_rows:
                cursor.executemany(self._GEO_QUERY, geo_rows)
            self.db.commit()
        finally:
            # Release the cursor even when an insert or the commit fails.
            cursor.close()

        # Reported only after the transaction is committed, so the elapsed
        # time covers the database work as well as row preparation.
        print("Commit complete in %s " % (datetime.datetime.now() - commit_start,))

        self.message_cache = []
        self.counter = 0
        self.start_time = datetime.datetime.now()

    def store(self, data):
        """Buffer one message and commit once ``commit_limit`` is reached.

        Args:
            data: decoded tweet dictionary to buffer.
        """
        self.message_cache.append(data)
        self.counter += 1
        if self.counter >= self.commit_limit:
            self.commit()