Skip to content

Commit 02c15dc

Browse files
committed
Manage previous outdated running jobs.
1 parent b2ac6ce commit 02c15dc

File tree

3 files changed

+46
-9
lines changed

3 files changed

+46
-9
lines changed

dash/_callback.py

+5
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ def add_context(*args, **kwargs):
339339
progress_outputs = long.get("progress")
340340
cache_key = flask.request.args.get("cacheKey")
341341
job_id = flask.request.args.get("job")
342+
old_job = flask.request.args.getlist("oldJob")
342343

343344
current_key = callback_manager.build_cache_key(
344345
func,
@@ -347,6 +348,10 @@ def add_context(*args, **kwargs):
347348
long.get("cache_args_to_ignore", []),
348349
)
349350

351+
if old_job:
352+
for job in old_job:
353+
callback_manager.terminate_job(job)
354+
350355
if not cache_key:
351356
cache_key = current_key
352357

dash/dash-renderer/src/actions/callbacks.ts

+33-7
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ const updateResourceUsage = createAction('UPDATE_RESOURCE_USAGE');
9393

9494
const addCallbackJob = createAction('ADD_CALLBACK_JOB');
9595
const removeCallbackJob = createAction('REMOVE_CALLBACK_JOB');
96+
const setCallbackJobOutdated = createAction('CALLBACK_JOB_OUTDATED');
9697

9798
function unwrapIfNotMulti(
9899
paths: any,
@@ -333,8 +334,10 @@ function handleServerside(
333334
config: any,
334335
payload: any,
335336
paths: any,
336-
long?: LongCallbackInfo,
337-
additionalArgs?: [string, string][]
337+
long: LongCallbackInfo | undefined,
338+
additionalArgs: [string, string, boolean?][] | undefined,
339+
getState: any,
340+
output: string
338341
): Promise<CallbackResponse> {
339342
if (hooks.request_pre) {
340343
hooks.request_pre(payload);
@@ -346,6 +349,7 @@ function handleServerside(
346349
let job: string;
347350
let runningOff: any;
348351
let progressDefault: any;
352+
let moreArgs = additionalArgs;
349353

350354
const fetchCallback = () => {
351355
const headers = getCSRFHeader() as any;
@@ -365,8 +369,9 @@ function handleServerside(
365369
addArg('job', job);
366370
}
367371

368-
if (additionalArgs) {
369-
additionalArgs.forEach(([key, value]) => addArg(key, value));
372+
if (moreArgs) {
373+
moreArgs.forEach(([key, value]) => addArg(key, value));
374+
moreArgs = moreArgs.filter(([_, __, single]) => !single);
370375
}
371376

372377
return fetch(
@@ -383,6 +388,14 @@ function handleServerside(
383388
const handleOutput = (res: any) => {
384389
const {status} = res;
385390

391+
if (job) {
392+
const callbackJob = getState().callbackJobs[job];
393+
if (callbackJob?.outdated) {
394+
dispatch(removeCallbackJob({jobId: job}));
395+
return resolve({});
396+
}
397+
}
398+
386399
function recordProfile(result: any) {
387400
if (config.ui) {
388401
// Callback profiling - only relevant if we're showing the debug ui
@@ -462,7 +475,8 @@ function handleServerside(
462475
jobId: data.job,
463476
cacheKey: data.cacheKey as string,
464477
cancelInputs: data.cancel,
465-
progressDefault: data.progressDefault
478+
progressDefault: data.progressDefault,
479+
output
466480
};
467481
dispatch(addCallbackJob(jobInfo));
468482
job = data.job;
@@ -635,9 +649,19 @@ export function executeCallback(
635649
let newHeaders: Record<string, string> | null = null;
636650
let lastError: any;
637651

638-
const additionalArgs: [string, string][] = [];
652+
const additionalArgs: [string, string, boolean?][] = [];
653+
console.log(cb.callback.output, getState().callbackJobs);
639654
values(getState().callbackJobs).forEach(
640655
(job: CallbackJobPayload) => {
656+
if (cb.callback.output === job.output) {
657+
// Terminate the old jobs that are not completed
658+
// set as outdated for the callback promise to
659+
// resolve and remove after.
660+
additionalArgs.push(['oldJob', job.jobId, true]);
661+
dispatch(
662+
setCallbackJobOutdated({jobId: job.jobId})
663+
);
664+
}
641665
if (!job.cancelInputs) {
642666
return;
643667
}
@@ -667,7 +691,9 @@ export function executeCallback(
667691
payload,
668692
paths,
669693
long,
670-
additionalArgs.length ? additionalArgs : undefined
694+
additionalArgs.length ? additionalArgs : undefined,
695+
getState,
696+
cb.callback.output
671697
);
672698

673699
if (newHeaders) {

dash/dash-renderer/src/reducers/callbackJobs.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {assoc, dissoc} from 'ramda';
1+
import {assoc, assocPath, dissoc} from 'ramda';
22
import {ICallbackProperty} from '../types/callbacks';
33

44
type CallbackJobState = {[k: string]: CallbackJobPayload};
@@ -8,17 +8,21 @@ export type CallbackJobPayload = {
88
cacheKey: string;
99
jobId: string;
1010
progressDefault?: any;
11+
output?: string;
12+
outdated?: boolean;
1113
};
1214

1315
type CallbackJobAction = {
14-
type: 'ADD_CALLBACK_JOB' | 'REMOVE_CALLBACK_JOB';
16+
type: 'ADD_CALLBACK_JOB' | 'REMOVE_CALLBACK_JOB' | 'CALLBACK_JOB_OUTDATED';
1517
payload: CallbackJobPayload;
1618
};
1719

1820
const setJob = (job: CallbackJobPayload, state: CallbackJobState) =>
1921
assoc(job.jobId, job, state);
2022
const removeJob = (jobId: string, state: CallbackJobState) =>
2123
dissoc(jobId, state);
24+
const setOutdated = (jobId: string, state: CallbackJobState) =>
25+
assocPath([jobId, 'outdated'], true, state);
2226

2327
export default function (
2428
state: CallbackJobState = {},
@@ -29,6 +33,8 @@ export default function (
2933
return setJob(action.payload, state);
3034
case 'REMOVE_CALLBACK_JOB':
3135
return removeJob(action.payload.jobId, state);
36+
case 'CALLBACK_JOB_OUTDATED':
37+
return setOutdated(action.payload.jobId, state);
3238
default:
3339
return state;
3440
}

0 commit comments

Comments
 (0)