@@ -466,7 +466,7 @@ func (qjm *XController) PreemptQueueJobs() {
466
466
// Only back-off AWs that are in state running and not in state Failed
467
467
if updateNewJob .Status .State != arbv1 .AppWrapperStateFailed {
468
468
klog .Infof ("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue." , aw .Name , aw .Namespace )
469
- go qjm .backoff (ctx , updateNewJob , "PreemptionTriggered" , string (message ))
469
+ qjm .backoff (ctx , updateNewJob , "PreemptionTriggered" , string (message ))
470
470
}
471
471
}
472
472
}
@@ -1155,7 +1155,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
1155
1155
} else {
1156
1156
dispatchFailedMessage = "Cannot find an cluster with enough resources to dispatch AppWrapper."
1157
1157
klog .V (2 ).Infof ("[ScheduleNex] [Dispatcher Mode] %s %s\n " , dispatchFailedReason , dispatchFailedMessage )
1158
- go qjm .backoff (ctx , qj , dispatchFailedReason , dispatchFailedMessage )
1158
+ qjm .backoff (ctx , qj , dispatchFailedReason , dispatchFailedMessage )
1159
1159
}
1160
1160
} else { // Agent Mode
1161
1161
aggqj := qjm .GetAggregatedResources (qj )
@@ -1284,7 +1284,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
1284
1284
// TODO: Remove forwarded logic as a big AW will never be forwarded
1285
1285
forwarded = true
1286
1286
// should we call backoff or update etcd?
1287
- go qjm .backoff (ctx , qj , dispatchFailedReason , dispatchFailedMessage )
1287
+ qjm .backoff (ctx , qj , dispatchFailedReason , dispatchFailedMessage )
1288
1288
}
1289
1289
}
1290
1290
forwarded = true
@@ -1347,7 +1347,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
1347
1347
if qjm .quotaManager != nil && quotaFits {
1348
1348
qjm .quotaManager .Release (qj )
1349
1349
}
1350
- go qjm .backoff (ctx , qj , dispatchFailedReason , dispatchFailedMessage )
1350
+ qjm .backoff (ctx , qj , dispatchFailedReason , dispatchFailedMessage )
1351
1351
}
1352
1352
}
1353
1353
return nil
@@ -1672,6 +1672,20 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
1672
1672
}
1673
1673
1674
1674
klog .V (6 ).Infof ("[Informer-updateQJ] '%s/%s' *Delay=%.6f seconds normal enqueue Version=%s Status=%v" , newQJ .Namespace , newQJ .Name , time .Now ().Sub (newQJ .Status .ControllerFirstTimestamp .Time ).Seconds (), newQJ .ResourceVersion , newQJ .Status )
1675
+ for _ , cond := range newQJ .Status .Conditions {
1676
+ if cond .Type == arbv1 .AppWrapperCondBackoff {
1677
+ // AWs that have a backoff condition are re-enqueued only after a delay of cc.serverOption.BackoffTime seconds.
1678
+ //TODO: we could plug an interface here with back-off strategies for different MCAD use cases.
1679
+ time .AfterFunc (time .Duration (cc .serverOption .BackoffTime )* time .Second , func () {
1680
+ if cc .serverOption .QuotaEnabled && cc .quotaManager != nil {
1681
+ cc .quotaManager .Release (newQJ )
1682
+ }
1683
+ cc .enqueue (newQJ )
1684
+ })
1685
+ return
1686
+ }
1687
+ }
1688
+
1675
1689
// cc.eventQueue.Delete(oldObj)
1676
1690
cc .enqueue (newQJ )
1677
1691
}
0 commit comments