Skip to content

Commit 92b838b

Browse files
asm582openshift-merge-robot
authored andcommitted
address review
1 parent bb5f3b1 commit 92b838b

File tree

1 file changed

+112
-106
lines changed

1 file changed

+112
-106
lines changed

Diff for: pkg/controller/queuejob/queuejob_controller_ex.go

+112-106
Original file line numberDiff line numberDiff line change
@@ -348,21 +348,22 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
348348

349349
// TODO: We can use informer to filter AWs that do not meet the minScheduling spec.
350350
// we still need a thread for dispatch duration but minScheduling spec can definetly be moved to an informer
351-
func (qjm *XController) PreemptQueueJobs() {
351+
func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
352352
ctx := context.Background()
353+
aw := qjm.GetQueueJobEligibleForPreemption(inspectAw)
354+
if aw != nil {
353355

354-
qjobs := qjm.GetQueueJobsEligibleForPreemption()
355-
for _, aw := range qjobs {
356+
//for _, aw := range qjobs {
356357
if aw.Status.State == arbv1.AppWrapperStateCompleted || aw.Status.State == arbv1.AppWrapperStateDeleted || aw.Status.State == arbv1.AppWrapperStateFailed {
357-
continue
358+
return
358359
}
359360

360361
var updateNewJob *arbv1.AppWrapper
361362
var message string
362363
newjob, err := qjm.getAppWrapper(aw.Namespace, aw.Name, "[PreemptQueueJobs] get fresh app wrapper")
363364
if err != nil {
364365
klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err)
365-
continue
366+
return
366367
}
367368
//we need to update AW before analyzing it as a candidate for preemption
368369
updateErr := qjm.UpdateQueueJobStatus(newjob)
@@ -394,13 +395,11 @@ func (qjm *XController) PreemptQueueJobs() {
394395
err := qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- DispatchDeadlineExceeded")
395396
if err != nil {
396397
klog.Warningf("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed", newjob.Namespace, newjob.Name)
397-
continue
398+
return
398399
}
399400
// cannot use cleanup AW, since it puts AW back in running state
400401
qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob)
401402

402-
// Move to next AW
403-
continue
404403
}
405404
}
406405

@@ -462,7 +461,7 @@ func (qjm *XController) PreemptQueueJobs() {
462461
err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning")
463462
if err != nil {
464463
klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err)
465-
continue
464+
return
466465
}
467466

468467
if cleanAppWrapper {
@@ -506,98 +505,83 @@ func (qjm *XController) preemptAWJobs(ctx context.Context, preemptAWs []*arbv1.A
506505
}
507506
}
508507

509-
func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.AppWrapper {
510-
qjobs := make([]*arbv1.AppWrapper, 0)
511-
512-
queueJobs, err := qjm.appWrapperLister.AppWrappers("").List(labels.Everything())
513-
if err != nil {
514-
klog.Errorf("List of queueJobs %+v", qjobs)
515-
return qjobs
516-
}
508+
func (qjm *XController) GetQueueJobEligibleForPreemption(value *arbv1.AppWrapper) *arbv1.AppWrapper {
517509

518510
if !qjm.isDispatcher { // Agent Mode
519-
for _, value := range queueJobs {
520-
521-
// Skip if AW Pending or just entering the system and does not have a state yet.
522-
if (value.Status.State == arbv1.AppWrapperStateEnqueued) || (value.Status.State == "") {
523-
continue
524-
}
525511

526-
if value.Status.State == arbv1.AppWrapperStateActive && value.Spec.SchedSpec.DispatchDuration.Limit > 0 {
527-
awDispatchDurationLimit := value.Spec.SchedSpec.DispatchDuration.Limit
528-
dispatchDuration := value.Status.ControllerFirstDispatchTimestamp.Add(time.Duration(awDispatchDurationLimit) * time.Second)
529-
currentTime := time.Now()
530-
dispatchTimeExceeded := !currentTime.Before(dispatchDuration)
512+
if value.Status.State == arbv1.AppWrapperStateActive && value.Spec.SchedSpec.DispatchDuration.Limit > 0 {
513+
awDispatchDurationLimit := value.Spec.SchedSpec.DispatchDuration.Limit
514+
dispatchDuration := value.Status.ControllerFirstDispatchTimestamp.Add(time.Duration(awDispatchDurationLimit) * time.Second)
515+
currentTime := time.Now()
516+
dispatchTimeExceeded := !currentTime.Before(dispatchDuration)
531517

532-
if dispatchTimeExceeded {
533-
klog.V(8).Infof("Appwrapper Dispatch limit exceeded, currentTime %v, dispatchTimeInSeconds %v", currentTime, dispatchDuration)
534-
value.Spec.SchedSpec.DispatchDuration.Overrun = true
535-
qjobs = append(qjobs, value)
536-
// Got AW which exceeded dispatch runtime limit, move to next AW
537-
continue
538-
}
518+
if dispatchTimeExceeded {
519+
klog.V(8).Infof("Appwrapper Dispatch limit exceeded, currentTime %v, dispatchTimeInSeconds %v", currentTime, dispatchDuration)
520+
value.Spec.SchedSpec.DispatchDuration.Overrun = true
521+
// Got AW which exceeded dispatch runtime limit, move to next AW
522+
return value
539523
}
540-
replicas := value.Spec.SchedSpec.MinAvailable
524+
}
525+
replicas := value.Spec.SchedSpec.MinAvailable
541526

542-
if (int(value.Status.Running) + int(value.Status.Succeeded)) < replicas {
527+
if (int(value.Status.Running) + int(value.Status.Succeeded)) < replicas {
543528

544-
// Find the dispatched condition if there is any
545-
numConditions := len(value.Status.Conditions)
546-
var dispatchedCondition arbv1.AppWrapperCondition
547-
dispatchedConditionExists := false
529+
// Find the dispatched condition if there is any
530+
numConditions := len(value.Status.Conditions)
531+
var dispatchedCondition arbv1.AppWrapperCondition
532+
dispatchedConditionExists := false
548533

549-
for i := numConditions - 1; i > 0; i-- {
550-
dispatchedCondition = value.Status.Conditions[i]
551-
if dispatchedCondition.Type != arbv1.AppWrapperCondDispatched {
552-
continue
553-
}
554-
dispatchedConditionExists = true
555-
break
534+
for i := numConditions - 1; i > 0; i-- {
535+
dispatchedCondition = value.Status.Conditions[i]
536+
if dispatchedCondition.Type != arbv1.AppWrapperCondDispatched {
537+
continue
556538
}
539+
dispatchedConditionExists = true
540+
break
541+
}
557542

558-
// Check for the minimum age and then skip preempt if current time is not beyond minimum age
559-
// The minimum age is controlled by the requeuing.TimeInSeconds stanza
560-
// For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later
561-
lastCondition := value.Status.Conditions[numConditions-1]
562-
var condition arbv1.AppWrapperCondition
543+
// Check for the minimum age and then skip preempt if current time is not beyond minimum age
544+
// The minimum age is controlled by the requeuing.TimeInSeconds stanza
545+
// For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later
546+
lastCondition := value.Status.Conditions[numConditions-1]
547+
var condition arbv1.AppWrapperCondition
563548

564-
if dispatchedConditionExists && dispatchedCondition.LastTransitionMicroTime.After(lastCondition.LastTransitionMicroTime.Time) {
565-
condition = dispatchedCondition
566-
} else {
567-
condition = lastCondition
568-
}
569-
var requeuingTimeInSeconds int
570-
if value.Status.RequeueingTimeInSeconds > 0 {
571-
requeuingTimeInSeconds = value.Status.RequeueingTimeInSeconds
572-
} else if value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 {
573-
requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.TimeInSeconds
574-
} else {
575-
requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds
576-
}
549+
if dispatchedConditionExists && dispatchedCondition.LastTransitionMicroTime.After(lastCondition.LastTransitionMicroTime.Time) {
550+
condition = dispatchedCondition
551+
} else {
552+
condition = lastCondition
553+
}
554+
var requeuingTimeInSeconds int
555+
if value.Status.RequeueingTimeInSeconds > 0 {
556+
requeuingTimeInSeconds = value.Status.RequeueingTimeInSeconds
557+
} else if value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 {
558+
requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.TimeInSeconds
559+
} else {
560+
requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds
561+
}
577562

578-
minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeInSeconds) * time.Second)
579-
currentTime := time.Now()
563+
minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeInSeconds) * time.Second)
564+
currentTime := time.Now()
580565

581-
if currentTime.Before(minAge) {
582-
continue
583-
}
566+
if currentTime.Before(minAge) {
567+
return nil
568+
}
584569

585-
if replicas > 0 {
586-
klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d - minAvailable: %d , Succeeded: %d !!!", value.Namespace, value.Name, value.Status.Running, replicas, value.Status.Succeeded)
587-
qjobs = append(qjobs, value)
588-
}
589-
} else {
590-
// Preempt when schedulingSpec stanza is not set but pods fails scheduling.
591-
// ignore co-scheduler pods
592-
if len(value.Status.PendingPodConditions) > 0 {
593-
klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d , Succeeded: %d due to failed scheduling !!!", value.Namespace, value.Status.Running, value.Status.Succeeded)
594-
qjobs = append(qjobs, value)
595-
}
570+
if replicas > 0 {
571+
klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d - minAvailable: %d , Succeeded: %d !!!", value.Namespace, value.Name, value.Status.Running, replicas, value.Status.Succeeded)
572+
return value
573+
}
574+
} else {
575+
// Preempt when schedulingSpec stanza is not set but pods fails scheduling.
576+
// ignore co-scheduler pods
577+
if len(value.Status.PendingPodConditions) > 0 {
578+
klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d , Succeeded: %d due to failed scheduling !!!", value.Namespace, value.Status.Running, value.Status.Succeeded)
579+
return value
596580
}
597581
}
598582
}
599583

600-
return qjobs
584+
return nil
601585
}
602586

603587
func (qjm *XController) GetAggregatedResourcesPerGenericItem(cqj *arbv1.AppWrapper) []*clusterstateapi.Resource {
@@ -1500,20 +1484,8 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason
15001484
func (cc *XController) Run(stopCh <-chan struct{}) {
15011485
go cc.appwrapperInformer.Informer().Run(stopCh)
15021486

1503-
// go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh)
1504-
15051487
cache.WaitForCacheSync(stopCh, cc.appWrapperSynced)
15061488

1507-
// cache is turned off, issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588
1508-
// update snapshot of ClientStateCache every second
1509-
// cc.cache.Run(stopCh)
1510-
1511-
// start preempt thread is used to preempt AWs that have partial pods or have reached dispatch duration
1512-
go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh)
1513-
1514-
// This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion
1515-
//go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh)
1516-
15171489
if cc.isDispatcher {
15181490
go wait.Until(cc.UpdateAgent, 2*time.Second, stopCh) // In the Agent?
15191491
for _, jobClusterAgent := range cc.agentMap {
@@ -1653,29 +1625,63 @@ func (cc *XController) addQueueJob(obj interface{}) {
16531625
//When an AW entrs a system with completionstatus keep checking the AW until completed
16541626
//updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate
16551627
//on stale AWs. This has potential to improve performance at scale.
1656-
//if qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed && qj.Status.State != "" {
1657-
requeueInterval := 30 * time.Second
1658-
key, err := cache.MetaNamespaceKeyFunc(qj)
1659-
if err == nil {
1628+
if hasCompletionStatus {
1629+
requeueInterval := 5 * time.Second
1630+
key, err := cache.MetaNamespaceKeyFunc(qj)
1631+
if err != nil {
1632+
klog.Warningf("[Informer-addQJ] Error getting AW %s from cache cannot determine completion status", qj.Name)
1633+
//TODO: should we return from this loop?
1634+
}
16601635
go func() {
16611636
for {
16621637
time.Sleep(requeueInterval)
16631638
latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key)
1664-
if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion {
1665-
klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State)
1666-
break //Exit the loop
1667-
}
1668-
if err == nil && exists {
1639+
if err != nil && !exists {
1640+
klog.Warningf("[Informer-addQJ] Recent copy of AW %s not found in cache", qj.Name)
1641+
} else {
1642+
if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion {
1643+
klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State)
1644+
break //Exit the loop
1645+
}
16691646
// Enqueue the latest copy of the AW.
16701647
if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus {
16711648
cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper))
1672-
klog.V(2).Infof("[Informer-addQJ] Finished requeing AW to determine completion status")
1649+
klog.V(2).Infof("[Informer-addQJ] requeing AW to determine completion status for AW", qj.Name)
1650+
}
1651+
1652+
}
1653+
1654+
}
1655+
}()
1656+
}
1657+
1658+
if qj.Spec.SchedSpec.MinAvailable > 0 {
1659+
requeueInterval := 60 * time.Second
1660+
key, err := cache.MetaNamespaceKeyFunc(qj)
1661+
if err != nil {
1662+
klog.Errorf("[Informer-addQJ] Error getting AW %s from cache cannot preempt AW", qj.Name)
1663+
//TODO: should we return from this loop?
1664+
}
1665+
go func() {
1666+
for {
1667+
time.Sleep(requeueInterval)
1668+
latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key)
1669+
if err != nil && !exists {
1670+
klog.Warningf("[Informer-addQJ] Recent copy of AW %s not found in cache", qj.Name)
1671+
} else {
1672+
if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion {
1673+
klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State)
1674+
break //Exit the loop
1675+
}
1676+
// Enqueue the latest copy of the AW.
1677+
if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && (qj.Spec.SchedSpec.MinAvailable > 0) {
1678+
cc.PreemptQueueJobs(latestAw.(*arbv1.AppWrapper))
1679+
klog.V(2).Infof("[Informer-addQJ] requeing AW to check minScheduling spec for AW", qj.Name)
16731680
}
16741681
}
16751682
}
16761683
}()
16771684
}
1678-
//}
16791685
}
16801686

16811687
func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {

0 commit comments

Comments
 (0)