Skip to content

Commit

Permalink
atlz
Browse files Browse the repository at this point in the history
  • Loading branch information
fabkury committed Dec 6, 2022
1 parent 839513a commit 259066b
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 64 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: phea
Title: Phenotyping Algebra
Version: 0.4.1.0000
Version: 0.5.0.0000
Authors@R:
person("Fabrício", "Kury", , "[email protected]", role = c("aut", "cre"),
comment = c(ORCID = "YOUR-ORCID-ID"))
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
export(calculate_formula)
export(code_shot)
export(head_shot)
export(keep_change_of)
export(keep_row_by)
export(make_component)
export(make_record_source)
Expand Down
201 changes: 142 additions & 59 deletions R/phea.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,25 @@ if(!exists('.pheaglobalenv'))
.pheaglobalenv <- new.env(parent=emptyenv())


# Setup Phea ------------------------------------------------------------------------------------------------------
#' Setup Phea
#'
#' Configures functions `sqlt()` and `sql0()` for use.
#'
#' Stores the DBI connection for later use.
#'
#' @export
#' @param connection DBI-compatible SQL connection (e.g. produced by DBI::dbConnect).
#' @param schema Schema to be used by default in `sqlt()`.
setup_phea <- function(connection, schema, .verbose = TRUE) {
assign('con', connection, envir = .pheaglobalenv)
assign('schema', schema, envir = .pheaglobalenv)
assign('verbose', .verbose, envir = .pheaglobalenv)
}


# Keep row by & keep change of ------------------------------------------------------------------------------------
#' Keep [first] row by [window function]
#' Keep [first or last] row by [window function]
#'
#' Keeps the row containing the group-wise maximum or minimum.
#'
Expand Down Expand Up @@ -35,13 +52,29 @@ keep_row_by <- function(lazy_tbl, by, partition, pick_last = FALSE) {
res
}


#' Keep change of [a column or SQL expression]
#'
#' Keeps only the rows where the value of `of` changes. `of` can be the name of a column, or any SQL expression valid
#' inside a `SELECT` statement.
#'
#' If `partition` is provided, changes are limited to within it. If `order` provided, rows
#' are ordered (and change is detected) according to that column or columns. The first row (within a partition or not)
#' is always kept.
#'
#' @export
#' @param lazy_tbl Lazy table to be filtered.
#' @param of Character. Column or SQL expression. Only rows where `of` changes are kept.
#' @param partition Character. Optional. Variable or variables to define the partition.
#' @param order Character. Optional. If provided, this or these column(s) will define the ordering of the rows and hence
#' how changes are detected.
#' @return Lazy table with only rows where `of` changes in comparison to the previous row.
keep_change_of <- function(lazy_tbl, of, partition = NULL, order = NULL) {
lazy_tbl |>
mutate(phea_kco_lag = dbplyr::win_over(sql(paste0('lag(', of, ')')),
mutate(phea_kco_var = sql(of)) |>
mutate(phea_kco_lag = dbplyr::win_over(sql('lag(phea_kco_var)'),
partition = partition, order = order, con = .pheaglobalenv$con)) |>
filter(is.na(phea_kco_lag) || phea_kco_lag != !!sym(of)) |>
select(-phea_kco_lag)
filter(is.na(phea_kco_lag) || phea_kco_lag != phea_kco_var) |>
select(-phea_kco_var, -phea_kco_lag)
}


Expand Down Expand Up @@ -94,22 +127,6 @@ code_shot <- function(lazy_tbl, clip = TRUE) {
return(res)
}

# Setup Phea ------------------------------------------------------------------------------------------------------
#' Setup Phea
#'
#' Configures functions `sqlt()` and `sql0()` for use.
#'
#' Stores the DBI connection for later use.
#'
#' @export
#' @param connection DBI-compatible SQL connection (e.g. produced by DBI::dbConnect).
#' @param schema Schema to be used by default in `sqlt()`.
setup_phea <- function(connection, schema, .verbose = TRUE) {
assign('con', connection, envir = .pheaglobalenv)
assign('schema', schema, envir = .pheaglobalenv)
assign('verbose', .verbose, envir = .pheaglobalenv)
}


# sql0, sqlt & sqla -----------------------------------------------------------------------------------------------
#' SQL table
Expand Down Expand Up @@ -222,9 +239,14 @@ sqla <- function(args, ...) {
#' @seealso [make_record_source()] to create a record source.
#' @return Phea component object.
make_component <- function(input_source, line = NA, delay = NA, window = NA, rec_name = NA,
.passthrough = FALSE, .ts = NULL, .pid = NULL, .delay_fn = 'last_value', .ts_fn = NULL) {
.passthrough = FALSE, .ts = NULL, .pid = NULL, .fn = NA, .ts_fn = NULL,
ahead = NA, up_to = NA) {
component <- list()

if((!is.na(ahead) || !is.na(up_to)) && (!is.na(delay) || !is.na(window))) {
stop('Cannot utilize ahead/up_to together with delay/window.')
}

if(isTRUE(attr(input_source, 'phea') == 'component')) {
# rec_source is actually a component.
component <- input_source
Expand All @@ -233,32 +255,36 @@ make_component <- function(input_source, line = NA, delay = NA, window = NA, rec
component$rec_source <- input_source
} else {
if(isTRUE(attr(input_source, 'phea') == 'phenotype')) {
# result from formula. Read its value column.
# Result is from calculate_formula().
component$rec_source <- make_record_source(
records = input_source,
rec_name = 'value',
ts = ts,
pid = pid)
records = input_source, ts = ts, pid = pid)
} else {
if('tbl_lazy' %in% class(input_source)) {
component$rec_source <- make_record_source(
records = input_source,
.ts = deparse(substitute(.ts)),
.pid = deparse(substitute(.pid)))
} else {
stop('Unable to recognize input_source.')
}
}
}

if(is.na(line) && is.na(delay) && is.na(window))
if(is.na(line) && is.na(delay) && is.na(window) && is.na(ahead) && is.na(up_to))
line <- 0

# Guarantee existance of the objects, even if it's NA.
component$line <- line
component$delay <- delay
component$comp_window <- window
component$.passthrough <- .passthrough
component$ahead <- ahead
component$up_to <- up_to
component$fn <- .fn
}

# Overwrite if provided.
## Overwrite if provided.

if(!is.na(line))
component$line <- line

Expand All @@ -274,14 +300,20 @@ make_component <- function(input_source, line = NA, delay = NA, window = NA, rec

if(!is.na(.passthrough))
component$.passthrough <- .passthrough

if(!is.na(ahead))
component$ahead <- ahead

if(!is.na(up_to))
component$up_to <- up_to

if(!is.na(.delay_fn))
component$delay_fn <- .delay_fn
if(!is.na(.fn))
component$fn <- .fn

if(!is.null(.ts_fn))
component$ts_fn <- .ts_fn
else
component$ts_fn <- component$delay_fn
component$ts_fn <- component$fn

attr(component, 'phea') <- 'component'

Expand Down Expand Up @@ -388,11 +420,13 @@ make_record_source <- function(records, rec_name = NULL, ts, pid, vars = NULL, .
#' character using `as.character()`), and also copies it to the clipboard.
#' @param .filter Character vector. Logical conditions to satisfy. These go into the SQL `WHERE`
#' clause. Only rows satisfying all conditions provided will be returned.
#' @param .out_window Character vector. Names of components to not be included when calculating the window.
#' @param .out_window Character vector. Names of components to not be included when calculating the window.
#' @param .dates Tibble. Column names must be `pid` (person ID) and `ts` (timestamp). If provided, these dates (for each
#' person ID) are added to the board, so that the phenotype computation can be attempted at those times.
#' @return Lazy table with result of formula or formulas.
calculate_formula <- function(components, fml = NULL, window = NA, export = NULL, add_components = NULL,
.ts = NULL, .pid = NULL, .delay = NULL, .line = NULL, .require_all = FALSE, .lim = NA, .dont_require = NULL,
.filter = NULL, .cascaded = TRUE, .clip_sql = FALSE, .out_window = NULL) {
.filter = NULL, .cascaded = TRUE, .clip_sql = FALSE, .out_window = NULL, .dates = NULL) {
# Prepare ---------------------------------------------------------------------------------------------------------
# TODO: Improve the logic regarding these two variables below.
keep_names_unchanged <- FALSE
Expand Down Expand Up @@ -546,7 +580,7 @@ calculate_formula <- function(components, fml = NULL, window = NA, export = NULL
dplyr::filter(rec_name == .env$rec_name) |>
dplyr::pull(column) |>
unique()

export_records <- record_source$records |>
dplyr::transmute(
name = local(rec_name),
Expand All @@ -561,6 +595,15 @@ calculate_formula <- function(components, fml = NULL, window = NA, export = NULL
board <- record_sources |>
purrr::map(prepare_record_source) |>
purrr::reduce(dplyr::union_all)

# Add extra dates, if any.
if(!is.null(.dates)) {
dates <- paste0("SELECT * FROM (VALUES ",
paste0("(", .dates$pid, ", '", .dates$ts, "'::date)", collapse = ', '),
") AS pid_ts (pid, ts)") |>
sql0()
board <- dplyr::union_all(board, dates)
}

# Produce components ----------------------------------------------------------------------------------------------
produce_component <- function(comp_name, component) {
Expand Down Expand Up @@ -601,8 +644,6 @@ calculate_formula <- function(components, fml = NULL, window = NA, export = NULL
} else
component_columns <- columns

# Por algum motivo, o "lag()" não funciona com "range between window preceeding and delay preceeding", i.e. não é
# possível aplicar line *e* delay em uma só chamada à função de window. O last_value() funciona com esse "range".
if(component$rec_source$type == 'column') {
stop('Column-type record source is not yet supported.')
capture_col <- component$rec_source$capture_col
Expand All @@ -620,19 +661,56 @@ calculate_formula <- function(components, fml = NULL, window = NA, export = NULL
sql_start <- paste0('last_value(', columns_sql, ') over (', over_clause, ' ')

sql_txts <- paste0(sql_start,
'rows between unbounded preceding and ', component$line, ' preceding)')
'rows between unbounded preceding and ',
ifelse(component$line == 0, 'current row', paste0(component$line, ' preceding')), ')')
} else {
# Otherwise, produce access via *delay*.
use_delay_fn <- rep(component$delay_fn, length(columns))
use_delay_fn[columns == 'ts'] <- component$ts_fn

sql_start <- paste0(use_delay_fn, '(', columns_sql, ') over (', over_clause, ' ')

sql_txts <- paste0(sql_start, 'range between ',
ifelse(is.na(component$comp_window), 'unbounded',
paste0('\'', component$comp_window, '\'::interval')), ' preceding ',
'and \'', ifelse(is.na(component$delay), '0 days', component$delay),
'\'::interval preceding)')
if(!is.na(component$delay) || !is.na(component$comp_window)) {
# Otherwise, produce access via *delay*.
comp_fn <- component$fn
if(is.na(comp_fn))
comp_fn <- 'last_value' # Lookbehind defaults to last_value

ts_fn <- component$ts_fn
if(is.na(ts_fn))
ts_fn <- comp_fn

use_fn <- rep(comp_fn, length(columns))
use_fn[columns == 'ts'] <- ts_fn

sql_start <- paste0(use_fn, '(', columns_sql, ') over (', over_clause, ' ')

sql_txts <- paste0(sql_start, 'range between ',
ifelse(is.na(component$comp_window) || component$comp_window == -Inf, 'unbounded',
paste0('\'', component$comp_window, '\'::interval')), ' preceding ',
'and ', ifelse(is.na(component$delay), 'current row',
paste0('\'', component$delay, ' days\'::interval preceding')),
')')
} else {
# Otherwise, produce access via *ahead/up_to*.
if(is.na(component$ahead) && is.na(component$up_to)) {
stop('Unable to identify type of component. All parameters are empty.')
}

comp_fn <- component$fn
if(is.na(comp_fn))
comp_fn <- 'first_value' # Lookahead defaults to first_value

ts_fn <- component$ts_fn
if(is.na(ts_fn))
ts_fn <- comp_fn

use_fn <- rep(comp_fn, length(columns))
use_fn[columns == 'ts'] <- ts_fn

sql_start <- paste0(use_fn, '(', columns_sql, ') over (', over_clause, ' ')

sql_txts <- paste0(sql_start, 'range between ',
ifelse(is.na(component$ahead), '0',
paste0('\'', component$ahead, '\'::interval')), ' following ',
'and ', ifelse(is.na(component$up_to) || component$up_to == Inf, 'unbounded',
paste0('\'', component$up_to, '\'::interval')),
' following)')
}
}

commands <- purrr::map2(component_columns, sql_txts,
Expand Down Expand Up @@ -668,18 +746,23 @@ calculate_formula <- function(components, fml = NULL, window = NA, export = NULL
arrange()

# Compute window --------------------------------------------------------------------------------------------------
# The front (most recent point) of the window is column ts of the current line. The back (oldest point) is the
# smallest among the ts's of the components.
window_components <- unique(setdiff(var_map$component_name, .out_window))
if(length(window_components) > 1 && !input_is_phenotype)
# TODO: REVISE THIS: The front (most recent point) of the window is column ts of the current line. The back (oldest
# point) is the smallest among the ts's of the components.
window_components <- setdiff(var_map$component_name, .out_window) |> unique()
if(length(window_components) > 1 && !input_is_phenotype) {
sql_ts_least <- paste0('least(', paste0(paste0(window_components, '_ts'), collapse = ', '), ')')
else
sql_ts_greatest <- paste0('greatest(', paste0(paste0(window_components, '_ts'), collapse = ', '), ')')
}
else {
# TODO: Improve this a bit.
sql_ts_least <- 'ts'
sql_ts_greatest <- 'ts'
}

board <- board |>
dplyr::mutate(
window = ts - dplyr::sql(sql_ts_least),
ts_row = dplyr::sql('last_value(row_id) over (partition by "pid", "ts")'))
window = dplyr::sql(sql_ts_greatest) - dplyr::sql(sql_ts_least),
phea_ts_row = dplyr::sql('last_value(row_id) over (partition by "pid", "ts")'))

# Keep the most complete computations -----------------------------------------------------------------------------
# Keep only the most complete computation in each timestamp. The most recent computation is the last one in each
Expand All @@ -694,15 +777,15 @@ calculate_formula <- function(components, fml = NULL, window = NA, export = NULL
if(length(required_components) > 0) {
sql_txt <- paste0(required_components, '_ts is not null') |> paste(collapse = ' and ')
board <- dplyr::filter(board,
row_id == ts_row && dplyr::sql(sql_txt))
row_id == phea_ts_row && dplyr::sql(sql_txt))
} else {
# No required components after all, because all were excluded by .dont_require.
board <- dplyr::filter(board,
row_id == ts_row)
row_id == phea_ts_row)
}
} else {
board <- board |>
dplyr::filter(row_id == ts_row)
dplyr::filter(row_id == phea_ts_row)
}

# Filter and calculate --------------------------------------------------------------------------------------------
Expand Down
6 changes: 5 additions & 1 deletion man/calculate_formula.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 259066b

Please sign in to comment.