diff --git a/NaaccrParser/DESCRIPTION b/NaaccrParser/DESCRIPTION index ce4172a..6420487 100644 --- a/NaaccrParser/DESCRIPTION +++ b/NaaccrParser/DESCRIPTION @@ -10,4 +10,6 @@ Encoding: UTF-8 LazyData: true Depends: SqlRender , - DatabaseConnector + DatabaseConnector, + XML, + tools diff --git a/NaaccrParser/R/parse_NAACCR.R b/NaaccrParser/R/parse_NAACCR.R index ee85fec..a3d98a9 100644 --- a/NaaccrParser/R/parse_NAACCR.R +++ b/NaaccrParser/R/parse_NAACCR.R @@ -1,9 +1,11 @@ + + +# main function for parsing fixed-width data NAACCR_to_db <- function(file_path ,record_id_prefix = NULL ,connectionDetails){ - conn <- DatabaseConnector::connect(connectionDetails) # Get NAACCR version # 160 = v16, 170 = v17, etc @@ -64,8 +66,11 @@ NAACCR_to_db <- function(file_path ,"naaccr_item_value" ) - ret_df <- data.frame(matrix(nrow= 0, ncol = length(col_names))) - names(ret_df) <- col_names + # ret_df <- data.frame(matrix(nrow= 0, ncol = length(col_names))) +# names(ret_df) <- col_names + + + conn <- DatabaseConnector::connect(connectionDetails) @@ -77,6 +82,8 @@ NAACCR_to_db <- function(file_path tmp_df <- data.frame(matrix(nrow= nrow(record_layout), ncol = length(col_names))) names(tmp_df) <- col_names + + # Get all (naaccr_item_number, naaccr_item_value) pairs for(j in 1:nrow(record_layout)){ curr_item <- record_layout[j,] @@ -130,38 +137,50 @@ NAACCR_to_db <- function(file_path # Remove empty fields tmp_df <- tmp_df[nchar(tmp_df$naaccr_item_value) > 0,] - + # Workaround for ingestion bug (R setting person_id to 'logi' which DB doesn't like) + tmp_df$person_id <- as.integer(tmp_df$person_id) if(nrow(tmp_df) > 0){ # Append rows to result dataframe + + + DatabaseConnector::insertTable(connection = conn, - tableName = "naaccr_data_points", - data = tmp_df, - dropTableIfExists = FALSE, - createTable = FALSE, - tempTable = FALSE, - useMppBulkLoad = FALSE) + tableName = "naaccr_data_points", + databaseSchema = "NAACCR_OMOP.dbo", + data = tmp_df, + dropTableIfExists = FALSE, + createTable = FALSE, + tempTable = FALSE) + + + + print(paste0("Row: ",i, "--", file_path, ": inserted ", nrow(tmp_df), " rows")) + } } } - - print("Completed") + DatabaseConnector::disconnect(conn) } - +# optional function to populate person_id after ingestion assign_person_id <- function(connectionDetails + ,ndp_schema + ,person_map_schema ,person_map_table ,person_map_field = "MRN"){ # requires DB.schema.table format - curr_sql <- SqlRender::render("UPDATE naaccr_data_points SET person_id = x.person_id FROM @table x WHERE naaccr_data_points.mrn = x.@field;" + curr_sql <- SqlRender::render("UPDATE @ndp_schema.naaccr_data_points SET person_id = x.person_id FROM @person_map_schema.@table x WHERE naaccr_data_points.mrn = x.@field;" + ,ndp_schema = ndp_schema + ,person_map_schema = person_map_schema ,field = person_map_field ,table = person_map_table) @@ -174,4 +193,290 @@ assign_person_id <- function(connectionDetails +# main function for parsing XML +parse_XML_to_DB <- function(file_path + ,record_id_prefix = NULL + ,connectionDetails){ + + if(is.null(file_path)){ + print("error: NULL file_path") + exit() + } + + if(is.null(connectionDetails)){ + print("Error: NULL connectionDetails") + exit() + } + + file_name <- basename(file_path_sans_ext(file_path)) + + + if(!is.null(record_id_prefix)){ + record_id_prefix <- paste0(file_name, "_") + }else{ + record_id_prefix <- paste0(record_id_prefix, "_") + } + + + + + + a <- XML::xmlParse(file_path) + + + # START - test check file typ + t1 <- file_ext("//nectsifs/Import/NAACCR_share/2020_2021.xml") + + + b <- XML::xmlToList(a) + + + itemNum_to_NaaccrID_v23 <- read.csv("//nectsifs/Import/NAACCR_2023/itemNum_to_NaaccrID_v23.csv") + + + + + + # 1) --------- Data set metadata -------------- # + + # create result df + mat <- matrix(nrow = 0, ncol = 2) + items_df <- as.data.frame(mat) + + for(i in 1:length(b)){ + if (names(b[i]) == 'Item'){ + curr <- unlist(b[i]) + items_df <- rbind(items_df, curr) + } + } + names(items_df) <- c("value", "naaccrId") + + + # create result df + mat <- matrix(nrow = 0, ncol = 4) + pat_df <- as.data.frame(mat) + names(pat_df) <- c("pat_id","tumor_index", "value", "naaccrId") + + + + # make template for recreating + pat_df_template <- pat_df + + # make final output table + pat_df_output <- pat_df + + + # split up parsing for easier computation + patient_count <- length(b) + + start_index <- 1 + end_index <- start_index + 100 + + + + + while(start_index < patient_count){ + + + pat_df <- pat_df_template + + + end_index <- start_index + 100 + if(end_index > patient_count){ + end_index <- patient_count + } + + for(j in start_index:end_index){ + + if (names(b[j]) == 'Patient'){ + + curr_pat <- b[j]$Patient + + tumor_number <- 1 + + for(i in 1:length(curr_pat)){ + if (names(curr_pat[i]) == 'Item'){ + curr <- unlist(curr_pat[i]) + curr <- c(j,tumor_number, curr) + pat_df <- rbind(pat_df, curr) + } + else if (names(curr_pat[i]) == 'Tumor'){ + curr_tumor <- curr_pat[i]$Tumor + + + for(k in 1:length(curr_tumor)){ + curr <- unlist(curr_tumor[k]) + curr <- c(j, tumor_number, curr) + pat_df <- rbind(pat_df, curr) + } + tumor_number <- tumor_number + 1 + } + }} + } + + names(pat_df) <- c("pat_id","tumor_index", "value", "naaccrId") + + # incrementally add chunks to output for efficiency + pat_df_output <- rbind(pat_df_output, pat_df) + start_index <- end_index + 1 + + + } + + + names(pat_df_output) <- c("pat_id","tumor_index", "value", "naaccrId") + + + timestamp() + + + + curr <- merge(pat_df_output, itemNum_to_NaaccrID_v23, by.x = "naaccrId", by.y = "XML.NAACCR.ID", all.x = TRUE) + + + # duplicates for some reason - removing + curr <- unique(curr) + + # ---- + + #temp for testing + + + var_check <- unique(curr[,(c("naaccrId","Data.Item.Number"))]) + + #curr <- zpat + + #----- + + + + # New + index_map <- unique(curr[,c("pat_id","tumor_index")]) + index_map$record_index <- seq(nrow(index_map)) + + index_map$record_id = paste0( record_id_prefix + ,"/" + ,index_map$record_index) + + + curr <- merge(curr, index_map, by=c("pat_id", "tumor_index")) + + #merge(curr, index_map, by.x = "naaccrId", by.y = "XML.NAACCR.ID", all.x = TRUE) + + # Get static values + + index_map$mrn <- '' + index_map$histology_site <- '' + + mrn_list <- subset(curr, Data.Item.Number == 21) + site_list <- subset(curr, Data.Item.Number == 400) + hist_list <- subset(curr, Data.Item.Number == 522) + behav_list <- subset(curr, Data.Item.Number == 523) + + for(i in 1:nrow(index_map)){ + tmp_record_id <- index_map$record_id[i] + + + index_map$mrn[i] <- mrn_list$value[mrn_list$record_id == tmp_record_id] + + tmp_site <- site_list$value[site_list$record_id == tmp_record_id] + + index_map$histology_site[i] <- paste0( + paste0( + hist_list$value[hist_list$record_id == tmp_record_id] + ,"/" + ,behav_list$value[behav_list$record_id == tmp_record_id] + ) + ,"-" + ,substr(tmp_site, 0,3) + ,"." + ,substr(tmp_site, 4,5) + ) + + } + + # make person_id placeholder + index_map$person_id <- rep('', nrow(index_map)) + + + tmp1 <- index_map[,c("person_id", "record_id", "mrn", "histology_site")] + tmp2 <- curr[,c("record_id", "Data.Item.Number", "naaccrId", "value")] + names(tmp2) <- c("record_id", "naaccr_item_number", "naaccr_item_name", "naaccr_item_value") + + # union is to prevent column reordering + res <- merge(tmp1, tmp2, by = "record_id")[, union(names(tmp1), names(tmp2))] + + ## -- CLEAN + + res$naaccr_item_value <- trimws(res$naaccr_item_value) + + # Remove empty fields + res <- res[nchar(res$naaccr_item_value) > 0,] + + conn <- DatabaseConnector::connect(connectionDetails) + + if(nrow(res) > 0){ + # Append rows to result dataframe + + DatabaseConnector::insertTable(connection = conn, + tableName = "naaccr_data_points", + databaseSchema = "NAACCR_OMOP.dbo", + data = res, + dropTableIfExists = FALSE, + createTable = FALSE, + tempTable = FALSE) + + + } + + DatabaseConnector::disconnect(conn) + + print(paste0("Completed ", file_name, ": inserted ", nrow(res), " rows")) + + +} + +# umbrella function to parse directory of source files +# can be either fixed width or XML +parse_directory <- function(dir_path + ,connectionDetails){ + + + tmp_files <- list.files(dir_path, full.names = TRUE) + + + for(i in 1:length(tmp_files)){ + curr_file <- tmp_files[i] + + + print(paste0("Parsing: ", curr_file)) + + fext <- file_ext(curr_file) + + if(fext == "XML"){ + parse_XML_to_DB(file_path = curr_file + ,record_id_prefix = NULL + ,connectionDetails = connectionDetails) + }else{ + NAACCR_to_db(file_path = curr_file + ,record_id_prefix = NULL + ,connectionDetails = connectionDetails) + } + + } + + + + + + + + +} + + + + + + diff --git a/NaaccrParser/README.md b/NaaccrParser/README.md index cbf536a..afd9eb9 100644 --- a/NaaccrParser/README.md +++ b/NaaccrParser/README.md @@ -4,8 +4,9 @@ NaaccrParser -This package ingests NAACCR fixed width format files (v15, v16, v18), converts the data into a parsable format, and ingests it into a database. This is meant to be the first step of an [ETL process](https://github.com/OHDSI/OncologyWG/wiki/NAACCR-ETL) by the OHDSI Oncology WG to translate NAACCR data into OHDSI CDM tables. This package transforms the source data into an [intermediate format](https://github.com/OHDSI/OncologyWG/wiki/NAACCR-ETL/ETL/NAACCR-ETL/naaccr_data_points), similar to an EAV structure. +This package ingests NAACCR formatted source files, converts the data into a parsable format, and ingests it into a database. This is meant to be the first step of an [ETL process](https://github.com/OHDSI/OncologyWG/wiki/NAACCR-ETL) by the OHDSI Oncology WG to translate NAACCR data into OHDSI CDM tables. This package transforms the source data into an [intermediate format](https://github.com/OHDSI/OncologyWG/wiki/NAACCR-ETL/ETL/NAACCR-ETL/naaccr_data_points), similar to an EAV structure. +This package supports parsing of source files from both fixed width (v15, v16, v18) and XML format (v20-23) NAACCR standards. Once ingested into an EAV format, there is a vocabulary-driven ETL written in SQL to convert the data into OMOP. ## Installation @@ -23,7 +24,7 @@ Load package library(NaaccrParser) ``` -As we are splitting up rows into an EAV structure, we need to retain provenance by appending a source row index to each value pair. The record id prefix is intended to be unique to each file and will be concatenated to the front of each row index. The record id is only used a means to complete the ETL and is not retained in the destination data. +As we are splitting up rows into an EAV structure, we need to retain provenance by appending a source row index to each value pair. The record id prefix is intended to be unique to each file and will be concatenated to the front of each row index. The record id is only used a means to complete the ETL and is not retained in the destination data. If left NULL (which is fine), it uses the current file name as the prefix. ```r record_id_prefix <- "my_id_prefix" @@ -37,25 +38,52 @@ connectionDetails <- createConnectionDetails( dbms="sql server", server="", user="", - password="", - schema ="NAACCR.dbo" + password="" ) ``` -Call the main function to ingest the data +There are separate functions for parsing and ingesting fixed-width source files (v16-18) as well as for XML formatted source files (v20+). If you have a collection of files in the same directory you can leverage an umbrella function that parses all files, regardless of version. + +### Option 1: (directory specific) + +Parse and ingest **all NAACCR files within a specified directory** +```r +parse_directory(dir_path = dir_path # folder containing NAACCR files + ,connectionDetails = connectionDetails) + +``` + +That umbrella function calls helper functions that can be used called to parse individual files. This takes more time but can be used directly when more logical for a specific environment. + +### Option 2: (file specific) + +Parse and ingest a **fixed-width file**: ```r # Import data into database NAACCR_to_db(file_path = "path_to_data/naaccr_file.csv" - , record_id_prefix = record_id_prefix + , record_id_prefix = record_id_prefix # optional , connectionDetails = connectionDetails) ``` + + +Parse and ingest a **XML file**: +```r +# Import data into database +parse_XML_to_DB(file_path = file_path + ,record_id_prefix = NULL # optional + ,connectionDetails = connectionDetails) +``` + +### Populate person_id + At this point the data exists in your database without person_id assigned. This step is optiona as this process can vary between institutions. To populate person_id using this function, you need to create a table in your database that maps MRN to person_id. If the mapping table has a different field name for MRN other than "MRN", it must be specified used the 'person_map_field' parameter. ```r assign_person_id(connectionDetails = connectionDetails - ,person_map_table = "thisdb.dbo.person_map_table" - ,person_map_field = "MRN") - + ,ndp_schema = 'NAACCR_OMOP.dbo' + ,person_map_schema = 'OMOP_COMMON.dbo' + ,person_map_table = 'person_map' + ,person_map_field = "MRN") ``` diff --git a/NaaccrParser/data/itemNum_to_NaaccrID_v23.rda b/NaaccrParser/data/itemNum_to_NaaccrID_v23.rda new file mode 100644 index 0000000..55ff626 Binary files /dev/null and b/NaaccrParser/data/itemNum_to_NaaccrID_v23.rda differ