@@ -367,6 +367,85 @@ await runner.task # This is not needed in a notebook environment!
367
367
timer.result()
368
368
```
369
369
370
+ ## Custom parallelization using coroutines
371
+
372
+ Adaptive by itself does not implement a way of sharing partial results between function executions.
373
+ Instead its implementation of parallel computation using executors is minimal by design.
374
+ The appropriate way to implement custom parallelization is by using coroutines (asynchronous functions).
375
+
376
+ We illustrate this approach by using ` dask.distributed ` for parallel computations in part because it supports asynchronous operation out-of-the-box.
377
+ Let us consider a function ` f(x) ` which is composed by two parts:
378
+ a slow part ` g ` which can be reused by multiple inputs and shared across function evaluations and a fast part ` h ` that will be computed for every ` x ` .
379
+
380
+ ``` {code-cell} ipython3
381
+ import time
382
+
383
+ def f(x):
384
+ """
385
+ Integer part of `x` repeats and should be reused
386
+ Decimal part requires a new computation
387
+ """
388
+ return g(int(x)) + h(x % 1)
389
+
390
+
391
+ def g(x):
392
+ """Slow but reusable function"""
393
+ time.sleep(random.randrange(5))
394
+ return x**2
395
+
396
+
397
+ def h(x):
398
+ """Fast function"""
399
+ return x**3
400
+ ```
401
+
402
+ In order to combine reuse of values of ` g ` with adaptive, we need to convert ` f ` into a dask graph by using ` dask.delayed ` .
403
+
404
+ ``` {code-cell} ipython3
405
+ from dask import delayed
406
+
407
+ # Convert g and h to dask.Delayed objects
408
+ g, h = delayed(g), delayed(h)
409
+
410
+ @delayed
411
+ def f(x, y):
412
+ return (x + y)**2
413
+ ```
414
+
415
+ Next we define a computation using coroutines such that it reuses previously submitted tasks.
416
+
417
+ ``` {code-cell} ipython3
418
+ from dask.distributed import Client
419
+
420
+ client = await Client(asynchronous=True)
421
+
422
+ g_futures = {}
423
+
424
+ async def f_parallel(x):
425
+ # Get or sumbit the slow function future
426
+ if (g_future := g_futures.get(int(x))) is None:
427
+ g_futures[int(x)] = g_future = client.compute(g(int(x)))
428
+
429
+ future_f = client.compute(f(g_future, h(x % 1)))
430
+
431
+ return await future_f
432
+ ```
433
+
434
+ To run the adaptive evaluation we provide the asynchronous function to the ` learner ` and run it via ` AsyncRunner ` without specifying an executor.
435
+
436
+ ``` {code-cell} ipython3
437
+ learner = adaptive.Learner1D(f_parallel, bounds=(-3.5, 3.5))
438
+
439
+ runner = adaptive.AsyncRunner(learner, goal=lambda l: l.loss() < 0.01, ntasks=20)
440
+ ```
441
+
442
+ Finally we await for the runner to finish, and then plot the result.
443
+
444
+ ``` {code-cell} ipython3
445
+ await runner.task
446
+ learner.plot()
447
+ ```
448
+
370
449
## Using Runners from a script
371
450
372
451
Runners can also be used from a Python script independently of the notebook.
0 commit comments