diff --git a/.github/workflows/R_CMD_check_Hades.yaml b/.github/workflows/R_CMD_check_Hades.yaml index fe6bcdf..7280c66 100644 --- a/.github/workflows/R_CMD_check_Hades.yaml +++ b/.github/workflows/R_CMD_check_Hades.yaml @@ -48,9 +48,20 @@ jobs: CDM5_REDSHIFT_PASSWORD: ${{ secrets.CDM5_REDSHIFT_PASSWORD }} CDM5_REDSHIFT_SERVER: ${{ secrets.CDM5_REDSHIFT_SERVER }} CDM5_REDSHIFT_USER: ${{ secrets.CDM5_REDSHIFT_USER }} + CDM_SNOWFLAKE_CDM53_SCHEMA: ${{ secrets.CDM_SNOWFLAKE_CDM53_SCHEMA }} + CDM_SNOWFLAKE_OHDSI_SCHEMA: ${{ secrets.CDM_SNOWFLAKE_OHDSI_SCHEMA }} + CDM_SNOWFLAKE_PASSWORD: ${{ secrets.CDM_SNOWFLAKE_PASSWORD }} + CDM_SNOWFLAKE_CONNECTION_STRING: ${{ secrets.CDM_SNOWFLAKE_CONNECTION_STRING }} + CDM_SNOWFLAKE_USER: ${{ secrets.CDM_SNOWFLAKE_USER }} CDM5_SPARK_USER: ${{ secrets.CDM5_SPARK_USER }} CDM5_SPARK_PASSWORD: ${{ secrets.CDM5_SPARK_PASSWORD }} CDM5_SPARK_CONNECTION_STRING: ${{ secrets.CDM5_SPARK_CONNECTION_STRING }} + CDM5_SPARK_CDM_SCHEMA: ${{ secrets.CDM5_SPARK_CDM_SCHEMA }} + CDM5_SPARK_OHDSI_SCHEMA: ${{ secrets.CDM5_SPARK_OHDSI_SCHEMA }} + CDM_BIG_QUERY_CONNECTION_STRING: ${{ secrets.CDM_BIG_QUERY_CONNECTION_STRING }} + CDM_BIG_QUERY_KEY_FILE: ${{ secrets.CDM_BIG_QUERY_KEY_FILE }} + CDM_BIG_QUERY_CDM_SCHEMA: ${{ secrets.CDM_BIG_QUERY_CDM_SCHEMA }} + CDM_BIG_QUERY_OHDSI_SCHEMA: ${{ secrets.CDM_BIG_QUERY_OHDSI_SCHEMA }} steps: - uses: actions/checkout@v3 diff --git a/DESCRIPTION b/DESCRIPTION index f9ef5fb..7790036 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,11 +1,12 @@ Package: Characterization Type: Package Title: Characterizations of Cohorts -Version: 0.2.0 -Date: 2024-04-03 +Version: 2.0.0 +Date: 2024-08-07 Authors@R: c( person("Jenna", "Reps", , "reps@ohdsi.org", role = c("aut", "cre")), - person("Patrick", "Ryan", , "ryan@ohdsi.org", role = c("aut")) + person("Patrick", "Ryan", , "ryan@ohdsi.org", role = c("aut")), + person("Chris", "Knoll", , "knoll@ohdsi.org", role = c("aut")) ) Maintainer: Jenna Reps Description: Various characterizations of target and outcome cohorts. 
@@ -17,9 +18,10 @@ Depends: Imports: Andromeda, DatabaseConnector (>= 6.3.1), - FeatureExtraction (>= 3.5.0), + FeatureExtraction (>= 3.6.0), SqlRender (>= 1.9.0), ParallelLogger (>= 3.0.0), + ResultModelManager, checkmate, dplyr, readr, @@ -31,16 +33,12 @@ Suggests: kableExtra, knitr, markdown, - ResultModelManager, + rmarkdown, ShinyAppBuilder, shiny, withr Remotes: - ohdsi/FeatureExtraction, - ohdsi/Eunomia, - ohdsi/ResultModelManager, - ohdsi/ShinyAppBuilder, - ohdsi/DatabaseConnector + ohdsi/ShinyAppBuilder NeedsCompilation: no RoxygenNote: 7.3.1 Encoding: UTF-8 diff --git a/NAMESPACE b/NAMESPACE index dfa7bce..2e3185b 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,6 +1,7 @@ # Generated by roxygen2: do not edit by hand -export(computeAggregateCovariateAnalyses) +export(cleanIncremental) +export(cleanNonIncremental) export(computeDechallengeRechallengeAnalyses) export(computeRechallengeFailCaseSeriesAnalyses) export(computeTimeToEventAnalyses) @@ -8,24 +9,17 @@ export(createAggregateCovariateSettings) export(createCharacterizationSettings) export(createCharacterizationTables) export(createDechallengeRechallengeSettings) +export(createDuringCovariateSettings) export(createSqliteDatabase) export(createTimeToEventSettings) -export(exportAggregateCovariateToCsv) -export(exportDatabaseToCsv) export(exportDechallengeRechallengeToCsv) export(exportRechallengeFailCaseSeriesToCsv) export(exportTimeToEventToCsv) -export(loadAggregateCovariateAnalyses) +export(getDbDuringCovariateData) +export(insertResultsToDatabase) export(loadCharacterizationSettings) -export(loadDechallengeRechallengeAnalyses) -export(loadRechallengeFailCaseSeriesAnalyses) -export(loadTimeToEventAnalyses) export(runCharacterizationAnalyses) -export(saveAggregateCovariateAnalyses) export(saveCharacterizationSettings) -export(saveDechallengeRechallengeAnalyses) -export(saveRechallengeFailCaseSeriesAnalyses) -export(saveTimeToEventAnalyses) export(viewCharacterization) importFrom(dplyr,"%>%") importFrom(rlang,.data) diff --git a/NEWS.md b/NEWS.md index c004996..076b428 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,35 @@ +Characterization 2.0.0 +====================== +- Added tests for all HADES supported dbms +- Updated minCellCount censoring +- Fixed issues with incremental mode +- Made the code more modular to enable new characterizations to be added +- Added job optimization code to improve the distribution of jobs +- Fixed tests and made minor bug fixes + +Characterization 1.0.0 +====================== +- Added parallelization in the aggregate covariates analysis +- Extract all results as csv files +- Removed sqlite result creation +- Now using ResultModelManager to upload results into the database + +Characterization 0.3.1 +====================== +- Removed DatabaseConnector from Remotes in DESCRIPTION. Fixes GitHub issue 38. +- Added check to covariateSettings input in createAggregateCovariateSettings to error if temporal is TRUE +- Added during cohort covariate settings +- Added case covariate settings inputs to aggregate covariates +- Added casePreTargetDuration and casePostTreatmentDuration integer inputs to aggregate covariates + +Characterization 0.3.0 +====================== +- Added new outcomeWashoutDays parameter to createAggregateCovariateSettings to remove outcome occurrences that are continuations of a prior outcome occurrence +- Changed the way cohort definition ids are created in createAggregateCovariateSettings to use a hash of target id, outcome id and type.
This lets users combine different studies into a single result database. +- Added database migration capability and created new migrations for the recent updates. + + + Characterization 0.2.0 ====================== Updated dependency to FeatureExtraction (>= 3.5.0) to support minCharacterizationMean parameter. diff --git a/R/AggregateCovariates.R b/R/AggregateCovariates.R index 2f043f9..8a3c220 100644 --- a/R/AggregateCovariates.R +++ b/R/AggregateCovariates.R @@ -18,13 +18,15 @@ #' #' @param targetIds A list of cohortIds for the target cohorts #' @param outcomeIds A list of cohortIds for the outcome cohorts -#' @param minPriorObservation The minimum time in the database a patient in the target cohorts must be observed prior to index +#' @param minPriorObservation The minimum time (in days) in the database a patient in the target cohorts must be observed prior to index +#' @param outcomeWashoutDays Patients with the outcome within outcomeWashoutDays days prior to index are excluded from the risk factor analysis #' @template timeAtRisk #' @param covariateSettings An object created using \code{FeatureExtraction::createCovariateSettings} -#' @param minCharacterizationMean The minimum mean value for characterization output. Values below this will be cut off from output. This -#' will help reduce the file size of the characterization output, but will remove information -#' on covariates that have very low values. The default is 0. -#' +#' @param caseCovariateSettings An object created using \code{createDuringCovariateSettings} +#' @param casePreTargetDuration The number of days prior to the case index date used for FeatureExtraction +#' @param casePostOutcomeDuration The number of days after the outcome date used for FeatureExtraction +#' @param extractNonCaseCovariates Whether to extract aggregate covariates and counts for patients in the targets and outcomes in addition to the cases +#' @family {Aggregate} #' @return #' A list with the settings #' @@ -33,12 +35,53 @@ createAggregateCovariateSettings <- function( targetIds, outcomeIds, minPriorObservation = 0, + outcomeWashoutDays = 0, riskWindowStart = 1, startAnchor = "cohort start", riskWindowEnd = 365, endAnchor = "cohort start", - covariateSettings, - minCharacterizationMean = 0) { + covariateSettings = FeatureExtraction::createCovariateSettings( + useDemographicsGender = T, + useDemographicsAge = T, + useDemographicsAgeGroup = T, + useDemographicsRace = T, + useDemographicsEthnicity = T, + useDemographicsIndexYear = T, + useDemographicsIndexMonth = T, + useDemographicsTimeInCohort = T, + useDemographicsPriorObservationTime = T, + useDemographicsPostObservationTime = T, + useConditionGroupEraLongTerm = T, + useDrugGroupEraOverlapping = T, + useDrugGroupEraLongTerm = T, + useProcedureOccurrenceLongTerm = T, + useMeasurementLongTerm = T, + useObservationLongTerm = T, + useDeviceExposureLongTerm = T, + useVisitConceptCountLongTerm = T, + useConditionGroupEraShortTerm = T, + useDrugGroupEraShortTerm = T, + useProcedureOccurrenceShortTerm = T, + useMeasurementShortTerm = T, + useObservationShortTerm = T, + useDeviceExposureShortTerm = T, + useVisitConceptCountShortTerm = T, + endDays = 0, + longTermStartDays = -365, + shortTermStartDays = -30 + ), + caseCovariateSettings = createDuringCovariateSettings( + useConditionGroupEraDuring = T, + useDrugGroupEraDuring = T, + useProcedureOccurrenceDuring = T, + useDeviceExposureDuring = T, + useMeasurementDuring = T, + useObservationDuring = T, + useVisitConceptCountDuring = T + ), + casePreTargetDuration = 365, +
casePostOutcomeDuration = 365, + extractNonCaseCovariates = T) { errorMessages <- checkmate::makeAssertCollection() # check targetIds is a vector of int/double .checkCohortIds( @@ -53,7 +96,10 @@ createAggregateCovariateSettings <- function( errorMessages = errorMessages ) - # check TAR + # check TAR - EFF edit + if (length(riskWindowStart) > 1) { + stop("Please add one time-at-risk per setting") + } .checkTimeAtRisk( riskWindowStart = riskWindowStart, startAnchor = startAnchor, @@ -68,47 +114,66 @@ createAggregateCovariateSettings <- function( errorMessages = errorMessages ) + # check temporal is false + if (inherits(covariateSettings, "covariateSettings")) { + covariateSettings <- list(covariateSettings) + } + if (sum(unlist(lapply(covariateSettings, function(x) { + x$temporal + }))) > 0) { + stop("Temporal covariateSettings not supported by createAggregateCovariateSettings()") + } + # check minPriorObservation .checkMinPriorObservation( minPriorObservation = minPriorObservation, errorMessages = errorMessages ) + # add check for outcomeWashoutDays + checkmate::reportAssertions(errorMessages) + # check unique Ts and Os + if (length(targetIds) != length(unique(targetIds))) { + message("targetIds have duplicates - making unique") + targetIds <- unique(targetIds) + } + if (length(outcomeIds) != length(unique(outcomeIds))) { + message("outcomeIds have duplicates - making unique") + outcomeIds <- unique(outcomeIds) + } + + # create list result <- list( targetIds = targetIds, - outcomeIds = outcomeIds, minPriorObservation = minPriorObservation, + outcomeIds = outcomeIds, + outcomeWashoutDays = outcomeWashoutDays, riskWindowStart = riskWindowStart, startAnchor = startAnchor, riskWindowEnd = riskWindowEnd, endAnchor = endAnchor, - covariateSettings = covariateSettings, - minCharacterizationMean = minCharacterizationMean + covariateSettings = covariateSettings, # risk factors + caseCovariateSettings = caseCovariateSettings, # case series + casePreTargetDuration = casePreTargetDuration, # case series + casePostOutcomeDuration = casePostOutcomeDuration, # case series, + + extractNonCaseCovariates = extractNonCaseCovariates ) class(result) <- "aggregateCovariateSettings" return(result) } -#' Compute aggregate covariate study -#' -#' @template ConnectionDetails -#' @param cdmDatabaseSchema The schema with the OMOP CDM data -#' @param cdmVersion The version of the OMOP CDM -#' @template TargetOutcomeTables -#' @template TempEmulationSchema -#' @param aggregateCovariateSettings The settings for the AggregateCovariate study -#' @param databaseId Unique identifier for the database (string) -#' @param runId Unique identifier for the tar and covariate setting -#' -#' @return -#' The descriptive results for each target cohort in the settings. 
-#' -#' @export -computeAggregateCovariateAnalyses <- function( +createExecutionIds <- function(size) { + executionIds <- gsub(" ", "", gsub("[[:punct:]]", "", paste(Sys.time(), sample(1000000, size), sep = ""))) + return(executionIds) +} + +# TODO cdmVersion should be in runChar +computeTargetAggregateCovariateAnalyses <- function( connectionDetails = NULL, cdmDatabaseSchema, cdmVersion = 5, @@ -117,12 +182,36 @@ computeAggregateCovariateAnalyses <- function( outcomeDatabaseSchema = targetDatabaseSchema, # remove outcomeTable = targetTable, # remove tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), - aggregateCovariateSettings, + settings, databaseId = "database 1", - runId = 1) { - # check inputs + outputFolder = file.path(getwd(), "characterization_results"), + minCharacterizationMean = 0, + minCellCount = 0, + ...) { + # get settings + settingId <- unique(settings$settingId) + targetIds <- unique(settings$targetId) + minPriorObservation <- settings$minPriorObservation + covariateSettings <- settings$covariateSettings - start <- Sys.time() + # create cohortDetails - all Ts, minPriorObservation, twice (type = Tall, Target) + cohortDetails <- data.frame( + settingId = settingId, + targetCohortId = rep(targetIds, 2), + outcomeCohortId = 0, # cannot be NA due to pk/index + cohortType = c(rep("Target", length(targetIds)), rep("Tall", length(targetIds))), + cohortDefinitionId = 1:(length(targetIds) * 2), + minPriorObservation = minPriorObservation, + outcomeWashoutDays = NA, + casePreTargetDuration = NA, + casePostOutcomeDuration = NA, + riskWindowStart = NA, + startAnchor = NA, + riskWindowEnd = NA, + endAnchor = NA, + covariateSettingJson = covariateSettings, + caseCovariateSettingJson = NA + ) connection <- DatabaseConnector::connect( connectionDetails = connectionDetails @@ -131,22 +220,54 @@ computeAggregateCovariateAnalyses <- function( DatabaseConnector::disconnect(connection) ) - # select T, O, create TnO, TnOc, Onprior T - # into temp table #agg_cohorts - createCohortsOfInterest( + # create the temp table with cohort_details + DatabaseConnector::insertTable( + data = cohortDetails[, c("settingId", "targetCohortId", "outcomeCohortId", "cohortType", "cohortDefinitionId")], + camelCaseToSnakeCase = T, connection = connection, + tableName = "#cohort_details", + tempTable = T, + dropTableIfExists = T, + createTable = T, + progressBar = F + ) + + message("Computing aggregate target cohorts") + start <- Sys.time() + + sql <- SqlRender::loadRenderTranslateSql( + sqlFilename = "TargetCohorts.sql", + packageName = "Characterization", dbms = connectionDetails$dbms, - cdmDatabaseSchema = cdmDatabaseSchema, - aggregateCovariateSettings, - targetDatabaseSchema, - targetTable, - outcomeDatabaseSchema, - outcomeTable, - tempEmulationSchema + cdm_database_schema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + target_database_schema = targetDatabaseSchema, + target_table = targetTable, + target_ids = paste(targetIds, collapse = ",", sep = ","), + min_prior_observation = minPriorObservation + ) + + DatabaseConnector::executeSql( + connection = connection, + sql = sql, + progressBar = FALSE, + reportOverallTime = FALSE ) + completionTime <- Sys.time() - start + message(paste0("Computing target cohorts took ", round(completionTime, digits = 1), " ", units(completionTime))) ## get counts - sql <- "select cohort_definition_id, count(*) row_count, count(distinct subject_id) person_count from #agg_cohorts group by cohort_definition_id;" + message("Extracting target cohort 
counts") + sql <- "select + cohort_definition_id, + count(*) row_count, + count(distinct subject_id) person_count, + min(datediff(day, cohort_start_date, cohort_end_date)) min_exposure_time, + avg(datediff(day, cohort_start_date, cohort_end_date)) mean_exposure_time, + max(datediff(day, cohort_start_date, cohort_end_date)) max_exposure_time + from + (select * from #agg_cohorts_before union select * from #agg_cohorts_extras) temp + group by cohort_definition_id;" sql <- SqlRender::translate( sql = sql, targetDialect = connectionDetails$dbms @@ -157,136 +278,799 @@ computeAggregateCovariateAnalyses <- function( snakeCaseToCamelCase = T, ) - message("Computing aggregate covariate results") + message("Computing aggregate target covariate results") result <- FeatureExtraction::getDbCovariateData( connection = connection, oracleTempSchema = tempEmulationSchema, cdmDatabaseSchema = cdmDatabaseSchema, - cohortTable = "#agg_cohorts", + cohortTable = "#agg_cohorts_before", cohortTableIsTemp = T, - cohortId = -1, - covariateSettings = aggregateCovariateSettings$covariateSettings, + cohortIds = -1, + covariateSettings = ParallelLogger::convertJsonToSettings(covariateSettings), cdmVersion = cdmVersion, aggregated = T, - minCharacterizationMean = aggregateCovariateSettings$minCharacterizationMean - ) - # adding counts as a new table - result$cohortCounts <- counts - - # add databaseId and runId to each table in results - # could add settings table with this and just have setting id - # as single extra column? - - for (tableName in names(result)) { - result[[tableName]] <- result[[tableName]] %>% - dplyr::mutate( - runId = !!runId, - databaseId = !!databaseId - ) %>% - dplyr::relocate( - "databaseId", - "runId" - ) - } - - # cohort details: - - result$cohortDetails <- DatabaseConnector::querySql( - connection = connection, - sql = SqlRender::translate( - sql = " select * from #cohort_details;", - targetDialect = connectionDetails$dbms - ), - snakeCaseToCamelCase = T - ) %>% - dplyr::mutate( - runId = !!runId, - databaseId = !!databaseId - ) %>% - dplyr::relocate( - "databaseId", - "runId" - ) - - # settings: - # run_id, database_id, covariate_setting_json, - # riskWindowStart, startAnchor, riskWindowEnd, endAnchor - - covariateSettingsJson <- as.character( - ParallelLogger::convertSettingsToJson( - aggregateCovariateSettings$covariateSettings - ) - ) - - result$settings <- data.frame( - runId = runId, - databaseId = databaseId, - covariateSettingJson = covariateSettingsJson, - riskWindowStart = aggregateCovariateSettings$riskWindowStart, - startAnchor = aggregateCovariateSettings$startAnchor, - riskWindowEnd = aggregateCovariateSettings$riskWindowEnd, - endAnchor = aggregateCovariateSettings$endAnchor + minCharacterizationMean = minCharacterizationMean ) + # drop temp tables + message("Dropping temp tables") sql <- SqlRender::loadRenderTranslateSql( - sqlFilename = "DropAggregateCovariate.sql", + sqlFilename = "DropTargetCovariate.sql", packageName = "Characterization", dbms = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema ) - DatabaseConnector::executeSql( connection = connection, sql = sql, progressBar = FALSE, reportOverallTime = FALSE ) - return(result) + # export all results to csv files + exportAndromedaToCsv( + andromeda = result, + outputFolder = outputFolder, + cohortDetails = cohortDetails, + counts = counts, + databaseId = databaseId, + minCharacterizationMean = minCharacterizationMean, + minCellCount = minCellCount + ) + + return(invisible(T)) } -createCohortsOfInterest <- 
function( - connection, +computeCaseAggregateCovariateAnalyses <- function( + connectionDetails = NULL, cdmDatabaseSchema, - dbms, - aggregateCovariateSettings, + cdmVersion = 5, targetDatabaseSchema, targetTable, - outcomeDatabaseSchema, - outcomeTable, - tempEmulationSchema) { + outcomeDatabaseSchema = targetDatabaseSchema, # remove + outcomeTable = targetTable, # remove + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + settings, + databaseId = "database 1", + outputFolder = file.path(getwd(), "characterization_results"), + minCharacterizationMean = 0, + minCellCount = 0, + ...) { + # check inputs + + # create cohortDetails - all Ts, minPriorObservation, twice (type = Tall, Target) + + + # get settings + targetIds <- unique(settings$targetId) + outcomeIds <- unique(settings$outcomeId) + minPriorObservation <- settings$minPriorObservation + outcomeWashoutDays <- settings$outcomeWashoutDays + casePreTargetDuration <- settings$casePreTargetDuration + casePostOutcomeDuration <- settings$casePostOutcomeDuration + covariateSettings <- settings$covariateSettings # json + caseCovariateSettings <- settings$caseCovariateSettings # json + + # 'cohortType' + cohortDetails <- expand.grid( + targetCohortId = unique(settings$targetId), + outcomeCohortId = unique(settings$outcomeId), + cohortType = c("Cases", "CasesBefore", "CasesAfter", "CasesBetween") + ) + + cohortDetails$minPriorObservation <- settings$minPriorObservation + cohortDetails$outcomeWashoutDays <- settings$outcomeWashoutDays + cohortDetails$casePreTargetDuration <- settings$casePreTargetDuration + cohortDetails$casePostOutcomeDuration <- settings$casePostOutcomeDuration + cohortDetails$covariateSettingJson <- settings$covariateSettings + cohortDetails$caseCovariateSettingJson <- settings$caseCovariateSettings + + tars <- settings$tar + tars$settingId <- unique(settings$settingId) + + # add executionIds + cohortDetails <- merge(cohortDetails, tars) + # moved id so different tars have different id? 
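+ # each row of cohortDetails (target/outcome/cohortType/TAR combination) gets its own sequential id below, so different TARs end up with different cohort_definition_ids in #cohort_details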
+ cohortDetails$cohortDefinitionId <- 1:nrow(cohortDetails) + + # add 'Exclude' with random tar + cohortDetailsExtra <- expand.grid( + targetCohortId = unique(settings$targetId), + outcomeCohortId = unique(settings$outcomeId), + cohortType = "Exclude", + minPriorObservation = settings$minPriorObservation, + outcomeWashoutDays = settings$outcomeWashoutDays, + casePreTargetDuration = settings$casePreTargetDuration, + casePostOutcomeDuration = settings$casePostOutcomeDuration, + covariateSettingJson = settings$covariateSettings, + caseCovariateSettingJson = settings$caseCovariateSettings, + riskWindowStart = tars$riskWindowStart[1], + startAnchor = tars$startAnchor[1], + riskWindowEnd = tars$riskWindowEnd[1], + endAnchor = tars$endAnchor[1], + settingId = settings$settingId[1] + ) + cohortDetailsExtra$cohortDefinitionId <- max(cohortDetails$cohortDefinitionId) + (1:nrow(cohortDetailsExtra)) + cohortDetails <- rbind(cohortDetails, cohortDetailsExtra[colnames(cohortDetails)]) + + connection <- DatabaseConnector::connect( + connectionDetails = connectionDetails + ) + on.exit( + DatabaseConnector::disconnect(connection) + ) + + # create the temp table with cohort_details + DatabaseConnector::insertTable( + data = cohortDetails[, c("targetCohortId", "outcomeCohortId", "cohortType", "cohortDefinitionId", "settingId")], + camelCaseToSnakeCase = T, + connection = connection, + tableName = "#cohort_details", + tempTable = T, + dropTableIfExists = T, + createTable = T, + progressBar = F + ) + + message("Computing aggregate case covariate cohorts") + start <- Sys.time() + + # this is run for all tars sql <- SqlRender::loadRenderTranslateSql( - sqlFilename = "createTargetOutcomeCombinations.sql", + sqlFilename = "CaseCohortsPart1.sql", packageName = "Characterization", - dbms = dbms, + dbms = connectionDetails$dbms, cdm_database_schema = cdmDatabaseSchema, tempEmulationSchema = tempEmulationSchema, target_database_schema = targetDatabaseSchema, target_table = targetTable, + target_ids = paste(targetIds, collapse = ",", sep = ","), outcome_database_schema = outcomeDatabaseSchema, outcome_table = outcomeTable, - target_ids = paste(aggregateCovariateSettings$targetIds, collapse = ",", sep = ","), - outcome_ids = paste(aggregateCovariateSettings$outcomeIds, collapse = ",", sep = ","), - min_prior_observation = aggregateCovariateSettings$minPriorObservation, - tar_start = aggregateCovariateSettings$riskWindowStart, - tar_start_anchor = ifelse( - aggregateCovariateSettings$startAnchor == "cohort start", - "cohort_start_date", - "cohort_end_date" - ), - tar_end = aggregateCovariateSettings$riskWindowEnd, - tar_end_anchor = ifelse( - aggregateCovariateSettings$endAnchor == "cohort start", - "cohort_start_date", - "cohort_end_date" - ) + outcome_ids = paste(outcomeIds, collapse = ",", sep = ","), + min_prior_observation = minPriorObservation, + outcome_washout_days = outcomeWashoutDays ) - DatabaseConnector::executeSql( connection = connection, sql = sql, progressBar = FALSE, reportOverallTime = FALSE ) + + # extract the excluded people into excluded_covariates, excluded_covariates_continuous, + # excluded_analysis_ref, excluded_covariate_ref + + # loop over settingId which contains tars: + for (i in 1:nrow(tars)) { + sql <- SqlRender::loadRenderTranslateSql( + sqlFilename = "CaseCohortsPart2.sql", + packageName = "Characterization", + dbms = connectionDetails$dbms, + tempEmulationSchema = tempEmulationSchema, + first = i == 1, + case_pre_target_duration = casePreTargetDuration, + case_post_outcome_duration = 
casePostOutcomeDuration, + setting_id = tars$settingId[i], + tar_start = tars$riskWindowStart[i], + tar_start_anchor = ifelse(tars$startAnchor[i] == "cohort start", "cohort_start_date", "cohort_end_date"), ## TODO change? + tar_end = tars$riskWindowEnd[i], + tar_end_anchor = ifelse(tars$endAnchor[i] == "cohort start", "cohort_start_date", "cohort_end_date") ## TODO change? + ) + DatabaseConnector::executeSql( + connection = connection, + sql = sql, + progressBar = FALSE, + reportOverallTime = FALSE + ) + } + completionTime <- Sys.time() - start + + message(paste0("Computing case cohorts took ", round(completionTime, digits = 1), " ", units(completionTime))) + + ## get counts + message("Extracting case cohort counts") + sql <- "select + cohort_definition_id, + count(*) row_count, + count(distinct subject_id) person_count, + min(datediff(day, cohort_start_date, cohort_end_date)) min_exposure_time, + avg(datediff(day, cohort_start_date, cohort_end_date)) mean_exposure_time, + max(datediff(day, cohort_start_date, cohort_end_date)) max_exposure_time + from #cases + group by cohort_definition_id;" + sql <- SqlRender::translate( + sql = sql, + targetDialect = connectionDetails$dbms + ) + counts <- DatabaseConnector::querySql( + connection = connection, + sql = sql, + snakeCaseToCamelCase = T, + ) + + message("Computing aggregate before case covariate results") + + result <- FeatureExtraction::getDbCovariateData( + connection = connection, + oracleTempSchema = tempEmulationSchema, + cdmDatabaseSchema = cdmDatabaseSchema, + cohortTable = "#cases", + cohortTableIsTemp = T, + cohortIds = -1, + covariateSettings = ParallelLogger::convertJsonToSettings(covariateSettings), + cdmVersion = cdmVersion, + aggregated = T, + minCharacterizationMean = minCharacterizationMean + ) + + message("Computing aggregate during case covariate results") + + result2 <- tryCatch( + { + FeatureExtraction::getDbCovariateData( + connection = connection, + oracleTempSchema = tempEmulationSchema, + cdmDatabaseSchema = cdmDatabaseSchema, + cohortTable = "#case_series", + cohortTableIsTemp = T, + cohortIds = -1, + covariateSettings = ParallelLogger::convertJsonToSettings(caseCovariateSettings), + cdmVersion = cdmVersion, + aggregated = T, + minCharacterizationMean = minCharacterizationMean + ) + }, + error = function(e) { + message(e) + return(NULL) + } + ) + if (is.null(result2)) { + stop("Issue with case series data extraction") + } + + # drop temp tables + message("Dropping temp tables") + sql <- SqlRender::loadRenderTranslateSql( + sqlFilename = "DropCaseCovariate.sql", + packageName = "Characterization", + dbms = connectionDetails$dbms, + tempEmulationSchema = tempEmulationSchema + ) + DatabaseConnector::executeSql( + connection = connection, + sql = sql, progressBar = FALSE, + reportOverallTime = FALSE + ) + + # export all results to csv files + message("Exporting results to csv") + exportAndromedaToCsv( # TODO combine export of result and result2 + andromeda = result, + outputFolder = outputFolder, + cohortDetails = cohortDetails, + counts = counts, + databaseId = databaseId, + minCharacterizationMean = minCharacterizationMean, + minCellCount = minCellCount + ) + exportAndromedaToCsv( + andromeda = result2, + outputFolder = outputFolder, + cohortDetails = cohortDetails, + counts = NULL, # previously added + databaseId = databaseId, + minCharacterizationMean = minCharacterizationMean, + includeSettings = F, + minCellCount = minCellCount + ) + + return(invisible(T)) +} + + +exportAndromedaToCsv <- function( + andromeda, + 
outputFolder, + cohortDetails, + counts, + databaseId, + minCharacterizationMean, + batchSize = 100000, + minCellCount = 0, + includeSettings = T) { + saveLocation <- outputFolder + if (!dir.exists(saveLocation)) { + dir.create(saveLocation, recursive = T) + } + + ids <- data.frame( + settingId = unique(cohortDetails$settingId), + databaseId = databaseId + ) + + # analysis_ref and covariate_ref + # add database_id and setting_id + if (!is.null(andromeda$analysisRef)) { + Andromeda::batchApply( + tbl = andromeda$analysisRef, + fun = function(x) { + data <- merge(x, ids) + colnames(data) <- SqlRender::camelCaseToSnakeCase(colnames(data)) + + if (file.exists(file.path(saveLocation, "analysis_ref.csv"))) { + append <- T + } else { + append <- F + } + readr::write_csv( + x = formatDouble(data), + file = file.path(saveLocation, "analysis_ref.csv"), + append = append + ) + }, + batchSize = batchSize + ) + } + + if (!is.null(andromeda$covariateRef)) { + Andromeda::batchApply( + tbl = andromeda$covariateRef, + fun = function(x) { + data <- merge(x, ids) + colnames(data) <- SqlRender::camelCaseToSnakeCase(colnames(data)) + + if (file.exists(file.path(saveLocation, "covariate_ref.csv"))) { + append <- T + } else { + append <- F + } + readr::write_csv( + x = formatDouble(data), + file = file.path(saveLocation, "covariate_ref.csv"), + append = append + ) + }, + batchSize = batchSize + ) + } + + # covariates and covariate_continuous + extras <- cohortDetails[, c("cohortDefinitionId", "settingId", "targetCohortId", "outcomeCohortId", "cohortType")] + extras$databaseId <- databaseId + extras$minCharacterizationMean <- minCharacterizationMean + + # add database_id, setting_id, target_cohort_id, outcome_cohort_id and cohort_type + if (!is.null(andromeda$covariates)) { + Andromeda::batchApply( + tbl = andromeda$covariates, + fun = function(x) { + data <- merge(x, extras, by = "cohortDefinitionId") + data <- data %>% dplyr::select(-"cohortDefinitionId") + colnames(data) <- SqlRender::camelCaseToSnakeCase(colnames(data)) + + # censor minCellCount columns sum_value + removeInd <- data$sum_value < minCellCount + if (sum(removeInd) > 0) { + ParallelLogger::logInfo(paste0("Removing sum_value counts less than ", minCellCount)) + if (sum(removeInd) > 0) { + data$sum_value[removeInd] <- -1 * minCellCount + # adding other calculated columns + data$average_value[removeInd] <- NA + } + } + + if (file.exists(file.path(saveLocation, "covariates.csv"))) { + append <- T + } else { + append <- F + } + readr::write_csv( + x = formatDouble(data), + file = file.path(saveLocation, "covariates.csv"), + append = append + ) + }, + batchSize = batchSize + ) + } + + if (!is.null(andromeda$covariatesContinuous)) { + Andromeda::batchApply( + tbl = andromeda$covariatesContinuous, + fun = function(x) { + data <- merge(x, extras %>% dplyr::select(-"minCharacterizationMean"), by = "cohortDefinitionId") + data <- data %>% dplyr::select(-"cohortDefinitionId") + colnames(data) <- SqlRender::camelCaseToSnakeCase(colnames(data)) + + # count_value + removeInd <- data$count_value < minCellCount + if (sum(removeInd) > 0) { + ParallelLogger::logInfo(paste0("Removing count_value counts less than ", minCellCount)) + if (sum(removeInd) > 0) { + data$count_value[removeInd] <- -1 * minCellCount + # adding columns calculated from count + data$min_value[removeInd] <- NA + data$max_value[removeInd] <- NA + data$average_value[removeInd] <- NA + data$standard_deviation[removeInd] <- NA + data$median_value[removeInd] <- NA + data$p_10_value[removeInd] <- NA + 
data$p_25_value[removeInd] <- NA + data$p_75_value[removeInd] <- NA + data$p_90_value[removeInd] <- NA + } + } + + if (file.exists(file.path(saveLocation, "covariates_continuous.csv"))) { + append <- T + } else { + append <- F + } + readr::write_csv( + x = formatDouble(data), + file = file.path(saveLocation, "covariates_continuous.csv"), + append = append + ) + }, + batchSize = batchSize + ) + } + + # cohort_counts: + if (!is.null(counts)) { + cohortCounts <- cohortDetails %>% + dplyr::select( + "targetCohortId", + "outcomeCohortId", + "cohortType", + "cohortDefinitionId", + "riskWindowStart", + "riskWindowEnd", + "startAnchor", + "endAnchor", + "minPriorObservation", + "outcomeWashoutDays" + ) %>% + dplyr::mutate( + databaseId = !!databaseId + ) %>% + dplyr::inner_join(counts, by = "cohortDefinitionId") %>% + dplyr::select(-"cohortDefinitionId") + cohortCounts <- unique(cohortCounts) + colnames(cohortCounts) <- SqlRender::camelCaseToSnakeCase(colnames(cohortCounts)) + + # TODO apply minCellCount to columns row_count, person_count + removeInd <- cohortCounts$row_count < minCellCount + if (sum(removeInd) > 0) { + ParallelLogger::logInfo(paste0("Removing row_count counts less than ", minCellCount)) + if (sum(removeInd) > 0) { + cohortCounts$row_count[removeInd] <- -1 * minCellCount + } + } + removeInd <- cohortCounts$person_count < minCellCount + if (sum(removeInd) > 0) { + ParallelLogger::logInfo(paste0("Removing person_count counts less than ", minCellCount)) + if (sum(removeInd) > 0) { + cohortCounts$person_count[removeInd] <- -1 * minCellCount + } + } + + if (file.exists(file.path(saveLocation, "cohort_counts.csv"))) { + append <- T + } else { + append <- F + } + readr::write_csv( + x = formatDouble(cohortCounts), + file = file.path(saveLocation, "cohort_counts.csv"), + append = append + ) + } + + if (includeSettings) { + settings <- cohortDetails %>% + dplyr::select( + "settingId", "minPriorObservation", "outcomeWashoutDays", + "riskWindowStart", "riskWindowEnd", "startAnchor", "endAnchor", + "casePreTargetDuration", "casePostOutcomeDuration", + "covariateSettingJson", "caseCovariateSettingJson" + ) %>% + dplyr::mutate(databaseId = !!databaseId) %>% + dplyr::distinct() + colnames(settings) <- SqlRender::camelCaseToSnakeCase(colnames(settings)) + + # add setting.csv with cohortDetails plus database + readr::write_csv( + x = settings, + file = file.path(saveLocation, "settings.csv"), + append = F + ) + + cohortDetails <- cohortDetails %>% + dplyr::select( + "settingId", "targetCohortId", + "outcomeCohortId", "cohortType" + ) %>% + dplyr::mutate(databaseId = !!databaseId) %>% + dplyr::distinct() + colnames(cohortDetails) <- SqlRender::camelCaseToSnakeCase(colnames(cohortDetails)) + + # add cohort_details.csv with cohortDetails plus database + readr::write_csv( + x = cohortDetails, + file = file.path(saveLocation, "cohort_details.csv"), + append = F + ) + } + + return(invisible(T)) +} + + + + +combineCovariateSettingsJsons <- function(covariateSettingsJsonList) { + # get unique + covariateSettingsJsonList <- unique(covariateSettingsJsonList) + + # first convert from json + covariateSettings <- lapply( + X = covariateSettingsJsonList, + FUN = function(x) { + ParallelLogger::convertJsonToSettings(x) + } + ) + + # then combine the covariates + singleSettings <- which(unlist(lapply(covariateSettings, function(x) inherits(x, "covariateSettings")))) + multipleSettings <- which(unlist(lapply(covariateSettings, function(x) inherits(x, "list")))) + + covariateSettingList <- list() + if 
(length(singleSettings) > 0) { + for (i in singleSettings) { + covariateSettingList[[length(covariateSettingList) + 1]] <- covariateSettings[[i]] + } + } + if (length(multipleSettings) > 0) { + for (i in multipleSettings) { + settingList <- covariateSettings[[i]] + for (j in 1:length(settingList)) { + if (inherits(settingList[[j]], "covariateSettings")) { + covariateSettingList[[length(covariateSettingList) + 1]] <- settingList[[j]] + } else { + message("Incorrect covariate settings found") # stop? + } + } + } + } + + # check for covariates with same id but different + endDays <- unique(unlist(lapply(covariateSettingList, function(x) { + x$endDays + }))) + if (length(endDays) > 1) { + stop("Covariate settings for aggregate covariates using different end days") + } + longTermStartDays <- unique(unlist(lapply(covariateSettingList, function(x) { + x$longTermStartDays + }))) + if (length(longTermStartDays) > 1) { + stop("Covariate settings for aggregate covariates using different longTermStartDays") + } + mediumTermStartDays <- unique(unlist(lapply(covariateSettingList, function(x) { + x$mediumTermStartDays + }))) + if (length(mediumTermStartDays) > 1) { + stop("Covariate settings for aggregate covariates using different mediumTermStartDays") + } + shortTermStartDays <- unique(unlist(lapply(covariateSettingList, function(x) { + x$shortTermStartDays + }))) + if (length(shortTermStartDays) > 1) { + stop("Covariate settings for aggregate covariates using different shortTermStartDays") + } + + # convert to json + covariateSettingList <- as.character(ParallelLogger::convertSettingsToJson(covariateSettingList)) + return(covariateSettingList) +} + +getAggregateCovariatesJobs <- function( + characterizationSettings, + threads) { + characterizationSettings <- characterizationSettings$aggregateCovariateSettings + if (length(characterizationSettings) == 0) { + return(NULL) + } + ind <- 1:length(characterizationSettings) + + + # target combinations + targetCombinations <- do.call( + what = "rbind", + args = + lapply( + 1:length(characterizationSettings), + function(i) { + if (characterizationSettings[[i]]$extractNonCaseCovariates) { + result <- data.frame( + targetIds = c( + characterizationSettings[[i]]$targetIds, + characterizationSettings[[i]]$outcomeIds + ), + minPriorObservation = characterizationSettings[[i]]$minPriorObservation, + covariateSettingsJson = as.character(ParallelLogger::convertSettingsToJson(characterizationSettings[[i]]$covariateSettings)) + ) + return(result) + } else { + return( + data.frame(targetIds = 1, minPriorObservation = 1, covariateSettingsJson = 1)[-1, ] + ) + } + } + ) + ) + + if (nrow(targetCombinations) > 0) { + threadCols <- c("targetIds") + settingCols <- c("minPriorObservation") + + # thread split + threadSettings <- targetCombinations %>% + dplyr::select(dplyr::all_of(threadCols)) %>% + dplyr::distinct() + threadSettings$thread <- rep(1:threads, ceiling(nrow(threadSettings) / threads))[1:nrow(threadSettings)] + targetCombinations <- merge(targetCombinations, threadSettings, by = threadCols) + + executionSettings <- data.frame( + minPriorObservation = unique(targetCombinations$minPriorObservation) + ) + executionSettings$settingId <- createExecutionIds(nrow(executionSettings)) + targetCombinations <- merge(targetCombinations, executionSettings, by = settingCols) + + # recreate settings + settings <- c() + for (settingId in unique(executionSettings$settingId)) { + settingVal <- executionSettings %>% + dplyr::filter(.data$settingId == !!settingId) %>% + 
dplyr::select(dplyr::all_of(settingCols)) + + restrictedData <- targetCombinations %>% + dplyr::inner_join(settingVal, by = settingCols) + + for (i in unique(restrictedData$thread)) { + ind <- restrictedData$thread == i + settings <- rbind( + settings, + data.frame( + functionName = "computeTargetAggregateCovariateAnalyses", + settings = as.character(ParallelLogger::convertSettingsToJson( + list( + targetIds = unique(restrictedData$targetId[ind]), + minPriorObservation = unique(restrictedData$minPriorObservation[ind]), + covariateSettingsJson = combineCovariateSettingsJsons(as.list(restrictedData$covariateSettingsJson[ind])), + settingId = settingId + ) + )), + executionFolder = paste("tac", i, paste(settingVal, collapse = "_"), sep = "_"), + jobId = paste("tac", i, paste(settingVal, collapse = "_"), sep = "_") + ) + ) + } + } + } else { + settings <- c() + } + + # adding a delay so the target and case setting ids are always unique + Sys.sleep(time = 2) + + # get all combinations of TnOs, then split by treads + caseCombinations <- do.call( + what = "rbind", + args = + lapply( + 1:length(characterizationSettings), + function(i) { + result <- expand.grid( + targetId = characterizationSettings[[i]]$targetIds, + outcomeId = characterizationSettings[[i]]$outcomeIds + ) + result$minPriorObservation <- characterizationSettings[[i]]$minPriorObservation + result$outcomeWashoutDays <- characterizationSettings[[i]]$outcomeWashoutDays + + result$riskWindowStart <- characterizationSettings[[i]]$riskWindowStart + result$startAnchor <- characterizationSettings[[i]]$startAnchor + result$riskWindowEnd <- characterizationSettings[[i]]$riskWindowEnd + result$endAnchor <- characterizationSettings[[i]]$endAnchor + + result$casePreTargetDuration <- characterizationSettings[[i]]$casePreTargetDuration + result$casePostOutcomeDuration <- characterizationSettings[[i]]$casePostOutcomeDuration + + result$covariateSettingsJson <- as.character(ParallelLogger::convertSettingsToJson(characterizationSettings[[i]]$covariateSettings)) + result$caseCovariateSettingsJson <- as.character(ParallelLogger::convertSettingsToJson(characterizationSettings[[i]]$caseCovariateSettings)) + return(result) + } + ) + ) + + # create executionIds + settingCols <- c( + "minPriorObservation", "outcomeWashoutDays", + "casePreTargetDuration", "casePostOutcomeDuration", + "riskWindowStart", "startAnchor", + "riskWindowEnd", "endAnchor" + ) + executionSettings <- unique(caseCombinations[, settingCols]) + executionSettings$settingId <- createExecutionIds(nrow(executionSettings)) + caseCombinations <- merge(caseCombinations, executionSettings, by = settingCols) + + # create thread split + threadCombinations <- caseCombinations %>% + dplyr::select( + "targetId", + "minPriorObservation", + "outcomeWashoutDays", + "casePreTargetDuration", + "casePostOutcomeDuration" + ) %>% + dplyr::distinct() + threadCombinations$thread <- rep(1:threads, ceiling(nrow(threadCombinations) / threads))[1:nrow(threadCombinations)] + caseCombinations <- merge(caseCombinations, threadCombinations, by = c( + "targetId", + "minPriorObservation", + "outcomeWashoutDays", + "casePreTargetDuration", + "casePostOutcomeDuration" + )) + + executionCols <- c( + "minPriorObservation", "outcomeWashoutDays", + "casePreTargetDuration", "casePostOutcomeDuration" + ) + executions <- unique(caseCombinations[, executionCols]) + + # now create the settings + for (j in 1:nrow(executions)) { + settingVal <- executions[j, ] + + restrictedData <- caseCombinations %>% + 
dplyr::inner_join(settingVal, by = executionCols) + + for (i in unique(restrictedData$thread)) { + ind <- restrictedData$thread == i + settings <- rbind( + settings, + data.frame( + functionName = "computeCaseAggregateCovariateAnalyses", + settings = as.character(ParallelLogger::convertSettingsToJson( + list( + targetIds = unique(restrictedData$targetId[ind]), + outcomeIds = unique(restrictedData$outcomeId[ind]), + minPriorObservation = unique(restrictedData$minPriorObservation[ind]), + outcomeWashoutDays = unique(restrictedData$outcomeWashoutDays[ind]), + tar = unique(data.frame( + riskWindowStart = restrictedData$riskWindowStart[ind], + startAnchor = restrictedData$startAnchor[ind], + riskWindowEnd = restrictedData$riskWindowEnd[ind], + endAnchor = restrictedData$endAnchor[ind] + )), + casePreTargetDuration = unique(restrictedData$casePreTargetDuration[ind]), + casePostOutcomeDuration = unique(restrictedData$casePostOutcomeDuration[ind]), + covariateSettingsJson = combineCovariateSettingsJsons(as.list(restrictedData$covariateSettingsJson[ind])), + caseCovariateSettingsJson = combineCovariateSettingsJsons(as.list(restrictedData$caseCovariateSettingsJson[ind])), + settingIds = unique(restrictedData$settingId[ind]) + ) + )), + executionFolder = paste("cac", i, paste0(settingVal, collapse = "_"), sep = "_"), + jobId = paste("cac", i, paste0(settingVal, collapse = "_"), sep = "_") + ) + ) + } + } + + return(settings) } diff --git a/R/CustomCovariates.R b/R/CustomCovariates.R new file mode 100644 index 0000000..df9b8e4 --- /dev/null +++ b/R/CustomCovariates.R @@ -0,0 +1,407 @@ +#' Create during covariate settings +#' +#' @details +#' creates an object specifying how during covariates should be constructed from data in the CDM model. +#' +#' @param useConditionOccurrenceDuring One covariate per condition in the +#' condition_occurrence table starting between +#' cohort start and cohort end. (analysis ID 109) +#' @param useConditionOccurrencePrimaryInpatientDuring One covariate per condition observed as +#' a primary diagnosis in an inpatient +#' setting in the condition_occurrence table starting between +#' cohort start and cohort end. (analysis ID 110) +#' @param useConditionEraDuring One covariate per condition in the condition_era table +#' starting between cohort start and cohort end. +#' (analysis ID 217) +#' @param useConditionGroupEraDuring One covariate per condition era rolled +#' up to groups in the condition_era table +#' starting between cohort start and cohort end. +#' (analysis ID 218) +#' @param useDrugExposureDuring One covariate per drug in the drug_exposure table between cohort start and end. +#' (analysisId 305) +#' @param useDrugEraDuring One covariate per drug in the drug_era table between cohort start and end. +#' (analysis ID 417) +#' @param useDrugGroupEraDuring One covariate per drug rolled up to ATC groups in the drug_era table between cohort start and end. +#' (analysis ID 418) +#' @param useProcedureOccurrenceDuring One covariate per procedure in the procedure_occurrence table between cohort start and end. +#' (analysis ID 505) +#' @param useDeviceExposureDuring One covariate per device in the device exposure table starting between cohort start and end. +#' (analysis ID 605) +#' @param useMeasurementDuring One covariate per measurement in the measurement table between cohort start and end. +#' (analysis ID 713) +#' @param useObservationDuring One covariate per observation in the observation table between cohort start and end. 
+#' (analysis ID 805) +#' @param useVisitCountDuring The number of visits observed between cohort start and end. +#' (analysis ID 926) +#' @param useVisitConceptCountDuring The number of visits observed between cohort start and end, stratified by visit concept ID. +#' (analysis ID 927) +#' @param includedCovariateConceptIds A list of concept IDs that should be +#' used to construct covariates. +#' @param addDescendantsToInclude Should descendant concept IDs be added +#' to the list of concepts to include? +#' @param excludedCovariateConceptIds A list of concept IDs that should NOT be +#' used to construct covariates. +#' @param addDescendantsToExclude Should descendant concept IDs be added +#' to the list of concepts to exclude? +#' @param includedCovariateIds A list of covariate IDs that should be +#' restricted to. +#' @family {CovariateSetting} +#' +#' @return +#' An object of type \code{covariateSettings}, to be used in other functions. +#' +#' @examples +#' settings <- createDuringCovariateSettings( +#' useConditionOccurrenceDuring = TRUE, +#' useConditionOccurrencePrimaryInpatientDuring = FALSE, +#' useConditionEraDuring = FALSE, +#' useConditionGroupEraDuring = FALSE +#' ) +#' +#' @export +createDuringCovariateSettings <- function( + useConditionOccurrenceDuring = F, + useConditionOccurrencePrimaryInpatientDuring = F, + useConditionEraDuring = F, + useConditionGroupEraDuring = F, + useDrugExposureDuring = F, + useDrugEraDuring = F, + useDrugGroupEraDuring = F, + useProcedureOccurrenceDuring = F, + useDeviceExposureDuring = F, + useMeasurementDuring = F, + useObservationDuring = F, + useVisitCountDuring = F, + useVisitConceptCountDuring = F, + includedCovariateConceptIds = c(), + addDescendantsToInclude = F, + excludedCovariateConceptIds = c(), + addDescendantsToExclude = F, + includedCovariateIds = c()) { + covariateSettings <- list( + temporal = FALSE, # needed? + temporalSequence = FALSE + ) + formalNames <- names(formals(createDuringCovariateSettings)) + anyUseTrue <- FALSE + for (name in formalNames) { + value <- get(name) + if (is.null(value)) { + value <- vector() + } + if (grepl("use.*", name)) { + if (value) { + covariateSettings[[sub("use", "", name)]] <- value + anyUseTrue <- TRUE + } + } else { + covariateSettings[[name]] <- value + } + } + if (!anyUseTrue) { + stop("No covariate analysis selected. Must select at least one") + } + + attr(covariateSettings, "fun") <- "Characterization::getDbDuringCovariateData" + class(covariateSettings) <- "covariateSettings" + return(covariateSettings) +} + +#' Extracts covariates that occur during a cohort +#' +#' @details +#' The user specifies a what during covariates they want and this executes them using FE +#' +#' @param connection The database connection +#' @param oracleTempSchema The temp schema if using oracle +#' @param cdmDatabaseSchema The schema of the OMOP CDM data +#' @param cdmVersion version of the OMOP CDM data +#' @param cohortTable the table name that contains the target population cohort +#' @param rowIdField string representing the unique identifier in the target population cohort +#' @param aggregated whether the covariate should be aggregated +#' @param cohortIds cohort id for the target cohort +#' @param covariateSettings settings for the covariate cohorts and time periods +#' @param minCharacterizationMean the minimum value for a covariate to be extracted +#' @param ... 
additional arguments from FeatureExtraction +#' @family {CovariateSetting} +#' @return +#' The the during covariates based on user settings +#' +#' @export +getDbDuringCovariateData <- function( + connection, + oracleTempSchema = NULL, + cdmDatabaseSchema, + cdmVersion = "5", + cohortTable = "#cohort_person", + rowIdField = "subject_id", + aggregated = T, + cohortIds = c(-1), + covariateSettings, + minCharacterizationMean = 0, + ...) { + writeLines("Constructing during cohort covariates") + if (!aggregated) { + stop("Only aggregation supported") + } + + getDomainSettings <- utils::read.csv(system.file("csv/PrespecAnalyses.csv", package = "Characterization")) + + + # create Tables + sql <- "DROP TABLE IF EXISTS #cov_ref; + CREATE TABLE #cov_ref( + covariate_id bigint, + covariate_name varchar(1000), + analysis_id int, + concept_id bigint, + value_as_concept_id int, + collisions int + );" + sql <- SqlRender::translate( + sql = sql, + targetDialect = DatabaseConnector::dbms(connection) + ) + DatabaseConnector::executeSql(connection, sql = sql) + + sql <- "DROP TABLE IF EXISTS #analysis_ref; + CREATE TABLE #analysis_ref( + analysis_id int, + analysis_name varchar(100), + domain_id varchar(100), + start_day varchar(100), + end_day varchar(100), + is_binary varchar(1), + missing_means_zero varchar(1) + );" + sql <- SqlRender::translate( + sql = sql, + targetDialect = DatabaseConnector::dbms(connection) + ) + DatabaseConnector::executeSql(connection, sql) + + # included covariates + includedCovTable <- "" + if (length(covariateSettings$includedCovariateIds) > 0) { + # create- + includedCovTable <- "#included_cov" + DatabaseConnector::insertTable( + connection = connection, + tableName = includedCovTable, + dropTableIfExists = T, + createTable = T, + tempTable = T, + data = data.frame(id = covariateSettings$includedCovariateIds), + camelCaseToSnakeCase = T + ) + } + + # including concept ids + includedConceptTable <- "" + if (length(covariateSettings$includedCovariateConceptIds) > 0) { + includedConceptTable <- "#include_concepts" + DatabaseConnector::insertTable( + connection = connection, + tableName = includedConceptTable, + dropTableIfExists = T, + createTable = T, + tempTable = T, + data = data.frame(id = covariateSettings$includedCovariateConceptIds), + camelCaseToSnakeCase = T + ) + + if (covariateSettings$addDescendantsToInclude) { + SqlRender::loadRenderTranslateSql( + sqlFilename = "IncludeDescendants.sql", + packageName = "Characterization", + dbms = DatabaseConnector::dbms(connection), + table_name = includedConceptTable, + cdm_database_schema = cdmDatabaseSchema + ) + } + } + + # exlcuding concept ids + excludedConceptTable <- "" + if (length(covariateSettings$excludedCovariateConceptIds) > 0) { + excludedConceptTable <- "#exclude_concepts" + DatabaseConnector::insertTable( + connection = connection, + tableName = excludedConceptTable, + dropTableIfExists = T, + createTable = T, + tempTable = T, + data = data.frame(id = covariateSettings$excludedCovariateConceptIds), + camelCaseToSnakeCase = T + ) + + if (covariateSettings$addDescendantsToInclude) { + SqlRender::loadRenderTranslateSql( + sqlFilename = "IncludeDescendants.sql", + packageName = "Characterization", + dbms = DatabaseConnector::dbms(connection), + table_name = excludedConceptTable, + cdm_database_schema = cdmDatabaseSchema + ) + } + } + + domainSettingsIndexes <- which(getDomainSettings$analysisName %in% names(covariateSettings)) + i <- 0 + binaryInd <- c() + continuousInd <- c() + useBinary <- F + useContinuous <- F + 
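# results from each domain query are collected into one Andromeda object below; binary and continuous covariate tables are tracked separately so they can be unioned afterwards +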
result <- Andromeda::andromeda() + + for (domainSettingsIndex in domainSettingsIndexes) { + i <- i + 1 + + if (getDomainSettings$isBinary[domainSettingsIndex] == "Y") { + binaryInd <- c(i, binaryInd) + useBinary <- T + } else { + continuousInd <- c(i, continuousInd) + useContinuous <- T + } + # Load template sql and fill + sql <- SqlRender::loadRenderTranslateSql( + sqlFilename = getDomainSettings$sqlFileName[domainSettingsIndex], + packageName = "Characterization", + dbms = attr(connection, "dbms"), + cohort_table = cohortTable, + # cohort_ids = cohortIds, + cohort_definition_id = cohortIds, # added? + row_id_field = rowIdField, + cdm_database_schema = cdmDatabaseSchema, + aggregated = aggregated, + sub_type = getDomainSettings$subType[domainSettingsIndex], + analysis_id = getDomainSettings$analysisId[domainSettingsIndex], + analysis_name = getDomainSettings$analysisName[domainSettingsIndex], + domain_table = getDomainSettings$domainTable[domainSettingsIndex], + domain_start_date = getDomainSettings$domainStartDate[domainSettingsIndex], + domain_end_date = getDomainSettings$domainEndDate[domainSettingsIndex], + domain_concept_id = getDomainSettings$domainConceptId[domainSettingsIndex], + domain_id = getDomainSettings$domainId[domainSettingsIndex], + included_cov_table = includedCovTable, + included_concept_table = includedConceptTable, + excluded_concept_table = excludedConceptTable, + covariate_table = paste0("#cov_", i) + ) + message(paste0("Executing during sql code for ", getDomainSettings$analysisName[domainSettingsIndex])) + start <- Sys.time() + DatabaseConnector::executeSql( + connection = connection, + sql = sql, + progressBar = T + ) + time <- Sys.time() - start + message(paste0("Execution took ", round(time, digits = 2), " ", units(time))) + } + + # all_covariates.cohort_definition_id,\n all_covariates.covariate_id,\n all_covariates.sum_value,\n CAST(all_covariates.sum_value / (1.0 * total.total_count) AS FLOAT) AS average_value + + message(paste0("Extracting covariates")) + start <- Sys.time() + # Retrieve the covariate: + if (useBinary) { + sql <- paste0( + "select temp.*, CAST(temp.sum_value / (1.0 * total.total_count) AS FLOAT) AS average_value from (", + paste0(paste0("select cohort_definition_id, covariate_id, sum_value from #cov_", binaryInd), collapse = " union "), + ") temp inner join + (SELECT cohort_definition_id, COUNT(*) AS total_count + FROM @cohort_table {@cohort_definition_id != -1} ? 
{\nWHERE cohort_definition_id IN (@cohort_definition_id)} + GROUP BY cohort_definition_id ) total + on temp.cohort_definition_id = total.cohort_definition_id;" + ) + sql <- SqlRender::render( + sql = sql, + cohort_table = cohortTable, + cohort_definition_id = paste0(c(-1), collapse = ",") + ) + sql <- SqlRender::translate( + sql = sql, + targetDialect = DatabaseConnector::dbms(connection) + ) + + DatabaseConnector::querySqlToAndromeda( + connection = connection, + sql = sql, + andromeda = result, + andromedaTableName = "covariates", + appendToTable = F, + snakeCaseToCamelCase = TRUE + ) + if (minCharacterizationMean != 0 && "averageValue" %in% colnames(result$covariates)) { + result$covariates <- result$covariates %>% + dplyr::filter(.data$averageValue >= minCharacterizationMean) + } + } + + if (useContinuous) { + sql <- paste0(paste0("select * from #cov_", continuousInd), collapse = " union ") + sql <- SqlRender::translate( + sql = sql, + targetDialect = DatabaseConnector::dbms(connection) + ) + DatabaseConnector::querySqlToAndromeda( + connection = connection, + sql = sql, + andromeda = result, + andromedaTableName = "covariatesContinuous", + appendToTable = F, + snakeCaseToCamelCase = TRUE + ) + } + # Retrieve the covariate ref: + DatabaseConnector::querySqlToAndromeda( + connection = connection, + sql = SqlRender::translate( + sql = "select * from #cov_ref;", + targetDialect = DatabaseConnector::dbms(connection) + ), + andromeda = result, + andromedaTableName = "covariateRef", + appendToTable = F, + snakeCaseToCamelCase = TRUE + ) + + # Retrieve the analysis ref: + DatabaseConnector::querySqlToAndromeda( + connection = connection, + sql = SqlRender::translate( + sql = "select * from #analysis_ref;", + targetDialect = DatabaseConnector::dbms(connection) + ), + andromeda = result, + andromedaTableName = "analysisRef", + appendToTable = F, + snakeCaseToCamelCase = TRUE + ) + time <- Sys.time() - start + message(paste0("Extracting covariates took ", round(time, digits = 2), " ", units(time))) + + # clean up: drop tables... 
+ if (length(c(binaryInd, continuousInd)) != 0) { + message(paste0("Removing temp covariate tables")) + for (i in c(binaryInd, continuousInd)) { + sql <- "TRUNCATE TABLE #cov_@id;\nDROP TABLE #cov_@id;\n" + sql <- SqlRender::render( + sql, + id = i + ) + sql <- SqlRender::translate( + sql = sql, + targetDialect = attr(connection, "dbms"), + oracleTempSchema = oracleTempSchema + ) + DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + } + } + + # Construct metaData + metaData <- list(sql = sql, call = match.call()) + attr(result, "metaData") <- metaData + class(result) <- "CovariateData" + return(result) +} diff --git a/R/Database.R b/R/Database.R index 249d761..1fbe141 100644 --- a/R/Database.R +++ b/R/Database.R @@ -24,7 +24,7 @@ #' This function creates a sqlite database and connection #' #' @param sqliteLocation The location of the sqlite database -#' +#' @family {Database} #' @return #' Returns the connection to the sqlite database #' @@ -47,60 +47,52 @@ createSqliteDatabase <- function( dbms = "sqlite", server = file.path(sqliteLocation, "sqlite.sqlite") ) - connection <- DatabaseConnector::connect( - connectionDetails = connectionDetails - ) - return(connection) + return(connectionDetails) } -# move Andromeda to sqlite database -insertAndromedaToDatabase <- function( - connection, - databaseSchema, - tableName, - andromedaObject, - tempEmulationSchema, - bulkLoad = T, - tablePrefix = "c_", - minCellCount = 0, - minCellCountColumns = list()) { - errorMessages <- checkmate::makeAssertCollection() - .checkTablePrefix( - tablePrefix = tablePrefix, - errorMessages = errorMessages +#' Upload the results into a result database +#' @description +#' This function uploads results in csv format into a result database +#' +#' @details +#' Calls ResultModelManager uploadResults function to upload the csv files +#' +#' @param connectionDetails The connection details to the result database +#' @param schema The schema for the result database +#' @param resultsFolder The folder containing the csv results +#' @param tablePrefix A prefix to append to the result tables for the characterization results +#' @param csvTablePrefix The prefix added to the csv results - default is 'c_' +#' @family {Database} +#' @return +#' Returns the connection to the sqlite database +#' +#' @export +insertResultsToDatabase <- function( + connectionDetails, + schema, + resultsFolder, + tablePrefix = "", + csvTablePrefix = "c_") { + specLoc <- system.file("settings", "resultsDataModelSpecification.csv", + package = "Characterization" ) - checkmate::reportAssertions(errorMessages) - - message("Inserting Andromeda table into database table ", tablePrefix, tableName) - - Andromeda::batchApply( - tbl = andromedaObject, - fun = function(x) { - data <- as.data.frame(x %>% dplyr::collect()) # apply minCellCount - data <- removeMinCell( - data = data, - minCellCount = minCellCount, - minCellCountColumns = minCellCountColumns - ) - - DatabaseConnector::insertTable( - connection = connection, - databaseSchema = databaseSchema, - tableName = paste0(tablePrefix, tableName), - data = data, - dropTableIfExists = F, - createTable = F, - tempEmulationSchema = tempEmulationSchema, - bulkLoad = bulkLoad, - camelCaseToSnakeCase = T - ) - } + specs <- utils::read.csv(specLoc) + colnames(specs) <- SqlRender::snakeCaseToCamelCase(colnames(specs)) + specs$tableName <- paste0(csvTablePrefix, specs$tableName) + ResultModelManager::uploadResults( + connectionDetails = connectionDetails, + schema = schema, 
+ resultsFolder = resultsFolder, + tablePrefix = tablePrefix, + specifications = specs, + purgeSiteDataBeforeUploading = F ) - return(TRUE) + return(invisible(NULL)) } +## TODO add this into the csv exporting removeMinCell <- function( data, minCellCount = 0, @@ -128,7 +120,6 @@ removeMinCell <- function( } - #' Create the results tables to store characterization results into a database #' @description #' This function executes a large set of SQL statements to create tables that can store results @@ -136,8 +127,8 @@ removeMinCell <- function( #' @details #' This function can be used to create (or delete) Characterization result tables #' -#' @param conn A connection to a database created by using the -#' function \code{connect} in the +#' @param connectionDetails The connectionDetails to a database created by using the +#' function \code{createConnectDetails} in the #' \code{DatabaseConnector} package. #' @param resultSchema The name of the database schema that the result tables will be created. #' @param targetDialect The database management system being used @@ -145,13 +136,13 @@ removeMinCell <- function( #' @param createTables If true the Characterization result tables will be created #' @param tablePrefix A string appended to the Characterization result tables #' @param tempEmulationSchema The temp schema used when the database management system is oracle -#' +#' @family {Database} #' @return #' Returns NULL but creates the required tables into the specified database schema. #' #' @export createCharacterizationTables <- function( - conn, + connectionDetails, resultSchema, targetDialect = "postgresql", deleteExistingTables = T, @@ -165,18 +156,26 @@ createCharacterizationTables <- function( ) checkmate::reportAssertions(errorMessages) + conn <- DatabaseConnector::connect(connectionDetails = connectionDetails) + on.exit(DatabaseConnector::disconnect(conn)) + + alltables <- tolower( + DatabaseConnector::getTableNames( + connection = conn, + databaseSchema = resultSchema + ) + ) + tables <- getResultTables() + tables <- paste0(tablePrefix, tables) + + # adding this to not create tables if all tables esist + if (sum(tables %in% alltables) == length(tables) & !deleteExistingTables) { + message("All tables exist so no need to recreate") + createTables <- FALSE + } if (deleteExistingTables) { message("Deleting existing tables") - tables <- getResultTables() - tables <- paste0(tablePrefix, tables) - - alltables <- tolower( - DatabaseConnector::getTableNames( - connection = conn, - databaseSchema = resultSchema - ) - ) for (tb in tables) { if (tb %in% alltables) { @@ -232,193 +231,63 @@ createCharacterizationTables <- function( ) # add database migration here in the future + migrateDataModel( + connectionDetails = connectionDetails, + databaseSchema = resultSchema, + tablePrefix = tablePrefix + ) } } -#' Exports all tables in the result database to csv files -#' @description -#' This function extracts the database tables into csv files -#' -#' @details -#' This function extracts the database tables into csv files -#' -#' @param connectionDetails The connection details to input into the -#' function \code{connect} in the -#' \code{DatabaseConnector} package. -#' @param resultSchema The name of the database schema that the result tables will be created. -#' @param targetDialect DEPRECATED: derived from \code{connectionDetails}. 
-#' @param tablePrefix The table prefix to apply to the characterization result tables -#' @param filePrefix The prefix to apply to the files -#' @param tempEmulationSchema The temp schema used when the database management system is oracle -#' @param saveDirectory The directory to save the csv results -#' @param minMeanCovariateValue The minimum mean covariate value (i.e. the minimum proportion for -#' binary covariates) for a covariate to be included in covariate table. -#' Other covariates are removed to save space. -#' -#' @return -#' csv file per table into the saveDirectory -#' -#' @export -exportDatabaseToCsv <- function( - connectionDetails, - resultSchema, - targetDialect = NULL, - tablePrefix = "c_", - filePrefix = NULL, - tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), - saveDirectory, - minMeanCovariateValue = 0.001) { - errorMessages <- checkmate::makeAssertCollection() - .checkConnectionDetails(connectionDetails, errorMessages) - .checkTablePrefix( - tablePrefix = tablePrefix, - errorMessages = errorMessages - ) - checkmate::reportAssertions(errorMessages) - if (!is.null(targetDialect)) { - warning("The targetDialect argument is deprecated") - } - - if (is.null(filePrefix)) { - filePrefix <- "" - } - # connect to result database - connection <- DatabaseConnector::connect( - connectionDetails = connectionDetails +migrateDataModel <- function(connectionDetails, databaseSchema, tablePrefix = "") { + ParallelLogger::logInfo("Migrating data set") + migrator <- getDataMigrator( + connectionDetails = connectionDetails, + databaseSchema = databaseSchema, + tablePrefix = tablePrefix ) - on.exit( - DatabaseConnector::disconnect(connection) + migrator$executeMigrations() + migrator$finalize() + + ParallelLogger::logInfo("Updating version number") + updateVersionSql <- SqlRender::loadRenderTranslateSql("UpdateVersionNumber.sql", + packageName = utils::packageName(), + database_schema = databaseSchema, + table_prefix = tablePrefix, + dbms = connectionDetails$dbms ) - # create the folder to save the csv files - if (!dir.exists(saveDirectory)) { - dir.create( - path = saveDirectory, - recursive = T - ) - } - - # max number of rows extracted at a time - maxRowCount <- 1e6 - - # get the table names using the function in uploadToDatabase.R - tables <- getResultTables() - - # extract result per table - for (table in tables) { - ParallelLogger::logInfo(paste0("Exporting rows from ", table, " to csv file")) - # get row count and figure out number of loops - sql <- "select count(*) as N from @resultSchema.@appendtotable@tablename;" - sql <- SqlRender::render( - sql = sql, - resultSchema = resultSchema, - appendtotable = tablePrefix, - tablename = table - ) - sql <- SqlRender::translate( - sql = sql, - targetDialect = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema - ) - countN <- DatabaseConnector::querySql( - connection = connection, - sql = sql, - snakeCaseToCamelCase = F - )$N - - # get column names - sql <- "select * from @resultSchema.@appendtotable@tablename where 1=0;" - sql <- SqlRender::render( - sql = sql, - resultSchema = resultSchema, - appendtotable = tablePrefix, - tablename = table - ) - sql <- SqlRender::translate( - sql = sql, - targetDialect = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema - ) - cnames <- colnames(DatabaseConnector::querySql( - connection = connection, - sql = sql, - snakeCaseToCamelCase = F - )) - - inds <- floor(countN / maxRowCount) - tableAppend <- F - # NOTE: If the table has 0 rows (countN == 0), - # 
then setting the txtProgressBar will fail since - # min < max. So, setting max = countN+1 for this - # reason. - pb <- utils::txtProgressBar(min = 0, max = countN + 1, initial = 0) - - for (i in 1:length(inds)) { - startRow <- (i - 1) * maxRowCount + 1 - endRow <- min(i * maxRowCount, countN) - - sql <- "select @cnames from - (select *, - ROW_NUMBER() OVER(ORDER BY @cnames) AS row - from @resultSchema.@appendtotable@tablename - ) temp - where - temp.row >= @start_row and - temp.row <= @end_row;" - sql <- SqlRender::render( - sql = sql, - resultSchema = resultSchema, - appendtotable = tablePrefix, - tablename = table, - cnames = paste(cnames, collapse = ","), - start_row = startRow, - end_row = endRow - ) - sql <- SqlRender::translate( - sql = sql, - targetDialect = connectionDetails$dbms, - tempEmulationSchema = tempEmulationSchema - ) - result <- DatabaseConnector::querySql( - connection = connection, - sql = sql, - snakeCaseToCamelCase = F - ) - result <- formatDouble(result) + connection <- DatabaseConnector::connect(connectionDetails = connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + DatabaseConnector::executeSql(connection, updateVersionSql) +} - # save the results as a csv - readr::write_csv( - x = result, - file = file.path(saveDirectory, paste0(tolower(filePrefix), table, ".csv")), - append = tableAppend - ) - tableAppend <- T - # NOTE: Handling progresss bar per note on txtProgressBar - # above. Otherwise the progress bar doesn't show that it completed. - if (endRow == countN) { - utils::setTxtProgressBar(pb, countN + 1) - } else { - utils::setTxtProgressBar(pb, endRow) - } - } - close(pb) - } - invisible(saveDirectory) +getDataMigrator <- function(connectionDetails, databaseSchema, tablePrefix = "") { + ResultModelManager::DataMigrationManager$new( + connectionDetails = connectionDetails, + databaseSchema = databaseSchema, + tablePrefix = tablePrefix, + migrationPath = "migrations", + packageName = utils::packageName() + ) } getResultTables <- function() { return( unique( - readr::read_csv( - file = system.file( - "settings", - "resultsDataModelSpecification.csv", - package = "Characterization" - ), - show_col_types = FALSE - )$table_name + c( + readr::read_csv( + file = system.file( + "settings", + "resultsDataModelSpecification.csv", + package = "Characterization" + ), + show_col_types = FALSE + )$table_name, + "migration", "package_version" + ) ) ) } diff --git a/R/DechallengeRechallenge.R b/R/DechallengeRechallenge.R index 4ad299b..985fdef 100644 --- a/R/DechallengeRechallenge.R +++ b/R/DechallengeRechallenge.R @@ -20,7 +20,7 @@ #' @param outcomeIds A list of cohortIds for the outcome cohorts #' @param dechallengeStopInterval An integer specifying the how much time to add to the cohort_end when determining whether the event starts during cohort and ends after #' @param dechallengeEvaluationWindow An integer specifying the period of time after the cohort_end when you cannot see an outcome for a dechallenge success -#' +#' @family {DechallengeRechallenge} #' @return #' A list with the settings #' @@ -85,9 +85,12 @@ createDechallengeRechallengeSettings <- function( #' @template ConnectionDetails #' @template TargetOutcomeTables #' @template TempEmulationSchema -#' @param dechallengeRechallengeSettings The settings for the timeToEvent study +#' @param settings The settings for the timeToEvent study #' @param databaseId An identifier for the database (string) -#' +#' @param outputFolder A directory to save the results as csv files +#' @param minCellCount 
The minimum cell value to display, values less than this will be replaced by -1 +#' @param ... extra inputs +#' @family {DechallengeRechallenge} #' @return #' An \code{Andromeda::andromeda()} object containing the dechallenge rechallenge results #' @@ -99,8 +102,11 @@ computeDechallengeRechallengeAnalyses <- function( outcomeDatabaseSchema = targetDatabaseSchema, outcomeTable = targetTable, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), - dechallengeRechallengeSettings, - databaseId = "database 1") { + settings, + databaseId = "database 1", + outputFolder = file.path(getwd(), "results"), + minCellCount = 0, + ...) { # check inputs errorMessages <- checkmate::makeAssertCollection() .checkConnectionDetails(connectionDetails, errorMessages) @@ -121,7 +127,7 @@ computeDechallengeRechallengeAnalyses <- function( errorMessages = errorMessages ) .checkDechallengeRechallengeSettings( - settings = dechallengeRechallengeSettings, + settings = settings, errorMessages = errorMessages ) @@ -153,10 +159,10 @@ computeDechallengeRechallengeAnalyses <- function( target_table = targetTable, outcome_database_schema = outcomeDatabaseSchema, outcome_table = outcomeTable, - target_ids = paste(dechallengeRechallengeSettings$targetCohortDefinitionIds, sep = "", collapse = ","), - outcome_ids = paste(dechallengeRechallengeSettings$outcomeCohortDefinitionIds, sep = "", collapse = ","), - dechallenge_stop_interval = dechallengeRechallengeSettings$dechallengeStopInterval, - dechallenge_evaluation_window = dechallengeRechallengeSettings$dechallengeEvaluationWindow + target_ids = paste(settings$targetCohortDefinitionIds, sep = "", collapse = ","), + outcome_ids = paste(settings$outcomeCohortDefinitionIds, sep = "", collapse = ","), + dechallenge_stop_interval = settings$dechallengeStopInterval, + dechallenge_evaluation_window = settings$dechallengeEvaluationWindow ) DatabaseConnector::executeSql( connection = connection, @@ -195,14 +201,22 @@ computeDechallengeRechallengeAnalyses <- function( message( paste0( "Computing dechallenge rechallenge for ", - length(dechallengeRechallengeSettings$targetCohortDefinitionIds), " target ids and ", - length(dechallengeRechallengeSettings$outcomeCohortDefinitionIds), "outcome ids took ", + length(settings$targetCohortDefinitionIds), " target ids and ", + length(settings$outcomeCohortDefinitionIds), " outcome ids took ", signif(delta, 3), " ", attr(delta, "units") ) ) - return(result) + # export results to csv + message("exporting to csv file") + exportDechallengeRechallengeToCsv( + result = result, + saveDirectory = outputFolder, + minCellCount = minCellCount + ) + + return(invisible(TRUE)) } } @@ -212,10 +226,13 @@ computeDechallengeRechallengeAnalyses <- function( #' @template ConnectionDetails #' @template TargetOutcomeTables #' @template TempEmulationSchema -#' @param dechallengeRechallengeSettings The settings for the timeToEvent study +#' @param settings The settings for the timeToEvent study #' @param databaseId An identifier for the database (string) #' @param showSubjectId if F then subject_ids are hidden (recommended if sharing results) -#' +#' @param outputFolder A directory to save the results as csv files +#' @param minCellCount The minimum cell value to display, values less than this will be replaced by -1 +#' @param ... 
extra inputs +#' @family {DechallengeRechallenge} #' @return #' An \code{Andromeda::andromeda()} object with the case series details of the failed rechallenge #' @@ -227,9 +244,12 @@ computeRechallengeFailCaseSeriesAnalyses <- function( outcomeDatabaseSchema = targetDatabaseSchema, outcomeTable = targetTable, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), - dechallengeRechallengeSettings, + settings, databaseId = "database 1", - showSubjectId = F) { + showSubjectId = F, + outputFolder = file.path(getwd(), "results"), + minCellCount = 0, + ...) { # check inputs errorMessages <- checkmate::makeAssertCollection() .checkConnectionDetails(connectionDetails, errorMessages) @@ -250,7 +270,7 @@ computeRechallengeFailCaseSeriesAnalyses <- function( errorMessages = errorMessages ) .checkDechallengeRechallengeSettings( - settings = dechallengeRechallengeSettings, + settings = settings, errorMessages = errorMessages ) @@ -269,7 +289,7 @@ computeRechallengeFailCaseSeriesAnalyses <- function( DatabaseConnector::disconnect(connection) ) - message("Computing dechallenge rechallenge results") + message("Computing dechallenge rechallenge fails results") sql <- SqlRender::loadRenderTranslateSql( sqlFilename = "RechallengeFailCaseSeries.sql", packageName = "Characterization", @@ -280,10 +300,10 @@ computeRechallengeFailCaseSeriesAnalyses <- function( target_table = targetTable, outcome_database_schema = outcomeDatabaseSchema, outcome_table = outcomeTable, - target_ids = paste(dechallengeRechallengeSettings$targetCohortDefinitionIds, sep = "", collapse = ","), - outcome_ids = paste(dechallengeRechallengeSettings$outcomeCohortDefinitionIds, sep = "", collapse = ","), - dechallenge_stop_interval = dechallengeRechallengeSettings$dechallengeStopInterval, - dechallenge_evaluation_window = dechallengeRechallengeSettings$dechallengeEvaluationWindow, + target_ids = paste(settings$targetCohortDefinitionIds, sep = "", collapse = ","), + outcome_ids = paste(settings$outcomeCohortDefinitionIds, sep = "", collapse = ","), + dechallenge_stop_interval = settings$dechallengeStopInterval, + dechallenge_evaluation_window = settings$dechallengeEvaluationWindow, show_subject_id = showSubjectId ) DatabaseConnector::executeSql( @@ -323,13 +343,146 @@ computeRechallengeFailCaseSeriesAnalyses <- function( message( paste0( "Computing dechallenge failed case series for ", - length(dechallengeRechallengeSettings$targetCohortDefinitionIds), " target IDs and ", - length(dechallengeRechallengeSettings$outcomeCohortDefinitionIds), " outcome IDs took ", + length(settings$targetCohortDefinitionIds), " target IDs and ", + length(settings$outcomeCohortDefinitionIds), " outcome IDs took ", signif(delta, 3), " ", attr(delta, "units") ) ) - return(result) + # add the csv export here + message("exporting to csv file") + exportRechallengeFailCaseSeriesToCsv( + result = result, + saveDirectory = outputFolder + ) + + return(invisible(TRUE)) } } + +getDechallengeRechallengeJobs <- function( + characterizationSettings, + threads) { + characterizationSettings <- characterizationSettings$dechallengeRechallengeSettings + if (length(characterizationSettings) == 0) { + return(NULL) + } + ind <- 1:length(characterizationSettings) + targetIds <- lapply(ind, function(i) { + characterizationSettings[[i]]$targetCohortDefinitionIds + }) + outcomeIds <- lapply(ind, function(i) { + characterizationSettings[[i]]$outcomeCohortDefinitionIds + }) + dechallengeStopIntervals <- lapply(ind, function(i) { + 
characterizationSettings[[i]]$dechallengeStopInterval + }) + dechallengeEvaluationWindows <- lapply(ind, function(i) { + characterizationSettings[[i]]$dechallengeEvaluationWindow + }) + + # get all combinations of TnOs, then split by treads + + combinations <- do.call( + what = "rbind", + args = + lapply( + 1:length(targetIds), + function(i) { + result <- expand.grid( + targetId = targetIds[[i]], + outcomeId = outcomeIds[[i]] + ) + result$dechallengeStopInterval <- dechallengeStopIntervals[[i]] + result$dechallengeEvaluationWindow <- dechallengeEvaluationWindows[[i]] + return(result) + } + ) + ) + # find out whether more Ts or more Os + tcount <- nrow( + combinations %>% + dplyr::count( + .data$targetId, + .data$dechallengeStopInterval, + .data$dechallengeEvaluationWindow + ) + ) + + ocount <- nrow( + combinations %>% + dplyr::count( + .data$outcomeId, + .data$dechallengeStopInterval, + .data$dechallengeEvaluationWindow + ) + ) + + if (threads > max(tcount, ocount)) { + message("Tnput parameter threads greater than number of targets and outcomes") + message(paste0("Only using ", max(tcount, ocount), " threads for TimeToEvent")) + } + + if (tcount >= ocount) { + threadDf <- combinations %>% + dplyr::count( + .data$targetId, + .data$dechallengeStopInterval, + .data$dechallengeEvaluationWindow + ) + threadDf$thread <- rep(1:threads, ceiling(tcount / threads))[1:tcount] + mergeColumn <- c("targetId", "dechallengeStopInterval", "dechallengeEvaluationWindow") + } else { + threadDf <- combinations %>% + dplyr::count( + .data$outcomeId, + .data$dechallengeStopInterval, + .data$dechallengeEvaluationWindow + ) + threadDf$thread <- rep(1:threads, ceiling(ocount / threads))[1:ocount] + mergeColumn <- c("outcomeId", "dechallengeStopInterval", "dechallengeEvaluationWindow") + } + + combinations <- merge(combinations, threadDf, by = mergeColumn) + sets <- lapply( + X = 1:max(threadDf$thread), + FUN = function(i) { + createDechallengeRechallengeSettings( + targetIds = unique(combinations$targetId[combinations$thread == i]), + outcomeIds = unique(combinations$outcomeId[combinations$thread == i]), + dechallengeStopInterval = unique(combinations$dechallengeStopInterval[combinations$thread == i]), + dechallengeEvaluationWindow = unique(combinations$dechallengeEvaluationWindow[combinations$thread == i]) + ) + } + ) + + # recreate settings + settings <- c() + for (i in 1:length(sets)) { + settings <- rbind( + settings, + data.frame( + functionName = "computeDechallengeRechallengeAnalyses", + settings = as.character(ParallelLogger::convertSettingsToJson( + sets[[i]] + )), + executionFolder = paste0("dr_", i), + jobId = paste0("dr_", i) + ) + ) + settings <- rbind( + settings, + data.frame( + functionName = "computeRechallengeFailCaseSeriesAnalyses", + settings = as.character(ParallelLogger::convertSettingsToJson( + sets[[i]] + )), + executionFolder = paste0("rfcs_", i), + jobId = paste0("rfcs_", i) + ) + ) + } + + return(settings) +} diff --git a/R/HelperFunctions.R b/R/HelperFunctions.R index 973ec25..0432995 100644 --- a/R/HelperFunctions.R +++ b/R/HelperFunctions.R @@ -212,7 +212,7 @@ .checkCovariateSettings <- function(covariateSettings, errorMessages) { - if (class(covariateSettings) == "covariateSettings") { + if (inherits(covariateSettings, "covariateSettings")) { checkmate::assertClass( x = covariateSettings, classes = "covariateSettings", @@ -239,3 +239,25 @@ add = errorMessages ) } + + + +checkNoCsv <- function( + csvFiles, + errorMessage) { + csvExists <- sapply(csvFiles, function(x) { + 
file.exists(x) + }) + + if (sum(csvExists) > 0) { + stop(errorMessage) + } + + return(invisible(TRUE)) +} + +cleanCsv <- function( + resultFolder, + fileName = "time_to_event.csv") { + file.remove(file.path(resultFolder, fileName)) +} diff --git a/R/Incremental.R b/R/Incremental.R new file mode 100644 index 0000000..f05d9d2 --- /dev/null +++ b/R/Incremental.R @@ -0,0 +1,185 @@ +createIncrementalLog <- function( + executionFolder, + logname = "execution.csv") { + if (!dir.exists(executionFolder)) { + dir.create(executionFolder, recursive = T) + } + + if (!file.exists(file.path(executionFolder, logname))) { + x <- data.frame( + run_date_time = Sys.time(), + job_id = 0, + start_time = Sys.time(), + end_time = Sys.time() + ) + readr::write_csv( + x = x, + file = file.path(executionFolder, logname) + ) + } +} + +loadIncrementalFiles <- function(executionFolder) { + if (file.exists(file.path(executionFolder, "execution.csv"))) { + executed <- utils::read.csv(file.path(executionFolder, "execution.csv")) + } else { + stop("execution.csv missing") + } + + if (file.exists(file.path(executionFolder, "completed.csv"))) { + completed <- utils::read.csv(file.path(executionFolder, "completed.csv")) + } else { + stop("completed.csv missing") + } + return(list( + executed = executed, + completed = completed + )) +} + +getExecutionJobIssues <- function( + executed, + completed) { + executedJobs <- unique(executed$job_id) + completedJobs <- unique(completed$job_id) + + issues <- executedJobs[!executedJobs %in% completedJobs] + return(issues) +} + +#' Removes csv files from folders that have not been marked as completed +#' and removes the record of the execution file +#' +#' @param executionFolder The folder that has the execution files +#' @family {Incremental} +#' @return +#' A list with the settings +#' +#' @export +cleanIncremental <- function( + executionFolder) { + incrementalFiles <- loadIncrementalFiles( + executionFolder + ) + + issues <- getExecutionJobIssues( + executed = incrementalFiles$executed, + completed = incrementalFiles$completed + ) + + if (length(issues) > 0) { + # delete contents inside folder + for (i in 1:length(issues)) { + files <- dir(file.path(executionFolder, issues[i]), full.names = T) + for (file in files) { + message(paste0("Deleting incomplete result file ", file)) + file.remove(file) + } + } + } + + # now update the execution to remove the issue rows + executionFile <- utils::read.csv( + file = file.path(executionFolder, "execution.csv") + ) + fixedExecution <- executionFile[!executionFile$job_id %in% issues, ] + utils::write.csv( + x = fixedExecution, + file = file.path(executionFolder, "execution.csv") + ) + + return(invisible(NULL)) +} + +checkResultFilesIncremental <- function( + executionFolder) { + incrementalFiles <- loadIncrementalFiles( + executionFolder + ) + + issues <- getExecutionJobIssues( + executed = incrementalFiles$executed, + completed = incrementalFiles$completed + ) + + if (length(issues) > 0) { + stop(paste0("jobIds: ", paste0(issues, collapse = ","), "executed but not completed. 
Please run cleanIncremental() to remove incomplete results.")) + } + + return(invisible(NULL)) +} + +findCompletedJobs <- function(executionFolder) { + incrementalFiles <- loadIncrementalFiles(executionFolder) + return(unique(incrementalFiles$completed$job_id)) +} + + +recordIncremental <- function( + executionFolder, + runDateTime, + jobId, + startTime, + endTime, + logname = "execution.csv") { + if (file.exists(file.path(executionFolder, logname))) { + x <- data.frame( + run_date_time = runDateTime, + job_id = jobId, + start_time = startTime, + end_time = endTime + ) + readr::write_csv( + x = x, + file = file.path(executionFolder, logname), + append = T + ) + } else { + warning(paste0(logname, " file missing so no logging possible")) + } +} + +#' Removes csv files from the execution folder as there should be no csv files +#' when running in non-incremental model +#' +#' @param executionFolder The folder that has the execution files +#' @family {Incremental} +#' @return +#' A list with the settings +#' +#' @export +cleanNonIncremental <- function( + executionFolder) { + # remove all files from the executionFolder + files <- dir( + path = executionFolder, + recursive = T, + full.names = T, + pattern = ".csv" + ) + if (length(files) > 0) { + for (file in files) { + message(paste0("Deleting file ", file)) + file.remove(file) + } + } +} + +checkResultFilesNonIncremental <- function( + executionFolder) { + files <- dir( + path = executionFolder, + recursive = T, + full.names = T, + pattern = ".csv" + ) + if (length(files) > 0) { + errorMessage <- paste0( + "Running in non-incremental but csv files exist in execution folder.", + " please delete manually or using cleanNonIncremental()" + ) + stop(errorMessage) + } + + return(invisible(NULL)) +} diff --git a/R/RunCharacterization.R b/R/RunCharacterization.R index 94bc488..4d8a0fa 100644 --- a/R/RunCharacterization.R +++ b/R/RunCharacterization.R @@ -8,7 +8,7 @@ #' @param timeToEventSettings A list of timeToEvent settings #' @param dechallengeRechallengeSettings A list of dechallengeRechallenge settings #' @param aggregateCovariateSettings A list of aggregateCovariate settings -#' +#' @family {LargeScale} #' @return #' Returns the connection to the sqlite database #' @@ -54,6 +54,7 @@ createCharacterizationSettings <- function( return(settings) } + #' Save the characterization settings as a json #' @description #' This function converts the settings into a json object and saves it @@ -63,7 +64,7 @@ createCharacterizationSettings <- function( #' #' @param settings An object of class characterizationSettings created using \code{createCharacterizationSettings} #' @param fileName The location to save the json settings -#' +#' @family {LargeScale} #' @return #' Returns the location of the directory containing the json settings #' @@ -90,7 +91,7 @@ saveCharacterizationSettings <- function( #' #' @return #' Returns the json settings as an R object -#' +#' @family {LargeScale} #' @export loadCharacterizationSettings <- function( fileName) { @@ -115,15 +116,18 @@ loadCharacterizationSettings <- function( #' @template TempEmulationSchema #' @param cdmDatabaseSchema The schema with the OMOP CDM data #' @param characterizationSettings The study settings created using \code{createCharacterizationSettings} -#' @param saveDirectory The location to save the results to -#' @param tablePrefix A string to append the tables in the results +#' @param outputDirectory The location to save the final csv files to +#' @param executionPath The location where intermediate 
results are saved to +#' @param csvFilePrefix A string to append the csv files in the outputDirectory #' @param databaseId The unique identifier for the cdm database #' @param showSubjectId Whether to include subjectId of failed rechallenge case series or hide #' @param minCellCount The minimum count value that is calculated -#' +#' @param incremental If TRUE then skip previously executed analyses that completed +#' @param threads The number of threads to use when running aggregate covariates +#' @param minCharacterizationMean The minimum mean threshold to extract when running aggregate covariates +#' @family {LargeScale} #' @return -#' An sqlite database with the results is saved into the saveDirectory and a csv file named tacker.csv -#' details which analyses have run to completion. +#' Multiple csv files in the outputDirectory. #' #' @export runCharacterizationAnalyses <- function( @@ -132,14 +136,18 @@ runCharacterizationAnalyses <- function( targetTable, outcomeDatabaseSchema, outcomeTable, - tempEmulationSchema = NULL, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), cdmDatabaseSchema, characterizationSettings, - saveDirectory, - tablePrefix = "c_", + outputDirectory, + executionPath = file.path(outputDirectory, "execution"), + csvFilePrefix = "c_", databaseId = "1", showSubjectId = F, - minCellCount = 0) { + minCellCount = 0, + incremental = T, + threads = 1, + minCharacterizationMean = 0.01) { # inputs checks errorMessages <- checkmate::makeAssertCollection() .checkCharacterizationSettings( @@ -147,290 +155,324 @@ runCharacterizationAnalyses <- function( errorMessages = errorMessages ) .checkTablePrefix( - tablePrefix = tablePrefix, + tablePrefix = csvFilePrefix, errorMessages = errorMessages ) checkmate::reportAssertions( errorMessages ) - # create the Database - conn <- createSqliteDatabase( - sqliteLocation = saveDirectory - ) - on.exit( - DatabaseConnector::disconnect(conn) + runDateTime <- Sys.time() + + createDirectory(outputDirectory) + createDirectory(executionPath) + + logger <- createLogger( + logPath = file.path(executionPath), + logName = "log.txt" ) + ParallelLogger::registerLogger(logger) + on.exit(ParallelLogger::unregisterLogger(logger)) - createCharacterizationTables( - conn = conn, - resultSchema = "main", - targetDialect = "sqlite", - deleteExistingTables = T, - createTables = T, - tablePrefix = tablePrefix + jobs <- createJobs( + characterizationSettings = characterizationSettings, + threads = threads ) - if (!is.null(characterizationSettings$timeToEventSettings)) { - for (i in 1:length(characterizationSettings$timeToEventSettings)) { - message("Running time to event analysis ", i) - - result <- tryCatch( - { - computeTimeToEventAnalyses( - connectionDetails = connectionDetails, - targetDatabaseSchema = targetDatabaseSchema, - targetTable = targetTable, - outcomeDatabaseSchema = outcomeDatabaseSchema, - outcomeTable = outcomeTable, - tempEmulationSchema = tempEmulationSchema, - cdmDatabaseSchema = cdmDatabaseSchema, - timeToEventSettings = characterizationSettings$timeToEventSettings[[i]], - databaseId = databaseId - ) - }, - error = function(e) { - message(paste0("ERROR in time-to-event analysis: ", e$message)) - return(NULL) - } - ) + # save settings + if (!file.exists(file.path(executionPath, "settings.rds"))) { + saveRDS( + object = list( + characterizationSettings = characterizationSettings, + threads = threads + ), + file = file.path(executionPath, "settings.rds") + ) + } - if (!is.null(result)) { - # log that run was sucessful - 
readr::write_csv( - x = data.frame( - analysis_type = "timeToEvent", - run_id = i, - database_id = databaseId, - date_time = as.character(Sys.time()) - ), - file = file.path(saveDirectory, "tracker.csv"), - append = file.exists(file.path(saveDirectory, "tracker.csv")) - ) + if (incremental) { + # check for any issues with current incremental + oldSettings <- readRDS( + file = file.path(executionPath, "settings.rds") + ) + if (!identical(characterizationSettings, oldSettings$characterizationSettings)) { + stop("Settings have changed - please turn off incremental") + } + if (!identical(threads, oldSettings$threads)) { + stop("Cannot change number of threads in incremental model") + } - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "time_to_event", - andromedaObject = result$timeToEvent, - tablePrefix = tablePrefix, - minCellCount = minCellCount, - minCellCountColumns = list("numEvents") - ) - } + # create logs if not exists + createIncrementalLog( + executionFolder = executionPath, + logname = "execution.csv" + ) + createIncrementalLog( + executionFolder = executionPath, + logname = "completed.csv" + ) + + checkResultFilesIncremental( + executionFolder = executionPath + ) + + # remove any previously completed jobs + completedJobIds <- findCompletedJobs(executionFolder = executionPath) + + completedJobIndex <- jobs$jobId %in% completedJobIds + if (sum(completedJobIndex) > 0) { + message(paste0("Removing ", sum(completedJobIndex), " previously completed jobs")) + jobs <- jobs[!completedJobIndex, ] + } + + if (nrow(jobs) == 0) { + message("No jobs left") + return(invisible(T)) } + } else { + # check for any csv files in folder + checkResultFilesNonIncremental( + executionFolder = executionPath + ) } - if (!is.null(characterizationSettings$dechallengeRechallengeSettings)) { - for (i in 1:length(characterizationSettings$dechallengeRechallengeSettings)) { - ParallelLogger::logInfo(paste0("Running dechallenge rechallenge analysis ", i)) - - result <- tryCatch( - { - computeDechallengeRechallengeAnalyses( - connectionDetails = connectionDetails, - targetDatabaseSchema = targetDatabaseSchema, - targetTable = targetTable, - outcomeDatabaseSchema = outcomeDatabaseSchema, - outcomeTable = outcomeTable, - tempEmulationSchema = tempEmulationSchema, - dechallengeRechallengeSettings = characterizationSettings$dechallengeRechallengeSettings[[i]], - databaseId = databaseId - ) - }, - error = function(e) { - message(paste0("ERROR in dechallenge rechallenge analysis: ", e$message)) - return(NULL) - } - ) - if (!is.null(result)) { - # log that run was sucessful - readr::write_csv( - x = data.frame( - analysis_type = "dechallengeRechallenge", - run_id = i, - database_id = databaseId, - date_time = as.character(Sys.time()) - ), - file = file.path(saveDirectory, "tracker.csv"), - append = file.exists(file.path(saveDirectory, "tracker.csv")) - ) - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "dechallenge_rechallenge", - andromedaObject = result$dechallengeRechallenge, - tablePrefix = tablePrefix, - minCellCount = minCellCount, - minCellCountColumns = list( - c("numCases"), - c("dechallengeAttempt"), - c("dechallengeFail", "dechallengeSuccess"), - c("rechallengeAttempt"), - c("rechallengeFail", "rechallengeSuccess") - ) - ) - } + # Now loop over the jobs + inputSettings <- list( + connectionDetails = connectionDetails, + targetDatabaseSchema = targetDatabaseSchema, + targetTable = targetTable, + outcomeDatabaseSchema = 
outcomeDatabaseSchema, + outcomeTable = outcomeTable, + tempEmulationSchema = tempEmulationSchema, + cdmDatabaseSchema = cdmDatabaseSchema, + databaseId = databaseId, + showSubjectId = showSubjectId, + minCellCount = minCellCount, + minCharacterizationMean = minCharacterizationMean, + executionPath = executionPath, + incremental = incremental + ) - # run failed analysis - message("Running rechallenge failed case analysis ", i) - - result <- tryCatch( - { - computeRechallengeFailCaseSeriesAnalyses( - connectionDetails = connectionDetails, - targetDatabaseSchema = targetDatabaseSchema, - targetTable = targetTable, - outcomeDatabaseSchema = outcomeDatabaseSchema, - outcomeTable = outcomeTable, - tempEmulationSchema = tempEmulationSchema, - dechallengeRechallengeSettings = characterizationSettings$dechallengeRechallengeSettings[[i]], - databaseId = databaseId, - showSubjectId = showSubjectId - ) - }, - error = function(e) { - message(paste0("ERROR in rechallenge failed case analysis: ", e$message)) - return(NULL) - } + # convert jobList to list with extra inputs + jobList <- lapply( + X = 1:nrow(jobs), + FUN = function(ind) { + inputs <- inputSettings + inputs$settings <- jobs$settings[ind] + inputs$functionName <- jobs$functionName[ind] + inputs$executionFolder <- jobs$executionFolder[ind] + inputs$jobId <- jobs$jobId[ind] + inputs$runDateTime <- runDateTime + return(inputs) + } + ) + + message("Creating new cluster") + cluster <- ParallelLogger::makeCluster( + numberOfThreads = threads, + singleThreadToMain = T, + setAndromedaTempFolder = T + ) + + ParallelLogger::clusterApply( + cluster = cluster, + x = jobList, + fun = runCharacterizationsInParallel + ) + + # code to export all csvs into one file + aggregateCsvs( + outputFolder = outputDirectory, + executionPath = executionPath, + executionFolders = jobs$executionFolder, + csvFilePrefix = csvFilePrefix + ) + + invisible(outputDirectory) +} + +createDirectory <- function(x) { + if (!dir.exists(x)) { + message(paste0("Creating directory ", x)) + dir.create(x, recursive = T) + } +} + +createLogger <- function(logPath, logName) { + createDirectory(logPath) + ParallelLogger::createLogger( + name = "Characterization", + threshold = "INFO", + appenders = list( + ParallelLogger::createFileAppender( + fileName = file.path(logPath, logName), + layout = ParallelLogger::layoutParallel, + expirationTime = 60 * 60 * 48 ) + ) + ) +} - if (!is.null(result)) { - # log that run was successful - readr::write_csv( - x = data.frame( - analysis_type = "rechallengeFailCaseSeries", - run_id = i, - database_id = databaseId, - date_time = as.character(Sys.time()) - ), - file = file.path(saveDirectory, "tracker.csv"), - append = file.exists(file.path(saveDirectory, "tracker.csv")) - ) +runCharacterizationsInParallel <- function(x) { + startTime <- Sys.time() + + functionName <- x$functionName + inputSettings <- x + inputSettings$functionName <- NULL + inputSettings$settings <- ParallelLogger::convertJsonToSettings(inputSettings$settings) + inputSettings$outputFolder <- file.path(x$executionPath, x$executionFolder) + + if (x$incremental) { + recordIncremental( + executionFolder = x$executionPath, + runDateTime = x$runDateTime, + jobId = x$jobId, + startTime = startTime, + endTime = startTime, + logname = "execution.csv" + ) + } - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "rechallenge_fail_case_series", - andromedaObject = result$rechallengeFailCaseSeries, - tablePrefix = tablePrefix - ) - } + completed <- tryCatch( + { + 
do.call( + what = eval(parse(text = functionName)), + args = inputSettings + ) + }, + error = function(e) { + print(e) + return(FALSE) } + ) + + endTime <- Sys.time() + + # if it completed without issues save it + if (x$incremental & completed) { + recordIncremental( + executionFolder = x$executionPath, + runDateTime = x$runDateTime, + jobId = x$jobId, + startTime = startTime, + endTime = endTime, + logname = "completed.csv" + ) } +} +createJobs <- function( + characterizationSettings, + threads) { + jobDf <- rbind( + getTimeToEventJobs( + characterizationSettings, + threads + ), + getDechallengeRechallengeJobs( + characterizationSettings, + threads + ), + getAggregateCovariatesJobs( + characterizationSettings, + threads + ) + ) - if (!is.null(characterizationSettings$aggregateCovariateSettings)) { - ParallelLogger::logInfo("Running aggregate covariate analyses") - - for (i in 1:length(characterizationSettings$aggregateCovariateSettings)) { - result <- tryCatch( - { - computeAggregateCovariateAnalyses( - connectionDetails = connectionDetails, - cdmDatabaseSchema = cdmDatabaseSchema, - targetDatabaseSchema = targetDatabaseSchema, - targetTable = targetTable, - outcomeDatabaseSchema = outcomeDatabaseSchema, - outcomeTable = outcomeTable, - tempEmulationSchema = tempEmulationSchema, - aggregateCovariateSettings = characterizationSettings$aggregateCovariateSettings[[i]], - databaseId = databaseId, - runId = i - ) - }, - error = function(e) { - message(paste0("ERROR in aggregate covariate analyses: ", e$message)) - message(e) - return(NULL) - } - ) + # data.frame( + # functionName, + # settings # json, + # executionFolder, + # jobId + # ) - if (!is.null(result)) { - # log that run was successful - readr::write_csv( - x = data.frame( - analysis_type = "aggregateCovariates", - run_id = i, - database_id = databaseId, - date_time = as.character(Sys.time()) - ), - file = file.path(saveDirectory, "tracker.csv"), - append = file.exists(file.path(saveDirectory, "tracker.csv")) - ) + return(jobDf) +} - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "settings", - andromedaObject = result$settings, - tablePrefix = tablePrefix - ) - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "cohort_counts", - andromedaObject = result$cohortCounts, - tablePrefix = tablePrefix - ) - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "cohort_details", - andromedaObject = result$cohortDetails, - tablePrefix = tablePrefix - ) +aggregateCsvs <- function( + executionPath, + outputFolder, + executionFolders, # needed? 
+ csvFilePrefix) { + tables <- c( + "cohort_details.csv", "settings.csv", "covariates.csv", + "covariates_continuous.csv", "covariate_ref.csv", + "analysis_ref.csv", "cohort_counts.csv", + "time_to_event.csv", + "rechallenge_fail_case_series.csv", "dechallenge_rechallenge.csv" + ) - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "analysis_ref", - andromedaObject = result$analysisRef, - tablePrefix = tablePrefix - ) - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "covariate_ref", - andromedaObject = result$covariateRef, - tablePrefix = tablePrefix + # this makes sure results are recreated + firstTracker <- data.frame( + table = tables, + first = rep(T, length(tables)) + ) + + analysisRefTracker <- c() + covariateRefTracker <- c() + settingsTracker <- c() + + # create outputFolder + + folderNames <- dir(executionPath) + + # for each folder load covariates, covariates_continuous, + # covariate_ref and analysis_ref + for (folderName in folderNames) { + for (csvType in tables) { + loadPath <- file.path(executionPath, folderName, csvType) + savePath <- file.path(outputFolder, paste0(csvFilePrefix, csvType)) + if (file.exists(loadPath)) { + # TODO do this in batches + data <- readr::read_csv( + file = loadPath, + show_col_types = F ) - if (!is.null(result$covariates)) { - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "covariates", - andromedaObject = result$covariates, - tablePrefix = tablePrefix, - minCellCount = minCellCount, - minCellCountColumns = list( - c("sumValue") # c('SUM_VALUE') #AVERAGE_VALUE - ) - ) + if (csvType == "analysis_ref.csv") { + data <- data %>% + dplyr::mutate( + unique_id = paste0(.data$setting_id, "-", .data$analysis_id) + ) %>% + dplyr::filter( # need to filter analysis_id and setting_id + !.data$unique_id %in% analysisRefTracker + ) %>% + dplyr::select(-"unique_id") + + analysisRefTracker <- unique(c(analysisRefTracker, paste0(data$setting_id, "-", data$analysis_id))) } - - if (!is.null(result$covariatesContinuous)) { - insertAndromedaToDatabase( - connection = conn, - databaseSchema = "main", - tableName = "covariates_continuous", - andromedaObject = result$covariatesContinuous, - tablePrefix = tablePrefix, - minCellCount = minCellCount, - minCellCountColumns = list( - c("countValue") + if (csvType == "covariate_ref.csv") { # this could be problematic as may have differnet covariate_ids + data <- data %>% + dplyr::mutate( + unique_id = paste0(.data$setting_id, "-", .data$covariate_id) + ) %>% + dplyr::filter( # need to filter covariate_id and setting_id + !.data$unique_id %in% covariateRefTracker + ) %>% + dplyr::select(-"unique_id") + + covariateRefTracker <- unique(c(covariateRefTracker, paste0(data$setting_id, "-", data$covariate_id))) + } + if (csvType == "settings.csv") { + data <- data %>% + dplyr::filter( + !.data$setting_id %in% settingsTracker ) - ) + settingsTracker <- c(settingsTracker, unique(data$setting_id)) } + + append <- file.exists(savePath) + readr::write_csv( + x = data, + file = savePath, quote = "all", + append = append & !firstTracker$first[firstTracker$table == csvType] + ) + firstTracker$first[firstTracker$table == csvType] <- F } } } - - - invisible(saveDirectory) } diff --git a/R/SaveLoad.R b/R/SaveLoad.R index af7bb4e..61583bd 100644 --- a/R/SaveLoad.R +++ b/R/SaveLoad.R @@ -14,40 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
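Taken together, these changes replace the per-analysis sqlite workflow with csv output plus a separate upload step. A minimal sketch of the new flow, assuming connection details for the CDM (cdmConnectionDetails) already exist, using placeholder schema and cohort table names, and leaving the aggregate covariate settings at their defaults; the cohort ids and windows shown are example values only:

library(Characterization)

charSettings <- createCharacterizationSettings(
  timeToEventSettings = createTimeToEventSettings(
    targetIds = c(1),
    outcomeIds = c(3)
  ),
  dechallengeRechallengeSettings = createDechallengeRechallengeSettings(
    targetIds = c(1),
    outcomeIds = c(3),
    dechallengeStopInterval = 30,
    dechallengeEvaluationWindow = 30
  )
)

# Run the analyses; results are written as csv files into outputDirectory
runCharacterizationAnalyses(
  connectionDetails = cdmConnectionDetails,  # assumed to exist
  targetDatabaseSchema = "main",
  targetTable = "cohort",
  outcomeDatabaseSchema = "main",
  outcomeTable = "cohort",
  cdmDatabaseSchema = "main",
  characterizationSettings = charSettings,
  outputDirectory = file.path(tempdir(), "results"),
  executionPath = file.path(tempdir(), "execution"),
  csvFilePrefix = "c_",
  databaseId = "exampleDb",
  incremental = TRUE,
  threads = 1,
  minCellCount = 0,
  minCharacterizationMean = 0.01
)

# Upload the csv files into a result database (sqlite here)
resultConnectionDetails <- createSqliteDatabase(
  sqliteLocation = file.path(tempdir(), "resultDb")
)
createCharacterizationTables(
  connectionDetails = resultConnectionDetails,
  resultSchema = "main",
  targetDialect = "sqlite",
  tablePrefix = "c_"
)
insertResultsToDatabase(
  connectionDetails = resultConnectionDetails,
  schema = "main",
  resultsFolder = file.path(tempdir(), "results"),
  csvTablePrefix = "c_"
)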
-colnamesLower <- function(data) { - colnames(data) <- tolower( - x = colnames(data) - ) - return(data) -} - -#' Save the TimeToEvent results -#' -#' @param result The output of running \code{computeTimeToEventAnalyses()} -#' @template FileName -#' -#' @return -#' A string specifying the directory the results are saved to -#' -#' @export -saveTimeToEventAnalyses <- function( - result, - fileName) { - Andromeda::saveAndromeda( - andromeda = result, - fileName = fileName, - maintainConnection = T - ) - - invisible(fileName) -} - #' export the TimeToEvent results as csv #' #' @param result The output of running \code{computeTimeToEventAnalyses()} #' @template saveDirectory #' @param minCellCount The minimum value that will be displayed in count columns -#' +#' @family {SaveLoad} #' @return #' A string specifying the directory the csv results are saved to #' @@ -82,13 +54,13 @@ exportTimeToEventToCsv <- function( string = colnames(dat) ) - if (sum(dat$NUM_EVENTS < minCellCount) > 0) { - ParallelLogger::logInfo(paste0("Removing NUM_EVENTS less than ", minCellCount)) - dat$NUM_EVENTS[dat$NUM_EVENTS < minCellCount] <- -1 + if (sum(dat$num_events < minCellCount) > 0) { + ParallelLogger::logInfo(paste0("Removing num_events less than ", minCellCount)) + dat$num_events[dat$num_events < minCellCount] <- -minCellCount } readr::write_csv( - x = dat, + x = formatDouble(x = dat), file = file.path( saveDirectory, "time_to_event.csv" @@ -106,97 +78,13 @@ exportTimeToEventToCsv <- function( ) } -#' Load the TimeToEvent results -#' -#' @template FileName -#' -#' @return -#' A data.frame with the TimeToEvent results -#' -#' @export -loadTimeToEventAnalyses <- function(fileName) { - result <- Andromeda::loadAndromeda(fileName) - return(result) -} - -#' Save the DechallengeRechallenge results -#' -#' @param result The output of running \code{computeDechallengeRechallengeAnalyses()} -#' @template FileName -#' -#' @return -#' A string specifying the directory the results are saved to -#' -#' @export -saveDechallengeRechallengeAnalyses <- function( - result, - fileName) { - Andromeda::saveAndromeda( - andromeda = result, - fileName = fileName, - maintainConnection = T - ) - - invisible(fileName) -} - - -#' Save the RechallengeFailCaseSeries results -#' -#' @param result The output of running \code{computeRechallengeFailCaseSeriesAnalyses()} -#' @template FileName -#' -#' @return -#' A string specifying the directory the results are saved to -#' -#' @export -saveRechallengeFailCaseSeriesAnalyses <- function( - result, - fileName) { - Andromeda::saveAndromeda( - andromeda = result, - fileName = fileName, - maintainConnection = T - ) - - invisible(fileName) -} - - -#' Load the DechallengeRechallenge results -#' -#' @template FileName -#' -#' @return -#' A data.frame with the DechallengeRechallenge results -#' -#' @export -loadDechallengeRechallengeAnalyses <- function( - fileName) { - result <- Andromeda::loadAndromeda(fileName) - return(result) -} - -#' Load the RechallengeFailCaseSeries results -#' -#' @template FileName -#' -#' @return -#' A data.frame with the RechallengeFailCaseSeries results -#' -#' @export -loadRechallengeFailCaseSeriesAnalyses <- function( - fileName) { - result <- Andromeda::loadAndromeda(fileName) - return(result) -} #' export the DechallengeRechallenge results as csv #' #' @param result The output of running \code{computeDechallengeRechallengeAnalyses()} #' @template saveDirectory #' @param minCellCount The minimum value that will be displayed in count columns -#' +#' @family {SaveLoad} #' 
@return #' A string specifying the directory the csv results are saved to #' @@ -210,6 +98,10 @@ exportDechallengeRechallengeToCsv <- function( ) message("Writing ", countN, " rows to csv") + if (!dir.exists(saveDirectory)) { + dir.create(saveDirectory, recursive = T) + } + Andromeda::batchApply( tbl = result$dechallengeRechallenge, fun = function(x) { @@ -228,50 +120,72 @@ exportDechallengeRechallengeToCsv <- function( string = colnames(dat) ) - removeInd <- dat$NUM_EVENTS < minCellCount + removeInd <- dat$num_exposure_eras < minCellCount if (sum(removeInd) > 0) { - ParallelLogger::logInfo(paste0("Removing NUM_EVENTS counts less than ", minCellCount)) + ParallelLogger::logInfo(paste0("Censoring num_exposure_eras counts less than ", minCellCount)) if (sum(removeInd) > 0) { - dat$NUM_CASES[removeInd] <- -1 + dat$num_exposure_eras[removeInd] <- -minCellCount } } - removeInd <- dat$DECHALLENGE_ATTEMPT < minCellCount + removeInd <- dat$num_persons_exposed < minCellCount if (sum(removeInd) > 0) { - ParallelLogger::logInfo(paste0("Removing DECHALLENGE_ATTEMPT counts less than ", minCellCount)) + ParallelLogger::logInfo(paste0("Censoring num_persons_exposed counts less than ", minCellCount)) if (sum(removeInd) > 0) { - dat$DECHALLENGE_ATTEMPT[removeInd] <- -1 + dat$num_persons_exposed[removeInd] <- -minCellCount } } - removeInd <- dat$DECHALLENGE_FAIL < minCellCount | dat$DECHALLENGE_SUCCESS < minCellCount + removeInd <- dat$num_cases < minCellCount if (sum(removeInd) > 0) { - ParallelLogger::logInfo(paste0("Removing DECHALLENGE FAIL or SUCCESS counts less than ", minCellCount)) + ParallelLogger::logInfo(paste0("Censoring num_cases counts less than ", minCellCount)) if (sum(removeInd) > 0) { - dat$DECHALLENGE_FAIL[removeInd] <- -1 - dat$DECHALLENGE_SUCCESS[removeInd] <- -1 + dat$num_cases[removeInd] <- -minCellCount } } - removeInd <- dat$RECHALLENGE_ATTEMPT < minCellCount + removeInd <- dat$dechallenge_attempt < minCellCount if (sum(removeInd) > 0) { - ParallelLogger::logInfo(paste0("Removing RECHALLENGE_ATTEMPT counts less than ", minCellCount)) + ParallelLogger::logInfo(paste0("Censoring/removing dechallenge_attempt counts less than ", minCellCount)) if (sum(removeInd) > 0) { - dat$RECHALLENGE_ATTEMPT[removeInd] <- -1 + dat$dechallenge_attempt[removeInd] <- -minCellCount + dat$pct_dechallenge_attempt[removeInd] <- NA } } - removeInd <- dat$RECHALLENGE_FAIL < minCellCount | dat$RECHALLENGE_SUCCESS < minCellCount + removeInd <- dat$dechallenge_fail < minCellCount | dat$dechallenge_success < minCellCount if (sum(removeInd) > 0) { - ParallelLogger::logInfo(paste0("Removing RECHALLENGE FAIL or SUCCESS counts less than ", minCellCount)) + ParallelLogger::logInfo(paste0("Censoring/removing DECHALLENGE FAIL or SUCCESS counts less than ", minCellCount)) if (sum(removeInd) > 0) { - dat$RECHALLENGE_FAIL[removeInd] <- -1 - dat$RECHALLENGE_SUCCESS[removeInd] <- -1 + dat$dechallenge_fail[removeInd] <- -minCellCount + dat$dechallenge_success[removeInd] <- -minCellCount + dat$pct_dechallenge_fail[removeInd] <- NA + dat$pct_dechallenge_success[removeInd] <- NA + } + } + + removeInd <- dat$rechallenge_attempt < minCellCount + if (sum(removeInd) > 0) { + ParallelLogger::logInfo(paste0("Censoring/removing rechallenge_attempt counts less than ", minCellCount)) + if (sum(removeInd) > 0) { + dat$rechallenge_attempt[removeInd] <- -minCellCount + dat$pct_rechallenge_attempt[removeInd] <- NA + } + } + + removeInd <- dat$rechallenge_fail < minCellCount | dat$rechallenge_success < minCellCount + if (sum(removeInd) > 0) { 
+ ParallelLogger::logInfo(paste0("Censoring/removing rechallenge_fail or rechallenge_success counts less than ", minCellCount)) + if (sum(removeInd) > 0) { + dat$rechallenge_fail[removeInd] <- -minCellCount + dat$rechallenge_success[removeInd] <- -minCellCount + dat$pct_rechallenge_fail[removeInd] <- NA + dat$pct_rechallenge_success[removeInd] <- NA } } readr::write_csv( - x = dat, + x = formatDouble(x = dat), file = file.path( saveDirectory, "dechallenge_rechallenge.csv" @@ -293,7 +207,7 @@ exportDechallengeRechallengeToCsv <- function( #' #' @param result The output of running \code{computeRechallengeFailCaseSeriesAnalyses()} #' @template saveDirectory -#' +#' @family {SaveLoad} #' @return #' A string specifying the directory the csv results are saved to #' @@ -334,7 +248,7 @@ exportRechallengeFailCaseSeriesToCsv <- function( ) readr::write_csv( - x = dat, + x = formatDouble(x = dat), file = file.path( saveDirectory, "rechallenge_fail_case_series.csv" @@ -351,307 +265,3 @@ exportRechallengeFailCaseSeriesToCsv <- function( ) ) } - -#' Save the AggregateCovariate results -#' -#' @param result The output of running \code{computeAggregateCovariateAnalyses()} -#' @template FileName -#' -#' @return -#' A string specifying the directory the results are saved to -#' -#' @export -saveAggregateCovariateAnalyses <- function( - result, - fileName) { - Andromeda::saveAndromeda( - andromeda = result, - fileName = fileName, - maintainConnection = T - ) - - invisible(fileName) -} - -#' Load the AggregateCovariate results -#' -#' @template FileName -#' -#' @return -#' A list of data.frames with the AggregateCovariate results -#' -#' @export -loadAggregateCovariateAnalyses <- function( - fileName) { - result <- Andromeda::loadAndromeda( - fileName = fileName - ) - - return(result) -} - -#' export the AggregateCovariate results as csv -#' -#' @param result The output of running \code{computeAggregateCovariateAnalyses()} -#' @template saveDirectory -#' @param minCellCount The minimum value that will be displayed in count columns -#' -#' @return -#' A string specifying the directory the csv results are saved to -#' -#' @export -exportAggregateCovariateToCsv <- function( - result, - saveDirectory, - minCellCount = 0) { - if (!dir.exists(saveDirectory)) { - dir.create(saveDirectory, recursive = T) - } - - # settings - Andromeda::batchApply( - tbl = result$settings, - fun = function(x) { - append <- file.exists( - file.path( - saveDirectory, - "settings.csv" - ) - ) - - dat <- as.data.frame( - x %>% - dplyr::collect() - ) - - colnames(dat) <- SqlRender::camelCaseToSnakeCase( - string = colnames(dat) - ) - - readr::write_csv( - x = dat, - file = file.path( - saveDirectory, - "settings.csv" - ), - append = append - ) - } - ) - # cohort details - Andromeda::batchApply( - tbl = result$cohortCounts, - fun = function(x) { - append <- file.exists( - file.path( - saveDirectory, - "cohort_counts.csv" - ) - ) - - dat <- as.data.frame( - x %>% - dplyr::collect() - ) - - colnames(dat) <- SqlRender::camelCaseToSnakeCase( - string = colnames(dat) - ) - - readr::write_csv( - x = dat, - file = file.path( - saveDirectory, - "cohort_counts.csv" - ), - append = append - ) - } - ) - - # cohort details - Andromeda::batchApply( - tbl = result$cohortDetails, - fun = function(x) { - append <- file.exists( - file.path( - saveDirectory, - "cohort_details.csv" - ) - ) - - dat <- as.data.frame( - x %>% - dplyr::collect() - ) - - colnames(dat) <- SqlRender::camelCaseToSnakeCase( - string = colnames(dat) - ) - - readr::write_csv( - x = dat, 
- file = file.path( - saveDirectory, - "cohort_details.csv" - ), - append = append - ) - } - ) - - # analysisRef - Andromeda::batchApply( - tbl = result$analysisRef, - fun = function(x) { - append <- file.exists( - file.path( - saveDirectory, - "analysis_ref.csv" - ) - ) - - dat <- as.data.frame( - x %>% - dplyr::collect() - ) - - colnames(dat) <- SqlRender::camelCaseToSnakeCase( - string = colnames(dat) - ) - - readr::write_csv( - x = dat, - file = file.path( - saveDirectory, - "analysis_ref.csv" - ), - append = append - ) - } - ) - - # covariateRef - Andromeda::batchApply( - tbl = result$covariateRef, - fun = function(x) { - append <- file.exists( - file.path( - saveDirectory, - "covariate_ref.csv" - ) - ) - - dat <- as.data.frame( - x %>% - dplyr::collect() - ) - - colnames(dat) <- SqlRender::camelCaseToSnakeCase( - string = colnames(dat) - ) - - readr::write_csv( - x = dat, - file = file.path( - saveDirectory, - "covariate_ref.csv" - ), - append = append - ) - } - ) - - # covariates - Andromeda::batchApply( - tbl = result$covariates, - fun = function(x) { - append <- file.exists( - file.path( - saveDirectory, - "covariates.csv" - ) - ) - - dat <- as.data.frame( - x %>% - dplyr::collect() - ) - - colnames(dat) <- SqlRender::camelCaseToSnakeCase( - string = colnames(dat) - ) - - removeInd <- dat$SUM_VALUE < minCellCount - if (sum(removeInd) > 0) { - ParallelLogger::logInfo(paste0("Removing SUM_VALUE counts less than ", minCellCount)) - if (sum(removeInd) > 0) { - dat$SUM_VALUE[removeInd] <- -1 - dat$AVERAGE_VALUE[removeInd] <- -1 - } - } - - readr::write_csv( - x = dat, - file = file.path( - saveDirectory, - "covariates.csv" - ), - append = append - ) - } - ) - - # covariatesContinuous - Andromeda::batchApply( - tbl = result$covariatesContinuous, - fun = function(x) { - append <- file.exists( - file.path( - saveDirectory, - "covariates_continuous.csv" - ) - ) - - dat <- as.data.frame( - x %>% - dplyr::collect() - ) - - colnames(dat) <- SqlRender::camelCaseToSnakeCase( - string = colnames(dat) - ) - - removeInd <- dat$COUNT_VALUE < minCellCount - if (sum(removeInd) > 0) { - ParallelLogger::logInfo(paste0("Removing COUNT_VALUE counts less than ", minCellCount)) - if (sum(removeInd) > 0) { - dat$COUNT_VALUE[removeInd] <- -1 - } - } - - readr::write_csv( - x = dat, - file = file.path( - saveDirectory, - "covariates_continuous.csv" - ), - append = append - ) - } - ) - invisible( - file.path( - saveDirectory, - c( - "cohort_details.csv", - "settings.csv", - "analysis_ref.csv", - "covariate_ref.csv", - "covariates.csv", - "covariates_continuous.csv", - "cohort_counts.csv" - ) - ) - ) -} diff --git a/R/TimeToEvent.R b/R/TimeToEvent.R index 5affa92..18c9d9c 100644 --- a/R/TimeToEvent.R +++ b/R/TimeToEvent.R @@ -18,7 +18,7 @@ #' #' @param targetIds A list of cohortIds for the target cohorts #' @param outcomeIds A list of cohortIds for the outcome cohorts -#' +#' @family {TimeToEvent} #' @return #' An list with the time to event settings #' @@ -59,9 +59,12 @@ createTimeToEventSettings <- function( #' @template TargetOutcomeTables #' @template TempEmulationSchema #' @param cdmDatabaseSchema The database schema containing the OMOP CDM data -#' @param timeToEventSettings The settings for the timeToEvent study +#' @param settings The settings for the timeToEvent study #' @param databaseId An identifier for the database (string) -#' +#' @param outputFolder A directory to save the results as csv files +#' @param minCellCount The minimum cell value to display, values less than this will be replaced by -1 
+#' @param ... extra inputs +#' @family {TimeToEvent} #' @return #' An \code{Andromeda::andromeda()} object containing the time to event results. #' @@ -74,8 +77,11 @@ computeTimeToEventAnalyses <- function( outcomeTable = targetTable, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), cdmDatabaseSchema, - timeToEventSettings, - databaseId = "database 1") { + settings, + databaseId = "database 1", + outputFolder = file.path(getwd(), "results"), + minCellCount = 0, + ...) { # check inputs errorMessages <- checkmate::makeAssertCollection() .checkConnectionDetails(connectionDetails, errorMessages) @@ -96,8 +102,8 @@ computeTimeToEventAnalyses <- function( errorMessages = errorMessages ) .checkTimeToEventSettings( - settings = timeToEventSettings, - errorMessages = errorMessages + settings = settings, + errorMessages = errorMessages ) valid <- checkmate::reportAssertions(errorMessages) @@ -116,8 +122,8 @@ computeTimeToEventAnalyses <- function( message("Uploading #cohort_settings") pairs <- expand.grid( - targetCohortDefinitionId = timeToEventSettings$targetIds, - outcomeCohortDefinitionId = timeToEventSettings$outcomeIds + targetCohortDefinitionId = settings$targetIds, + outcomeCohortDefinitionId = settings$outcomeIds ) DatabaseConnector::insertTable( @@ -184,13 +190,107 @@ computeTimeToEventAnalyses <- function( message( paste0( "Computing time-to-event for ", - nrow(timeToEventSettings), - "T-O pairs took ", + nrow(pairs), + " T-O pairs took ", signif(delta, 3), " ", attr(delta, "units") ) ) - return(result) + # add the csv export here + message("exporting to csv file") + exportTimeToEventToCsv( + result = result, + saveDirectory = outputFolder, + minCellCount = minCellCount + ) + + return(invisible(TRUE)) + } +} + +# code that takes a characterizationSettings list, extracts +# timeToEvent settings and then converts into distinct jobs +# based on the number of threads +getTimeToEventJobs <- function( + characterizationSettings, + threads) { + characterizationSettings <- characterizationSettings$timeToEventSettings + if (length(characterizationSettings) == 0) { + return(NULL) + } + ind <- 1:length(characterizationSettings) + targetIds <- lapply(ind, function(i) { + characterizationSettings[[i]]$targetIds + }) + outcomeIds <- lapply(ind, function(i) { + characterizationSettings[[i]]$outcomeIds + }) + + # get all combinations of TnOs, then split by treads + + tnos <- do.call( + what = "rbind", + args = + lapply( + 1:length(targetIds), + function(i) { + expand.grid( + targetId = targetIds[[i]], + outcomeId = outcomeIds[[i]] + ) + } + ) + ) + # find out whether more Ts or more Os + tcount <- length(unique(tnos$targetId)) + ocount <- length(unique(tnos$outcomeId)) + + if (threads > max(tcount, ocount)) { + message("Tnput parameter threads greater than number of targets and outcomes") + message(paste0("Only using ", max(tcount, ocount), " threads for TimeToEvent")) + } + + if (tcount >= ocount) { + threadDf <- data.frame( + targetId = unique(tnos$targetId), + thread = rep(1:threads, ceiling(tcount / threads))[1:tcount] + ) + mergeColumn <- "targetId" + } else { + threadDf <- data.frame( + outcomeId = unique(tnos$outcomeId), + thread = rep(1:threads, ceiling(ocount / threads))[1:ocount] + ) + mergeColumn <- "outcomeId" + } + + tnos <- merge(tnos, threadDf, by = mergeColumn) + sets <- lapply( + X = 1:max(threadDf$thread), + FUN = function(i) { + createTimeToEventSettings( + targetIds = unique(tnos$targetId[tnos$thread == i]), + outcomeIds = unique(tnos$outcomeId[tnos$thread == i]) + ) 
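`computeTimeToEventAnalyses()` now takes the generic `settings` argument, censors with `minCellCount`, writes its output as csv into `outputFolder` via `exportTimeToEventToCsv()`, and returns `invisible(TRUE)` rather than an Andromeda object. A hedged example call against the Eunomia demo database; the schema/table argument names come from the shared `TargetOutcomeTables` template and the cohort ids are placeholders:

```r
library(Characterization)
library(Eunomia)

connectionDetails <- Eunomia::getEunomiaConnectionDetails()
Eunomia::createCohorts(connectionDetails = connectionDetails)

tteSettings <- createTimeToEventSettings(
  targetIds = c(1, 2),
  outcomeIds = 3
)

# censored time-to-event results are exported as csv into outputFolder;
# the function now returns invisible(TRUE) rather than an Andromeda object
computeTimeToEventAnalyses(
  connectionDetails = connectionDetails,
  targetDatabaseSchema = "main",
  targetTable = "cohort",
  cdmDatabaseSchema = "main",
  settings = tteSettings,
  databaseId = "Eunomia",
  outputFolder = file.path(tempdir(), "tte_results"),
  minCellCount = 5
)
```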
+ } + ) + + # recreate settings + settings <- c() + for (i in 1:length(sets)) { + settings <- rbind( + settings, + data.frame( + functionName = "computeTimeToEventAnalyses", + settings = as.character(ParallelLogger::convertSettingsToJson( + sets[[i]] + )), + executionFolder = paste0("tte_", i), + jobId = paste0("tte_", i) + ) + ) } + + return(settings) } diff --git a/R/ViewShiny.R b/R/ViewShiny.R index 3eea909..bcee5c3 100644 --- a/R/ViewShiny.R +++ b/R/ViewShiny.R @@ -4,17 +4,18 @@ #' This is a shiny app for viewing interactive plots and tables #' @details #' Input is the output of ... -#' @param resultLocation The location of the results +#' @param resultFolder The location of the csv results #' @param cohortDefinitionSet The cohortDefinitionSet extracted using webAPI +#' @family {Shiny} #' @return #' Opens a shiny app for interactively viewing the results #' #' @export viewCharacterization <- function( - resultLocation, + resultFolder, cohortDefinitionSet = NULL) { databaseSettings <- prepareCharacterizationShiny( - resultLocation = resultLocation, + resultFolder = resultFolder, cohortDefinitionSet = cohortDefinitionSet ) @@ -22,34 +23,57 @@ viewCharacterization <- function( } prepareCharacterizationShiny <- function( - resultLocation, - cohortDefinitionSet) { - server <- file.path(resultLocation, "sqliteCharacterization", "sqlite.sqlite") + resultFolder, + cohortDefinitionSet, + sqliteLocation = file.path(tempdir(), "results.sqlite"), + tablePrefix = "", + csvTablePrefix = "c_") { + if (!dir.exists(dirname(sqliteLocation))) { + dir.create(dirname(sqliteLocation), recursive = T) + } - connectionDetailsSettings <- list( + # create sqlite connection + server <- sqliteLocation + connectionDetails <- DatabaseConnector::createConnectionDetails( dbms = "sqlite", server = server ) - connectionDetails <- do.call( - what = DatabaseConnector::createConnectionDetails, - args = connectionDetailsSettings + # create the tables + createCharacterizationTables( + connectionDetails = connectionDetails, + resultSchema = "main", + targetDialect = "sqlite", + deleteExistingTables = T, + createTables = T, + tablePrefix = paste0(tablePrefix, csvTablePrefix) + ) + + # upload the results + insertResultsToDatabase( + connectionDetails = connectionDetails, + schema = "main", + resultsFolder = resultFolder, + tablePrefix = tablePrefix, + csvTablePrefix = csvTablePrefix ) + # add extra tables (cohorts and databases) con <- DatabaseConnector::connect(connectionDetails) on.exit(DatabaseConnector::disconnect(con)) tables <- tolower(DatabaseConnector::getTableNames(con, "main")) + # this now works for different prefixes if (!"cg_cohort_definition" %in% tables) { cohortIds <- unique( c( - DatabaseConnector::querySql(con, "select distinct TARGET_COHORT_ID from c_cohort_details where TARGET_COHORT_ID != 0;")$TARGET_COHORT_ID, - DatabaseConnector::querySql(con, "select distinct OUTCOME_COHORT_ID from c_cohort_details where OUTCOME_COHORT_ID != 0;")$OUTCOME_COHORT_ID, - DatabaseConnector::querySql(con, "select distinct TARGET_COHORT_DEFINITION_ID from c_time_to_event;")$TARGET_COHORT_DEFINITION_ID, - DatabaseConnector::querySql(con, "select distinct OUTCOME_COHORT_DEFINITION_ID from c_time_to_event;")$OUTCOME_COHORT_DEFINITION_ID, - DatabaseConnector::querySql(con, "select distinct TARGET_COHORT_DEFINITION_ID from c_rechallenge_fail_case_series;")$TARGET_COHORT_DEFINITION_ID, - DatabaseConnector::querySql(con, "select distinct OUTCOME_COHORT_DEFINITION_ID from c_rechallenge_fail_case_series;")$OUTCOME_COHORT_DEFINITION_ID + 
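`getTimeToEventJobs()` spreads the T-O grid across threads by round-robin on whichever dimension has more distinct ids, then wraps each thread's ids in its own `createTimeToEventSettings()` job. A small standalone illustration of the assignment step; it mirrors the logic above rather than calling the internal function:

```r
# 5 targets, 2 outcomes and 3 threads: the grid is split on targets
# because there are more distinct targets than outcomes
targetIds <- 1:5
outcomeIds <- c(10, 11)
threads <- 3

tnos <- expand.grid(targetId = targetIds, outcomeId = outcomeIds)
tcount <- length(unique(tnos$targetId))

# round-robin assignment of targets to threads, as in getTimeToEventJobs()
threadDf <- data.frame(
  targetId = unique(tnos$targetId),
  thread = rep(1:threads, ceiling(tcount / threads))[1:tcount]
)
tnos <- merge(tnos, threadDf, by = "targetId")

split(tnos[, c("targetId", "outcomeId")], tnos$thread)
# thread 1 gets targets 1 and 4, thread 2 gets 2 and 5, thread 3 gets 3;
# each group then becomes one computeTimeToEventAnalyses job (tte_1, tte_2, tte_3)
```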
DatabaseConnector::querySql(con, paste0("select distinct TARGET_COHORT_ID from ", tablePrefix, csvTablePrefix, "cohort_details where COHORT_TYPE = 'Target';"))$TARGET_COHORT_ID, + DatabaseConnector::querySql(con, paste0("select distinct OUTCOME_COHORT_ID from ", tablePrefix, csvTablePrefix, "cohort_details where COHORT_TYPE = 'TnO';"))$OUTCOME_COHORT_ID, + DatabaseConnector::querySql(con, paste0("select distinct TARGET_COHORT_DEFINITION_ID from ", tablePrefix, csvTablePrefix, "time_to_event;"))$TARGET_COHORT_DEFINITION_ID, + DatabaseConnector::querySql(con, paste0("select distinct OUTCOME_COHORT_DEFINITION_ID from ", tablePrefix, csvTablePrefix, "time_to_event;"))$OUTCOME_COHORT_DEFINITION_ID, + DatabaseConnector::querySql(con, paste0("select distinct TARGET_COHORT_DEFINITION_ID from ", tablePrefix, csvTablePrefix, "rechallenge_fail_case_series;"))$TARGET_COHORT_DEFINITION_ID, + DatabaseConnector::querySql(con, paste0("select distinct OUTCOME_COHORT_DEFINITION_ID from ", tablePrefix, csvTablePrefix, "rechallenge_fail_case_series;"))$OUTCOME_COHORT_DEFINITION_ID ) ) @@ -68,9 +92,9 @@ prepareCharacterizationShiny <- function( if (!"database_meta_data" %in% tables) { dbIds <- unique( c( - DatabaseConnector::querySql(con, "select distinct DATABASE_ID from c_analysis_ref;")$DATABASE_ID, - DatabaseConnector::querySql(con, "select distinct DATABASE_ID from c_dechallenge_rechallenge;")$DATABASE_ID, - DatabaseConnector::querySql(con, "select distinct DATABASE_ID from c_time_to_event;")$DATABASE_ID + DatabaseConnector::querySql(con, paste0("select distinct DATABASE_ID from ", tablePrefix, csvTablePrefix, "analysis_ref;"))$DATABASE_ID, + DatabaseConnector::querySql(con, paste0("select distinct DATABASE_ID from ", tablePrefix, csvTablePrefix, "dechallenge_rechallenge;"))$DATABASE_ID, + DatabaseConnector::querySql(con, paste0("select distinct DATABASE_ID from ", tablePrefix, csvTablePrefix, "time_to_event;"))$DATABASE_ID ) ) @@ -86,41 +110,15 @@ prepareCharacterizationShiny <- function( ) } - if (!"i_incidence_summary" %in% tables) { - x <- c( - "refId", "databaseId", "sourceName", - "targetCohortDefinitionId", "targetName", "tarId", - "tarStartWith", "tarStartOffset", "tarEndWith", "tarEndOffset", - "subgroupId", "subgroupName", - "outcomeId", "outcomeCohortDefinitionId", "outcomeName", - "clean_window", - "ageId", "ageGroupName", - "genderId", "genderName", - "startYear", "personsAtRiskPe", "personsAtRisk", - "personDaysPe", "personDays", - "personOutcomesPe", "personOutcomes", - "outcomesPe", "outcomes", - "incidenceProportionP100p", - "incidenceRateP100py" - ) - df <- data.frame(matrix(ncol = length(x), nrow = 0)) - colnames(df) <- x - - DatabaseConnector::insertTable( - connection = con, - databaseSchema = "main", - tableName = "i_incidence_summary", - data = df, - camelCaseToSnakeCase = T - ) - } - + # create the settings for the database databaseSettings <- list( - connectionDetailsSettings = connectionDetailsSettings, + connectionDetailsSettings = list( + dbms = "sqlite", + server = server + ), schema = "main", - tablePrefix = "c_", + tablePrefix = paste0(tablePrefix, csvTablePrefix), cohortTablePrefix = "cg_", - incidenceTablePrefix = "i_", databaseTable = "DATABASE_META_DATA" ) @@ -140,7 +138,6 @@ viewChars <- function( connection <- ResultModelManager::ConnectionHandler$new(connectionDetails) databaseSettings$connectionDetailsSettings <- NULL - if (utils::packageVersion("ShinyAppBuilder") < "1.2.0") { # use old method # set database settings into system variables @@ -174,7 +171,7 @@ 
viewChars <- function( databaseSettings$cgTablePrefix <- databaseSettings$cohortTablePrefix databaseSettings$databaseTable <- "DATABASE_META_DATA" databaseSettings$databaseTablePrefix <- "" - databaseSettings$iTablePrefix <- databaseSettings$incidenceTablePrefix + # databaseSettings$iTablePrefix <- databaseSettings$incidenceTablePrefix databaseSettings$cgTable <- "cohort_definition" if (!testApp) { @@ -212,11 +209,11 @@ getCohortNames <- function(cohortIds, cohortDefinitionSet) { # Borrowed from devtools: https://github.com/hadley/devtools/blob/ba7a5a4abd8258c52cb156e7b26bb4bf47a79f0b/R/utils.r#L44 -is_installed <- function(pkg, version = 0) { +is_installed <- function(pkg) { installed_version <- tryCatch(utils::packageVersion(pkg), error = function(e) NA ) - !is.na(installed_version) && installed_version >= version + !is.na(installed_version) } # Borrowed and adapted from devtools: https://github.com/hadley/devtools/blob/ba7a5a4abd8258c52cb156e7b26bb4bf47a79f0b/R/utils.r#L74 diff --git a/README.md b/README.md index b3668ec..e913fef 100644 --- a/README.md +++ b/README.md @@ -97,9 +97,10 @@ runCharacterizationAnalyses( targetTable = 'cohort', outcomeDatabaseSchema = 'main', outcomeTable = 'cohort', - characterizationSettings = characterizationSettings, - saveDirectory = file.path(tempdir(), 'example'), - tablePrefix = 'c_', + characterizationSettings = characterizationSettings, + outputDirectory = file.path(tempdir(), 'example', 'results'), + executionPath = file.path(tempdir(), 'example', 'execution'), + csvFilePrefix = 'c_', databaseId = 'Eunomia' ) ``` diff --git a/_pkgdown.yml b/_pkgdown.yml index 0549e21..5e3ef84 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -40,57 +40,36 @@ reference: - title: "Aggregate Covariate Analysis" desc: > This analysis calculates the aggregate characteristics for a Target cohort (T), an Outcome cohort (O) and combiations of T with O during time at risk and T without O during time at risk. - contents: - - createAggregateCovariateSettings - - computeAggregateCovariateAnalyses + contents: has_concept("Aggregate") - title: "Dechallenge Rechallenge Analysis" desc: > For a given Target cohort (T) and Outcome cohort (O) find any occurrances of a dechallenge (when the T cohort stops close to when O started) and a rechallenge (when T restarts and O starts again) This is useful for investigating causality between drugs and events. - contents: - - createDechallengeRechallengeSettings - - computeDechallengeRechallengeAnalyses - - computeRechallengeFailCaseSeriesAnalyses + contents: has_concept("DechallengeRechallenge") - title: "Time to Event Analysis" desc: > This analysis calculates the timing between the Target cohort (T) and an Outcome cohort (O). - contents: - - createTimeToEventSettings - - computeTimeToEventAnalyses + contents: has_concept("TimeToEvent") - title: "Run Large Scale Characterization Study" desc: > Run multipe aggregate covariate analysis, time to event and dechallenge/rechallenge studies. - contents: - - createCharacterizationSettings - - runCharacterizationAnalyses - - saveCharacterizationSettings - - loadCharacterizationSettings + contents: has_concept("LargeScale") - title: "Save Load" desc: > Functions to save the analysis settings and the results (as sqlite or csv files). 
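Because the analyses no longer produce a sqlite result file, the shiny viewer now rebuilds one on the fly in `tempdir()` from the exported csv files: it creates the result tables, inserts the csvs, and then adds the cohort and database lookup tables. Assuming the README example above has been run, viewing the results is a single call:

```r
library(Characterization)

# point the viewer at the folder of csv results written by runCharacterizationAnalyses();
# a temporary sqlite database is created in tempdir() and populated before the app opens
viewCharacterization(
  resultFolder = file.path(tempdir(), "example", "results")
)
```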
- contents: - - saveTimeToEventAnalyses - - exportTimeToEventToCsv - - loadTimeToEventAnalyses - - saveDechallengeRechallengeAnalyses - - saveRechallengeFailCaseSeriesAnalyses - - loadDechallengeRechallengeAnalyses - - loadRechallengeFailCaseSeriesAnalyses - - exportDechallengeRechallengeToCsv - - exportRechallengeFailCaseSeriesToCsv - - saveAggregateCovariateAnalyses - - loadAggregateCovariateAnalyses - - exportAggregateCovariateToCsv + contents: has_concept("SaveLoad") - title: "Insert into Database" desc: > Functions to insert the results into a database. - contents: - - createSqliteDatabase - - createCharacterizationTables - - exportDatabaseToCsv + contents: has_concept("Database") - title: "Shiny App" desc: > Functions to interactively explore the results from runCharacterizationAnalyses(). - contents: - - viewCharacterization - - + contents: has_concept("Shiny") + - title: "Custom covariates" + desc: > + Code to create covariates during cohort start and end + contents: has_concept("CovariateSetting") + - title: "Incremental" + desc: > + Code to run the incremental model + contents: has_concept("Incremental") diff --git a/docs/404.html b/docs/404.html index 16fc3ae..7ee4ee6 100644 [generated pkgdown site under docs/ rebuilt for version 2.0.0; remaining HTML diffs omitted]
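The same two steps the viewer performs internally can also load the csv results into a persistent results database. A sketch using a local sqlite file (any DatabaseConnector dbms should behave the same way; `tablePrefix` is left at its default here):

```r
library(Characterization)
library(DatabaseConnector)

connectionDetails <- createConnectionDetails(
  dbms = "sqlite",
  server = file.path(tempdir(), "characterization_results.sqlite")
)

# create the result tables using the same prefix as the exported csv files
createCharacterizationTables(
  connectionDetails = connectionDetails,
  resultSchema = "main",
  targetDialect = "sqlite",
  deleteExistingTables = TRUE,
  createTables = TRUE,
  tablePrefix = "c_"
)

# load the csv results produced by runCharacterizationAnalyses()
insertResultsToDatabase(
  connectionDetails = connectionDetails,
  schema = "main",
  resultsFolder = file.path(tempdir(), "example", "results"),
  csvTablePrefix = "c_"
)
```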