diff --git a/DESCRIPTION b/DESCRIPTION index e4618b0..74084d1 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -2,7 +2,7 @@ Package: MethodEvaluation Type: Package Title: Package for Evaluation of Estimation Methods Version: 2.3.1 -Date: 2024-12-11 +Date: 2024-12-16 Authors@R: c( person("Martijn", "Schuemie", , "schuemie@ohdsi.org", role = c("aut", "cre")) ) diff --git a/NEWS.md b/NEWS.md index ac396ff..6929d75 100644 --- a/NEWS.md +++ b/NEWS.md @@ -5,6 +5,12 @@ Changes: 1. Updating Circe-generated SQL (to avoid errors on DataBricks). +2. Refactoring code so connections do not stay open longer than strictly necessary (to avoid timeout errors on DataBricks). + +3. Increasing required version of FeatureExtraction to avoid warning about use of deprecated argument `oracleTempSchema`. + +4. Fixing nesting cohorts to end at end of observation. + MethodEvaluation 2.3.0 ====================== diff --git a/R/Plots.R b/R/Plots.R index 2ee562c..0c271d7 100644 --- a/R/Plots.R +++ b/R/Plots.R @@ -167,7 +167,6 @@ plotCoverageInjectedSignals <- function(logRr, seLogRr, trueLogRr, region = 0.95 vizD$label <- paste(round(100 * vizD$fraction), "%", sep = "") vizD$group <- factor(vizD$group, levels = c("Below CI", "Within CI", "Above CI")) - theme <- ggplot2::element_text(colour = "#000000", size = 10) plot <- with(vizD, { ggplot2::ggplot(vizD, ggplot2::aes(x = as.factor(trueRr), y = fraction)) + ggplot2::geom_bar(ggplot2::aes(fill = group), stat = "identity", position = "stack", alpha = 0.8) + @@ -181,7 +180,7 @@ plotCoverageInjectedSignals <- function(logRr, seLogRr, trueLogRr, region = 0.95 panel.grid.major = ggplot2::element_blank(), axis.ticks = ggplot2::element_blank(), axis.text.y = ggplot2::element_blank(), - axis.text.x = theme, + axis.text.x = ggplot2::element_text(colour = "#000000", size = 10), legend.key = ggplot2::element_blank(), legend.position = "right" ) diff --git a/R/PositiveControlSynthesis.R b/R/PositiveControlSynthesis.R index feb7832..1b19653 100644 --- a/R/PositiveControlSynthesis.R +++ b/R/PositiveControlSynthesis.R @@ -117,8 +117,7 @@ #' @param outputTable The name of the table names that will contain the generated #' outcome cohorts. #' @param workFolder Path to a folder where intermediate data will be stored. -#' @param cdmVersion Define the OMOP CDM version used: currently support "4" and -#' "5". +#' @param cdmVersion DEPRECATED. This argument is ignored. #' @param modelThreads Number of parallel threads to use when fitting outcome #' models. #' @param generationThreads Number of parallel threads to use when generating outcomes. @@ -187,13 +186,16 @@ synthesizePositiveControls <- function(connectionDetails, precision = 0.01, outputIdOffset = 1000, workFolder = "./SignalInjectionTemp", - cdmVersion = "5", + cdmVersion = NULL, modelThreads = 1, generationThreads = 1) { if (!is.null(oracleTempSchema) && oracleTempSchema != "") { warning("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.") tempEmulationSchema <- oracleTempSchema } + if (!is.null(cdmVersion) && cdmVersion != "") { + warning("The 'cdmVersion' argument is deprecated.") + } errorMessages <- checkmate::makeAssertCollection() checkmate::assertClass(connectionDetails, "ConnectionDetails", add = errorMessages) checkmate::assertCharacter(cdmDatabaseSchema, len = 1, add = errorMessages) @@ -222,7 +224,6 @@ synthesizePositiveControls <- function(connectionDetails, checkmate::assertNumeric(precision, lower = 0, len = 1, add = errorMessages) checkmate::assertInt(outputIdOffset, lower = 0, add = errorMessages) checkmate::assertCharacter(workFolder, len = 1, add = errorMessages) - checkmate::assertCharacter(cdmVersion, len = 1, add = errorMessages) checkmate::assertInt(modelThreads, lower = 1, add = errorMessages) checkmate::assertInt(generationThreads, lower = 1, add = errorMessages) checkmate::reportAssertions(collection = errorMessages) @@ -232,22 +233,105 @@ synthesizePositiveControls <- function(connectionDetails, if (modelType == "poisson" && addIntentToTreat) { stop("Intent to treat injection not yet implemented for Poisson models") } - if (cdmVersion == "4") { - stop("CDM version 4 is not supported") - } if (!grepl("start$|end$", endAnchor, ignore.case = TRUE)) { stop("endAnchor should have value 'cohort start' or 'cohort end'") } - isEnd <- function(anchor) { - return(grepl("end$", anchor, ignore.case = TRUE)) - } if (!file.exists(workFolder)) { dir.create(workFolder) } - exposuresFile <- file.path(workFolder, "exposures.rds") - outcomesFile <- file.path(workFolder, "outcomes.rds") - priorOutcomesFile <- file.path(workFolder, "priorOutcomes.rds") - countsFile <- file.path(workFolder, "counts.rds") + result <- loadDataFitModels(connectionDetails = connectionDetails, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + exposureDatabaseSchema = exposureDatabaseSchema, + exposureTable = exposureTable, + outcomeDatabaseSchema = outcomeDatabaseSchema, + outcomeTable = outcomeTable, + exposureOutcomePairs = exposureOutcomePairs, + modelType = modelType, + minOutcomeCountForModel = minOutcomeCountForModel, + minModelCount = minModelCount, + covariateSettings = covariateSettings, + prior = prior, + control = control, + firstExposureOnly = firstExposureOnly, + washoutPeriod = washoutPeriod, + riskWindowStart = riskWindowStart, + riskWindowEnd = riskWindowEnd, + endAnchor = endAnchor, + firstOutcomeOnly = firstOutcomeOnly, + removePeopleWithPriorOutcomes = removePeopleWithPriorOutcomes, + maxSubjectsForModel = maxSubjectsForModel, + effectSizes = effectSizes, + outputIdOffset = outputIdOffset, + workFolder = workFolder, + modelThreads = modelThreads) + + result <- generateAllOutcomes(workFolder = workFolder, + generationThreads = generationThreads, + minOutcomeCountForInjection = minOutcomeCountForInjection, + removePeopleWithPriorOutcomes = removePeopleWithPriorOutcomes, + modelType = modelType, + effectSizes = effectSizes, + precision = precision, + addIntentToTreat = addIntentToTreat, + result = result) + + if (is.null(getOption("skipPositiveControlUpload")) || !getOption("skipPositiveControlUpload")) { + insertOutcomes(connectionDetails = connectionDetails, + cdmDatabaseSchema = cdmDatabaseSchema, + outputDatabaseSchema = outputDatabaseSchema, + outputTable = outputTable, + outcomeDatabaseSchema = outcomeDatabaseSchema, + outcomeTable = outcomeTable, + tempEmulationSchema = tempEmulationSchema, + createOutputTable = createOutputTable, + result = result) + } + + return(result) +} + +loadDataFitModels <- function(connectionDetails, + cdmDatabaseSchema, + tempEmulationSchema, + exposureDatabaseSchema, + exposureTable, + outcomeDatabaseSchema, + outcomeTable, + exposureOutcomePairs, + modelType, + minOutcomeCountForModel, + minModelCount, + covariateSettings, + prior, + control, + firstExposureOnly, + washoutPeriod, + riskWindowStart, + riskWindowEnd, + endAnchor, + firstOutcomeOnly, + removePeopleWithPriorOutcomes, + maxSubjectsForModel, + effectSizes, + outputIdOffset, + workFolder, + modelThreads){ + # Note: this is a big function, but the code shares several temp tables so + # need connection to stay alive. + + # Find all exposures for each outcome, then identify unique groups of exposures + group <- function(outcomeId) { + exposureIds <- exposureOutcomePairs$exposureId[exposureOutcomePairs$outcomeId == outcomeId] + return(exposureIds[order(exposureIds)]) + } + outcomeIds <- unique(exposureOutcomePairs$outcomeId) + groups <- purrr::map(unique(outcomeIds), group) + uniqueGroups <- unique(groups) + saveRDS(uniqueGroups, file.path(workFolder, "uniqueGroups.rds")) + outcomeIdToGroupId <- data.frame(outcomeIds = outcomeIds, groupIds = match(groups, uniqueGroups)) + saveRDS(outcomeIdToGroupId, file.path(workFolder, "outcomeIdToGroupId.rds")) + result <- tibble( exposureId = rep(exposureOutcomePairs$exposureId, each = length(effectSizes)), outcomeId = rep(exposureOutcomePairs$outcomeId, each = length(effectSizes)), @@ -260,46 +344,127 @@ synthesizePositiveControls <- function(connectionDetails, modelFolder = "", outcomesToInjectFile = "" ) + + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + + result <- loadDataForSynthesis(connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + exposureDatabaseSchema = exposureDatabaseSchema, + exposureTable = exposureTable, + outcomeDatabaseSchema = outcomeDatabaseSchema, + outcomeTable = outcomeTable, + exposureOutcomePairs = exposureOutcomePairs, + modelType = modelType, + firstExposureOnly = firstExposureOnly, + washoutPeriod = washoutPeriod, + riskWindowStart = riskWindowStart, + riskWindowEnd = riskWindowEnd, + endAnchor = endAnchor, + firstOutcomeOnly = firstOutcomeOnly, + removePeopleWithPriorOutcomes = removePeopleWithPriorOutcomes, + workFolder = workFolder, + result = result) + + getCovariatesForModels(connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + covariateSettings = covariateSettings, + maxSubjectsForModel = maxSubjectsForModel, + workFolder = workFolder, + uniqueGroups = uniqueGroups) + + result <- buildModels(outcomeIdToGroupId = outcomeIdToGroupId, + uniqueGroups = uniqueGroups, + exposureOutcomePairs = exposureOutcomePairs, + effectSizes = effectSizes, + minModelCount = minModelCount, + minOutcomeCountForModel = minOutcomeCountForModel, + workFolder = workFolder, + modelThreads = modelThreads, + removePeopleWithPriorOutcomes = removePeopleWithPriorOutcomes, + modelType = modelType, + prior = prior, + control = control, + result = result) + + getCovariatesOutsideSample(uniqueGroups = uniqueGroups, + outcomeIdToGroupId = outcomeIdToGroupId, + connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + covariateSettings = covariateSettings, + workFolder = workFolder, + result = result) + + sql <- "TRUNCATE TABLE #cohort_person; DROP TABLE #cohort_person;" + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + + summaryFile <- file.path(workFolder, "summary.rds") + saveRDS(result, summaryFile) + return(result) +} +loadDataForSynthesis <- function(connection, + cdmDatabaseSchema, + tempEmulationSchema, + exposureDatabaseSchema, + exposureTable, + outcomeDatabaseSchema, + outcomeTable, + exposureOutcomePairs, + modelType, + firstExposureOnly, + washoutPeriod, + riskWindowStart, + riskWindowEnd, + endAnchor, + firstOutcomeOnly, + removePeopleWithPriorOutcomes, + workFolder, + result = result) { exposureIds <- unique(exposureOutcomePairs$exposureId) - - conn <- DatabaseConnector::connect(connectionDetails) - on.exit(DatabaseConnector::disconnect(conn)) - + + exposuresFile <- file.path(workFolder, "exposures.rds") + outcomesFile <- file.path(workFolder, "outcomes.rds") + priorOutcomesFile <- file.path(workFolder, "priorOutcomes.rds") + countsFile <- file.path(workFolder, "counts.rds") + # Create exposure cohorts ------------------------------------ - ParallelLogger::logInfo("\nCreating risk windows") renderedSql <- SqlRender::loadRenderTranslateSql("CreateExposedCohorts.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema, - cdm_database_schema = cdmDatabaseSchema, - exposure_ids = exposureIds, - washout_period = washoutPeriod, - exposure_database_schema = exposureDatabaseSchema, - exposure_table = exposureTable, - first_exposure_only = firstExposureOnly, - risk_window_start = riskWindowStart, - risk_window_end = riskWindowEnd, - add_exposure_days_to_end = isEnd(endAnchor) + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema, + cdm_database_schema = cdmDatabaseSchema, + exposure_ids = exposureIds, + washout_period = washoutPeriod, + exposure_database_schema = exposureDatabaseSchema, + exposure_table = exposureTable, + first_exposure_only = firstExposureOnly, + risk_window_start = riskWindowStart, + risk_window_end = riskWindowEnd, + add_exposure_days_to_end = grepl("end$", endAnchor, ignore.case = TRUE) ) - - DatabaseConnector::executeSql(conn, renderedSql) - + + DatabaseConnector::executeSql(connection, renderedSql) + # Get exposure cohorts from database ------------------------------- if (file.exists(exposuresFile)) { exposures <- readRDS(exposuresFile) } else { ParallelLogger::logInfo("\nRetrieving exposure cohorts") exposureSql <- SqlRender::loadRenderTranslateSql("GetExposedCohorts.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema ) - exposures <- DatabaseConnector::querySql(conn, exposureSql, snakeCaseToCamelCase = TRUE) + exposures <- DatabaseConnector::querySql(connection, exposureSql, snakeCaseToCamelCase = TRUE) exposures <- exposures[order(exposures$rowId), ] saveRDS(exposures, exposuresFile) } - + # Get prior outcomes (if needed) ------------------------------------ if (removePeopleWithPriorOutcomes) { if (file.exists(priorOutcomesFile)) { @@ -309,7 +474,7 @@ synthesizePositiveControls <- function(connectionDetails, table <- exposureOutcomePairs colnames(table) <- SqlRender::camelCaseToSnakeCase(colnames(table)) DatabaseConnector::insertTable( - connection = conn, + connection = connection, tableName = "#exposure_outcome", data = table, dropTableIfExists = TRUE, @@ -318,22 +483,22 @@ synthesizePositiveControls <- function(connectionDetails, tempEmulationSchema = tempEmulationSchema ) outcomeSql <- SqlRender::loadRenderTranslateSql("GetPriorOutcomes.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema, - cdm_database_schema = cdmDatabaseSchema, - outcome_database_schema = outcomeDatabaseSchema, - outcome_table = outcomeTable, - first_outcome_only = firstOutcomeOnly + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema, + cdm_database_schema = cdmDatabaseSchema, + outcome_database_schema = outcomeDatabaseSchema, + outcome_table = outcomeTable, + first_outcome_only = firstOutcomeOnly ) - priorOutcomes <- DatabaseConnector::querySql(conn, outcomeSql, snakeCaseToCamelCase = TRUE) + priorOutcomes <- DatabaseConnector::querySql(connection, outcomeSql, snakeCaseToCamelCase = TRUE) saveRDS(priorOutcomes, priorOutcomesFile) sql <- "TRUNCATE TABLE #exposure_outcome; DROP TABLE #exposure_outcome;" - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) } } - + # Get outcomes --------------------------------------- if (file.exists(outcomesFile)) { outcomeCounts <- readRDS(outcomesFile) @@ -342,7 +507,7 @@ synthesizePositiveControls <- function(connectionDetails, table <- exposureOutcomePairs colnames(table) <- SqlRender::camelCaseToSnakeCase(colnames(table)) DatabaseConnector::insertTable( - connection = conn, + connection = connection, tableName = "#exposure_outcome", data = table, dropTableIfExists = TRUE, @@ -351,21 +516,21 @@ synthesizePositiveControls <- function(connectionDetails, tempEmulationSchema = tempEmulationSchema ) outcomeSql <- SqlRender::loadRenderTranslateSql("GetOutcomes.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema, - cdm_database_schema = cdmDatabaseSchema, - outcome_database_schema = outcomeDatabaseSchema, - outcome_table = outcomeTable, - first_outcome_only = firstOutcomeOnly + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema, + cdm_database_schema = cdmDatabaseSchema, + outcome_database_schema = outcomeDatabaseSchema, + outcome_table = outcomeTable, + first_outcome_only = firstOutcomeOnly ) - outcomeCounts <- DatabaseConnector::querySql(conn, outcomeSql, snakeCaseToCamelCase = TRUE) + outcomeCounts <- DatabaseConnector::querySql(connection, outcomeSql, snakeCaseToCamelCase = TRUE) saveRDS(outcomeCounts, outcomesFile) sql <- "TRUNCATE TABLE #exposure_outcome; DROP TABLE #exposure_outcome;" - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) } - + # Generate counts for result object ------------------------------- if (file.exists(countsFile)) { result <- readRDS(countsFile) @@ -373,7 +538,7 @@ synthesizePositiveControls <- function(connectionDetails, ParallelLogger::logInfo("Computing counts per exposure - outcome pair") temp <- select(exposures, .data$rowId, .data$exposureId, .data$eraNumber) %>% inner_join(select(outcomeCounts, .data$rowId, .data$outcomeId, .data$y, .data$yItt), - by = "rowId" + by = "rowId" ) if (modelType == "survival") { temp <- temp %>% @@ -447,20 +612,17 @@ synthesizePositiveControls <- function(connectionDetails, ) saveRDS(result, countsFile) } + return(result) +} - # Build models ---------------------------- Find all exposures for each outcome, then identify unique - # groups of exposures - group <- function(outcomeId) { - exposureIds <- exposureOutcomePairs$exposureId[exposureOutcomePairs$outcomeId == outcomeId] - return(exposureIds[order(exposureIds)]) - } - outcomeIds <- unique(exposureOutcomePairs$outcomeId) - groups <- purrr::map(unique(outcomeIds), group) - uniqueGroups <- unique(groups) - saveRDS(uniqueGroups, file.path(workFolder, "uniqueGroups.rds")) - outcomeIdToGroupId <- data.frame(outcomeIds = outcomeIds, groupIds = match(groups, uniqueGroups)) - saveRDS(outcomeIdToGroupId, file.path(workFolder, "outcomeIdToGroupId.rds")) - +getCovariatesForModels <- function(connection, + cdmDatabaseSchema, + tempEmulationSchema, + covariateSettings, + maxSubjectsForModel, + workFolder, + uniqueGroups) { + # Fetch covariates for each group of exposures for (i in 1:length(uniqueGroups)) { covarFileName <- file.path(workFolder, sprintf("covarsForModel_g%s.zip", i)) @@ -468,7 +630,7 @@ synthesizePositiveControls <- function(connectionDetails, cohortIds <- uniqueGroups[[i]] sql <- "SELECT COUNT(*) AS entries FROM (SELECT DISTINCT subject_id, cohort_start_date FROM #cohort_person WHERE cohort_definition_id IN (@cohort_ids)) tmp;" count <- DatabaseConnector::renderTranslateQuerySql( - connection = conn, + connection = connection, sql = sql, tempEmulationSchema = tempEmulationSchema, cohort_ids = cohortIds, @@ -477,17 +639,17 @@ synthesizePositiveControls <- function(connectionDetails, if (count > maxSubjectsForModel) { ParallelLogger::logInfo("Sampling exposed cohorts for model(s)") renderedSql <- SqlRender::loadRenderTranslateSql("SampleExposedCohorts.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema, - sample_size = maxSubjectsForModel, - cohort_ids = cohortIds + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema, + sample_size = maxSubjectsForModel, + cohort_ids = cohortIds ) - DatabaseConnector::executeSql(conn, renderedSql) - + DatabaseConnector::executeSql(connection, renderedSql) + sql <- "SELECT row_id FROM #sampled_person" sampledRowIds <- DatabaseConnector::renderTranslateQuerySql( - connection = conn, + connection = connection, sql = sql, tempEmulationSchema = tempEmulationSchema, snakeCaseToCamelCase = TRUE @@ -498,7 +660,7 @@ synthesizePositiveControls <- function(connectionDetails, # Don't sample, just copy: sql <- "SELECT * INTO #sampled_person FROM #cohort_person WHERE cohort_definition_id IN (@cohort_ids);" DatabaseConnector::renderTranslateExecuteSql( - connection = conn, + connection = connection, sql = sql, tempEmulationSchema = tempEmulationSchema, cohort_ids = cohortIds, @@ -508,14 +670,13 @@ synthesizePositiveControls <- function(connectionDetails, } ParallelLogger::logInfo("Extracting covariates for fitting outcome model(s)") covariateData <- FeatureExtraction::getDbCovariateData( - connection = conn, + connection = connection, tempEmulationSchema = tempEmulationSchema, cdmDatabaseSchema = cdmDatabaseSchema, cohortTable = "#sampled_person", cohortTableIsTemp = TRUE, rowIdField = "row_id", covariateSettings = covariateSettings, - cdmVersion = cdmVersion, aggregated = FALSE ) covariateData <- FeatureExtraction::tidyCovariateData( @@ -524,10 +685,10 @@ synthesizePositiveControls <- function(connectionDetails, removeRedundancy = TRUE ) FeatureExtraction::saveCovariateData(covariateData, covarFileName) - + sql <- "TRUNCATE TABLE #sampled_person; DROP TABLE #sampled_person;" DatabaseConnector::renderTranslateExecuteSql( - connection = conn, + connection = connection, tempEmulationSchema = tempEmulationSchema, sql = sql, progressBar = FALSE, @@ -535,8 +696,26 @@ synthesizePositiveControls <- function(connectionDetails, ) } } +} +buildModels <- function(outcomeIdToGroupId, + uniqueGroups, + exposureOutcomePairs, + effectSizes, + minModelCount, + minOutcomeCountForModel, + workFolder, + modelThreads, + removePeopleWithPriorOutcomes, + modelType, + prior, + control, + result) { ParallelLogger::logInfo("Fitting outcome models") + exposuresFile <- file.path(workFolder, "exposures.rds") + outcomesFile <- file.path(workFolder, "outcomes.rds") + priorOutcomesFile <- file.path(workFolder, "priorOutcomes.rds") + tasks <- list() modelsWithEnoughOutcomes <- 0 outcomeIds <- unique(exposureOutcomePairs$outcomeId) @@ -574,14 +753,13 @@ synthesizePositiveControls <- function(connectionDetails, " controls." ) } - + if (length(tasks) > 0) { cluster <- ParallelLogger::makeCluster(modelThreads) ParallelLogger::clusterApply( cluster, tasks, fitModel, - result, exposuresFile, outcomesFile, priorOutcomesFile, @@ -592,9 +770,123 @@ synthesizePositiveControls <- function(connectionDetails, ) ParallelLogger::stopCluster(cluster) } + return(result) +} + +fitModel <- function(task, + exposuresFile, + outcomesFile, + priorOutcomesFile, + removePeopleWithPriorOutcomes, + modelType, + prior, + control) { + ParallelLogger::logInfo("Fitting model for outcome ", task$outcomeId) + exposures <- readRDS(exposuresFile) + + outcomes <- readRDS(outcomesFile) + outcomes <- outcomes[outcomes$outcomeId == task$outcomeId, ] + if (file.exists(task$sampledExposuresFile)) { + sampledExposures <- readRDS(task$sampledExposuresFile) + outcomes <- outcomes[outcomes$rowId %in% sampledExposures$rowId, ] + exposures <- exposures[exposures$rowId %in% sampledExposures$rowId, ] + } else { + exposures <- exposures[exposures$exposureId %in% task$groupExposureIds, ] + outcomes <- outcomes[outcomes$rowId %in% exposures$rowId, ] + } + # Dedupe exposures for model fitting, so we don't overfit: + exposures <- exposures[order(exposures$personId, exposures$cohortStartDate), ] + exposures <- exposures[!duplicated(exposures[, c("personId", "cohortStartDate")]), ] + + covariateData <- FeatureExtraction::loadCovariateData(task$covarFileName) + covariates <- covariateData$covariates %>% + filter(.data$rowId %in% local(exposures$rowId)) + + if (removePeopleWithPriorOutcomes) { + priorOutcomes <- readRDS(priorOutcomesFile) + removeRowIds <- priorOutcomes$rowId[priorOutcomes$outcomeId == task$outcomeId] + outcomes <- outcomes[!(outcomes$rowId %in% removeRowIds), ] + exposures <- exposures[!(exposures$rowId %in% removeRowIds), ] + covariates <- covariates %>% + filter(!.data$rowId %in% removeRowIds) + } + outcomes <- merge(exposures, outcomes[, c( + "rowId", + "y", + "timeToEvent" + )], by = c("rowId"), all.x = TRUE) + outcomes <- outcomes[order(outcomes$rowId), ] + outcomes$y[is.na(outcomes$y)] <- 0 + names(outcomes)[names(outcomes) == "daysAtRisk"] <- "time" + if (modelType == "survival") { + # For survival, time is either the time to the end of the risk window, or the event + outcomes$y[outcomes$y != 0] <- 1 + outcomes$time[outcomes$y != 0] <- outcomes$timeToEvent[outcomes$y != 0] + } + outcomes$time <- outcomes$time + 1 + + # Note: for survival, using Poisson regression with 1 outcome and censored time as equivalent of + # survival regression: + covariateData$outcomes <- outcomes + cyclopsData <- Cyclops::convertToCyclopsData(covariateData$outcomes, + covariates, + modelType = "pr", + quiet = TRUE + ) + fit <- tryCatch( + { + Cyclops::fitCyclopsModel(cyclopsData, prior = prior, control = control) + }, + error = function(e) { + e$message + } + ) + if (fit$return_flag != "SUCCESS") { + fit <- fit$return_flag + } + if (is.character(fit)) { + ParallelLogger::logInfo(paste("Unable to fit model for outcome", task$outcomeId, ":", fit)) + dir.create(task$modelFolder) + write.csv(fit, file.path(task$modelFolder, "Error.txt")) + } else { + betas <- coef(fit) + intercept <- tibble( + beta = betas[1], + id = bit64::as.integer64(0), + covariateName = "(Intercept)", + row.names = NULL + ) + betas <- betas[betas != 0] + if (length(betas) > 1) { + betas <- betas[2:length(betas)] + betas <- tibble(beta = betas, covariateId = bit64::as.integer64(attr(betas, "names"))) + betas <- betas %>% + inner_join( + covariateData$covariateRef %>% + collect() %>% + mutate(covariateId = bit64::as.integer64(.data$covariateId)), + by = "covariateId" + ) %>% + select(.data$beta, id = .data$covariateId, .data$covariateName) %>% + arrange(desc(abs(.data$beta))) + betas <- bind_rows(intercept, betas) + } else { + betas <- intercept + } + dir.create(task$modelFolder) + saveRDS(betas, file.path(task$modelFolder, "betas.rds")) + } +} - # Generate new outcomes ----------------------------------------- Fetch covariates for all rows if - # covars for model were based on a sample: +getCovariatesOutsideSample <- function(uniqueGroups, + outcomeIdToGroupId, + connection, + cdmDatabaseSchema, + tempEmulationSchema, + covariateSettings, + workFolder, + result) { + # Fetch covariates for all rows if covars for model were based on a sample for (i in 1:length(uniqueGroups)) { sampledExposuresFile <- file.path(workFolder, sprintf("sampledRowIds_g%s.rds", i)) covarFileName <- file.path(workFolder, sprintf("covarsForPrediction_g%s.zip", i)) @@ -617,7 +909,7 @@ synthesizePositiveControls <- function(connectionDetails, } modelCovariateIds <- unique(modelCovariateIds) modelCovariateIds <- modelCovariateIds[modelCovariateIds != 0] - + ParallelLogger::logInfo(paste( "Number of unique covariates across outcome models:", length(modelCovariateIds) @@ -629,19 +921,18 @@ synthesizePositiveControls <- function(connectionDetails, sql <- "SELECT * INTO #selected_person FROM #cohort_person WHERE cohort_definition_id IN (@cohort_ids);" sql <- SqlRender::render(sql, cohort_ids = uniqueGroups[[i]]) sql <- SqlRender::translate(sql, - targetDialect = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema + targetDialect = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema ) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) covariateData <- FeatureExtraction::getDbCovariateData( - connection = conn, + connection = connection, tempEmulationSchema = tempEmulationSchema, cdmDatabaseSchema = cdmDatabaseSchema, cohortTable = "#selected_person", cohortTableIsTemp = TRUE, rowIdField = "row_id", covariateSettings = covariateSettings, - cdmVersion = cdmVersion, aggregated = FALSE ) covariateData <- FeatureExtraction::tidyCovariateData( @@ -650,17 +941,33 @@ synthesizePositiveControls <- function(connectionDetails, removeRedundancy = FALSE ) FeatureExtraction::saveCovariateData(covariateData, covarFileName) - + sql <- "TRUNCATE TABLE #selected_person; DROP TABLE #selected_person;" sql <- SqlRender::translate(sql, - targetDialect = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema + targetDialect = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema ) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) } } } +} + +generateAllOutcomes <- function(workFolder, + generationThreads, + minOutcomeCountForInjection, + removePeopleWithPriorOutcomes, + modelType, + effectSizes, + precision, + addIntentToTreat, + result) { + exposuresFile <- file.path(workFolder, "exposures.rds") + outcomesFile <- file.path(workFolder, "outcomes.rds") + priorOutcomesFile <- file.path(workFolder, "priorOutcomes.rds") + outcomeIdToGroupId <- readRDS(file.path(workFolder, "outcomeIdToGroupId.rds")) + ParallelLogger::logInfo("Generating outcomes") temp <- result[result$observedOutcomes > minOutcomeCountForInjection, c( "exposureId", @@ -706,195 +1013,9 @@ synthesizePositiveControls <- function(connectionDetails, ParallelLogger::stopCluster(cluster) result <- do.call("rbind", results) } - - # Save summary ----------------------------------------------------------------- - summaryFile <- .createSummaryFileName(workFolder) - saveRDS(result, summaryFile) - - if (!is.null(getOption("skipPositiveControlUpload")) && getOption("skipPositiveControlUpload")) { - return(result) - } - - # Insert outcomes into database ------------------------------------------------ - ParallelLogger::logInfo("Inserting additional outcomes into database") - fileNames <- result$outcomesToInjectFile[result$outcomesToInjectFile != ""] - outcomesToInject <- lapply(fileNames, readRDS) - outcomesToInject <- do.call("rbind", outcomesToInject) - colnames(outcomesToInject) <- SqlRender::camelCaseToSnakeCase(colnames(outcomesToInject)) - if (Sys.getenv("USE_MPP_BULK_LOAD") == "TRUE" || - Sys.getenv("DATABASE_CONNECTOR_BULK_UPLOAD") == "TRUE") { - tableName <- paste0( - outputDatabaseSchema, - ".temp_outcomes_", - paste(sample(letters, 5), collapse = "") - ) - tempTable <- FALSE - } else { - tableName <- "#temp_outcomes" - tempTable <- TRUE - } - - DatabaseConnector::insertTable( - connection = conn, - tableName = tableName, - data = outcomesToInject, - dropTableIfExists = TRUE, - createTable = TRUE, - tempTable = tempTable, - tempEmulationSchema = tempEmulationSchema, - progressBar = TRUE - ) - - toCopy <- result[result$modelFolder != "", c("outcomeId", "newOutcomeId")] - colnames(toCopy) <- SqlRender::camelCaseToSnakeCase(colnames(toCopy)) - DatabaseConnector::insertTable( - connection = conn, - tableName = "#to_copy", - data = toCopy, - dropTableIfExists = TRUE, - createTable = TRUE, - tempTable = TRUE, - tempEmulationSchema = tempEmulationSchema - ) - - ParallelLogger::logInfo("Copying negative control outcomes into database") - copySql <- SqlRender::loadRenderTranslateSql("CopyOutcomes.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - cdm_database_schema = cdmDatabaseSchema, - tempEmulationSchema = tempEmulationSchema, - outcome_database_schema = outcomeDatabaseSchema, - outcome_table = outcomeTable, - output_database_schema = outputDatabaseSchema, - output_table = outputTable, - create_output_table = createOutputTable, - temp_outcomes_table = tableName - ) - - DatabaseConnector::executeSql(conn, copySql) - - sql <- "TRUNCATE TABLE #cohort_person; DROP TABLE #cohort_person;" - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) - - sql <- "TRUNCATE TABLE #to_copy; DROP TABLE #to_copy;" - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) - - sql <- "TRUNCATE TABLE @temp_outcomes_table; DROP TABLE @temp_outcomes_table;" - sql <- SqlRender::render(sql, temp_outcomes_table = tableName) - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) - return(result) } -fitModel <- function(task, - result, - exposuresFile, - outcomesFile, - priorOutcomesFile, - removePeopleWithPriorOutcomes, - modelType, - prior, - control) { - ParallelLogger::logInfo("Fitting model for outcome ", task$outcomeId) - exposures <- readRDS(exposuresFile) - - outcomes <- readRDS(outcomesFile) - outcomes <- outcomes[outcomes$outcomeId == task$outcomeId, ] - if (file.exists(task$sampledExposuresFile)) { - sampledExposures <- readRDS(task$sampledExposuresFile) - outcomes <- outcomes[outcomes$rowId %in% sampledExposures$rowId, ] - exposures <- exposures[exposures$rowId %in% sampledExposures$rowId, ] - } else { - exposures <- exposures[exposures$exposureId %in% task$groupExposureIds, ] - outcomes <- outcomes[outcomes$rowId %in% exposures$rowId, ] - } - # Dedupe exposures for model fitting, so we don't overfit: - exposures <- exposures[order(exposures$personId, exposures$cohortStartDate), ] - exposures <- exposures[!duplicated(exposures[, c("personId", "cohortStartDate")]), ] - - covariateData <- FeatureExtraction::loadCovariateData(task$covarFileName) - covariates <- covariateData$covariates %>% - filter(.data$rowId %in% local(exposures$rowId)) - - if (removePeopleWithPriorOutcomes) { - priorOutcomes <- readRDS(priorOutcomesFile) - removeRowIds <- priorOutcomes$rowId[priorOutcomes$outcomeId == task$outcomeId] - outcomes <- outcomes[!(outcomes$rowId %in% removeRowIds), ] - exposures <- exposures[!(exposures$rowId %in% removeRowIds), ] - covariates <- covariates %>% - filter(!.data$rowId %in% removeRowIds) - } - outcomes <- merge(exposures, outcomes[, c( - "rowId", - "y", - "timeToEvent" - )], by = c("rowId"), all.x = TRUE) - outcomes <- outcomes[order(outcomes$rowId), ] - outcomes$y[is.na(outcomes$y)] <- 0 - names(outcomes)[names(outcomes) == "daysAtRisk"] <- "time" - if (modelType == "survival") { - # For survival, time is either the time to the end of the risk window, or the event - outcomes$y[outcomes$y != 0] <- 1 - outcomes$time[outcomes$y != 0] <- outcomes$timeToEvent[outcomes$y != 0] - } - outcomes$time <- outcomes$time + 1 - - # Note: for survival, using Poisson regression with 1 outcome and censored time as equivalent of - # survival regression: - covariateData$outcomes <- outcomes - cyclopsData <- Cyclops::convertToCyclopsData(covariateData$outcomes, - covariates, - modelType = "pr", - quiet = TRUE - ) - fit <- tryCatch( - { - Cyclops::fitCyclopsModel(cyclopsData, prior = prior, control = control) - }, - error = function(e) { - e$message - } - ) - if (fit$return_flag != "SUCCESS") { - fit <- fit$return_flag - } - if (is.character(fit)) { - ParallelLogger::logInfo(paste("Unable to fit model for outcome", task$outcomeId, ":", fit)) - dir.create(task$modelFolder) - write.csv(fit, file.path(task$modelFolder, "Error.txt")) - } else { - betas <- coef(fit) - intercept <- tibble( - beta = betas[1], - id = bit64::as.integer64(0), - covariateName = "(Intercept)", - row.names = NULL - ) - betas <- betas[betas != 0] - if (length(betas) > 1) { - betas <- betas[2:length(betas)] - betas <- tibble(beta = betas, covariateId = bit64::as.integer64(attr(betas, "names"))) - betas <- betas %>% - inner_join( - covariateData$covariateRef %>% - collect() %>% - mutate(covariateId = bit64::as.integer64(.data$covariateId)), - by = "covariateId" - ) %>% - select(.data$beta, id = .data$covariateId, .data$covariateName) %>% - arrange(desc(abs(.data$beta))) - betas <- bind_rows(intercept, betas) - } else { - betas <- intercept - } - dir.create(task$modelFolder) - saveRDS(betas, file.path(task$modelFolder, "betas.rds")) - } -} - generateOutcomes <- function(task, result, exposuresFile, @@ -1007,7 +1128,7 @@ generateOutcomes <- function(task, } } injectedRr <- 1 + (nrow(newOutcomes) / resultSubset$observedOutcomes[1]) - + # Count outcomes during first episodes: newOutcomeCountsFirstExposure <- sum(newOutcomeCounts[exposures$eraNumber == 1]) injectedRrFirstExposure <- 1 + @@ -1031,7 +1152,7 @@ generateOutcomes <- function(task, ", injected RR during ITT window =", injectedRrItt )) - + newOutcomeId <- resultSubset$newOutcomeId[resultSubset$targetEffectSize == effectSize] # Write new outcomes to file for later insertion into DB: if (nrow(newOutcomes) != 0) { @@ -1061,6 +1182,87 @@ generateOutcomes <- function(task, } } +insertOutcomes <- function(connectionDetails, + cdmDatabaseSchema, + outputDatabaseSchema, + outputTable, + outcomeDatabaseSchema, + outcomeTable, + tempEmulationSchema, + createOutputTable, + result) { + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + + # Insert outcomes into database ------------------------------------------------ + ParallelLogger::logInfo("Inserting additional outcomes into database") + fileNames <- result$outcomesToInjectFile[result$outcomesToInjectFile != ""] + outcomesToInject <- lapply(fileNames, readRDS) + outcomesToInject <- do.call("rbind", outcomesToInject) + colnames(outcomesToInject) <- SqlRender::camelCaseToSnakeCase(colnames(outcomesToInject)) + if (Sys.getenv("USE_MPP_BULK_LOAD") == "TRUE" || + Sys.getenv("DATABASE_CONNECTOR_BULK_UPLOAD") == "TRUE") { + tableName <- paste0( + outputDatabaseSchema, + ".temp_outcomes_", + paste(sample(letters, 5), collapse = "") + ) + tempTable <- FALSE + } else { + tableName <- "#temp_outcomes" + tempTable <- TRUE + } + + DatabaseConnector::insertTable( + connection = connection, + tableName = tableName, + data = outcomesToInject, + dropTableIfExists = TRUE, + createTable = TRUE, + tempTable = tempTable, + tempEmulationSchema = tempEmulationSchema, + progressBar = TRUE + ) + + toCopy <- result[result$modelFolder != "", c("outcomeId", "newOutcomeId")] + colnames(toCopy) <- SqlRender::camelCaseToSnakeCase(colnames(toCopy)) + DatabaseConnector::insertTable( + connection = connection, + tableName = "#to_copy", + data = toCopy, + dropTableIfExists = TRUE, + createTable = TRUE, + tempTable = TRUE, + tempEmulationSchema = tempEmulationSchema + ) + + ParallelLogger::logInfo("Copying negative control outcomes into database") + copySql <- SqlRender::loadRenderTranslateSql("CopyOutcomes.sql", + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + cdm_database_schema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + outcome_database_schema = outcomeDatabaseSchema, + outcome_table = outcomeTable, + output_database_schema = outputDatabaseSchema, + output_table = outputTable, + create_output_table = createOutputTable, + temp_outcomes_table = tableName + ) + + DatabaseConnector::executeSql(connection, copySql) + + sql <- "TRUNCATE TABLE #to_copy; DROP TABLE #to_copy;" + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + + sql <- "TRUNCATE TABLE @temp_outcomes_table; DROP TABLE @temp_outcomes_table;" + sql <- SqlRender::render(sql, temp_outcomes_table = tableName) + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) +} + + injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { hasOutcome <- exposures$y != 0 observedCount <- sum(hasOutcome) @@ -1121,7 +1323,7 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { rateAfterFirstExposure <- (sum(firstHasOutcome[!firstHasNewOutcome]) + sum(firstHasNewOutcome)) / (sum(firstTime[!firstHasNewOutcome]) + sum(firstTimeToEvent[firstHasNewOutcome] + 1)) injectedRrFirstExposure <- rateAfterFirstExposure / rateBeforeFirstExposure } - + if (addIntentToTreat) { hasOutcomeItt <- exposures$yItt > 0 observedCountItt <- sum(hasOutcomeItt) @@ -1136,7 +1338,7 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { hasNewOutcomeItt <- c() ParallelLogger::logTrace("Generating outcomes for intent-to-treat") while (round(abs(sum(hasNewOutcomeItt) + sum(hasNewOutcome) - correctedTargetCount)) > precision * - correctedTargetCount) { + correctedTargetCount) { timeToNewOutcomeItt <- round(rexp(nrow(exposures), multiplier * exposures$prediction * (effectSize - 1))) hasNewOutcomeItt <- timeToNewOutcomeItt > exposures$daysAtRisk & timeToNewOutcomeItt < survivalTimeIttNew # Correct for censored time and outcomes: @@ -1180,12 +1382,6 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { )) } -.createSummaryFileName <- function(folder) { - name <- "summary" - name <- paste(name, ".rds", sep = "") - return(file.path(folder, name)) -} - .predict <- function(betas, exposures, covariates, modelType) { intercept <- betas$beta[1] if (nrow(betas) == 1) { @@ -1205,7 +1401,7 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { head(10000) %>% pull(.data$covariateId) %>% is("integer64") - + if (!covariateIdIsInteger64) { betas$covariateId <- as.numeric(betas$covariateId) } @@ -1217,7 +1413,7 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { select(.data$rowId, .data$value) %>% ungroup() %>% collect() - + prediction <- prediction %>% right_join(select(exposures, .data$rowId, .data$daysAtRisk), by = "rowId") %>% mutate(value = coalesce(.data$value, 0) + intercept) %>% diff --git a/inst/sql/sql_server/NestingCohorts.sql b/inst/sql/sql_server/NestingCohorts.sql index 2e89b98..1eed4d2 100644 --- a/inst/sql/sql_server/NestingCohorts.sql +++ b/inst/sql/sql_server/NestingCohorts.sql @@ -17,24 +17,35 @@ limitations under the License. ************************************************************************/ INSERT INTO @target_database_schema.@target_cohort_table (cohort_definition_id, subject_id, cohort_start_date, cohort_end_date) SELECT ancestor_concept_id AS cohort_definition_id, - condition_occurrence.person_id AS subject_id, - MIN(condition_start_date) AS cohort_start_date, - MIN(condition_start_date) AS cohort_end_date + condition_occurrence.person_id AS subject_id, + MIN(condition_start_date) AS cohort_start_date, + observation_period_end_date AS cohort_end_date FROM @cdm_database_schema.condition_occurrence INNER JOIN @cdm_database_schema.concept_ancestor ON condition_concept_id = descendant_concept_id +INNER JOIN @cdm_database_schema.observation_period + ON condition_occurrence.person_id = observation_period.person_id + AND condition_start_date >= observation_period_start_date + AND condition_start_date <= observation_period_end_date WHERE ancestor_concept_id IN (@nesting_ids) GROUP BY ancestor_concept_id, - person_id; + condition_occurrence.person_id, + observation_period_end_date; + INSERT INTO @target_database_schema.@target_cohort_table (cohort_definition_id, subject_id, cohort_start_date, cohort_end_date) SELECT ancestor_concept_id AS cohort_definition_id, procedure_occurrence.person_id AS subject_id, MIN(procedure_date) AS cohort_start_date, - MIN(procedure_date) AS cohort_end_date + observation_period_end_date AS cohort_end_date FROM @cdm_database_schema.procedure_occurrence INNER JOIN @cdm_database_schema.concept_ancestor ON procedure_concept_id = descendant_concept_id +INNER JOIN @cdm_database_schema.observation_period + ON procedure_occurrence.person_id = observation_period.person_id + AND procedure_date >= observation_period_start_date + AND procedure_date <= observation_period_end_date WHERE ancestor_concept_id IN (@nesting_ids) GROUP BY ancestor_concept_id, - person_id; + procedure_occurrence.person_id, + observation_period_end_date; \ No newline at end of file diff --git a/man/synthesizePositiveControls.Rd b/man/synthesizePositiveControls.Rd index 1d63553..def6480 100644 --- a/man/synthesizePositiveControls.Rd +++ b/man/synthesizePositiveControls.Rd @@ -43,7 +43,7 @@ synthesizePositiveControls( precision = 0.01, outputIdOffset = 1000, workFolder = "./SignalInjectionTemp", - cdmVersion = "5", + cdmVersion = NULL, modelThreads = 1, generationThreads = 1 ) @@ -170,8 +170,7 @@ created?} \item{workFolder}{Path to a folder where intermediate data will be stored.} -\item{cdmVersion}{Define the OMOP CDM version used: currently support "4" and -"5".} +\item{cdmVersion}{DEPRECATED. This argument is ignored.} \item{modelThreads}{Number of parallel threads to use when fitting outcome models.}