
Commit a70c5dd

Update visualizations & docs (#189)

* [WIP] Update visualizations
* [WIP] Update docs
* Updates
* Add more files
* Updatees
* Updates
* More updates
* docstring
* Reorder
* Edits
* Update split-apply-combine image
* edits
* Update arrays
* rename user stories section

1 parent 3ea081b commit a70c5dd

22 files changed: +20,304 −89 lines

docs/diagrams/new-blockwise-annotated.svg (+1,185)
docs/diagrams/new-blockwise.svg (+1,563)
docs/diagrams/new-cohorts-annotated.svg (+1,845)
docs/diagrams/new-cohorts.svg (+2,261)
docs/diagrams/new-map-reduce-reindex-False-annotated.svg (+1,887)
docs/diagrams/new-map-reduce-reindex-False.svg (+2,382)
docs/diagrams/new-map-reduce-reindex-True-annotated.svg (+1,794)
docs/diagrams/new-map-reduce-reindex-True.svg (+2,373)
docs/diagrams/new-split-apply-combine-annotated.svg (+2,370)
docs/diagrams/split-apply-combine.svg (+2,342)

docs/source/custom.md renamed to docs/source/aggregations.md (+23 −3)

````diff
@@ -1,7 +1,27 @@
-# Custom reductions
+# Aggregations
 
-`flox` implements all common reductions provided by `numpy_groupies` in `aggregations.py`.
-It also allows you to specify a custom Aggregation (again inspired by dask.dataframe),
+`flox` implements all common reductions provided by `numpy_groupies` in `aggregations.py`. Control this by passing
+the `func` kwarg:
+
+- `"sum"`, `"nansum"`
+- `"prod"`, `"nanprod"`
+- `"count"` - number of non-NaN elements by group
+- `"mean"`, `"nanmean"`
+- `"var"`, `"nanvar"`
+- `"std"`, `"nanstd"`
+- `"argmin"`
+- `"argmax"`
+- `"first"`
+- `"last"`
+
+
+```{tip}
+We would like to add support for `cumsum`, `cumprod` ([issue](https://github.com/xarray-contrib/flox/issues/91)). Contributions are welcome!
+```
+
+## Custom Aggregations
+
+`flox` also allows you to specify a custom Aggregation (again inspired by dask.dataframe),
 though this might not be fully functional at the moment. See `aggregations.py` for examples.
 
 See the ["Custom Aggregations"](user-stories/custom-aggregations.ipynb) user story for a more user-friendly example.
````

docs/source/api.rst (+4 −4)

````diff
@@ -9,7 +9,7 @@ Functions
 .. autosummary::
    :toctree: generated/
 
-   ~core.groupby_reduce
+   groupby_reduce
    xarray.xarray_reduce
 
 Rechunking
@@ -18,8 +18,8 @@ Rechunking
 .. autosummary::
    :toctree: generated/
 
-   ~core.rechunk_for_blockwise
-   ~core.rechunk_for_cohorts
+   rechunk_for_blockwise
+   rechunk_for_cohorts
    xarray.rechunk_for_blockwise
    xarray.rechunk_for_cohorts
 
@@ -30,7 +30,7 @@ Visualization
    :toctree: generated/
 
    visualize.draw_mesh
-   visualize.visualize_groups
+   visualize.visualize_groups_1d
    visualize.visualize_cohorts_2d
 
 Aggregation Objects
````

docs/source/arrays.md (+15)

````diff
@@ -0,0 +1,15 @@
+# Duck Array Support
+
+Aggregating over other array types will work if the array type supports either of the following methods: [ufunc.reduceat](https://numpy.org/doc/stable/reference/generated/numpy.ufunc.reduceat.html) or [ufunc.at](https://numpy.org/doc/stable/reference/generated/numpy.ufunc.at.html).
+
+
+| Reduction                      | `engine="numpy"` | `engine="flox"`   |
+|--------------------------------|------------------|-------------------|
+| sum, nansum                    | bincount         | add.reduceat      |
+| mean, nanmean                  | bincount         | add.reduceat      |
+| var, nanvar                    | bincount         | add.reduceat      |
+| std, nanstd                    | bincount         | add.reduceat      |
+| count                          | bincount         | add.reduceat      |
+| prod                           | multiply.at      | multiply.reduceat |
+| max, nanmax, argmax, nanargmax | maximum.at       | maximum.reduceat  |
+| min, nanmin, argmin, nanargmin | minimum.at       | minimum.reduceat  |
````
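To make the table concrete, here is a minimal plain-numpy sketch (made-up data) of how a grouped sum maps onto the two primitives:

```python
import numpy as np

labels = np.array([0, 1, 0, 2, 1, 0])       # group label per element
values = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])

# engine="numpy" style: np.bincount with weights computes a grouped sum
sums_bincount = np.bincount(labels, weights=values)   # [10., 7., 4.]

# engine="flox" style: argsort so each group's members are contiguous,
# then reduce each contiguous run with np.add.reduceat
order = np.argsort(labels, kind="stable")
sorted_values = values[order]
# offsets mark where each group's run begins in the sorted array
offsets = np.concatenate(([0], np.cumsum(np.bincount(labels))[:-1]))
sums_reduceat = np.add.reduceat(sorted_values, offsets)  # [10., 7., 4.]
```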

docs/source/conf.py (+3 −3)

````diff
@@ -43,8 +43,8 @@
 ]
 
 extlinks = {
-    "issue": ("https://github.com/xarray-contrib/flox/issues/%s", "GH#"),
-    "pr": ("https://github.com/xarray-contrib/flox/pull/%s", "GH#"),
+    "issue": ("https://github.com/xarray-contrib/flox/issues/%s", "GH#%s"),
+    "pr": ("https://github.com/xarray-contrib/flox/pull/%s", "PR#%s"),
 }
 
 templates_path = ["_templates"]
@@ -174,7 +174,7 @@
     "numpy": ("https://numpy.org/doc/stable", None),
     # "numba": ("https://numba.pydata.org/numba-doc/latest", None),
     "dask": ("https://docs.dask.org/en/latest", None),
-    "xarray": ("http://xarray.pydata.org/en/stable/", None),
+    "xarray": ("https://docs.xarray.dev/en/stable/", None),
 }
 
 autosummary_generate = True
````
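With the new `%s`-style captions, Sphinx interpolates the link target into the link text: writing `` :issue:`91` `` in the docs renders as "GH#91", and `` :pr:`72` `` renders as "PR#72", each pointing at the corresponding GitHub URL above.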

docs/source/engines.md (+24)

````diff
@@ -0,0 +1,24 @@
+(engines)=
+# Engines
+
+`flox` provides multiple options, using the `engine` kwarg, for computing the core GroupBy reduction on numpy or other non-dask array types.
+
+1. `engine="numpy"` wraps `numpy_groupies.aggregate_numpy`. This uses indexing tricks and functions like `np.bincount`, or the ufunc `.at` methods
+   (e.g. `np.maximum.at`) to provide reasonably performant aggregations.
+1. `engine="numba"` wraps `numpy_groupies.aggregate_numba`. This uses `numba` kernels for the core aggregation.
+1. `engine="flox"` uses the `ufunc.reduceat` method after first argsorting the array so that all group members occur sequentially. This was copied from
+   a [gist by Stephan Hoyer](https://gist.github.com/shoyer/f538ac78ae904c936844).
+
+See [](arrays) for more details.
+
+## Tradeoffs
+
+For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="flox"` *can* be faster.
+The reason is that `numpy_groupies` converts all groupby problems to a 1D problem; this conversion can involve [some overhead](https://github.com/ml31415/numpy-groupies/pull/46).
+It is possible to optimize this a bit in `flox` or `numpy_groupies`, but the work has not been done yet.
+The advantage of `engine="numpy"` is that it tends to work for more array types, since it appears to be more common for array libraries to implement `np.bincount` than `np.add.reduceat`.
+
+```{tip}
+Other potential engines we could add are [`numbagg`](https://github.com/numbagg/numbagg) ([stalled PR here](https://github.com/xarray-contrib/flox/pull/72)) and [`datashader`](https://github.com/xarray-contrib/flox/issues/142).
+Both use numba for high-performance aggregations. Contributions or discussion are very welcome!
+```
````
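A rough usage sketch of the `engine` kwarg (made-up data; the `(result, groups)` return order is assumed):

```python
import numpy as np
import flox

labels = np.array([0, 1, 1, 0, 2, 0])
array = np.arange(6, dtype=float)

# Same reduction, computed by different engines
for engine in ["numpy", "flox"]:  # "numba" also works if numba is installed
    result, groups = flox.groupby_reduce(array, labels, func="sum", engine=engine)
    print(engine, groups, result)  # expect groups [0 1 2], sums [8. 3. 4.]
```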

docs/source/implementation.md (+134 −44)

````diff
@@ -1,18 +1,29 @@
-# Algorithms
+(algorithms)=
+# Parallel Algorithms
 
-`flox` outsources the core GroupBy operation to the vectorized implementations in
-[numpy_groupies](https://github.com/ml31415/numpy-groupies). Constructing
-an efficient groupby reduction with dask is hard, and depends on how the
-groups are distributed amongst the blocks of an array. `flox` implements 4 strategies for
-grouped reductions, each is appropriate for a particular distribution of groups
-among the blocks of a dask array.
+`flox` outsources the core GroupBy operation to the vectorized implementations controlled by the
+[`engine` kwarg](engines.md). Applying these implementations to a parallel array type like dask
+can be hard. Performance strongly depends on how the groups are distributed amongst the blocks of an array.
 
-Switch between the various strategies by passing `method` to either {py:func}`flox.core.groupby_reduce`
-or `xarray_reduce`.
+`flox` implements 4 strategies for grouped reductions, each appropriate for a particular distribution of groups
+among the blocks of a dask array. Switch between the various strategies by passing `method`
+and/or `reindex` to either {py:func}`flox.groupby_reduce` or {py:func}`flox.xarray.xarray_reduce`.
 
+Your options are:
+1. [`method="map-reduce"` with `reindex=False`](map-reindex-false)
+1. [`method="map-reduce"` with `reindex=True`](map-reindex-True)
+1. [`method="blockwise"`](method-blockwise)
+1. [`method="cohorts"`](method-cohorts)
 
-First we describe xarray's current strategy
+The most appropriate strategy for your problem will depend on the chunking of your dataset,
+and the distribution of group labels across those chunks.
 
+```{tip}
+Currently these strategies are implemented for dask. We would like to generalize to other parallel array types
+as appropriate (e.g. Ramba, cubed, arkouda). Please open an issue to discuss if you are interested.
+```
+
+(xarray-split)=
 ## Background: Xarray's current GroupBy strategy
 
 Xarray's current strategy is to find all unique group labels, index out each group,
@@ -21,7 +32,10 @@ labels (i.e. you cannot use this strategy to group by a dask array).
 
 Schematically, this looks like (colors indicate group labels; separated groups of colors
 indicate different blocks of an array):
-![xarray-current-strategy](../diagrams/split-reduce.png)
+```{image} ../diagrams/new-split-apply-combine-annotated.svg
+:alt: xarray-current-strategy
+:width: 100%
+```
 
 The first step is to extract all members of a group, which involves a *lot* of
 communication and is quite expensive (in dataframe terminology, this is a "shuffle").
@@ -30,89 +44,165 @@ big datasets.
 
 ## `method="map-reduce"`
 
-The first idea is to use the "map-reduce" strategy (inspired by `dask.dataframe`).
 
 ![map-reduce-strategy-schematic](/../diagrams/map-reduce.png)
 
+The "map-reduce" strategy is inspired by `dask.dataframe.groupby`.
 The GroupBy reduction is first applied blockwise. Those intermediate results are
 combined by concatenating to form a new array which is then reduced
 again. The combining of intermediate results uses dask's `_tree_reduce`
 till all group results are in one block. At that point the result is
 "finalized" and returned to the user.
 
-*Tradeoffs*:
-1. Allows grouping by a dask array so group labels need not be known at graph construction
-   time.
-1. Works well when either the initial blockwise reduction is effective, or if the
-   reduction at the first combine step is effective. "effective" means we actually
-   reduce values and release some memory.
+### General Tradeoffs
+1. This approach works well when either the initial blockwise reduction is effective, or if the
+   reduction at the first combine step is effective. Here "effective" means we have multiple members of a single
+   group in a block so the blockwise application of groupby-reduce actually reduces values and releases some memory.
+2. One downside is that the final result will only have one chunk along the new group axis.
+3. We have two choices for how to construct the intermediate arrays; see below.
+
+(map-reindex-True)=
+### `reindex=True`
+
+If we know all the group labels, we can reindex right at the blockwise step (`reindex=True`). This matches `dask.array.histogram` and
+`xhistogram`, where the bin edges, or group labels of the output, are known. The downside is potentially large memory use
+if the number of output groups is much larger than the number of groups in a block.
+
+```{image} ../diagrams/new-map-reduce-reindex-True-annotated.svg
+:alt: map-reduce-reindex-True-strategy-schematic
+:width: 100%
+```
+
+(map-reindex-False)=
+### `reindex=False`
+We can `reindex` at the combine stage to the groups present in the blocks being combined (`reindex=False`). This can limit memory use, at the cost
+of a performance reduction due to extra copies of the intermediate data during reindexing.
+
+```{image} ../diagrams/new-map-reduce-reindex-False-annotated.svg
+:alt: map-reduce-reindex-False-strategy-schematic
+:width: 100%
+```
+
+This approach allows grouping by a dask array so group labels can be discovered at compute time, similar to `dask.dataframe.groupby`.
 
+### Example
+
+For example, consider `groupby("time.month")` with monthly frequency data and a chunksize of 4 along `time`.
+![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)
+With `reindex=True`, each block will become 3x its original size at the blockwise step: input blocks have 4 timesteps while the output block
+has a value for all 12 months. One could use `reindex=False` to control memory usage, but also see [`method="cohorts"`](method-cohorts) below.
+
+
````
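As a sketch of how these options are selected (hypothetical two years of monthly data; kwarg spellings follow the prose above):

```python
import dask.array as da
import numpy as np
import flox

months = np.tile(np.arange(1, 13), 2)       # month label per timestep, two years
array = da.random.random((24,), chunks=4)   # chunksize 4 along time

# reindex=True: each blockwise intermediate spans all 12 months
result, groups = flox.groupby_reduce(
    array, months, func="mean", method="map-reduce", reindex=True
)

# reindex=False: intermediates only hold the groups present in each block
result, groups = flox.groupby_reduce(
    array, months, func="mean", method="map-reduce", reindex=False
)
```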
````diff
+(method-blockwise)=
 ## `method="blockwise"`
 
-One case where `"map-reduce"` doesn't work well is the case of "resampling" reductions. An
-example here is resampling from daily frequency to monthly frequency data: `da.resample(time="M").mean()`
+One case where `method="map-reduce"` doesn't work well is the case of "resampling" reductions. An
+example here is resampling from daily frequency to monthly frequency data: `da.resample(time="M").mean()`
 For resampling type reductions,
 1. Group members occur sequentially (all days in January 2001 occur one after the other)
-2. All groups are roughly equal length (31 days in January but 28 in most Februaries)
+2. Groups are not of exactly equal length (31 days in January but 28 in most Februaries)
 3. All members in a group are next to each other (if the time series is sorted, which it
    usually is).
+4. Because there can be a large number of groups, concatenating results for all groups in a single chunk could be catastrophic.
 
-In this case, it makes sense to use `dask.dataframe` resample strategy which is to rechunk
+In this case, it makes sense to use the `dask.dataframe` resampling strategy, which is to rechunk using {py:func}`flox.rechunk_for_blockwise`
 so that all members of a group are in a single block. Then, the groupby operation can be applied blockwise.
 
-![blockwise-strategy-schematic](/../diagrams/blockwise.png)
+```{image} ../diagrams/new-blockwise-annotated.svg
+:alt: blockwise-strategy-schematic
+:width: 100%
+```
 
 *Tradeoffs*
 1. Only works for certain groupings.
 1. Group labels must be known at graph construction time, so this only works for numpy arrays
 1. Currently the rechunking is only implemented for 1D arrays (being motivated by time resampling),
    but a nD generalization seems possible.
+1. Only the `blockwise` strategy can be used for grouping by `nD` arrays.
 1. Works better when multiple groups are already in a single block, so that the initial
    rechunking only involves a small amount of communication.
````
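A sketch of the resampling workflow described above (hypothetical daily data; the argument names passed to `flox.rechunk_for_blockwise` are assumptions):

```python
import dask.array as da
import pandas as pd
import flox

time = pd.date_range("2001-01-01", "2001-12-31", freq="D")
array = da.random.random((len(time),), chunks=30)   # chunks don't align with months
months = time.month.values                          # group label per element

# Rechunk so that every month lies within exactly one block...
rechunked = flox.rechunk_for_blockwise(array, axis=-1, labels=months)
# ...then the reduction can proceed with no inter-block communication
result, groups = flox.groupby_reduce(rechunked, months, func="mean", method="blockwise")
```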

````diff
+(method-cohorts)=
 ## `method="cohorts"`
 
-We can combine all of the above ideas for cases where members from different groups tend to occur close to each other.
+The `map-reduce` strategy is quite effective but can involve some unnecessary communication. It is sometimes possible to exploit
+patterns in how group labels are distributed across chunks (similar to `method="blockwise"` above). Two cases are illustrative:
+1. Group labels can be *approximately-periodic*: e.g. `time.dayofyear` (period 365 or 366) or `time.month` (period 12).
+   Consider our earlier example, `groupby("time.month")` with monthly frequency data and a chunksize of 4 along `time`.
+   ![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)
+   Because a chunksize of 4 evenly divides the number of groups (12), all we need to do is index out blocks
+   0, 3, 7 and then apply the `"map-reduce"` strategy to form the final result for months Jan-Apr. Repeat for the
+   remaining groups of months (May-Aug; Sep-Dec) and then concatenate.
+
+2. Groups can be *spatially localized* like the blockwise case above, for example grouping by country administrative boundaries like
+   counties or districts. In this case, concatenating the result for the northwesternmost county or district and the southeasternmost
+   district can involve a lot of wasteful communication (again depending on chunking).
+
+For such cases, we can adapt xarray's shuffling or subsetting strategy by indexing out "cohorts" or group labels
+that tend to occur next to each other.
+
+### A motivating example: time grouping
+
 One example is the construction of "climatologies" which is a climate science term for something like `groupby("time.month")`
 ("monthly climatology") or `groupby("time.dayofyear")` ("daily climatology"). In these cases,
 1. Groups occur sequentially (day 2 is always after day 1; and February is always after January)
 2. Groups are approximately periodic (some years have 365 days and others have 366)
 
-The idea here is to copy xarray's subsetting strategy but instead index out "cohorts" or group labels
-that tend to occur next to each other.
-
-Consider this example of monthly average data; where 4 months are present in a single block (i.e. chunksize=4)
+Consider our earlier example, `groupby("time.month")` with monthly frequency data and a chunksize of 4 along `time`.
 ![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)
 
-Because a chunksize of 4 evenly divides the number of groups (12) all we need to do is index out blocks
+With `method="map-reduce", reindex=True`, each block will become 3x its original size at the blockwise step: input blocks have 4 timesteps while the output block
+has a value for all 12 months. Note that the blockwise groupby-reduction *does not reduce* the data since there is only one element in each
+group. In addition, since `map-reduce` will make the final result have only one chunk of size 12 along the new `month`
+dimension, the final result has chunk sizes 3x that of the input, which may not be ideal.
+
+However, because a chunksize of 4 evenly divides the number of groups (12), all we need to do is index out blocks
 0, 3, 7 and then apply the `"map-reduce"` strategy to form the final result for months Jan-Apr. Repeat for the
-remaining groups of months (May-Aug; Sep-Dec) and then concatenate.
+remaining groups of months (May-Aug; Sep-Dec) and then concatenate. This is the essence of `method="cohorts"`.
+
+
+### Summary
+
+We can generalize this idea for more complicated problems (inspired by the `split_out` kwarg in `dask.dataframe.groupby`).
+We first apply the groupby-reduction blockwise, then split and reindex blocks to create a new array with which we complete the reduction
+using `map-reduce`. Because the split or shuffle step occurs after the blockwise reduction, we *sometimes* communicate a significantly smaller
+amount of data than if we split or shuffled the input array.
+
+```{image} /../diagrams/new-cohorts-annotated.svg
+:alt: cohorts-strategy-schematic
+:width: 100%
+```
+
+### Tradeoffs
+1. Group labels must be known at graph construction time, so this only works for numpy arrays.
+1. This does require more tasks and a more complicated graph, but the communication overhead can be significantly lower.
+1. The detection of "cohorts" is currently slow but could be improved.
+1. The extra effort of detecting cohorts and multiple copying of intermediate blocks may be worthwhile only if the chunk sizes are small
+   relative to the approximate period of group labels, or small relative to the size of spatially localized groups.
+
+
````
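And a sketch of opting into this strategy (same hypothetical monthly setup as in the map-reduce example; flox detects the cohorts internally):

```python
import dask.array as da
import numpy as np
import flox

months = np.tile(np.arange(1, 13), 2)
array = da.random.random((24,), chunks=4)

# flox finds cohorts of co-occurring labels and runs a small
# map-reduce per cohort, then concatenates the cohort results
result, groups = flox.groupby_reduce(array, months, func="mean", method="cohorts")
```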
````diff
+### Example: sensitivity to chunking
+
+One annoyance is that if the chunksize doesn't evenly divide the number of groups, we still end up splitting a number of chunks.
+Consider our earlier example, `groupby("time.month")` with monthly frequency data and a chunksize of 4 along `time`.
+![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)
 
 `flox` can find these cohorts; below it identifies the cohorts with labels `1,2,3,4`; `5,6,7,8`; and `9,10,11,12`.
 ``` python
->>> flox.core.find_group_cohorts(labels, array.chunks[-1]))
+>>> flox.find_group_cohorts(labels, array.chunks[-1])
 [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]  # 3 cohorts
 ```
-For each cohort, it counts the number of blocks that need to be reduced. If `1` then it applies the reduction blockwise.
-If > 1; then it uses `"map-reduce"`.
 
-One annoyance is that if the chunksize doesn't evenly divide the number of groups, we still end up splitting a number of chunks.
-For example, when `chunksize=5`
+Now consider `chunksize=5`.
 ![cohorts-schematic](/../diagrams/cohorts-month-chunk5.png)
 
 ``` python
 >>> flox.core.find_group_cohorts(labels, array.chunks[-1])
 [[1], [2, 3], [4, 5], [6], [7, 8], [9, 10], [11], [12]]  # 8 cohorts
 ```
-We find 8 cohorts (note the original xarray strategy is equivalent to constructing 12 cohorts).
 
-It's possible that some initial rechunking makes the situation better (just rechunk from 5-4), but it isn't an obvious improvement.
+We find 8 cohorts (note the original xarray strategy is equivalent to constructing 12 cohorts).
+In this case, it seems better to rechunk to a size of `4` along `time`.
 If you have ideas for improving this case, please open an issue.
 
-*Tradeoffs*
-1. Generalizes well; when there's exactly one groups per chunk, this replicates Xarray's
-   strategy which is optimal. For resampling type reductions, as long as the array
-   is chunked appropriately ({py:func}`flox.core.rechunk_for_blockwise`, {py:func}`flox.xarray.rechunk_for_blockwise`), `method="cohorts"` is equivalent to `method="blockwise"`!
-1. Group labels must be known at graph construction time, so this only works for numpy arrays
-1. Currenltly implemented for grouping by 1D arrays. An nD generalization seems possible,
-   but hard?
+### Example: spatial grouping
````
