Skip to content

simplify acc logic #451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 30, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 17 additions & 59 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,9 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it is the job being processed.", time.Now().String(), value.Name)
continue
} else if !value.Status.CanRun {
klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it can not run.", time.Now().String(), value.Name)
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
klog.V(6).Infof("[getAggAvaiResPri] %s: AW %s cannot run, adding any dangling pod resources %v while it being preempted.", time.Now().String(), value.Name, totalResource)
preemptable = preemptable.Add(totalResource)
continue
} else if value.Status.SystemPriority < targetpr {
// Dispatcher Mode: Ensure this job is part of the target cluster
Expand All @@ -929,7 +931,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust

totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
preemptable = preemptable.Add(totalResource)

klog.V(6).Infof("[getAggAvaiResPri] %s proirity %v is lower target priority %v reclaiming total preemptable resources %v", value.Name, value.Status.SystemPriority, targetpr, totalResource)
continue
} else if qjm.isDispatcher {
// Dispatcher job does not currently track pod states. This is
Expand All @@ -938,70 +940,26 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
klog.V(10).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since priority %f is >= %f of requesting job: %s.", time.Now().String(),
value.Name, value.Status.SystemPriority, targetpr, requestingJob.Name)
continue
} else if value.Status.State == arbv1.AppWrapperStateEnqueued {
// Don't count the resources that can run but not yet realized (job orchestration pending or partially running).
} else if value.Status.CanRun {
qjv := clusterstateapi.EmptyResource()
for _, resctrl := range qjm.qjobResControls {
qjv := resctrl.GetAggregatedResources(value)
pending = pending.Add(qjv)
res := resctrl.GetAggregatedResources(value)
qjv.Add(res)
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, resctrl, value.Name, value.Status.CanRun)
}
for _, genericItem := range value.Spec.AggrResources.GenericItems {
qjv, _ := genericresource.GetResources(&genericItem)
pending = pending.Add(qjv)
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
res, _ := genericresource.GetResources(&genericItem)
qjv.Add(res)
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in genericItem=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
}

continue
} else if value.Status.State == arbv1.AppWrapperStateActive {
if value.Status.Pending > 0 {
//Don't count partially running jobs with pods still pending.
for _, resctrl := range qjm.qjobResControls {
qjv := resctrl.GetAggregatedResources(value)
pending = pending.Add(qjv)
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State)
}
for _, genericItem := range value.Spec.AggrResources.GenericItems {
qjv, _ := genericresource.GetResources(&genericItem)
pending = pending.Add(qjv)
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State)
}

} else {
// TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs)
// This hack uses the golang struct implied behavior of defining the object without a value. In this case
// of using 'int32' novalue and value of 0 are the same.
if value.Status.Pending == 0 && value.Status.Running == 0 && value.Status.Succeeded == 0 && value.Status.Failed == 0 {

// In some cases the object wrapped in the appwrapper never creates pod. This likely happens
// in a custom resource that does some processing and errors occur before creating the pod or
// even there is not a problem within the CR controler but when the K8s quota is hit not
// allowing pods to get create due the admission controller. This check will now put a timeout
// on reserving these resources that are "in-flight")
dispatchedCond := qjm.getLatestStatusConditionType(value, arbv1.AppWrapperCondDispatched)

// If pod counts for AW have not updated within the timeout window, account for
// this object's resources to give the object controller more time to start creating
// pods. This matters when resources are scare. Once the timeout expires,
// resources for this object will not be held and other AW may be dispatched which
// could consume resources initially allocated for this object. This is to handle
// object controllers (essentially custom resource controllers) that do not work as
// expected by creating pods.
if qjm.waitForPodCountUpdates(dispatchedCond) {
for _, resctrl := range qjm.qjobResControls {
qjv := resctrl.GetAggregatedResources(value)
pending = pending.Add(qjv)
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State)
}
for _, genericItem := range value.Spec.AggrResources.GenericItems {
qjv, _ := genericresource.GetResources(&genericItem)
pending = pending.Add(qjv)
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State)
}
} else {
klog.V(4).Infof("[getAggAvaiResPri] Resources will no longer be reserved for %s/%s due to timeout of %d ms for pod creating.", value.Name, value.Namespace, qjm.serverOption.DispatchResourceReservationTimeout)
}
}
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
delta, err := qjv.NonNegSub(totalResource)
if err != nil {
klog.Warningf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resoources %v, %v", qjv, err)
pending = qjv
}
pending = pending.Add(delta)
continue
} else {
//Do nothing
Expand Down