-
Notifications
You must be signed in to change notification settings - Fork 1
/
tweet_cluster.py
123 lines (104 loc) · 4.78 KB
/
tweet_cluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import time
from pymongo import MongoClient
from pymongo.operations import UpdateOne
from cluster import BERTopicModel
class TwitterClustering:
    """
    Cluster stored tweets into topics and keep a history of each topic.

    Tweets live in the ``tweet_filtered`` collection and receive a
    ``topic_label`` field once clustered. The ``topic_status_change``
    collection holds one document per topic label with parallel ``size``,
    ``keywords`` and ``time`` lists recording how the topic evolved.
    """

    def __init__(self, host='localhost', port=27017, db_name='twitter_data'):
        """
        Initialize the TwitterClustering class.
        Connect to MongoDB and set up the required collections.
        Args:
        - host: MongoDB host name. Defaults to 'localhost'.
        - port: MongoDB port. Defaults to 27017.
        - db_name: Name of the database holding the collections.
          Defaults to 'twitter_data'.
        """
        self.client = MongoClient(host, port)
        self.db = self.client[db_name]
        self.tweet_filtered = self.db["tweet_filtered"]
        self.topic_status_change = self.db['topic_status_change']

    def get_unclustered_tweets(self):
        """
        Retrieve unclustered tweets from the database.
        A tweet counts as unclustered when it has no 'topic_label' field.
        Returns:
        - A list of dictionaries containing tweet IDs ('_id') and texts ('text').
        """
        # Named `query` rather than `filter` to avoid shadowing the builtin.
        query = {'topic_label': {'$exists': False}}
        projection = {'_id': 1, 'text': 1}
        return list(self.tweet_filtered.find(query, projection))

    def update_tweet_clustering(self):
        """
        Update tweet clustering based on topic modeling.
        Feeds all unclustered tweets to the 'cluster_model' BERTopicModel,
        saves the model, and writes the assigned labels back to the tweets.
        Returns:
        - topic_info: Information about the topics generated by the clustering,
          or None when there were no tweets to cluster.
        """
        tweets_to_cluster = self.get_unclustered_tweets()
        if not tweets_to_cluster:
            print("No tweets to cluster.")
            return None

        cluster_model = BERTopicModel('cluster_model')
        tweet_id_label, topic_info = cluster_model.online_topic_modeling(tweets_to_cluster)
        cluster_model.save_model()

        operations = [
            UpdateOne({'_id': d['_id']}, {'$set': {'topic_label': d['topic_label']}})
            for d in tweet_id_label
        ]
        # bulk_write() raises InvalidOperation on an empty request list, so
        # only hit the database when the model actually labelled something.
        modified_count = 0
        if operations:
            modified_count = self.tweet_filtered.bulk_write(operations).modified_count
        print(f"Number of tweets clustered: {modified_count}")
        return topic_info

    def topics_to_delete(self, new_topic_dict):
        """
        Determine the topic labels to delete based on the new topics generated.
        A label is stale when it is tracked in the topic status collection but
        absent from the new topics.
        Args:
        - new_topic_dict: A list of dictionaries representing the new topics;
          each must have a 'topic_label' key.
        Returns:
        - labels_to_delete: A list of topic labels to delete (in no particular order).
        """
        new_topic_labels = {topic['topic_label'] for topic in new_topic_dict}
        # distinct() returns just the label values instead of shipping every
        # full history document to the client, and skips documents that lack
        # the field instead of failing on them.
        tracked_labels = set(self.topic_status_change.distinct('topic_label'))
        return list(tracked_labels - new_topic_labels)

    def delete_tweets_by_cluster_label(self, labels_to_delete):
        """
        Delete tweets with specified cluster labels.
        Also removes the matching topic history documents.
        Args:
        - labels_to_delete: A list of cluster labels to delete.
        """
        label_query = {'topic_label': {'$in': labels_to_delete}}
        self.topic_status_change.delete_many(label_query)
        result = self.tweet_filtered.delete_many(label_query)
        print('Number of tweets deleted: ', result.deleted_count, 'Labels deleted: ', labels_to_delete)

    def delete_irrelevant_tweets(self):
        """
        For every cluster, delete all the tweets that are no longer relevant to the cluster
        by comparing cluster keywords to tweet text.
        """
        # TODO: Implement this function.
        pass

    def topics_over_time(self, new_topic_stats):
        """
        Track the changes in topics over time and update the topic status collection.
        A topic seen for the first time gets a new history document. A known
        topic gets a new history entry only when its size or keywords differ
        from the most recent entry.
        Args:
        - new_topic_stats: Statistics and information about the new topics:
          a mapping with 'topics' (a list of dictionaries with 'topic_label',
          'size' and 'keywords') and 'time' (the timestamp of this snapshot).
        """
        for new_topic in new_topic_stats['topics']:
            label = new_topic['topic_label']
            doc = self.topic_status_change.find_one({'topic_label': label})

            if doc is None:
                self.topic_status_change.insert_one({
                    'topic_label': label,
                    'size': [new_topic['size']],
                    'keywords': [new_topic['keywords']],
                    'time': [new_topic_stats['time']],
                })
                continue

            unchanged = (
                doc['size'][-1] == new_topic['size']
                and doc['keywords'][-1] == new_topic['keywords']
            )
            if unchanged:
                continue

            # Append to the three parallel history lists in one atomic $push
            # on the exact document that was read, rather than rewriting the
            # whole document.
            self.topic_status_change.update_one(
                {'_id': doc['_id']},
                {'$push': {
                    'size': new_topic['size'],
                    'keywords': new_topic['keywords'],
                    'time': new_topic_stats['time'],
                }},
            )
if __name__ == '__main__':
    # Poll forever: cluster any new tweets, record how the topics changed,
    # then drop tweets and history for topics that no longer exist.
    twitter_clustering = TwitterClustering()
    try:
        while True:
            new_topic_stats = twitter_clustering.update_tweet_clustering()
            if new_topic_stats:
                twitter_clustering.topics_over_time(new_topic_stats)
                labels_to_delete = twitter_clustering.topics_to_delete(new_topic_stats['topics'])
                twitter_clustering.delete_tweets_by_cluster_label(labels_to_delete)
            # Sleep on every pass, including when there was nothing to
            # cluster, so an idle database is not polled in a tight loop.
            print('Sleeping for 5 minutes')
            time.sleep(300)
    finally:
        # Release the MongoDB connection when the loop is interrupted or fails.
        twitter_clustering.client.close()