Skip to content

Commit 86c140a

Browse files
authored
Add long_callback decorator (#1702)
* Add long_callback decorator * Add long_callback tests * Have the diskcache long_callback manager rely on multiprocess on all platforms * long_callback docstring * Add cache_args_to_skip option to long_callback * Add dual long_callback test. Set explicit celery task name to avoid task conflict * celery tests on circleci * Support single list input argument * Raise informative error when dependency to long_callback has pattern-matching id * validate that celery app has result backend configured * Test celery manager with multiple celery workers * Add long callback manager docstrings * Add CHANGELOG entr * Add extra components to validation_layout * Add test where the dependencies of long_callback are added dynamically, and validation is handled using validation_layout and prevent_initial_call=True
1 parent 9d10871 commit 86c140a

22 files changed

+1492
-3
lines changed

.circleci/config.yml

+4
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ jobs:
208208
PERCY_PARALLEL_TOTAL: -1
209209
PUPPETEER_SKIP_CHROMIUM_DOWNLOAD: True
210210
PYVERSION: python39
211+
REDIS_URL: redis://localhost:6379
212+
- image: circleci/redis
211213
parallelism: 3
212214
steps:
213215
- checkout
@@ -252,6 +254,8 @@ jobs:
252254
environment:
253255
PERCY_ENABLE: 0
254256
PYVERSION: python36
257+
REDIS_URL: redis://localhost:6379
258+
- image: circleci/redis
255259

256260
workflows:
257261
version: 2

CHANGELOG.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ This project adheres to [Semantic Versioning](https://semver.org/).
77
## Dash and Dash Renderer
88

99
### Added
10+
- [#1702](https://github.com/plotly/dash/pull/1702) Added a new `@app.long_callback` decorator to support callback functions that take a long time to run. See the PR and documentation for more information.
1011
- [#1514](https://github.com/plotly/dash/pull/1514) Perform json encoding using the active plotly JSON engine. This will default to the faster orjson encoder if the `orjson` package is installed.
1112

12-
13-
1413
### Changed
1514
- [#1707](https://github.com/plotly/dash/pull/1707) Change the default value of the `compress` argument to the `dash.Dash` constructor to `False`. This change reduces CPU usage, and was made in recognition of the fact that many deployment platforms (e.g. Dash Enterprise) already apply their own compression. If deploying to an environment that does not already provide compression, the Dash 1 behavior may be restored by adding `compress=True` to the `dash.Dash` constructor.
1615

dash/dash.py

+271-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
handle_callback_args,
2828
handle_grouped_callback_args,
2929
Output,
30+
State,
31+
Input,
3032
)
3133
from .development.base_component import ComponentRegistry
3234
from .exceptions import PreventUpdate, InvalidResourceError, ProxyError
@@ -58,6 +60,7 @@
5860
grouping_len,
5961
)
6062

63+
6164
_flask_compress_version = parse_version(get_distribution("flask-compress").version)
6265

6366
# Add explicit mapping for map files
@@ -258,6 +261,10 @@ class Dash(object):
258261
Set to None or '' if you don't want the document.title to change or if you
259262
want to control the document.title through a separate component or
260263
clientside callback.
264+
265+
:param long_callback_manager: Long callback manager instance to support the
266+
``@app.long_callback`` decorator. Currently an instance of one of
267+
``DiskcacheLongCallbackManager`` or ``CeleryLongCallbackManager``
261268
"""
262269

263270
def __init__(
@@ -286,6 +293,7 @@ def __init__(
286293
plugins=None,
287294
title="Dash",
288295
update_title="Updating...",
296+
long_callback_manager=None,
289297
**obsolete,
290298
):
291299
_validate.check_obsolete(obsolete)
@@ -409,6 +417,8 @@ def __init__(
409417
)
410418

411419
self._assets_files = []
420+
self._long_callback_count = 0
421+
self._long_callback_manager = long_callback_manager
412422

413423
self.logger = logging.getLogger(name)
414424
self.logger.addHandler(logging.StreamHandler(stream=sys.stdout))
@@ -559,6 +569,8 @@ def serve_layout(self):
559569
)
560570

561571
def _config(self):
572+
from dash_html_components import Div # pylint: disable=import-outside-toplevel
573+
562574
# pieces of config needed by the front end
563575
config = {
564576
"url_base_pathname": self.config.url_base_pathname,
@@ -576,7 +588,15 @@ def _config(self):
576588
"max_retry": self._dev_tools.hot_reload_max_retry,
577589
}
578590
if self.validation_layout and not self.config.suppress_callback_exceptions:
579-
config["validation_layout"] = self.validation_layout
591+
validation_layout = self.validation_layout
592+
593+
# Add extra components
594+
if self._extra_components:
595+
validation_layout = Div(
596+
children=[validation_layout] + self._extra_components
597+
)
598+
599+
config["validation_layout"] = validation_layout
580600

581601
return config
582602

@@ -1119,6 +1139,256 @@ def add_context(*args, **kwargs):
11191139

11201140
return wrap_func
11211141

1142+
def long_callback(self, *_args, **_kwargs):
1143+
"""
1144+
Normally used as a decorator, `@app.long_callback` is an alternative to
1145+
`@app.callback` designed for callbacks that take a long time to run,
1146+
without locking up the Dash app or timing out.
1147+
1148+
`@long_callback` is designed to support multiple callback managers.
1149+
Two long callback managers are currently implemented:
1150+
1151+
- A diskcache manager (`DiskcacheLongCallbackManager`) that runs callback
1152+
logic in a separate process and stores the results to disk using the
1153+
diskcache library. This is the easiest backend to use for local
1154+
development.
1155+
- A Celery manager (`CeleryLongCallbackManager`) that runs callback logic
1156+
in a celery worker and returns results to the Dash app through a Celery
1157+
broker like RabbitMQ or Redis.
1158+
1159+
The following arguments may include any valid arguments to `@app.callback`.
1160+
In addition, `@app.long_callback` supports the following optional
1161+
keyword arguments:
1162+
1163+
:Keyword Arguments:
1164+
:param manager:
1165+
A long callback manager instance. Currently an instance of one of
1166+
`DiskcacheLongCallbackManager` or `CeleryLongCallbackManager`.
1167+
Defaults to the `long_callback_manager` instance provided to the
1168+
`dash.Dash constructor`.
1169+
:param running:
1170+
A list of 3-element tuples. The first element of each tuple should be
1171+
an `Output` dependency object referencing a property of a component in
1172+
the app layout. The second element is the value that the property
1173+
should be set to while the callback is running, and the third element
1174+
is the value the property should be set to when the callback completes.
1175+
:param cancel:
1176+
A list of `Input` dependency objects that reference a property of a
1177+
component in the app's layout. When the value of this property changes
1178+
while a callback is running, the callback is canceled.
1179+
Note that the value of the property is not significant, any change in
1180+
value will result in the cancellation of the running job (if any).
1181+
:param progress:
1182+
An `Output` dependency grouping that references properties of
1183+
components in the app's layout. When provided, the decorated function
1184+
will be called with an extra argument as the first argument to the
1185+
function. This argument, is a function handle that the decorated
1186+
function should call in order to provide updates to the app on its
1187+
current progress. This function accepts a single argument, which
1188+
correspond to the grouping of properties specified in the provided
1189+
`Output` dependency grouping
1190+
:param progress_default:
1191+
A grouping of values that should be assigned to the components
1192+
specified by the `progress` argument when the callback is not in
1193+
progress. If `progress_default` is not provided, all the dependency
1194+
properties specified in `progress` will be set to `None` when the
1195+
callback is not running.
1196+
:param cache_args_to_ignore:
1197+
Arguments to ignore when caching is enabled. If callback is configured
1198+
with keyword arguments (Input/State provided in a dict),
1199+
this should be a list of argument names as strings. Otherwise,
1200+
this should be a list of argument indices as integers.
1201+
"""
1202+
from dash._callback_context import ( # pylint: disable=import-outside-toplevel
1203+
callback_context,
1204+
)
1205+
import dash_core_components as dcc # pylint: disable=import-outside-toplevel
1206+
from dash.exceptions import ( # pylint: disable=import-outside-toplevel
1207+
WildcardInLongCallback,
1208+
)
1209+
1210+
# Get long callback manager
1211+
callback_manager = _kwargs.pop("manager", self._long_callback_manager)
1212+
if callback_manager is None:
1213+
raise ValueError(
1214+
"The @app.long_callback decorator requires a long callback manager\n"
1215+
"instance. This may be provided to the app using the \n"
1216+
"long_callback_manager argument to the dash.Dash constructor, or\n"
1217+
"it may be provided to the @app.long_callback decorator as the \n"
1218+
"manager argument"
1219+
)
1220+
1221+
# Extract special long_callback kwargs
1222+
running = _kwargs.pop("running", ())
1223+
cancel = _kwargs.pop("cancel", ())
1224+
progress = _kwargs.pop("progress", ())
1225+
progress_default = _kwargs.pop("progress_default", None)
1226+
interval_time = _kwargs.pop("interval", 1000)
1227+
cache_args_to_ignore = _kwargs.pop("cache_args_to_ignore", [])
1228+
1229+
# Parse remaining args just like app.callback
1230+
(
1231+
output,
1232+
flat_inputs,
1233+
flat_state,
1234+
inputs_state_indices,
1235+
prevent_initial_call,
1236+
) = handle_grouped_callback_args(_args, _kwargs)
1237+
inputs_and_state = flat_inputs + flat_state
1238+
args_deps = map_grouping(lambda i: inputs_and_state[i], inputs_state_indices)
1239+
1240+
# Disallow wildcard dependencies
1241+
for deps in [output, flat_inputs, flat_state]:
1242+
for dep in flatten_grouping(deps):
1243+
if dep.has_wildcard():
1244+
raise WildcardInLongCallback(
1245+
f"""
1246+
@app.long_callback does not support dependencies with
1247+
pattern-matching ids
1248+
Received: {repr(dep)}\n"""
1249+
)
1250+
1251+
# Get unique id for this long_callback definition. This increment is not
1252+
# thread safe, but it doesn't need to be because callback definitions
1253+
# happen on the main thread before the app starts
1254+
self._long_callback_count += 1
1255+
long_callback_id = self._long_callback_count
1256+
1257+
# Create Interval and Store for long callback and add them to the app's
1258+
# _extra_components list
1259+
interval_id = f"_long_callback_interval_{long_callback_id}"
1260+
interval_component = dcc.Interval(
1261+
id=interval_id, interval=interval_time, disabled=prevent_initial_call
1262+
)
1263+
store_id = f"_long_callback_store_{long_callback_id}"
1264+
store_component = dcc.Store(id=store_id, data=dict())
1265+
self._extra_components.extend([interval_component, store_component])
1266+
1267+
# Compute full component plus property name for the cancel dependencies
1268+
cancel_prop_ids = tuple(
1269+
".".join([dep.component_id, dep.component_property]) for dep in cancel
1270+
)
1271+
1272+
def wrapper(fn):
1273+
background_fn = callback_manager.make_job_fn(fn, bool(progress), args_deps)
1274+
1275+
def callback(_triggers, user_store_data, user_callback_args):
1276+
# Build result cache key from inputs
1277+
pending_key = callback_manager.build_cache_key(
1278+
fn, user_callback_args, cache_args_to_ignore
1279+
)
1280+
current_key = user_store_data.get("current_key", None)
1281+
pending_job = user_store_data.get("pending_job", None)
1282+
1283+
should_cancel = pending_key == current_key or any(
1284+
trigger["prop_id"] in cancel_prop_ids
1285+
for trigger in callback_context.triggered
1286+
)
1287+
1288+
# Compute grouping of values to set the progress component's to
1289+
# when cleared
1290+
if progress_default is None:
1291+
clear_progress = (
1292+
map_grouping(lambda x: None, progress) if progress else ()
1293+
)
1294+
else:
1295+
clear_progress = progress_default
1296+
1297+
if should_cancel:
1298+
user_store_data["current_key"] = None
1299+
user_store_data["pending_key"] = None
1300+
user_store_data["pending_job"] = None
1301+
1302+
callback_manager.terminate_job(pending_job)
1303+
1304+
return dict(
1305+
user_callback_output=map_grouping(lambda x: no_update, output),
1306+
interval_disabled=True,
1307+
in_progress=[val for (_, _, val) in running],
1308+
progress=clear_progress,
1309+
user_store_data=user_store_data,
1310+
)
1311+
1312+
# Look up progress value if a job is in progress
1313+
if pending_job:
1314+
progress_value = callback_manager.get_progress(pending_key)
1315+
else:
1316+
progress_value = None
1317+
1318+
if callback_manager.result_ready(pending_key):
1319+
result = callback_manager.get_result(pending_key, pending_job)
1320+
# Set current key (hash of data stored in client)
1321+
# to pending key (hash of data requested by client)
1322+
user_store_data["current_key"] = pending_key
1323+
1324+
# Disable interval if this value was pulled from cache.
1325+
# If this value was the result of a background calculation, don't
1326+
# disable yet. If no other calculations are in progress,
1327+
# interval will be disabled in should_cancel logic above
1328+
# the next time the interval fires.
1329+
interval_disabled = pending_job is None
1330+
return dict(
1331+
user_callback_output=result,
1332+
interval_disabled=interval_disabled,
1333+
in_progress=[val for (_, _, val) in running],
1334+
progress=clear_progress,
1335+
user_store_data=user_store_data,
1336+
)
1337+
elif progress_value:
1338+
return dict(
1339+
user_callback_output=map_grouping(lambda x: no_update, output),
1340+
interval_disabled=False,
1341+
in_progress=[val for (_, val, _) in running],
1342+
progress=progress_value or {},
1343+
user_store_data=user_store_data,
1344+
)
1345+
else:
1346+
# Check if there is a running calculation that can now
1347+
# be canceled
1348+
old_pending_key = user_store_data.get("pending_key", None)
1349+
if (
1350+
old_pending_key
1351+
and old_pending_key != pending_key
1352+
and callback_manager.job_running(pending_job)
1353+
):
1354+
callback_manager.terminate_job(pending_job)
1355+
1356+
user_store_data["pending_key"] = pending_key
1357+
callback_manager.terminate_unhealthy_job(pending_job)
1358+
if not callback_manager.job_running(pending_job):
1359+
user_store_data["pending_job"] = callback_manager.call_job_fn(
1360+
pending_key, background_fn, user_callback_args
1361+
)
1362+
1363+
return dict(
1364+
user_callback_output=map_grouping(lambda x: no_update, output),
1365+
interval_disabled=False,
1366+
in_progress=[val for (_, val, _) in running],
1367+
progress=clear_progress,
1368+
user_store_data=user_store_data,
1369+
)
1370+
1371+
return self.callback(
1372+
inputs=dict(
1373+
_triggers=dict(
1374+
n_intervals=Input(interval_id, "n_intervals"),
1375+
cancel=cancel,
1376+
),
1377+
user_store_data=State(store_id, "data"),
1378+
user_callback_args=args_deps,
1379+
),
1380+
output=dict(
1381+
user_callback_output=output,
1382+
interval_disabled=Output(interval_id, "disabled"),
1383+
in_progress=[dep for (dep, _, _) in running],
1384+
progress=progress,
1385+
user_store_data=Output(store_id, "data"),
1386+
),
1387+
prevent_initial_call=prevent_initial_call,
1388+
)(callback)
1389+
1390+
return wrapper
1391+
11221392
def dispatch(self):
11231393
body = flask.request.get_json()
11241394
flask.g.inputs_list = inputs = body.get( # pylint: disable=assigning-non-slot

dash/dependencies.py

+10
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,16 @@ def _id_matches(self, other):
101101
def __hash__(self):
102102
return hash(str(self))
103103

104+
def has_wildcard(self):
105+
"""
106+
Return true if id contains a wildcard (MATCH, ALL, or ALLSMALLER)
107+
"""
108+
if isinstance(self.component_id, dict):
109+
for v in self.component_id.values():
110+
if isinstance(v, _Wildcard):
111+
return True
112+
return False
113+
104114

105115
class Output(DashDependency): # pylint: disable=too-few-public-methods
106116
"""Output of a callback."""

dash/exceptions.py

+4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ class IDsCantContainPeriods(CallbackException):
3030
pass
3131

3232

33+
class WildcardInLongCallback(CallbackException):
34+
pass
35+
36+
3337
# Better error name now that more than periods are not permitted.
3438
class InvalidComponentIdError(IDsCantContainPeriods):
3539
pass

dash/long_callback/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .managers.celery_manager import CeleryLongCallbackManager # noqa: F401,E402
2+
from .managers.diskcache_manager import DiskcacheLongCallbackManager # noqa: F401,E402

0 commit comments

Comments
 (0)