simplify cache #587

Merged · 6 commits · Aug 19, 2023
61 changes: 51 additions & 10 deletions pkg/controller/queuejob/queuejob_controller_ex.go
@@ -108,6 +108,7 @@ type XController struct {
// QJ queue that needs to be allocated
qjqueue SchedulingQueue

//TODO: Do we need this local cache?
// our own local cache, used for computing total amount of resources
cache clusterstatecache.Cache

@@ -154,6 +155,46 @@ func GetQueueJobKey(obj interface{}) (string, error) {
return fmt.Sprintf("%s/%s", qj.Namespace, qj.Name), nil
}

// allocatableCapacity calculates the capacity available on each node by subtracting resources
// consumed by existing pods.
// For a large cluster with thousands of nodes and hundreds of thousands of pods, this
// method could be a performance bottleneck.
// We could move this method to a separate thread that runs every X interval and
// provides resources available to the next AW that needs to be dispatched.
// That thread would need locking and a timer to expire the cache.
// Moving to controller-runtime may also help.
func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource {
capacity := clusterstateapi.EmptyResource()
nodes, err := qjm.clients.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
klog.Errorf("[allocatableCapacity] Error listing nodes: %v", err)
return capacity
}
startTime := time.Now()
for _, node := range nodes.Items {
// skip unschedulable nodes
if node.Spec.Unschedulable {
continue
}
nodeResource := clusterstateapi.NewResource(node.Status.Allocatable)
capacity.Add(nodeResource)
fieldSelector := fmt.Sprintf("spec.nodeName=%s", node.Name)
podList, err := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{FieldSelector: fieldSelector})
//TODO: when no pods are listed, do we report the entire node capacity as available?
//That would cause a false-positive dispatch.
if err != nil {
klog.Errorf("[allocatableCapacity] Error listing pods %v", err)
}
for _, pod := range podList.Items {
if _, ok := pod.GetLabels()["appwrappers.mcad.ibm.com"]; !ok && pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded {
for _, container := range pod.Spec.Containers {
usedResource := clusterstateapi.NewResource(container.Resources.Requests)
capacity.Sub(usedResource)
}
}
}
}
klog.Info("[allocatableCapacity] The avaible capacity to dispatch appwrapper is %v and time took to calculate is %v", capacity, time.Now().Sub(startTime))
return capacity
}
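
The TODO above suggests moving this computation to a background thread with locking and a cache TTL. A minimal sketch of that idea follows; capacityCache, its fields, and the refresh callback are hypothetical names invented for illustration, not part of this PR:

// capacityCache keeps a periodically expiring snapshot of allocatable capacity
// so that dispatch does not pay the full node/pod listing cost on every cycle.
type capacityCache struct {
	mu        sync.RWMutex
	snapshot  *clusterstateapi.Resource
	updatedAt time.Time
	ttl       time.Duration
}

// get returns the cached snapshot, recomputing it via refresh once the TTL expires.
func (c *capacityCache) get(refresh func() *clusterstateapi.Resource) *clusterstateapi.Resource {
	c.mu.RLock()
	if c.snapshot != nil && time.Since(c.updatedAt) < c.ttl {
		defer c.mu.RUnlock()
		return c.snapshot
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	// Re-check under the write lock: another goroutine may have refreshed already.
	if c.snapshot == nil || time.Since(c.updatedAt) >= c.ttl {
		c.snapshot = refresh()
		c.updatedAt = time.Now()
	}
	return c.snapshot
}

// set stores a freshly computed snapshot.
func (c *capacityCache) set(snapshot *clusterstateapi.Resource) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.snapshot = snapshot
	c.updatedAt = time.Now()
}

ScheduleNext could then call something like qjm.capacity.get(qjm.allocatableCapacity) instead of recomputing the capacity on every dispatch cycle.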

// NewJobController create new AppWrapper Controller
func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
cc := &XController{
@@ -166,8 +207,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
initQueue: cache.NewFIFO(GetQueueJobKey),
updateQueue: cache.NewFIFO(GetQueueJobKey),
qjqueue: NewSchedulingQueue(),
-cache: clusterstatecache.New(config),
-schedulingAW: nil,
+//TODO: do we still need cache to be initialized?
+cache: clusterstatecache.New(config),
+schedulingAW: nil,
}
//TODO: work on enabling metrics adapter for correct MCAD mode
//metrics adapter is implemented through dynamic client which looks at all the
@@ -1098,26 +1140,24 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
if qjm.serverOption.DynamicPriority {
priorityindex = -math.MaxFloat64
}
-//cache.go updatestate method fails, resulting in an empty resource object
-//a cache update failure is costly, as it puts the current AW in the backoff queue plus takes another dispatch cycle
-//In the worst case the cache update could fail for subsequent dispatch cycles, causing test cases to fail or the AW never getting dispatched
-//To avoid non-determinism the code below is a workaround; this issue should be fixed: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/550
+//cache is now a method inside the controller.
+//The reimplementation should fix issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/550
var unallocatedResources = clusterstateapi.EmptyResource()
-unallocatedResources = qjm.cache.GetUnallocatedResources()
+unallocatedResources = qjm.allocatableCapacity()
for unallocatedResources.IsEmpty() {
-unallocatedResources.Add(qjm.cache.GetUnallocatedResources())
+unallocatedResources.Add(qjm.allocatableCapacity())
if !unallocatedResources.IsEmpty() {
break
}
}

resources, proposedPreemptions := qjm.getAggregatedAvailableResourcesPriority(
unallocatedResources, priorityindex, qj, "")
klog.Infof("[ScheduleNext] [Agent Mode] Appwrapper '%s/%s' with resources %v to be scheduled on aggregated idle resources %v", qj.Namespace, qj.Name, aggqj, resources)

// Assume preemption will remove low priority AWs in the system; optimistically dispatch such AWs

if aggqj.LessEqual(resources) {
//TODO: should we turn-off histograms?
unallocatedHistogramMap := qjm.cache.GetUnallocatedHistograms()
if !qjm.nodeChecks(unallocatedHistogramMap, qj) {
klog.Infof("[ScheduleNext] [Agent Mode] Optimistic dispatch for AW '%s/%s' requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %s",
@@ -1424,8 +1464,9 @@ func (cc *XController) Run(stopCh <-chan struct{}) {

cache.WaitForCacheSync(stopCh, cc.appWrapperSynced)

+//TODO: do we still need to run cache every second?
// update snapshot of ClientStateCache every second
-cc.cache.Run(stopCh)
+//cc.cache.Run(stopCh)

// start preempt thread is used to preempt AWs that have partial pods or have reached dispatch duration
go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh)
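
If the TODO above is resolved by refreshing a snapshot on an interval rather than running the old ClusterStateCache every second, the wiring could reuse the same wait.Until pattern as the preempt thread. A sketch, assuming the hypothetical capacityCache from earlier (cc.capacity and the 10-second interval are illustrative, not part of this PR):

// Hypothetical: periodically refresh the capacity snapshot used by ScheduleNext.
go wait.Until(func() {
	cc.capacity.set(cc.allocatableCapacity())
}, 10*time.Second, stopCh)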
20 changes: 16 additions & 4 deletions test/e2e/queue.go
@@ -120,15 +120,15 @@ var _ = Describe("AppWrapper E2E Test", func() {
// This should fill up the worker node and most of the master node
aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu"))
appwrappers = append(appwrappers, aw)
-time.Sleep(1 * time.Minute)
err := waitAWPodsReady(context, aw)
Expect(err).NotTo(HaveOccurred())

// This should not fit on cluster
aw2 := createDeploymentAWwith426CPU(context, appendRandomString("aw-deployment-2-426cpu"))
appwrappers = append(appwrappers, aw2)
err = waitAWAnyPodsExists(context, aw2)
-Expect(err).NotTo(HaveOccurred())
+//With improved accounting, no pods will be spawned
+Expect(err).To(HaveOccurred())

// This should fit on cluster, initially queued because of aw2 above but should eventually
// run after prevention of aw2 above.
@@ -174,7 +174,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
aw := createJobAWWithInitContainer(context, "aw-job-3-init-container", 1, "none", 3)
appwrappers = append(appwrappers, aw)

-err := waitAWPodsCompleted(context, aw, 300*time.Second)
+err := waitAWPodsCompleted(context, aw, 200*time.Second)
Expect(err).To(HaveOccurred())
})

@@ -713,8 +713,20 @@ var _ = Describe("AppWrapper E2E Test", func() {
Expect(err1).NotTo(HaveOccurred(), "Expecting pods to be ready for app wrapper: aw-deployment-rhc")
aw1, err := context.karclient.McadV1beta1().AppWrappers(aw.Namespace).Get(context.ctx, aw.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "Expecting to get app wrapper status")
pass := false
for {
aw1, err := context.karclient.McadV1beta1().AppWrappers(aw.Namespace).Get(context.ctx, aw.Name, metav1.GetOptions{})
if err != nil {
fmt.Fprintln(GinkgoWriter, "Error getting status")
continue
}
fmt.Fprintf(GinkgoWriter, "[e2e] status of AW %v.\n", aw1.Status.State)
if aw1.Status.State == arbv1.AppWrapperStateRunningHoldCompletion {
pass = true
break
}
}
fmt.Fprintf(GinkgoWriter, "[e2e] status of AW %v.\n", aw1.Status.State)
-Expect(aw1.Status.State).To(Equal(arbv1.AppWrapperStateRunningHoldCompletion))
+Expect(pass).To(BeTrue())
fmt.Fprintf(os.Stdout, "[e2e] MCAD Deployment RunningHoldCompletion Test - Completed. Awaiting app wrapper cleanup.\n")
})
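
As written, the polling loop above never times out if the AppWrapper never reaches RunningHoldCompletion. Gomega's Eventually would give the same check a deadline; a sketch reusing the test's own client calls (the timeout and poll interval are illustrative):

Eventually(func() (arbv1.AppWrapperState, error) {
	awCur, err := context.karclient.McadV1beta1().AppWrappers(aw.Namespace).Get(context.ctx, aw.Name, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	return awCur.Status.State, nil
}, 5*time.Minute, 5*time.Second).Should(Equal(arbv1.AppWrapperStateRunningHoldCompletion))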
