Commit 44696cf

remove pod informer (#601)
* remove pod informer
* remove unneeded interface
1 parent ccd2acf commit 44696cf
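
In outline, this commit drops the informer-backed pod resource control: pod counts for an AppWrapper are now computed on demand by listing pods straight from the API server, filtered by the appwrapper.mcad.ibm.com label (see the diffs below). The following is a minimal sketch of that pattern, distilled from the diff; the package and function names (podstatus, countRunningPods) are illustrative and not part of the commit.

// Package podstatus is an illustrative sketch; it is not part of this commit.
package podstatus

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// countRunningPods (hypothetical) shows the new pattern: list pods carrying the
// AppWrapper label via the Kubernetes API instead of reading an informer cache.
func countRunningPods(clientset kubernetes.Interface, awName string) (int, error) {
    // Every pod created for an AppWrapper carries this label (see UpdateQueueJobStatus below).
    selector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", awName)
    pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: selector})
    if err != nil {
        return 0, err
    }
    running := 0
    for i := range pods.Items {
        if pods.Items[i].Status.Phase == v1.PodRunning {
            running++
        }
    }
    return running, nil
}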

6 files changed, +126 −444 lines changed

Diff for: pkg/controller/queuejob/queuejob_controller_ex.go

+63 −24
@@ -54,9 +54,7 @@ import (

     v1 "k8s.io/api/core/v1"

-    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources"
     "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/genericresource"
-    respod "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/pod"
     "k8s.io/apimachinery/pkg/labels"

     arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
@@ -79,9 +77,9 @@ type XController struct {

     appwrapperInformer arbinformers.AppWrapperInformer
     // resources registered for the AppWrapper
-    qjobRegisteredResources queuejobresources.RegisteredResources
+    //qjobRegisteredResources queuejobresources.RegisteredResources
     // controllers for these resources
-    qjobResControls map[arbv1.ResourceType]queuejobresources.Interface
+    //qjobResControls map[arbv1.ResourceType]queuejobresources.Interface

     // Captures all available resources in the cluster
     genericresources *genericresource.GenericResources
@@ -140,9 +138,9 @@ type JobAndClusterAgent struct {
 }

 // RegisterAllQueueJobResourceTypes - registers all resources
-func RegisterAllQueueJobResourceTypes(regs *queuejobresources.RegisteredResources) {
-    respod.Register(regs)
-}
+// func RegisterAllQueueJobResourceTypes(regs *queuejobresources.RegisteredResources) {
+//     respod.Register(regs)
+// }

 func GetQueueJobKey(obj interface{}) (string, error) {
     qj, ok := obj.(*arbv1.AppWrapper)
@@ -153,6 +151,47 @@ func GetQueueJobKey(obj interface{}) (string, error) {
     return fmt.Sprintf("%s/%s", qj.Namespace, qj.Name), nil
 }

+//UpdateQueueJobStatus was part of pod informer, this is now a method of queuejob_controller file.
+//This change is done in an effort to simplify the controller and enable to move to controller runtime.
+func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error {
+
+    labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", queuejob.Name)
+    pods, errt := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
+    if errt != nil {
+        return errt
+    }
+
+    running := int32(FilterPods(pods.Items, v1.PodRunning))
+    podPhases := []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}
+    totalResourcesConsumedForPodPhases := clusterstateapi.EmptyResource()
+    for _, phase := range podPhases {
+        totalResourcesConsumedForPodPhases.Add(GetPodResourcesByPhase(phase, pods.Items))
+    }
+    pending := int32(FilterPods(pods.Items, v1.PodPending))
+    succeeded := int32(FilterPods(pods.Items, v1.PodSucceeded))
+    failed := int32(FilterPods(pods.Items, v1.PodFailed))
+    podsConditionMap := PendingPodsFailedSchd(pods.Items)
+    klog.V(10).Infof("[UpdateQueueJobStatus] There are %d pods of AppWrapper %s: pending %d, running %d, succeeded %d, failed %d, pendingpodsfailedschd %d, total resource consumed %v",
+        len(pods.Items), queuejob.Name, pending, running, succeeded, failed, len(podsConditionMap), totalResourcesConsumedForPodPhases)
+
+    queuejob.Status.Pending = pending
+    queuejob.Status.Running = running
+    queuejob.Status.Succeeded = succeeded
+    queuejob.Status.Failed = failed
+    // Total resources by all running pods
+    queuejob.Status.TotalGPU = int32(totalResourcesConsumedForPodPhases.GPU)
+    queuejob.Status.TotalCPU = int32(totalResourcesConsumedForPodPhases.MilliCPU)
+    queuejob.Status.TotalMemory = int32(totalResourcesConsumedForPodPhases.Memory)
+
+    queuejob.Status.PendingPodConditions = nil
+    for podName, cond := range podsConditionMap {
+        podCond := GeneratePodFailedCondition(podName, cond)
+        queuejob.Status.PendingPodConditions = append(queuejob.Status.PendingPodConditions, podCond)
+    }
+
+    return nil
+}
+
 //allocatableCapacity calculates the capacity available on each node by substracting resources
 //consumed by existing pods.
 //For a large cluster with thousands of nodes and hundreds of thousands of pods this
@@ -217,20 +256,20 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *

     cc.genericresources = genericresource.NewAppWrapperGenericResource(config)

-    cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{}
-    RegisterAllQueueJobResourceTypes(&cc.qjobRegisteredResources)
+    //cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{}
+    //RegisterAllQueueJobResourceTypes(&cc.qjobRegisteredResources)

     // initialize pod sub-resource control
-    resControlPod, found, err := cc.qjobRegisteredResources.InitQueueJobResource(arbv1.ResourceTypePod, config)
-    if err != nil {
-        klog.Errorf("fail to create queuejob resource control")
-        return nil
-    }
-    if !found {
-        klog.Errorf("queuejob resource type Pod not found")
-        return nil
-    }
-    cc.qjobResControls[arbv1.ResourceTypePod] = resControlPod
+    // resControlPod, found, err := cc.qjobRegisteredResources.InitQueueJobResource(arbv1.ResourceTypePod, config)
+    // if err != nil {
+    //     klog.Errorf("fail to create queuejob resource control")
+    //     return nil
+    // }
+    // if !found {
+    //     klog.Errorf("queuejob resource type Pod not found")
+    //     return nil
+    // }
+    // cc.qjobResControls[arbv1.ResourceTypePod] = resControlPod

     appWrapperClient, err := clientset.NewForConfig(cc.config)
     if err != nil {
@@ -816,7 +855,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust

         }

-        err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(value)
+        err := qjm.UpdateQueueJobStatus(value)
         if err != nil {
             klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s, err=%+v", value.Name, err)
         }
@@ -843,7 +882,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
             klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in genericItem=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
         }

-        err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(value)
+        err := qjm.UpdateQueueJobStatus(value)
         if err != nil {
             klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s, err=%+v", value.Name, err)
         }
@@ -1458,7 +1497,7 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason
 func (cc *XController) Run(stopCh <-chan struct{}) {
     go cc.appwrapperInformer.Informer().Run(stopCh)

-    go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh)
+    //go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh)

     cache.WaitForCacheSync(stopCh, cc.appWrapperSynced)

@@ -1508,7 +1547,7 @@ func (qjm *XController) UpdateQueueJobs() {
             }
         }
         if (newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion) && containsCompletionStatus {
-            err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(newjob)
+            err := qjm.UpdateQueueJobStatus(newjob)
             if err != nil {
                 klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err)
                 continue
@@ -1911,7 +1950,7 @@ func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) e
     awNew := qj.DeepCopy()
     // we call sync to update pods running, pending,...
     if qj.Status.State == arbv1.AppWrapperStateActive {
-        err := cc.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(awNew)
+        err := cc.UpdateQueueJobStatus(awNew)
         if err != nil {
             klog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s, err=%+v", qj.Name, err)
             return err

Diff for: pkg/controller/queuejob/utils.go

+63
@@ -17,10 +17,14 @@ limitations under the License.
 package queuejob

 import (
+    "strings"
+
     corev1 "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

     arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
+    clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
 )

 func GetXQJFullName(qj *arbv1.AppWrapper) string {
@@ -77,3 +81,62 @@ func getIndexOfMatchedCondition(aw *arbv1.AppWrapper, condType arbv1.AppWrapperC
     }
     return index
 }
+
+// PendingPodsFailedSchd checks if pods pending have failed scheduling
+func PendingPodsFailedSchd(pods []v1.Pod) map[string][]v1.PodCondition {
+    var podCondition = make(map[string][]v1.PodCondition)
+    for i := range pods {
+        if pods[i].Status.Phase == v1.PodPending {
+            for _, cond := range pods[i].Status.Conditions {
+                // Hack: ignore pending pods due to co-scheduler FailedScheduling event
+                // this exists until coscheduler performance issue is resolved.
+                if cond.Type == v1.PodScheduled && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable {
+                    if strings.Contains(cond.Message, "pgName") && strings.Contains(cond.Message, "last") && strings.Contains(cond.Message, "failed") && strings.Contains(cond.Message, "deny") {
+                        // ignore co-scheduled pending pods for coscheduler version:0.22.6
+                        continue
+                    } else if strings.Contains(cond.Message, "optimistic") && strings.Contains(cond.Message, "rejection") && strings.Contains(cond.Message, "PostFilter") ||
+                        strings.Contains(cond.Message, "cannot") && strings.Contains(cond.Message, "find") && strings.Contains(cond.Message, "enough") && strings.Contains(cond.Message, "sibling") {
+                        // ignore co-scheduled pending pods for coscheduler version:0.23.10
+                        continue
+                    } else {
+                        podName := pods[i].Name
+                        podCondition[podName] = append(podCondition[podName], *cond.DeepCopy())
+                    }
+                }
+            }
+        }
+    }
+    return podCondition
+}
+
+// filterPods returns pods based on their phase.
+func FilterPods(pods []v1.Pod, phase v1.PodPhase) int {
+    result := 0
+    for i := range pods {
+        if phase == pods[i].Status.Phase {
+            result++
+        }
+    }
+    return result
+}
+
+//GetPodResourcesByPhase returns pods based on their phase.
+func GetPodResourcesByPhase(phase v1.PodPhase, pods []v1.Pod) *clusterstateapi.Resource {
+    req := clusterstateapi.EmptyResource()
+    for i := range pods {
+        if pods[i].Status.Phase == phase {
+            for _, c := range pods[i].Spec.Containers {
+                req.Add(clusterstateapi.NewResource(c.Resources.Requests))
+            }
+        }
+    }
+    return req
+}
+
+//GeneratePodFailedCondition returns condition of a AppWrapper condition.
+func GeneratePodFailedCondition(podName string, podCondition []v1.PodCondition) arbv1.PendingPodSpec {
+    return arbv1.PendingPodSpec{
+        PodName:    podName,
+        Conditions: podCondition,
+    }
+}
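
The helpers added above operate on plain []v1.Pod slices, so they can be exercised in isolation. Below is a minimal, hypothetical test-style sketch (not part of this commit) showing how FilterPods and GetPodResourcesByPhase compose the way UpdateQueueJobStatus uses them; it assumes a file placed alongside utils.go in package queuejob.

package queuejob

import (
    "testing"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
)

// TestPodHelpersSketch is a hypothetical example, not part of this commit.
func TestPodHelpersSketch(t *testing.T) {
    pods := []v1.Pod{
        {
            Status: v1.PodStatus{Phase: v1.PodRunning},
            Spec: v1.PodSpec{
                Containers: []v1.Container{{
                    Resources: v1.ResourceRequirements{
                        Requests: v1.ResourceList{
                            v1.ResourceCPU:    resource.MustParse("500m"),
                            v1.ResourceMemory: resource.MustParse("256Mi"),
                        },
                    },
                }},
            },
        },
        {Status: v1.PodStatus{Phase: v1.PodPending}},
    }

    // FilterPods counts pods in a given phase.
    if got := FilterPods(pods, v1.PodRunning); got != 1 {
        t.Fatalf("expected 1 running pod, got %d", got)
    }
    if got := FilterPods(pods, v1.PodPending); got != 1 {
        t.Fatalf("expected 1 pending pod, got %d", got)
    }

    // Sum resource requests of running and succeeded pods, mirroring UpdateQueueJobStatus.
    total := GetPodResourcesByPhase(v1.PodRunning, pods)
    total.Add(GetPodResourcesByPhase(v1.PodSucceeded, pods))
    t.Logf("consumed by running+succeeded pods: %v", total)
}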

Diff for: pkg/controller/queuejobresources/interfaces.go

-29
This file was deleted.
