Skip to content

Commit 1436509

Browse files
authored
Merge pull request #1489 from jmunroe/dask_dataframe
lazily load dask arrays to dask data frames by calling to_dask_dataframe
2 parents da7972c + ab8180b commit 1436509

File tree

7 files changed

+184
-5
lines changed

7 files changed

+184
-5
lines changed

doc/api.rst

+1
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,7 @@ Dataset methods
419419
save_mfdataset
420420
Dataset.to_array
421421
Dataset.to_dataframe
422+
Dataset.to_dask_dataframe
422423
Dataset.to_dict
423424
Dataset.from_dataframe
424425
Dataset.from_dict

doc/dask.rst

+10-1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,15 @@ Once you've manipulated a dask array, you can still write a dataset too big to
100100
fit into memory back to disk by using :py:meth:`~xarray.Dataset.to_netcdf` in the
101101
usual way.
102102

103+
A dataset can also be converted to a dask DataFrame using :py:meth:`~xarray.Dataset.to_dask_dataframe`.
104+
105+
.. ipython:: python
106+
107+
df = ds.to_dask_dataframe()
108+
df
109+
110+
Dask DataFrames do not support multi-indexes, so the coordinate variables from the dataset are included as columns in the dask DataFrame.
111+
103112
Using dask with xarray
104113
----------------------
105114

@@ -145,7 +154,7 @@ Explicit conversion by wrapping a DataArray with ``np.asarray`` also works:
145154
...
146155

147156
Alternatively you can load the data into memory but keep the arrays as
148-
dask arrays using the `~xarray.Dataset.persist` method:
157+
dask arrays using the :py:meth:`~xarray.Dataset.persist` method:
149158

150159
.. ipython::
151160

doc/pandas.rst

+3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ To convert the ``DataFrame`` to any other convenient representation,
6060
use ``DataFrame`` methods like :py:meth:`~pandas.DataFrame.reset_index`,
6161
:py:meth:`~pandas.DataFrame.stack` and :py:meth:`~pandas.DataFrame.unstack`.
6262

63+
For datasets containing dask arrays where the data should be lazily loaded, see the
64+
:py:meth:`Dataset.to_dask_dataframe() <xarray.Dataset.to_dask_dataframe>` method.
65+
6366
To create a ``Dataset`` from a ``DataFrame``, use the
6467
:py:meth:`~xarray.Dataset.from_dataframe` class method or the equivalent
6568
:py:meth:`pandas.DataFrame.to_xarray <DataFrame.to_xarray>` method (pandas

doc/whats-new.rst

+5
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,11 @@ Enhancements
244244
functions on data stored as dask arrays (:issue:`1279`).
245245
By `Joe Hamman <https://github.com/jhamman>`_.
246246

247+
- Added new method :py:meth:`~Dataset.to_dask_dataframe` to
248+
``Dataset``, which converts a dataset into a dask dataframe.
249+
This allows lazy loading of data from a dataset containing dask arrays (:issue:`1462`).
250+
By `James Munroe <https://github.com/jmunroe>`_.
251+
247252
- Support reading and writing unlimited dimensions with h5netcdf (:issue:`1636`).
248253
By `Joe Hamman <https://github.com/jhamman>`_.
249254

xarray/core/dataset.py

+60
Original file line numberDiff line numberDiff line change
@@ -2577,6 +2577,66 @@ def from_dataframe(cls, dataframe):
25772577
obj[name] = (dims, data)
25782578
return obj
25792579

2580+
def to_dask_dataframe(self, set_index=False):
    """
    Convert this dataset into a dask.dataframe.DataFrame.

    Both the coordinate and data variables in this dataset form
    the columns of the DataFrame, with coordinates placed first.

    Parameters
    ----------
    set_index : bool, optional
        If True, the dask DataFrame is indexed by this dataset's
        coordinate. Since dask DataFrames do not support multi-indexes,
        set_index only works if there is one coordinate dimension.

    Returns
    -------
    dask.dataframe.DataFrame

    Raises
    ------
    ValueError
        If ``set_index=True`` and the dataset does not have exactly
        one dimension.
    """
    ordered_dims = self.dims

    # validate up front so an invalid request fails before any dask
    # graph is assembled
    if set_index and len(ordered_dims) != 1:
        raise ValueError(
            'set_index=True is only valid for '
            'one-dimensional datasets')

    import dask.dataframe as dd

    chunks = self.chunks

    # order columns so that coordinates appear before data
    columns = list(self.coords) + list(self.data_vars)

    data = []
    for k in columns:
        v = self._variables[k]

        # consider coordinate variables as well as data variables
        if isinstance(v, xr.IndexVariable):
            v = v.to_base_variable()

        # ensure all variables span the same dimensions
        v = v.set_dims(ordered_dims)

        # ensure all variables have the same chunking structure
        if v.chunks != chunks:
            v = v.chunk(chunks)

        # reshape variable contents as a 1d array
        d = v.data.reshape(-1)

        # convert to dask DataFrames
        s = dd.from_array(d, columns=[k])

        data.append(s)

    df = dd.concat(data, axis=1)

    if set_index:
        # extract out first (and only) coordinate variable
        coord_dim = list(ordered_dims)[0]

        # a dimension without a coordinate variable has no column to
        # promote, in which case the default index is kept
        if coord_dim in df.columns:
            df = df.set_index(coord_dim)

    return df
2639+
25802640
def to_dict(self):
25812641
"""
25822642
Convert this dataset to a dictionary following xarray naming

xarray/tests/__init__.py

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
from xarray.testing import assert_equal, assert_identical, assert_allclose
1818
from xarray.plot.utils import import_seaborn
1919

20+
try:
21+
from pandas.testing import assert_frame_equal
22+
except ImportError:
23+
# old location, for pandas < 0.20
24+
from pandas.util.testing import assert_frame_equal
25+
2026
try:
2127
import unittest2 as unittest
2228
except ImportError:

xarray/tests/test_dask.py

+99-4
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313
import xarray as xr
1414
from xarray import Variable, DataArray, Dataset
1515
import xarray.ufuncs as xu
16-
from xarray.core.pycompat import suppress
17-
from . import TestCase
16+
from xarray.core.pycompat import suppress, OrderedDict
17+
from . import TestCase, assert_frame_equal
1818

1919
from xarray.tests import mock
2020

2121
dask = pytest.importorskip('dask')
2222
import dask.array as da
23+
import dask.dataframe as dd
2324

2425

2526
class DaskTestCase(TestCase):
@@ -29,9 +30,9 @@ def assertLazyAnd(self, expected, actual, test):
2930
if isinstance(actual, Dataset):
3031
for k, v in actual.variables.items():
3132
if k in actual.dims:
32-
self.assertIsInstance(var.data, np.ndarray)
33+
self.assertIsInstance(v.data, np.ndarray)
3334
else:
34-
self.assertIsInstance(var.data, da.Array)
35+
self.assertIsInstance(v.data, da.Array)
3536
elif isinstance(actual, DataArray):
3637
self.assertIsInstance(actual.data, da.Array)
3738
for k, v in actual.coords.items():
@@ -546,6 +547,100 @@ def test_from_dask_variable(self):
546547
coords={'x': range(4)}, name='foo')
547548
self.assertLazyAndIdentical(self.lazy_array, a)
548549

550+
def test_to_dask_dataframe(self):
    # A 1-D dataset mixing a dask-backed variable, a numpy variable and a
    # string coordinate is converted to a dask DataFrame, once with the
    # coordinate as index and once with it kept as a regular column.
    lazy_a = da.from_array(np.random.randn(10), chunks=4)
    eager_b = np.arange(10, dtype='uint8')
    labels = list('abcdefghij')

    variables = OrderedDict([('a', ('t', lazy_a)),
                             ('b', ('t', eager_b)),
                             ('t', ('t', labels))])
    ds = Dataset(variables)

    indexed_frame = pd.DataFrame({'a': lazy_a,
                                  'b': eager_b},
                                 index=pd.Index(labels, name='t'))

    # first pass: 't' is the index; second pass: 't' is an ordinary column
    cases = [(True, indexed_frame),
             (False, indexed_frame.reset_index(drop=False))]

    for with_index, reference in cases:
        expected = dd.from_pandas(reference, chunksize=4)
        actual = ds.to_dask_dataframe(set_index=with_index)

        # the result must stay a dask DataFrame ...
        self.assertIsInstance(actual, dd.DataFrame)
        # ... and match the reference frame once both are computed
        assert_frame_equal(expected.compute(), actual.compute())
581+
582+
def test_to_dask_dataframe_2D(self):
    # A 2-D dataset is flattened into columns when no index is requested.
    lazy_w = da.from_array(np.random.randn(2, 3), chunks=(1, 2))
    ds = Dataset({'w': (('x', 'y'), lazy_w)})
    ds['x'] = ('x', np.array([0, 1], np.int64))
    ds['y'] = ('y', list('abc'))

    # dask dataframes do not (yet) support multiindex; if they did, this
    # would be the index of the result
    x_labels = [0, 0, 0, 1, 1, 1]
    y_labels = ['a', 'b', 'c', 'a', 'b', 'c']
    multi_index = pd.MultiIndex.from_arrays(
        [x_labels, y_labels],
        names=['x', 'y'])

    # for now the index levels are expected as plain columns instead
    flat_w = lazy_w.reshape(-1)
    expected = pd.DataFrame({'w': flat_w},
                            index=multi_index).reset_index(drop=False)

    actual = ds.to_dask_dataframe(set_index=False)

    self.assertIsInstance(actual, dd.DataFrame)
    assert_frame_equal(expected, actual.compute())
603+
604+
def test_to_dask_dataframe_coordinates(self):
    # The coordinate itself is supplied as a dask array here.
    lazy_a = da.from_array(np.random.randn(10), chunks=4)
    lazy_t = da.from_array(np.arange(10)*2, chunks=4)

    variables = OrderedDict([('a', ('t', lazy_a)),
                             ('t', ('t', lazy_t))])
    ds = Dataset(variables)

    reference = pd.DataFrame({'a': lazy_a},
                             index=pd.Index(lazy_t, name='t'))
    expected = dd.from_pandas(reference, chunksize=4)

    actual = ds.to_dask_dataframe(set_index=True)

    self.assertIsInstance(actual, dd.DataFrame)
    assert_frame_equal(expected.compute(), actual.compute())
618+
619+
def test_to_dask_dataframe_not_daskarray(self):
    # None of the variables is backed by a dask array; the conversion
    # must still hand back a dask DataFrame.
    values_a = np.random.randn(10)
    values_b = np.arange(10, dtype='uint8')
    labels = list('abcdefghij')

    variables = OrderedDict([('a', ('t', values_a)),
                             ('b', ('t', values_b)),
                             ('t', ('t', labels))])
    ds = Dataset(variables)

    label_index = pd.Index(labels, name='t')
    expected = pd.DataFrame({'a': values_a, 'b': values_b},
                            index=label_index)

    actual = ds.to_dask_dataframe(set_index=True)

    self.assertIsInstance(actual, dd.DataFrame)
    assert_frame_equal(expected, actual.compute())
635+
636+
def test_to_dask_dataframe_no_coordinate(self):
    # The only dimension ('dim_0') has no coordinate variable, so the
    # expected frame has a single 'x' column and a default index even
    # though set_index=True is passed.
    lazy_x = da.from_array(np.random.randn(10), chunks=4)
    ds = Dataset({'x': ('dim_0', lazy_x)})

    computed_x = lazy_x.compute()
    expected = pd.DataFrame({'x': computed_x})

    actual = ds.to_dask_dataframe(set_index=True)
    assert_frame_equal(expected, actual.compute())
643+
549644

550645
@pytest.mark.parametrize("method", ['load', 'compute'])
551646
def test_dask_kwargs_variable(method):

0 commit comments

Comments
 (0)