generated from CDCgov/template
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Assumes data is being read from a local file in parquet format, with schema matching our established ETL gold schema. I pull in the Gostic 2020 dataset to test data reading, with data in the expected format stored in `tests/testthat/data`.
- Loading branch information
Showing
4 changed files
with
238 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,5 +26,7 @@ Imports: | |
AzureRMR, | ||
AzureStor, | ||
cli, | ||
DBI, | ||
duckdb, | ||
rlang | ||
URL: https://cdcgov.github.io/cfa-epinow2-pipeline/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
#' Read in the dataset of incident case counts
#'
#' Each row of the table corresponds to a single facility's cases for a
#' reference-date/report-date/disease tuple. We want to aggregate these counts
#' to the level of geographic aggregate/report-date/reference-date/disease.
#'
#' We handle two distinct cases for geographic aggregates:
#'
#' 1. A single state: Subset to facilities **in that state only** and aggregate
#'    up to the state level
#' 2. The US overall: Aggregate over all facilities without any subsetting
#'
#' Note that we do _not_ apply exclusions here. The exclusions are applied
#' later, after the aggregations. That means that for the US overall, we
#' aggregate over points that might potentially be excluded at the state level.
#' Our recourse in this case is to exclude the US overall aggregate point.
#'
#' @param data_path The path to the local file. This could contain a glob and
#'   must be in parquet format.
#' @param disease One of "COVID-19" or "Influenza" ("test" is reserved for the
#'   test suite)
#' @param state_abb A two-letter uppercase state abbreviation, or "US" for the
#'   national aggregate
#'
#' @return A dataframe with one or more rows and columns `report_date`,
#'   `reference_date`, `state_abb`, `confirm`
#' @export
read_data <- function(data_path,
                      disease = c("COVID-19", "Influenza", "test"),
                      state_abb) {
  # arg_match() both validates *and* resolves the default vector to a single
  # value. Discarding its return value (as before) meant that when `disease`
  # was left at its default, the full length-3 vector flowed into the query
  # parameters below.
  disease <- rlang::arg_match(disease)

  # Guard against file does not exist. Check up front, before doing any other
  # work, so we fail fast with a clear message.
  cli::cli_alert("Reading data from {.path {data_path}}")
  if (!file.exists(data_path)) {
    cli::cli_abort(
      "Cannot read data. File {.path {data_path}} doesn't exist"
    )
  }

  # NOTE: this is temporary workaround until we switch to the new API. I'm not
  # sure if there's a better way to do this without a whole bunch of special
  # casing -- which is its own code smell. I think this should really be handled
  # upstream in the ETL job and standardize on "COVID-19", but that's beyond
  # scope here and we need to do _something_ in the meantime so this runs.
  disease_map <- c(
    "COVID-19" = "COVID-19/Omicron",
    "Influenza" = "Influenza",
    "test" = "test"
  )
  # `[[` drops the name so the bound query parameter is a bare string
  mapped_disease <- disease_map[[disease]]

  # We need different queries for the states and the US overall. For US overall
  # we need to aggregate over all the facilities in all the states. For the
  # states, we need to aggregate over all the facilities in that one state
  if (state_abb == "US") {
    query <- "
    SELECT
      sum(value) AS confirm,
      reference_date,
      report_date,
      -- We want to inject the 'US' as our abbreviation here bc data is not agg'd
      'US' AS state_abb
    FROM read_parquet(?)
    WHERE 1=1
      AND disease = ?
      AND metric = 'count_ed_visits'
    GROUP BY reference_date, report_date
    ORDER BY reference_date
    "

    parameters <- list(
      data_path,
      mapped_disease
    )
  } else {
    # We want just one state so aggregate over facilities in that one state only
    query <- "
    SELECT
      sum(value) AS confirm,
      reference_date,
      report_date,
      geo_value AS state_abb
    FROM read_parquet(?)
    WHERE 1=1
      AND geo_value = ?
      AND disease = ?
      AND metric = 'count_ed_visits'
    GROUP BY geo_value, reference_date, report_date
    ORDER BY reference_date
    "
    parameters <- list(
      data_path,
      state_abb,
      mapped_disease
    )
  }

  con <- DBI::dbConnect(duckdb::duckdb())
  # Guarantee the connection is released even when the query errors. The
  # previous version only disconnected on the happy path, leaking the
  # connection whenever the wrapped error below was raised.
  on.exit(DBI::dbDisconnect(con), add = TRUE)
  df <- rlang::try_fetch(
    DBI::dbGetQuery(
      con,
      statement = query,
      params = parameters
    ),
    # Handler arg renamed from `con` (which shadowed the DB connection) to the
    # conventional `cnd`. Interpolate conditionMessage()/toString() rather than
    # the raw condition/list objects, which glue cannot format usefully.
    error = function(cnd) {
      cli::cli_abort(
        c(
          "Error fetching data from {.path {data_path}}",
          "Using parameters {toString(parameters)}",
          "Original error: {conditionMessage(cnd)}"
        ),
        parent = cnd
      )
    }
  )

  # Guard against empty return: a valid file with no matching rows is almost
  # certainly a caller error (wrong state/disease), not a usable result
  if (nrow(df) == 0) {
    cli::cli_abort(c(
      "No data matching returned from {.path {data_path}}",
      "Using parameters {toString(parameters)}"
    ))
  }

  cli::cli_alert_success("Read {nrow(df)} rows from {.path {data_path}}")
  return(df)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# Generate the parquet fixtures used by the read_data() tests from the Gostic
# et al. (2020) toy Rt dataset, matching the established ETL gold schema
# (value/geo_value/disease/metric/reference_date/report_date).
#
# NOTE: all DBI calls are now namespace-qualified. The previous unqualified
# dbExecute()/dbDisconnect() calls only worked when DBI happened to be
# attached; this script should run under a bare Rscript invocation.
load("data/gostic_toy_rt.rda")
# Synthetic dates: reference dates count up from 2023-01-01 via the `time`
# column, and everything is "reported" one day after the last reference date.
gostic_toy_rt[["reference_date"]] <- as.Date("2023-01-01") +
  gostic_toy_rt[["time"]]
gostic_toy_rt[["report_date"]] <- max(gostic_toy_rt[["reference_date"]]) + 1

con <- DBI::dbConnect(duckdb::duckdb())

# Expose the in-memory dataframe to DuckDB as a relation we can query
duckdb::duckdb_register(con, "gostic_toy_rt", gostic_toy_rt)

# Single-state fixture: first 150 reference dates tagged with geo 'test'
DBI::dbExecute(con, "
  COPY (
    SELECT
      obs_incidence AS value,
      'test' AS geo_value,
      'test' AS disease,
      'count_ed_visits' AS metric,
      reference_date,
      report_date
    FROM gostic_toy_rt
    ORDER BY reference_date
    LIMIT 150
  ) TO
  'tests/testthat/data/test_data.parquet' (FORMAT PARQUET)
  ;
")

# Repeat for US overall
DBI::dbExecute(con, "
  COPY (
    SELECT
      obs_incidence AS value,
      'US' AS geo_value,
      'test' AS disease,
      'count_ed_visits' AS metric,
      reference_date,
      report_date
    FROM gostic_toy_rt
    ORDER BY reference_date
    LIMIT 150
  ) TO
  'tests/testthat/data/us_overall_test_data.parquet' (FORMAT PARQUET)
  ;
")
DBI::dbDisconnect(con)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
test_that("Data read for one state works on happy path", {
  data_path <- test_path("data/test_data.parquet")

  # Build the expected result with a raw query against the fixture (one row
  # per date per geo, so no aggregation is needed here)
  con <- DBI::dbConnect(duckdb::duckdb())
  # Close the connection even if the query or a later expectation fails; the
  # previous version leaked it on any error before the explicit disconnect
  on.exit(DBI::dbDisconnect(con), add = TRUE)
  expected <- DBI::dbGetQuery(con, "
    SELECT
      value AS confirm,
      reference_date,
      report_date,
      geo_value AS state_abb
    FROM read_parquet(?)",
    params = list(data_path)
  )

  actual <- read_data(data_path,
    disease = "test",
    state_abb = "test"
  )

  expect_equal(actual, expected)
})
|
||
test_that("Data read for US overall works on happy path", {
  data_path <- test_path("data/us_overall_test_data.parquet")

  # Expected result straight from the fixture; the US fixture already holds
  # one row per date so no aggregation is needed
  con <- DBI::dbConnect(duckdb::duckdb())
  # Close the connection even if the query or a later expectation fails; the
  # previous version leaked it on any error before the explicit disconnect
  on.exit(DBI::dbDisconnect(con), add = TRUE)
  expected <- DBI::dbGetQuery(con, "
    SELECT
      value AS confirm,
      reference_date,
      report_date,
      geo_value AS state_abb
    FROM read_parquet(?)",
    params = list(data_path)
  )

  actual <- read_data(data_path,
    disease = "test",
    state_abb = "US"
  )

  expect_equal(actual, expected)
})
|
||
|
||
test_that("Reading a file that doesn't exist fails", {
  data_path <- "not_a_real_file"

  # read_data() aborts via cli::cli_abort(), which signals a classed rlang
  # error. Matching on the class (rather than a bare expect_error()) avoids
  # false passes from unrelated failures.
  expect_error(
    read_data(data_path,
      disease = "test",
      state_abb = "US"
    ),
    class = "rlang_error"
  )
})
|
||
test_that("A query with no matching return fails", {
  data_path <- test_path("data/us_overall_test_data.parquet")

  # The fixture's geo_value is 'US', so this state filter matches zero rows
  # and should trigger the empty-return guard. Pin the rlang error class from
  # cli_abort() so unrelated errors don't satisfy the expectation.
  expect_error(
    read_data(data_path,
      disease = "test",
      state_abb = "not_a_real_state"
    ),
    class = "rlang_error"
  )
})
|
||
test_that("An invalid query throws a wrapped error", {
  # point the query at a non-parquet file so read_parquet() fails inside
  # DuckDB; read_data() should re-raise it as a classed rlang error via
  # cli_abort(). Pinning the class guards against false passes.
  data_path <- test_path("../../data/gostic_toy_rt.rda")
  expect_error(
    read_data(data_path,
      disease = "test",
      state_abb = "US"
    ),
    class = "rlang_error"
  )
})