
Commit

bug fix
jmargutt committed Mar 7, 2023
1 parent 0fefe55 commit cae2f5a
Showing 2 changed files with 121 additions and 113 deletions.
5 changes: 3 additions & 2 deletions pipeline/requirements.txt
@@ -15,5 +15,6 @@ requests==2.28.2
sentencepiece==0.1.97
# spacyturk==0.1.0
tqdm==4.64.1
-torch==1.13.0
-transformers==4.13.0
+tweepy==4.12.1
+#torch==1.13.0
+#transformers==4.13.0
229 changes: 118 additions & 111 deletions pipeline/src/pipeline/pipeline.py
@@ -3,7 +3,9 @@
from google.cloud import translate_v2 as google_translate
from googleapiclient.discovery import build
from google.oauth2 import service_account
-from transformers import pipeline
+# from transformers import pipeline
+import ast
+import numpy as np
import tweepy
import pandas as pd
from tqdm import tqdm
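
The hunk above disables the hard transformers import (mirroring the requirements change) and brings in ast and numpy instead. As a point of comparison only, a minimal sketch of how the same dependency could be kept optional rather than commented out, assuming the classification step can simply be skipped when the package is missing; none of this is part of the commit:

# Hypothetical alternative: guard the heavy import so the pipeline still
# starts on machines without torch/transformers installed.
try:
    from transformers import pipeline  # optional, only needed for classification
    HAS_TRANSFORMERS = True
except ImportError:
    pipeline = None
    HAS_TRANSFORMERS = False
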
@@ -52,6 +54,8 @@ def format_df(df_tweets):
df_tweets['url'] = df_tweets['entities'].apply(get_url_from_entities)
df_tweets['twitter_url'] = df_tweets.apply(get_url_from_tweet, axis=1)
df_tweets['url'] = df_tweets['url'].fillna(df_tweets['twitter_url'])
+df_tweets['created_at'] = pd.to_datetime(df_tweets['created_at'])
+df_tweets['created_at'] = df_tweets['created_at'].dt.tz_localize(None)
df_tweets = df_tweets[['created_at', 'id', 'full_text', 'source', 'geo', 'coordinates', 'place', 'retweet_count',
'favorite_count', 'possibly_sensitive', 'lang', 'url']]
return df_tweets
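
The two added lines normalize created_at as soon as the raw tweets are loaded: parse the strings into timestamps, then drop the timezone so they compare cleanly with the tz-naive values already stored in the sheet. A small illustration with a made-up timestamp (the offset is an assumption, not taken from the data):

import pandas as pd

s = pd.Series(["2023-03-07 10:15:00+03:00"])
s = pd.to_datetime(s)        # tz-aware: 2023-03-07 10:15:00+03:00
s = s.dt.tz_localize(None)   # tz-naive: 2023-03-07 10:15:00
print(s)
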
@@ -121,19 +125,22 @@ def main():
NewsFeed = feedparser.parse(source_url)

for entry in NewsFeed.entries:
-datetime = pd.to_datetime(entry['published'])
+if any(x not in entry.keys() for x in ['id', 'published', 'link', 'title']):
+continue

+datetime_entry = pd.to_datetime(entry['published'])

if not df_old_values.empty:
# skip if link already present in google sheet
if entry['link'] in df_old_values['Link'].unique():
continue
# skip if older than latest news
-if datetime < df_old_values[df_old_values['Source'] == source_name]['datetime'].max().tz_localize(
+if datetime_entry < df_old_values[df_old_values['Source'] == source_name]['datetime'].max().tz_localize(
'UTC+03:00'):
print(f"{datetime} is older than {df_old_values['datetime'].max()}, skipping")
print(f"{datetime_entry} is older than {df_old_values['datetime'].max()}, skipping")
continue
else:
print(f"{datetime} is newer than {df_old_values['datetime'].max()}, saving")
print(f"{datetime_entry} is newer than {df_old_values['datetime'].max()}, saving")

title = re.sub(r"<(.*)>", "", entry['title']) # clean title (without HTML leftovers)
title_en = title # translator.translate(title, target_language="en")["translatedText"] # translate title to english
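
Two things change in the hunk above: feed entries missing any of the fields used later are skipped up front, and the loop variable is renamed from datetime to datetime_entry, presumably so it no longer shadows the datetime module that the Twitter section relies on (datetime.date.fromisoformat further down). A short sketch of the same guard on a hypothetical feedparser-style entry:

# Hypothetical entry dict; real feedparser entries behave like mappings too.
entry = {'id': '1', 'published': '2023-03-07T10:00:00+03:00', 'link': 'https://example.org/x'}
required = ['id', 'published', 'link', 'title']
if any(x not in entry.keys() for x in required):
    print("skipping incomplete entry")  # 'title' is missing here
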
@@ -172,16 +179,16 @@

# create simple entry
entry_simple = {
-'Date': datetime.strftime("%d/%m/%Y"),
-'Time': datetime.strftime("%H:%M"),
+'Date': datetime_entry.strftime("%d/%m/%Y"),
+'Time': datetime_entry.strftime("%H:%M"),
# 'Title (en)': title_en,
'Title': title,
# 'Content (en)': content_en,
'Content': content,
'Source': source_name,
-'Source+datetime': f'{source_name}, {datetime.strftime("%d/%m/%Y")} {datetime.strftime("%H:%M")}',
+'Source+datetime': f'{source_name}, {datetime_entry.strftime("%d/%m/%Y")} {datetime_entry.strftime("%H:%M")}',
'Link': entry['link'],
-'datetime': datetime
+'datetime': datetime_entry
}
entries.append(entry_simple)
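
The entry_simple dictionary only reformats the parsed timestamp into the Date, Time and Source+datetime columns of the sheet; the rename is the whole change here. For reference, with an assumed timestamp the formatting behaves like this:

import pandas as pd

datetime_entry = pd.to_datetime("2023-03-07 10:15:00")
print(datetime_entry.strftime("%d/%m/%Y"))  # 07/03/2023
print(datetime_entry.strftime("%H:%M"))     # 10:15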

@@ -197,119 +204,119 @@
valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body).execute()
sleep(1)

# Twitter
twitter_sources = {
'SANA Syria': 'SANAEnOfficial',
'Alwatan Syria': 'AlwatanSy',
'HashtagSyria': 'presshashtag',
'Almasdar Online': 'AlmasdaronlineE',
'Alghad': 'AlghadNews',
'Shaam': 'ShaamNetwork',
'Syrian Observatory for Human Rights': 'syriahr',
'Baladi News': 'baladinetwork',
'North Press Agency': 'NPA_Arabic',
'Sky News Arabia': 'skynewsarabia',
'Al Maydeen': 'Almayadeennews',
'Monte Carlo Doualiya': 'MC_Doualiya',
'BBC Arabic': 'BBCArabic'
}

spreadsheet_id = '1p8zMlaXlC-3BpPbl5Yb61u6VZRUIxD1Gc2yo7PJ9ScY'
spreadsheet_range = 'Tweets!A:L'
# get data already in the spreadsheet
result = service.spreadsheets().values().get(spreadsheetId=spreadsheet_id,
range=spreadsheet_range).execute()
values = result.get('values', [])
df_old_values = pd.DataFrame.from_records(values[1:], columns=values[0])
df_old_values['created_at'] = pd.to_datetime(df_old_values['created_at'])

auth = tweepy.OAuthHandler(os.environ['TWITTER_API_KEY'], os.environ['TWITTER_API_SECRET'])
auth.set_access_token(os.environ['TWITTER_ACCESS_TOKEN'], os.environ['TWITTER_ACCESS_SECRET'])
api = tweepy.API(auth, wait_on_rate_limit=True)

twitter_data_path = "../data"
os.makedirs(twitter_data_path, exist_ok=True)

# track individual twitter accounts
for source_name, source_id in twitter_sources.items():
# save output as
save_file = twitter_data_path + '/tweets_' + source_id + '.json'
# Twitter
twitter_sources = {
'SANA Syria': 'SANAEnOfficial',
'Alwatan Syria': 'AlwatanSy',
'HashtagSyria': 'presshashtag',
'Almasdar Online': 'AlmasdaronlineE',
'Alghad': 'AlghadNews',
'Shaam': 'ShaamNetwork',
'Syrian Observatory for Human Rights': 'syriahr',
'Baladi News': 'baladinetwork',
'North Press Agency': 'NPA_Arabic',
'Sky News Arabia': 'skynewsarabia',
'Al Maydeen': 'Almayadeennews',
'Monte Carlo Doualiya': 'MC_Doualiya',
'BBC Arabic': 'BBCArabic'
}

spreadsheet_id = '1p8zMlaXlC-3BpPbl5Yb61u6VZRUIxD1Gc2yo7PJ9ScY'
spreadsheet_range = 'Tweets!A:L'
# get data already in the spreadsheet
result = service.spreadsheets().values().get(spreadsheetId=spreadsheet_id,
range=spreadsheet_range).execute()
values = result.get('values', [])
df_old_values = pd.DataFrame.from_records(values[1:], columns=values[0])
df_old_values['created_at'] = pd.to_datetime(df_old_values['created_at'])

auth = tweepy.OAuthHandler(os.environ['TWITTER_API_KEY'], os.environ['TWITTER_API_SECRET'])
auth.set_access_token(os.environ['TWITTER_ACCESS_TOKEN'], os.environ['TWITTER_ACCESS_SECRET'])
api = tweepy.API(auth, wait_on_rate_limit=True)

twitter_data_path = "../data"
os.makedirs(twitter_data_path, exist_ok=True)

# track individual twitter accounts
for source_name, source_id in twitter_sources.items():
# save output as
save_file = twitter_data_path + '/tweets_' + source_id + '.json'
tweets = api.user_timeline(
screen_name=source_id,
count=200,
include_rts=False,
tweet_mode='extended'
)

all_tweets = []
all_tweets.extend(tweets)
oldest_id = tweets[-1].id
while True:
tweets = api.user_timeline(
screen_name=source_id,
count=200,
include_rts=False,
max_id=oldest_id - 1,
tweet_mode='extended'
)

all_tweets = []
all_tweets.extend(tweets)
if len(tweets) == 0:
break
oldest_id = tweets[-1].id
while True:
tweets = api.user_timeline(
screen_name=source_id,
count=200,
include_rts=False,
max_id=oldest_id - 1,
tweet_mode='extended'
)
if len(tweets) == 0:
break
oldest_id = tweets[-1].id
all_tweets.extend(tweets)

with open(save_file, 'a') as tf:
for tweet in all_tweets:
try:
tf.write('\n')
json.dump(tweet._json, tf)
except Exception as e:
logging.warning("Some error occurred, skipping tweet:")
logging.warning(e)
pass

# parse tweets and store in dataframe
df_tweets = pd.DataFrame()
for file in os.listdir(twitter_data_path):
if file.endswith('.json'):
df_tweets_ = pd.read_json(os.path.join(twitter_data_path, file), lines=True)
df_tweets = df_tweets.append(df_tweets_, ignore_index=True)

df_tweets['relevant'] = True
for ix, row in df_tweets.iterrows():
# skip if link already present in google sheet
if not df_old_values.empty:
if row['url'] in df_old_values['url'].unique():
df_tweets.at[ix, 'relevant'] = False
all_tweets.extend(tweets)

if row['created_at'].dt.date < datetime.date.fromisoformat('2023-02-06'):
with open(save_file, 'a') as tf:
for tweet in all_tweets:
try:
tf.write('\n')
json.dump(tweet._json, tf)
except Exception as e:
logging.warning("Some error occurred, skipping tweet:")
logging.warning(e)
pass

# parse tweets and store in dataframe
df_tweets = pd.DataFrame()
for file in os.listdir(twitter_data_path):
if file.endswith('.json'):
df_tweets_ = pd.read_json(os.path.join(twitter_data_path, file), lines=True)
df_tweets = df_tweets.append(df_tweets_, ignore_index=True)

# drop duplicates
df_tweets = df_tweets.drop_duplicates(subset=['id'])
df_tweets = format_df(df_tweets)

df_tweets['relevant'] = True
for ix, row in df_tweets.iterrows():
# skip if link already present in google sheet
if not df_old_values.empty:
if row['url'] in df_old_values['url'].unique():
df_tweets.at[ix, 'relevant'] = False

# filter by location
if not ('syria' in row['full_text'] or 'سوريا' in row['full_text']):
df_tweets.at[ix, 'relevant'] = False
if row['created_at'].date() < datetime.date.fromisoformat('2023-02-06'):
df_tweets.at[ix, 'relevant'] = False

# filter by keyword
if not any(keyword.lower() in row['full_text'].lower() for keyword in keywords):
df_tweets.at[ix, 'relevant'] = False
df_tweets = df_tweets[df_tweets['relevant']].drop(columns=['relevant'])

# drop duplicates
df_tweets = df_tweets.drop_duplicates(subset=['id'])
df_tweets = format_df(df_tweets)
df_tweets['created_at'] = df_tweets['created_at'].dt.tz_localize(None)
df_tweets = df_tweets.sort_values(by='created_at')
df_tweets['created_at'] = df_tweets['created_at'].astype(str)
df_tweets = df_tweets.fillna('')

# add entries to google sheet
logging.info('updating Google sheet')
for ix, row in df_tweets.iterrows():
# add new row to google sheet
body = {'values': [list(row.values)]}
result = service.spreadsheets().values().append(
spreadsheetId=spreadsheet_id, range=spreadsheet_range,
valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body).execute()
sleep(1)
# filter by location
if not ('syria' in row['full_text'] or 'سوريا' in row['full_text']):
df_tweets.at[ix, 'relevant'] = False

# filter by keyword
if not any(keyword.lower() in row['full_text'].lower() for keyword in keywords):
df_tweets.at[ix, 'relevant'] = False
df_tweets = df_tweets[df_tweets['relevant']].drop(columns=['relevant'])

df_tweets = df_tweets.sort_values(by='created_at')
df_tweets['created_at'] = df_tweets['created_at'].astype(str)
df_tweets = df_tweets.fillna('')

# add entries to google sheet
logging.info('updating Google sheet')
for ix, row in df_tweets.iterrows():
# add new row to google sheet
body = {'values': [list(row.values)]}
result = service.spreadsheets().values().append(
spreadsheetId=spreadsheet_id, range=spreadsheet_range,
valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body).execute()
sleep(1)

except Exception as e:
logging.error(f"{e}")