Skip to content

update only running AWs #562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 11, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 21 additions & 38 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -1452,52 +1452,35 @@ func (qjm *XController) UpdateAgent() {
}
}

// Move AppWrappers (AWs) from Running to Completed or RunningHoldCompletion.
func (qjm *XController) UpdateQueueJobs() {
firstTime := metav1.NowMicro()
// retrieve queueJobs from local cache. no guarantee queueJobs contain up-to-date information
queueJobs, err := qjm.appWrapperLister.AppWrappers("").List(labels.Everything())
if err != nil {
klog.Errorf("[UpdateQueueJobs] Failed to get a list of active appwrappers, err=%+v", err)
return
}
for _, newjob := range queueJobs {
// UpdateQueueJobs can be the first to see a new AppWrapper job, under heavy load
if newjob.Status.QueueJobState == "" {
newjob.Status.ControllerFirstTimestamp = firstTime
newjob.Status.SystemPriority = float64(newjob.Spec.Priority)
newjob.Status.QueueJobState = arbv1.AppWrapperCondInit
newjob.Status.Conditions = []arbv1.AppWrapperCondition{
{
Type: arbv1.AppWrapperCondInit,
Status: v1.ConditionTrue,
LastUpdateMicroTime: metav1.NowMicro(),
LastTransitionMicroTime: metav1.NowMicro(),
},
}
klog.V(6).Infof("[UpdateQueueJobs] Found new appwraper '%s/%s' 0Delay=%.6f seconds CreationTimestamp=%s ControllerFirstTimestamp=%s",
newjob.Namespace, newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob.CreationTimestamp, newjob.Status.ControllerFirstTimestamp)
}
// only set if appwrapper is running and dispatch time is not set previously
if newjob.Status.QueueJobState == "Running" && newjob.Status.ControllerFirstDispatchTimestamp.String() == "0001-01-01 00:00:00 +0000 UTC" {
newjob.Status.ControllerFirstDispatchTimestamp = firstTime
}

klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
// check eventQueue, qjqueue in program sequence to make sure job is not in qjqueue
if _, exists, _ := qjm.eventQueue.Get(newjob); exists {
klog.V(6).Infof("[UpdateQueueJobs] app wrapper %s/%s found in the event queue, not adding it", newjob.Namespace, newjob.Name)
continue
} // do not enqueue if already in eventQueue
if qjm.qjqueue.IfExist(newjob) {
klog.V(6).Infof("[UpdateQueueJobs] app wrapper %s/%s found in the job queue, not adding it", newjob.Namespace, newjob.Name)
continue
} // do not enqueue if already in qjqueue

err = qjm.enqueueIfNotPresent(newjob)
if err != nil {
klog.Errorf("[UpdateQueueJobs] Fail to enqueue %s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status, err)
} else {
klog.V(6).Infof("[UpdateQueueJobs] %s *Delay=%.6f seconds eventQueue.Add_byUpdateQueueJobs &qj=%p Version=%s Status=%+v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status)
if newjob.Status.State == arbv1.AppWrapperStateActive {
klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
// check eventQueue, qjqueue in program sequence to make sure job is not in qjqueue
if _, exists, _ := qjm.eventQueue.Get(newjob); exists {
klog.V(6).Infof("[UpdateQueueJobs] app wrapper %s/%s found in the event queue, not adding it", newjob.Namespace, newjob.Name)
continue
} // do not enqueue if already in eventQueue
if qjm.qjqueue.IfExist(newjob) {
klog.V(6).Infof("[UpdateQueueJobs] app wrapper %s/%s found in the job queue, not adding it", newjob.Namespace, newjob.Name)
continue
} // do not enqueue if already in qjqueue
//Remove stale copy
qjm.eventQueue.Delete(newjob)
//Add fresh copy
err = qjm.enqueueIfNotPresent(newjob)
if err != nil {
klog.Errorf("[UpdateQueueJobs] Fail to enqueue %s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status, err)
} else {
klog.V(6).Infof("[UpdateQueueJobs] %s *Delay=%.6f seconds eventQueue.Add_byUpdateQueueJobs &qj=%p Version=%s Status=%+v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status)
}
}
}
}
Expand Down