@@ -254,7 +254,7 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat
254
254
FilterFunc : func (obj interface {}) bool {
255
255
switch t := obj .(type ) {
256
256
case * arbv1.AppWrapper :
257
- klog .V (10 ).Infof ("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s &qj=%p qj=%+v " , t .Name , t .Namespace , t .ResourceVersion , t .Status .Local , t .Status .FilterIgnore , t .Status .Sender , t , t )
257
+ klog .V (10 ).Infof ("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s " , t .Name , t .Namespace , t .ResourceVersion , t .Status .Local , t .Status .FilterIgnore , t .Status .Sender )
258
258
// todo: This is a current workaround for duplicate message bug.
259
259
// if t.Status.Local == true { // ignore duplicate message from cache
260
260
// return false
@@ -440,14 +440,15 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
440
440
return
441
441
}
442
442
443
- if cleanAppWrapper {
444
- klog .V (4 ).Infof ("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded." , newjob .Namespace , newjob .Name )
445
- go qjm .Cleanup (ctx , updateNewJob )
446
- } else {
447
- // Only back-off AWs that are in state running and not in state Failed
448
- if updateNewJob .Status .State != arbv1 .AppWrapperStateFailed {
449
- klog .Infof ("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue." , newjob .Namespace , newjob .Name )
450
- qjm .backoff (ctx , updateNewJob , "PreemptionTriggered" , string (message ))
443
+ if cleanAppWrapper {
444
+ klog .V (4 ).Infof ("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded." , newjob .Namespace , newjob .Name )
445
+ go qjm .Cleanup (ctx , updateNewJob )
446
+ } else {
447
+ // Only back-off AWs that are in state running and not in state Failed
448
+ if updateNewJob .Status .State != arbv1 .AppWrapperStateFailed {
449
+ klog .Infof ("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue." , newjob .Namespace , newjob .Name )
450
+ qjm .backoff (ctx , updateNewJob , "PreemptionTriggered" , string (message ))
451
+ }
451
452
}
452
453
}
453
454
}
@@ -1367,7 +1368,7 @@ func (cc *XController) updateStatusInEtcd(ctx context.Context, currentAppwrapper
1367
1368
func (cc * XController ) updateStatusInEtcdWithRetry (ctx context.Context , source * arbv1.AppWrapper , caller string ) error {
1368
1369
klog .V (4 ).Infof ("[updateStatusInEtcdWithMergeFunction] trying to update '%s/%s' version '%s' called by '%s'" , source .Namespace , source .Name , source .ResourceVersion , caller )
1369
1370
source .Status .Sender = "before " + caller // set Sender string to indicate code location
1370
- updateStatusRetrierRetrier := retrier .New (retrier .ExponentialBackoff (10 , 100 * time .Millisecond ), & EtcdErrorClassifier {})
1371
+ updateStatusRetrierRetrier := retrier .New (retrier .ExponentialBackoff (1 , 100 * time .Millisecond ), & EtcdErrorClassifier {})
1371
1372
updateStatusRetrierRetrier .SetJitter (0.05 )
1372
1373
updatedAW := source .DeepCopy ()
1373
1374
err := updateStatusRetrierRetrier .RunCtx (ctx , func (localContext context.Context ) error {
@@ -1564,10 +1565,10 @@ func (cc *XController) addQueueJob(obj interface{}) {
1564
1565
firstTime := metav1 .NowMicro ()
1565
1566
qj , ok := obj .(* arbv1.AppWrapper )
1566
1567
if ! ok {
1567
- klog .Errorf ("[Informer-addQJ] object is not AppWrapper. object=%+v" , obj )
1568
+ klog .Errorf ("[Informer-addQJ] object is not AppWrapper." )
1568
1569
return
1569
1570
}
1570
- klog .V (6 ).Infof ("[Informer-addQJ] %s/%s &qj=%p qj=%+v " , qj .Namespace , qj .Name , qj , qj )
1571
+ klog .V (6 ).Infof ("[Informer-addQJ] %s/%s" , qj .Namespace , qj .Name )
1571
1572
if qj .Status .QueueJobState == "" {
1572
1573
qj .Status .ControllerFirstTimestamp = firstTime
1573
1574
qj .Status .SystemPriority = float64 (qj .Spec .Priority )
@@ -1602,18 +1603,19 @@ func (cc *XController) addQueueJob(obj interface{}) {
1602
1603
// updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate
1603
1604
// on stale AWs. This has potential to improve performance at scale.
1604
1605
if hasCompletionStatus {
1605
- requeueInterval := 5 * time .Second
1606
+ requeueIntervalForCompletionStatus := 5 * time .Second
1606
1607
key , err := cache .MetaNamespaceKeyFunc (qj )
1607
1608
if err != nil {
1608
1609
klog .Warningf ("[Informer-addQJ] Error getting AW %s/%s from cache cannot determine completion status" , qj .Namespace , qj .Name )
1609
1610
// TODO: should we return from this loop?
1610
1611
}
1611
1612
go func () {
1612
1613
for {
1613
- time .Sleep (requeueInterval )
1614
+ time .Sleep (requeueIntervalForCompletionStatus )
1614
1615
latestObj , exists , err := cc .appwrapperInformer .Informer ().GetStore ().GetByKey (key )
1615
- if err != nil && ! exists {
1616
- klog .Warningf ("[Informer-addQJ] Recent copy of AW %s/%s not found in cache" , qj .Namespace , qj .Name )
1616
+ if err != nil || ! exists {
1617
+ klog .Warningf ("[Informer-addQJ] Recent copy of AW %s/%s not found in cache,stopping check for completion status" , qj .Namespace , qj .Name )
1618
+ break
1617
1619
} else {
1618
1620
var latestAw * arbv1.AppWrapper
1619
1621
if latestObj != nil {
@@ -1648,8 +1650,9 @@ func (cc *XController) addQueueJob(obj interface{}) {
1648
1650
for {
1649
1651
time .Sleep (requeueInterval )
1650
1652
latestObj , exists , err := cc .appwrapperInformer .Informer ().GetStore ().GetByKey (key )
1651
- if err != nil && ! exists {
1652
- klog .Warningf ("[Informer-addQJ] Recent copy of AW %s/%s not found in cache" , qj .Namespace , qj .Name )
1653
+ if err != nil || ! exists {
1654
+ klog .Warningf ("[Informer-addQJ] Recent copy of AW %s/%s not found in cache, stopping check for minScheduling" , qj .Namespace , qj .Name )
1655
+ break
1653
1656
} else {
1654
1657
var latestAw * arbv1.AppWrapper
1655
1658
if latestObj != nil {
@@ -1757,9 +1760,9 @@ func (cc *XController) enqueue(obj interface{}) error {
1757
1760
1758
1761
err := cc .eventQueue .Add (qj ) // add to FIFO queue if not in, update object & keep position if already in FIFO queue
1759
1762
if err != nil {
1760
- klog .Errorf ("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v" , qj .Namespace , qj .Name , time .Now ().Sub (qj .Status .ControllerFirstTimestamp .Time ).Seconds (), qj , qj .ResourceVersion , qj .Status , err )
1763
+ klog .Errorf ("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds Version=%s Status=%+v err=%#v" , qj .Namespace , qj .Name , time .Now ().Sub (qj .Status .ControllerFirstTimestamp .Time ).Seconds (), qj .ResourceVersion , qj .Status , err )
1761
1764
} else {
1762
- klog .V (10 ).Infof ("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue &qj=%p Version=%s Status=%+v" , qj .Namespace , qj .Name , time .Now ().Sub (qj .Status .ControllerFirstTimestamp .Time ).Seconds (), qj , qj .ResourceVersion , qj .Status )
1765
+ klog .V (10 ).Infof ("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue Version=%s Status=%+v" , qj .Namespace , qj .Name , time .Now ().Sub (qj .Status .ControllerFirstTimestamp .Time ).Seconds (), qj .ResourceVersion , qj .Status )
1763
1766
}
1764
1767
return err
1765
1768
}
0 commit comments