Commit c4315c8

simplify cache (#587)
* simplify cache
* fix test
* fix test-1
* add polling logic
* address review
* preempt queue job runs at 60 sec interval
1 parent f34e1f5 commit c4315c8
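
At a glance, the controller no longer refreshes the ClusterStateCache snapshot every second; ScheduleNext computes allocatable capacity on demand, and only the preempt job keeps running on a fixed 60-second interval (see the diffs below). As a minimal, self-contained sketch of that interval wiring (not an excerpt from MCAD), using wait.Until from k8s.io/apimachinery, with preemptQueueJobs as a stand-in for the controller's PreemptQueueJobs method:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// Stand-in for cc.PreemptQueueJobs; the real method scans dispatched
	// AppWrappers for partial pods or expired dispatch durations.
	preemptQueueJobs := func() { fmt.Println("checking AppWrappers to preempt") }

	// wait.Until runs the function immediately and then every 60 seconds
	// until stopCh is closed, matching the go wait.Until(...) call in Run.
	go wait.Until(preemptQueueJobs, 60*time.Second, stopCh)

	time.Sleep(100 * time.Millisecond) // let the first run happen in this toy example
	close(stopCh)
}
```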

File tree: 2 files changed (+67 −14 lines)


Diff for: pkg/controller/queuejob/queuejob_controller_ex.go (+51 −10)

@@ -108,6 +108,7 @@ type XController struct {
 	// QJ queue that needs to be allocated
 	qjqueue SchedulingQueue
 
+	//TODO: Do we need this local cache?
 	// our own local cache, used for computing total amount of resources
 	cache clusterstatecache.Cache
 
@@ -154,6 +155,46 @@ func GetQueueJobKey(obj interface{}) (string, error) {
 	return fmt.Sprintf("%s/%s", qj.Namespace, qj.Name), nil
 }
 
+// allocatableCapacity calculates the capacity available on each node by subtracting the resources
+// consumed by existing pods.
+// For a large cluster with thousands of nodes and hundreds of thousands of pods this
+// method could be a performance bottleneck.
+// We could move this method to a separate thread that runs every X interval and
+// provides the resources available to the next AW that needs to be dispatched.
+// Obviously that thread would need locking and a timer to expire the cache.
+// Maybe a move to controller-runtime can help.
+func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource {
+	capacity := clusterstateapi.EmptyResource()
+	nodes, _ := qjm.clients.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
+	startTime := time.Now()
+	for _, node := range nodes.Items {
+		// skip unschedulable nodes
+		if node.Spec.Unschedulable {
+			continue
+		}
+		nodeResource := clusterstateapi.NewResource(node.Status.Allocatable)
+		capacity.Add(nodeResource)
+		var specNodeName = "spec.nodeName"
+		labelSelector := fmt.Sprintf("%s=%s", specNodeName, node.Name)
+		podList, err := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{FieldSelector: labelSelector})
+		//TODO: when no pods are listed, do we send the entire node capacity as available?
+		//That would cause a false-positive dispatch.
+		if err != nil {
+			klog.Errorf("[allocatableCapacity] Error listing pods %v", err)
+		}
+		for _, pod := range podList.Items {
+			if _, ok := pod.GetLabels()["appwrappers.mcad.ibm.com"]; !ok && pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded {
+				for _, container := range pod.Spec.Containers {
+					usedResource := clusterstateapi.NewResource(container.Resources.Requests)
+					capacity.Sub(usedResource)
+				}
+			}
+		}
+	}
+	klog.Infof("[allocatableCapacity] The available capacity to dispatch appwrapper is %v and the time taken to calculate it is %v", capacity, time.Now().Sub(startTime))
+	return capacity
+}
+
 // NewJobController create new AppWrapper Controller
 func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
 	cc := &XController{
@@ -166,8 +207,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
 		initQueue:    cache.NewFIFO(GetQueueJobKey),
 		updateQueue:  cache.NewFIFO(GetQueueJobKey),
 		qjqueue:      NewSchedulingQueue(),
-		cache:        clusterstatecache.New(config),
-		schedulingAW: nil,
+		//TODO: do we still need cache to be initialized?
+		cache:        clusterstatecache.New(config),
+		schedulingAW: nil,
 	}
 	//TODO: work on enabling metrics adapter for correct MCAD mode
 	//metrics adapter is implemented through dynamic client which looks at all the
@@ -1098,26 +1140,24 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
 	if qjm.serverOption.DynamicPriority {
 		priorityindex = -math.MaxFloat64
 	}
-	//cache.go updatestate method fails, resulting in an empty resource object
-	//a cache update failure is costly, as it puts the current AW in the backoff queue plus takes another dispatch cycle
-	//In the worst case the cache update could fail for subsequent dispatch cycles, causing test cases to fail or the AW never getting dispatched
-	//To avoid non-determinism the code below is a workaround; this issue should be fixed: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/550
+	//cache is now a method inside the controller.
+	//The reimplementation should fix issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/550
 	var unallocatedResources = clusterstateapi.EmptyResource()
-	unallocatedResources = qjm.cache.GetUnallocatedResources()
+	unallocatedResources = qjm.allocatableCapacity()
 	for unallocatedResources.IsEmpty() {
-		unallocatedResources.Add(qjm.cache.GetUnallocatedResources())
+		unallocatedResources.Add(qjm.allocatableCapacity())
 		if !unallocatedResources.IsEmpty() {
 			break
 		}
 	}
-
 	resources, proposedPreemptions := qjm.getAggregatedAvailableResourcesPriority(
 		unallocatedResources, priorityindex, qj, "")
 	klog.Infof("[ScheduleNext] [Agent Mode] Appwrapper '%s/%s' with resources %v to be scheduled on aggregated idle resources %v", qj.Namespace, qj.Name, aggqj, resources)
 
 	// Assume preemption will remove low priority AWs in the system, optimistically dispatch such AWs
 
 	if aggqj.LessEqual(resources) {
+		//TODO: should we turn off histograms?
 		unallocatedHistogramMap := qjm.cache.GetUnallocatedHistograms()
 		if !qjm.nodeChecks(unallocatedHistogramMap, qj) {
 			klog.Infof("[ScheduleNext] [Agent Mode] Optimistic dispatch for AW '%s/%s' requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %s",
@@ -1424,8 +1464,9 @@ func (cc *XController) Run(stopCh <-chan struct{}) {
 
 	cache.WaitForCacheSync(stopCh, cc.appWrapperSynced)
 
+	//TODO: do we still need to run cache every second?
 	// update snapshot of ClusterStateCache every second
-	cc.cache.Run(stopCh)
+	//cc.cache.Run(stopCh)
 
 	// start preempt thread is used to preempt AWs that have partial pods or have reached dispatch duration
 	go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh)
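
The comments on allocatableCapacity above note that listing every node and pod on each dispatch could become a bottleneck, and suggest moving the computation to a separate thread with locking and a timer to expire the cache. That idea is not implemented in this commit; the following is a hypothetical, self-contained sketch of what such a refresher could look like (capacityCache, the compute callback, and the int64 stand-in for *clusterstateapi.Resource are all illustrative names, not MCAD APIs):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// snapshot pairs a computed capacity value with the time it was computed.
// int64 stands in for *clusterstateapi.Resource to keep the sketch self-contained.
type snapshot struct {
	value   int64
	takenAt time.Time
}

// capacityCache recomputes capacity on a background goroutine every interval
// and treats a snapshot older than ttl as expired.
type capacityCache struct {
	mu      sync.RWMutex
	snap    snapshot
	ttl     time.Duration
	compute func() int64 // e.g. a wrapper around allocatableCapacity()
}

func newCapacityCache(interval, ttl time.Duration, compute func() int64, stopCh <-chan struct{}) *capacityCache {
	c := &capacityCache{ttl: ttl, compute: compute}
	c.refresh() // prime the cache before the first dispatch
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				c.refresh()
			case <-stopCh:
				return
			}
		}
	}()
	return c
}

func (c *capacityCache) refresh() {
	v := c.compute()
	c.mu.Lock()
	c.snap = snapshot{value: v, takenAt: time.Now()}
	c.mu.Unlock()
}

// get returns the cached capacity, recomputing synchronously if it has
// expired, so a dispatch never acts on an arbitrarily stale value.
func (c *capacityCache) get() int64 {
	c.mu.RLock()
	s := c.snap
	c.mu.RUnlock()
	if time.Since(s.takenAt) > c.ttl {
		c.refresh()
		c.mu.RLock()
		s = c.snap
		c.mu.RUnlock()
	}
	return s.value
}

func main() {
	stopCh := make(chan struct{})
	defer close(stopCh)

	cache := newCapacityCache(30*time.Second, time.Minute, func() int64 { return 550 }, stopCh)
	fmt.Println("available capacity:", cache.get())
}
```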

Diff for: test/e2e/queue.go (+16 −4)

@@ -120,15 +120,15 @@ var _ = Describe("AppWrapper E2E Test", func() {
 		// This should fill up the worker node and most of the master node
 		aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu"))
 		appwrappers = append(appwrappers, aw)
-		time.Sleep(1 * time.Minute)
 		err := waitAWPodsReady(context, aw)
 		Expect(err).NotTo(HaveOccurred())
 
 		// This should not fit on cluster
 		aw2 := createDeploymentAWwith426CPU(context, appendRandomString("aw-deployment-2-426cpu"))
 		appwrappers = append(appwrappers, aw2)
 		err = waitAWAnyPodsExists(context, aw2)
-		Expect(err).NotTo(HaveOccurred())
+		//With improved accounting, no pods will be spawned
+		Expect(err).To(HaveOccurred())
 
 		// This should fit on cluster, initially queued because of aw2 above but should eventually
 		// run after prevention of aw2 above.
@@ -174,7 +174,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
 		aw := createJobAWWithInitContainer(context, "aw-job-3-init-container", 1, "none", 3)
 		appwrappers = append(appwrappers, aw)
 
-		err := waitAWPodsCompleted(context, aw, 300*time.Second)
+		err := waitAWPodsCompleted(context, aw, 200*time.Second)
 		Expect(err).To(HaveOccurred())
 	})
 
@@ -713,8 +713,20 @@ var _ = Describe("AppWrapper E2E Test", func() {
 		Expect(err1).NotTo(HaveOccurred(), "Expecting pods to be ready for app wrapper: aw-deployment-rhc")
 		aw1, err := context.karclient.McadV1beta1().AppWrappers(aw.Namespace).Get(context.ctx, aw.Name, metav1.GetOptions{})
 		Expect(err).NotTo(HaveOccurred(), "Expecting to get app wrapper status")
+		pass := false
+		for true {
+			aw1, err := context.karclient.McadV1beta1().AppWrappers(aw.Namespace).Get(context.ctx, aw.Name, metav1.GetOptions{})
+			if err != nil {
+				fmt.Fprint(GinkgoWriter, "Error getting status")
+			}
+			fmt.Fprintf(GinkgoWriter, "[e2e] status of AW %v.\n", aw1.Status.State)
+			if aw1.Status.State == arbv1.AppWrapperStateRunningHoldCompletion {
+				pass = true
+				break
+			}
+		}
 		fmt.Fprintf(GinkgoWriter, "[e2e] status of AW %v.\n", aw1.Status.State)
-		Expect(aw1.Status.State).To(Equal(arbv1.AppWrapperStateRunningHoldCompletion))
+		Expect(pass).To(BeTrue())
 		fmt.Fprintf(os.Stdout, "[e2e] MCAD Deployment RunningHoldCompletion Test - Completed. Awaiting app wrapper cleanup.\n")
 	})
 
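
The new status check in queue.go above loops with `for true` and has no timeout, so the spec would hang if the AppWrapper never reaches RunningHoldCompletion. A hypothetical, self-contained sketch of a bounded alternative using wait.PollImmediate from k8s.io/apimachinery (waitForState and getState are illustrative stand-ins for the Get call in the test, not part of this commit):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForState polls getState every two seconds until it reports want or the
// timeout elapses, returning wait.ErrWaitTimeout on expiry.
func waitForState(getState func() (string, error), want string, timeout time.Duration) error {
	return wait.PollImmediate(2*time.Second, timeout, func() (bool, error) {
		state, err := getState()
		if err != nil {
			// keep polling on transient errors, as the original loop does
			return false, nil
		}
		fmt.Printf("[e2e] status of AW %v.\n", state)
		return state == want, nil
	})
}

func main() {
	// Stand-in for the AppWrappers().Get call used in the e2e test.
	getState := func() (string, error) { return "RunningHoldCompletion", nil }

	err := waitForState(getState, "RunningHoldCompletion", 2*time.Minute)
	fmt.Println("poll finished:", err)
}
```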
