Skip to content

Commit

Permalink
Merge pull request #212 from datamade/hcg/batch-it-up
Browse files Browse the repository at this point in the history
Import transactions by quarter, in chunks of a few hundred
  • Loading branch information
hancush authored Sep 23, 2024
2 parents 560f70a + 0c189ca commit 25ba297
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 64 deletions.
31 changes: 9 additions & 22 deletions .github/workflows/etl.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,25 @@ jobs:
-e DATABASE_URL=${{ secrets.DATABASE_URL }} \
app make import/candidates import/pacs import/candidate_filings import/pac_filings
import_2023:
import_transactions:
runs-on: ubuntu-latest
needs: import_filings
strategy:
matrix:
transaction_type: [CON, EXP]
year: [2023, 2024]
quarter: [1, 2, 3, 4]

steps:
- uses: actions/checkout@v3
with:
ref: "deploy"
- name: Import data for 2023
run: |
touch .env
docker compose -f docker-compose.etl.yml run --rm \
-e AWS_STORAGE_BUCKET_NAME=${{ secrets.AWS_STORAGE_BUCKET_NAME }} \
-e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \
-e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \
-e DATABASE_URL=${{ secrets.DATABASE_URL }} \
app make import/CON_2023 import/EXP_2023
import_2024:
runs-on: ubuntu-latest
needs: import_filings

steps:
- uses: actions/checkout@v3
with:
ref: "deploy"
- name: Import data for 2024
ref: "hcg/batch-it-up"
- name: Import transaction data
run: |
touch .env
docker compose -f docker-compose.etl.yml run --rm \
-e AWS_STORAGE_BUCKET_NAME=${{ secrets.AWS_STORAGE_BUCKET_NAME }} \
-e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \
-e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \
-e DATABASE_URL=${{ secrets.DATABASE_URL }} \
app make import/CON_2024 import/EXP_2024
app make import/${{ matrix.transaction_type }}_${{ matrix.quarter }}_${{ matrix.year }}
20 changes: 15 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
THIS_YEAR=$(shell date +"%Y")
NIGHTLY_YEARS=$(shell seq 2023 $(THIS_YEAR))
QUARTERLY_YEARS=$(shell seq 2020 $(THIS_YEAR))

define quarterly_target
$(foreach YEAR,$(1),$(patsubst %,import/$(2)_%_$(YEAR),1 2 3 4))
endef

.PHONY : quarterly
quarterly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2020 import/EXP_2020 import/CON_2021 import/EXP_2021 import/CON_2022 import/EXP_2022 import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024
quarterly: import/candidates import/pacs import/candidate_filings import/pac_filings \
$(call quarterly_target,$(QUARTERLY_YEARS),CON) $(call quarterly_target,$(QUARTERLY_YEARS),EXP)
python manage.py make_search_index

.PHONY : nightly
nightly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024
nightly: import/candidates import/pacs import/candidate_filings import/pac_filings \
$(call quarterly_target,$(NIGHTLY_YEARS),CON) $(call quarterly_target,$(NIGHTLY_YEARS),EXP)
python manage.py make_search_index

import/% : _data/sorted/%.csv
.SECONDEXPANSION:
import/% : _data/sorted/$$(word 1, $$(subst _, , $$*))_$$(word 3, $$(subst _, , $$*)).csv
python manage.py import_transactions --transaction-type $(word 1, $(subst _, , $*)) \
--year $(word 2, $(subst _, , $*)) \
--quarters $(word 2, $(subst _, , $*)) \
--year $(word 3, $(subst _, , $*)) \
--file $<

import/pac_filings : _data/raw/pac_committee_filings.csv
Expand All @@ -30,7 +41,6 @@ _data/raw/%_committees.csv :
_data/raw/%_committee_filings.csv :
wget --no-check-certificate --no-use-server-timestamps -O $@ "https://openness-project-nmid.s3.amazonaws.com/$*_committee_filings.csv"


_data/sorted/%.csv : _data/raw/%.csv
xsv fixlengths $< | xsv sort -s OrgID,"Report Name","Start of Period","End of Period" > $@

Expand Down
153 changes: 124 additions & 29 deletions camp_fin/management/commands/import_transactions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import math
import re
from itertools import groupby

Expand All @@ -13,17 +14,35 @@


def filing_key(record):
start_date = parse_date(record["Start of Period"])
end_date = parse_date(record["End of Period"])

return (
record["OrgID"],
record["Report Name"],
start_date.year if start_date else None,
end_date.year if end_date else None,
parse_date(record["Start of Period"]),
parse_date(record["End of Period"]),
)


def get_quarter(date_str):
date = parse_date(date_str)
return math.ceil(date.month / 3.0)


def get_month_range(quarters):
quarter_to_month_range = {
1: (1, 3),
2: (4, 6),
3: (7, 9),
4: (10, 12),
}

months = []

for q in quarters:
months.extend(quarter_to_month_range[q])

return min(months), max(months)


class Command(BaseCommand):
help = """
Import data from the New Mexico Campaign Finance System:
Expand All @@ -39,12 +58,24 @@ def add_arguments(self, parser):
default="CON",
help="Type of transaction to import: CON, EXP (Default: CON)",
)
parser.add_argument(
"--quarters",
dest="quarters",
default="1,2,3,4",
help="Comma-separated list of quarters to import (Default: 1,2,3,4)",
)
parser.add_argument(
"--year",
dest="year",
default="2023",
help="Year to import (Default: 2023)",
)
parser.add_argument(
"--batch-size",
dest="batch_size",
default=500,
help="Number of transaction records to bulk create at once (Default: 500)",
)
parser.add_argument(
"--file",
dest="file",
Expand All @@ -53,38 +84,83 @@ def add_arguments(self, parser):
)

def handle(self, *args, **options):
if options["transaction_type"] not in ("EXP", "CON"):
transaction_type = options["transaction_type"]

if transaction_type not in ("EXP", "CON"):
raise ValueError("Transaction type must be one of: EXP, CON")

year = options["year"]

self.stdout.write(f"Loading data from {transaction_type}_{year}.csv")

quarters = {int(q) for q in options["quarters"].split(",")}
quarter_string = ", ".join(f"Q{q}" for q in quarters)

with open(options["file"]) as f:
if options["transaction_type"] == "CON":
self.import_contributions(f, year)
self.stdout.write(
f"Importing transactions from filing periods beginning in {quarter_string}"
)

if transaction_type == "CON":
self.import_contributions(f, quarters, year, options["batch_size"])

elif transaction_type == "EXP":
self.import_expenditures(f, quarters, year, options["batch_size"])

elif options["transaction_type"] == "EXP":
self.import_expenditures(f, year)
self.stdout.write(self.style.SUCCESS("Transactions imported!"))

self.stdout.write(
f"Totaling filings from periods beginning in {quarter_string}"
)
self.total_filings(quarters, year)
self.stdout.write(self.style.SUCCESS("Filings totaled!"))

self.total_filings(year)
call_command("aggregate_data")

def import_contributions(self, f, year):
def _records_by_filing(self, records, filing_quarters):
"""
Group records by filing, then filter for filings beginning in the specified
quarter/s. Note that, because transactions are organized by year, transactions
for one filing can appear across two files, if the reporting period begins in
one year and ends in the next. This approach will return filings beginning in
the specified quarter in *any* year, so that these split cases will be covered.
For example, consider a filing period starting in December 2023 and ending in
February 2024. Transactions would be split across the 2023 and 2024 files. To
get them all, you would run the Q4 import for both 2023 and 2024.
"""
records_in_quarters = filter(
lambda x: get_quarter(x["Start of Period"]) in filing_quarters, records
)
return groupby(tqdm(records_in_quarters), key=filing_key)

def _save_batch(self, batch):
"""
Contributions are represented by several different types of models. Sort
then group them by class, then save each group of records.
"""
for cls, cls_records in groupby(
sorted(batch, key=lambda x: str(type(x))), key=lambda x: type(x)
):
yield cls.objects.bulk_create(cls_records)

def import_contributions(self, f, quarters, year, batch_size):
reader = csv.DictReader(f)
batch = []

for filing_group, records in groupby(tqdm(reader), key=filing_key):
for _, records in self._records_by_filing(reader, quarters):
for i, record in enumerate(records):
if i == 0:
try:
filing = self._get_filing(record)
except ValueError:
break

# the contributions file are organized by the year
# of a transaction date not the date of the
# The contributions files are organized by the year
# of the transaction date, not the date of the
# filing, so transactions from the same filing can
# appear in multiple contribution files.
#
# we need to make sure we just clear out the
# We need to make sure we just clear out the
# contributions in a file that were purportedly made
# in a given year.
models.Loan.objects.filter(
Expand All @@ -105,17 +181,26 @@ def import_contributions(self, f, year):
record["Contribution Type"] in {"Loans Received", "Special Event"}
or "Contribution" in record["Contribution Type"]
):
self.make_contribution(record, contributor, filing).save()
contribution = self.make_contribution(record, contributor, filing)
batch.append(contribution)

else:
self.stderr.write(
f"Could not determine contribution type from record: {record['Contribution Type']}"
)

def import_expenditures(self, f, year):
if len(batch) % batch_size == 0:
self._save_batch(batch)
batch = []

if len(batch) > 0:
self._save_batch(batch)

def import_expenditures(self, f, quarters, year, batch_size):
reader = csv.DictReader(f)
batch = []

for filing_group, records in groupby(tqdm(reader), key=filing_key):
for _, records in self._records_by_filing(reader, quarters):
for i, record in enumerate(records):
if i == 0:
try:
Expand All @@ -129,7 +214,12 @@ def import_expenditures(self, f, year):
received_date__year=year,
).delete()

self.make_contribution(record, None, filing).save()
contribution = self.make_contribution(record, None, filing)
batch.append(contribution)

if not len(batch) % batch_size:
self._save_batch(batch)
batch = []

def make_contributor(self, record):
state, _ = models.State.objects.get_or_create(
Expand Down Expand Up @@ -268,7 +358,10 @@ def _get_filing(self, record):
"filing_period__initial_date",
"filing_period__end_date",
)
msg = f"{filings.count()} filings found for PAC {pac} from record {record}:\n{filing_meta}\n\nUsing most recent filing matching query..."
msg = (
f"{filings.count()} filings found for PAC {pac} from record "
f"{record}:\n{filing_meta}\n\nUsing most recent filing matching query..."
)
self.stderr.write(msg)

return filing
Expand Down Expand Up @@ -410,12 +503,16 @@ def make_contribution(self, record, contributor, filing):

return contribution

def total_filings(self, year):
for filing in models.Filing.objects.filter(
final=True,
filing_period__initial_date__year__lte=year,
filing_period__end_date__year__gte=year,
).iterator():
def total_filings(self, quarters, year):
start, end = get_month_range(quarters)

for filing in tqdm(
models.Filing.objects.filter(
final=True,
filing_period__initial_date__month__gte=start,
filing_period__initial_date__month__lte=end,
).iterator()
):
contributions = filing.contributions().aggregate(total=Sum("amount"))
expenditures = filing.expenditures().aggregate(total=Sum("amount"))
loans = filing.loans().aggregate(total=Sum("amount"))
Expand All @@ -425,5 +522,3 @@ def total_filings(self, year):
filing.total_loans = loans["total"] or 0

filing.save()

self.stdout.write(f"Totalled {filing}")
2 changes: 0 additions & 2 deletions camp_fin/tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "2.4"

services:
app:
# Don't restart the service when the command exits
Expand Down
4 changes: 1 addition & 3 deletions docker-compose.etl.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
version: '2.4'

services:
app:
image: nmid
build: .
build: .
container_name: nmid-etl
environment:
DJANGO_SECRET_KEY: "etl secret key"
Expand Down
4 changes: 1 addition & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
version: '2.4'

services:
app:
image: nmid
build: .
build: .
container_name: nmid
stdin_open: true
tty: true
Expand Down

0 comments on commit 25ba297

Please sign in to comment.