Add concat algorithm parameter to vcf_to_zarr #365 #665

Merged · 4 commits · Sep 16, 2021
2 changes: 1 addition & 1 deletion docs/api.rst
@@ -46,7 +46,7 @@ For more low-level control:

partition_into_regions
vcf_to_zarrs
zarrs_to_dataset
concat_zarrs

For converting from `scikit-allel's VCF Zarr representation <https://scikit-allel.readthedocs.io/en/stable/io.html#allel.vcf_to_zarr>`_ to sgkit's Zarr representation:

3 changes: 1 addition & 2 deletions docs/vcf.rst
@@ -197,8 +197,7 @@ would set the temporary Zarr store to be a cloud storage URL (by setting ``tempdir
that all workers can access the store (both for reading and writing).

For debugging, or for more control over the steps, consider using
:func:`sgkit.io.vcf.vcf_to_zarrs` followed by :func:`sgkit.io.vcf.zarrs_to_dataset`,
then saving the dataset using Xarray's :meth:`xarray.Dataset.to_zarr` method.
:func:`sgkit.io.vcf.vcf_to_zarrs` followed by :func:`sgkit.io.vcf.concat_zarrs`.

Polyploid and mixed-ploidy VCF
------------------------------
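To illustrate the two-step workflow the updated docs point to, here is a minimal sketch; the VCF path, region strings, and output locations are hypothetical placeholders, and the keyword names are assumed from the sgkit VCF API:

    from sgkit.io.vcf import concat_zarrs, vcf_to_zarrs

    # Step 1: write one intermediate Zarr store per region; the paths are returned.
    paths = vcf_to_zarrs(
        "sample.vcf.gz",        # hypothetical bgzipped, indexed VCF
        "intermediate-zarrs",   # hypothetical directory for the per-region stores
        regions=["20:1-500000", "20:500001-1000000"],  # hypothetical regions
    )

    # Step 2: concatenate and rechunk the intermediate stores into a single store.
    concat_zarrs(paths, "combined.zarr")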
10 changes: 7 additions & 3 deletions sgkit/io/vcf/__init__.py
@@ -1,16 +1,20 @@
import platform

try:
    from ..utils import zarrs_to_dataset
    from .vcf_partition import partition_into_regions
    from .vcf_reader import MaxAltAllelesExceededWarning, vcf_to_zarr, vcf_to_zarrs
    from .vcf_reader import (
        MaxAltAllelesExceededWarning,
        concat_zarrs,
        vcf_to_zarr,
        vcf_to_zarrs,
    )

    __all__ = [
        "MaxAltAllelesExceededWarning",
        "concat_zarrs",
        "partition_into_regions",
        "vcf_to_zarr",
        "vcf_to_zarrs",
        "zarrs_to_dataset",
    ]
except ImportError as e:  # pragma: no cover
    if platform.system() == "Windows":
88 changes: 80 additions & 8 deletions sgkit/io/vcf/vcf_reader.py
@@ -22,13 +22,17 @@
import xarray as xr
from cyvcf2 import VCF, Variant
from numcodecs import PackBits
from typing_extensions import Literal

from sgkit import variables
from sgkit.io.dataset import load_dataset
from sgkit.io.utils import zarrs_to_dataset
from sgkit.io.vcf import partition_into_regions
from sgkit.io.vcf.utils import build_url, chunks, temporary_directory, url_filename
from sgkit.io.vcfzarr_reader import vcf_number_to_dimension_and_size
from sgkit.io.vcfzarr_reader import (
concat_zarrs_optimized,
vcf_number_to_dimension_and_size,
)
from sgkit.model import (
DIM_PLOIDY,
DIM_SAMPLE,
@@ -528,6 +532,7 @@ def vcf_to_zarr_parallel(
fields: Optional[Sequence[str]] = None,
exclude_fields: Optional[Sequence[str]] = None,
field_defs: Optional[Dict[str, Dict[str, Any]]] = None,
concat_algorithm: Optional[Literal["xarray_internal"]] = None,
) -> None:
"""Convert specified regions of one or more VCF files to zarr files, then concat, rechunk, write to zarr"""

@@ -556,11 +561,15 @@
field_defs=field_defs,
)

ds = zarrs_to_dataset(paths, chunk_length, chunk_width, tempdir_storage_options)

# Ensure Dask task graph is efficient, see https://github.com/dask/dask/issues/5105
with dask.config.set({"optimization.fuse.ave-width": dask_fuse_avg_width}):
ds.to_zarr(output, mode="w")
concat_zarrs(
paths,
output,
concat_algorithm=concat_algorithm,
chunk_length=chunk_length,
chunk_width=chunk_width,
storage_options=tempdir_storage_options,
dask_fuse_avg_width=dask_fuse_avg_width,
)


def vcf_to_zarrs(
@@ -702,6 +711,64 @@ def vcf_to_zarrs(
return parts


def concat_zarrs(
    urls: Sequence[str],
    output: Union[PathType, MutableMapping[str, bytes]],
    *,
    concat_algorithm: Optional[Literal["xarray_internal"]] = None,
    chunk_length: int = 10_000,
    chunk_width: int = 1_000,
    storage_options: Optional[Dict[str, str]] = None,
    dask_fuse_avg_width: int = 50,
) -> None:
    """Concatenate multiple Zarr stores into a single Zarr store.

    The Zarr stores are concatenated and rechunked to produce a single combined store.

    Parameters
    ----------
    urls
        A list of URLs to the Zarr stores to combine, typically the return value of
        :func:`vcf_to_zarrs`.
    output
        Zarr store or path to directory in file system.
    concat_algorithm
        The algorithm to use to concatenate and rechunk Zarr files. The default None means
        use the optimized version suitable for large files, whereas ``xarray_internal`` will
        use built-in Xarray APIs, which can exhibit high memory usage, see https://github.com/dask/dask/issues/6745.
    chunk_length
        Length (number of variants) of chunks in which data are stored, by default 10,000.
        This is only used when ``concat_algorithm`` is ``xarray_internal``.
    chunk_width
        Width (number of samples) to use when storing chunks in output, by default 1,000.
        This is only used when ``concat_algorithm`` is ``xarray_internal``.
    storage_options
        Any additional parameters for the storage backend (see ``fsspec.open``).
    dask_fuse_avg_width
        Setting for Dask's ``optimization.fuse.ave-width``, see https://github.com/dask/dask/issues/5105
    """
    if concat_algorithm == "xarray_internal":
        ds = zarrs_to_dataset(urls, chunk_length, chunk_width, storage_options)

        with dask.config.set({"optimization.fuse.ave-width": dask_fuse_avg_width}):
            ds.to_zarr(output, mode="w")
    else:

        vars_to_rechunk = []
        vars_to_copy = []
        storage_options = storage_options or {}
        ds = xr.open_zarr(  # type: ignore[no-untyped-call]
            fsspec.get_mapper(urls[0], **storage_options), concat_characters=False
        )
        for (var, arr) in ds.data_vars.items():
            if arr.dims[0] == "variants":
                vars_to_rechunk.append(var)
            else:
                vars_to_copy.append(var)

        concat_zarrs_optimized(urls, output, vars_to_rechunk, vars_to_copy)


def vcf_to_zarr(
input: Union[PathType, Sequence[PathType]],
output: Union[PathType, MutableMapping[str, bytes]],
@@ -722,6 +789,7 @@ def vcf_to_zarr(
fields: Optional[Sequence[str]] = None,
exclude_fields: Optional[Sequence[str]] = None,
field_defs: Optional[Dict[str, Dict[str, Any]]] = None,
concat_algorithm: Optional[Literal["xarray_internal"]] = None,
) -> None:
"""Convert VCF files to a single Zarr on-disk store.

@@ -734,8 +802,7 @@
is None.

For more control over these two steps, consider using :func:`vcf_to_zarrs` followed by
:func:`zarrs_to_dataset`, then saving the dataset using Xarray's
:meth:`xarray.Dataset.to_zarr` method.
:func:`concat_zarrs`.

Parameters
----------
@@ -810,6 +877,10 @@
(which is defined as Number 2 in the VCF header) as ``haplotypes``.
(Note that Number ``A`` is the number of alternate alleles, see section 1.4.2 of the
VCF spec https://samtools.github.io/hts-specs/VCFv4.3.pdf.)
concat_algorithm
The algorithm to use to concatenate and rechunk Zarr files. The default None means
use the optimized version suitable for large files, whereas ``xarray_internal`` will
use built-in Xarray APIs, which can exhibit high memory usage, see https://github.com/dask/dask/issues/6745.
"""

if temp_chunk_length is not None:
@@ -841,6 +912,7 @@
temp_chunk_length=temp_chunk_length,
tempdir=tempdir,
tempdir_storage_options=tempdir_storage_options,
concat_algorithm=concat_algorithm,
)
convert_func(
input, # type: ignore
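As a rough usage sketch of the parameter added above (file names and regions are hypothetical placeholders; the default ``concat_algorithm=None`` selects the optimized concatenation path):

    from sgkit.io.vcf import vcf_to_zarr

    # One-shot conversion: regions are converted in parallel to temporary Zarr
    # stores, then concatenated. Passing concat_algorithm="xarray_internal"
    # opts into the Xarray-based concatenation instead of the default
    # optimized algorithm.
    vcf_to_zarr(
        "sample.vcf.gz",                     # hypothetical input VCF
        "out.zarr",                          # hypothetical output store
        regions=["20", "21"],                # hypothetical per-contig regions
        concat_algorithm="xarray_internal",
    )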
62 changes: 48 additions & 14 deletions sgkit/io/vcfzarr_reader.py
@@ -1,9 +1,20 @@
import tempfile
from pathlib import Path
from typing import Any, Dict, Hashable, List, Optional, Tuple
from typing import (
Any,
Dict,
Hashable,
List,
MutableMapping,
Optional,
Sequence,
Tuple,
Union,
)

import dask
import dask.array as da
import numcodecs
import xarray as xr
import zarr
from fsspec import get_mapper
@@ -151,8 +162,8 @@ def vcfzarr_to_zarr(
ds.to_zarr(output, mode="w")
else:
# Use the optimized algorithm in `concatenate_and_rechunk`
_concat_zarrs_optimized(
zarr_files, output, vars_to_rechunk, vars_to_copy
concat_zarrs_optimized(
zarr_files, output, vars_to_rechunk, vars_to_copy, fix_strings=True
)


@@ -290,18 +301,22 @@ def _get_max_len(zarr_groups: List[zarr.Group], attr_name: str) -> int:
return max_len


def _concat_zarrs_optimized(
zarr_files: List[str],
output: PathType,
def concat_zarrs_optimized(
zarr_files: Sequence[str],
output: Union[PathType, MutableMapping[str, bytes]],
vars_to_rechunk: List[Hashable],
vars_to_copy: List[Hashable],
fix_strings: bool = False,
) -> None:
if isinstance(output, Path):
output = str(output)

zarr_groups = [zarr.open_group(f) for f in zarr_files]

first_zarr_group = zarr_groups[0]

# create the top-level group
zarr.open_group(str(output), mode="w")
zarr.open_group(output, mode="w")

# copy variables that are to be rechunked
# NOTE: that this uses _to_zarr function defined here that is needed to avoid
@@ -310,38 +325,57 @@ def _concat_zarrs_optimized(
delayed = [] # do all the rechunking operations in one computation
for var in vars_to_rechunk:
dtype = None
if var in {"variant_id", "variant_allele"}:
if fix_strings and var in {"variant_id", "variant_allele"}:
max_len = _get_max_len(zarr_groups, f"max_length_{var}")
dtype = f"S{max_len}"

arr = concatenate_and_rechunk(
[group[var] for group in zarr_groups], dtype=dtype
)

_to_zarr_kwargs = dict(
compressor=first_zarr_group[var].compressor,
filters=first_zarr_group[var].filters,
fill_value=None,
)
if not fix_strings and arr.dtype == "O":
# We assume that all object dtypes are variable length strings
var_len_str_codec = numcodecs.VLenUTF8()
_to_zarr_kwargs["object_codec"] = var_len_str_codec
# Remove from filters to avoid double encoding error
if var_len_str_codec in first_zarr_group[var].filters:
filters = list(first_zarr_group[var].filters)
filters.remove(var_len_str_codec)
_to_zarr_kwargs["filters"] = filters

d = _to_zarr( # type: ignore[no-untyped-call]
arr,
str(output),
output,
component=var,
overwrite=True,
compute=False,
fill_value=None,
attrs=first_zarr_group[var].attrs.asdict(),
**_to_zarr_kwargs,
)
delayed.append(d)
da.compute(*delayed)

# copy unchanged variables and top-level metadata
with zarr.open_group(str(output)) as output_zarr:
with zarr.open_group(output) as output_zarr:

# copy variables that are not rechunked (e.g. sample_id)
for var in vars_to_copy:
output_zarr[var] = first_zarr_group[var]
output_zarr[var].attrs.update(first_zarr_group[var].attrs)

# copy top-level attributes
output_zarr.attrs.update(first_zarr_group.attrs)
group_attrs = dict(first_zarr_group.attrs)
if "max_alt_alleles_seen" in group_attrs:
max_alt_alleles_seen = _get_max_len(zarr_groups, "max_alt_alleles_seen")
group_attrs["max_alt_alleles_seen"] = max_alt_alleles_seen
output_zarr.attrs.update(group_attrs)

# consolidate metadata
zarr.consolidate_metadata(str(output))
zarr.consolidate_metadata(output)


def _to_zarr( # type: ignore[no-untyped-def]
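The string-handling branch above relies on zarr's object-codec mechanism. A small standalone sketch of that behaviour, using the zarr and numcodecs APIs directly (the array contents here are made up):

    import numcodecs
    import numpy as np
    import zarr

    alleles = np.array(["A", "ACGT", "AC"], dtype=object)  # hypothetical allele strings

    # Object-dtype (variable-length string) arrays need an object codec such as
    # VLenUTF8; this mirrors the object_codec handling in concat_zarrs_optimized.
    z = zarr.array(alleles, dtype=object, object_codec=numcodecs.VLenUTF8())

    # The same codec must not also be passed via `filters`, otherwise the strings
    # would be encoded twice; this is why the code above removes VLenUTF8 from the
    # filters copied from the first input group.
    print(z.filters)  # VLenUTF8 appears once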