Skip to content

update only running AWs #562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 74 additions & 101 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,7 +1430,7 @@ func (cc *XController) Run(stopCh <-chan struct{}) {
// start preempt thread based on preemption of pods
go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh)

// This thread is used as a heartbeat to calculate runtime spec in the status
// This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion
go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh)

if cc.isDispatcher {
Expand All @@ -1452,52 +1452,89 @@ func (qjm *XController) UpdateAgent() {
}
}

// Move AW from Running to Completed or RunningHoldCompletion
// Do not use event queues! Running AWs move to Completed, from which it will never transition to any other state.
// State transition: Running->RunningHoldCompletion->Completed
func (qjm *XController) UpdateQueueJobs() {
firstTime := metav1.NowMicro()
// retrieve queueJobs from local cache. no guarantee queueJobs contain up-to-date information
queueJobs, err := qjm.appWrapperLister.AppWrappers("").List(labels.Everything())
if err != nil {
klog.Errorf("[UpdateQueueJobs] Failed to get a list of active appwrappers, err=%+v", err)
return
}
containsCompletionStatus := false
for _, newjob := range queueJobs {
// UpdateQueueJobs can be the first to see a new AppWrapper job, under heavy load
if newjob.Status.QueueJobState == "" {
newjob.Status.ControllerFirstTimestamp = firstTime
newjob.Status.SystemPriority = float64(newjob.Spec.Priority)
newjob.Status.QueueJobState = arbv1.AppWrapperCondInit
newjob.Status.Conditions = []arbv1.AppWrapperCondition{
{
Type: arbv1.AppWrapperCondInit,
Status: v1.ConditionTrue,
LastUpdateMicroTime: metav1.NowMicro(),
LastTransitionMicroTime: metav1.NowMicro(),
},
}
klog.V(6).Infof("[UpdateQueueJobs] Found new appwraper '%s/%s' 0Delay=%.6f seconds CreationTimestamp=%s ControllerFirstTimestamp=%s",
newjob.Namespace, newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob.CreationTimestamp, newjob.Status.ControllerFirstTimestamp)
}
// only set if appwrapper is running and dispatch time is not set previously
if newjob.Status.QueueJobState == "Running" && newjob.Status.ControllerFirstDispatchTimestamp.String() == "0001-01-01 00:00:00 +0000 UTC" {
newjob.Status.ControllerFirstDispatchTimestamp = firstTime
for _, item := range newjob.Spec.AggrResources.GenericItems {
if len(item.CompletionStatus) > 0 {
containsCompletionStatus = true
}
}
if (newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion) && containsCompletionStatus {
err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(newjob)
if err != nil {
klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err)
continue
}
klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
// set appwrapper status to Complete or RunningHoldCompletion
derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob)

klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
// check eventQueue, qjqueue in program sequence to make sure job is not in qjqueue
if _, exists, _ := qjm.eventQueue.Get(newjob); exists {
klog.V(6).Infof("[UpdateQueueJobs] app wrapper %s/%s found in the event queue, not adding it", newjob.Namespace, newjob.Name)
continue
} // do not enqueue if already in eventQueue
if qjm.qjqueue.IfExist(newjob) {
klog.V(6).Infof("[UpdateQueueJobs] app wrapper %s/%s found in the job queue, not adding it", newjob.Namespace, newjob.Name)
continue
} // do not enqueue if already in qjqueue
klog.Infof("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, newjob.Namespace, newjob.Name, newjob.ResourceVersion,
newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)

err = qjm.enqueueIfNotPresent(newjob)
if err != nil {
klog.Errorf("[UpdateQueueJobs] Fail to enqueue %s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status, err)
} else {
klog.V(6).Infof("[UpdateQueueJobs] %s *Delay=%.6f seconds eventQueue.Add_byUpdateQueueJobs &qj=%p Version=%s Status=%+v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status)
// Set Appwrapper state to complete if all items in Appwrapper
// are completed
if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion {
newjob.Status.State = derivedAwStatus
var updateQj *arbv1.AppWrapper
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted")
if index < 0 {
newjob.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
newjob.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion
updateQj = newjob.DeepCopy()
} else {
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
newjob.Status.Conditions[index] = *cond.DeepCopy()
updateQj = newjob.DeepCopy()
}
err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setRunningHoldCompletion")
if err != nil {
//TODO: implement retry
klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
}
}
// Set appwrapper status to complete
if derivedAwStatus == arbv1.AppWrapperStateCompleted {
newjob.Status.State = derivedAwStatus
newjob.Status.CanRun = false
var updateQj *arbv1.AppWrapper
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondCompleted, "PodsCompleted")
if index < 0 {
newjob.Status.QueueJobState = arbv1.AppWrapperCondCompleted
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
newjob.Status.FilterIgnore = true // Update AppWrapperCondCompleted
updateQj = newjob.DeepCopy()
} else {
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
newjob.Status.Conditions[index] = *cond.DeepCopy()
updateQj = newjob.DeepCopy()
}
err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setCompleted")
if err != nil {
if qjm.quotaManager != nil {
qjm.quotaManager.Release(updateQj)
}
//TODO: Implement retry
klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
}
if qjm.quotaManager != nil {
qjm.quotaManager.Release(updateQj)
}
}
klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion,
newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)
}
}
}
Expand Down Expand Up @@ -1872,9 +1909,6 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,
klog.Errorf("manageQueueJob] Failed to add '%s/%s' to activeQueue. Back to eventQueue activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v err=%#v",
qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, err00)
cc.enqueue(qj)
} else {
klog.V(3).Infof("[manageQueueJob] Added '%s/%s' to activeQueue queue 1Delay=%.6f seconds activeQ.Add_success activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v",
qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
}
return nil
}
Expand Down Expand Up @@ -1960,67 +1994,6 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,
klog.Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion,
qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed)

// set appwrapper status to Complete or RunningHoldCompletion
derivedAwStatus := cc.getAppWrapperCompletionStatus(qj)

klog.Infof("[manageQueueJob] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, qj.Namespace, qj.Name, qj.ResourceVersion,
qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed)

// Set Appwrapper state to complete if all items in Appwrapper
// are completed
if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion {
qj.Status.State = derivedAwStatus
var updateQj *arbv1.AppWrapper
index := getIndexOfMatchedCondition(qj, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted")
if index < 0 {
qj.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
qj.Status.Conditions = append(qj.Status.Conditions, cond)
qj.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion
updateQj = qj.DeepCopy()
} else {
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
qj.Status.Conditions[index] = *cond.DeepCopy()
updateQj = qj.DeepCopy()
}
err := cc.updateStatusInEtcdWithRetry(ctx, updateQj, "[manageQueueJob] setRunningHoldCompletion")
if err != nil {
klog.Errorf("[manageQueueJob] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", qj.Namespace, qj.Name, qj.Status, err)
return err
}
}
// Set appwrapper status to complete
if derivedAwStatus == arbv1.AppWrapperStateCompleted {
qj.Status.State = derivedAwStatus
qj.Status.CanRun = false
var updateQj *arbv1.AppWrapper
index := getIndexOfMatchedCondition(qj, arbv1.AppWrapperCondCompleted, "PodsCompleted")
if index < 0 {
qj.Status.QueueJobState = arbv1.AppWrapperCondCompleted
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
qj.Status.Conditions = append(qj.Status.Conditions, cond)
qj.Status.FilterIgnore = true // Update AppWrapperCondCompleted
updateQj = qj.DeepCopy()
} else {
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
qj.Status.Conditions[index] = *cond.DeepCopy()
updateQj = qj.DeepCopy()
}
err := cc.updateStatusInEtcdWithRetry(ctx, updateQj, "[manageQueueJob] setCompleted")
if err != nil {
if cc.quotaManager != nil {
cc.quotaManager.Release(updateQj)
}
klog.Errorf("[manageQueueJob] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", qj.Namespace, qj.Name, qj.Status, err)
return err
}
if cc.quotaManager != nil {
cc.quotaManager.Release(updateQj)
}
}
klog.Infof("[manageQueueJob] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion,
qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed)

} else if podPhaseChanges { // Continued bug fix
// Only update etcd if AW status has changed. This can happen for periodic
// updates of pod phase counts done in caller of this function.
Expand Down