Working Azure download flow #106

Open · wants to merge 25 commits into main
Changes from 23 commits
1 change: 1 addition & 0 deletions .Rbuildignore
@@ -1,3 +1,4 @@
^.env$
^CONTRIBUTING.md$
^DISCLAIMER.md$
^Dockerfile$
4 changes: 4 additions & 0 deletions .gitignore
@@ -385,3 +385,7 @@ docs
# for now... will have to gpg encrypt
cfa-epinow2-batch-pool-config.json
azure/*.toml

# Careful with Secrets!
*.env
*.env.gpg
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# Adding arguments
ARG TAG=local
ARG TAG=edit-azure-flow

# This requires access to the Azure Container Registry
FROM cfaprdbatchcr.azurecr.io/cfa-epinow2-pipeline-dependencies:${TAG}
6 changes: 6 additions & 0 deletions Dockerfile-batch
@@ -0,0 +1,6 @@
FROM ghcr.io/astral-sh/uv:python3.12-bookworm

COPY azure .
RUN uv pip install -r requirements.txt --system

CMD /bin/bash
40 changes: 35 additions & 5 deletions Makefile
@@ -1,25 +1,55 @@
ifndef TAG
TAG = local
REGISTRY=cfaprdbatchcr.azurecr.io/
IMAGE_NAME=cfa-epinow2-pipeline
BRANCH=$(shell git branch --show-current)
ifeq ($(BRANCH), main)
TAG=latest
else
TAG=$(BRANCH)
endif

IMAGE_NAME=cfa-epinow2-pipeline
CONFIG=test.json
JOB=batch-test


deps:
docker build -t $(REGISTRY)$(IMAGE_NAME)-dependencies:$(TAG) -f Dockerfile-dependencies .

pull:
az acr login --name 'cfaprdbatchcr'
docker pull $(REGISTRY)$(IMAGE_NAME)-dependencies:$(TAG)
docker pull $(REGISTRY)$(IMAGE_NAME):test-$(TAG)

build:
docker build -t $(REGISTRY)$(IMAGE_NAME):$(TAG) \
docker build -t $(REGISTRY)$(IMAGE_NAME):test-$(TAG) \
--build-arg TAG=$(TAG) -f Dockerfile .

tag:
docker tag $(IMAGE_NAME):$(TAG) $(REGISTRY)$(IMAGE_NAME):$(TAG)

config:
gh workflow run \
-R cdcgov/cfa-config-generator run-workload.yaml \
-f disease=all \
-f state=all

run-batch:
docker build -f Dockerfile-batch -t batch . --no-cache
docker run --rm \
--env-file .env \
-it \
batch python job.py "cfa-epinow2-edit-azure-flow" "$(JOB)"

run:
docker run --mount type=bind,source=$(PWD),target=/cfa-epinow2-pipeline -it \
--env-file .env \
--rm $(REGISTRY)$(IMAGE_NAME):test-$(TAG) \
Rscript -e "CFAEpiNow2Pipeline::orchestrate_pipeline('$(CONFIG)', config_container = 'rt-epinow2-config', input_dir = '/cfa-epinow2-pipeline/input', output_dir = '/cfa-epinow2-pipeline', output_container = 'zs-test-pipeline-update')"

Collaborator

Suggested change
Rscript -e "CFAEpiNow2Pipeline::orchestrate_pipeline('$(CONFIG)', config_container = 'rt-epinow2-config', input_dir = '/cfa-epinow2-pipeline/input', output_dir = '/cfa-epinow2-pipeline', output_container = 'zs-test-pipeline-update')"
Rscript -e "CFAEpiNow2Pipeline::orchestrate_pipeline('$(CONFIG)', config_container = 'rt-epinow2-config', input_dir = '/cfa-epinow2-pipeline/input', output_dir = '/', output_container = 'zs-test-pipeline-update')"

Use the same output dir as for the run-batch command. This way, the file paths in the metadata will always be the same as those created by the batch command, which is desirable for maintaining consistency in output folder structure. However, this would require changing the bind mount above to mount to the container's root, instead of /cfa-epinow2-pipeline.

What do you think?

Collaborator Author

I like the point about consistency, but I think we should follow the pattern of this one, not run-batch. We want the local run to write into the bind mount, which this edit would prevent.

Collaborator

But what if we change the bind mount to be / inside the container for this make command?


up:
docker run --mount type=bind,source=$(PWD),target=/cfa-epinow2-pipeline -it \
--rm $(REGISTRY)$(IMAGE_NAME):$(TAG) /bin/bash
--env-file .env \
--rm $(REGISTRY)$(IMAGE_NAME):test-$(TAG) /bin/bash

run-function:
docker run --mount type=bind,source=$(PWD),target=/cfa-epinow2-pipeline -it \
3 changes: 2 additions & 1 deletion NAMESPACE
@@ -8,7 +8,8 @@ export(GenerationInterval)
export(Parameters)
export(RightTruncation)
export(apply_exclusions)
export(download_from_azure_blob)
export(download_file_from_container)
export(download_if_specified)
export(execute_model_logic)
export(extract_diagnostics)
export(fetch_blob_container)
2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,6 +1,8 @@
# CFAEpiNow2Pipeline (development version)

* Populated the default values of the metadata to be saved.
* Working upload/download from ABS
* Working Azure upload/download
* Creating a Config class to make syncing configuration differences easier.
* Add a JSON reader for the Config class.
* Use the Config class throughout the pipeline.
164 changes: 94 additions & 70 deletions R/azure.R
@@ -1,69 +1,71 @@
#' Download specified blobs from Blob Storage and save them in a local dir
#'
#' Note that I think it might be wise to instead specify a blob prefix, list the
#' blobs, and download all the listed blobs. This would let us have some more
#' flexibility with downloading whole remote directories (like delta tables)
#'
#' @param blob_names A vector of blobs to download from `container_name`
#' @param local_dest The path to the local directory to save the files in
#' @param container_name The Azure Blob Storage container with `blob_names`
#' Download if specified
#'
#' @return NULL on success
#' @param blob_path The name of the blob to download
#' @param blob_storage_container The name of the container to download from
#' @param output_dir The directory to write the downloaded file to
#' @return The path of the file
#' @export
download_from_azure_blob <- function(blob_names, local_dest, container_name) {
# Attempt to connect to the storage container
blob_container <- rlang::try_fetch(
fetch_blob_container(container_name),
error = function(con) {
cli::cli_abort(
c(
"Unable to authenticate connection to Blob endpoint",
"!" = "Check correct credentials are present as env variables",
"!" = "Check container {.var {container_name}} is correct"
),
parent = con
download_if_specified <- function(
blob_path,
blob_storage_container,
output_dir) {
# Guard against null input erroring out file.exists()
if (rlang::is_null(blob_path)) {
local_path <- NULL
} else {
file_exists <- file.exists(file.path(output_dir, blob_path))
if (!rlang::is_null(blob_storage_container) && !file_exists) {
container <- fetch_blob_container(blob_storage_container)
local_path <- download_file_from_container(
blob_storage_path = blob_path,
local_file_path = file.path(output_dir, blob_path),
storage_container = container
)
} else {
local_path <- file.path(output_dir, blob_path)
}
)

# Attempt to save each blob into local storage
for (blob in blob_names) {
local_file_path <- file.path(local_dest, blob)
rlang::try_fetch(
download_file_from_container(
blob,
blob_container,
local_file_path
),
error = function(con) {
cli::cli_abort(
c(
"Error downloading blob {.path {blob}}",
"Using container {.path {container_name}}",
"Writing to local file path {.path local_file_path}"
),
parent = con
)
}
)
}
cli::cli_alert_success("Blobs {.path {blob_names}} downloaded successfully")
invisible(NULL)
local_path
}
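
For anyone testing this locally, a minimal usage sketch of the new helper; the blob path, container name, and output directory below are illustrative assumptions, not values fixed by this PR:

# Returns the local path to the file. The download is skipped when
# blob_storage_container is NULL or the file already exists locally;
# blob_path = NULL simply returns NULL.
local_path <- download_if_specified(
  blob_path = "test.json",
  blob_storage_container = "rt-epinow2-config",
  output_dir = "/cfa-epinow2-pipeline/input"
)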

#' Download specified blobs from Blob Storage and save them in a local dir
#'
#' @param blob_storage_path A character string naming a blob in `storage_container`
#' @param local_file_path The local path to save the blob
#' @param storage_container The blob storage container with `blob_storage_path`
#'
#' @return Invisibly, `local_file_path`
#' @export
download_file_from_container <- function(
blob_storage_path,
container,
local_file_path) {
local_file_path,
storage_container) {
cli::cli_alert_info(
"Downloading blob {.path {blob_storage_path}} to {.path {local_file_path}}"
)

AzureStor::download_blob(
container = container,
src = blob_storage_path,
dest = local_file_path,
overwrite = TRUE
rlang::try_fetch(
{
dirs <- dirname(local_file_path)

if (!dir.exists(dirs)) {
cli::cli_alert("Creating directory {.path {dirs}}")
dir.create(dirs, recursive = TRUE)
}

AzureStor::download_blob(
container = storage_container,
src = blob_storage_path,
dest = local_file_path,
overwrite = TRUE
)
},
error = function(cnd) {
cli::cli_abort(c(
"Failed to download {.path {blob_storage_path}}",
">" = "Does the blob exist in the container?"

Check warning on line 66 in R/azure.R

View check run for this annotation

Codecov / codecov/patch

R/azure.R#L63-L66

Added lines #L63 - L66 were not covered by tests
))
}
)

cli::cli_alert_success(
@@ -73,7 +75,7 @@
invisible(local_file_path)
}
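
A sketch of calling the lower-level helper directly, assuming a container handle obtained from fetch_blob_container() below; the blob name and local path here are hypothetical:

container <- fetch_blob_container("rt-epinow2-config")
path <- download_file_from_container(
  blob_storage_path = "test.json",
  local_file_path = file.path("input", "test.json"),
  storage_container = container
)
# Missing parent directories (here, "input/") are created before the
# AzureStor::download_blob() call, per the try_fetch block above.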

#' Load Azure Blob endpoint using credentials in environment variables
#' Load Azure Blob container using credentials in environment variables
#'
#' This **impure** function depends on the environment variables:
#' * az_tenant_id
@@ -92,30 +94,52 @@
)
cli::cli_alert_info("Loading Azure credentials from env vars")
# nolint start: object_name_linter
az_tenant_id <- fetch_credential_from_env_var("az_tenant_id ")
az_subscription_id <- fetch_credential_from_env_var("az_subscription_id")
az_resource_group <- fetch_credential_from_env_var("az_resource_group")
az_storage_account <- fetch_credential_from_env_var("az_storage_account")
az_tenant_id <- fetch_credential_from_env_var("az_tenant_id")
az_client_id <- fetch_credential_from_env_var("az_client_id")
az_service_principal <- fetch_credential_from_env_var("az_service_principal")
# nolint end: object_name_linter
cli::cli_alert_success("Credentials loaded successfully")


cli::cli_alert_info("Authenticating with loaded credentials")
az <- AzureRMR::get_azure_login(az_tenant_id)
subscription <- az$get_subscription(az_subscription_id)
resource_group <- subscription$get_resource_group(az_resource_group)
storage_account <- resource_group$get_storage_account(az_storage_account)

# Getting the access key
keys <- storage_account$list_keys()
access_key <- keys[["key1"]]
rlang::try_fetch(
{
# First, get a general-purpose token using SP flow
# Analogous to:
# az login --service-principal \
# --username $az_client_id \
# --password $az_service_principal \
# --tenant $az_tenant_id
# NOTE: the SP is also sometimes called the `client_secret`
token <- AzureRMR::get_azure_token(
resource = "https://storage.azure.com",
tenant = az_tenant_id,
app = az_client_id,
password = az_service_principal
)
# Then fetch a storage endpoint using the token. Follows flow from
# https://github.com/Azure/AzureStor.
# Note that we're using the ABS endpoint (the first example line)
# but following the AAD token flow from the AAD alternative at
# end of the box. If we didn't replace the endpoint and used the
# example flow then it allows authentication to blob but throws
# a 409 when trying to download.
endpoint <- AzureStor::storage_endpoint(
"https://cfaazurebatchprd.blob.core.windows.net",
token = token
)

endpoint <- AzureStor::blob_endpoint(
storage_account$properties$primaryEndpoints$blob,
key = access_key
# Finally, set up instantiation of storage container generic
container <- AzureStor::storage_container(endpoint, container_name)
},
error = function(cnd) {
cli::cli_abort(
"Failure authenticating connection to {.var {container_name}}",
parent = cnd
)
}
)

container <- AzureStor::storage_container(endpoint, container_name)
cli::cli_alert_success("Authenticated connection to {.var {container_name}}")

return(container)
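
To exercise the new service-principal flow interactively, the three credentials that fetch_credential_from_env_var() reads must be present as environment variables. A sketch with placeholder values; in practice they come from the git-ignored .env file that the Makefile passes via --env-file:

# Placeholder credentials for illustration only; never commit real values.
Sys.setenv(
  az_tenant_id = "00000000-0000-0000-0000-000000000000",
  az_client_id = "11111111-1111-1111-1111-111111111111",
  az_service_principal = "<client-secret>"
)
container <- fetch_blob_container("rt-epinow2-config")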