Batch ETL pipeline to mirror Dog Aging Project (DAP) data into the Terra Data Repository (TDR).
DAP is a 10-year longitudinal study, eventually integrating multiple data streams. For now, we're focused on a single survey from their RedCap data stream. The extraction and transformation pipelines for HLES Baseline are implemented. We still need to define & deploy the automated orchestration component for that data stream.
DAP's various data streams will spin up, refresh, and turn down at semi-independent cadences. They will all need to flow into the same dataset (for ease of access control), but that doesn't mean they need to be processed as a single workflow. A more scalable pattern would likely be to:
- Use TDR schema migration capabilities to add new tables to the long-lived dataset when needed
- Set up distinct ETL workflows for each independent data stream
As an initial step towards this pattern, the extraction and transformation programs for DAP's data stream (the HLES baseline) are nested under their own hles/ subdirectory.
The HLES "baseline" survey is the entry-point for all DAP participants. It is a relatively sparse dataset, with over 2000 possible (and usually mostly-null) variables. Dog owners complete this survey once as they enter the project.
We designed our version of the HLES schema with the following goals in mind:
- "Clean up" inconsistencies in column names where possible
- Reduce the number of columns captured in the dataset
- Eliminate array-valued columns, for compatibility with common statistical applications
- Keep the total count of tables low, to keep the barrier to entry low for users new to cloud processing / Terra
Together, these principles led us to decide:
- Field names from the source data are renamed for consistency / accuracy, but retain a prefix indicating their RedCap instrument
- The single HLES record per dog pulled from RedCap is split into rows across 2-4 tables:
  - An hles_owner table containing all owner-specific information (e.g. education, ethnicity). Splitting out owner info prepares us for a future where dogs transfer ownership.
  - An hles_dog table containing most dog-specific information, along with a foreign key to the owner table. The only data not included in this table is the "long tail" of health condition details.
  - An hles_cancer_condition table containing all the details around cancer diagnoses, with a foreign key to the dog table. The HLES questions around cancer use an entirely different pattern from other conditions, and splitting the data into a separate table was the simplest way for us to deal with it.
  - An hles_health_condition table containing the details of all non-cancer conditions, with a foreign key to the dog table. We reduce the cardinality of the entire dataset in this table by transposing the survey's 2K+ distinct questions about different conditions into a common set of columns, with a row in the table per dog/condition pair.
- Array-valued columns from the source data (multi-selects) are "unrolled" into multiple top-level BOOLEAN columns
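To make the multi-select "unrolling" concrete, here is a minimal Python sketch of the idea. The question, option codes, and output column names below are hypothetical stand-ins, not the real HLES schema.

# Hypothetical multi-select question: each possible option code maps to its own BOOLEAN column.
# These codes and column names are illustrative only.
WATER_SOURCE_COLUMNS = {
    "1": "de_water_source_tap",
    "2": "de_water_source_well",
    "3": "de_water_source_bottled",
}

def unroll_multiselect(selected_codes):
    """Turn the set of selected option codes into one boolean column per possible option."""
    return {column: code in selected_codes for code, column in WATER_SOURCE_COLUMNS.items()}

print(unroll_multiselect({"1", "3"}))
# {'de_water_source_tap': True, 'de_water_source_well': False, 'de_water_source_bottled': True}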
We use a Dataflow pipeline to extract HLES questions from RedCap. The pipeline filters records to ensure that only complete, consented data is downloaded. The resulting data is written out in "EAV" form, with fields:
- "record": The participant's ID across the entire study
- "redcap_event_name": An marker for the wave / round of surveys the data point was pulled from
- "field_name": The raw key of the question
- "value": The user-provided answer to the question
Note that:
- Multi-select columns can have multiple data points for the same record/event/field trio, appearing as separate rows in the output
- Answers to categorical questions will be exported in "raw" numeric form
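As a concrete illustration of this EAV output, here is a small hedged Python sketch. The four keys match the output fields listed above, but the specific question names, option codes, and record ID are made up.

# Illustrative EAV rows as the extraction pipeline writes them.
# The keys ("record", "redcap_event_name", "field_name", "value") are the real output fields;
# the question names, option codes, and record ID shown here are hypothetical.
eav_rows = [
    # A categorical answer, exported in "raw" numeric form
    {"record": "12345", "redcap_event_name": "baseline_arm_1",
     "field_name": "dd_breed_pure_or_mixed", "value": "1"},
    # A multi-select answer: one row per selected option for the same record/event/field trio
    {"record": "12345", "redcap_event_name": "baseline_arm_1",
     "field_name": "de_water_source", "value": "1"},
    {"record": "12345", "redcap_event_name": "baseline_arm_1",
     "field_name": "de_water_source", "value": "3"},
]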
The extraction pipeline can also download the JSON of each instrument's data dictionary.
The extraction pipeline requires a RedCap auth token as input. We've stashed our tokens in Vault. To get the token, run this shell command:
ENV=dev # ENV=prod also works
vault read -field=token secret/dsde/monster/${ENV}/dog-aging/redcap-tokens/automation
NEVER check the token into source control.
Extracted HLES data is transformed using another Dataflow pipeline. The pipeline includes a lot of code because of all the basic column-renamings and the full listing of health conditions. On the plus side, we've been told by DAP leadership that the HLES structure is effectively immutable on their end, so it's unlikely we'll ever need to rewrite large chunks of this processing logic.
The Canine Social and Learned Behavior (CSLB) survey is a follow-up to HLES that is administered annually. The goal of this survey is to assess age-related cognitive and behavioral changes in dogs. The first time a participant fills out the CSLB survey, a baseline score is established. Participants will have the opportunity to complete the survey again every year. This repeated design allows us to learn how the dogs in the study change over time. Answers to all questions are required.
- Pull all records from each yearly arm "annual_{yyyy}_arm_1" where canine_social_and_learned_behavior_complete is marked as complete.
- A single cslb table which contains a single CSLB record per dog per year.
- The table includes the 16 CSLB question responses and a foreign key to the dog table.
- Rows in the final cslb table will be unique on dog_id and cslb_date.
Environmental Data in the Dog Aging Project is published monthly to RedCap for each dog using the addresses collected in the HLES survey. It seeks to capture a snapshot of key environmental factors that may affect a dog's health over time so it can be used in conjunction with the other datasets. All fields from environmental data are calculated using the HLES primary and secondary addresses.
- Pull all records from each monthly arm for each address: "annual_{MMMyyyy}_arm_1", "annual_{MMMyyyy}_secondary_arm_1"
- where baseline_complete is marked as complete.
Environmental data is modeled closely on a proto-schema that DAP provided us, which we converted to a repo schema. We decided to break the larger table up into smaller fragments: geocoding, census, pollutants, temperature_precipitation, and walkability variables.
- A single environment table which contains a single environment record per dog per month per address.
- The table includes all environmental variables and a foreign key to the dog table.
- Rows in the final environment table will be unique on dog_id and address_month_year.
Sample Data in the Dog Aging Project is populated as different cohorts of participants are sent different sample kits. Our extraction of this data seeks to capture the linkage between study_id (dog_id) and sample_id as well as some other metadata about the sample.
- Pull all records from the baseline arm (baseline_arm_1)
- where k1_tube_serial and k1_rtn_tracking_date are populated.
- A single sample table which contains multiple samples per dog.
- Simple lookup table with 5 fields, including a foreign key to the dog table.
- Rows in the final sample table will be unique on sample_id.
The End of Life Survey is administered when a dog enrolled in the study has passed away. The goal of this survey is to collect information about the circumstances surrounding the end of the dog's life and any information available regarding the cause of death.
- Pull all records from the baseline arm (baseline_arm_1)
- where end_of_life_complete and eol_willing_to_complete are marked as complete.
The eols data is also modeled closely on a proto-schema that DAP provided, which we converted to a repo schema. We decided to break the larger table up into smaller fragments: new_condition, recent_aging_char, recent_symptom, death, euthan, and illness variables.
- A single eols table which contains a single eols record per dog.
- The table includes all eols variables and a foreign key to the dog table.
- Rows in the final eols table will be unique on dog_id.
We have a manually maintained lookup table that we constructed using the Terra JSON schema for the DAP tables and the DAP data dictionaries provided by RedCap. The table contains a row for each field in the final dog aging tables. The mapping table includes fields to capture information about the upstream source of data, datatypes, and, most importantly, the mapping of raw values to value labels for RedCap survey questions with radio responses.
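To illustrate how the raw-value-to-label mapping is used, here is a minimal Python sketch; the field name and label values are hypothetical stand-ins for entries in the mapping table.

# Hypothetical slice of the mapping table: raw radio-response codes -> value labels.
VALUE_LABELS = {
    "dd_spayed_or_neutered": {"0": "No", "1": "Yes", "2": "I don't know"},
}

def apply_value_labels(row):
    """Replace raw numeric codes with their value labels wherever a mapping exists."""
    labeled = dict(row)
    for field, labels in VALUE_LABELS.items():
        if field in labeled and labeled[field] in labels:
            labeled[field] = labels[labeled[field]]
    return labeled

print(apply_value_labels({"dog_id": "12345", "dd_spayed_or_neutered": "1"}))
# {'dog_id': '12345', 'dd_spayed_or_neutered': 'Yes'}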
We are looking forward to building tooling that can parse a proto-schema provided by DAP into usable ingest code in the form of a Terra JSON schema output.
The DAP ingest's long term roadmap includes a TDR<->Workspace integration. Until that integration is live, we've written a script to convert the outputs of our transformation pipeline into workspace-compatible TSVs. The script takes two positional arguments:
- Path to the top-level directory containing the outputs of the transformation pipeline
- Path to a directory where converted TSVs should be written
There are additional arguments to the script to add some flexibility:
- List of tables to process (currently, the script looks for all tables)
- Whether to output primary keys with a modified column name for Firecloud compatibility: entity:{table_name}_id
- Call Format:
python convert-output-to-tsv.py {transform GS path} {tsv outfile local path} -d
python convert-output-to-tsv.py gs://example-bucket/weekly_refresh/20210525/transform /Users/DATA/DAP/tsv -d
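For reference, the Firecloud-compatibility option amounts to rewriting the primary-key column in each TSV header. Below is a rough Python sketch of that transformation (an illustration, not the script's actual internals; the non-key column names are made up).

def firecloud_header(table_name, columns):
    """Rename the '{table_name}_id' primary key column to 'entity:{table_name}_id'."""
    primary_key = f"{table_name}_id"
    return [f"entity:{column}" if column == primary_key else column for column in columns]

# Example: the hles_dog primary key becomes "entity:hles_dog_id"
print(firecloud_header("hles_dog", ["hles_dog_id", "hles_owner_id", "dd_breed_pure_or_mixed"]))
# ['entity:hles_dog_id', 'hles_owner_id', 'dd_breed_pure_or_mixed']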
While it is possible to manually process the DAP data on our local machines, it became too cumbersome to do so once we crossed the threshold of around 10k participants. We pushed our extraction and transformation logic out to the cloud and are now running our refreshes in dev_mode using the dataflow_beam_runner. The Dagster project we added for DAP includes 3 different modes of operation:
- local_mode uses local_beam_runner
- dev_mode uses dataflow_beam_runner
- prod_mode uses dataflow_beam_runner
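As a rough sketch of how such modes are typically wired up in pre-1.0 Dagster (the resource key, resource bodies, and the single solid below are illustrative, not our actual definitions):

from dagster import ModeDefinition, pipeline, resource, solid

@resource
def local_beam_runner(_):
    return "local"  # placeholder: the real resource would wrap a local Beam invocation

@resource
def dataflow_beam_runner(_):
    return "dataflow"  # placeholder: the real resource would submit jobs to Dataflow

@solid(required_resource_keys={"beam_runner"})
def run_extraction(context):
    # The solid picks up whichever runner the selected mode provides.
    context.log.info(f"Running extraction with runner: {context.resources.beam_runner}")

@pipeline(
    mode_defs=[
        ModeDefinition(name="local_mode", resource_defs={"beam_runner": local_beam_runner}),
        ModeDefinition(name="dev_mode", resource_defs={"beam_runner": dataflow_beam_runner}),
        ModeDefinition(name="prod_mode", resource_defs={"beam_runner": dataflow_beam_runner}),
    ]
)
def refresh_data_all():
    run_extraction()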
For now, we can call individual pipelines manually with any runner and then call the tsv script to write files locally. The general structure for the SBT call to kick off one of our pipelines:
sbt "{scala_project} {target_class} {sbt parameters} {dataflow parameters}"
- Here is a sample sbt call for the Environment extraction pipeline:
sbt "dog-aging-hles-extraction/runMain org.broadinstitute.monster.dap.environment.EnvironmentExtractionPipeline --apiToken=foo --pullDataDictionaries=false --outputPrefix=bar --runner=dataflow --project=baz --region=us-central1 --workerMachineType=n1-standard-1 --autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=4 --maxNumWorkers=8 --experiments=shuffle_mode=service --endTime=qux"
Alternatively, we can execute a Dagster pipeline that runs all the pipelines, writes local TSVs, and selects a runner based on the mode.
Make sure to install Poetry before getting started.
- Navigate to the orchestration subdirectory
- Set up the Python virtual environment:
  - poetry install to install your project's package
  - poetry update to get the latest versions of the dependencies and to update the poetry.lock file
- Update config entries in orchestration/dev_refresh.yaml:
  - Update the refresh_directory resource config entry (this should be a GCS path)
  - Update the working_dir config entry for the write_outfiles solid
  - Update the end_time, pull_data_dictionaries, and api_token config entries for each extract solid
- Execute the pipeline:
  - poetry shell spawns a shell within the project's virtual environment
  - dagster pipeline execute -f repositories.py -a repositories -p refresh_data_all --mode dev -c dev_refresh.yaml
- Files should appear within a "tsv_output" directory relative to the working_dir specified in the config.
When the HLES data volume was much smaller, we were able to load the final TSVs directly into Terra using the UI prompts. Once the data grew too big to load via the browser UI, we started using the field engineering script to upload large entities. While the TDR <-> workspace integration for DAP is still being built out, we are using a Google bucket to deliver refreshed TSVs.