diff --git a/R/executeDqChecks.R b/R/executeDqChecks.R index 69dcf381..6aded2ae 100644 --- a/R/executeDqChecks.R +++ b/R/executeDqChecks.R @@ -43,6 +43,7 @@ #' @param tableCheckThresholdLoc The location of the threshold file for evaluating the table checks. If not specified the default thresholds will be applied. #' @param fieldCheckThresholdLoc The location of the threshold file for evaluating the field checks. If not specified the default thresholds will be applied. #' @param conceptCheckThresholdLoc The location of the threshold file for evaluating the concept checks. If not specified the default thresholds will be applied. +#' @param resume Boolean to indicate if processing will be resumed #' #' @return If sqlOnly = FALSE, a list object of results #' @@ -75,7 +76,8 @@ executeDqChecks <- function(connectionDetails, cdmVersion = "5.3", tableCheckThresholdLoc = "default", fieldCheckThresholdLoc = "default", - conceptCheckThresholdLoc = "default") { + conceptCheckThresholdLoc = "default", + resume = FALSE) { # Check input ------------------------------------------------------------------------------------------------------------------- if (!("connectionDetails" %in% class(connectionDetails))) { stop("connectionDetails must be an object of class 'connectionDetails'.") @@ -96,6 +98,7 @@ executeDqChecks <- function(connectionDetails, stopifnot(is.null(checkNames) | is.character(checkNames), is.null(tablesToExclude) | is.character(tablesToExclude)) stopifnot(is.character(cdmVersion)) + stopifnot(is.logical(resume)) # Warning if check names for determining NA is missing if (!length(checkNames) == 0) { @@ -248,7 +251,9 @@ executeDqChecks <- function(connectionDetails, cohortDatabaseSchema, cohortDefinitionId, outputFolder, + outputFile, sqlOnly, + resume, progressBar = TRUE ) ParallelLogger::stopCluster(cluster = cluster) diff --git a/R/runCheck.R b/R/runCheck.R index e7bb491b..778619ea 100644 --- a/R/runCheck.R +++ b/R/runCheck.R @@ -27,7 +27,9 @@ #' @param cohortDatabaseSchema The schema where the cohort table is located. #' @param cohortDefinitionId The cohort definition id for the cohort you wish to run the DQD on. The package assumes a standard OHDSI cohort table called 'Cohort' #' @param outputFolder The folder to output logs and SQL files to +#' @param outputFile (OPTIONAL) File to re-use results of previous execution if resume is set #' @param sqlOnly Should the SQLs be executed (FALSE) or just returned (TRUE)? +#' @param resume Boolean to indicate if processing will be resumed #' #' @import magrittr #' @@ -44,7 +46,9 @@ cohortDatabaseSchema, cohortDefinitionId, outputFolder, - sqlOnly) { + outputFile = "", + sqlOnly, + resume) { ParallelLogger::logInfo(sprintf("Processing check description: %s", checkDescription$checkName)) filterExpression <- sprintf( @@ -65,6 +69,21 @@ } if (nrow(checks) > 0) { + recoveredNumber <- 0 + checkResultsSaved <- NULL + if (resume && nchar(outputFile) > 0 && file.exists(file.path(outputFolder, outputFile))) { + dqdResults <- jsonlite::read_json( + path = file.path(outputFolder, outputFile) + ) + checkResultsSaved <- lapply( + dqdResults$CheckResults, + function(cr) { + cr[sapply(cr, is.null)] <- NA + as.data.frame(cr) + } + ) + checkResultsSaved <- do.call(plyr::rbind.fill, checkResultsSaved) + } dfs <- apply(X = checks, MARGIN = 1, function(check) { columns <- lapply(names(check), function(c) { setNames(check[c], c) @@ -92,16 +111,52 @@ ), append = TRUE) data.frame() } else { - .processCheck( - connection = connection, - connectionDetails = connectionDetails, - check = check, - checkDescription = checkDescription, - sql = sql, - outputFolder = outputFolder - ) + checkResult <- NULL + if (!is.null(checkResultsSaved)) { + currentCheckId <- .getCheckId( + checkLevel = checkDescription$checkLevel, + checkName = checkDescription$checkName, + cdmTableName = check["cdmTableName"], + cdmFieldName = check["cdmFieldName"], + conceptId = check["conceptId"], + unitConceptId = check["unitConceptId"] + ) + checkResultCandidates <- checkResultsSaved %>% dplyr::filter(checkId == currentCheckId & is.na(ERROR)) + if (1 == nrow(checkResultCandidates)) { + savedResult <- checkResultCandidates[1, ] + warning <- if (is.null(savedResult$WARNING)) { NA } else { savedResult$WARNING } + checkResult <- .recordResult( + result = savedResult, + check = check, + checkDescription = checkDescription, + sql = sql, + executionTime = savedResult$EXECUTION_TIME, + warning = warning, + error = NA + ) + recoveredNumber <<- recoveredNumber + 1 + } + } + + if (is.null(checkResult)) { + checkResult <- .processCheck( + connection = connection, + connectionDetails = connectionDetails, + check = check, + checkDescription = checkDescription, + sql = sql, + outputFolder = outputFolder + ) + } + + checkResult } }) + + if (recoveredNumber > 0) { + ParallelLogger::logInfo(sprintf("Recovered %s of %s results from %s", recoveredNumber, nrow(checks), outputFile)) + } + do.call(rbind, dfs) } else { ParallelLogger::logWarn(paste0("Warning: Evaluation resulted in no checks: ", filterExpression))