Pairwise distance calculation #241

Closed

jeromekelleher opened this issue Sep 8, 2020 · 17 comments
@jeromekelleher (Collaborator)

A fundamental operation that we want to support is efficiently calculating pairwise distances, for a number of different metrics, across a number of different computational backends. See the prototype for an example of what we're trying to achieve.
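
For concreteness, here is a minimal single-backend sketch of the operation, using SciPy purely as a reference point (the actual API, metrics, and backends are exactly what needs designing):

# Minimal single-backend sketch: pairwise distances with SciPy as a reference.
import numpy as np
from scipy.spatial.distance import pdist, squareform

x = np.random.randint(0, 3, size=(100, 1000)).astype("f8")  # samples x variants
d = squareform(pdist(x, metric="euclidean"))  # (100, 100) symmetric distance matrix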

@alimanfoo - I think you had a blog post about this also?

@alimanfoo (Collaborator)

Hi @jeromekelleher, yes, @jakirkham and I started a blog post last year but I'm afraid I haven't had time to see it through. FWIW it's over here: dask/dask-blog#50

@jakirkham

It would still be cool to finish that blog 😉

@jeromekelleher (Collaborator, Author)

Just had a chat with @aktech, and he's going to have a look at this.

@aktech (Contributor) commented Sep 24, 2020

Some of the potential options that I am looking at for dispatching:

NEP 37

__array_module__ (NEP 37): https://numpy.org/neps/nep-0037-array-module.html
This is still a draft, but it is ready to use. It is expected to be merged into NumPy by the end of this year; meanwhile we can use the external package from https://github.com/seberg/numpy-dispatch, developed by the author of NEP 37 (also a NumPy core developer).

NEP 18

__array_function__ (NEP 18): https://numpy.org/neps/nep-0018-array-function-protocol.html
This is not great for the long term, as NEP 37 is intended as a better successor, for the following reasons:

  • Overrides the existing NumPy namespace
  • Currently requires an all-or-nothing approach to implementing NumPy's API
  • No good pathway for incremental adoption
  • Would result in breaking changes
  • Backward compatibility concerns
  • Limitations on what can be overridden

unumpy

unumpy builds on top of uarray. It is an effort to specify the core NumPy API, and provide backends for the API.
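
As a rough illustration of what backend-agnostic code could look like under NEP-37-style dispatch (the get_array_module helper here is assumed to behave as the NEP proposes; numpy-dispatch provides an implementation of that idea):

import numpy as np

# Hypothetical sketch: the metric never imports a specific backend directly;
# it asks a NEP-37-style helper which array module matches its inputs.
def pairwise_sq_dists(x, get_array_module=None):
    xp = np if get_array_module is None else get_array_module(x)
    sq = xp.sum(x * x, axis=1)
    return sq[:, None] + sq[None, :] - 2 * (x @ x.T)

# With plain NumPy input this just uses NumPy; with a dispatch helper and a
# CuPy/Dask array, the same code would run on that backend.
d = pairwise_sq_dists(np.random.random((10, 5)))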

@alimanfoo (Collaborator)

Just to add a bit more information about some motivating use cases. The prime motivation was to analyse population structure by computing pairwise genetic distances between individuals. From the distance matrix we would typically compute and visualise a neighbour joining tree. Here's what such a tree might look like:

[Screenshot: example neighbour joining tree]

Taking this further, we also do an analysis where we compute pairwise distances in windows along the genome, and then compare those distance matrices to each other, to look for regions of the genome that convey similar (or different/unusual) patterns of relatedness. Here's a figure from our 2017 paper where we scanned the genome and painted it according to different patterns of relatedness:

[Screenshot: figure from the 2017 paper, with the genome painted according to different patterns of relatedness]

That analysis was done by computing a pairwise distance matrix for each 200 kbp window along the genome, then computing a pairwise correlation matrix between those distance matrices, then doing dimensionality reduction.
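
As a very rough sketch of that pipeline (the window size, metric, and use of scipy/scikit-learn here are illustrative assumptions, not what was actually used):

import numpy as np
from scipy.spatial.distance import pdist
from sklearn.decomposition import PCA

genotypes = np.random.randint(0, 3, size=(50, 10_000)).astype("f8")  # samples x variants
window = 500  # number of variants per window (stand-in for 200 kbp)

# One condensed pairwise distance vector per genomic window
dists = np.stack([
    pdist(genotypes[:, start:start + window], metric="cityblock")
    for start in range(0, genotypes.shape[1], window)
])

# Pairwise correlation between the per-window distance matrices,
# followed by dimensionality reduction
corr = np.corrcoef(dists)
coords = PCA(n_components=2).fit_transform(corr)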

@eric-czech (Collaborator) commented Sep 24, 2020

The only way I can find to make this work with gufuncs (which again would cover the numpy/cupy backend cases well) is with broadcasting. This could actually be quite nice to build on if these benchmarks hold up on bigger datasets:

from numba import guvectorize, njit
import dask.array as da
import numpy as np

# Generalized function that leaves pairwise definition up to caller
@guvectorize(['void(int8[:], int8[:], float64[:])'], '(n),(n)->()')
def corr1(x, y, out):
    out[0] = np.corrcoef(x, y)[0, 1]
    
# A more typical pairwise compiled function
@njit
def corr2(x):
    n = x.shape[0]
    out = np.zeros((n, n), dtype=np.float64)
    for i in range(n):
        for j in range(n):
            out[i, j] = np.corrcoef(x[i], x[j])[0, 1]
    return out

# Make sure the results are the same for all functions
rs = da.random.RandomState(0)
x = rs.randint(0, 3, size=(500, 1000), dtype='i1')
c1 = corr1(x[:, None, :], x).compute() # Implicit broadcast creates row vector pairs
c2 = corr2(x.compute())
c3 = np.corrcoef(x)
np.testing.assert_almost_equal(c1, c2)
np.testing.assert_almost_equal(c1, c3)

# Run some benchmarks
%%timeit
corr1(x[:, None, :], x).compute()
3.04 s ± 50.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
corr2(x.compute())
3.02 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

tl;dr numba.guvectorize can be used for pairwise calcs and is as fast as numba.njit for pairwise correlation on small datasets

@eric-czech (Collaborator) commented Sep 25, 2020

Thinking a little more about this, I believe there may be some utility in trying to find primitives like these to build on (the map/reduce idea I mentioned on the call yesterday):

import numpy as np
import dask.array as da
from numba import guvectorize

rs = da.random.RandomState(0)
x = rs.randint(0, 3, size=(18, 40), dtype='i1').rechunk((6, 10))

# Pearson correlation "map" function for partial vector pairs
@guvectorize(['void(int8[:], int8[:], float64[:], float64[:])'], '(n),(n),(p)->(p)')
def corr_map(x, y, _, out):
    out[:] = np.array([
        np.sum(x),
        np.sum(y),
        np.sum(x * x),
        np.sum(y * y),
        np.sum(x * y),
        len(x)
    ])

# Corresponding "reduce" function
@guvectorize(['void(float64[:,:], float64[:])'], '(p,m)->()')
def corr_reduce(v, out):
    v = v.sum(axis=-1)
    n = v[5]
    num = n * v[4] - v[0] * v[1]
    denom1 = np.sqrt(n * v[2] - v[0]**2)
    denom2 = np.sqrt(n * v[3] - v[1]**2)
    out[0] = num / (denom1 * denom2)

# A generic pairwise function for any separable distance metric
def pairwise(x, map_fn, reduce_fn, n_map_param):
    # Pair up row blocks corresponding to each grid element of the
    # resulting pairwise matrix, then compute partial calculations
    # for each pair of row vectors within those paired blocks
    x_map = da.concatenate([
        da.concatenate([
            da.stack([
                map_fn(
                    x.blocks[i1, j][:, None, :],
                    x.blocks[i2, j],
                    np.empty(n_map_param))
                for j in range(x.numblocks[1])
            ], axis=-1)
            for i2 in range(x.numblocks[0])
        ], axis=1)
        for i1 in range(x.numblocks[0])
    ], axis=0)
    assert x_map.shape == (len(x), len(x), n_map_param, x.numblocks[1])

    # Apply reduction to arrays with shape (n_map_param, n_column_chunk),
    # which would easily fit in memory
    x_reduce = reduce_fn(x_map.rechunk((None, None, -1, -1)))
    return x_reduce

# There are 6 intermediate values in this case
x_corr = pairwise(x, corr_map, corr_reduce, 6)

np.testing.assert_almost_equal(x_corr.compute(), np.corrcoef(x).compute())

x_corr.compute().round(2)[:5, :5]
array([[ 1.  ,  0.09, -0.03,  0.14,  0.28],
       [ 0.09,  1.  ,  0.04,  0.07,  0.03],
       [-0.03,  0.04,  1.  ,  0.32, -0.25],
       [ 0.14,  0.07,  0.32,  1.  ,  0.07],
       [ 0.28,  0.03, -0.25,  0.07,  1.  ]])

The logic there is a bit gnarly and hard to put in words, but it's not as complicated as it looks if there were an easy way to visualize it. This would support distance metrics on arrays with chunking in both dimensions and let us program conditional logic for sentinel values easily. That's particularly important given that even a simple metric like this doesn't have nan-aware support in dask (i.e. in da.corrcoef). And even if it did, it would only work if we promoted our 1-byte int types to larger floats.

I view this as fairly important, since at least being able to compute something like a genetic relatedness matrix on fully chunked arrays is something we ought to be able to do. I think this would also scale down well to cases where chunking is only in one dimension, and at a glance it looks like everything in https://github.com/scikit-allel/skallel-stats/blob/master/src/skallel_stats/distance/numpy_backend.py could be implemented this way. Computing only the upper or lower triangle for symmetric metrics would be tricky, but it's probably doable too.
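
As a rough, standalone sketch of exploiting that symmetry at the block level (this uses map_blocks on plain NumPy blocks rather than the guvectorize map/reduce machinery above, and the metric is plain Pearson correlation, just to keep it short):

import numpy as np
import dask.array as da

x = da.random.random((12, 20), chunks=(4, 20))

def block_corr(a, b):
    # Plain NumPy pairwise Pearson correlation between rows of two blocks
    n = a.shape[1]
    a = (a - a.mean(axis=1, keepdims=True)) / a.std(axis=1, keepdims=True)
    b = (b - b.mean(axis=1, keepdims=True)) / b.std(axis=1, keepdims=True)
    return (a @ b.T) / n

nb = x.numblocks[0]
rows = []
for i1 in range(nb):
    row = []
    for i2 in range(nb):
        if i2 >= i1:
            # Only diagonal and upper-triangle block pairs are computed
            row.append(da.map_blocks(
                block_corr, x.blocks[i1], x.blocks[i2],
                chunks=(x.chunks[0][i1], x.chunks[0][i2]), dtype="f8"))
        else:
            # Strictly-lower-triangle blocks are never computed; zeros stand in
            row.append(da.zeros((x.chunks[0][i1], x.chunks[0][i2]), dtype="f8"))
    rows.append(da.concatenate(row, axis=1))
upper = da.concatenate(rows, axis=0)

# Mirror the upper triangle to recover the full symmetric matrix
full = da.triu(upper) + da.triu(upper, k=1).T
np.testing.assert_almost_equal(full.compute(), np.corrcoef(x.compute()))

The diagonal blocks still compute both halves of their own little square, but all strictly lower block pairs are skipped.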

@jakirkham

FWIW I did some work some time back on wrapping up distance computations with Dask in dask-distance, which may be of some use here.

@aktech (Contributor) commented Oct 1, 2020

@eric-czech Thanks for the example. I thought I understood your map/reduce idea and started implementing it for Euclidean distance, but while implementing it I realised I might not have understood it well enough.

One question I have: consider the example of Euclidean distance between all pairs of vectors. If I understand your idea properly, the map and reduce steps would be the following.

For a given set of vectors: V = [v1, v2, v3, ..., vn]

  • Map step:

[(v1 - v2), (v1 - v3), ... (v2 - v3), ... (vn - vn-1)]

  • Reduce step:

[sqrt(sum((v1 - v2)**2)), ..., sqrt(sum((vn - vn-1)**2))]

What I don't clearly understand is: what is the benefit of separating out the last step (the square root of the squared sum), which could just as well be done in the map step with the same parallelisation?

@eric-czech (Collaborator)

Hey @aktech, I don't see how the square root could be in the map step. What I would have imagined for euclidean distance is something like:

  • Map (where vN_M is the Mth part of the Nth vector)
    • On chunk 1: c1 = [sum((v1_1 - v2_1)**2), sum((v1_1 - v3_1)**2), ...]
    • On chunk 2: c2 = [sum((v1_2 - v2_2)**2), sum((v1_2 - v3_2)**2), ...]
  • Reduce
    • sqrt(sum(c1 + c2 + ...))

That's not great pseudocode but hopefully that makes some more sense. The map step would have to compress the two vectors down to some small number of scalars to be very useful.
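
For concreteness, a rough (untested) sketch of those two steps in the same guvectorize style as the correlation example above, with a single intermediate value per vector pair (the partial sum of squared differences):

import numpy as np
from numba import guvectorize

# "Map": partial sum of squared differences over one chunk of a vector pair
@guvectorize(['void(int8[:], int8[:], float64[:], float64[:])'], '(n),(n),(p)->(p)')
def euclidean_map(x, y, _, out):
    d = (x - y).astype(np.float64)  # cast so the squares accumulate in float64
    out[0] = np.sum(d * d)

# "Reduce": add the partial sums across chunks, then take a single square root
@guvectorize(['void(float64[:,:], float64[:])'], '(p,m)->()')
def euclidean_reduce(v, out):
    out[0] = np.sqrt(v.sum())

In principle these should slot into the pairwise helper from the earlier comment as pairwise(x, euclidean_map, euclidean_reduce, 1).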

@eric-czech (Collaborator)

Looking at dask-distance some more (thanks for that bump @jakirkham!), it makes me wonder:

  1. How practical is using numba.guvectorize as a target for our custom logic to compile to GPU and CPU?
  2. Should we try to wrap our functions in blockwise instead? Despite some attempts, I still don't really understand how that function works or what performance benefits it might offer over working with Array.blocks instead.
  3. Are there performance problems with using broadcasting to pair vectors?

Do you have any advice for us on those @jakirkham?

@aktech (Contributor) commented Oct 1, 2020

@eric-czech Thanks for the explanation, that makes sense. I think I missed the part where the map function is called over "chunks" (not complete vectors), with those chunks then combined in the reduce function. I imagined the map was on a pair of full vectors, which is why I thought the reduce was unnecessary.

@eric-czech (Collaborator)

I imagined the map was on a pair of full vectors, which is why I thought the reduce was unnecessary.

Makes sense. I agree this would be unnecessary if the arrays are small enough to fit comfortably in memory along at least one dimension.

@jakirkham

Using guvectorize is also a great idea! Dask Arrays support having gufuncs called on them directly. That won't necessarily pick up on any symmetry in the computation, though one could use numpy.triu or numpy.tril on the result; Dask will replace the unused portion with zeros, so it won't require any extra compute for the off-diagonal chunks. However, if one wants the diagonal chunks themselves to be computed efficiently, one might need to apply a different function to handle them optimally. Not sure how much this latter point matters for this use case.
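
For example, a minimal sketch of that pattern, reusing the corr1 gufunc from the earlier comment (the rechunk is only there to give the result more than one block, and assumes the broadcasted gufunc call also works when the loop dimensions are chunked):

import numpy as np
import dask.array as da
from numba import guvectorize

# corr1 as defined in the earlier comment
@guvectorize(['void(int8[:], int8[:], float64[:])'], '(n),(n)->()')
def corr1(x, y, out):
    out[0] = np.corrcoef(x, y)[0, 1]

rs = da.random.RandomState(0)
x = rs.randint(0, 3, size=(500, 1000), dtype='i1').rechunk((100, 1000))

dist = corr1(x[:, None, :], x)  # gufunc applied directly to the dask arrays
upper = da.triu(dist)           # blocks below the diagonal become zeros and need no compute
result = upper.compute()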

@eric-czech (Collaborator)

Thanks again @jakirkham. On a related note, do you know an efficient way to go from a symmetric matrix to something like dask_distance.squareform, but with column/row indexes in a dataframe? We have several use cases for that, and I'm wondering if there is a better way to do it than raveling the whole array, concatenating all the indexes to that 1D result (as in Dataset.to_dask_dataframe), and then filtering. E.g.:

import xarray as xr
import numpy as np
dist = np.array([[1, .5], [.5, 1]])
ds = xr.Dataset(dict(dist=(('i', 'j'), dist)))
ds.to_dask_dataframe().query('i >= j').compute()
   i  j  dist
0  0  0   1.0
2  1  0   0.5
3  1  1   1.0

It would be great if there was a way to avoid that filter.

@tomwhite (Collaborator) commented Nov 5, 2020

Can this be closed now that #306 has been merged?

@aktech (Contributor) commented Nov 5, 2020

Can this be closed now that #306 has been merged?

I think so.

@tomwhite closed this as completed on Nov 5, 2020