Skip to content

Commit d4d856b

Browse files
authored
simplify acc logic (#451)
1 parent 7171e8e commit d4d856b

File tree

1 file changed

+17
-59
lines changed

1 file changed

+17
-59
lines changed

Diff for: pkg/controller/queuejob/queuejob_controller_ex.go

+17-59
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,9 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
904904
klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it is the job being processed.", time.Now().String(), value.Name)
905905
continue
906906
} else if !value.Status.CanRun {
907-
klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it can not run.", time.Now().String(), value.Name)
907+
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
908+
klog.V(6).Infof("[getAggAvaiResPri] %s: AW %s cannot run, adding any dangling pod resources %v while it being preempted.", time.Now().String(), value.Name, totalResource)
909+
preemptable = preemptable.Add(totalResource)
908910
continue
909911
} else if value.Status.SystemPriority < targetpr {
910912
// Dispatcher Mode: Ensure this job is part of the target cluster
@@ -929,7 +931,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
929931

930932
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
931933
preemptable = preemptable.Add(totalResource)
932-
934+
klog.V(6).Infof("[getAggAvaiResPri] %s proirity %v is lower target priority %v reclaiming total preemptable resources %v", value.Name, value.Status.SystemPriority, targetpr, totalResource)
933935
continue
934936
} else if qjm.isDispatcher {
935937
// Dispatcher job does not currently track pod states. This is
@@ -938,70 +940,26 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
938940
klog.V(10).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since priority %f is >= %f of requesting job: %s.", time.Now().String(),
939941
value.Name, value.Status.SystemPriority, targetpr, requestingJob.Name)
940942
continue
941-
} else if value.Status.State == arbv1.AppWrapperStateEnqueued {
942-
// Don't count the resources that can run but not yet realized (job orchestration pending or partially running).
943+
} else if value.Status.CanRun {
944+
qjv := clusterstateapi.EmptyResource()
943945
for _, resctrl := range qjm.qjobResControls {
944-
qjv := resctrl.GetAggregatedResources(value)
945-
pending = pending.Add(qjv)
946+
res := resctrl.GetAggregatedResources(value)
947+
qjv.Add(res)
946948
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, resctrl, value.Name, value.Status.CanRun)
947949
}
948950
for _, genericItem := range value.Spec.AggrResources.GenericItems {
949-
qjv, _ := genericresource.GetResources(&genericItem)
950-
pending = pending.Add(qjv)
951-
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
951+
res, _ := genericresource.GetResources(&genericItem)
952+
qjv.Add(res)
953+
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in genericItem=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
952954
}
953955

954-
continue
955-
} else if value.Status.State == arbv1.AppWrapperStateActive {
956-
if value.Status.Pending > 0 {
957-
//Don't count partially running jobs with pods still pending.
958-
for _, resctrl := range qjm.qjobResControls {
959-
qjv := resctrl.GetAggregatedResources(value)
960-
pending = pending.Add(qjv)
961-
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State)
962-
}
963-
for _, genericItem := range value.Spec.AggrResources.GenericItems {
964-
qjv, _ := genericresource.GetResources(&genericItem)
965-
pending = pending.Add(qjv)
966-
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State)
967-
}
968-
969-
} else {
970-
// TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs)
971-
// This hack uses the golang struct implied behavior of defining the object without a value. In this case
972-
// of using 'int32' novalue and value of 0 are the same.
973-
if value.Status.Pending == 0 && value.Status.Running == 0 && value.Status.Succeeded == 0 && value.Status.Failed == 0 {
974-
975-
// In some cases the object wrapped in the appwrapper never creates pod. This likely happens
976-
// in a custom resource that does some processing and errors occur before creating the pod or
977-
// even when there is no problem within the CR controller but the K8s quota is hit, not
978-
// allowing pods to be created due to the admission controller. This check will now put a timeout
979-
// on reserving these resources that are "in-flight".
980-
dispatchedCond := qjm.getLatestStatusConditionType(value, arbv1.AppWrapperCondDispatched)
981-
982-
// If pod counts for AW have not updated within the timeout window, account for
983-
// this object's resources to give the object controller more time to start creating
984-
// pods. This matters when resources are scarce. Once the timeout expires,
985-
// resources for this object will not be held and other AW may be dispatched which
986-
// could consume resources initially allocated for this object. This is to handle
987-
// object controllers (essentially custom resource controllers) that do not work as
988-
// expected by creating pods.
989-
if qjm.waitForPodCountUpdates(dispatchedCond) {
990-
for _, resctrl := range qjm.qjobResControls {
991-
qjv := resctrl.GetAggregatedResources(value)
992-
pending = pending.Add(qjv)
993-
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State)
994-
}
995-
for _, genericItem := range value.Spec.AggrResources.GenericItems {
996-
qjv, _ := genericresource.GetResources(&genericItem)
997-
pending = pending.Add(qjv)
998-
klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State)
999-
}
1000-
} else {
1001-
klog.V(4).Infof("[getAggAvaiResPri] Resources will no longer be reserved for %s/%s due to timeout of %d ms for pod creating.", value.Name, value.Namespace, qjm.serverOption.DispatchResourceReservationTimeout)
1002-
}
1003-
}
956+
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
957+
delta, err := qjv.NonNegSub(totalResource)
958+
if err != nil {
959+
klog.Warningf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resoources %v, %v", qjv, err)
960+
pending = qjv
1004961
}
962+
pending = pending.Add(delta)
1005963
continue
1006964
} else {
1007965
//Do nothing

0 commit comments

Comments
 (0)