From ae0cb03e9aa8b01de6de908c8763c4eab848848c Mon Sep 17 00:00:00 2001
From: dcjohnson24
Date: Tue, 29 Aug 2023 20:04:20 -0500
Subject: [PATCH 1/9] add date range option for downloads.

---
 data_analysis/static_gtfs_analysis.py |   7 +-
 scrape_data/cta_data_downloads.py     | 168 +++++++++++++++++++++-----
 2 files changed, 141 insertions(+), 34 deletions(-)

diff --git a/data_analysis/static_gtfs_analysis.py b/data_analysis/static_gtfs_analysis.py
index 65bba88..32102c4 100644
--- a/data_analysis/static_gtfs_analysis.py
+++ b/data_analysis/static_gtfs_analysis.py
@@ -359,16 +359,15 @@ def download_zip(version_id: str) -> zipfile.ZipFile:
         zipfile.ZipFile: A zipfile for the CTA version id.
     """
     logger.info('Downloading CTA data')
-    CTA_GTFS = zipfile.ZipFile(
-        BytesIO(
+    zipfile_bytes_io = BytesIO(
             requests.get(
                 f"https://transitfeeds.com/p/chicago-transit-authority"
                 f"/165/{version_id}/download"
             ).content
         )
-    )
+    CTA_GTFS = zipfile.ZipFile(zipfile_bytes_io)
     logging.info('Download complete')
-    return CTA_GTFS
+    return CTA_GTFS, zipfile_bytes_io
 
 
 def download_extract_format(version_id: str = None) -> GTFSFeed:
diff --git a/scrape_data/cta_data_downloads.py b/scrape_data/cta_data_downloads.py
index 75f10d3..26d5cba 100644
--- a/scrape_data/cta_data_downloads.py
+++ b/scrape_data/cta_data_downloads.py
@@ -3,9 +3,9 @@
 import data_analysis.static_gtfs_analysis as sga
 import data_analysis.compare_scheduled_and_rt as csrt
 import pendulum
-from io import StringIO
+from io import StringIO, BytesIO
 import pandas as pd
-
+from typing import List
 
 ACCESS_KEY = sys.argv[1]
 SECRET_KEY = sys.argv[2]
@@ -60,56 +60,164 @@ def save_csv_to_bucket(df: pd.DataFrame, filename: str) -> None:
         .put(Body=csv_buffer.getvalue())
 
 
-def save_sched_daily_summary() -> None:
-    data = sga.GTFSFeed.extract_data(CTA_GTFS)
-    data = sga.format_dates_hours(data)
-    trip_summary = sga.make_trip_summary(data)
-
-    route_daily_summary = (
-        sga.summarize_date_rt(trip_summary)
-    )
-    route_daily_summary['date'] = route_daily_summary['date'].astype(str)
-    route_daily_summary_today = route_daily_summary.loc[route_daily_summary['date'] == today]
-
-    print(f'Saving cta_route_daily_summary_{today}.csv to public bucket')
-    filename = f'schedule_summaries/daily_job/cta_route_daily_summary_{today}.csv'
-    save_csv_to_bucket(
-        route_daily_summary_today,
-        filename=filename
+def save_sched_daily_summary(date_range: List[str, str] = None) -> None:
+    if date_range is None:
+        date_range = [today]
+        print(f"No date range given. Using {today} only")
+
+    start_date = pendulum.parse(min(date_range))
+    end_date = pendulum.parse(max(date_range))
+    period = pendulum.period(start_date, end_date)
+    full_date_range = [dt.to_date_string() for dt in period.range('days')]
+    zip_filename_list = [f'cta_schedule_zipfiles_raw/google_transit_{date}.zip'
+                         for date in full_date_range]
+
+    # Check for files in bucket.
+    found_list = keys(
+        csrt.BUCKET_PUBLIC,
+        zip_filename_list
     )
-    print(f'Confirm that {filename} exists in bucket')
-    keys(csrt.BUCKET_PUBLIC, [filename])
+
+    def extract_date(fname: str) -> str:
+        return fname.split('_')[-1].split('.')[0]
+
+    def create_route_summary(CTA_GTFS: sga.GTFSFeed) -> pd.DataFrame:
+        data = sga.GTFSFeed.extract_data(CTA_GTFS)
+        data = sga.format_dates_hours(data)
+        trip_summary = sga.make_trip_summary(data)
+        route_daily_summary = (
+            sga.summarize_date_rt(trip_summary)
+        )
+
+        route_daily_summary['date'] = route_daily_summary['date'].astype(str)
+        route_daily_summary_today = route_daily_summary.loc[route_daily_summary['date'].isin(date_range)]
+        return route_daily_summary_today
+
+    print('Using zipfiles found in public bucket')
+    s3zip_list = []
+    for fname in found_list:
+        zip_bytes = BytesIO()
+        zip_bytes.seek(0)
+        client.download_fileobj(fname, zip_bytes)
+        zipfilesched = sga.zipfile.Zipfile(zip_bytes)
+        fdate = extract_date(fname)
+        s3zip_list.append(
+            {
+                'zip_filename': fname,
+                'zip': zipfilesched,
+                'csv_filename': f'schedule_summaries/daily_job/'
+                                f'cta/cta_route_daily_summary_{fdate}.csv'
+            }
+        )
+
+    s3_route_daily_summary_dict = {
+        'zip_filenames': [gtfs['zip_filename'] for gtfs in s3zip_list],
+        'summaries': [create_route_summary(gtfs['zip']) for gtfs in s3zip_list],
+        'csv_filenames': [gtfs['csv_filename'] for gtfs in s3zip_list]
+    }
+
+    transitfeeds_list = list(set(zip_filename_list).difference(set(found_list)))
+    print(', '.join(transitfeeds_list) + ' were not found in s3. Using transitfeeds.com')
+    transitfeeds_dates = []
+    for fname in transitfeeds_list:
+        # Extract date from string after splitting on '_' and then '.'
+        fdate = extract_date(fname)
+        transitfeeds_dates.append(fdate)
+
+
+    transitfeeds_dates = sorted(transitfeeds_dates)
+    schedule_list = csrt.create_schedule_list(month=5, year=2022)
+    schedule_list_filtered = [
+        s for s in schedule_list
+        if s['feed_start_date'] >= min(transitfeeds_dates)
+        and s['feed_start_date'] <= max(transitfeeds_dates)
+    ]
+
 
-def save_realtime_daily_summary() -> None:
-    if pendulum.now("America/Chicago").hour >= 11:
-        end_date = pendulum.yesterday("America/Chicago")
-    else:
-        end_date = pendulum.now("America/Chicago").subtract(days=2)
+    trip_summaries_transitfeeds_dict = {'zip_filenames': [], 'zips': [], 'csv_filenames': [],
+                                        'summaries': []}
 
-    end_date = end_date.to_date_string()
+    for sched in schedule_list_filtered:
+        CTA_GTFS, zipfile_bytes_io = sga.download_zip(sched['schedule_version'])
+        trip_summaries_transitfeeds_dict['zip_filenames'].append(
+            f"transitfeeds_schedule_zipfiles_raw/{sched['schedule_version']}.zip"
+        )
+        trip_summaries_transitfeeds_dict['zips'].append((CTA_GTFS, zipfile_bytes_io))
+        trip_summaries_transitfeeds_dict['summaries'].append(create_route_summary(CTA_GTFS))
+        trip_summaries_transitfeeds_dict['csv_filenames'].append(
+            f'schedule_summaries/daily_job/transitfeeds/'
+            f'transitfeeds_route_daily_summary_v{sched["schedule_version"]}.csv'
+        )
+
+    print(f'Saving cta schedule summary files in {date_range} to public bucket')
+    for filename, summary in zip(
+        s3_route_daily_summary_dict['csv_filenames'],
+        s3_route_daily_summary_dict['summaries']
+    ):
+        save_csv_to_bucket(summary, filename=filename)
+
+    print(f'Saving transitfeeds schedule summary files and zip files '
+          f'in {date_range} to public bucket')
+    for csv_filename, summary, zip_filename, zipfile in zip(
+        trip_summaries_transitfeeds_dict['csv_filenames'],
+        trip_summaries_transitfeeds_dict['summaries'],
+        trip_summaries_transitfeeds_dict['zip_filenames'],
+        trip_summaries_transitfeeds_dict['zips']
+    ):
+        save_csv_to_bucket(summary, filename=csv_filename)
+        # Save the zip file
+        client.upload_fileobj(
+            zipfile[1],
+            csrt.BUCKET_PUBLIC,
+            zip_filename
+        )
+
+    for fname in ['csv_filenames', 'zip_filenames']:
+        print('Confirm that ' + ', '.join(s3_route_daily_summary_dict[fname])
+              + ' exist in bucket')
+        _ = keys(csrt.BUCKET_PUBLIC, s3_route_daily_summary_dict[fname])
+
+        print('Confirm that ' + ', '.join(trip_summaries_transitfeeds_dict[fname])
+              + ' exists in bucket')
+        _ = keys(csrt.BUCKET_PUBLIC, trip_summaries_transitfeeds_dict[fname])
+
+def save_realtime_daily_summary(date: str = None) -> None:
+    if date is None:
+        if pendulum.now("America/Chicago").hour >= 11:
+            date = pendulum.yesterday("America/Chicago")
+        else:
+            date = pendulum.now("America/Chicago").subtract(days=2)
+
+        date = date.to_date_string()
+        print(f'Date not given. Taking the latest available date {date}.')
+    else:
+        date = pendulum.parse(date).strftime('%Y-%m-%d')
 
     daily_data = pd.read_csv(
-        (csrt.BASE_PATH / f"bus_full_day_data_v2/{end_date}.csv")
+        (csrt.BASE_PATH / f"bus_full_day_data_v2/{date}.csv")
         .as_uri(),
         low_memory=False
     )
 
     daily_data = csrt.make_daily_summary(daily_data)
-    filename = f'realtime_summaries/daily_job/bus_full_day_data_v2/{end_date}.csv'
+    filename = f'realtime_summaries/daily_job/bus_full_day_data_v2/{date}.csv'
     save_csv_to_bucket(daily_data, filename=filename)
 
     print(f'Confirm that {filename} exists in bucket')
-    keys(csrt.BUCKET_PUBLIC, [filename])
+    _ = keys(csrt.BUCKET_PUBLIC, [filename])
 
 
 # https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3
 def keys(bucket_name: str, filenames: list,
          prefix: str='/', delimiter: str='/',
-         start_after: str='') -> None:
+         start_after: str='') -> list:
     s3_paginator = client.get_paginator('list_objects_v2')
     prefix = prefix.lstrip(delimiter)
     start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
+    found_list = []
     for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
         for content in page.get('Contents', ()):
             if content['Key'] in filenames:
                 print(f"{content['Key']} exists")
+                found_list.append(content['Key'])
+    return found_list
\ No newline at end of file

From 68a55bd43b3743f7b6ce8807f8286e5909baa080 Mon Sep 17 00:00:00 2001
From: dcjohnson24
Date: Tue, 29 Aug 2023 20:07:03 -0500
Subject: [PATCH 2/9] Add branch for GitHub actions

---
 .github/workflows/cta_data_downloads.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.github/workflows/cta_data_downloads.yml b/.github/workflows/cta_data_downloads.yml
index c36457d..a353a2f 100644
--- a/.github/workflows/cta_data_downloads.yml
+++ b/.github/workflows/cta_data_downloads.yml
@@ -4,6 +4,7 @@ on:
   push:
     branches:
       - 'automate-schedule-downloads'
+      - 'date-range-downloads'
 
   schedule:
     # Run every day at 12:30pm CST which is 5:30pm UTC

From 640b30a8619e39beb7c5cee322a3bb8446bd83cc Mon Sep 17 00:00:00 2001
From: dcjohnson24
Date: Tue, 29 Aug 2023 20:20:06 -0500
Subject: [PATCH 3/9] Fix typing error with List[str, str]

---
 scrape_data/cta_data_downloads.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scrape_data/cta_data_downloads.py b/scrape_data/cta_data_downloads.py
index 26d5cba..6b0abe8 100644
--- a/scrape_data/cta_data_downloads.py
+++ b/scrape_data/cta_data_downloads.py
@@ -60,7 +60,7 @@ def save_csv_to_bucket(df: pd.DataFrame, filename: str) -> None:
         .put(Body=csv_buffer.getvalue())
 
 
-def save_sched_daily_summary(date_range: List[str, str] = None) -> None:
+def save_sched_daily_summary(date_range: List[str] = None) -> None:
     if date_range is None:
         date_range = [today]
         print(f"No date range given. Using {today} only")

From 26169adfc2cc25a5084dc2fadc8c1aa5ef2e2897 Mon Sep 17 00:00:00 2001
From: dcjohnson24
Date: Sun, 3 Sep 2023 19:08:22 -0500
Subject: [PATCH 4/9] Add bucket name argument to download_fileobj

---
 scrape_data/cta_data_downloads.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scrape_data/cta_data_downloads.py b/scrape_data/cta_data_downloads.py
index 6b0abe8..8bc5b87 100644
--- a/scrape_data/cta_data_downloads.py
+++ b/scrape_data/cta_data_downloads.py
@@ -99,7 +99,7 @@ def create_route_summary(CTA_GTFS: sga.GTFSFeed) -> pd.DataFrame:
     for fname in found_list:
         zip_bytes = BytesIO()
         zip_bytes.seek(0)
-        client.download_fileobj(fname, zip_bytes)
+        client.download_fileobj(Bucket=sga.BUCKET, Key=fname, Fileobj=zip_bytes)
         zipfilesched = sga.zipfile.Zipfile(zip_bytes)
         fdate = extract_date(fname)
         s3zip_list.append(

From 9c2a93d3a10b2d18362f406abd0e7c1ba99fa993 Mon Sep 17 00:00:00 2001
From: dcjohnson24
Date: Sun, 3 Sep 2023 19:21:06 -0500
Subject: [PATCH 5/9] Change Zipfile to ZipFile

---
 scrape_data/cta_data_downloads.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scrape_data/cta_data_downloads.py b/scrape_data/cta_data_downloads.py
index 8bc5b87..599514c 100644
--- a/scrape_data/cta_data_downloads.py
+++ b/scrape_data/cta_data_downloads.py
@@ -100,7 +100,7 @@ def create_route_summary(CTA_GTFS: sga.GTFSFeed) -> pd.DataFrame:
         zip_bytes = BytesIO()
         zip_bytes.seek(0)
         client.download_fileobj(Bucket=sga.BUCKET, Key=fname, Fileobj=zip_bytes)
-        zipfilesched = sga.zipfile.Zipfile(zip_bytes)
+        zipfilesched = sga.zipfile.ZipFile(zip_bytes)
         fdate = extract_date(fname)
         s3zip_list.append(
             {

From 48a1d537fc2a4a20bb2128c77f01c12031a7a1e9 Mon Sep 17 00:00:00 2001
From: dcjohnson24
Date: Sun, 3 Sep 2023 19:41:59 -0500
Subject: [PATCH 6/9] Add case for nothing to check in transitfeeds

---
 scrape_data/cta_data_downloads.py | 114 +++++++++++++++---------------
 1 file changed, 58 insertions(+), 56 deletions(-)

diff --git a/scrape_data/cta_data_downloads.py b/scrape_data/cta_data_downloads.py
index 599514c..55a570a 100644
--- a/scrape_data/cta_data_downloads.py
+++ b/scrape_data/cta_data_downloads.py
@@ -77,6 +77,11 @@ def save_sched_daily_summary(date_range: List[str] = None) -> None:
         csrt.BUCKET_PUBLIC,
         zip_filename_list
     )
+    def confirm_saved_files(file_dict: dict) -> None:
+        for fname in ['csv_filenames', 'zip_filenames']:
+            print('Confirm that ' + ', '.join(file_dict[fname])
+                  + ' exist in bucket')
+            _ = keys(csrt.BUCKET_PUBLIC, file_dict[fname])
 
     def extract_date(fname: str) -> str:
         return fname.split('_')[-1].split('.')[0]
@@ -115,40 +120,7 @@ def create_route_summary(CTA_GTFS: sga.GTFSFeed) -> pd.DataFrame:
         'zip_filenames': [gtfs['zip_filename'] for gtfs in s3zip_list],
         'summaries': [create_route_summary(gtfs['zip']) for gtfs in s3zip_list],
         'csv_filenames': [gtfs['csv_filename'] for gtfs in s3zip_list]
-    }
-
-    transitfeeds_list = list(set(zip_filename_list).difference(set(found_list)))
-    print(', '.join(transitfeeds_list) + ' were not found in s3. Using transitfeeds.com')
-    transitfeeds_dates = []
-    for fname in transitfeeds_list:
-        # Extract date from string after splitting on '_' and then '.'
-        fdate = extract_date(fname)
-        transitfeeds_dates.append(fdate)
-
-
-    transitfeeds_dates = sorted(transitfeeds_dates)
-    schedule_list = csrt.create_schedule_list(month=5, year=2022)
-    schedule_list_filtered = [
-        s for s in schedule_list
-        if s['feed_start_date'] >= min(transitfeeds_dates)
-        and s['feed_start_date'] <= max(transitfeeds_dates)
-    ]
-
-
-    trip_summaries_transitfeeds_dict = {'zip_filenames': [], 'zips': [], 'csv_filenames': [],
-                                        'summaries': []}
-
-    for sched in schedule_list_filtered:
-        CTA_GTFS, zipfile_bytes_io = sga.download_zip(sched['schedule_version'])
-        trip_summaries_transitfeeds_dict['zip_filenames'].append(
-            f"transitfeeds_schedule_zipfiles_raw/{sched['schedule_version']}.zip"
-        )
-        trip_summaries_transitfeeds_dict['zips'].append((CTA_GTFS, zipfile_bytes_io))
-        trip_summaries_transitfeeds_dict['summaries'].append(create_route_summary(CTA_GTFS))
-        trip_summaries_transitfeeds_dict['csv_filenames'].append(
-            f'schedule_summaries/daily_job/transitfeeds/'
-            f'transitfeeds_route_daily_summary_v{sched["schedule_version"]}.csv'
-        )
+    }
 
     print(f'Saving cta schedule summary files in {date_range} to public bucket')
     for filename, summary in zip(
         s3_route_daily_summary_dict['csv_filenames'],
         s3_route_daily_summary_dict['summaries']
     ):
         save_csv_to_bucket(summary, filename=filename)
+
+    confirm_saved_files(s3_route_daily_summary_dict)
 
-    print(f'Saving transitfeeds schedule summary files and zip files '
-          f'in {date_range} to public bucket')
-    for csv_filename, summary, zip_filename, zipfile in zip(
-        trip_summaries_transitfeeds_dict['csv_filenames'],
-        trip_summaries_transitfeeds_dict['summaries'],
-        trip_summaries_transitfeeds_dict['zip_filenames'],
-        trip_summaries_transitfeeds_dict['zips']
-    ):
-        save_csv_to_bucket(summary, filename=csv_filename)
-        # Save the zip file
-        client.upload_fileobj(
-            zipfile[1],
-            csrt.BUCKET_PUBLIC,
-            zip_filename
-        )
+    transitfeeds_list = list(set(zip_filename_list).difference(set(found_list)))
+    if transitfeeds_list:
+        print(', '.join(transitfeeds_list) + ' were not found in s3. Using transitfeeds.com')
+        transitfeeds_dates = []
+        for fname in transitfeeds_list:
+            # Extract date from string after splitting on '_' and then '.'
+            fdate = extract_date(fname)
+            transitfeeds_dates.append(fdate)
+
+
+        transitfeeds_dates = sorted(transitfeeds_dates)
+        schedule_list = csrt.create_schedule_list(month=5, year=2022)
+        schedule_list_filtered = [
+            s for s in schedule_list
+            if s['feed_start_date'] >= min(transitfeeds_dates)
+            and s['feed_start_date'] <= max(transitfeeds_dates)
+        ]
+
+        trip_summaries_transitfeeds_dict = {'zip_filenames': [], 'zips': [], 'csv_filenames': [],
+                                            'summaries': []}
+
+        for sched in schedule_list_filtered:
+            CTA_GTFS, zipfile_bytes_io = sga.download_zip(sched['schedule_version'])
+            trip_summaries_transitfeeds_dict['zip_filenames'].append(
+                f"transitfeeds_schedule_zipfiles_raw/{sched['schedule_version']}.zip"
+            )
+            trip_summaries_transitfeeds_dict['zips'].append((CTA_GTFS, zipfile_bytes_io))
+            trip_summaries_transitfeeds_dict['summaries'].append(create_route_summary(CTA_GTFS))
+            trip_summaries_transitfeeds_dict['csv_filenames'].append(
+                f'schedule_summaries/daily_job/transitfeeds/'
+                f'transitfeeds_route_daily_summary_v{sched["schedule_version"]}.csv'
+            )
+        print(
+            f'Saving transitfeeds schedule summary files and zip files '
+            f'in {date_range} to public bucket'
+        )
+        for csv_filename, summary, zip_filename, zipfile in zip(
+            trip_summaries_transitfeeds_dict['csv_filenames'],
+            trip_summaries_transitfeeds_dict['summaries'],
+            trip_summaries_transitfeeds_dict['zip_filenames'],
+            trip_summaries_transitfeeds_dict['zips']
+        ):
+            save_csv_to_bucket(summary, filename=csv_filename)
+            # Save the zip file
+            client.upload_fileobj(
+                zipfile[1],
+                csrt.BUCKET_PUBLIC,
+                zip_filename
+            )
+        confirm_saved_files(trip_summaries_transitfeeds_dict)
+
 
 def save_realtime_daily_summary(date: str = None) -> None:
     if date is None:

From ec95194cb3dda8801a8299fe10df2346af406641 Mon Sep 17 00:00:00 2001
From: dcjohnson24
Date: Sun, 3 Sep 2023 20:23:09 -0500
Subject: [PATCH 7/9] Test with date range

---
 .github/workflows/cta_data_downloads.yml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.github/workflows/cta_data_downloads.yml b/.github/workflows/cta_data_downloads.yml
index a353a2f..9febec8 100644
--- a/.github/workflows/cta_data_downloads.yml
+++ b/.github/workflows/cta_data_downloads.yml
@@ -46,10 +46,13 @@ jobs:
         python-version: ${{ env.PYTHON_VERSION }}
 
     - name: 'Save schedule summaries'
+      # Test with no date and with date range
       run: |
         pip install -r requirements.txt
 
         python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
          save_sched_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
+        python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
+         save_sched_daily_summary(["2023-05-02", "2023-08-02"])' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
 
   save-realtime-daily-summary:

From 98cac8e813c212b1f17361cd5352d555bf051c18 Mon Sep 17 00:00:00 2001
From: dcjohnson24
Date: Mon, 4 Sep 2023 19:08:44 -0500
Subject: [PATCH 8/9] Add 2022 data from transitfeeds to s3

---
 .github/workflows/cta_data_downloads.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/cta_data_downloads.yml b/.github/workflows/cta_data_downloads.yml
index 9febec8..1253710 100644
--- a/.github/workflows/cta_data_downloads.yml
+++ b/.github/workflows/cta_data_downloads.yml
@@ -52,7 +52,8 @@ jobs:
         python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
          save_sched_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
         python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
-         save_sched_daily_summary(["2023-05-02", "2023-08-02"])' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
+         save_sched_daily_summary(["2022-05-20", "2023-05-20"])' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
+
 
   save-realtime-daily-summary:

From 3ea96303200ad0e73238b86f42600472f69c2ca8 Mon Sep 17 00:00:00 2001
From: dcjohnson24
Date: Mon, 4 Sep 2023 20:18:21 -0500
Subject: [PATCH 9/9] Shorten date range for testing

---
 .github/workflows/cta_data_downloads.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/cta_data_downloads.yml b/.github/workflows/cta_data_downloads.yml
index 1253710..57cb15d 100644
--- a/.github/workflows/cta_data_downloads.yml
+++ b/.github/workflows/cta_data_downloads.yml
@@ -52,7 +52,7 @@ jobs:
         python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
          save_sched_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
         python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
-         save_sched_daily_summary(["2022-05-20", "2023-05-20"])' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
+         save_sched_daily_summary(["2023-05-20", "2023-08-20"])' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY