From bfbd59eec1b76b7eb374ebb94e0eb5309c5a12c2 Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 26 Apr 2022 16:23:20 -0400 Subject: [PATCH 01/35] Handle long callback errors. --- dash/dash.py | 38 ++++++++++++++++++- dash/long_callback/managers/celery_manager.py | 36 ++++++++++++++---- .../managers/diskcache_manager.py | 35 ++++++++++++++--- 3 files changed, 95 insertions(+), 14 deletions(-) diff --git a/dash/dash.py b/dash/dash.py index 7736bdd57b..38a44baad2 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -21,6 +21,7 @@ from dash import dcc from dash import html from dash import dash_table +from ._callback import NoUpdate from .fingerprint import build_fingerprint, check_fingerprint from .resources import Scripts, Css @@ -1169,7 +1170,17 @@ def long_callback(self, *_args, **_kwargs): # pylint: disable=too-many-statemen ) store_id = f"_long_callback_store_{long_callback_id}" store_component = dcc.Store(id=store_id, data={}) - self._extra_components.extend([interval_component, store_component]) + error_id = f"_long_callback_error_{long_callback_id}" + error_store_component = dcc.Store(id=error_id) + error_dummy = f"_long_callback_error_dummy_{long_callback_id}" + self._extra_components.extend( + [ + interval_component, + store_component, + error_store_component, + dcc.Store(id=error_dummy), + ] + ) # Compute full component plus property name for the cancel dependencies cancel_prop_ids = tuple( @@ -1214,6 +1225,7 @@ def callback(_triggers, user_store_data, user_callback_args): in_progress=[val for (_, _, val) in running], progress=clear_progress, user_store_data=user_store_data, + error=NoUpdate(), ) # Look up progress value if a job is in progress @@ -1228,6 +1240,19 @@ def callback(_triggers, user_store_data, user_callback_args): # to pending key (hash of data requested by client) user_store_data["current_key"] = pending_key + if isinstance(result, dict) and result.get("long_callback_error"): + error = result.get("long_callback_error") + print(result["long_callback_error"]["tb"], file=sys.stderr) + return dict( + error=f"An error occurred inside a long callback: {error['msg']}\n" + + error["tb"], + user_callback_output=NoUpdate(), + in_progress=[val for (_, _, val) in running], + interval_disabled=pending_job is None, + progress=clear_progress, + user_store_data=user_store_data, + ) + # Disable interval if this value was pulled from cache. # If this value was the result of a background calculation, don't # disable yet. If no other calculations are in progress, @@ -1240,6 +1265,7 @@ def callback(_triggers, user_store_data, user_callback_args): in_progress=[val for (_, _, val) in running], progress=clear_progress, user_store_data=user_store_data, + error=NoUpdate(), ) if progress_value: return dict( @@ -1248,6 +1274,7 @@ def callback(_triggers, user_store_data, user_callback_args): in_progress=[val for (_, val, _) in running], progress=progress_value or {}, user_store_data=user_store_data, + error=NoUpdate(), ) # Check if there is a running calculation that can now @@ -1273,8 +1300,16 @@ def callback(_triggers, user_store_data, user_callback_args): in_progress=[val for (_, val, _) in running], progress=clear_progress, user_store_data=user_store_data, + error=NoUpdate(), ) + self.clientside_callback( + "function (error) {throw new Error(error)}", + Output(error_dummy, "data"), + [Input(error_id, "data")], + prevent_initial_call=True, + ) + return self.callback( inputs=dict( _triggers=dict( @@ -1290,6 +1325,7 @@ def callback(_triggers, user_store_data, user_callback_args): in_progress=[dep for (dep, _, _) in running], progress=progress, user_store_data=Output(store_id, "data"), + error=Output(error_id, "data"), ), prevent_initial_call=prevent_initial_call, )(callback) diff --git a/dash/long_callback/managers/celery_manager.py b/dash/long_callback/managers/celery_manager.py index 863daa6816..2aa0356a3d 100644 --- a/dash/long_callback/managers/celery_manager.py +++ b/dash/long_callback/managers/celery_manager.py @@ -1,8 +1,12 @@ import json import inspect import hashlib +import traceback from _plotly_utils.utils import PlotlyJSONEncoder + +from dash._callback import NoUpdate +from dash.exceptions import PreventUpdate from dash.long_callback.managers import BaseLongCallbackManager @@ -132,13 +136,31 @@ def _set_progress(progress_value): cache.set(progress_key, json.dumps(progress_value, cls=PlotlyJSONEncoder)) maybe_progress = [_set_progress] if progress else [] - if isinstance(args_deps, dict): - user_callback_output = fn(*maybe_progress, **user_callback_args) - elif isinstance(args_deps, (list, tuple)): - user_callback_output = fn(*maybe_progress, *user_callback_args) - else: - user_callback_output = fn(*maybe_progress, user_callback_args) - cache.set(result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder)) + try: + if isinstance(args_deps, dict): + user_callback_output = fn(*maybe_progress, **user_callback_args) + elif isinstance(args_deps, (list, tuple)): + user_callback_output = fn(*maybe_progress, *user_callback_args) + else: + user_callback_output = fn(*maybe_progress, user_callback_args) + except PreventUpdate: + cache.set(result_key, json.dumps(NoUpdate(), cls=PlotlyJSONEncoder)) + except Exception as err: # pylint: disable=broad-except + cache.set( + result_key, + json.dumps( + { + "long_callback_error": { + "msg": str(err), + "tb": traceback.format_exc(), + } + } + ), + ) + else: + cache.set( + result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder) + ) return job_fn diff --git a/dash/long_callback/managers/diskcache_manager.py b/dash/long_callback/managers/diskcache_manager.py index ec0c92f981..3b57188e6d 100644 --- a/dash/long_callback/managers/diskcache_manager.py +++ b/dash/long_callback/managers/diskcache_manager.py @@ -1,8 +1,16 @@ +import traceback + from . import BaseLongCallbackManager +from ..._callback import NoUpdate +from ...exceptions import PreventUpdate _pending_value = "__$pending__" +class OriginalException: + pass + + class DiskcacheLongCallbackManager(BaseLongCallbackManager): def __init__(self, cache=None, cache_by=None, expire=None): """ @@ -140,12 +148,27 @@ def _set_progress(progress_value): cache.set(progress_key, progress_value) maybe_progress = [_set_progress] if progress else [] - if isinstance(args_deps, dict): - user_callback_output = fn(*maybe_progress, **user_callback_args) - elif isinstance(args_deps, (list, tuple)): - user_callback_output = fn(*maybe_progress, *user_callback_args) + + try: + if isinstance(args_deps, dict): + user_callback_output = fn(*maybe_progress, **user_callback_args) + elif isinstance(args_deps, (list, tuple)): + user_callback_output = fn(*maybe_progress, *user_callback_args) + else: + user_callback_output = fn(*maybe_progress, user_callback_args) + except PreventUpdate: + cache.set(result_key, NoUpdate()) + except Exception as err: # pylint: disable=broad-except + cache.set( + result_key, + { + "long_callback_error": { + "msg": str(err), + "tb": traceback.format_exc(), + } + }, + ) else: - user_callback_output = fn(*maybe_progress, user_callback_args) - cache.set(result_key, user_callback_output) + cache.set(result_key, user_callback_output) return job_fn From dbacfcc6da64052d31697f57922a4e9e1066df4c Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 26 Apr 2022 16:25:20 -0400 Subject: [PATCH 02/35] :hankey: Add test long callback error. --- tests/integration/long_callback/app_error.py | 42 +++++++++++++++++++ .../long_callback/test_basic_long_callback.py | 23 ++++++++++ 2 files changed, 65 insertions(+) create mode 100644 tests/integration/long_callback/app_error.py diff --git a/tests/integration/long_callback/app_error.py b/tests/integration/long_callback/app_error.py new file mode 100644 index 0000000000..ff071fa64e --- /dev/null +++ b/tests/integration/long_callback/app_error.py @@ -0,0 +1,42 @@ +import time + +import dash +from dash import html +from dash.dependencies import Input, Output +from dash.exceptions import PreventUpdate + +from tests.integration.long_callback.utils import get_long_callback_manager + +long_callback_manager = get_long_callback_manager() +handle = long_callback_manager.handle + +app = dash.Dash(__name__, long_callback_manager=long_callback_manager) +app.enable_dev_tools(debug=True, dev_tools_ui=True) +app.layout = html.Div( + [ + html.Div([html.P(id="output", children=["Button not clicked"])]), + html.Button(id="button", children="Run Job!"), + ] +) + + +@app.long_callback( + output=Output("output", "children"), + inputs=Input("button", "n_clicks"), + running=[ + (Output("button", "disabled"), True, False), + ], + prevent_initial_call=True, +) +def callback(n_clicks): + time.sleep(0.5) + if n_clicks == 2: + raise Exception("bad error") + + if n_clicks == 4: + raise PreventUpdate + return f"Clicked {n_clicks} times" + + +if __name__ == "__main__": + app.run_server(debug=True) diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index bbd7972934..a0d750ced9 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -426,3 +426,26 @@ def test_lcbc007_validation_layout(dash_duo, manager): assert not dash_duo.redux_state_is_loading assert dash_duo.get_logs() == [] + + +def test_lcbc008_long_callbacks_error(dash_duo, manager): + with setup_long_callback_app(manager, "app_error") as app: + dash_duo.start_server(app) + + clicker = dash_duo.find_element("#button") + + clicker.click() + dash_duo.wait_for_text_to_equal("#output", "Clicked 1 times") + + clicker.click() + dash_duo.wait_for_element("#button:not([disabled])") + # dash_duo.wait_for_contains_text('.dash-fe-error__title' 'An error occurred inside a long callback:') + + clicker.click() + dash_duo.wait_for_text_to_equal("#output", "Clicked 3 times") + + clicker.click() + clicker = dash_duo.wait_for_element("#button:not([disabled])") + dash_duo.wait_for_text_to_equal("#output", "Clicked 3 times") + clicker.click() + dash_duo.wait_for_text_to_equal("#output", "Clicked 5 times") From ac59e5969bbd076c72e6e596a410ed1475da41b8 Mon Sep 17 00:00:00 2001 From: philippe Date: Fri, 6 May 2022 15:11:54 -0400 Subject: [PATCH 03/35] Add lock to diskcache --- .../managers/diskcache_manager.py | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/dash/long_callback/managers/diskcache_manager.py b/dash/long_callback/managers/diskcache_manager.py index 3b57188e6d..ce648ea3c2 100644 --- a/dash/long_callback/managers/diskcache_manager.py +++ b/dash/long_callback/managers/diskcache_manager.py @@ -1,4 +1,5 @@ import traceback +from multiprocessing import Lock from . import BaseLongCallbackManager from ..._callback import NoUpdate @@ -6,6 +7,8 @@ _pending_value = "__$pending__" +_lock = Lock() + class OriginalException: pass @@ -145,7 +148,8 @@ def get_result(self, key, job): def _make_job_fn(fn, cache, progress, args_deps): def job_fn(result_key, progress_key, user_callback_args): def _set_progress(progress_value): - cache.set(progress_key, progress_value) + with _lock: + cache.set(progress_key, progress_value) maybe_progress = [_set_progress] if progress else [] @@ -157,18 +161,21 @@ def _set_progress(progress_value): else: user_callback_output = fn(*maybe_progress, user_callback_args) except PreventUpdate: - cache.set(result_key, NoUpdate()) + with _lock: + cache.set(result_key, NoUpdate()) except Exception as err: # pylint: disable=broad-except - cache.set( - result_key, - { - "long_callback_error": { - "msg": str(err), - "tb": traceback.format_exc(), - } - }, - ) + with _lock: + cache.set( + result_key, + { + "long_callback_error": { + "msg": str(err), + "tb": traceback.format_exc(), + } + }, + ) else: - cache.set(result_key, user_callback_output) + with _lock: + cache.set(result_key, user_callback_output) return job_fn From e2d3d5ed89ff78841b57f67e0bc955aaab41c959 Mon Sep 17 00:00:00 2001 From: philippe Date: Fri, 6 May 2022 15:12:32 -0400 Subject: [PATCH 04/35] Fix test long callback error. --- tests/integration/long_callback/app_error.py | 2 +- .../long_callback/test_basic_long_callback.py | 28 +++++++++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/tests/integration/long_callback/app_error.py b/tests/integration/long_callback/app_error.py index ff071fa64e..ce3a0a1023 100644 --- a/tests/integration/long_callback/app_error.py +++ b/tests/integration/long_callback/app_error.py @@ -29,7 +29,7 @@ prevent_initial_call=True, ) def callback(n_clicks): - time.sleep(0.5) + time.sleep(1) if n_clicks == 2: raise Exception("bad error") diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index a0d750ced9..2cc76aeab8 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -430,22 +430,34 @@ def test_lcbc007_validation_layout(dash_duo, manager): def test_lcbc008_long_callbacks_error(dash_duo, manager): with setup_long_callback_app(manager, "app_error") as app: - dash_duo.start_server(app) + dash_duo.start_server( + app, + debug=True, + use_reloader=False, + use_debugger=True, + dev_tools_hot_reload=False, + dev_tools_ui=True, + ) clicker = dash_duo.find_element("#button") + def click_n_wait(): + clicker.click() + dash_duo.wait_for_element("#button:disabled") + dash_duo.wait_for_element("#button:not([disabled])") + clicker.click() dash_duo.wait_for_text_to_equal("#output", "Clicked 1 times") - clicker.click() - dash_duo.wait_for_element("#button:not([disabled])") - # dash_duo.wait_for_contains_text('.dash-fe-error__title' 'An error occurred inside a long callback:') + click_n_wait() + dash_duo.wait_for_contains_text( + ".dash-fe-error__title", "An error occurred inside a long callback:" + ) - clicker.click() + click_n_wait() dash_duo.wait_for_text_to_equal("#output", "Clicked 3 times") - clicker.click() - clicker = dash_duo.wait_for_element("#button:not([disabled])") + click_n_wait() dash_duo.wait_for_text_to_equal("#output", "Clicked 3 times") - clicker.click() + click_n_wait() dash_duo.wait_for_text_to_equal("#output", "Clicked 5 times") From fb4cef92b72ed37b175973c8d4a4bf448814280a Mon Sep 17 00:00:00 2001 From: philippe Date: Fri, 6 May 2022 17:39:24 -0400 Subject: [PATCH 05/35] Handle no update in celery long callbacks. --- dash/_callback.py | 4 +++- dash/dash.py | 11 ++++++++++- dash/long_callback/managers/celery_manager.py | 2 +- tests/integration/long_callback/app_error.py | 5 ++++- .../long_callback/test_basic_long_callback.py | 2 +- 5 files changed, 19 insertions(+), 5 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index 7653b29f7b..d14cce6059 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -24,7 +24,9 @@ class NoUpdate: # pylint: disable=too-few-public-methods - pass + + def to_plotly_json(self): # pylint: disable=no-self-use + return {"no_update": "no_update"} GLOBAL_CALLBACK_LIST = [] diff --git a/dash/dash.py b/dash/dash.py index 38a44baad2..c076dfa92a 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -1242,7 +1242,10 @@ def callback(_triggers, user_store_data, user_callback_args): if isinstance(result, dict) and result.get("long_callback_error"): error = result.get("long_callback_error") - print(result["long_callback_error"]["tb"], file=sys.stderr) + print( + result["long_callback_error"]["tb"], + file=sys.stderr, + ) return dict( error=f"An error occurred inside a long callback: {error['msg']}\n" + error["tb"], @@ -1253,6 +1256,12 @@ def callback(_triggers, user_store_data, user_callback_args): user_store_data=user_store_data, ) + if ( + isinstance(result, dict) + and result.get("no_update") == "no_update" + ): + result = NoUpdate() + # Disable interval if this value was pulled from cache. # If this value was the result of a background calculation, don't # disable yet. If no other calculations are in progress, diff --git a/dash/long_callback/managers/celery_manager.py b/dash/long_callback/managers/celery_manager.py index 2aa0356a3d..b65622c324 100644 --- a/dash/long_callback/managers/celery_manager.py +++ b/dash/long_callback/managers/celery_manager.py @@ -155,7 +155,7 @@ def _set_progress(progress_value): "msg": str(err), "tb": traceback.format_exc(), } - } + }, ), ) else: diff --git a/tests/integration/long_callback/app_error.py b/tests/integration/long_callback/app_error.py index ce3a0a1023..5e9bb0e3f0 100644 --- a/tests/integration/long_callback/app_error.py +++ b/tests/integration/long_callback/app_error.py @@ -1,3 +1,4 @@ +import os import time import dash @@ -29,7 +30,9 @@ prevent_initial_call=True, ) def callback(n_clicks): - time.sleep(1) + if os.getenv("LONG_CALLBACK_MANAGER") != "celery": + # Diskmanager needs some time, celery takes too long. + time.sleep(1) if n_clicks == 2: raise Exception("bad error") diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index 2cc76aeab8..4d6fe3ed87 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -21,7 +21,7 @@ def kill(proc_pid): if "REDIS_URL" in os.environ: - managers = ["diskcache", "celery"] + managers = ["celery", "diskcache"] else: print("Skipping celery tests because REDIS_URL is not defined") managers = ["diskcache"] From fa94a406506c9a4516610a5ac87a9ff66bc57faa Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 9 May 2022 10:42:13 -0400 Subject: [PATCH 06/35] Use diskcache lock --- dash/long_callback/managers/diskcache_manager.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/dash/long_callback/managers/diskcache_manager.py b/dash/long_callback/managers/diskcache_manager.py index ce648ea3c2..4528b9d979 100644 --- a/dash/long_callback/managers/diskcache_manager.py +++ b/dash/long_callback/managers/diskcache_manager.py @@ -1,5 +1,4 @@ import traceback -from multiprocessing import Lock from . import BaseLongCallbackManager from ..._callback import NoUpdate @@ -7,8 +6,6 @@ _pending_value = "__$pending__" -_lock = Lock() - class OriginalException: pass @@ -57,6 +54,7 @@ def __init__(self, cache=None, cache_by=None, expire=None): super().__init__(cache_by) self.expire = expire + self.lock = diskcache.Lock(self.handle, "long-callback-lock") def terminate_job(self, job): import psutil # pylint: disable=import-outside-toplevel,import-error @@ -105,7 +103,7 @@ def job_running(self, job): return False def make_job_fn(self, fn, progress, args_deps): - return _make_job_fn(fn, self.handle, progress, args_deps) + return _make_job_fn(fn, self.handle, progress, args_deps, self.lock) def clear_cache_entry(self, key): self.handle.delete(key) @@ -145,10 +143,10 @@ def get_result(self, key, job): return result -def _make_job_fn(fn, cache, progress, args_deps): +def _make_job_fn(fn, cache, progress, args_deps, lock): def job_fn(result_key, progress_key, user_callback_args): def _set_progress(progress_value): - with _lock: + with lock: cache.set(progress_key, progress_value) maybe_progress = [_set_progress] if progress else [] @@ -161,10 +159,10 @@ def _set_progress(progress_value): else: user_callback_output = fn(*maybe_progress, user_callback_args) except PreventUpdate: - with _lock: + with lock: cache.set(result_key, NoUpdate()) except Exception as err: # pylint: disable=broad-except - with _lock: + with lock: cache.set( result_key, { @@ -175,7 +173,7 @@ def _set_progress(progress_value): }, ) else: - with _lock: + with lock: cache.set(result_key, user_callback_output) return job_fn From 7688306831094f0e6e9fd88f1ad83f2b929fd3a6 Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 9 May 2022 11:01:13 -0400 Subject: [PATCH 07/35] Stricter no_update check. --- dash/_callback.py | 2 +- dash/dash.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index d14cce6059..313e56d20e 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -26,7 +26,7 @@ class NoUpdate: # pylint: disable=too-few-public-methods def to_plotly_json(self): # pylint: disable=no-self-use - return {"no_update": "no_update"} + return {"_dash_no_update": "_dash_no_update"} GLOBAL_CALLBACK_LIST = [] diff --git a/dash/dash.py b/dash/dash.py index c076dfa92a..9cff9588fc 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -1258,7 +1258,7 @@ def callback(_triggers, user_store_data, user_callback_args): if ( isinstance(result, dict) - and result.get("no_update") == "no_update" + and result == {"_dash_no_update": "_dash_no_update"} ): result = NoUpdate() From 20c72b979f5614ffe9c30cb7ba6931b7fad7c6e5 Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 9 May 2022 12:52:03 -0400 Subject: [PATCH 08/35] Handle no update in multi output. --- dash/_callback.py | 4 +++ dash/dash.py | 12 ++++++--- dash/development/base_component.py | 2 +- tests/integration/long_callback/app_error.py | 26 ++++++++++++++++++- .../long_callback/test_basic_long_callback.py | 12 +++++++++ 5 files changed, 50 insertions(+), 6 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index 313e56d20e..20d5548fd8 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -28,6 +28,10 @@ class NoUpdate: def to_plotly_json(self): # pylint: disable=no-self-use return {"_dash_no_update": "_dash_no_update"} + @staticmethod + def is_no_update(obj): + return obj == {"_dash_no_update": "_dash_no_update"} + GLOBAL_CALLBACK_LIST = [] GLOBAL_CALLBACK_MAP = {} diff --git a/dash/dash.py b/dash/dash.py index 9cff9588fc..f9228b4571 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -1144,6 +1144,7 @@ def long_callback(self, *_args, **_kwargs): # pylint: disable=too-many-statemen ) = handle_grouped_callback_args(_args, _kwargs) inputs_and_state = flat_inputs + flat_state args_deps = map_grouping(lambda i: inputs_and_state[i], inputs_state_indices) + multi_output = isinstance(output, (list, tuple)) and len(output) > 1 # Disallow wildcard dependencies for deps in [output, flat_inputs, flat_state]: @@ -1256,12 +1257,15 @@ def callback(_triggers, user_store_data, user_callback_args): user_store_data=user_store_data, ) - if ( - isinstance(result, dict) - and result == {"_dash_no_update": "_dash_no_update"} - ): + if NoUpdate.is_no_update(result): result = NoUpdate() + if multi_output and isinstance(result, (list, tuple)): + result = [ + NoUpdate() if NoUpdate.is_no_update(r) else r + for r in result + ] + # Disable interval if this value was pulled from cache. # If this value was the result of a background calculation, don't # disable yet. If no other calculations are in progress, diff --git a/dash/development/base_component.py b/dash/development/base_component.py index 2d6cfbe6c9..6d5b973d02 100644 --- a/dash/development/base_component.py +++ b/dash/development/base_component.py @@ -188,7 +188,7 @@ def _set_random_id(self): """ ) - v = str(uuid.UUID(int=rd.randint(0, 2**128))) + v = str(uuid.UUID(int=rd.randint(0, 2 ** 128))) setattr(self, "id", v) return v diff --git a/tests/integration/long_callback/app_error.py b/tests/integration/long_callback/app_error.py index 5e9bb0e3f0..cf22eaa52d 100644 --- a/tests/integration/long_callback/app_error.py +++ b/tests/integration/long_callback/app_error.py @@ -2,7 +2,7 @@ import time import dash -from dash import html +from dash import html, no_update from dash.dependencies import Input, Output from dash.exceptions import PreventUpdate @@ -17,6 +17,11 @@ [ html.Div([html.P(id="output", children=["Button not clicked"])]), html.Button(id="button", children="Run Job!"), + html.Div(id="output-status"), + html.Div(id="output1"), + html.Div(id="output2"), + html.Div(id="output3"), + html.Button("multi-output", id="multi-output"), ] ) @@ -41,5 +46,24 @@ def callback(n_clicks): return f"Clicked {n_clicks} times" +@app.long_callback( + output=[Output("output-status", "children")] + + [Output(f"output{i}", "children") for i in range(1, 4)], + inputs=[Input("multi-output", "n_clicks")], + running=[ + (Output("multi-output", "disabled"), True, False), + ], + prevent_initial_call=True, +) +def long_multi(n_clicks): + if os.getenv("LONG_CALLBACK_MANAGER") != "celery": + time.sleep(1) + return ( + [f"Updated: {n_clicks}"] + + [i for i in range(1, n_clicks + 1)] + + [no_update for _ in range(n_clicks + 1, 4)] + ) + + if __name__ == "__main__": app.run_server(debug=True) diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index 4d6fe3ed87..4d624f4c51 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -461,3 +461,15 @@ def click_n_wait(): dash_duo.wait_for_text_to_equal("#output", "Clicked 3 times") click_n_wait() dash_duo.wait_for_text_to_equal("#output", "Clicked 5 times") + + def make_expect(n): + return [str(x) for x in range(1, n + 1)] + ["" for _ in range(n + 1, 4)] + + multi = dash_duo.wait_for_element("#multi-output") + + for i in range(1, 4): + multi.click() + expect = make_expect(i) + dash_duo.wait_for_text_to_equal("#output-status", f"Updated: {i}") + for j, e in enumerate(expect): + assert dash_duo.find_element(f"#output{j + 1}").text == e From 2f9fe065593de772fe6f97b9c2c71e5e2efb18a5 Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 9 May 2022 14:00:07 -0400 Subject: [PATCH 09/35] Add long callback test lock. --- tests/integration/long_callback/app_error.py | 25 ++++++++++--------- .../long_callback/test_basic_long_callback.py | 9 ++++--- tests/integration/long_callback/utils.py | 4 +++ 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/tests/integration/long_callback/app_error.py b/tests/integration/long_callback/app_error.py index cf22eaa52d..c0e47abc3e 100644 --- a/tests/integration/long_callback/app_error.py +++ b/tests/integration/long_callback/app_error.py @@ -24,6 +24,7 @@ html.Button("multi-output", id="multi-output"), ] ) +app.test_lock = lock = long_callback_manager.test_lock @app.long_callback( @@ -38,12 +39,13 @@ def callback(n_clicks): if os.getenv("LONG_CALLBACK_MANAGER") != "celery": # Diskmanager needs some time, celery takes too long. time.sleep(1) - if n_clicks == 2: - raise Exception("bad error") + with lock: + if n_clicks == 2: + raise Exception("bad error") - if n_clicks == 4: - raise PreventUpdate - return f"Clicked {n_clicks} times" + if n_clicks == 4: + raise PreventUpdate + return f"Clicked {n_clicks} times" @app.long_callback( @@ -56,13 +58,12 @@ def callback(n_clicks): prevent_initial_call=True, ) def long_multi(n_clicks): - if os.getenv("LONG_CALLBACK_MANAGER") != "celery": - time.sleep(1) - return ( - [f"Updated: {n_clicks}"] - + [i for i in range(1, n_clicks + 1)] - + [no_update for _ in range(n_clicks + 1, 4)] - ) + with lock: + return ( + [f"Updated: {n_clicks}"] + + [i for i in range(1, n_clicks + 1)] + + [no_update for _ in range(n_clicks + 1, 4)] + ) if __name__ == "__main__": diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index 4d624f4c51..bb04cd54a6 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -442,8 +442,9 @@ def test_lcbc008_long_callbacks_error(dash_duo, manager): clicker = dash_duo.find_element("#button") def click_n_wait(): - clicker.click() - dash_duo.wait_for_element("#button:disabled") + with app.test_lock: + clicker.click() + dash_duo.wait_for_element("#button:disabled") dash_duo.wait_for_element("#button:not([disabled])") clicker.click() @@ -468,7 +469,9 @@ def make_expect(n): multi = dash_duo.wait_for_element("#multi-output") for i in range(1, 4): - multi.click() + with app.test_lock: + multi.click() + dash_duo.wait_for_element("#multi-output:disabled") expect = make_expect(i) dash_duo.wait_for_text_to_equal("#output-status", f"Updated: {i}") for j, e in enumerate(expect): diff --git a/tests/integration/long_callback/utils.py b/tests/integration/long_callback/utils.py index c6882df1fc..262471d0f8 100644 --- a/tests/integration/long_callback/utils.py +++ b/tests/integration/long_callback/utils.py @@ -8,6 +8,7 @@ def get_long_callback_manager(): if os.environ.get("LONG_CALLBACK_MANAGER", None) == "celery": from dash.long_callback import CeleryLongCallbackManager from celery import Celery + import redis celery_app = Celery( __name__, @@ -15,12 +16,15 @@ def get_long_callback_manager(): backend=os.environ.get("CELERY_BACKEND"), ) long_callback_manager = CeleryLongCallbackManager(celery_app) + redis_conn = redis.Redis(host="localhost", port=6379, db=1) + long_callback_manager.test_lock = redis_conn.lock("test-lock") elif os.environ.get("LONG_CALLBACK_MANAGER", None) == "diskcache": from dash.long_callback import DiskcacheLongCallbackManager import diskcache cache = diskcache.Cache(os.environ.get("DISKCACHE_DIR")) long_callback_manager = DiskcacheLongCallbackManager(cache) + long_callback_manager.test_lock = diskcache.Lock(cache, "test-lock") else: raise ValueError( "Invalid long callback manager specified as LONG_CALLBACK_MANAGER " From 9cab5b4e26ae8aedf471a35474ff682994a533da Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 17 May 2022 13:07:20 -0400 Subject: [PATCH 10/35] Clean up no_update. --- dash/dash.py | 17 ++++++++--------- .../long_callback/managers/diskcache_manager.py | 4 ---- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/dash/dash.py b/dash/dash.py index f9228b4571..ac7e28ee95 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -21,7 +21,6 @@ from dash import dcc from dash import html from dash import dash_table -from ._callback import NoUpdate from .fingerprint import build_fingerprint, check_fingerprint from .resources import Scripts, Css @@ -1226,7 +1225,7 @@ def callback(_triggers, user_store_data, user_callback_args): in_progress=[val for (_, _, val) in running], progress=clear_progress, user_store_data=user_store_data, - error=NoUpdate(), + error=no_update, ) # Look up progress value if a job is in progress @@ -1250,19 +1249,19 @@ def callback(_triggers, user_store_data, user_callback_args): return dict( error=f"An error occurred inside a long callback: {error['msg']}\n" + error["tb"], - user_callback_output=NoUpdate(), + user_callback_output=no_update, in_progress=[val for (_, _, val) in running], interval_disabled=pending_job is None, progress=clear_progress, user_store_data=user_store_data, ) - if NoUpdate.is_no_update(result): - result = NoUpdate() + if _callback.NoUpdate.is_no_update(result): + result = no_update if multi_output and isinstance(result, (list, tuple)): result = [ - NoUpdate() if NoUpdate.is_no_update(r) else r + no_update if _callback.NoUpdate.is_no_update(r) else r for r in result ] @@ -1278,7 +1277,7 @@ def callback(_triggers, user_store_data, user_callback_args): in_progress=[val for (_, _, val) in running], progress=clear_progress, user_store_data=user_store_data, - error=NoUpdate(), + error=no_update, ) if progress_value: return dict( @@ -1287,7 +1286,7 @@ def callback(_triggers, user_store_data, user_callback_args): in_progress=[val for (_, val, _) in running], progress=progress_value or {}, user_store_data=user_store_data, - error=NoUpdate(), + error=no_update, ) # Check if there is a running calculation that can now @@ -1313,7 +1312,7 @@ def callback(_triggers, user_store_data, user_callback_args): in_progress=[val for (_, val, _) in running], progress=clear_progress, user_store_data=user_store_data, - error=NoUpdate(), + error=no_update, ) self.clientside_callback( diff --git a/dash/long_callback/managers/diskcache_manager.py b/dash/long_callback/managers/diskcache_manager.py index 4528b9d979..4a5b6b76f1 100644 --- a/dash/long_callback/managers/diskcache_manager.py +++ b/dash/long_callback/managers/diskcache_manager.py @@ -7,10 +7,6 @@ _pending_value = "__$pending__" -class OriginalException: - pass - - class DiskcacheLongCallbackManager(BaseLongCallbackManager): def __init__(self, cache=None, cache_by=None, expire=None): """ From 2b22ff3f6af65ace578fd7531f4bb6a3b0587551 Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 13 Jun 2022 16:27:43 -0400 Subject: [PATCH 11/35] Replace long callback interval with request polling handled in renderer. --- dash/_callback.py | 205 ++++++++++++- dash/_utils.py | 6 + dash/dash-renderer/src/actions/callbacks.ts | 220 +++++++++++--- .../src/observers/prioritizedCallbacks.ts | 6 +- .../src/reducers/callbackJobs.ts | 35 +++ dash/dash-renderer/src/reducers/reducer.js | 3 + dash/dash-renderer/src/types/callbacks.ts | 25 +- dash/dash.py | 282 +++--------------- dash/long_callback/managers/__init__.py | 44 ++- dash/long_callback/managers/celery_manager.py | 26 +- .../managers/diskcache_manager.py | 35 ++- .../long_callback/test_basic_long_callback.py | 9 +- 12 files changed, 575 insertions(+), 321 deletions(-) create mode 100644 dash/dash-renderer/src/reducers/callbackJobs.ts diff --git a/dash/_callback.py b/dash/_callback.py index 20d5548fd8..0144d3b215 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -1,12 +1,14 @@ import collections from functools import wraps +import flask + from .dependencies import ( handle_callback_args, handle_grouped_callback_args, Output, ) -from .exceptions import PreventUpdate +from .exceptions import PreventUpdate, WildcardInLongCallback, DuplicateCallback from ._grouping import ( flatten_grouping, @@ -17,9 +19,11 @@ create_callback_id, stringify_id, to_json, + coerce_to_list, ) from . import _validate +from .long_callback.managers import BaseLongCallbackManager class NoUpdate: @@ -30,7 +34,9 @@ def to_plotly_json(self): # pylint: disable=no-self-use @staticmethod def is_no_update(obj): - return obj == {"_dash_no_update": "_dash_no_update"} + return isinstance(obj, NoUpdate) or obj == { + "_dash_no_update": "_dash_no_update" + } GLOBAL_CALLBACK_LIST = [] @@ -38,7 +44,19 @@ def is_no_update(obj): GLOBAL_INLINE_SCRIPTS = [] -def callback(*_args, **_kwargs): +def callback( + *_args, + long=False, + long_interval=1000, + long_progress=None, + long_progress_default=None, + long_running=None, + long_cancel=None, + long_manager=None, + long_cache_args_to_ignore=None, + config_prevent_initial_callbacks=False, + **_kwargs, +): """ Normally used as a decorator, `@dash.callback` provides a server-side callback relating the values of one or more `Output` items to one or @@ -56,15 +74,79 @@ def callback(*_args, **_kwargs): not to fire when its outputs are first added to the page. Defaults to `False` and unlike `app.callback` is not configurable at the app level. """ + + long_spec = None + + if long: + long_spec = { + "interval": long_interval, + } + + if long_manager: + long_spec["manager"] = long_manager + + if long_progress: + long_spec["progress"] = coerce_to_list(long_progress) + validate_long_inputs(long_spec["progress"]) + + if long_progress_default: + long_spec["progressDefault"] = coerce_to_list(long_progress_default) + + if not len(long_spec["progress"]) == len(long_spec["progressDefault"]): + raise Exception( + "Progress and progress default needs to be of same length" + ) + + if long_running: + long_spec["running"] = coerce_to_list(long_running) + validate_long_inputs(x[0] for x in long_spec["running"]) + + if long_cancel: + cancel_inputs = coerce_to_list(long_cancel) + validate_long_inputs(cancel_inputs) + + cancels_output = [Output(c.component_id, "id") for c in cancel_inputs] + + try: + + @callback(cancels_output, cancel_inputs, prevent_initial_call=True) + def cancel_call(*_): + job_ids = flask.request.args.getlist("cancelJob") + manager = long_manager or flask.g.long_callback_manager + if job_ids: + for job_id in job_ids: + manager.terminate_job(int(job_id)) + return NoUpdate() + + except DuplicateCallback: + pass # Already a callback to cancel, will get the proper jobs from the store. + + long_spec["cancel"] = [c.to_dict() for c in cancel_inputs] + + if long_cache_args_to_ignore: + long_spec["cache_args_to_ignore"] = long_cache_args_to_ignore + return register_callback( GLOBAL_CALLBACK_LIST, GLOBAL_CALLBACK_MAP, - False, + config_prevent_initial_callbacks, *_args, **_kwargs, + long=long_spec, ) +def validate_long_inputs(deps): + for dep in deps: + if dep.has_wildcard(): + raise WildcardInLongCallback( + f""" + long callbacks does not support dependencies with + pattern-matching ids + Received: {repr(dep)}\n""" + ) + + def clientside_callback(clientside_function, *args, **kwargs): return register_clientside_callback( GLOBAL_CALLBACK_LIST, @@ -87,6 +169,7 @@ def insert_callback( state, inputs_state_indices, prevent_initial_call, + long=None, ): if prevent_initial_call is None: prevent_initial_call = config_prevent_initial_callbacks @@ -98,19 +181,26 @@ def insert_callback( "state": [c.to_dict() for c in state], "clientside_function": None, "prevent_initial_call": prevent_initial_call, + "long": long + and { + "interval": long["interval"], + }, } + callback_map[callback_id] = { "inputs": callback_spec["inputs"], "state": callback_spec["state"], "outputs_indices": outputs_indices, "inputs_state_indices": inputs_state_indices, + "long": long, } callback_list.append(callback_spec) return callback_id -def register_callback( +# pylint: disable=R0912, R0915 +def register_callback( # pylint: disable=R0914 callback_list, callback_map, config_prevent_initial_callbacks, *_args, **_kwargs ): ( @@ -129,6 +219,8 @@ def register_callback( insert_output = flatten_grouping(output) multi = True + long = _kwargs.get("long") + output_indices = make_grouping_by_index(output, list(range(grouping_len(output)))) callback_id = insert_callback( callback_list, @@ -140,23 +232,118 @@ def register_callback( flat_state, inputs_state_indices, prevent_initial_call, + long=long, ) # pylint: disable=too-many-locals def wrap_func(func): + + if long is not None: + long_key = BaseLongCallbackManager.register_func( + func, long.get("progress") is not None + ) + @wraps(func) def add_context(*args, **kwargs): output_spec = kwargs.pop("outputs_list") + app_callback_manager = kwargs.pop("long_callback_manager", None) + callback_manager = long and long.get( + "manager", app_callback_manager + ) _validate.validate_output_spec(insert_output, output_spec, Output) func_args, func_kwargs = _validate.validate_and_group_input_args( args, inputs_state_indices ) - # don't touch the comment on the next line - used by debugger - output_value = func(*func_args, **func_kwargs) # %% callback invoked %% + response = {"multi": True} + + if long is not None: + progress_outputs = long.get("progress") + cache_key = flask.request.args.get("cacheKey") + job_id = flask.request.args.get("job") + + current_key = callback_manager.build_cache_key( + func, + # Inputs provided as dict is kwargs. + func_args if func_args else func_kwargs, + long.get("cache_args_to_ignore", []), + ) + + if not cache_key: + cache_key = current_key + + job_fn = callback_manager.func_registry.get(long_key) + + job = callback_manager.call_job_fn( + cache_key, + job_fn, + args, + ) + + data = { + "cacheKey": cache_key, + "job": job, + } + + running = long.get("running") + + if running: + data["running"] = {str(r[0]): r[1] for r in running} + data["runningOff"] = {str(r[0]): r[2] for r in running} + cancel = long.get("cancel") + if cancel: + data["cancel"] = cancel + + progress_default = long.get("progressDefault") + if progress_default: + data["progressDefault"] = { + str(o): x + for o, x in zip(progress_outputs, progress_default) + } + return to_json(data) + if progress_outputs: + # Get the progress before the result as it would be erased after the results. + progress = callback_manager.get_progress(cache_key) + if progress: + response["progress"] = { + str(x): progress[i] + for i, x in enumerate(progress_outputs) + } + + output_value = callback_manager.get_result(cache_key, job_id) + # Must get job_running after get_result since get_results terminates it. + job_running = callback_manager.job_running(job_id) + if not job_running and output_value is callback_manager.UNDEFINED: + # Job canceled -> no output to close the loop. + output_value = NoUpdate() + + elif ( + isinstance(output_value, dict) + and "long_callback_error" in output_value + ): + error = output_value.get("long_callback_error") + raise Exception( + f"An error occurred inside a long callback: {error['msg']}\n{error['tb']}" + ) + + if job_running and output_value is not callback_manager.UNDEFINED: + # cached results. + callback_manager.terminate_job(job_id) + + if multi and isinstance(output_value, (list, tuple)): + output_value = [ + NoUpdate() if NoUpdate.is_no_update(r) else r + for r in output_value + ] + + if output_value is callback_manager.UNDEFINED: + return to_json(response) + else: + # don't touch the comment on the next line - used by debugger + output_value = func(*func_args, **func_kwargs) # %% callback invoked %% - if isinstance(output_value, NoUpdate): + if NoUpdate.is_no_update(output_value): raise PreventUpdate if not multi: @@ -191,7 +378,7 @@ def add_context(*args, **kwargs): if not has_update: raise PreventUpdate - response = {"response": component_ids, "multi": True} + response["response"] = component_ids try: jsonResponse = to_json(response) diff --git a/dash/_utils.py b/dash/_utils.py index aa0470f43d..4ce9697d0c 100644 --- a/dash/_utils.py +++ b/dash/_utils.py @@ -217,3 +217,9 @@ def gen_salt(chars): return "".join( secrets.choice(string.ascii_letters + string.digits) for _ in range(chars) ) + + +def coerce_to_list(obj): + if not isinstance(obj, (list, tuple)): + return [obj] + return obj diff --git a/dash/dash-renderer/src/actions/callbacks.ts b/dash/dash-renderer/src/actions/callbacks.ts index 4fafaab16b..8393c8262b 100644 --- a/dash/dash-renderer/src/actions/callbacks.ts +++ b/dash/dash-renderer/src/actions/callbacks.ts @@ -1,12 +1,15 @@ import { concat, flatten, + intersection, keys, map, mergeDeepRight, path, pick, pluck, + values, + toPairs, zip } from 'ramda'; @@ -24,13 +27,18 @@ import { ICallbackPayload, IStoredCallback, IBlockedCallback, - IPrioritizedCallback + IPrioritizedCallback, + LongCallbackInfo, + CallbackResponse, + CallbackResponseData } from '../types/callbacks'; import {isMultiValued, stringifyId, isMultiOutputProp} from './dependencies'; import {urlBase} from './utils'; import {getCSRFHeader} from '.'; import {createAction, Action} from 'redux-actions'; import {addHttpHeaders} from '../actions'; +import {updateProps} from './index'; +import {CallbackJobPayload} from '../reducers/callbackJobs'; export const addBlockedCallbacks = createAction( CallbackActionType.AddBlocked @@ -83,6 +91,9 @@ export const aggregateCallbacks = createAction< const updateResourceUsage = createAction('UPDATE_RESOURCE_USAGE'); +const addCallbackJob = createAction('ADD_CALLBACK_JOB'); +const removeCallbackJob = createAction('REMOVE_CALLBACK_JOB'); + function unwrapIfNotMulti( paths: any, idProps: any, @@ -300,28 +311,73 @@ async function handleClientside( return result; } +function sideUpdate(outputs: any, dispatch: any, paths: any) { + toPairs(outputs).forEach(([id, value]) => { + const [componentId, propName] = id.split('.'); + const componentPath = paths.strs[componentId]; + dispatch( + updateProps({ + props: {[propName]: value}, + itempath: componentPath + }) + ); + }); +} + function handleServerside( dispatch: any, hooks: any, config: any, - payload: any -): Promise { + payload: any, + paths: any, + long?: LongCallbackInfo, + additionalArgs?: [string, string][] +): Promise { if (hooks.request_pre) { hooks.request_pre(payload); } const requestTime = Date.now(); const body = JSON.stringify(payload); + let cacheKey: string; + let job: string; + let runningOff: any; + let progressDefault: any; + + const fetchCallback = () => { + const headers = getCSRFHeader() as any; + let url = `${urlBase(config)}_dash-update-component`; + + const addArg = (name: string, value: string) => { + let delim = '?'; + if (url.includes('?')) { + delim = '&'; + } + url = `${url}${delim}${name}=${value}`; + }; + if (cacheKey) { + addArg('cacheKey', cacheKey); + } + if (job) { + addArg('job', job); + } + + if (additionalArgs) { + additionalArgs.forEach(([key, value]) => addArg(key, value)); + } + + return fetch( + url, + mergeDeepRight(config.fetch, { + method: 'POST', + headers, + body + }) + ); + }; - return fetch( - `${urlBase(config)}_dash-update-component`, - mergeDeepRight(config.fetch, { - method: 'POST', - headers: getCSRFHeader() as any, - body - }) - ).then( - (res: any) => { + return new Promise((resolve, reject) => { + const handleOutput = (res: any) => { const {status} = res; function recordProfile(result: any) { @@ -361,36 +417,86 @@ function handleServerside( } } + const finishLine = (data: CallbackResponseData) => { + const {multi, response} = data; + if (hooks.request_post) { + hooks.request_post(payload, response); + } + + let result; + if (multi) { + result = response as CallbackResponse; + } else { + const {output} = payload; + const id = output.substr(0, output.lastIndexOf('.')); + result = {[id]: (response as CallbackResponse).props}; + } + + recordProfile(result); + resolve(result); + }; + + const completeJob = () => { + if (job) { + dispatch(removeCallbackJob({jobId: job})); + } + if (runningOff) { + sideUpdate(runningOff, dispatch, paths); + } + if (progressDefault) { + sideUpdate(progressDefault, dispatch, paths); + } + }; + if (status === STATUS.OK) { - return res.json().then((data: any) => { - const {multi, response} = data; - if (hooks.request_post) { - hooks.request_post(payload, response); + res.json().then((data: CallbackResponseData) => { + if (!cacheKey && data.cacheKey) { + cacheKey = data.cacheKey; } - let result; - if (multi) { - result = response; - } else { - const {output} = payload; - const id = output.substr(0, output.lastIndexOf('.')); - result = {[id]: response.props}; + if (!job && data.job) { + const jobInfo: CallbackJobPayload = { + jobId: data.job, + cacheKey: data.cacheKey as string, + cancelInputs: data.cancel, + progressDefault: data.progressDefault + }; + dispatch(addCallbackJob(jobInfo)); + job = data.job; } - recordProfile(result); - return result; + if (data.progress) { + sideUpdate(data.progress, dispatch, paths); + } + if (data.running) { + sideUpdate(data.running, dispatch, paths); + } + if (!runningOff && data.runningOff) { + runningOff = data.runningOff; + } + if (!progressDefault && data.progressDefault) { + progressDefault = data.progressDefault; + } + + if (!long || data.response !== undefined) { + completeJob(); + finishLine(data); + } else { + // Poll chain. + setTimeout(handle, long.interval || 500); + } }); - } - if (status === STATUS.PREVENT_UPDATE) { + } else if (status === STATUS.PREVENT_UPDATE) { + completeJob(); recordProfile({}); - return {}; + resolve({}); + } else { + completeJob(); + reject(res); } - throw res; - }, - () => { - // fetch rejection - this means the request didn't return, - // we don't get here from 400/500 errors, only network - // errors or unresponsive servers. + }; + + const handleError = () => { if (config.ui) { dispatch( updateResourceUsage({ @@ -402,9 +508,14 @@ function handleServerside( }) ); } - throw new Error('Callback failed: the server did not respond.'); - } - ); + reject(new Error('Callback failed: the server did not respond.')); + }; + + const handle = () => { + fetchCallback().then(handleOutput, handleError); + }; + handle(); + }); } function inputsToDict(inputs_list: any) { @@ -443,10 +554,10 @@ export function executeCallback( paths: any, layout: any, {allOutputs}: any, - dispatch: any + dispatch: any, + getState: any ): IExecutingCallback { - const {output, inputs, state, clientside_function} = cb.callback; - + const {output, inputs, state, clientside_function, long} = cb.callback; try { const inVals = fillVals(paths, layout, cb, inputs, 'Input', true); @@ -518,13 +629,40 @@ export function executeCallback( let newHeaders: Record | null = null; let lastError: any; + const additionalArgs: [string, string][] = []; + values(getState().callbackJobs).forEach( + (job: CallbackJobPayload) => { + if (!job.cancelInputs) { + return; + } + const inter = intersection( + job.cancelInputs, + cb.callback.inputs + ); + if (inter.length) { + additionalArgs.push(['cancelJob', job.jobId]); + if (job.progressDefault) { + console.log(job.progressDefault); + sideUpdate( + job.progressDefault, + dispatch, + paths + ); + } + } + } + ); + for (let retry = 0; retry <= MAX_AUTH_RETRIES; retry++) { try { const data = await handleServerside( dispatch, hooks, newConfig, - payload + payload, + paths, + long, + additionalArgs.length ? additionalArgs : undefined ); if (newHeaders) { diff --git a/dash/dash-renderer/src/observers/prioritizedCallbacks.ts b/dash/dash-renderer/src/observers/prioritizedCallbacks.ts index 7407e24271..bf557913f8 100644 --- a/dash/dash-renderer/src/observers/prioritizedCallbacks.ts +++ b/dash/dash-renderer/src/observers/prioritizedCallbacks.ts @@ -110,7 +110,8 @@ const observer: IStoreObserverDefinition = { paths, layout, getStash(cb, paths), - dispatch + dispatch, + getState ), pickedSyncCallbacks ) @@ -162,7 +163,8 @@ const observer: IStoreObserverDefinition = { paths, layout, cb, - dispatch + dispatch, + getState ); dispatch( diff --git a/dash/dash-renderer/src/reducers/callbackJobs.ts b/dash/dash-renderer/src/reducers/callbackJobs.ts new file mode 100644 index 0000000000..00398eaef0 --- /dev/null +++ b/dash/dash-renderer/src/reducers/callbackJobs.ts @@ -0,0 +1,35 @@ +import {assoc, dissoc} from 'ramda'; +import {ICallbackProperty} from '../types/callbacks'; + +type CallbackJobState = {[k: string]: CallbackJobPayload}; + +export type CallbackJobPayload = { + cancelInputs?: ICallbackProperty[]; + cacheKey: string; + jobId: string; + progressDefault?: any; +}; + +type CallbackJobAction = { + type: 'ADD_CALLBACK_JOB' | 'REMOVE_CALLBACK_JOB'; + payload: CallbackJobPayload; +}; + +const setJob = (job: CallbackJobPayload, state: CallbackJobState) => + assoc(job.jobId, job, state); +const removeJob = (jobId: string, state: CallbackJobState) => + dissoc(jobId, state); + +export default function ( + state: CallbackJobState = {}, + action: CallbackJobAction +) { + switch (action.type) { + case 'ADD_CALLBACK_JOB': + return setJob(action.payload, state); + case 'REMOVE_CALLBACK_JOB': + return removeJob(action.payload.jobId, state); + default: + return state; + } +} diff --git a/dash/dash-renderer/src/reducers/reducer.js b/dash/dash-renderer/src/reducers/reducer.js index 0e0d844704..97b71b6bce 100644 --- a/dash/dash-renderer/src/reducers/reducer.js +++ b/dash/dash-renderer/src/reducers/reducer.js @@ -17,6 +17,7 @@ import isLoading from './isLoading'; import layout from './layout'; import loadingMap from './loadingMap'; import paths from './paths'; +import callbackJobs from './callbackJobs'; export const apiRequests = [ 'dependenciesRequest', @@ -45,6 +46,8 @@ function mainReducer() { parts[r] = createApiReducer(r); }, apiRequests); + parts.callbackJobs = callbackJobs; + return combineReducers(parts); } diff --git a/dash/dash-renderer/src/types/callbacks.ts b/dash/dash-renderer/src/types/callbacks.ts index 5e286fca97..0c51e13c85 100644 --- a/dash/dash-renderer/src/types/callbacks.ts +++ b/dash/dash-renderer/src/types/callbacks.ts @@ -11,6 +11,7 @@ export interface ICallbackDefinition { outputs: ICallbackProperty[]; prevent_initial_call: boolean; state: ICallbackProperty[]; + long?: LongCallbackInfo; } export interface ICallbackProperty { @@ -75,7 +76,29 @@ export interface ICallbackPayload { } export type CallbackResult = { - data?: any; + data?: CallbackResponse; error?: Error; payload: ICallbackPayload | null; }; + +export type LongCallbackInfo = { + interval?: number; + progress?: any; + running?: any; +}; + +export type CallbackResponse = { + [k: string]: any; +}; + +export type CallbackResponseData = { + response?: CallbackResponse; + multi?: boolean; + cacheKey?: string; + job?: string; + progressDefault?: CallbackResponse; + progress?: CallbackResponse; + running?: CallbackResponse; + runningOff?: CallbackResponse; + cancel?: ICallbackProperty[]; +}; diff --git a/dash/dash.py b/dash/dash.py index 33775a1732..d2cfc28042 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -27,9 +27,7 @@ from .fingerprint import build_fingerprint, check_fingerprint from .resources import Scripts, Css from .dependencies import ( - handle_grouped_callback_args, Output, - State, Input, ) from .development.base_component import ComponentRegistry @@ -61,7 +59,7 @@ from . import _watch from . import _get_app -from ._grouping import flatten_grouping, map_grouping, grouping_len, update_args_group +from ._grouping import map_grouping, grouping_len, update_args_group from . import _pages from ._pages import ( @@ -1118,6 +1116,7 @@ def clientside_callback(self, clientside_function, *args, **kwargs): **kwargs, ) + # pylint: disable=R0201 def callback(self, *_args, **_kwargs): """ Normally used as a decorator, `@app.callback` provides a server-side @@ -1132,16 +1131,28 @@ def callback(self, *_args, **_kwargs): """ - return _callback.register_callback( - self._callback_list, - self.callback_map, - self.config.prevent_initial_callbacks, + return _callback.callback( *_args, + config_prevent_initial_callbacks=self.config.prevent_initial_callbacks, **_kwargs, ) - def long_callback(self, *_args, **_kwargs): # pylint: disable=too-many-statements + # pylint: disable=R0201 + def long_callback( + self, + *_args, + manager=None, + interval=None, + running=None, + cancel=None, + progress=None, + progress_default=None, + cache_args_to_ignore=None, + **_kwargs, + ): # pylint: disable=too-many-statements """ + Deprecation Notice, use long + Normally used as a decorator, `@app.long_callback` is an alternative to `@app.callback` designed for callbacks that take a long time to run, without locking up the Dash app or timing out. @@ -1200,242 +1211,19 @@ def long_callback(self, *_args, **_kwargs): # pylint: disable=too-many-statemen this should be a list of argument names as strings. Otherwise, this should be a list of argument indices as integers. """ - # pylint: disable-next=import-outside-toplevel - from dash._callback_context import callback_context - - # pylint: disable-next=import-outside-toplevel - from dash.exceptions import WildcardInLongCallback - - # Get long callback manager - callback_manager = _kwargs.pop("manager", self._long_callback_manager) - if callback_manager is None: - raise ValueError( - "The @app.long_callback decorator requires a long callback manager\n" - "instance. This may be provided to the app using the \n" - "long_callback_manager argument to the dash.Dash constructor, or\n" - "it may be provided to the @app.long_callback decorator as the \n" - "manager argument" - ) - - # Extract special long_callback kwargs - running = _kwargs.pop("running", ()) - cancel = _kwargs.pop("cancel", ()) - progress = _kwargs.pop("progress", ()) - progress_default = _kwargs.pop("progress_default", None) - interval_time = _kwargs.pop("interval", 1000) - cache_args_to_ignore = _kwargs.pop("cache_args_to_ignore", []) - - # Parse remaining args just like app.callback - ( - output, - flat_inputs, - flat_state, - inputs_state_indices, - prevent_initial_call, - ) = handle_grouped_callback_args(_args, _kwargs) - inputs_and_state = flat_inputs + flat_state - args_deps = map_grouping(lambda i: inputs_and_state[i], inputs_state_indices) - multi_output = isinstance(output, (list, tuple)) and len(output) > 1 - - # Disallow wildcard dependencies - for deps in [output, flat_inputs, flat_state]: - for dep in flatten_grouping(deps): - if dep.has_wildcard(): - raise WildcardInLongCallback( - f""" - @app.long_callback does not support dependencies with - pattern-matching ids - Received: {repr(dep)}\n""" - ) - - # Get unique id for this long_callback definition. This increment is not - # thread safe, but it doesn't need to be because callback definitions - # happen on the main thread before the app starts - self._long_callback_count += 1 - long_callback_id = self._long_callback_count - - # Create Interval and Store for long callback and add them to the app's - # _extra_components list - interval_id = f"_long_callback_interval_{long_callback_id}" - interval_component = dcc.Interval( - id=interval_id, interval=interval_time, disabled=prevent_initial_call - ) - store_id = f"_long_callback_store_{long_callback_id}" - store_component = dcc.Store(id=store_id, data={}) - error_id = f"_long_callback_error_{long_callback_id}" - error_store_component = dcc.Store(id=error_id) - error_dummy = f"_long_callback_error_dummy_{long_callback_id}" - self._extra_components.extend( - [ - interval_component, - store_component, - error_store_component, - dcc.Store(id=error_dummy), - ] - ) - - # Compute full component plus property name for the cancel dependencies - cancel_prop_ids = tuple( - ".".join([dep.component_id, dep.component_property]) for dep in cancel + return _callback.callback( + *_args, + long=True, + long_manager=manager, + long_interval=interval, + long_progress=progress, + long_progress_default=progress_default, + long_running=running, + long_cancel=cancel, + long_cache_args_to_ignore=cache_args_to_ignore, + **_kwargs, ) - def wrapper(fn): - background_fn = callback_manager.make_job_fn(fn, bool(progress), args_deps) - - def callback(_triggers, user_store_data, user_callback_args): - # Build result cache key from inputs - pending_key = callback_manager.build_cache_key( - fn, user_callback_args, cache_args_to_ignore - ) - current_key = user_store_data.get("current_key", None) - pending_job = user_store_data.get("pending_job", None) - - should_cancel = pending_key == current_key or any( - trigger["prop_id"] in cancel_prop_ids - for trigger in callback_context.triggered - ) - - # Compute grouping of values to set the progress component's to - # when cleared - if progress_default is None: - clear_progress = ( - map_grouping(lambda x: None, progress) if progress else () - ) - else: - clear_progress = progress_default - - if should_cancel: - user_store_data["current_key"] = None - user_store_data["pending_key"] = None - user_store_data["pending_job"] = None - - callback_manager.terminate_job(pending_job) - - return dict( - user_callback_output=map_grouping(lambda x: no_update, output), - interval_disabled=True, - in_progress=[val for (_, _, val) in running], - progress=clear_progress, - user_store_data=user_store_data, - error=no_update, - ) - - # Look up progress value if a job is in progress - if pending_job: - progress_value = callback_manager.get_progress(pending_key) - else: - progress_value = None - - if callback_manager.result_ready(pending_key): - result = callback_manager.get_result(pending_key, pending_job) - # Set current key (hash of data stored in client) - # to pending key (hash of data requested by client) - user_store_data["current_key"] = pending_key - - if isinstance(result, dict) and result.get("long_callback_error"): - error = result.get("long_callback_error") - print( - result["long_callback_error"]["tb"], - file=sys.stderr, - ) - return dict( - error=f"An error occurred inside a long callback: {error['msg']}\n" - + error["tb"], - user_callback_output=no_update, - in_progress=[val for (_, _, val) in running], - interval_disabled=pending_job is None, - progress=clear_progress, - user_store_data=user_store_data, - ) - - if _callback.NoUpdate.is_no_update(result): - result = no_update - - if multi_output and isinstance(result, (list, tuple)): - result = [ - no_update if _callback.NoUpdate.is_no_update(r) else r - for r in result - ] - - # Disable interval if this value was pulled from cache. - # If this value was the result of a background calculation, don't - # disable yet. If no other calculations are in progress, - # interval will be disabled in should_cancel logic above - # the next time the interval fires. - interval_disabled = pending_job is None - return dict( - user_callback_output=result, - interval_disabled=interval_disabled, - in_progress=[val for (_, _, val) in running], - progress=clear_progress, - user_store_data=user_store_data, - error=no_update, - ) - if progress_value: - return dict( - user_callback_output=map_grouping(lambda x: no_update, output), - interval_disabled=False, - in_progress=[val for (_, val, _) in running], - progress=progress_value or {}, - user_store_data=user_store_data, - error=no_update, - ) - - # Check if there is a running calculation that can now - # be canceled - old_pending_key = user_store_data.get("pending_key", None) - if ( - old_pending_key - and old_pending_key != pending_key - and callback_manager.job_running(pending_job) - ): - callback_manager.terminate_job(pending_job) - - user_store_data["pending_key"] = pending_key - callback_manager.terminate_unhealthy_job(pending_job) - if not callback_manager.job_running(pending_job): - user_store_data["pending_job"] = callback_manager.call_job_fn( - pending_key, background_fn, user_callback_args - ) - - return dict( - user_callback_output=map_grouping(lambda x: no_update, output), - interval_disabled=False, - in_progress=[val for (_, val, _) in running], - progress=clear_progress, - user_store_data=user_store_data, - error=no_update, - ) - - self.clientside_callback( - "function (error) {throw new Error(error)}", - Output(error_dummy, "data"), - [Input(error_id, "data")], - prevent_initial_call=True, - ) - - return self.callback( - inputs=dict( - _triggers=dict( - n_intervals=Input(interval_id, "n_intervals"), - cancel=cancel, - ), - user_store_data=State(store_id, "data"), - user_callback_args=args_deps, - ), - output=dict( - user_callback_output=output, - interval_disabled=Output(interval_id, "disabled"), - in_progress=[dep for (dep, _, _) in running], - progress=progress, - user_store_data=Output(store_id, "data"), - error=Output(error_id, "data"), - ), - prevent_initial_call=prevent_initial_call, - )(callback) - - return wrapper - def dispatch(self): body = flask.request.get_json() @@ -1455,6 +1243,7 @@ def dispatch(self): flask.g.state_values = inputs_to_dict( # pylint: disable=assigning-non-slot state ) + flask.g.long_callback_manager = self._long_callback_manager # pylint: disable=E0237 changed_props = body.get("changedPropIds", []) flask.g.triggered_inputs = [ # pylint: disable=assigning-non-slot {"prop_id": x, "value": input_values.get(x)} for x in changed_props @@ -1517,7 +1306,14 @@ def dispatch(self): except KeyError as missing_callback_function: msg = f"Callback function not found for output '{output}', perhaps you forgot to prepend the '@'?" raise KeyError(msg) from missing_callback_function - response.set_data(func(*args, outputs_list=outputs_list)) + # noinspection PyArgumentList + response.set_data( + func( + *args, + outputs_list=outputs_list, + long_callback_manager=self._long_callback_manager, + ) + ) return response def _setup_server(self): diff --git a/dash/long_callback/managers/__init__.py b/dash/long_callback/managers/__init__.py index b6b05e89a8..c0bcac44af 100644 --- a/dash/long_callback/managers/__init__.py +++ b/dash/long_callback/managers/__init__.py @@ -4,12 +4,29 @@ class BaseLongCallbackManager(ABC): + UNDEFINED = object() + + # Keep a ref to all the ref to register every callback to every manager. + managers = [] + + # Keep every function for late registering. + functions = [] + def __init__(self, cache_by): if cache_by is not None and not isinstance(cache_by, list): cache_by = [cache_by] self.cache_by = cache_by + BaseLongCallbackManager.managers.append(self) + + self.func_registry = {} + + # Register all funcs that were added before instantiation. + # Ensure all celery task are registered. + for fdetails in self.functions: + self.register(*fdetails) + def terminate_job(self, job): raise NotImplementedError @@ -19,7 +36,7 @@ def terminate_unhealthy_job(self, job): def job_running(self, job): raise NotImplementedError - def make_job_fn(self, fn, progress, args_deps): + def make_job_fn(self, fn, progress): raise NotImplementedError def call_job_fn(self, key, job_fn, args): @@ -58,6 +75,31 @@ def build_cache_key(self, fn, args, cache_args_to_ignore): return hashlib.sha1(str(hash_dict).encode("utf-8")).hexdigest() + def register(self, key, fn, progress): + self.func_registry[key] = self.make_job_fn(fn, progress) + + @staticmethod + def register_func(fn, progress): + key = BaseLongCallbackManager.hash_function(fn) + BaseLongCallbackManager.functions.append( + ( + key, + fn, + progress, + ) + ) + + for manager in BaseLongCallbackManager.managers: + manager.register(key, fn, progress) + + return key + @staticmethod def _make_progress_key(key): return key + "-progress" + + @staticmethod + def hash_function(fn): + fn_source = inspect.getsource(fn) + fn_str = fn_source + return hashlib.sha1(fn_str.encode("utf-8")).hexdigest() diff --git a/dash/long_callback/managers/celery_manager.py b/dash/long_callback/managers/celery_manager.py index b65622c324..2f05dba898 100644 --- a/dash/long_callback/managers/celery_manager.py +++ b/dash/long_callback/managers/celery_manager.py @@ -5,7 +5,6 @@ from _plotly_utils.utils import PlotlyJSONEncoder -from dash._callback import NoUpdate from dash.exceptions import PreventUpdate from dash.long_callback.managers import BaseLongCallbackManager @@ -48,9 +47,9 @@ def __init__(self, celery_app, cache_by=None, expire=None): if isinstance(celery_app.backend, DisabledBackend): raise ValueError("Celery instance must be configured with a result backend") - super().__init__(cache_by) self.handle = celery_app self.expire = expire + super().__init__(cache_by) def terminate_job(self, job): if job is None: @@ -74,8 +73,8 @@ def job_running(self, job): "PROGRESS", ) - def make_job_fn(self, fn, progress, args_deps): - return _make_job_fn(fn, self.handle, progress, args_deps) + def make_job_fn(self, fn, progress): + return _make_job_fn(fn, self.handle, progress) def get_task(self, job): if job: @@ -105,7 +104,7 @@ def get_result(self, key, job): # Get result value result = self.handle.backend.get(key) if result is None: - return None + return self.UNDEFINED result = json.loads(result) @@ -122,7 +121,7 @@ def get_result(self, key, job): return result -def _make_job_fn(fn, celery_app, progress, args_deps): +def _make_job_fn(fn, celery_app, progress): cache = celery_app.backend # Hash function source and module to create a unique (but stable) celery task name @@ -133,19 +132,28 @@ def _make_job_fn(fn, celery_app, progress, args_deps): @celery_app.task(name=f"long_callback_{fn_hash}") def job_fn(result_key, progress_key, user_callback_args, fn=fn): def _set_progress(progress_value): + if not isinstance(progress_value, (list, tuple)): + progress_value = [progress_value] + cache.set(progress_key, json.dumps(progress_value, cls=PlotlyJSONEncoder)) maybe_progress = [_set_progress] if progress else [] try: - if isinstance(args_deps, dict): + if isinstance(user_callback_args, dict): user_callback_output = fn(*maybe_progress, **user_callback_args) - elif isinstance(args_deps, (list, tuple)): + elif isinstance(user_callback_args, (list, tuple)): user_callback_output = fn(*maybe_progress, *user_callback_args) else: user_callback_output = fn(*maybe_progress, user_callback_args) except PreventUpdate: - cache.set(result_key, json.dumps(NoUpdate(), cls=PlotlyJSONEncoder)) + # Put NoUpdate dict directly to avoid circular imports. + cache.set( + result_key, + json.dumps( + {"_dash_no_update": "_dash_no_update"}, cls=PlotlyJSONEncoder + ), + ) except Exception as err: # pylint: disable=broad-except cache.set( result_key, diff --git a/dash/long_callback/managers/diskcache_manager.py b/dash/long_callback/managers/diskcache_manager.py index 4a5b6b76f1..68bf4fdc52 100644 --- a/dash/long_callback/managers/diskcache_manager.py +++ b/dash/long_callback/managers/diskcache_manager.py @@ -1,7 +1,6 @@ import traceback from . import BaseLongCallbackManager -from ..._callback import NoUpdate from ...exceptions import PreventUpdate _pending_value = "__$pending__" @@ -48,9 +47,9 @@ def __init__(self, cache=None, cache_by=None, expire=None): ) self.handle = cache - super().__init__(cache_by) - self.expire = expire self.lock = diskcache.Lock(self.handle, "long-callback-lock") + self.expire = expire + super().__init__(cache_by) def terminate_job(self, job): import psutil # pylint: disable=import-outside-toplevel,import-error @@ -58,6 +57,8 @@ def terminate_job(self, job): if job is None: return + job = int(job) + # Use diskcache transaction so multiple process don't try to kill the # process at the same time with self.handle.transact(): @@ -83,6 +84,8 @@ def terminate_job(self, job): def terminate_unhealthy_job(self, job): import psutil # pylint: disable=import-outside-toplevel,import-error + job = int(job) + if job and psutil.pid_exists(job): if not self.job_running(job): self.terminate_job(job) @@ -93,13 +96,15 @@ def terminate_unhealthy_job(self, job): def job_running(self, job): import psutil # pylint: disable=import-outside-toplevel,import-error + job = int(job) + if job and psutil.pid_exists(job): proc = psutil.Process(job) return proc.status() != psutil.STATUS_ZOMBIE return False - def make_job_fn(self, fn, progress, args_deps): - return _make_job_fn(fn, self.handle, progress, args_deps, self.lock) + def make_job_fn(self, fn, progress): + return _make_job_fn(fn, self.handle, progress, self.lock) def clear_cache_entry(self, key): self.handle.delete(key) @@ -122,9 +127,9 @@ def result_ready(self, key): def get_result(self, key, job): # Get result value - result = self.handle.get(key) - if result is None: - return None + result = self.handle.get(key, self.UNDEFINED) + if result is self.UNDEFINED: + return self.UNDEFINED # Clear result if not caching if self.cache_by is None: @@ -135,28 +140,32 @@ def get_result(self, key, job): self.clear_cache_entry(self._make_progress_key(key)) - self.terminate_job(job) + if job: + self.terminate_job(job) return result -def _make_job_fn(fn, cache, progress, args_deps, lock): +def _make_job_fn(fn, cache, progress, lock): def job_fn(result_key, progress_key, user_callback_args): def _set_progress(progress_value): + if not isinstance(progress_value, (list, tuple)): + progress_value = [progress_value] + with lock: cache.set(progress_key, progress_value) maybe_progress = [_set_progress] if progress else [] try: - if isinstance(args_deps, dict): + if isinstance(user_callback_args, dict): user_callback_output = fn(*maybe_progress, **user_callback_args) - elif isinstance(args_deps, (list, tuple)): + elif isinstance(user_callback_args, (list, tuple)): user_callback_output = fn(*maybe_progress, *user_callback_args) else: user_callback_output = fn(*maybe_progress, user_callback_args) except PreventUpdate: with lock: - cache.set(result_key, NoUpdate()) + cache.set(result_key, {"_dash_no_update": "_dash_no_update"}) except Exception as err: # pylint: disable=broad-except with lock: cache.set( diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index bb04cd54a6..4b8979b9ef 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -451,9 +451,14 @@ def click_n_wait(): dash_duo.wait_for_text_to_equal("#output", "Clicked 1 times") click_n_wait() - dash_duo.wait_for_contains_text( - ".dash-fe-error__title", "An error occurred inside a long callback:" + dash_duo.wait_for_element(".dash-fe-error__title").click() + + dash_duo.driver.switch_to.frame(dash_duo.find_element("iframe")) + assert ( + "Exception: An error occurred inside a long callback:" + in dash_duo.wait_for_element(".errormsg").text ) + dash_duo.driver.switch_to.default_content() click_n_wait() dash_duo.wait_for_text_to_equal("#output", "Clicked 3 times") From c5f6fffc12d5517b59717e7ed87d651daa0fb26d Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 13 Jun 2022 19:34:38 -0400 Subject: [PATCH 12/35] Fix test_grouped_callbacks --- tests/unit/dash/test_grouped_callbacks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/unit/dash/test_grouped_callbacks.py b/tests/unit/dash/test_grouped_callbacks.py index cdbe9dc124..5578f85b2e 100644 --- a/tests/unit/dash/test_grouped_callbacks.py +++ b/tests/unit/dash/test_grouped_callbacks.py @@ -1,6 +1,7 @@ import dash from dash._grouping import make_grouping_by_index, grouping_len, flatten_grouping from dash._utils import create_callback_id +from dash._callback import GLOBAL_CALLBACK_MAP from dash.dependencies import Input, State, Output, ClientsideFunction import mock import json @@ -50,7 +51,7 @@ def check_output_for_grouping(grouping): Input("input-a", "prop"), )(mock_fn) - wrapped_fn = app.callback_map[callback_id]["callback"] + wrapped_fn = GLOBAL_CALLBACK_MAP[callback_id]["callback"] expected_outputs = [ (dep.component_id, dep.component_property, val) @@ -108,7 +109,7 @@ def check_callback_inputs_for_grouping(grouping): inputs, )(mock_fn) - wrapped_fn = app.callback_map["output-a.prop"]["callback"] + wrapped_fn = GLOBAL_CALLBACK_MAP["output-a.prop"]["callback"] flat_input_state_values = flatten_grouping(grouping) flat_input_values = flat_input_state_values[0::2] From fc9a77bc63758cfa9ab3449123f81d25b2caca02 Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 14 Jun 2022 10:20:05 -0400 Subject: [PATCH 13/35] Fix cbva002 --- tests/integration/callbacks/test_validation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/callbacks/test_validation.py b/tests/integration/callbacks/test_validation.py index 829cc4b009..d437e04965 100644 --- a/tests/integration/callbacks/test_validation.py +++ b/tests/integration/callbacks/test_validation.py @@ -1,6 +1,7 @@ import pytest from dash import Dash, Input, Output, State, html +from dash._callback import GLOBAL_CALLBACK_MAP from dash.exceptions import InvalidCallbackReturnValue, IncorrectTypeException @@ -77,7 +78,7 @@ def test_cbva002_callback_return_validation(): def single(a): return set([1]) - single_wrapped = app.callback_map["b.children"]["callback"] + single_wrapped = GLOBAL_CALLBACK_MAP["b.children"]["callback"] with pytest.raises(InvalidCallbackReturnValue): # outputs_list (normally callback_context.outputs_list) is provided From 40bd1731274193148c1946982c1b917daff3d3b9 Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 14 Jun 2022 10:22:19 -0400 Subject: [PATCH 14/35] Fix celery cancel. --- dash/_callback.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dash/_callback.py b/dash/_callback.py index 0144d3b215..23b786f759 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -115,7 +115,7 @@ def cancel_call(*_): manager = long_manager or flask.g.long_callback_manager if job_ids: for job_id in job_ids: - manager.terminate_job(int(job_id)) + manager.terminate_job(job_id) return NoUpdate() except DuplicateCallback: From 28a6d7780798f7b33139f961d9f15e6a57d4b363 Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 14 Jun 2022 13:32:57 -0400 Subject: [PATCH 15/35] Fix callback_map --- dash/_callback.py | 14 ++++++++++++-- dash/dash.py | 2 ++ tests/integration/callbacks/test_validation.py | 3 +-- tests/unit/dash/test_grouped_callbacks.py | 5 ++--- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index 23b786f759..5f236c6409 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -44,6 +44,7 @@ def is_no_update(obj): GLOBAL_INLINE_SCRIPTS = [] +# pylint: disable=too-many-locals def callback( *_args, long=False, @@ -55,6 +56,8 @@ def callback( long_manager=None, long_cache_args_to_ignore=None, config_prevent_initial_callbacks=False, + callback_map=None, + callback_list=None, **_kwargs, ): """ @@ -126,9 +129,16 @@ def cancel_call(*_): if long_cache_args_to_ignore: long_spec["cache_args_to_ignore"] = long_cache_args_to_ignore + call_list = GLOBAL_CALLBACK_LIST + if callback_list is not None: + call_list = callback_list + call_map = GLOBAL_CALLBACK_MAP + if callback_map is not None: + call_map = callback_map + return register_callback( - GLOBAL_CALLBACK_LIST, - GLOBAL_CALLBACK_MAP, + call_list, + call_map, config_prevent_initial_callbacks, *_args, **_kwargs, diff --git a/dash/dash.py b/dash/dash.py index d2cfc28042..da824fdadf 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -1134,6 +1134,8 @@ def callback(self, *_args, **_kwargs): return _callback.callback( *_args, config_prevent_initial_callbacks=self.config.prevent_initial_callbacks, + callback_list=self._callback_list, + callback_map=self.callback_map, **_kwargs, ) diff --git a/tests/integration/callbacks/test_validation.py b/tests/integration/callbacks/test_validation.py index d437e04965..829cc4b009 100644 --- a/tests/integration/callbacks/test_validation.py +++ b/tests/integration/callbacks/test_validation.py @@ -1,7 +1,6 @@ import pytest from dash import Dash, Input, Output, State, html -from dash._callback import GLOBAL_CALLBACK_MAP from dash.exceptions import InvalidCallbackReturnValue, IncorrectTypeException @@ -78,7 +77,7 @@ def test_cbva002_callback_return_validation(): def single(a): return set([1]) - single_wrapped = GLOBAL_CALLBACK_MAP["b.children"]["callback"] + single_wrapped = app.callback_map["b.children"]["callback"] with pytest.raises(InvalidCallbackReturnValue): # outputs_list (normally callback_context.outputs_list) is provided diff --git a/tests/unit/dash/test_grouped_callbacks.py b/tests/unit/dash/test_grouped_callbacks.py index 5578f85b2e..cdbe9dc124 100644 --- a/tests/unit/dash/test_grouped_callbacks.py +++ b/tests/unit/dash/test_grouped_callbacks.py @@ -1,7 +1,6 @@ import dash from dash._grouping import make_grouping_by_index, grouping_len, flatten_grouping from dash._utils import create_callback_id -from dash._callback import GLOBAL_CALLBACK_MAP from dash.dependencies import Input, State, Output, ClientsideFunction import mock import json @@ -51,7 +50,7 @@ def check_output_for_grouping(grouping): Input("input-a", "prop"), )(mock_fn) - wrapped_fn = GLOBAL_CALLBACK_MAP[callback_id]["callback"] + wrapped_fn = app.callback_map[callback_id]["callback"] expected_outputs = [ (dep.component_id, dep.component_property, val) @@ -109,7 +108,7 @@ def check_callback_inputs_for_grouping(grouping): inputs, )(mock_fn) - wrapped_fn = GLOBAL_CALLBACK_MAP["output-a.prop"]["callback"] + wrapped_fn = app.callback_map["output-a.prop"]["callback"] flat_input_state_values = flatten_grouping(grouping) flat_input_values = flat_input_state_values[0::2] From 1c1c7a2df1a071d2365dd71913429b91dad978a8 Mon Sep 17 00:00:00 2001 From: philippe Date: Wed, 15 Jun 2022 09:51:36 -0400 Subject: [PATCH 16/35] Update callback docstrings. --- dash/_callback.py | 52 ++++++++++++++++++++++++++++++++++++ dash/dash.py | 68 +++++------------------------------------------ 2 files changed, 58 insertions(+), 62 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index 5f236c6409..cba0955828 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -76,6 +76,58 @@ def callback( The last, optional argument `prevent_initial_call` causes the callback not to fire when its outputs are first added to the page. Defaults to `False` and unlike `app.callback` is not configurable at the app level. + + :Keyword Arguments: + :param long: + Mark the callback as a long callback to execute in a manager for + callbacks that take a long time without locking up the Dash app + or timing out. + :param long_manager: + A long callback manager instance. Currently an instance of one of + `DiskcacheLongCallbackManager` or `CeleryLongCallbackManager`. + Defaults to the `long_callback_manager` instance provided to the + `dash.Dash constructor`. + - A diskcache manager (`DiskcacheLongCallbackManager`) that runs callback + logic in a separate process and stores the results to disk using the + diskcache library. This is the easiest backend to use for local + development. + - A Celery manager (`CeleryLongCallbackManager`) that runs callback logic + in a celery worker and returns results to the Dash app through a Celery + broker like RabbitMQ or Redis. + :param long_running: + A list of 3-element tuples. The first element of each tuple should be + an `Output` dependency object referencing a property of a component in + the app layout. The second element is the value that the property + should be set to while the callback is running, and the third element + is the value the property should be set to when the callback completes. + :param long_cancel: + A list of `Input` dependency objects that reference a property of a + component in the app's layout. When the value of this property changes + while a callback is running, the callback is canceled. + Note that the value of the property is not significant, any change in + value will result in the cancellation of the running job (if any). + :param long_progress: + An `Output` dependency grouping that references properties of + components in the app's layout. When provided, the decorated function + will be called with an extra argument as the first argument to the + function. This argument, is a function handle that the decorated + function should call in order to provide updates to the app on its + current progress. This function accepts a single argument, which + correspond to the grouping of properties specified in the provided + `Output` dependency grouping + :param long_progress_default: + A grouping of values that should be assigned to the components + specified by the `progress` argument when the callback is not in + progress. If `progress_default` is not provided, all the dependency + properties specified in `progress` will be set to `None` when the + callback is not running. + :param long_cache_args_to_ignore: + Arguments to ignore when caching is enabled. If callback is configured + with keyword arguments (Input/State provided in a dict), + this should be a list of argument names as strings. Otherwise, + this should be a list of argument indices as integers. + :param long_interval: + Time to wait between the long callback update requests. """ long_spec = None diff --git a/dash/dash.py b/dash/dash.py index da824fdadf..15a6b4902b 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -1116,7 +1116,6 @@ def clientside_callback(self, clientside_function, *args, **kwargs): **kwargs, ) - # pylint: disable=R0201 def callback(self, *_args, **_kwargs): """ Normally used as a decorator, `@app.callback` provides a server-side @@ -1139,7 +1138,6 @@ def callback(self, *_args, **_kwargs): **_kwargs, ) - # pylint: disable=R0201 def long_callback( self, *_args, @@ -1151,67 +1149,10 @@ def long_callback( progress_default=None, cache_args_to_ignore=None, **_kwargs, - ): # pylint: disable=too-many-statements + ): """ - Deprecation Notice, use long - - Normally used as a decorator, `@app.long_callback` is an alternative to - `@app.callback` designed for callbacks that take a long time to run, - without locking up the Dash app or timing out. - - `@long_callback` is designed to support multiple callback managers. - Two long callback managers are currently implemented: - - - A diskcache manager (`DiskcacheLongCallbackManager`) that runs callback - logic in a separate process and stores the results to disk using the - diskcache library. This is the easiest backend to use for local - development. - - A Celery manager (`CeleryLongCallbackManager`) that runs callback logic - in a celery worker and returns results to the Dash app through a Celery - broker like RabbitMQ or Redis. - - The following arguments may include any valid arguments to `@app.callback`. - In addition, `@app.long_callback` supports the following optional - keyword arguments: - - :Keyword Arguments: - :param manager: - A long callback manager instance. Currently an instance of one of - `DiskcacheLongCallbackManager` or `CeleryLongCallbackManager`. - Defaults to the `long_callback_manager` instance provided to the - `dash.Dash constructor`. - :param running: - A list of 3-element tuples. The first element of each tuple should be - an `Output` dependency object referencing a property of a component in - the app layout. The second element is the value that the property - should be set to while the callback is running, and the third element - is the value the property should be set to when the callback completes. - :param cancel: - A list of `Input` dependency objects that reference a property of a - component in the app's layout. When the value of this property changes - while a callback is running, the callback is canceled. - Note that the value of the property is not significant, any change in - value will result in the cancellation of the running job (if any). - :param progress: - An `Output` dependency grouping that references properties of - components in the app's layout. When provided, the decorated function - will be called with an extra argument as the first argument to the - function. This argument, is a function handle that the decorated - function should call in order to provide updates to the app on its - current progress. This function accepts a single argument, which - correspond to the grouping of properties specified in the provided - `Output` dependency grouping - :param progress_default: - A grouping of values that should be assigned to the components - specified by the `progress` argument when the callback is not in - progress. If `progress_default` is not provided, all the dependency - properties specified in `progress` will be set to `None` when the - callback is not running. - :param cache_args_to_ignore: - Arguments to ignore when caching is enabled. If callback is configured - with keyword arguments (Input/State provided in a dict), - this should be a list of argument names as strings. Otherwise, - this should be a list of argument indices as integers. + Deprecation Notice, long callbacks are now supported natively with regular callbacks, + use `long=True` with `dash.callback` or `app.callback` instead. """ return _callback.callback( *_args, @@ -1223,6 +1164,9 @@ def long_callback( long_running=running, long_cancel=cancel, long_cache_args_to_ignore=cache_args_to_ignore, + callback_map=self.callback_map, + callback_list=self._callback_list, + config_prevent_initial_callbacks=self.config.prevent_initial_callbacks, **_kwargs, ) From 55455f952d3c68fc7d30de8378088efa4da9a7f7 Mon Sep 17 00:00:00 2001 From: philippe Date: Wed, 15 Jun 2022 11:12:53 -0400 Subject: [PATCH 17/35] Add test short interval. --- .../long_callback/app_short_interval.py | 38 +++++++++++++++++++ .../long_callback/test_basic_long_callback.py | 13 +++++++ 2 files changed, 51 insertions(+) create mode 100644 tests/integration/long_callback/app_short_interval.py diff --git a/tests/integration/long_callback/app_short_interval.py b/tests/integration/long_callback/app_short_interval.py new file mode 100644 index 0000000000..6a67a6e928 --- /dev/null +++ b/tests/integration/long_callback/app_short_interval.py @@ -0,0 +1,38 @@ +from dash import Dash, Input, Output, html +import time + +from tests.integration.long_callback.utils import get_long_callback_manager + +long_callback_manager = get_long_callback_manager() +handle = long_callback_manager.handle + +app = Dash(__name__, long_callback_manager=long_callback_manager) + +app.layout = html.Div( + [ + html.Button(id="run-button", children="Run"), + html.Button(id="cancel-button", children="Cancel"), + html.Div(id="status", children="Finished"), + html.Div(id="result", children="No results"), + ] +) + + +@app.long_callback( + Output("result", "children"), + [Input("run-button", "n_clicks")], + progress=Output("status", "children"), + progress_default="Finished", + cancel=[Input("cancel-button", "n_clicks")], + interval=0, + prevent_initial_call=True, +) +def update_output(set_progress, n_clicks): + for i in range(4): + set_progress(f"Progress {i}/4") + time.sleep(1) + return f"Clicked '{n_clicks}'" + + +if __name__ == "__main__": + app.run_server(debug=True) diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index 4b8979b9ef..7d40a08dea 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -481,3 +481,16 @@ def make_expect(n): dash_duo.wait_for_text_to_equal("#output-status", f"Updated: {i}") for j, e in enumerate(expect): assert dash_duo.find_element(f"#output{j + 1}").text == e + + +def test_lcbc009_short_interval(dash_duo, manager): + with setup_long_callback_app(manager, "app_short_interval") as app: + dash_duo.start_server(app) + dash_duo.find_element("#run-button").click() + dash_duo.wait_for_text_to_equal("#status", "Progress 2/4", 20) + dash_duo.wait_for_text_to_equal("#status", "Finished", 12) + dash_duo.wait_for_text_to_equal("#result", "Clicked '1'") + + time.sleep(2) + # Ensure the progress is still not running + assert dash_duo.find_element("#status").text == "Finished" From cc9b09b5939f9bf207c882a13188f0eff5db347c Mon Sep 17 00:00:00 2001 From: philippe Date: Wed, 15 Jun 2022 12:18:31 -0400 Subject: [PATCH 18/35] Add error if no manager. --- dash/_callback.py | 27 ++++++++++++++----- dash/dash-renderer/src/actions/callbacks.ts | 1 - dash/exceptions.py | 8 ++++++ .../long_callback/app_short_interval.py | 13 ++++----- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index cba0955828..48e4582aa0 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -8,7 +8,13 @@ handle_grouped_callback_args, Output, ) -from .exceptions import PreventUpdate, WildcardInLongCallback, DuplicateCallback +from .exceptions import ( + PreventUpdate, + WildcardInLongCallback, + DuplicateCallback, + MissingLongCallbackManagerError, + LongCallbackError, +) from ._grouping import ( flatten_grouping, @@ -309,9 +315,7 @@ def wrap_func(func): def add_context(*args, **kwargs): output_spec = kwargs.pop("outputs_list") app_callback_manager = kwargs.pop("long_callback_manager", None) - callback_manager = long and long.get( - "manager", app_callback_manager - ) + callback_manager = long and long.get("manager", app_callback_manager) _validate.validate_output_spec(insert_output, output_spec, Output) func_args, func_kwargs = _validate.validate_and_group_input_args( @@ -321,6 +325,16 @@ def add_context(*args, **kwargs): response = {"multi": True} if long is not None: + if not callback_manager: + raise MissingLongCallbackManagerError( + "Running `long` callbacks requires a manager to be installed.\n" + "Available managers:\n" + "- Diskcache (`pip install dash[diskcache]`) to run callbacks in a separate Process" + " and store results on the local filesystem.\n" + "- Celery (`pip install dash[celery]`) to run callbacks in a celery worker" + " and store results on redis.\n" + ) + progress_outputs = long.get("progress") cache_key = flask.request.args.get("cacheKey") job_id = flask.request.args.get("job") @@ -369,8 +383,7 @@ def add_context(*args, **kwargs): progress = callback_manager.get_progress(cache_key) if progress: response["progress"] = { - str(x): progress[i] - for i, x in enumerate(progress_outputs) + str(x): progress[i] for i, x in enumerate(progress_outputs) } output_value = callback_manager.get_result(cache_key, job_id) @@ -385,7 +398,7 @@ def add_context(*args, **kwargs): and "long_callback_error" in output_value ): error = output_value.get("long_callback_error") - raise Exception( + raise LongCallbackError( f"An error occurred inside a long callback: {error['msg']}\n{error['tb']}" ) diff --git a/dash/dash-renderer/src/actions/callbacks.ts b/dash/dash-renderer/src/actions/callbacks.ts index 8393c8262b..9dd6379abe 100644 --- a/dash/dash-renderer/src/actions/callbacks.ts +++ b/dash/dash-renderer/src/actions/callbacks.ts @@ -642,7 +642,6 @@ export function executeCallback( if (inter.length) { additionalArgs.push(['cancelJob', job.jobId]); if (job.progressDefault) { - console.log(job.progressDefault); sideUpdate( job.progressDefault, dispatch, diff --git a/dash/exceptions.py b/dash/exceptions.py index fd22dfa050..d2fa911a85 100644 --- a/dash/exceptions.py +++ b/dash/exceptions.py @@ -85,3 +85,11 @@ class ProxyError(DashException): class DuplicateCallback(DashException): pass + + +class LongCallbackError(DashException): + pass + + +class MissingLongCallbackManagerError(DashException): + pass diff --git a/tests/integration/long_callback/app_short_interval.py b/tests/integration/long_callback/app_short_interval.py index 6a67a6e928..249a65e5a6 100644 --- a/tests/integration/long_callback/app_short_interval.py +++ b/tests/integration/long_callback/app_short_interval.py @@ -1,4 +1,4 @@ -from dash import Dash, Input, Output, html +from dash import Dash, Input, Output, html, callback import time from tests.integration.long_callback.utils import get_long_callback_manager @@ -18,13 +18,14 @@ ) -@app.long_callback( +@callback( Output("result", "children"), [Input("run-button", "n_clicks")], - progress=Output("status", "children"), - progress_default="Finished", - cancel=[Input("cancel-button", "n_clicks")], - interval=0, + long=True, + long_progress=Output("status", "children"), + long_progress_default="Finished", + long_cancel=[Input("cancel-button", "n_clicks")], + long_interval=0, prevent_initial_call=True, ) def update_output(set_progress, n_clicks): From 0501fd9794a692af6b3211a96b5b9edb80807257 Mon Sep 17 00:00:00 2001 From: philippe Date: Wed, 15 Jun 2022 13:05:53 -0400 Subject: [PATCH 19/35] Fix error message assert --- tests/integration/long_callback/test_basic_long_callback.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index 7d40a08dea..48859445ad 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -455,7 +455,7 @@ def click_n_wait(): dash_duo.driver.switch_to.frame(dash_duo.find_element("iframe")) assert ( - "Exception: An error occurred inside a long callback:" + "dash.exceptions.LongCallbackError: An error occurred inside a long callback:" in dash_duo.wait_for_element(".errormsg").text ) dash_duo.driver.switch_to.default_content() From 43e42eaecbd912c5391db5acd81566a35786a26a Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 20 Jun 2022 11:23:29 -0400 Subject: [PATCH 20/35] Remove leftover _NoUpdate. --- dash/_callback.py | 2 -- dash/dash.py | 9 +++------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index 48e4582aa0..60c1237783 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -33,8 +33,6 @@ class NoUpdate: - # pylint: disable=too-few-public-methods - def to_plotly_json(self): # pylint: disable=no-self-use return {"_dash_no_update": "_dash_no_update"} diff --git a/dash/dash.py b/dash/dash.py index 15a6b4902b..83af79bdac 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -164,11 +164,6 @@ def _get_skip(text, divider=2): return tb[0] + "".join(tb[skip:]) -class _NoUpdate: - # pylint: disable=too-few-public-methods - pass - - # Singleton signal to not update an output, alternative to PreventUpdate no_update = _callback.NoUpdate() # pylint: disable=protected-access @@ -1189,7 +1184,9 @@ def dispatch(self): flask.g.state_values = inputs_to_dict( # pylint: disable=assigning-non-slot state ) - flask.g.long_callback_manager = self._long_callback_manager # pylint: disable=E0237 + flask.g.long_callback_manager = ( + self._long_callback_manager + ) # pylint: disable=E0237 changed_props = body.get("changedPropIds", []) flask.g.triggered_inputs = [ # pylint: disable=assigning-non-slot {"prop_id": x, "value": input_values.get(x)} for x in changed_props From de5ee349a74b1bfc6e9e0664f1c3ae46740816c7 Mon Sep 17 00:00:00 2001 From: Philippe Duval Date: Mon, 20 Jun 2022 13:27:06 -0400 Subject: [PATCH 21/35] Update dash/dash.py Co-authored-by: Alex Johnson --- dash/dash.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dash/dash.py b/dash/dash.py index 83af79bdac..7c72869c0d 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -1146,7 +1146,7 @@ def long_callback( **_kwargs, ): """ - Deprecation Notice, long callbacks are now supported natively with regular callbacks, + Deprecated: long callbacks are now supported natively with regular callbacks, use `long=True` with `dash.callback` or `app.callback` instead. """ return _callback.callback( From 08356f0fe90501287226fa1116cc294f3e557be4 Mon Sep 17 00:00:00 2001 From: philippe Date: Mon, 20 Jun 2022 13:59:40 -0400 Subject: [PATCH 22/35] Hide arguments. --- dash/_callback.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index 60c1237783..cd2161fee7 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -59,9 +59,6 @@ def callback( long_cancel=None, long_manager=None, long_cache_args_to_ignore=None, - config_prevent_initial_callbacks=False, - callback_map=None, - callback_list=None, **_kwargs, ): """ @@ -136,6 +133,12 @@ def callback( long_spec = None + config_prevent_initial_callbacks = _kwargs.pop( + "config_prevent_initial_callbacks", False + ) + callback_map = _kwargs.pop("callback_map", GLOBAL_CALLBACK_MAP) + callback_list = _kwargs.pop("callback_list", GLOBAL_CALLBACK_LIST) + if long: long_spec = { "interval": long_interval, @@ -185,16 +188,9 @@ def cancel_call(*_): if long_cache_args_to_ignore: long_spec["cache_args_to_ignore"] = long_cache_args_to_ignore - call_list = GLOBAL_CALLBACK_LIST - if callback_list is not None: - call_list = callback_list - call_map = GLOBAL_CALLBACK_MAP - if callback_map is not None: - call_map = callback_map - return register_callback( - call_list, - call_map, + callback_list, + callback_map, config_prevent_initial_callbacks, *_args, **_kwargs, From edd7fd64c137c38c5284121ca6ea8a886bd2b6ab Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 21 Jun 2022 09:46:46 -0400 Subject: [PATCH 23/35] Add test side update. --- .../long_callback/app_side_update.py | 50 +++++++++++++++++++ .../long_callback/test_basic_long_callback.py | 8 +++ 2 files changed, 58 insertions(+) create mode 100644 tests/integration/long_callback/app_side_update.py diff --git a/tests/integration/long_callback/app_side_update.py b/tests/integration/long_callback/app_side_update.py new file mode 100644 index 0000000000..6d0cf2cd0e --- /dev/null +++ b/tests/integration/long_callback/app_side_update.py @@ -0,0 +1,50 @@ +from dash import Dash, Input, Output, html, callback +import time + +from tests.integration.long_callback.utils import get_long_callback_manager + +long_callback_manager = get_long_callback_manager() +handle = long_callback_manager.handle + +app = Dash(__name__, long_callback_manager=long_callback_manager) + +app.layout = html.Div( + [ + html.Button(id="run-button", children="Run"), + html.Button(id="cancel-button", children="Cancel"), + html.Div(id="status", children="Finished"), + html.Div(id="result", children="No results"), + html.Div(id="side-status"), + ] +) + + +@callback( + Output("result", "children"), + [Input("run-button", "n_clicks")], + long=True, + long_progress=Output("status", "children"), + long_progress_default="Finished", + long_cancel=[Input("cancel-button", "n_clicks")], + long_interval=0, + prevent_initial_call=True, +) +def update_output(set_progress, n_clicks): + print("trigger") + for i in range(4): + set_progress(f"Progress {i}/4") + time.sleep(1) + return f"Clicked '{n_clicks}'" + + +@callback( + Output("side-status", "children"), + [Input("status", "children")], + prevent_initial_call=True, +) +def update_side(progress): + return f"Side {progress}" + + +if __name__ == "__main__": + app.run_server(debug=True) diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index 48859445ad..390d2b12c9 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -494,3 +494,11 @@ def test_lcbc009_short_interval(dash_duo, manager): time.sleep(2) # Ensure the progress is still not running assert dash_duo.find_element("#status").text == "Finished" + + +def test_lcbc010_side_updates(dash_duo, manager): + with setup_long_callback_app(manager, "app_side_update") as app: + dash_duo.start_server(app) + dash_duo.find_element("#run-button").click() + for i in range(1, 4): + dash_duo.wait_for_text_to_equal("#side-status", f"Side Progress {i}/4") From 2fd56e93419b95a33b0847cbba95eef56bcfe706 Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 21 Jun 2022 09:47:43 -0400 Subject: [PATCH 24/35] Redux devtools ignore reloadRequest actions. --- dash/dash-renderer/src/store.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dash/dash-renderer/src/store.ts b/dash/dash-renderer/src/store.ts index 7244077460..09bf21be8b 100644 --- a/dash/dash-renderer/src/store.ts +++ b/dash/dash-renderer/src/store.ts @@ -82,7 +82,12 @@ export default class RendererStore { const reduxDTEC = (window as any) .__REDUX_DEVTOOLS_EXTENSION_COMPOSE__; if (reduxDTEC) { - this.createAppStore(reducer, reduxDTEC(applyMiddleware(thunk))); + this.createAppStore( + reducer, + reduxDTEC({actionsDenylist: ['reloadRequest']})( + applyMiddleware(thunk) + ) + ); } else { this.createAppStore(reducer, applyMiddleware(thunk)); } From da2b01f491f5b7898561aca82f09fed7a044e8ff Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 21 Jun 2022 09:48:55 -0400 Subject: [PATCH 25/35] Long callbacks side update to trigger other callbacks. --- dash/dash-renderer/src/actions/callbacks.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dash/dash-renderer/src/actions/callbacks.ts b/dash/dash-renderer/src/actions/callbacks.ts index 9dd6379abe..0c258a85c6 100644 --- a/dash/dash-renderer/src/actions/callbacks.ts +++ b/dash/dash-renderer/src/actions/callbacks.ts @@ -37,7 +37,7 @@ import {urlBase} from './utils'; import {getCSRFHeader} from '.'; import {createAction, Action} from 'redux-actions'; import {addHttpHeaders} from '../actions'; -import {updateProps} from './index'; +import {notifyObservers, updateProps} from './index'; import {CallbackJobPayload} from '../reducers/callbackJobs'; export const addBlockedCallbacks = createAction( @@ -321,6 +321,9 @@ function sideUpdate(outputs: any, dispatch: any, paths: any) { itempath: componentPath }) ); + dispatch( + notifyObservers({id: componentId, props: {[propName]: value}}) + ); }); } @@ -483,7 +486,10 @@ function handleServerside( finishLine(data); } else { // Poll chain. - setTimeout(handle, long.interval || 500); + setTimeout( + handle, + long.interval !== undefined ? long.interval : 500 + ); } }); } else if (status === STATUS.PREVENT_UPDATE) { From bfc3c8fc87f1a6ce9c0b8787f65d55ad3956be0b Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 21 Jun 2022 13:33:32 -0400 Subject: [PATCH 26/35] Add test long callback pattern matching. --- .../long_callback/app_pattern_matching.py | 32 +++++++++++++++++++ .../long_callback/test_basic_long_callback.py | 10 ++++++ 2 files changed, 42 insertions(+) create mode 100644 tests/integration/long_callback/app_pattern_matching.py diff --git a/tests/integration/long_callback/app_pattern_matching.py b/tests/integration/long_callback/app_pattern_matching.py new file mode 100644 index 0000000000..aa176dda32 --- /dev/null +++ b/tests/integration/long_callback/app_pattern_matching.py @@ -0,0 +1,32 @@ +from dash import Dash, Input, Output, html, callback, ALL + +from tests.integration.long_callback.utils import get_long_callback_manager + +long_callback_manager = get_long_callback_manager() +handle = long_callback_manager.handle + +app = Dash(__name__, long_callback_manager=long_callback_manager) + +app.layout = html.Div( + [ + html.Button(id={"type": "run-button", "index": 0}, children="Run 1"), + html.Button(id={"type": "run-button", "index": 1}, children="Run 2"), + html.Button(id={"type": "run-button", "index": 2}, children="Run 3"), + html.Div(id="result", children="No results"), + ] +) + + +@callback( + Output("result", "children"), + [Input({"type": "run-button", "index": ALL}, "n_clicks")], + long=True, + prevent_initial_call=True, +) +def update_output(n_clicks): + found = max(x for x in n_clicks if x is not None) + return f"Clicked '{found}'" + + +if __name__ == "__main__": + app.run_server(debug=True) diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index 390d2b12c9..b74d03b148 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -502,3 +502,13 @@ def test_lcbc010_side_updates(dash_duo, manager): dash_duo.find_element("#run-button").click() for i in range(1, 4): dash_duo.wait_for_text_to_equal("#side-status", f"Side Progress {i}/4") + + +def test_lcbc011_long_pattern_matching(dash_duo, manager): + with setup_long_callback_app(manager, "app_pattern_matching") as app: + dash_duo.start_server(app) + for i in range(1, 4): + for _ in range(i): + dash_duo.find_element(f"button:nth-child({i})").click() + + dash_duo.wait_for_text_to_equal("#result", f"Clicked '{i}'") From 077733c800da6ef0ddff74e4ce9526f671a0d139 Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 21 Jun 2022 13:36:12 -0400 Subject: [PATCH 27/35] Add circular check for long callbacks side outputs. --- dash/_callback.py | 2 + dash/_validate.py | 34 +++++++++++++- dash/dash.py | 2 + tests/unit/dash/long_callback_validation.py | 46 ------------------- .../dash/test_long_callback_validation.py | 26 +++++++++++ 5 files changed, 63 insertions(+), 47 deletions(-) delete mode 100644 tests/unit/dash/long_callback_validation.py create mode 100644 tests/unit/dash/test_long_callback_validation.py diff --git a/dash/_callback.py b/dash/_callback.py index cd2161fee7..ea7526c8e3 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -255,6 +255,8 @@ def insert_callback( "outputs_indices": outputs_indices, "inputs_state_indices": inputs_state_indices, "long": long, + "output": output, + "raw_inputs": inputs, } callback_list.append(callback_spec) diff --git a/dash/_validate.py b/dash/_validate.py index 0e5a097ec1..a4eb61cf1e 100644 --- a/dash/_validate.py +++ b/dash/_validate.py @@ -7,7 +7,7 @@ from ._grouping import grouping_len, map_grouping from .development.base_component import Component from . import exceptions -from ._utils import patch_collections_abc, stringify_id, to_json +from ._utils import patch_collections_abc, stringify_id, to_json, coerce_to_list def validate_callback(outputs, inputs, state, extra_args, types): @@ -479,3 +479,35 @@ def validate_module_name(module): "The first attribute of dash.register_page() must be a string or '__name__'" ) return module + + +def validate_long_callbacks(callback_map): + # Validate that long callback side output & inputs are not circular + # If circular, triggering a long callback would result in a fatal server/computer crash. + all_outputs = set() + input_indexed = {} + for callback in callback_map.values(): + out = coerce_to_list(callback["output"]) + all_outputs.update(out) + for o in out: + input_indexed.setdefault(o, set()) + input_indexed[o].update(coerce_to_list(callback["raw_inputs"])) + + for callback in (x for x in callback_map.values() if x.get("long")): + long_info = callback["long"] + progress = long_info.get("progress", []) + running = long_info.get("running", []) + + long_inputs = coerce_to_list(callback["raw_inputs"]) + outputs = set([x[0] for x in running] + progress) + circular = [ + x + for x in set(k for k, v in input_indexed.items() if v.intersection(outputs)) + if x in long_inputs + ] + + if circular: + raise exceptions.LongCallbackError( + f"Long callback circular error!\n{circular} is used as input for a long callback" + f" but also used as output from an input that is updated with progress or running argument." + ) diff --git a/dash/dash.py b/dash/dash.py index 7c72869c0d..44634472b3 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -1295,6 +1295,8 @@ def _setup_server(self): self._callback_list.extend(_callback.GLOBAL_CALLBACK_LIST) _callback.GLOBAL_CALLBACK_LIST.clear() + _validate.validate_long_callbacks(self.callback_map) + def _add_assets_resource(self, url_path, file_path): res = {"asset_path": url_path, "filepath": file_path} if self.config.assets_external_path: diff --git a/tests/unit/dash/long_callback_validation.py b/tests/unit/dash/long_callback_validation.py deleted file mode 100644 index 7d4542bedc..0000000000 --- a/tests/unit/dash/long_callback_validation.py +++ /dev/null @@ -1,46 +0,0 @@ -import pytest -import mock - -import dash -from dash.exceptions import WildcardInLongCallback -from dash.dependencies import Input, Output, State, ALL, MATCH, ALLSMALLER - - -def test_wildcard_ids_no_allowed_in_long_callback(): - """ - @app.long_callback doesn't support wildcard dependencies yet. This test can - be removed if wildcard support is added to @app.long_callback in the future. - """ - app = dash.Dash(long_callback_manager=mock.Mock()) - - # ALL - with pytest.raises(WildcardInLongCallback): - - @app.long_callback( - Output("output", "children"), - Input({"type": "filter", "index": ALL}, "value"), - ) - def callback(*args, **kwargs): - pass - - # MATCH - with pytest.raises(WildcardInLongCallback): - - @app.long_callback( - Output({"type": "dynamic-output", "index": MATCH}, "children"), - Input({"type": "dynamic-dropdown", "index": MATCH}, "value"), - State({"type": "dynamic-dropdown", "index": MATCH}, "id"), - ) - def callback(*args, **kwargs): - pass - - # ALLSMALLER - with pytest.raises(WildcardInLongCallback): - - @app.long_callback( - Output({"type": "output-ex3", "index": MATCH}, "children"), - Input({"type": "filter-dropdown-ex3", "index": MATCH}, "value"), - Input({"type": "filter-dropdown-ex3", "index": ALLSMALLER}, "value"), - ) - def callback(*args, **kwargs): - pass diff --git a/tests/unit/dash/test_long_callback_validation.py b/tests/unit/dash/test_long_callback_validation.py new file mode 100644 index 0000000000..db0561233c --- /dev/null +++ b/tests/unit/dash/test_long_callback_validation.py @@ -0,0 +1,26 @@ +import pytest + +from dash.exceptions import LongCallbackError +from dash.dependencies import Input, Output +from dash._validate import validate_long_callbacks + + +def test_circular_long_callback_progress(): + callback_map = { + "side": { + "output": [Output("side-progress", "children")], + "raw_inputs": [Input("progress", "children")], + }, + "long": { + "output": [Output("result", "children")], + "raw_inputs": [ + Input("click", "n_clicks"), + Input("side-progress", "children"), + ], + "long": {"progress": [Output("progress", "children")]}, + }, + } + + with pytest.raises(LongCallbackError): + + validate_long_callbacks(callback_map) From 638dacf8b344b0f38f1d1471d5856c67f537f505 Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 21 Jun 2022 15:31:42 -0400 Subject: [PATCH 28/35] Add test long callback context. --- .../long_callback/app_callback_ctx.py | 36 +++++++++++++++++++ .../long_callback/test_basic_long_callback.py | 12 +++++++ 2 files changed, 48 insertions(+) create mode 100644 tests/integration/long_callback/app_callback_ctx.py diff --git a/tests/integration/long_callback/app_callback_ctx.py b/tests/integration/long_callback/app_callback_ctx.py new file mode 100644 index 0000000000..0bb2ff1edd --- /dev/null +++ b/tests/integration/long_callback/app_callback_ctx.py @@ -0,0 +1,36 @@ +import json + +from dash import Dash, Input, Output, html, callback, ALL, ctx + +from tests.integration.long_callback.utils import get_long_callback_manager + +long_callback_manager = get_long_callback_manager() +handle = long_callback_manager.handle + +app = Dash(__name__, long_callback_manager=long_callback_manager) + +app.layout = html.Div( + [ + html.Button(id={"type": "run-button", "index": 0}, children="Run 1"), + html.Button(id={"type": "run-button", "index": 1}, children="Run 2"), + html.Button(id={"type": "run-button", "index": 2}, children="Run 3"), + html.Div(id="result", children="No results"), + html.Div(id="running"), + ] +) + + +@callback( + Output("result", "children"), + [Input({"type": "run-button", "index": ALL}, "n_clicks")], + long=True, + prevent_initial_call=True, + long_running=[(Output("running", "children"), "on", "off")], +) +def update_output(n_clicks): + triggered = json.loads(ctx.triggered[0]["prop_id"].split(".")[0]) + return json.dumps(dict(triggered=triggered, value=n_clicks[triggered["index"]])) + + +if __name__ == "__main__": + app.run_server(debug=True) diff --git a/tests/integration/long_callback/test_basic_long_callback.py b/tests/integration/long_callback/test_basic_long_callback.py index b74d03b148..70cc821019 100644 --- a/tests/integration/long_callback/test_basic_long_callback.py +++ b/tests/integration/long_callback/test_basic_long_callback.py @@ -1,3 +1,4 @@ +import json from multiprocessing import Lock import os from contextlib import contextmanager @@ -512,3 +513,14 @@ def test_lcbc011_long_pattern_matching(dash_duo, manager): dash_duo.find_element(f"button:nth-child({i})").click() dash_duo.wait_for_text_to_equal("#result", f"Clicked '{i}'") + + +def test_lcbc012_long_callback_ctx(dash_duo, manager): + with setup_long_callback_app(manager, "app_callback_ctx") as app: + dash_duo.start_server(app) + dash_duo.find_element("button:nth-child(1)").click() + dash_duo.wait_for_text_to_equal("#running", "off") + + output = json.loads(dash_duo.find_element("#result").text) + + assert output["triggered"]["index"] == 0 From 9df30820d0fdeafebe7e088a2aac08727ca33a5a Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 21 Jun 2022 16:11:09 -0400 Subject: [PATCH 29/35] Support callback context in long callbacks. --- dash/_callback.py | 12 ++++++ dash/_callback_context.py | 43 ++++++++++++------- dash/long_callback/managers/__init__.py | 2 +- dash/long_callback/managers/celery_manager.py | 10 +++-- .../managers/diskcache_manager.py | 13 ++++-- 5 files changed, 58 insertions(+), 22 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index ea7526c8e3..b663477303 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -351,6 +351,18 @@ def add_context(*args, **kwargs): cache_key, job_fn, args, + dict( + args_grouping=flask.g.args_grouping, + using_args_grouping=flask.g.using_args_grouping, + outputs_grouping=flask.g.outputs_grouping, + using_outputs_grouping=flask.g.using_outputs_grouping, + inputs_list=flask.g.inputs_list, + states_list=flask.g.states_list, + outputs_list=flask.g.outputs_list, + input_values=flask.g.input_values, + state_values=flask.g.state_values, + triggered_inputs=flask.g.triggered_inputs, + ), ) data = { diff --git a/dash/_callback_context.py b/dash/_callback_context.py index 0d3ed3d2e8..5fa6a62fe4 100644 --- a/dash/_callback_context.py +++ b/dash/_callback_context.py @@ -1,8 +1,11 @@ import functools import warnings import json +import os + import flask + from . import exceptions from ._utils import AttributeDict @@ -10,7 +13,10 @@ def has_context(func): @functools.wraps(func) def assert_context(*args, **kwargs): - if not flask.has_request_context(): + if ( + not flask.has_request_context() + and "DASH_LONG_CALLBACK_CTX" not in os.environ + ): raise exceptions.MissingCallbackContextException( f"dash.callback_context.{getattr(func, '__name__')} is only available from a callback!" ) @@ -19,6 +25,13 @@ def assert_context(*args, **kwargs): return assert_context +def _get_global(): + long = os.getenv("DASH_LONG_CALLBACK_CTX") + if long: + return AttributeDict(**json.loads(long)) + return flask.g + + class FalsyList(list): def __bool__(self): # for Python 3 @@ -37,12 +50,12 @@ class CallbackContext: @property @has_context def inputs(self): - return getattr(flask.g, "input_values", {}) + return getattr(_get_global(), "input_values", {}) @property @has_context def states(self): - return getattr(flask.g, "state_values", {}) + return getattr(_get_global(), "state_values", {}) @property @has_context @@ -64,7 +77,7 @@ def triggered(self): # value - to avoid breaking existing apps, add a dummy item but # make the list still look falsy. So `if ctx.triggered` will make it # look empty, but you can still do `triggered[0]["prop_id"].split(".")` - return getattr(flask.g, "triggered_inputs", []) or falsy_triggered + return getattr(_get_global(), "triggered_inputs", []) or falsy_triggered @property @has_context @@ -90,7 +103,7 @@ def triggered_prop_ids(self): `if "btn-1.n_clicks" in ctx.triggered_prop_ids: do_something()` """ - triggered = getattr(flask.g, "triggered_inputs", []) + triggered = getattr(_get_global(), "triggered_inputs", []) ids = AttributeDict({}) for item in triggered: component_id, _, _ = item["prop_id"].rpartition(".") @@ -146,12 +159,12 @@ def display(btn1, btn2): return "No clicks yet" """ - return getattr(flask.g, "args_grouping", []) + return getattr(_get_global(), "args_grouping", []) @property @has_context def outputs_grouping(self): - return getattr(flask.g, "outputs_grouping", []) + return getattr(_get_global(), "outputs_grouping", []) @property @has_context @@ -162,7 +175,7 @@ def outputs_list(self): DeprecationWarning, ) - return getattr(flask.g, "outputs_list", []) + return getattr(_get_global(), "outputs_list", []) @property @has_context @@ -173,7 +186,7 @@ def inputs_list(self): DeprecationWarning, ) - return getattr(flask.g, "inputs_list", []) + return getattr(_get_global(), "inputs_list", []) @property @has_context @@ -183,12 +196,12 @@ def states_list(self): "states_list is deprecated, use args_grouping instead", DeprecationWarning, ) - return getattr(flask.g, "states_list", []) + return getattr(_get_global(), "states_list", []) @property @has_context def response(self): - return getattr(flask.g, "dash_response") + return getattr(_get_global(), "dash_response") @staticmethod @has_context @@ -205,14 +218,14 @@ def record_timing(name, duration=None, description=None): :param description: A description of the resource. :type description: string or None """ - timing_information = getattr(flask.g, "timing_information", {}) + timing_information = getattr(_get_global(), "timing_information", {}) if name in timing_information: raise KeyError(f'Duplicate resource name "{name}" found.') timing_information[name] = {"dur": round(duration * 1000), "desc": description} - setattr(flask.g, "timing_information", timing_information) + setattr(_get_global(), "timing_information", timing_information) @property @has_context @@ -221,7 +234,7 @@ def using_args_grouping(self): Return True if this callback is using dictionary or nested groupings for Input/State dependencies, or if Input and State dependencies are interleaved """ - return getattr(flask.g, "using_args_grouping", []) + return getattr(_get_global(), "using_args_grouping", []) @property @has_context @@ -230,7 +243,7 @@ def using_outputs_grouping(self): Return True if this callback is using dictionary or nested groupings for Output dependencies. """ - return getattr(flask.g, "using_outputs_grouping", []) + return getattr(_get_global(), "using_outputs_grouping", []) callback_context = CallbackContext() diff --git a/dash/long_callback/managers/__init__.py b/dash/long_callback/managers/__init__.py index c0bcac44af..b7bf175c4f 100644 --- a/dash/long_callback/managers/__init__.py +++ b/dash/long_callback/managers/__init__.py @@ -39,7 +39,7 @@ def job_running(self, job): def make_job_fn(self, fn, progress): raise NotImplementedError - def call_job_fn(self, key, job_fn, args): + def call_job_fn(self, key, job_fn, args, context): raise NotImplementedError def get_progress(self, key): diff --git a/dash/long_callback/managers/celery_manager.py b/dash/long_callback/managers/celery_manager.py index 2f05dba898..56d008742c 100644 --- a/dash/long_callback/managers/celery_manager.py +++ b/dash/long_callback/managers/celery_manager.py @@ -1,6 +1,7 @@ import json import inspect import hashlib +import os import traceback from _plotly_utils.utils import PlotlyJSONEncoder @@ -85,8 +86,8 @@ def get_task(self, job): def clear_cache_entry(self, key): self.handle.backend.delete(key) - def call_job_fn(self, key, job_fn, args): - task = job_fn.delay(key, self._make_progress_key(key), args) + def call_job_fn(self, key, job_fn, args, context): + task = job_fn.delay(key, self._make_progress_key(key), args, context) return task.task_id def get_progress(self, key): @@ -130,13 +131,16 @@ def _make_job_fn(fn, celery_app, progress): fn_hash = hashlib.sha1(fn_str.encode("utf-8")).hexdigest() @celery_app.task(name=f"long_callback_{fn_hash}") - def job_fn(result_key, progress_key, user_callback_args, fn=fn): + def job_fn(result_key, progress_key, user_callback_args, context=None): def _set_progress(progress_value): if not isinstance(progress_value, (list, tuple)): progress_value = [progress_value] cache.set(progress_key, json.dumps(progress_value, cls=PlotlyJSONEncoder)) + if context: + os.environ["DASH_LONG_CALLBACK_CTX"] = json.dumps(context) + maybe_progress = [_set_progress] if progress else [] try: diff --git a/dash/long_callback/managers/diskcache_manager.py b/dash/long_callback/managers/diskcache_manager.py index 68bf4fdc52..3afceaebad 100644 --- a/dash/long_callback/managers/diskcache_manager.py +++ b/dash/long_callback/managers/diskcache_manager.py @@ -1,3 +1,5 @@ +import json +import os import traceback from . import BaseLongCallbackManager @@ -109,12 +111,14 @@ def make_job_fn(self, fn, progress): def clear_cache_entry(self, key): self.handle.delete(key) - def call_job_fn(self, key, job_fn, args): + def call_job_fn(self, key, job_fn, args, context): # pylint: disable-next=import-outside-toplevel,no-name-in-module,import-error from multiprocess import Process # pylint: disable-next=not-callable - proc = Process(target=job_fn, args=(key, self._make_progress_key(key), args)) + proc = Process( + target=job_fn, args=(key, self._make_progress_key(key), args, context) + ) proc.start() return proc.pid @@ -146,7 +150,7 @@ def get_result(self, key, job): def _make_job_fn(fn, cache, progress, lock): - def job_fn(result_key, progress_key, user_callback_args): + def job_fn(result_key, progress_key, user_callback_args, context=None): def _set_progress(progress_value): if not isinstance(progress_value, (list, tuple)): progress_value = [progress_value] @@ -154,6 +158,9 @@ def _set_progress(progress_value): with lock: cache.set(progress_key, progress_value) + if context: + os.environ["DASH_LONG_CALLBACK_CTX"] = json.dumps(context) + maybe_progress = [_set_progress] if progress else [] try: From ccb53b9313d7ea7a6d0b3fac828d73bebe6ee006 Mon Sep 17 00:00:00 2001 From: philippe Date: Wed, 22 Jun 2022 16:52:38 -0400 Subject: [PATCH 30/35] Proper callback_context, replace flask.g context with contextvars. --- dash/_callback.py | 27 ++++--- dash/_callback_context.py | 48 ++++++------ dash/dash.py | 55 +++++++------- dash/long_callback/managers/celery_manager.py | 73 ++++++++++--------- .../managers/diskcache_manager.py | 11 +-- requires-install.txt | 1 + 6 files changed, 109 insertions(+), 106 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index b663477303..deea79c621 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -26,10 +26,12 @@ stringify_id, to_json, coerce_to_list, + AttributeDict, ) from . import _validate from .long_callback.managers import BaseLongCallbackManager +from ._callback_context import context_value class NoUpdate: @@ -311,9 +313,12 @@ def wrap_func(func): def add_context(*args, **kwargs): output_spec = kwargs.pop("outputs_list") app_callback_manager = kwargs.pop("long_callback_manager", None) + callback_ctx = kwargs.pop("callback_context", {}) callback_manager = long and long.get("manager", app_callback_manager) _validate.validate_output_spec(insert_output, output_spec, Output) + context_value.set(callback_ctx) + func_args, func_kwargs = _validate.validate_and_group_input_args( args, inputs_state_indices ) @@ -351,17 +356,17 @@ def add_context(*args, **kwargs): cache_key, job_fn, args, - dict( - args_grouping=flask.g.args_grouping, - using_args_grouping=flask.g.using_args_grouping, - outputs_grouping=flask.g.outputs_grouping, - using_outputs_grouping=flask.g.using_outputs_grouping, - inputs_list=flask.g.inputs_list, - states_list=flask.g.states_list, - outputs_list=flask.g.outputs_list, - input_values=flask.g.input_values, - state_values=flask.g.state_values, - triggered_inputs=flask.g.triggered_inputs, + AttributeDict( + args_grouping=callback_ctx.args_grouping, + using_args_grouping=callback_ctx.using_args_grouping, + outputs_grouping=callback_ctx.outputs_grouping, + using_outputs_grouping=callback_ctx.using_outputs_grouping, + inputs_list=callback_ctx.inputs_list, + states_list=callback_ctx.states_list, + outputs_list=callback_ctx.outputs_list, + input_values=callback_ctx.input_values, + state_values=callback_ctx.state_values, + triggered_inputs=callback_ctx.triggered_inputs, ), ) diff --git a/dash/_callback_context.py b/dash/_callback_context.py index 5fa6a62fe4..62ed64c27e 100644 --- a/dash/_callback_context.py +++ b/dash/_callback_context.py @@ -1,22 +1,21 @@ import functools import warnings import json -import os - -import flask +import contextvars from . import exceptions from ._utils import AttributeDict +context_value = contextvars.ContextVar("callback_context") +context_value.set({}) + + def has_context(func): @functools.wraps(func) def assert_context(*args, **kwargs): - if ( - not flask.has_request_context() - and "DASH_LONG_CALLBACK_CTX" not in os.environ - ): + if not context_value.get(): raise exceptions.MissingCallbackContextException( f"dash.callback_context.{getattr(func, '__name__')} is only available from a callback!" ) @@ -25,11 +24,8 @@ def assert_context(*args, **kwargs): return assert_context -def _get_global(): - long = os.getenv("DASH_LONG_CALLBACK_CTX") - if long: - return AttributeDict(**json.loads(long)) - return flask.g +def _get_context_value(): + return context_value.get() class FalsyList(list): @@ -50,12 +46,12 @@ class CallbackContext: @property @has_context def inputs(self): - return getattr(_get_global(), "input_values", {}) + return getattr(_get_context_value(), "input_values", {}) @property @has_context def states(self): - return getattr(_get_global(), "state_values", {}) + return getattr(_get_context_value(), "state_values", {}) @property @has_context @@ -77,7 +73,7 @@ def triggered(self): # value - to avoid breaking existing apps, add a dummy item but # make the list still look falsy. So `if ctx.triggered` will make it # look empty, but you can still do `triggered[0]["prop_id"].split(".")` - return getattr(_get_global(), "triggered_inputs", []) or falsy_triggered + return getattr(_get_context_value(), "triggered_inputs", []) or falsy_triggered @property @has_context @@ -103,7 +99,7 @@ def triggered_prop_ids(self): `if "btn-1.n_clicks" in ctx.triggered_prop_ids: do_something()` """ - triggered = getattr(_get_global(), "triggered_inputs", []) + triggered = getattr(_get_context_value(), "triggered_inputs", []) ids = AttributeDict({}) for item in triggered: component_id, _, _ = item["prop_id"].rpartition(".") @@ -159,12 +155,12 @@ def display(btn1, btn2): return "No clicks yet" """ - return getattr(_get_global(), "args_grouping", []) + return getattr(_get_context_value(), "args_grouping", []) @property @has_context def outputs_grouping(self): - return getattr(_get_global(), "outputs_grouping", []) + return getattr(_get_context_value(), "outputs_grouping", []) @property @has_context @@ -175,7 +171,7 @@ def outputs_list(self): DeprecationWarning, ) - return getattr(_get_global(), "outputs_list", []) + return getattr(_get_context_value(), "outputs_list", []) @property @has_context @@ -186,7 +182,7 @@ def inputs_list(self): DeprecationWarning, ) - return getattr(_get_global(), "inputs_list", []) + return getattr(_get_context_value(), "inputs_list", []) @property @has_context @@ -196,12 +192,12 @@ def states_list(self): "states_list is deprecated, use args_grouping instead", DeprecationWarning, ) - return getattr(_get_global(), "states_list", []) + return getattr(_get_context_value(), "states_list", []) @property @has_context def response(self): - return getattr(_get_global(), "dash_response") + return getattr(_get_context_value(), "dash_response") @staticmethod @has_context @@ -218,14 +214,14 @@ def record_timing(name, duration=None, description=None): :param description: A description of the resource. :type description: string or None """ - timing_information = getattr(_get_global(), "timing_information", {}) + timing_information = getattr(_get_context_value(), "timing_information", {}) if name in timing_information: raise KeyError(f'Duplicate resource name "{name}" found.') timing_information[name] = {"dur": round(duration * 1000), "desc": description} - setattr(_get_global(), "timing_information", timing_information) + setattr(_get_context_value(), "timing_information", timing_information) @property @has_context @@ -234,7 +230,7 @@ def using_args_grouping(self): Return True if this callback is using dictionary or nested groupings for Input/State dependencies, or if Input and State dependencies are interleaved """ - return getattr(_get_global(), "using_args_grouping", []) + return getattr(_get_context_value(), "using_args_grouping", []) @property @has_context @@ -243,7 +239,7 @@ def using_outputs_grouping(self): Return True if this callback is using dictionary or nested groupings for Output dependencies. """ - return getattr(_get_global(), "using_outputs_grouping", []) + return getattr(_get_context_value(), "using_outputs_grouping", []) callback_context = CallbackContext() diff --git a/dash/dash.py b/dash/dash.py index 44634472b3..ec63ee7f9f 100644 --- a/dash/dash.py +++ b/dash/dash.py @@ -1,7 +1,9 @@ +import functools import os import sys import collections import importlib +from contextvars import copy_context from importlib.machinery import ModuleSpec import pkgutil import threading @@ -1168,32 +1170,30 @@ def long_callback( def dispatch(self): body = flask.request.get_json() - flask.g.inputs_list = inputs = body.get( # pylint: disable=assigning-non-slot + g = AttributeDict({}) + + g.inputs_list = inputs = body.get( # pylint: disable=assigning-non-slot "inputs", [] ) - flask.g.states_list = state = body.get( # pylint: disable=assigning-non-slot + g.states_list = state = body.get( # pylint: disable=assigning-non-slot "state", [] ) output = body["output"] outputs_list = body.get("outputs") or split_callback_id(output) - flask.g.outputs_list = outputs_list # pylint: disable=assigning-non-slot + g.outputs_list = outputs_list # pylint: disable=assigning-non-slot - flask.g.input_values = ( # pylint: disable=assigning-non-slot + g.input_values = ( # pylint: disable=assigning-non-slot input_values ) = inputs_to_dict(inputs) - flask.g.state_values = inputs_to_dict( # pylint: disable=assigning-non-slot - state - ) - flask.g.long_callback_manager = ( - self._long_callback_manager - ) # pylint: disable=E0237 + g.state_values = inputs_to_dict(state) # pylint: disable=assigning-non-slot + g.long_callback_manager = self._long_callback_manager # pylint: disable=E0237 changed_props = body.get("changedPropIds", []) - flask.g.triggered_inputs = [ # pylint: disable=assigning-non-slot + g.triggered_inputs = [ # pylint: disable=assigning-non-slot {"prop_id": x, "value": input_values.get(x)} for x in changed_props ] response = ( - flask.g.dash_response # pylint: disable=assigning-non-slot + g.dash_response # pylint: disable=assigning-non-slot ) = flask.Response(mimetype="application/json") args = inputs_to_vals(inputs + state) @@ -1208,19 +1208,19 @@ def dispatch(self): inputs_state = convert_to_AttributeDict(inputs_state) # update args_grouping attributes - for g in inputs_state: + for s in inputs_state: # check for pattern matching: list of inputs or state - if isinstance(g, list): - for pattern_match_g in g: + if isinstance(s, list): + for pattern_match_g in s: update_args_group(pattern_match_g, changed_props) - update_args_group(g, changed_props) + update_args_group(s, changed_props) args_grouping = map_grouping( lambda ind: inputs_state[ind], inputs_state_indices ) - flask.g.args_grouping = args_grouping # pylint: disable=assigning-non-slot - flask.g.using_args_grouping = ( # pylint: disable=assigning-non-slot + g.args_grouping = args_grouping # pylint: disable=assigning-non-slot + g.using_args_grouping = ( # pylint: disable=assigning-non-slot not isinstance(inputs_state_indices, int) and ( inputs_state_indices @@ -1238,10 +1238,8 @@ def dispatch(self): outputs_grouping = map_grouping( lambda ind: flat_outputs[ind], outputs_indices ) - flask.g.outputs_grouping = ( # pylint: disable=assigning-non-slot - outputs_grouping - ) - flask.g.using_outputs_grouping = ( # pylint: disable=assigning-non-slot + g.outputs_grouping = outputs_grouping # pylint: disable=assigning-non-slot + g.using_outputs_grouping = ( # pylint: disable=assigning-non-slot not isinstance(outputs_indices, int) and outputs_indices != list(range(grouping_len(outputs_indices))) ) @@ -1249,12 +1247,17 @@ def dispatch(self): except KeyError as missing_callback_function: msg = f"Callback function not found for output '{output}', perhaps you forgot to prepend the '@'?" raise KeyError(msg) from missing_callback_function + ctx = copy_context() # noinspection PyArgumentList response.set_data( - func( - *args, - outputs_list=outputs_list, - long_callback_manager=self._long_callback_manager, + ctx.run( + functools.partial( + func, + *args, + outputs_list=outputs_list, + long_callback_manager=self._long_callback_manager, + callback_context=g, + ) ) ) return response diff --git a/dash/long_callback/managers/celery_manager.py b/dash/long_callback/managers/celery_manager.py index 56d008742c..ae6f14903e 100644 --- a/dash/long_callback/managers/celery_manager.py +++ b/dash/long_callback/managers/celery_manager.py @@ -1,11 +1,13 @@ import json import inspect import hashlib -import os import traceback +from contextvars import copy_context from _plotly_utils.utils import PlotlyJSONEncoder +from dash._callback_context import context_value +from dash._utils import AttributeDict from dash.exceptions import PreventUpdate from dash.long_callback.managers import BaseLongCallbackManager @@ -138,41 +140,44 @@ def _set_progress(progress_value): cache.set(progress_key, json.dumps(progress_value, cls=PlotlyJSONEncoder)) - if context: - os.environ["DASH_LONG_CALLBACK_CTX"] = json.dumps(context) - maybe_progress = [_set_progress] if progress else [] - try: - if isinstance(user_callback_args, dict): - user_callback_output = fn(*maybe_progress, **user_callback_args) - elif isinstance(user_callback_args, (list, tuple)): - user_callback_output = fn(*maybe_progress, *user_callback_args) + ctx = copy_context() + + def run(): + context_value.set(AttributeDict(**context)) + try: + if isinstance(user_callback_args, dict): + user_callback_output = fn(*maybe_progress, **user_callback_args) + elif isinstance(user_callback_args, (list, tuple)): + user_callback_output = fn(*maybe_progress, *user_callback_args) + else: + user_callback_output = fn(*maybe_progress, user_callback_args) + except PreventUpdate: + # Put NoUpdate dict directly to avoid circular imports. + cache.set( + result_key, + json.dumps( + {"_dash_no_update": "_dash_no_update"}, cls=PlotlyJSONEncoder + ), + ) + except Exception as err: # pylint: disable=broad-except + cache.set( + result_key, + json.dumps( + { + "long_callback_error": { + "msg": str(err), + "tb": traceback.format_exc(), + } + }, + ), + ) else: - user_callback_output = fn(*maybe_progress, user_callback_args) - except PreventUpdate: - # Put NoUpdate dict directly to avoid circular imports. - cache.set( - result_key, - json.dumps( - {"_dash_no_update": "_dash_no_update"}, cls=PlotlyJSONEncoder - ), - ) - except Exception as err: # pylint: disable=broad-except - cache.set( - result_key, - json.dumps( - { - "long_callback_error": { - "msg": str(err), - "tb": traceback.format_exc(), - } - }, - ), - ) - else: - cache.set( - result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder) - ) + cache.set( + result_key, json.dumps(user_callback_output, cls=PlotlyJSONEncoder) + ) + + ctx.run(run) return job_fn diff --git a/dash/long_callback/managers/diskcache_manager.py b/dash/long_callback/managers/diskcache_manager.py index 3afceaebad..44979c7d09 100644 --- a/dash/long_callback/managers/diskcache_manager.py +++ b/dash/long_callback/managers/diskcache_manager.py @@ -1,5 +1,3 @@ -import json -import os import traceback from . import BaseLongCallbackManager @@ -116,9 +114,7 @@ def call_job_fn(self, key, job_fn, args, context): from multiprocess import Process # pylint: disable-next=not-callable - proc = Process( - target=job_fn, args=(key, self._make_progress_key(key), args, context) - ) + proc = Process(target=job_fn, args=(key, self._make_progress_key(key), args)) proc.start() return proc.pid @@ -150,7 +146,7 @@ def get_result(self, key, job): def _make_job_fn(fn, cache, progress, lock): - def job_fn(result_key, progress_key, user_callback_args, context=None): + def job_fn(result_key, progress_key, user_callback_args): def _set_progress(progress_value): if not isinstance(progress_value, (list, tuple)): progress_value = [progress_value] @@ -158,9 +154,6 @@ def _set_progress(progress_value): with lock: cache.set(progress_key, progress_value) - if context: - os.environ["DASH_LONG_CALLBACK_CTX"] = json.dumps(context) - maybe_progress = [_set_progress] if progress else [] try: diff --git a/requires-install.txt b/requires-install.txt index 7e0ff3ec4f..ca4774ceff 100644 --- a/requires-install.txt +++ b/requires-install.txt @@ -5,3 +5,4 @@ dash_html_components==2.0.0 dash_core_components==2.0.0 dash_table==5.0.0 importlib-metadata==4.8.3;python_version<"3.7" +contextvars==2.4;python_version<"3.7" From 8525c732406cbdb4303a858d9399f6dcc0667d5d Mon Sep 17 00:00:00 2001 From: philippe Date: Thu, 23 Jun 2022 09:07:26 -0400 Subject: [PATCH 31/35] Fix cancel. --- dash/_callback.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dash/_callback.py b/dash/_callback.py index deea79c621..3f95f689fb 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -176,7 +176,7 @@ def callback( @callback(cancels_output, cancel_inputs, prevent_initial_call=True) def cancel_call(*_): job_ids = flask.request.args.getlist("cancelJob") - manager = long_manager or flask.g.long_callback_manager + manager = long_manager or context_value.get().long_callback_manager if job_ids: for job_id in job_ids: manager.terminate_job(job_id) From 34bd81d4a84122aaa20494254f7bf279e17c5657 Mon Sep 17 00:00:00 2001 From: philippe Date: Thu, 23 Jun 2022 09:52:17 -0400 Subject: [PATCH 32/35] Back to flask.g for timing_information. --- dash/_callback_context.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dash/_callback_context.py b/dash/_callback_context.py index 62ed64c27e..7638d0a860 100644 --- a/dash/_callback_context.py +++ b/dash/_callback_context.py @@ -3,6 +3,7 @@ import json import contextvars +import flask from . import exceptions from ._utils import AttributeDict @@ -214,14 +215,14 @@ def record_timing(name, duration=None, description=None): :param description: A description of the resource. :type description: string or None """ - timing_information = getattr(_get_context_value(), "timing_information", {}) + timing_information = getattr(flask.g, "timing_information", {}) if name in timing_information: raise KeyError(f'Duplicate resource name "{name}" found.') timing_information[name] = {"dur": round(duration * 1000), "desc": description} - setattr(_get_context_value(), "timing_information", timing_information) + setattr(flask.g, "timing_information", timing_information) @property @has_context @@ -241,5 +242,10 @@ def using_outputs_grouping(self): """ return getattr(_get_context_value(), "using_outputs_grouping", []) + @property + @has_context + def timing_information(self): + return getattr(flask.g, "timing_information", {}) + callback_context = CallbackContext() From 02c15dcc08118cb0babefa48ba8291c483fcd429 Mon Sep 17 00:00:00 2001 From: philippe Date: Tue, 28 Jun 2022 16:13:01 -0400 Subject: [PATCH 33/35] Manage previous outdated running jobs. --- dash/_callback.py | 5 +++ dash/dash-renderer/src/actions/callbacks.ts | 40 +++++++++++++++---- .../src/reducers/callbackJobs.ts | 10 ++++- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/dash/_callback.py b/dash/_callback.py index 3f95f689fb..e7763b4702 100644 --- a/dash/_callback.py +++ b/dash/_callback.py @@ -339,6 +339,7 @@ def add_context(*args, **kwargs): progress_outputs = long.get("progress") cache_key = flask.request.args.get("cacheKey") job_id = flask.request.args.get("job") + old_job = flask.request.args.getlist("oldJob") current_key = callback_manager.build_cache_key( func, @@ -347,6 +348,10 @@ def add_context(*args, **kwargs): long.get("cache_args_to_ignore", []), ) + if old_job: + for job in old_job: + callback_manager.terminate_job(job) + if not cache_key: cache_key = current_key diff --git a/dash/dash-renderer/src/actions/callbacks.ts b/dash/dash-renderer/src/actions/callbacks.ts index 78b8c26ff6..4764bdfb6c 100644 --- a/dash/dash-renderer/src/actions/callbacks.ts +++ b/dash/dash-renderer/src/actions/callbacks.ts @@ -93,6 +93,7 @@ const updateResourceUsage = createAction('UPDATE_RESOURCE_USAGE'); const addCallbackJob = createAction('ADD_CALLBACK_JOB'); const removeCallbackJob = createAction('REMOVE_CALLBACK_JOB'); +const setCallbackJobOutdated = createAction('CALLBACK_JOB_OUTDATED'); function unwrapIfNotMulti( paths: any, @@ -333,8 +334,10 @@ function handleServerside( config: any, payload: any, paths: any, - long?: LongCallbackInfo, - additionalArgs?: [string, string][] + long: LongCallbackInfo | undefined, + additionalArgs: [string, string, boolean?][] | undefined, + getState: any, + output: string ): Promise { if (hooks.request_pre) { hooks.request_pre(payload); @@ -346,6 +349,7 @@ function handleServerside( let job: string; let runningOff: any; let progressDefault: any; + let moreArgs = additionalArgs; const fetchCallback = () => { const headers = getCSRFHeader() as any; @@ -365,8 +369,9 @@ function handleServerside( addArg('job', job); } - if (additionalArgs) { - additionalArgs.forEach(([key, value]) => addArg(key, value)); + if (moreArgs) { + moreArgs.forEach(([key, value]) => addArg(key, value)); + moreArgs = moreArgs.filter(([_, __, single]) => !single); } return fetch( @@ -383,6 +388,14 @@ function handleServerside( const handleOutput = (res: any) => { const {status} = res; + if (job) { + const callbackJob = getState().callbackJobs[job]; + if (callbackJob?.outdated) { + dispatch(removeCallbackJob({jobId: job})); + return resolve({}); + } + } + function recordProfile(result: any) { if (config.ui) { // Callback profiling - only relevant if we're showing the debug ui @@ -462,7 +475,8 @@ function handleServerside( jobId: data.job, cacheKey: data.cacheKey as string, cancelInputs: data.cancel, - progressDefault: data.progressDefault + progressDefault: data.progressDefault, + output }; dispatch(addCallbackJob(jobInfo)); job = data.job; @@ -635,9 +649,19 @@ export function executeCallback( let newHeaders: Record | null = null; let lastError: any; - const additionalArgs: [string, string][] = []; + const additionalArgs: [string, string, boolean?][] = []; + console.log(cb.callback.output, getState().callbackJobs); values(getState().callbackJobs).forEach( (job: CallbackJobPayload) => { + if (cb.callback.output === job.output) { + // Terminate the old jobs that are not completed + // set as outdated for the callback promise to + // resolve and remove after. + additionalArgs.push(['oldJob', job.jobId, true]); + dispatch( + setCallbackJobOutdated({jobId: job.jobId}) + ); + } if (!job.cancelInputs) { return; } @@ -667,7 +691,9 @@ export function executeCallback( payload, paths, long, - additionalArgs.length ? additionalArgs : undefined + additionalArgs.length ? additionalArgs : undefined, + getState, + cb.callback.output ); if (newHeaders) { diff --git a/dash/dash-renderer/src/reducers/callbackJobs.ts b/dash/dash-renderer/src/reducers/callbackJobs.ts index 00398eaef0..658401b4d8 100644 --- a/dash/dash-renderer/src/reducers/callbackJobs.ts +++ b/dash/dash-renderer/src/reducers/callbackJobs.ts @@ -1,4 +1,4 @@ -import {assoc, dissoc} from 'ramda'; +import {assoc, assocPath, dissoc} from 'ramda'; import {ICallbackProperty} from '../types/callbacks'; type CallbackJobState = {[k: string]: CallbackJobPayload}; @@ -8,10 +8,12 @@ export type CallbackJobPayload = { cacheKey: string; jobId: string; progressDefault?: any; + output?: string; + outdated?: boolean; }; type CallbackJobAction = { - type: 'ADD_CALLBACK_JOB' | 'REMOVE_CALLBACK_JOB'; + type: 'ADD_CALLBACK_JOB' | 'REMOVE_CALLBACK_JOB' | 'CALLBACK_JOB_OUTDATED'; payload: CallbackJobPayload; }; @@ -19,6 +21,8 @@ const setJob = (job: CallbackJobPayload, state: CallbackJobState) => assoc(job.jobId, job, state); const removeJob = (jobId: string, state: CallbackJobState) => dissoc(jobId, state); +const setOutdated = (jobId: string, state: CallbackJobState) => + assocPath([jobId, 'outdated'], true, state); export default function ( state: CallbackJobState = {}, @@ -29,6 +33,8 @@ export default function ( return setJob(action.payload, state); case 'REMOVE_CALLBACK_JOB': return removeJob(action.payload.jobId, state); + case 'CALLBACK_JOB_OUTDATED': + return setOutdated(action.payload.jobId, state); default: return state; } From 529ec8e145b211cd50e67357e8132f75cc69be2a Mon Sep 17 00:00:00 2001 From: philippe Date: Thu, 30 Jun 2022 13:53:04 -0400 Subject: [PATCH 34/35] Lock selenium <=4.2.0 --- requires-testing.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requires-testing.txt b/requires-testing.txt index ee5aa609c4..c78142892b 100644 --- a/requires-testing.txt +++ b/requires-testing.txt @@ -5,5 +5,5 @@ lxml>=4.6.2 percy>=2.0.2 pytest>=6.0.2 requests[security]>=2.21.0 -selenium>=3.141.0 +selenium>=3.141.0,<=4.2.0 waitress>=1.4.4 From 2fb1cfacd19a667b301999bb8b184b8832c9ef8d Mon Sep 17 00:00:00 2001 From: philippe Date: Thu, 30 Jun 2022 15:12:07 -0400 Subject: [PATCH 35/35] Update changelog. --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ef290ce19f..f5048a7d6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,17 @@ This project adheres to [Semantic Versioning](https://semver.org/). - [#2098](https://github.com/plotly/dash/pull/2098) Accept HTTP code 400 as well as 401 for JWT expiry - [#2097](https://github.com/plotly/dash/pull/2097) Fix bug [#2095](https://github.com/plotly/dash/issues/2095) with TypeScript compiler and `React.FC` empty valueDeclaration error & support empty props components. - [#2104](https://github.com/plotly/dash/pull/2104) Fix bug [#2099](https://github.com/plotly/dash/issues/2099) with Dropdown clearing search value when a value is selected. +- [#2039](https://github.com/plotly/dash/pull/2039) Fix bugs in long callbacks: + - Fix [#1769](https://github.com/plotly/dash/issues/1769) and [#1852](https://github.com/plotly/dash/issues/1852) short interval makes job run in loop. + - Fix [#1974](https://github.com/plotly/dash/issues/1974) returning `no_update` or raising `PreventUpdate` not supported with celery. + - Fix use of the callback context in celery long callbacks. + - Fix support of pattern matching for long callbacks. + +### Added + +- [#2039](https://github.com/plotly/dash/pull/2039) Long callback changes: + - Add `long=False` to `dash.callback` to use instead of `app.long_callback`. + - Add previous `app.long_callback` arguments to `dash.callback` prefixed with `long_` (`interval`, `running`, `cancel`, `progress`, `progress_default`, `cache_args_to_ignore`, `manager`) ## [2.5.1] - 2022-06-13