add date range option for downloads. #65

Closed
wants to merge 9 commits
5 changes: 5 additions & 0 deletions .github/workflows/cta_data_downloads.yml
@@ -4,6 +4,7 @@ on:
push:
branches:
- 'automate-schedule-downloads'
- 'date-range-downloads'

schedule:
# Run every day at 12:30pm CST which is 5:30pm UTC
@@ -45,10 +46,14 @@ jobs:
python-version: ${{ env.PYTHON_VERSION }}

- name: 'Save schedule summaries'
# Test with no date and with date range
run: |
pip install -r requirements.txt
python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
save_sched_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
save_sched_daily_summary(["2023-05-20", "2023-08-20"])' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY



save-realtime-daily-summary:
7 changes: 3 additions & 4 deletions data_analysis/static_gtfs_analysis.py
@@ -359,16 +359,15 @@ def download_zip(version_id: str) -> zipfile.ZipFile:
zipfile.ZipFile: A zipfile for the CTA version id.
"""
logger.info('Downloading CTA data')
CTA_GTFS = zipfile.ZipFile(
BytesIO(
zipfile_bytes_io = BytesIO(
requests.get(
f"https://transitfeeds.com/p/chicago-transit-authority"
f"/165/{version_id}/download"
).content
)
)
CTA_GTFS = zipfile.ZipFile(zipfile_bytes_io)
logging.info('Download complete')
return CTA_GTFS
return CTA_GTFS, zipfile_bytes_io
Comment on lines +362 to +370


Based on this change, I think we need the BytesIO data in order to do what we want with the date ranges; however, other parts of the code expect the ZipFile data. Is it then true that we are essentially returning duplicate data here?
I'm not suggesting that we change this, just want to make sure I'm understanding it correctly.

Collaborator Author


Yeah, I didn't have a good way of obtaining the BytesIO object that is needed for uploading files to S3. For other parts of the code that expect the ZipFile output, I would use something like CTA_GTFS, _ = download_cta_zip(), so that the BytesIO object is simply ignored where it isn't needed.

Maybe creating a different data type would be better?
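For what it's worth, a minimal sketch of that "different data type" idea, reusing the download_cta_zip name mentioned above (the ScheduleFeed container is hypothetical, not part of this PR):

from io import BytesIO
from typing import NamedTuple
import zipfile

import requests


class ScheduleFeed(NamedTuple):
    # Hypothetical container pairing the parsed archive with its raw bytes.
    zip_file: zipfile.ZipFile   # for callers that only need the GTFS contents
    zip_bytes: BytesIO          # for callers that upload the raw archive to S3


def download_cta_zip(version_id: str) -> ScheduleFeed:
    # Same download as download_zip, but both representations travel together.
    zip_bytes = BytesIO(
        requests.get(
            "https://transitfeeds.com/p/chicago-transit-authority"
            f"/165/{version_id}/download"
        ).content
    )
    return ScheduleFeed(zip_file=zipfile.ZipFile(zip_bytes), zip_bytes=zip_bytes)

Callers that only need the archive contents could then use feed.zip_file, while the S3 upload path keeps feed.zip_bytes, without positional tuple unpacking.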



def download_extract_format(version_id: str = None) -> GTFSFeed:
170 changes: 140 additions & 30 deletions scrape_data/cta_data_downloads.py
@@ -3,9 +3,9 @@
import data_analysis.static_gtfs_analysis as sga
import data_analysis.compare_scheduled_and_rt as csrt
import pendulum
from io import StringIO
from io import StringIO, BytesIO
import pandas as pd

from typing import List

ACCESS_KEY = sys.argv[1]
SECRET_KEY = sys.argv[2]
@@ -60,56 +60,166 @@ def save_csv_to_bucket(df: pd.DataFrame, filename: str) -> None:
.put(Body=csv_buffer.getvalue())


def save_sched_daily_summary() -> None:
data = sga.GTFSFeed.extract_data(CTA_GTFS)
data = sga.format_dates_hours(data)
trip_summary = sga.make_trip_summary(data)

route_daily_summary = (
sga.summarize_date_rt(trip_summary)
)
route_daily_summary['date'] = route_daily_summary['date'].astype(str)
route_daily_summary_today = route_daily_summary.loc[route_daily_summary['date'] == today]

print(f'Saving cta_route_daily_summary_{today}.csv to public bucket')
filename = f'schedule_summaries/daily_job/cta_route_daily_summary_{today}.csv'
save_csv_to_bucket(
route_daily_summary_today,
filename=filename
def save_sched_daily_summary(date_range: List[str] = None) -> None:
if date_range is None:
date_range = [today]
print(f"No date range given. Using {today} only")
start_date = pendulum.parse(min(date_range))
end_date = pendulum.parse(max(date_range))
period = pendulum.period(start_date, end_date)
full_date_range = [dt.to_date_string() for dt in period.range('days')]
zip_filename_list = [f'cta_schedule_zipfiles_raw/google_transit_{date}.zip'
for date in full_date_range]

# Check for files in bucket.
found_list = keys(
csrt.BUCKET_PUBLIC,
zip_filename_list
)
print(f'Confirm that {filename} exists in bucket')
keys(csrt.BUCKET_PUBLIC, [filename])
def confirm_saved_files(file_dict: dict) -> None:
for fname in ['csv_filenames', 'zip_filenames']:
print('Confirm that ' + ', '.join(file_dict[fname])
+ ' exist in bucket')
_ = keys(csrt.BUCKET_PUBLIC, file_dict[fname])


The code in the keys function implicitly confirms that the file has been saved?
Is it true that file_dict is a mapping from a string to a list of strings?

Collaborator Author


Yes, the keys function confirms that a list of filenames exists in the bucket, and file_dict has values that are lists of strings.
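
For illustration, with a made-up date the file_dict passed to confirm_saved_files would look roughly like:

file_dict = {
    'zip_filenames': ['cta_schedule_zipfiles_raw/google_transit_2023-01-01.zip'],
    'csv_filenames': ['schedule_summaries/daily_job/cta/cta_route_daily_summary_2023-01-01.csv'],
}

The actual dicts in the PR also carry 'summaries' (and, for transitfeeds, 'zips') entries, which confirm_saved_files ignores.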


def extract_date(fname: str) -> str:
return fname.split('_')[-1].split('.')[0]


I trust that this is the right way to parse it, but is there a way I can see an example of the type of filename that is being parsed? If it's difficult, don't worry about it, just curious.

Collaborator Author


So let's say you have

date_range = ['2023-01-01', '2023-05-05']
start_date = pendulum.parse(min(date_range))
end_date = pendulum.parse(max(date_range))
period = pendulum.period(start_date, end_date)
full_date_range = [dt.to_date_string() for dt in period.range('days')]
zip_filename_list = [f'cta_schedule_zipfiles_raw/google_transit_{date}.zip'
                                       for date in full_date_range]

An example filename would look like

print(zip_filename_list[0])
cta_schedule_zipfiles_raw/google_transit_2023-01-01.zip

Calling extract_date gives

print(extract_date(zip_filename_list[0]))
2023-01-01

It will split on '_' and take the last entry which is '2023-01-01.zip'. It then splits on '.' and takes the first entry, which is '2023-01-01'.

There's probably a library or regex that would be more robust though.
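
As a hedged sketch (not part of this PR), the regex version might look like this, assuming the date is always embedded as YYYY-MM-DD:

import re

def extract_date(fname: str) -> str:
    # Pull the first YYYY-MM-DD substring out of the filename.
    match = re.search(r'\d{4}-\d{2}-\d{2}', fname)
    if match is None:
        raise ValueError(f'No date found in {fname}')
    return match.group(0)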


def create_route_summary(CTA_GTFS: sga.GTFSFeed) -> pd.DataFrame:
data = sga.GTFSFeed.extract_data(CTA_GTFS)
data = sga.format_dates_hours(data)
trip_summary = sga.make_trip_summary(data)

route_daily_summary = (
sga.summarize_date_rt(trip_summary)
)

route_daily_summary['date'] = route_daily_summary['date'].astype(str)
route_daily_summary_today = route_daily_summary.loc[route_daily_summary['date'].isin(date_range)]
return route_daily_summary_today

print('Using zipfiles found in public bucket')
s3zip_list = []
for fname in found_list:
zip_bytes = BytesIO()
zip_bytes.seek(0)
client.download_fileobj(Bucket=sga.BUCKET, Key=fname, Fileobj=zip_bytes)
zipfilesched = sga.zipfile.ZipFile(zip_bytes)
fdate = extract_date(fname)
s3zip_list.append(
{
'zip_filename': fname,
'zip': zipfilesched,
'csv_filename': f'schedule_summaries/daily_job/'
f'cta/cta_route_daily_summary_{fdate}.csv'
}
)

s3_route_daily_summary_dict = {
'zip_filenames': [gtfs['zip_filename'] for gtfs in s3zip_list],
'summaries': [create_route_summary(gtfs['zip']) for gtfs in s3zip_list],
'csv_filenames': [gtfs['csv_filename'] for gtfs in s3zip_list]
}

print(f'Saving cta schedule summary files in {date_range} to public bucket')
for filename, summary in zip(
s3_route_daily_summary_dict['csv_filenames'],
s3_route_daily_summary_dict['summaries']
):
save_csv_to_bucket(summary, filename=filename)

confirm_saved_files(s3_route_daily_summary_dict)

transitfeeds_list = list(set(zip_filename_list).difference(set(found_list)))
Member

@lauriemerrell Sep 26, 2023


This is the part that I think I would break out into a one-time backfill... So instead of having this within the daily summaries, have one function in a one-time script that checks every date between 2022-05-20 and the present (maybe you can specify a smaller date range) and saves the zipfiles to S3 if they don't already exist. Maybe zipfiles from transitfeeds could have a different name or something to distinguish them.

And then all the daily summary stuff would only look for individual zipfiles in S3; if a zipfile is not present it can just print an error and keep going, and we can fully decouple the zipfile downloading from the daily summary generation... If that makes sense?
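
A rough sketch of that one-time backfill, under the same module-level setup as this script (the backfill_schedule_zipfiles name and its defaults are assumptions, not something in this PR):

def backfill_schedule_zipfiles(start_date: str = '2022-05-20',
                               end_date: str = None) -> None:
    # Hypothetical one-time script: upload a transitfeeds zipfile to S3 for any
    # schedule version in the range that is not already in the bucket.
    if end_date is None:
        end_date = pendulum.now('America/Chicago').to_date_string()

    schedule_list = csrt.create_schedule_list(month=5, year=2022)
    for sched in schedule_list:
        if not (start_date <= sched['feed_start_date'] <= end_date):
            continue
        zip_filename = (
            f"transitfeeds_schedule_zipfiles_raw/{sched['schedule_version']}.zip"
        )
        if keys(csrt.BUCKET_PUBLIC, [zip_filename]):
            continue  # already saved, nothing to do
        _, zipfile_bytes_io = sga.download_zip(sched['schedule_version'])
        client.upload_fileobj(zipfile_bytes_io, csrt.BUCKET_PUBLIC, zip_filename)
        print(f'Backfilled {zip_filename}')

The daily summary job could then look only for individual zipfiles in S3 and print an error for any date it cannot find, as suggested above.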

if transitfeeds_list:
print(', '.join(transitfeeds_list) + ' were not found in s3. Using transitfeeds.com')
transitfeeds_dates = []
for fname in transitfeeds_list:
# Extract date from string after splitting on '_' and then '.'
fdate = extract_date(fname)
transitfeeds_dates.append(fdate)


transitfeeds_dates = sorted(transitfeeds_dates)
schedule_list = csrt.create_schedule_list(month=5, year=2022)
schedule_list_filtered = [
s for s in schedule_list
if s['feed_start_date'] >= min(transitfeeds_dates)
and s['feed_start_date'] <= max(transitfeeds_dates)
]


def save_realtime_daily_summary() -> None:
if pendulum.now("America/Chicago").hour >= 11:
end_date = pendulum.yesterday("America/Chicago")
else:
end_date = pendulum.now("America/Chicago").subtract(days=2)
trip_summaries_transitfeeds_dict = {'zip_filenames': [], 'zips': [], 'csv_filenames': [],
'summaries': []}

for sched in schedule_list_filtered:
CTA_GTFS, zipfile_bytes_io = sga.download_zip(sched['schedule_version'])
trip_summaries_transitfeeds_dict['zip_filenames'].append(
f"transitfeeds_schedule_zipfiles_raw/{sched['schedule_version']}.zip"
)
trip_summaries_transitfeeds_dict['zips'].append((CTA_GTFS, zipfile_bytes_io))
trip_summaries_transitfeeds_dict['summaries'].append(create_route_summary(CTA_GTFS))
trip_summaries_transitfeeds_dict['csv_filenames'].append(
f'schedule_summaries/daily_job/transitfeeds/'
f'transitfeeds_route_daily_summary_v{sched["schedule_version"]}.csv'
)
print(
f'Saving transitfeeds schedule summary files and zip files '
f'in {date_range} to public bucket'
)
for csv_filename, summary, zip_filename, zipfile in zip(
trip_summaries_transitfeeds_dict['csv_filenames'],
trip_summaries_transitfeeds_dict['summaries'],
trip_summaries_transitfeeds_dict['zip_filenames'],
trip_summaries_transitfeeds_dict['zips']
):
save_csv_to_bucket(summary, filename=csv_filename)
# Save the zip file
client.upload_fileobj(
zipfile[1],
csrt.BUCKET_PUBLIC,
zip_filename
)
confirm_saved_files(trip_summaries_transitfeeds_dict)

end_date = end_date.to_date_string()

def save_realtime_daily_summary(date: str = None) -> None:
if date is None:
if pendulum.now("America/Chicago").hour >= 11:
date = pendulum.yesterday("America/Chicago")
else:
date = pendulum.now("America/Chicago").subtract(days=2)

date = date.to_date_string()
print(f'Date not given. Taking the latest available date {date}.')
else:
date = pendulum.parse(date).strftime('%Y-%m-%d')

daily_data = pd.read_csv(
(csrt.BASE_PATH / f"bus_full_day_data_v2/{end_date}.csv")
(csrt.BASE_PATH / f"bus_full_day_data_v2/{date}.csv")
.as_uri(),
low_memory=False
)

daily_data = csrt.make_daily_summary(daily_data)
filename = f'realtime_summaries/daily_job/bus_full_day_data_v2/{end_date}.csv'
filename = f'realtime_summaries/daily_job/bus_full_day_data_v2/{date}.csv'
save_csv_to_bucket(daily_data, filename=filename)

print(f'Confirm that {filename} exists in bucket')
keys(csrt.BUCKET_PUBLIC, [filename])
_ = keys(csrt.BUCKET_PUBLIC, [filename])

# https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3
def keys(bucket_name: str, filenames: list,
prefix: str='/', delimiter: str='/',
start_after: str='') -> None:
start_after: str='') -> list:
s3_paginator = client.get_paginator('list_objects_v2')
prefix = prefix.lstrip(delimiter)
start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
found_list = []
for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
for content in page.get('Contents', ()):
if content['Key'] in filenames:
print(f"{content['Key']} exists")
found_list.append(content['Key'])
return found_list