Commit 4d29c7b (1 parent: 39f5ac2)
Showing 18 changed files with 30,972 additions and 0 deletions.
Binary file not shown.
@@ -0,0 +1,17 @@
Below are the steps to run the Twitter analysis application on AWS:
1. Launch an EC2 Linux instance.
2. The default Python version on the Linux AMI is 2.7. The application needs Python 3 or later because it uses the boto3 library. Install Python 3.6 by executing the following commands:
   sudo yum install python36
   sudo yum install python36-pip
3. Set Python 3 as the default version by executing: sudo update-alternatives --config python
4. Confirm the version by running: python --version
5. Install boto3 (the AWS SDK for Python): sudo pip install boto3
6. Install tweepy: sudo pip install tweepy
7. Create a Lambda function on AWS with the Python 3.6 runtime. Paste in the code from Lambda.py.
8. Create a Kinesis Data Firehose delivery stream with Source set to Direct PUT and the destination set to Elasticsearch Service. Choose the option to transform records with Lambda and specify the function created in the previous step.
9. Create a domain in Elasticsearch Service with index "comment" and type "comment" (Elasticsearch index names must be lowercase). Choose t2.small.elasticsearch as the instance type and 6.5 as the version.
10. Copy TwitterStreaming.py to the EC2 instance and run it: python TwitterStreaming.py
11. The application can be monitored through the logs the Lambda function writes to CloudWatch Logs.
12. Data arrival in Elasticsearch can be confirmed on the Indices tab of the Elasticsearch dashboard, or by querying the domain directly as shown below.
13. Graphs can be created with Kibana, which is deployed automatically with the Elasticsearch domain.
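For step 12, a minimal sketch of querying the domain directly with the requests library. The endpoint below is a placeholder (copy the real one from the Elasticsearch Service console), and this assumes the domain's access policy permits the request:

import requests

# Placeholder endpoint; use the one shown on the Elasticsearch Service console.
ES_ENDPOINT = "https://search-mydomain-abc123.us-east-1.es.amazonaws.com"

# Count documents in the 'comment' index.
resp = requests.get(ES_ENDPOINT + "/comment/_count")
print(resp.json())  # e.g. {"count": 1234, ...}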
@@ -0,0 +1,42 @@
from __future__ import print_function

import base64
import json

import boto3


def lambda_handler(event, context):
    output = []
    # Create the Comprehend client once rather than per record.
    comprehend = boto3.client(service_name='comprehend', region_name='us-east-1')

    for record in event['records']:
        dict_data = json.loads(base64.b64decode(record['data']).decode('utf-8').strip())

        # Run the tweet text through Comprehend for sentiment analysis.
        sentiment_all = comprehend.detect_sentiment(Text=dict_data["text"], LanguageCode='en')
        sentiment = sentiment_all['Sentiment']
        print(sentiment)
        positive = sentiment_all['SentimentScore']['Positive']
        negative = sentiment_all['SentimentScore']['Negative']
        # Average of the positive and negative scores. The original expression
        # lacked parentheses and computed positive + (negative / 2).
        total = (positive + negative) / 2

        # Build the transformed record that Firehose will deliver.
        data_record = {
            'message': dict_data["text"],
            'created_at': dict_data["created_at"],
            'location': dict_data["location"],
            'sentiment': sentiment,
            'total': total
        }

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(data_record).encode('utf-8')).decode('utf-8')
        }

        output.append(output_record)

    return {'records': output}
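The handler above can be exercised locally before wiring it into Firehose. A minimal sketch with a hand-built test event; the record shape mirrors what Firehose passes to a transformation Lambda, the sample tweet fields are fabricated, and the call still needs AWS credentials with Comprehend access:

import base64
import json

# Fabricated sample record for illustration only.
sample = {
    "text": "Loving the new release!",
    "created_at": "04/27/2019",
    "location": "Unknown",
}
event = {
    "records": [
        {
            "recordId": "1",
            "data": base64.b64encode(json.dumps(sample).encode("utf-8")).decode("utf-8"),
        }
    ]
}

print(lambda_handler(event, None))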
projectdocs_Stratosphere/AWS/SourceFiles/TwitterStreaming.py (114 additions, 0 deletions)
@@ -0,0 +1,114 @@
import boto3
import json
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import re
import datetime
import sys


# This is the super secret information
access_token = "1116427206874419200-VBEb7qwPAmO6C4MJDSlkKHawoSb5JW"
access_token_secret = "LVQOcBlrFmjMMI47uZt4AYvjEHv2laXznD0zrJyvz7Jmx"
consumer_key = "6pB1GfnrSvOg6emWnJn1zvuvO"
consumer_secret = "Hht2zmPKKDLVfyI2zupL37SQTgzaIFKw3AwGxsRjMcsEzU0vi9"
aws_key_id = "AKIASERF2GZL6V3G5SID"
aws_key = "hRr74UWlN6nun/2Ob09XxRjjgP4bC43Fggih94iF"
DeliveryStreamName = 'TwitterStreaming'
client = boto3.client('firehose', region_name='us-east-1',
                      aws_access_key_id=aws_key_id,
                      aws_secret_access_key=aws_key)
num_tweets = 0


class PreProcessor:
    def clean_data(self, text):
        # removing hashtags
        newtext = re.sub(r'#\w*', '', text)
        # removing urls
        newtext = re.sub(r'(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?\xab\xbb\u201c\u201d\u2018\u2019]))', '', newtext)
        # removing mentions
        newtext = re.sub(r'@\w*', '', newtext)
        # removing emojis
        newtext = re.sub(u'([\U00002600-\U000027BF])|([\U0001f300-\U0001f64F])|([\U0001f680-\U0001f6FF])', '', newtext)
        # removing smileys
        newtext = re.sub(r"(?:X|:|;|=)(?:-)?(?:\)|\(|O|D|P|S){1,}", '', newtext)
        # removing numbers
        newtext = re.sub(r"(^|\s)(\-?\d+(?:\.\d)*|\d+)", '', newtext)
        # removing reserved words
        newtext = re.sub(r'^(RT|FAV)', '', newtext)

        newtext = re.sub(r':', '', newtext)
        # replace consecutive non-ASCII characters with a space
        newtext = re.sub(r'[^\x00-\x7F]+', ' ', newtext)

        return newtext


# This is a basic listener that prints received tweets and puts them into the stream.
class StdOutListener(StreamListener):

    def on_data(self, data):
        global num_tweets

        # Stop once the requested number of tweets has been processed.
        if num_tweets >= numberofTweets:
            sys.exit()

        p = PreProcessor()
        twitter_data = json.loads(data)
        text = twitter_data["text"]

        # Tweets longer than 140 characters arrive as extended tweets.
        if 'extended_tweet' in twitter_data:
            text = twitter_data['extended_tweet']['full_text']

        cleantext = p.clean_data(text)

        created_at = twitter_data["created_at"]
        created_date = datetime.datetime.strptime(created_at, '%a %b %d %H:%M:%S %z %Y').date()
        created_date_text = created_date.strftime("%m/%d/%Y")

        if twitter_data["place"] is not None:
            location = twitter_data["place"]["country"]
        else:
            location = "Unknown"

        data = {"created_at": created_date_text, "text": cleantext, "location": location}

        # Send the cleaned tweet to the Firehose delivery stream.
        client.put_record(DeliveryStreamName=DeliveryStreamName, Record={'Data': json.dumps(data)})

        num_tweets += 1
        print(str(num_tweets) + cleantext)

        return True

    def on_error(self, status):
        print(status)


if __name__ == '__main__':
    # This handles Twitter authentication and the connection to the Twitter Streaming API
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    keyword = input("Please enter keyword to search tweets: ")
    numberofTweets = int(input("Please enter number of tweets to be searched: "))
    stream = Stream(auth, listener)
    stream.filter(track=[keyword], languages=["en"])
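One design note on the script above: put_record makes one Firehose API call per tweet. For busier keywords, boto3 also offers put_record_batch, which accepts up to 500 records per call. A hedged sketch of buffering before sending; the buffer list and the flush threshold of 100 are illustrative choices, not part of the original code:

# Illustrative batching helper; 'client' and 'DeliveryStreamName' are the
# objects defined in TwitterStreaming.py above.
def flush_to_firehose(records):
    response = client.put_record_batch(
        DeliveryStreamName=DeliveryStreamName,
        Records=[{'Data': json.dumps(r).encode('utf-8')} for r in records],
    )
    # A non-zero FailedPutCount means some records should be retried.
    if response['FailedPutCount'] > 0:
        print('failed records:', response['FailedPutCount'])

# Inside on_data, instead of calling client.put_record directly:
#     buffer.append(data)
#     if len(buffer) >= 100:
#         flush_to_firehose(buffer)
#         buffer.clear()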
Binary file not shown.
@@ -0,0 +1,9 @@
1. Create an instance on GCP under your project.
2. SSH to the instance and run TwitterHelper.py.
3. Create a Pub/Sub topic for the project and ingest tweets into it.
4. A Cloud Function subscribed to the topic receives the tweets; paste cloudfunction.py into the Cloud Function console. The function invokes the Cloud Natural Language API. A hand-built test publish for this path is sketched below.
5. The scores are visible in GCP's logging service.
6. Create another topic to stream the scores, and configure Logstash to receive the data through a subscription.
7. Create an index pattern in Kibana to view the logs stored in Elasticsearch.

Excerpts of TwitterHelper.py are taken from the Twitter website.
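For a quick smoke test of steps 3-5 without the Twitter stream, a message can be published to the topic by hand. A minimal sketch, reusing the prianalyze project and filtertweets topic from TwitterHelper.py; it needs GCP credentials with Pub/Sub publish rights:

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("prianalyze", "filtertweets")

# Pub/Sub payloads must be byte strings.
future = publisher.publish(topic_path, data=b"This is a great test tweet!")
print("published message id:", future.result())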
@@ -0,0 +1,76 @@
import json
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

from google.cloud import pubsub_v1

# Code to publish tweets to Pub/Sub

project_id = "prianalyze"
topic_name = "filtertweets"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)


def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print('result=' + message_future.result())


def publish_to_pubsub(tweet):
    # Data must be a byte string
    tweet = tweet.encode('utf-8')
    # When you publish a message, the client returns a Future.
    message_future = publisher.publish(topic_path, data=tweet)
    message_future.add_done_callback(callback)

# We must keep the main thread from exiting to allow it to process
# messages in the background.


#######################################################################

# Below code reads tweets from the Twitter API


class StdOutListener(StreamListener):
    def on_data(self, data):
        twitter_data = json.loads(data)
        twitter_text = twitter_data['text']
        # Tweets longer than 140 characters arrive as extended tweets.
        if 'extended_tweet' in twitter_data:
            twitter_text = twitter_data['extended_tweet']['full_text']
        print(twitter_text)
        publish_to_pubsub(twitter_text)

        return True

    def on_error(self, status):
        print('error: ' + str(status))


def tweets_getter():
    TWITTER_APP_KEY = 'OvYWp33Qoh75Bqmu25va7AVwV'
    TWITTER_APP_SECRET_KEY = '12duj5j4VaNSBOjgbyIDvNVFrYwpDnzGEr3879woAjnbFd64e2'
    TWITTER_ACCESS_TOKEN = '1118948324986351619-8z7DPA8ouYwmyZg9iNrpffoKNJGW5I'
    TWITTER_TOKEN_SECRET = 'Aus32bnB8LqJdPrvWyBoSnUQpw28wXEu2yvHh61e2eQMq'
    # Unused alternative using the python-twitter library, kept from the original:
    # api = twitter.Api(consumer_key=TWITTER_APP_KEY, consumer_secret=TWITTER_APP_SECRET_KEY,
    #                   access_token_key=TWITTER_ACCESS_TOKEN, access_token_secret=TWITTER_TOKEN_SECRET,
    #                   tweet_mode='extended')
    twitter_listener = StdOutListener()
    auth = OAuthHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET_KEY)
    auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_TOKEN_SECRET)
    stream = Stream(auth, twitter_listener, tweet_mode='extended')
    stream.filter(track=['#AvengersEndGame'], languages=["en"])


if __name__ == '__main__':
    tweets_getter()
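To confirm messages are actually flowing before attaching the Cloud Function, a throwaway subscriber can pull from the topic. A minimal sketch; the debug-sub subscription name is a hypothetical one and must be created on the filtertweets topic first:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
# 'debug-sub' is a hypothetical subscription on the filtertweets topic.
subscription_path = subscriber.subscription_path("prianalyze", "debug-sub")

def on_message(message):
    print("received:", message.data.decode("utf-8"))
    message.ack()

streaming_pull = subscriber.subscribe(subscription_path, callback=on_message)
try:
    streaming_pull.result(timeout=30)  # listen for 30 seconds, then stop
except Exception:
    streaming_pull.cancel()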
@@ -0,0 +1,46 @@
def sentiment_cloudfunction(data, context):
    """Background Cloud Function to be triggered by Pub/Sub.
    Args:
        data (dict): The dictionary with data specific to this type of event.
        context (google.cloud.functions.Context): The Cloud Functions event
        metadata.
    """
    import base64
    import json
    from google.cloud import language
    from google.cloud.language import enums
    from google.cloud.language import types

    # Pub/Sub delivers the message payload base64-encoded under 'data'.
    tweet = ''
    if 'data' in data:
        try:
            tweet = base64.b64decode(data['data']).decode('utf-8')
        except Exception:
            tweet = data['data']
            print('not base64 encoded')

    # Run a sentiment analysis request on the tweet text.
    client = language.LanguageServiceClient()

    document = types.Document(
        content=tweet,
        type=enums.Document.Type.PLAIN_TEXT)
    annotations = client.analyze_sentiment(document=document)

    # Rescale the score from [-1, 1] to [0, 10] and log the result as JSON.
    score = annotations.document_sentiment.score
    adjusted_score = (score + 1) * 5
    magnitude = annotations.document_sentiment.magnitude
    dic = {"tweet": str(tweet), "score": str(adjusted_score), "magnitude": str(magnitude)}
    print(json.dumps(dic))