-
Notifications
You must be signed in to change notification settings - Fork 0
/
get_tweets.py
131 lines (106 loc) · 3.65 KB
/
get_tweets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import twitter
import load_passwords
import load_settings
import csv
import time
import os.path
import sys
import pytz
from datetime import datetime, timedelta
from util import *
# https://github.com/bear/python-twitter
# https://python-twitter.readthedocs.io/en/latest/
# Credentials and run settings come from project-local config loaders
# (load_passwords / load_settings) — presumably JSON/INI files; see those modules.
config = load_passwords.load()
settings = load_settings.load()
# Authenticated API client.  sleep_on_rate_limit makes GetSearch block until
# the rate-limit window resets instead of raising; tweet_mode="extended"
# exposes the untruncated text as `full_text` on returned statuses.
api = twitter.Api(consumer_key=config["consumer_key"],
                  consumer_secret=config["consumer_secret"],
                  access_token_key=config["access_token_key"],
                  access_token_secret=config["access_token_secret"],
                  sleep_on_rate_limit=True,
                  tweet_mode="extended")
# Note: for a list of timezones, look at pytz.common_timezones
timezone = pytz.timezone(settings["timezone"])
# Output CSV the collected tweets are appended to.
tweetsOutputFileName = 'hashtag_tweets.csv'
# Collect tweets back to (but not before) this date.
startDateString = "2019-05-15"
# endDateString = "2019-06-17"
start = parseDate(startDateString)
# end = parseDate(endDateString) + timedelta(days=1)
# strftime/strptime formats: API query dates vs. Twitter's created_at strings.
apiDateFormat = "%Y-%m-%d"
twitterDateFormat = "%a %b %d %H:%M:%S %z %Y"
tweetsPerRequest=500 # standard API is 100, premium is 500
# Hashtags related to the 2019 Austrian "Ibiza affair"; any tweet matching
# at least one of these is collected (OR-joined in the search query).
hashtags = [
    '#ibiza',
    '#ibizagate',
    '#ibizavideo',
    '#ibizaaffaere',
    '#HCStrache',
    '#Strache',
    '#StracheVideo',
    '#StracheGate',
    '#Kurz',
    '#FPÖ',
    '#fpoe',
    '#Bierlein',
    '#übergangsregierung',
    '#neuwahlen',
    '#neuwahl',
    '#philippastrache',
    '#Kickl',
    '#Regierungskrise',
    '#Misstrauensantrag',
    '#Misstrauensvotum',
]
# def write_tweets_for_tag(tag):
# search_query = 'q=' + hash_tag
# search_query = search_query.replace('#', '%23')
# search_query += '&result_type=recent'
# search_query += '&since=' + startDateString
# search_query += '&count=' + tweetsPerRequest
# tweets = get_all_for_query(search_query)
# return tweets
def write_all_for_query(query):
    """Page backwards through search results for `query`, appending each
    batch to the output CSV until tweets older than `start` are reached.

    Pagination uses `max_id`: every request asks only for tweets whose id
    is <= the oldest id seen so far, so each batch is strictly older.

    Returns None; output goes to the CSV via write_tweets_to_file().
    """
    # Seed just below a known-recent tweet id so the first request is bounded.
    max_id = 1136971040280731648
    while True:
        # BUG FIX: tweetsPerRequest is an int (500); concatenating it to a
        # str raised TypeError — it must be serialized with str() first.
        raw_query = query + '&count=' + str(tweetsPerRequest)
        if max_id is not None:
            raw_query += '&max_id=' + str(max_id)
        newTweets = api.GetSearch(raw_query=raw_query)
        if not newTweets:
            return
        # Attach a parsed datetime so the batch can be ordered by creation time.
        for tweet in newTweets:
            tweet.created_at_date = parseTwitterDate(tweet.created_at)
        earliestTweet = min(newTweets, key=lambda x: x.created_at_date)
        earliestTweetDate = earliestTweet.created_at_date
        # Progress indicator: date of the oldest tweet in this batch.
        print(printDateOnly(earliestTweetDate), end=' ', flush=True)
        write_tweets_to_file(newTweets)
        max_id = earliestTweet.id
        # Stop once we have paged past the start date; a single-tweet batch
        # means max_id pagination has bottomed out (only the boundary tweet
        # itself is returned again).
        if earliestTweetDate < start or len(newTweets) == 1:
            return
        time.sleep(1)  # small pause between requests on top of rate limiting
def write_tweets_any_hashtag():
    """Build one OR-joined search query over all configured hashtags and
    write every matching tweet (back to the start date) to the CSV file.

    Returns whatever write_all_for_query returns (None).
    """
    # URL-encode by hand: OR-join the tags, then percent-escape the '#'.
    joined_tags = '%20OR%20'.join(hashtags).replace('#', '%23')
    query_parts = [
        'q=' + joined_tags,
        'result_type=recent',
        'since=' + startDateString,
    ]
    return write_all_for_query('&'.join(query_parts))
def write_tweets_to_file(tweets):
    """Append `tweets` to the output CSV file.

    BUG FIX: the header row used to be rewritten on every call, so each
    appended batch injected a duplicate "screen_name,tweet_id,..." line
    into the data.  The header is now written only when the file is new
    (missing or empty).
    """
    write_header = (not os.path.exists(tweetsOutputFileName)
                    or os.path.getsize(tweetsOutputFileName) == 0)
    with open(tweetsOutputFileName, 'a', newline='', encoding='utf-8') as csvfile:
        csvWriter = csv.writer(csvfile)
        if write_header:
            csvWriter.writerow(["screen_name", "tweet_id", "created_at", "text"])
        for tweet in tweets:
            screen_name = tweet.user.screen_name
            # created_at_in_seconds is a UTC epoch; fromtimestamp() converts
            # to the machine's local time — NOTE(review): looks intended, but
            # confirm against the `timezone` setting loaded above.
            created_at_date = datetime.fromtimestamp(tweet.created_at_in_seconds)
            row = [
                screen_name,
                tweet.id,
                printDate(created_at_date),
                # extended mode populates full_text; fall back to text if absent
                tweet.full_text or tweet.text
            ]
            csvWriter.writerow(row)
# Fresh run: delete any previous output so the CSV starts empty,
# then collect all hashtag tweets into it.
if os.path.exists(tweetsOutputFileName):
    os.remove(tweetsOutputFileName)
write_tweets_any_hashtag()
print("Done!")