From 0efd28a7826aac6b14f74bedd41ab880338eb8f2 Mon Sep 17 00:00:00 2001
From: Tom White
Date: Mon, 17 Oct 2022 14:58:22 +0100
Subject: [PATCH 1/5] Ensure default compressor is not ignored

---
 sgkit/io/vcf/utils.py                 | 29 +++++++++++++++++++++++++++
 sgkit/io/vcf/vcf_reader.py            | 10 +++++++--
 sgkit/tests/io/vcf/test_utils.py      | 13 +++++++++++-
 sgkit/tests/io/vcf/test_vcf_reader.py | 17 +++++++++++-----
 4 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/sgkit/io/vcf/utils.py b/sgkit/io/vcf/utils.py
index 2619f191d..3268951a3 100644
--- a/sgkit/io/vcf/utils.py
+++ b/sgkit/io/vcf/utils.py
@@ -168,3 +168,32 @@ def temporary_directory(
     finally:
         # Remove the temporary directory on exiting the context manager
         fs.rm(tempdir, recursive=True)
+
+
+def merge_encodings(
+    default_encoding: Dict[str, Dict[str, Any]], overrides: Dict[str, Dict[str, Any]]
+) -> Dict[str, Dict[str, Any]]:
+    """Merge a dictionary of dictionaries specifying encodings with another dictionary of dictionaries of overriding encodings.
+
+    Parameters
+    ----------
+    default_encoding : Dict[str, Dict[str, Any]]
+        The default encoding dictionary.
+    overrides : Dict[str, Dict[str, Any]]
+        A dictionary containing selective overrides.
+
+    Returns
+    -------
+    Dict[str, Dict[str, Any]]
+        The merged encoding dictionary
+    """
+    merged = {}
+    for var, d in default_encoding.items():
+        if var in overrides:
+            merged[var] = {**d, **overrides[var]}
+        else:
+            merged[var] = d
+    for var, d in overrides.items():
+        if var not in merged:
+            merged[var] = d
+    return merged

diff --git a/sgkit/io/vcf/vcf_reader.py b/sgkit/io/vcf/vcf_reader.py
index ce682abd4..13e7934ee 100644
--- a/sgkit/io/vcf/vcf_reader.py
+++ b/sgkit/io/vcf/vcf_reader.py
@@ -37,7 +37,13 @@
     STR_MISSING,
 )
 from sgkit.io.vcf import partition_into_regions
-from sgkit.io.vcf.utils import build_url, chunks, temporary_directory, url_filename
+from sgkit.io.vcf.utils import (
+    build_url,
+    chunks,
+    merge_encodings,
+    temporary_directory,
+    url_filename,
+)
 from sgkit.io.vcfzarr_reader import (
     concat_zarrs_optimized,
     vcf_number_to_dimension_and_size,
@@ -556,7 +562,7 @@ def get_chunk_size(dim: Hashable, size: int) -> int:
 
                 # values from function args (encoding) take precedence over default_encoding
                 encoding = encoding or {}
-                merged_encoding = {**default_encoding, **encoding}
+                merged_encoding = merge_encodings(default_encoding, encoding)
 
                 ds.to_zarr(output, mode="w", encoding=merged_encoding)
                 first_variants_chunk = False

diff --git a/sgkit/tests/io/vcf/test_utils.py b/sgkit/tests/io/vcf/test_utils.py
index 22ca7c9eb..deb07a1b1 100644
--- a/sgkit/tests/io/vcf/test_utils.py
+++ b/sgkit/tests/io/vcf/test_utils.py
@@ -6,7 +6,7 @@
 import pytest
 from callee.strings import StartsWith
 
-from sgkit.io.vcf.utils import build_url, chunks, temporary_directory
+from sgkit.io.vcf.utils import build_url, chunks, merge_encodings, temporary_directory
 from sgkit.io.vcf.vcf_reader import get_region_start
 
@@ -118,3 +118,14 @@
 )
 def test_get_region_start(region: str, expected: int):
     assert get_region_start(region) == expected
+
+
+def test_merge_encodings():
+    default_encoding = dict(a=dict(a1=1, a2=2), b=dict(b1=5))
+    overrides = dict(a=dict(a1=0, a3=3), c=dict(c1=7))
+    assert merge_encodings(default_encoding, overrides) == dict(
+        a=dict(a1=0, a2=2, a3=3), b=dict(b1=5), c=dict(c1=7)
+    )
+
+    assert merge_encodings(default_encoding, {}) == default_encoding
+    assert merge_encodings({}, overrides) == overrides

diff --git a/sgkit/tests/io/vcf/test_vcf_reader.py b/sgkit/tests/io/vcf/test_vcf_reader.py
index eeeff2606..9473906b8 100644
--- a/sgkit/tests/io/vcf/test_vcf_reader.py
+++ b/sgkit/tests/io/vcf/test_vcf_reader.py
@@ -215,7 +215,7 @@ def test_vcf_to_zarr__compressor_and_filters(shared_datadir, is_path, tmp_path):
     path = path_for_test(shared_datadir, "sample.vcf.gz", is_path)
     output = tmp_path.joinpath("vcf.zarr").as_posix()
 
-    default_compressor = Blosc("zlib", 1, Blosc.NOSHUFFLE)
+    compressor = Blosc("zlib", 1, Blosc.NOSHUFFLE)
     variant_id_compressor = Blosc("zlib", 2, Blosc.NOSHUFFLE)
     encoding = dict(
         variant_id=dict(compressor=variant_id_compressor),
@@ -226,18 +226,25 @@
     vcf_to_zarr(
         path,
         output,
         chunk_length=5,
         chunk_width=2,
-        compressor=default_compressor,
+        compressor=compressor,
         encoding=encoding,
     )
 
     # look at actual Zarr store to check compressor and filters
     z = zarr.open(output)
-    assert z["call_genotype"].compressor == default_compressor
-    assert z["call_genotype"].filters is None
-    assert z["call_genotype_mask"].filters == [PackBits()]
+    assert z["call_genotype"].compressor == compressor
+    assert z["call_genotype"].filters is None  # sgkit default
+    assert z["call_genotype"].chunks == (5, 2, 2)
+    assert z["call_genotype_mask"].compressor == compressor
+    assert z["call_genotype_mask"].filters == [PackBits()]  # sgkit default
+    assert z["call_genotype_mask"].chunks == (5, 2, 2)
     assert z["variant_id"].compressor == variant_id_compressor
+    assert z["variant_id"].filters == [VLenUTF8()]  # sgkit default
+    assert z["variant_id"].chunks == (5,)
+
+    assert z["variant_id_mask"].compressor == compressor
     assert z["variant_id_mask"].filters is None
+    assert z["variant_id_mask"].chunks == (5,)
 
 
 @pytest.mark.parametrize(
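The bug this patch fixes is easiest to see with plain dictionaries: the old shallow merge
(``{**default_encoding, **encoding}``) replaced a variable's entire encoding dict, so overriding
any one key silently discarded the defaults for that variable, including the default compressor.
A minimal sketch, separate from the patch itself (the encoding values are illustrative)::

    from numcodecs import Blosc

    from sgkit.io.vcf.utils import merge_encodings

    default_compressor = Blosc("zstd", 7)
    default_encoding = {"variant_id": {"compressor": default_compressor, "chunks": (5,)}}
    overrides = {"variant_id": {"filters": []}}

    # Old behaviour: the override dict wins wholesale, dropping the compressor
    shallow = {**default_encoding, **overrides}
    assert "compressor" not in shallow["variant_id"]

    # New behaviour: keys are merged per variable, so the defaults survive
    merged = merge_encodings(default_encoding, overrides)
    assert merged["variant_id"]["compressor"] == default_compressor
    assert merged["variant_id"]["filters"] == []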
From bd84bfbf3e80b6815c8a91203a7bf505e4d5cb2d Mon Sep 17 00:00:00 2001
From: Tom White
Date: Mon, 17 Oct 2022 15:36:15 +0100
Subject: [PATCH 2/5] Add more default filters for VCF

Issue warning for VCF FORMAT float fields
---
 sgkit/io/vcf/__init__.py                    |  2 +
 sgkit/io/vcf/utils.py                       | 33 +++++++++-
 sgkit/io/vcf/vcf_reader.py                  | 61 +++++++++----------
 .../io/vcf/test_vcf_lossless_conversion.py  |  1 +
 sgkit/tests/io/vcf/test_vcf_reader.py       | 42 ++++++++++---
 5 files changed, 98 insertions(+), 41 deletions(-)

diff --git a/sgkit/io/vcf/__init__.py b/sgkit/io/vcf/__init__.py
index eadbe09ec..ec3956547 100644
--- a/sgkit/io/vcf/__init__.py
+++ b/sgkit/io/vcf/__init__.py
@@ -3,6 +3,7 @@
 try:
     from .vcf_partition import partition_into_regions
     from .vcf_reader import (
+        FloatFormatFieldWarning,
         MaxAltAllelesExceededWarning,
         concat_zarrs,
         vcf_to_zarr,
     )
 
     __all__ = [
+        "FloatFormatFieldWarning",
         "MaxAltAllelesExceededWarning",
         "concat_zarrs",
         "partition_into_regions",

diff --git a/sgkit/io/vcf/utils.py b/sgkit/io/vcf/utils.py
index 3268951a3..b4a5a165b 100644
--- a/sgkit/io/vcf/utils.py
+++ b/sgkit/io/vcf/utils.py
@@ -3,10 +3,11 @@
 import tempfile
 import uuid
 from contextlib import contextmanager
-from typing import IO, Any, Dict, Iterator, Optional, Sequence, TypeVar
+from typing import IO, Any, Dict, Hashable, Iterator, Optional, Sequence, TypeVar
 from urllib.parse import urlparse
 
 import fsspec
+from numcodecs import Delta, PackBits
 from yarl import URL
 
 from sgkit.typing import PathType
@@ -170,6 +171,36 @@ def temporary_directory(
         fs.rm(tempdir, recursive=True)
+
+
+def get_default_vcf_encoding(ds, chunk_length, chunk_width, compressor):
+    # Enforce uniform chunks in the variants dimension
+    # Also chunk in the samples direction
+    def get_chunk_size(dim: Hashable, size: int) -> int:
+        if dim == "variants":
+            return chunk_length
+        elif dim == "samples":
+            return chunk_width
+        else:
+            return size
+
+    default_encoding = {}
+    for var in ds.data_vars:
+        var_chunks = tuple(
+            get_chunk_size(dim, size)
+            for (dim, size) in zip(ds[var].dims, ds[var].shape)
+        )
+        default_encoding[var] = dict(chunks=var_chunks, compressor=compressor)
+
+        # Enable bit packing by default for boolean arrays
+        if ds[var].dtype.kind == "b":
+            default_encoding[var]["filters"] = [PackBits()]
+
+        # Position is monotonically increasing (within a contig) so benefits from delta encoding
+        if var == "variant_position":
+            default_encoding[var]["filters"] = [Delta(dtype="i4", astype="i4")]
+
+    return default_encoding
 
 
 def merge_encodings(
     default_encoding: Dict[str, Dict[str, Any]], overrides: Dict[str, Dict[str, Any]]
 ) -> Dict[str, Dict[str, Any]]:
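For intuition about the two default filters chosen above, here is a small standalone sketch
(the array values are illustrative)::

    import numpy as np
    from numcodecs import Delta, PackBits

    # Delta stores the first value followed by successive differences; for
    # sorted positions the differences are small integers that compress well
    positions = np.array([100, 105, 110, 250], dtype="i4")
    codec = Delta(dtype="i4", astype="i4")
    assert list(codec.encode(positions)) == [100, 5, 5, 140]
    assert list(codec.decode(codec.encode(positions))) == [100, 105, 110, 250]

    # PackBits packs eight booleans per byte (plus one header byte),
    # roughly an 8x saving over storing one byte per boolean
    mask = np.ones(16, dtype=bool)
    assert PackBits().encode(mask).nbytes == 3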
diff --git a/sgkit/io/vcf/vcf_reader.py b/sgkit/io/vcf/vcf_reader.py
index 13e7934ee..9131bdf5b 100644
--- a/sgkit/io/vcf/vcf_reader.py
+++ b/sgkit/io/vcf/vcf_reader.py
@@ -5,17 +5,7 @@
 from contextlib import contextmanager
 from dataclasses import dataclass
 from pathlib import Path
-from typing import (
-    Any,
-    Dict,
-    Hashable,
-    Iterator,
-    MutableMapping,
-    Optional,
-    Sequence,
-    Tuple,
-    Union,
-)
+from typing import Any, Dict, Iterator, MutableMapping, Optional, Sequence, Tuple, Union
 
 import dask
 import fsspec
@@ -23,7 +13,6 @@
 import xarray as xr
 import zarr
 from cyvcf2 import VCF, Variant
-from numcodecs import PackBits
 
 from sgkit import variables
 from sgkit.io.utils import (
@@ -40,6 +29,7 @@ from sgkit.io.vcf.utils import (
     build_url,
     chunks,
+    get_default_vcf_encoding,
     merge_encodings,
     temporary_directory,
     url_filename,
@@ -71,6 +61,12 @@
 DEFAULT_COMPRESSOR = None
 
 
+class FloatFormatFieldWarning(UserWarning):
+    """Warning for VCF FORMAT float fields, which can use a lot of storage."""
+
+    pass
+
+
 class MaxAltAllelesExceededWarning(UserWarning):
     """Warning when the number of alt alleles exceeds the maximum specified."""
 
@@ -535,35 +531,34 @@
             ds.attrs["max_alt_alleles_seen"] = max_alt_alleles_seen
 
             if first_variants_chunk:
-                # Enforce uniform chunks in the variants dimension
-                # Also chunk in the samples direction
-
-                def get_chunk_size(dim: Hashable, size: int) -> int:
-                    if dim == "variants":
-                        return chunk_length
-                    elif dim == "samples":
-                        return chunk_width
-                    else:
-                        return size
-
-                default_encoding = {}
+                # ensure that booleans are not stored as int8 by xarray https://github.com/pydata/xarray/issues/4386
                 for var in ds.data_vars:
-                    var_chunks = tuple(
-                        get_chunk_size(dim, size)
-                        for (dim, size) in zip(ds[var].dims, ds[var].shape)
-                    )
-                    default_encoding[var] = dict(
-                        chunks=var_chunks, compressor=compressor
-                    )
                     if ds[var].dtype.kind == "b":
-                        # ensure that booleans are not stored as int8 by xarray https://github.com/pydata/xarray/issues/4386
                         ds[var].attrs["dtype"] = "bool"
-                        default_encoding[var]["filters"] = [PackBits()]
 
                 # values from function args (encoding) take precedence over default_encoding
+                default_encoding = get_default_vcf_encoding(
+                    ds, chunk_length, chunk_width, compressor
+                )
                 encoding = encoding or {}
                 merged_encoding = merge_encodings(default_encoding, encoding)
 
+                for var in ds.data_vars:
+                    # Issue warning for VCF FORMAT float fields with no filter
+                    if (
+                        var.startswith("call_")
+                        and ds[var].dtype == np.float32
+                        and (
+                            var not in merged_encoding
+                            or "filters" not in merged_encoding[var]
+                        )
+                    ):
+                        warnings.warn(
+                            f"Storing call variable {var} (FORMAT field) as a float can result in large file sizes. "
+                            f"Consider setting the encoding filters for this variable to FixedScaleOffset or similar.",
+                            FloatFormatFieldWarning,
+                        )
+
                 ds.to_zarr(output, mode="w", encoding=merged_encoding)
                 first_variants_chunk = False
             else:
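To make the warning's suggestion concrete: a ``FixedScaleOffset`` filter quantizes floats onto an
integer grid before compression. A standalone sketch (the values are illustrative; scaled values
must fit the target integer range)::

    import numpy as np
    from numcodecs import FixedScaleOffset

    # Store two decimal places in one byte per value instead of four
    codec = FixedScaleOffset(offset=0, scale=100, dtype="f4", astype="u1")
    values = np.array([0.0, 0.042, 1.3333, 2.5], dtype="f4")

    encoded = codec.encode(values)   # uint8: [0, 4, 133, 250]
    decoded = codec.decode(encoded)  # float32: [0.0, 0.04, 1.33, 2.5]

    assert encoded.dtype == np.uint8
    np.testing.assert_allclose(decoded, values, atol=0.005)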
diff --git a/sgkit/tests/io/vcf/test_vcf_lossless_conversion.py b/sgkit/tests/io/vcf/test_vcf_lossless_conversion.py
index d08230056..4a94408d1 100644
--- a/sgkit/tests/io/vcf/test_vcf_lossless_conversion.py
+++ b/sgkit/tests/io/vcf/test_vcf_lossless_conversion.py
@@ -17,6 +17,7 @@
     ],
 )
 @pytest.mark.filterwarnings(
+    "ignore::sgkit.io.vcf.FloatFormatFieldWarning",
     "ignore::sgkit.io.vcfzarr_reader.DimensionNameForFixedFormatFieldWarning",
 )
 def test_lossless_conversion(shared_datadir, tmp_path, vcf_file):

diff --git a/sgkit/tests/io/vcf/test_vcf_reader.py b/sgkit/tests/io/vcf/test_vcf_reader.py
index 9473906b8..4534e1643 100644
--- a/sgkit/tests/io/vcf/test_vcf_reader.py
+++ b/sgkit/tests/io/vcf/test_vcf_reader.py
@@ -4,7 +4,7 @@
 import pytest
 import xarray as xr
 import zarr
-from numcodecs import Blosc, PackBits, VLenUTF8
+from numcodecs import Blosc, Delta, FixedScaleOffset, PackBits, VLenUTF8
 from numpy.testing import assert_allclose, assert_array_equal
 
 from sgkit import load_dataset, save_dataset
@@ -246,6 +246,10 @@ def test_vcf_to_zarr__compressor_and_filters(shared_datadir, is_path, tmp_path):
     assert z["variant_id_mask"].filters is None
     assert z["variant_id_mask"].chunks == (5,)
 
+    assert z["variant_position"].filters == [
+        Delta(dtype="i4", astype="i4")
+    ]  # sgkit default
+
 
 @pytest.mark.parametrize(
     "is_path",
     [True, False],
 )
@@ -259,7 +263,7 @@ def test_vcf_to_zarr__parallel_compressor_and_filters(
     output = tmp_path.joinpath("vcf_concat.zarr").as_posix()
     regions = ["20", "21"]
 
-    default_compressor = Blosc("zlib", 1, Blosc.NOSHUFFLE)
+    compressor = Blosc("zlib", 1, Blosc.NOSHUFFLE)
     variant_id_compressor = Blosc("zlib", 2, Blosc.NOSHUFFLE)
     encoding = dict(
         variant_id=dict(compressor=variant_id_compressor),
@@ -270,18 +274,29 @@ def test_vcf_to_zarr__parallel_compressor_and_filters(
         output,
         regions=regions,
         chunk_length=5_000,
-        compressor=default_compressor,
+        compressor=compressor,
         encoding=encoding,
     )
 
     # look at actual Zarr store to check compressor and filters
     z = zarr.open(output)
-    assert z["call_genotype"].compressor == default_compressor
-    assert z["call_genotype"].filters is None
-    assert z["call_genotype_mask"].filters == [PackBits()]
+    assert z["call_genotype"].compressor == compressor
+    assert z["call_genotype"].filters is None  # sgkit default
+    assert z["call_genotype"].chunks == (5000, 1, 2)
+    assert z["call_genotype_mask"].compressor == compressor
+    assert z["call_genotype_mask"].filters == [PackBits()]  # sgkit default
+    assert z["call_genotype_mask"].chunks == (5000, 1, 2)
     assert z["variant_id"].compressor == variant_id_compressor
+    assert z["variant_id"].filters == [VLenUTF8()]  # sgkit default
+    assert z["variant_id"].chunks == (5000,)
+
+    assert z["variant_id_mask"].compressor == compressor
     assert z["variant_id_mask"].filters is None
+    assert z["variant_id_mask"].chunks == (5000,)
+
+    assert z["variant_position"].filters == [
+        Delta(dtype="i4", astype="i4")
+    ]  # sgkit default
 
 
 @pytest.mark.parametrize(
@@ -992,7 +1007,20 @@ def test_vcf_to_zarr__field_number_G_non_diploid(shared_datadir, tmp_path):
     path = path_for_test(shared_datadir, "simple.output.mixed_depth.likelihoods.vcf")
     output = tmp_path.joinpath("vcf.zarr").as_posix()
 
-    vcf_to_zarr(path, output, ploidy=4, max_alt_alleles=3, fields=["FORMAT/GL"])
+    # store GL field as 2dp
+    encoding = {
+        "call_GL": {
+            "filters": [FixedScaleOffset(offset=0, scale=100, dtype="f4", astype="u1")]
+        }
+    }
+    vcf_to_zarr(
+        path,
+        output,
+        ploidy=4,
+        max_alt_alleles=3,
+        fields=["FORMAT/GL"],
+        encoding=encoding,
+    )
 
     ds = xr.open_zarr(output)
     # comb(n_alleles + ploidy - 1, ploidy) = comb(4 + 4 - 1, 4) = comb(7, 4) = 35
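For downstream users who deliberately keep full float precision, the new warning can be silenced
with the standard ``warnings`` machinery, mirroring the ``filterwarnings`` marker added to the
lossless-conversion test above. A sketch only — the paths here are placeholders::

    import warnings

    from sgkit.io.vcf import FloatFormatFieldWarning, vcf_to_zarr

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=FloatFormatFieldWarning)
        # placeholder input/output paths
        vcf_to_zarr("sample.vcf.gz", "out.zarr", fields=["FORMAT/GL"])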
"call_GL": { + "filters": [FixedScaleOffset(offset=0, scale=100, dtype="f4", astype="u1")] + } + } + vcf_to_zarr( + path, + output, + ploidy=4, + max_alt_alleles=3, + fields=["FORMAT/GL"], + encoding=encoding, + ) ds = xr.open_zarr(output) # comb(n_alleles + ploidy - 1, ploidy) = comb(4 + 4 - 1, 4) = comb(7, 4) = 35 From 6260a9b95a99d9e2b0b2cdc87a9a9d35ec2bcbc0 Mon Sep 17 00:00:00 2001 From: Tom White Date: Tue, 18 Oct 2022 10:45:30 +0100 Subject: [PATCH 3/5] Add section on compression to VCF docs --- docs/vcf.rst | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/docs/vcf.rst b/docs/vcf.rst index a690f5e0f..1773cd8e5 100644 --- a/docs/vcf.rst +++ b/docs/vcf.rst @@ -181,6 +181,47 @@ cloud storage. You can access files stored on Amazon S3 or Google Cloud Storage using ``s3://`` or ``gs://`` URLs. Setting credentials or other options is typically achieved using environment variables for the underlying cloud store. +Compression +----------- + +Zarr offers a lot of flexibility over controlling how data is compressed. Each variable can use +a different `compression algorithm `_, +and its own list of `filters `_. + +The :func:`sgkit.io.vcf.vcf_to_zarr` function tries to choose good defaults for compression, using +information about the variable's dtype, and also the nature of the data being stored. + +For example, ``variant_position`` (from the VCF ``POS`` field) is a monotonically increasing integer +(within a contig) so it benefits from using a delta encoding to store the differences in its values, +since these are smaller integers that compress better. This encoding is specified using the NumCodecs +`Delta `_ codec as a Zarr filter. + +When converting from VCF you can specify the default compression algorithm to use for all variables +by specifying ``compressor`` in the call to :func:`sgkit.io.vcf.vcf_to_zarr`. There are trade-offs +between compression speed and size, which this `benchmark `_ +does a good job of exploring. + +Sometimes you may want to override the compression for a particular variable. A good example of this +is for VCF FORMAT fields that are floats. Floats don't compress well, and since there is a value for +every sample they can take up a lot of space. In many cases full float precision is not needed, +so it is a good idea to use a filter to transform the float to an int, that takes less space. + +For example, the following code creates an encoding that can be passed to :func:`sgkit.io.vcf.vcf_to_zarr` +to store the VCF ``DS`` FORMAT field to 2 decimal places. (``DS`` is a dosage field that is between 0 and 2 +so we know it will fit into an unsigned 8-bit int.):: + + from numcodecs import FixedScaleOffset + + encoding = { + "call_DS": { + "filters": [FixedScaleOffset(offset=0, scale=100, dtype="f4", astype="u1")], + }, + } + +Note that this encoding won't work for floats that may be NaN. Consider using +`Quantize `_ (with ``astype=np.float16``) +or `Bitround `_ in that case. + .. 
From d04ad7acd4c46fb0951d5ca7ebb69e41719a7619 Mon Sep 17 00:00:00 2001
From: Tom White
Date: Tue, 18 Oct 2022 14:38:33 +0100
Subject: [PATCH 4/5] Allow variant_position delta encoding to work with any
 dtype

---
 sgkit/io/vcf/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sgkit/io/vcf/utils.py b/sgkit/io/vcf/utils.py
index b4a5a165b..7f398ee06 100644
--- a/sgkit/io/vcf/utils.py
+++ b/sgkit/io/vcf/utils.py
@@ -196,7 +196,7 @@ def get_chunk_size(dim: Hashable, size: int) -> int:
 
         # Position is monotonically increasing (within a contig) so benefits from delta encoding
         if var == "variant_position":
-            default_encoding[var]["filters"] = [Delta(dtype="i4", astype="i4")]
+            default_encoding[var]["filters"] = [Delta(ds[var].dtype)]
 
     return default_encoding

From c4826a0cad59df393b4dae1f456db3750499d725 Mon Sep 17 00:00:00 2001
From: Tom White
Date: Tue, 18 Oct 2022 14:56:54 +0100
Subject: [PATCH 5/5] Add test for FloatFormatFieldWarning

---
 sgkit/tests/io/vcf/test_vcf_reader.py | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/sgkit/tests/io/vcf/test_vcf_reader.py b/sgkit/tests/io/vcf/test_vcf_reader.py
index 4534e1643..1051893af 100644
--- a/sgkit/tests/io/vcf/test_vcf_reader.py
+++ b/sgkit/tests/io/vcf/test_vcf_reader.py
@@ -14,7 +14,7 @@
     partition_into_regions,
     vcf_to_zarr,
 )
-from sgkit.io.vcf.vcf_reader import zarr_array_sizes
+from sgkit.io.vcf.vcf_reader import FloatFormatFieldWarning, zarr_array_sizes
 from sgkit.tests.io.test_dataset import assert_identical
 
 from .utils import path_for_test
@@ -299,6 +299,20 @@
     ]  # sgkit default
 
 
+def test_vcf_to_zarr__float_format_field_warning(shared_datadir, tmp_path):
+    path = path_for_test(shared_datadir, "simple.output.mixed_depth.likelihoods.vcf")
+    output = tmp_path.joinpath("vcf.zarr").as_posix()
+
+    with pytest.warns(FloatFormatFieldWarning):
+        vcf_to_zarr(
+            path,
+            output,
+            ploidy=4,
+            max_alt_alleles=3,
+            fields=["FORMAT/GL"],
+        )
+
+
 @pytest.mark.parametrize(
     "is_path",
     [True, False],
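A closing sketch of what the ``Delta(ds[var].dtype)`` change in patch 4 buys: the filter now
follows the array's actual dtype instead of assuming ``i4`` (the arrays here are illustrative)::

    import numpy as np
    from numcodecs import Delta

    # Hard-coding dtype="i4" would truncate positions beyond the 32-bit range;
    # deriving the dtype from the data round-trips losslessly
    positions = np.array([2_147_483_650, 2_147_483_660], dtype="i8")
    codec = Delta(dtype=positions.dtype)
    assert list(codec.decode(codec.encode(positions))) == list(positions)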