Batch Persistence
One of our primary goals is to have usable data in the database. We therefore want to make sure that our error rate is not too high before considering a new ingest pipeline's worth of writes for a region as valid. For this purpose, writes can optionally be batched, and when the pipeline is complete we check its total ingest successes and failures before committing to persist the results to the database.
In scrape-based ingest, we batch writes by default across all scraped regions. In non-batched persistence for scrapers, each individual scrape task that generates a new entity graph to be persisted immediately writes it to the database and terminates. In batched persistence, however, these tasks write a `BatchIngestInfoData` object to a Pub/Sub topic: in the event of successful ingest, the object contains the ingested entity graph as an `IngestInfo` object; in the event of an error, the object contains the error that occurred.
In direct ingest, we do not batch writes by default, since we naturally consolidate all writes into a single transaction for a single ingested file or request. However, batch persistence can be optionally enabled for any ingest process. The examples below are described within the context of scrape-based ingest.
The fields in `BatchIngestInfoData` (sketched as a class after this list) are:

- `task_hash` - the hash of the task that created this object
- `ingest_info` - the ingested entity graph, if the task succeeded
- `error` - an error message, if the task failed
- `trace_id` - the trace id of the failed request, if the task failed
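As a rough illustration only, the payload could be modeled as a small frozen dataclass like the one below; the field names mirror the list above, but the actual class definition in the codebase may differ (for instance, `ingest_info` holds a full `IngestInfo` object rather than a plain dictionary).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BatchIngestInfoData:
    """Sketch of the per-task message each scrape task publishes to Pub/Sub."""

    # Hash of the scrape task that produced this message; used to de-duplicate
    # retries and duplicate deliveries.
    task_hash: int

    # Stand-in for the ingested IngestInfo entity graph; set only on success.
    ingest_info: Optional[dict] = None

    # Error message; set only on failure.
    error: Optional[str] = None

    # Trace id of the failed request; set only on failure.
    trace_id: Optional[str] = None
```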
If the task fails, the failure is caught and a `BatchIngestInfoData` is published with the `task_hash`, `error`, and `trace_id`. Note that if a task fails, it still returns a 500 error and is retried some number of times. Each retry incurs another failed write to Pub/Sub, so these failures are de-duplicated using the `task_hash`.
If the task passes, the `BatchIngestInfoData` is published with the `task_hash` and `ingest_info`. Note that because Pub/Sub does not have exactly-once delivery semantics, the `task_hash` is used to de-duplicate successes as well.
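As a hedged sketch of this publishing step, using the `BatchIngestInfoData` sketch above: `scrape_and_populate`, `get_trace_id`, and the JSON serialization are illustrative assumptions, and the real scraper tasks may structure this differently.

```python
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()


def publish_batch_message(topic_path: str, data: BatchIngestInfoData) -> None:
    """Publishes one BatchIngestInfoData message for a single scrape task."""
    payload = json.dumps(data.__dict__).encode("utf-8")
    publisher.publish(topic_path, payload)


def run_scrape_task(topic_path: str, task_hash: int) -> None:
    try:
        ingest_info = scrape_and_populate()  # hypothetical task body returning a dict
        publish_batch_message(
            topic_path,
            BatchIngestInfoData(task_hash=task_hash, ingest_info=ingest_info),
        )
    except Exception as e:
        # On failure we still publish, recording the error and trace id so the
        # batch endpoint can count failures per task.
        publish_batch_message(
            topic_path,
            BatchIngestInfoData(
                task_hash=task_hash,
                error=str(e),
                trace_id=get_trace_id(),  # hypothetical helper
            ),
        )
        # Re-raising means the task still returns a 500 and is retried; each
        # retry publishes another failure that is later de-duplicated by task_hash.
        raise
```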
Once the ingest pipeline is complete and all `BatchIngestInfoData` messages have been published to Pub/Sub, the batch persistence endpoint is requested, with a query parameter indicating which region should be written. The batch persistence flow goes through the following steps (a rough sketch follows the list):
- Read all messages for the given region from Pub/Sub
- For every message, check whether we have already seen a success from the same task: if so, discard the message, since a single success means any failures from that task can be ignored (a task can fail twice and pass on the third attempt, for example). This also protects against duplicate success messages being published.
- Convert each message's `ingest_info` object and merge the results into a single entity graph to be persisted, passing it down through the rest of the persistence layer only if a sufficient number of objects were converted successfully.
- Similarly, the persistence layer counts errors on each of the individual people in the merged entity graph, whether during conversion to the ORM models or during entity matching, and fails the write if too many failed.
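Putting those steps together, a simplified sketch of the flow might look like the following. The 10% failure threshold, `merge_ingest_infos`, and `persist_entity_graph` are illustrative assumptions rather than the real configuration or function names.

```python
from collections import defaultdict
from typing import Dict, List

# Illustrative threshold: refuse to persist if more than 10% of tasks failed.
FAILURE_THRESHOLD = 0.1


def batch_persist(messages: List[BatchIngestInfoData]) -> None:
    """Validates a region's batch of messages, then persists the merged graph."""
    # Group messages by task so retries and duplicate deliveries collapse into
    # a single per-task result.
    by_task: Dict[int, List[BatchIngestInfoData]] = defaultdict(list)
    for message in messages:
        by_task[message.task_hash].append(message)

    successes: List[BatchIngestInfoData] = []
    failed_tasks = 0
    for task_messages in by_task.values():
        # A single success means any failures from earlier retries are ignored.
        succeeded = [m for m in task_messages if m.ingest_info is not None]
        if succeeded:
            successes.append(succeeded[0])
        else:
            failed_tasks += 1

    total_tasks = len(by_task)
    if total_tasks and failed_tasks / total_tasks > FAILURE_THRESHOLD:
        raise RuntimeError(
            f"Too many failed tasks ({failed_tasks}/{total_tasks}); refusing to persist."
        )

    # Merge each successful task's ingest_info into one entity graph and hand it
    # to the persistence layer, which applies its own per-person error counting.
    merged_graph = merge_ingest_infos([m.ingest_info for m in successes])  # hypothetical
    persist_entity_graph(merged_graph)  # hypothetical persistence-layer entry point
```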
It is difficult to read all messages off of a Pub/Sub topic in a timely fashion. The approach we use is to spawn 5 threads that all pull from the topic at the same time and return. We continuously spawn batches of 5 threads and keep reading, stopping if and only if all 5 threads return no messages.
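A rough sketch of that read loop, assuming the synchronous pull API of the `google-cloud-pubsub` client and a subscription path; batch sizes, acknowledgement handling, and the exact calls in the real code may differ (acking is omitted here for brevity).

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List

from google.cloud import pubsub_v1

NUM_PULL_THREADS = 5


def pull_all_messages(subscription_path: str) -> List[pubsub_v1.types.ReceivedMessage]:
    """Pulls with 5 concurrent threads until a full round returns no messages."""
    subscriber = pubsub_v1.SubscriberClient()
    all_messages: List[pubsub_v1.types.ReceivedMessage] = []

    def pull_once(_: int) -> List[pubsub_v1.types.ReceivedMessage]:
        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": 100}
        )
        return list(response.received_messages)

    while True:
        with ThreadPoolExecutor(max_workers=NUM_PULL_THREADS) as executor:
            results = list(executor.map(pull_once, range(NUM_PULL_THREADS)))
        # Stop only once every thread in this round came back empty.
        if all(not messages for messages in results):
            break
        for messages in results:
            all_messages.extend(messages)

    return all_messages
```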