-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathretrieve_tweets.py
136 lines (110 loc) · 4.87 KB
/
retrieve_tweets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""
Retrieve tweets live - as they're posted
"""
import tweepy
from textblob import TextBlob
from pymongo import MongoClient
from keys import detailssDict
import tensorflow as tf
import pickle
# Settings object imported from the local ``keys`` module
details = detailssDict

# Twitter credentials: the application pair first, then the access-token pair
_twitter_keys = details.keysDict
TWITTER_APP_KEY, TWITTER_APP_SECRET = (
    _twitter_keys.TWITTER_APP_KEY,
    _twitter_keys.TWITTER_APP_SECRET,
)
TWITTER_KEY, TWITTER_SECRET = (
    _twitter_keys.TWITTER_KEY,
    _twitter_keys.TWITTER_SECRET,
)

# Build the authenticated Twitter API handle
auth = tweepy.OAuthHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET)
auth.set_access_token(TWITTER_KEY, TWITTER_SECRET)
api = tweepy.API(auth)

# Open the MongoDB client and resolve the database and collection handles
_db_settings = details.db
dbclient = MongoClient(_db_settings.URL)
db = dbclient[_db_settings.DATABASE]
col = db[_db_settings.COLLECTION]
# The tweets streamer
class StreamListener(tweepy.Stream):
    """Stream listener that keeps one aggregated profile per tweeting user.

    For every non-retweet status it scores the text (polarity from the saved
    Keras model, subjectivity from TextBlob), folds the scores into the
    user's stored profile (or creates a new one), assigns a cluster with the
    pickled clustering model and writes the profile to the module-level
    ``col`` collection.
    """

    # Locations of the trained artefacts, relative to the working directory.
    SENTIMENT_MODEL_PATH = "./Models/sentiment140_bert"
    PIPELINE_PATH = "./Models/pipeline.pkl"
    CLUSTER_MODEL_PATH = "./Models/cluster_model.pkl"

    # Lazily loaded artefacts, cached on the class so they are read from disk
    # once instead of once per incoming tweet.
    _sentiment_model = None
    _preprocessor = None
    _cluster_model = None

    @staticmethod
    def _load_pickle(path):
        """Unpickle the object stored at ``path``, closing the file afterwards."""
        # SECURITY: pickle executes arbitrary code while loading; these files
        # must only ever be trusted, locally produced model artefacts.
        with open(path, "rb") as handle:
            return pickle.load(handle)

    @classmethod
    def _predict_polarity(cls, text):
        """Return the sigmoid of the sentiment model's output for ``text`` as a float."""
        if cls._sentiment_model is None:
            cls._sentiment_model = tf.keras.models.load_model(
                cls.SENTIMENT_MODEL_PATH
            )
        # Keras ``predict`` works on batches, so the text is sent as a batch
        # of one string.
        # NOTE(review): assumes the saved model accepts raw strings - confirm.
        raw_output = cls._sentiment_model.predict(tf.constant([text]))
        # Reduce to a plain Python float: a tensor cannot be BSON-encoded, so
        # it could never be stored in MongoDB.
        return float(tf.reshape(tf.sigmoid(raw_output), [-1])[0])

    @classmethod
    def _predict_cluster(cls, profile):
        """Return the cluster label predicted for ``profile`` as an int."""
        if cls._preprocessor is None:
            cls._preprocessor = cls._load_pickle(cls.PIPELINE_PATH)
        if cls._cluster_model is None:
            cls._cluster_model = cls._load_pickle(cls.CLUSTER_MODEL_PATH)
        # NOTE(review): the profile dict is handed to the pipeline unchanged,
        # exactly as before; confirm the fitted pipeline accepts this input.
        preprocessed = cls._preprocessor.transform(profile)
        labels = cls._cluster_model.predict(preprocessed)
        # One profile in, one label out; convert the array element to a plain
        # int so the value can be stored in MongoDB.
        # NOTE(review): assumes integer cluster labels - confirm.
        return int(labels[0])

    def on_status(self, tweet):
        """
        Handles a new tweet

        Retweets are ignored. Otherwise the author's profile is created or
        updated in the database. Always returns ``None``.
        """
        # Filter out retweets
        if hasattr(tweet, "retweeted_status"):
            return

        # Using TextBlob to get subjectivity score of the tweet
        subjectivity = TextBlob(tweet.text).sentiment.subjectivity
        # Using the trained sentiment analysis model to get polarity
        polarity = self._predict_polarity(tweet.text)

        user_filter = {"user_id": tweet.user.id}
        # A single lookup both answers "is this user known?" and returns the
        # stored record (``Cursor.count()`` no longer exists in PyMongo 4).
        user_profile = col.find_one(user_filter)

        if user_profile is None:
            # First tweet seen from this user: the scores are the averages
            tweet_count = 1
            avg_polarity = polarity
            avg_subjectivity = subjectivity
            favorites = tweet.favorite_count
            retweets = tweet.retweet_count
        else:
            # Fold the new tweet into the running averages and totals
            seen = user_profile["no_tweets"]
            tweet_count = seen + 1
            avg_polarity = (user_profile["polarity"] * seen + polarity) / tweet_count
            avg_subjectivity = (
                user_profile["subjectivity"] * seen + subjectivity
            ) / tweet_count
            favorites = user_profile["favorites"] + tweet.favorite_count
            retweets = user_profile["retweets"] + tweet.retweet_count

        tweet_profile = {
            "user_id": tweet.user.id,
            "polarity": avg_polarity,
            "subjectivity": avg_subjectivity,
            "verified": 1 if tweet.user.verified else 0,
            "protected": 1 if tweet.user.protected else 0,
            "favorites": favorites,
            "retweets": retweets,
            "no_tweets": tweet_count,
            "no_tweets_total": tweet.user.statuses_count,
            "followers": tweet.user.followers_count,
        }
        # The cluster is predicted from the profile before the label is added
        tweet_profile["cluster"] = self._predict_cluster(tweet_profile)

        if user_profile is None:
            # Insert user profile into the database
            x = col.insert_one(tweet_profile)
            print(x.inserted_id)
        else:
            # Update user record on the database
            x = col.update_one(user_filter, {"$set": tweet_profile})
            # No upsert is requested, so this prints None on a plain update
            print(x.upserted_id)

    def on_error(self, status_code):
        """
        Handles errors in tweet retrieval

        Returns ``False`` when ``status_code`` is 420 and ``None`` for every
        other status code.
        """
        # NOTE(review): 420 is presumably Twitter's rate-limit response and
        # ``False`` the "stop streaming" signal - confirm. tweepy 4.x ``Stream``
        # appears to report HTTP errors through ``on_request_error`` instead;
        # verify this hook is still invoked by the installed tweepy version.
        if status_code == 420:
            return False