Skip to content

Commit

Permalink
Data reader function
Browse files Browse the repository at this point in the history
Assumes data is being read from a local file in parquet format, with
schema matching our established ETL gold schema.

I pull in the Gostic 2020 dataset to test data reading, with data in
the exepcted format stored in `tests/testthat/data`.
  • Loading branch information
zsusswein committed Aug 31, 2024
1 parent e05a8b0 commit 38f4e3d
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 2 deletions.
5 changes: 5 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ Imports:
AzureRMR,
AzureStor,
cli,
DBI,
duckdb,
rlang
URL: https://cdcgov.github.io/cfa-epinow2-pipeline/
Depends:
R (>= 2.10)
LazyData: true
4 changes: 2 additions & 2 deletions R/data.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
#' }
#' @source
#' <https://github.com/cobeylab/Rt_estimation/tree/d9d8977ba8492ac1a3b8287d2f470b313bfb9f1d> # nolint
NULL
"gostic_toy_rt"

#' Generation interval corresponding to the sample `gostic_toy_rt` dataset
#'
Expand Down Expand Up @@ -76,4 +76,4 @@ NULL
#' @name gostic_gt_pmf
#' @format `gostic_gt_pmf` A numeric vector of length 26 that sums to one within
#' numerical tolerance
NULL
"gostic_gt_pmf"
123 changes: 123 additions & 0 deletions R/read_data.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#' Read in the dataset of incident case counts
#'
#' Each row of the table corresponds to a single facilities' cases for a
#' reference-date/report-date/disease tuple. We want to aggregate these counts
#' to the level of geographic aggregate/report-date/reference-date/disease.
#'
#' We handle two distinct cases for geographic aggregates:
#'
#' 1. A single state: Subset to facilities **in that state only** and aggregate
#' up to the state level 2. The US overall: Aggregate over all facilities
#' without any subsetting
#'
#' Note that we do _not_ apply exclusions here. The exclusions are applied
#' later, after the aggregations. That means that for the US overall, we
#' aggregate over points that might potentially be excluded at the state level.
#' Our recourse in this case is to exclude the US overall aggregate point.
#'
#' @param data_path The path to the local file. This could contain a glob and
#' must be in parquet format.
#' @param disease One of "COVID-19" or "Influenza"
#' @param state_abb A two-letter uppercase abbreviation
#'
#' @return A dataframe with one or more rows and columns `report_date`,
#' `reference_date`, `state_abb`, `confirm`
#' @export
read_data <- function(data_path,
disease = c("COVID-19", "Influenza", "test"),
state_abb) {
rlang::arg_match(disease)
# NOTE: this is temporary workaround until we switch to the new API. I'm not
# sure if there's a better way to do this without a whole bunch of special
# casing -- which is its own code smell. I think this should really be handled
# upstream in the ETL job and standardize on "COVID-19", but that's beyond
# scope here and we need to do _something_ in the meantime so this runs.
disease_map <- c(
"COVID-19" = "COVID-19/Omicron",
"Influenza" = "Influenza",
"test" = "test"
)
mapped_disease <- disease_map[disease]

# We need different queries for the states and the US overall For US overall
# we need to aggregate over all the facilities in all the states. For the
# states, we need to aggregate over all the facilities in that one state
if (state_abb == "US") {
query <- "
SELECT
sum(value) AS confirm,
reference_date,
report_date,
-- We want to inject the 'US' as our abbrevation here bc data is not agg'd
'US' AS state_abb
FROM read_parquet(?)
WHERE 1=1
AND disease = ?
AND metric = 'count_ed_visits'
GROUP BY reference_date, report_date
ORDER BY reference_date
"

parameters <- list(
data_path,
mapped_disease
)
} else {
# We want just one state so aggregate over facilites in that one state only
query <- "
SELECT
sum(value) AS confirm,
reference_date,
report_date,
geo_value AS state_abb
FROM read_parquet(?)
WHERE 1=1
AND geo_value = ?
AND disease = ?
AND metric = 'count_ed_visits'
GROUP BY geo_value, reference_date, report_date
ORDER BY reference_date
"
parameters <- list(
data_path,
state_abb,
mapped_disease
)
}

# Guard against file does not exist
cli::cli_alert("Reading data from {.path {data_path}}")
if (!file.exists(data_path)) {
cli::cli_abort(
"Cannot read data. File {.path {data_path}} doesn't exist"
)
}

con <- DBI::dbConnect(duckdb::duckdb())
df <- rlang::try_fetch(
DBI::dbGetQuery(
con,
statement = query,
params = parameters
),
error = function(con) {
cli::cli_abort(c(
"Error fetching data from {.path {data_path}}",
"Using parameters {parameters}",
"Original error: {con}"

Check warning on line 107 in R/read_data.R

View check run for this annotation

Codecov / codecov/patch

R/read_data.R#L104-L107

Added lines #L104 - L107 were not covered by tests
))
}
)
DBI::dbDisconnect(con)

# Guard against empty return
if (nrow(df) == 0) {
cli::cli_abort(c(
"No data matching returned from {.path {data_path}}",
"Using parameters {parameters}"
))
}

cli::cli_alert_success("Read {nrow(df)} rows from {.path {data_path}}")
return(df)
}
43 changes: 43 additions & 0 deletions data-raw/convert_gostic_toy_rt_to_test_dataset.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
load("data/gostic_toy_rt.rda")
gostic_toy_rt[["reference_date"]] <- as.Date("2023-01-01") +
gostic_toy_rt[["time"]]
gostic_toy_rt[["report_date"]] <- max(gostic_toy_rt[["reference_date"]]) + 1

con <- DBI::dbConnect(duckdb::duckdb())

duckdb::duckdb_register(con, "gostic_toy_rt", gostic_toy_rt)
dbExecute(con, "
COPY (
SELECT
obs_incidence AS value,
'test' AS geo_value,
'test' AS disease,
'count_ed_visits' AS metric,
reference_date,
report_date
FROM gostic_toy_rt
ORDER BY reference_date
LIMIT 150
) TO
'tests/testthat/data/test_data.parquet' (FORMAT PARQUET)
;
")

# Repeat for US overall
dbExecute(con, "
COPY (
SELECT
obs_incidence AS value,
'US' AS geo_value,
'test' AS disease,
'count_ed_visits' AS metric,
reference_date,
report_date
FROM gostic_toy_rt
ORDER BY reference_date
LIMIT 150
) TO
'tests/testthat/data/us_overall_test_data.parquet' (FORMAT PARQUET)
;
")
dbDisconnect(con)
Binary file modified data/gostic_gt_pmf.rda
Binary file not shown.
Binary file modified data/gostic_toy_rt.rda
Binary file not shown.
Binary file added tests/testthat/data/test_data.parquet
Binary file not shown.
Binary file added tests/testthat/data/us_overall_test_data.parquet
Binary file not shown.
70 changes: 70 additions & 0 deletions tests/testthat/test-read_data.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
test_that("Data read for one state works on happy path", {
data_path <- test_path("data/test_data.parquet")
con <- DBI::dbConnect(duckdb::duckdb())
expected <- DBI::dbGetQuery(con, "
SELECT
value AS confirm,
reference_date,
report_date,
geo_value AS state_abb
FROM read_parquet(?)",
params = list(data_path)
)
DBI::dbDisconnect(con)

actual <- read_data(data_path,
disease = "test",
state_abb = "test"
)

expect_equal(actual, expected)
})

test_that("Data read for US overall works on happy path", {
data_path <- test_path("data/us_overall_test_data.parquet")
con <- DBI::dbConnect(duckdb::duckdb())
expected <- DBI::dbGetQuery(con, "
SELECT
value AS confirm,
reference_date,
report_date,
geo_value AS state_abb
FROM read_parquet(?)",
params = list(data_path)
)
DBI::dbDisconnect(con)

actual <- read_data(data_path,
disease = "test",
state_abb = "US"
)

expect_equal(actual, expected)
})


test_that("Reading a file that doesn't exist fails", {
data_path <- "not_a_real_file"

expect_error(read_data(data_path,
disease = "test",
state_abb = "US"
))
})

test_that("A query with no matching return fails", {
data_path <- test_path("data/us_overall_test_data.parquet")
expect_error(read_data(data_path,
disease = "test",
state_abb = "not_a_real_state"
))
})

test_that("An invalid query throws a wrapped error", {
# point the query at a non-parquet file
data_path <- test_path("../../data/gostic_toy_rt.rda")
expect_error(read_data(data_path,
disease = "test",
state_abb = "US"
))
})

0 comments on commit 38f4e3d

Please sign in to comment.