Expose xarray's h5py serialization capabilities as public API? #4242

Open
rabernat opened this issue Jul 21, 2020 · 5 comments

@rabernat
Contributor

Xarray has a magic ability to serialize h5py datasets. We should expose this somehow and allow it to be used outside of xarray.

Consider the following example:

import s3fs
import h5py
import dask.array as dsa
import xarray as xr
import cloudpickle

url = 'noaa-goes16/ABI-L2-RRQPEF/2020/001/00/OR_ABI-L2-RRQPEF-M6_G16_s20200010000216_e20200010009524_c20200010010034.nc'
fs = s3fs.S3FileSystem(anon=True)
f = fs.open(url)
ds = h5py.File(f, mode='r')
data = dsa.from_array(ds['RRQPE'])
_ = cloudpickle.dumps(data)

This raises TypeError: h5py objects cannot be pickled.

However, if I read the file with xarray...

ds = xr.open_dataset(f, chunks={})
data = ds['RRQPE'].data
_ = cloudpickle.dumps(data)

It works just fine. This has come up in several places (e.g. fsspec/s3fs#337, dask/distributed#2787).

It seems like the ability to pickle these arrays is broadly useful, beyond xarray.

  1. How does our magic work?
  2. What would it look like to break this magic out and expose it as public API (or inside another package)?
@shoyer
Member

shoyer commented Jul 25, 2020

The secret is our CachingFileManager object:
https://github.com/pydata/xarray/blob/b1c7e315e8a18e86c5751a0aa9024d41a42ca5e8/xarray/backends/file_manager.py

This class is essential to writing a performant / serializable backend for xarray/dask, so we definitely should expose it as public API as part of our backends refactor. I would not object to breaking it out into another package if someone is interested enough to make that happen.
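
The rough idea is easy to sketch. The following is only a minimal illustration of the pattern, not the actual CachingFileManager code (LazyFileManager and its attributes are placeholder names): instead of pickling an open h5py.File handle, pickle the recipe for opening the file and reopen it lazily after unpickling.

import pickle
import h5py

class LazyFileManager:
    # Sketch only: store how to open the file, never the open handle itself.
    def __init__(self, opener, *args, **kwargs):
        self._opener = opener    # e.g. h5py.File
        self._args = args        # e.g. a path or an fsspec open-file factory
        self._kwargs = kwargs
        self._file = None        # opened lazily, never pickled

    def acquire(self):
        if self._file is None:
            self._file = self._opener(*self._args, **self._kwargs)
        return self._file

    def __getstate__(self):
        # Drop the unpicklable handle; keep only the opening arguments.
        return (self._opener, self._args, self._kwargs)

    def __setstate__(self, state):
        opener, args, kwargs = state
        self.__init__(opener, *args, **kwargs)

manager = LazyFileManager(h5py.File, 'data.h5', mode='r')
restored = pickle.loads(pickle.dumps(manager))  # file is only reopened on the next acquire()

A chunk-reading function that closes over such a manager, rather than over the h5py dataset itself, can then be shipped to dask workers.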

@stale

stale bot commented Apr 17, 2022

In order to maintain a list of currently relevant issues, we mark issues as stale after a period of inactivity.

If this issue remains relevant, please comment here or remove the stale label; otherwise it will be marked as closed automatically.

@jakirkham

FWIW this sounds similar to what h5pickle does. Maybe it is worth improving that package with whatever logic Xarray has?

@ilan-gold
Contributor

I would be extremely interested in, at the minimum, documenting this publicly. For example, this is what I think should work based on looking at the code:

import dask.array as da
import dask.distributed as dd
import h5py
import numpy as np
from xarray.backends import CachingFileManager

X = np.random.randn(100, 100)
manager = CachingFileManager(h5py.File, 'data.h5', mode='w')
manager.acquire().create_dataset("X", (100, 100), 'f', data=X)
with dd.LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
    with dd.Client(cluster) as client:
        with manager.acquire_context(needs_lock=False) as root: # or True
            X_h5 = root['X']
            da.from_array(X_h5)[...].compute()

Or something along these lines, but I am not really sure how to use CachingFileManager properly, so I don't know. I would like to be able to use lazily loaded HDF5 files with dask distributed.

Thanks!

@ilan-gold
Contributor

Update:

import dask.array as da
import dask.distributed as dd
import h5py
import numpy as np
from xarray.backends import CachingFileManager

chunksize = 10
numchunks = 10
size = (chunksize * numchunks, chunksize * numchunks)
X = np.random.randn(*size)
# Write the data and close the file so the manager can reopen it read-only.
with h5py.File('data.h5', 'w') as f:
    f.create_dataset("X", size, 'f', data=X)
manager = CachingFileManager(h5py.File, 'data.h5', mode='r')
def get_chunk(block_id=None):
    # Only the manager is captured here, so the function stays picklable;
    # each worker reopens the file and reads just its own block.
    with manager.acquire_context(needs_lock=False) as f:
        x = block_id[0] * chunksize
        y = block_id[1] * chunksize
        chunk = f['X'][x : x + chunksize, y : y + chunksize]
    return chunk
with dd.LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
    with dd.Client(cluster) as client:
        res = da.map_blocks(
            get_chunk,
            chunks=((chunksize,) * numchunks, (chunksize,) * numchunks),
            meta=np.array([]),
            dtype=X.dtype,
        )[...].compute()
print(res)

works
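
Presumably this works because CachingFileManager pickles only what it needs to reopen the file (the opener, here h5py.File, plus the path and mode) rather than any open h5py object, so each worker re-acquires its own handle on first access.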
