
Commit cdc68cc

AMM high level documentation (#5456)
1 parent 33d83bc commit cdc68cc

6 files changed: +255 -10 lines changed


distributed/active_memory_manager.py

Lines changed: 3 additions & 3 deletions
@@ -304,9 +304,9 @@ def run(
     You may optionally retrieve which worker it was decided the key will be
     replicated to or dropped from, as follows:

-    ```python
-    choice = yield "replicate", ts, None
-    ```
+    .. code-block:: python
+
+        choice = (yield "replicate", ts, None)

     ``choice`` is either a WorkerState or None; the latter is returned if the
     ActiveMemoryManager chose to disregard the request.

docs/source/active_memory_manager.rst

Lines changed: 247 additions & 0 deletions
@@ -0,0 +1,247 @@
Active Memory Manager
=====================
The Active Memory Manager, or *AMM*, is an experimental daemon that optimizes memory
usage of workers across the Dask cluster. It is disabled by default.


Memory imbalance and duplication
--------------------------------
Whenever a Dask task returns data, it is stored on the worker that executed the task for
as long as it's a dependency of other tasks, is referenced by a ``Client`` through a
``Future``, or is part of a :doc:`published dataset <publish>`.

Dask assigns tasks to workers following criteria of CPU occupancy, :doc:`resources`, and
locality. In the trivial use case of tasks that are not connected to each other, take
the same time to compute, return data of the same size, and have no resource
constraints, one will observe a perfect balance in memory occupation across workers too.
In all other use cases, however, memory usage may become increasingly imbalanced as the
computation progresses.

When a task running on one worker requires as input the output of a task that ran on a
different worker, Dask will transparently transfer the data between workers, ending up
with multiple copies of the same data on different workers. This is generally desirable,
as it avoids re-transferring the data if it's required again later on. However, it also
increases overall memory usage across the cluster.


Enabling the Active Memory Manager
----------------------------------
The AMM can be enabled through the :doc:`Dask configuration file <configuration>`:

.. code-block:: yaml

   distributed:
     scheduler:
       active-memory-manager:
         start: true
         interval: 2s

The above is the recommended setup and will run all enabled *AMM policies* (see below)
every two seconds. Alternatively, you can manually start/stop the AMM from the
``Client`` or trigger a one-off iteration:

.. code-block:: python

   >>> client.scheduler.amm_start()     # Start running every 2 seconds
   >>> client.scheduler.amm_stop()      # Stop running periodically
   >>> client.scheduler.amm_run_once()  # Trigger a one-off iteration

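If you want to check which AMM settings a process has picked up, a minimal sketch using
``dask.config`` follows; note that this inspects the local process's configuration,
which is not necessarily identical to the one the scheduler was started with:

.. code-block:: python

   import dask.config

   # Show the active-memory-manager section as seen by the local process.
   # The scheduler applies its own copy of this configuration, which may differ.
   print(dask.config.get("distributed.scheduler.active-memory-manager"))
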
Policies
--------
The AMM by itself doesn't do anything. The user must enable *policies*, which suggest
actions regarding Dask data. The AMM runs the policies and enacts their suggestions, as
long as they don't harm data integrity. These suggestions can be of two types:

- Replicate the data of an in-memory Dask task from one worker to another.
  This should not be confused with replication caused by task dependencies.
- Delete one or more replicas of an in-memory task. The AMM will never delete the last
  replica of a task, even if a policy asks to.

Unless a policy puts constraints on which workers should be impacted, the AMM will
automatically create replicas on workers with the lowest memory usage first and delete
them from workers with the highest memory usage first.

Individual policies are enabled, disabled, and configured through the Dask config:

.. code-block:: yaml

   distributed:
     scheduler:
       active-memory-manager:
         start: true
         interval: 2s
         policies:
         - class: distributed.active_memory_manager.ReduceReplicas
         - class: my_package.MyPolicy
           arg1: foo
           arg2: bar

See `Custom policies`_ below for how to write your own policies, such as
``my_package.MyPolicy`` in the example above.

The default Dask config file contains a sane selection of built-in policies that should
be generally desirable. You should first try with just ``start: true`` in your Dask
config and see if it fits your needs before tweaking individual policies.


Built-in policies
-----------------
ReduceReplicas
++++++++++++++
class
    ``distributed.active_memory_manager.ReduceReplicas``
parameters
    None

This policy is enabled in the default Dask config. Whenever a Dask task is replicated
on more than one worker and the additional replicas don't appear to serve an ongoing
computation, this policy drops all excess replicas.

.. note::
   This policy is incompatible with :meth:`~distributed.Client.replicate` and with the
   ``broadcast=True`` parameter of :meth:`~distributed.Client.scatter`. If you invoke
   ``replicate`` to create additional replicas and then later run this policy, it will
   delete all replicas but one (but not necessarily the new ones).

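As an illustration of the note above, here is a hedged sketch of that interaction
(the scheduler address is a placeholder):

.. code-block:: python

   from distributed import Client

   client = Client("tcp://scheduler:8786")  # placeholder address

   # broadcast=True eagerly creates one replica of the data on every worker
   fut = client.scatter(b"x" * 2**20, broadcast=True)

   # With ReduceReplicas enabled, and as long as no computation is using the
   # extra replicas, subsequent AMM iterations will drop them again, leaving a
   # single replica and defeating the purpose of broadcast=True.
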
Custom policies
---------------
Power users can write their own policies by subclassing
:class:`~distributed.active_memory_manager.ActiveMemoryManagerPolicy`. The class should
define two methods:

``__init__``
    A custom policy may load parameters from the Dask config through ``__init__``
    parameters. If you don't need configuration, you don't need to implement this
    method.
``run``
    This method accepts no parameters and is invoked by the AMM every 2 seconds (or
    whatever the AMM interval is).
    It must yield zero or more of the following *suggestion* tuples:

    ``yield "replicate", <TaskState>, None``
        Create one replica of the target task on the worker with the lowest memory
        usage that doesn't hold a replica yet. To create more than one replica, you
        need to yield the same command more than once.
    ``yield "replicate", <TaskState>, {<WorkerState>, <WorkerState>, ...}``
        Create one replica of the target task on the worker with the lowest memory
        usage among the listed candidates.
    ``yield "drop", <TaskState>, None``
        Delete one replica of the target task from the worker with the highest memory
        usage across the whole cluster.
    ``yield "drop", <TaskState>, {<WorkerState>, <WorkerState>, ...}``
        Delete one replica of the target task from the worker with the highest memory
        usage among the listed candidates.

The AMM will silently reject unacceptable suggestions, such as:

- Delete the last replica of a task
- Delete a replica from a subset of workers that don't hold any
- Delete a replica from a worker that currently needs it for computation
- Replicate a task that is not yet in memory
- Create more replicas of a task than there are workers
- Create replicas of a task on workers that already hold them
- Create replicas on paused or retiring workers

It is generally a good idea to design policies to be as simple as possible and let the
AMM take care of the edge cases above by ignoring some of the suggestions.

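For instance, here is a minimal sketch of a hypothetical policy (``DropFromWorkers``
and its parameters are illustrative, not part of distributed) that uses the
candidate-set form to suggest dropping replicas of a single key from a user-supplied
list of worker addresses, leaving all edge cases to the AMM:

.. code-block:: python

   from distributed.active_memory_manager import ActiveMemoryManagerPolicy


   class DropFromWorkers(ActiveMemoryManagerPolicy):
       """Hypothetical policy: suggest dropping replicas of ``key`` from the
       workers listed in ``addresses``.
       """

       def __init__(self, key, addresses):
           self.key = key
           self.addresses = set(addresses)

       def run(self):
           ts = self.manager.scheduler.tasks.get(self.key)
           if not ts:
               return
           candidates = {
               ws
               for addr, ws in self.manager.scheduler.workers.items()
               if addr in self.addresses
           }
           # One suggestion per candidate; the AMM picks the candidate with the
           # highest memory usage each time and silently discards suggestions
           # that would delete the last replica or target workers holding none.
           for _ in range(len(candidates)):
               yield "drop", ts, candidates
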
Optionally, the ``run`` method may retrieve which worker the AMM just selected, as
follows:

.. code-block:: python

   ws = (yield "drop", ts, None)

``ws`` is either a ``WorkerState`` or ``None``; the latter is returned if the AMM
chose to disregard the suggestion.

The ``run`` method can access the following attributes:

``self.manager``
    The :class:`~distributed.active_memory_manager.ActiveMemoryManagerExtension` that
    the policy is attached to
``self.manager.scheduler``
    :class:`~distributed.Scheduler` to which the suggestions will be applied. From
    there you can access various attributes such as ``tasks`` and ``workers``.
``self.manager.workers_memory``
    Read-only mapping of ``{WorkerState: bytes}``, where ``bytes`` is the expected RAM
    usage of the worker once all suggestions accepted so far in the current AMM
    iteration, from all policies, have been enacted. Note that you don't need to
    access this if you are happy to always create/delete replicas on the workers with
    the lowest and highest memory usage respectively; the AMM will handle it for you.
``self.manager.pending``
    Read-only mapping of ``{TaskState: ({<WorkerState>, ...}, {<WorkerState>, ...})}``.
    The first set contains the workers that will receive a new replica of the task
    according to the suggestions accepted so far; the second set contains the workers
    which will lose a replica.
``self.manager.policies``
    Set of policies registered in the AMM. A policy can deregister itself as follows:

    .. code-block:: python

        def run(self):
            self.manager.policies.remove(self)

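To show how these attributes can be combined, here is a hedged sketch of a hypothetical
policy (the class name and the ``limit`` parameter are illustrative, not part of
distributed) that uses ``self.manager.workers_memory`` to replicate a key only onto
workers whose projected memory usage is below a threshold:

.. code-block:: python

   from distributed.active_memory_manager import ActiveMemoryManagerPolicy


   class ReplicateBelowLimit(ActiveMemoryManagerPolicy):
       """Hypothetical policy: replicate ``key``, but only onto workers whose
       projected memory usage is below ``limit`` bytes.
       """

       def __init__(self, key, limit):
           self.key = key
           self.limit = limit

       def run(self):
           ts = self.manager.scheduler.tasks.get(self.key)
           if not ts:
               return
           # workers_memory already accounts for suggestions accepted earlier in
           # this AMM iteration, from this and other policies
           candidates = {
               ws
               for ws, nbytes in self.manager.workers_memory.items()
               if nbytes < self.limit
           }
           if candidates:
               yield "replicate", ts, candidates
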
Example
+++++++
The following custom policy ensures that keys "foo" and "bar" are replicated on all
workers at all times. New workers will receive a replica soon after connecting to the
scheduler. The policy will do nothing if the target keys are not in memory somewhere or
if all workers already hold a replica.
Note that this example is incompatible with the ``ReduceReplicas`` built-in policy.

In mymodule.py (it must be accessible by the scheduler):

.. code-block:: python

   from distributed.active_memory_manager import ActiveMemoryManagerPolicy


   class EnsureBroadcast(ActiveMemoryManagerPolicy):
       def __init__(self, key):
           self.key = key

       def run(self):
           ts = self.manager.scheduler.tasks.get(self.key)
           if not ts:
               return
           for _ in range(len(self.manager.scheduler.workers) - len(ts.who_has)):
               yield "replicate", ts, None

Note that the policy doesn't bother testing for edge cases such as paused workers or
other policies also requesting replicas; the AMM takes care of it. In theory you could
rewrite the last two lines as follows (at the cost of some wasted CPU cycles):

.. code-block:: python

   for _ in range(1000):
       yield "replicate", ts, None

In distributed.yaml:

.. code-block:: yaml

   distributed:
     scheduler:
       active-memory-manager:
         start: true
         interval: 2s
         policies:
         - class: mymodule.EnsureBroadcast
           key: foo
         - class: mymodule.EnsureBroadcast
           key: bar

We could have alternatively used a single policy instance with a list of keys, as
sketched below; the above design merely illustrates that you may have multiple
instances of the same policy running side by side.

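A hedged sketch of that alternative, single-instance design (the class name
``EnsureBroadcastMany`` is hypothetical):

.. code-block:: python

   from distributed.active_memory_manager import ActiveMemoryManagerPolicy


   class EnsureBroadcastMany(ActiveMemoryManagerPolicy):
       """Hypothetical variant of EnsureBroadcast handling several keys at once."""

       def __init__(self, keys):
           self.keys = list(keys)

       def run(self):
           for key in self.keys:
               ts = self.manager.scheduler.tasks.get(key)
               if not ts:
                   continue
               for _ in range(len(self.manager.scheduler.workers) - len(ts.who_has)):
                   yield "replicate", ts, None

It would presumably be enabled with a single config entry, e.g.
``- class: mymodule.EnsureBroadcastMany`` with ``keys: [foo, bar]``.
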
API reference
-------------
.. autoclass:: distributed.active_memory_manager.ActiveMemoryManagerExtension
    :members:

.. autoclass:: distributed.active_memory_manager.ActiveMemoryManagerPolicy
    :members:

.. autoclass:: distributed.active_memory_manager.ReduceReplicas

docs/source/conf.py

Lines changed: 1 addition & 1 deletion
@@ -147,7 +147,7 @@
 # Add any paths that contain custom static files (such as style sheets) here,
 # relative to this directory. They are copied after the builtin static files,
 # so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ["_static"]
+html_static_path: list[str] = []

 # Add any extra paths that contain custom files (such as robots.txt or
 # .htaccess) here, relative to this directory. These files are copied

docs/source/diagnosing-performance.rst

Lines changed: 2 additions & 3 deletions
@@ -19,9 +19,8 @@ identify performance issues.
 Fortunately, Dask collects a variety of diagnostic information during
 execution. It does this both to provide performance feedback to users, but
 also for its own internal scheduling decisions. The primary place to observe
-this feedback is the :doc:`diagnostic dashboard <web>`. This document
-describes the various pieces of performance information available and how to
-access them.
+this feedback is the diagnostic dashboard. This document describes the various
+pieces of performance information available and how to access them.


 Task start and stop times

docs/source/index.rst

Lines changed: 1 addition & 2 deletions
@@ -109,14 +109,13 @@ Contents

    actors
    asynchronous
-   configuration
    ipython
-   prometheus
    http_services
    publish
    resources
    task-launch
    tls
+   active_memory_manager

 .. toctree::
    :maxdepth: 1

docs/source/tls.rst

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ One can also pass additional parameters:

 All those parameters can be passed in several ways:

-* through the Dask :ref:`configuration file <configuration>`;
+* through the Dask :doc:`configuration file <configuration>`;
 * if using the command line, through options to ``dask-scheduler`` and
   ``dask-worker``;
 * if using the API, through a ``Security`` object. For example, here is
