diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index d2b44ca77f7..f4805d233da 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -30,9 +30,9 @@ Enhancements
   cannot be otherwise merged automatically, e.g., if the original datasets
   have conflicting index coordinates (:issue:`443`).
 - :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` now use a
-  thread lock by default for reading from netCDF files. This avoids possible
-  segmentation faults for reading from netCDF4 files when HDF5 is not
-  configured properly for concurrent access (:issue:`444`).
+  global thread lock by default for reading from netCDF files with dask. This
+  avoids possible segmentation faults for reading from netCDF4 files when HDF5
+  is not configured properly for concurrent access (:issue:`444`).
 - Added support for serializing arrays of complex numbers with `engine='h5netcdf'`.
 - The new :py:func:`~xray.save_mfdataset` function allows for saving multiple
   datasets to disk simultaneously. This is useful when processing large datasets
@@ -51,7 +51,7 @@ Enhancements
 
 Bug fixes
 ~~~~~~~~~
-- Fix ``min``, ``max``, ``argmin`` and ``argmax``for arrays with string or
+- Fixed ``min``, ``max``, ``argmin`` and ``argmax`` for arrays with string or
   unicode types (:issue:`453`).
 - :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` support
   supplying chunks as a single integer.
diff --git a/xray/backends/api.py b/xray/backends/api.py
index 81e7e7947d1..2d113593f42 100644
--- a/xray/backends/api.py
+++ b/xray/backends/api.py
@@ -1,5 +1,6 @@
 import sys
 import gzip
+import threading
 from glob import glob
 from io import BytesIO
 
@@ -36,10 +37,34 @@ def _get_default_engine(path, allow_remote=False):
     return engine
 
 
+_global_lock = threading.Lock()
+
+
+def _default_lock(filename, engine):
+    if filename.endswith('.gz'):
+        lock = False
+    else:
+        if engine is None:
+            engine = _get_default_engine(filename, allow_remote=True)
+
+        if engine == 'netcdf4':
+            if is_remote_uri(filename):
+                lock = False
+            else:
+                # TODO: identify netcdf3 files and don't use the global lock
+                # for them
+                lock = _global_lock
+        elif engine == 'h5netcdf':
+            lock = _global_lock
+        else:
+            lock = False
+    return lock
+
+
 def open_dataset(filename_or_obj, group=None, decode_cf=True,
                  mask_and_scale=True, decode_times=True,
                  concat_characters=True, decode_coords=True, engine=None,
-                 chunks=None, lock=True):
+                 chunks=None, lock=None):
     """Load and decode a dataset from a file or file-like object.
 
     Parameters
@@ -80,10 +105,12 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         If chunks is provided, it used to load the new dataset into dask
         arrays. This is an experimental feature; see the documentation for more
         details.
-    lock : optional
+    lock : False, True or threading.Lock, optional
         If chunks is provided, this argument is passed on to
-        :py:func:`dask.array.from_array`. By default, a lock is used to avoid
-        issues with concurrent access with dask's multithreaded backend.
+        :py:func:`dask.array.from_array`. By default, a per-variable lock is
+        used when reading data from netCDF files with the netcdf4 and h5netcdf
+        engines to avoid issues with concurrent access when using dask's
+        multithreaded backend.
 
     Returns
     -------
@@ -100,7 +127,7 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         concat_characters = False
         decode_coords = False
 
-    def maybe_decode_store(store):
+    def maybe_decode_store(store, lock=False):
         ds = conventions.decode_cf(
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords)
@@ -129,8 +156,6 @@ def maybe_decode_store(store):
                     else:
                         raise
         else:
-            # TODO: automatically fall back to using pydap if given a URL and
-            # netCDF4 is not available
             if engine is None:
                 engine = _get_default_engine(filename_or_obj,
                                              allow_remote=True)
@@ -145,15 +170,17 @@ def maybe_decode_store(store):
             else:
                 raise ValueError('unrecognized engine for open_dataset: %r'
                                  % engine)
-
+            if lock is None:
+                lock = _default_lock(filename_or_obj, engine)
             with close_on_error(store):
-                return maybe_decode_store(store)
+                return maybe_decode_store(store, lock)
     else:
         if engine is not None and engine != 'scipy':
             raise ValueError('can only read file-like objects with '
                              "default engine or engine='scipy'")
         # assume filename_or_obj is a file-like object
         store = backends.ScipyDataStore(filename_or_obj)
+
     return maybe_decode_store(store)
 
 
@@ -167,7 +194,7 @@ def close(self):
 
 
 def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
-                   lock=True, **kwargs):
+                   engine=None, lock=None, **kwargs):
     """Open multiple files as a single dataset.
 
     Experimental. Requires dask to be installed.
@@ -191,10 +218,15 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
         want to stack a collection of 2D arrays along a third dimension.
     preprocess : callable, optional
         If provided, call this function on each dataset prior to concatenation.
-    lock : optional
+    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf'}, optional
+        Engine to use when reading netCDF files. If not provided, the default
+        engine is chosen based on available dependencies, with a preference for
+        'netcdf4'.
+    lock : False, True or threading.Lock, optional
         This argument is passed on to :py:func:`dask.array.from_array`. By
-        default, a lock is used to avoid issues with concurrent access with
-        dask's multithreaded backend.
+        default, a per-variable lock is used when reading data from netCDF
+        files with the netcdf4 and h5netcdf engines to avoid issues with
+        concurrent access when using dask's multithreaded backend.
     **kwargs : optional
         Additional arguments passed on to :py:func:`xray.open_dataset`.
 
@@ -211,11 +243,16 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
         paths = sorted(glob(paths))
     if not paths:
         raise IOError('no files to open')
-    datasets = [open_dataset(p, **kwargs) for p in paths]
+
+    datasets = [open_dataset(p, engine=engine, **kwargs) for p in paths]
+    if lock is None:
+        lock = _default_lock(paths[0], engine)
     file_objs = [ds._file_obj for ds in datasets]
     datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]
+
     if preprocess is not None:
         datasets = [preprocess(ds) for ds in datasets]
+
     combined = auto_combine(datasets, concat_dim=concat_dim)
     combined._file_obj = _MultiFileCloser(file_objs)
     return combined
diff --git a/xray/test/test_backends.py b/xray/test/test_backends.py
index 9fb99e259c9..b2a41338880 100644
--- a/xray/test/test_backends.py
+++ b/xray/test/test_backends.py
@@ -705,6 +705,9 @@ def test_complex(self):
         self.assertDatasetEqual(expected, actual)
 
     def test_cross_engine_read_write_netcdf4(self):
+        # Drop dim3, because its labels include strings. These appear to be
+        # not properly read with python-netCDF4, which converts them into
+        # unicode instead of leaving them as bytes.
         data = create_test_data().drop('dim3')
         data.attrs['foo'] = 'bar'
         valid_engines = ['netcdf4', 'h5netcdf']
@@ -725,6 +728,7 @@ def test_read_byte_attrs_as_unicode(self):
 
 
 @requires_dask
+@requires_scipy
 @requires_netCDF4
 class DaskTest(TestCase):
     def test_open_mfdataset(self):
@@ -757,13 +761,16 @@ def test_preprocess_mfdataset(self):
     def test_lock(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         with create_tmp_file() as tmp:
-            original.to_netcdf(tmp)
+            original.to_netcdf(tmp, format='NETCDF3_CLASSIC')
             with open_dataset(tmp, chunks=10) as ds:
                 task = ds.foo.data.dask[ds.foo.data.name, 0]
                 self.assertIsInstance(task[-1], type(Lock()))
             with open_mfdataset(tmp) as ds:
                 task = ds.foo.data.dask[ds.foo.data.name, 0]
                 self.assertIsInstance(task[-1], type(Lock()))
+            with open_mfdataset(tmp, engine='scipy') as ds:
+                task = ds.foo.data.dask[ds.foo.data.name, 0]
+                self.assertNotIsInstance(task[-1], type(Lock()))
 
     def test_save_mfdataset_roundtrip(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
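For illustration, here is a minimal usage sketch of the locking defaults this patch introduces. It is not part of the change itself; the file names and chunk size are placeholders:

    # Illustrative only: 'example.nc' and 'example-*.nc' are placeholder paths.
    import threading

    import xray

    # With chunks, the new default lock=None is resolved by _default_lock: for
    # local files, the netcdf4 and h5netcdf engines share the module-level lock,
    # so dask's multithreaded scheduler serializes reads that go through HDF5.
    ds = xray.open_dataset('example.nc', chunks=10)

    # The scipy engine (netCDF3) is left unlocked by default, as asserted by the
    # new branch in test_lock.
    ds3 = xray.open_mfdataset('example-*.nc', engine='scipy')

    # Callers can still disable locking explicitly, or pass their own lock to
    # share with other I/O in the same process.
    ds_nolock = xray.open_dataset('example.nc', chunks=10, lock=False)
    ds_shared = xray.open_dataset('example.nc', chunks=10, lock=threading.Lock())

A single shared lock is a conservative default: HDF5 builds are often not thread-safe, so serializing every netCDF4/HDF5 read in the process trades some read parallelism for protection against the segmentation faults referenced in :issue:`444`.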