@@ -368,16 +368,17 @@ func (qjm *XController) PreemptQueueJobs() {
 		updateErr := qjm.UpdateQueueJobStatus(newjob)
 		if updateErr != nil {
 			klog.Warningf("[PreemptQueueJobs] update of pod count to AW %v failed hence skipping preemption", newjob.Name)
+			return
 		}
 		newjob.Status.CanRun = false
 		newjob.Status.FilterIgnore = true // update QueueJobState only
 		cleanAppWrapper := false
 		// If dispatch deadline is exceeded no matter what the state of AW, kill the job and set status as Failed.
-		if (aw.Status.State == arbv1.AppWrapperStateActive) && (aw.Spec.SchedSpec.DispatchDuration.Limit > 0) {
-			if aw.Spec.SchedSpec.DispatchDuration.Overrun {
-				index := getIndexOfMatchedCondition(aw, arbv1.AppWrapperCondPreemptCandidate, "DispatchDeadlineExceeded")
+		if (newjob.Status.State == arbv1.AppWrapperStateActive) && (newjob.Spec.SchedSpec.DispatchDuration.Limit > 0) {
+			if newjob.Spec.SchedSpec.DispatchDuration.Overrun {
+				index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "DispatchDeadlineExceeded")
 				if index < 0 {
-					message = fmt.Sprintf("Dispatch deadline exceeded. allowed to run for %v seconds", aw.Spec.SchedSpec.DispatchDuration.Limit)
+					message = fmt.Sprintf("Dispatch deadline exceeded. allowed to run for %v seconds", newjob.Spec.SchedSpec.DispatchDuration.Limit)
 					cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "DispatchDeadlineExceeded", message)
 					newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
 				} else {
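The deadline branch above only fires when `DispatchDuration.Limit > 0` and the `Overrun` flag has already been set elsewhere by the dispatcher. As a rough, self-contained illustration of how such a flag can be derived from a dispatch timestamp and a limit in seconds (an assumption added for clarity, not the controller's code for setting `Overrun`):

```go
package main

import (
	"fmt"
	"time"
)

// overrun reports whether a wrapper dispatched at dispatchedAt has run past
// a limit given in seconds; a non-positive limit means no deadline is set.
func overrun(dispatchedAt time.Time, limitSeconds int, now time.Time) bool {
	if limitSeconds <= 0 {
		return false
	}
	return now.Sub(dispatchedAt) > time.Duration(limitSeconds)*time.Second
}

func main() {
	dispatched := time.Now().Add(-90 * time.Second)
	fmt.Println(overrun(dispatched, 60, time.Now()))  // true: past the 60s limit
	fmt.Println(overrun(dispatched, 120, time.Now())) // false: still within 120s
}
```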
@@ -392,7 +393,7 @@ func (qjm *XController) PreemptQueueJobs() {
 
 				err := qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- DispatchDeadlineExceeded")
 				if err != nil {
-					klog.Warningf("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed", aw.Namespace, aw.Name)
+					klog.Warningf("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed", newjob.Namespace, newjob.Name)
 					continue
 				}
 				// cannot use cleanup AW, since it puts AW back in running state
@@ -403,33 +404,33 @@ func (qjm *XController) PreemptQueueJobs() {
 			}
 		}
 
-		if ((aw.Status.Running + aw.Status.Succeeded) < int32(aw.Spec.SchedSpec.MinAvailable)) && aw.Status.State == arbv1.AppWrapperStateActive {
-			index := getIndexOfMatchedCondition(aw, arbv1.AppWrapperCondPreemptCandidate, "MinPodsNotRunning")
+		if ((newjob.Status.Running + newjob.Status.Succeeded) < int32(newjob.Spec.SchedSpec.MinAvailable)) && newjob.Status.State == arbv1.AppWrapperStateActive {
+			index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "MinPodsNotRunning")
 			if index < 0 {
-				message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running, aw.Status.Succeeded)
+				message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", newjob.Spec.SchedSpec.MinAvailable, newjob.Status.Running, newjob.Status.Succeeded)
 				cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message)
 				newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
 			} else {
 				cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", "")
 				newjob.Status.Conditions[index] = *cond.DeepCopy()
 			}
 
-			if aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 {
-				aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds = aw.Spec.SchedSpec.Requeuing.TimeInSeconds
+			if newjob.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 {
+				newjob.Spec.SchedSpec.Requeuing.InitialTimeInSeconds = newjob.Spec.SchedSpec.Requeuing.TimeInSeconds
 			}
-			if aw.Spec.SchedSpec.Requeuing.GrowthType == "exponential" {
+			if newjob.Spec.SchedSpec.Requeuing.GrowthType == "exponential" {
 				if newjob.Status.RequeueingTimeInSeconds == 0 {
-					newjob.Status.RequeueingTimeInSeconds += aw.Spec.SchedSpec.Requeuing.TimeInSeconds
+					newjob.Status.RequeueingTimeInSeconds += newjob.Spec.SchedSpec.Requeuing.TimeInSeconds
 				} else {
 					newjob.Status.RequeueingTimeInSeconds += newjob.Status.RequeueingTimeInSeconds
 				}
-			} else if aw.Spec.SchedSpec.Requeuing.GrowthType == "linear" {
-				newjob.Status.RequeueingTimeInSeconds += aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds
+			} else if newjob.Spec.SchedSpec.Requeuing.GrowthType == "linear" {
+				newjob.Status.RequeueingTimeInSeconds += newjob.Spec.SchedSpec.Requeuing.InitialTimeInSeconds
 			}
 
-			if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds > 0 {
-				if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds <= newjob.Status.RequeueingTimeInSeconds {
-					newjob.Status.RequeueingTimeInSeconds = aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds
+			if newjob.Spec.SchedSpec.Requeuing.MaxTimeInSeconds > 0 {
+				if newjob.Spec.SchedSpec.Requeuing.MaxTimeInSeconds <= newjob.Status.RequeueingTimeInSeconds {
+					newjob.Status.RequeueingTimeInSeconds = newjob.Spec.SchedSpec.Requeuing.MaxTimeInSeconds
 				}
 			}
 
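The requeue timing rewritten above grows the wait between preemptions: with `GrowthType == "exponential"` the time doubles on each pass (seeded from `TimeInSeconds`), with `"linear"` it grows by `InitialTimeInSeconds`, and `MaxTimeInSeconds`, when positive, caps the result. A minimal standalone sketch of that growth follows; the function and parameter names are illustrative, not the controller's types:

```go
package main

import "fmt"

// nextRequeueSeconds mirrors the back-off growth above: double for
// "exponential" (seeded with the base time on the first pass), add the base
// for "linear", and clamp to max when a positive maximum is configured.
func nextRequeueSeconds(current, base, max int, growth string) int {
	next := current
	switch growth {
	case "exponential":
		if current == 0 {
			next = base
		} else {
			next = current + current
		}
	case "linear":
		next = current + base
	}
	if max > 0 && next > max {
		next = max
	}
	return next
}

func main() {
	t := 0
	for i := 0; i < 5; i++ {
		t = nextRequeueSeconds(t, 60, 300, "exponential")
		fmt.Print(t, " ") // 60 120 240 300 300
	}
	fmt.Println()
}
```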
@@ -443,7 +444,7 @@ func (qjm *XController) PreemptQueueJobs() {
 			updateNewJob = newjob.DeepCopy()
 		} else {
 			// If pods failed scheduling generate new preempt condition
-			message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(aw.Status.PendingPodConditions), aw.Status.Running)
+			message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(newjob.Status.PendingPodConditions), newjob.Status.Running)
 			index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "PodsFailedScheduling")
 			// ignore co-scheduler failed scheduling events. This is a temp
 			// work-around until co-scheduler version 0.22.X perf issues are resolved.
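Each of these branches uses the same pattern around `getIndexOfMatchedCondition`: when no `PreemptCandidate` condition with the given reason exists yet (index < 0), a new condition is appended; otherwise the existing entry is refreshed in place. A simplified, self-contained sketch of that upsert, with an illustrative `condition` type and helpers that are not the controller's API:

```go
package main

import "fmt"

type condition struct {
	Type    string
	Reason  string
	Message string
}

// indexOfCondition plays the role of getIndexOfMatchedCondition: find an
// existing condition with the same type and reason, or report -1.
func indexOfCondition(conds []condition, condType, reason string) int {
	for i, c := range conds {
		if c.Type == condType && c.Reason == reason {
			return i
		}
	}
	return -1
}

// upsertCondition appends the condition on its first occurrence and
// refreshes the existing entry in place on later ones.
func upsertCondition(conds []condition, c condition) []condition {
	if i := indexOfCondition(conds, c.Type, c.Reason); i >= 0 {
		conds[i] = c
		return conds
	}
	return append(conds, c)
}

func main() {
	var conds []condition
	conds = upsertCondition(conds, condition{"PreemptCandidate", "MinPodsNotRunning", "minimum=2, running=0"})
	conds = upsertCondition(conds, condition{"PreemptCandidate", "MinPodsNotRunning", ""})
	fmt.Println(len(conds), conds[0].Message) // 1 "": refreshed in place, not duplicated
}
```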
@@ -460,17 +461,17 @@ func (qjm *XController) PreemptQueueJobs() {
 
 		err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning")
 		if err != nil {
-			klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", aw.Namespace, aw.Name, err)
+			klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err)
 			continue
 		}
 
 		if cleanAppWrapper {
-			klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", aw.Name, aw.Namespace)
+			klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Name, newjob.Namespace)
 			go qjm.Cleanup(ctx, updateNewJob)
 		} else {
 			// Only back-off AWs that are in state running and not in state Failed
 			if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
-				klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", aw.Name, aw.Namespace)
+				klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Name, newjob.Namespace)
 				qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
 			}
 		}
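The tail of the loop decides what happens to the preempted wrapper: if the maximum number of requeuings was exceeded (`cleanAppWrapper`), it is deleted via `Cleanup`; otherwise anything not already in the Failed state is placed on the back-off queue. Reduced to a pure function for illustration (names and the string-typed state are assumptions, not the controller's code):

```go
package main

import "fmt"

// decide mirrors the final branch above.
func decide(cleanAppWrapper bool, state string) string {
	if cleanAppWrapper {
		return "delete via Cleanup" // max requeuings exceeded
	}
	if state != "Failed" {
		return "add to back-off queue" // will be redispatched later
	}
	return "leave as Failed"
}

func main() {
	fmt.Println(decide(true, "Active"))  // delete via Cleanup
	fmt.Println(decide(false, "Active")) // add to back-off queue
	fmt.Println(decide(false, "Failed")) // leave as Failed
}
```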