Merge pull request #61 from chihacknight/automate-schedule-downloads
Automate schedule downloads
lauriemerrell authored Sep 20, 2023
2 parents e3830ac + 4c06991 commit 0a73534
Showing 4 changed files with 235 additions and 20 deletions.
68 changes: 68 additions & 0 deletions .github/workflows/cta_data_downloads.yml
@@ -0,0 +1,68 @@
name: Automate CTA schedule and realtime downloads

on:

schedule:
# Run every day at 5:30pm UTC, i.e. 12:30pm Chicago time (CDT)
- cron: 30 17 * * *

env:
PYTHON_VERSION: 3.10.6
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
download-cta-schedule-data:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- uses: actions/setup-python@v4
with:
python-version: ${{ env.PYTHON_VERSION }}

- name: Download and save CTA schedule data

run: |
pip install -r requirements.txt
python -c 'from scrape_data.cta_data_downloads import save_cta_zip; \
save_cta_zip()' \
$AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY

save-schedule-daily-summary:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- uses: actions/setup-python@v4
with:
python-version: ${{ env.PYTHON_VERSION }}

- name: 'Save schedule summaries'
run: |
pip install -r requirements.txt
python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
save_sched_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY

save-realtime-daily-summary:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- uses: actions/setup-python@v4
with:
python-version: ${{ env.PYTHON_VERSION }}

- name: 'Save realtime summaries'

run: |
pip install -r requirements.txt
python -c 'from scrape_data.cta_data_downloads import save_realtime_daily_summary; \
save_realtime_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
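
Each job above installs the pinned requirements and then runs a one-line python -c import of the new scrape_data.cta_data_downloads module, passing the AWS credentials as positional arguments. A rough local equivalent of the schedule-download step (a sketch, assuming the repository root is the working directory and both AWS secrets are exported in the environment):

import os
import subprocess

# Mirrors the "Download and save CTA schedule data" step; the module reads the
# access key and secret key from sys.argv[1] and sys.argv[2].
subprocess.run(
    [
        "python", "-c",
        "from scrape_data.cta_data_downloads import save_cta_zip; save_cta_zip()",
        os.environ["AWS_ACCESS_KEY_ID"],
        os.environ["AWS_SECRET_ACCESS_KEY"],
    ],
    check=True,
)
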
70 changes: 51 additions & 19 deletions data_analysis/static_gtfs_analysis.py
@@ -13,7 +13,7 @@
import os
from pathlib import Path
from dataclasses import dataclass
from typing import List
from typing import Tuple

import logging
import calendar
@@ -38,7 +38,6 @@
datefmt='%m/%d/%Y %I:%M:%S %p'
)


@dataclass
class GTFSFeed:
"""Class for storing GTFSFeed data.
@@ -53,24 +52,32 @@ class GTFSFeed:

@classmethod
def extract_data(cls, gtfs_zipfile: zipfile.ZipFile,
version_id: str = None) -> GTFSFeed:
version_id: str = None, cta_download: bool = True) -> GTFSFeed:
"""Load each text file in zipfile into a DataFrame
Args:
gtfs_zipfile (zipfile.ZipFile): Zipfile downloaded from
CTA transit feeds e.g.
transitfeeds.com or transitchicago.com e.g.
https://transitfeeds.com/p/chicago-transit-authority/
165/20220718/download"
165/20220718/download or https://www.transitchicago.com/downloads/sch_data/
version_id (str, optional): The schedule version in use.
Defaults to None.
Returns:
GTFSFeed: A GTFSFeed object containing multiple DataFrames
accessible by name.
"""
if version_id is None:
version_id = VERSION_ID
logging.info(f"Extracting data from CTA zipfile version {version_id}")
if cta_download:
if version_id is not None:
raise ValueError("version_id is not used for downloads directly from CTA")
else:
logging.info(f"Extracting data from transitchicago.com zipfile")

else:
if version_id is None:
version_id = VERSION_ID
logging.info(f"Extracting data from transitfeeds.com zipfile version {version_id}")

data_dict = {}
pbar = tqdm(cls.__annotations__.keys())
for txt_file in pbar:
@@ -140,14 +147,16 @@ def format_dates_hours(data: GTFSFeed) -> GTFSFeed:

def make_trip_summary(
data: GTFSFeed,
feed_start_date: pendulum.datetime,
feed_end_date: pendulum.datetime) -> pd.DataFrame:
feed_start_date: pendulum.datetime = None,
feed_end_date: pendulum.datetime = None) -> pd.DataFrame:
"""Create a summary of trips with one row per date
Args:
data (GTFSFeed): GTFS data from CTA
feed_start_date (datetime): Date from which this feed is valid (inclusive)
feed_end_date (datetime): Date until which this feed is valid (inclusive)
feed_start_date (datetime): Date from which this feed is valid (inclusive).
Defaults to None
feed_end_date (datetime): Date until which this feed is valid (inclusive).
Defaults to None
Returns:
pd.DataFrame: A DataFrame with each trip that occurred per row.
@@ -161,7 +170,7 @@ def make_trip_summary(
),
columns=["raw_date"],
)

# cross join calendar index with actual calendar to get all combos of
# possible dates & services
calendar_cross = calendar_date_range.merge(data.calendar, how="cross")
@@ -244,9 +253,10 @@ def make_trip_summary(
trip_stop_hours, how="left", on="trip_id")

# filter to only the rows for the period where this specific feed version was in effect
trip_summary = trip_summary.loc[
(trip_summary['raw_date'] >= feed_start_date)
& (trip_summary['raw_date'] <= feed_end_date), :]
if feed_start_date is not None and feed_end_date is not None:
trip_summary = trip_summary.loc[
(trip_summary['raw_date'] >= feed_start_date)
& (trip_summary['raw_date'] <= feed_end_date), :]

return trip_summary

@@ -321,6 +331,23 @@ def make_linestring_of_points(
return shapely.geometry.LineString(list(sorted_df["pt"]))


def download_cta_zip() -> Tuple[zipfile.ZipFile, BytesIO]:
"""Download CTA schedule data from transitchicago.com
Returns:
Tuple[zipfile.ZipFile, BytesIO]: A zipfile of the latest GTFS schedule data from transitchicago.com, together with the BytesIO buffer it was read from
"""
logger.info('Downloading CTA data')
zip_bytes_io = BytesIO(
requests.get("https://www.transitchicago.com/downloads/sch_data/google_transit.zip"
).content
)
CTA_GTFS = zipfile.ZipFile(zip_bytes_io)
logging.info('Download complete')
return CTA_GTFS, zip_bytes_io



def download_zip(version_id: str) -> zipfile.ZipFile:
"""Download a version schedule from transitfeeds.com
@@ -344,17 +371,22 @@ def download_zip(version_id: str) -> zipfile.ZipFile:
return CTA_GTFS


def download_extract_format(version_id: str) -> GTFSFeed:
def download_extract_format(version_id: str = None) -> GTFSFeed:
"""Download a zipfile of GTFS data for a given version_id,
extract data, and format date column.
Args:
version_id (str): The version of the GTFS schedule data to download
version_id (str): The version of the GTFS schedule data to download. Defaults to None.
If version_id is None, data will be downloaded from the CTA directly (transitchicago.com)
instead of transitfeeds.com.
Returns:
GTFSFeed: A GTFSFeed object with formatted dates
"""
CTA_GTFS = download_zip(version_id)
if version_id is None:
CTA_GTFS, _ = download_cta_zip()
else:
CTA_GTFS = download_zip(version_id)
data = GTFSFeed.extract_data(CTA_GTFS, version_id=version_id)
data = format_dates_hours(data)
return data
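
With these changes, download_extract_format() can be called with no arguments to pull the current feed directly from transitchicago.com, while passing a version_id keeps the old transitfeeds.com behavior. A minimal sketch of the new no-argument path, assuming the module behaves as shown in the diff and that summarize_date_rt (used by scrape_data/cta_data_downloads.py below) lives in the same module:

import data_analysis.static_gtfs_analysis as sga

# Latest schedule straight from transitchicago.com (no version_id).
feed = sga.download_extract_format()

# With no feed_start_date/feed_end_date, make_trip_summary now keeps all
# date/service combinations instead of filtering to a validity window.
trip_summary = sga.make_trip_summary(feed)
route_daily_summary = sga.summarize_date_rt(trip_summary)
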
2 changes: 1 addition & 1 deletion requirements.txt
@@ -9,7 +9,7 @@ python-dotenv==0.20.0
seaborn==0.12.0
PyQt5==5.15.7
folium==0.12.1.post1
mapclassify==2.4.2+55.g0155c6e
mapclassify>=2.4.2+55.g0155c6e
plotly==5.11.0
kaleido==0.2.1
pre-commit==2.20.0
115 changes: 115 additions & 0 deletions scrape_data/cta_data_downloads.py
@@ -0,0 +1,115 @@
import boto3
import sys
import data_analysis.static_gtfs_analysis as sga
import data_analysis.compare_scheduled_and_rt as csrt
import pendulum
from io import StringIO
import pandas as pd


ACCESS_KEY = sys.argv[1]
SECRET_KEY = sys.argv[2]

client = boto3.client(
's3',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY
)

s3 = boto3.resource(
's3',
region_name='us-east-1',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY
)

today = pendulum.now('America/Chicago').to_date_string()

CTA_GTFS, zipfile_bytes_io = sga.download_cta_zip()

def save_cta_zip() -> None:
print(f'Saving zipfile available at '
f'https://www.transitchicago.com/downloads/sch_data/google_transit.zip '
f'on {today} to public bucket')
filename = f'cta_schedule_zipfiles_raw/google_transit_{today}.zip'
zipfile_bytes_io.seek(0)
client.upload_fileobj(
zipfile_bytes_io,
csrt.BUCKET_PUBLIC,
filename
)
print(f'Confirm that {filename} exists in bucket')
keys('chn-ghost-buses-public', [filename])


def save_csv_to_bucket(df: pd.DataFrame, filename: str) -> None:
"""Save pandas DataFrame to csv in s3
Args:
df (pd.DataFrame): DataFrame to be saved
filename (str): Name of the saved filename in s3.
Should contain the .csv suffix.
"""
csv_buffer = StringIO()
df.to_csv(csv_buffer)

print(f'Saving {filename} to public bucket')
s3.Object(
csrt.BUCKET_PUBLIC,
f'{filename}')\
.put(Body=csv_buffer.getvalue())


def save_sched_daily_summary() -> None:
data = sga.GTFSFeed.extract_data(CTA_GTFS)
data = sga.format_dates_hours(data)
trip_summary = sga.make_trip_summary(data)

route_daily_summary = (
sga.summarize_date_rt(trip_summary)
)
route_daily_summary['date'] = route_daily_summary['date'].astype(str)
route_daily_summary_today = route_daily_summary.loc[route_daily_summary['date'] == today]

print(f'Saving cta_route_daily_summary_{today}.csv to public bucket')
filename = f'schedule_summaries/daily_job/cta_route_daily_summary_{today}.csv'
save_csv_to_bucket(
route_daily_summary_today,
filename=filename
)
print(f'Confirm that {filename} exists in bucket')
keys(csrt.BUCKET_PUBLIC, [filename])


def save_realtime_daily_summary() -> None:
if pendulum.now("America/Chicago").hour >= 11:
end_date = pendulum.yesterday("America/Chicago")
else:
end_date = pendulum.now("America/Chicago").subtract(days=2)

end_date = end_date.to_date_string()

daily_data = pd.read_csv(
(csrt.BASE_PATH / f"bus_full_day_data_v2/{end_date}.csv")
.as_uri(),
low_memory=False
)

daily_data = csrt.make_daily_summary(daily_data)
filename = f'realtime_summaries/daily_job/bus_full_day_data_v2/{end_date}.csv'
save_csv_to_bucket(daily_data, filename=filename)

print(f'Confirm that {filename} exists in bucket')
keys(csrt.BUCKET_PUBLIC, [filename])

# https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3
def keys(bucket_name: str, filenames: list,
prefix: str='/', delimiter: str='/',
start_after: str='') -> None:
s3_paginator = client.get_paginator('list_objects_v2')
prefix = prefix.lstrip(delimiter)
start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
for content in page.get('Contents', ()):
if content['Key'] in filenames:
print(f"{content['Key']} exists")
