From 52d89fccd24e9128244e86850681a182e6328b4e Mon Sep 17 00:00:00 2001 From: Admin_mschuemi Date: Thu, 19 Dec 2024 11:37:34 -0500 Subject: [PATCH] Refactoring code so connections do not stay open longer than strictly necessary (to avoid timeout errors on DataBricks). Increasing required version of FeatureExtraction to avoid warning about use of deprecated argument `oracleTempSchema`. Fixing nesting cohorts to end at end of observation. --- DESCRIPTION | 2 +- | 6 + R/Plots.R | 3 +- R/PositiveControlSynthesis.R | 802 +++++++++++++++---------- inst/sql/sql_server/NestingCohorts.sql | 23 +- man/synthesizePositiveControls.Rd | 5 +- 6 files changed, 526 insertions(+), 315 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index e4618b0..74084d1 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -2,7 +2,7 @@ Package: MethodEvaluation Type: Package Title: Package for Evaluation of Estimation Methods Version: 2.3.1 -Date: 2024-12-11 +Date: 2024-12-16 Authors@R: c( person("Martijn", "Schuemie", , "", role = c("aut", "cre")) ) diff --git a/ b/ index ac396ff..6929d75 100644 --- a/ +++ b/ @@ -5,6 +5,12 @@ Changes: 1. Updating Circe-generated SQL (to avoid errors on DataBricks). +2. Refactoring code so connections do not stay open longer than strictly necessary (to avoid timeout errors on DataBricks). + +3. Increasing required version of FeatureExtraction to avoid warning about use of deprecated argument `oracleTempSchema`. + +4. Fixing nesting cohorts to end at end of observation. + MethodEvaluation 2.3.0 ====================== diff --git a/R/Plots.R b/R/Plots.R index 2ee562c..0c271d7 100644 --- a/R/Plots.R +++ b/R/Plots.R @@ -167,7 +167,6 @@ plotCoverageInjectedSignals <- function(logRr, seLogRr, trueLogRr, region = 0.95 vizD$label <- paste(round(100 * vizD$fraction), "%", sep = "") vizD$group <- factor(vizD$group, levels = c("Below CI", "Within CI", "Above CI")) - theme <- ggplot2::element_text(colour = "#000000", size = 10) plot <- with(vizD, { ggplot2::ggplot(vizD, ggplot2::aes(x = as.factor(trueRr), y = fraction)) + ggplot2::geom_bar(ggplot2::aes(fill = group), stat = "identity", position = "stack", alpha = 0.8) + @@ -181,7 +180,7 @@ plotCoverageInjectedSignals <- function(logRr, seLogRr, trueLogRr, region = 0.95 panel.grid.major = ggplot2::element_blank(), axis.ticks = ggplot2::element_blank(), axis.text.y = ggplot2::element_blank(), - axis.text.x = theme, + axis.text.x = ggplot2::element_text(colour = "#000000", size = 10), legend.key = ggplot2::element_blank(), legend.position = "right" ) diff --git a/R/PositiveControlSynthesis.R b/R/PositiveControlSynthesis.R index feb7832..1b19653 100644 --- a/R/PositiveControlSynthesis.R +++ b/R/PositiveControlSynthesis.R @@ -117,8 +117,7 @@ #' @param outputTable The name of the table names that will contain the generated #' outcome cohorts. #' @param workFolder Path to a folder where intermediate data will be stored. -#' @param cdmVersion Define the OMOP CDM version used: currently support "4" and -#' "5". +#' @param cdmVersion DEPRECATED. This argument is ignored. #' @param modelThreads Number of parallel threads to use when fitting outcome #' models. #' @param generationThreads Number of parallel threads to use when generating outcomes. @@ -187,13 +186,16 @@ synthesizePositiveControls <- function(connectionDetails, precision = 0.01, outputIdOffset = 1000, workFolder = "./SignalInjectionTemp", - cdmVersion = "5", + cdmVersion = NULL, modelThreads = 1, generationThreads = 1) { if (!is.null(oracleTempSchema) && oracleTempSchema != "") { warning("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.") tempEmulationSchema <- oracleTempSchema } + if (!is.null(cdmVersion) && cdmVersion != "") { + warning("The 'cdmVersion' argument is deprecated.") + } errorMessages <- checkmate::makeAssertCollection() checkmate::assertClass(connectionDetails, "ConnectionDetails", add = errorMessages) checkmate::assertCharacter(cdmDatabaseSchema, len = 1, add = errorMessages) @@ -222,7 +224,6 @@ synthesizePositiveControls <- function(connectionDetails, checkmate::assertNumeric(precision, lower = 0, len = 1, add = errorMessages) checkmate::assertInt(outputIdOffset, lower = 0, add = errorMessages) checkmate::assertCharacter(workFolder, len = 1, add = errorMessages) - checkmate::assertCharacter(cdmVersion, len = 1, add = errorMessages) checkmate::assertInt(modelThreads, lower = 1, add = errorMessages) checkmate::assertInt(generationThreads, lower = 1, add = errorMessages) checkmate::reportAssertions(collection = errorMessages) @@ -232,22 +233,105 @@ synthesizePositiveControls <- function(connectionDetails, if (modelType == "poisson" && addIntentToTreat) { stop("Intent to treat injection not yet implemented for Poisson models") } - if (cdmVersion == "4") { - stop("CDM version 4 is not supported") - } if (!grepl("start$|end$", endAnchor, = TRUE)) { stop("endAnchor should have value 'cohort start' or 'cohort end'") } - isEnd <- function(anchor) { - return(grepl("end$", anchor, = TRUE)) - } if (!file.exists(workFolder)) { dir.create(workFolder) } - exposuresFile <- file.path(workFolder, "exposures.rds") - outcomesFile <- file.path(workFolder, "outcomes.rds") - priorOutcomesFile <- file.path(workFolder, "priorOutcomes.rds") - countsFile <- file.path(workFolder, "counts.rds") + result <- loadDataFitModels(connectionDetails = connectionDetails, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + exposureDatabaseSchema = exposureDatabaseSchema, + exposureTable = exposureTable, + outcomeDatabaseSchema = outcomeDatabaseSchema, + outcomeTable = outcomeTable, + exposureOutcomePairs = exposureOutcomePairs, + modelType = modelType, + minOutcomeCountForModel = minOutcomeCountForModel, + minModelCount = minModelCount, + covariateSettings = covariateSettings, + prior = prior, + control = control, + firstExposureOnly = firstExposureOnly, + washoutPeriod = washoutPeriod, + riskWindowStart = riskWindowStart, + riskWindowEnd = riskWindowEnd, + endAnchor = endAnchor, + firstOutcomeOnly = firstOutcomeOnly, + removePeopleWithPriorOutcomes = removePeopleWithPriorOutcomes, + maxSubjectsForModel = maxSubjectsForModel, + effectSizes = effectSizes, + outputIdOffset = outputIdOffset, + workFolder = workFolder, + modelThreads = modelThreads) + + result <- generateAllOutcomes(workFolder = workFolder, + generationThreads = generationThreads, + minOutcomeCountForInjection = minOutcomeCountForInjection, + removePeopleWithPriorOutcomes = removePeopleWithPriorOutcomes, + modelType = modelType, + effectSizes = effectSizes, + precision = precision, + addIntentToTreat = addIntentToTreat, + result = result) + + if (is.null(getOption("skipPositiveControlUpload")) || !getOption("skipPositiveControlUpload")) { + insertOutcomes(connectionDetails = connectionDetails, + cdmDatabaseSchema = cdmDatabaseSchema, + outputDatabaseSchema = outputDatabaseSchema, + outputTable = outputTable, + outcomeDatabaseSchema = outcomeDatabaseSchema, + outcomeTable = outcomeTable, + tempEmulationSchema = tempEmulationSchema, + createOutputTable = createOutputTable, + result = result) + } + + return(result) +} + +loadDataFitModels <- function(connectionDetails, + cdmDatabaseSchema, + tempEmulationSchema, + exposureDatabaseSchema, + exposureTable, + outcomeDatabaseSchema, + outcomeTable, + exposureOutcomePairs, + modelType, + minOutcomeCountForModel, + minModelCount, + covariateSettings, + prior, + control, + firstExposureOnly, + washoutPeriod, + riskWindowStart, + riskWindowEnd, + endAnchor, + firstOutcomeOnly, + removePeopleWithPriorOutcomes, + maxSubjectsForModel, + effectSizes, + outputIdOffset, + workFolder, + modelThreads){ + # Note: this is a big function, but the code shares several temp tables so + # need connection to stay alive. + + # Find all exposures for each outcome, then identify unique groups of exposures + group <- function(outcomeId) { + exposureIds <- exposureOutcomePairs$exposureId[exposureOutcomePairs$outcomeId == outcomeId] + return(exposureIds[order(exposureIds)]) + } + outcomeIds <- unique(exposureOutcomePairs$outcomeId) + groups <- purrr::map(unique(outcomeIds), group) + uniqueGroups <- unique(groups) + saveRDS(uniqueGroups, file.path(workFolder, "uniqueGroups.rds")) + outcomeIdToGroupId <- data.frame(outcomeIds = outcomeIds, groupIds = match(groups, uniqueGroups)) + saveRDS(outcomeIdToGroupId, file.path(workFolder, "outcomeIdToGroupId.rds")) + result <- tibble( exposureId = rep(exposureOutcomePairs$exposureId, each = length(effectSizes)), outcomeId = rep(exposureOutcomePairs$outcomeId, each = length(effectSizes)), @@ -260,46 +344,127 @@ synthesizePositiveControls <- function(connectionDetails, modelFolder = "", outcomesToInjectFile = "" ) + + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + + result <- loadDataForSynthesis(connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + exposureDatabaseSchema = exposureDatabaseSchema, + exposureTable = exposureTable, + outcomeDatabaseSchema = outcomeDatabaseSchema, + outcomeTable = outcomeTable, + exposureOutcomePairs = exposureOutcomePairs, + modelType = modelType, + firstExposureOnly = firstExposureOnly, + washoutPeriod = washoutPeriod, + riskWindowStart = riskWindowStart, + riskWindowEnd = riskWindowEnd, + endAnchor = endAnchor, + firstOutcomeOnly = firstOutcomeOnly, + removePeopleWithPriorOutcomes = removePeopleWithPriorOutcomes, + workFolder = workFolder, + result = result) + + getCovariatesForModels(connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + covariateSettings = covariateSettings, + maxSubjectsForModel = maxSubjectsForModel, + workFolder = workFolder, + uniqueGroups = uniqueGroups) + + result <- buildModels(outcomeIdToGroupId = outcomeIdToGroupId, + uniqueGroups = uniqueGroups, + exposureOutcomePairs = exposureOutcomePairs, + effectSizes = effectSizes, + minModelCount = minModelCount, + minOutcomeCountForModel = minOutcomeCountForModel, + workFolder = workFolder, + modelThreads = modelThreads, + removePeopleWithPriorOutcomes = removePeopleWithPriorOutcomes, + modelType = modelType, + prior = prior, + control = control, + result = result) + + getCovariatesOutsideSample(uniqueGroups = uniqueGroups, + outcomeIdToGroupId = outcomeIdToGroupId, + connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + covariateSettings = covariateSettings, + workFolder = workFolder, + result = result) + + sql <- "TRUNCATE TABLE #cohort_person; DROP TABLE #cohort_person;" + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + + summaryFile <- file.path(workFolder, "summary.rds") + saveRDS(result, summaryFile) + return(result) +} +loadDataForSynthesis <- function(connection, + cdmDatabaseSchema, + tempEmulationSchema, + exposureDatabaseSchema, + exposureTable, + outcomeDatabaseSchema, + outcomeTable, + exposureOutcomePairs, + modelType, + firstExposureOnly, + washoutPeriod, + riskWindowStart, + riskWindowEnd, + endAnchor, + firstOutcomeOnly, + removePeopleWithPriorOutcomes, + workFolder, + result = result) { exposureIds <- unique(exposureOutcomePairs$exposureId) - - conn <- DatabaseConnector::connect(connectionDetails) - on.exit(DatabaseConnector::disconnect(conn)) - + + exposuresFile <- file.path(workFolder, "exposures.rds") + outcomesFile <- file.path(workFolder, "outcomes.rds") + priorOutcomesFile <- file.path(workFolder, "priorOutcomes.rds") + countsFile <- file.path(workFolder, "counts.rds") + # Create exposure cohorts ------------------------------------ - ParallelLogger::logInfo("\nCreating risk windows") renderedSql <- SqlRender::loadRenderTranslateSql("CreateExposedCohorts.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema, - cdm_database_schema = cdmDatabaseSchema, - exposure_ids = exposureIds, - washout_period = washoutPeriod, - exposure_database_schema = exposureDatabaseSchema, - exposure_table = exposureTable, - first_exposure_only = firstExposureOnly, - risk_window_start = riskWindowStart, - risk_window_end = riskWindowEnd, - add_exposure_days_to_end = isEnd(endAnchor) + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema, + cdm_database_schema = cdmDatabaseSchema, + exposure_ids = exposureIds, + washout_period = washoutPeriod, + exposure_database_schema = exposureDatabaseSchema, + exposure_table = exposureTable, + first_exposure_only = firstExposureOnly, + risk_window_start = riskWindowStart, + risk_window_end = riskWindowEnd, + add_exposure_days_to_end = grepl("end$", endAnchor, = TRUE) ) - - DatabaseConnector::executeSql(conn, renderedSql) - + + DatabaseConnector::executeSql(connection, renderedSql) + # Get exposure cohorts from database ------------------------------- if (file.exists(exposuresFile)) { exposures <- readRDS(exposuresFile) } else { ParallelLogger::logInfo("\nRetrieving exposure cohorts") exposureSql <- SqlRender::loadRenderTranslateSql("GetExposedCohorts.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema ) - exposures <- DatabaseConnector::querySql(conn, exposureSql, snakeCaseToCamelCase = TRUE) + exposures <- DatabaseConnector::querySql(connection, exposureSql, snakeCaseToCamelCase = TRUE) exposures <- exposures[order(exposures$rowId), ] saveRDS(exposures, exposuresFile) } - + # Get prior outcomes (if needed) ------------------------------------ if (removePeopleWithPriorOutcomes) { if (file.exists(priorOutcomesFile)) { @@ -309,7 +474,7 @@ synthesizePositiveControls <- function(connectionDetails, table <- exposureOutcomePairs colnames(table) <- SqlRender::camelCaseToSnakeCase(colnames(table)) DatabaseConnector::insertTable( - connection = conn, + connection = connection, tableName = "#exposure_outcome", data = table, dropTableIfExists = TRUE, @@ -318,22 +483,22 @@ synthesizePositiveControls <- function(connectionDetails, tempEmulationSchema = tempEmulationSchema ) outcomeSql <- SqlRender::loadRenderTranslateSql("GetPriorOutcomes.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema, - cdm_database_schema = cdmDatabaseSchema, - outcome_database_schema = outcomeDatabaseSchema, - outcome_table = outcomeTable, - first_outcome_only = firstOutcomeOnly + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema, + cdm_database_schema = cdmDatabaseSchema, + outcome_database_schema = outcomeDatabaseSchema, + outcome_table = outcomeTable, + first_outcome_only = firstOutcomeOnly ) - priorOutcomes <- DatabaseConnector::querySql(conn, outcomeSql, snakeCaseToCamelCase = TRUE) + priorOutcomes <- DatabaseConnector::querySql(connection, outcomeSql, snakeCaseToCamelCase = TRUE) saveRDS(priorOutcomes, priorOutcomesFile) sql <- "TRUNCATE TABLE #exposure_outcome; DROP TABLE #exposure_outcome;" - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) } } - + # Get outcomes --------------------------------------- if (file.exists(outcomesFile)) { outcomeCounts <- readRDS(outcomesFile) @@ -342,7 +507,7 @@ synthesizePositiveControls <- function(connectionDetails, table <- exposureOutcomePairs colnames(table) <- SqlRender::camelCaseToSnakeCase(colnames(table)) DatabaseConnector::insertTable( - connection = conn, + connection = connection, tableName = "#exposure_outcome", data = table, dropTableIfExists = TRUE, @@ -351,21 +516,21 @@ synthesizePositiveControls <- function(connectionDetails, tempEmulationSchema = tempEmulationSchema ) outcomeSql <- SqlRender::loadRenderTranslateSql("GetOutcomes.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema, - cdm_database_schema = cdmDatabaseSchema, - outcome_database_schema = outcomeDatabaseSchema, - outcome_table = outcomeTable, - first_outcome_only = firstOutcomeOnly + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema, + cdm_database_schema = cdmDatabaseSchema, + outcome_database_schema = outcomeDatabaseSchema, + outcome_table = outcomeTable, + first_outcome_only = firstOutcomeOnly ) - outcomeCounts <- DatabaseConnector::querySql(conn, outcomeSql, snakeCaseToCamelCase = TRUE) + outcomeCounts <- DatabaseConnector::querySql(connection, outcomeSql, snakeCaseToCamelCase = TRUE) saveRDS(outcomeCounts, outcomesFile) sql <- "TRUNCATE TABLE #exposure_outcome; DROP TABLE #exposure_outcome;" - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) } - + # Generate counts for result object ------------------------------- if (file.exists(countsFile)) { result <- readRDS(countsFile) @@ -373,7 +538,7 @@ synthesizePositiveControls <- function(connectionDetails, ParallelLogger::logInfo("Computing counts per exposure - outcome pair") temp <- select(exposures, .data$rowId, .data$exposureId, .data$eraNumber) %>% inner_join(select(outcomeCounts, .data$rowId, .data$outcomeId, .data$y, .data$yItt), - by = "rowId" + by = "rowId" ) if (modelType == "survival") { temp <- temp %>% @@ -447,20 +612,17 @@ synthesizePositiveControls <- function(connectionDetails, ) saveRDS(result, countsFile) } + return(result) +} - # Build models ---------------------------- Find all exposures for each outcome, then identify unique - # groups of exposures - group <- function(outcomeId) { - exposureIds <- exposureOutcomePairs$exposureId[exposureOutcomePairs$outcomeId == outcomeId] - return(exposureIds[order(exposureIds)]) - } - outcomeIds <- unique(exposureOutcomePairs$outcomeId) - groups <- purrr::map(unique(outcomeIds), group) - uniqueGroups <- unique(groups) - saveRDS(uniqueGroups, file.path(workFolder, "uniqueGroups.rds")) - outcomeIdToGroupId <- data.frame(outcomeIds = outcomeIds, groupIds = match(groups, uniqueGroups)) - saveRDS(outcomeIdToGroupId, file.path(workFolder, "outcomeIdToGroupId.rds")) - +getCovariatesForModels <- function(connection, + cdmDatabaseSchema, + tempEmulationSchema, + covariateSettings, + maxSubjectsForModel, + workFolder, + uniqueGroups) { + # Fetch covariates for each group of exposures for (i in 1:length(uniqueGroups)) { covarFileName <- file.path(workFolder, sprintf("", i)) @@ -468,7 +630,7 @@ synthesizePositiveControls <- function(connectionDetails, cohortIds <- uniqueGroups[[i]] sql <- "SELECT COUNT(*) AS entries FROM (SELECT DISTINCT subject_id, cohort_start_date FROM #cohort_person WHERE cohort_definition_id IN (@cohort_ids)) tmp;" count <- DatabaseConnector::renderTranslateQuerySql( - connection = conn, + connection = connection, sql = sql, tempEmulationSchema = tempEmulationSchema, cohort_ids = cohortIds, @@ -477,17 +639,17 @@ synthesizePositiveControls <- function(connectionDetails, if (count > maxSubjectsForModel) { ParallelLogger::logInfo("Sampling exposed cohorts for model(s)") renderedSql <- SqlRender::loadRenderTranslateSql("SampleExposedCohorts.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema, - sample_size = maxSubjectsForModel, - cohort_ids = cohortIds + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema, + sample_size = maxSubjectsForModel, + cohort_ids = cohortIds ) - DatabaseConnector::executeSql(conn, renderedSql) - + DatabaseConnector::executeSql(connection, renderedSql) + sql <- "SELECT row_id FROM #sampled_person" sampledRowIds <- DatabaseConnector::renderTranslateQuerySql( - connection = conn, + connection = connection, sql = sql, tempEmulationSchema = tempEmulationSchema, snakeCaseToCamelCase = TRUE @@ -498,7 +660,7 @@ synthesizePositiveControls <- function(connectionDetails, # Don't sample, just copy: sql <- "SELECT * INTO #sampled_person FROM #cohort_person WHERE cohort_definition_id IN (@cohort_ids);" DatabaseConnector::renderTranslateExecuteSql( - connection = conn, + connection = connection, sql = sql, tempEmulationSchema = tempEmulationSchema, cohort_ids = cohortIds, @@ -508,14 +670,13 @@ synthesizePositiveControls <- function(connectionDetails, } ParallelLogger::logInfo("Extracting covariates for fitting outcome model(s)") covariateData <- FeatureExtraction::getDbCovariateData( - connection = conn, + connection = connection, tempEmulationSchema = tempEmulationSchema, cdmDatabaseSchema = cdmDatabaseSchema, cohortTable = "#sampled_person", cohortTableIsTemp = TRUE, rowIdField = "row_id", covariateSettings = covariateSettings, - cdmVersion = cdmVersion, aggregated = FALSE ) covariateData <- FeatureExtraction::tidyCovariateData( @@ -524,10 +685,10 @@ synthesizePositiveControls <- function(connectionDetails, removeRedundancy = TRUE ) FeatureExtraction::saveCovariateData(covariateData, covarFileName) - + sql <- "TRUNCATE TABLE #sampled_person; DROP TABLE #sampled_person;" DatabaseConnector::renderTranslateExecuteSql( - connection = conn, + connection = connection, tempEmulationSchema = tempEmulationSchema, sql = sql, progressBar = FALSE, @@ -535,8 +696,26 @@ synthesizePositiveControls <- function(connectionDetails, ) } } +} +buildModels <- function(outcomeIdToGroupId, + uniqueGroups, + exposureOutcomePairs, + effectSizes, + minModelCount, + minOutcomeCountForModel, + workFolder, + modelThreads, + removePeopleWithPriorOutcomes, + modelType, + prior, + control, + result) { ParallelLogger::logInfo("Fitting outcome models") + exposuresFile <- file.path(workFolder, "exposures.rds") + outcomesFile <- file.path(workFolder, "outcomes.rds") + priorOutcomesFile <- file.path(workFolder, "priorOutcomes.rds") + tasks <- list() modelsWithEnoughOutcomes <- 0 outcomeIds <- unique(exposureOutcomePairs$outcomeId) @@ -574,14 +753,13 @@ synthesizePositiveControls <- function(connectionDetails, " controls." ) } - + if (length(tasks) > 0) { cluster <- ParallelLogger::makeCluster(modelThreads) ParallelLogger::clusterApply( cluster, tasks, fitModel, - result, exposuresFile, outcomesFile, priorOutcomesFile, @@ -592,9 +770,123 @@ synthesizePositiveControls <- function(connectionDetails, ) ParallelLogger::stopCluster(cluster) } + return(result) +} + +fitModel <- function(task, + exposuresFile, + outcomesFile, + priorOutcomesFile, + removePeopleWithPriorOutcomes, + modelType, + prior, + control) { + ParallelLogger::logInfo("Fitting model for outcome ", task$outcomeId) + exposures <- readRDS(exposuresFile) + + outcomes <- readRDS(outcomesFile) + outcomes <- outcomes[outcomes$outcomeId == task$outcomeId, ] + if (file.exists(task$sampledExposuresFile)) { + sampledExposures <- readRDS(task$sampledExposuresFile) + outcomes <- outcomes[outcomes$rowId %in% sampledExposures$rowId, ] + exposures <- exposures[exposures$rowId %in% sampledExposures$rowId, ] + } else { + exposures <- exposures[exposures$exposureId %in% task$groupExposureIds, ] + outcomes <- outcomes[outcomes$rowId %in% exposures$rowId, ] + } + # Dedupe exposures for model fitting, so we don't overfit: + exposures <- exposures[order(exposures$personId, exposures$cohortStartDate), ] + exposures <- exposures[!duplicated(exposures[, c("personId", "cohortStartDate")]), ] + + covariateData <- FeatureExtraction::loadCovariateData(task$covarFileName) + covariates <- covariateData$covariates %>% + filter(.data$rowId %in% local(exposures$rowId)) + + if (removePeopleWithPriorOutcomes) { + priorOutcomes <- readRDS(priorOutcomesFile) + removeRowIds <- priorOutcomes$rowId[priorOutcomes$outcomeId == task$outcomeId] + outcomes <- outcomes[!(outcomes$rowId %in% removeRowIds), ] + exposures <- exposures[!(exposures$rowId %in% removeRowIds), ] + covariates <- covariates %>% + filter(!.data$rowId %in% removeRowIds) + } + outcomes <- merge(exposures, outcomes[, c( + "rowId", + "y", + "timeToEvent" + )], by = c("rowId"), all.x = TRUE) + outcomes <- outcomes[order(outcomes$rowId), ] + outcomes$y[$y)] <- 0 + names(outcomes)[names(outcomes) == "daysAtRisk"] <- "time" + if (modelType == "survival") { + # For survival, time is either the time to the end of the risk window, or the event + outcomes$y[outcomes$y != 0] <- 1 + outcomes$time[outcomes$y != 0] <- outcomes$timeToEvent[outcomes$y != 0] + } + outcomes$time <- outcomes$time + 1 + + # Note: for survival, using Poisson regression with 1 outcome and censored time as equivalent of + # survival regression: + covariateData$outcomes <- outcomes + cyclopsData <- Cyclops::convertToCyclopsData(covariateData$outcomes, + covariates, + modelType = "pr", + quiet = TRUE + ) + fit <- tryCatch( + { + Cyclops::fitCyclopsModel(cyclopsData, prior = prior, control = control) + }, + error = function(e) { + e$message + } + ) + if (fit$return_flag != "SUCCESS") { + fit <- fit$return_flag + } + if (is.character(fit)) { + ParallelLogger::logInfo(paste("Unable to fit model for outcome", task$outcomeId, ":", fit)) + dir.create(task$modelFolder) + write.csv(fit, file.path(task$modelFolder, "Error.txt")) + } else { + betas <- coef(fit) + intercept <- tibble( + beta = betas[1], + id = bit64::as.integer64(0), + covariateName = "(Intercept)", + row.names = NULL + ) + betas <- betas[betas != 0] + if (length(betas) > 1) { + betas <- betas[2:length(betas)] + betas <- tibble(beta = betas, covariateId = bit64::as.integer64(attr(betas, "names"))) + betas <- betas %>% + inner_join( + covariateData$covariateRef %>% + collect() %>% + mutate(covariateId = bit64::as.integer64(.data$covariateId)), + by = "covariateId" + ) %>% + select(.data$beta, id = .data$covariateId, .data$covariateName) %>% + arrange(desc(abs(.data$beta))) + betas <- bind_rows(intercept, betas) + } else { + betas <- intercept + } + dir.create(task$modelFolder) + saveRDS(betas, file.path(task$modelFolder, "betas.rds")) + } +} - # Generate new outcomes ----------------------------------------- Fetch covariates for all rows if - # covars for model were based on a sample: +getCovariatesOutsideSample <- function(uniqueGroups, + outcomeIdToGroupId, + connection, + cdmDatabaseSchema, + tempEmulationSchema, + covariateSettings, + workFolder, + result) { + # Fetch covariates for all rows if covars for model were based on a sample for (i in 1:length(uniqueGroups)) { sampledExposuresFile <- file.path(workFolder, sprintf("sampledRowIds_g%s.rds", i)) covarFileName <- file.path(workFolder, sprintf("", i)) @@ -617,7 +909,7 @@ synthesizePositiveControls <- function(connectionDetails, } modelCovariateIds <- unique(modelCovariateIds) modelCovariateIds <- modelCovariateIds[modelCovariateIds != 0] - + ParallelLogger::logInfo(paste( "Number of unique covariates across outcome models:", length(modelCovariateIds) @@ -629,19 +921,18 @@ synthesizePositiveControls <- function(connectionDetails, sql <- "SELECT * INTO #selected_person FROM #cohort_person WHERE cohort_definition_id IN (@cohort_ids);" sql <- SqlRender::render(sql, cohort_ids = uniqueGroups[[i]]) sql <- SqlRender::translate(sql, - targetDialect = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema + targetDialect = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema ) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) covariateData <- FeatureExtraction::getDbCovariateData( - connection = conn, + connection = connection, tempEmulationSchema = tempEmulationSchema, cdmDatabaseSchema = cdmDatabaseSchema, cohortTable = "#selected_person", cohortTableIsTemp = TRUE, rowIdField = "row_id", covariateSettings = covariateSettings, - cdmVersion = cdmVersion, aggregated = FALSE ) covariateData <- FeatureExtraction::tidyCovariateData( @@ -650,17 +941,33 @@ synthesizePositiveControls <- function(connectionDetails, removeRedundancy = FALSE ) FeatureExtraction::saveCovariateData(covariateData, covarFileName) - + sql <- "TRUNCATE TABLE #selected_person; DROP TABLE #selected_person;" sql <- SqlRender::translate(sql, - targetDialect = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema + targetDialect = DatabaseConnector::dbms(connection), + tempEmulationSchema = tempEmulationSchema ) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) } } } +} + +generateAllOutcomes <- function(workFolder, + generationThreads, + minOutcomeCountForInjection, + removePeopleWithPriorOutcomes, + modelType, + effectSizes, + precision, + addIntentToTreat, + result) { + exposuresFile <- file.path(workFolder, "exposures.rds") + outcomesFile <- file.path(workFolder, "outcomes.rds") + priorOutcomesFile <- file.path(workFolder, "priorOutcomes.rds") + outcomeIdToGroupId <- readRDS(file.path(workFolder, "outcomeIdToGroupId.rds")) + ParallelLogger::logInfo("Generating outcomes") temp <- result[result$observedOutcomes > minOutcomeCountForInjection, c( "exposureId", @@ -706,195 +1013,9 @@ synthesizePositiveControls <- function(connectionDetails, ParallelLogger::stopCluster(cluster) result <-"rbind", results) } - - # Save summary ----------------------------------------------------------------- - summaryFile <- .createSummaryFileName(workFolder) - saveRDS(result, summaryFile) - - if (!is.null(getOption("skipPositiveControlUpload")) && getOption("skipPositiveControlUpload")) { - return(result) - } - - # Insert outcomes into database ------------------------------------------------ - ParallelLogger::logInfo("Inserting additional outcomes into database") - fileNames <- result$outcomesToInjectFile[result$outcomesToInjectFile != ""] - outcomesToInject <- lapply(fileNames, readRDS) - outcomesToInject <-"rbind", outcomesToInject) - colnames(outcomesToInject) <- SqlRender::camelCaseToSnakeCase(colnames(outcomesToInject)) - if (Sys.getenv("USE_MPP_BULK_LOAD") == "TRUE" || - Sys.getenv("DATABASE_CONNECTOR_BULK_UPLOAD") == "TRUE") { - tableName <- paste0( - outputDatabaseSchema, - ".temp_outcomes_", - paste(sample(letters, 5), collapse = "") - ) - tempTable <- FALSE - } else { - tableName <- "#temp_outcomes" - tempTable <- TRUE - } - - DatabaseConnector::insertTable( - connection = conn, - tableName = tableName, - data = outcomesToInject, - dropTableIfExists = TRUE, - createTable = TRUE, - tempTable = tempTable, - tempEmulationSchema = tempEmulationSchema, - progressBar = TRUE - ) - - toCopy <- result[result$modelFolder != "", c("outcomeId", "newOutcomeId")] - colnames(toCopy) <- SqlRender::camelCaseToSnakeCase(colnames(toCopy)) - DatabaseConnector::insertTable( - connection = conn, - tableName = "#to_copy", - data = toCopy, - dropTableIfExists = TRUE, - createTable = TRUE, - tempTable = TRUE, - tempEmulationSchema = tempEmulationSchema - ) - - ParallelLogger::logInfo("Copying negative control outcomes into database") - copySql <- SqlRender::loadRenderTranslateSql("CopyOutcomes.sql", - packageName = "MethodEvaluation", - dbms = connectionDetails$dbms, - cdm_database_schema = cdmDatabaseSchema, - tempEmulationSchema = tempEmulationSchema, - outcome_database_schema = outcomeDatabaseSchema, - outcome_table = outcomeTable, - output_database_schema = outputDatabaseSchema, - output_table = outputTable, - create_output_table = createOutputTable, - temp_outcomes_table = tableName - ) - - DatabaseConnector::executeSql(conn, copySql) - - sql <- "TRUNCATE TABLE #cohort_person; DROP TABLE #cohort_person;" - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) - - sql <- "TRUNCATE TABLE #to_copy; DROP TABLE #to_copy;" - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) - - sql <- "TRUNCATE TABLE @temp_outcomes_table; DROP TABLE @temp_outcomes_table;" - sql <- SqlRender::render(sql, temp_outcomes_table = tableName) - sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema) - DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE) - return(result) } -fitModel <- function(task, - result, - exposuresFile, - outcomesFile, - priorOutcomesFile, - removePeopleWithPriorOutcomes, - modelType, - prior, - control) { - ParallelLogger::logInfo("Fitting model for outcome ", task$outcomeId) - exposures <- readRDS(exposuresFile) - - outcomes <- readRDS(outcomesFile) - outcomes <- outcomes[outcomes$outcomeId == task$outcomeId, ] - if (file.exists(task$sampledExposuresFile)) { - sampledExposures <- readRDS(task$sampledExposuresFile) - outcomes <- outcomes[outcomes$rowId %in% sampledExposures$rowId, ] - exposures <- exposures[exposures$rowId %in% sampledExposures$rowId, ] - } else { - exposures <- exposures[exposures$exposureId %in% task$groupExposureIds, ] - outcomes <- outcomes[outcomes$rowId %in% exposures$rowId, ] - } - # Dedupe exposures for model fitting, so we don't overfit: - exposures <- exposures[order(exposures$personId, exposures$cohortStartDate), ] - exposures <- exposures[!duplicated(exposures[, c("personId", "cohortStartDate")]), ] - - covariateData <- FeatureExtraction::loadCovariateData(task$covarFileName) - covariates <- covariateData$covariates %>% - filter(.data$rowId %in% local(exposures$rowId)) - - if (removePeopleWithPriorOutcomes) { - priorOutcomes <- readRDS(priorOutcomesFile) - removeRowIds <- priorOutcomes$rowId[priorOutcomes$outcomeId == task$outcomeId] - outcomes <- outcomes[!(outcomes$rowId %in% removeRowIds), ] - exposures <- exposures[!(exposures$rowId %in% removeRowIds), ] - covariates <- covariates %>% - filter(!.data$rowId %in% removeRowIds) - } - outcomes <- merge(exposures, outcomes[, c( - "rowId", - "y", - "timeToEvent" - )], by = c("rowId"), all.x = TRUE) - outcomes <- outcomes[order(outcomes$rowId), ] - outcomes$y[$y)] <- 0 - names(outcomes)[names(outcomes) == "daysAtRisk"] <- "time" - if (modelType == "survival") { - # For survival, time is either the time to the end of the risk window, or the event - outcomes$y[outcomes$y != 0] <- 1 - outcomes$time[outcomes$y != 0] <- outcomes$timeToEvent[outcomes$y != 0] - } - outcomes$time <- outcomes$time + 1 - - # Note: for survival, using Poisson regression with 1 outcome and censored time as equivalent of - # survival regression: - covariateData$outcomes <- outcomes - cyclopsData <- Cyclops::convertToCyclopsData(covariateData$outcomes, - covariates, - modelType = "pr", - quiet = TRUE - ) - fit <- tryCatch( - { - Cyclops::fitCyclopsModel(cyclopsData, prior = prior, control = control) - }, - error = function(e) { - e$message - } - ) - if (fit$return_flag != "SUCCESS") { - fit <- fit$return_flag - } - if (is.character(fit)) { - ParallelLogger::logInfo(paste("Unable to fit model for outcome", task$outcomeId, ":", fit)) - dir.create(task$modelFolder) - write.csv(fit, file.path(task$modelFolder, "Error.txt")) - } else { - betas <- coef(fit) - intercept <- tibble( - beta = betas[1], - id = bit64::as.integer64(0), - covariateName = "(Intercept)", - row.names = NULL - ) - betas <- betas[betas != 0] - if (length(betas) > 1) { - betas <- betas[2:length(betas)] - betas <- tibble(beta = betas, covariateId = bit64::as.integer64(attr(betas, "names"))) - betas <- betas %>% - inner_join( - covariateData$covariateRef %>% - collect() %>% - mutate(covariateId = bit64::as.integer64(.data$covariateId)), - by = "covariateId" - ) %>% - select(.data$beta, id = .data$covariateId, .data$covariateName) %>% - arrange(desc(abs(.data$beta))) - betas <- bind_rows(intercept, betas) - } else { - betas <- intercept - } - dir.create(task$modelFolder) - saveRDS(betas, file.path(task$modelFolder, "betas.rds")) - } -} - generateOutcomes <- function(task, result, exposuresFile, @@ -1007,7 +1128,7 @@ generateOutcomes <- function(task, } } injectedRr <- 1 + (nrow(newOutcomes) / resultSubset$observedOutcomes[1]) - + # Count outcomes during first episodes: newOutcomeCountsFirstExposure <- sum(newOutcomeCounts[exposures$eraNumber == 1]) injectedRrFirstExposure <- 1 + @@ -1031,7 +1152,7 @@ generateOutcomes <- function(task, ", injected RR during ITT window =", injectedRrItt )) - + newOutcomeId <- resultSubset$newOutcomeId[resultSubset$targetEffectSize == effectSize] # Write new outcomes to file for later insertion into DB: if (nrow(newOutcomes) != 0) { @@ -1061,6 +1182,87 @@ generateOutcomes <- function(task, } } +insertOutcomes <- function(connectionDetails, + cdmDatabaseSchema, + outputDatabaseSchema, + outputTable, + outcomeDatabaseSchema, + outcomeTable, + tempEmulationSchema, + createOutputTable, + result) { + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + + # Insert outcomes into database ------------------------------------------------ + ParallelLogger::logInfo("Inserting additional outcomes into database") + fileNames <- result$outcomesToInjectFile[result$outcomesToInjectFile != ""] + outcomesToInject <- lapply(fileNames, readRDS) + outcomesToInject <-"rbind", outcomesToInject) + colnames(outcomesToInject) <- SqlRender::camelCaseToSnakeCase(colnames(outcomesToInject)) + if (Sys.getenv("USE_MPP_BULK_LOAD") == "TRUE" || + Sys.getenv("DATABASE_CONNECTOR_BULK_UPLOAD") == "TRUE") { + tableName <- paste0( + outputDatabaseSchema, + ".temp_outcomes_", + paste(sample(letters, 5), collapse = "") + ) + tempTable <- FALSE + } else { + tableName <- "#temp_outcomes" + tempTable <- TRUE + } + + DatabaseConnector::insertTable( + connection = connection, + tableName = tableName, + data = outcomesToInject, + dropTableIfExists = TRUE, + createTable = TRUE, + tempTable = tempTable, + tempEmulationSchema = tempEmulationSchema, + progressBar = TRUE + ) + + toCopy <- result[result$modelFolder != "", c("outcomeId", "newOutcomeId")] + colnames(toCopy) <- SqlRender::camelCaseToSnakeCase(colnames(toCopy)) + DatabaseConnector::insertTable( + connection = connection, + tableName = "#to_copy", + data = toCopy, + dropTableIfExists = TRUE, + createTable = TRUE, + tempTable = TRUE, + tempEmulationSchema = tempEmulationSchema + ) + + ParallelLogger::logInfo("Copying negative control outcomes into database") + copySql <- SqlRender::loadRenderTranslateSql("CopyOutcomes.sql", + packageName = "MethodEvaluation", + dbms = DatabaseConnector::dbms(connection), + cdm_database_schema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + outcome_database_schema = outcomeDatabaseSchema, + outcome_table = outcomeTable, + output_database_schema = outputDatabaseSchema, + output_table = outputTable, + create_output_table = createOutputTable, + temp_outcomes_table = tableName + ) + + DatabaseConnector::executeSql(connection, copySql) + + sql <- "TRUNCATE TABLE #to_copy; DROP TABLE #to_copy;" + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + + sql <- "TRUNCATE TABLE @temp_outcomes_table; DROP TABLE @temp_outcomes_table;" + sql <- SqlRender::render(sql, temp_outcomes_table = tableName) + sql <- SqlRender::translate(sql, targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) +} + + injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { hasOutcome <- exposures$y != 0 observedCount <- sum(hasOutcome) @@ -1121,7 +1323,7 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { rateAfterFirstExposure <- (sum(firstHasOutcome[!firstHasNewOutcome]) + sum(firstHasNewOutcome)) / (sum(firstTime[!firstHasNewOutcome]) + sum(firstTimeToEvent[firstHasNewOutcome] + 1)) injectedRrFirstExposure <- rateAfterFirstExposure / rateBeforeFirstExposure } - + if (addIntentToTreat) { hasOutcomeItt <- exposures$yItt > 0 observedCountItt <- sum(hasOutcomeItt) @@ -1136,7 +1338,7 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { hasNewOutcomeItt <- c() ParallelLogger::logTrace("Generating outcomes for intent-to-treat") while (round(abs(sum(hasNewOutcomeItt) + sum(hasNewOutcome) - correctedTargetCount)) > precision * - correctedTargetCount) { + correctedTargetCount) { timeToNewOutcomeItt <- round(rexp(nrow(exposures), multiplier * exposures$prediction * (effectSize - 1))) hasNewOutcomeItt <- timeToNewOutcomeItt > exposures$daysAtRisk & timeToNewOutcomeItt < survivalTimeIttNew # Correct for censored time and outcomes: @@ -1180,12 +1382,6 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { )) } -.createSummaryFileName <- function(folder) { - name <- "summary" - name <- paste(name, ".rds", sep = "") - return(file.path(folder, name)) -} - .predict <- function(betas, exposures, covariates, modelType) { intercept <- betas$beta[1] if (nrow(betas) == 1) { @@ -1205,7 +1401,7 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { head(10000) %>% pull(.data$covariateId) %>% is("integer64") - + if (!covariateIdIsInteger64) { betas$covariateId <- as.numeric(betas$covariateId) } @@ -1217,7 +1413,7 @@ injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) { select(.data$rowId, .data$value) %>% ungroup() %>% collect() - + prediction <- prediction %>% right_join(select(exposures, .data$rowId, .data$daysAtRisk), by = "rowId") %>% mutate(value = coalesce(.data$value, 0) + intercept) %>% diff --git a/inst/sql/sql_server/NestingCohorts.sql b/inst/sql/sql_server/NestingCohorts.sql index 2e89b98..1eed4d2 100644 --- a/inst/sql/sql_server/NestingCohorts.sql +++ b/inst/sql/sql_server/NestingCohorts.sql @@ -17,24 +17,35 @@ limitations under the License. ************************************************************************/ INSERT INTO @target_database_schema.@target_cohort_table (cohort_definition_id, subject_id, cohort_start_date, cohort_end_date) SELECT ancestor_concept_id AS cohort_definition_id, - condition_occurrence.person_id AS subject_id, - MIN(condition_start_date) AS cohort_start_date, - MIN(condition_start_date) AS cohort_end_date + condition_occurrence.person_id AS subject_id, + MIN(condition_start_date) AS cohort_start_date, + observation_period_end_date AS cohort_end_date FROM @cdm_database_schema.condition_occurrence INNER JOIN @cdm_database_schema.concept_ancestor ON condition_concept_id = descendant_concept_id +INNER JOIN @cdm_database_schema.observation_period + ON condition_occurrence.person_id = observation_period.person_id + AND condition_start_date >= observation_period_start_date + AND condition_start_date <= observation_period_end_date WHERE ancestor_concept_id IN (@nesting_ids) GROUP BY ancestor_concept_id, - person_id; + condition_occurrence.person_id, + observation_period_end_date; + INSERT INTO @target_database_schema.@target_cohort_table (cohort_definition_id, subject_id, cohort_start_date, cohort_end_date) SELECT ancestor_concept_id AS cohort_definition_id, procedure_occurrence.person_id AS subject_id, MIN(procedure_date) AS cohort_start_date, - MIN(procedure_date) AS cohort_end_date + observation_period_end_date AS cohort_end_date FROM @cdm_database_schema.procedure_occurrence INNER JOIN @cdm_database_schema.concept_ancestor ON procedure_concept_id = descendant_concept_id +INNER JOIN @cdm_database_schema.observation_period + ON procedure_occurrence.person_id = observation_period.person_id + AND procedure_date >= observation_period_start_date + AND procedure_date <= observation_period_end_date WHERE ancestor_concept_id IN (@nesting_ids) GROUP BY ancestor_concept_id, - person_id; + procedure_occurrence.person_id, + observation_period_end_date; \ No newline at end of file diff --git a/man/synthesizePositiveControls.Rd b/man/synthesizePositiveControls.Rd index 1d63553..def6480 100644 --- a/man/synthesizePositiveControls.Rd +++ b/man/synthesizePositiveControls.Rd @@ -43,7 +43,7 @@ synthesizePositiveControls( precision = 0.01, outputIdOffset = 1000, workFolder = "./SignalInjectionTemp", - cdmVersion = "5", + cdmVersion = NULL, modelThreads = 1, generationThreads = 1 ) @@ -170,8 +170,7 @@ created?} \item{workFolder}{Path to a folder where intermediate data will be stored.} -\item{cdmVersion}{Define the OMOP CDM version used: currently support "4" and -"5".} +\item{cdmVersion}{DEPRECATED. This argument is ignored.} \item{modelThreads}{Number of parallel threads to use when fitting outcome models.}