Skip to content

Fst windowing #303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 58 additions & 16 deletions sgkit/stats/popgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sgkit.stats.utils import assert_array_shape
from sgkit.typing import ArrayLike
from sgkit.utils import conditional_merge_datasets
from sgkit.window import has_windows, window_statistic

from .. import variables
from .aggregation import count_cohort_alleles, count_variant_alleles
Expand Down Expand Up @@ -68,15 +69,33 @@ def diversity(
# replace zeros to avoid divide by zero error
n_pairs_na = n_pairs.where(n_pairs != 0)
pi = n_diff / n_pairs_na
pi_sum = pi.sum(axis=0, skipna=False)
new_ds = Dataset(
{
variables.stat_diversity: (
"cohorts",
pi_sum,
)
}
)

if has_windows(ds):
div = window_statistic(
pi,
np.sum,
ds.window_start.values,
ds.window_stop.values,
Comment on lines +77 to +78
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ds.window_{start, stop} is never lazy right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not at the moment at least.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be? The loose guideline I have in my head is that anything that doesn't run interactively (<1s or so) across a whole genome dataset (~100M variants) should probably be lazy. Is it worth adding an issue to make this lazy or test that the latter is true?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #340

dtype=pi.dtype,
axis=0,
)
new_ds = Dataset(
{
"stat_diversity": (
("windows", "cohorts"),
div,
)
}
)
else:
new_ds = Dataset(
{
"stat_diversity": (
("variants", "cohorts"),
pi,
)
}
)
return conditional_merge_datasets(ds, variables.validate(new_ds), merge)


Expand Down Expand Up @@ -178,10 +197,32 @@ def divergence(
d = da.map_blocks(_divergence, ac, chunks=shape, dtype=np.float64)
assert_array_shape(d, n_variants, n_cohorts, n_cohorts)

d_sum = d.sum(axis=0)
assert_array_shape(d_sum, n_cohorts, n_cohorts)

new_ds = Dataset({variables.stat_divergence: (("cohorts_0", "cohorts_1"), d_sum)})
if has_windows(ds):
div = window_statistic(
d,
np.sum,
ds.window_start.values,
ds.window_stop.values,
dtype=d.dtype,
axis=0,
)
new_ds = Dataset(
{
"stat_divergence": (
("windows", "cohorts_0", "cohorts_1"),
div,
)
}
)
else:
new_ds = Dataset(
{
"stat_divergence": (
("variants", "cohorts_0", "cohorts_1"),
d,
)
}
)
return conditional_merge_datasets(ds, variables.validate(new_ds), merge)


Expand Down Expand Up @@ -307,10 +348,11 @@ def Fst(
ds, allele_counts=allele_counts, call_genotype=call_genotype, merge=False
).stat_divergence
gs = da.asarray(gs)
shape = (n_cohorts, n_cohorts)
shape = (gs.chunks[0], n_cohorts, n_cohorts)
fst = da.map_blocks(known_estimators[estimator], gs, chunks=shape, dtype=np.float64)
assert_array_shape(fst, n_cohorts, n_cohorts)
new_ds = Dataset({variables.stat_Fst: (("cohorts_0", "cohorts_1"), fst)})
# TODO: reinstate assert (first dim could be either variants or windows)
# assert_array_shape(fst, n_windows, n_cohorts, n_cohorts)
new_ds = Dataset({variables.stat_Fst: (("windows", "cohorts_0", "cohorts_1"), fst)})
return conditional_merge_datasets(ds, variables.validate(new_ds), merge)


Expand Down
203 changes: 177 additions & 26 deletions sgkit/tests/test_popgen.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import itertools

import allel
import msprime # type: ignore
import numpy as np
import pytest
import xarray as xr
from allel import hudson_fst

from sgkit import Fst, Tajimas_D, create_genotype_call_dataset, divergence, diversity
from sgkit import (
Fst,
Tajimas_D,
count_variant_alleles,
create_genotype_call_dataset,
divergence,
diversity,
)
from sgkit.window import window


def ts_to_dataset(ts, chunks=None, samples=None):
Expand Down Expand Up @@ -39,28 +48,57 @@ def ts_to_dataset(ts, chunks=None, samples=None):
return ds


@pytest.mark.parametrize("size", [2, 3, 10, 100])
@pytest.mark.parametrize("sample_size", [2, 3, 10, 100])
@pytest.mark.parametrize("chunks", [(-1, -1), (10, -1)])
def test_diversity(size, chunks):
ts = msprime.simulate(size, length=100, mutation_rate=0.05, random_seed=42)
def test_diversity(sample_size, chunks):
ts = msprime.simulate(sample_size, length=100, mutation_rate=0.05, random_seed=42)
ds = ts_to_dataset(ts, chunks) # type: ignore[no-untyped-call]
ds = ds.chunk(dict(zip(["variants", "samples"], chunks)))
sample_cohorts = np.full_like(ts.samples(), 0)
ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples")
ds = ds.assign_coords({"cohorts": ["co_0"]})
ds = diversity(ds)
div = ds.stat_diversity.sel(cohorts="co_0").values
div = ds.stat_diversity.sum(axis=0, skipna=False).sel(cohorts="co_0").values
ts_div = ts.diversity(span_normalise=False)
np.testing.assert_allclose(div, ts_div)


@pytest.mark.parametrize("sample_size", [10])
def test_diversity__windowed(sample_size):
ts = msprime.simulate(sample_size, length=200, mutation_rate=0.05, random_seed=42)
ds = ts_to_dataset(ts) # type: ignore[no-untyped-call]
sample_cohorts = np.full_like(ts.samples(), 0)
ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples")
ds = ds.assign_coords({"cohorts": ["co_0"]})
ds = window(ds, size=25, step=25)
ds = diversity(ds)
div = ds["stat_diversity"].sel(cohorts="co_0").compute()

# Calculate diversity using tskit windows
# Find the variant positions so we can have windows with a fixed number of variants
positions = ts.tables.sites.position
windows = np.concatenate(([0], positions[::25][1:], [ts.sequence_length]))
ts_div = ts.diversity(windows=windows, span_normalise=False)
np.testing.assert_allclose(div, ts_div)
Copy link
Collaborator

@ravwojdyla ravwojdyla Oct 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a bit overarching question, not necessary this PR, but since we heavily depend on dask, I wonder if we should use more of their dask.array.utils.assert_eq (which under the hood calls allclose), which afaiu would handle things like calling compute etc which would potentially make this more "array" idiomatic/agnostic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I opened #332


# Calculate diversity using scikit-allel moving_statistic
# (Don't use windowed_diversity, since it treats the last window differently)
ds = count_variant_alleles(ts_to_dataset(ts)) # type: ignore[no-untyped-call]
ac = ds["variant_allele_count"].values
mpd = allel.mean_pairwise_difference(ac, fill=0)
ska_div = allel.moving_statistic(mpd, np.sum, size=25, step=25)
np.testing.assert_allclose(
div[:-1], ska_div
) # scikit-allel has final window missing


@pytest.mark.parametrize(
"size, n_cohorts",
"sample_size, n_cohorts",
[(2, 2), (3, 2), (3, 3), (10, 2), (10, 3), (10, 4), (100, 2), (100, 3), (100, 4)],
)
@pytest.mark.parametrize("chunks", [(-1, -1), (10, -1)])
def test_divergence(size, n_cohorts, chunks):
ts = msprime.simulate(size, length=100, mutation_rate=0.05, random_seed=42)
def test_divergence(sample_size, n_cohorts, chunks):
ts = msprime.simulate(sample_size, length=100, mutation_rate=0.05, random_seed=42)
subsets = np.array_split(ts.samples(), n_cohorts)
ds = ts_to_dataset(ts, chunks) # type: ignore[no-untyped-call]
sample_cohorts = np.concatenate(
Expand All @@ -70,7 +108,7 @@ def test_divergence(size, n_cohorts, chunks):
cohort_names = [f"co_{i}" for i in range(n_cohorts)]
ds = ds.assign_coords({"cohorts_0": cohort_names, "cohorts_1": cohort_names})
ds = divergence(ds)
div = ds.stat_divergence.values
div = ds.stat_divergence.sum(axis=0, skipna=False).values

# entries on the diagonal are diversity values
for i in range(n_cohorts):
Expand All @@ -86,20 +124,86 @@ def test_divergence(size, n_cohorts, chunks):
np.testing.assert_allclose(div, ts_div)


@pytest.mark.parametrize("size", [2, 3, 10, 100])
@pytest.mark.parametrize("chunks", [(-1, -1), (10, -1)])
def test_Fst__Hudson(size, chunks):
@pytest.mark.parametrize("sample_size, n_cohorts", [(10, 2)])
@pytest.mark.parametrize("chunks", [(-1, -1), (50, -1)])
def test_divergence__windowed(sample_size, n_cohorts, chunks):
ts = msprime.simulate(sample_size, length=200, mutation_rate=0.05, random_seed=42)
subsets = np.array_split(ts.samples(), n_cohorts)
ds = ts_to_dataset(ts, chunks) # type: ignore[no-untyped-call]
sample_cohorts = np.concatenate(
[np.full_like(subset, i) for i, subset in enumerate(subsets)]
)
ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples")
cohort_names = [f"co_{i}" for i in range(n_cohorts)]
ds = ds.assign_coords({"cohorts_0": cohort_names, "cohorts_1": cohort_names})
ds = window(ds, size=25, step=25)
ds = divergence(ds)
div = ds["stat_divergence"].values
# test off-diagonal entries, by replacing diagonal with NaNs
div[:, np.arange(2), np.arange(2)] = np.nan

# Calculate diversity using tskit windows
# Find the variant positions so we can have windows with a fixed number of variants
positions = ts.tables.sites.position
windows = np.concatenate(([0], positions[::25][1:], [ts.sequence_length]))
n_windows = len(windows) - 1
ts_div = np.full([n_windows, n_cohorts, n_cohorts], np.nan)
for i, j in itertools.combinations(range(n_cohorts), 2):
ts_div[:, i, j] = ts.divergence(
[subsets[i], subsets[j]], windows=windows, span_normalise=False
)
ts_div[:, j, i] = ts_div[:, i, j]
np.testing.assert_allclose(div, ts_div)


@pytest.mark.parametrize("sample_size, n_cohorts", [(10, 2)])
@pytest.mark.parametrize("chunks", [(-1, -1), (50, -1)])
@pytest.mark.xfail() # combine with test_divergence__windowed when this is passing
def test_divergence__windowed_scikit_allel_comparison(sample_size, n_cohorts, chunks):
ts = msprime.simulate(sample_size, length=200, mutation_rate=0.05, random_seed=42)
subsets = np.array_split(ts.samples(), n_cohorts)
ds = ts_to_dataset(ts, chunks) # type: ignore[no-untyped-call]
sample_cohorts = np.concatenate(
[np.full_like(subset, i) for i, subset in enumerate(subsets)]
)
ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples")
cohort_names = [f"co_{i}" for i in range(n_cohorts)]
ds = ds.assign_coords({"cohorts_0": cohort_names, "cohorts_1": cohort_names})
ds = window(ds, size=25, step=25)
ds = divergence(ds)
div = ds["stat_divergence"].values
# test off-diagonal entries, by replacing diagonal with NaNs
div[:, np.arange(2), np.arange(2)] = np.nan

# Calculate divergence using scikit-allel moving_statistic
# (Don't use windowed_divergence, since it treats the last window differently)
ds1 = count_variant_alleles(ts_to_dataset(ts, samples=ts.samples()[:1])) # type: ignore[no-untyped-call]
ds2 = count_variant_alleles(ts_to_dataset(ts, samples=ts.samples()[1:])) # type: ignore[no-untyped-call]
ac1 = ds1["variant_allele_count"].values
ac2 = ds2["variant_allele_count"].values
mpd = allel.mean_pairwise_difference_between(ac1, ac2, fill=0)
ska_div = allel.moving_statistic(mpd, np.sum, size=25, step=25) # noqa: F841
# TODO: investigate why numbers are different
np.testing.assert_allclose(
div[:-1], ska_div
) # scikit-allel has final window missing


@pytest.mark.parametrize("sample_size", [2, 3, 10, 100])
def test_Fst__Hudson(sample_size):
# scikit-allel can only calculate Fst for pairs of cohorts (populations)
n_cohorts = 2
ts = msprime.simulate(size, length=100, mutation_rate=0.05, random_seed=42)
ts = msprime.simulate(sample_size, length=100, mutation_rate=0.05, random_seed=42)
subsets = np.array_split(ts.samples(), n_cohorts)
ds = ts_to_dataset(ts, chunks) # type: ignore[no-untyped-call]
ds = ts_to_dataset(ts) # type: ignore[no-untyped-call]
sample_cohorts = np.concatenate(
[np.full_like(subset, i) for i, subset in enumerate(subsets)]
)
ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples")
cohort_names = [f"co_{i}" for i in range(n_cohorts)]
ds = ds.assign_coords({"cohorts_0": cohort_names, "cohorts_1": cohort_names})
n_variants = ds.dims["variants"]
ds = window(ds, size=n_variants, step=n_variants) # single window
ds = Fst(ds, estimator="Hudson")
fst = ds.stat_Fst.sel(cohorts_0="co_0", cohorts_1="co_1").values

Expand All @@ -113,27 +217,28 @@ def test_Fst__Hudson(size, chunks):


@pytest.mark.parametrize(
"size, n_cohorts",
"sample_size, n_cohorts",
[(2, 2), (3, 2), (3, 3), (10, 2), (10, 3), (10, 4), (100, 2), (100, 3), (100, 4)],
)
@pytest.mark.parametrize("chunks", [(-1, -1), (10, -1)])
def test_Fst__Nei(size, n_cohorts, chunks):
ts = msprime.simulate(size, length=100, mutation_rate=0.05, random_seed=42)
def test_Fst__Nei(sample_size, n_cohorts):
ts = msprime.simulate(sample_size, length=100, mutation_rate=0.05, random_seed=42)
subsets = np.array_split(ts.samples(), n_cohorts)
ds = ts_to_dataset(ts, chunks) # type: ignore[no-untyped-call]
ds = ts_to_dataset(ts) # type: ignore[no-untyped-call]
sample_cohorts = np.concatenate(
[np.full_like(subset, i) for i, subset in enumerate(subsets)]
)
ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples")
cohort_names = [f"co_{i}" for i in range(n_cohorts)]
ds = ds.assign_coords({"cohorts_0": cohort_names, "cohorts_1": cohort_names})
n_variants = ds.dims["variants"]
ds = window(ds, size=n_variants, step=n_variants) # single window
ds = Fst(ds, estimator="Nei")
fst = ds.stat_Fst.values

ts_fst = np.full([n_cohorts, n_cohorts], np.nan)
ts_fst = np.full([1, n_cohorts, n_cohorts], np.nan)
for i, j in itertools.combinations(range(n_cohorts), 2):
ts_fst[i, j] = ts.Fst([subsets[i], subsets[j]])
ts_fst[j, i] = ts_fst[i, j]
ts_fst[0, i, j] = ts.Fst([subsets[i], subsets[j]])
ts_fst[0, j, i] = ts_fst[0, i, j]
np.testing.assert_allclose(fst, ts_fst)


Expand All @@ -146,13 +251,59 @@ def test_Fst__unknown_estimator():
Fst(ds, estimator="Unknown")


@pytest.mark.parametrize("size", [2, 3, 10, 100])
@pytest.mark.parametrize("chunks", [(-1, -1), (10, -1)])
def test_Tajimas_D(size, chunks):
ts = msprime.simulate(size, length=100, mutation_rate=0.05, random_seed=42)
@pytest.mark.parametrize(
"sample_size, n_cohorts",
[(10, 2)],
)
@pytest.mark.parametrize("chunks", [(-1, -1), (50, -1)])
def test_Fst__windowed(sample_size, n_cohorts, chunks):
ts = msprime.simulate(sample_size, length=200, mutation_rate=0.05, random_seed=42)
subsets = np.array_split(ts.samples(), n_cohorts)
ds = ts_to_dataset(ts, chunks) # type: ignore[no-untyped-call]
sample_cohorts = np.concatenate(
[np.full_like(subset, i) for i, subset in enumerate(subsets)]
)
ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples")
cohort_names = [f"co_{i}" for i in range(n_cohorts)]
ds = ds.assign_coords({"cohorts_0": cohort_names, "cohorts_1": cohort_names})
ds = window(ds, size=25, step=25)
fst_ds = Fst(ds, estimator="Nei")
fst = fst_ds["stat_Fst"].values

# Calculate Fst using tskit windows
# Find the variant positions so we can have windows with a fixed number of variants
positions = ts.tables.sites.position
windows = np.concatenate(([0], positions[::25][1:], [ts.sequence_length]))
n_windows = len(windows) - 1
ts_fst = np.full([n_windows, n_cohorts, n_cohorts], np.nan)
for i, j in itertools.combinations(range(n_cohorts), 2):
ts_fst[:, i, j] = ts.Fst(
[subsets[i], subsets[j]], windows=windows, span_normalise=False
)
ts_fst[:, j, i] = ts_fst[:, i, j]

np.testing.assert_allclose(fst, ts_fst)

fst_ds = Fst(ds, estimator="Hudson")
fst = fst_ds["stat_Fst"].sel(cohorts_0="co_0", cohorts_1="co_1").values

ac1 = fst_ds.cohort_allele_count.values[:, 0, :]
ac2 = fst_ds.cohort_allele_count.values[:, 1, :]
ska_fst = allel.moving_hudson_fst(ac1, ac2, size=25, step=25)

np.testing.assert_allclose(
fst[:-1], ska_fst
) # scikit-allel has final window missing


@pytest.mark.parametrize("sample_size", [2, 3, 10, 100])
def test_Tajimas_D(sample_size):
ts = msprime.simulate(sample_size, length=100, mutation_rate=0.05, random_seed=42)
ds = ts_to_dataset(ts) # type: ignore[no-untyped-call]
sample_cohorts = np.full_like(ts.samples(), 0)
ds["sample_cohort"] = xr.DataArray(sample_cohorts, dims="samples")
n_variants = ds.dims["variants"]
ds = window(ds, size=n_variants, step=n_variants) # single window
ds = Tajimas_D(ds)
d = ds.stat_Tajimas_D.compute()
ts_d = ts.Tajimas_D()
Expand Down
Loading