diff --git a/.Rbuildignore b/.Rbuildignore index af67f6c0..32129eac 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -1,3 +1,5 @@ +^Dockerfile-batch$ +^.env$ ^CONTRIBUTING.md$ ^DISCLAIMER.md$ ^Dockerfile$ @@ -11,6 +13,7 @@ ^\.vscode$ ^\.vscode$ ^_pkgdown\.yml$ +^azure$ ^batch-autoscale-formula.txt$ ^code-of-conduct.md$ ^codecov\.yml$ diff --git a/.gitignore b/.gitignore index daa0f4be..45238f86 100644 --- a/.gitignore +++ b/.gitignore @@ -385,3 +385,7 @@ docs # for now... will have to gpg encrypt cfa-epinow2-batch-pool-config.json azure/*.toml + +# Careful with Secrets! +*.env +*.env.gpg diff --git a/Dockerfile-batch b/Dockerfile-batch new file mode 100644 index 00000000..20e6de28 --- /dev/null +++ b/Dockerfile-batch @@ -0,0 +1,6 @@ +FROM ghcr.io/astral-sh/uv:python3.12-bookworm + +COPY azure . +RUN uv pip install -r requirements.txt --system + +CMD /bin/bash diff --git a/Makefile b/Makefile index ba918dc5..a7c02296 100644 --- a/Makefile +++ b/Makefile @@ -1,25 +1,55 @@ -ifndef TAG - TAG = local +REGISTRY=cfaprdbatchcr.azurecr.io/ +IMAGE_NAME=cfa-epinow2-pipeline +BRANCH=$(shell git branch --show-current) +ifeq ($(BRANCH), main) +TAG=latest +else +TAG=$(BRANCH) endif -IMAGE_NAME=cfa-epinow2-pipeline +CONFIG=test.json +JOB=batch-test + deps: docker build -t $(REGISTRY)$(IMAGE_NAME)-dependencies:$(TAG) -f Dockerfile-dependencies pull: + az acr login --name 'cfaprdbatchcr' docker pull $(REGISTRY)$(IMAGE_NAME)-dependencies:$(TAG) + docker pull $(REGISTRY)$(IMAGE_NAME):test-$(TAG) build: - docker build -t $(REGISTRY)$(IMAGE_NAME):$(TAG) \ + docker build -t $(REGISTRY)$(IMAGE_NAME):test-$(TAG) \ --build-arg TAG=$(TAG) -f Dockerfile . tag: docker tag $(IMAGE_NAME):$(TAG) $(REGISTRY)$(IMAGE_NAME):$(TAG) +config: + gh workflow run \ + -R cdcgov/cfa-config-generator run-workload.yaml \ + -f disease=all \ + -f state=all + +run-batch: + docker build -f Dockerfile-batch -t batch . --no-cache + docker run --rm \ + --env-file .env \ + -it \ + batch python job.py "$(IMAGE)" "$(POOL)" "$(JOB)" + +run: + docker run --mount type=bind,source=$(PWD),target=/mnt -it \ + --env-file .env \ + --rm $(REGISTRY)$(IMAGE_NAME):test-$(TAG) \ + Rscript -e "CFAEpiNow2Pipeline::orchestrate_pipeline('$(CONFIG)', config_container = 'rt-epinow2-config', input_dir = '/mnt/input', output_dir = '/mnt', output_container = 'zs-test-pipeline-update')" + + up: docker run --mount type=bind,source=$(PWD),target=/cfa-epinow2-pipeline -it \ - --rm $(REGISTRY)$(IMAGE_NAME):$(TAG) /bin/bash + --env-file .env \ + --rm $(REGISTRY)$(IMAGE_NAME):test-$(TAG) /bin/bash run-function: docker run --mount type=bind,source=$(PWD),target=/cfa-epinow2-pipeline -it \ diff --git a/NAMESPACE b/NAMESPACE index d57090ee..2d0df7bf 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -8,7 +8,8 @@ export(GenerationInterval) export(Parameters) export(RightTruncation) export(apply_exclusions) -export(download_from_azure_blob) +export(download_file_from_container) +export(download_if_specified) export(execute_model_logic) export(extract_diagnostics) export(fetch_blob_container) diff --git a/NEWS.md b/NEWS.md index 208bbf01..67b22c29 100644 --- a/NEWS.md +++ b/NEWS.md @@ -7,6 +7,8 @@ * Use empty string for paths when non-existant. * Add function families * Populated the default values of the metadata to be saved. +* Working upload/download from ABS +* Working Azure upload/download * Creating a Config class to make syncing configuration differences easier. * Add a JSON reader for the Config class. * Use the Config class throughout the pipeline.
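The Makefile's new `run`, `up`, and `run-batch` targets all pass `--env-file .env`, and the new `.gitignore` rules keep that file out of version control. As a rough sketch only (variable names are taken from `R/azure.R` and `azure/job.py` further down in this diff; the values are placeholders and the exact set you need may differ), a local `.env` might look like:

```sh
# Service-principal credentials read by R/azure.R via fetch_credential_from_env_var()
az_tenant_id=<tenant-guid>
az_client_id=<client-guid>
az_service_principal=<client-secret>

# The same credentials, under the names DefaultAzureCredential and azure/job.py expect
AZURE_TENANT_ID=<tenant-guid>
AZURE_CLIENT_ID=<client-guid>
AZURE_CLIENT_SECRET=<client-secret>

# Accounts and registry referenced by azure/job.py
BLOB_ACCOUNT=<storage-account-name>
BATCH_ACCOUNT=<batch-account-name>
AZURE_CONTAINER_REGISTRY=<registry-hostname>
```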
diff --git a/R/azure.R b/R/azure.R index 2900a064..65bf49fb 100644 --- a/R/azure.R +++ b/R/azure.R @@ -1,70 +1,73 @@ -#' Download specified blobs from Blob Storage and save them in a local dir -#' -#' Note that I think it might be wise to instead specify a blob prefix, list the -#' blobs, and download all the listed blobs. This would let us have some more -#' flexibility with downloading whole remote directories (like delta tables) -#' -#' @param blob_names A vector of blobs to donwload from `container_name` -#' @param local_dest The path to the local directory to save the files in -#' @param container_name The Azure Blob Storage container with `blob_names` +#' Download if specified #' -#' @return NULL on success +#' @param blob_path The name of the blob to download +#' @param blob_storage_container The name of the container to donwload from +#' @param output_dir The directory to write the downloaded file to +#' @return The path of the file #' @family azure #' @export -download_from_azure_blob <- function(blob_names, local_dest, container_name) { - # Attempt to connect to the storage container - blob_container <- rlang::try_fetch( - fetch_blob_container(container_name), - error = function(con) { - cli::cli_abort( - c( - "Unable to authenticate connection to Blob endpoint", - "!" = "Check correct credentials are present as env variables", - "!" = "Check container {.var {container_name}} is correct" - ), - parent = con +download_if_specified <- function( + blob_path, + blob_storage_container, + output_dir) { + # Guard against null input erroring out file.exists() + if (rlang::is_null(blob_path)) { + local_path <- NULL + } else { + file_exists <- file.exists(file.path(output_dir, blob_path)) + if (!rlang::is_null(blob_storage_container) && !file_exists) { + container <- fetch_blob_container(blob_storage_container) + local_path <- download_file_from_container( + blob_storage_path = blob_path, + local_file_path = file.path(output_dir, blob_path), + storage_container = container ) + } else { + local_path <- file.path(output_dir, blob_path) } - ) - - # Attempt to save each blob into local storage - for (blob in blob_names) { - local_file_path <- file.path(local_dest, blob) - rlang::try_fetch( - download_file_from_container( - blob, - blob_container, - local_file_path - ), - error = function(con) { - cli::cli_abort( - c( - "Error downloading blob {.path {blob}}", - "Using container {.path {container_name}}", - "Writing to local file path {.path local_file_path}" - ), - parent = con - ) - } - ) } - cli::cli_alert_success("Blobs {.path {blob_names}} downloaded successfully") - invisible(NULL) + local_path } +#' Download specified blobs from Blob Storage and save them in a local dir +#' +#' @param blob_storage_path A character of a blob in `storage_container` +#' @param local_file_path The local path to save the blob +#' @param storage_container The blob storage container with `blob_storage_path` +# +#' @return Invisibly, `local_file_path` +#' @family azure +#' @export download_file_from_container <- function( blob_storage_path, - container, - local_file_path) { + local_file_path, + storage_container) { cli::cli_alert_info( "Downloading blob {.path {blob_storage_path}} to {.path {local_file_path}}" ) - AzureStor::download_blob( - container = container, - src = blob_storage_path, - dest = local_file_path, - overwrite = TRUE + rlang::try_fetch( + { + dirs <- dirname(local_file_path) + + if (!dir.exists(dirs)) { + cli::cli_alert("Creating directory {.path {dirs}}") + dir.create(dirs, recursive = TRUE) + 
} + + AzureStor::download_blob( + container = storage_container, + src = blob_storage_path, + dest = local_file_path, + overwrite = TRUE + ) + }, + error = function(cnd) { + cli::cli_abort(c( + "Failed to download {.path {blob_storage_path}}", + ">" = "Does the blob exist in the container?" + )) + } ) cli::cli_alert_success( @@ -74,7 +77,7 @@ download_file_from_container <- function( invisible(local_file_path) } -#' Load Azure Blob endpoint using credentials in environment variables +#' Load Azure Blob container using credentials in environment variables #' #' This **impure** function depends on the environment variables: #' * az_tenant_id @@ -94,30 +97,52 @@ fetch_blob_container <- function(container_name) { ) cli::cli_alert_info("Loading Azure credentials from env vars") # nolint start: object_name_linter - az_tenant_id <- fetch_credential_from_env_var("az_tenant_id ") - az_subscription_id <- fetch_credential_from_env_var("az_subscription_id") - az_resource_group <- fetch_credential_from_env_var("az_resource_group") - az_storage_account <- fetch_credential_from_env_var("az_storage_account") + az_tenant_id <- fetch_credential_from_env_var("az_tenant_id") + az_client_id <- fetch_credential_from_env_var("az_client_id") + az_service_principal <- fetch_credential_from_env_var("az_service_principal") # nolint end: object_name_linter cli::cli_alert_success("Credentials loaded successfully") cli::cli_alert_info("Authenticating with loaded credentials") - az <- AzureRMR::get_azure_login(az_tenant_id) - subscription <- az$get_subscription(az_subscription_id) - resource_group <- subscription$get_resource_group(az_resource_group) - storage_account <- resource_group$get_storage_account(az_storage_account) - - # Getting the access key - keys <- storage_account$list_keys() - access_key <- keys[["key1"]] + rlang::try_fetch( + { + # First, get a general-purpose token using SP flow + # Analogous to: + # az login --service-principal \ + # --username $az_client_id \ + # --password $az_service_principal \ + # --tenant $az_tenant_id + # NOTE: the SP is also sometimes called the `client_secret` + token <- AzureRMR::get_azure_token( + resource = "https://storage.azure.com", + tenant = az_tenant_id, + app = az_client_id, + password = az_service_principal + ) + # Then fetch a storage endpoint using the token. Follows flow from + # https://github.com/Azure/AzureStor. + # Note that we're using the ABS endpoint (the first example line) + # but following the AAD token flow from the AAD alternative at + # end of the box. If we didn't replace the endpoint and used the + # example flow then it allows authentication to blob but throws + # a 409 when trying to download. 
+ endpoint <- AzureStor::storage_endpoint( + "https://cfaazurebatchprd.blob.core.windows.net", + token = token + ) - endpoint <- AzureStor::blob_endpoint( - storage_account$properties$primaryEndpoints$blob, - key = access_key + # Finally, set up instantiation of storage container generic + container <- AzureStor::storage_container(endpoint, container_name) + }, + error = function(cnd) { + cli::cli_abort( + "Failure authenticating connection to {.var {container_name}}", + parent = cnd + ) + } ) - container <- AzureStor::storage_container(endpoint, container_name) cli::cli_alert_success("Authenticated connection to {.var {container_name}}") return(container) diff --git a/R/pipeline.R b/R/pipeline.R index c5b825ca..c508faca 100644 --- a/R/pipeline.R +++ b/R/pipeline.R @@ -9,9 +9,13 @@ #' #' @param config_path A string specifying the file path to the JSON #' configuration file. -#' @param blob_storage_container Optional. The name of the blob storage +#' @param config_container Optional. The name of the blob storage container +#' from which the config file will be downloaded. +#' @param output_container Optional. The name of the blob storage #' container to which logs and outputs will be uploaded. If NULL, no upload -#' will occur. (Planned feature, not currently implemented) +#' will occur. +#' @param input_dir A string specifying the directory to read inputs from. If +#' passing storage containers, this is where the files will be downloaded to. #' @param output_dir A string specifying the directory where output, logs, and #' other pipeline artifacts will be saved. Defaults to the root directory ("/"). #' @@ -70,10 +74,19 @@ #' @family pipeline #' @export orchestrate_pipeline <- function(config_path, - blob_storage_container = NULL, - output_dir = "/") { + output_container = NULL, + config_container = NULL, + input_dir = "/input", + output_dir = "/output") { config <- rlang::try_fetch( - read_json_into_config(config_path, c("exclusions")), + { + config_path <- download_if_specified( + blob_path = config_path, + blob_storage_container = config_container, + output_dir = input_dir + ) + read_json_into_config(config_path, c("exclusions")) + }, error = function(con) { cli::cli_warn("Bad config file", parent = con, @@ -122,7 +135,7 @@ orchestrate_pipeline <- function(config_path, # `pipeline_success` is set to false, which will be stored in the # metadata in the next PR. 
pipeline_success <- rlang::try_fetch( - execute_model_logic(config, output_dir, blob_storage_container), + execute_model_logic(config, input_dir = input_dir, output_dir = output_dir), error = function(con) { cli::cli_warn("Pipeline run failed", parent = con, @@ -134,6 +147,19 @@ orchestrate_pipeline <- function(config_path, # TODO: Move metadata to outer wrapper cli::cli_alert_info("Finishing run at {Sys.time()}") + + if (!rlang::is_null(output_container)) { + outfiles <- file.path(output_dir, config@job_id, "*") + cli::cli_alert("Uploading {.path {outfiles}} to {.path {output_container}}") + cont <- fetch_blob_container(output_container) + AzureStor::multiupload_blob( + container = cont, + src = outfiles, + dest = config@job_id, + recursive = TRUE + ) + } + invisible(pipeline_success) } @@ -154,9 +180,14 @@ orchestrate_pipeline <- function(config_path, #' @rdname pipeline #' @family pipeline #' @export -execute_model_logic <- function(config, output_dir, blob_storage_container) { +execute_model_logic <- function(config, input_dir, output_dir) { + data_path <- download_if_specified( + blob_path = config@data@path, + blob_storage_container = config@data@blob_storage_container, + output_dir = input_dir + ) cases_df <- read_data( - data_path = config@data@path, + data_path = data_path, disease = config@disease, state_abb = config@geo_value, report_date = config@report_date, @@ -166,16 +197,39 @@ execute_model_logic <- function(config, output_dir, blob_storage_container) { # rlang::is_empty() checks for empty and NULL values if (!rlang::is_empty(config@exclusions@path)) { - exclusions_df <- read_exclusions(config@exclusions@path) + exclusions_path <- download_if_specified( + blob_path = config@exclusions@path, + blob_storage_container = config@exclusions@blob_storage_container, + output_dir = input_dir + ) + exclusions_df <- read_exclusions(exclusions_path) cases_df <- apply_exclusions(cases_df, exclusions_df) } else { cli::cli_alert("No exclusions file provided. 
Skipping exclusions") } + # GI + gi_path <- download_if_specified( + blob_path = config@parameters@generation_interval@path, + blob_storage_container = config@parameters@generation_interval@blob_storage_container, # nolint + output_dir = input_dir + ) + # Delay + delay_path <- download_if_specified( + blob_path = config@parameters@delay_interval@path, + blob_storage_container = config@parameters@delay_interval@blob_storage_container, # nolint + output_dir = input_dir + ) + right_trunc_path <- download_if_specified( + blob_path = config@parameters@right_truncation@path, + blob_storage_container = config@parameters@right_truncation@blob_storage_container, # nolint + output_dir = input_dir + ) + params <- read_disease_parameters( - generation_interval_path = config@parameters@generation_interval@path, - delay_interval_path = config@parameters@delay_interval@path, - right_truncation_path = config@parameters@right_truncation@path, + generation_interval_path = gi_path, + delay_interval_path = delay_path, + right_truncation_path = right_trunc_path, disease = config@disease, as_of_date = config@parameters@as_of_date, group = config@geo_value, @@ -226,7 +280,7 @@ execute_model_logic <- function(config, output_dir, blob_storage_container) { max_reference_date = config@max_reference_date, min_reference_date = config@min_reference_date, exclusions = empty_str_if_non_existant(config@exclusions@path), - blob_storage_container = empty_str_if_non_existant(blob_storage_container), + # Add the config container here when refactoring out to outer func run_at = format(Sys.time(), "%Y-%m-%dT%H:%M:%S%z") ) diff --git a/R/utils.R b/R/utils.R index 939d1079..8ad4c6f4 100644 --- a/R/utils.R +++ b/R/utils.R @@ -21,6 +21,7 @@ check_file_exists <- function(data_path) { } #' If `x` is null or empty, return an empty string, otherwise `x` +#' @noRd empty_str_if_non_existant <- function(x) { ifelse(rlang::is_empty(x), "", x) } diff --git a/azure/job.py b/azure/job.py new file mode 100644 index 00000000..982c4804 --- /dev/null +++ b/azure/job.py @@ -0,0 +1,96 @@ +import datetime +import sys +import os +import uuid + +from azure.identity import DefaultAzureCredential +from msrest.authentication import BasicTokenAuthentication +from azure.storage.blob import BlobServiceClient +from azure.batch import BatchServiceClient +import azure.batch.models as batchmodels + +blob_account = os.environ["BLOB_ACCOUNT"] +blob_url = f"https://{blob_account}.blob.core.windows.net" +batch_account = os.environ["BATCH_ACCOUNT"] +batch_url = f"https://{batch_account}.eastus.batch.azure.com" +image_name = sys.argv[1] +config_container = sys.argv[2] +pool_id = sys.argv[3] +# Re-use Azure Pool name unless otherwise specified +job_id = sys.argv[4] if len(sys.argv) > 3 else pool_id + +if __name__ == "__main__": + # Authenticate with workaround because Batch is the one remaining + # service that doesn't yet support Azure auth flow v2 :) :) + # https://github.com/Azure/azure-sdk-for-python/issues/30468 + credential_v2 = DefaultAzureCredential() + token = {"access_token": credential_v2.get_token("https://batch.core.windows.net/.default").token} + credential_v1 = BasicTokenAuthentication(token) + + batch_client = BatchServiceClient( + credentials=credential_v1, + batch_url=batch_url + ) + + ############# + # Set up job + job = batchmodels.JobAddParameter( + id=job_id, + pool_info=batchmodels.PoolInformation(pool_id=pool_id) + ) + + try: + batch_client.job.add(job) + except batchmodels.BatchErrorException as err: + if err.error.code != "JobExists": + raise 
+ else: + print("Job already exists. Using job object") + + ########## + # Get tasks + blob_service_client = BlobServiceClient(blob_url, credential_v2) + container_client = blob_service_client.get_container_client(container=config_container) + two_mins_ago = datetime.datetime.now(datetime.UTC) - datetime.timedelta(minutes=2) + task_configs: list[str] = [ + b.name + for b in container_client.list_blobs() + if b.creation_time > two_mins_ago + ] + if len(task_configs) == 0: + raise ValueError("No tasks found") + else: + print(f"Creating {len(task_configs)} tasks in job {job_id} on pool {pool_id}") + + ########### + # Set up task on job + registry = os.environ["AZURE_CONTAINER_REGISTRY"] + task_container_settings = batchmodels.TaskContainerSettings( + image_name=registry + '/cfa-epinow2-pipeline:test-' + image_name, + container_run_options='--rm --workdir /' + ) + task_env_settings = [ + batchmodels.EnvironmentSetting(name="az_tenant_id", value=os.environ["AZURE_TENANT_ID"]), + batchmodels.EnvironmentSetting(name="az_client_id", value=os.environ["AZURE_CLIENT_ID"]), + batchmodels.EnvironmentSetting(name="az_service_principal", value=os.environ["AZURE_CLIENT_SECRET"]) + ] + + # Run task at the admin level to be able to read/write to mounted drives + user_identity=batchmodels.UserIdentity( + auto_user=batchmodels.AutoUserSpecification( + scope=batchmodels.AutoUserScope.pool, + elevation_level=batchmodels.ElevationLevel.admin, + ) + ) + + for config_path in task_configs: + command = f"Rscript -e \"CFAEpiNow2Pipeline::orchestrate_pipeline('{config_path}', config_container = '{config_container}', input_dir = '/mnt/input', output_dir = '/mnt/output', output_container = 'zs-test-pipeline-update')\"" + task = batchmodels.TaskAddParameter( + id=str(uuid.uuid4()), + command_line=command, + container_settings=task_container_settings, + environment_settings=task_env_settings, + user_identity=user_identity + ) + + batch_client.task.add(job_id, task) diff --git a/azure/requirements.txt b/azure/requirements.txt index 9bf73d54..4507f789 100644 --- a/azure/requirements.txt +++ b/azure/requirements.txt @@ -1,11 +1,26 @@ +adal==1.2.7 +azure-batch==14.2.0 azure-common==1.1.28 azure-core==1.32.0 azure-identity==1.19.0 -azure-keyvault==4.2.0 -azure-keyvault-certificates==4.9.0 -azure-keyvault-keys==4.10.0 -azure-keyvault-secrets==4.9.0 -azure-mgmt-batch==18.0.0 -azure-mgmt-core==1.5.0 azure-storage-blob==12.24.0 -toml==0.10.2 +certifi==2024.8.30 +cffi==1.17.1 +charset-normalizer==3.4.0 +cryptography==44.0.0 +idna==3.10 +isodate==0.7.2 +msal==1.31.1 +msal-extensions==1.2.0 +msrest==0.7.1 +msrestazure==0.6.4.post1 +oauthlib==3.2.2 +portalocker==2.10.1 +pycparser==2.22 +PyJWT==2.10.1 +python-dateutil==2.9.0.post0 +requests==2.32.3 +requests-oauthlib==2.0.0 +six==1.17.0 +typing_extensions==4.12.2 +urllib3==2.2.3 diff --git a/demo/2024-12-09_demo.md b/demo/2024-12-09_demo.md new file mode 100644 index 00000000..f7b19eff --- /dev/null +++ b/demo/2024-12-09_demo.md @@ -0,0 +1,171 @@ +# Introduction to EpiNow2 pipeline 2.0 + +## Goals + +1. Show what we've accomplished thus far +2. Talk about what's left to do +3. Give some hands on experience + +## What have we done? + +In many ways, this pipeline is similar in spirit to our existing pipeline. +It fits a model in EpiNow2 and saves some outputs to a storage container. + +But it makes a lot of annoying things easier. 
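For orientation, each pipeline run (whether local or on Batch) ultimately reduces to one call into the R package. A minimal sketch of that call, mirroring the `run` target in the Makefile earlier in this diff (the config name and container names are the test values used there):

```sh
Rscript -e "CFAEpiNow2Pipeline::orchestrate_pipeline(
  'test.json',
  config_container = 'rt-epinow2-config',
  input_dir = '/mnt/input',
  output_dir = '/mnt',
  output_container = 'zs-test-pipeline-update'
)"
```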
+ +Let's talk it through on my work-in-progress branch: +```sh +gh repo clone cdcgov/cfa-epinow2-pipeline +gh pr checkout 106 +``` + +## Prerequisites + +**System setup and access (one time)** + +0. Complete your [technical onboarding](https://potential-adventure-637em36.pages.github.io/docs/onboarding.html), including gaining access to Predict resources in Azure and the VAP, and becoming a member of the cfa-predict teams in the CDC Enterprise and CDC Gov GitHub organizations. +1. Install the [Azure CLI](https://learn.microsoft.com/en-us/cli/azure/get-started-with-azure-cli). +2. Ensure that you have write access to the [config generation repo](https://github.com/CDCgov/cfa-config-generator). (If you've joined the cfa-predict team in CDC Gov, this should be taken care of). +3. *Recommended but not required*: Install the [nnh cli tool](https://github.com/cdcent/nnh-cli-tools). + +**Local environment setup (every time)** + +- You must be logged in to the VAP (.ext domain) to access Azure resources. We recommend working from a vap-wsl-ubuntu terminal. +- Run `az login` using the Azure CLI in your current bash session before attempting to run the pipeline. +- Docker must be installed, running, and connected to WSL2. To use the version of Docker that comes pre-installed in the VAP, use the start menu to open the Docker Desktop application and then [ensure that WSL2 is enabled](https://docs.docker.com/go/wsl2). + + +## Maintaining Docker images + +>[!NOTE] +> This section was masterminded by George. He did a fabulous job and I've already found it a huge quality-of-life increase. + +We don't need to do (long, slow) local builds anymore. +GitHub is set up to rebuild the Docker image when you open a PR and on merge to main. +GitHub maintains its own cache, so this build should only ever take a minute or two unless there's a change in the base image. +In that case it currently takes ~10 mins, but that's on GitHub's time and you don't need to worry about it. + +After building the image, GitHub automatically tags it and pushes it to ACR. +If you have our NNH CLI installed, you can see all the tagged images in ACR with +```sh +nnh list-acr-image-tags cfa-epinow2-pipeline +``` + +You can pull the built image down locally for prototyping with +```sh +make pull +``` + +If you `make pull`, it will pull the image associated with your branch (if there's an open PR for that branch). + +I've been prepping for today on the `edit-azure-flow` branch. +Try pulling that image: +```sh +git pull +git switch edit-azure-flow +make pull +``` + +This command should handle all the login for you and set you up with a local copy of the image we build in GitHub. + +If you want to look in the image environment, you can run `make up` to get dropped into a shell inside a loaded, working container with everything set up. + +If you want to do some local iteration, you can easily rebuild everything with `make build`. This command handles all the steps and, if you run it after an initial `make pull`, takes advantage of the cached image to keep things quick. + +## Azure Batch + +We also automatically set up an associated Batch pool linked to the image. +We refresh the pool every time we refresh the image. + +Basically, we can stop worrying about managing pools and spend more time thinking about model performance. + +## Managing configs + +>[!NOTE] +> This bit was masterminded by Agastya. He implemented some incredibly cool stuff and I learned quite a bit watching him. Go check out the code when you have a free minute.
+ +We can use a nice button or a script-based workflow to generate configs and drop them into a dedicated `rt-epinow2-config` storage container. + +[See here](https://github.com/CDCgov/cfa-config-generator/actions/workflows/run-workload.yaml) + +Or try running this command +```sh +gh workflow run \ + -R cdcgov/cfa-config-generator run-workload.yaml \ + -f disease=all \ + -f state=all \ + -f reference_dates=2024-01-01,2024-10-10 \ + -f report_date=2024-11-12 +``` + +This means no more hand-editing configs! And please don't hand-edit them. +We want to use new unique IDs for every run, which this service automatically handles for us. + +## Pipeline development + +>[!NOTE] +> Adam, Katie, and Nate contributed extensively here. + +We've moved the modeling code into a package and added extensive testing in CI. +We've also added quite a few runtime checks, to prevent regressions from past bugs (e.g., GI formatting). + +These changes should make the code safer and easier to update. +You get immediate feedback on your changes before anything gets close to production. + +These changes won't eliminate bugs, but hopefully they will help. + + +## Updated code + +The code now has some more guardrails and automated checks. +We use the REST API to pull files up and down, so no more worrying about blobfuse caching. +It points to Adam's new parameter estimates and checks that the results match the expected format. +It generates the summaries and draws we want and saves them in parquet for easy exploration (no more munging the Stan object!). + +## Running the pipeline + +Let's give it a local test. +Automatically pointing the pipeline to a newly generated config is still a pain point, so for today let's use a pre-generated configuration file. + +First, I'm going to send everyone the credentials. (Zack -- send people creds). + +The command to run things is +```sh +make run CONFIG='test.json' +``` + +The pipeline will automatically pull down this config and run it. +In the future we could use this approach for local debugging of unexpected pipeline failures. +When run locally, the pipeline will save outputs into your local directory. +It will also save everything in the storage container (in this case `zs-test-pipeline-update`). + +## Running in Batch + +I only got this running this morning... so let's give it a shot. + +Give this a go: +```sh +make run-batch JOB='<your-job-name>' +``` + +If all goes well, this should submit a batch job. Go check Batch Explorer and let me know what you see. + +## Known pain points + +* Credentials: A local run requires credentials to be set up locally (like the `configuration.toml` in the current pipeline) + * Not ideal! + * I don't have a workaround right now + * But we're working toward making GitHub handle the Azure bits, so non-local runs should all be automatic and easy. +* The docs are still messy and need some love. + +## Big things left to do + +* Stitching services together: + * Config generation -> validation + * Config -> pipeline run + * Job kickoff +* Some live fire testing + * Things will break + * Some tasks will be harder or annoying + * I want to hear about them. We will make things better.
+* More automation of jobs diff --git a/man/download_file_from_container.Rd b/man/download_file_from_container.Rd new file mode 100644 index 00000000..115d55f4 --- /dev/null +++ b/man/download_file_from_container.Rd @@ -0,0 +1,32 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/azure.R +\name{download_file_from_container} +\alias{download_file_from_container} +\title{Download specified blobs from Blob Storage and save them in a local dir} +\usage{ +download_file_from_container( + blob_storage_path, + local_file_path, + storage_container +) +} +\arguments{ +\item{blob_storage_path}{A character of a blob in \code{storage_container}} + +\item{local_file_path}{The local path to save the blob} + +\item{storage_container}{The blob storage container with \code{blob_storage_path}} +} +\value{ +Invisibly, \code{local_file_path} +} +\description{ +Download specified blobs from Blob Storage and save them in a local dir +} +\seealso{ +Other azure: +\code{\link{download_if_specified}()}, +\code{\link{fetch_blob_container}()}, +\code{\link{fetch_credential_from_env_var}()} +} +\concept{azure} diff --git a/man/download_from_azure_blob.Rd b/man/download_from_azure_blob.Rd deleted file mode 100644 index bfdb7161..00000000 --- a/man/download_from_azure_blob.Rd +++ /dev/null @@ -1,29 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/azure.R -\name{download_from_azure_blob} -\alias{download_from_azure_blob} -\title{Download specified blobs from Blob Storage and save them in a local dir} -\usage{ -download_from_azure_blob(blob_names, local_dest, container_name) -} -\arguments{ -\item{blob_names}{A vector of blobs to donwload from \code{container_name}} - -\item{local_dest}{The path to the local directory to save the files in} - -\item{container_name}{The Azure Blob Storage container with \code{blob_names}} -} -\value{ -NULL on success -} -\description{ -Note that I think it might be wise to instead specify a blob prefix, list the -blobs, and download all the listed blobs. 
This would let us have some more -flexibility with downloading whole remote directories (like delta tables) -} -\seealso{ -Other azure: -\code{\link{fetch_blob_container}()}, -\code{\link{fetch_credential_from_env_var}()} -} -\concept{azure} diff --git a/man/download_if_specified.Rd b/man/download_if_specified.Rd new file mode 100644 index 00000000..6de6b698 --- /dev/null +++ b/man/download_if_specified.Rd @@ -0,0 +1,28 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/azure.R +\name{download_if_specified} +\alias{download_if_specified} +\title{Download if specified} +\usage{ +download_if_specified(blob_path, blob_storage_container, output_dir) +} +\arguments{ +\item{blob_path}{The name of the blob to download} + +\item{blob_storage_container}{The name of the container to donwload from} + +\item{output_dir}{The directory to write the downloaded file to} +} +\value{ +The path of the file +} +\description{ +Download if specified +} +\seealso{ +Other azure: +\code{\link{download_file_from_container}()}, +\code{\link{fetch_blob_container}()}, +\code{\link{fetch_credential_from_env_var}()} +} +\concept{azure} diff --git a/man/extract_diagnostics.Rd b/man/extract_diagnostics.Rd index 5c3c5e3f..f7328f33 100644 --- a/man/extract_diagnostics.Rd +++ b/man/extract_diagnostics.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/extract_diagnostics.R +% Please edit documentation in R/diagnostics.R \name{extract_diagnostics} \alias{extract_diagnostics} \title{Extract diagnostic metrics from model fit and data} diff --git a/man/fetch_blob_container.Rd b/man/fetch_blob_container.Rd index 3613ebd8..82738253 100644 --- a/man/fetch_blob_container.Rd +++ b/man/fetch_blob_container.Rd @@ -2,7 +2,7 @@ % Please edit documentation in R/azure.R \name{fetch_blob_container} \alias{fetch_blob_container} -\title{Load Azure Blob endpoint using credentials in environment variables} +\title{Load Azure Blob container using credentials in environment variables} \usage{ fetch_blob_container(container_name) } @@ -27,7 +27,8 @@ It will error out if any of the above is not set. 
} \seealso{ Other azure: -\code{\link{download_from_azure_blob}()}, +\code{\link{download_file_from_container}()}, +\code{\link{download_if_specified}()}, \code{\link{fetch_credential_from_env_var}()} } \concept{azure} diff --git a/man/fetch_credential_from_env_var.Rd b/man/fetch_credential_from_env_var.Rd index 5708ed87..2e177dd1 100644 --- a/man/fetch_credential_from_env_var.Rd +++ b/man/fetch_credential_from_env_var.Rd @@ -17,7 +17,8 @@ And throw an informative error if credential is not found } \seealso{ Other azure: -\code{\link{download_from_azure_blob}()}, +\code{\link{download_file_from_container}()}, +\code{\link{download_if_specified}()}, \code{\link{fetch_blob_container}()} } \concept{azure} diff --git a/man/low_case_count_diagnostic.Rd b/man/low_case_count_diagnostic.Rd index 5d806554..ef315898 100644 --- a/man/low_case_count_diagnostic.Rd +++ b/man/low_case_count_diagnostic.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/extract_diagnostics.R +% Please edit documentation in R/diagnostics.R \name{low_case_count_diagnostic} \alias{low_case_count_diagnostic} \title{Calculate low case count diagnostic flag} diff --git a/man/opts_formatter.Rd b/man/opts_formatter.Rd index 2952a9d0..8d266ff4 100644 --- a/man/opts_formatter.Rd +++ b/man/opts_formatter.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/fit_model.R +% Please edit documentation in R/parameters.R \name{opts_formatter} \alias{opts_formatter} \alias{format_generation_interval} diff --git a/man/pipeline.Rd b/man/pipeline.Rd index 0671522b..3acb7b29 100644 --- a/man/pipeline.Rd +++ b/man/pipeline.Rd @@ -7,19 +7,27 @@ \usage{ orchestrate_pipeline( config_path, - blob_storage_container = NULL, - output_dir = "/" + output_container = NULL, + config_container = NULL, + input_dir = "/input", + output_dir = "/output" ) -execute_model_logic(config, output_dir, blob_storage_container) +execute_model_logic(config, input_dir, output_dir) } \arguments{ \item{config_path}{A string specifying the file path to the JSON configuration file.} -\item{blob_storage_container}{Optional. The name of the blob storage +\item{output_container}{Optional. The name of the blob storage container to which logs and outputs will be uploaded. If NULL, no upload -will occur. (Planned feature, not currently implemented)} +will occur.} + +\item{config_container}{Optional. The name of the blob storage container +from which the config file will be downloaded.} + +\item{input_dir}{A string specifying the directory to read inputs from. If +passing storage containers, this is where the files will be downloaded to.} \item{output_dir}{A string specifying the directory where output, logs, and other pipeline artifacts will be saved. 
Defaults to the root directory ("/").} diff --git a/tests/testthat/data/CA_COVID-19.json b/tests/testthat/data/CA_COVID-19.json index 2f1046ee..4817e054 100644 --- a/tests/testthat/data/CA_COVID-19.json +++ b/tests/testthat/data/CA_COVID-19.json @@ -11,7 +11,7 @@ "parameters": { "as_of_date": "2024-11-26", "generation_interval": { - "path": "data/test_parameters.parquet", + "path": "test_parameters.parquet", "blob_storage_container": null }, "delay_interval": { @@ -24,7 +24,7 @@ } }, "data": { - "path": "data/CA_test.parquet", + "path": "CA_test.parquet", "blob_storage_container": null }, "seed": 42, diff --git a/tests/testthat/data/sample_config_with_exclusion.json b/tests/testthat/data/sample_config_with_exclusion.json index b92c8c17..98edf47c 100644 --- a/tests/testthat/data/sample_config_with_exclusion.json +++ b/tests/testthat/data/sample_config_with_exclusion.json @@ -13,7 +13,7 @@ "parameters": { "as_of_date": "2023-10-28", "generation_interval": { - "path": "data/test_parameters.parquet", + "path": "test_parameters.parquet", "blob_storage_container": null }, "delay_interval": { @@ -26,11 +26,11 @@ } }, "data": { - "path": "data/test_data.parquet", + "path": "test_data.parquet", "blob_storage_container": null }, "exclusions": { - "path": "data/test_exclusions.csv", + "path": "test_exclusions.csv", "blob_storage_container": null }, "seed": 42, diff --git a/tests/testthat/test-azure.R b/tests/testthat/test-azure.R deleted file mode 100644 index 58992ae2..00000000 --- a/tests/testthat/test-azure.R +++ /dev/null @@ -1,66 +0,0 @@ -# NOTE: these tests don't test the happy path because they don't interact with -# Azure resources and mocking a full Azure Blob interface is hard. Instead, they -# test that expected errors are thrown and that if Azure access is mocked, the -# core download function runs all the way through. The function -# `download_file_from_container` isn't tested because it's a simple wrapper -# around `AzureStor::download_blob()` and `testthat::with_mocked_bindings()` -# advises mocking wrappers for tests rather than injecting the mock into the -# external lib. -test_that("Downloading file smoke test", { - file_path <- "not_a_real_file.ext" - download_status <- testthat::with_mocked_bindings( - { - withr::with_tempdir({ - download_from_azure_blob( - blob_names = c(file_path), - local_dest = ".", - container_name = "test_container" - ) - }) - }, - fetch_blob_container = function(...) "test-container", - download_file_from_container = function(...) file_path - ) - - expect_null(download_status) -}) - -test_that("Download fail throws informative error", { - # Errors on fetching credentials - expect_error( - download_from_azure_blob( - blob_names = c("test.json"), - local_dest = "./", - container_name = "test_container" - ) - ) - - # Credentials mocked, errors on downloading file - testthat::with_mocked_bindings( - { - withr::with_tempdir({ - expect_error( - download_from_azure_blob( - blob_names = c("not_a_real_file.ext"), - local_dest = ".", - container_name = "test_container" - ) - ) - }) - }, - fetch_blob_container = function(...) 
"test-container" - ) -}) - -test_that("Credential fetched successfully from env var", { - withr::with_envvar(c("KEY" = "VALUE"), { - expect_equal(fetch_credential_from_env_var("KEY"), "VALUE") - }) -}) - -test_that("Missing credential fails", { - withr::with_envvar(c("MISSING_KEY" = ""), { - expect_error(fetch_credential_from_env_var("MISSING_KEY")) - }) - expect_error(fetch_credential_from_env_var("NOT_A_REAL_KEY")) -}) diff --git a/tests/testthat/test-pipeline.R b/tests/testthat/test-pipeline.R index 3aa83bf0..46544e10 100644 --- a/tests/testthat/test-pipeline.R +++ b/tests/testthat/test-pipeline.R @@ -3,15 +3,17 @@ test_that("Bad config throws warning and returns failure", { config_path <- test_path("data", "bad_config.json") config <- jsonlite::read_json(config_path) # Read from locally - blob_storage_container <- NULL + output_container <- NULL output_dir <- "pipeline_test" + input_dir <- "." on.exit(unlink(output_dir, recursive = TRUE)) # Act expect_warning( pipeline_success <- orchestrate_pipeline( config_path = config_path, - blob_storage_container = blob_storage_container, + output_container = output_container, + input_dir = input_dir, output_dir = output_dir ), class = "Bad_config" @@ -24,14 +26,16 @@ test_that("Pipeline run produces expected outputs with NO exclusions", { config_path <- test_path("data", "sample_config_no_exclusion.json") config <- jsonlite::read_json(config_path) # Read from locally - blob_storage_container <- NULL + output_container <- NULL output_dir <- "pipeline_test" + input_dir <- "." on.exit(unlink(output_dir, recursive = TRUE)) # Act pipeline_success <- orchestrate_pipeline( config_path = config_path, - blob_storage_container = blob_storage_container, + output_container = output_container, + input_dir = input_dir, output_dir = output_dir ) expect_true(pipeline_success) @@ -44,17 +48,19 @@ test_that("Pipeline run produces expected outputs with NO exclusions", { test_that("Pipeline run produces expected outputs with exclusions", { # Arrange - config_path <- test_path("data", "sample_config_with_exclusion.json") - config <- jsonlite::read_json(config_path) + input_dir <- test_path("data") + config_path <- "sample_config_with_exclusion.json" + config <- jsonlite::read_json(file.path(input_dir, config_path)) # Read from locally - blob_storage_container <- NULL + output_container <- NULL output_dir <- "pipeline_test" on.exit(unlink(output_dir, recursive = TRUE)) # Act pipeline_success <- orchestrate_pipeline( config_path = config_path, - blob_storage_container = blob_storage_container, + output_container = output_container, + input_dir = input_dir, output_dir = output_dir ) @@ -70,7 +76,8 @@ test_that("Pipeline run produces expected outputs with exclusions", { test_that("Process pipeline produces expected outputs and returns success", { # Arrange - config_path <- test_path("data", "sample_config_with_exclusion.json") + input_dir <- "data" + config_path <- file.path(input_dir, "sample_config_with_exclusion.json") config <- read_json_into_config(config_path, c("exclusions")) # Read from locally output_dir <- "pipeline_test" @@ -79,8 +86,8 @@ test_that("Process pipeline produces expected outputs and returns success", { # Act pipeline_success <- execute_model_logic( config = config, - output_dir = output_dir, - blob_storage_container = "blah" + input_dir = input_dir, + output_dir = output_dir ) expect_true(pipeline_success) @@ -99,17 +106,21 @@ test_that("Process pipeline produces expected outputs and returns success", { test_that("Runs on config from generator as 
of 2024-11-26", { # Arrange - config_path <- test_path("data", "CA_COVID-19.json") - config <- read_json_into_config(config_path, c("exclusions")) + config_path <- "CA_COVID-19.json" + input_dir <- test_path("data") + config <- read_json_into_config( + file.path(input_dir, config_path), + c("exclusions") + ) # Read from locally - output_dir <- "pipeline_test" + output_dir <- test_path("pipeline_test") on.exit(unlink(output_dir, recursive = TRUE)) # Act pipeline_success <- execute_model_logic( config = config, output_dir = output_dir, - blob_storage_container = "blah" + input_dir = input_dir ) expect_true(pipeline_success) @@ -128,16 +139,18 @@ test_that("Runs on config from generator as of 2024-11-26", { test_that("Warning and exit for bad config file", { # Arrange - config_path <- test_path("data", "v_bad_config.json") + config_path <- test_path("v_bad_config.json") # Read from locally - output_dir <- "pipeline_test" + input_dir <- test_path("data") + output_dir <- test_path("bad_output") on.exit(unlink(output_dir, recursive = TRUE)) # Act expect_warning( pipeline_success <- orchestrate_pipeline( config_path = config_path, - blob_storage_container = NULL, + output_container = NULL, + input_dir = input_dir, output_dir = output_dir ), class = "Bad_config"