Skip to content

Commit

Permalink
0.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fabkury committed Jan 14, 2023
1 parent 71b7a7e commit 0a4d2d9
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 76 deletions.
47 changes: 28 additions & 19 deletions R/calculate_formula.R
Original file line number Diff line number Diff line change
Expand Up @@ -250,18 +250,33 @@ calculate_formula <- function(components, fml = NULL, window = NULL, export = NU
prid <- dbplyr::win_over(con = .pheaglobalenv$con,
expr = dplyr::sql('row_number()'), order = c('pid', 'ts'))

# Apply commands to the board all at once, so we only generate a single layer of "SELECT ... FROM (SELECT ...)".
# dates_from ------------------------------------------------------------------------------------------------------
if(filtering_dates) {
board <- dplyr::transmute(board,
phea_row_id = prid,
pid, ts, name,
!!!commands)
} else {
# Obtain `rec_name`s from the record sources of the target components.
rec_names <- unique(var_map[var_map$component_name %in% dates_from,]$rec_name)

dates_filter_sql <- paste0(dbQuoteId('name'), ' in (', paste0(rec_names, collapse = ', '), ')')

if(!.pheaglobalenv$compatibility_mode) {
# Apply the filter now.
# Keep only the rows coming from those `rec_name`s.
board <- board |>
dplyr::filter(dplyr::sql(dates_filter_sql))
}
}

# Apply commands to the board all at once, so we only generate a single layer of "SELECT ... FROM (SELECT ...)".
# if(filtering_dates) {
# board <- dplyr::transmute(board,
# phea_row_id = prid,
# pid, ts, name,
# !!!commands)
# } else {
board <- dplyr::transmute(board,
phea_row_id = prid,
pid, ts,
!!!commands)
}
# }

if(.pheaglobalenv$compatibility_mode) {
## Fill the blanks downward with the last non-blank value, within the patient.
Expand All @@ -279,18 +294,12 @@ calculate_formula <- function(components, fml = NULL, window = NULL, export = NU
# tidyr::fill.lazy_tbl() above.
board <- board |>
arrange()
}

# dates_from ------------------------------------------------------------------------------------------------------
if(filtering_dates) {
# Obtain `rec_name`s from the record sources of the target components.
rec_names <- unique(var_map[var_map$component_name %in% dates_from,]$rec_name)

sql_txt <- paste0(dbQuoteId('name'), ' in (', paste0(rec_names, collapse = ', '), ')')

# Keep only the rows coming from those `rec_name`s.
board <- board |>
dplyr::filter(dplyr::sql(sql_txt))

if(filtering_dates) {
# Keep only the rows coming from those `rec_name`s.
board <- board |>
dplyr::filter(dplyr::sql(dates_filter_sql))
}
}

# Compute window --------------------------------------------------------------------------------------------------
Expand Down
76 changes: 36 additions & 40 deletions R/component.R
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ make_component <- function(input_source,
line = NULL, bound = NULL, delay = NULL, window = NULL, ahead = NULL, up_to = NULL,
pid = NULL, .pid = NULL ,ts = NULL, .ts = NULL,
fn = NULL, ts_fn = NULL, arg = NULL, ts_arg = NULL, omit_value = NULL, ts_omit_value = NULL,
# pick_by = NULL,
passthrough = FALSE
) {
# Prepare ---------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -135,9 +134,9 @@ make_component <- function(input_source,
if(!is.null(window)) component$window <- window
if(!is.null(ahead)) component$ahead <- ahead
if(!is.null(up_to)) component$up_to <- up_to
if(!is.null(fn)) component$fn <- fn
if(!is.null(fn)) component$fn <- tolower(fn)
if(!is.null(arg)) component$arg <- arg
if(!is.null(ts_fn)) component$ts_fn <- ts_fn
if(!is.null(ts_fn)) component$ts_fn <- tolower(ts_fn)
if(!is.null(ts_arg)) component$ts_arg <- ts_arg
if(!is.null(omit_value)) component$omit_value <- omit_value
if(!is.null(ts_omit_value)) component$ts_omit_value <- ts_omit_value
Expand Down Expand Up @@ -271,28 +270,14 @@ make_component <- function(input_source,
# use_omit_value is a vector of Boolean, one for each column.
use_omit_value <- capture_named_args(component$omit_value, FALSE,
component$ts_omit_value, component$columns, component$passthrough)

# Default nulls treatment -----------------------------------------------------------------------------------------
if(.pheaglobalenv$engine_code == 3) {
# TODO: Test this on DataBricks.
nulls_treatment_mask <- sapply(use_fn, grepl, pattern = 'last_value|first_value', ignore.case = TRUE)
if(any(nulls_treatment_mask)) {
use_arg[nulls_treatment_mask] <- ', TRUE'
use_omit_value[nulls_treatment_mask] <- FALSE
}
}

# Default to line = 0 if needed -----------------------------------------------------------------------------------
# At this point, if any of the parameters line/bound/delay/window/ahead/up_to were NA, they are now NULL (or their
# appropriate default value). Notice that we didn't change the parameters themselves to NULL inside `component`,
# but just the local variables here -- except for `fn` and `ts_fn`.
if(isTRUE(is.na(line)))
line <- NULL

# In the absence of any parameter line/bound/delay/window/ahead/up_to, we default to line = 0. Notice we also store
# it in the `component`.
if(is.null(line) && is.null(bound) && is.null(delay) && is.null(window) && is.null(ahead) && is.null(up_to)) {
# In the absence of any parameter line/bound/delay/window/ahead/up_to, we default to line = 0.
if(is.null(line) && is.null(bound) && is.null(delay) && is.null(window) && is.null(ahead) && is.null(up_to))
line <- 0
component$line <- 0
}

# Build window function SQL ---------------------------------------------------------------------------------------
# columns_sql is vectorized by the presence of `component$columns`
Expand All @@ -304,31 +289,42 @@ make_component <- function(input_source,
over_clause <- paste0(
'partition by ', dbQuoteId('pid'),
# ', ', dbQuoteId('name'),
' order by ',
dbQuoteId('ts'))
' order by ', dbQuoteId('ts'))

# component_has_been_built is just to help us trim the code identation, as opposed to using nested if-else`s.
component_has_been_built <- FALSE
make_params_sql <- function() {
purrr::pmap(list(columns_sql, use_arg, use_omit_value), function(x, y, z) {
if(z) return(y)
if(y == '') return(x)
else return(paste0(x, ', ', y))
})
make_window_fn_call_expr <- function(use_fn, columns_sql, use_arg, use_omit_value) {
list(use_fn, columns_sql, use_arg, use_omit_value) |>
purrr::pmap(function(fn, col_acc, arg, omit_value) {
if(fn %in% c('last_value', 'first_value')) {
# NULLs treatment
return(
switch(.pheaglobalenv$engine_code,
NA, # 1: engine_code == 1 doesn't arrive here because the function name is changed
paste0(fn, '(', col_acc, ' IGNORE NULLS)'), # 2: last_value(expr IGNORE NULLS)
paste0(fn, '(', col_acc, ', TRUE)'), # 3: last_value(expr, TRUE)
paste0(fn, '(', col_acc, ') IGNORE NULLS'), # 4: last_value(expr) IGNORE NULLS
stop('Unable to recognize SQL engine code.'))
)
} else {
if(omit_value)
return(paste0(fn, '(', arg, ')'))

if(arg == '')
return(paste0(fn, '(', col_acc, ')'))

return(paste0(fn, '(', col_acc, ', ', arg, ')'))
}
})
}

# Now we figure out the acess mode, and produce the window function calls.
if(isTRUE(is.na(line)))
line <- NULL

if(.pheaglobalenv$compatibility_mode) {
# This is the "most default" case: the user just wants the most recent record of the component, without line/
# bound/delay/window/ahead/up_to. In this case, we don't need a window function. We can just copy the column
# whenever the line comes from the correct record source, then use tidyr::fill() to fill NULLs downward.
# In other words, the SQL to access the value is merely the CASE WHEN ... statement that otherwise goes inside the
# window function call.
component$access <- 'line'

component$access_sql <- dplyr::sql(columns_sql)
component_has_been_built <- TRUE
}
Expand All @@ -339,10 +335,11 @@ make_component <- function(input_source,
# window functions' RANGE mode, forcing us to not use it when mode is not ROWS.
component$access <- 'line'

params_sql <- make_params_sql()
winfn_call_expr <- make_window_fn_call_expr(use_fn, columns_sql, use_arg, use_omit_value)

component$access_sql <- lapply(seq(component$columns), \(i) {
sql_txt <- paste0(use_fn[i], '(', params_sql[i], ')')
sql_txt <- winfn_call_expr[[i]]

dbplyr::win_over(con = .pheaglobalenv$con,
expr = dplyr::sql(sql_txt),
partition = 'pid',
Expand All @@ -360,11 +357,10 @@ make_component <- function(input_source,
if(!component_has_been_built) {
# As commented above, for access other than *line* we need to write out the window function call by ourselves,
# because dbplyr::win_over() does not support RANGE mode.
params_sql <- make_params_sql()

sql_start <- paste0(use_fn, '(', params_sql, ') over (', over_clause, ' range between ') # 1
# switch(.pheaglobalenv$engine_code,
# paste0(use_fn, '(', params_sql, ') over (', over_clause, ' range between ')) # 1
winfn_call_expr <- make_window_fn_call_expr(use_fn, columns_sql, use_arg, use_omit_value)

sql_start <- paste0(winfn_call_expr, ' over (', over_clause, ' range between ')

if(!is.null(delay) || !is.null(window)) {
# Produce access via *delay/window*.
Expand Down
9 changes: 5 additions & 4 deletions R/setup.R
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,23 @@ setup_phea <- function(connection, schema, verbose = TRUE, engine = NULL, compat
assign('engine_code', engine_code, envir = .pheaglobalenv)
assign('compatibility_mode', compatibility_mode, envir = .pheaglobalenv)

if(engine == 'postgres') {
if(engine == 'postgres' && !compatibility_mode) {
sql_function_exists <- function(name) {
function_check <- DBI::dbGetQuery(.pheaglobalenv$con,
paste0('select * from
pg_proc p
join pg_namespace n
inner join pg_namespace n
on p.pronamespace = n.oid
where proname =\'', name, '\';')) |>
nrow()
return(function_check == 1)
}

need_to_install <- c('phea_coalesce_r_sfunc', 'phea_coalesce_nr_sfunc',
already_installed <- c('phea_coalesce_r_sfunc', 'phea_coalesce_nr_sfunc',
'phea_last_value_ignore_nulls', 'phea_first_value_ignore_nulls') |>
sapply(sql_function_exists, USE.NAMES = TRUE)
need_to_install <- !need_to_install

need_to_install <- !already_installed

if(any(need_to_install) && verbose)
message('Engine configured to PostgreSQL.')
Expand Down
9 changes: 3 additions & 6 deletions man/make_record_source.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 19 additions & 7 deletions man/setup_phea.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0a4d2d9

Please sign in to comment.