#' Read in the dataset of incident case counts
#'
#' Each row of the table corresponds to a single facility's cases for a
#' reference-date/report-date/disease tuple. We want to aggregate these counts
#' to the level of geographic aggregate/report-date/reference-date/disease.
#'
#' We handle two distinct cases for geographic aggregates:
#'
#' 1. A single state: Subset to facilities **in that state only** and
#'    aggregate up to the state level
#' 2. The US overall: Aggregate over all facilities without any subsetting
#'
#' Note that we do _not_ apply exclusions here. The exclusions are applied
#' later, after the aggregations. That means that for the US overall, we
#' aggregate over points that might potentially be excluded at the state
#' level. Our recourse in this case is to exclude the US overall aggregate
#' point.
#'
#' @param data_path The path to the local file. This could contain a glob and
#'   must be in parquet format.
#' @param disease One of "COVID-19", "Influenza", or "test"
#' @param state_abb A two-letter uppercase abbreviation, or "US" for the US
#'   overall
#'
#' @return A dataframe with one or more rows and columns `report_date`,
#'   `reference_date`, `state_abb`, `confirm`
#' @export
read_data <- function(data_path,
                      disease = c("COVID-19", "Influenza", "test"),
                      state_abb) {
  # BUG FIX: arg_match() *returns* the validated choice; the original call
  # discarded it, so a caller relying on the default left `disease` bound to
  # the full length-3 vector and the query below received a length-3 param.
  disease <- rlang::arg_match(disease)

  # NOTE: this is temporary workaround until we switch to the new API. I'm not
  # sure if there's a better way to do this without a whole bunch of special
  # casing -- which is its own code smell. I think this should really be
  # handled upstream in the ETL job and standardize on "COVID-19", but that's
  # beyond scope here and we need to do _something_ in the meantime so this
  # runs.
  disease_map <- c(
    "COVID-19" = "COVID-19/Omicron",
    "Influenza" = "Influenza",
    "test" = "test"
  )
  # `[[` drops the name so a bare string is bound as the query parameter
  mapped_disease <- disease_map[[disease]]

  # We need different queries for the states and the US overall. For US
  # overall we need to aggregate over all the facilities in all the states.
  # For the states, we need to aggregate over all the facilities in that one
  # state.
  if (state_abb == "US") {
    query <- "
    SELECT
      sum(value) AS confirm,
      reference_date,
      report_date,
      -- We want to inject the 'US' as our abbrevation here bc data is not agg'd
      'US' AS state_abb
    FROM read_parquet(?)
    WHERE 1=1
      AND disease = ?
      AND metric = 'count_ed_visits'
    GROUP BY reference_date, report_date
    ORDER BY reference_date
    "

    parameters <- list(
      data_path,
      mapped_disease
    )
  } else {
    # We want just one state so aggregate over facilities in that one state
    # only
    query <- "
    SELECT
      sum(value) AS confirm,
      reference_date,
      report_date,
      geo_value AS state_abb
    FROM read_parquet(?)
    WHERE 1=1
      AND geo_value = ?
      AND disease = ?
      AND metric = 'count_ed_visits'
    GROUP BY geo_value, reference_date, report_date
    ORDER BY reference_date
    "
    parameters <- list(
      data_path,
      state_abb,
      mapped_disease
    )
  }

  # Guard against file does not exist. The documented interface allows a glob
  # in `data_path`, which file.exists() alone would reject; Sys.glob()
  # expands globs and returns character(0) both when a pattern matches
  # nothing and when a literal path doesn't exist.
  cli::cli_alert("Reading data from {.path {data_path}}")
  if (length(Sys.glob(data_path)) == 0) {
    cli::cli_abort(
      "Cannot read data. File {.path {data_path}} doesn't exist"
    )
  }

  con <- DBI::dbConnect(duckdb::duckdb())
  # BUG FIX: the original disconnected only on the success path, so a failed
  # query leaked the connection; on.exit() releases it on every exit.
  on.exit(DBI::dbDisconnect(con), add = TRUE)
  df <- rlang::try_fetch(
    DBI::dbGetQuery(
      con,
      statement = query,
      params = parameters
    ),
    # Handler arg renamed from `con` (which shadowed the DB connection) to
    # the conventional `cnd`; interpolate the condition *message*, not the
    # condition object, and chain the original error via `parent`.
    error = function(cnd) {
      cli::cli_abort(
        c(
          "Error fetching data from {.path {data_path}}",
          "Using parameters {parameters}",
          "Original error: {conditionMessage(cnd)}"
        ),
        parent = cnd
      )
    }
  )

  # Guard against empty return
  if (nrow(df) == 0) {
    cli::cli_abort(c(
      "No data matching returned from {.path {data_path}}",
      "Using parameters {parameters}"
    ))
  }

  cli::cli_alert_success("Read {nrow(df)} rows from {.path {data_path}}")
  return(df)
}
'count_ed_visits' AS metric, + reference_date, + report_date + FROM gostic_toy_rt + ORDER BY reference_date + LIMIT 150 +) TO + 'tests/testthat/data/us_overall_test_data.parquet' (FORMAT PARQUET) + ; + ") +dbDisconnect(con) diff --git a/tests/testthat/test-read_data.R b/tests/testthat/test-read_data.R new file mode 100644 index 00000000..502de632 --- /dev/null +++ b/tests/testthat/test-read_data.R @@ -0,0 +1,70 @@ +test_that("Data read for one state works on happy path", { + data_path <- test_path("data/test_data.parquet") + con <- DBI::dbConnect(duckdb::duckdb()) + expected <- DBI::dbGetQuery(con, " + SELECT + value AS confirm, + reference_date, + report_date, + geo_value AS state_abb + FROM read_parquet(?)", + params = list(data_path) + ) + DBI::dbDisconnect(con) + + actual <- read_data(data_path, + disease = "test", + state_abb = "test" + ) + + expect_equal(actual, expected) +}) + +test_that("Data read for US overall works on happy path", { + data_path <- test_path("data/us_overall_test_data.parquet") + con <- DBI::dbConnect(duckdb::duckdb()) + expected <- DBI::dbGetQuery(con, " + SELECT + value AS confirm, + reference_date, + report_date, + geo_value AS state_abb + FROM read_parquet(?)", + params = list(data_path) + ) + DBI::dbDisconnect(con) + + actual <- read_data(data_path, + disease = "test", + state_abb = "US" + ) + + expect_equal(actual, expected) +}) + + +test_that("Reading a file that doesn't exist fails", { + data_path <- "not_a_real_file" + + expect_error(read_data(data_path, + disease = "test", + state_abb = "US" + )) +}) + +test_that("A query with no matching return fails", { + data_path <- test_path("data/us_overall_test_data.parquet") + expect_error(read_data(data_path, + disease = "test", + state_abb = "not_a_real_state" + )) +}) + +test_that("An invalid query throws a wrapped error", { + # point the query at a non-parquet file + data_path <- test_path("../../data/gostic_toy_rt.rda") + expect_error(read_data(data_path, + disease = "test", + 
state_abb = "US" + )) +})