Xray v0.5.2 updates #478

Merged: 2 commits, Jul 16, 2015
8 changes: 4 additions & 4 deletions doc/whats-new.rst
@@ -30,9 +30,9 @@ Enhancements
cannot be otherwise merged automatically, e.g., if the original datasets
have conflicting index coordinates (:issue:`443`).
- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` now use a
thread lock by default for reading from netCDF files. This avoids possible
segmentation faults for reading from netCDF4 files when HDF5 is not
configured properly for concurrent access (:issue:`444`).
global thread lock by default for reading from netCDF files with dask. This
avoids possible segmentation faults for reading from netCDF4 files when HDF5
is not configured properly for concurrent access (:issue:`444`).
- Added support for serializing arrays of complex numbers with `engine='h5netcdf'`.
- The new :py:func:`~xray.save_mfdataset` function allows for saving multiple
datasets to disk simultaneously. This is useful when processing large datasets
@@ -51,7 +51,7 @@ Enhancements
Bug fixes
~~~~~~~~~

- Fix ``min``, ``max``, ``argmin`` and ``argmax`` for arrays with string or
- Fixed ``min``, ``max``, ``argmin`` and ``argmax`` for arrays with string or
unicode types (:issue:`453`).
- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` support
supplying chunks as a single integer.
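The thread-lock change above is user-facing through the ``lock`` keyword of ``open_dataset``. A minimal sketch of the default and the opt-out (the file name and chunk sizes are made up; assumes dask and netCDF4 are installed):

```python
import xray

# Default: reads from netCDF4/HDF5 files go through a shared thread lock, so
# dask's multithreaded scheduler cannot crash an HDF5 build that is not
# configured for concurrent access.
ds = xray.open_dataset('measurements.nc', chunks={'time': 100})

# Opting out is still possible, e.g. when HDF5 is known to be thread-safe:
ds_unlocked = xray.open_dataset('measurements.nc', chunks={'time': 100},
                                lock=False)
```

Likewise, the complex-number entry could be exercised roughly as follows (a sketch, assuming h5netcdf and h5py are installed; complex dtypes rely on h5py's native support rather than the netCDF standard):

```python
import numpy as np
import xray

ds = xray.Dataset({'z': ('x', np.array([1 + 2j, 3 - 4j]))})
ds.to_netcdf('complex.h5', engine='h5netcdf')   # round-trips complex values
reopened = xray.open_dataset('complex.h5', engine='h5netcdf')
```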
65 changes: 51 additions & 14 deletions xray/backends/api.py
@@ -1,5 +1,6 @@
import sys
import gzip
import threading
from glob import glob
from io import BytesIO

@@ -36,10 +37,34 @@ def _get_default_engine(path, allow_remote=False):
return engine


_global_lock = threading.Lock()


def _default_lock(filename, engine):
if filename.endswith('.gz'):
lock = False
else:
if engine is None:
engine = _get_default_engine(filename, allow_remote=True)

if engine == 'netcdf4':
if is_remote_uri(filename):
lock = False
else:
# TODO: identify netcdf3 files and don't use the global lock
# for them
lock = _global_lock
elif engine == 'h5netcdf':
lock = _global_lock
else:
lock = False
return lock
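For context, the lock chosen by ``_default_lock`` eventually reaches ``dask.array.from_array`` (via ``Dataset.chunk`` further down in this diff), where dask acquires it around every chunk read. A standalone sketch of that mechanism, with a plain NumPy array standing in for a lazily read netCDF variable:

```python
import threading

import dask.array as da
import numpy as np

lock = threading.Lock()
backing = np.arange(20)                      # stand-in for an on-disk variable
arr = da.from_array(backing, chunks=5, lock=lock)

# Every chunk read acquires `lock` before touching `backing`; with the global
# lock above, this is what serializes HDF5 access across dask threads.
print(arr.sum().compute())
```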


def open_dataset(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=True, decode_times=True,
concat_characters=True, decode_coords=True, engine=None,
chunks=None, lock=True):
chunks=None, lock=None):
"""Load and decode a dataset from a file or file-like object.

Parameters
@@ -80,10 +105,12 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
If chunks is provided, it is used to load the new dataset into dask
arrays. This is an experimental feature; see the documentation for more
details.
lock : optional
lock : False, True or threading.Lock, optional
If chunks is provided, this argument is passed on to
:py:func:`dask.array.from_array`. By default, a lock is used to avoid
issues with concurrent access with dask's multithreaded backend.
:py:func:`dask.array.from_array`. By default, a per-variable lock is
used when reading data from netCDF files with the netcdf4 and h5netcdf
engines to avoid issues with concurrent access when using dask's
multithreaded backend.

Returns
-------
@@ -100,7 +127,7 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
concat_characters = False
decode_coords = False

def maybe_decode_store(store):
def maybe_decode_store(store, lock=False):
ds = conventions.decode_cf(
store, mask_and_scale=mask_and_scale, decode_times=decode_times,
concat_characters=concat_characters, decode_coords=decode_coords)
@@ -129,8 +156,6 @@ def maybe_decode_store(store):
else:
raise
else:
# TODO: automatically fall back to using pydap if given a URL and
# netCDF4 is not available
if engine is None:
engine = _get_default_engine(filename_or_obj,
allow_remote=True)
@@ -145,15 +170,17 @@ def maybe_decode_store(store):
else:
raise ValueError('unrecognized engine for open_dataset: %r'
% engine)

if lock is None:
lock = _default_lock(filename_or_obj, engine)
with close_on_error(store):
return maybe_decode_store(store)
return maybe_decode_store(store, lock)
else:
if engine is not None and engine != 'scipy':
raise ValueError('can only read file-like objects with '
"default engine or engine='scipy'")
# assume filename_or_obj is a file-like object
store = backends.ScipyDataStore(filename_or_obj)

return maybe_decode_store(store)


@@ -167,7 +194,7 @@ def close(self):


def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
lock=True, **kwargs):
engine=None, lock=None, **kwargs):
"""Open multiple files as a single dataset.

Experimental. Requires dask to be installed.
@@ -191,10 +218,15 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
want to stack a collection of 2D arrays along a third dimension.
preprocess : callable, optional
If provided, call this function on each dataset prior to concatenation.
lock : optional
engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf'}, optional
Engine to use when reading netCDF files. If not provided, the default
engine is chosen based on available dependencies, with a preference for
'netcdf4'.
lock : False, True or threading.Lock, optional
This argument is passed on to :py:func:`dask.array.from_array`. By
default, a lock is used to avoid issues with concurrent access with
dask's multithreaded backend.
default, a per-variable lock is used when reading data from netCDF
files with the netcdf4 and h5netcdf engines to avoid issues with
concurrent access when using dask's multithreaded backend.
**kwargs : optional
Additional arguments passed on to :py:func:`xray.open_dataset`.

@@ -211,11 +243,16 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
paths = sorted(glob(paths))
if not paths:
raise IOError('no files to open')
datasets = [open_dataset(p, **kwargs) for p in paths]

datasets = [open_dataset(p, engine=engine, **kwargs) for p in paths]
if lock is None:
lock = _default_lock(paths[0], engine)
file_objs = [ds._file_obj for ds in datasets]
datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]

if preprocess is not None:
datasets = [preprocess(ds) for ds in datasets]

combined = auto_combine(datasets, concat_dim=concat_dim)
combined._file_obj = _MultiFileCloser(file_objs)
return combined
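A hedged usage sketch of the updated ``open_mfdataset`` signature (the glob pattern and chunk sizes are hypothetical): the new ``engine`` keyword is forwarded to every ``open_dataset`` call, and leaving ``lock`` at its ``None`` default lets ``_default_lock`` pick a lock based on the first file.

```python
import xray

# netCDF4/HDF5 files: lock=None resolves to the shared global thread lock.
combined = xray.open_mfdataset('output_*.nc', chunks={'time': 50})

# netCDF3 files read through scipy need no lock, so lock=None resolves to False.
combined3 = xray.open_mfdataset('output_*.nc', engine='scipy',
                                chunks={'time': 50})
```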
9 changes: 8 additions & 1 deletion xray/test/test_backends.py
@@ -705,6 +705,9 @@ def test_complex(self):
self.assertDatasetEqual(expected, actual)

def test_cross_engine_read_write_netcdf4(self):
# Drop dim3, because its labels include strings. These appear not to be
# read properly with python-netCDF4, which converts them into unicode
# instead of leaving them as bytes.
data = create_test_data().drop('dim3')
data.attrs['foo'] = 'bar'
valid_engines = ['netcdf4', 'h5netcdf']
@@ -725,6 +728,7 @@ def test_read_byte_attrs_as_unicode(self):


@requires_dask
@requires_scipy
@requires_netCDF4
class DaskTest(TestCase):
def test_open_mfdataset(self):
@@ -757,13 +761,16 @@ def test_preprocess_mfdataset(self):
def test_lock(self):
original = Dataset({'foo': ('x', np.random.randn(10))})
with create_tmp_file() as tmp:
original.to_netcdf(tmp)
original.to_netcdf(tmp, format='NETCDF3_CLASSIC')
with open_dataset(tmp, chunks=10) as ds:
task = ds.foo.data.dask[ds.foo.data.name, 0]
self.assertIsInstance(task[-1], type(Lock()))
with open_mfdataset(tmp) as ds:
task = ds.foo.data.dask[ds.foo.data.name, 0]
self.assertIsInstance(task[-1], type(Lock()))
with open_mfdataset(tmp, engine='scipy') as ds:
task = ds.foo.data.dask[ds.foo.data.name, 0]
self.assertNotIsInstance(task[-1], type(Lock()))

def test_save_mfdataset_roundtrip(self):
original = Dataset({'foo': ('x', np.random.randn(10))})
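Outside the test suite, the same graph inspection used in ``test_lock`` above can confirm which lock (if any) a dataset picked up. A rough sketch, assuming a local netCDF4 file with a variable named ``foo``:

```python
from threading import Lock

import xray

ds = xray.open_dataset('example.nc', chunks=10)

# dask stores the lock as the last element of each read task; threading.Lock
# is a factory function, so the type check goes through type(Lock()).
task = ds.foo.data.dask[(ds.foo.data.name, 0)]
print(isinstance(task[-1], type(Lock())))   # True for netcdf4/h5netcdf engines
```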