diff --git a/pipeline/requirements.txt b/pipeline/requirements.txt
index 9b27f4d..7f93770 100644
--- a/pipeline/requirements.txt
+++ b/pipeline/requirements.txt
@@ -15,5 +15,6 @@ requests==2.28.2
 sentencepiece==0.1.97
 # spacyturk==0.1.0
 tqdm==4.64.1
-torch==1.13.0
-transformers==4.13.0
\ No newline at end of file
+tweepy==4.12.1
+#torch==1.13.0
+#transformers==4.13.0
\ No newline at end of file
diff --git a/pipeline/src/pipeline/pipeline.py b/pipeline/src/pipeline/pipeline.py
index 7240fa0..e6c94f8 100644
--- a/pipeline/src/pipeline/pipeline.py
+++ b/pipeline/src/pipeline/pipeline.py
@@ -3,7 +3,9 @@
 from google.cloud import translate_v2 as google_translate
 from googleapiclient.discovery import build
 from google.oauth2 import service_account
-from transformers import pipeline
+# from transformers import pipeline
+import ast
+import numpy as np
 import tweepy
 import pandas as pd
 from tqdm import tqdm
@@ -52,6 +54,8 @@ def format_df(df_tweets):
     df_tweets['url'] = df_tweets['entities'].apply(get_url_from_entities)
     df_tweets['twitter_url'] = df_tweets.apply(get_url_from_tweet, axis=1)
     df_tweets['url'] = df_tweets['url'].fillna(df_tweets['twitter_url'])
+    df_tweets['created_at'] = pd.to_datetime(df_tweets['created_at'])
+    df_tweets['created_at'] = df_tweets['created_at'].dt.tz_localize(None)
     df_tweets = df_tweets[['created_at', 'id', 'full_text', 'source', 'geo', 'coordinates', 'place',
                            'retweet_count', 'favorite_count', 'possibly_sensitive', 'lang', 'url']]
     return df_tweets
@@ -121,19 +125,22 @@ def main():
             NewsFeed = feedparser.parse(source_url)
 
             for entry in NewsFeed.entries:
-                datetime = pd.to_datetime(entry['published'])
+                if any(x not in entry.keys() for x in ['id', 'published', 'link', 'title']):
+                    continue
+
+                datetime_entry = pd.to_datetime(entry['published'])
 
                 if not df_old_values.empty:
                     # skip if link already present in google sheet
                     if entry['link'] in df_old_values['Link'].unique():
                         continue
                     # skip if older than latest news
-                    if datetime < df_old_values[df_old_values['Source'] == source_name]['datetime'].max().tz_localize(
+                    if datetime_entry < df_old_values[df_old_values['Source'] == source_name]['datetime'].max().tz_localize(
                             'UTC+03:00'):
-                        print(f"{datetime} is older than {df_old_values['datetime'].max()}, skipping")
+                        print(f"{datetime_entry} is older than {df_old_values['datetime'].max()}, skipping")
                         continue
                     else:
-                        print(f"{datetime} is newer than {df_old_values['datetime'].max()}, saving")
+                        print(f"{datetime_entry} is newer than {df_old_values['datetime'].max()}, saving")
 
                 title = re.sub(r"<(.*)>", "", entry['title'])  # clean title (without HTML leftovers)
                 title_en = title  # translator.translate(title, target_language="en")["translatedText"]  # translate title to english
@@ -172,16 +179,16 @@ def main():
 
                 # create simple entry
                 entry_simple = {
-                    'Date': datetime.strftime("%d/%m/%Y"),
-                    'Time': datetime.strftime("%H:%M"),
+                    'Date': datetime_entry.strftime("%d/%m/%Y"),
+                    'Time': datetime_entry.strftime("%H:%M"),
                     # 'Title (en)': title_en,
                     'Title': title,
                     # 'Content (en)': content_en,
                     'Content': content,
                     'Source': source_name,
-                    'Source+datetime': f'{source_name}, {datetime.strftime("%d/%m/%Y")} {datetime.strftime("%H:%M")}',
+                    'Source+datetime': f'{source_name}, {datetime_entry.strftime("%d/%m/%Y")} {datetime_entry.strftime("%H:%M")}',
                     'Link': entry['link'],
-                    'datetime': datetime
+                    'datetime': datetime_entry
                 }
                 entries.append(entry_simple)

@@ -197,119 +204,119 @@ def main():
                     valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body).execute()
                 sleep(1)
 
-            # Twitter
-            twitter_sources = {
-                'SANA Syria': 'SANAEnOfficial',
-                'Alwatan Syria': 'AlwatanSy',
-                'HashtagSyria': 'presshashtag',
-                'Almasdar Online': 'AlmasdaronlineE',
-                'Alghad': 'AlghadNews',
-                'Shaam': 'ShaamNetwork',
-                'Syrian Observatory for Human Rights': 'syriahr',
-                'Baladi News': 'baladinetwork',
-                'North Press Agency': 'NPA_Arabic',
-                'Sky News Arabia': 'skynewsarabia',
-                'Al Maydeen': 'Almayadeennews',
-                'Monte Carlo Doualiya': 'MC_Doualiya',
-                'BBC Arabic': 'BBCArabic'
-            }
-
-            spreadsheet_id = '1p8zMlaXlC-3BpPbl5Yb61u6VZRUIxD1Gc2yo7PJ9ScY'
-            spreadsheet_range = 'Tweets!A:L'
-            # get data already in the spreadsheet
-            result = service.spreadsheets().values().get(spreadsheetId=spreadsheet_id,
-                                                         range=spreadsheet_range).execute()
-            values = result.get('values', [])
-            df_old_values = pd.DataFrame.from_records(values[1:], columns=values[0])
-            df_old_values['created_at'] = pd.to_datetime(df_old_values['created_at'])
-
-            auth = tweepy.OAuthHandler(os.environ['TWITTER_API_KEY'], os.environ['TWITTER_API_SECRET'])
-            auth.set_access_token(os.environ['TWITTER_ACCESS_TOKEN'], os.environ['TWITTER_ACCESS_SECRET'])
-            api = tweepy.API(auth, wait_on_rate_limit=True)
-
-            twitter_data_path = "../data"
-            os.makedirs(twitter_data_path, exist_ok=True)
-
-            # track individual twitter accounts
-            for source_name, source_id in twitter_sources.items():
-                # save output as
-                save_file = twitter_data_path + '/tweets_' + source_id + '.json'
+        # Twitter
+        twitter_sources = {
+            'SANA Syria': 'SANAEnOfficial',
+            'Alwatan Syria': 'AlwatanSy',
+            'HashtagSyria': 'presshashtag',
+            'Almasdar Online': 'AlmasdaronlineE',
+            'Alghad': 'AlghadNews',
+            'Shaam': 'ShaamNetwork',
+            'Syrian Observatory for Human Rights': 'syriahr',
+            'Baladi News': 'baladinetwork',
+            'North Press Agency': 'NPA_Arabic',
+            'Sky News Arabia': 'skynewsarabia',
+            'Al Maydeen': 'Almayadeennews',
+            'Monte Carlo Doualiya': 'MC_Doualiya',
+            'BBC Arabic': 'BBCArabic'
+        }
+
+        spreadsheet_id = '1p8zMlaXlC-3BpPbl5Yb61u6VZRUIxD1Gc2yo7PJ9ScY'
+        spreadsheet_range = 'Tweets!A:L'
+        # get data already in the spreadsheet
+        result = service.spreadsheets().values().get(spreadsheetId=spreadsheet_id,
+                                                     range=spreadsheet_range).execute()
+        values = result.get('values', [])
+        df_old_values = pd.DataFrame.from_records(values[1:], columns=values[0])
+        df_old_values['created_at'] = pd.to_datetime(df_old_values['created_at'])
+
+        auth = tweepy.OAuthHandler(os.environ['TWITTER_API_KEY'], os.environ['TWITTER_API_SECRET'])
+        auth.set_access_token(os.environ['TWITTER_ACCESS_TOKEN'], os.environ['TWITTER_ACCESS_SECRET'])
+        api = tweepy.API(auth, wait_on_rate_limit=True)
+
+        twitter_data_path = "../data"
+        os.makedirs(twitter_data_path, exist_ok=True)
+
+        # track individual twitter accounts
+        for source_name, source_id in twitter_sources.items():
+            # save output as
+            save_file = twitter_data_path + '/tweets_' + source_id + '.json'
+            tweets = api.user_timeline(
+                screen_name=source_id,
+                count=200,
+                include_rts=False,
+                tweet_mode='extended'
+            )
+
+            all_tweets = []
+            all_tweets.extend(tweets)
+            oldest_id = tweets[-1].id
+            while True:
                 tweets = api.user_timeline(
                     screen_name=source_id,
                     count=200,
                     include_rts=False,
+                    max_id=oldest_id - 1,
                     tweet_mode='extended'
                 )
-
-                all_tweets = []
-                all_tweets.extend(tweets)
+                if len(tweets) == 0:
+                    break
                 oldest_id = tweets[-1].id
-                while True:
-                    tweets = api.user_timeline(
-                        screen_name=source_id,
-                        count=200,
-                        include_rts=False,
-                        max_id=oldest_id - 1,
-                        tweet_mode='extended'
-                    )
-                    if len(tweets) == 0:
-                        break
-                    oldest_id = tweets[-1].id
-                    all_tweets.extend(tweets)
-
-                with open(save_file, 'a') as tf:
-                    for tweet in all_tweets:
-                        try:
-                            tf.write('\n')
-                            json.dump(tweet._json, tf)
-                        except Exception as e:
-                            logging.warning("Some error occurred, skipping tweet:")
-                            logging.warning(e)
-                            pass
-
-            # parse tweets and store in dataframe
-            df_tweets = pd.DataFrame()
-            for file in os.listdir(twitter_data_path):
-                if file.endswith('.json'):
-                    df_tweets_ = pd.read_json(os.path.join(twitter_data_path, file), lines=True)
-                    df_tweets = df_tweets.append(df_tweets_, ignore_index=True)
-
-            df_tweets['relevant'] = True
-            for ix, row in df_tweets.iterrows():
-                # skip if link already present in google sheet
-                if not df_old_values.empty:
-                    if row['url'] in df_old_values['url'].unique():
-                        df_tweets.at[ix, 'relevant'] = False
+                all_tweets.extend(tweets)
 
-                if row['created_at'].dt.date < datetime.date.fromisoformat('2023-02-06'):
+            with open(save_file, 'a') as tf:
+                for tweet in all_tweets:
+                    try:
+                        tf.write('\n')
+                        json.dump(tweet._json, tf)
+                    except Exception as e:
+                        logging.warning("Some error occurred, skipping tweet:")
+                        logging.warning(e)
+                        pass
+
+        # parse tweets and store in dataframe
+        df_tweets = pd.DataFrame()
+        for file in os.listdir(twitter_data_path):
+            if file.endswith('.json'):
+                df_tweets_ = pd.read_json(os.path.join(twitter_data_path, file), lines=True)
+                df_tweets = df_tweets.append(df_tweets_, ignore_index=True)
+
+        # drop duplicates
+        df_tweets = df_tweets.drop_duplicates(subset=['id'])
+        df_tweets = format_df(df_tweets)
+
+        df_tweets['relevant'] = True
+        for ix, row in df_tweets.iterrows():
+            # skip if link already present in google sheet
+            if not df_old_values.empty:
+                if row['url'] in df_old_values['url'].unique():
                     df_tweets.at[ix, 'relevant'] = False
 
-                # filter by location
-                if not ('syria' in row['full_text'] or 'سوريا' in row['full_text']):
-                    df_tweets.at[ix, 'relevant'] = False
+            if row['created_at'].date() < datetime.date.fromisoformat('2023-02-06'):
+                df_tweets.at[ix, 'relevant'] = False
 
-                # filter by keyword
-                if not any(keyword.lower() in row['full_text'].lower() for keyword in keywords):
-                    df_tweets.at[ix, 'relevant'] = False
-            df_tweets = df_tweets[df_tweets['relevant']].drop(columns=['relevant'])
-
-            # drop duplicates
-            df_tweets = df_tweets.drop_duplicates(subset=['id'])
-            df_tweets = format_df(df_tweets)
-            df_tweets['created_at'] = df_tweets['created_at'].dt.tz_localize(None)
-            df_tweets = df_tweets.sort_values(by='created_at')
-            df_tweets['created_at'] = df_tweets['created_at'].astype(str)
-            df_tweets = df_tweets.fillna('')
-
-            # add entries to google sheet
-            logging.info('updating Google sheet')
-            for ix, row in df_tweets.iterrows():
-                # add new row to google sheet
-                body = {'values': [list(row.values)]}
-                result = service.spreadsheets().values().append(
-                    spreadsheetId=spreadsheet_id, range=spreadsheet_range,
-                    valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body).execute()
-                sleep(1)
+            # filter by location
+            if not ('syria' in row['full_text'] or 'سوريا' in row['full_text']):
+                df_tweets.at[ix, 'relevant'] = False
+
+            # filter by keyword
+            if not any(keyword.lower() in row['full_text'].lower() for keyword in keywords):
+                df_tweets.at[ix, 'relevant'] = False
+        df_tweets = df_tweets[df_tweets['relevant']].drop(columns=['relevant'])
+
+        df_tweets = df_tweets.sort_values(by='created_at')
+        df_tweets['created_at'] = df_tweets['created_at'].astype(str)
+        df_tweets = df_tweets.fillna('')
+
+        # add entries to google sheet
+        logging.info('updating Google sheet')
+        for ix, row in df_tweets.iterrows():
+            # add new row to google sheet
+            body = {'values': [list(row.values)]}
+            result = service.spreadsheets().values().append(
+                spreadsheetId=spreadsheet_id, range=spreadsheet_range,
+                valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body).execute()
+            sleep(1)
     except Exception as e:
         logging.error(f"{e}")