Skip to content

Commit 21a792d

Browse files
authored
Switch to shared Lock (SerializableLock if possible) for reading/writing (#1179)
* Switch to shared Lock (SerializableLock if possible) for reading and writing Fixes #1172 The serializable lock will be useful for dask.distributed or multi-processing (xref #798, #1173, among others). * Test serializable lock * Use conda-forge for builds * remove broken/fragile .test_lock
1 parent 3001ee1 commit 21a792d

9 files changed

+27
-29
lines changed

ci/requirements-py27-netcdf4-dev.yml

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
name: test_env
2+
channels:
3+
- conda-forge
24
dependencies:
35
- python=2.7
46
- cython

ci/requirements-py27-pydap.yml

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
name: test_env
2+
channels:
3+
- conda-forge
24
dependencies:
35
- python=2.7
46
- dask

ci/requirements-py35.yml

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
name: test_env
2+
channels:
3+
- conda-forge
24
dependencies:
35
- python=3.5
46
- cython

doc/dask.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ larger chunksizes.
225225
226226
import os
227227
os.remove('example-data.nc')
228-
228+
229229
Optimization Tips
230230
-----------------
231231

doc/whats-new.rst

+5-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ Breaking changes
6262
By `Guido Imperiale <https://github.com/crusaderky>`_ and
6363
`Stephan Hoyer <https://github.com/shoyer>`_.
6464
- Pickling a ``Dataset`` or ``DataArray`` linked to a file on disk no longer
65-
caches its values into memory before pickling :issue:`1128`. Instead, pickle
65+
caches its values into memory before pickling (:issue:`1128`). Instead, pickle
6666
stores file paths and restores objects by reopening file references. This
6767
enables preliminary, experimental use of xarray for opening files with
6868
`dask.distributed <https://distributed.readthedocs.io>`_.
@@ -227,6 +227,10 @@ Bug fixes
227227
- Fixed a bug with facetgrid (the ``norm`` keyword was ignored, :issue:`1159`).
228228
By `Fabien Maussion <https://github.com/fmaussion>`_.
229229

230+
- Resolved a concurrency bug that could cause Python to crash when
231+
simultaneously reading and writing netCDF4 files with dask (:issue:`1172`).
232+
By `Stephan Hoyer <https://github.com/shoyer>`_.
233+
230234
- Fix to make ``.copy()`` actually copy dask arrays, which will be relevant for
231235
future releases of dask in which dask arrays will be mutable (:issue:`1180`).
232236

xarray/backends/api.py

+3-7
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from __future__ import print_function
44
import gzip
55
import os.path
6-
import threading
76
from distutils.version import StrictVersion
87
from glob import glob
98
from io import BytesIO
@@ -12,7 +11,7 @@
1211
import numpy as np
1312

1413
from .. import backends, conventions
15-
from .common import ArrayWriter
14+
from .common import ArrayWriter, GLOBAL_LOCK
1615
from ..core import indexing
1716
from ..core.combine import auto_combine
1817
from ..core.utils import close_on_error, is_remote_uri
@@ -55,9 +54,6 @@ def _normalize_path(path):
5554
return os.path.abspath(os.path.expanduser(path))
5655

5756

58-
_global_lock = threading.Lock()
59-
60-
6157
def _default_lock(filename, engine):
6258
if filename.endswith('.gz'):
6359
lock = False
@@ -71,9 +67,9 @@ def _default_lock(filename, engine):
7167
else:
7268
# TODO: identify netcdf3 files and don't use the global lock
7369
# for them
74-
lock = _global_lock
70+
lock = GLOBAL_LOCK
7571
elif engine in {'h5netcdf', 'pynio'}:
76-
lock = _global_lock
72+
lock = GLOBAL_LOCK
7773
else:
7874
lock = False
7975
return lock

xarray/backends/common.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from __future__ import division
33
from __future__ import print_function
44
import numpy as np
5-
import itertools
65
import logging
76
import time
87
import traceback
@@ -12,7 +11,12 @@
1211

1312
from ..conventions import cf_encoder
1413
from ..core.utils import FrozenOrderedDict
15-
from ..core.pycompat import iteritems, dask_array_type, OrderedDict
14+
from ..core.pycompat import iteritems, dask_array_type
15+
16+
try:
17+
from dask.utils import SerializableLock as Lock
18+
except ImportError:
19+
from threading import Lock
1620

1721
# Create a logger object, but don't add any handlers. Leave that to user code.
1822
logger = logging.getLogger(__name__)
@@ -21,6 +25,10 @@
2125
NONE_VAR_NAME = '__values__'
2226

2327

28+
# dask.utils.SerializableLock if available, otherwise just a threading.Lock
29+
GLOBAL_LOCK = Lock()
30+
31+
2432
def _encode_variable_name(name):
2533
if name is None:
2634
name = NONE_VAR_NAME
@@ -150,7 +158,7 @@ def sync(self):
150158
import dask.array as da
151159
import dask
152160
if StrictVersion(dask.__version__) > StrictVersion('0.8.1'):
153-
da.store(self.sources, self.targets, lock=threading.Lock())
161+
da.store(self.sources, self.targets, lock=GLOBAL_LOCK)
154162
else:
155163
da.store(self.sources, self.targets)
156164
self.sources = []

xarray/test/test_backends.py

-14
Original file line numberDiff line numberDiff line change
@@ -1034,20 +1034,6 @@ def preprocess(ds):
10341034
with open_mfdataset(tmp, preprocess=preprocess) as actual:
10351035
self.assertDatasetIdentical(expected, actual)
10361036

1037-
def test_lock(self):
1038-
original = Dataset({'foo': ('x', np.random.randn(10))})
1039-
with create_tmp_file() as tmp:
1040-
original.to_netcdf(tmp, format='NETCDF3_CLASSIC')
1041-
with open_dataset(tmp, chunks=10) as ds:
1042-
task = ds.foo.data.dask[ds.foo.data.name, 0]
1043-
self.assertIsInstance(task[-1], type(Lock()))
1044-
with open_mfdataset(tmp) as ds:
1045-
task = ds.foo.data.dask[ds.foo.data.name, 0]
1046-
self.assertIsInstance(task[-1], type(Lock()))
1047-
with open_mfdataset(tmp, engine='scipy') as ds:
1048-
task = ds.foo.data.dask[ds.foo.data.name, 0]
1049-
self.assertNotIsInstance(task[-1], type(Lock()))
1050-
10511037
def test_save_mfdataset_roundtrip(self):
10521038
original = Dataset({'foo': ('x', np.random.randn(10))})
10531039
datasets = [original.isel(x=slice(5)),

xarray/test/test_distributed.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ def test_dask_distributed_integration_test(loop, engine):
2828
original = create_test_data()
2929
with create_tmp_file() as filename:
3030
original.to_netcdf(filename, engine=engine)
31-
# TODO: should be able to serialize locks
32-
restored = xr.open_dataset(filename, chunks=3, lock=False,
33-
engine=engine)
31+
restored = xr.open_dataset(filename, chunks=3, engine=engine)
3432
assert isinstance(restored.var1.data, da.Array)
3533
computed = restored.compute()
3634
assert_dataset_allclose(original, computed)

0 commit comments

Comments
 (0)