Skip to content

Commit

Permalink
Some performance improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
grimbough committed Sep 30, 2024
1 parent 31dd0f4 commit 59442e5
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 40 deletions.
80 changes: 47 additions & 33 deletions R/read_data.R
Original file line number Diff line number Diff line change
Expand Up @@ -72,45 +72,59 @@ read_zarr_array <- function(zarr_array_path, index, s3_client) {
return(res$output)
}

.extract_elements <- function(i, metadata, index, required_chunks, zarr_array_path, s3_client, tmp) {
## find elements to select from the chunk and what in the output we replace
index_in_result <- index_in_chunk <- list()
alt_chunk_dim <- unlist(metadata$chunks)

for (j in seq_len(ncol(required_chunks))) {
index_in_result[[j]] <- which(tmp[[j]] == required_chunks[i, j])
## are we requesting values outside the array due to overhanging chunks?
outside_extent <- index_in_result[[j]] > metadata$shape[[j]]
if (any(outside_extent))
index_in_result[[j]] <- index_in_result[[j]][-outside_extent]
if (any(index_in_result[[j]] == metadata$shape[[j]]))
alt_chunk_dim[j] <- length(index_in_result[[j]])

index_in_chunk[[j]] <- ((index[[j]][index_in_result[[j]]] - 1) %% metadata$chunks[[j]]) + 1
}

## read this chunk
chunk <- read_chunk(zarr_array_path,
chunk_id = required_chunks[i, ],
metadata = metadata,
s3_client = s3_client,
alt_chunk_dim = alt_chunk_dim
)
warn <- chunk$warning[1]
chunk_data <- chunk$chunk_data

## extract the required elements from the chunk
selection <- R.utils::extract(chunk_data, indices = index_in_chunk, drop = FALSE)

return(list(selection, index_in_result, warning = warn))
}


#' @importFrom R.utils extract
read_data <- function(required_chunks, zarr_array_path, s3_client,
index, metadata) {

warn <- 0L

tmp <- list()
for(j in seq_len(ncol(required_chunks))) {
tmp[[j]] <- (index[[j]] - 1) %/% metadata$chunks[[j]]
}

## hopefully we can eventually do this in parallel
chunk_selections <- lapply(seq_len(nrow(required_chunks)), FUN = function(i) {
## find elements to select from the chunk and what in the output we replace
index_in_result <- index_in_chunk <- list()
alt_chunk_dim <- unlist(metadata$chunks)

for (j in seq_len(ncol(required_chunks))) {
index_in_result[[j]] <- which((index[[j]] - 1) %/% metadata$chunks[[j]] == required_chunks[i, j])
## are we requesting values outside the array due to overhanging chunks?
outside_extent <- index_in_result[[j]] > metadata$shape[[j]]
if (any(outside_extent))
index_in_result[[j]] <- index_in_result[[j]][-outside_extent]
if (any(index_in_result[[j]] == metadata$shape[[j]]))
alt_chunk_dim[j] <- length(index_in_result[[j]])

index_in_chunk[[j]] <- ((index[[j]][index_in_result[[j]]] - 1) %% metadata$chunks[[j]]) + 1
}

## read this chunk
chunk <- read_chunk(zarr_array_path,
chunk_id = required_chunks[i, ],
metadata = metadata,
s3_client = s3_client,
alt_chunk_dim = alt_chunk_dim
)
warn <- chunk$warning[1]
chunk_data <- chunk$chunk_data

## extract the required elements from the chunk
selection <- R.utils::extract(chunk_data, indices = index_in_chunk, drop = FALSE)

return(list(selection, index_in_result, warning = warn))
})
chunk_selections <- lapply(seq_len(nrow(required_chunks)),
FUN = .extract_elements,
metadata = metadata, index = index,
required_chunks = required_chunks,
zarr_array_path = zarr_array_path,
s3_client = s3_client,
tmp = tmp)

## predefine our array to be populated from the read chunks
output <- array(metadata$fill_value, dim = vapply(index, length, integer(1)))
Expand Down Expand Up @@ -201,7 +215,7 @@ read_chunk <- function(zarr_array_path, chunk_id, metadata, s3_client = NULL,
dim_separator <- ifelse(is.null(metadata$dimension_separator),
yes = ".", no = metadata$dimension_separator
)
chunk_id <- paste(chunk_id, collapse = dim_separator)
chunk_id <- paste0(chunk_id, collapse = dim_separator)

datatype <- .parse_datatype(metadata$dtype)
chunk_file <- paste0(zarr_array_path, chunk_id)
Expand Down
2 changes: 1 addition & 1 deletion R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ check_index <- function(index, metadata) {
.create_replace_call <- function(x_name, idx_name, idx_length, y_name) {

args <- sprintf("%s[[%d]]", idx_name, seq_len(idx_length))
args <- paste(args, collapse = ",")
args <- paste0(args, collapse = ",")
cmd <- sprintf("%s[%s] <- %s", x_name, args, y_name)

return(cmd)
Expand Down
19 changes: 13 additions & 6 deletions R/write_data.R
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,14 @@ update_zarr_array <- function(zarr_array_path, x, index) {
## create all possible chunk names, then remove those that won't be touched
chunk_names <- expand.grid(lapply(zarr_dim %/% chunk_dim, seq_len)) - 1
chunk_needed <- rep(FALSE, nrow(chunk_names))

## determine which chunk each of the requests indices belongs to
chunk_idx <- mapply(\(x,y) { (x-1) %/% y }, index, chunk_dim)

for (i in seq_len(nrow(chunk_names))) {
idx_in_zarr <- list()
for (j in seq_along(zarr_dim)) {
idx_in_zarr[[j]] <- index[[j]][which((index[[j]] - 1) %/% chunk_dim[j] == chunk_names[i, j])]
idx_in_zarr[[j]] <- index[[j]][which(chunk_idx[[j]] == chunk_names[i, j])]
}
chunk_needed[i] <- all(lengths(idx_in_zarr) > 0)
}
Expand All @@ -312,29 +316,32 @@ update_zarr_array <- function(zarr_array_path, x, index) {
## TODO: maybe this can be done in parallel is bplapply() ?
res <- lapply(chunk_ids,
FUN = .update_chunk, x = x, path = zarr_array_path,
chunk_dim = chunk_dim, index = index,
chunk_dim = chunk_dim, chunk_idx = chunk_idx,
metadata = metadata
)

return(invisible(all(unlist(res))))
}

.update_chunk <- function(chunk_id, x, path, chunk_dim, index,
metadata) {
.update_chunk <- function(chunk_id, x, path, chunk_dim,
chunk_idx, metadata) {
chunk_id_split <- as.integer(
strsplit(chunk_id, metadata$dimension_separator,
fixed = TRUE
)[[1]]
)
chunk_path <- paste0(path, chunk_id)

## determine which chunk each of the requests indices belongs to
#chunk_idx <- mapply(\(x,y) { (x-1) %/% y }, index, chunk_dim)

## determine which elements of x are being used and where in this specific
## chunk they should be inserted
## TODO: This is pretty ugly, maybe there's something more elegant
idx_in_zarr <- idx_in_x <- idx_in_chunk <- list()
for (j in seq_along(chunk_dim)) {
idx_in_zarr[[j]] <- index[[j]][which((index[[j]] - 1) %/% chunk_dim[j] == chunk_id_split[j])]
idx_in_x[[j]] <- which((index[[j]] - 1) %/% chunk_dim[j] == chunk_id_split[j])
idx_in_x[[j]] <- which(chunk_idx[[j]] == chunk_id_split[j])
idx_in_zarr[[j]] <- index[[j]][ idx_in_x[[j]] ]
idx_in_chunk[[j]] <- ((idx_in_zarr[[j]] - 1) %% chunk_dim[j]) + 1
}

Expand Down

0 comments on commit 59442e5

Please sign in to comment.