From f5490e20a261ea2263d4e6d8b5819ab3c9563463 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Wed, 17 Jul 2024 15:04:54 -0700 Subject: [PATCH 01/17] synapse best model scaling testing --- R/final_models.R | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/R/final_models.R b/R/final_models.R index 68510c0c..7d16e01e 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -580,7 +580,8 @@ final_models <- function(run_info, } } - return(best_model_mape) + # return(best_model_mape) + return(data.frame(Combo_Hash = combo)) } %>% base::suppressPackageStartupMessages() @@ -607,8 +608,8 @@ final_models <- function(run_info, ) %>% dplyr::mutate( average_models = average_models, - max_model_average = max_model_average, - weighted_mape = base::mean(best_model_tbl$Rolling_MAPE, na.rm = TRUE) + max_model_average = max_model_average#, + #weighted_mape = base::mean(best_model_tbl$Rolling_MAPE, na.rm = TRUE) ) write_data( From 71a8ea19cb5d7732cf23a20ea36d3781f8e43262 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Wed, 17 Jul 2024 15:19:21 -0700 Subject: [PATCH 02/17] update version --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 0f210549..c08430b8 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: finnts Title: Microsoft Finance Time Series Forecasting Framework -Version: 0.4.0.9004 +Version: 0.4.0.9005 Authors@R: c(person(given = "Mike", family = "Tokic", From ec3d179eb30e3efc15343dfa1d81848a140f9183 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Wed, 17 Jul 2024 18:18:38 -0700 Subject: [PATCH 03/17] testing --- R/final_models.R | 1 + 1 file changed, 1 insertion(+) diff --git a/R/final_models.R b/R/final_models.R index 7d16e01e..0550fcde 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -581,6 +581,7 @@ final_models <- function(run_info, } # return(best_model_mape) + Sys.sleep(runif(1, min = 10, max = 30)) return(data.frame(Combo_Hash = combo)) } %>% base::suppressPackageStartupMessages() From b9e6a9b9475d85cee0dc59331a1e4fec89b8bec7 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Wed, 17 Jul 2024 19:25:53 -0700 Subject: [PATCH 04/17] testing --- R/final_models.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/final_models.R b/R/final_models.R index 0550fcde..033f9405 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -581,7 +581,7 @@ final_models <- function(run_info, } # return(best_model_mape) - Sys.sleep(runif(1, min = 10, max = 30)) + Sys.sleep(runif(1, min = 30, max = 90)) return(data.frame(Combo_Hash = combo)) } %>% base::suppressPackageStartupMessages() From 1781d821fbb6ba364545a360f4ee573acc2f6685 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Thu, 18 Jul 2024 09:03:11 -0700 Subject: [PATCH 05/17] testing --- R/final_models.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/final_models.R b/R/final_models.R index 033f9405..3c1b2e0c 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -175,7 +175,7 @@ final_models <- function(run_info, # submit tasks best_model_tbl <- foreach::foreach( - x = current_combo_list, + x = current_combo_list_final, .combine = "rbind", .packages = packages, .errorhandling = "stop", @@ -581,7 +581,7 @@ final_models <- function(run_info, } # return(best_model_mape) - Sys.sleep(runif(1, min = 30, max = 90)) + # Sys.sleep(runif(1, min = 30, max = 90)) return(data.frame(Combo_Hash = combo)) } %>% base::suppressPackageStartupMessages() From 5603d01ff10163171999a92f1e96aa349a98172a Mon Sep 17 00:00:00 2001 From: 
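Patches 01-05 above test a scaling pattern for Synapse/Spark clusters: each foreach worker persists its full results and returns only a one-row data frame, so large model tables never travel back to the driver. A minimal self-contained sketch of that pattern follows; the worker body, output directory, and combo names are illustrative stand-ins, not finnts internals.

library(foreach)
library(doParallel)

doParallel::registerDoParallel(2)
out_dir <- tempdir()
combo_list <- c("combo_a", "combo_b") # illustrative time series combos

best_model_tbl <- foreach::foreach(x = combo_list, .combine = "rbind") %dopar% {
  # stand-in for the real per-combo model training
  result <- data.frame(Combo = x, MAPE = stats::runif(1))

  # persist the large output to shared storage for later retrieval
  saveRDS(result, file.path(out_dir, paste0(x, ".rds")))

  # hand back only a lightweight marker row to the driver
  data.frame(Combo_Hash = x)
}

doParallel::stopImplicitCluster()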
Mike Tokic Date: Thu, 18 Jul 2024 13:54:57 -0700 Subject: [PATCH 06/17] testing --- R/final_models.R | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/R/final_models.R b/R/final_models.R index 3c1b2e0c..56157c57 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -288,6 +288,11 @@ final_models <- function(run_info, } else { best_model_check <- FALSE } + + # if best model is already selected, end job + if(best_model_check) { + return(data.frame(Combo_Hash = combo)) + } # combine all forecasts predictions_tbl <- local_model_tbl %>% From 11519d22172c726aa374951d305c7837bcb78176 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Thu, 18 Jul 2024 16:00:18 -0700 Subject: [PATCH 07/17] testing --- R/final_models.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/R/final_models.R b/R/final_models.R index 56157c57..95b08e2e 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -186,6 +186,8 @@ final_models <- function(run_info, ) %op% { combo <- x + + Sys.sleep(runif(1, min = 3, max = 90)) # get individual and ensemble model predictions train_test_id_list <- model_train_test_tbl %>% From 3d23f816b9f59b010e02dbaabaa7a4e0dfdfedbf Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Fri, 19 Jul 2024 15:54:57 -0700 Subject: [PATCH 08/17] always write best average model to disk --- R/final_models.R | 93 ++++++++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 35 deletions(-) diff --git a/R/final_models.R b/R/final_models.R index 95b08e2e..b9a6b623 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -186,8 +186,6 @@ final_models <- function(run_info, ) %op% { combo <- x - - Sys.sleep(runif(1, min = 3, max = 90)) # get individual and ensemble model predictions train_test_id_list <- model_train_test_tbl %>% @@ -264,35 +262,6 @@ final_models <- function(run_info, # check if model averaging already happened if ("Best_Model" %in% colnames(local_model_tbl %>% rbind(global_model_tbl))) { - # see if average models file exists and add to model tbl - average_model_tbl <- tryCatch( - { - read_file(run_info, - path = paste0( - "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), - "-", combo, "-average_models.", run_info$data_output - ), - return_type = "df" - ) - }, - warning = function(w) { - # do nothing - }, - error = function(e) { - NULL - } - ) - - local_model_tbl <- local_model_tbl %>% - rbind(average_model_tbl) - - best_model_check <- TRUE - } else { - best_model_check <- FALSE - } - - # if best model is already selected, end job - if(best_model_check) { return(data.frame(Combo_Hash = combo)) } @@ -322,7 +291,7 @@ final_models <- function(run_info, final_model_list <- c(local_model_list, global_model_list) # simple model averaging - if (average_models & length(final_model_list) > 1 & !best_model_check) { + if (average_models & length(final_model_list) > 1) { # create model combinations list model_combinations <- tibble::tibble() @@ -391,8 +360,35 @@ final_models <- function(run_info, } else { averages_tbl <- NULL } + + # choose best average model + if(!is.null(averages_tbl)) { + avg_back_test_mape <- averages_tbl %>% + dplyr::mutate( + Train_Test_ID = as.numeric(Train_Test_ID), + Target = ifelse(Target == 0, 0.1, Target) + ) %>% + dplyr::filter(Train_Test_ID != 1) %>% + dplyr::mutate(MAPE = round(abs((Forecast - Target) / Target), digits = 4)) + + avg_best_model_mape <- avg_back_test_mape %>% + dplyr::group_by(Model_ID, Combo) %>% + dplyr::mutate( + Combo_Total = sum(abs(Target), na.rm = TRUE), + weighted_MAPE = (abs(Target) / Combo_Total) 
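# To make this weighted-MAPE roll-up concrete with illustrative numbers:
# two back-test rows with Target = c(10, 90) and MAPE = c(0.50, 0.10)
# give Combo_Total = 100, so Rolling_MAPE = 10/100 * 0.50 + 90/100 * 0.10
# = 0.14 -- the larger observation dominates the combo's score.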
* MAPE + ) %>% + dplyr::summarise(Rolling_MAPE = sum(weighted_MAPE, na.rm = TRUE)) %>% + dplyr::arrange(Rolling_MAPE) %>% + dplyr::ungroup() %>% + dplyr::group_by(Combo) %>% + dplyr::slice(1) %>% + dplyr::ungroup() + + avg_best_model_tbl <- avg_best_model_mape %>% + dplyr::select(Combo, Model_ID) + } - # choose best model + # choose best overall model final_predictions_tbl <- predictions_tbl %>% dplyr::select(Combo, Model_ID, Train_Test_ID, Date, Forecast, Target) %>% rbind(averages_tbl) @@ -520,7 +516,6 @@ final_models <- function(run_info, ) } } else { # choose the most accurate individual model and write outputs - final_model_tbl <- tibble::tibble(Model_ID = final_model_list) %>% dplyr::left_join( best_model_final_tbl %>% @@ -528,6 +523,35 @@ final_models <- function(run_info, by = "Model_ID" ) %>% dplyr::mutate(Best_Model = ifelse(!is.na(Best_Model), "Yes", "No")) + + if(!is.null(averages_tbl)) { + avg_model_final_tbl <- averages_tbl %>% + dplyr::right_join(avg_best_model_tbl, + by = c("Combo", "Model_ID") + ) %>% + dplyr::mutate( + Combo_ID = Combo, + Model_Name = "NA", + Model_Type = "local", + Recipe_ID = "simple_average", + Hyperparameter_ID = "NA", + Best_Model = "No" + ) %>% + dplyr::group_by(Combo_ID, Model_ID, Train_Test_ID) %>% + dplyr::mutate(Horizon = dplyr::row_number()) %>% + dplyr::ungroup() %>% + create_prediction_intervals(model_train_test_tbl) %>% + convert_weekly_to_daily(date_type, weekly_to_daily) + + write_data( + x = avg_model_final_tbl, + combo = unique(avg_model_final_tbl$Combo), + run_info = run_info, + output_type = "data", + folder = "forecasts", + suffix = "-average_models" + ) + } if (!is.null(single_model_tbl)) { single_model_final_tbl <- single_model_tbl %>% @@ -588,7 +612,6 @@ final_models <- function(run_info, } # return(best_model_mape) - # Sys.sleep(runif(1, min = 30, max = 90)) return(data.frame(Combo_Hash = combo)) } %>% base::suppressPackageStartupMessages() From e44f0d07cfe602eff796590c928af79e5e1f0823 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Fri, 19 Jul 2024 16:05:39 -0700 Subject: [PATCH 09/17] randomize time series list --- R/final_models.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/R/final_models.R b/R/final_models.R index b9a6b623..f8b6afa3 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -124,7 +124,8 @@ final_models <- function(run_info, current_combo_list_final <- setdiff( current_combo_list, prev_combo_list - ) + ) %>% + sample() prev_log_df <- read_file(run_info, path = paste0("logs/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), ".csv"), From c3e5558093a519602bbd108c959ae07f64fe3cfd Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Mon, 22 Jul 2024 10:42:30 -0700 Subject: [PATCH 10/17] updated weighted mape calc and docs --- DESCRIPTION | 2 +- NEWS.md | 4 +++- R/final_models.R | 22 ++++++++++++++++++---- R/utility.R | 2 +- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index c08430b8..9354060e 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -24,7 +24,7 @@ License: MIT + file LICENSE Encoding: UTF-8 LazyData: true Roxygen: list(markdown = TRUE) -RoxygenNote: 7.1.1 +RoxygenNote: 7.3.1 Imports: cli, Cubist, diff --git a/NEWS.md b/NEWS.md index 444a0ad6..82ea150e 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,10 +1,12 @@ -# finnts 0.4.0.9004 (DEVELOPMENT VERSION) +# finnts 0.4.0.9005 (DEVELOPMENT VERSION) ## Improvements - Added support for hierarchical forecasting with external regressors - Allow global models for hierarchical forecasts - 
Multistep horizon forecasts for R1 recipe, listed as `multistep_horizon` within `prep_data()` +- Always save the most accurate model average, regardless if selected as best model. This allows for improved scaling with large data sets. +- Improved weighted MAPE calculation across all time series ## Bug Fixes diff --git a/R/final_models.R b/R/final_models.R index f8b6afa3..1a46e479 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -3,7 +3,8 @@ #' Select Best Models and Prep Final Outputs #' #' @param run_info run info using the [set_run_info()] function. -#' @param average_models If TRUE, create simple averages of individual models. +#' @param average_models If TRUE, create simple averages of individual models +#' and save the most accurate one. #' @param max_model_average Max number of models to average together. Will #' create model averages for 2 models up until input value or max number of #' models ran. @@ -612,7 +613,6 @@ final_models <- function(run_info, } } - # return(best_model_mape) return(data.frame(Combo_Hash = combo)) } %>% base::suppressPackageStartupMessages() @@ -632,6 +632,20 @@ final_models <- function(run_info, num_cores ) } + + # calculate weighted mape + weighted_mape <- get_forecast_data(run_info = run_info) %>% + dplyr::filter(Run_Type == "Back_Test", + Best_Model == "Yes") %>% + dplyr::mutate( + Target = ifelse(Target == 0, 0.1, Target) + ) %>% + dplyr::mutate(MAPE = round(abs((Forecast - Target) / Target), digits = 4), + Total = sum(Target, na.rm = TRUE), + Weight = (MAPE*Target)/Total) %>% + dplyr::pull(Weight) %>% + sum() %>% + round(digits = 4) # update logging file log_df <- read_file(run_info, @@ -640,8 +654,8 @@ final_models <- function(run_info, ) %>% dplyr::mutate( average_models = average_models, - max_model_average = max_model_average#, - #weighted_mape = base::mean(best_model_tbl$Rolling_MAPE, na.rm = TRUE) + max_model_average = max_model_average, + weighted_mape = round(weighted_mape, digits = 4) ) write_data( diff --git a/R/utility.R b/R/utility.R index cb5a57d5..c9708e0d 100644 --- a/R/utility.R +++ b/R/utility.R @@ -15,7 +15,7 @@ utils::globalVariables(c( "Auto_Accept", "Feature", "Imp", "Importance", "LOFO_Var", "Var_RMSE", "Vote", "Votes", "desc", "term", "Column", "Box_Cox_Lambda", "get_recipie_configurable", "Agg", "Unique", "Var", "Var_Combo", "regressor", "regressor_tbl", "value_level_iter", ".actual", ".fitted", - "forecast_horizon", "lag", "new_data", "object", "fit", "Row_Num", "Run_Number" + "forecast_horizon", "lag", "new_data", "object", "fit", "Row_Num", "Run_Number", "weight" )) #' @importFrom magrittr %>% From 5d10d7f35acdc7fd5bddf1b4135e525be9a54c93 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Tue, 23 Jul 2024 18:58:23 -0700 Subject: [PATCH 11/17] condense data function --- R/parallel_util.R | 2 +- R/read_write_data.R | 103 +++++++++++++++++++++++++++++++++++++++++--- man/final_models.Rd | 3 +- 3 files changed, 100 insertions(+), 8 deletions(-) diff --git a/R/parallel_util.R b/R/parallel_util.R index 751ccad1..3746d971 100644 --- a/R/parallel_util.R +++ b/R/parallel_util.R @@ -34,7 +34,7 @@ par_start <- function(run_info, ) parallel_packages <- c( - "gtools", "hts", "magrittr", "methods", "base", "modeltime.resample", + "gtools", "hts", "magrittr", "methods", "base", "plyr", "rsample" ) diff --git a/R/read_write_data.R b/R/read_write_data.R index 34e0c2d3..36e68e2b 100644 --- a/R/read_write_data.R +++ b/R/read_write_data.R @@ -499,21 +499,28 @@ download_file <- function(storage_object, #' #' @param run_info run info using the 
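# The run-level weighted MAPE added in patch 10 above can be distilled
# into a standalone helper. This is a sketch under the same column
# conventions (Forecast, Target) on back-test rows, not the exact
# package code.

library(dplyr)

weighted_mape_sketch <- function(back_test_tbl) {
  back_test_tbl %>%
    dplyr::mutate(
      Target = ifelse(Target == 0, 0.1, Target), # guard divide-by-zero
      MAPE = round(abs((Forecast - Target) / Target), digits = 4),
      Total = sum(Target, na.rm = TRUE),
      Weight = (MAPE * Target) / Total # APE weighted by share of actuals
    ) %>%
    dplyr::pull(Weight) %>%
    sum() %>%
    round(digits = 4)
}

weighted_mape_sketch(data.frame(Forecast = c(12, 85), Target = c(10, 90)))
# 0.07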
[set_run_info()] function #' @param path file path +#' @param file_list files #' @param return_type type of data output read #' @param schema column schema for arrow::open_dataset() #' #' @return file read into memory #' @noRd read_file <- function(run_info, - path, + path = NULL, + file_list = NULL, return_type = "df", schema = NULL) { - folder <- fs::path_dir(path) storage_object <- run_info$storage_object - initial_path <- run_info$path - file <- fs::path_file(path) + + if(!is.null(path)) { + folder <- fs::path_dir(path) + initial_path <- run_info$path + file <- fs::path_file(path) + } - if (inherits(storage_object, c("blob_container", "ms_drive"))) { + if (!is.null(file_list)) { + files <- file_list + } else if (inherits(storage_object, c("blob_container", "ms_drive"))) { download_file(storage_object, fs::path(initial_path, path), folder) final_path <- fs::path(tempdir(), folder) files <- list_files(NULL, fs::path(final_path, file)) @@ -524,7 +531,10 @@ read_file <- function(run_info, files <- list_files(storage_object, fs::path(initial_path, path)) } - if (fs::path_ext(file) == "*") { + if(!is.null(file_list)) { + file_temp <- files[[1]] + file_ext <- fs::path_ext(file_temp) + } else if (fs::path_ext(file) == "*") { file_temp <- files[[1]] file_ext <- fs::path_ext(file_temp) } else { @@ -659,3 +669,84 @@ get_recipe_data <- function(run_info, return(recipe_tbl) } + +#' Condense forecast output files into less files +#' +#' @param run_info run info using the [set_run_info()] function +#' @param parallel_processing type of parallel processing to run +#' @param num_cores number of cores to use +#' +#' @return nothing +#' @noRd +condense_data <- function(run_info, + parallel_processing = NULL, + num_cores = NULL) { + + # get initial list of files to condense + initial_file_list <- list_files( + run_info$storage_object, + paste0( + run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*_models.", run_info$data_output + ) + ) + + # Initialize an empty list to store the batches + list_of_batches <- list() + + # Define the batch size + batch_size <- 10000 + + # Calculate the number of batches needed + num_batches <- ceiling(length(initial_file_list) / batch_size) + + # Loop through the large list and create batches + for (i in 1:num_batches) { + start_index <- (i - 1) * batch_size + 1 + end_index <- min(i * batch_size, length(initial_file_list)) + batch_name <- paste0("batch_", i) + list_of_batches[[batch_name]] <- initial_file_list[start_index:end_index] + } + + # parallel run info + par_info <- par_start( + run_info = run_info, + parallel_processing = parallel_processing, + num_cores = min(length(names(list_of_batches)), num_cores), + task_length = length(names(list_of_batches)) + ) + + cl <- par_info$cl + packages <- par_info$packages + `%op%` <- par_info$foreach_operator + + # submit tasks + condense_data_tbl <- foreach::foreach( + batch = names(list_of_batches), + .combine = "rbind", + .packages = packages, + .errorhandling = "stop", + .verbose = FALSE, + .inorder = FALSE, + .multicombine = TRUE, + .noexport = NULL + ) %op% + { + files <- list_of_batches[[batch]] + + data <- read_file(run_info, + file_list = files, + return_type = "df") + + write_data( + x = data, + combo = batch, + run_info = run_info, + output_type = "data", + folder = "forecasts", + suffix = "-condensed" + ) + + return(batch) + } +} diff --git a/man/final_models.Rd b/man/final_models.Rd index d6c86402..e6dcaaae 100644 --- a/man/final_models.Rd +++ 
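The batching arithmetic inside condense_data() generalizes to a small helper, sketched below with an illustrative batch size of 3 (the patches settle on 10,000); the start/end index logic mirrors the loop in the hunk above.

make_batches <- function(files, batch_size) {
  num_batches <- ceiling(length(files) / batch_size)
  lapply(seq_len(num_batches), function(i) {
    start_index <- (i - 1) * batch_size + 1
    end_index <- min(i * batch_size, length(files))
    files[start_index:end_index]
  })
}

make_batches(paste0("forecast_", 1:7, ".parquet"), batch_size = 3)
# three batches: files 1-3, 4-6, and 7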
b/man/final_models.Rd @@ -17,7 +17,8 @@ final_models( \arguments{ \item{run_info}{run info using the \code{\link[=set_run_info]{set_run_info()}} function.} -\item{average_models}{If TRUE, create simple averages of individual models.} +\item{average_models}{If TRUE, create simple averages of individual models +and save the most accurate one.} \item{max_model_average}{Max number of models to average together. Will create model averages for 2 models up until input value or max number of From 784a495b1f2f45396ec9da41c824f1da12aaed94 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Tue, 23 Jul 2024 19:03:44 -0700 Subject: [PATCH 12/17] update news --- NEWS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/NEWS.md b/NEWS.md index 82ea150e..ee1b746b 100644 --- a/NEWS.md +++ b/NEWS.md @@ -6,6 +6,7 @@ - Allow global models for hierarchical forecasts - Multistep horizon forecasts for R1 recipe, listed as `multistep_horizon` within `prep_data()` - Always save the most accurate model average, regardless if selected as best model. This allows for improved scaling with large data sets. +- Automatically condense large forecasts (+10k time series) into smaller amount of files to make it easier to read forecast outputs - Improved weighted MAPE calculation across all time series ## Bug Fixes From 25ce680ec20f708f0118fb64acab7ca8a4baa945 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Wed, 24 Jul 2024 13:43:16 -0700 Subject: [PATCH 13/17] read condensed files --- R/final_models.R | 7 +++++++ R/read_write_data.R | 25 +++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/R/final_models.R b/R/final_models.R index 1a46e479..fad901d7 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -633,6 +633,13 @@ final_models <- function(run_info, ) } + # condense outputs into less files for larger runs + if(length(combo_list) > 1000) { + condense_data(run_info, + parallel_processing, + num_cores) + } + # calculate weighted mape weighted_mape <- get_forecast_data(run_info = run_info) %>% dplyr::filter(Run_Type == "Back_Test", diff --git a/R/read_write_data.R b/R/read_write_data.R index 36e68e2b..7f289033 100644 --- a/R/read_write_data.R +++ b/R/read_write_data.R @@ -60,7 +60,7 @@ get_forecast_data <- function(run_info, combo_variables <- strsplit(log_df$combo_variables, split = "---")[[1]] forecast_approach <- log_df$forecast_approach - # get forecast data + # get train test split data model_train_test_tbl <- read_file(run_info, path = paste0( "/prep_models/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), @@ -71,7 +71,28 @@ get_forecast_data <- function(run_info, dplyr::select(Run_Type, Train_Test_ID) %>% dplyr::mutate(Train_Test_ID = as.numeric(Train_Test_ID)) - if (forecast_approach == "bottoms_up") { + # check if data has been condensed + cond_path <- paste0( + run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + ) + + condensed_files <- list_files(run_info$storage_object, fs::path(cond_path)) + + if (length(condensed_files) > 0) { + condensed <- TRUE + } else { + condensed <- FALSE + } + + # get forecast data + if(condensed) { + print("reading condensed files") + fcst_path <- paste0( + "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + ) + } else if (forecast_approach == "bottoms_up") { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", 
hash_data(run_info$run_name), "*models", ".", run_info$data_output From edae42f1a9091d3d50e4158b718a0e452feaa30d Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Wed, 24 Jul 2024 16:52:19 -0700 Subject: [PATCH 14/17] condense data updates --- R/final_models.R | 2 +- R/read_write_data.R | 2 +- R/utility.R | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/R/final_models.R b/R/final_models.R index fad901d7..cc97ab98 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -140,7 +140,7 @@ final_models <- function(run_info, run_local_models <- prev_log_df$run_local_models run_ensemble_models <- prev_log_df$run_ensemble_models - if ((length(current_combo_list_final) == 0 & length(prev_combo_list) > 0) | sum(colnames(prev_log_df) %in% "weighted_mape")) { + if (sum(colnames(prev_log_df) %in% "weighted_mape")) { # check if input values have changed current_log_df <- tibble::tibble( diff --git a/R/read_write_data.R b/R/read_write_data.R index 7f289033..a2999023 100644 --- a/R/read_write_data.R +++ b/R/read_write_data.R @@ -416,7 +416,7 @@ write_data_type <- function(x, switch(type, rds = saveRDS(x, path), parquet = arrow::write_parquet(x, path), - csv = utils::write.csv(x, path, row.names = FALSE), + csv = vroom::vroom_write(x, path, delim = ",", progress = FALSE), qs = qs::qsave(x, path) ) } diff --git a/R/utility.R b/R/utility.R index c9708e0d..2e9d96ce 100644 --- a/R/utility.R +++ b/R/utility.R @@ -15,7 +15,8 @@ utils::globalVariables(c( "Auto_Accept", "Feature", "Imp", "Importance", "LOFO_Var", "Var_RMSE", "Vote", "Votes", "desc", "term", "Column", "Box_Cox_Lambda", "get_recipie_configurable", "Agg", "Unique", "Var", "Var_Combo", "regressor", "regressor_tbl", "value_level_iter", ".actual", ".fitted", - "forecast_horizon", "lag", "new_data", "object", "fit", "Row_Num", "Run_Number", "weight" + "forecast_horizon", "lag", "new_data", "object", "fit", "Row_Num", "Run_Number", "weight", + "Total", "Weight", "batch" )) #' @importFrom magrittr %>% From e79a10dac1b2cd42efd1bf0a5ec930b84e81defe Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Wed, 24 Jul 2024 17:40:08 -0700 Subject: [PATCH 15/17] condense data testing --- R/final_models.R | 16 +++++++++------- R/hierarchy.R | 34 +++++++++++++++++++++++++--------- R/read_write_data.R | 13 ++++++------- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/R/final_models.R b/R/final_models.R index cc97ab98..6823599f 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -619,6 +619,15 @@ final_models <- function(run_info, # clean up any parallel run process par_end(cl) + + # condense outputs into less files for larger runs + if(length(combo_list) > 100) { + cli::cli_progress_step("Condensing Forecasts") + + condense_data(run_info, + parallel_processing, + num_cores) + } # reconcile hierarchical forecasts if (forecast_approach != "bottoms_up") { @@ -633,13 +642,6 @@ final_models <- function(run_info, ) } - # condense outputs into less files for larger runs - if(length(combo_list) > 1000) { - condense_data(run_info, - parallel_processing, - num_cores) - } - # calculate weighted mape weighted_mape <- get_forecast_data(run_info = run_info) %>% dplyr::filter(Run_Type == "Back_Test", diff --git a/R/hierarchy.R b/R/hierarchy.R index da0ab062..72d87255 100644 --- a/R/hierarchy.R +++ b/R/hierarchy.R @@ -420,6 +420,20 @@ reconcile_hierarchical_data <- function(run_info, hts_nodes <- hts_list$nodes original_combo_list <- hts_list$original_combos hts_combo_list <- hts_list$hts_combos + + # check if data has been condensed + cond_path <- 
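# The glob assembled below matches the condensed batch files written by
# condense_data() (names like "<run-hashes>-batch_1-condensed.parquet");
# when any exist, reconciliation reads those few files instead of the
# many per-combo "*models" files.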
paste0( + run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + ) + + condensed_files <- list_files(run_info$storage_object, fs::path(cond_path)) + + if (length(condensed_files) > 0) { + condensed <- TRUE + } else { + condensed <- FALSE + } # get unreconciled forecast data if (is.null(parallel_processing)) { @@ -430,10 +444,17 @@ reconcile_hierarchical_data <- function(run_info, return_type <- "df" } - fcst_path <- paste0( - "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*models", ".", run_info$data_output - ) + if(condensed) { + fcst_path <- paste0( + "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + ) + } else { + fcst_path <- paste0( + "/forecasts/*", hash_data(run_info$experiment_name), "-", + hash_data(run_info$run_name), "*models", ".", run_info$data_output + ) + } unreconciled_tbl <- read_file(run_info, path = fcst_path, @@ -677,11 +698,6 @@ reconcile_hierarchical_data <- function(run_info, ) %>% dplyr::select(Combo, Date, Target) - fcst_path <- paste0( - "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*models", ".", run_info$data_output - ) - schema <- arrow::schema( arrow::field("Combo_ID", arrow::string()), arrow::field("Model_ID", arrow::string()), diff --git a/R/read_write_data.R b/R/read_write_data.R index a2999023..5ee304b0 100644 --- a/R/read_write_data.R +++ b/R/read_write_data.R @@ -86,21 +86,20 @@ get_forecast_data <- function(run_info, } # get forecast data - if(condensed) { - print("reading condensed files") + if(forecast_approach != "bottoms_up") { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*condensed", ".", run_info$data_output + hash_data(run_info$run_name), "*reconciled", ".", run_info$data_output ) - } else if (forecast_approach == "bottoms_up") { + } else if (condensed) { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*models", ".", run_info$data_output + hash_data(run_info$run_name), "*condensed", ".", run_info$data_output ) } else { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", - hash_data(run_info$run_name), "*reconciled", ".", run_info$data_output + hash_data(run_info$run_name), "*models", ".", run_info$data_output ) } @@ -716,7 +715,7 @@ condense_data <- function(run_info, list_of_batches <- list() # Define the batch size - batch_size <- 10000 + batch_size <- 100 # Calculate the number of batches needed num_batches <- ceiling(length(initial_file_list) / batch_size) From 31f60badf86ea1a65e86b12e303b69e9197a8ac4 Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Thu, 25 Jul 2024 09:18:56 -0700 Subject: [PATCH 16/17] adjust condense size --- R/final_models.R | 2 +- R/read_write_data.R | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/R/final_models.R b/R/final_models.R index 6823599f..f151a505 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -621,7 +621,7 @@ final_models <- function(run_info, par_end(cl) # condense outputs into less files for larger runs - if(length(combo_list) > 100) { + if(length(combo_list) > 10000) { cli::cli_progress_step("Condensing Forecasts") condense_data(run_info, diff --git a/R/read_write_data.R b/R/read_write_data.R index 5ee304b0..ecd8f9b2 100644 --- a/R/read_write_data.R +++ 
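The branching that get_forecast_data() performs in the hunk above reduces to a simple precedence rule: reconciled files for hierarchical runs, condensed files when they exist, per-combo model files otherwise. A standalone sketch:

pick_fcst_suffix <- function(forecast_approach, condensed) {
  if (forecast_approach != "bottoms_up") {
    "*reconciled" # hierarchical runs read reconciled forecasts
  } else if (condensed) {
    "*condensed" # large runs read the condensed batch files
  } else {
    "*models" # default: per-combo forecast files
  }
}

pick_fcst_suffix("bottoms_up", condensed = TRUE) # "*condensed"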
b/R/read_write_data.R @@ -715,7 +715,7 @@ condense_data <- function(run_info, list_of_batches <- list() # Define the batch size - batch_size <- 100 + batch_size <- 10000 # Calculate the number of batches needed num_batches <- ceiling(length(initial_file_list) / batch_size) From bc88b7b51232b6ec7ede0fcddd2148f704637b8c Mon Sep 17 00:00:00 2001 From: Mike Tokic Date: Thu, 25 Jul 2024 09:46:12 -0700 Subject: [PATCH 17/17] code formatting --- R/ensemble_models.R | 31 +- R/feature_selection.R | 4 - R/final_models.R | 47 ++- R/forecast_time_series.R | 1 - R/hierarchy.R | 15 +- R/input_checks.R | 26 +- R/models.R | 2 - R/multistep_cubist.R | 4 - R/multistep_glmnet.R | 4 - R/multistep_helper.R | 1 - R/multistep_mars.R | 4 - R/multistep_svm_poly.R | 4 - R/multistep_svm_rbf.R | 4 - R/multistep_xgboost.R | 6 +- R/prep_data.R | 383 +++++++++--------- R/prep_models.R | 3 - R/read_write_data.R | 76 ++-- R/run_info.R | 3 - R/train_models.R | 22 +- R/utility.R | 3 +- tests/testthat/test-forecast_time_series.R | 1 - tests/testthat/test-hierarchical.R | 3 - tests/testthat/test-multistep_horizon.R | 1 - ...back-testing-and-hyperparameter-tuning.Rmd | 33 +- vignettes/best-model-selection.Rmd | 27 +- vignettes/external-regressors.Rmd | 10 +- vignettes/feature-engineering.Rmd | 2 - vignettes/finnts.Rmd | 27 +- vignettes/forecast-components.Rmd | 53 ++- vignettes/hierarchical-forecasting.Rmd | 16 +- vignettes/models-used-in-finnts.Rmd | 8 +- vignettes/parallel-processing.Rmd | 16 +- 32 files changed, 396 insertions(+), 444 deletions(-) diff --git a/R/ensemble_models.R b/R/ensemble_models.R index df4eba6b..e39b5270 100644 --- a/R/ensemble_models.R +++ b/R/ensemble_models.R @@ -181,27 +181,29 @@ ensemble_models <- function(run_info, # model forecasts single_model_tbl <- NULL if (run_local_models) { - suppressWarnings(try(single_model_tbl <- read_file(run_info, - path = paste0( - "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), - "-", combo, "-single_models.", run_info$data_output + suppressWarnings(try( + single_model_tbl <- read_file(run_info, + path = paste0( + "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), + "-", combo, "-single_models.", run_info$data_output + ), + return_type = "df" ), - return_type = "df" - ), - silent = TRUE + silent = TRUE )) } global_model_tbl <- NULL if (run_global_models) { - suppressWarnings(try(global_model_tbl <- read_file(run_info, - path = paste0( - "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), - "-", combo, "-global_models.", run_info$data_output + suppressWarnings(try( + global_model_tbl <- read_file(run_info, + path = paste0( + "/forecasts/", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), + "-", combo, "-global_models.", run_info$data_output + ), + return_type = "df" ), - return_type = "df" - ), - silent = TRUE + silent = TRUE )) } @@ -336,7 +338,6 @@ ensemble_models <- function(run_info, .multicombine = TRUE, .noexport = NULL ) %do% { - # get initial run info model <- model_run %>% dplyr::pull(Model_Name) diff --git a/R/feature_selection.R b/R/feature_selection.R index d93c790f..67f828f6 100644 --- a/R/feature_selection.R +++ b/R/feature_selection.R @@ -23,10 +23,8 @@ run_feature_selection <- function(input_data, forecast_horizon, external_regressors, multistep_horizon = FALSE) { - # check for more than one unique target value if (input_data %>% tidyr::drop_na(Target) %>% dplyr::pull(Target) %>% unique() %>% length() < 2) { - # just 
return the date features fs_list <- input_data %>% dplyr::select(tidyselect::contains("Date")) @@ -83,7 +81,6 @@ run_feature_selection <- function(input_data, # run feature selection if (date_type %in% c("day", "week")) { - # number of votes needed for feature to be selected votes_needed <- 3 @@ -410,7 +407,6 @@ lofo_fn <- function(run_info, parallel_processing, pca = FALSE, seed = 123) { - # parallel run info par_info <- par_start( run_info = run_info, diff --git a/R/final_models.R b/R/final_models.R index f151a505..c8cac696 100644 --- a/R/final_models.R +++ b/R/final_models.R @@ -141,7 +141,6 @@ final_models <- function(run_info, run_ensemble_models <- prev_log_df$run_ensemble_models if (sum(colnames(prev_log_df) %in% "weighted_mape")) { - # check if input values have changed current_log_df <- tibble::tibble( average_models = average_models, @@ -294,7 +293,6 @@ final_models <- function(run_info, # simple model averaging if (average_models & length(final_model_list) > 1) { - # create model combinations list model_combinations <- tibble::tibble() @@ -338,7 +336,6 @@ final_models <- function(run_info, .noexport = NULL ) %op% { - # get list of models to average model_list <- strsplit(x, "_")[[1]] @@ -362,9 +359,9 @@ final_models <- function(run_info, } else { averages_tbl <- NULL } - + # choose best average model - if(!is.null(averages_tbl)) { + if (!is.null(averages_tbl)) { avg_back_test_mape <- averages_tbl %>% dplyr::mutate( Train_Test_ID = as.numeric(Train_Test_ID), @@ -372,7 +369,7 @@ final_models <- function(run_info, ) %>% dplyr::filter(Train_Test_ID != 1) %>% dplyr::mutate(MAPE = round(abs((Forecast - Target) / Target), digits = 4)) - + avg_best_model_mape <- avg_back_test_mape %>% dplyr::group_by(Model_ID, Combo) %>% dplyr::mutate( @@ -385,7 +382,7 @@ final_models <- function(run_info, dplyr::group_by(Combo) %>% dplyr::slice(1) %>% dplyr::ungroup() - + avg_best_model_tbl <- avg_best_model_mape %>% dplyr::select(Combo, Model_ID) } @@ -525,11 +522,11 @@ final_models <- function(run_info, by = "Model_ID" ) %>% dplyr::mutate(Best_Model = ifelse(!is.na(Best_Model), "Yes", "No")) - - if(!is.null(averages_tbl)) { + + if (!is.null(averages_tbl)) { avg_model_final_tbl <- averages_tbl %>% dplyr::right_join(avg_best_model_tbl, - by = c("Combo", "Model_ID") + by = c("Combo", "Model_ID") ) %>% dplyr::mutate( Combo_ID = Combo, @@ -619,14 +616,16 @@ final_models <- function(run_info, # clean up any parallel run process par_end(cl) - + # condense outputs into less files for larger runs - if(length(combo_list) > 10000) { + if (length(combo_list) > 10000) { cli::cli_progress_step("Condensing Forecasts") - - condense_data(run_info, - parallel_processing, - num_cores) + + condense_data( + run_info, + parallel_processing, + num_cores + ) } # reconcile hierarchical forecasts @@ -641,17 +640,21 @@ final_models <- function(run_info, num_cores ) } - + # calculate weighted mape weighted_mape <- get_forecast_data(run_info = run_info) %>% - dplyr::filter(Run_Type == "Back_Test", - Best_Model == "Yes") %>% + dplyr::filter( + Run_Type == "Back_Test", + Best_Model == "Yes" + ) %>% dplyr::mutate( Target = ifelse(Target == 0, 0.1, Target) ) %>% - dplyr::mutate(MAPE = round(abs((Forecast - Target) / Target), digits = 4), - Total = sum(Target, na.rm = TRUE), - Weight = (MAPE*Target)/Total) %>% + dplyr::mutate( + MAPE = round(abs((Forecast - Target) / Target), digits = 4), + Total = sum(Target, na.rm = TRUE), + Weight = (MAPE * Target) / Total + ) %>% dplyr::pull(Weight) %>% sum() %>% round(digits = 4) diff --git 
a/R/forecast_time_series.R b/R/forecast_time_series.R index 3177edd2..fe35feca 100644 --- a/R/forecast_time_series.R +++ b/R/forecast_time_series.R @@ -323,7 +323,6 @@ forecast_backwards_compatibility <- function(run_info, dplyr::select(Combo, Model, Best_Model) %>% dplyr::distinct() } else { - # read in unreconciled results best_model_tbl <- read_file(run_info, path = paste0( diff --git a/R/hierarchy.R b/R/hierarchy.R index 72d87255..544d802d 100644 --- a/R/hierarchy.R +++ b/R/hierarchy.R @@ -82,7 +82,6 @@ prep_hierarchical_data <- function(input_data, hierarchical_tbl <- hierarchical_tbl %>% dplyr::left_join(temp_tbl, by = c("Date")) } else if (value_level != "All") { - # agg by lowest level bottom_tbl <- input_data_adj %>% tidyr::unite("Combo", @@ -400,7 +399,6 @@ reconcile_hierarchical_data <- function(run_info, forecast_approach, negative_forecast = FALSE, num_cores) { - # get run splits model_train_test_tbl <- read_file(run_info, path = paste0( @@ -420,15 +418,15 @@ reconcile_hierarchical_data <- function(run_info, hts_nodes <- hts_list$nodes original_combo_list <- hts_list$original_combos hts_combo_list <- hts_list$hts_combos - + # check if data has been condensed cond_path <- paste0( run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), "*condensed", ".", run_info$data_output ) - + condensed_files <- list_files(run_info$storage_object, fs::path(cond_path)) - + if (length(condensed_files) > 0) { condensed <- TRUE } else { @@ -444,7 +442,7 @@ reconcile_hierarchical_data <- function(run_info, return_type <- "df" } - if(condensed) { + if (condensed) { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), "*condensed", ".", run_info$data_output @@ -453,7 +451,7 @@ reconcile_hierarchical_data <- function(run_info, fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), "*models", ".", run_info$data_output - ) + ) } unreconciled_tbl <- read_file(run_info, @@ -889,7 +887,6 @@ reconcile_hierarchical_data <- function(run_info, external_regressor_mapping <- function(data, combo_variables, external_regressors) { - # create var combinations list var_combinations <- tibble::tibble() @@ -918,7 +915,6 @@ external_regressor_mapping <- function(data, .multicombine = TRUE, .noexport = NULL ) %do% { - # get unique values of regressor per combo variable iteration var_unique_tbl <- foreach::foreach( var = iter_list, @@ -1000,7 +996,6 @@ sum_hts_data <- function(bottom_level_tbl, forecast_approach, frequency_number, return_type = "data") { - # create aggregations for target variable Date <- bottom_level_tbl$Date diff --git a/R/input_checks.R b/R/input_checks.R index 0e174c0f..174a300b 100644 --- a/R/input_checks.R +++ b/R/input_checks.R @@ -1,4 +1,3 @@ - #' Check input values #' #' @param input_name input name @@ -13,21 +12,23 @@ check_input_type <- function(input_name, type, expected_value = NULL) { if (!inherits(input_value, type)) { - stop(paste0( - "invalid type for input name '", input_name, "', needs to be of type ", - glue::glue_collapse(type, " or ") - ), - call. = FALSE + stop( + paste0( + "invalid type for input name '", input_name, "', needs to be of type ", + glue::glue_collapse(type, " or ") + ), + call. 
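# Example behavior (signature per the calls used elsewhere in the package):
# check_input_type("return_type", "df", "character", c("df", "sdf")) passes
# silently, while check_input_type("return_type", 1, "character") stops
# with "invalid type for input name 'return_type', needs to be of type
# character".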
= FALSE ) } if (!is.null(expected_value) & !is.null(input_value)) { if (!sum(input_value %in% expected_value)) { - stop(paste0( - "invalid value for input name '", input_name, "', value needs to equal ", - glue::glue_collapse(expected_value, " or ") - ), - call. = FALSE + stop( + paste0( + "invalid value for input name '", input_name, "', value needs to equal ", + glue::glue_collapse(expected_value, " or ") + ), + call. = FALSE ) } } @@ -52,7 +53,6 @@ check_input_data <- function(input_data, date_type, fiscal_year_start, parallel_processing) { - # data combo names match the input data if (sum(combo_variables %in% colnames(input_data)) != length(combo_variables)) { stop("combo variables do not match column headers in input data") @@ -103,7 +103,6 @@ check_input_data <- function(input_data, # input_data is correct type for parallel processing if (inherits(input_data, c("data.frame", "tbl")) & is.null(parallel_processing)) { - # do nothing } else if (inherits(input_data, "tbl_spark") & is.null(parallel_processing)) { stop("spark data frames should run with spark parallel processing", @@ -148,7 +147,6 @@ check_input_data <- function(input_data, check_parallel_processing <- function(run_info, parallel_processing, inner_parallel = FALSE) { - # parallel processing formatting if (is.null(parallel_processing)) { return() diff --git a/R/models.R b/R/models.R index 8cc9e420..67015038 100644 --- a/R/models.R +++ b/R/models.R @@ -701,7 +701,6 @@ glmnet <- function(train_data, horizon, external_regressors, frequency) { - # create model recipe and spec if (multistep) { recipe_spec_glmnet <- train_data %>% @@ -1328,7 +1327,6 @@ xgboost <- function(train_data, horizon, external_regressors, frequency) { - # create model recipe and spec if (multistep) { recipe_spec_xgboost <- train_data %>% diff --git a/R/multistep_cubist.R b/R/multistep_cubist.R index 8a005a4c..e03c047f 100644 --- a/R/multistep_cubist.R +++ b/R/multistep_cubist.R @@ -1,4 +1,3 @@ - # CUBIST Multistep ---- #' Initialize custom cubist parsnip model @@ -298,7 +297,6 @@ cubist_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -321,7 +319,6 @@ cubist_multistep_fit_impl <- function(x, y, model_predictions <- list() for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -438,7 +435,6 @@ predict.cubist_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export cubist_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_glmnet.R b/R/multistep_glmnet.R index f177aaa0..b67e8217 100644 --- a/R/multistep_glmnet.R +++ b/R/multistep_glmnet.R @@ -1,4 +1,3 @@ - # GLMNET Multistep ---- #' Initialize custom glmnet parsnip model @@ -282,7 +281,6 @@ glmnet_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -311,7 +309,6 @@ glmnet_multistep_fit_impl <- function(x, y, parsnip::set_engine("glmnet") for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -426,7 +423,6 @@ predict.glmnet_multistep_fit_impl <- function(object, new_data, ...) 
{ #' @keywords internal #' @export glmnet_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_helper.R b/R/multistep_helper.R index 9cd1cdc9..b8435cb1 100644 --- a/R/multistep_helper.R +++ b/R/multistep_helper.R @@ -1,4 +1,3 @@ - # Helper Functions ---- #' Return xregs that contain future values for multistep horizon forecast diff --git a/R/multistep_mars.R b/R/multistep_mars.R index 02e84e5b..68899d9e 100644 --- a/R/multistep_mars.R +++ b/R/multistep_mars.R @@ -1,4 +1,3 @@ - # MARS Multistep ---- #' Initialize custom mars parsnip model @@ -303,7 +302,6 @@ mars_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -334,7 +332,6 @@ mars_multistep_fit_impl <- function(x, y, parsnip::set_engine("earth") for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -449,7 +446,6 @@ predict.mars_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export mars_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_svm_poly.R b/R/multistep_svm_poly.R index 14ad5e21..4b57a8b6 100644 --- a/R/multistep_svm_poly.R +++ b/R/multistep_svm_poly.R @@ -1,4 +1,3 @@ - # SVM-POLY Multistep ---- #' Initialize custom svm-poly parsnip model @@ -325,7 +324,6 @@ svm_poly_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -357,7 +355,6 @@ svm_poly_multistep_fit_impl <- function(x, y, parsnip::set_engine("kernlab") for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -475,7 +472,6 @@ predict.svm_poly_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export svm_poly_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_svm_rbf.R b/R/multistep_svm_rbf.R index 6a7f779f..d7480912 100644 --- a/R/multistep_svm_rbf.R +++ b/R/multistep_svm_rbf.R @@ -1,4 +1,3 @@ - # SVM-RBF Multistep ---- #' Initialize custom svm-rbf parsnip model @@ -306,7 +305,6 @@ svm_rbf_multistep_fit_impl <- function(x, y, external_regressors = NULL, forecast_horizon = NULL, selected_features = NULL) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -337,7 +335,6 @@ svm_rbf_multistep_fit_impl <- function(x, y, parsnip::set_engine("kernlab") for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -455,7 +452,6 @@ predict.svm_rbf_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export svm_rbf_multistep_predict_impl <- function(object, new_data, ...) 
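# (Each *_multistep_fit_impl() above follows the same pattern: one model
# is trained per lag from get_multi_lags(), using only features lagged far
# enough to be known at that horizon, and the per-lag predictions are
# stitched back together by the corresponding *_predict_impl().)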
{ - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/multistep_xgboost.R b/R/multistep_xgboost.R index 03b327f1..9ce5332d 100644 --- a/R/multistep_xgboost.R +++ b/R/multistep_xgboost.R @@ -1,4 +1,3 @@ - # XGBOOST Multistep ---- #' Initialize custom xgboost parsnip model @@ -389,7 +388,6 @@ xgboost_multistep_fit_impl <- function(x, y, forecast_horizon = NULL, selected_features = NULL, ...) { - # X & Y # Expect outcomes = vector # Expect predictor = data.frame @@ -412,7 +410,6 @@ xgboost_multistep_fit_impl <- function(x, y, model_predictions <- list() for (lag in get_multi_lags(lag_periods, forecast_horizon)) { - # get final features based on lag xreg_tbl_final <- multi_feature_selection( xreg_tbl, @@ -437,7 +434,7 @@ xgboost_multistep_fit_impl <- function(x, y, y = outcome, max_depth = max_depth, nrounds = nrounds, - eta = eta, + eta = eta, colsample_bytree = colsample_bytree, colsample_bynode = colsample_bynode, min_child_weight = min_child_weight, @@ -537,7 +534,6 @@ predict.xgboost_multistep_fit_impl <- function(object, new_data, ...) { #' @keywords internal #' @export xgboost_multistep_predict_impl <- function(object, new_data, ...) { - # Date Mapping Table date_tbl <- new_data %>% dplyr::select(Date, Date_index.num) %>% diff --git a/R/prep_data.R b/R/prep_data.R index 41d28735..92108afd 100644 --- a/R/prep_data.R +++ b/R/prep_data.R @@ -227,7 +227,6 @@ prep_data <- function(run_info, dplyr::filter(Combo %in% current_combo_list_final) if (length(combo_diff) == 0 & length(prev_combo_list) > 0) { - # check if input values have changed current_log_df <- tibble::tibble( combo_variables = paste(combo_variables, collapse = "---"), @@ -466,204 +465,205 @@ prep_data <- function(run_info, } else if (parallel_processing == "spark") { final_data <- filtered_initial_prep_tbl %>% adjust_df(return_type = "sdf") %>% - sparklyr::spark_apply(function(df, context) { - # update objects - fn_env <- .GlobalEnv - - for (name in names(context)) { - assign(name, context[[name]], envir = fn_env) - } - - # get specific time series - combo <- unique(df$Combo) - - return_tbl <- tibble::tibble( - Combo = combo, - Combo_Hash = hash_data(combo) - ) - - # handle external regressors - xregs_future_tbl <- get_xregs_future_values_tbl( - df, - external_regressors, - hist_end_date - ) - - if (length(colnames(xregs_future_tbl)) > 2) { - xregs_future_list <- xregs_future_tbl %>% - dplyr::select(-Date, -Combo) %>% - colnames() - } else { - xregs_future_list <- NULL - } + sparklyr::spark_apply( + function(df, context) { + # update objects + fn_env <- .GlobalEnv - # initial data prep - initial_tbl <- df %>% - dplyr::filter(Combo == combo) %>% - dplyr::select( - Combo, - Date, - Target, - tidyselect::all_of(external_regressors) - ) %>% - dplyr::group_by(Combo) %>% - timetk::pad_by_time(Date, - .by = date_type, - .pad_value = ifelse(clean_missing_values, NA, 0), - .end_date = hist_end_date - ) %>% # fill in missing values in between existing data points - timetk::pad_by_time(Date, - .by = date_type, - .pad_value = 0, - .start_date = hist_start_date, - .end_date = hist_end_date - ) %>% # fill in missing values at beginning of time series with zero - timetk::future_frame(Date, - .length_out = forecast_horizon, - .bind_data = TRUE - ) %>% # add future data - dplyr::ungroup() %>% - dplyr::left_join(xregs_future_tbl, - by = c("Combo", "Date") - ) %>% # join xregs that contain values given by user - clean_outliers_missing_values( - clean_outliers, - clean_missing_values, - 
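# (cleaning runs after padding: with clean_missing_values = TRUE, gaps
# between observed points were padded as NA above so they can be imputed
# here, while history before the first observation was zero-filled and
# future_frame() appended empty rows for the forecast horizon)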
get_frequency_number(date_type), - external_regressors - ) %>% # clean outliers and missing values - dplyr::mutate_if(is.numeric, list(~ replace(., is.infinite(.), NA))) %>% # replace infinite values - dplyr::mutate_if(is.numeric, list(~ replace(., is.nan(.), NA))) %>% # replace NaN values - dplyr::mutate_if(is.numeric, list(~ replace(., is.na(.), 0))) %>% # replace NA values - dplyr::mutate(Target = ifelse(Date > hist_end_date, - NA, - Target - )) + for (name in names(context)) { + assign(name, context[[name]], envir = fn_env) + } - # box-cox transformation - if (box_cox) { - box_cox_tbl <- initial_tbl %>% - apply_box_cox() + # get specific time series + combo <- unique(df$Combo) - initial_tbl <- box_cox_tbl$data + return_tbl <- tibble::tibble( + Combo = combo, + Combo_Hash = hash_data(combo) + ) - return_tbl <- return_tbl %>% - dplyr::left_join(box_cox_tbl$diff_info, by = "Combo") - } + # handle external regressors + xregs_future_tbl <- get_xregs_future_values_tbl( + df, + external_regressors, + hist_end_date + ) - # make stationary - if (stationary) { - stationary_tbl <- initial_tbl %>% - make_stationary() + if (length(colnames(xregs_future_tbl)) > 2) { + xregs_future_list <- xregs_future_tbl %>% + dplyr::select(-Date, -Combo) %>% + colnames() + } else { + xregs_future_list <- NULL + } - initial_tbl <- stationary_tbl$data + # initial data prep + initial_tbl <- df %>% + dplyr::filter(Combo == combo) %>% + dplyr::select( + Combo, + Date, + Target, + tidyselect::all_of(external_regressors) + ) %>% + dplyr::group_by(Combo) %>% + timetk::pad_by_time(Date, + .by = date_type, + .pad_value = ifelse(clean_missing_values, NA, 0), + .end_date = hist_end_date + ) %>% # fill in missing values in between existing data points + timetk::pad_by_time(Date, + .by = date_type, + .pad_value = 0, + .start_date = hist_start_date, + .end_date = hist_end_date + ) %>% # fill in missing values at beginning of time series with zero + timetk::future_frame(Date, + .length_out = forecast_horizon, + .bind_data = TRUE + ) %>% # add future data + dplyr::ungroup() %>% + dplyr::left_join(xregs_future_tbl, + by = c("Combo", "Date") + ) %>% # join xregs that contain values given by user + clean_outliers_missing_values( + clean_outliers, + clean_missing_values, + get_frequency_number(date_type), + external_regressors + ) %>% # clean outliers and missing values + dplyr::mutate_if(is.numeric, list(~ replace(., is.infinite(.), NA))) %>% # replace infinite values + dplyr::mutate_if(is.numeric, list(~ replace(., is.nan(.), NA))) %>% # replace NaN values + dplyr::mutate_if(is.numeric, list(~ replace(., is.na(.), 0))) %>% # replace NA values + dplyr::mutate(Target = ifelse(Date > hist_end_date, + NA, + Target + )) + + # box-cox transformation + if (box_cox) { + box_cox_tbl <- initial_tbl %>% + apply_box_cox() + + initial_tbl <- box_cox_tbl$data + + return_tbl <- return_tbl %>% + dplyr::left_join(box_cox_tbl$diff_info, by = "Combo") + } - return_tbl <- return_tbl %>% - dplyr::left_join(stationary_tbl$diff_info, by = "Combo") - } + # make stationary + if (stationary) { + stationary_tbl <- initial_tbl %>% + make_stationary() - # create date features - date_features <- initial_tbl %>% - dplyr::select(Date) %>% - dplyr::mutate( - Date_Adj = Date %m+% months(fiscal_year_start - 1), - Date_day_month_end = ifelse(lubridate::day(Date_Adj) == lubridate::days_in_month(Date_Adj), 1, 0) - ) %>% - timetk::tk_augment_timeseries_signature(Date_Adj) %>% - dplyr::select(!tidyselect::matches(get_date_regex(date_type)), -Date_Adj, -Date) + initial_tbl 
<- stationary_tbl$data - names(date_features) <- stringr::str_c("Date_", names(date_features)) + return_tbl <- return_tbl %>% + dplyr::left_join(stationary_tbl$diff_info, by = "Combo") + } - initial_tbl <- initial_tbl %>% - cbind(date_features) + # create date features + date_features <- initial_tbl %>% + dplyr::select(Date) %>% + dplyr::mutate( + Date_Adj = Date %m+% months(fiscal_year_start - 1), + Date_day_month_end = ifelse(lubridate::day(Date_Adj) == lubridate::days_in_month(Date_Adj), 1, 0) + ) %>% + timetk::tk_augment_timeseries_signature(Date_Adj) %>% + dplyr::select(!tidyselect::matches(get_date_regex(date_type)), -Date_Adj, -Date) - # Run Recipes - if (is.null(recipes_to_run)) { - run_all_recipes_override <- FALSE - } else if (recipes_to_run == "all") { - run_all_recipes_override <- TRUE - } else { - run_all_recipes_override <- FALSE - } + names(date_features) <- stringr::str_c("Date_", names(date_features)) - if (is.null(recipes_to_run) | "R1" %in% recipes_to_run | run_all_recipes_override) { - R1 <- initial_tbl %>% - multivariate_prep_recipe_1(external_regressors, - xregs_future_values_list = xregs_future_list, - get_fourier_periods(fourier_periods, date_type), - get_lag_periods(lag_periods, date_type, forecast_horizon, multistep_horizon, TRUE), - get_rolling_window_periods(rolling_window_periods, date_type) - ) %>% - dplyr::mutate(Target = base::ifelse(Date > hist_end_date, NA, Target)) + initial_tbl <- initial_tbl %>% + cbind(date_features) - write_data( - x = R1, - combo = combo, - run_info = run_info, - output_type = "data", - folder = "prep_data", - suffix = "-R1" - ) - } + # Run Recipes + if (is.null(recipes_to_run)) { + run_all_recipes_override <- FALSE + } else if (recipes_to_run == "all") { + run_all_recipes_override <- TRUE + } else { + run_all_recipes_override <- FALSE + } - if ((is.null(recipes_to_run) & date_type %in% c("month", "quarter", "year")) | "R2" %in% recipes_to_run | run_all_recipes_override) { - R2 <- initial_tbl %>% - multivariate_prep_recipe_2(external_regressors, - xregs_future_values_list = xregs_future_list, - get_fourier_periods(fourier_periods, date_type), - get_lag_periods(lag_periods, date_type, forecast_horizon), - get_rolling_window_periods(rolling_window_periods, date_type), - date_type, - forecast_horizon - ) %>% - dplyr::mutate(Target = base::ifelse(Date > hist_end_date, NA, Target)) + if (is.null(recipes_to_run) | "R1" %in% recipes_to_run | run_all_recipes_override) { + R1 <- initial_tbl %>% + multivariate_prep_recipe_1(external_regressors, + xregs_future_values_list = xregs_future_list, + get_fourier_periods(fourier_periods, date_type), + get_lag_periods(lag_periods, date_type, forecast_horizon, multistep_horizon, TRUE), + get_rolling_window_periods(rolling_window_periods, date_type) + ) %>% + dplyr::mutate(Target = base::ifelse(Date > hist_end_date, NA, Target)) + + write_data( + x = R1, + combo = combo, + run_info = run_info, + output_type = "data", + folder = "prep_data", + suffix = "-R1" + ) + } - write_data( - x = R2, - combo = combo, - run_info = run_info, - output_type = "data", - folder = "prep_data", - suffix = "-R2" - ) - } + if ((is.null(recipes_to_run) & date_type %in% c("month", "quarter", "year")) | "R2" %in% recipes_to_run | run_all_recipes_override) { + R2 <- initial_tbl %>% + multivariate_prep_recipe_2(external_regressors, + xregs_future_values_list = xregs_future_list, + get_fourier_periods(fourier_periods, date_type), + get_lag_periods(lag_periods, date_type, forecast_horizon), + 
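# (recipe R2 differs from R1 by building horizon-specific training rows:
# the loop further below adds Horizon and Origin columns per forecast
# step, so each step gets lag features valid at that distance)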
get_rolling_window_periods(rolling_window_periods, date_type), + date_type, + forecast_horizon + ) %>% + dplyr::mutate(Target = base::ifelse(Date > hist_end_date, NA, Target)) + + write_data( + x = R2, + combo = combo, + run_info = run_info, + output_type = "data", + folder = "prep_data", + suffix = "-R2" + ) + } - return(data.frame(return_tbl)) - }, - group_by = "Combo", - context = list( - get_xregs_future_values_tbl = get_xregs_future_values_tbl, - external_regressors = external_regressors, - clean_missing_values = clean_missing_values, - clean_outliers_missing_values = clean_outliers_missing_values, - hash_data = hash_data, - hist_end_date = hist_end_date, - hist_start_date = hist_start_date, - forecast_approach = forecast_approach, - forecast_horizon = forecast_horizon, - clean_outliers = clean_outliers, - get_frequency_number = get_frequency_number, - date_type = date_type, - fiscal_year_start = fiscal_year_start, - get_date_regex = get_date_regex, - recipes_to_run = recipes_to_run, - multivariate_prep_recipe_1 = multivariate_prep_recipe_1, - multivariate_prep_recipe_2 = multivariate_prep_recipe_2, - run_info = run_info, - get_fourier_periods = get_fourier_periods, - fourier_periods = fourier_periods, - get_lag_periods = get_lag_periods, - lag_periods = lag_periods, - get_rolling_window_periods = get_rolling_window_periods, - rolling_window_periods = rolling_window_periods, - write_data = write_data, - write_data_folder = write_data_folder, - write_data_type = write_data_type, - box_cox = box_cox, - stationary = stationary, - make_stationary = make_stationary, - apply_box_cox = apply_box_cox - ) + return(data.frame(return_tbl)) + }, + group_by = "Combo", + context = list( + get_xregs_future_values_tbl = get_xregs_future_values_tbl, + external_regressors = external_regressors, + clean_missing_values = clean_missing_values, + clean_outliers_missing_values = clean_outliers_missing_values, + hash_data = hash_data, + hist_end_date = hist_end_date, + hist_start_date = hist_start_date, + forecast_approach = forecast_approach, + forecast_horizon = forecast_horizon, + clean_outliers = clean_outliers, + get_frequency_number = get_frequency_number, + date_type = date_type, + fiscal_year_start = fiscal_year_start, + get_date_regex = get_date_regex, + recipes_to_run = recipes_to_run, + multivariate_prep_recipe_1 = multivariate_prep_recipe_1, + multivariate_prep_recipe_2 = multivariate_prep_recipe_2, + run_info = run_info, + get_fourier_periods = get_fourier_periods, + fourier_periods = fourier_periods, + get_lag_periods = get_lag_periods, + lag_periods = lag_periods, + get_rolling_window_periods = get_rolling_window_periods, + rolling_window_periods = rolling_window_periods, + write_data = write_data, + write_data_folder = write_data_folder, + write_data_type = write_data_type, + box_cox = box_cox, + stationary = stationary, + make_stationary = make_stationary, + apply_box_cox = apply_box_cox + ) ) } @@ -693,12 +693,13 @@ prep_data <- function(run_info, length() if (successful_combos != total_combos) { - stop(paste0( - "Not all time series were prepped within 'prep_data', expected ", - total_combos, " time series but only ", successful_combos, - " time series are prepped. ", "Please run 'prep_data' again." - ), - call. = FALSE + stop( + paste0( + "Not all time series were prepped within 'prep_data', expected ", + total_combos, " time series but only ", successful_combos, + " time series are prepped. ", "Please run 'prep_data' again." + ), + call. 
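# (this guard compares how many series finished prepping against the
# expected combo count, so a partially failed distributed run surfaces
# as an actionable error instead of silently dropping series)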
= FALSE ) } @@ -1067,7 +1068,6 @@ apply_box_cox <- function(df) { ) for (column_name in names(df)) { - # Only check numeric columns with more than 2 unique values if (is.numeric(df[[column_name]]) & length(unique(df[[column_name]])) > 2) { temp_tbl <- df %>% @@ -1119,7 +1119,6 @@ make_stationary <- function(df) { ) for (column_name in names(df)) { - # Only check numeric columns with more than 2 unique values if (is.numeric(df[[column_name]]) & length(unique(df[[column_name]])) > 2) { temp_tbl <- df %>% @@ -1183,7 +1182,6 @@ multivariate_prep_recipe_1 <- function(data, rolling_window_periods, hist_end_date, date_type) { - # apply polynomial transformations numeric_xregs <- c() @@ -1345,7 +1343,6 @@ multivariate_prep_recipe_2 <- function(data, } for (period in 1:forecast_horizon) { - # add horizon and origin components data_lag_window <- df_poly %>% dplyr::mutate( diff --git a/R/prep_models.R b/R/prep_models.R index 7b9dede0..5602820d 100644 --- a/R/prep_models.R +++ b/R/prep_models.R @@ -1,4 +1,3 @@ - #' Prep Models #' #' Preps various aspects of run before training models. Things like train/test @@ -60,7 +59,6 @@ prep_models <- function(run_info, pca = NULL, num_hyperparameters = 10, seed = 123) { - # check input values check_input_type("run_info", run_info, "list") check_input_type("back_test_scenarios", back_test_scenarios, c("NULL", "numeric")) @@ -514,7 +512,6 @@ model_workflows <- function(run_info, ml_models <- list_models() if (is.null(models_to_run) & is.null(models_not_to_run)) { - # do nothing, using existing ml_models list } else if (is.null(models_to_run) & !is.null(models_not_to_run)) { ml_models <- setdiff(ml_models, models_not_to_run) diff --git a/R/read_write_data.R b/R/read_write_data.R index ecd8f9b2..2e43572a 100644 --- a/R/read_write_data.R +++ b/R/read_write_data.R @@ -1,4 +1,3 @@ - #' Get Final Forecast Data #' #' @param run_info run info using the [set_run_info()] function @@ -46,7 +45,6 @@ #' @export get_forecast_data <- function(run_info, return_type = "df") { - # check input values check_input_type("run_info", run_info, "list") check_input_type("return_type", return_type, "character", c("df", "sdf")) @@ -76,17 +74,17 @@ get_forecast_data <- function(run_info, run_info$path, "/forecasts/*", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), "*condensed", ".", run_info$data_output ) - + condensed_files <- list_files(run_info$storage_object, fs::path(cond_path)) - + if (length(condensed_files) > 0) { condensed <- TRUE } else { condensed <- FALSE } - + # get forecast data - if(forecast_approach != "bottoms_up") { + if (forecast_approach != "bottoms_up") { fcst_path <- paste0( "/forecasts/*", hash_data(run_info$experiment_name), "-", hash_data(run_info$run_name), "*reconciled", ".", run_info$data_output @@ -172,7 +170,6 @@ get_forecast_data <- function(run_info, #' } #' @export get_trained_models <- function(run_info) { - # check input values check_input_type("run_info", run_info, "list") @@ -228,7 +225,6 @@ get_trained_models <- function(run_info) { get_prepped_data <- function(run_info, recipe, return_type = "df") { - # check input values check_input_type("run_info", run_info, "list") check_input_type("recipe", recipe, "character", c("R1", "R2")) @@ -299,7 +295,6 @@ get_prepped_data <- function(run_info, #' } #' @export get_prepped_models <- function(run_info) { - # check input values check_input_type("run_info", run_info, "list") @@ -527,12 +522,12 @@ download_file <- function(storage_object, #' @noRd read_file <- function(run_info, path = NULL, 
- file_list = NULL, + file_list = NULL, return_type = "df", schema = NULL) { storage_object <- run_info$storage_object - - if(!is.null(path)) { + + if (!is.null(path)) { folder <- fs::path_dir(path) initial_path <- run_info$path file <- fs::path_file(path) @@ -551,7 +546,7 @@ read_file <- function(run_info, files <- list_files(storage_object, fs::path(initial_path, path)) } - if(!is.null(file_list)) { + if (!is.null(file_list)) { file_temp <- files[[1]] file_ext <- fs::path_ext(file_temp) } else if (fs::path_ext(file) == "*") { @@ -698,10 +693,9 @@ get_recipe_data <- function(run_info, #' #' @return nothing #' @noRd -condense_data <- function(run_info, +condense_data <- function(run_info, parallel_processing = NULL, num_cores = NULL) { - # get initial list of files to condense initial_file_list <- list_files( run_info$storage_object, @@ -710,16 +704,16 @@ condense_data <- function(run_info, hash_data(run_info$run_name), "*_models.", run_info$data_output ) ) - + # Initialize an empty list to store the batches list_of_batches <- list() - + # Define the batch size batch_size <- 10000 - + # Calculate the number of batches needed num_batches <- ceiling(length(initial_file_list) / batch_size) - + # Loop through the large list and create batches for (i in 1:num_batches) { start_index <- (i - 1) * batch_size + 1 @@ -727,7 +721,7 @@ condense_data <- function(run_info, batch_name <- paste0("batch_", i) list_of_batches[[batch_name]] <- initial_file_list[start_index:end_index] } - + # parallel run info par_info <- par_start( run_info = run_info, @@ -735,11 +729,11 @@ condense_data <- function(run_info, num_cores = min(length(names(list_of_batches)), num_cores), task_length = length(names(list_of_batches)) ) - + cl <- par_info$cl packages <- par_info$packages `%op%` <- par_info$foreach_operator - + # submit tasks condense_data_tbl <- foreach::foreach( batch = names(list_of_batches), @@ -750,23 +744,23 @@ condense_data <- function(run_info, .inorder = FALSE, .multicombine = TRUE, .noexport = NULL - ) %op% - { - files <- list_of_batches[[batch]] - - data <- read_file(run_info, - file_list = files, - return_type = "df") - - write_data( - x = data, - combo = batch, - run_info = run_info, - output_type = "data", - folder = "forecasts", - suffix = "-condensed" - ) - - return(batch) - } + ) %op% { + files <- list_of_batches[[batch]] + + data <- read_file(run_info, + file_list = files, + return_type = "df" + ) + + write_data( + x = data, + combo = batch, + run_info = run_info, + output_type = "data", + folder = "forecasts", + suffix = "-condensed" + ) + + return(batch) + } } diff --git a/R/run_info.R b/R/run_info.R index bf9acc5b..bffece82 100644 --- a/R/run_info.R +++ b/R/run_info.R @@ -44,7 +44,6 @@ set_run_info <- function(experiment_name = "finn_fcst", data_output = "csv", object_output = "rds", add_unique_id = TRUE) { - # initial input checks if (!inherits(run_name, c("NULL", "character"))) { stop("`run_name` must either be a NULL or a string") @@ -151,7 +150,6 @@ set_run_info <- function(experiment_name = "finn_fcst", base::suppressWarnings() if (nrow(log_df) > 0 & add_unique_id == FALSE) { - # check if input values have changed current_log_df <- tibble::tibble( experiment_name = experiment_name, @@ -266,7 +264,6 @@ get_run_info <- function(experiment_name = NULL, run_name = NULL, storage_object = NULL, path = NULL) { - # input checks if (!inherits(run_name, c("NULL", "character"))) { stop("`run_name` must either be a NULL or a string") diff --git a/R/train_models.R b/R/train_models.R index 
511c9383..d8f406e1 100644 --- a/R/train_models.R +++ b/R/train_models.R @@ -1,4 +1,3 @@ - #' Train Individual Models #' #' @param run_info run info using the [set_run_info()] function @@ -220,7 +219,6 @@ train_models <- function(run_info, stringr::str_replace(hash_data("All-Data"), "All-Data") if (length(combo_diff) == 0 & length(prev_combo_list) > 0) { - # check if input values have changed current_log_df <- tibble::tibble( run_global_models = run_global_models, @@ -270,7 +268,6 @@ train_models <- function(run_info, .noexport = NULL ) %op% { - # get time series combo_hash <- x @@ -402,7 +399,6 @@ train_models <- function(run_info, .multicombine = TRUE, .noexport = NULL ) %do% { - # get initial run info model <- model_run %>% dplyr::pull(Model_Name) @@ -719,12 +715,13 @@ train_models <- function(run_info, length() if (successful_combos != total_combos) { - stop(paste0( - "Not all time series were completed within 'train_models', expected ", - total_combos, " time series but only ", successful_combos, - " time series were ran. ", "Please run 'train_models' again." - ), - call. = FALSE + stop( + paste0( + "Not all time series were completed within 'train_models', expected ", + total_combos, " time series but only ", successful_combos, + " time series were ran. ", "Please run 'train_models' again." + ), + call. = FALSE ) } @@ -784,7 +781,6 @@ negative_fcst_adj <- function(data, #' @return tbl with train test splits #' @noRd create_splits <- function(data, train_test_splits) { - # Create the rsplit object analysis_split <- function(data, train_indices, test_indices) { rsplit_object <- rsample::make_splits( @@ -846,7 +842,6 @@ create_splits <- function(data, train_test_splits) { undifference_forecast <- function(forecast_data, recipe_data, diff_tbl) { - # check if data needs to be undifferenced diff1 <- diff_tbl$Diff_Value1 diff2 <- diff_tbl$Diff_Value2 @@ -863,10 +858,8 @@ undifference_forecast <- function(forecast_data, # non seasonal differencing if (!is.na(diff1)) { - # loop through each back test split for (id in train_test_id) { - # get specific train test split fcst_temp_tbl <- forecast_data %>% dplyr::filter(Train_Test_ID == id) @@ -950,7 +943,6 @@ undifference_forecast <- function(forecast_data, undifference_recipe <- function(recipe_data, diff_tbl, hist_end_date) { - # check if data needs to be undifferenced diff1 <- diff_tbl$Diff_Value1 diff2 <- diff_tbl$Diff_Value2 diff --git a/R/utility.R b/R/utility.R index 2e9d96ce..50125e32 100644 --- a/R/utility.R +++ b/R/utility.R @@ -15,7 +15,7 @@ utils::globalVariables(c( "Auto_Accept", "Feature", "Imp", "Importance", "LOFO_Var", "Var_RMSE", "Vote", "Votes", "desc", "term", "Column", "Box_Cox_Lambda", "get_recipie_configurable", "Agg", "Unique", "Var", "Var_Combo", "regressor", "regressor_tbl", "value_level_iter", ".actual", ".fitted", - "forecast_horizon", "lag", "new_data", "object", "fit", "Row_Num", "Run_Number", "weight", + "forecast_horizon", "lag", "new_data", "object", "fit", "Row_Num", "Run_Number", "weight", "Total", "Weight", "batch" )) @@ -89,7 +89,6 @@ cbind.fill <- function(..., fill = NA) { # been loaded. 
.onLoad <- function(libname, pkgname) { - # CRAN OMP THREAD LIMIT Sys.setenv("OMP_THREAD_LIMIT" = 1) diff --git a/tests/testthat/test-forecast_time_series.R b/tests/testthat/test-forecast_time_series.R index 2b5117d9..3f5bf646 100644 --- a/tests/testthat/test-forecast_time_series.R +++ b/tests/testthat/test-forecast_time_series.R @@ -1,4 +1,3 @@ - # * custom test functions ---- check_exist <- function(to_check, ret) { diff --git a/tests/testthat/test-hierarchical.R b/tests/testthat/test-hierarchical.R index fc7bde9d..61bc0c1b 100644 --- a/tests/testthat/test-hierarchical.R +++ b/tests/testthat/test-hierarchical.R @@ -1,6 +1,4 @@ - test_that("prep_hierarchical_data returns correct grouped hierarchies", { - # Mock data setup data <- tibble::tibble( Segment = as.character(c( @@ -77,7 +75,6 @@ test_that("prep_hierarchical_data returns correct grouped hierarchies", { }) test_that("prep_hierarchical_data returns correct standard hierarchies", { - # Mock data setup data <- tibble::tibble( Area = as.character(c("EMEA", "EMEA", "EMEA", "EMEA", "EMEA", "EMEA", "EMEA", "EMEA", "United States", "United States", "United States", "United States")), diff --git a/tests/testthat/test-multistep_horizon.R b/tests/testthat/test-multistep_horizon.R index de0c6492..7fcb3448 100644 --- a/tests/testthat/test-multistep_horizon.R +++ b/tests/testthat/test-multistep_horizon.R @@ -1,5 +1,4 @@ test_that("multistep_horizon monthly data", { - # Mock data setup data <- timetk::m4_monthly %>% dplyr::mutate(id = as.character(id)) %>% diff --git a/vignettes/back-testing-and-hyperparameter-tuning.Rmd b/vignettes/back-testing-and-hyperparameter-tuning.Rmd index 2c69aeb9..ab9a854d 100644 --- a/vignettes/back-testing-and-hyperparameter-tuning.Rmd +++ b/vignettes/back-testing-and-hyperparameter-tuning.Rmd @@ -22,27 +22,33 @@ When `prep_models()` is ran, hyperparameters and back test splits are calculated library(finnts) hist_data <- timetk::m4_monthly %>% - dplyr::filter(date >= "2012-01-01", - id == "M2") %>% + dplyr::filter( + date >= "2012-01-01", + id == "M2" + ) %>% dplyr::rename(Date = date) %>% dplyr::mutate(id = as.character(id)) run_info <- set_run_info( - experiment_name = "finnts_fcst", + experiment_name = "finnts_fcst", run_name = "get_prepped_models" ) -prep_data(run_info = run_info, - input_data = hist_data, - combo_variables = c("id"), - target_variable = "value", - date_type = "month", - recipes_to_run = "R1", - forecast_horizon = 6) +prep_data( + run_info = run_info, + input_data = hist_data, + combo_variables = c("id"), + target_variable = "value", + date_type = "month", + recipes_to_run = "R1", + forecast_horizon = 6 +) -prep_models(run_info = run_info, - models_to_run = c("arima", "ets", "xgboost"), - num_hyperparameters = 10) +prep_models( + run_info = run_info, + models_to_run = c("arima", "ets", "xgboost"), + num_hyperparameters = 10 +) model_info <- get_prepped_models(run_info = run_info) @@ -67,7 +73,6 @@ model_hyperparameter_info <- model_info %>% tidyr::unnest(Data) print(model_hyperparameter_info) - ``` The above outputs allow a Finn user to understand what hyperparameters are chosen for tuning and how the model refitting process will work. When tuning hyperparameters, Finn uses the "Validation" train/test splits, with the final parameters chosen using RMSE. For some models like ARIMA that don't follow a traditional hyperparameter tuning process, the model is fit from scratch across all train/test splits. 
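To make that selection step concrete, here is a minimal sketch of pulling the winning hyperparameters from a tuning result by RMSE; `tune_results` is a hypothetical `tune::tune_grid()` output produced on the "Validation" splits, not an object finnts exposes directly.

```r
# Minimal sketch, assuming `tune_results` is a hypothetical tune::tune_grid()
# result fit on the "Validation" train/test splits; finnts performs the
# equivalent selection internally.
library(tune)

best_params <- select_best(tune_results, metric = "rmse") # lowest RMSE wins
```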
After hyperparameters are chosen, the model is refit across the "Back_Test" and "Future_Forecast" splits. The "Back_Test" splits are the true testing data that will be used when selecting the final "Best-Model". "Ensemble" splits are also created as ensemble training data if ensemble models are chosen to run. Ensemble models follow a similar tuning process. diff --git a/vignettes/best-model-selection.Rmd b/vignettes/best-model-selection.Rmd index 6b869aa8..0b791151 100644 --- a/vignettes/best-model-selection.Rmd +++ b/vignettes/best-model-selection.Rmd @@ -23,17 +23,21 @@ suppressMessages(library(dplyr)) message("Simple Back Test Results") back_test_tbl <- tibble( - Combo = c("Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1"), - Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01"), - Model = c("arima", "arima", "arima", "arima", "arima", "ets", "ets", "ets", "ets", "ets"), - FCST = c(9, 23, 35, 41, 48, 7, 22, 29, 42, 53), + Combo = c("Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1"), + Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01"), + Model = c("arima", "arima", "arima", "arima", "arima", "ets", "ets", "ets", "ets", "ets"), + FCST = c(9, 23, 35, 41, 48, 7, 22, 29, 42, 53), Target = c(10, 20, 30, 40, 50, 10, 20, 30, 40, 50) ) %>% - dplyr::mutate(MAPE = abs(Target-FCST)/Target, - Date = as.Date(Date)) %>% + dplyr::mutate( + MAPE = abs(Target - FCST) / Target, + Date = as.Date(Date) + ) %>% dplyr::group_by(Combo, Model) %>% - dplyr::mutate(Target_Total = sum(Target), - Percent_Total = Target/Target_Total) %>% + dplyr::mutate( + Target_Total = sum(Target), + Percent_Total = Target / Target_Total + ) %>% dplyr::ungroup() print(back_test_tbl) @@ -44,12 +48,13 @@ message("Overall Model Accuracy by Combo") suppressMessages(best_model <- back_test_tbl %>% dplyr::group_by(Combo, Model) %>% dplyr::mutate(Weighted_MAPE = MAPE * Percent_Total) %>% - dplyr::summarise(MAPE = mean(MAPE), - Weighted_MAPE = sum(Weighted_MAPE)) %>% + dplyr::summarise( + MAPE = mean(MAPE), + Weighted_MAPE = sum(Weighted_MAPE) + ) %>% dplyr::ungroup()) print(best_model) - ``` During the simple back test process above, arima seems to be the better model from a pure MAPE perspective, but ETS ends up being the winner when using weighted MAPE. The benefits of weighted MAPE allow finnts to find the optimal model that performs the best on the biggest components of a forecast, which comes with the added benefit of putting more weight on more recent observations since those are more likely to have larger target values than ones further into the past. Another way of putting more weight on more recent observations is how Finn overlaps its back testing scenarios. This means the most recent observations are tested for accuracy in different forecast horizons (H=1, H=2, etc). More info on this in the back testing vignette.
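In compact form, the weighted MAPE computed above (writing $A_t$ for actuals and $F_t$ for forecasts, and assuming positive actuals as in the example) is a target-weighted sum of the per-period errors:

$$
\text{Weighted MAPE} = \sum_{t} \frac{A_t}{\sum_{s} A_s} \cdot \frac{\lvert A_t - F_t \rvert}{A_t} = \frac{\sum_{t} \lvert A_t - F_t \rvert}{\sum_{t} A_t}
$$

so larger (typically more recent) periods dominate the model ranking.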
diff --git a/vignettes/external-regressors.Rmd b/vignettes/external-regressors.Rmd index d89387fa..ebd0f8e4 100644 --- a/vignettes/external-regressors.Rmd +++ b/vignettes/external-regressors.Rmd @@ -42,11 +42,11 @@ Below is an example of how you can set up your input_data to leverage future val library(dplyr) tibble( - Combo = c("Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1"), - Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01", "2020-06-01", "2020-07-01", "2020-08-01", "2020-09-01", "2020-10-01", "2020-11-01", "2020-12-01", "2021-01-01", "2021-02-01", "2021-03-01"), - Target = c(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, NA, NA, NA), - Holiday = c("New Years", "Valentines Day", "None", "Easter", "None", "None", "4th of July", "None", "Labor Day", "Halloween", "Thanksgiving", "Christmas", "New Years", "Valentines Day", "None"), - GDP = c(5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, NA, NA, NA), + Combo = c("Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1", "Country_1"), + Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-04-01", "2020-05-01", "2020-06-01", "2020-07-01", "2020-08-01", "2020-09-01", "2020-10-01", "2020-11-01", "2020-12-01", "2021-01-01", "2021-02-01", "2021-03-01"), + Target = c(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, NA, NA, NA), + Holiday = c("New Years", "Valentines Day", "None", "Easter", "None", "None", "4th of July", "None", "Labor Day", "Halloween", "Thanksgiving", "Christmas", "New Years", "Valentines Day", "None"), + GDP = c(5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, NA, NA, NA), Sales_Pipeline = c(100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240) ) %>% dplyr::mutate(Date = as.Date(Date)) diff --git a/vignettes/feature-engineering.Rmd b/vignettes/feature-engineering.Rmd index 2cbd38dc..02122060 100644 --- a/vignettes/feature-engineering.Rmd +++ b/vignettes/feature-engineering.Rmd @@ -74,7 +74,6 @@ In addition to the standard approaches above, finnts also does two different way The first recipe, referred to as "R1" in default finnts models, by default takes a single-step horizon approach, meaning all of the engineered target and external regressor features are used but the lags cannot be less than the forecast horizon. For example, with a monthly data set and a forecast horizon of 3, finnts will create engineered features like lags and rolling window features but only use lags for periods equal to or greater than 3. You can also run a multistep horizon approach by setting `multistep_horizon` to TRUE in `prep_models()`. The multistep approach will create features that can be used by specific multivariate models that optimize for each period in a forecast horizon. More on this in the "models used in finnts" vignette. Recursive forecasting is not supported in finnts multivariate machine learning models, since feeding forecast outputs as features to create another forecast adds complex layers of uncertainty that can easily spiral out of control and produce poor forecasts. NA values created by generating lag features are filled "up". This results in the initial periods of a time series having some data leakage, but the effect should be small if the time series is long enough.
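Before the vignette's full example below, a minimal sketch of that lag rule outside of finnts (the candidate lag values are illustrative; finnts builds these features internally):

```r
# Minimal sketch of the R1 lag constraint, assuming monthly data and a
# 3-period forecast horizon: only lags >= the horizon are generated, then
# the NA rows created by lagging are filled "up".
library(dplyr)
library(timetk)
library(tidyr)

forecast_horizon <- 3
candidate_lags <- 1:12

timetk::m4_monthly %>%
  dplyr::filter(id == "M2") %>%
  timetk::tk_augment_lags(value, .lags = candidate_lags[candidate_lags >= forecast_horizon]) %>%
  tidyr::fill(tidyselect::starts_with("value_lag"), .direction = "up")
```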
```{r, message = FALSE} - library(finnts) hist_data <- timetk::m4_monthly %>% @@ -112,7 +111,6 @@ print(R1_prepped_data_tbl) The second recipe is referred to as "R2" in default finnts models. It takes a very different approach than the "R1" recipe. For a 3 month forecast horizon on a monthly dataset, target and rolling window features are created depending on the horizon period. They are also constrained to be equal to or less than the forecast horizon. In the below example, "Origin" and "Horizon" features are created for each time period. This results in duplicating rows in the original data set to create new features that are now specific to each horizon period. This helps the default finnts models find new unique relationships to model when compared to a more formal approach in "R1". NA values created by generating lag features are filled "up". ```{r, message = FALSE} - library(finnts) hist_data <- timetk::m4_monthly %>% diff --git a/vignettes/finnts.Rmd b/vignettes/finnts.Rmd index 0915bf17..32060018 100644 --- a/vignettes/finnts.Rmd +++ b/vignettes/finnts.Rmd @@ -10,8 +10,8 @@ vignette: > ```{r, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, - comment = "#>", - warning = FALSE, + comment = "#>", + warning = FALSE, message = FALSE ) library(finnts) @@ -62,18 +62,15 @@ The above data set contains 4 individual time series, identified using the "id" Before we call the Finn forecast function, let's first set up some run information using `set_run_info()`; this helps log all components of our Finn forecast successfully. ```{r, message = FALSE, eval = hist_data, error=FALSE, warning = FALSE, echo=T, eval = TRUE} - run_info <- set_run_info( - experiment_name = "finn_forecast", + experiment_name = "finn_forecast", run_name = "test_run" ) - ``` Calling the "forecast_time_series" function is the easiest part. In this example we will be running just two models.
```{r, message = FALSE, eval = hist_data, error=FALSE, warning = FALSE, echo=T, eval = TRUE} - # no need to assign it to a variable, since all of the outputs are written to disk :) forecast_time_series( run_info = run_info, @@ -82,8 +79,8 @@ forecast_time_series( target_variable = "value", date_type = "month", forecast_horizon = 3, - back_test_scenarios = 6, - models_to_run = c("arima", "ets"), + back_test_scenarios = 6, + models_to_run = c("arima", "ets"), return_data = FALSE ) ``` @@ -93,7 +90,6 @@ forecast_time_series( ### Initial Finn Outputs ```{r, message = FALSE, eval = finn_output, message = FALSE, eval = FALSE, echo=T} - finn_output_tbl <- get_forecast_data(run_info = run_info) print(finn_output_tbl) @@ -102,7 +98,6 @@ print(finn_output_tbl) ### Future Forecast ```{r, message = FALSE, eval = finn_output, message = FALSE, eval = FALSE, echo=T} - future_forecast_tbl <- finn_output_tbl %>% dplyr::filter(Run_Type == "Future_Forecast") @@ -142,13 +137,17 @@ print(trained_model_tbl) ### Initial Prepped Data ```{r, message = FALSE, eval = finn_output, eval = FALSE, echo=T} -R1_prepped_data_tbl <- get_prepped_data(run_info = run_info, - recipe = "R1") +R1_prepped_data_tbl <- get_prepped_data( + run_info = run_info, + recipe = "R1" +) print(R1_prepped_data_tbl) -R2_prepped_data_tbl <- get_prepped_data(run_info = run_info, - recipe = "R2") +R2_prepped_data_tbl <- get_prepped_data( + run_info = run_info, + recipe = "R2" +) print(R2_prepped_data_tbl) ``` diff --git a/vignettes/forecast-components.Rmd b/vignettes/forecast-components.Rmd index b1a29e47..7a23c43c 100644 --- a/vignettes/forecast-components.Rmd +++ b/vignettes/forecast-components.Rmd @@ -10,8 +10,8 @@ vignette: > ```{r, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, - comment = "#>", - warning = FALSE, + comment = "#>", + warning = FALSE, message = FALSE ) library(finnts) @@ -29,13 +29,15 @@ Let's get some example data and then set our Finn run info. library(finnts) hist_data <- timetk::m4_monthly %>% - dplyr::filter(date >= "2013-01-01", - id == "M2") %>% + dplyr::filter( + date >= "2013-01-01", + id == "M2" + ) %>% dplyr::rename(Date = date) %>% dplyr::mutate(id = as.character(id)) run_info <- set_run_info( - experiment_name = "finnts_fcst", + experiment_name = "finnts_fcst", run_name = "finn_sub_component_run" ) ``` @@ -45,20 +47,26 @@ run_info <- set_run_info( Clean and prepare our data before training models. We can even pull out our prepped data to see the features and transformations applied before models are trained. ```{r message=FALSE} -prep_data(run_info = run_info, - input_data = hist_data, - combo_variables = c("id"), - target_variable = "value", - date_type = "month", - forecast_horizon = 6) +prep_data( + run_info = run_info, + input_data = hist_data, + combo_variables = c("id"), + target_variable = "value", + date_type = "month", + forecast_horizon = 6 +) -R1_prepped_data_tbl <- get_prepped_data(run_info = run_info, - recipe = "R1") +R1_prepped_data_tbl <- get_prepped_data( + run_info = run_info, + recipe = "R1" +) print(R1_prepped_data_tbl) -R2_prepped_data_tbl <- get_prepped_data(run_info = run_info, - recipe = "R2") +R2_prepped_data_tbl <- get_prepped_data( + run_info = run_info, + recipe = "R2" +) print(R2_prepped_data_tbl) ``` @@ -70,12 +78,16 @@ Now that our data is prepared for modeling, let's now train some models. First w Then we can kick off training each model on our data. 
```{r, message = FALSE} -prep_models(run_info = run_info, - models_to_run = c("arima", "ets", "glmnet"), - num_hyperparameters = 2) +prep_models( + run_info = run_info, + models_to_run = c("arima", "ets", "glmnet"), + num_hyperparameters = 2 +) -train_models(run_info = run_info, - run_global_models = FALSE) +train_models( + run_info = run_info, + run_global_models = FALSE +) ``` ## Train Ensemble Models @@ -99,7 +111,6 @@ final_models(run_info = run_info) Finally, we can retrieve the forecast results from this Finn run. ```{r, message = FALSE} - finn_output_tbl <- get_forecast_data(run_info = run_info) print(finn_output_tbl) diff --git a/vignettes/hierarchical-forecasting.Rmd b/vignettes/hierarchical-forecasting.Rmd index f358456d..7f617130 100644 --- a/vignettes/hierarchical-forecasting.Rmd +++ b/vignettes/hierarchical-forecasting.Rmd @@ -27,15 +27,14 @@ message("Standard Hierarchical Time Series Data") hts <- tibble( Continent = c("North America", "North America", "North America", "North America", "North America", "North America", "North America", "North America", "North America"), - Country = c("United States", "United States", "United States", "United States", "United States", "United States", "Mexico", "Mexico", "Mexico"), - City = c("Kansas City", "Kansas City", "Kansas City", "Seattle", "Seattle", "Seattle", "Mexico City", "Mexico City", "Mexico City"), - Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01"), + Country = c("United States", "United States", "United States", "United States", "United States", "United States", "Mexico", "Mexico", "Mexico"), + City = c("Kansas City", "Kansas City", "Kansas City", "Seattle", "Seattle", "Seattle", "Mexico City", "Mexico City", "Mexico City"), + Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01"), Target = c(100, 250, 320, 80, 200, 270, 50, 80, 120) ) %>% dplyr::mutate(Date = as.Date(Date)) print(hts) - ``` In the above example, "City" was the lowest level of the hierarchy, which feeds into "Country", which then feeds into "Continent". Finn will take this data and will forecast by City, total Country, and total Continent. After each model is run for every level in the hierarchy, the best model is chosen at each level, then the "Best Model" and every other model is reconciled back down to the lowest level.
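A minimal sketch of that roll-up, reusing the `hts` tibble from above (finnts performs the equivalent aggregation internally during hierarchical runs):

```r
# Aggregate the City-level example up to total Country and total Continent;
# finnts forecasts each level before reconciling back down to City.
library(dplyr)

country_tbl <- hts %>%
  dplyr::group_by(Continent, Country, Date) %>%
  dplyr::summarise(Target = sum(Target), .groups = "drop")

continent_tbl <- hts %>%
  dplyr::group_by(Continent, Date) %>%
  dplyr::summarise(Target = sum(Target), .groups = "drop")
```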
@@ -50,16 +49,15 @@ suppressMessages(library(dplyr)) message("Grouped Hierarchical Time Series Data") gts <- tibble( - Country = c("United States", "United States", "United States", "United States", "United States", "United States", "Mexico", "Mexico", "Mexico", "Mexico", "Mexico", "Mexico"), - Segment = c("Enterprise", "Enterprise", "Enterprise", "Public Sector", "Public Sector", "Public Sector", "Enterprise", "Enterprise", "Enterprise", "Enterprise", "Enterprise", "Enterprise"), - Product = c("Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Tea", "Tea", "Tea"), - Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01"), + Country = c("United States", "United States", "United States", "United States", "United States", "United States", "Mexico", "Mexico", "Mexico", "Mexico", "Mexico", "Mexico"), + Segment = c("Enterprise", "Enterprise", "Enterprise", "Public Sector", "Public Sector", "Public Sector", "Enterprise", "Enterprise", "Enterprise", "Enterprise", "Enterprise", "Enterprise"), + Product = c("Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Coffee", "Tea", "Tea", "Tea"), + Date = c("2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01", "2020-01-01", "2020-02-01", "2020-03-01"), Target = c(10, 20, 30, 5, 8, 11, 20, 23, 27, 50, 55, 60) ) %>% dplyr::mutate(Date = as.Date(Date)) print(gts) - ``` It would be hard to aggregate the above data in a traditional hierarchy. The same products are found in different segments and countries, and the same segments are found in multiple countries. Finn will follow a similar modeling process as the one described for a traditional hierarchy, but instead will create forecasts at the below levels. diff --git a/vignettes/models-used-in-finnts.Rmd b/vignettes/models-used-in-finnts.Rmd index 0e5eae27..fb2da330 100644 --- a/vignettes/models-used-in-finnts.Rmd +++ b/vignettes/models-used-in-finnts.Rmd @@ -22,7 +22,7 @@ reactable::reactable( data.frame() %>% rbind(data.frame(Model = "arima", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "Regression model that is based on finding relationships between lagged values of the target variable you are trying to forecast.")) %>% rbind(data.frame(Model = "arima-boost", Type = "multivariate, local", Underlying.Package = "modeltime, forecast, xgboost", Description = "Arima model (refer to arima) that models the trend component of the target variable, then uses xgboost model (refer to xgboost) to train on the remaining residuals.")) %>% - rbind(data.frame(Model = "arimax", Type = "multivariate, local", Underlying.Package = "modeltime, forecast", Description = "ARIMA model that incorporates external regressors and other engineered features.")) %>% + rbind(data.frame(Model = "arimax", Type = "multivariate, local", Underlying.Package = "modeltime, forecast", Description = "ARIMA model that incorporates external regressors and other engineered features.")) %>% rbind(data.frame(Model = "cubist", Type = "multivariate, local, global, ensemble", Underlying.Package = "rules", Description = "Hybrid of tree based and linear regression approach.
Many decision trees are built, but regression coefficients are used at each terminal node instead of averaging values as in other tree based approaches.")) %>% rbind(data.frame(Model = "croston", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "Useful for intermittent demand forecasting, aka when there are a lot of periods of zero values. Involves simple exponential smoothing on non-zero values of the target variable and another application of seasonal exponential smoothing on periods between non-zero elements of the target variable. Refer to ets for more details on exponential smoothing.")) %>% rbind(data.frame(Model = "ets", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "Forecasts produced using exponential smoothing methods are weighted averages of past observations, with the weights decaying exponentially as the observations get older. Exponential smoothing models try to forecast the components of a time series which can be broken down into error, trend, and seasonality. These components can be forecasted separately, then either added or multiplied together to get the final forecast output.")) %>% @@ -41,8 +41,7 @@ reactable::reactable( rbind(data.frame(Model = "svm-rbf", Type = "multivariate, local, global, ensemble", Underlying.Package = "parsnip, kernlab", Description = "Uses a nonlinear function, specifically a radial basis function, to create a regression line of the target variable.")) %>% rbind(data.frame(Model = "tbats", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "A spin off of the traditional ets model (refer to ets), with some additional components to capture multiple seasonalities.")) %>% rbind(data.frame(Model = "theta", Type = "univariate, local", Underlying.Package = "modeltime, forecast", Description = "Theta is similar to exponential smoothing (refer to ets) but with another component called drift.
Adding drift to exponential smoothing allows the forecast to increase or decrease over time, where the amount of change over time (called the drift) is set to be the average change seen within the historical data.")) %>% - rbind(data.frame(Model = "xgboost", Type = "multivariate, local, global, ensemble", Underlying.Package = "parsnip, xgboost", Description = "Builds many decision trees (similar to random forests), but predictions that are initially inaccurate are given more weight in subsequent training rounds to increase accuracy across all predictions.")) - , + rbind(data.frame(Model = "xgboost", Type = "multivariate, local, global, ensemble", Underlying.Package = "parsnip, xgboost", Description = "Builds many decision trees (similar to random forests), but predictions that are initially inaccurate are given more weight in subsequent training rounds to increase accuracy across all predictions.")), defaultColDef = colDef( header = function(value) gsub(".", " ", value, fixed = TRUE), cell = function(value) format(value, nsmall = 1), align = "center", minWidth = 70, headerStyle = list(background = "#f7f7f8") ), columns = list( - Description = colDef(minWidth = 140, align = "left") # overrides the default + Description = colDef(minWidth = 140, align = "left") # overrides the default ), bordered = TRUE, highlight = TRUE ) - ``` ### Univariate vs Multivariate Models diff --git a/vignettes/parallel-processing.Rmd b/vignettes/parallel-processing.Rmd index 2a2b8da3..b9ecc616 100644 --- a/vignettes/parallel-processing.Rmd +++ b/vignettes/parallel-processing.Rmd @@ -49,8 +49,8 @@ hist_data <- timetk::m4_monthly %>% data_sdf <- sparklyr::copy_to(sc, hist_data, "data_sdf", overwrite = TRUE) run_info <- set_run_info( - experiment_name = "finn_fcst", - run_name = "spark_run_1", + experiment_name = "finn_fcst", + run_name = "spark_run_1", path = "/dbfs/mnt/example/folder" # important that you mount an ADLS path ) @@ -60,16 +60,18 @@ forecast_time_series( combo_variables = c("id"), target_variable = "value", date_type = "month", - forecast_horizon = 3, - parallel_processing = "spark", + forecast_horizon = 3, + parallel_processing = "spark", return_data = FALSE ) # return the outputs as a spark data frame -finn_output_tbl <- get_forecast_data(run_info = run_info, - return_type = "sdf") +finn_output_tbl <- get_forecast_data( + run_info = run_info, + return_type = "sdf" +) ``` The above example runs each time series on a separate core on a spark cluster. You can also submit multiple time series where each time series runs on a separate spark executor (VM) and then leverage all of the cores on that executor to run things like hyperparameter tuning or model refitting in parallel. This creates two levels of parallelization: one at the time series level, and another when doing things like hyperparameter tuning within a specific time series. To do that, set `inner_parallel` to TRUE in `forecast_time_series()`. Also make sure that you adjust the number of spark executor cores to 1; that ensures that only 1 time series runs on an executor at a time. Leverage the "spark.executor.cores" argument when configuring your spark connection. This can be done using [sparklyr](https://spark.rstudio.com/guides/connections#:~:text=In%20sparklyr%2C%20Spark%20properties%20can%20be%20set%20by,customized%20as%20shown%20in%20the%20example%20code%20below.) or within the cluster manager itself within the Azure resource.
Use the "num_cores" argument in the "forecast_time_series" function to control how many cores should be used within an executor when running things like hyperparameter tuning. -`forecast_time_series()` will be looking for a variable called "sc" to use when submitting tasks to the spark cluster, so make sure you use that as the variable name when connecting to spark. Also it's important that you mount your spark session to an Azure Data Lake Storage (ADLS) account, and provide the mounted path to where you'd like your Finn results to be written to within `set_run_info()`. \ No newline at end of file +`forecast_time_series()` will be looking for a variable called "sc" to use when submitting tasks to the spark cluster, so make sure you use that as the variable name when connecting to spark. Also it's important that you mount your spark session to an Azure Data Lake Storage (ADLS) account, and provide the mounted path to where you'd like your Finn results to be written to within `set_run_info()`.