From 59442e57997ce889e69eb0dbdc5e04326851e997 Mon Sep 17 00:00:00 2001 From: Mike Smith Date: Mon, 30 Sep 2024 12:14:03 +0200 Subject: [PATCH] Some performance improvements --- R/read_data.R | 80 +++++++++++++++++++++++++++++--------------------- R/utils.R | 2 +- R/write_data.R | 19 ++++++++---- 3 files changed, 61 insertions(+), 40 deletions(-) diff --git a/R/read_data.R b/R/read_data.R index 80ac5ea..7400444 100644 --- a/R/read_data.R +++ b/R/read_data.R @@ -72,45 +72,59 @@ read_zarr_array <- function(zarr_array_path, index, s3_client) { return(res$output) } +.extract_elements <- function(i, metadata, index, required_chunks, zarr_array_path, s3_client, tmp) { + ## find elements to select from the chunk and what in the output we replace + index_in_result <- index_in_chunk <- list() + alt_chunk_dim <- unlist(metadata$chunks) + + for (j in seq_len(ncol(required_chunks))) { + index_in_result[[j]] <- which(tmp[[j]] == required_chunks[i, j]) + ## are we requesting values outside the array due to overhanging chunks? + outside_extent <- index_in_result[[j]] > metadata$shape[[j]] + if (any(outside_extent)) + index_in_result[[j]] <- index_in_result[[j]][-outside_extent] + if (any(index_in_result[[j]] == metadata$shape[[j]])) + alt_chunk_dim[j] <- length(index_in_result[[j]]) + + index_in_chunk[[j]] <- ((index[[j]][index_in_result[[j]]] - 1) %% metadata$chunks[[j]]) + 1 + } + + ## read this chunk + chunk <- read_chunk(zarr_array_path, + chunk_id = required_chunks[i, ], + metadata = metadata, + s3_client = s3_client, + alt_chunk_dim = alt_chunk_dim + ) + warn <- chunk$warning[1] + chunk_data <- chunk$chunk_data + + ## extract the required elements from the chunk + selection <- R.utils::extract(chunk_data, indices = index_in_chunk, drop = FALSE) + + return(list(selection, index_in_result, warning = warn)) +} + + #' @importFrom R.utils extract read_data <- function(required_chunks, zarr_array_path, s3_client, index, metadata) { warn <- 0L + + tmp <- list() + for(j in seq_len(ncol(required_chunks))) { + tmp[[j]] <- (index[[j]] - 1) %/% metadata$chunks[[j]] + } ## hopefully we can eventually do this in parallel - chunk_selections <- lapply(seq_len(nrow(required_chunks)), FUN = function(i) { - ## find elements to select from the chunk and what in the output we replace - index_in_result <- index_in_chunk <- list() - alt_chunk_dim <- unlist(metadata$chunks) - - for (j in seq_len(ncol(required_chunks))) { - index_in_result[[j]] <- which((index[[j]] - 1) %/% metadata$chunks[[j]] == required_chunks[i, j]) - ## are we requesting values outside the array due to overhanging chunks? - outside_extent <- index_in_result[[j]] > metadata$shape[[j]] - if (any(outside_extent)) - index_in_result[[j]] <- index_in_result[[j]][-outside_extent] - if (any(index_in_result[[j]] == metadata$shape[[j]])) - alt_chunk_dim[j] <- length(index_in_result[[j]]) - - index_in_chunk[[j]] <- ((index[[j]][index_in_result[[j]]] - 1) %% metadata$chunks[[j]]) + 1 - } - - ## read this chunk - chunk <- read_chunk(zarr_array_path, - chunk_id = required_chunks[i, ], - metadata = metadata, - s3_client = s3_client, - alt_chunk_dim = alt_chunk_dim - ) - warn <- chunk$warning[1] - chunk_data <- chunk$chunk_data - - ## extract the required elements from the chunk - selection <- R.utils::extract(chunk_data, indices = index_in_chunk, drop = FALSE) - - return(list(selection, index_in_result, warning = warn)) - }) + chunk_selections <- lapply(seq_len(nrow(required_chunks)), + FUN = .extract_elements, + metadata = metadata, index = index, + required_chunks = required_chunks, + zarr_array_path = zarr_array_path, + s3_client = s3_client, + tmp = tmp) ## predefine our array to be populated from the read chunks output <- array(metadata$fill_value, dim = vapply(index, length, integer(1))) @@ -201,7 +215,7 @@ read_chunk <- function(zarr_array_path, chunk_id, metadata, s3_client = NULL, dim_separator <- ifelse(is.null(metadata$dimension_separator), yes = ".", no = metadata$dimension_separator ) - chunk_id <- paste(chunk_id, collapse = dim_separator) + chunk_id <- paste0(chunk_id, collapse = dim_separator) datatype <- .parse_datatype(metadata$dtype) chunk_file <- paste0(zarr_array_path, chunk_id) diff --git a/R/utils.R b/R/utils.R index b2e65c1..5be2b83 100644 --- a/R/utils.R +++ b/R/utils.R @@ -41,7 +41,7 @@ check_index <- function(index, metadata) { .create_replace_call <- function(x_name, idx_name, idx_length, y_name) { args <- sprintf("%s[[%d]]", idx_name, seq_len(idx_length)) - args <- paste(args, collapse = ",") + args <- paste0(args, collapse = ",") cmd <- sprintf("%s[%s] <- %s", x_name, args, y_name) return(cmd) diff --git a/R/write_data.R b/R/write_data.R index 1db2519..ed41ef6 100644 --- a/R/write_data.R +++ b/R/write_data.R @@ -297,10 +297,14 @@ update_zarr_array <- function(zarr_array_path, x, index) { ## create all possible chunk names, then remove those that won't be touched chunk_names <- expand.grid(lapply(zarr_dim %/% chunk_dim, seq_len)) - 1 chunk_needed <- rep(FALSE, nrow(chunk_names)) + + ## determine which chunk each of the requests indices belongs to + chunk_idx <- mapply(\(x,y) { (x-1) %/% y }, index, chunk_dim) + for (i in seq_len(nrow(chunk_names))) { idx_in_zarr <- list() for (j in seq_along(zarr_dim)) { - idx_in_zarr[[j]] <- index[[j]][which((index[[j]] - 1) %/% chunk_dim[j] == chunk_names[i, j])] + idx_in_zarr[[j]] <- index[[j]][which(chunk_idx[[j]] == chunk_names[i, j])] } chunk_needed[i] <- all(lengths(idx_in_zarr) > 0) } @@ -312,29 +316,32 @@ update_zarr_array <- function(zarr_array_path, x, index) { ## TODO: maybe this can be done in parallel is bplapply() ? res <- lapply(chunk_ids, FUN = .update_chunk, x = x, path = zarr_array_path, - chunk_dim = chunk_dim, index = index, + chunk_dim = chunk_dim, chunk_idx = chunk_idx, metadata = metadata ) return(invisible(all(unlist(res)))) } -.update_chunk <- function(chunk_id, x, path, chunk_dim, index, - metadata) { +.update_chunk <- function(chunk_id, x, path, chunk_dim, + chunk_idx, metadata) { chunk_id_split <- as.integer( strsplit(chunk_id, metadata$dimension_separator, fixed = TRUE )[[1]] ) chunk_path <- paste0(path, chunk_id) + + ## determine which chunk each of the requests indices belongs to + #chunk_idx <- mapply(\(x,y) { (x-1) %/% y }, index, chunk_dim) ## determine which elements of x are being used and where in this specific ## chunk they should be inserted ## TODO: This is pretty ugly, maybe there's something more elegant idx_in_zarr <- idx_in_x <- idx_in_chunk <- list() for (j in seq_along(chunk_dim)) { - idx_in_zarr[[j]] <- index[[j]][which((index[[j]] - 1) %/% chunk_dim[j] == chunk_id_split[j])] - idx_in_x[[j]] <- which((index[[j]] - 1) %/% chunk_dim[j] == chunk_id_split[j]) + idx_in_x[[j]] <- which(chunk_idx[[j]] == chunk_id_split[j]) + idx_in_zarr[[j]] <- index[[j]][ idx_in_x[[j]] ] idx_in_chunk[[j]] <- ((idx_in_zarr[[j]] - 1) %% chunk_dim[j]) + 1 }