Initial minimal working Cubed example for "map-reduce" #352

Merged 15 commits on Apr 23, 2024
1 change: 1 addition & 0 deletions ci/environment.yml
@@ -6,6 +6,7 @@ dependencies:
  - cachey
  - cftime
  - codecov
  - cubed>=0.14.2
  - dask-core
  - pandas
  - numpy>=1.22
136 changes: 133 additions & 3 deletions flox/core.py
@@ -38,7 +38,9 @@
)
from .cache import memoize
from .xrutils import (
    is_chunked_array,
    is_duck_array,
    is_duck_cubed_array,
    is_duck_dask_array,
    isnull,
    module_available,
@@ -63,10 +65,11 @@
except (ModuleNotFoundError, ImportError):
    Unpack: Any  # type: ignore[no-redef]

import cubed.Array as CubedArray
import dask.array.Array as DaskArray
from dask.typing import Graph

-T_DuckArray = Union[np.ndarray, DaskArray]  # Any ?
+T_DuckArray = Union[np.ndarray, DaskArray, CubedArray]  # Any ?
T_By = T_DuckArray
T_Bys = tuple[T_By, ...]
T_ExpectIndex = pd.Index
@@ -95,7 +98,7 @@


IntermediateDict = dict[Union[str, Callable], Any]
-FinalResultsDict = dict[str, Union["DaskArray", np.ndarray]]
+FinalResultsDict = dict[str, Union["DaskArray", "CubedArray", np.ndarray]]
FactorProps = namedtuple("FactorProps", "offset_group nan_sentinel nanmask")

# This dummy axis is inserted using np.expand_dims
@@ -1718,6 +1721,109 @@ def dask_groupby_agg(
    return (result, groups)


def cubed_groupby_agg(
    array: CubedArray,
    by: T_By,
    agg: Aggregation,
    expected_groups: pd.Index | None,
    axis: T_Axes = (),
    fill_value: Any = None,
    method: T_Method = "map-reduce",
    reindex: bool = False,
    engine: T_Engine = "numpy",
    sort: bool = True,
    chunks_cohorts=None,
) -> tuple[CubedArray, tuple[np.ndarray | CubedArray]]:
    import cubed
    import cubed.core.groupby

    # I think _tree_reduce expects this
    assert isinstance(axis, Sequence)
    assert all(ax >= 0 for ax in axis)

    inds = tuple(range(array.ndim))

    by_input = by

    # Unifying chunks is necessary for argreductions.
    # We need to rechunk before zipping up with the index;
    # let's always do it anyway.
    if not is_chunked_array(by):
        # chunk numpy arrays like the input array
        chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0))

        by = cubed.from_array(by, chunks=chunks, spec=array.spec)
    _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])

    # Cubed's groupby_reduction handles the generation of "intermediates", and the
    # "map-reduce" combination step, so we don't have to do that here.
    # Only the equivalent of "_simple_combine" is supported; there is no
    # support for "_grouped_combine".
    labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
    do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown

    assert do_simple_combine
    assert method == "map-reduce"
    assert expected_groups is not None
    assert reindex is True
    assert len(axis) == 1  # one axis/grouping

    def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
        blockwise_method = partial(
            _get_chunk_reduction(agg.reduction_type),
            func=agg.chunk,
            fill_value=agg.fill_value["intermediate"],
            dtype=agg.dtype["intermediate"],
            reindex=reindex,
            user_dtype=agg.dtype["user"],
            axis=axis,
            expected_groups=expected_groups,
            engine=engine,
            sort=sort,
        )
        out = blockwise_method(a, by)
        # Convert dict to one that cubed understands, dropping groups since they are
        # known, and the same for every block.
        return {f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])}

    def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
        # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
        # only combine over the dummy axis, to preserve grouping along 'axis'
        dtype = dict(dtype)
        out = {}
        for idx, combine in enumerate(agg.simple_combine):
            field = f"f{idx}"
            out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
        return out

    def _groupby_aggregate(a):
        # Convert cubed dict to one that _finalize_results works with
        results = {"groups": expected_groups, "intermediates": a.values()}
        out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
        return out[agg.name]

    # convert list of dtypes to a structured dtype for cubed
    intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
    dtype = agg.dtype["final"]
    num_groups = len(expected_groups)

    result = cubed.core.groupby.groupby_reduction(
        array,
        by,
        func=_groupby_func,
        combine_func=_groupby_combine,
        aggregate_func=_groupby_aggregate,
        axis=axis,
        intermediate_dtype=intermediate_dtype,
        dtype=dtype,
        num_groups=num_groups,
    )

    groups = (expected_groups.to_numpy(),)

    return (result, groups)
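
The three callbacks above divide the work the way flox's dask path does: a per-block chunk reduction, a combine over the tree-reduction axis, and a final aggregation. An illustrative restatement of that contract, inferred from the function above rather than from Cubed's documented API:

```python
# Illustrative restatement of the callback contract (inferred from
# cubed_groupby_agg above; not part of the diff, not Cubed's documented API):
#
#   _groupby_func(block, by, axis, intermediate_dtype, num_groups)
#       -> {"f0": ..., "f1": ...}   per-block intermediates, one field per
#          entry in agg.dtype["intermediate"]
#   _groupby_combine(partials, axis, dummy_axis, dtype, keepdims)
#       -> same dict shape, combined only over the dummy concatenation axis
#          so the grouping along 'axis' is preserved
#   _groupby_aggregate(combined)
#       -> the final array for agg.name, via _finalize_results
```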


def _collapse_blocks_along_axes(reduced: DaskArray, axis: T_Axes, group_chunks) -> DaskArray:
    import dask.array
    from dask.highlevelgraph import HighLevelGraph
@@ -2240,6 +2346,7 @@ def groupby_reduce(
    nax = len(axis_)

    has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
    has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_)

    if _is_first_last_reduction(func):
        if has_dask and nax != 1:
@@ -2302,7 +2409,30 @@
kwargs["engine"] = _choose_engine(by_, agg) if engine is None else engine

groups: tuple[np.ndarray | DaskArray, ...]
if not has_dask:
if has_cubed:
if method is None:
method = "map-reduce"
Collaborator:
Will also need an `assert reindex is True` in `_validate_reindex`.

Contributor (author):
Yeah, not sure where that would go exactly.
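
A purely hypothetical sketch of where that invariant could live (flox's real `_validate_reindex` has a larger signature that this diff doesn't show, so the helper below is illustrative only):

```python
# Hypothetical helper, for illustration only; not flox's actual code.
def _validate_reindex_for_cubed(reindex: bool | None, method: str) -> bool:
    """Cubed's groupby path supports only reindex=True with method='map-reduce'."""
    if reindex is None:
        reindex = True  # default to the only supported behaviour
    if method != "map-reduce" or reindex is not True:
        raise NotImplementedError(
            "Cubed reductions require method='map-reduce' and reindex=True."
        )
    return reindex
```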


if method != "map-reduce":
raise NotImplementedError(
"Reduction for Cubed arrays is only implemented for method 'map-reduce'."
)

partial_agg = partial(cubed_groupby_agg, **kwargs)

result, groups = partial_agg(
array,
by_,
expected_groups=expected_,
agg=agg,
reindex=reindex,
method=method,
sort=sort,
)

return (result, groups)

elif not has_dask:
results = _reduce_blockwise(
array, by_, agg, expected_groups=expected_, reindex=reindex, sort=sort, **kwargs
)
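Taken together, a cubed-backed array now routes through `cubed_groupby_agg`. A minimal usage sketch, mirroring the test added below (assumes cubed is installed, as pinned in ci/environment.yml):

```python
import numpy as np
import cubed
from flox.core import groupby_reduce

# 12 elements in chunks of 3, grouped by three labels
array = cubed.array_api.ones((12,), chunks=(3,))
labels = np.array([0, 0, 2, 2, 2, 1, 1, 2, 2, 1, 1, 0])

# "map-reduce" is the only method implemented for cubed arrays
result, groups = groupby_reduce(
    array, labels, func="sum", expected_groups=[0, 1, 2], reindex=True
)
print(result.compute())  # [3. 4. 5.] (group sizes, since the input is all ones)
print(groups)            # (array([0, 1, 2]),)
```
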
20 changes: 18 additions & 2 deletions flox/xrutils.py
@@ -37,11 +37,18 @@ def is_duck_array(value: Any) -> bool:
hasattr(value, "ndim")
and hasattr(value, "shape")
and hasattr(value, "dtype")
and hasattr(value, "__array_function__")
and hasattr(value, "__array_ufunc__")
and (
(hasattr(value, "__array_function__") and hasattr(value, "__array_ufunc__"))
or hasattr(value, "__array_namespace__")
)
)


def is_chunked_array(x) -> bool:
"""True if dask or cubed"""
return is_duck_dask_array(x) or (is_duck_array(x) and hasattr(x, "chunks"))


def is_dask_collection(x):
    try:
        import dask
@@ -56,6 +63,15 @@ def is_duck_dask_array(x):
    return is_duck_array(x) and is_dask_collection(x)


def is_duck_cubed_array(x):
    try:
        import cubed

        return is_duck_array(x) and isinstance(x, cubed.Array)
    except ImportError:
        return False


class ReprObject:
"""Object that prints as the given value, for use with sentinel values."""

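The widened `is_duck_array` check means Array API objects, which define `__array_namespace__` but not necessarily the NumPy `__array_function__`/`__array_ufunc__` protocols, now count as duck arrays. A quick illustration of the three helpers, assuming cubed is installed:

```python
import numpy as np
import cubed
from flox.xrutils import is_chunked_array, is_duck_array, is_duck_cubed_array

x = cubed.array_api.ones((4,), chunks=(2,))
print(is_duck_array(x))        # True: accepted via __array_namespace__
print(is_chunked_array(x))     # True: a duck array exposing .chunks
print(is_duck_cubed_array(x))  # True: an instance of cubed.Array

y = np.ones(4)
print(is_duck_array(y), is_chunked_array(y))  # True False
```
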
1 change: 1 addition & 0 deletions pyproject.toml
@@ -121,6 +121,7 @@ module=[
"asv_runner.*",
"cachey",
"cftime",
"cubed.*",
"dask.*",
"importlib_metadata",
"numba",
1 change: 1 addition & 0 deletions tests/__init__.py
@@ -46,6 +46,7 @@ def LooseVersion(vstring):


has_cftime, requires_cftime = _importorskip("cftime")
has_cubed, requires_cubed = _importorskip("cubed")
has_dask, requires_dask = _importorskip("dask")
has_numba, requires_numba = _importorskip("numba")
has_numbagg, requires_numbagg = _importorskip("numbagg")
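`_importorskip` is defined earlier in tests/__init__.py, outside this hunk; it follows the familiar xarray-style pattern, roughly:

```python
# Approximate sketch of the _importorskip pattern; the real helper lives
# earlier in tests/__init__.py and may differ in detail.
import importlib

import pytest


def _importorskip(modname: str) -> tuple[bool, "pytest.MarkDecorator"]:
    try:
        importlib.import_module(modname)
        has = True
    except ImportError:
        has = False
    return has, pytest.mark.skipif(not has, reason=f"requires {modname}")
```
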
49 changes: 49 additions & 0 deletions tests/test_core.py
@@ -36,8 +36,10 @@
    SCIPY_STATS_FUNCS,
    assert_equal,
    assert_equal_tuple,
    has_cubed,
    has_dask,
    raise_if_dask_computes,
    requires_cubed,
    requires_dask,
)

@@ -61,6 +63,10 @@ def dask_array_ones(*args):
    return None


if has_cubed:
    import cubed


DEFAULT_QUANTILE = 0.9

if TYPE_CHECKING:
@@ -477,6 +483,49 @@ def test_groupby_agg_dask(func, shape, array_chunks, group_chunks, add_nan, dtyp
    assert_equal(expected, actual)


@requires_cubed
@pytest.mark.parametrize("reindex", [True])
@pytest.mark.parametrize("func", ALL_FUNCS)
@pytest.mark.parametrize("add_nan", [False, True])
@pytest.mark.parametrize(
    "shape, array_chunks, group_chunks",
    [
        ((12,), (3,), 3),  # form 1
    ],
)
def test_groupby_agg_cubed(func, shape, array_chunks, group_chunks, add_nan, engine, reindex):
    """Tests groupby_reduce with cubed arrays against groupby_reduce with numpy arrays"""

    if func in ["first", "last"] or func in BLOCKWISE_FUNCS:
        pytest.skip()

    if "arg" in func and (engine in ["flox", "numbagg"] or reindex):
        pytest.skip()

    array = cubed.array_api.ones(shape, chunks=array_chunks)

    labels = np.array([0, 0, 2, 2, 2, 1, 1, 2, 2, 1, 1, 0])
    if add_nan:
        labels = labels.astype(float)
        labels[:3] = np.nan  # entire block is NaN when group_chunks=3
        labels[-2:] = np.nan

    kwargs = dict(
        func=func,
        expected_groups=[0, 1, 2],
        fill_value=False if func in ["all", "any"] else 123,
        reindex=reindex,
    )

    expected, _ = groupby_reduce(array.compute(), labels, engine="numpy", **kwargs)
    actual, _ = groupby_reduce(array.compute(), labels, engine=engine, **kwargs)
    assert_equal(actual, expected)

    # TODO: raise_if_cubed_computes
    actual, _ = groupby_reduce(array, labels, engine=engine, **kwargs)
    assert_equal(expected, actual)


def test_numpy_reduce_axis_subset(engine):
    # TODO: add NaNs
    by = labels2d