Working azure download flow (#106)
* Working azure download flow

* Clean up some typos from translation

* More debugging the flow

* Run pipeline off ABS config

* Download remote config in run

* Add azure stuff

* Fix an extra

* Clean up build checks

* Try rerunning after dropping cache

* Temporarily drop config checks to match generator

* Revert "Temporarily drop config checks to match generator"

This reverts commit a5d5776.

* Working!!!

* Working input/output dirs

* Bump NEWS

* Add doc for demo

* Fix tests

* Auto-detect tag

* Typos

* Move demo out of pkgdown's dir

* Add working Batch implementation

* Ignore secrets

* Added prerequisites

* Clean up a bit of the Azure stuff

* Update Dockerfile

* Remove non-standard uuid install

* Update azure/job.py

Co-authored-by: Nate McIntosh <[email protected]>

* One more cleanup from the big rebase

* Mount dirs in `mnt`

* Put everything in mnt for now

* Clean up WARNING from undocumented arg

* Ignore Azure stuff in R package

* Add family for Azure

* Pass image name and pool name

This is a stopgap: we should be able to parse both from the same
variable, since everything is named by the branch. But for now, we pass
them separately.

---------

Co-authored-by: kgostic <[email protected]>
Co-authored-by: Nate McIntosh <[email protected]>
3 people authored Dec 16, 2024
1 parent dbfb98d commit 2a9798c
Showing 25 changed files with 621 additions and 225 deletions.
3 changes: 3 additions & 0 deletions .Rbuildignore
@@ -1,3 +1,5 @@
^Dockerfile-batch$
^.env$
^CONTRIBUTING.md$
^DISCLAIMER.md$
^Dockerfile$
@@ -11,6 +13,7 @@
^\.vscode$
^\.vscode$
^_pkgdown\.yml$
^azure$
^batch-autoscale-formula.txt$
^code-of-conduct.md$
^codecov\.yml$
4 changes: 4 additions & 0 deletions .gitignore
@@ -385,3 +385,7 @@ docs
# for now... will have to gpg encrypt
cfa-epinow2-batch-pool-config.json
azure/*.toml

# Careful with Secrets!
*.env
*.env.gpg
6 changes: 6 additions & 0 deletions Dockerfile-batch
@@ -0,0 +1,6 @@
FROM ghcr.io/astral-sh/uv:python3.12-bookworm

COPY azure .
RUN uv pip install -r requirements.txt --system

CMD /bin/bash
40 changes: 35 additions & 5 deletions Makefile
@@ -1,25 +1,55 @@
ifndef TAG
TAG = local
REGISTRY=cfaprdbatchcr.azurecr.io/
IMAGE_NAME=cfa-epinow2-pipeline
BRANCH=$(shell git branch --show-current)
ifeq ($(BRANCH), 'main')
TAG=latest
else
TAG=$(BRANCH)
endif

IMAGE_NAME=cfa-epinow2-pipeline
CONFIG=test.json
JOB=batch-test


deps:
docker build -t $(REGISTRY)$(IMAGE_NAME)-dependencies:$(TAG) -f Dockerfile-dependencies

pull:
az acr login --name 'cfaprdbatchcr'
docker pull $(REGISTRY)$(IMAGE_NAME)-dependencies:$(TAG)
docker pull $(REGISTRY)$(IMAGE_NAME):test-$(TAG)

build:
docker build -t $(REGISTRY)$(IMAGE_NAME):$(TAG) \
docker build -t $(REGISTRY)$(IMAGE_NAME):test-$(TAG) \
--build-arg TAG=$(TAG) -f Dockerfile .

tag:
docker tag $(IMAGE_NAME):$(TAG) $(REGISTRY)$(IMAGE_NAME):$(TAG)

config:
gh workflow run \
-R cdcgov/cfa-config-generator run-workload.yaml \
-f disease=all \
-f state=all

run-batch:
docker build -f Dockerfile-batch -t batch . --no-cache
docker run --rm \
--env-file .env \
-it \
batch python job.py "$(IMAGE)" "$(POOL)" "$(JOB)"

run:
docker run --mount type=bind,source=$(PWD),target=/mnt -it \
--env-file .env \
--rm $(REGISTRY)$(IMAGE_NAME):test-$(TAG) \
Rscript -e "CFAEpiNow2Pipeline::orchestrate_pipeline('$(CONFIG)', config_container = 'rt-epinow2-config', input_dir = '/mnt/input', output_dir = '/mnt', output_container = 'zs-test-pipeline-update')"


up:
docker run --mount type=bind,source=$(PWD),target=/cfa-epinow2-pipeline -it \
--rm $(REGISTRY)$(IMAGE_NAME):$(TAG) /bin/bash
--env-file .env \
--rm $(REGISTRY)$(IMAGE_NAME):test-$(TAG) /bin/bash

run-function:
docker run --mount type=bind,source=$(PWD),target=/cfa-epinow2-pipeline -it \
3 changes: 2 additions & 1 deletion NAMESPACE
@@ -8,7 +8,8 @@ export(GenerationInterval)
export(Parameters)
export(RightTruncation)
export(apply_exclusions)
export(download_from_azure_blob)
export(download_file_from_container)
export(download_if_specified)
export(execute_model_logic)
export(extract_diagnostics)
export(fetch_blob_container)
2 changes: 2 additions & 0 deletions NEWS.md
@@ -7,6 +7,8 @@
* Use empty string for paths when non-existent.
* Add function families
* Populated the default values of the metadata to be saved.
* Working upload/download from ABS
* Working Azure upload/download
* Creating a Config class to make syncing configuration differences easier.
* Add a JSON reader for the Config class.
* Use the Config class throughout the pipeline.
165 changes: 95 additions & 70 deletions R/azure.R
@@ -1,70 +1,73 @@
#' Download specified blobs from Blob Storage and save them in a local dir
#'
#' Note that I think it might be wise to instead specify a blob prefix, list the
#' blobs, and download all the listed blobs. This would let us have some more
#' flexibility with downloading whole remote directories (like delta tables)
#'
#' @param blob_names A vector of blobs to download from `container_name`
#' @param local_dest The path to the local directory to save the files in
#' @param container_name The Azure Blob Storage container with `blob_names`
#' Download if specified
#'
#' @return NULL on success
#' @param blob_path The name of the blob to download
#' @param blob_storage_container The name of the container to download from
#' @param output_dir The directory to write the downloaded file to
#' @return The path of the file
#' @family azure
#' @export
download_from_azure_blob <- function(blob_names, local_dest, container_name) {
# Attempt to connect to the storage container
blob_container <- rlang::try_fetch(
fetch_blob_container(container_name),
error = function(con) {
cli::cli_abort(
c(
"Unable to authenticate connection to Blob endpoint",
"!" = "Check correct credentials are present as env variables",
"!" = "Check container {.var {container_name}} is correct"
),
parent = con
download_if_specified <- function(
blob_path,
blob_storage_container,
output_dir) {
# Guard against null input erroring out file.exists()
if (rlang::is_null(blob_path)) {
local_path <- NULL
} else {
file_exists <- file.exists(file.path(output_dir, blob_path))
if (!rlang::is_null(blob_storage_container) && !file_exists) {
container <- fetch_blob_container(blob_storage_container)
local_path <- download_file_from_container(
blob_storage_path = blob_path,
local_file_path = file.path(output_dir, blob_path),
storage_container = container
)
} else {
local_path <- file.path(output_dir, blob_path)
}
)

# Attempt to save each blob into local storage
for (blob in blob_names) {
local_file_path <- file.path(local_dest, blob)
rlang::try_fetch(
download_file_from_container(
blob,
blob_container,
local_file_path
),
error = function(con) {
cli::cli_abort(
c(
"Error downloading blob {.path {blob}}",
"Using container {.path {container_name}}",
"Writing to local file path {.path local_file_path}"
),
parent = con
)
}
)
}
cli::cli_alert_success("Blobs {.path {blob_names}} downloaded successfully")
invisible(NULL)
local_path
}
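
For orientation, a minimal usage sketch of the new `download_if_specified()` helper; the blob name is hypothetical, while the container name and `/mnt/input` directory match the defaults used in the Makefile:

# Hypothetical call: fetch "data/cases.parquet" from the "rt-epinow2-config"
# container into /mnt/input, skipping the download if the file already exists.
local_path <- CFAEpiNow2Pipeline::download_if_specified(
  blob_path = "data/cases.parquet",
  blob_storage_container = "rt-epinow2-config",
  output_dir = "/mnt/input"
)

# A NULL blob_path (nothing specified in the config) returns NULL without
# attempting a download.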

#' Download specified blobs from Blob Storage and save them in a local dir
#'
#' @param blob_storage_path The name (a character string) of a blob in `storage_container`
#' @param local_file_path The local path to save the blob
#' @param storage_container The blob storage container with `blob_storage_path`
#'
#' @return Invisibly, `local_file_path`
#' @family azure
#' @export
download_file_from_container <- function(
blob_storage_path,
container,
local_file_path) {
local_file_path,
storage_container) {
cli::cli_alert_info(
"Downloading blob {.path {blob_storage_path}} to {.path {local_file_path}}"
)

AzureStor::download_blob(
container = container,
src = blob_storage_path,
dest = local_file_path,
overwrite = TRUE
rlang::try_fetch(
{
dirs <- dirname(local_file_path)

if (!dir.exists(dirs)) {
cli::cli_alert("Creating directory {.path {dirs}}")
dir.create(dirs, recursive = TRUE)
}

AzureStor::download_blob(
container = storage_container,
src = blob_storage_path,
dest = local_file_path,
overwrite = TRUE
)
},
error = function(cnd) {
cli::cli_abort(c(
"Failed to download {.path {blob_storage_path}}",
">" = "Does the blob exist in the container?"
))
}
)

cli::cli_alert_success(
@@ -74,7 +77,7 @@ download_file_from_container <- function(
invisible(local_file_path)
}

#' Load Azure Blob endpoint using credentials in environment variables
#' Load Azure Blob container using credentials in environment variables
#'
#' This **impure** function depends on the environment variables:
#' * az_tenant_id
@@ -94,30 +97,52 @@ fetch_blob_container <- function(container_name) {
)
cli::cli_alert_info("Loading Azure credentials from env vars")
# nolint start: object_name_linter
az_tenant_id <- fetch_credential_from_env_var("az_tenant_id ")
az_subscription_id <- fetch_credential_from_env_var("az_subscription_id")
az_resource_group <- fetch_credential_from_env_var("az_resource_group")
az_storage_account <- fetch_credential_from_env_var("az_storage_account")
az_tenant_id <- fetch_credential_from_env_var("az_tenant_id")
az_client_id <- fetch_credential_from_env_var("az_client_id")
az_service_principal <- fetch_credential_from_env_var("az_service_principal")
# nolint end: object_name_linter
cli::cli_alert_success("Credentials loaded successfully")


cli::cli_alert_info("Authenticating with loaded credentials")
az <- AzureRMR::get_azure_login(az_tenant_id)
subscription <- az$get_subscription(az_subscription_id)
resource_group <- subscription$get_resource_group(az_resource_group)
storage_account <- resource_group$get_storage_account(az_storage_account)

# Getting the access key
keys <- storage_account$list_keys()
access_key <- keys[["key1"]]
rlang::try_fetch(
{
# First, get a general-purpose token using SP flow
# Analogous to:
# az login --service-principal \
# --username $az_client_id \
# --password $az_service_principal \
# --tenant $az_tenant_id
# NOTE: the SP is also sometimes called the `client_secret`
token <- AzureRMR::get_azure_token(
resource = "https://storage.azure.com",
tenant = az_tenant_id,
app = az_client_id,
password = az_service_principal
)
# Then fetch a storage endpoint using the token. Follows the flow from
# https://github.com/Azure/AzureStor.
# Note that we're using the ABS endpoint (the first example line)
# but following the AAD token flow from the AAD alternative at the
# end of the box. If we used the example flow without replacing the
# endpoint, authentication to blob would succeed but downloads would
# throw a 409.
endpoint <- AzureStor::storage_endpoint(
"https://cfaazurebatchprd.blob.core.windows.net",
token = token
)

endpoint <- AzureStor::blob_endpoint(
storage_account$properties$primaryEndpoints$blob,
key = access_key
# Finally, set up instantiation of storage container generic
container <- AzureStor::storage_container(endpoint, container_name)
},
error = function(cnd) {
cli::cli_abort(
"Failure authenticating connection to {.var {container_name}}",
parent = cnd
)
}
)

container <- AzureStor::storage_container(endpoint, container_name)
cli::cli_alert_success("Authenticated connection to {.var {container_name}}")

return(container)
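
For reference, a hedged end-to-end sketch of the new authentication flow. It assumes az_tenant_id, az_client_id, and az_service_principal are set in the environment (for example via the .env file the Makefile passes with --env-file) and reuses the test.json config, rt-epinow2-config container, and /mnt/input directory names from the Makefile:

# Authenticate with the service-principal token flow and open a handle to
# the config container.
container <- CFAEpiNow2Pipeline::fetch_blob_container("rt-epinow2-config")

# Download one config blob to a local path; the parent directory is created
# if it does not already exist.
CFAEpiNow2Pipeline::download_file_from_container(
  blob_storage_path = "test.json",
  local_file_path = file.path("/mnt/input", "test.json"),
  storage_container = container
)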