Skip to content

update pod count before preemption #628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,21 @@ func (qjm *XController) PreemptQueueJobs() {
klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err)
continue
}
//we need to update AW before analyzing it as a candidate for preemption
updateErr := qjm.UpdateQueueJobStatus(newjob)
if updateErr != nil {
klog.Warningf("[PreemptQueueJobs] update of pod count to AW %v failed hence skipping preemption", newjob.Name)
return
}
newjob.Status.CanRun = false
newjob.Status.FilterIgnore = true // update QueueJobState only
cleanAppWrapper := false
// If dispatch deadline is exceeded no matter what the state of AW, kill the job and set status as Failed.
if (aw.Status.State == arbv1.AppWrapperStateActive) && (aw.Spec.SchedSpec.DispatchDuration.Limit > 0) {
if aw.Spec.SchedSpec.DispatchDuration.Overrun {
index := getIndexOfMatchedCondition(aw, arbv1.AppWrapperCondPreemptCandidate, "DispatchDeadlineExceeded")
if (newjob.Status.State == arbv1.AppWrapperStateActive) && (newjob.Spec.SchedSpec.DispatchDuration.Limit > 0) {
if newjob.Spec.SchedSpec.DispatchDuration.Overrun {
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "DispatchDeadlineExceeded")
if index < 0 {
message = fmt.Sprintf("Dispatch deadline exceeded. allowed to run for %v seconds", aw.Spec.SchedSpec.DispatchDuration.Limit)
message = fmt.Sprintf("Dispatch deadline exceeded. allowed to run for %v seconds", newjob.Spec.SchedSpec.DispatchDuration.Limit)
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "DispatchDeadlineExceeded", message)
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
} else {
Expand All @@ -387,7 +393,7 @@ func (qjm *XController) PreemptQueueJobs() {

err := qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- DispatchDeadlineExceeded")
if err != nil {
klog.Warningf("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed", aw.Namespace, aw.Name)
klog.Warningf("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed", newjob.Namespace, newjob.Name)
continue
}
// cannot use cleanup AW, since it puts AW back in running state
Expand All @@ -398,33 +404,33 @@ func (qjm *XController) PreemptQueueJobs() {
}
}

if ((aw.Status.Running + aw.Status.Succeeded) < int32(aw.Spec.SchedSpec.MinAvailable)) && aw.Status.State == arbv1.AppWrapperStateActive {
index := getIndexOfMatchedCondition(aw, arbv1.AppWrapperCondPreemptCandidate, "MinPodsNotRunning")
if ((newjob.Status.Running + newjob.Status.Succeeded) < int32(newjob.Spec.SchedSpec.MinAvailable)) && newjob.Status.State == arbv1.AppWrapperStateActive {
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "MinPodsNotRunning")
if index < 0 {
message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running, aw.Status.Succeeded)
message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", newjob.Spec.SchedSpec.MinAvailable, newjob.Status.Running, newjob.Status.Succeeded)
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message)
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
} else {
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", "")
newjob.Status.Conditions[index] = *cond.DeepCopy()
}

if aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 {
aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds = aw.Spec.SchedSpec.Requeuing.TimeInSeconds
if newjob.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 {
newjob.Spec.SchedSpec.Requeuing.InitialTimeInSeconds = newjob.Spec.SchedSpec.Requeuing.TimeInSeconds
}
if aw.Spec.SchedSpec.Requeuing.GrowthType == "exponential" {
if newjob.Spec.SchedSpec.Requeuing.GrowthType == "exponential" {
if newjob.Status.RequeueingTimeInSeconds == 0 {
newjob.Status.RequeueingTimeInSeconds += aw.Spec.SchedSpec.Requeuing.TimeInSeconds
newjob.Status.RequeueingTimeInSeconds += newjob.Spec.SchedSpec.Requeuing.TimeInSeconds
} else {
newjob.Status.RequeueingTimeInSeconds += newjob.Status.RequeueingTimeInSeconds
}
} else if aw.Spec.SchedSpec.Requeuing.GrowthType == "linear" {
newjob.Status.RequeueingTimeInSeconds += aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds
} else if newjob.Spec.SchedSpec.Requeuing.GrowthType == "linear" {
newjob.Status.RequeueingTimeInSeconds += newjob.Spec.SchedSpec.Requeuing.InitialTimeInSeconds
}

if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds > 0 {
if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds <= newjob.Status.RequeueingTimeInSeconds {
newjob.Status.RequeueingTimeInSeconds = aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds
if newjob.Spec.SchedSpec.Requeuing.MaxTimeInSeconds > 0 {
if newjob.Spec.SchedSpec.Requeuing.MaxTimeInSeconds <= newjob.Status.RequeueingTimeInSeconds {
newjob.Status.RequeueingTimeInSeconds = newjob.Spec.SchedSpec.Requeuing.MaxTimeInSeconds
}
}

Expand All @@ -438,7 +444,7 @@ func (qjm *XController) PreemptQueueJobs() {
updateNewJob = newjob.DeepCopy()
} else {
// If pods failed scheduling generate new preempt condition
message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(aw.Status.PendingPodConditions), aw.Status.Running)
message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(newjob.Status.PendingPodConditions), newjob.Status.Running)
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "PodsFailedScheduling")
// ignore co-scheduler failed scheduling events. This is a temp
// work-around until co-scheduler version 0.22.X perf issues are resolved.
Expand All @@ -455,17 +461,17 @@ func (qjm *XController) PreemptQueueJobs() {

err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning")
if err != nil {
klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", aw.Namespace, aw.Name, err)
klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err)
continue
}

if cleanAppWrapper {
klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", aw.Name, aw.Namespace)
klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Name, newjob.Namespace)
go qjm.Cleanup(ctx, updateNewJob)
} else {
// Only back-off AWs that are in state running and not in state Failed
if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", aw.Name, aw.Namespace)
klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Name, newjob.Namespace)
qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
}
}
Expand Down