diff --git a/.github/workflows/cta_data_downloads.yml b/.github/workflows/cta_data_downloads.yml
new file mode 100644
index 0000000..d98520b
--- /dev/null
+++ b/.github/workflows/cta_data_downloads.yml
@@ -0,0 +1,68 @@
+name: Automate CTA schedule and realtime downloads
+
+on:
+
+  schedule:
+    # Run every day at 5:30pm UTC (12:30pm CDT / 11:30am CST)
+    - cron: 30 17 * * *
+
+env:
+  PYTHON_VERSION: 3.10.6
+  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
+  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+
+jobs:
+  download-cta-schedule-data:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: ${{ env.PYTHON_VERSION }}
+
+      - name: Download and save CTA schedule data
+        run: |
+          pip install -r requirements.txt
+          python -c 'from scrape_data.cta_data_downloads import save_cta_zip; \
+          save_cta_zip()' \
+          $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
+
+
+  save-schedule-daily-summary:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: ${{ env.PYTHON_VERSION }}
+
+      - name: 'Save schedule summaries'
+        run: |
+          pip install -r requirements.txt
+          python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
+          save_sched_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
+
+
+  save-realtime-daily-summary:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: ${{ env.PYTHON_VERSION }}
+
+      - name: 'Save realtime summaries'
+        run: |
+          pip install -r requirements.txt
+          python -c 'from scrape_data.cta_data_downloads import save_realtime_daily_summary; \
+          save_realtime_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
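Note on the `run` blocks above: the `$AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY` tokens that follow the quoted `python -c` code are passed to the interpreter as positional arguments, which is how `scrape_data/cta_data_downloads.py` (added below) receives them; it reads `sys.argv[1]` and `sys.argv[2]` at import time. A minimal sketch of the mechanism:

```python
# With `python -c 'code' ARG1 ARG2`, sys.argv[0] is '-c' and the remaining
# tokens arrive in sys.argv[1:]:
#
#   $ python -c 'import sys; print(sys.argv)' KEY SECRET
#   ['-c', 'KEY', 'SECRET']
import sys

ACCESS_KEY = sys.argv[1]   # expanded from $AWS_ACCESS_KEY_ID
SECRET_KEY = sys.argv[2]   # expanded from $AWS_SECRET_ACCESS_KEY
```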
""" - if version_id is None: - version_id = VERSION_ID - logging.info(f"Extracting data from CTA zipfile version {version_id}") + if cta_download: + if version_id is not None: + raise ValueError("version_id is not used for downloads directly from CTA") + else: + logging.info(f"Extracting data from transitchicago.com zipfile") + + else: + if version_id is None: + version_id = VERSION_ID + logging.info(f"Extracting data from transitfeeds.com zipfile version {version_id}") + data_dict = {} pbar = tqdm(cls.__annotations__.keys()) for txt_file in pbar: @@ -140,14 +147,16 @@ def format_dates_hours(data: GTFSFeed) -> GTFSFeed: def make_trip_summary( data: GTFSFeed, - feed_start_date: pendulum.datetime, - feed_end_date: pendulum.datetime) -> pd.DataFrame: + feed_start_date: pendulum.datetime = None, + feed_end_date: pendulum.datetime = None) -> pd.DataFrame: """Create a summary of trips with one row per date Args: data (GTFSFeed): GTFS data from CTA - feed_start_date (datetime): Date from which this feed is valid (inclusive) - feed_end_date (datetime): Date until which this feed is valid (inclusive) + feed_start_date (datetime): Date from which this feed is valid (inclusive). + Defaults to None + feed_end_date (datetime): Date until which this feed is valid (inclusive). + Defaults to None Returns: pd.DataFrame: A DataFrame with each trip that occurred per row. @@ -161,7 +170,7 @@ def make_trip_summary( ), columns=["raw_date"], ) - + # cross join calendar index with actual calendar to get all combos of # possible dates & services calendar_cross = calendar_date_range.merge(data.calendar, how="cross") @@ -244,9 +253,10 @@ def make_trip_summary( trip_stop_hours, how="left", on="trip_id") # filter to only the rows for the period where this specific feed version was in effect - trip_summary = trip_summary.loc[ - (trip_summary['raw_date'] >= feed_start_date) - & (trip_summary['raw_date'] <= feed_end_date), :] + if feed_start_date is not None and feed_end_date is not None: + trip_summary = trip_summary.loc[ + (trip_summary['raw_date'] >= feed_start_date) + & (trip_summary['raw_date'] <= feed_end_date), :] return trip_summary @@ -321,6 +331,23 @@ def make_linestring_of_points( return shapely.geometry.LineString(list(sorted_df["pt"])) +def download_cta_zip() -> Tuple[zipfile.ZipFile, BytesIO]: + """Download CTA schedule data from transitchicago.com + + Returns: + zipfile.ZipFile: A zipfile of the latest GTFS schedule data from transitchicago.com + """ + logger.info('Downloading CTA data') + zip_bytes_io = BytesIO( + requests.get("https://www.transitchicago.com/downloads/sch_data/google_transit.zip" + ).content + ) + CTA_GTFS = zipfile.ZipFile(zip_bytes_io) + logging.info('Download complete') + return CTA_GTFS, zip_bytes_io + + + def download_zip(version_id: str) -> zipfile.ZipFile: """Download a version schedule from transitfeeds.com @@ -344,17 +371,22 @@ def download_zip(version_id: str) -> zipfile.ZipFile: return CTA_GTFS -def download_extract_format(version_id: str) -> GTFSFeed: +def download_extract_format(version_id: str = None) -> GTFSFeed: """Download a zipfile of GTFS data for a given version_id, extract data, and format date column. Args: - version_id (str): The version of the GTFS schedule data to download + version_id (str): The version of the GTFS schedule data to download. 
diff --git a/requirements.txt b/requirements.txt
index a709e5d..9e3218d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,7 +9,7 @@ python-dotenv==0.20.0
 seaborn==0.12.0
 PyQt5==5.15.7
 folium==0.12.1.post1
-mapclassify==2.4.2+55.g0155c6e
+mapclassify>=2.4.2
 plotly==5.11.0
 kaleido==0.2.1
 pre-commit==2.20.0
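The specifier is loosened to `>=2.4.2` rather than `>=2.4.2+55.g0155c6e` because PEP 440 only permits local version labels (the `+55.g0155c6e` part) with the `==` and `!=` operators; newer pip releases reject them after ordered comparisons. A quick check with the `packaging` library (assumed installed; it is what pip uses for version logic):

```python
from packaging.specifiers import InvalidSpecifier, SpecifierSet
from packaging.version import Version

# A plain release specifier still matches locally-built versions:
print(SpecifierSet(">=2.4.2").contains(Version("2.4.2+55.g0155c6e")))  # True

# ...but a local version label after an ordered comparison is invalid:
try:
    SpecifierSet(">=2.4.2+55.g0155c6e")
except InvalidSpecifier as exc:
    print(exc)
```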
+ """ + csv_buffer = StringIO() + df.to_csv(csv_buffer) + + print(f'Saving {filename} to public bucket') + s3.Object( + csrt.BUCKET_PUBLIC, + f'{filename}')\ + .put(Body=csv_buffer.getvalue()) + + +def save_sched_daily_summary() -> None: + data = sga.GTFSFeed.extract_data(CTA_GTFS) + data = sga.format_dates_hours(data) + trip_summary = sga.make_trip_summary(data) + + route_daily_summary = ( + sga.summarize_date_rt(trip_summary) + ) + route_daily_summary['date'] = route_daily_summary['date'].astype(str) + route_daily_summary_today = route_daily_summary.loc[route_daily_summary['date'] == today] + + print(f'Saving cta_route_daily_summary_{today}.csv to public bucket') + filename = f'schedule_summaries/daily_job/cta_route_daily_summary_{today}.csv' + save_csv_to_bucket( + route_daily_summary_today, + filename=filename + ) + print(f'Confirm that {filename} exists in bucket') + keys(csrt.BUCKET_PUBLIC, [filename]) + + +def save_realtime_daily_summary() -> None: + if pendulum.now("America/Chicago").hour >= 11: + end_date = pendulum.yesterday("America/Chicago") + else: + end_date = pendulum.now("America/Chicago").subtract(days=2) + + end_date = end_date.to_date_string() + + daily_data = pd.read_csv( + (csrt.BASE_PATH / f"bus_full_day_data_v2/{end_date}.csv") + .as_uri(), + low_memory=False + ) + + daily_data = csrt.make_daily_summary(daily_data) + filename = f'realtime_summaries/daily_job/bus_full_day_data_v2/{end_date}.csv' + save_csv_to_bucket(daily_data, filename=filename) + + print(f'Confirm that {filename} exists in bucket') + keys(csrt.BUCKET_PUBLIC, [filename]) + +# https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3 +def keys(bucket_name: str, filenames: list, + prefix: str='/', delimiter: str='/', + start_after: str='') -> None: + s3_paginator = client.get_paginator('list_objects_v2') + prefix = prefix.lstrip(delimiter) + start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after + for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after): + for content in page.get('Contents', ()): + if content['Key'] in filenames: + print(f"{content['Key']} exists")