-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathapp.py
401 lines (323 loc) · 12.8 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
import boto3
import datetime
from environs import Env
import json
import logging
import os
import shutil
import signal
import sys
import urllib.parse
from django.db import transaction # type: ignore
from openstates.utils.instrument import Instrumentation
# Settings reader used by the env.int / env.bool / env.str lookups below.
# NOTE(review): read_env() presumably also loads a local .env file when one
# exists - confirm against the environs documentation.
env = Env()
env.read_env()
# Raise the AWS SDK and urllib3 connection-pool loggers to WARNING.
logging.getLogger("botocore").setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
logging.getLogger("s3transfer").setLevel(logging.WARNING)
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
# The root logger is used for every message emitted by this module.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# S3 handles and the stats writer are created once at import time and shared
# by the functions below.
s3_client = boto3.client("s3")
s3_resource = boto3.resource("s3")
stats = Instrumentation()
def process_import_function(event, context):
    """
    Fetch a batch of upload notifications from SQS and import the files.

    Each message names a file in S3. The files are downloaded into a
    per-jurisdiction folder under /tmp/, every jurisdiction folder is imported
    with do_import(), and the imported files are then (optionally) archived
    and deleted from the source bucket. When an import fails, a zip snapshot
    of the jurisdiction folder is uploaded for debugging instead.

    Args:
        event (dict): The event object (not used)
        context (dict): The context object (not used)
    Returns:
        None

    Example for unique_jurisdictions:
        unique_jurisdictions = {
            "az": {
                "id": "ocd-jurisdiction/country:us/state:az/government",
                "name": "Arizona",
                "keys": ["az/2021-01-01.json"],
            }
        }

    Example SQS Message:
        {
            'bucket': 'openstates-realtime-bills',
            'file_archiving_enabled': False,
            'file_path': 'ny/bill_184e17ec-919f-11ef-b06c-0a58a9feac02.json',
            'jurisdiction_id':
                'ocd-jurisdiction/country:us/state:ny/government',
            'jurisdiction_name': 'New York'
        }
    """
    datadir = "/tmp/"

    # making this a set to avoid duplicate files in queue
    all_files = set()
    unique_jurisdictions = {}

    # Get the uploaded files' information
    sqs_fetch_batch_size = env.int("SQS_FETCH_BATCH_SIZE", 600)
    sqs_delete_fetched_messages = env.bool("SQS_DELETE_FETCHED_MESSAGES", True)
    messages = batch_retrieval_from_sqs(
        sqs_fetch_batch_size, sqs_delete_fetched_messages
    )
    if not messages:
        return

    file_archiving_enabled = env.bool("FILE_ARCHIVING_ENABLED", False)

    for message in messages:
        # NOTE(review): the bucket of the *last* message is the one used for
        # archiving/deleting below; this assumes one bucket per batch.
        bucket = message.get("bucket")
        key = message.get("file_path")
        jurisdiction_id = message.get("jurisdiction_id")
        jurisdiction_name = message.get("jurisdiction_name")

        # Archiving processed realtime bills defaults to False, except it was
        # explicitly set on cli or on task-definitions Repo as <--archive>
        # or added on AWS admin console for os-realtime lambda function
        # config as FILE_ARCHIVING_ENABLED=True
        file_archiving_enabled = (
            message.get("file_archiving_enabled") or file_archiving_enabled
        )

        # A message without a file path cannot be processed; skip it rather
        # than letting unquote(None) abort the whole batch.
        if not key:
            logger.error(f"Skipping message without a file_path: {message}")
            continue

        # for some reason, the key is url encoded sometimes
        key = urllib.parse.unquote(key, encoding="utf-8")

        # we want to ignore the trigger for files that are dumped in archive;
        # only this message is skipped, the rest of the batch still runs
        if key.startswith("archive"):
            continue

        key_list = key.split("/")
        jurisdiction_abbreviation = key_list[0]  # e.g az, al, etc

        # we want to filter out unique jurisdiction
        if jurisdiction_abbreviation not in unique_jurisdictions:
            unique_jurisdictions[jurisdiction_abbreviation] = {
                "id": jurisdiction_id,
                "name": jurisdiction_name,
                "keys": [],
            }

        filedir = f"{datadir}{key}"

        # Make sure the directory for the jurisdiction exists
        dir_path = os.path.join(datadir, jurisdiction_abbreviation)
        os.makedirs(dir_path, exist_ok=True)

        # Download this file to writable tmp space.
        try:
            s3_client.download_file(bucket, key, filedir)
        except Exception as e:
            logger.error(f"Error downloading file: {e}")
            continue

        # Only record files that were really downloaded, so that a file we
        # never imported is not deleted from the bucket further down.
        unique_jurisdictions[jurisdiction_abbreviation]["keys"].append(key)
        all_files.add(filedir)

    # Process imports for all files per jurisdiction in a batch
    for abbreviation, juris in unique_jurisdictions.items():
        file_paths = juris["keys"]
        jur_id = juris["id"]
        if not file_paths:
            logger.error(
                f"Was about to do an import of {jur_id} "
                f"with an empty file_paths list, skipping it"
            )
            continue
        logger.info(f"importing {jur_id}...")
        try:
            do_import(jur_id, f"{datadir}{abbreviation}")
            stats.write_stats(
                [
                    {
                        "metric": "objects",
                        "fields": {"realtime_import": 1},
                        "tags": {"jurisdiction": juris["name"]},
                    }
                ]
            )
            if file_archiving_enabled:
                # use the default "archive" destination prefix
                archive_individual_files(bucket, file_paths)

            # delete object from original bucket
            for file_path in file_paths:
                s3_client.delete_object(Bucket=bucket, Key=file_path)
            logger.info(f"Deleted files :: {file_paths}")
        except Exception as e:
            # Create zip of files we processed for debugging
            failed_import_dir = f"{datadir}{abbreviation}"
            # upload zip file that contains the directory failed_import_dir
            archive_key = archive_jurisdiction_file_folder(
                abbreviation, bucket, datadir, failed_import_dir
            )
            logger.error(
                f"Error importing jurisdiction {jur_id}, "
                f"stored snapshot of import dir as {archive_key}, error: {e}"
            )  # noqa: E501
            continue

    logger.info(f"{len(all_files)} files processed")
    stats.close()
def remove_duplicate_message(items):
    """
    Parse JSON message bodies and drop duplicates.

    Two messages are duplicates when they decode to the same JSON value,
    regardless of the order of their keys. The first occurrence is kept and
    the original ordering of the remaining messages is preserved.

    Args:
        items (iterable): JSON-encoded message bodies (str)
    Returns:
        list: The decoded messages with duplicates removed
    """
    seen = set()
    unique_items = []
    for item in items:
        parsed = json.loads(item)
        # A canonical serialisation is used as the identity of a message: it
        # is hashable even when the message holds nested lists or dicts.
        fingerprint = json.dumps(parsed, sort_keys=True)
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        unique_items.append(parsed)
    return unique_items
def archive_jurisdiction_file_folder(
    jurisdiction_abbreviation, bucket, tmp_folder_path, file_folder_path
):
    """
    Zip a jurisdiction's source data folder and upload the zip to S3.

    Args:
        jurisdiction_abbreviation (str): Prefix for the zip file name
        bucket (str): The s3 bucket the zip is uploaded to
        tmp_folder_path (str): Local folder the zip file is written to
        file_folder_path (str): Local folder whose contents are zipped
    Returns:
        str: The s3 key the zip was uploaded to ("archive/<name>.zip")
    """
    # Make a zip file of the jurisdiction's source data
    now = datetime.datetime.now()
    zip_filename = f"{jurisdiction_abbreviation}-{now.isoformat()}"
    zip_filepath = os.path.join(tmp_folder_path, zip_filename)

    # shutil puts all the files into the zip folder at root level.
    # It does not include the folder in contents
    # it does add the ".zip" extension
    archive_filename = shutil.make_archive(
        zip_filepath, "zip", file_folder_path
    )

    # Upload to archive section of S3 bucket
    s3_destination_key = f"archive/{zip_filename}.zip"
    try:
        s3_resource.meta.client.upload_file(
            archive_filename, bucket, s3_destination_key
        )
    finally:
        # The local zip is only a staging file; remove it whether or not the
        # upload worked so repeated failures do not fill up the tmp folder.
        try:
            os.remove(archive_filename)
        except OSError as e:
            logger.warning(
                f"Could not remove local archive {archive_filename}: {e}"
            )

    return s3_destination_key
def archive_individual_files(bucket, all_keys, dest="archive"):
    """
    Copy processed files to a dated folder inside the same bucket.

    Each key is copied to "<dest>/<current UTC date>/<key>". A failure to
    copy one key is logged and does not stop the remaining keys.

    We currently use `meta.client.copy` instead of `client.copy` b/c
    it can copy multiple files via multiple threads, since we have batching
    in view.

    Args:
        bucket (str): The s3 bucket name
        all_keys (list): The keys of the files to be archived
        dest (str): The destination folder to copy the files to.
    Returns:
        None
    Example:
        archive_individual_files("my-bucket", ["ny/my-file.json"])
    """
    # Computed once so every file of a batch lands in the same dated folder.
    # datetime.utcnow() is deprecated; this yields the same UTC date.
    archive_date = datetime.datetime.now(datetime.timezone.utc).date()

    for key in all_keys:
        copy_source = {"Bucket": bucket, "Key": key}
        logger.info(f"Archiving file {key} to {dest}")
        try:
            s3_resource.meta.client.copy(
                copy_source,
                bucket,
                f"{dest}/{archive_date}/{key}",
            )
        except Exception as e:
            logger.error(f"Error archiving file {key}: {e}")
def retrieve_messages_from_queue(delete_after_fetch=True):
    """
    Receive up to ten messages from the SQS queue and return their bodies.

    The queue URL is read from the SQS_QUEUE_URL setting. When
    `delete_after_fetch` is true, every received message is deleted from the
    queue right after its body has been collected.

    Args:
        delete_after_fetch (bool): Delete each message once it is received
    Returns:
        list: The "Body" of every received message (empty if none arrived)
    """
    client = boto3.client("sqs")
    queue_url = env.str("SQS_QUEUE_URL")

    received = client.receive_message(
        QueueUrl=queue_url,
        AttributeNames=["SentTimestamp"],
        MaxNumberOfMessages=10,
        MessageAttributeNames=["All"],
        VisibilityTimeout=30,
        WaitTimeSeconds=1,
    ).get("Messages", [])

    bodies = []
    for entry in received:
        bodies.append(entry.get("Body"))
        handle = entry["ReceiptHandle"]
        if not delete_after_fetch:
            continue
        # Delete received message from queue
        client.delete_message(QueueUrl=queue_url, ReceiptHandle=handle)

    return bodies
def batch_retrieval_from_sqs(batch_size=600, delete_after_fetch=True):
    """
    Retrieve messages from SQS in batches and de-duplicate them.

    The queue is polled ceil(batch_size / 10) times, because one fetch returns
    at most 10 messages. Rounding up means a batch size that is not a multiple
    of 10 (including sizes below 10) still triggers a fetch; as a consequence
    up to 9 messages more than `batch_size` may be returned.

    Args:
        batch_size (int): Approximate maximum number of messages to fetch
        delete_after_fetch (bool): Passed on to retrieve_messages_from_queue
    Returns:
        list: The decoded, de-duplicated messages
    """
    msg = []

    # SQS allows a maximum of 10 messages to be retrieved at a time
    fetch_rounds = (batch_size + 9) // 10
    for _ in range(fetch_rounds):
        msg.extend(retrieve_messages_from_queue(delete_after_fetch))

    filtered_messages = remove_duplicate_message(msg)
    logger.info(f"message_count: {len(filtered_messages)} received from SQS")
    return filtered_messages
def do_atomic(func, datadir, type_, timeout_seconds=360):
    """
    Run a function in an atomic transaction, guarded by a SIGALRM timeout.

    Args:
        func (function): The function to run
        datadir (str): The directory to run the function on
        type_ (str): The type of import (only used in log messages)
        timeout_seconds (int): Seconds before the run is interrupted
    Returns:
        The return value of the function, or None when it timed out

    Note: Some imports usually timeout when there are no results for
    persons or there are more than one result for a unique person. This error
    is meant to be properly handled in the os-core project in the future.

    NOTE(review): the TimeoutError is handled inside the atomic block, so the
    block exits without an exception; presumably whatever was written before
    the timeout is committed - confirm that this is intended.
    """
    with transaction.atomic():
        logger.info(f"Running {type_}...")
        # Remember the handler that was installed before, so this function
        # does not permanently replace the process-wide SIGALRM handler.
        previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
        try:
            signal.alarm(timeout_seconds)
            return func(datadir)
        except TimeoutError:
            logger.error(f"Timeout running {type_}")
            return None
        finally:
            # deactivate the alarm
            signal.alarm(0)
            # signal.signal() returns None when the previous handler was not
            # installed from Python; such a handler cannot be put back.
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)
def timeout_handler(signum, frame):
    """
    Signal handler that turns a delivered signal into a TimeoutError.

    Args:
        signum (int): The signal number (not used)
        frame (frame): The interrupted stack frame (not used)
    Raises:
        TimeoutError: always
    """
    message = "Timeout occurred while running the do_atomic function"
    raise TimeoutError(message)
def do_import(jurisdiction_id: str, datadir: str) -> None:
    """
    Import the data found in a directory into the DB for one jurisdiction.

    Runs the jurisdiction, bill, vote event and event importers (in that
    order, each through do_atomic) and then stamps the jurisdiction's
    `latest_bill_update` with the current UTC time.

    Args:
        jurisdiction_id (str): The jurisdiction id
        datadir (str): The directory where the data is stored temporarily
    Returns:
        None
    """
    # imported here to avoid loading Django code unnecessarily
    from openstates.importers import (
        JurisdictionImporter,
        BillImporter,
        VoteEventImporter,
        EventImporter,
    )
    from openstates.data.models import Jurisdiction

    jurisdictions = JurisdictionImporter(jurisdiction_id)
    # bill postimport is switched off: it does some expensive SQL and this
    # import is meant to be fast
    bills = BillImporter(jurisdiction_id, do_postimport=False)
    # vote postimport is switched off: it would delete all vote events of the
    # related bill except the one being processed now, causing votes to be
    # deleted and re-created
    vote_events = VoteEventImporter(
        jurisdiction_id, bills, do_postimport=False
    )
    events = EventImporter(jurisdiction_id, vote_events)

    logger.info(f"Datadir: {datadir}")

    import_steps = (
        (jurisdictions.import_directory, "jurisdictions"),
        (bills.import_directory, "bill"),
        (vote_events.import_directory, "vote_event"),
        (events.import_directory, "event"),
    )
    for run_import, label in import_steps:
        do_atomic(run_import, datadir, label)

    finished_at = datetime.datetime.utcnow()
    Jurisdiction.objects.filter(id=jurisdiction_id).update(
        latest_bill_update=finished_at
    )
# run process_import_function or do_import
if __name__ == "__main__":
    # pass arguments to run do_import() process,
    # for reproducing import errors locally
    # poetry run python app.py do_import {os_jurisdiction_id} {path_to_folder}
    # e.g. poetry run python app.py do_import
    # "ocd-jurisdiction/country:us/state:ma/government"
    # "/path/to/ma-2024-11-15T12_00_59.748666"
    cli_args = sys.argv[1:]
    # checking cli_args first avoids an IndexError when no args are given
    if cli_args and cli_args[0] == "do_import":
        if len(cli_args) < 3:
            sys.exit(
                "usage: python app.py do_import "
                "<os_jurisdiction_id> <path_to_folder>"
            )
        # we need to set up django here, because we're running locally
        # and don't have zappa packaging
        import django

        django.setup()
        do_import(cli_args[1], cli_args[2])
    else:
        # just run the entrypoint function, if no args present
        process_import_function({}, {})