Skip to content

Commit 2b22ff3

Browse files
committed
Replace long callback interval with request polling handled in renderer.
1 parent 3a207ce commit 2b22ff3

File tree

12 files changed

+575
-321
lines changed

12 files changed

+575
-321
lines changed

Diff for: dash/_callback.py

+196-9
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import collections
22
from functools import wraps
33

4+
import flask
5+
46
from .dependencies import (
57
handle_callback_args,
68
handle_grouped_callback_args,
79
Output,
810
)
9-
from .exceptions import PreventUpdate
11+
from .exceptions import PreventUpdate, WildcardInLongCallback, DuplicateCallback
1012

1113
from ._grouping import (
1214
flatten_grouping,
@@ -17,9 +19,11 @@
1719
create_callback_id,
1820
stringify_id,
1921
to_json,
22+
coerce_to_list,
2023
)
2124

2225
from . import _validate
26+
from .long_callback.managers import BaseLongCallbackManager
2327

2428

2529
class NoUpdate:
@@ -30,15 +34,29 @@ def to_plotly_json(self): # pylint: disable=no-self-use
3034

3135
@staticmethod
3236
def is_no_update(obj):
33-
return obj == {"_dash_no_update": "_dash_no_update"}
37+
return isinstance(obj, NoUpdate) or obj == {
38+
"_dash_no_update": "_dash_no_update"
39+
}
3440

3541

3642
GLOBAL_CALLBACK_LIST = []
3743
GLOBAL_CALLBACK_MAP = {}
3844
GLOBAL_INLINE_SCRIPTS = []
3945

4046

41-
def callback(*_args, **_kwargs):
47+
def callback(
48+
*_args,
49+
long=False,
50+
long_interval=1000,
51+
long_progress=None,
52+
long_progress_default=None,
53+
long_running=None,
54+
long_cancel=None,
55+
long_manager=None,
56+
long_cache_args_to_ignore=None,
57+
config_prevent_initial_callbacks=False,
58+
**_kwargs,
59+
):
4260
"""
4361
Normally used as a decorator, `@dash.callback` provides a server-side
4462
callback relating the values of one or more `Output` items to one or
@@ -56,15 +74,79 @@ def callback(*_args, **_kwargs):
5674
not to fire when its outputs are first added to the page. Defaults to
5775
`False` and unlike `app.callback` is not configurable at the app level.
5876
"""
77+
78+
long_spec = None
79+
80+
if long:
81+
long_spec = {
82+
"interval": long_interval,
83+
}
84+
85+
if long_manager:
86+
long_spec["manager"] = long_manager
87+
88+
if long_progress:
89+
long_spec["progress"] = coerce_to_list(long_progress)
90+
validate_long_inputs(long_spec["progress"])
91+
92+
if long_progress_default:
93+
long_spec["progressDefault"] = coerce_to_list(long_progress_default)
94+
95+
if not len(long_spec["progress"]) == len(long_spec["progressDefault"]):
96+
raise Exception(
97+
"Progress and progress default needs to be of same length"
98+
)
99+
100+
if long_running:
101+
long_spec["running"] = coerce_to_list(long_running)
102+
validate_long_inputs(x[0] for x in long_spec["running"])
103+
104+
if long_cancel:
105+
cancel_inputs = coerce_to_list(long_cancel)
106+
validate_long_inputs(cancel_inputs)
107+
108+
cancels_output = [Output(c.component_id, "id") for c in cancel_inputs]
109+
110+
try:
111+
112+
@callback(cancels_output, cancel_inputs, prevent_initial_call=True)
113+
def cancel_call(*_):
114+
job_ids = flask.request.args.getlist("cancelJob")
115+
manager = long_manager or flask.g.long_callback_manager
116+
if job_ids:
117+
for job_id in job_ids:
118+
manager.terminate_job(int(job_id))
119+
return NoUpdate()
120+
121+
except DuplicateCallback:
122+
pass # Already a callback to cancel, will get the proper jobs from the store.
123+
124+
long_spec["cancel"] = [c.to_dict() for c in cancel_inputs]
125+
126+
if long_cache_args_to_ignore:
127+
long_spec["cache_args_to_ignore"] = long_cache_args_to_ignore
128+
59129
return register_callback(
60130
GLOBAL_CALLBACK_LIST,
61131
GLOBAL_CALLBACK_MAP,
62-
False,
132+
config_prevent_initial_callbacks,
63133
*_args,
64134
**_kwargs,
135+
long=long_spec,
65136
)
66137

67138

139+
def validate_long_inputs(deps):
140+
for dep in deps:
141+
if dep.has_wildcard():
142+
raise WildcardInLongCallback(
143+
f"""
144+
long callbacks does not support dependencies with
145+
pattern-matching ids
146+
Received: {repr(dep)}\n"""
147+
)
148+
149+
68150
def clientside_callback(clientside_function, *args, **kwargs):
69151
return register_clientside_callback(
70152
GLOBAL_CALLBACK_LIST,
@@ -87,6 +169,7 @@ def insert_callback(
87169
state,
88170
inputs_state_indices,
89171
prevent_initial_call,
172+
long=None,
90173
):
91174
if prevent_initial_call is None:
92175
prevent_initial_call = config_prevent_initial_callbacks
@@ -98,19 +181,26 @@ def insert_callback(
98181
"state": [c.to_dict() for c in state],
99182
"clientside_function": None,
100183
"prevent_initial_call": prevent_initial_call,
184+
"long": long
185+
and {
186+
"interval": long["interval"],
187+
},
101188
}
189+
102190
callback_map[callback_id] = {
103191
"inputs": callback_spec["inputs"],
104192
"state": callback_spec["state"],
105193
"outputs_indices": outputs_indices,
106194
"inputs_state_indices": inputs_state_indices,
195+
"long": long,
107196
}
108197
callback_list.append(callback_spec)
109198

110199
return callback_id
111200

112201

113-
def register_callback(
202+
# pylint: disable=R0912, R0915
203+
def register_callback( # pylint: disable=R0914
114204
callback_list, callback_map, config_prevent_initial_callbacks, *_args, **_kwargs
115205
):
116206
(
@@ -129,6 +219,8 @@ def register_callback(
129219
insert_output = flatten_grouping(output)
130220
multi = True
131221

222+
long = _kwargs.get("long")
223+
132224
output_indices = make_grouping_by_index(output, list(range(grouping_len(output))))
133225
callback_id = insert_callback(
134226
callback_list,
@@ -140,23 +232,118 @@ def register_callback(
140232
flat_state,
141233
inputs_state_indices,
142234
prevent_initial_call,
235+
long=long,
143236
)
144237

145238
# pylint: disable=too-many-locals
146239
def wrap_func(func):
240+
241+
if long is not None:
242+
long_key = BaseLongCallbackManager.register_func(
243+
func, long.get("progress") is not None
244+
)
245+
147246
@wraps(func)
148247
def add_context(*args, **kwargs):
149248
output_spec = kwargs.pop("outputs_list")
249+
app_callback_manager = kwargs.pop("long_callback_manager", None)
250+
callback_manager = long and long.get(
251+
"manager", app_callback_manager
252+
)
150253
_validate.validate_output_spec(insert_output, output_spec, Output)
151254

152255
func_args, func_kwargs = _validate.validate_and_group_input_args(
153256
args, inputs_state_indices
154257
)
155258

156-
# don't touch the comment on the next line - used by debugger
157-
output_value = func(*func_args, **func_kwargs) # %% callback invoked %%
259+
response = {"multi": True}
260+
261+
if long is not None:
262+
progress_outputs = long.get("progress")
263+
cache_key = flask.request.args.get("cacheKey")
264+
job_id = flask.request.args.get("job")
265+
266+
current_key = callback_manager.build_cache_key(
267+
func,
268+
# Inputs provided as dict is kwargs.
269+
func_args if func_args else func_kwargs,
270+
long.get("cache_args_to_ignore", []),
271+
)
272+
273+
if not cache_key:
274+
cache_key = current_key
275+
276+
job_fn = callback_manager.func_registry.get(long_key)
277+
278+
job = callback_manager.call_job_fn(
279+
cache_key,
280+
job_fn,
281+
args,
282+
)
283+
284+
data = {
285+
"cacheKey": cache_key,
286+
"job": job,
287+
}
288+
289+
running = long.get("running")
290+
291+
if running:
292+
data["running"] = {str(r[0]): r[1] for r in running}
293+
data["runningOff"] = {str(r[0]): r[2] for r in running}
294+
cancel = long.get("cancel")
295+
if cancel:
296+
data["cancel"] = cancel
297+
298+
progress_default = long.get("progressDefault")
299+
if progress_default:
300+
data["progressDefault"] = {
301+
str(o): x
302+
for o, x in zip(progress_outputs, progress_default)
303+
}
304+
return to_json(data)
305+
if progress_outputs:
306+
# Get the progress before the result as it would be erased after the results.
307+
progress = callback_manager.get_progress(cache_key)
308+
if progress:
309+
response["progress"] = {
310+
str(x): progress[i]
311+
for i, x in enumerate(progress_outputs)
312+
}
313+
314+
output_value = callback_manager.get_result(cache_key, job_id)
315+
# Must get job_running after get_result since get_results terminates it.
316+
job_running = callback_manager.job_running(job_id)
317+
if not job_running and output_value is callback_manager.UNDEFINED:
318+
# Job canceled -> no output to close the loop.
319+
output_value = NoUpdate()
320+
321+
elif (
322+
isinstance(output_value, dict)
323+
and "long_callback_error" in output_value
324+
):
325+
error = output_value.get("long_callback_error")
326+
raise Exception(
327+
f"An error occurred inside a long callback: {error['msg']}\n{error['tb']}"
328+
)
329+
330+
if job_running and output_value is not callback_manager.UNDEFINED:
331+
# cached results.
332+
callback_manager.terminate_job(job_id)
333+
334+
if multi and isinstance(output_value, (list, tuple)):
335+
output_value = [
336+
NoUpdate() if NoUpdate.is_no_update(r) else r
337+
for r in output_value
338+
]
339+
340+
if output_value is callback_manager.UNDEFINED:
341+
return to_json(response)
342+
else:
343+
# don't touch the comment on the next line - used by debugger
344+
output_value = func(*func_args, **func_kwargs) # %% callback invoked %%
158345

159-
if isinstance(output_value, NoUpdate):
346+
if NoUpdate.is_no_update(output_value):
160347
raise PreventUpdate
161348

162349
if not multi:
@@ -191,7 +378,7 @@ def add_context(*args, **kwargs):
191378
if not has_update:
192379
raise PreventUpdate
193380

194-
response = {"response": component_ids, "multi": True}
381+
response["response"] = component_ids
195382

196383
try:
197384
jsonResponse = to_json(response)

Diff for: dash/_utils.py

+6
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,9 @@ def gen_salt(chars):
217217
return "".join(
218218
secrets.choice(string.ascii_letters + string.digits) for _ in range(chars)
219219
)
220+
221+
222+
def coerce_to_list(obj):
223+
if not isinstance(obj, (list, tuple)):
224+
return [obj]
225+
return obj

0 commit comments

Comments
 (0)