Skip to content

Add docs section about executing coroutines #364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Oct 29, 2022
1 change: 1 addition & 0 deletions docs/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ dependencies:
- loky
- furo
- myst-parser
- dask
79 changes: 79 additions & 0 deletions docs/source/tutorial/tutorial.advanced-topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,85 @@ await runner.task # This is not needed in a notebook environment!
timer.result()
```

## Custom parallelization using coroutines

Adaptive by itself does not implement a way of sharing partial results between function executions.
Instead its implementation of parallel computation using executors is minimal by design.
The appropriate way to implement custom parallelization is by using coroutines (asynchronous functions).

We illustrate this approach by using `dask.distributed` for parallel computations in part because it supports asynchronous operation out-of-the-box.
Let us consider a function `f(x)` which is composed by two parts:
a slow part `g` which can be reused by multiple inputs and shared across function evaluations and a fast part `h` that will be computed for every `x`.

```{code-cell} ipython3
import time

def f(x):
"""
Integer part of `x` repeats and should be reused
Decimal part requires a new computation
"""
return g(int(x)) + h(x % 1)


def g(x):
"""Slow but reusable function"""
time.sleep(random.randrange(5))
return x**2


def h(x):
"""Fast function"""
return x**3
```

In order to combine reuse of values of `g` with adaptive, we need to convert `f` into a dask graph by using `dask.delayed`.

```{code-cell} ipython3
from dask import delayed

# Convert g and h to dask.Delayed objects
g, h = delayed(g), delayed(h)

@delayed
def f(x, y):
return (x + y)**2
```

Next we define a computation using coroutines such that it reuses previously submitted tasks.

```{code-cell} ipython3
from dask.distributed import Client

client = await Client(asynchronous=True)

g_futures = {}

async def f_parallel(x):
# Get or sumbit the slow function future
if (g_future := g_futures.get(int(x))) is None:
g_futures[int(x)] = g_future = client.compute(g(int(x)))

future_f = client.compute(f(g_future, h(x % 1)))

return await future_f
```

To run the adaptive evaluation we provide the asynchronous function to the `learner` and run it via `AsyncRunner` without specifying an executor.

```{code-cell} ipython3
learner = adaptive.Learner1D(f_parallel, bounds=(-3.5, 3.5))

runner = adaptive.AsyncRunner(learner, goal=lambda l: l.loss() < 0.01, ntasks=20)
```

Finally we await for the runner to finish, and then plot the result.

```{code-cell} ipython3
await runner.task
learner.plot()
```

## Using Runners from a script

Runners can also be used from a Python script independently of the notebook.
Expand Down